FastRestore: Fix bug that causes nondeterminism

1) Use a map iterator instead of a raw pointer so the reference into the map stays valid while other entries are inserted or erased (a standalone sketch of both points follows below).
2) dummySampleWorkload: clear the rangeToApplier data in each sampling phase. Otherwise the set of keys assigned to appliers keeps growing from one sampling phase to the next.
Meng Xu 2019-11-13 10:57:21 -08:00
parent 9e36b897e6
commit 3f5491318d
4 changed files with 19 additions and 14 deletions
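A standalone sketch of the two points above, using plain std types rather than the actual restore data structures (the names below are illustrative only): std::map guarantees that an iterator to an element stays valid while other elements are inserted or erased, so the iterator can safely be held across later insertions; and a map that is rebuilt every phase must be cleared first, or entries from earlier phases accumulate.

#include <cassert>
#include <map>
#include <string>

int main() {
    // Point 1: a std::map iterator remains valid while other keys are inserted.
    std::map<int, std::string> opsPerParam;               // stand-in for kvOpsPerLP
    opsPerParam.insert({ 1, "ops-for-param-1" });
    auto iter = opsPerParam.find(1);                      // hold the iterator across later inserts
    for (int k = 2; k < 100; ++k) {
        opsPerParam.insert({ k, "ops-for-other-param" }); // does not invalidate `iter`
    }
    assert(iter->second == "ops-for-param-1");

    // Point 2: clear a per-phase map before repopulating it; otherwise
    // keys from earlier phases keep accumulating in it.
    std::map<std::string, int> rangeToApplier;            // range begin key -> applier id
    for (int phase = 0; phase < 3; ++phase) {
        rangeToApplier.clear();                           // without this, size() grows every phase
        rangeToApplier[""] = 0;                           // begin of keyspace -> first applier
        rangeToApplier["m" + std::to_string(phase)] = 1;  // a phase-dependent splitter key
        assert(rangeToApplier.size() == 2);
    }
    return 0;
}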


@@ -36,7 +36,7 @@ typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
-void _parseSerializedMutation(VersionedMutationsMap* kvOps, SerializedMutationListMap* mutationMap,
+void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, SerializedMutationListMap* mutationMap,
bool isSampling = false);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
@@ -50,7 +50,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(
NotifiedVersion* pProcessedFileOffset, SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap, Reference<IBackupContainer> bc, Version version, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange, Key addPrefix, Key removePrefix, Key mutationLogPrefix);
-ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsMap* kvOps,
+ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset_input,
int64_t readLen_input, KeyRange restoreRange);
@@ -130,6 +130,10 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
ASSERT(param.blockSize > 0);
ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block boundary.
ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end());
+// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
+//state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param];
+self->kvOpsPerLP.insert(std::make_pair(param, VersionedMutationsMap()));
+state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = self->kvOpsPerLP.find(param);
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
@@ -146,7 +150,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
if (param.isRangeFile) {
-fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(&self->kvOpsPerLP[param], self->bc,
+fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, self->bc,
param.version, param.filename, readOffset,
readLen, param.restoreRange));
} else {
@@ -158,7 +162,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile) {
-_parseSerializedMutation(&self->kvOpsPerLP[param], &mutationMap);
+_parseSerializedMutation(kvOpsPerLPIter, &mutationMap);
}
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.filename);
@@ -434,8 +438,8 @@ bool isRangeMutation(MutationRef m) {
// we may not get the entire mutation list for the version encoded_list_of_mutations:
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
-void _parseSerializedMutation(VersionedMutationsMap* pkvOps, SerializedMutationListMap* pmutationMap, bool isSampling) {
-VersionedMutationsMap& kvOps = *pkvOps;
+void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, SerializedMutationListMap* pmutationMap, bool isSampling) {
+VersionedMutationsMap& kvOps = kvOpsIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
for (auto& m : mutationMap) {
@@ -477,11 +481,11 @@ void _parseSerializedMutation(VersionedMutationsMap* pkvOps, SerializedMutationL
}
// Parsing the data blocks in a range file
-ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsMap* pkvOps,
+ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange) {
-state VersionedMutationsMap& kvOps = *pkvOps;
+state VersionedMutationsMap& kvOps = kvOpsIter->second;
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));

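For context on the parsing hunks above: the serialized mutations are encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]. A minimal standalone walker over that layout could look like the sketch below; it assumes host byte order for the three integer fields and plain std types, while the actual loader uses FDB's own readers, arenas, and MutationRef.

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

struct ParsedMutation {
    uint32_t type;
    std::string key;
    std::string value;
};

// Walk a buffer of concatenated mutations encoded as
// [type][keyLength][valueLength][keyContent][valueContent], all three integers uint32_t.
std::vector<ParsedMutation> parseMutations(const uint8_t* buf, size_t len) {
    std::vector<ParsedMutation> out;
    size_t off = 0;
    while (off + 12 <= len) {
        uint32_t type = 0, keyLen = 0, valLen = 0;
        std::memcpy(&type, buf + off, 4);
        std::memcpy(&keyLen, buf + off + 4, 4);
        std::memcpy(&valLen, buf + off + 8, 4);
        off += 12;
        if (off + keyLen + valLen > len) break; // truncated trailing mutation, stop
        out.push_back({ type,
                        std::string(reinterpret_cast<const char*>(buf + off), keyLen),
                        std::string(reinterpret_cast<const char*>(buf + off + keyLen), valLen) });
        off += keyLen + valLen;
    }
    return out;
}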

@@ -352,19 +352,20 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
// Produce the key-range for each applier
void dummySampleWorkload(Reference<RestoreMasterData> self) {
int numAppliers = self->appliersInterf.size();
-std::vector<UID> keyrangeSplitter;
+std::vector<Key> keyrangeSplitter;
// We will use the splitter at [1, numAppliers - 1]. The first splitter is normalKeys.begin
int i;
-for (i = 0; i < numAppliers - 1; i++) {
-keyrangeSplitter.push_back(deterministicRandom()->randomUniqueID());
+for (i = 0; i < numAppliers; i++) {
+keyrangeSplitter.push_back(Key(deterministicRandom()->randomUniqueID().toString()));
}
std::sort(keyrangeSplitter.begin(), keyrangeSplitter.end());
i = 0;
+self->rangeToApplier.clear();
for (auto& applier : self->appliersInterf) {
if (i == 0) {
self->rangeToApplier[normalKeys.begin] = applier.first;
} else {
-self->rangeToApplier[StringRef(keyrangeSplitter[i].toString())] = applier.first;
+self->rangeToApplier[Key(keyrangeSplitter[i])] = applier.first;
}
i++;
}

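The dummySampleWorkload hunk above follows a simple split-and-assign pattern: draw one random splitter per applier, sort the splitters, then give the first applier the range starting at the beginning of the keyspace and each later applier the range starting at its splitter. Clearing rangeToApplier first is what keeps the map from accumulating splitters from earlier sampling phases. A standalone sketch with plain std types (std::string and int standing in for Key and UID, std::mt19937 standing in for deterministicRandom()):

#include <algorithm>
#include <map>
#include <random>
#include <string>
#include <vector>

// Returns a map from range-begin key to applier id.
std::map<std::string, int> assignRanges(int numAppliers, std::mt19937& rng) {
    std::vector<std::string> splitters;
    for (int i = 0; i < numAppliers; ++i) {
        splitters.push_back(std::to_string(rng())); // stand-in for a random split key
    }
    std::sort(splitters.begin(), splitters.end());

    std::map<std::string, int> rangeToApplier;
    for (int applier = 0; applier < numAppliers; ++applier) {
        if (applier == 0) {
            rangeToApplier[""] = applier;           // first range starts at begin of keyspace
        } else {
            rangeToApplier[splitters[applier]] = applier;
        }
    }
    return rangeToApplier;
}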

@@ -119,7 +119,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
return;
}
-printf("[CheckDB] KV Number. Prev DB:%d Current DB:%d\n", self->dbKVs.size(), newDbKVs.size());
+printf("[CheckDB] KV Number. Prev DB:%ld Current DB:%ld\n", self->dbKVs.size(), newDbKVs.size());
// compare the KV pairs in the DB
printf("------------------Now print out the diff between the prev DB and current DB-------------------\n");
if (self->dbKVs.size() >= newDbKVs.size()) {


@@ -45,7 +45,7 @@ struct RunRestoreWorkerWorkload : TestWorkload {
for (int i = 0; i < num_myWorkers; ++i) {
myWorkers.push_back(_restoreWorker(cx, LocalityData()));
}
-printf("RunParallelRestoreWorkerWorkload, wait on reply from %d restore workers\n", myWorkers.size());
+printf("RunParallelRestoreWorkerWorkload, wait on reply from %ld restore workers\n", myWorkers.size());
worker = waitForAll(myWorkers);
printf("RunParallelRestoreWorkerWorkload, got all replies from restore workers\n");
return Void();
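An aside on the two printf fixes above (%d changed to %ld for the size() results): size() returns size_t, and the portable format specifier for size_t is %zu; %ld merely happens to match on common LP64 platforms. A minimal illustration:

#include <cstdio>
#include <vector>

int main() {
    std::vector<int> workers(3);
    // %zu is the printf conversion defined for size_t, which is what size() returns.
    std::printf("wait on reply from %zu restore workers\n", workers.size());
    return 0;
}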