From 05ba743f96d13445085c4425f17e3716a537081c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Fri, 1 May 2020 10:09:08 -0700 Subject: [PATCH] Control number of replies on wait in getBatchReplies --- fdbserver/RestoreCommon.actor.h | 6 +++--- fdbserver/RestoreLoader.actor.cpp | 4 ---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/fdbserver/RestoreCommon.actor.h b/fdbserver/RestoreCommon.actor.h index 32265ca102..9b0d0f6497 100644 --- a/fdbserver/RestoreCommon.actor.h +++ b/fdbserver/RestoreCommon.actor.h @@ -296,9 +296,9 @@ Future getBatchReplies(RequestStream Interface::*channel, std::ma if (ongoingReplies.empty()) { break; } else { - wait(waitForAny(ongoingReplies)); - // wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL, - // (int)ongoingReplies.size()))); + // wait(waitForAny(ongoingReplies)); + wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL, + (int)ongoingReplies.size()))); } // At least one reply is received; Calculate the reply duration for (int j = 0; j < ongoingReplies.size(); ++j) { diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 7e190efa61..8e4b0f25ff 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -562,8 +562,6 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat .detail("Requests", requests.size()); fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests, TaskPriority::RestoreLoaderSendMutations)); - // wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests, - // TaskPriority::RestoreLoaderSendMutations)); msgIndex++; msgSize = 0; for (auto& applierID : applierIDs) { @@ -588,8 +586,6 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat .detail("Requests", requests.size()); fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests, TaskPriority::RestoreLoaderSendMutations)); - // wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests, - // TaskPriority::RestoreLoaderSendMutations)); } wait(waitForAll(fSends));