Add subsequence number to restore loader & applier
The subsequence number is needed so that mutations of the same commit version number, but from different partitioned logs can be correctly reassembled in order. For old backup files, the sub number is always 0. For partitioned mutation logs, the actual sub number is used. For range files, the sub number is always 0.
This commit is contained in:
parent
fda6c08640
commit
ab0b59b0c3
|
@ -453,26 +453,28 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
|
|||
Version prevVersion, version; // version is the commitVersion of the mutation vector.
|
||||
bool isRangeFile;
|
||||
MutationsVec mutations; // All mutations at the same version parsed by one loader
|
||||
SubSequenceVec subs; // Sub-sequence number for mutations
|
||||
|
||||
ReplyPromise<RestoreCommonReply> reply;
|
||||
|
||||
RestoreSendVersionedMutationsRequest() = default;
|
||||
explicit RestoreSendVersionedMutationsRequest(int batchIndex, const RestoreAsset& asset, Version prevVersion,
|
||||
Version version, bool isRangeFile, MutationsVec mutations)
|
||||
Version version, bool isRangeFile, MutationsVec mutations,
|
||||
SubSequenceVec subs)
|
||||
: batchIndex(batchIndex), asset(asset), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
|
||||
mutations(mutations) {}
|
||||
mutations(mutations), subs(subs) {}
|
||||
|
||||
std::string toString() {
|
||||
std::stringstream ss;
|
||||
ss << "VersionBatchIndex:" << batchIndex << "RestoreAsset:" << asset.toString()
|
||||
<< " prevVersion:" << prevVersion << " version:" << version << " isRangeFile:" << isRangeFile
|
||||
<< " mutations.size:" << mutations.size();
|
||||
<< " mutations.size:" << mutations.size() << " subs.size:" << subs.size();
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, batchIndex, asset, prevVersion, version, isRangeFile, mutations, reply);
|
||||
serializer(ar, batchIndex, asset, prevVersion, version, isRangeFile, mutations, subs, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
|
||||
// The actor may be invovked multiple times and executed async.
|
||||
// No race condition as long as we do not wait or yield when operate the shared data.
|
||||
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
|
||||
// Multiple such actors can run on different fileIDs;
|
||||
// Only one actor can process mutations from the same file
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self) {
|
||||
|
@ -126,21 +126,22 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
state bool isDuplicated = true;
|
||||
if (curFilePos.get() == req.prevVersion) {
|
||||
isDuplicated = false;
|
||||
Version commitVersion = req.version;
|
||||
const Version commitVersion = req.version;
|
||||
uint16_t numVersionStampedKV = 0;
|
||||
MutationsVec mutations(req.mutations);
|
||||
// Sanity check: mutations in range file is in [beginVersion, endVersion);
|
||||
// mutations in log file is in [beginVersion, endVersion], both inclusive.
|
||||
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
|
||||
// Loader sends the endVersion to ensure all useful versions are sent
|
||||
ASSERT_WE_THINK(commitVersion <= req.asset.endVersion);
|
||||
ASSERT(req.mutations.size() == req.subs.size());
|
||||
|
||||
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
|
||||
MutationRef mutation = mutations[mIndex];
|
||||
for (int mIndex = 0; mIndex < req.mutations.size(); mIndex++) {
|
||||
const MutationRef& mutation = req.mutations[mIndex];
|
||||
const LogMessageVersion mutationVersion(commitVersion, req.subs[mIndex]);
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
.detail("ApplierNode", self->id())
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("Version", commitVersion)
|
||||
.detail("Version", mutationVersion.toString())
|
||||
.detail("Index", mIndex)
|
||||
.detail("MutationReceived", mutation.toString());
|
||||
batchData->counters.receivedBytes += mutation.totalSize();
|
||||
|
@ -159,10 +160,10 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
// Note: Log and range mutations may be delivered out of order. Can we handle it?
|
||||
if (mutation.type == MutationRef::SetVersionstampedKey ||
|
||||
mutation.type == MutationRef::SetVersionstampedValue) {
|
||||
batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV);
|
||||
batchData->addVersionStampedKV(mutation, mutationVersion, numVersionStampedKV);
|
||||
numVersionStampedKV++;
|
||||
} else {
|
||||
batchData->addMutation(mutation, commitVersion);
|
||||
batchData->addMutation(mutation, mutationVersion);
|
||||
}
|
||||
}
|
||||
curFilePos.set(req.version);
|
||||
|
@ -239,7 +240,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
for (auto& vm : key.second->second.pendingMutations) {
|
||||
for (auto& m : vm.second) {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
|
||||
.detail("PendingMutationVersion", vm.first)
|
||||
.detail("PendingMutationVersion", vm.first.toString())
|
||||
.detail("PendingMutation", m.toString());
|
||||
}
|
||||
}
|
||||
|
@ -250,7 +251,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
// The key's version ideally should be the most recently committed version.
|
||||
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
|
||||
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
|
||||
key.second->second.add(m, (Version)1);
|
||||
key.second->second.add(m, LogMessageVersion(1));
|
||||
key.second->second.precomputeResult();
|
||||
i++;
|
||||
}
|
||||
|
|
|
@ -51,14 +51,14 @@ struct StagingKey {
|
|||
Key key; // TODO: Maybe not needed?
|
||||
Value val;
|
||||
MutationRef::Type type; // set or clear
|
||||
Version version; // largest version of set or clear for the key
|
||||
std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type
|
||||
LogMessageVersion version; // largest version of set or clear for the key
|
||||
std::map<LogMessageVersion, MutationsVec> pendingMutations; // mutations not set or clear type
|
||||
|
||||
explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {}
|
||||
|
||||
// Add mutation m at newVersion to stagingKey
|
||||
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
|
||||
void add(const MutationRef& m, Version newVersion) {
|
||||
void add(const MutationRef& m, LogMessageVersion newVersion) {
|
||||
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
|
||||
if (version < newVersion) {
|
||||
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
|
||||
|
@ -76,14 +76,14 @@ struct StagingKey {
|
|||
}
|
||||
} else if (version == newVersion) { // Sanity check
|
||||
TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion")
|
||||
.detail("Version", newVersion)
|
||||
.detail("Version", newVersion.toString())
|
||||
.detail("NewMutation", m.toString())
|
||||
.detail("ExistingKeyType", typeString[type]);
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
if (type == MutationRef::SetValue) {
|
||||
if (m.param2 != val) {
|
||||
TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled")
|
||||
.detail("Version", newVersion)
|
||||
.detail("Version", newVersion.toString())
|
||||
.detail("NewMutation", m.toString())
|
||||
.detail("ExistingKeyType", typeString[type])
|
||||
.detail("ExitingKeyValue", val)
|
||||
|
@ -92,7 +92,7 @@ struct StagingKey {
|
|||
} // else {} Backup has duplicate set at the same version
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionOverride")
|
||||
.detail("Version", newVersion)
|
||||
.detail("Version", newVersion.toString())
|
||||
.detail("NewMutation", m.toString())
|
||||
.detail("ExistingKeyType", typeString[type])
|
||||
.detail("ExitingKeyValue", val);
|
||||
|
@ -101,7 +101,7 @@ struct StagingKey {
|
|||
}
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionSkipped")
|
||||
.detail("Version", newVersion)
|
||||
.detail("Version", newVersion.toString())
|
||||
.detail("NewMutation", m.toString())
|
||||
.detail("ExistingKeyType", typeString[type])
|
||||
.detail("ExitingKeyValue", val);
|
||||
|
@ -113,9 +113,10 @@ struct StagingKey {
|
|||
void precomputeResult() {
|
||||
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
|
||||
.detail("Key", key)
|
||||
.detail("Version", version)
|
||||
.detail("LargestPendingVersion", (pendingMutations.empty() ? -1 : pendingMutations.rbegin()->first));
|
||||
std::map<Version, MutationsVec>::iterator lb = pendingMutations.lower_bound(version);
|
||||
.detail("Version", version.toString())
|
||||
.detail("LargestPendingVersion",
|
||||
(pendingMutations.empty() ? "-1" : pendingMutations.rbegin()->first.toString()));
|
||||
std::map<LogMessageVersion, MutationsVec>::iterator lb = pendingMutations.lower_bound(version);
|
||||
if (lb == pendingMutations.end()) {
|
||||
return;
|
||||
}
|
||||
|
@ -158,11 +159,11 @@ struct StagingKey {
|
|||
type = MutationRef::SetValue; // Precomputed result should be set to DB.
|
||||
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
|
||||
.detail("Type", typeString[mutation.type])
|
||||
.detail("Version", lb->first);
|
||||
.detail("Version", lb->first.toString());
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation")
|
||||
.detail("Type", typeString[mutation.type])
|
||||
.detail("Version", lb->first);
|
||||
.detail("Version", lb->first.toString());
|
||||
}
|
||||
}
|
||||
version = lb->first;
|
||||
|
@ -172,10 +173,10 @@ struct StagingKey {
|
|||
|
||||
// Does the key has at least 1 set or clear mutation to get the base value
|
||||
bool hasBaseValue() {
|
||||
if (version > 0) {
|
||||
if (version.version > 0) {
|
||||
ASSERT(type == MutationRef::SetValue || type == MutationRef::ClearRange);
|
||||
}
|
||||
return version > 0;
|
||||
return version.version > 0;
|
||||
}
|
||||
|
||||
// Has all pendingMutations been pre-applied to the val?
|
||||
|
@ -191,9 +192,9 @@ struct StagingKey {
|
|||
// Range mutations should be applied both to the destination DB and to the StagingKeys
|
||||
struct StagingKeyRange {
|
||||
Standalone<MutationRef> mutation;
|
||||
Version version;
|
||||
LogMessageVersion version;
|
||||
|
||||
explicit StagingKeyRange(MutationRef m, Version newVersion) : mutation(m), version(newVersion) {}
|
||||
explicit StagingKeyRange(MutationRef m, LogMessageVersion newVersion) : mutation(m), version(newVersion) {}
|
||||
|
||||
bool operator<(const StagingKeyRange& rhs) const {
|
||||
return std::tie(version, mutation.type, mutation.param1, mutation.param2) <
|
||||
|
@ -263,7 +264,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
}
|
||||
~ApplierBatchData() = default;
|
||||
|
||||
void addMutation(MutationRef m, Version ver) {
|
||||
void addMutation(MutationRef m, LogMessageVersion ver) {
|
||||
if (!isRangeMutation(m)) {
|
||||
auto item = stagingKeys.emplace(m.param1, StagingKey());
|
||||
item.first->second.add(m, ver);
|
||||
|
@ -272,20 +273,20 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
}
|
||||
}
|
||||
|
||||
void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) {
|
||||
void addVersionStampedKV(MutationRef m, LogMessageVersion ver, uint16_t numVersionStampedKV) {
|
||||
if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
// Assume transactionNumber = 0 does not affect result
|
||||
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
|
||||
.detail("MutationType", typeString[m.type])
|
||||
.detail("FakedTransactionNumber", numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param1, ver.version, numVersionStampedKV);
|
||||
addMutation(m, ver);
|
||||
} else if (m.type == MutationRef::SetVersionstampedValue) {
|
||||
// Assume transactionNumber = 0 does not affect result
|
||||
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
|
||||
.detail("MutationType", typeString[m.type])
|
||||
.detail("FakedTransactionNumber", numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param2, ver.version, numVersionStampedKV);
|
||||
addMutation(m, ver);
|
||||
} else {
|
||||
ASSERT(false);
|
||||
|
@ -298,8 +299,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
if (!stagingKey.second.hasPrecomputed()) {
|
||||
TraceEvent("FastRestoreApplierAllKeysPrecomputedFalse")
|
||||
.detail("Key", stagingKey.first)
|
||||
.detail("BufferedVersion", stagingKey.second.version)
|
||||
.detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first);
|
||||
.detail("BufferedVersion", stagingKey.second.version.toString())
|
||||
.detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first.toString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -320,20 +321,17 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
}
|
||||
|
||||
bool isKVOpsSorted() {
|
||||
bool ret = true;
|
||||
auto prev = kvOps.begin();
|
||||
for (auto it = kvOps.begin(); it != kvOps.end(); ++it) {
|
||||
if (prev->first > it->first) {
|
||||
ret = false;
|
||||
break;
|
||||
return false;
|
||||
}
|
||||
prev = it;
|
||||
}
|
||||
return ret;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool allOpsAreKnown() {
|
||||
bool ret = true;
|
||||
for (auto it = kvOps.begin(); it != kvOps.end(); ++it) {
|
||||
for (auto m = it->second.begin(); m != it->second.end(); ++m) {
|
||||
if (m->type == MutationRef::SetValue || m->type == MutationRef::ClearRange ||
|
||||
|
@ -341,11 +339,11 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
continue;
|
||||
else {
|
||||
TraceEvent(SevError, "FastRestore").detail("UnknownMutationType", m->type);
|
||||
ret = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -165,7 +165,6 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
|||
// Read block header
|
||||
if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION) throw restore_unsupported_file_version();
|
||||
|
||||
Version lastVersion = invalidVersion;
|
||||
VersionedMutationsMap& kvOps = kvOpsIter->second;
|
||||
VersionedMutationsMap::iterator it = kvOps.end();
|
||||
while (1) {
|
||||
|
@ -173,20 +172,18 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
|||
if (reader.eof() || *reader.rptr == 0xFF) break;
|
||||
|
||||
// Deserialize messages written in saveMutationsToFile().
|
||||
Version msgVersion = bigEndian64(reader.consume<Version>());
|
||||
bigEndian32(reader.consume<uint32_t>()); // subsequence number
|
||||
LogMessageVersion msgVersion;
|
||||
msgVersion.version = bigEndian64(reader.consume<Version>());
|
||||
msgVersion.sub = bigEndian32(reader.consume<uint32_t>());
|
||||
int msgSize = bigEndian32(reader.consume<int>());
|
||||
const uint8_t* message = reader.consume(msgSize);
|
||||
|
||||
// Skip mutations out of the version range
|
||||
if (!asset.isInVersionRange(msgVersion)) continue;
|
||||
if (!asset.isInVersionRange(msgVersion.version)) continue;
|
||||
|
||||
if (lastVersion != msgVersion) {
|
||||
bool inserted;
|
||||
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
|
||||
lastVersion = msgVersion;
|
||||
}
|
||||
ASSERT(it != kvOps.end());
|
||||
bool inserted;
|
||||
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
|
||||
ASSERT(inserted);
|
||||
|
||||
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
|
||||
MutationRef mutation;
|
||||
|
@ -205,7 +202,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
|||
}
|
||||
|
||||
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
|
||||
.detail("CommitVersion", msgVersion)
|
||||
.detail("CommitVersion", msgVersion.toString())
|
||||
.detail("ParsedMutation", mutation.toString());
|
||||
it->second.push_back_deep(it->second.arena(), mutation);
|
||||
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
|
||||
|
@ -306,7 +303,6 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString());
|
||||
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
|
||||
batchData->processedFileParams[req.param] = Never(); // Ensure second exec. wait on _processLoadingParam()
|
||||
batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
|
||||
isDuplicated = false;
|
||||
} else {
|
||||
|
@ -314,8 +310,9 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("WaitOnProcessLoadParam", req.param.toString());
|
||||
}
|
||||
ASSERT(batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end());
|
||||
wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param.
|
||||
auto it = batchData->processedFileParams.find(req.param);
|
||||
ASSERT(it != batchData->processedFileParams.end());
|
||||
wait(it->second); // wait on the processing of the req.param.
|
||||
|
||||
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
|
||||
TraceEvent("FastRestoreLoaderPhaseLoadFileDone", self->id())
|
||||
|
@ -426,16 +423,15 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
.detail("RestoreAsset", asset.toString());
|
||||
|
||||
// There should be no mutation at asset.endVersion version because it is exclusive
|
||||
if (kvOps.find(asset.endVersion) != kvOps.end()) {
|
||||
if (kvOps.find(LogMessageVersion(asset.endVersion)) != kvOps.end()) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderSendMutationToApplier")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("RestoreAsset", asset.toString())
|
||||
.detail("IsRangeFile", isRangeFile)
|
||||
.detail("Data loss at version", asset.endVersion);
|
||||
}
|
||||
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
|
||||
if (kvOps.find(asset.endVersion) == kvOps.end()) {
|
||||
kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
|
||||
} else {
|
||||
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
|
||||
kvOps[LogMessageVersion(asset.endVersion)] = MutationsVec(); // Empty mutation vector will be handled by applier
|
||||
}
|
||||
|
||||
splitMutationIndex = 0;
|
||||
|
@ -445,22 +441,24 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
// applierMutationsBuffer is the mutation vector to be sent to each applier
|
||||
// applierMutationsSize is buffered mutation vector size for each applier
|
||||
std::map<UID, MutationsVec> applierMutationsBuffer;
|
||||
std::map<UID, SubSequenceVec> applierSubsBuffer;
|
||||
std::map<UID, double> applierMutationsSize;
|
||||
for (auto& applierID : applierIDs) {
|
||||
applierMutationsBuffer[applierID] = MutationsVec();
|
||||
applierSubsBuffer[applierID] = SubSequenceVec();
|
||||
applierMutationsSize[applierID] = 0.0;
|
||||
}
|
||||
Version commitVersion = kvOp->first;
|
||||
if (!(commitVersion >= asset.beginVersion && commitVersion <= asset.endVersion)) { // Debug purpose
|
||||
const LogMessageVersion& commitVersion = kvOp->first;
|
||||
if (!(commitVersion.version >= asset.beginVersion &&
|
||||
commitVersion.version <= asset.endVersion)) { // Debug purpose
|
||||
TraceEvent(SevError, "FastRestore_SendMutationsToApplier")
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("CommitVersion", commitVersion.version)
|
||||
.detail("RestoreAsset", asset.toString());
|
||||
}
|
||||
ASSERT(commitVersion >= asset.beginVersion);
|
||||
ASSERT(commitVersion <= asset.endVersion); // endVersion is an empty commit to ensure progress
|
||||
ASSERT(commitVersion.version >= asset.beginVersion);
|
||||
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
|
||||
|
||||
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
|
||||
MutationRef kvm = kvOp->second[mIndex];
|
||||
for (const MutationRef& kvm : kvOp->second) {
|
||||
// Send the mutation to applier
|
||||
if (isRangeMutation(kvm)) {
|
||||
MutationsVec mvector;
|
||||
|
@ -478,6 +476,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
// printf("SPLITTED MUTATION: %d: mutation:%s applierID:%s\n", splitMutationIndex,
|
||||
// mutation.toString().c_str(), applierID.toString().c_str());
|
||||
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
|
||||
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
|
||||
applierMutationsSize[applierID] += mutation.expectedSize();
|
||||
|
||||
kvCount++;
|
||||
|
@ -493,30 +492,30 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
kvCount++;
|
||||
|
||||
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
|
||||
applierSubsBuffer[applierID].push_back(applierSubsBuffer[applierID].arena(), commitVersion.sub);
|
||||
applierMutationsSize[applierID] += mutation.expectedSize();
|
||||
}
|
||||
} // Mutations at the same version
|
||||
|
||||
// TODO: Sanity check each asset has been received exactly once!
|
||||
// Send the mutations to appliers for each version
|
||||
for (auto& applierID : applierIDs) {
|
||||
requests.push_back(std::make_pair(
|
||||
applierID, RestoreSendVersionedMutationsRequest(batchIndex, asset, prevVersion, commitVersion,
|
||||
isRangeFile, applierMutationsBuffer[applierID])));
|
||||
for (const UID& applierID : applierIDs) {
|
||||
requests.emplace_back(applierID, RestoreSendVersionedMutationsRequest(
|
||||
batchIndex, asset, prevVersion, commitVersion.version, isRangeFile,
|
||||
applierMutationsBuffer[applierID], applierSubsBuffer[applierID]));
|
||||
}
|
||||
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
|
||||
.detail("PrevVersion", prevVersion)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("CommitVersion", commitVersion.toString())
|
||||
.detail("RestoreAsset", asset.toString());
|
||||
ASSERT(prevVersion < commitVersion);
|
||||
prevVersion = commitVersion;
|
||||
ASSERT(prevVersion <= commitVersion.version);
|
||||
prevVersion = commitVersion.version;
|
||||
// Tracking this request can be spammy
|
||||
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
|
||||
TaskPriority::RestoreLoaderSendMutations,
|
||||
SERVER_KNOBS->FASTRESTORE_TRACK_LOADER_SEND_REQUESTS));
|
||||
|
||||
requests.clear();
|
||||
|
||||
} // all versions of mutations in the same file
|
||||
|
||||
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
|
||||
|
@ -655,7 +654,8 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
|
|||
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
|
||||
// We have already filter the commit not in [beginVersion, endVersion) when we concatenate kv pair in log file
|
||||
ASSERT_WE_THINK(asset.isInVersionRange(commitVersion));
|
||||
kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
|
||||
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion), MutationsVec()));
|
||||
ASSERT(it.second); // inserted is true
|
||||
|
||||
StringRefReader vReader(val, restore_corrupted_data());
|
||||
vReader.consume<uint64_t>(); // Consume the includeVersion
|
||||
|
@ -695,7 +695,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
|
|||
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("ParsedMutation", mutation.toString());
|
||||
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
|
||||
it.first->second.push_back_deep(it.first->second.arena(), mutation);
|
||||
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
|
||||
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
|
||||
samples.push_back_deep(samples.arena(), mutation);
|
||||
|
@ -774,13 +774,13 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
|||
cc->loadedRangeBytes += m.totalSize();
|
||||
|
||||
// We cache all kv operations into kvOps, and apply all kv operations later in one place
|
||||
kvOps.insert(std::make_pair(version, MutationsVec()));
|
||||
auto it = kvOps.insert(std::make_pair(LogMessageVersion(version), MutationsVec()));
|
||||
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
|
||||
.detail("CommitVersion", version)
|
||||
.detail("ParsedMutationKV", m.toString());
|
||||
|
||||
ASSERT_WE_THINK(kvOps.find(version) != kvOps.end());
|
||||
kvOps[version].push_back_deep(kvOps[version].arena(), m);
|
||||
ASSERT_WE_THINK(kvOps.find(LogMessageVersion(version)) != kvOps.end());
|
||||
it.first->second.push_back_deep(it.first->second.arena(), m);
|
||||
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
|
||||
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
|
||||
cc->sampledRangeBytes += m.totalSize();
|
||||
|
|
|
@ -51,9 +51,11 @@ struct RestoreMasterData;
|
|||
|
||||
struct RestoreSimpleRequest;
|
||||
|
||||
// VersionedMutationsMap: Key is the version of parsed backup mutations
|
||||
// Value MutationsVec is the vector of parsed backup mutations
|
||||
using VersionedMutationsMap = std::map<Version, MutationsVec>;
|
||||
// Key is the (version, subsequence) of parsed backup mutations.
|
||||
// Value MutationsVec is the vector of parsed backup mutations.
|
||||
// For old mutation logs, the subsequence number is always 0.
|
||||
// For partitioned mutation logs, each mutation has a unique LogMessageVersion.
|
||||
using VersionedMutationsMap = std::map<LogMessageVersion, MutationsVec>;
|
||||
|
||||
ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatchIndex, std::string name);
|
||||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
//#define SevFRMutationInfo SevInfo
|
||||
|
||||
using MutationsVec = Standalone<VectorRef<MutationRef>>;
|
||||
using SubSequenceVec = Standalone<VectorRef<uint32_t>>;
|
||||
|
||||
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
|
||||
BINARY_SERIALIZABLE(RestoreRole);
|
||||
|
|
Loading…
Reference in New Issue