Add subsequence number to restore loader & applier

The subsequence number is needed so that mutations of the same commit version
number, but from different partitioned logs can be correctly reassembled in
order.

For old backup files, the sub number is always 0. For partitioned mutation
logs, the actual sub number is used. For range files, the sub number is always
0.
This commit is contained in:
Jingyu Zhou 2020-02-21 11:47:51 -08:00
parent d8c6bf585d
commit ace409b49a
6 changed files with 89 additions and 85 deletions

View File

@ -453,26 +453,28 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
Version prevVersion, version; // version is the commitVersion of the mutation vector.
bool isRangeFile;
MutationsVec mutations; // All mutations at the same version parsed by one loader
SubSequenceVec subs; // Sub-sequence number for mutations
ReplyPromise<RestoreCommonReply> reply;
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int batchIndex, const RestoreAsset& asset, Version prevVersion,
Version version, bool isRangeFile, MutationsVec mutations)
Version version, bool isRangeFile, MutationsVec mutations,
SubSequenceVec subs)
: batchIndex(batchIndex), asset(asset), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
mutations(mutations) {}
mutations(mutations), subs(subs) {}
std::string toString() {
std::stringstream ss;
ss << "VersionBatchIndex:" << batchIndex << "RestoreAsset:" << asset.toString()
<< " prevVersion:" << prevVersion << " version:" << version << " isRangeFile:" << isRangeFile
<< " mutations.size:" << mutations.size();
<< " mutations.size:" << mutations.size() << " subs.size:" << subs.size();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, batchIndex, asset, prevVersion, version, isRangeFile, mutations, reply);
serializer(ar, batchIndex, asset, prevVersion, version, isRangeFile, mutations, subs, reply);
}
};

View File

@ -101,7 +101,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
// The actor may be invovked multiple times and executed async.
// No race condition as long as we do not wait or yield when operate the shared data.
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
// Multiple such actors can run on different fileIDs;
// Only one actor can process mutations from the same file
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
Reference<RestoreApplierData> self) {
@ -126,21 +126,22 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
state bool isDuplicated = true;
if (curFilePos.get() == req.prevVersion) {
isDuplicated = false;
Version commitVersion = req.version;
const Version commitVersion = req.version;
uint16_t numVersionStampedKV = 0;
MutationsVec mutations(req.mutations);
// Sanity check: mutations in range file is in [beginVersion, endVersion);
// mutations in log file is in [beginVersion, endVersion], both inclusive.
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
// Loader sends the endVersion to ensure all useful versions are sent
ASSERT_WE_THINK(commitVersion <= req.asset.endVersion);
ASSERT(req.mutations.size() == req.subs.size());
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
for (int mIndex = 0; mIndex < req.mutations.size(); mIndex++) {
const MutationRef& mutation = req.mutations[mIndex];
const LogMessageVersion mutationVersion(commitVersion, req.subs[mIndex]);
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("ApplierNode", self->id())
.detail("RestoreAsset", req.asset.toString())
.detail("Version", commitVersion)
.detail("Version", mutationVersion.toString())
.detail("Index", mIndex)
.detail("MutationReceived", mutation.toString());
batchData->counters.receivedBytes += mutation.totalSize();
@ -159,10 +160,10 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Note: Log and range mutations may be delivered out of order. Can we handle it?
if (mutation.type == MutationRef::SetVersionstampedKey ||
mutation.type == MutationRef::SetVersionstampedValue) {
batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV);
batchData->addVersionStampedKV(mutation, mutationVersion, numVersionStampedKV);
numVersionStampedKV++;
} else {
batchData->addMutation(mutation, commitVersion);
batchData->addMutation(mutation, mutationVersion);
}
}
curFilePos.set(req.version);
@ -239,7 +240,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
for (auto& vm : key.second->second.pendingMutations) {
for (auto& m : vm.second) {
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("PendingMutationVersion", vm.first)
.detail("PendingMutationVersion", vm.first.toString())
.detail("PendingMutation", m.toString());
}
}
@ -250,7 +251,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
key.second->second.add(m, (Version)1);
key.second->second.add(m, LogMessageVersion(1));
key.second->second.precomputeResult();
i++;
}

View File

@ -51,14 +51,14 @@ struct StagingKey {
Key key; // TODO: Maybe not needed?
Value val;
MutationRef::Type type; // set or clear
Version version; // largest version of set or clear for the key
std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type
LogMessageVersion version; // largest version of set or clear for the key
std::map<LogMessageVersion, MutationsVec> pendingMutations; // mutations not set or clear type
explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {}
// Add mutation m at newVersion to stagingKey
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
void add(const MutationRef& m, Version newVersion) {
void add(const MutationRef& m, LogMessageVersion newVersion) {
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
if (version < newVersion) {
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
@ -76,14 +76,14 @@ struct StagingKey {
}
} else if (version == newVersion) { // Sanity check
TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion")
.detail("Version", newVersion)
.detail("Version", newVersion.toString())
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type]);
if (m.type == MutationRef::SetValue) {
if (type == MutationRef::SetValue) {
if (m.param2 != val) {
TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled")
.detail("Version", newVersion)
.detail("Version", newVersion.toString())
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val)
@ -92,7 +92,7 @@ struct StagingKey {
} // else {} Backup has duplicate set at the same version
} else {
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionOverride")
.detail("Version", newVersion)
.detail("Version", newVersion.toString())
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val);
@ -101,7 +101,7 @@ struct StagingKey {
}
} else if (m.type == MutationRef::ClearRange) {
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionSkipped")
.detail("Version", newVersion)
.detail("Version", newVersion.toString())
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val);
@ -113,9 +113,10 @@ struct StagingKey {
void precomputeResult() {
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
.detail("Key", key)
.detail("Version", version)
.detail("LargestPendingVersion", (pendingMutations.empty() ? -1 : pendingMutations.rbegin()->first));
std::map<Version, MutationsVec>::iterator lb = pendingMutations.lower_bound(version);
.detail("Version", version.toString())
.detail("LargestPendingVersion",
(pendingMutations.empty() ? "-1" : pendingMutations.rbegin()->first.toString()));
std::map<LogMessageVersion, MutationsVec>::iterator lb = pendingMutations.lower_bound(version);
if (lb == pendingMutations.end()) {
return;
}
@ -158,11 +159,11 @@ struct StagingKey {
type = MutationRef::SetValue; // Precomputed result should be set to DB.
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
.detail("Type", typeString[mutation.type])
.detail("Version", lb->first);
.detail("Version", lb->first.toString());
} else {
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation")
.detail("Type", typeString[mutation.type])
.detail("Version", lb->first);
.detail("Version", lb->first.toString());
}
}
version = lb->first;
@ -172,10 +173,10 @@ struct StagingKey {
// Does the key has at least 1 set or clear mutation to get the base value
bool hasBaseValue() {
if (version > 0) {
if (version.version > 0) {
ASSERT(type == MutationRef::SetValue || type == MutationRef::ClearRange);
}
return version > 0;
return version.version > 0;
}
// Has all pendingMutations been pre-applied to the val?
@ -191,9 +192,9 @@ struct StagingKey {
// Range mutations should be applied both to the destination DB and to the StagingKeys
struct StagingKeyRange {
Standalone<MutationRef> mutation;
Version version;
LogMessageVersion version;
explicit StagingKeyRange(MutationRef m, Version newVersion) : mutation(m), version(newVersion) {}
explicit StagingKeyRange(MutationRef m, LogMessageVersion newVersion) : mutation(m), version(newVersion) {}
bool operator<(const StagingKeyRange& rhs) const {
return std::tie(version, mutation.type, mutation.param1, mutation.param2) <
@ -263,7 +264,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
}
~ApplierBatchData() = default;
void addMutation(MutationRef m, Version ver) {
void addMutation(MutationRef m, LogMessageVersion ver) {
if (!isRangeMutation(m)) {
auto item = stagingKeys.emplace(m.param1, StagingKey());
item.first->second.add(m, ver);
@ -272,20 +273,20 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
}
}
void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) {
void addVersionStampedKV(MutationRef m, LogMessageVersion ver, uint16_t numVersionStampedKV) {
if (m.type == MutationRef::SetVersionstampedKey) {
// Assume transactionNumber = 0 does not affect result
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
.detail("MutationType", typeString[m.type])
.detail("FakedTransactionNumber", numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param1, ver.version, numVersionStampedKV);
addMutation(m, ver);
} else if (m.type == MutationRef::SetVersionstampedValue) {
// Assume transactionNumber = 0 does not affect result
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
.detail("MutationType", typeString[m.type])
.detail("FakedTransactionNumber", numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param2, ver.version, numVersionStampedKV);
addMutation(m, ver);
} else {
ASSERT(false);
@ -298,8 +299,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
if (!stagingKey.second.hasPrecomputed()) {
TraceEvent("FastRestoreApplierAllKeysPrecomputedFalse")
.detail("Key", stagingKey.first)
.detail("BufferedVersion", stagingKey.second.version)
.detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first);
.detail("BufferedVersion", stagingKey.second.version.toString())
.detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first.toString());
return false;
}
}
@ -320,20 +321,17 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
}
bool isKVOpsSorted() {
bool ret = true;
auto prev = kvOps.begin();
for (auto it = kvOps.begin(); it != kvOps.end(); ++it) {
if (prev->first > it->first) {
ret = false;
break;
return false;
}
prev = it;
}
return ret;
return true;
}
bool allOpsAreKnown() {
bool ret = true;
for (auto it = kvOps.begin(); it != kvOps.end(); ++it) {
for (auto m = it->second.begin(); m != it->second.end(); ++m) {
if (m->type == MutationRef::SetValue || m->type == MutationRef::ClearRange ||
@ -341,11 +339,11 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
continue;
else {
TraceEvent(SevError, "FastRestore").detail("UnknownMutationType", m->type);
ret = false;
return false;
}
}
}
return ret;
return true;
}
};

View File

@ -165,7 +165,6 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
// Read block header
if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION) throw restore_unsupported_file_version();
Version lastVersion = invalidVersion;
VersionedMutationsMap& kvOps = kvOpsIter->second;
VersionedMutationsMap::iterator it = kvOps.end();
while (1) {
@ -173,20 +172,18 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
if (reader.eof() || *reader.rptr == 0xFF) break;
// Deserialize messages written in saveMutationsToFile().
Version msgVersion = bigEndian64(reader.consume<Version>());
bigEndian32(reader.consume<uint32_t>()); // subsequence number
LogMessageVersion msgVersion;
msgVersion.version = bigEndian64(reader.consume<Version>());
msgVersion.sub = bigEndian32(reader.consume<uint32_t>());
int msgSize = bigEndian32(reader.consume<int>());
const uint8_t* message = reader.consume(msgSize);
// Skip mutations out of the version range
if (!asset.isInVersionRange(msgVersion)) continue;
if (!asset.isInVersionRange(msgVersion.version)) continue;
if (lastVersion != msgVersion) {
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
lastVersion = msgVersion;
}
ASSERT(it != kvOps.end());
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
MutationRef mutation;
@ -205,7 +202,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
}
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
.detail("CommitVersion", msgVersion)
.detail("CommitVersion", msgVersion.toString())
.detail("ParsedMutation", mutation.toString());
it->second.push_back_deep(it->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
@ -306,7 +303,6 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString());
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] = Never(); // Ensure second exec. wait on _processLoadingParam()
batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
isDuplicated = false;
} else {
@ -314,8 +310,9 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("BatchIndex", req.batchIndex)
.detail("WaitOnProcessLoadParam", req.param.toString());
}
ASSERT(batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end());
wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param.
auto it = batchData->processedFileParams.find(req.param);
ASSERT(it != batchData->processedFileParams.end());
wait(it->second); // wait on the processing of the req.param.
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
TraceEvent("FastRestoreLoaderPhaseLoadFileDone", self->id())
@ -426,16 +423,15 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("RestoreAsset", asset.toString());
// There should be no mutation at asset.endVersion version because it is exclusive
if (kvOps.find(asset.endVersion) != kvOps.end()) {
if (kvOps.find(LogMessageVersion(asset.endVersion)) != kvOps.end()) {
TraceEvent(SevError, "FastRestoreLoaderSendMutationToApplier")
.detail("BatchIndex", batchIndex)
.detail("RestoreAsset", asset.toString())
.detail("IsRangeFile", isRangeFile)
.detail("Data loss at version", asset.endVersion);
}
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(asset.endVersion) == kvOps.end()) {
kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
} else {
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
kvOps[LogMessageVersion(asset.endVersion)] = MutationsVec(); // Empty mutation vector will be handled by applier
}
splitMutationIndex = 0;
@ -445,22 +441,24 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
std::map<UID, MutationsVec> applierMutationsBuffer;
std::map<UID, SubSequenceVec> applierSubsBuffer;
std::map<UID, double> applierMutationsSize;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierSubsBuffer[applierID] = SubSequenceVec();
applierMutationsSize[applierID] = 0.0;
}
Version commitVersion = kvOp->first;
if (!(commitVersion >= asset.beginVersion && commitVersion <= asset.endVersion)) { // Debug purpose
const LogMessageVersion& commitVersion = kvOp->first;
if (!(commitVersion.version >= asset.beginVersion &&
commitVersion.version <= asset.endVersion)) { // Debug purpose
TraceEvent(SevError, "FastRestore_SendMutationsToApplier")
.detail("CommitVersion", commitVersion)
.detail("CommitVersion", commitVersion.version)
.detail("RestoreAsset", asset.toString());
}
ASSERT(commitVersion >= asset.beginVersion);
ASSERT(commitVersion <= asset.endVersion); // endVersion is an empty commit to ensure progress
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef kvm = kvOp->second[mIndex];
for (const MutationRef& kvm : kvOp->second) {
// Send the mutation to applier
if (isRangeMutation(kvm)) {
MutationsVec mvector;
@ -478,6 +476,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
// printf("SPLITTED MUTATION: %d: mutation:%s applierID:%s\n", splitMutationIndex,
// mutation.toString().c_str(), applierID.toString().c_str());
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
applierMutationsSize[applierID] += mutation.expectedSize();
kvCount++;
@ -493,30 +492,30 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
kvCount++;
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
applierMutationsSize[applierID] += mutation.expectedSize();
}
} // Mutations at the same version
// TODO: Sanity check each asset has been received exactly once!
// Send the mutations to appliers for each version
for (auto& applierID : applierIDs) {
requests.push_back(std::make_pair(
applierID, RestoreSendVersionedMutationsRequest(batchIndex, asset, prevVersion, commitVersion,
isRangeFile, applierMutationsBuffer[applierID])));
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID, RestoreSendVersionedMutationsRequest(
batchIndex, asset, prevVersion, commitVersion.version, isRangeFile,
applierMutationsBuffer[applierID], applierSubsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion)
.detail("CommitVersion", commitVersion.toString())
.detail("RestoreAsset", asset.toString());
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
ASSERT(prevVersion <= commitVersion.version);
prevVersion = commitVersion.version;
// Tracking this request can be spammy
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations,
SERVER_KNOBS->FASTRESTORE_TRACK_LOADER_SEND_REQUESTS));
requests.clear();
} // all versions of mutations in the same file
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
@ -655,7 +654,8 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
// We have already filter the commit not in [beginVersion, endVersion) when we concatenate kv pair in log file
ASSERT_WE_THINK(asset.isInVersionRange(commitVersion));
kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion), MutationsVec()));
ASSERT(it.second); // inserted is true
StringRefReader vReader(val, restore_corrupted_data());
vReader.consume<uint64_t>(); // Consume the includeVersion
@ -695,7 +695,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
samples.push_back_deep(samples.arena(), mutation);
@ -774,13 +774,13 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
cc->loadedRangeBytes += m.totalSize();
// We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps.insert(std::make_pair(version, MutationsVec()));
auto it = kvOps.insert(std::make_pair(LogMessageVersion(version), MutationsVec()));
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
.detail("CommitVersion", version)
.detail("ParsedMutationKV", m.toString());
ASSERT_WE_THINK(kvOps.find(version) != kvOps.end());
kvOps[version].push_back_deep(kvOps[version].arena(), m);
ASSERT_WE_THINK(kvOps.find(LogMessageVersion(version)) != kvOps.end());
it.first->second.push_back_deep(it.first->second.arena(), m);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledRangeBytes += m.totalSize();

View File

@ -51,9 +51,11 @@ struct RestoreMasterData;
struct RestoreSimpleRequest;
// VersionedMutationsMap: Key is the version of parsed backup mutations
// Value MutationsVec is the vector of parsed backup mutations
using VersionedMutationsMap = std::map<Version, MutationsVec>;
// Key is the (version, subsequence) of parsed backup mutations.
// Value MutationsVec is the vector of parsed backup mutations.
// For old mutation logs, the subsequence number is always 0.
// For partitioned mutation logs, each mutation has a unique LogMessageVersion.
using VersionedMutationsMap = std::map<LogMessageVersion, MutationsVec>;
ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatchIndex, std::string name);
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);

View File

@ -39,6 +39,7 @@
//#define SevFRMutationInfo SevInfo
using MutationsVec = Standalone<VectorRef<MutationRef>>;
using SubSequenceVec = Standalone<VectorRef<uint32_t>>;
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);