added a yield to the proxy when committing a large batch of mutations

This commit is contained in:
Evan Tschannen 2018-10-18 15:26:00 -07:00
parent 0613a34845
commit 0b304495ad
1 changed files with 15 additions and 5 deletions

View File

@ -566,19 +566,29 @@ ACTOR Future<Void> commitBatch(
}
// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers' tags
int mutationCount = 0, mutationBytes = 0;
state int mutationCount = 0;
state int mutationBytes = 0;
state std::map<Key, MutationListRef> logRangeMutations;
state Arena logRangeMutationsArena;
state uint32_t v = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
state int transactionNum = 0;
state int yieldBytes = 0;
for (int t = 0; t<trs.size(); t++) {
for (; transactionNum<trs.size(); transactionNum++) {
if (committed[transactionNum] == ConflictBatch::TransactionCommitted && (!locked || trs[transactionNum].isLockAware())) {
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[transactionNum].transaction.mutations;
for (; mutationNum < pMutations->size(); mutationNum++) {
if(yieldBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
yieldBytes = 0;
Void _ = wait(yield());
}
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
for (auto m : trs[t].transaction.mutations) {
auto& m = (*pMutations)[mutationNum];
mutationCount++;
mutationBytes += m.expectedSize();
yieldBytes += m.expectedSize();
// Determine the set of tags (responsible storage servers) for the mutation, splitting it
// if necessary. Serialize (splits of) the mutation into the message buffer and add the tags.