FastRestore:Loader buffer data for multiple batches

This commit is contained in:
Meng Xu 2020-01-15 13:39:06 -08:00
parent bfbf2164c4
commit d69bd2f661
5 changed files with 85 additions and 64 deletions

View File

@ -337,6 +337,7 @@ struct RestoreRecruitRoleRequest : TimedRequest {
std::string toString() { return printable(); }
};
// Static info. across version batches
struct RestoreSysInfoRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 75960741;
@ -384,21 +385,22 @@ struct RestoreLoadFileReply : TimedRequest {
struct RestoreLoadFileRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 26557364;
int batchIndex;
LoadingParam param;
ReplyPromise<RestoreLoadFileReply> reply;
RestoreLoadFileRequest() = default;
explicit RestoreLoadFileRequest(LoadingParam& param) : param(param){};
explicit RestoreLoadFileRequest(int batchIndex, LoadingParam& param) : batchIndex(batchIndex), param(param){};
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, reply);
serializer(ar, batchIndex, param, reply);
}
std::string toString() {
std::stringstream ss;
ss << "RestoreLoadFileRequest param:" << param.toString();
ss << "RestoreLoadFileRequest batchIndex:" << batchIndex << " param:" << param.toString();
return ss.str();
}
};

View File

@ -33,7 +33,8 @@ typedef std::map<Standalone<StringRef>, Standalone<StringRef>> SerializedMutatio
// Key has the same semantics as SerializedMutationListMap; Value is the part number of the split mutation list
typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena,
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
@ -43,8 +44,9 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, RestoreAsset asset, int batchIndex);
ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int batchIndex, RestoreAsset asset,
bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
@ -123,7 +125,8 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
req.reply.send(RestoreCommonReply(self->id()));
}
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoaderData> self) {
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatchData> batchData, UID loaderID,
Reference<IBackupContainer> bc) {
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
@ -131,33 +134,33 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = self->kvOpsPerLP.end();
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = self->sampleMutations.end();
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = batchData->kvOpsPerLP.end();
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
// Q: How to record the param's fields inside LoadingParam? Refer to storageMetrics.
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
TraceEvent("FastRestore").detail("Loader", loaderID).detail("StartProcessLoadParam", param.toString());
ASSERT(param.blockSize > 0);
ASSERT(param.asset.offset % param.blockSize == 0); // Parse file must be at block boundary.
ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end());
ASSERT(batchData->kvOpsPerLP.find(param) == batchData->kvOpsPerLP.end());
// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
// state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param];
self->kvOpsPerLP.emplace(param, VersionedMutationsMap());
self->sampleMutations.emplace(param, MutationsVec());
kvOpsPerLPIter = self->kvOpsPerLP.find(param);
samplesIter = self->sampleMutations.find(param);
batchData->kvOpsPerLP.emplace(param, VersionedMutationsMap());
batchData->sampleMutations.emplace(param, MutationsVec());
kvOpsPerLPIter = batchData->kvOpsPerLP.find(param);
samplesIter = batchData->sampleMutations.find(param);
for (int64_t j = param.asset.offset; j < param.asset.len; j += param.blockSize) {
RestoreAsset subAsset = param.asset;
subAsset.offset = j;
subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j);
if (param.isRangeFile) {
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, bc,
param.rangeVersion.get(), subAsset));
} else {
// TODO: Sanity check the log file's range is overlapped with the restored version range
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
&mutationPartMap, self->bc, subAsset));
fileParserFutures.push_back(
_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, &mutationPartMap, bc, subAsset));
}
}
wait(waitForAll(fileParserFutures));
@ -166,41 +169,45 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter, param.asset);
}
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.asset.filename);
TraceEvent("FastRestore").detail("Loader", loaderID).detail("FinishLoadingFile", param.asset.filename);
return Void();
}
// A loader can process multiple RestoreLoadFileRequest in parallel.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
ASSERT(batchData.isValid());
if (batchData->processedFileParams.find(req.param) == batchData->processedFileParams.end()) {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
ASSERT(self->sampleMutations.find(req.param) == self->sampleMutations.end());
self->processedFileParams[req.param] = Never();
self->processedFileParams[req.param] = _processLoadingParam(req.param, self);
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] = Never(); // Ensure second exec. wait on _processLoadingParam()
batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
} else {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("WaitOnProcessLoadParam", req.param.toString());
}
ASSERT(self->processedFileParams.find(req.param) != self->processedFileParams.end());
wait(self->processedFileParams[req.param]); // wait on the processing of the req.param.
ASSERT(batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end());
wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param.
req.reply.send(RestoreLoadFileReply(req.param, self->sampleMutations[req.param]));
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param]));
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
return Void();
}
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = self->kvOpsPerLP.begin();
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = batchData->kvOpsPerLP.begin();
self->rangeToApplier = req.rangeToApplier;
for (; item != self->kvOpsPerLP.end(); item++) {
batchData->rangeToApplier = req.rangeToApplier;
for (; item != batchData->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
// TODO: Change to parallel sending
// TODO: item should based on batchIndex
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.asset,
req.batchIndex));
wait(sendMutationsToApplier(&item->second, req.batchIndex, item->first.asset, item->first.isRangeFile,
&batchData->rangeToApplier, &self->appliersInterf));
}
}
@ -210,18 +217,22 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, RestoreAsset asset, int batchIndex) {
// Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex)
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
// pRangeToApplier: range to applierID mapping, deciding which applier is responsible for which range
// pApplierInterfaces: applier interfaces to send the mutations to
ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int batchIndex, RestoreAsset asset,
bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
state Version prevVersion = 0; // startVersion
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
TraceEvent("FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("EndVersion", asset.endVersion)
.detail("RestoreAsset", asset.toString());
@ -261,7 +272,8 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
// WARNING: The splitMutation() may have bugs
splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
splitMutation(pRangeToApplier, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(),
nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++) {
@ -275,7 +287,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
kvCount++;
}
} else { // mutation operates on a particular key
std::map<Key, UID>::iterator itlow = self->rangeToApplier.upper_bound(kvm.param1);
std::map<Key, UID>::iterator itlow = pRangeToApplier->upper_bound(kvm.param1);
--itlow; // make sure itlow->first <= m.param1
ASSERT(itlow->first <= kvm.param1);
MutationRef mutation = kvm;
@ -296,13 +308,12 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
isRangeFile, applierMutationsBuffer[applierID])));
}
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion)
.detail("RestoreAsset", asset.toString());
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests));
requests.clear();
@ -313,22 +324,22 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
}
// TODO: Add a unit test for this function
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena,
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
std::map<Standalone<KeyRef>, UID>::iterator itlow, itup; // we will return [itlow, itup)
itlow = self->rangeToApplier.lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if (itlow->first > m.param1) {
if (itlow != self->rangeToApplier.begin()) {
if (itlow != pRangeToApplier->begin()) {
--itlow;
}
}
itup = self->rangeToApplier.upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == self->rangeToApplier.end() || itup->first > m.param2);
itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2);
std::map<Standalone<KeyRef>, UID>::iterator itApplier;
while (itlow != itup) {
@ -606,3 +617,14 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
return Void();
}
// Return applier IDs that are used to apply key-values
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier) {
std::vector<UID> applierIDs;
for (auto& applier : rangeToApplier) {
applierIDs.push_back(applier.second);
}
ASSERT(!applierIDs.empty());
return applierIDs;
}

View File

@ -42,8 +42,7 @@
#include "flow/actorcompiler.h" // has to be last include
struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoaderData> {
struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
std::map<LoadingParam, Future<Void>> processedFileParams;
std::map<LoadingParam, VersionedMutationsMap> kvOpsPerLP; // Buffered kvOps for each loading param
@ -55,6 +54,19 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
std::map<LoadingParam, MutationsVec> sampleMutations;
int numSampledMutations; // The total number of mutations received from sampled data.
void reset() {
processedFileParams.clear();
kvOpsPerLP.clear();
sampleMutations.clear();
numSampledMutations = 0;
rangeToApplier.clear();
}
};
struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoaderData> {
// buffered data per version batch
std::map<int, Reference<LoaderBatchData>> batch;
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc
@ -78,22 +90,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
void resetPerVersionBatch(int batchIndex) {
TraceEvent("FastRestore").detail("ResetPerVersionBatchOnLoader", nodeID);
rangeToApplier.clear();
numSampledMutations = 0;
processedFileParams.clear();
kvOpsPerLP.clear();
sampleMutations.clear();
}
// Only get the appliers that are responsible for a range
std::vector<UID> getWorkingApplierIDs() {
std::vector<UID> applierIDs;
for (auto& applier : rangeToApplier) {
applierIDs.push_back(applier.second);
}
ASSERT(!applierIDs.empty());
return applierIDs;
batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData());
}
void initBackupContainer(Key url) {

View File

@ -304,7 +304,7 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion);
requests.emplace_back(loader->first, RestoreLoadFileRequest(param));
requests.emplace_back(loader->first, RestoreLoadFileRequest(self->batchIndex, param));
loader++;
}

View File

@ -113,7 +113,7 @@ public:
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
RestoreApplierInterface masterApplierInterf;
RestoreApplierInterface masterApplierInterf; // TODO: Delete
NotifiedVersion versionBatchId; // Continuously increase for each versionBatch