Control number of replies on wait in getBatchReplies

This commit is contained in:
Meng Xu 2020-05-01 10:09:08 -07:00
parent 2c17fff6bb
commit 05ba743f96
2 changed files with 3 additions and 7 deletions

View File

@ -296,9 +296,9 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
if (ongoingReplies.empty()) {
break;
} else {
wait(waitForAny(ongoingReplies));
// wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL,
// (int)ongoingReplies.size())));
// wait(waitForAny(ongoingReplies));
wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL,
(int)ongoingReplies.size())));
}
// At least one reply is received; Calculate the reply duration
for (int j = 0; j < ongoingReplies.size(); ++j) {

View File

@ -562,8 +562,6 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Requests", requests.size());
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
requests, TaskPriority::RestoreLoaderSendMutations));
// wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
// TaskPriority::RestoreLoaderSendMutations));
msgIndex++;
msgSize = 0;
for (auto& applierID : applierIDs) {
@ -588,8 +586,6 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Requests", requests.size());
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
// wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
// TaskPriority::RestoreLoaderSendMutations));
}
wait(waitForAll(fSends));