FastRestore:Applier:Store received mutation by key
This commit is contained in:
parent
c0f75d77b1
commit
2bc82ffd70
|
@ -36,6 +36,8 @@
|
|||
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
|
||||
Database cx);
|
||||
|
||||
|
@ -61,7 +63,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
when(RestoreSendVersionedMutationsRequest req =
|
||||
waitNext(applierInterf.sendMutationVector.getFuture())) {
|
||||
requestTypeStr = "sendMutationVector";
|
||||
actors.add(handleSendMutationVectorRequest(req, self));
|
||||
actors.add(handleSendMutationVectorRequestV2(req, self));
|
||||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) {
|
||||
requestTypeStr = "applyToDB";
|
||||
|
@ -98,6 +100,68 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
return Void();
|
||||
}
|
||||
|
||||
// The actor may be invovked multiple times and executed async.
|
||||
// No race condition as long as we do not wait or yield when operate the shared data.
|
||||
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
|
||||
// Only one actor can process mutations from the same file
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self) {
|
||||
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
||||
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
|
||||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset];
|
||||
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedFileVersion", curFilePos.get())
|
||||
.detail("Request", req.toString());
|
||||
|
||||
wait(curFilePos.whenAtLeast(req.prevVersion));
|
||||
|
||||
state bool isDuplicated = true;
|
||||
if (curFilePos.get() == req.prevVersion) {
|
||||
isDuplicated = false;
|
||||
Version commitVersion = req.version;
|
||||
MutationsVec mutations(req.mutations);
|
||||
// Sanity check: mutations in range file is in [beginVersion, endVersion);
|
||||
// mutations in log file is in [beginVersion, endVersion], both inclusive.
|
||||
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
|
||||
// Loader sends the endVersion to ensure all useful versions are sent
|
||||
ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) ||
|
||||
(!req.isRangeFile && commitVersion <= req.asset.endVersion));
|
||||
|
||||
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
|
||||
MutationRef mutation = mutations[mIndex];
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations")
|
||||
.detail("ApplierNode", self->id())
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("Version", commitVersion)
|
||||
.detail("Index", mIndex)
|
||||
.detail("MutationReceived", mutation.toString());
|
||||
batchData->counters.receivedBytes += mutation.totalSize();
|
||||
batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified
|
||||
batchData->counters.receivedMutations += 1;
|
||||
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0;
|
||||
// Sanity check
|
||||
if (g_network->isSimulated()) {
|
||||
if (isRangeMutation(mutation)) {
|
||||
ASSERT(mutation.param1 >= req.asset.range.begin &&
|
||||
mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive
|
||||
} else {
|
||||
ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end);
|
||||
}
|
||||
}
|
||||
// Note: Log and range mutations may be delivered out of order. Can we handle it?
|
||||
batchData->stagingKeys.addMutation(mutation, commitVersion);
|
||||
}
|
||||
curFilePos.set(req.version);
|
||||
}
|
||||
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// The actor may be invovked multiple times and executed async.
|
||||
// No race condition as long as we do not wait or yield when operate the shared data.
|
||||
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
|
||||
|
|
|
@ -47,15 +47,9 @@ struct StagingKey {
|
|||
Version version; // largest version of set or clear for the key
|
||||
std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type
|
||||
|
||||
// bool operator < (const StagingKey& rhs) const {
|
||||
// return std::tie(key, version, type, value)
|
||||
// }
|
||||
explicit StagingKey() : version(0) {}
|
||||
explicit StagingKey(MutationRef m, Version version)
|
||||
: key(m.param1), val(m.param2), type(m.type), version(versoin) {}
|
||||
|
||||
void add(const MutationRef& m, Version newVersion) {
|
||||
ASSERT(version > 0); // Only add mutation
|
||||
if (version < newVersion) {
|
||||
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
|
||||
key = m.param1;
|
||||
|
@ -81,7 +75,6 @@ struct StagingKey {
|
|||
.detail("ExistingKeyType", typeString[type]);
|
||||
}
|
||||
} // else input mutation is old and can be ignored
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,6 +121,13 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
}
|
||||
~ApplierBatchData() = default;
|
||||
|
||||
void addMutation(MutationRef m, Version ver) {
|
||||
if (stagingKeys.find(m.param1) == stagingKeys.end()) {
|
||||
stagingKeys.emplace(m.param1, StagingKey());
|
||||
}
|
||||
stagingKeys[m.param1].add(m, ver);
|
||||
}
|
||||
|
||||
void reset() {
|
||||
kvOps.clear();
|
||||
dbApplier = Optional<Future<Void>>();
|
||||
|
|
Loading…
Reference in New Issue