Merge remote-tracking branch 'upstream/release-5.1' into bindings-format
This commit is contained in:
commit
1c1ae7d70e
|
@ -1557,12 +1557,12 @@ ACTOR Future<Void> statusDBBackup(Database src, Database dest, std::string tagNa
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> statusBackup(Database db, std::string tagName, int errorLimit) {
|
||||
ACTOR Future<Void> statusBackup(Database db, std::string tagName, bool showErrors) {
|
||||
try
|
||||
{
|
||||
state FileBackupAgent backupAgent;
|
||||
|
||||
std::string statusText = wait(backupAgent.getStatus(db, errorLimit, tagName));
|
||||
std::string statusText = wait(backupAgent.getStatus(db, showErrors, tagName));
|
||||
printf("%s\n", statusText.c_str());
|
||||
}
|
||||
catch (Error& e) {
|
||||
|
@ -2279,6 +2279,8 @@ int main(int argc, char* argv[]) {
|
|||
bool dryRun = false;
|
||||
std::string traceDir = "";
|
||||
std::string traceLogGroup;
|
||||
uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE;
|
||||
uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE;
|
||||
ESOError lastError;
|
||||
bool partial = true;
|
||||
LocalityData localities;
|
||||
|
@ -2706,6 +2708,14 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
}
|
||||
|
||||
// Opens a trace file if trace is set (and if a trace file isn't already open)
|
||||
// For most modes, initCluster() will open a trace file, but some fdbbackup operations do not require
|
||||
// a cluster so they should use this instead.
|
||||
auto initTraceFile = [&]() {
|
||||
if(trace)
|
||||
openTraceFile(NetworkAddress(), traceRollSize, traceMaxLogsSize, traceDir, "trace", traceLogGroup);
|
||||
};
|
||||
|
||||
auto initCluster = [&](bool quiet = false) {
|
||||
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
|
||||
try {
|
||||
|
@ -2784,7 +2794,7 @@ int main(int argc, char* argv[]) {
|
|||
case BACKUP_STATUS:
|
||||
if(!initCluster())
|
||||
return FDB_EXIT_ERROR;
|
||||
f = stopAfter( statusBackup(db, tagName, maxErrors) );
|
||||
f = stopAfter( statusBackup(db, tagName, true) );
|
||||
break;
|
||||
|
||||
case BACKUP_ABORT:
|
||||
|
@ -2818,6 +2828,7 @@ int main(int argc, char* argv[]) {
|
|||
break;
|
||||
|
||||
case BACKUP_EXPIRE:
|
||||
initTraceFile();
|
||||
// Must have a usable cluster if either expire DateTime options were used
|
||||
if(!expireDatetime.empty() || !expireRestorableAfterDatetime.empty()) {
|
||||
if(!initCluster())
|
||||
|
@ -2827,10 +2838,12 @@ int main(int argc, char* argv[]) {
|
|||
break;
|
||||
|
||||
case BACKUP_DELETE:
|
||||
initTraceFile();
|
||||
f = stopAfter( deleteBackupContainer(argv[0], destinationContainer) );
|
||||
break;
|
||||
|
||||
case BACKUP_DESCRIBE:
|
||||
initTraceFile();
|
||||
// If timestamp lookups are desired, require a cluster file
|
||||
if(describeTimestamps && !initCluster())
|
||||
return FDB_EXIT_ERROR;
|
||||
|
@ -2839,6 +2852,7 @@ int main(int argc, char* argv[]) {
|
|||
f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional<Database>(db) : Optional<Database>()) );
|
||||
break;
|
||||
case BACKUP_LIST:
|
||||
initTraceFile();
|
||||
f = stopAfter( listBackup(baseUrl) );
|
||||
break;
|
||||
|
||||
|
|
|
@ -129,6 +129,5 @@
|
|||
<Exec Command="copy /y "$(OutDir)fdbbackup.exe" "$(OutDir)backup_agent.exe"" />
|
||||
<Exec Command="copy /y "$(OutDir)fdbbackup.exe" "$(OutDir)dr_agent.exe"" />
|
||||
<Exec Command="copy /y "$(OutDir)fdbbackup.exe" "$(OutDir)fdbdr.exe"" />
|
||||
<Exec Command="copy /y "$(OutDir)fdbbackup.exe" "$(OutDir)fdbblob.exe"" />
|
||||
</Target>
|
||||
</Project>
|
||||
|
|
|
@ -47,7 +47,7 @@ bin/fdbbackup: bin/coverage.fdbbackup.xml
|
|||
|
||||
bin/fdbbackup.debug: bin/fdbbackup
|
||||
|
||||
BACKUP_ALIASES = fdbrestore fdbblob fdbdr dr_agent backup_agent
|
||||
BACKUP_ALIASES = fdbrestore fdbdr dr_agent backup_agent
|
||||
|
||||
$(addprefix bin/, $(BACKUP_ALIASES)): bin/fdbbackup
|
||||
@[ -f $@ ] || (echo "SymLinking $@" && ln -s fdbbackup $@)
|
||||
|
|
|
@ -624,7 +624,8 @@ void getBackupDRTags(StatusObjectReader &statusObjCluster, const char *context,
|
|||
for(auto itr : tags.obj()) {
|
||||
JSONDoc tag(itr.second);
|
||||
bool running = false;
|
||||
if(tag.tryGet("running_backup", running)) {
|
||||
tag.tryGet("running_backup", running);
|
||||
if(running) {
|
||||
std::string uid;
|
||||
if(tag.tryGet("mutation_stream_id", uid)) {
|
||||
tagMap[itr.first] = uid;
|
||||
|
|
|
@ -265,7 +265,7 @@ public:
|
|||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return abortBackup(tr, tagName); });
|
||||
}
|
||||
|
||||
Future<std::string> getStatus(Database cx, int errorLimit, std::string tagName);
|
||||
Future<std::string> getStatus(Database cx, bool showErrors, std::string tagName);
|
||||
|
||||
Future<Version> getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
||||
void setLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName, Version version);
|
||||
|
@ -530,10 +530,31 @@ public:
|
|||
}
|
||||
|
||||
// lastError is a pair of error message and timestamp expressed as an int64_t
|
||||
KeyBackedProperty<std::pair<std::string, int64_t>> lastError() {
|
||||
KeyBackedProperty<std::pair<std::string, Version>> lastError() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
KeyBackedMap<int64_t, std::pair<std::string, Version>> lastErrorPerType() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
// Updates the error per type map and the last error property
|
||||
Future<Void> updateErrorInfo(Database cx, Error e, std::string message) {
|
||||
// Avoid capture of this ptr
|
||||
auto © = *this;
|
||||
|
||||
return runRYWTransaction(cx, [=] (Reference<ReadYourWritesTransaction> tr) mutable {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return map(tr->getReadVersion(), [=] (Version v) mutable {
|
||||
copy.lastError().set(tr, {message, v});
|
||||
copy.lastErrorPerType().set(tr, e.code(), {message, v});
|
||||
return Void();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
protected:
|
||||
UID uid;
|
||||
Key prefix;
|
||||
|
@ -715,7 +736,8 @@ public:
|
|||
if(e.code() == error_code_key_not_found)
|
||||
t.backtrace();
|
||||
std::string msg = format("ERROR: %s %s", e.what(), details.c_str());
|
||||
return lastError().set(cx, {msg, (int64_t)now()});
|
||||
|
||||
return updateErrorInfo(cx, e, msg);
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -962,7 +962,7 @@ private:
|
|||
// Backup files to under a single folder prefix with subfolders for each named backup
|
||||
static const std::string DATAFOLDER;
|
||||
|
||||
// The metafolder contains keys for which user-named backups exist. Backup names can contain an arbitrary
|
||||
// Indexfolder contains keys for which user-named backups exist. Backup names can contain an arbitrary
|
||||
// number of slashes so the backup names are kept in a separate folder tree from their actual data.
|
||||
static const std::string INDEXFOLDER;
|
||||
|
||||
|
@ -1080,36 +1080,8 @@ public:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> deleteContainer_impl(Reference<BackupContainerBlobStore> bc, int *pNumDeleted) {
|
||||
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
|
||||
state Future<Void> done = bc->m_bstore->listBucketStream(BUCKET, resultStream, bc->dataPath(""), '/', std::numeric_limits<int>::max());
|
||||
state std::list<Future<Void>> deleteFutures;
|
||||
loop {
|
||||
choose {
|
||||
when(Void _ = wait(done)) {
|
||||
break;
|
||||
}
|
||||
when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
|
||||
for(auto &object : list.objects) {
|
||||
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
|
||||
deleteFutures.push_back(map(bc->m_bstore->deleteObject(BUCKET, object.name), [pNumDeletedCopy](Void) {
|
||||
if(pNumDeletedCopy != nullptr)
|
||||
++*pNumDeletedCopy;
|
||||
return Void();
|
||||
}));
|
||||
}
|
||||
|
||||
while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) {
|
||||
Void _ = wait(deleteFutures.front());
|
||||
deleteFutures.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while(deleteFutures.size() > 0) {
|
||||
Void _ = wait(deleteFutures.front());
|
||||
deleteFutures.pop_front();
|
||||
}
|
||||
// First delete everything under the data prefix in the bucket
|
||||
Void _ = wait(bc->m_bstore->deleteRecursively(BUCKET, bc->dataPath(""), pNumDeleted));
|
||||
|
||||
// Now that all files are deleted, delete the index entry
|
||||
Void _ = wait(bc->m_bstore->deleteObject(BUCKET, bc->indexEntry()));
|
||||
|
@ -1357,23 +1329,23 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
|
|||
state int64_t versionShift = g_random->randomInt64(0, std::numeric_limits<Version>::max() - 500);
|
||||
|
||||
state Reference<IBackupFile> log1 = wait(c->writeLogFile(100 + versionShift, 150 + versionShift, 10));
|
||||
Void _ = wait(writeAndVerifyFile(c, log1, 0));
|
||||
|
||||
state Reference<IBackupFile> log2 = wait(c->writeLogFile(150 + versionShift, 300 + versionShift, 10));
|
||||
Void _ = wait(writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000)));
|
||||
|
||||
state Reference<IBackupFile> range1 = wait(c->writeRangeFile(160 + versionShift, 10));
|
||||
Void _ = wait(writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000)));
|
||||
|
||||
state Reference<IBackupFile> range2 = wait(c->writeRangeFile(300 + versionShift, 10));
|
||||
Void _ = wait(writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000)));
|
||||
|
||||
state Reference<IBackupFile> range3 = wait(c->writeRangeFile(310 + versionShift, 10));
|
||||
Void _ = wait(writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000)));
|
||||
|
||||
Void _ = wait(c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size()));
|
||||
Void _ = wait(
|
||||
writeAndVerifyFile(c, log1, 0)
|
||||
&& writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000))
|
||||
&& writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000))
|
||||
&& writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000))
|
||||
&& writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000))
|
||||
);
|
||||
|
||||
Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()));
|
||||
Void _ = wait(
|
||||
c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size())
|
||||
&& c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size())
|
||||
);
|
||||
|
||||
printf("Checking file list dump\n");
|
||||
FullBackupListing listing = wait(c->dumpFileList());
|
||||
|
|
|
@ -178,7 +178,8 @@ public:
|
|||
if(e.code() == error_code_key_not_found)
|
||||
t.backtrace();
|
||||
std::string msg = format("ERROR: %s (%s)", details.c_str(), e.what());
|
||||
return lastError().set(cx, {msg, (int64_t)now()});
|
||||
|
||||
return updateErrorInfo(cx, e, msg);
|
||||
}
|
||||
|
||||
Key mutationLogPrefix() {
|
||||
|
@ -283,7 +284,7 @@ ACTOR Future<std::string> RestoreConfig::getProgress_impl(RestoreConfig restore,
|
|||
state Future<StringRef> status = restore.stateText(tr);
|
||||
state Future<Version> lag = restore.getApplyVersionLag(tr);
|
||||
state Future<std::string> tag = restore.tag().getD(tr);
|
||||
state Future<std::pair<std::string, int64_t>> lastError = restore.lastError().getD(tr);
|
||||
state Future<std::pair<std::string, Version>> lastError = restore.lastError().getD(tr);
|
||||
|
||||
// restore might no longer be valid after the first wait so make sure it is not needed anymore.
|
||||
state UID uid = restore.getUid();
|
||||
|
@ -291,7 +292,7 @@ ACTOR Future<std::string> RestoreConfig::getProgress_impl(RestoreConfig restore,
|
|||
|
||||
std::string errstr = "None";
|
||||
if(lastError.get().second != 0)
|
||||
errstr = format("'%s' %llds ago.\n", lastError.get().first.c_str(), (int64_t)now() - lastError.get().second);
|
||||
errstr = format("'%s' %llds ago.\n", lastError.get().first.c_str(), (tr->getReadVersion().get() - lastError.get().second) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND );
|
||||
|
||||
TraceEvent("FileRestoreProgress")
|
||||
.detail("RestoreUID", uid)
|
||||
|
@ -1748,9 +1749,10 @@ namespace fileBackup {
|
|||
state Version beginVersion = Params.beginVersion().get(task);
|
||||
state Version endVersion = Params.endVersion().get(task);
|
||||
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
|
||||
state BackupConfig config(task);
|
||||
|
||||
if(Params.fileSize().exists(task)) {
|
||||
BackupConfig(task).logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue);
|
||||
config.logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue);
|
||||
}
|
||||
|
||||
if (Params.addBackupLogRangeTasks().get(task)) {
|
||||
|
@ -1761,7 +1763,7 @@ namespace fileBackup {
|
|||
}
|
||||
|
||||
if(endVersion > beginVersion) {
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, task->params[FileBackupAgent::keyConfigLogUid]);
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey());
|
||||
for (auto & rng : ranges)
|
||||
tr->clear(rng);
|
||||
}
|
||||
|
@ -1799,10 +1801,17 @@ namespace fileBackup {
|
|||
state bool stopWhenDone;
|
||||
state Optional<Version> restorableVersion;
|
||||
state EBackupState backupState;
|
||||
state Optional<std::string> tag;
|
||||
|
||||
Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
|
||||
&& store(config.stateEnum().getOrThrow(tr), backupState));
|
||||
&& store(config.stateEnum().getOrThrow(tr), backupState)
|
||||
&& store(config.tag().get(tr), tag));
|
||||
|
||||
// If restorable, update the last restorable version for this tag
|
||||
if(restorableVersion.present() && tag.present()) {
|
||||
FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get());
|
||||
}
|
||||
|
||||
// If the backup is restorable but the state is not differential then set state to differential
|
||||
if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL)
|
||||
|
@ -2014,11 +2023,18 @@ namespace fileBackup {
|
|||
state EBackupState backupState;
|
||||
state Optional<Version> restorableVersion;
|
||||
state Optional<Version> firstSnapshotEndVersion;
|
||||
state Optional<std::string> tag;
|
||||
|
||||
Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
&& store(config.stateEnum().getOrThrow(tr), backupState)
|
||||
&& store(config.getLatestRestorableVersion(tr), restorableVersion)
|
||||
&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion));
|
||||
&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)
|
||||
&& store(config.tag().get(tr), tag));
|
||||
|
||||
// If restorable, update the last restorable version for this tag
|
||||
if(restorableVersion.present() && tag.present()) {
|
||||
FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get());
|
||||
}
|
||||
|
||||
if(!firstSnapshotEndVersion.present()) {
|
||||
config.firstSnapshotEndVersion().set(tr, Params.endVersion().get(task));
|
||||
|
@ -3460,7 +3476,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<std::string> getStatus(FileBackupAgent* backupAgent, Database cx, int errorLimit, std::string tagName) {
|
||||
ACTOR static Future<std::string> getStatus(FileBackupAgent* backupAgent, Database cx, bool showErrors, std::string tagName) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state std::string statusText;
|
||||
|
||||
|
@ -3487,8 +3503,15 @@ public:
|
|||
statusText += "No previous backups found.\n";
|
||||
} else {
|
||||
state std::string backupStatus(BackupAgentBase::getStateText(backupState));
|
||||
state Reference<IBackupContainer> bc = wait(config.backupContainer().getOrThrow(tr));
|
||||
state Optional<Version> stopVersion = wait(config.getLatestRestorableVersion(tr));
|
||||
state Reference<IBackupContainer> bc;
|
||||
state Optional<Version> latestRestorableVersion;
|
||||
state Version recentReadVersion;
|
||||
|
||||
Void _ = wait( store(config.getLatestRestorableVersion(tr), latestRestorableVersion)
|
||||
&& store(config.backupContainer().getOrThrow(tr), bc)
|
||||
&& store(tr->getReadVersion(), recentReadVersion)
|
||||
);
|
||||
|
||||
bool snapshotProgress = false;
|
||||
|
||||
switch (backupState) {
|
||||
|
@ -3504,7 +3527,7 @@ public:
|
|||
snapshotProgress = true;
|
||||
break;
|
||||
case BackupAgentBase::STATE_COMPLETED:
|
||||
statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " completed at version " + format("%lld", stopVersion.orDefault(-1)) + ".\n";
|
||||
statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " completed at version " + format("%lld", latestRestorableVersion.orDefault(-1)) + ".\n";
|
||||
break;
|
||||
default:
|
||||
statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " " + backupStatus + ".\n";
|
||||
|
@ -3513,29 +3536,45 @@ public:
|
|||
|
||||
if(snapshotProgress) {
|
||||
state int64_t snapshotInterval;
|
||||
state Version recentReadVersion;
|
||||
state Version snapshotBeginVersion;
|
||||
state Version snapshotTargetEndVersion;
|
||||
|
||||
Void _ = wait(store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
|
||||
Void _ = wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
|
||||
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
|
||||
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotInterval)
|
||||
&& store(tr->getReadVersion(), recentReadVersion));
|
||||
);
|
||||
|
||||
statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval);
|
||||
if(backupState == BackupAgentBase::STATE_DIFFERENTIAL)
|
||||
statusText += format("Current snapshot progress target is %3.2f%%\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ;
|
||||
statusText += format("Current snapshot progress target is %3.2f%% (>100%% means the snapshot is supposed to be done)\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ;
|
||||
else
|
||||
statusText += "The initial snapshot is still running.\n";
|
||||
}
|
||||
}
|
||||
|
||||
// Append the errors, if requested
|
||||
if (errorLimit > 0 && config.getUid().isValid()) {
|
||||
Optional<std::pair<std::string, int64_t>> errMsg = wait(config.lastError().get(tr));
|
||||
if (errMsg.present()) {
|
||||
statusText += "WARNING: Some backup agents have reported issues:\n";
|
||||
statusText += format("[%lld]: %s\n", errMsg.get().second, errMsg.get().first.c_str());
|
||||
if (showErrors) {
|
||||
KeyBackedMap<int64_t, std::pair<std::string, Version>>::PairsType errors = wait(config.lastErrorPerType().getRange(tr, 0, std::numeric_limits<int>::max(), CLIENT_KNOBS->TOO_MANY));
|
||||
std::string recentErrors;
|
||||
std::string pastErrors;
|
||||
|
||||
for(auto &e : errors) {
|
||||
Version v = e.second.second;
|
||||
std::string msg = format("%s (%lld seconds ago)\n", e.second.first.c_str(),
|
||||
(recentReadVersion - v) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
|
||||
|
||||
// If error version is at or more recent than the latest restorable version then it could be inhibiting progress
|
||||
if(v >= latestRestorableVersion.orDefault(0)) {
|
||||
recentErrors += msg;
|
||||
}
|
||||
else {
|
||||
pastErrors += msg;
|
||||
}
|
||||
}
|
||||
|
||||
if(!recentErrors.empty())
|
||||
statusText += "Errors possibly preventing progress:\n" + recentErrors;
|
||||
if(!pastErrors.empty())
|
||||
statusText += "Older errors:\n" + pastErrors;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3756,8 +3795,8 @@ Future<Void> FileBackupAgent::abortBackup(Reference<ReadYourWritesTransaction> t
|
|||
return FileBackupAgentImpl::abortBackup(this, tr, tagName);
|
||||
}
|
||||
|
||||
Future<std::string> FileBackupAgent::getStatus(Database cx, int errorLimit, std::string tagName) {
|
||||
return FileBackupAgentImpl::getStatus(this, cx, errorLimit, tagName);
|
||||
Future<std::string> FileBackupAgent::getStatus(Database cx, bool showErrors, std::string tagName) {
|
||||
return FileBackupAgentImpl::getStatus(this, cx, showErrors, tagName);
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName) {
|
||||
|
|
|
@ -1242,7 +1242,7 @@ ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clu
|
|||
lock.enter();
|
||||
if(!networkSetup) {
|
||||
lock.leave();
|
||||
throw network_not_setup();
|
||||
return network_not_setup();
|
||||
}
|
||||
lock.leave();
|
||||
|
||||
|
|
|
@ -1307,7 +1307,7 @@ Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddre
|
|||
|
||||
// If key >= allKeys.end, then our resulting address vector will be empty.
|
||||
|
||||
Future< Standalone<VectorRef<const char*> >> result = tr.getAddressesForKey(key);
|
||||
Future< Standalone<VectorRef<const char*> >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture());
|
||||
reading.add( success( result ) );
|
||||
return result;
|
||||
}
|
||||
|
@ -1681,6 +1681,14 @@ Future<Void> ReadYourWritesTransaction::commit() {
|
|||
return RYWImpl::commit( this );
|
||||
}
|
||||
|
||||
Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
|
||||
if(checkUsedDuringCommit()) {
|
||||
return used_during_commit();
|
||||
}
|
||||
|
||||
return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
|
||||
switch(option) {
|
||||
case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
|
||||
|
|
|
@ -103,13 +103,7 @@ public:
|
|||
|
||||
Future<Void> commit();
|
||||
Version getCommittedVersion() { return tr.getCommittedVersion(); }
|
||||
Future<Standalone<StringRef>> getVersionstamp() {
|
||||
if(checkUsedDuringCommit()) {
|
||||
return used_during_commit();
|
||||
}
|
||||
|
||||
return tr.getVersionstamp();
|
||||
}
|
||||
Future<Standalone<StringRef>> getVersionstamp();
|
||||
|
||||
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
|
||||
|
||||
|
|
|
@ -146,98 +146,113 @@ void JSONDoc::mergeInto(json_spirit::mObject &dst, const json_spirit::mObject &s
|
|||
}
|
||||
}
|
||||
|
||||
void JSONDoc::mergeValueInto(json_spirit::mValue &d, const json_spirit::mValue &s) {
|
||||
if(s.is_null())
|
||||
void JSONDoc::mergeValueInto(json_spirit::mValue &dst, const json_spirit::mValue &src) {
|
||||
if(src.is_null())
|
||||
return;
|
||||
|
||||
if(d.is_null()) {
|
||||
d = s;
|
||||
if(dst.is_null()) {
|
||||
dst = src;
|
||||
return;
|
||||
}
|
||||
|
||||
if(d.type() != s.type()) {
|
||||
// Skip errors already found
|
||||
if(d.type() == json_spirit::obj_type && d.get_obj().count("ERROR"))
|
||||
// Do nothing if d is already an error
|
||||
if(dst.type() == json_spirit::obj_type && dst.get_obj().count("ERROR"))
|
||||
return;
|
||||
d = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", d}, {"b", s}});
|
||||
|
||||
if(dst.type() != src.type()) {
|
||||
dst = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", dst}, {"b", src}});
|
||||
return;
|
||||
}
|
||||
|
||||
switch(d.type()) {
|
||||
switch(dst.type()) {
|
||||
case json_spirit::obj_type:
|
||||
{
|
||||
std::string op = getOperator(d.get_obj());
|
||||
// Refs to the objects, for convenience.
|
||||
json_spirit::mObject &aObj = dst.get_obj();
|
||||
const json_spirit::mObject &bObj = src.get_obj();
|
||||
|
||||
//printf("Operator: %s\n", op.c_str());
|
||||
const std::string &op = getOperator(aObj);
|
||||
const std::string &opB = getOperator(bObj);
|
||||
|
||||
// Operators must be the same, which could mean both are empty (if these objects are not operators)
|
||||
if(op != opB) {
|
||||
dst = json_spirit::mObject({ {"ERROR", "Operators do not match"}, {"a", dst}, {"b", src} });
|
||||
break;
|
||||
}
|
||||
|
||||
// If objects are not operators then defer to mergeInto
|
||||
if(op.empty()) {
|
||||
mergeInto(d.get_obj(), s.get_obj());
|
||||
mergeInto(dst.get_obj(), src.get_obj());
|
||||
break;
|
||||
}
|
||||
|
||||
// Refs to the operator objects to combine
|
||||
const json_spirit::mObject &op_a = d.get_obj();
|
||||
const json_spirit::mObject &op_b = s.get_obj();
|
||||
|
||||
if(!op_b.count(op)) {
|
||||
d = json_spirit::mObject({{"ERROR", "Operators do not match"}, {"s", s}, {"d", d}});
|
||||
break;
|
||||
}
|
||||
|
||||
const json_spirit::mValue &a = d.get_obj().at(op);
|
||||
const json_spirit::mValue &b = s.get_obj().at(op);
|
||||
// Get the operator values
|
||||
json_spirit::mValue &a = aObj.at(op);
|
||||
const json_spirit::mValue &b = bObj.at(op);
|
||||
|
||||
// First try the operators that are type-agnostic
|
||||
try {
|
||||
d = mergeOperator<json_spirit::mValue>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperator<json_spirit::mValue>(op, aObj, bObj, a, b);
|
||||
return;
|
||||
} catch(std::exception &e) {
|
||||
}
|
||||
|
||||
// If that didn't work, the types must match or we have no operators left to try.
|
||||
// Now try type and type pair specific operators
|
||||
// First, if types are incompatible try to make them compatible or return an error
|
||||
if(a.type() != b.type()) {
|
||||
d = json_spirit::mObject({{"ERROR", "Types do not match"}, {"s", s}, {"d", d}});
|
||||
// It's actually okay if the type mismatch is double vs int since once can be converted to the other.
|
||||
if( (a.type() == json_spirit::int_type && b.type() == json_spirit::real_type)
|
||||
|| (b.type() == json_spirit::int_type && a.type() == json_spirit::real_type) )
|
||||
{
|
||||
// Convert d's op value (which a is a reference to) to a double so that the
|
||||
// switch block below will do the operation with doubles.
|
||||
a = a.get_real();
|
||||
}
|
||||
else {
|
||||
// Otherwise, output an error as the types do not match
|
||||
dst = json_spirit::mObject({{"ERROR", "Incompatible operator value types"}, {"a", dst}, {"b", src}});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Now try the type-specific operators.
|
||||
try {
|
||||
switch(a.type()) {
|
||||
case json_spirit::bool_type:
|
||||
d = mergeOperatorWrapper<bool>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<bool>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::int_type:
|
||||
d = mergeOperatorWrapper<int64_t>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<int64_t>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::real_type:
|
||||
d = mergeOperatorWrapper<double>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<double>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::str_type:
|
||||
d = mergeOperatorWrapper<std::string>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<std::string>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::array_type:
|
||||
d = mergeOperatorWrapper<json_spirit::mArray>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<json_spirit::mArray>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::obj_type:
|
||||
d = mergeOperatorWrapper<json_spirit::mObject>(op, op_a, op_b, a, b);
|
||||
dst = mergeOperatorWrapper<json_spirit::mObject>(op, aObj, bObj, a, b);
|
||||
break;
|
||||
case json_spirit::null_type:
|
||||
break;
|
||||
}
|
||||
} catch(...) {
|
||||
d = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}});
|
||||
dst = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case json_spirit::array_type:
|
||||
for(auto &ai : s.get_array())
|
||||
d.get_array().push_back(ai);
|
||||
for(auto &ai : src.get_array())
|
||||
dst.get_array().push_back(ai);
|
||||
break;
|
||||
|
||||
default:
|
||||
if(d != s)
|
||||
d = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", d}, {"b", s}});
|
||||
if(dst != src)
|
||||
dst = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", dst}, {"b", src}});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -389,20 +389,19 @@ public:
|
|||
}));
|
||||
}
|
||||
} catch(Error &e) {
|
||||
state Error e2 = e;
|
||||
TraceEvent(SevWarn, "TB_ExecuteFailure")
|
||||
.detail("TaskUID", task->key.printable())
|
||||
.detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable())
|
||||
.detail("Priority", task->getPriority())
|
||||
.error(e);
|
||||
try {
|
||||
Void _ = wait(taskFunc->handleError(cx, task, e2));
|
||||
Void _ = wait(taskFunc->handleError(cx, task, e));
|
||||
} catch(Error &e) {
|
||||
TraceEvent(SevWarn, "TB_ExecuteFailureLogErrorFailed")
|
||||
.detail("TaskUID", task->key.printable())
|
||||
.detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable())
|
||||
.detail("Priority", task->getPriority())
|
||||
.error(e2);
|
||||
.error(e); // output handleError() error instead of original task error
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -215,15 +215,23 @@ Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::str
|
|||
return deleteObject_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, object);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket, int *pNumDeleted) {
|
||||
ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string prefix, int *pNumDeleted) {
|
||||
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
|
||||
state Future<Void> done = b->listBucketStream(bucket, resultStream);
|
||||
state std::vector<Future<Void>> deleteFutures;
|
||||
// Start a recursive parallel listing which will send results to resultStream as they are received
|
||||
state Future<Void> done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits<int>::max());
|
||||
// Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream)
|
||||
done = map(done, [=](Void) {
|
||||
resultStream.sendError(end_of_stream());
|
||||
return Void();
|
||||
});
|
||||
|
||||
state std::list<Future<Void>> deleteFutures;
|
||||
try {
|
||||
loop {
|
||||
choose {
|
||||
when(Void _ = wait(done)) {
|
||||
break;
|
||||
}
|
||||
// Throw if done throws, otherwise don't stop until end_of_stream
|
||||
when(Void _ = wait(done)) {}
|
||||
|
||||
when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
|
||||
for(auto &object : list.objects) {
|
||||
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
|
||||
|
@ -235,14 +243,28 @@ ACTOR Future<Void> deleteBucket_impl(Reference<BlobStoreEndpoint> b, std::string
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is just a precaution to avoid having too many outstanding delete actors waiting to run
|
||||
while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) {
|
||||
Void _ = wait(deleteFutures.front());
|
||||
deleteFutures.pop_front();
|
||||
}
|
||||
}
|
||||
} catch(Error &e) {
|
||||
if(e.code() != error_code_end_of_stream)
|
||||
throw;
|
||||
}
|
||||
|
||||
while(deleteFutures.size() > 0) {
|
||||
Void _ = wait(deleteFutures.front());
|
||||
deleteFutures.pop_front();
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(deleteFutures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> BlobStoreEndpoint::deleteBucket(std::string const &bucket, int *pNumDeleted) {
|
||||
return deleteBucket_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, pNumDeleted);
|
||||
Future<Void> BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted) {
|
||||
return deleteRecursively_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, pNumDeleted);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket) {
|
||||
|
@ -274,35 +296,35 @@ Future<int64_t> BlobStoreEndpoint::objectSize(std::string const &bucket, std::st
|
|||
// Try to read a file, parse it as JSON, and return the resulting document.
|
||||
// It will NOT throw if any errors are encountered, it will just return an empty
|
||||
// JSON object and will log trace events for the errors encountered.
|
||||
ACTOR Future<json_spirit::mObject> tryReadJSONFile(std::string path) {
|
||||
ACTOR Future<Optional<json_spirit::mObject>> tryReadJSONFile(std::string path) {
|
||||
state std::string content;
|
||||
|
||||
// Event type to be logged in the event of an exception
|
||||
state const char *errorEventType = "BlobCredentialFileError";
|
||||
|
||||
try {
|
||||
state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY, 0));
|
||||
state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
|
||||
state int64_t size = wait(f->size());
|
||||
state Standalone<StringRef> buf = makeString(size);
|
||||
int r = wait(f->read(mutateString(buf), size, 0));
|
||||
ASSERT(r == size);
|
||||
content = buf.toString();
|
||||
} catch(Error &e) {
|
||||
if(e.code() != error_code_actor_cancelled)
|
||||
TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true);
|
||||
return json_spirit::mObject();
|
||||
}
|
||||
|
||||
try {
|
||||
// Any exceptions from hehre forward are parse failures
|
||||
errorEventType = "BlobCredentialFileParseFailed";
|
||||
json_spirit::mValue json;
|
||||
json_spirit::read_string(content, json);
|
||||
if(json.type() == json_spirit::obj_type)
|
||||
return json.get_obj();
|
||||
else
|
||||
TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path).suppressFor(60, true);
|
||||
|
||||
} catch(Error &e) {
|
||||
if(e.code() != error_code_actor_cancelled)
|
||||
TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true);
|
||||
TraceEvent(SevWarn, errorEventType).detail("File", path).error(e).suppressFor(60, true);
|
||||
}
|
||||
|
||||
return json_spirit::mObject();
|
||||
return Optional<json_spirit::mObject>();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
|
||||
|
@ -310,7 +332,7 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
|
|||
if(pFiles == nullptr)
|
||||
return Void();
|
||||
|
||||
state std::vector<Future<json_spirit::mObject>> reads;
|
||||
state std::vector<Future<Optional<json_spirit::mObject>>> reads;
|
||||
for(auto &f : *pFiles)
|
||||
reads.push_back(tryReadJSONFile(f));
|
||||
|
||||
|
@ -318,13 +340,22 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
|
|||
|
||||
std::string key = b->key + "@" + b->host;
|
||||
|
||||
int invalid = 0;
|
||||
|
||||
for(auto &f : reads) {
|
||||
JSONDoc doc(f.get());
|
||||
// If value not present then the credentials file wasn't readable or valid. Continue to check other results.
|
||||
if(!f.get().present()) {
|
||||
++invalid;
|
||||
continue;
|
||||
}
|
||||
|
||||
JSONDoc doc(f.get().get());
|
||||
if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
|
||||
JSONDoc accounts(doc.last().get_obj());
|
||||
if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
|
||||
JSONDoc account(accounts.last());
|
||||
std::string secret;
|
||||
// Once we find a matching account, use it.
|
||||
if(account.tryGet("secret", secret)) {
|
||||
b->secret = secret;
|
||||
return Void();
|
||||
|
@ -333,7 +364,12 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
|
|||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
// If any sources were invalid
|
||||
if(invalid > 0)
|
||||
throw backup_auth_unreadable();
|
||||
|
||||
// All sources were valid but didn't contain the desired info
|
||||
throw backup_auth_missing();
|
||||
}
|
||||
|
||||
Future<Void> BlobStoreEndpoint::updateSecret() {
|
||||
|
@ -435,12 +471,9 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
|
|||
rconn.conn.clear();
|
||||
|
||||
} catch(Error &e) {
|
||||
// For timeouts, conn failure, or bad reponse reported by HTTP:doRequest, save the error and handle it / possibly retry below.
|
||||
// Any other error is rethrown.
|
||||
if(e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_http_bad_response)
|
||||
err = e;
|
||||
else
|
||||
if(e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
err = e;
|
||||
}
|
||||
|
||||
// If err is not present then r is valid.
|
||||
|
@ -649,17 +682,29 @@ ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreE
|
|||
state BlobStoreEndpoint::ListResult results;
|
||||
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
|
||||
state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
|
||||
// Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream
|
||||
done = map(done, [=](Void) {
|
||||
resultStream.sendError(end_of_stream());
|
||||
return Void();
|
||||
});
|
||||
|
||||
try {
|
||||
loop {
|
||||
choose {
|
||||
when(Void _ = wait(done)) {
|
||||
break;
|
||||
}
|
||||
// Throw if done throws, otherwise don't stop until end_of_stream
|
||||
when(Void _ = wait(done)) {}
|
||||
|
||||
when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) {
|
||||
results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end());
|
||||
results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(Error &e) {
|
||||
if(e.code() != error_code_end_of_stream)
|
||||
throw;
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
|
|
@ -185,11 +185,12 @@ public:
|
|||
// Delete an object in a bucket
|
||||
Future<Void> deleteObject(std::string const &bucket, std::string const &object);
|
||||
|
||||
// Delete a bucket - note this is not atomic as blob store does not support this operation directly.
|
||||
// This method is just a convenience method that lists and deletes all of the objects in the bucket
|
||||
// Delete all objects in a bucket under a prefix. Note this is not atomic as blob store does not
|
||||
// support this operation directly. This method is just a convenience method that lists and deletes
|
||||
// all of the objects in the bucket under the given prefix.
|
||||
// Since it can take a while, if a pNumDeleted is provided then it will be incremented every time
|
||||
// a deletion of an object completes.
|
||||
Future<Void> deleteBucket(std::string const &bucket, int *pNumDeleted = NULL);
|
||||
Future<Void> deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = NULL);
|
||||
|
||||
// Create a bucket if it does not already exists.
|
||||
Future<Void> createBucket(std::string const &bucket);
|
||||
|
|
|
@ -307,6 +307,9 @@ namespace HTTP {
|
|||
if(pContent == NULL)
|
||||
pContent = ∅
|
||||
|
||||
state bool earlyResponse = false;
|
||||
state int total_sent = 0;
|
||||
|
||||
try {
|
||||
// Write headers to a packet buffer chain
|
||||
PacketBuffer *pFirst = new PacketBuffer();
|
||||
|
@ -321,11 +324,25 @@ namespace HTTP {
|
|||
printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str());
|
||||
}
|
||||
|
||||
state Reference<HTTP::Response> r(new HTTP::Response());
|
||||
state Future<Void> responseReading = r->read(conn, verb == "HEAD" || verb == "DELETE");
|
||||
|
||||
state double send_start = timer();
|
||||
state double total_sent = 0;
|
||||
|
||||
loop {
|
||||
Void _ = wait(conn->onWritable());
|
||||
Void _ = wait( delay( 0, TaskWriteSocket ) );
|
||||
|
||||
// If we already got a response, before finishing sending the request, then close the connection,
|
||||
// set the Connection header to "close" as a hint to the caller that this connection can't be used
|
||||
// again, and break out of the send loop.
|
||||
if(responseReading.isReady()) {
|
||||
conn->close();
|
||||
r->headers["Connection"] = "close";
|
||||
earlyResponse = true;
|
||||
break;
|
||||
}
|
||||
|
||||
state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE;
|
||||
Void _ = wait(sendRate->getAllowance(trySend));
|
||||
int len = conn->write(pContent->getUnsent(), trySend);
|
||||
|
@ -338,18 +355,20 @@ namespace HTTP {
|
|||
break;
|
||||
}
|
||||
|
||||
state Reference<HTTP::Response> r(new HTTP::Response());
|
||||
Void _ = wait(r->read(conn, verb == "HEAD" || verb == "DELETE"));
|
||||
Void _ = wait(responseReading);
|
||||
|
||||
double elapsed = timer() - send_start;
|
||||
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
|
||||
printf("[%s] HTTP code=%d, time=%fs %s %s [%u out, response content len %d]\n", conn->getDebugID().toString().c_str(), r->code, elapsed, verb.c_str(), resource.c_str(), (int)total_sent, (int)r->contentLen);
|
||||
printf("[%s] HTTP code=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n",
|
||||
conn->getDebugID().toString().c_str(), r->code, earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent, (int)r->contentLen);
|
||||
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2)
|
||||
printf("[%s] HTTP RESPONSE: %s %s\n%s\n", conn->getDebugID().toString().c_str(), verb.c_str(), resource.c_str(), r->toString().c_str());
|
||||
return r;
|
||||
} catch(Error &e) {
|
||||
double elapsed = timer() - send_start;
|
||||
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
|
||||
printf("[%s] HTTP *ERROR*=%s, time=%fs %s %s [%u out]\n", conn->getDebugID().toString().c_str(), e.name(), elapsed, verb.c_str(), resource.c_str(), (int)total_sent);
|
||||
printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n",
|
||||
conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -219,11 +219,12 @@ struct JSONDoc {
|
|||
return mergeOperator<T>(op, op_a, op_b, a.get_value<T>(), b.get_value<T>());
|
||||
}
|
||||
|
||||
static inline std::string getOperator(const json_spirit::mObject &obj) {
|
||||
static inline const std::string & getOperator(const json_spirit::mObject &obj) {
|
||||
static const std::string empty;
|
||||
for(auto &k : obj)
|
||||
if(!k.first.empty() && k.first[0] == '$')
|
||||
return k.first;
|
||||
return std::string();
|
||||
return empty;
|
||||
}
|
||||
|
||||
// Merge src into dest, applying merge operators
|
||||
|
|
|
@ -734,7 +734,7 @@ public:
|
|||
|
||||
if ( oldMasterFit < newMasterFit )
|
||||
return false;
|
||||
if ( oldMasterFit > newMasterFit )
|
||||
if ( oldMasterFit > newMasterFit || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.first.locality.processId() != clusterControllerProcessId ) )
|
||||
return true;
|
||||
|
||||
// Check tLog fitness
|
||||
|
@ -840,7 +840,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
|
|||
state double recoveryStart = now();
|
||||
TraceEvent("CCWDB", cluster->id).detail("Recruiting", "Master");
|
||||
state std::pair<WorkerInterface, ProcessClass> masterWorker = cluster->getMasterWorker(db->config);
|
||||
if( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
|
||||
if( ( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.first.locality.processId() == cluster->clusterControllerProcessId )
|
||||
&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
|
||||
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.second.machineClassFitness( ProcessClass::Master ));
|
||||
Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
|
||||
continue;
|
||||
|
|
|
@ -259,8 +259,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( SIM_SHUTDOWN_TIMEOUT, 10 );
|
||||
init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
|
||||
init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
|
||||
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );
|
||||
init( ATTEMPT_RECRUITMENT_DELAY, 0.05 );
|
||||
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 );
|
||||
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
|
||||
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
|
||||
init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001;
|
||||
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
|
||||
|
|
|
@ -1933,6 +1933,8 @@ TEST_CASE("status/json/merging") {
|
|||
a.create("not_expired_and_merged.$expires.seven.$sum") = 1;
|
||||
a.create("not_expired_and_merged.$expires.one.$min") = 3;
|
||||
a.create("not_expired_and_merged.version") = 3;
|
||||
a.create("mixed_numeric_sum_6.$sum") = 0.5;
|
||||
a.create("mixed_numeric_min_0.$min") = 1.5;
|
||||
|
||||
b.create("int_one") = 1;
|
||||
b.create("int_unmatched") = 3;
|
||||
|
@ -1956,6 +1958,8 @@ TEST_CASE("status/json/merging") {
|
|||
b.create("latest_obj.timestamp") = 2;
|
||||
b.create("latest_int_5.$latest") = 7;
|
||||
b.create("latest_int_5.timestamp") = 2;
|
||||
b.create("mixed_numeric_sum_6.$sum") = 1;
|
||||
b.create("mixed_numeric_min_0.$min") = 4.5;
|
||||
|
||||
c.create("int_total_30.$sum") = 0;
|
||||
c.create("not_expired.$expires") = "I am still valid";
|
||||
|
@ -1971,18 +1975,25 @@ TEST_CASE("status/json/merging") {
|
|||
c.create("latest_obj.$latest.not_expired.$expires") = "Still alive.";
|
||||
c.create("latest_obj.$latest.not_expired.version") = 3;
|
||||
c.create("latest_obj.timestamp") = 3;
|
||||
b.create("latest_int_5.$latest") = 5;
|
||||
b.create("latest_int_5.timestamp") = 3;
|
||||
c.create("latest_int_5.$latest") = 5;
|
||||
c.create("latest_int_5.timestamp") = 3;
|
||||
c.create("mixed_numeric_sum_6.$sum") = 4.5;
|
||||
c.create("mixed_numeric_min_0.$min") = (double)0.0;
|
||||
|
||||
printf("a = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str());
|
||||
printf("b = \n%s\n", json_spirit::write_string(json_spirit::mValue(objB), json_spirit::pretty_print).c_str());
|
||||
printf("c = \n%s\n", json_spirit::write_string(json_spirit::mValue(objC), json_spirit::pretty_print).c_str());
|
||||
|
||||
JSONDoc::expires_reference_version = 2;
|
||||
a.absorb(b);
|
||||
a.absorb(c);
|
||||
a.cleanOps();
|
||||
printf("result = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str());
|
||||
std::string result = json_spirit::write_string(json_spirit::mValue(objA));
|
||||
std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}";
|
||||
std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"mixed_numeric_min_0\":0,\"mixed_numeric_sum_6\":6,\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}";
|
||||
|
||||
if(result != expected) {
|
||||
printf("ERROR: Combined doc does not match expected.\nexpected: %s\nresult: %s\n", expected.c_str(), result.c_str());
|
||||
printf("ERROR: Combined doc does not match expected.\nexpected:\n\n%s\nresult:\n%s\n", expected.c_str(), result.c_str());
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
|
|
|
@ -124,6 +124,15 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> statusLoop(Database cx, std::string tag) {
|
||||
state FileBackupAgent agent;
|
||||
loop {
|
||||
std::string status = wait(agent.getStatus(cx, true, tag));
|
||||
puts(status.c_str());
|
||||
Void _ = wait(delay(2.0));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> doBackup(BackupAndRestoreCorrectnessWorkload* self, double startDelay, FileBackupAgent* backupAgent, Database cx,
|
||||
Key tag, Standalone<VectorRef<KeyRangeRef>> backupRanges, double stopDifferentialDelay, Promise<Void> submittted) {
|
||||
|
||||
|
@ -148,6 +157,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
TraceEvent("BARW_doBackupSubmitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True");
|
||||
|
||||
state std::string backupContainer = "file://simfdb/backups/";
|
||||
state Future<Void> status = statusLoop(cx, tag.toString());
|
||||
|
||||
try {
|
||||
Void _ = wait(backupAgent->submitBackup(cx, StringRef(backupContainer), g_random->randomInt(0, 100), tag.toString(), backupRanges, stopDifferentialDelay ? false : true));
|
||||
}
|
||||
|
|
|
@ -835,7 +835,7 @@ Future< Reference<IConnection> > Net2::connect( NetworkAddress toAddr ) {
|
|||
}
|
||||
|
||||
ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl( Net2 *self, std::string host, std::string service) {
|
||||
state Promise<std::vector<NetworkAddress>> result;
|
||||
Promise<std::vector<NetworkAddress>> result;
|
||||
|
||||
self->tcpResolver.async_resolve(tcp::resolver::query(host, service), [=](const boost::system::error_code &ec, tcp::resolver::iterator iter) {
|
||||
if(ec) {
|
||||
|
|
|
@ -174,6 +174,8 @@ ERROR( backup_bad_block_size, 2313, "Backup file block size too small")
|
|||
ERROR( backup_invalid_url, 2314, "Backup Container URL invalid")
|
||||
ERROR( backup_invalid_info, 2315, "Backup Container URL invalid")
|
||||
ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup without violating minimum restorability")
|
||||
ERROR( backup_auth_missing, 2317, "Cannot find authentication details (such as a password or secret key) for the specified Backup Container URL")
|
||||
ERROR( backup_auth_unreadable, 2318, "Cannot read or parse one or more sources of authentication information for Backup Container URLs")
|
||||
ERROR( restore_invalid_version, 2361, "Invalid restore version")
|
||||
ERROR( restore_corrupted_data, 2362, "Corrupted backup data")
|
||||
ERROR( restore_missing_data, 2363, "Missing backup data")
|
||||
|
|
|
@ -1112,6 +1112,18 @@ ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Er
|
|||
out.sendError(value);
|
||||
}
|
||||
|
||||
ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
|
||||
choose {
|
||||
when(T val = wait(f)) {
|
||||
return val;
|
||||
}
|
||||
when(Void _ = wait(errorSignal)) {
|
||||
ASSERT(false);
|
||||
throw internal_error();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
|
||||
// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
|
||||
// wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the
|
||||
|
|
|
@ -60,7 +60,6 @@ chmod 0644 $CLIENTSDIR/usr/share/doc/foundationdb-clients/README
|
|||
install -m 0755 bin/fdbbackup $CLIENTSDIR/usr/lib/foundationdb/backup_agent/backup_agent
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbbackup
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbrestore
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbblob
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbdr
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/dr_agent
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||
<Product Name='$(var.Title)'
|
||||
Id='{62160341-2769-4104-9D74-4235618775EA}'
|
||||
Id='{61C46988-7589-4B8A-9BB9-D850FD5B8B05}'
|
||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||
Version='$(var.Version)'
|
||||
Manufacturer='$(var.Manufacturer)'
|
||||
|
@ -151,7 +151,6 @@
|
|||
<Component Id='FDBBackupClientExecutable' Guid='{F5E0A796-EC21-406F-88E1-3195165E3078}' Win64='yes'>
|
||||
<File Id='FDBBACKUPEXE' Name='fdbbackup.exe' DiskId='1' Source='$(var.bindir)fdbbackup.exe' KeyPath='yes'/>
|
||||
<File Id='FDBRESTOREEXE' Name='fdbrestore.exe' DiskId='1' Source='$(var.bindir)fdbrestore.exe' KeyPath='no'/>
|
||||
<File Id='FDBBLOBEXE' Name='fdbblob.exe' DiskId='1' Source='$(var.bindir)fdbblob.exe' KeyPath='no'/>
|
||||
<File Id='FDBDREXE' Name='fdbdr.exe' DiskId='1' Source='$(var.bindir)fdbdr.exe' KeyPath='no'/>
|
||||
</Component>
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ dos2unix README.md $CLIENTSDIR/usr/local/foundationdb/README
|
|||
chmod 0644 $CLIENTSDIR/usr/local/foundationdb/README
|
||||
ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbbackup
|
||||
ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbrestore
|
||||
ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbblob
|
||||
ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbdr
|
||||
ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/dr_agent
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#!/bin/bash -x
|
||||
|
||||
rm -f /usr/local/libexec/{fdbserver,fdbmonitor}
|
||||
rm -f /usr/local/bin/{fdbcli,fdbbackup,fdbrestore,fdbblob,fdbdr}
|
||||
rm -f /usr/local/bin/{fdbcli,fdbbackup,fdbrestore,fdbdr}
|
||||
rm -f /usr/local/lib/libfdb_c.dylib
|
||||
rm -rf /usr/local/include/foundationdb
|
||||
rm -rf /usr/local/foundationdb/backup_agent
|
||||
|
|
|
@ -44,7 +44,6 @@ install -m 0755 packaging/make_public.py $INSTDIR/usr/lib/foundationdb
|
|||
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbbackup
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbrestore
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbblob
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbdr
|
||||
ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/dr_agent
|
||||
|
||||
|
|
|
@ -133,6 +133,5 @@ ifdef(`RHEL6', `/usr/lib/foundationdb/argparse.py')
|
|||
/usr/bin/fdbdr
|
||||
/usr/bin/fdbbackup
|
||||
/usr/bin/fdbrestore
|
||||
/usr/bin/fdbblob
|
||||
/usr/lib64/libfdb_c.so
|
||||
/usr/include/*
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
<?xml version="1.0"?>
|
||||
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
<PropertyGroup>
|
||||
<Version>5.1.1</Version>
|
||||
<Version>5.1.3</Version>
|
||||
<PackageName>5.1</PackageName>
|
||||
</PropertyGroup>
|
||||
</Project>
|
||||
|
|
Loading…
Reference in New Issue