Batch sending all mutations of a version from RestoreLoader

This optimization is to reduce the number of messages sent from loader to
applier, which was unintentionally done when introducing sub sequence numbers
for mutations.
This commit is contained in:
Jingyu Zhou 2020-03-16 18:20:02 -07:00
parent 4065ca2a65
commit 4bdb32be14
1 changed files with 42 additions and 31 deletions

View File

@ -436,17 +436,17 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
splitMutationIndex = 0;
kvCount = 0;
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
state std::map<UID, MutationsVec> applierMutationsBuffer;
state std::map<UID, SubSequenceVec> applierSubsBuffer;
state std::map<UID, double> applierMutationsSize;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierSubsBuffer[applierID] = SubSequenceVec();
applierMutationsSize[applierID] = 0.0;
}
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
std::map<UID, MutationsVec> applierMutationsBuffer;
std::map<UID, SubSequenceVec> applierSubsBuffer;
std::map<UID, double> applierMutationsSize;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierSubsBuffer[applierID] = SubSequenceVec();
applierMutationsSize[applierID] = 0.0;
}
const LogMessageVersion& commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
@ -493,35 +493,46 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
--itlow; // make sure itlow->first <= m.param1
ASSERT(itlow->first <= kvm.param1);
UID applierID = itlow->second;
// printf("KV--Applier: K:%s ApplierID:%s\n", kvm.param1.toString().c_str(),
// applierID.toString().c_str());
kvCount++;
if (debugMutation("RestoreLoader", commitVersion.version, kvm)) {
TraceEvent("SendMutation")
.detail("Applier", applierID)
.detail("Version", commitVersion.toString())
.detail("Mutation", kvm.toString());
}
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), kvm);
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
applierMutationsSize[applierID] += kvm.expectedSize();
}
} // Mutations at the same version
} // Mutations at the same LogMessageVersion
// TODO: Sanity check each asset has been received exactly once!
// Send the mutations to appliers for each version
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID, RestoreSendVersionedMutationsRequest(
batchIndex, asset, prevVersion, commitVersion.version, isRangeFile,
applierMutationsBuffer[applierID], applierSubsBuffer[applierID]));
// Batch same Version's mutations in one request. We could batch more by
// changing the version comparison below.
auto next = std::next(kvOp, 1);
if (next == kvOps.end() || commitVersion.version < next->first.version) {
// TODO: Sanity check each asset has been received exactly once!
// Send the mutations to appliers for each version
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID, RestoreSendVersionedMutationsRequest(
batchIndex, asset, prevVersion, commitVersion.version, isRangeFile,
applierMutationsBuffer[applierID], applierSubsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion.toString())
.detail("RestoreAsset", asset.toString());
ASSERT(prevVersion < commitVersion.version);
prevVersion = commitVersion.version;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
requests.clear();
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierSubsBuffer[applierID] = SubSequenceVec();
applierMutationsSize[applierID] = 0.0;
}
}
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion.toString())
.detail("RestoreAsset", asset.toString());
ASSERT(prevVersion <= commitVersion.version);
prevVersion = commitVersion.version;
// Tracking this request can be spammy
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations,
SERVER_KNOBS->FASTRESTORE_TRACK_LOADER_SEND_REQUESTS));
requests.clear();
} // all versions of mutations in the same file
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);