diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 3b1c60bb80..a2e808c7dd 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -1557,12 +1557,12 @@ ACTOR Future statusDBBackup(Database src, Database dest, std::string tagNa return Void(); } -ACTOR Future statusBackup(Database db, std::string tagName, int errorLimit) { +ACTOR Future statusBackup(Database db, std::string tagName, bool showErrors) { try { state FileBackupAgent backupAgent; - std::string statusText = wait(backupAgent.getStatus(db, errorLimit, tagName)); + std::string statusText = wait(backupAgent.getStatus(db, showErrors, tagName)); printf("%s\n", statusText.c_str()); } catch (Error& e) { @@ -2279,6 +2279,8 @@ int main(int argc, char* argv[]) { bool dryRun = false; std::string traceDir = ""; std::string traceLogGroup; + uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE; + uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE; ESOError lastError; bool partial = true; LocalityData localities; @@ -2706,6 +2708,14 @@ int main(int argc, char* argv[]) { } } + // Opens a trace file if trace is set (and if a trace file isn't already open) + // For most modes, initCluster() will open a trace file, but some fdbbackup operations do not require + // a cluster so they should use this instead. + auto initTraceFile = [&]() { + if(trace) + openTraceFile(NetworkAddress(), traceRollSize, traceMaxLogsSize, traceDir, "trace", traceLogGroup); + }; + auto initCluster = [&](bool quiet = false) { auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile); try { @@ -2784,7 +2794,7 @@ int main(int argc, char* argv[]) { case BACKUP_STATUS: if(!initCluster()) return FDB_EXIT_ERROR; - f = stopAfter( statusBackup(db, tagName, maxErrors) ); + f = stopAfter( statusBackup(db, tagName, true) ); break; case BACKUP_ABORT: @@ -2818,6 +2828,7 @@ int main(int argc, char* argv[]) { break; case BACKUP_EXPIRE: + initTraceFile(); // Must have a usable cluster if either expire DateTime options were used if(!expireDatetime.empty() || !expireRestorableAfterDatetime.empty()) { if(!initCluster()) @@ -2827,10 +2838,12 @@ int main(int argc, char* argv[]) { break; case BACKUP_DELETE: + initTraceFile(); f = stopAfter( deleteBackupContainer(argv[0], destinationContainer) ); break; case BACKUP_DESCRIBE: + initTraceFile(); // If timestamp lookups are desired, require a cluster file if(describeTimestamps && !initCluster()) return FDB_EXIT_ERROR; @@ -2839,6 +2852,7 @@ int main(int argc, char* argv[]) { f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional(db) : Optional()) ); break; case BACKUP_LIST: + initTraceFile(); f = stopAfter( listBackup(baseUrl) ); break; diff --git a/fdbbackup/fdbbackup.vcxproj b/fdbbackup/fdbbackup.vcxproj index fa36ff3f15..c215b45322 100644 --- a/fdbbackup/fdbbackup.vcxproj +++ b/fdbbackup/fdbbackup.vcxproj @@ -129,6 +129,5 @@ - diff --git a/fdbbackup/local.mk b/fdbbackup/local.mk index 602daaf1d6..69c9318827 100644 --- a/fdbbackup/local.mk +++ b/fdbbackup/local.mk @@ -47,7 +47,7 @@ bin/fdbbackup: bin/coverage.fdbbackup.xml bin/fdbbackup.debug: bin/fdbbackup -BACKUP_ALIASES = fdbrestore fdbblob fdbdr dr_agent backup_agent +BACKUP_ALIASES = fdbrestore fdbdr dr_agent backup_agent $(addprefix bin/, $(BACKUP_ALIASES)): bin/fdbbackup @[ -f $@ ] || (echo "SymLinking $@" && ln -s fdbbackup $@) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index e0878784cb..91b803f3ca 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -624,7 +624,8 @@ void getBackupDRTags(StatusObjectReader &statusObjCluster, const char *context, for(auto itr : tags.obj()) { JSONDoc tag(itr.second); bool running = false; - if(tag.tryGet("running_backup", running)) { + tag.tryGet("running_backup", running); + if(running) { std::string uid; if(tag.tryGet("mutation_stream_id", uid)) { tagMap[itr.first] = uid; diff --git a/fdbclient/BackupAgent.h b/fdbclient/BackupAgent.h index 1f7fdcfd2f..54aeed363a 100644 --- a/fdbclient/BackupAgent.h +++ b/fdbclient/BackupAgent.h @@ -265,7 +265,7 @@ public: return runRYWTransaction(cx, [=](Reference tr){ return abortBackup(tr, tagName); }); } - Future getStatus(Database cx, int errorLimit, std::string tagName); + Future getStatus(Database cx, bool showErrors, std::string tagName); Future getLastRestorable(Reference tr, Key tagName); void setLastRestorable(Reference tr, Key tagName, Version version); @@ -530,10 +530,31 @@ public: } // lastError is a pair of error message and timestamp expressed as an int64_t - KeyBackedProperty> lastError() { + KeyBackedProperty> lastError() { return configSpace.pack(LiteralStringRef(__FUNCTION__)); } + KeyBackedMap> lastErrorPerType() { + return configSpace.pack(LiteralStringRef(__FUNCTION__)); + } + + // Updates the error per type map and the last error property + Future updateErrorInfo(Database cx, Error e, std::string message) { + // Avoid capture of this ptr + auto © = *this; + + return runRYWTransaction(cx, [=] (Reference tr) mutable { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + return map(tr->getReadVersion(), [=] (Version v) mutable { + copy.lastError().set(tr, {message, v}); + copy.lastErrorPerType().set(tr, e.code(), {message, v}); + return Void(); + }); + }); + } + protected: UID uid; Key prefix; @@ -715,7 +736,8 @@ public: if(e.code() == error_code_key_not_found) t.backtrace(); std::string msg = format("ERROR: %s %s", e.what(), details.c_str()); - return lastError().set(cx, {msg, (int64_t)now()}); + + return updateErrorInfo(cx, e, msg); } }; #endif diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index ead2f6835c..5288eae43d 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -962,7 +962,7 @@ private: // Backup files to under a single folder prefix with subfolders for each named backup static const std::string DATAFOLDER; - // The metafolder contains keys for which user-named backups exist. Backup names can contain an arbitrary + // Indexfolder contains keys for which user-named backups exist. Backup names can contain an arbitrary // number of slashes so the backup names are kept in a separate folder tree from their actual data. static const std::string INDEXFOLDER; @@ -1080,36 +1080,8 @@ public: } ACTOR static Future deleteContainer_impl(Reference bc, int *pNumDeleted) { - state PromiseStream resultStream; - state Future done = bc->m_bstore->listBucketStream(BUCKET, resultStream, bc->dataPath(""), '/', std::numeric_limits::max()); - state std::list> deleteFutures; - loop { - choose { - when(Void _ = wait(done)) { - break; - } - when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { - for(auto &object : list.objects) { - int *pNumDeletedCopy = pNumDeleted; // avoid capture of this - deleteFutures.push_back(map(bc->m_bstore->deleteObject(BUCKET, object.name), [pNumDeletedCopy](Void) { - if(pNumDeletedCopy != nullptr) - ++*pNumDeletedCopy; - return Void(); - })); - } - - while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) { - Void _ = wait(deleteFutures.front()); - deleteFutures.pop_front(); - } - } - } - } - - while(deleteFutures.size() > 0) { - Void _ = wait(deleteFutures.front()); - deleteFutures.pop_front(); - } + // First delete everything under the data prefix in the bucket + Void _ = wait(bc->m_bstore->deleteRecursively(BUCKET, bc->dataPath(""), pNumDeleted)); // Now that all files are deleted, delete the index entry Void _ = wait(bc->m_bstore->deleteObject(BUCKET, bc->indexEntry())); @@ -1357,23 +1329,23 @@ ACTOR Future testBackupContainer(std::string url) { state int64_t versionShift = g_random->randomInt64(0, std::numeric_limits::max() - 500); state Reference log1 = wait(c->writeLogFile(100 + versionShift, 150 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, log1, 0)); - state Reference log2 = wait(c->writeLogFile(150 + versionShift, 300 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000))); - state Reference range1 = wait(c->writeRangeFile(160 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000))); - state Reference range2 = wait(c->writeRangeFile(300 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000))); - state Reference range3 = wait(c->writeRangeFile(310 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000))); - Void _ = wait(c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size())); + Void _ = wait( + writeAndVerifyFile(c, log1, 0) + && writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000)) + && writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000)) + && writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000)) + && writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000)) + ); - Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size())); + Void _ = wait( + c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size()) + && c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()) + ); printf("Checking file list dump\n"); FullBackupListing listing = wait(c->dumpFileList()); diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index b98c4f1263..a70963efec 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -178,7 +178,8 @@ public: if(e.code() == error_code_key_not_found) t.backtrace(); std::string msg = format("ERROR: %s (%s)", details.c_str(), e.what()); - return lastError().set(cx, {msg, (int64_t)now()}); + + return updateErrorInfo(cx, e, msg); } Key mutationLogPrefix() { @@ -283,7 +284,7 @@ ACTOR Future RestoreConfig::getProgress_impl(RestoreConfig restore, state Future status = restore.stateText(tr); state Future lag = restore.getApplyVersionLag(tr); state Future tag = restore.tag().getD(tr); - state Future> lastError = restore.lastError().getD(tr); + state Future> lastError = restore.lastError().getD(tr); // restore might no longer be valid after the first wait so make sure it is not needed anymore. state UID uid = restore.getUid(); @@ -291,7 +292,7 @@ ACTOR Future RestoreConfig::getProgress_impl(RestoreConfig restore, std::string errstr = "None"; if(lastError.get().second != 0) - errstr = format("'%s' %llds ago.\n", lastError.get().first.c_str(), (int64_t)now() - lastError.get().second); + errstr = format("'%s' %llds ago.\n", lastError.get().first.c_str(), (tr->getReadVersion().get() - lastError.get().second) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND ); TraceEvent("FileRestoreProgress") .detail("RestoreUID", uid) @@ -1748,9 +1749,10 @@ namespace fileBackup { state Version beginVersion = Params.beginVersion().get(task); state Version endVersion = Params.endVersion().get(task); state Reference taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + state BackupConfig config(task); if(Params.fileSize().exists(task)) { - BackupConfig(task).logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue); + config.logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue); } if (Params.addBackupLogRangeTasks().get(task)) { @@ -1761,7 +1763,7 @@ namespace fileBackup { } if(endVersion > beginVersion) { - Standalone> ranges = getLogRanges(beginVersion, endVersion, task->params[FileBackupAgent::keyConfigLogUid]); + Standalone> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey()); for (auto & rng : ranges) tr->clear(rng); } @@ -1799,10 +1801,17 @@ namespace fileBackup { state bool stopWhenDone; state Optional restorableVersion; state EBackupState backupState; + state Optional tag; Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone) && store(config.getLatestRestorableVersion(tr), restorableVersion) - && store(config.stateEnum().getOrThrow(tr), backupState)); + && store(config.stateEnum().getOrThrow(tr), backupState) + && store(config.tag().get(tr), tag)); + + // If restorable, update the last restorable version for this tag + if(restorableVersion.present() && tag.present()) { + FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get()); + } // If the backup is restorable but the state is not differential then set state to differential if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL) @@ -2014,11 +2023,18 @@ namespace fileBackup { state EBackupState backupState; state Optional restorableVersion; state Optional firstSnapshotEndVersion; + state Optional tag; Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone) && store(config.stateEnum().getOrThrow(tr), backupState) && store(config.getLatestRestorableVersion(tr), restorableVersion) - && store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)); + && store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion) + && store(config.tag().get(tr), tag)); + + // If restorable, update the last restorable version for this tag + if(restorableVersion.present() && tag.present()) { + FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get()); + } if(!firstSnapshotEndVersion.present()) { config.firstSnapshotEndVersion().set(tr, Params.endVersion().get(task)); @@ -3460,7 +3476,7 @@ public: return Void(); } - ACTOR static Future getStatus(FileBackupAgent* backupAgent, Database cx, int errorLimit, std::string tagName) { + ACTOR static Future getStatus(FileBackupAgent* backupAgent, Database cx, bool showErrors, std::string tagName) { state Reference tr(new ReadYourWritesTransaction(cx)); state std::string statusText; @@ -3487,8 +3503,15 @@ public: statusText += "No previous backups found.\n"; } else { state std::string backupStatus(BackupAgentBase::getStateText(backupState)); - state Reference bc = wait(config.backupContainer().getOrThrow(tr)); - state Optional stopVersion = wait(config.getLatestRestorableVersion(tr)); + state Reference bc; + state Optional latestRestorableVersion; + state Version recentReadVersion; + + Void _ = wait( store(config.getLatestRestorableVersion(tr), latestRestorableVersion) + && store(config.backupContainer().getOrThrow(tr), bc) + && store(tr->getReadVersion(), recentReadVersion) + ); + bool snapshotProgress = false; switch (backupState) { @@ -3504,7 +3527,7 @@ public: snapshotProgress = true; break; case BackupAgentBase::STATE_COMPLETED: - statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " completed at version " + format("%lld", stopVersion.orDefault(-1)) + ".\n"; + statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " completed at version " + format("%lld", latestRestorableVersion.orDefault(-1)) + ".\n"; break; default: statusText += "The previous backup on tag `" + tagName + "' at " + bc->getURL() + " " + backupStatus + ".\n"; @@ -3513,29 +3536,45 @@ public: if(snapshotProgress) { state int64_t snapshotInterval; - state Version recentReadVersion; state Version snapshotBeginVersion; state Version snapshotTargetEndVersion; - Void _ = wait(store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion) + Void _ = wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion) && store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion) && store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotInterval) - && store(tr->getReadVersion(), recentReadVersion)); + ); statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval); if(backupState == BackupAgentBase::STATE_DIFFERENTIAL) - statusText += format("Current snapshot progress target is %3.2f%%\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ; + statusText += format("Current snapshot progress target is %3.2f%% (>100%% means the snapshot is supposed to be done)\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ; else statusText += "The initial snapshot is still running.\n"; } - } - // Append the errors, if requested - if (errorLimit > 0 && config.getUid().isValid()) { - Optional> errMsg = wait(config.lastError().get(tr)); - if (errMsg.present()) { - statusText += "WARNING: Some backup agents have reported issues:\n"; - statusText += format("[%lld]: %s\n", errMsg.get().second, errMsg.get().first.c_str()); + // Append the errors, if requested + if (showErrors) { + KeyBackedMap>::PairsType errors = wait(config.lastErrorPerType().getRange(tr, 0, std::numeric_limits::max(), CLIENT_KNOBS->TOO_MANY)); + std::string recentErrors; + std::string pastErrors; + + for(auto &e : errors) { + Version v = e.second.second; + std::string msg = format("%s (%lld seconds ago)\n", e.second.first.c_str(), + (recentReadVersion - v) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND); + + // If error version is at or more recent than the latest restorable version then it could be inhibiting progress + if(v >= latestRestorableVersion.orDefault(0)) { + recentErrors += msg; + } + else { + pastErrors += msg; + } + } + + if(!recentErrors.empty()) + statusText += "Errors possibly preventing progress:\n" + recentErrors; + if(!pastErrors.empty()) + statusText += "Older errors:\n" + pastErrors; } } @@ -3756,8 +3795,8 @@ Future FileBackupAgent::abortBackup(Reference t return FileBackupAgentImpl::abortBackup(this, tr, tagName); } -Future FileBackupAgent::getStatus(Database cx, int errorLimit, std::string tagName) { - return FileBackupAgentImpl::getStatus(this, cx, errorLimit, tagName); +Future FileBackupAgent::getStatus(Database cx, bool showErrors, std::string tagName) { + return FileBackupAgentImpl::getStatus(this, cx, showErrors, tagName); } Future FileBackupAgent::getLastRestorable(Reference tr, Key tagName) { diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 6103e22003..5bd8f295a3 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1242,7 +1242,7 @@ ThreadFuture> MultiVersionApi::createCluster(const char *clu lock.enter(); if(!networkSetup) { lock.leave(); - throw network_not_setup(); + return network_not_setup(); } lock.leave(); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 290c8f6615..f2c4889bd6 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1307,7 +1307,7 @@ Future< Standalone >> ReadYourWritesTransaction::getAddre // If key >= allKeys.end, then our resulting address vector will be empty. - Future< Standalone >> result = tr.getAddressesForKey(key); + Future< Standalone >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture()); reading.add( success( result ) ); return result; } @@ -1681,6 +1681,14 @@ Future ReadYourWritesTransaction::commit() { return RYWImpl::commit( this ); } +Future> ReadYourWritesTransaction::getVersionstamp() { + if(checkUsedDuringCommit()) { + return used_during_commit(); + } + + return waitOrError(tr.getVersionstamp(), resetPromise.getFuture()); +} + void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional value ) { switch(option) { case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE: diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 7fdc52b140..9bf97402e9 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -103,13 +103,7 @@ public: Future commit(); Version getCommittedVersion() { return tr.getCommittedVersion(); } - Future> getVersionstamp() { - if(checkUsedDuringCommit()) { - return used_during_commit(); - } - - return tr.getVersionstamp(); - } + Future> getVersionstamp(); void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); diff --git a/fdbclient/StatusClient.actor.cpp b/fdbclient/StatusClient.actor.cpp index 2814f58fd0..a4ddf05d2b 100644 --- a/fdbclient/StatusClient.actor.cpp +++ b/fdbclient/StatusClient.actor.cpp @@ -146,98 +146,113 @@ void JSONDoc::mergeInto(json_spirit::mObject &dst, const json_spirit::mObject &s } } -void JSONDoc::mergeValueInto(json_spirit::mValue &d, const json_spirit::mValue &s) { - if(s.is_null()) +void JSONDoc::mergeValueInto(json_spirit::mValue &dst, const json_spirit::mValue &src) { + if(src.is_null()) return; - if(d.is_null()) { - d = s; + if(dst.is_null()) { + dst = src; return; } - if(d.type() != s.type()) { - // Skip errors already found - if(d.type() == json_spirit::obj_type && d.get_obj().count("ERROR")) - return; - d = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", d}, {"b", s}}); + // Do nothing if d is already an error + if(dst.type() == json_spirit::obj_type && dst.get_obj().count("ERROR")) + return; + + if(dst.type() != src.type()) { + dst = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", dst}, {"b", src}}); return; } - switch(d.type()) { + switch(dst.type()) { case json_spirit::obj_type: { - std::string op = getOperator(d.get_obj()); - - //printf("Operator: %s\n", op.c_str()); + // Refs to the objects, for convenience. + json_spirit::mObject &aObj = dst.get_obj(); + const json_spirit::mObject &bObj = src.get_obj(); + const std::string &op = getOperator(aObj); + const std::string &opB = getOperator(bObj); + + // Operators must be the same, which could mean both are empty (if these objects are not operators) + if(op != opB) { + dst = json_spirit::mObject({ {"ERROR", "Operators do not match"}, {"a", dst}, {"b", src} }); + break; + } + + // If objects are not operators then defer to mergeInto if(op.empty()) { - mergeInto(d.get_obj(), s.get_obj()); + mergeInto(dst.get_obj(), src.get_obj()); break; } - // Refs to the operator objects to combine - const json_spirit::mObject &op_a = d.get_obj(); - const json_spirit::mObject &op_b = s.get_obj(); - - if(!op_b.count(op)) { - d = json_spirit::mObject({{"ERROR", "Operators do not match"}, {"s", s}, {"d", d}}); - break; - } - - const json_spirit::mValue &a = d.get_obj().at(op); - const json_spirit::mValue &b = s.get_obj().at(op); + // Get the operator values + json_spirit::mValue &a = aObj.at(op); + const json_spirit::mValue &b = bObj.at(op); // First try the operators that are type-agnostic try { - d = mergeOperator(op, op_a, op_b, a, b); + dst = mergeOperator(op, aObj, bObj, a, b); return; } catch(std::exception &e) { } - // If that didn't work, the types must match or we have no operators left to try. + // Now try type and type pair specific operators + // First, if types are incompatible try to make them compatible or return an error if(a.type() != b.type()) { - d = json_spirit::mObject({{"ERROR", "Types do not match"}, {"s", s}, {"d", d}}); - return; + // It's actually okay if the type mismatch is double vs int since once can be converted to the other. + if( (a.type() == json_spirit::int_type && b.type() == json_spirit::real_type) + || (b.type() == json_spirit::int_type && a.type() == json_spirit::real_type) ) + { + // Convert d's op value (which a is a reference to) to a double so that the + // switch block below will do the operation with doubles. + a = a.get_real(); + } + else { + // Otherwise, output an error as the types do not match + dst = json_spirit::mObject({{"ERROR", "Incompatible operator value types"}, {"a", dst}, {"b", src}}); + return; + } } // Now try the type-specific operators. try { switch(a.type()) { case json_spirit::bool_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::int_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::real_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::str_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::array_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::obj_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::null_type: break; } } catch(...) { - d = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}}); + dst = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}}); } break; } case json_spirit::array_type: - for(auto &ai : s.get_array()) - d.get_array().push_back(ai); + for(auto &ai : src.get_array()) + dst.get_array().push_back(ai); break; default: - if(d != s) - d = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", d}, {"b", s}}); + if(dst != src) + dst = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", dst}, {"b", src}}); } } diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index b1e4e6e947..977c7af30b 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -389,20 +389,19 @@ public: })); } } catch(Error &e) { - state Error e2 = e; TraceEvent(SevWarn, "TB_ExecuteFailure") .detail("TaskUID", task->key.printable()) .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) .detail("Priority", task->getPriority()) .error(e); try { - Void _ = wait(taskFunc->handleError(cx, task, e2)); + Void _ = wait(taskFunc->handleError(cx, task, e)); } catch(Error &e) { TraceEvent(SevWarn, "TB_ExecuteFailureLogErrorFailed") .detail("TaskUID", task->key.printable()) .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) .detail("Priority", task->getPriority()) - .error(e2); + .error(e); // output handleError() error instead of original task error } } diff --git a/fdbrpc/BlobStore.actor.cpp b/fdbrpc/BlobStore.actor.cpp index 5ca222d3d5..9d0b510ffc 100644 --- a/fdbrpc/BlobStore.actor.cpp +++ b/fdbrpc/BlobStore.actor.cpp @@ -215,34 +215,56 @@ Future BlobStoreEndpoint::deleteObject(std::string const &bucket, std::str return deleteObject_impl(Reference::addRef(this), bucket, object); } -ACTOR Future deleteBucket_impl(Reference b, std::string bucket, int *pNumDeleted) { +ACTOR Future deleteRecursively_impl(Reference b, std::string bucket, std::string prefix, int *pNumDeleted) { state PromiseStream resultStream; - state Future done = b->listBucketStream(bucket, resultStream); - state std::vector> deleteFutures; - loop { - choose { - when(Void _ = wait(done)) { - break; - } - when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { - for(auto &object : list.objects) { - int *pNumDeletedCopy = pNumDeleted; // avoid capture of this - deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void { - if(pNumDeletedCopy != nullptr) - ++*pNumDeletedCopy; - return Void(); - })); + // Start a recursive parallel listing which will send results to resultStream as they are received + state Future done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits::max()); + // Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream) + done = map(done, [=](Void) { + resultStream.sendError(end_of_stream()); + return Void(); + }); + + state std::list> deleteFutures; + try { + loop { + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(Void _ = wait(done)) {} + + when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { + for(auto &object : list.objects) { + int *pNumDeletedCopy = pNumDeleted; // avoid capture of this + deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void { + if(pNumDeletedCopy != nullptr) + ++*pNumDeletedCopy; + return Void(); + })); + } } } + + // This is just a precaution to avoid having too many outstanding delete actors waiting to run + while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) { + Void _ = wait(deleteFutures.front()); + deleteFutures.pop_front(); + } } + } catch(Error &e) { + if(e.code() != error_code_end_of_stream) + throw; + } + + while(deleteFutures.size() > 0) { + Void _ = wait(deleteFutures.front()); + deleteFutures.pop_front(); } - Void _ = wait(waitForAll(deleteFutures)); return Void(); } -Future BlobStoreEndpoint::deleteBucket(std::string const &bucket, int *pNumDeleted) { - return deleteBucket_impl(Reference::addRef(this), bucket, pNumDeleted); +Future BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted) { + return deleteRecursively_impl(Reference::addRef(this), bucket, prefix, pNumDeleted); } ACTOR Future createBucket_impl(Reference b, std::string bucket) { @@ -274,35 +296,35 @@ Future BlobStoreEndpoint::objectSize(std::string const &bucket, std::st // Try to read a file, parse it as JSON, and return the resulting document. // It will NOT throw if any errors are encountered, it will just return an empty // JSON object and will log trace events for the errors encountered. -ACTOR Future tryReadJSONFile(std::string path) { +ACTOR Future> tryReadJSONFile(std::string path) { state std::string content; + // Event type to be logged in the event of an exception + state const char *errorEventType = "BlobCredentialFileError"; + try { - state Reference f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY, 0)); + state Reference f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0)); state int64_t size = wait(f->size()); state Standalone buf = makeString(size); int r = wait(f->read(mutateString(buf), size, 0)); ASSERT(r == size); content = buf.toString(); - } catch(Error &e) { - if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true); - return json_spirit::mObject(); - } - try { + // Any exceptions from hehre forward are parse failures + errorEventType = "BlobCredentialFileParseFailed"; json_spirit::mValue json; json_spirit::read_string(content, json); if(json.type() == json_spirit::obj_type) return json.get_obj(); else TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path).suppressFor(60, true); + } catch(Error &e) { if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true); + TraceEvent(SevWarn, errorEventType).detail("File", path).error(e).suppressFor(60, true); } - return json_spirit::mObject(); + return Optional(); } ACTOR Future updateSecret_impl(Reference b) { @@ -310,7 +332,7 @@ ACTOR Future updateSecret_impl(Reference b) { if(pFiles == nullptr) return Void(); - state std::vector> reads; + state std::vector>> reads; for(auto &f : *pFiles) reads.push_back(tryReadJSONFile(f)); @@ -318,13 +340,22 @@ ACTOR Future updateSecret_impl(Reference b) { std::string key = b->key + "@" + b->host; + int invalid = 0; + for(auto &f : reads) { - JSONDoc doc(f.get()); + // If value not present then the credentials file wasn't readable or valid. Continue to check other results. + if(!f.get().present()) { + ++invalid; + continue; + } + + JSONDoc doc(f.get().get()); if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) { JSONDoc accounts(doc.last().get_obj()); if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) { JSONDoc account(accounts.last()); std::string secret; + // Once we find a matching account, use it. if(account.tryGet("secret", secret)) { b->secret = secret; return Void(); @@ -333,7 +364,12 @@ ACTOR Future updateSecret_impl(Reference b) { } } - return Void(); + // If any sources were invalid + if(invalid > 0) + throw backup_auth_unreadable(); + + // All sources were valid but didn't contain the desired info + throw backup_auth_missing(); } Future BlobStoreEndpoint::updateSecret() { @@ -435,12 +471,9 @@ ACTOR Future> doRequest_impl(Reference listBucket_impl(Reference resultStream; state Future done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter); - loop { - choose { - when(Void _ = wait(done)) { - break; - } - when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) { - results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end()); - results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); + // Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream + done = map(done, [=](Void) { + resultStream.sendError(end_of_stream()); + return Void(); + }); + + try { + loop { + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(Void _ = wait(done)) {} + + when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) { + results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end()); + results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); + } } } + } catch(Error &e) { + if(e.code() != error_code_end_of_stream) + throw; } + return results; } diff --git a/fdbrpc/BlobStore.h b/fdbrpc/BlobStore.h index 9d900ccc5f..df281b2291 100644 --- a/fdbrpc/BlobStore.h +++ b/fdbrpc/BlobStore.h @@ -185,11 +185,12 @@ public: // Delete an object in a bucket Future deleteObject(std::string const &bucket, std::string const &object); - // Delete a bucket - note this is not atomic as blob store does not support this operation directly. - // This method is just a convenience method that lists and deletes all of the objects in the bucket + // Delete all objects in a bucket under a prefix. Note this is not atomic as blob store does not + // support this operation directly. This method is just a convenience method that lists and deletes + // all of the objects in the bucket under the given prefix. // Since it can take a while, if a pNumDeleted is provided then it will be incremented every time // a deletion of an object completes. - Future deleteBucket(std::string const &bucket, int *pNumDeleted = NULL); + Future deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = NULL); // Create a bucket if it does not already exists. Future createBucket(std::string const &bucket); diff --git a/fdbrpc/HTTP.actor.cpp b/fdbrpc/HTTP.actor.cpp index 08a1cff63b..63bb24af78 100644 --- a/fdbrpc/HTTP.actor.cpp +++ b/fdbrpc/HTTP.actor.cpp @@ -307,6 +307,9 @@ namespace HTTP { if(pContent == NULL) pContent = ∅ + state bool earlyResponse = false; + state int total_sent = 0; + try { // Write headers to a packet buffer chain PacketBuffer *pFirst = new PacketBuffer(); @@ -321,11 +324,25 @@ namespace HTTP { printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str()); } + state Reference r(new HTTP::Response()); + state Future responseReading = r->read(conn, verb == "HEAD" || verb == "DELETE"); + state double send_start = timer(); - state double total_sent = 0; + loop { Void _ = wait(conn->onWritable()); Void _ = wait( delay( 0, TaskWriteSocket ) ); + + // If we already got a response, before finishing sending the request, then close the connection, + // set the Connection header to "close" as a hint to the caller that this connection can't be used + // again, and break out of the send loop. + if(responseReading.isReady()) { + conn->close(); + r->headers["Connection"] = "close"; + earlyResponse = true; + break; + } + state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE; Void _ = wait(sendRate->getAllowance(trySend)); int len = conn->write(pContent->getUnsent(), trySend); @@ -338,18 +355,20 @@ namespace HTTP { break; } - state Reference r(new HTTP::Response()); - Void _ = wait(r->read(conn, verb == "HEAD" || verb == "DELETE")); + Void _ = wait(responseReading); + double elapsed = timer() - send_start; if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0) - printf("[%s] HTTP code=%d, time=%fs %s %s [%u out, response content len %d]\n", conn->getDebugID().toString().c_str(), r->code, elapsed, verb.c_str(), resource.c_str(), (int)total_sent, (int)r->contentLen); + printf("[%s] HTTP code=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n", + conn->getDebugID().toString().c_str(), r->code, earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent, (int)r->contentLen); if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2) printf("[%s] HTTP RESPONSE: %s %s\n%s\n", conn->getDebugID().toString().c_str(), verb.c_str(), resource.c_str(), r->toString().c_str()); return r; } catch(Error &e) { double elapsed = timer() - send_start; if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0) - printf("[%s] HTTP *ERROR*=%s, time=%fs %s %s [%u out]\n", conn->getDebugID().toString().c_str(), e.name(), elapsed, verb.c_str(), resource.c_str(), (int)total_sent); + printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n", + conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent); throw; } } diff --git a/fdbrpc/JSONDoc.h b/fdbrpc/JSONDoc.h index a368996325..ba66581baa 100644 --- a/fdbrpc/JSONDoc.h +++ b/fdbrpc/JSONDoc.h @@ -219,11 +219,12 @@ struct JSONDoc { return mergeOperator(op, op_a, op_b, a.get_value(), b.get_value()); } - static inline std::string getOperator(const json_spirit::mObject &obj) { + static inline const std::string & getOperator(const json_spirit::mObject &obj) { + static const std::string empty; for(auto &k : obj) if(!k.first.empty() && k.first[0] == '$') return k.first; - return std::string(); + return empty; } // Merge src into dest, applying merge operators diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index eb9cd17b34..ecc635ce33 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -734,7 +734,7 @@ public: if ( oldMasterFit < newMasterFit ) return false; - if ( oldMasterFit > newMasterFit ) + if ( oldMasterFit > newMasterFit || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.first.locality.processId() != clusterControllerProcessId ) ) return true; // Check tLog fitness @@ -840,7 +840,8 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster state double recoveryStart = now(); TraceEvent("CCWDB", cluster->id).detail("Recruiting", "Master"); state std::pair masterWorker = cluster->getMasterWorker(db->config); - if( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) { + if( ( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.first.locality.processId() == cluster->clusterControllerProcessId ) + && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) { TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.second.machineClassFitness( ProcessClass::Master )); Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) ); continue; diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index ebf4db028e..797b969534 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -259,8 +259,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( SIM_SHUTDOWN_TIMEOUT, 10 ); init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0; init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0; - init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 ); - init( ATTEMPT_RECRUITMENT_DELAY, 0.05 ); + init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 ); + init( ATTEMPT_RECRUITMENT_DELAY, 0.035 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001; init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0; diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index cb98a1efdc..58046b0a90 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1933,6 +1933,8 @@ TEST_CASE("status/json/merging") { a.create("not_expired_and_merged.$expires.seven.$sum") = 1; a.create("not_expired_and_merged.$expires.one.$min") = 3; a.create("not_expired_and_merged.version") = 3; + a.create("mixed_numeric_sum_6.$sum") = 0.5; + a.create("mixed_numeric_min_0.$min") = 1.5; b.create("int_one") = 1; b.create("int_unmatched") = 3; @@ -1956,6 +1958,8 @@ TEST_CASE("status/json/merging") { b.create("latest_obj.timestamp") = 2; b.create("latest_int_5.$latest") = 7; b.create("latest_int_5.timestamp") = 2; + b.create("mixed_numeric_sum_6.$sum") = 1; + b.create("mixed_numeric_min_0.$min") = 4.5; c.create("int_total_30.$sum") = 0; c.create("not_expired.$expires") = "I am still valid"; @@ -1971,18 +1975,25 @@ TEST_CASE("status/json/merging") { c.create("latest_obj.$latest.not_expired.$expires") = "Still alive."; c.create("latest_obj.$latest.not_expired.version") = 3; c.create("latest_obj.timestamp") = 3; - b.create("latest_int_5.$latest") = 5; - b.create("latest_int_5.timestamp") = 3; + c.create("latest_int_5.$latest") = 5; + c.create("latest_int_5.timestamp") = 3; + c.create("mixed_numeric_sum_6.$sum") = 4.5; + c.create("mixed_numeric_min_0.$min") = (double)0.0; + + printf("a = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str()); + printf("b = \n%s\n", json_spirit::write_string(json_spirit::mValue(objB), json_spirit::pretty_print).c_str()); + printf("c = \n%s\n", json_spirit::write_string(json_spirit::mValue(objC), json_spirit::pretty_print).c_str()); JSONDoc::expires_reference_version = 2; a.absorb(b); a.absorb(c); a.cleanOps(); + printf("result = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str()); std::string result = json_spirit::write_string(json_spirit::mValue(objA)); - std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}"; + std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"mixed_numeric_min_0\":0,\"mixed_numeric_sum_6\":6,\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}"; if(result != expected) { - printf("ERROR: Combined doc does not match expected.\nexpected: %s\nresult: %s\n", expected.c_str(), result.c_str()); + printf("ERROR: Combined doc does not match expected.\nexpected:\n\n%s\nresult:\n%s\n", expected.c_str(), result.c_str()); ASSERT(false); } diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index d70ad4fd8e..c9519282c5 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -124,6 +124,15 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { } } + ACTOR static Future statusLoop(Database cx, std::string tag) { + state FileBackupAgent agent; + loop { + std::string status = wait(agent.getStatus(cx, true, tag)); + puts(status.c_str()); + Void _ = wait(delay(2.0)); + } + } + ACTOR static Future doBackup(BackupAndRestoreCorrectnessWorkload* self, double startDelay, FileBackupAgent* backupAgent, Database cx, Key tag, Standalone> backupRanges, double stopDifferentialDelay, Promise submittted) { @@ -148,6 +157,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { TraceEvent("BARW_doBackupSubmitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True"); state std::string backupContainer = "file://simfdb/backups/"; + state Future status = statusLoop(cx, tag.toString()); + try { Void _ = wait(backupAgent->submitBackup(cx, StringRef(backupContainer), g_random->randomInt(0, 100), tag.toString(), backupRanges, stopDifferentialDelay ? false : true)); } diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 1c4396cc9a..30db98234d 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -835,7 +835,7 @@ Future< Reference > Net2::connect( NetworkAddress toAddr ) { } ACTOR static Future> resolveTCPEndpoint_impl( Net2 *self, std::string host, std::string service) { - state Promise> result; + Promise> result; self->tcpResolver.async_resolve(tcp::resolver::query(host, service), [=](const boost::system::error_code &ec, tcp::resolver::iterator iter) { if(ec) { diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 372103b11b..02d69c3dad 100644 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -174,6 +174,8 @@ ERROR( backup_bad_block_size, 2313, "Backup file block size too small") ERROR( backup_invalid_url, 2314, "Backup Container URL invalid") ERROR( backup_invalid_info, 2315, "Backup Container URL invalid") ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup without violating minimum restorability") +ERROR( backup_auth_missing, 2317, "Cannot find authentication details (such as a password or secret key) for the specified Backup Container URL") +ERROR( backup_auth_unreadable, 2318, "Cannot read or parse one or more sources of authentication information for Backup Container URLs") ERROR( restore_invalid_version, 2361, "Invalid restore version") ERROR( restore_corrupted_data, 2362, "Corrupted backup data") ERROR( restore_missing_data, 2363, "Missing backup data") diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index fe51d7153a..e45c5e5179 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1112,6 +1112,18 @@ ACTOR template void tagAndForwardError( Promise* pOutputPromise, Er out.sendError(value); } +ACTOR template Future waitOrError(Future f, Future errorSignal) { + choose { + when(T val = wait(f)) { + return val; + } + when(Void _ = wait(errorSignal)) { + ASSERT(false); + throw internal_error(); + } + } +} + struct FlowLock : NonCopyable, public ReferenceCounted { // FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between // wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the diff --git a/packaging/deb/builddebs.sh b/packaging/deb/builddebs.sh index 21587bae27..24f17a5c6e 100755 --- a/packaging/deb/builddebs.sh +++ b/packaging/deb/builddebs.sh @@ -60,7 +60,6 @@ chmod 0644 $CLIENTSDIR/usr/share/doc/foundationdb-clients/README install -m 0755 bin/fdbbackup $CLIENTSDIR/usr/lib/foundationdb/backup_agent/backup_agent ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbbackup ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbrestore -ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbblob ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/fdbdr ln -s ../lib/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/bin/dr_agent diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index d71aa8fa29..e49398eb30 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ - diff --git a/packaging/osx/buildpkg.sh b/packaging/osx/buildpkg.sh index 5252521961..7109810078 100755 --- a/packaging/osx/buildpkg.sh +++ b/packaging/osx/buildpkg.sh @@ -33,7 +33,6 @@ dos2unix README.md $CLIENTSDIR/usr/local/foundationdb/README chmod 0644 $CLIENTSDIR/usr/local/foundationdb/README ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbbackup ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbrestore -ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbblob ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/fdbdr ln -s /usr/local/foundationdb/backup_agent/backup_agent $CLIENTSDIR/usr/local/bin/dr_agent diff --git a/packaging/osx/uninstall-FoundationDB.sh b/packaging/osx/uninstall-FoundationDB.sh index a628f5fa00..4fbe418817 100644 --- a/packaging/osx/uninstall-FoundationDB.sh +++ b/packaging/osx/uninstall-FoundationDB.sh @@ -1,7 +1,7 @@ #!/bin/bash -x rm -f /usr/local/libexec/{fdbserver,fdbmonitor} -rm -f /usr/local/bin/{fdbcli,fdbbackup,fdbrestore,fdbblob,fdbdr} +rm -f /usr/local/bin/{fdbcli,fdbbackup,fdbrestore,fdbdr} rm -f /usr/local/lib/libfdb_c.dylib rm -rf /usr/local/include/foundationdb rm -rf /usr/local/foundationdb/backup_agent diff --git a/packaging/rpm/buildrpms.sh b/packaging/rpm/buildrpms.sh index c65c36f830..3eb189d0e6 100755 --- a/packaging/rpm/buildrpms.sh +++ b/packaging/rpm/buildrpms.sh @@ -44,7 +44,6 @@ install -m 0755 packaging/make_public.py $INSTDIR/usr/lib/foundationdb ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbbackup ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbrestore -ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbblob ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/fdbdr ln -s ../lib/foundationdb/backup_agent/backup_agent $INSTDIR/usr/bin/dr_agent diff --git a/packaging/rpm/foundationdb.spec.in b/packaging/rpm/foundationdb.spec.in index 3c8d5b4fd5..65dd2812bf 100644 --- a/packaging/rpm/foundationdb.spec.in +++ b/packaging/rpm/foundationdb.spec.in @@ -133,6 +133,5 @@ ifdef(`RHEL6', `/usr/lib/foundationdb/argparse.py') /usr/bin/fdbdr /usr/bin/fdbbackup /usr/bin/fdbrestore -/usr/bin/fdbblob /usr/lib64/libfdb_c.so /usr/include/* diff --git a/versions.target b/versions.target index f9b9192f0a..10c96af7dd 100644 --- a/versions.target +++ b/versions.target @@ -1,7 +1,7 @@ - 5.1.1 + 5.1.3 5.1