From e167e63eaf69b3157768cdca3054b0eda2116d67 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Thu, 23 Jan 2020 18:31:51 -0800 Subject: [PATCH] Add delays between proxy batches that roughly correspond to the amount of work the proxy needs to do. This will help avoid getting a version from the master and then waiting a long time before committing it. --- fdbserver/Knobs.cpp | 1 + fdbserver/Knobs.h | 1 + fdbserver/MasterProxyServer.actor.cpp | 18 +++++++++++++++--- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 9ad6ff6997..c27d708a90 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -319,6 +319,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01; init( ALWAYS_CAUSAL_READ_RISKY, false ); init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1; + init( MAX_PROXY_COMMIT_BYTES_PER_SECOND, 200000000.0 ); // Master Server // masterCommitter() in the master server will allow lower priority tasks (e.g. 
DataDistibution) diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 788bbb701a..584cd4b670 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -264,6 +264,7 @@ public: double REQUIRED_MIN_RECOVERY_DURATION; bool ALWAYS_CAUSAL_READ_RISKY; int MAX_COMMIT_UPDATES; + double MAX_PROXY_COMMIT_BYTES_PER_SECOND; // Master Server double COMMIT_SLEEP_TIME; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 9193c7f540..6a278804e8 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -475,6 +475,13 @@ bool isWhitelisted(const vector>& binPathVec, StringRef bi return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end(); } +ACTOR Future releaseResolvingAfter(ProxyCommitData* self, Future releaseDelay, int64_t localBatchNumber) { + wait(releaseDelay); + ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1); + self->latestLocalCommitBatchResolving.set(localBatchNumber); + return Void(); +} + ACTOR Future commitBatch( ProxyCommitData* self, vector trs, @@ -486,6 +493,11 @@ ACTOR Future commitBatch( state Optional debugID; state bool forceRecovery = false; state BinaryWriter valueWriter(Unversioned()); + + state int batchBytes = 0; + for (int t = 0; tMAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT); // since we are using just the former to limit the number of versions actually in flight! 
@@ -515,6 +527,7 @@ ACTOR Future commitBatch( /////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield) TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1)); + state Future releaseDelay = delay(batchBytes/SERVER_KNOBS->MAX_PROXY_COMMIT_BYTES_PER_SECOND, TaskPriority::ProxyCommitYield1); wait(yield(TaskPriority::ProxyCommitYield1)); if (debugID.present()) @@ -566,9 +579,7 @@ ACTOR Future commitBatch( } state vector> transactionResolverMap = std::move( requests.transactionResolverMap ); - - ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1); - self->latestLocalCommitBatchResolving.set(localBatchNumber); + state Future releaseFuture = releaseResolvingAfter(self, releaseDelay, localBatchNumber); /////// Phase 2: Resolution (waiting on the network; pipelined) state vector resolution = wait( getAll(replies) ); @@ -1068,6 +1079,7 @@ ACTOR Future commitBatch( self->commitBatchesMemBytesCount -= currentBatchMemBytesCount; ASSERT_ABORT(self->commitBatchesMemBytesCount >= 0); + wait(releaseFuture); return Void(); }