Add delays between proxy batches which roughly corresponding to the amount of work the proxy needs to do. This will help avoid getting a version from the master and then waiting a long time before committing it.
This commit is contained in:
parent
453600aba5
commit
e167e63eaf
|
@ -319,6 +319,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01;
|
||||
init( ALWAYS_CAUSAL_READ_RISKY, false );
|
||||
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
|
||||
init( MAX_PROXY_COMMIT_BYTES_PER_SECOND, 200000000.0 );
|
||||
|
||||
// Master Server
|
||||
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
||||
|
|
|
@ -264,6 +264,7 @@ public:
|
|||
double REQUIRED_MIN_RECOVERY_DURATION;
|
||||
bool ALWAYS_CAUSAL_READ_RISKY;
|
||||
int MAX_COMMIT_UPDATES;
|
||||
double MAX_PROXY_COMMIT_BYTES_PER_SECOND;
|
||||
|
||||
// Master Server
|
||||
double COMMIT_SLEEP_TIME;
|
||||
|
|
|
@ -475,6 +475,13 @@ bool isWhitelisted(const vector<Standalone<StringRef>>& binPathVec, StringRef bi
|
|||
return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> releaseDelay, int64_t localBatchNumber) {
|
||||
wait(releaseDelay);
|
||||
ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
|
||||
self->latestLocalCommitBatchResolving.set(localBatchNumber);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitBatch(
|
||||
ProxyCommitData* self,
|
||||
vector<CommitTransactionRequest> trs,
|
||||
|
@ -486,6 +493,11 @@ ACTOR Future<Void> commitBatch(
|
|||
state Optional<UID> debugID;
|
||||
state bool forceRecovery = false;
|
||||
state BinaryWriter valueWriter(Unversioned());
|
||||
|
||||
state int batchBytes = 0;
|
||||
for (int t = 0; t<trs.size(); t++) {
|
||||
batchBytes += trs[t].transaction.expectedSize();
|
||||
}
|
||||
|
||||
ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT); // since we are using just the former to limit the number of versions actually in flight!
|
||||
|
||||
|
@ -515,6 +527,7 @@ ACTOR Future<Void> commitBatch(
|
|||
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
||||
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
|
||||
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
||||
state Future<Void> releaseDelay = delay(batchBytes/SERVER_KNOBS->MAX_PROXY_COMMIT_BYTES_PER_SECOND, TaskPriority::ProxyCommitYield1);
|
||||
wait(yield(TaskPriority::ProxyCommitYield1));
|
||||
|
||||
if (debugID.present())
|
||||
|
@ -566,9 +579,7 @@ ACTOR Future<Void> commitBatch(
|
|||
}
|
||||
|
||||
state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap );
|
||||
|
||||
ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
|
||||
self->latestLocalCommitBatchResolving.set(localBatchNumber);
|
||||
state Future<Void> releaseFuture = releaseResolvingAfter(self, releaseDelay, localBatchNumber);
|
||||
|
||||
/////// Phase 2: Resolution (waiting on the network; pipelined)
|
||||
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
|
||||
|
@ -1068,6 +1079,7 @@ ACTOR Future<Void> commitBatch(
|
|||
|
||||
self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
|
||||
ASSERT_ABORT(self->commitBatchesMemBytesCount >= 0);
|
||||
wait(releaseFuture);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue