From 84f7565d046ce2313dd224365e2dd75c75d94aa2 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 31 Jan 2018 10:03:09 -0800 Subject: [PATCH 01/17] MultiVersionApi::createCluster was throwing network_not_setup rather than returning it. --- fdbclient/MultiVersionTransaction.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 6103e22003..5bd8f295a3 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1242,7 +1242,7 @@ ThreadFuture> MultiVersionApi::createCluster(const char *clu lock.enter(); if(!networkSetup) { lock.leave(); - throw network_not_setup(); + return network_not_setup(); } lock.leave(); From 41ebc3409712462619613bc6ea0d9891330a8825 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 31 Jan 2018 11:13:57 -0800 Subject: [PATCH 02/17] Fix some weird spacing --- fdbclient/NativeAPI.actor.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index c9eac5d495..ecaa848d4f 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -464,8 +464,9 @@ DatabaseContext::DatabaseContext( transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsPastVersions(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID), outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), - latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) { - logger = databaseLogger( this ); + latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) +{ + logger = databaseLogger( this ); locationCacheSize = g_network->isSimulated() ? CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM : CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE; From 0c601d6f85034b27286b22c16d4cef440efe648f Mon Sep 17 00:00:00 2001 From: "A.J. 
Beamon" Date: Wed, 31 Jan 2018 12:05:41 -0800 Subject: [PATCH 03/17] Purge past version references --- fdbclient/DatabaseContext.h | 2 +- fdbclient/NativeAPI.actor.cpp | 6 +++--- fdbserver/storageserver.actor.cpp | 16 ++++++++-------- fdbserver/workloads/Cycle.actor.cpp | 8 ++++---- fdbserver/workloads/Increment.actor.cpp | 8 ++++---- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index c0acc7a312..163556f5ee 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -135,7 +135,7 @@ public: int64_t transactionsReadVersions; int64_t transactionsCommitStarted; int64_t transactionsCommitCompleted; - int64_t transactionsPastVersions; + int64_t transactionsTooOld; int64_t transactionsFutureVersions; int64_t transactionsNotCommitted; int64_t transactionsMaybeCommitted; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index ecaa848d4f..f471560f7d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -209,7 +209,7 @@ ACTOR Future databaseLogger( DatabaseContext *cx ) { .detail("ReadVersions", cx->transactionsReadVersions) .detail("CommitStarted", cx->transactionsCommitStarted) .detail("CommitCompleted", cx->transactionsCommitCompleted) - .detail("PastVersions", cx->transactionsPastVersions) + .detail("TooOld", cx->transactionsTooOld) .detail("FutureVersions", cx->transactionsFutureVersions) .detail("NotCommitted", cx->transactionsNotCommitted) .detail("MaybeCommitted", cx->transactionsMaybeCommitted) @@ -461,7 +461,7 @@ DatabaseContext::DatabaseContext( Standalone dbName, Standalone dbId, int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware ) : clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId), - transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsPastVersions(0), + transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID), outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) @@ -2765,7 +2765,7 @@ Future Transaction::onError( Error const& e ) { e.code() == error_code_future_version) { if( e.code() == error_code_transaction_too_old ) - cx->transactionsPastVersions++; + cx->transactionsTooOld++; else if( e.code() == error_code_future_version ) cx->transactionsFutureVersions++; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 02ba30b7e8..f56bd5f942 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -631,7 +631,7 @@ ACTOR Future waitForVersion( StorageServer* data, Version version ) { } } -ACTOR Future waitForVersionNoPastVersion( StorageServer* data, Version version ) { +ACTOR Future waitForVersionNoTooOld( StorageServer* data, Version version ) { // This could become an Actor transparently, but for now it just does the lookup if (version == latestVersion) version = std::max(Version(1), data->version.get()); @@ -735,7 +735,7 @@ ACTOR Future watchValue_impl( StorageServer* data, 
WatchValueRequest req ) if( req.debugID.present() ) g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask()); - Version version = wait( waitForVersionNoPastVersion( data, req.version ) ); + Version version = wait( waitForVersionNoTooOld( data, req.version ) ); if( req.debugID.present() ) g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask()); @@ -1624,13 +1624,13 @@ void coalesceShards(StorageServer *data, KeyRangeRef keys) { } } -ACTOR Future> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isPastVersion ) { +ACTOR Future> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isTooOld ) { state Transaction tr( cx ); state Standalone output; state KeySelectorRef begin = firstGreaterOrEqual( keys.begin ); state KeySelectorRef end = firstGreaterOrEqual( keys.end ); - if( *isPastVersion ) + if( *isTooOld ) throw transaction_too_old(); tr.setVersion( version ); @@ -1672,7 +1672,7 @@ ACTOR Future> tryGetRange( Database cx, Version versi } catch( Error &e ) { if( begin.getKey() != keys.begin && ( e.code() == error_code_transaction_too_old || e.code() == error_code_future_version ) ) { if( e.code() == error_code_transaction_too_old ) - *isPastVersion = true; + *isTooOld = true; output.more = true; if( begin.isFirstGreaterOrEqual() ) output.readThrough = begin.getKey(); @@ -1769,13 +1769,13 @@ ACTOR Future fetchKeys( StorageServer *data, AddingShard* shard ) { // Get the history state int debug_getRangeRetries = 0; state int debug_nextRetryToLog = 1; - state bool isPastVersion = false; + state bool isTooOld = false; loop { try { TEST(true); // Fetching keys for transferred shard - state Standalone this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isPastVersion ) ); + state Standalone this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) ); int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size(); @@ -1846,7 +1846,7 @@ ACTOR Future fetchKeys( StorageServer *data, AddingShard* shard ) { Void _ = wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) ); Version lastFV = fetchVersion; fetchVersion = data->version.get(); - isPastVersion = false; + isTooOld = false; // Throw away deferred updates from before fetchVersion, since we don't need them to use blocks fetched at that version while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front(); diff --git a/fdbserver/workloads/Cycle.actor.cpp b/fdbserver/workloads/Cycle.actor.cpp index 7079342a77..a57176a483 100644 --- a/fdbserver/workloads/Cycle.actor.cpp +++ b/fdbserver/workloads/Cycle.actor.cpp @@ -30,13 +30,13 @@ struct CycleWorkload : TestWorkload { Key keyPrefix; vector> clients; - PerfIntCounter transactions, retries, pastVersionRetries, commitFailedRetries; + PerfIntCounter transactions, retries, tooOldRetries, commitFailedRetries; PerfDoubleCounter totalLatency; CycleWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), transactions("Transactions"), retries("Retries"), totalLatency("Latency"), - pastVersionRetries("Retries.past_version"), commitFailedRetries("Retries.commit_failed") + tooOldRetries("Retries.too_old"), 
commitFailedRetries("Retries.commit_failed") { testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 ); transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount; @@ -69,7 +69,7 @@ struct CycleWorkload : TestWorkload { virtual void getMetrics( vector& m ) { m.push_back( transactions.getMetric() ); m.push_back( retries.getMetric() ); - m.push_back( pastVersionRetries.getMetric() ); + m.push_back( tooOldRetries.getMetric() ); m.push_back( commitFailedRetries.getMetric() ); m.push_back( PerfMetric( "Avg Latency (ms)", 1000 * totalLatency.getValue() / transactions.getValue(), true ) ); m.push_back( PerfMetric( "Read rows/simsec (approx)", transactions.getValue() * 3 / testDuration, false ) ); @@ -120,7 +120,7 @@ struct CycleWorkload : TestWorkload { //TraceEvent("CycleCommit"); break; } catch (Error& e) { - if (e.code() == error_code_transaction_too_old) ++self->pastVersionRetries; + if (e.code() == error_code_transaction_too_old) ++self->tooOldRetries; else if (e.code() == error_code_not_committed) ++self->commitFailedRetries; Void _ = wait( tr.onError(e) ); } diff --git a/fdbserver/workloads/Increment.actor.cpp b/fdbserver/workloads/Increment.actor.cpp index 9f1e00c94e..cc6045e902 100644 --- a/fdbserver/workloads/Increment.actor.cpp +++ b/fdbserver/workloads/Increment.actor.cpp @@ -29,13 +29,13 @@ struct Increment : TestWorkload { double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond; vector> clients; - PerfIntCounter transactions, retries, pastVersionRetries, commitFailedRetries; + PerfIntCounter transactions, retries, tooOldRetries, commitFailedRetries; PerfDoubleCounter totalLatency; Increment(WorkloadContext const& wcx) : TestWorkload(wcx), transactions("Transactions"), retries("Retries"), totalLatency("Latency"), - pastVersionRetries("Retries.past_version"), commitFailedRetries("Retries.commit_failed") + tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed") { testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 ); transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ); @@ -68,7 +68,7 @@ struct Increment : TestWorkload { virtual void getMetrics( vector& m ) { m.push_back( transactions.getMetric() ); m.push_back( retries.getMetric() ); - m.push_back( pastVersionRetries.getMetric() ); + m.push_back( tooOldRetries.getMetric() ); m.push_back( commitFailedRetries.getMetric() ); m.push_back( PerfMetric( "Avg Latency (ms)", 1000 * totalLatency.getValue() / transactions.getValue(), true ) ); m.push_back( PerfMetric( "Read rows/simsec (approx)", transactions.getValue() * 3 / testDuration, false ) ); @@ -94,7 +94,7 @@ struct Increment : TestWorkload { Void _ = wait( tr.commit() ); break; } catch (Error& e) { - if (e.code() == error_code_transaction_too_old) ++self->pastVersionRetries; + if (e.code() == error_code_transaction_too_old) ++self->tooOldRetries; else if (e.code() == error_code_not_committed) ++self->commitFailedRetries; Void _ = wait( tr.onError(e) ); } From 080a454051beafd6c01b935ebef3590785d134ca Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 31 Jan 2018 13:47:36 -0800 Subject: [PATCH 04/17] fix: getVersionstamp would return broken promise if a transaction was disposed before being set. getAddressesForKey would not return when resetPromise was set. 
--- fdbclient/ReadYourWrites.actor.cpp | 10 +++++++++- fdbclient/ReadYourWrites.h | 8 +------- flow/genericactors.actor.h | 12 ++++++++++++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 290c8f6615..f2c4889bd6 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1307,7 +1307,7 @@ Future< Standalone >> ReadYourWritesTransaction::getAddre // If key >= allKeys.end, then our resulting address vector will be empty. - Future< Standalone >> result = tr.getAddressesForKey(key); + Future< Standalone >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture()); reading.add( success( result ) ); return result; } @@ -1681,6 +1681,14 @@ Future ReadYourWritesTransaction::commit() { return RYWImpl::commit( this ); } +Future> ReadYourWritesTransaction::getVersionstamp() { + if(checkUsedDuringCommit()) { + return used_during_commit(); + } + + return waitOrError(tr.getVersionstamp(), resetPromise.getFuture()); +} + void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional value ) { switch(option) { case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE: diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 7fdc52b140..9bf97402e9 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -103,13 +103,7 @@ public: Future commit(); Version getCommittedVersion() { return tr.getCommittedVersion(); } - Future> getVersionstamp() { - if(checkUsedDuringCommit()) { - return used_during_commit(); - } - - return tr.getVersionstamp(); - } + Future> getVersionstamp(); void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index fe51d7153a..e45c5e5179 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1112,6 +1112,18 @@ ACTOR template void tagAndForwardError( Promise* pOutputPromise, Er out.sendError(value); } +ACTOR template Future waitOrError(Future f, Future errorSignal) { + choose { + when(T val = wait(f)) { + return val; + } + when(Void _ = wait(errorSignal)) { + ASSERT(false); + throw internal_error(); + } + } +} + struct FlowLock : NonCopyable, public ReferenceCounted { // FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between // wait(take()) and release(). Not thread safe. 
take() returns only when the number of holders of the lock is fewer than the From b7dde8802940a27388687f33e5e9e8eff4001672 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 6 Feb 2018 11:30:05 -0800 Subject: [PATCH 05/17] fix: the cluster controller did not consider the master sharing the same process as the cluster controller as bad in all needed locations; it also waited too long for good recruitment locations, which would add too much time to recoveries of clusters that do not use machine classes --- fdbserver/ClusterController.actor.cpp | 5 +++-- fdbserver/Knobs.cpp | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index eb9cd17b34..ecc635ce33 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -734,7 +734,7 @@ public: if ( oldMasterFit < newMasterFit ) return false; - if ( oldMasterFit > newMasterFit ) + if ( oldMasterFit > newMasterFit || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.first.locality.processId() != clusterControllerProcessId ) ) return true; // Check tLog fitness @@ -840,7 +840,8 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster state double recoveryStart = now(); TraceEvent("CCWDB", cluster->id).detail("Recruiting", "Master"); state std::pair masterWorker = cluster->getMasterWorker(db->config); - if( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) { + if( ( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.first.locality.processId() == cluster->clusterControllerProcessId ) + && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) { TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.second.machineClassFitness( ProcessClass::Master )); Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) ); continue; } diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index ebf4db028e..80e0e0351a 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -259,8 +259,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( SIM_SHUTDOWN_TIMEOUT, 10 ); init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0; init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0; - init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 ); - init( ATTEMPT_RECRUITMENT_DELAY, 0.05 ); + init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 ); + init( ATTEMPT_RECRUITMENT_DELAY, 0.35 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001; init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0; From 0792d5e3ddba4a3939b29a2351dd9d8952520087 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Tue, 6 Feb 2018 13:44:04 -0800 Subject: [PATCH 06/17] Fix: last restorable version for a backup tag name (a separate value from the latest restorable version for a configured backup) was not being updated. Fix: backup blob speed was sometimes an error because the JSON $sum merge operator did not support mixed numeric types.
Fix: JSON merge operator handling was squashing errors in some cases, which was generally obscuring the backup speed metric issue. Cleaned up some of the JSON object merging logic. Improved error messages in JSON merge operators. Added JSON merge operator tests for mixed numeric math and improved readability of test output. --- fdbclient/FileBackupAgent.actor.cpp | 18 +++++- fdbclient/StatusClient.actor.cpp | 95 +++++++++++++++++------------ fdbrpc/JSONDoc.h | 5 +- fdbserver/Status.actor.cpp | 19 ++++-- 4 files changed, 89 insertions(+), 48 deletions(-) diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index b98c4f1263..a5370486d4 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -1799,10 +1799,17 @@ namespace fileBackup { state bool stopWhenDone; state Optional restorableVersion; state EBackupState backupState; + state Optional tag; Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone) && store(config.getLatestRestorableVersion(tr), restorableVersion) - && store(config.stateEnum().getOrThrow(tr), backupState)); + && store(config.stateEnum().getOrThrow(tr), backupState) + && store(config.tag().get(tr), tag)); + + // If restorable, update the last restorable version for this tag + if(restorableVersion.present() && tag.present()) { + FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get()); + } // If the backup is restorable but the state is not differential then set state to differential if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL) @@ -2014,11 +2021,18 @@ namespace fileBackup { state EBackupState backupState; state Optional restorableVersion; state Optional firstSnapshotEndVersion; + state Optional tag; Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone) && store(config.stateEnum().getOrThrow(tr), backupState) && store(config.getLatestRestorableVersion(tr), restorableVersion) - && store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)); + && store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion) + && store(config.tag().get(tr), tag)); + + // If restorable, update the last restorable version for this tag + if(restorableVersion.present() && tag.present()) { + FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get()); + } if(!firstSnapshotEndVersion.present()) { config.firstSnapshotEndVersion().set(tr, Params.endVersion().get(task)); diff --git a/fdbclient/StatusClient.actor.cpp b/fdbclient/StatusClient.actor.cpp index 2814f58fd0..a4ddf05d2b 100644 --- a/fdbclient/StatusClient.actor.cpp +++ b/fdbclient/StatusClient.actor.cpp @@ -146,98 +146,113 @@ void JSONDoc::mergeInto(json_spirit::mObject &dst, const json_spirit::mObject &s } } -void JSONDoc::mergeValueInto(json_spirit::mValue &d, const json_spirit::mValue &s) { - if(s.is_null()) +void JSONDoc::mergeValueInto(json_spirit::mValue &dst, const json_spirit::mValue &src) { + if(src.is_null()) return; - if(d.is_null()) { - d = s; + if(dst.is_null()) { + dst = src; return; } - if(d.type() != s.type()) { - // Skip errors already found - if(d.type() == json_spirit::obj_type && d.get_obj().count("ERROR")) - return; - d = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", d}, {"b", s}}); + // Do nothing if d is already an error + if(dst.type() == json_spirit::obj_type && dst.get_obj().count("ERROR")) + return; + + if(dst.type() != src.type()) { + dst = json_spirit::mObject({{"ERROR", 
"Incompatible types."}, {"a", dst}, {"b", src}}); return; } - switch(d.type()) { + switch(dst.type()) { case json_spirit::obj_type: { - std::string op = getOperator(d.get_obj()); - - //printf("Operator: %s\n", op.c_str()); + // Refs to the objects, for convenience. + json_spirit::mObject &aObj = dst.get_obj(); + const json_spirit::mObject &bObj = src.get_obj(); + const std::string &op = getOperator(aObj); + const std::string &opB = getOperator(bObj); + + // Operators must be the same, which could mean both are empty (if these objects are not operators) + if(op != opB) { + dst = json_spirit::mObject({ {"ERROR", "Operators do not match"}, {"a", dst}, {"b", src} }); + break; + } + + // If objects are not operators then defer to mergeInto if(op.empty()) { - mergeInto(d.get_obj(), s.get_obj()); + mergeInto(dst.get_obj(), src.get_obj()); break; } - // Refs to the operator objects to combine - const json_spirit::mObject &op_a = d.get_obj(); - const json_spirit::mObject &op_b = s.get_obj(); - - if(!op_b.count(op)) { - d = json_spirit::mObject({{"ERROR", "Operators do not match"}, {"s", s}, {"d", d}}); - break; - } - - const json_spirit::mValue &a = d.get_obj().at(op); - const json_spirit::mValue &b = s.get_obj().at(op); + // Get the operator values + json_spirit::mValue &a = aObj.at(op); + const json_spirit::mValue &b = bObj.at(op); // First try the operators that are type-agnostic try { - d = mergeOperator(op, op_a, op_b, a, b); + dst = mergeOperator(op, aObj, bObj, a, b); return; } catch(std::exception &e) { } - // If that didn't work, the types must match or we have no operators left to try. + // Now try type and type pair specific operators + // First, if types are incompatible try to make them compatible or return an error if(a.type() != b.type()) { - d = json_spirit::mObject({{"ERROR", "Types do not match"}, {"s", s}, {"d", d}}); - return; + // It's actually okay if the type mismatch is double vs int since once can be converted to the other. + if( (a.type() == json_spirit::int_type && b.type() == json_spirit::real_type) + || (b.type() == json_spirit::int_type && a.type() == json_spirit::real_type) ) + { + // Convert d's op value (which a is a reference to) to a double so that the + // switch block below will do the operation with doubles. + a = a.get_real(); + } + else { + // Otherwise, output an error as the types do not match + dst = json_spirit::mObject({{"ERROR", "Incompatible operator value types"}, {"a", dst}, {"b", src}}); + return; + } } // Now try the type-specific operators. try { switch(a.type()) { case json_spirit::bool_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::int_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::real_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::str_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::array_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::obj_type: - d = mergeOperatorWrapper(op, op_a, op_b, a, b); + dst = mergeOperatorWrapper(op, aObj, bObj, a, b); break; case json_spirit::null_type: break; } } catch(...) 
{ - d = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}}); + dst = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}}); } break; } case json_spirit::array_type: - for(auto &ai : s.get_array()) - d.get_array().push_back(ai); + for(auto &ai : src.get_array()) + dst.get_array().push_back(ai); break; default: - if(d != s) - d = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", d}, {"b", s}}); + if(dst != src) + dst = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", dst}, {"b", src}}); } } diff --git a/fdbrpc/JSONDoc.h b/fdbrpc/JSONDoc.h index a368996325..ba66581baa 100644 --- a/fdbrpc/JSONDoc.h +++ b/fdbrpc/JSONDoc.h @@ -219,11 +219,12 @@ struct JSONDoc { return mergeOperator(op, op_a, op_b, a.get_value(), b.get_value()); } - static inline std::string getOperator(const json_spirit::mObject &obj) { + static inline const std::string & getOperator(const json_spirit::mObject &obj) { + static const std::string empty; for(auto &k : obj) if(!k.first.empty() && k.first[0] == '$') return k.first; - return std::string(); + return empty; } // Merge src into dest, applying merge operators diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index cb98a1efdc..58046b0a90 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1933,6 +1933,8 @@ TEST_CASE("status/json/merging") { a.create("not_expired_and_merged.$expires.seven.$sum") = 1; a.create("not_expired_and_merged.$expires.one.$min") = 3; a.create("not_expired_and_merged.version") = 3; + a.create("mixed_numeric_sum_6.$sum") = 0.5; + a.create("mixed_numeric_min_0.$min") = 1.5; b.create("int_one") = 1; b.create("int_unmatched") = 3; @@ -1956,6 +1958,8 @@ TEST_CASE("status/json/merging") { b.create("latest_obj.timestamp") = 2; b.create("latest_int_5.$latest") = 7; b.create("latest_int_5.timestamp") = 2; + b.create("mixed_numeric_sum_6.$sum") = 1; + b.create("mixed_numeric_min_0.$min") = 4.5; c.create("int_total_30.$sum") = 0; c.create("not_expired.$expires") = "I am still valid"; @@ -1971,18 +1975,25 @@ TEST_CASE("status/json/merging") { c.create("latest_obj.$latest.not_expired.$expires") = "Still alive."; c.create("latest_obj.$latest.not_expired.version") = 3; c.create("latest_obj.timestamp") = 3; - b.create("latest_int_5.$latest") = 5; - b.create("latest_int_5.timestamp") = 3; + c.create("latest_int_5.$latest") = 5; + c.create("latest_int_5.timestamp") = 3; + c.create("mixed_numeric_sum_6.$sum") = 4.5; + c.create("mixed_numeric_min_0.$min") = (double)0.0; + + printf("a = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str()); + printf("b = \n%s\n", json_spirit::write_string(json_spirit::mValue(objB), json_spirit::pretty_print).c_str()); + printf("c = \n%s\n", json_spirit::write_string(json_spirit::mValue(objC), json_spirit::pretty_print).c_str()); JSONDoc::expires_reference_version = 2; a.absorb(b); a.absorb(c); a.cleanOps(); + printf("result = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str()); std::string result = json_spirit::write_string(json_spirit::mValue(objA)); - std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not 
match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}"; + std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"mixed_numeric_min_0\":0,\"mixed_numeric_sum_6\":6,\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}"; if(result != expected) { - printf("ERROR: Combined doc does not match expected.\nexpected: %s\nresult: %s\n", expected.c_str(), result.c_str()); + printf("ERROR: Combined doc does not match expected.\nexpected:\n\n%s\nresult:\n%s\n", expected.c_str(), result.c_str()); ASSERT(false); } From d0caffd3395f0aa2b14c6508914a7ad2f8f59be5 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 6 Feb 2018 18:11:45 -0800 Subject: [PATCH 07/17] fix: knob was set to incorrect value --- fdbserver/Knobs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 80e0e0351a..797b969534 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -260,7 +260,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0; init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0; init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 ); - init( ATTEMPT_RECRUITMENT_DELAY, 0.35 ); + init( ATTEMPT_RECRUITMENT_DELAY, 0.035 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001; init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0; From d8879dc3f302a5ea8573bd47eedcba262232fc5c Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 7 Feb 2018 10:38:31 -0800 Subject: [PATCH 08/17] HTTP::doRequest() now reads responses in parallel with sending requests, so if the server responds before receiving all of the the request the client can stop sending the remainder of the request. For PUT requests which upload files, this prevents sending potentially several megabytes of unnecessary bytes if the server responds with an error (such as 429) before the request is completely sent. Updated the backup container unit test to use more parallelism in order to test this new behavior. 
--- fdbclient/BackupContainer.actor.cpp | 22 +++++++++++----------- fdbrpc/HTTP.actor.cpp | 29 ++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 12a732cc89..5288eae43d 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -1329,23 +1329,23 @@ ACTOR Future testBackupContainer(std::string url) { state int64_t versionShift = g_random->randomInt64(0, std::numeric_limits::max() - 500); state Reference log1 = wait(c->writeLogFile(100 + versionShift, 150 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, log1, 0)); - state Reference log2 = wait(c->writeLogFile(150 + versionShift, 300 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000))); - state Reference range1 = wait(c->writeRangeFile(160 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000))); - state Reference range2 = wait(c->writeRangeFile(300 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000))); - state Reference range3 = wait(c->writeRangeFile(310 + versionShift, 10)); - Void _ = wait(writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000))); - Void _ = wait(c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size())); + Void _ = wait( + writeAndVerifyFile(c, log1, 0) + && writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000)) + && writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000)) + && writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000)) + && writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000)) + ); - Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size())); + Void _ = wait( + c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size()) + && c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()) + ); printf("Checking file list dump\n"); FullBackupListing listing = wait(c->dumpFileList()); diff --git a/fdbrpc/HTTP.actor.cpp b/fdbrpc/HTTP.actor.cpp index 08a1cff63b..63bb24af78 100644 --- a/fdbrpc/HTTP.actor.cpp +++ b/fdbrpc/HTTP.actor.cpp @@ -307,6 +307,9 @@ namespace HTTP { if(pContent == NULL) pContent = ∅ + state bool earlyResponse = false; + state int total_sent = 0; + try { // Write headers to a packet buffer chain PacketBuffer *pFirst = new PacketBuffer(); @@ -321,11 +324,25 @@ namespace HTTP { printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str()); } + state Reference r(new HTTP::Response()); + state Future responseReading = r->read(conn, verb == "HEAD" || verb == "DELETE"); + state double send_start = timer(); - state double total_sent = 0; + loop { Void _ = wait(conn->onWritable()); Void _ = wait( delay( 0, TaskWriteSocket ) ); + + // If we already got a response, before finishing sending the request, then close the connection, + // set the Connection header to "close" as a hint to the caller that this connection can't be used + // again, and break out of the send loop. 
+ if(responseReading.isReady()) { + conn->close(); + r->headers["Connection"] = "close"; + earlyResponse = true; + break; + } + state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE; Void _ = wait(sendRate->getAllowance(trySend)); int len = conn->write(pContent->getUnsent(), trySend); @@ -338,18 +355,20 @@ namespace HTTP { break; } - state Reference r(new HTTP::Response()); - Void _ = wait(r->read(conn, verb == "HEAD" || verb == "DELETE")); + Void _ = wait(responseReading); + double elapsed = timer() - send_start; if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0) - printf("[%s] HTTP code=%d, time=%fs %s %s [%u out, response content len %d]\n", conn->getDebugID().toString().c_str(), r->code, elapsed, verb.c_str(), resource.c_str(), (int)total_sent, (int)r->contentLen); + printf("[%s] HTTP code=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n", + conn->getDebugID().toString().c_str(), r->code, earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent, (int)r->contentLen); if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2) printf("[%s] HTTP RESPONSE: %s %s\n%s\n", conn->getDebugID().toString().c_str(), verb.c_str(), resource.c_str(), r->toString().c_str()); return r; } catch(Error &e) { double elapsed = timer() - send_start; if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0) - printf("[%s] HTTP *ERROR*=%s, time=%fs %s %s [%u out]\n", conn->getDebugID().toString().c_str(), e.name(), elapsed, verb.c_str(), resource.c_str(), (int)total_sent); + printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n", + conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent); throw; } } From e06f80bb6e5709fb929c8f433b364bf5bdebaa40 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Wed, 7 Feb 2018 11:56:47 -0800 Subject: [PATCH 09/17] Add client logging for number of logical and physical reads at the NativeAPI layer as well as committed mutation counts and bytes. 
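Terminology as used by these counters: a logical read is one read API call on a Transaction at the NativeAPI layer (get, getKey, getRange, getAddressesForKey), while a physical read is one request actually sent to a storage server, so a single getRange spanning several shards counts multiple physical reads. A condensed sketch of where the counters are incremented (taken from the diff below):

    Future<Optional<Value>> Transaction::get( const Key& key, bool snapshot ) {
        ++cx->transactionLogicalReads;   // once per read API call
        ...
    }
    // and once per storage-server request inside getValue/getKey/getRange:
    ++cx->transactionPhysicalReads;

All of these counters are reported in the periodic TransactionMetrics trace event.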
--- fdbclient/DatabaseContext.h | 6 +++++- fdbclient/NativeAPI.actor.cpp | 23 +++++++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 163556f5ee..ec5b12d7b7 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -132,7 +132,11 @@ public: Standalone dbName; Standalone dbId; - int64_t transactionsReadVersions; + int64_t transactionReadVersions; + int64_t transactionLogicalReads; + int64_t transactionPhysicalReads; + int64_t transactionCommittedMutations; + int64_t transactionCommittedMutationBytes; int64_t transactionsCommitStarted; int64_t transactionsCommitCompleted; int64_t transactionsTooOld; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index f471560f7d..b2ac98226d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -206,7 +206,11 @@ ACTOR Future databaseLogger( DatabaseContext *cx ) { loop { Void _ = wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) ); TraceEvent("TransactionMetrics") - .detail("ReadVersions", cx->transactionsReadVersions) + .detail("ReadVersions", cx->transactionReadVersions) + .detail("LogicalUncachedReads", cx->transactionLogicalReads) + .detail("PhysicalReadRequests", cx->transactionPhysicalReads) + .detail("CommittedMutations", cx->transactionCommittedMutations) + .detail("CommittedMutationBytes", cx->transactionCommittedMutationBytes) .detail("CommitStarted", cx->transactionsCommitStarted) .detail("CommitCompleted", cx->transactionsCommitCompleted) .detail("TooOld", cx->transactionsTooOld) @@ -461,8 +465,8 @@ DatabaseContext::DatabaseContext( Standalone dbName, Standalone dbId, int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware ) : clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId), - transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), - transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID), + transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0), + transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID), outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) { @@ -1163,6 +1167,7 @@ ACTOR Future> getValue( Future version, Key key, Databa ++cx->getValueSubmitted; startTime = timer_int(); startTimeD = now(); + ++cx->transactionPhysicalReads; state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? 
&cx->queueModel : NULL ) ); double latency = now() - startTimeD; cx->readLatencies.addSample(latency); @@ -1223,6 +1228,7 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T try { if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual); + ++cx->transactionPhysicalReads; GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",printable(reply.sel.key)).detail("offset", reply.sel.offset).detail("orEqual", k.orEqual); @@ -1381,6 +1387,7 @@ ACTOR Future> getExactRange( Database cx, Version ver .detail("Reverse", reverse) .detail("Servers", locations[shard].second->description());*/ } + ++cx->transactionPhysicalReads; GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); @@ -1650,6 +1657,7 @@ ACTOR Future> getRange( Database cx, Referencedescription());*/ } + ++cx->transactionPhysicalReads; GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) { @@ -1828,6 +1836,7 @@ void Transaction::setVersion( Version v ) { } Future> Transaction::get( const Key& key, bool snapshot ) { + ++cx->transactionLogicalReads; //ASSERT (key < allKeys.end); //There are no keys in the database with size greater than KEY_SIZE_LIMIT @@ -1914,6 +1923,7 @@ ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key } Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( const Key& key ) { + ++cx->transactionLogicalReads; auto ver = getReadVersion(); return getAddressesForKeyActor(key, ver, cx, info); @@ -1936,6 +1946,7 @@ ACTOR Future< Key > getKeyAndConflictRange( } Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) { + ++cx->transactionLogicalReads; if( snapshot ) return ::getKey(cx, key, getReadVersion(), info); @@ -1951,6 +1962,8 @@ Future< Standalone > Transaction::getRange( bool snapshot, bool reverse ) { + ++cx->transactionLogicalReads; + if( limits.isReached() ) return Standalone(); @@ -2367,6 +2380,8 @@ ACTOR static Future tryCommit( Database cx, Reference tr->numErrors = 0; cx->transactionsCommitCompleted++; + cx->transactionCommittedMutations += req.transaction.mutations.size(); + cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize(); if(info.debugID.present()) g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After"); @@ -2720,7 +2735,7 @@ ACTOR Future extractReadVersion(DatabaseContext* cx, Reference Transaction::getReadVersion(uint32_t flags) { - cx->transactionsReadVersions++; + cx->transactionReadVersions++; flags |= options.getReadVersionFlags; auto& batcher = cx->versionBatcher[ flags ]; From 9c7a39d5b259d2cf8a87f3e21c73ca7c4ae5151c Mon Sep 17 
00:00:00 2001 From: "A.J. Beamon" Date: Wed, 7 Feb 2018 12:54:34 -0800 Subject: [PATCH 10/17] Rename SevWarnAlways trace event DeviceNotFound to GetDiskStatisticsDeviceNotFound --- flow/Platform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/Platform.cpp b/flow/Platform.cpp index 5eeb7ecaf1..166c958b89 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -701,7 +701,7 @@ void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint6 disk_stream.ignore( std::numeric_limits::max(), '\n'); } - if(!g_network->isSimulated()) TraceEvent(SevWarnAlways, "DeviceNotFound").detail("Directory", directory); + if(!g_network->isSimulated()) TraceEvent(SevWarnAlways, "GetDiskStatisticsDeviceNotFound").detail("Directory", directory); } dev_t getDeviceId(std::string path) { From f8522248cb88c81b3adce3e23be1b170a8756e76 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 7 Feb 2018 16:25:16 -0800 Subject: [PATCH 11/17] Blob credentials files were being opened in read-write mode despite the read-only option being specified because the underlying caching layer opens always opens files for read/write access. For now, disabled caching for this file. --- fdbrpc/BlobStore.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbrpc/BlobStore.actor.cpp b/fdbrpc/BlobStore.actor.cpp index 4ee595793c..328a9ecb37 100644 --- a/fdbrpc/BlobStore.actor.cpp +++ b/fdbrpc/BlobStore.actor.cpp @@ -294,7 +294,7 @@ ACTOR Future tryReadJSONFile(std::string path) { state std::string content; try { - state Reference f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY, 0)); + state Reference f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0)); state int64_t size = wait(f->size()); state Standalone buf = makeString(size); int r = wait(f->read(mutateString(buf), size, 0)); From 69425a303b2a09eb8eda38e11d962e51476b1898 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 7 Feb 2018 21:50:43 -0800 Subject: [PATCH 12/17] Improved error handling for cases where blob account credentials are either not found in the provided credentials sources and/or some of the credentials sources provided are not readable or parseable. 
--- fdbrpc/BlobStore.actor.cpp | 91 ++++++++++++++++++++++++-------------- flow/error_definitions.h | 2 + 2 files changed, 59 insertions(+), 34 deletions(-) diff --git a/fdbrpc/BlobStore.actor.cpp b/fdbrpc/BlobStore.actor.cpp index 328a9ecb37..9d0b510ffc 100644 --- a/fdbrpc/BlobStore.actor.cpp +++ b/fdbrpc/BlobStore.actor.cpp @@ -221,21 +221,27 @@ ACTOR Future deleteRecursively_impl(Reference b, std::s state Future done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits::max()); // Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream) done = map(done, [=](Void) { - resultStream.sendError(end_of_stream()); - return Void(); - }); + resultStream.sendError(end_of_stream()); + return Void(); + }); state std::list> deleteFutures; try { loop { - BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture()); - for(auto &object : list.objects) { - int *pNumDeletedCopy = pNumDeleted; // avoid capture of this - deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void { - if(pNumDeletedCopy != nullptr) - ++*pNumDeletedCopy; - return Void(); - })); + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(Void _ = wait(done)) {} + + when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { + for(auto &object : list.objects) { + int *pNumDeletedCopy = pNumDeleted; // avoid capture of this + deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void { + if(pNumDeletedCopy != nullptr) + ++*pNumDeletedCopy; + return Void(); + })); + } + } } // This is just a precaution to avoid having too many outstanding delete actors waiting to run @@ -290,9 +296,12 @@ Future BlobStoreEndpoint::objectSize(std::string const &bucket, std::st // Try to read a file, parse it as JSON, and return the resulting document. // It will NOT throw if any errors are encountered, it will just return an empty // JSON object and will log trace events for the errors encountered. 
-ACTOR Future tryReadJSONFile(std::string path) { +ACTOR Future> tryReadJSONFile(std::string path) { state std::string content; + // Event type to be logged in the event of an exception + state const char *errorEventType = "BlobCredentialFileError"; + try { state Reference f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0)); state int64_t size = wait(f->size()); @@ -300,25 +309,22 @@ ACTOR Future tryReadJSONFile(std::string path) { int r = wait(f->read(mutateString(buf), size, 0)); ASSERT(r == size); content = buf.toString(); - } catch(Error &e) { - if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true); - return json_spirit::mObject(); - } - try { + // Any exceptions from here forward are parse failures + errorEventType = "BlobCredentialFileParseFailed"; json_spirit::mValue json; json_spirit::read_string(content, json); if(json.type() == json_spirit::obj_type) return json.get_obj(); else TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path).suppressFor(60, true); + } catch(Error &e) { if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true); + TraceEvent(SevWarn, errorEventType).detail("File", path).error(e).suppressFor(60, true); } - return json_spirit::mObject(); + return Optional(); } ACTOR Future updateSecret_impl(Reference b) { @@ -326,7 +332,7 @@ ACTOR Future updateSecret_impl(Reference b) { if(pFiles == nullptr) return Void(); - state std::vector> reads; + state std::vector>> reads; for(auto &f : *pFiles) reads.push_back(tryReadJSONFile(f)); @@ -334,13 +340,22 @@ ACTOR Future updateSecret_impl(Reference b) { std::string key = b->key + "@" + b->host; + int invalid = 0; + for(auto &f : reads) { - JSONDoc doc(f.get()); + // If value not present then the credentials file wasn't readable or valid. Continue to check other results. + if(!f.get().present()) { + ++invalid; + continue; + } + + JSONDoc doc(f.get().get()); if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) { JSONDoc accounts(doc.last().get_obj()); if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) { JSONDoc account(accounts.last()); std::string secret; + // Once we find a matching account, use it.
if(account.tryGet("secret", secret)) { b->secret = secret; return Void(); @@ -349,7 +364,12 @@ ACTOR Future updateSecret_impl(Reference b) { } } - return Void(); + // If any sources were invalid + if(invalid > 0) + throw backup_auth_unreadable(); + + // All sources were valid but didn't contain the desired info + throw backup_auth_missing(); } Future BlobStoreEndpoint::updateSecret() { @@ -451,12 +471,9 @@ ACTOR Future> doRequest_impl(Reference listBucket_impl(Reference done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter); // Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream done = map(done, [=](Void) { - resultStream.sendError(end_of_stream()); - return Void(); - }); + resultStream.sendError(end_of_stream()); + return Void(); + }); try { loop { - BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture()); - results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end()); - results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(Void _ = wait(done)) {} + + when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) { + results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end()); + results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); + } + } } } catch(Error &e) { if(e.code() != error_code_end_of_stream) diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 372103b11b..02d69c3dad 100644 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -174,6 +174,8 @@ ERROR( backup_bad_block_size, 2313, "Backup file block size too small") ERROR( backup_invalid_url, 2314, "Backup Container URL invalid") ERROR( backup_invalid_info, 2315, "Backup Container URL invalid") ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup without violating minimum restorability") +ERROR( backup_auth_missing, 2317, "Cannot find authentication details (such as a password or secret key) for the specified Backup Container URL") +ERROR( backup_auth_unreadable, 2318, "Cannot read or parse one or more sources of authentication information for Backup Container URLs") ERROR( restore_invalid_version, 2361, "Invalid restore version") ERROR( restore_corrupted_data, 2362, "Corrupted backup data") ERROR( restore_missing_data, 2363, "Missing backup data") From 2d483e7a1c910803600b872cd4ac6105d12f26d8 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 7 Feb 2018 22:36:01 -0800 Subject: [PATCH 13/17] Fdbbackup will now still write a trace file if --log is specified when doing operations that do not require a cluster. Previous behavior was to rely on createCluster()'s trace file initialization. 
--- fdbbackup/backup.actor.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 3b1c60bb80..04748d6964 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -2279,6 +2279,8 @@ int main(int argc, char* argv[]) { bool dryRun = false; std::string traceDir = ""; std::string traceLogGroup; + uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE; + uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE; ESOError lastError; bool partial = true; LocalityData localities; @@ -2706,6 +2708,14 @@ int main(int argc, char* argv[]) { } } + // Opens a trace file if trace is set (and if a trace file isn't already open) + // For most modes, initCluster() will open a trace file, but some fdbbackup operations do not require + // a cluster so they should use this instead. + auto initTraceFile = [&]() { + if(trace) + openTraceFile(NetworkAddress(), traceRollSize, traceMaxLogsSize, traceDir, "trace", traceLogGroup); + }; + auto initCluster = [&](bool quiet = false) { auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile); try { @@ -2818,6 +2828,7 @@ int main(int argc, char* argv[]) { break; case BACKUP_EXPIRE: + initTraceFile(); // Must have a usable cluster if either expire DateTime options were used if(!expireDatetime.empty() || !expireRestorableAfterDatetime.empty()) { if(!initCluster()) @@ -2827,10 +2838,12 @@ int main(int argc, char* argv[]) { break; case BACKUP_DELETE: + initTraceFile(); f = stopAfter( deleteBackupContainer(argv[0], destinationContainer) ); break; case BACKUP_DESCRIBE: + initTraceFile(); // If timestamp lookups are desired, require a cluster file if(describeTimestamps && !initCluster()) return FDB_EXIT_ERROR; @@ -2839,6 +2852,7 @@ int main(int argc, char* argv[]) { f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional(db) : Optional()) ); break; case BACKUP_LIST: + initTraceFile(); f = stopAfter( listBackup(baseUrl) ); break; From 1f78b98ac9e18b15ce2aa55a5d434f595dbe263c Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Thu, 8 Feb 2018 09:54:47 -0800 Subject: [PATCH 14/17] Bug fix, result as a state variable causes 'this' to be captured instead of copying result. 
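Background, as an illustrative sketch (not code from this patch; hypotheticalAsyncApi is a stand-in for boost asio's async_resolve used below): the actor compiler turns every state variable into a member of the generated actor object, so a [=] lambda that names such a variable actually copies the actor's this pointer rather than the variable:

    ACTOR static Future<int> example() {
        state Promise<int> p;                        // 'state' => member of the actor object
        hypotheticalAsyncApi([=]() { p.send(1); });  // [=] copies 'this', not p
        int r = wait(p.getFuture());                 // if the actor is destroyed before the
        return r;                                    // callback runs, 'this' is dangling
    }

Dropping 'state' (as this patch does for result) makes the lambda copy the Promise itself, and the Promise's shared state keeps the send path alive until the callback completes.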
--- flow/Net2.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 1c4396cc9a..30db98234d 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -835,7 +835,7 @@ Future< Reference > Net2::connect( NetworkAddress toAddr ) { } ACTOR static Future> resolveTCPEndpoint_impl( Net2 *self, std::string host, std::string service) { - state Promise> result; + Promise> result; self->tcpResolver.async_resolve(tcp::resolver::query(host, service), [=](const boost::system::error_code &ec, tcp::resolver::iterator iter) { if(ec) { From 93226ffed9a642230a8297d81ece70173a2a37f7 Mon Sep 17 00:00:00 2001 From: Alex Miller Date: Thu, 8 Feb 2018 15:42:01 -0800 Subject: [PATCH 15/17] Upgrade msi GUID for 5.1.1 --- packaging/msi/FDBInstaller.wxs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index d71aa8fa29..0ad587103f 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ Date: Thu, 8 Feb 2018 18:23:01 -0800 Subject: [PATCH 16/17] Bumping GUID post 5.1.1 release --- packaging/msi/FDBInstaller.wxs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index 0ad587103f..6bdfd6f915 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ Date: Thu, 8 Feb 2018 18:36:28 -0800 Subject: [PATCH 17/17] Incrementing versions.target --- versions.target | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/versions.target b/versions.target index f9b9192f0a..11a5075577 100644 --- a/versions.target +++ b/versions.target @@ -1,7 +1,7 @@ - 5.1.1 + 5.1.2 5.1