FastRestore:Loader:Handle multiple mutations at the same versions in multiple files

This commit is contained in:
Meng Xu 2019-10-16 20:30:11 -07:00
parent 27db9c326b
commit 78b1ebc7c2
2 changed files with 52 additions and 27 deletions

View File

@ -42,8 +42,9 @@ ACTOR Future<Void> handleSetApplierKeyRangeVectorRequest(RestoreSetApplierKeyRan
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self,
bool isSampling = false);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* kvOps,
bool isRangeFile, Version startVersion, Version endVersion);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(SerializedMutationListMap* mutationMap,
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
std::map<Standalone<StringRef>, uint32_t>* mutationPartMap,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen,
@ -147,6 +148,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
state SerializedMutationListMap mutationMap;
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state int64_t j;
@ -160,8 +162,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
&kvOps, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange));
} else {
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(
&mutationMap, &mutationPartMap, self->bc, param.version, param.filename, readOffset, readLen,
param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix));
&processedFileOffset, &mutationMap, &mutationPartMap, self->bc, param.version, param.filename,
readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix));
}
}
wait(waitForAll(fileParserFutures));
@ -171,13 +173,14 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
}
// Send the parsed mutation to applier who will apply the mutation to DB
wait(sendMutationsToApplier(self, &kvOps, param.isRangeFile, param.prevVersion, param.endVersion));
wait(sendMutationsToApplier(self, &kvOps, param.isRangeFile, param.prevVersion, param.endVersion, param.fileIndex));
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.filename);
return Void();
}
// A loader can process multiple RestoreLoadFileRequest in parallel.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self,
bool isSampling) {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
@ -193,8 +196,9 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
}
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, Version startVersion, Version endVersion) {
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex) {
state VersionedMutationsMap& kvOps = *pkvOps;
state int kvCount = 0;
state int splitMutationIndex = 0;
@ -203,7 +207,8 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
.detail("SendMutationToApplier", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion);
.detail("EndVersion", endVersion)
.detail("FileIndex", fileIndex);
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(endVersion) == kvOps.end()) {
@ -275,16 +280,22 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// Send the mutations to appliers for each version
for (auto& applierID : applierIDs) {
requests.push_back(std::make_pair(
applierID, RestoreSendMutationVectorVersionedRequest(prevVersion, commitVersion, isRangeFile,
applierID, RestoreSendMutationVectorVersionedRequest(fileIndex, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
}
TraceEvent(SevDebug, "FastRestore_Debug")
.detail("Loader", self->id())
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion)
.detail("FileIndex", fileIndex);
ASSERT(prevVersion < commitVersion);
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
requests.clear();
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
} // all versions of mutations
} // all versions of mutations in the same file
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
return Void();
@ -446,6 +457,7 @@ void _parseSerializedMutation(VersionedMutationsMap* pkvOps, SerializedMutationL
const uint8_t* v = vReader.consume(vLen);
MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
//TraceEvent(SevDebug, "FastRestore_VerboseDebug").detail("CommitVersion", commitVersion).detail("ParsedMutation", mutation.toString());
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
@ -507,6 +519,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsM
// We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps.insert(std::make_pair(version, VectorRef<MutationRef>()));
//TraceEvent(SevDebug, "FastRestore_VerboseDebug").detail("CommitVersion", version).detail("ParsedMutationKV", m.toString());
ASSERT_WE_THINK(kvOps.find(version) != kvOps.end());
kvOps[version].push_back_deep(kvOps[version].arena(), m);
@ -519,7 +532,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsM
// version encoded in pair.first Step 1: decodeLogFileBlock into <string, string> pairs Step 2: Concatenate the
// pair.second of pairs with the same pair.first.
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(
std::map<Standalone<StringRef>, Standalone<StringRef>>* pMutationMap,
NotifiedVersion* pProcessedFileOffset, std::map<Standalone<StringRef>, Standalone<StringRef>>* pMutationMap,
std::map<Standalone<StringRef>, uint32_t>* pMutationPartMap, Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen, KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix) {
@ -527,18 +540,28 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedLogFile", fileName).detail("DataSize", data.contents().size());
TraceEvent("FastRestore")
.detail("DecodedLogFile", fileName)
.detail("Offset", readOffset)
.detail("Length", readLen)
.detail("DataSize", data.contents().size());
state int start = 0;
state int end = data.size();
state int numConcatenated = 0;
for (int i = start; i < end; ++i) {
//Key k = data[i].key.withPrefix(mutationLogPrefix);
//ValueRef v = data[i].value;
// Concatenate the backed-up param1 and param2 (KV) at the same version.
bool concatenated =
concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, data[i].key, data[i].value);
numConcatenated += (concatenated ? 1 : 0);
// Ensure data blocks in the same file are processed in order
wait(pProcessedFileOffset->whenAtLeast(readOffset));
if (pProcessedFileOffset->get() == readOffset) {
state int start = 0;
state int end = data.size();
state int numConcatenated = 0;
for (int i = start; i < end; ++i) {
// Key k = data[i].key.withPrefix(mutationLogPrefix);
// ValueRef v = data[i].value;
// Concatenate the backed-up param1 and param2 (KV) at the same version.
bool concatenated =
concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, data[i].key, data[i].value);
numConcatenated += (concatenated ? 1 : 0);
}
pProcessedFileOffset->set(readOffset + readLen);
}
return Void();

View File

@ -342,26 +342,28 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 69764565;
Version prevVersion, version; // version is the commitVersion of the mutation vector.
int fileIndex; // Unique index for a backup file
bool isRangeFile;
Standalone<VectorRef<MutationRef>> mutations; // All mutations are at version
ReplyPromise<RestoreCommonReply> reply;
RestoreSendMutationVectorVersionedRequest() = default;
explicit RestoreSendMutationVectorVersionedRequest(Version prevVersion, Version version, bool isRangeFile,
VectorRef<MutationRef> mutations)
: prevVersion(prevVersion), version(version), isRangeFile(isRangeFile), mutations(mutations) {}
explicit RestoreSendMutationVectorVersionedRequest(int fileIndex, Version prevVersion, Version version,
bool isRangeFile, VectorRef<MutationRef> mutations)
: fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
mutations(mutations) {}
std::string toString() {
std::stringstream ss;
ss << "prevVersion:" << prevVersion << " version:" << version << " isRangeFile:" << isRangeFile
<< " mutations.size:" << mutations.size();
ss << "fileIndex" << fileIndex << "prevVersion:" << prevVersion << " version:" << version
<< " isRangeFile:" << isRangeFile << " mutations.size:" << mutations.size();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, prevVersion, version, isRangeFile, mutations, reply);
serializer(ar, fileIndex, prevVersion, version, isRangeFile, mutations, reply);
}
};