StagingKey can add out-of-order mutations
For partitioned logs, mutations of the same version may be sent to applier out-of-order. If one loader advances to the next version, an applier may receive later version mutations for different loaders. So, dropping of early mutations is wrong.
This commit is contained in:
@ -60,21 +60,7 @@ struct StagingKey {
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
void add(const MutationRef& m, LogMessageVersion newVersion) {
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
if (version < newVersion) {
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
key = m.param1;
val = m.param2;
type = (MutationRef::Type)m.type;
version = newVersion;
} else {
if (pendingMutations.find(newVersion) == pendingMutations.end()) {
pendingMutations.emplace(newVersion, MutationsVec());
// TODO: Do we really need deep copy?
MutationsVec& mutations = pendingMutations[newVersion];
mutations.push_back_deep(mutations.arena(), m);
} else if (version == newVersion) { // Sanity check
if (version == newVersion) { // Sanity check
.detail("Version", newVersion.toString())
.detail("NewMutation", m.toString())
@ -106,7 +92,25 @@ struct StagingKey {
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val);
} // else input mutation is old and can be ignored
// newVersion can be smaller than version as different loaders can send
// mutations out of order.
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
if (version < newVersion) {
key = m.param1;
val = m.param2;
type = (MutationRef::Type)m.type;
version = newVersion;
} else {
auto it = pendingMutations.find(newVersion);
if (it == pendingMutations.end()) {
bool inserted;
std::tie(it, inserted) = pendingMutations.emplace(newVersion, MutationsVec());
// TODO: Do we really need deep copy?
it->second.push_back_deep(it->second.arena(), m);
// Precompute the final value of the key.
@ -166,7 +166,6 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION) throw restore_unsupported_file_version();
VersionedMutationsMap& kvOps = kvOpsIter->second;
VersionedMutationsMap::iterator it = kvOps.end();
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF) break;
@ -181,6 +180,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
// Skip mutations out of the version range
if (!asset.isInVersionRange(msgVersion.version)) continue;
VersionedMutationsMap::iterator it;
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
@ -327,7 +327,6 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = batchData->kvOpsPerLP.begin();
state Reference<LoaderBatchStatus> batchStatus = self->status[req.batchIndex];
state bool isDuplicated = true;
@ -377,11 +376,11 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
if (!isDuplicated) {
vector<Future<Void>> fSendMutations;
batchData->rangeToApplier = req.rangeToApplier;
for (; item != batchData->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
for (auto& [loadParam, kvOps] : batchData->kvOpsPerLP) {
if (loadParam.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
fSendMutations.push_back(sendMutationsToApplier(&item->second, req.batchIndex, item->first.asset,
item->first.isRangeFile, &batchData->rangeToApplier,
fSendMutations.push_back(sendMutationsToApplier(&kvOps, req.batchIndex, loadParam.asset,
loadParam.isRangeFile, &batchData->rangeToApplier,
@ -423,7 +422,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("RestoreAsset", asset.toString());
// There should be no mutation at asset.endVersion version because it is exclusive
if (kvOps.find(LogMessageVersion(asset.endVersion)) != kvOps.end()) {
if (kvOps.lower_bound(LogMessageVersion(asset.endVersion)) != kvOps.end()) {
TraceEvent(SevError, "FastRestoreLoaderSendMutationToApplier")
.detail("BatchIndex", batchIndex)
.detail("RestoreAsset", asset.toString())
@ -449,12 +448,6 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
applierMutationsSize[applierID] = 0.0;
const LogMessageVersion& commitVersion = kvOp->first;
if (!(commitVersion.version >= asset.beginVersion &&
commitVersion.version <= asset.endVersion)) { // Debug purpose
TraceEvent(SevError, "FastRestore_SendMutationsToApplier")
.detail("CommitVersion", commitVersion.version)
.detail("RestoreAsset", asset.toString());
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
@ -485,15 +478,14 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
std::map<Key, UID>::iterator itlow = pRangeToApplier->upper_bound(kvm.param1);
--itlow; // make sure itlow->first <= m.param1
ASSERT(itlow->first <= kvm.param1);
MutationRef mutation = kvm;
UID applierID = itlow->second;
// printf("KV--Applier: K:%s ApplierID:%s\n", kvm.param1.toString().c_str(),
// applierID.toString().c_str());
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), kvm);
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
applierMutationsSize[applierID] += mutation.expectedSize();
applierMutationsSize[applierID] += kvm.expectedSize();
} // Mutations at the same version
@ -606,20 +598,23 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
if (it == mutationMap.end()) {
mutationMap.insert(std::make_pair(id, val_input));
if (part != 0) {
TraceEvent(SevError, "FastRestore").detail("FirstPartNotZero", part).detail("KeyInput", getHexString(key_input));
TraceEvent(SevError, "FastRestore")
.detail("FirstPartNotZero", part)
.detail("KeyInput", getHexString(key_input));
mutationPartMap.insert(std::make_pair(id, part));
} else { // Concatenate the val string with the same commitVersion
it->second = it->second.contents().withSuffix(val_input.contents()); // Assign the new Areana to the map's value
if (part != (mutationPartMap[id] + 1)) {
auto& currentPart = mutationPartMap[id];
if (part != (currentPart + 1)) {
// Check if the same range or log file has been processed more than once!
TraceEvent(SevError, "FastRestore")
.detail("CurrentPart1", mutationPartMap[id])
.detail("CurrentPart2", part)
.detail("KeyInput", getHexString(key_input))
.detail("Hint", "Check if the same range or log file has been processed more than once");
.detail("CurrentPart1", currentPart)
.detail("CurrentPart2", part)
.detail("KeyInput", getHexString(key_input))
.detail("Hint", "Check if the same range or log file has been processed more than once");
mutationPartMap[id] = part;
currentPart = part;
concatenated = true;
@ -55,6 +55,9 @@ struct RestoreSimpleRequest;
// Value MutationsVec is the vector of parsed backup mutations.
// For old mutation logs, the subsequence number is always 0.
// For partitioned mutation logs, each mutation has a unique LogMessageVersion.
// Note for partitioned logs, one LogMessageVersion can have multiple mutations,
// because a clear mutation may be split into several smaller clear mutations by
// backup workers.
using VersionedMutationsMap = std::map<LogMessageVersion, MutationsVec>;
ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatchIndex, std::string name);
Reference in New Issue