FastRestore:Loader:Enable sending message out of order

This commit is contained in:
Meng Xu 2020-04-21 22:29:57 -07:00
parent d85a39df69
commit 9b2f6d5c13
2 changed files with 7 additions and 6 deletions

View File

@ -466,8 +466,7 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
Version msgIndex; // Monitonically increasing index of mutation messages
bool isRangeFile;
VersionedMutationsVec
versionedMutations; // Versioned mutations that may be at different versions parsed by one loader
VersionedMutationsVec versionedMutations; // Versioned mutations may be at different versions parsed by one loader
ReplyPromise<RestoreCommonReply> reply;

View File

@ -483,6 +483,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
state std::map<UID, VersionedMutationsVec> applierVersionedMutationsBuffer;
state int mIndex = 0;
state LogMessageVersion commitVersion;
state std::vector<Future<Void>> fSends;
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
@ -556,8 +557,8 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
requests, TaskPriority::RestoreLoaderSendMutations));
msgIndex++;
msgSize = 0;
for (auto& applierID : applierIDs) {
@ -580,9 +581,10 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
}
wait(waitForAll(fSends));
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
return Void();