From d95180f8538cb0c626dc5736d71cd8a578bac092 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Fri, 6 Dec 2019 18:25:52 -0800 Subject: [PATCH 01/11] Reduce parallel restore atomic op test workload --- ...llelRestoreCorrectnessAtomicOpTinyData.txt | 67 ------------------- 1 file changed, 67 deletions(-) delete mode 100644 tests/ParallelRestoreCorrectnessAtomicOpTinyData.txt diff --git a/tests/ParallelRestoreCorrectnessAtomicOpTinyData.txt b/tests/ParallelRestoreCorrectnessAtomicOpTinyData.txt deleted file mode 100644 index c61ba6255d..0000000000 --- a/tests/ParallelRestoreCorrectnessAtomicOpTinyData.txt +++ /dev/null @@ -1,67 +0,0 @@ -testTitle=BackupAndParallelRestoreWithAtomicOp - testName=AtomicOps - nodeCount=30000 -; Make ops space only 1 key per group -; nodeCount=100 - transactionsPerSecond=2500.0 -; transactionsPerSecond=500.0 -; transactionsPerSecond=100.0 -; nodeCount=4 -; transactionsPerSecond=250.0 - testDuration=30.0 - clearAfterTest=false -; Specify a type of atomicOp -; opType=0 -; actorsPerClient=1 - -; AtomicBackupCorrectness.txt does not mix Cycle and AtomicOps workloads -; testName=Cycle -;; nodeCount=30000 -;; nodeCount=1000 -; nodeCount=4 -;; transactionsPerSecond=2.0 -;; transactionsPerSecond=10.0 -;; transactionsPerSecond=20.0 -; transactionsPerSecond=2500.0 -; testDuration=30.0 -; expectedRate=0 -; clearAfterTest=false -; keyPrefix=a - -; Each testName=RunRestoreWorkerWorkload creates a restore worker -; We need at least 3 restore workers: master, loader, and applier - testName=RunRestoreWorkerWorkload - -; Test case for parallel restore - testName=BackupAndParallelRestoreCorrectness - backupAfter=10.0 - restoreAfter=60.0 - clearAfterTest=false - simBackupAgents=BackupToFile - backupRangesCount=-1 - - testName=RandomClogging - testDuration=90.0 - -; testName=Rollback -; meanDelay=90.0 -; testDuration=90.0 - -; Do NOT consider machine crash yet -; testName=Attrition -; machinesToKill=10 -; machinesToLeave=3 -; reboot=true -; testDuration=90.0 - -; 
testName=Attrition -; machinesToKill=10 -; machinesToLeave=3 -; reboot=true -; testDuration=90.0 - -; Disable buggify for parallel restore -buggify=off -;testDuration=360000 ;not work -;timeout is in seconds -timeout=360000 From 4a66366a0582482111f2e394cbeb7a716d9ac696 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Fri, 6 Dec 2019 22:00:40 -0800 Subject: [PATCH 02/11] Use MutationsVec instead of VectorRef --- fdbserver/RestoreLoader.actor.cpp | 8 ++++---- .../BackupAndParallelRestoreCorrectness.actor.cpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 80112ca031..8a91c6ed6f 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -230,7 +230,7 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver // Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion if (kvOps.find(endVersion) == kvOps.end()) { - kvOps[endVersion] = VectorRef(); // Empty mutation vector will be handled by applier + kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier } // applierMutationsBuffer is the mutation vector to be sent to each applier @@ -248,7 +248,7 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver applierMutationsBuffer.clear(); applierMutationsSize.clear(); for (auto& applierID : applierIDs) { - applierMutationsBuffer[applierID] = MutationsVec(VectorRef()); + applierMutationsBuffer[applierID] = MutationsVec(); applierMutationsSize[applierID] = 0.0; } state Version commitVersion = kvOp->first; @@ -450,7 +450,7 @@ void _parseSerializedMutation(std::map::ite StringRefReaderMX kReader(k, restore_corrupted_data()); uint64_t commitVersion = kReader.consume(); // Consume little Endian data - kvOps.insert(std::make_pair(commitVersion, VectorRef())); + kvOps.insert(std::make_pair(commitVersion, MutationsVec())); StringRefReaderMX vReader(val, restore_corrupted_data()); 
vReader.consume(); // Consume the includeVersion @@ -541,7 +541,7 @@ ACTOR static Future _parseRangeFileToMutationsOnLoader( data[i].value); // ASSUME: all operation in range file is set. // We cache all kv operations into kvOps, and apply all kv operations later in one place - kvOps.insert(std::make_pair(version, VectorRef())); + kvOps.insert(std::make_pair(version, MutationsVec())); TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug") .detail("CommitVersion", version) .detail("ParsedMutationKV", m.toString()); diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index d83bfb1cfa..b52d669272 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -492,7 +492,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { state FileBackupAgent backupAgent; state Future extraBackup; state bool extraTasks = false; - state ReadYourWritesTransaction tr1(cx); state ReadYourWritesTransaction tr2(cx); state UID randomID = nondeterministicRandom()->randomUniqueID(); state int restoreIndex = 0; @@ -619,6 +618,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { // Restore each range by calling backupAgent.restore() printf("Prepare for restore requests. 
Number of backupRanges:%d\n", self->backupRanges.size()); loop { + state ReadYourWritesTransaction tr1(cx); tr1.reset(); tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr1.setOption(FDBTransactionOptions::LOCK_AWARE); From e8dfc1c187bc9cd4241165228accecc9f4da297d Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Fri, 6 Dec 2019 23:16:49 -0800 Subject: [PATCH 03/11] Replace pop_front(size) with new empty standalone obj --- fdbclient/RestoreWorkerInterface.actor.h | 2 +- fdbserver/RestoreLoader.actor.cpp | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/fdbclient/RestoreWorkerInterface.actor.h b/fdbclient/RestoreWorkerInterface.actor.h index 94fc9647b2..f612d125ec 100644 --- a/fdbclient/RestoreWorkerInterface.actor.h +++ b/fdbclient/RestoreWorkerInterface.actor.h @@ -402,7 +402,7 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest { RestoreSendMutationVectorVersionedRequest() = default; explicit RestoreSendMutationVectorVersionedRequest(int fileIndex, Version prevVersion, Version version, - bool isRangeFile, VectorRef mutations) + bool isRangeFile, MutationsVec mutations) : fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile), mutations(mutations) {} diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 8a91c6ed6f..14105f9c53 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -237,8 +237,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver // applierMutationsSize is buffered mutation vector size for each applier state std::map applierMutationsBuffer; state std::map applierMutationsSize; - state MutationsVec mvector; - state Standalone> nodeIDs; splitMutationIndex = 0; kvCount = 0; @@ -257,10 +255,12 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver MutationRef kvm = kvOp->second[mIndex]; // Send the mutation to applier if (isRangeMutation(kvm)) { + MutationsVec mvector; + Standalone> 
nodeIDs; // Because using a vector of mutations causes overhead, and the range mutation should happen rarely; // We handle the range mutation and key mutation differently for the benefit of avoiding memory copy - mvector.pop_front(mvector.size()); - nodeIDs.pop_front(nodeIDs.size()); + // mvector.pop_front(mvector.size()); + // nodeIDs.pop_front(nodeIDs.size()); // WARNING: The splitMutation() may have bugs splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents()); ASSERT(mvector.size() == nodeIDs.size()); @@ -295,8 +295,8 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver requests.push_back(std::make_pair( applierID, RestoreSendMutationVectorVersionedRequest(fileIndex, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID]))); - applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size()); - applierMutationsSize[applierID] = 0; + //applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size()); + //applierMutationsSize[applierID] = 0; } TraceEvent(SevDebug, "FastRestore_Debug") .detail("Loader", self->id()) @@ -305,6 +305,12 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver .detail("FileIndex", fileIndex); ASSERT(prevVersion < commitVersion); wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests)); + // Clear buffers + for (auto& applierID : applierIDs) { + applierMutationsBuffer[applierID] = MutationsVec(); + applierMutationsSize[applierID] = 0; + } + requests.clear(); ASSERT(prevVersion < commitVersion); prevVersion = commitVersion; From 04230b59bfd5999558fc01a31276f68bd713a07d Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Sat, 7 Dec 2019 21:14:56 -0800 Subject: [PATCH 04/11] Increase load a bit --- ...llelRestoreCorrectnessAtomicOpTinyData.txt | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt diff --git 
a/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt b/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt new file mode 100644 index 0000000000..82f13193cf --- /dev/null +++ b/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt @@ -0,0 +1,53 @@ +testTitle=BackupAndParallelRestoreWithAtomicOp + testName=AtomicOps + nodeCount=30000 +; Make ops space only 1 key per group +; nodeCount=100 +; transactionsPerSecond=2500.0 +; transactionsPerSecond=500.0 + transactionsPerSecond=50.0 +; nodeCount=4 +; transactionsPerSecond=250.0 + testDuration=15.0 + clearAfterTest=false +; Specify a type of atomicOp +; opType=0 +; actorsPerClient=1 + +; Each testName=RunRestoreWorkerWorkload creates a restore worker +; We need at least 3 restore workers: master, loader, and applier + testName=RunRestoreWorkerWorkload + +; Test case for parallel restore + testName=BackupAndParallelRestoreCorrectness + backupAfter=10.0 + restoreAfter=60.0 + clearAfterTest=false + simBackupAgents=BackupToFile + backupRangesCount=-1 + + testName=RandomClogging + testDuration=90.0 + +; testName=Rollback +; meanDelay=90.0 +; testDuration=90.0 + +; Do NOT consider machine crash yet +; testName=Attrition +; machinesToKill=10 +; machinesToLeave=3 +; reboot=true +; testDuration=90.0 + +; testName=Attrition +; machinesToKill=10 +; machinesToLeave=3 +; reboot=true +; testDuration=90.0 + +; Disable buggify for parallel restore +buggify=off +;testDuration=360000 ;not work +;timeout is in seconds +timeout=360000 From 1371db4cdced52c8b580673a44021d6c8678e139 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Tue, 10 Dec 2019 22:55:40 -0800 Subject: [PATCH 05/11] FastRestore:Self code review and cleanup 1. Review memory use cases and improve: Ensure state variable is initialized and change unnecessary state variable to variable. 2. Remove debug code that is no longer useful; 3. Mute verbose debug. 
--- fdbclient/RestoreWorkerInterface.actor.h | 13 +- fdbserver/RestoreApplier.actor.cpp | 8 +- fdbserver/RestoreApplier.actor.h | 6 +- fdbserver/RestoreLoader.actor.cpp | 65 ++++--- fdbserver/RestoreMaster.actor.cpp | 39 ++-- fdbserver/RestoreMaster.actor.h | 20 +- fdbserver/RestoreUtil.h | 4 +- ...kupAndParallelRestoreCorrectness.actor.cpp | 175 +----------------- 8 files changed, 97 insertions(+), 233 deletions(-) diff --git a/fdbclient/RestoreWorkerInterface.actor.h b/fdbclient/RestoreWorkerInterface.actor.h index 0be34c8b8f..d8c773f83e 100644 --- a/fdbclient/RestoreWorkerInterface.actor.h +++ b/fdbclient/RestoreWorkerInterface.actor.h @@ -397,8 +397,8 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest { ReplyPromise reply; RestoreSendVersionedMutationsRequest() = default; - explicit RestoreSendVersionedMutationsRequest(int fileIndex, Version prevVersion, Version version, - bool isRangeFile, VectorRef mutations) + explicit RestoreSendVersionedMutationsRequest(int fileIndex, Version prevVersion, Version version, bool isRangeFile, + MutationsVec mutations) : fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile), mutations(mutations) {} @@ -453,17 +453,12 @@ struct RestoreRequest { bool lockDB; UID randomUid; - int testData; std::vector restoreRequests; // Key restoreTag; ReplyPromise reply; - RestoreRequest() : testData(0) {} - explicit RestoreRequest(int testData) : testData(testData) {} - explicit RestoreRequest(int testData, std::vector& restoreRequests) - : testData(testData), restoreRequests(restoreRequests) {} - + RestoreRequest() = default; explicit RestoreRequest(const int index, const Key& tagName, const Key& url, bool waitForComplete, Version targetVersion, bool verbose, const KeyRange& range, const Key& addPrefix, const Key& removePrefix, bool lockDB, const UID& randomUid) @@ -474,7 +469,7 @@ struct RestoreRequest { template void serialize(Ar& ar) { serializer(ar, index, tagName, url, waitForComplete, 
targetVersion, verbose, range, addPrefix, removePrefix, - lockDB, randomUid, testData, restoreRequests, reply); + lockDB, randomUid, restoreRequests, reply); } std::string toString() const { diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index f6627d271d..43160a5b6d 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -108,13 +108,13 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu if (curFilePos.get() == req.prevVersion) { Version commitVersion = req.version; - VectorRef mutations(req.mutations); + MutationsVec mutations(req.mutations); if (self->kvOps.find(commitVersion) == self->kvOps.end()) { - self->kvOps.insert(std::make_pair(commitVersion, VectorRef())); + self->kvOps.insert(std::make_pair(commitVersion, MutationsVec())); } for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { MutationRef mutation = mutations[mIndex]; - TraceEvent(SevDebug, "FastRestore") + TraceEvent(SevFRMutationInfo, "FastRestore") .detail("ApplierNode", self->id()) .detail("FileUID", req.fileIndex) .detail("Version", commitVersion) @@ -338,7 +338,7 @@ ACTOR Future applyToDB(Reference self, Database cx) { TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type); } - TraceEvent(SevDebug, "FastRestore_Debug") + TraceEvent(SevFRMutationInfo, "FastRestore") .detail("ApplierApplyToDB", self->describeNode()) .detail("Version", progress.curItInCurTxn->first) .detail("Index", progress.curIndexInCurTxn) diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 038d3c3d4a..882881b1d0 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -47,12 +47,12 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted, UID> rangeToApplier; + std::map rangeToApplier; // keyOpsCount is the number of operations per key that is used to determine the key-range boundary for appliers - std::map, int> keyOpsCount; 
+ std::map keyOpsCount; // For master applier to hold the lower bound of key ranges for each appliers - std::vector> keyRangeLowerBounds; + std::vector keyRangeLowerBounds; // TODO: This block of variables may be moved to RestoreRoleData bool inProgressApplyToDB = false; diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index a3e77d8d0c..415c0043e0 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -127,18 +127,6 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference _processLoadingParam(LoadingParam param, Reference self) { - // Q: How to record the param's fields inside LoadingParam Refer to storageMetrics - TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString()); - ASSERT(param.blockSize > 0); - ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary. - ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end()); - // NOTE: map's iterator is guaranteed to be stable, but pointer may not. 
- // state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param]; - self->kvOpsPerLP.emplace(param, VersionedMutationsMap()); - self->sampleMutations.emplace(param, MutationsVec()); - state std::map::iterator kvOpsPerLPIter = self->kvOpsPerLP.find(param); - state std::map::iterator samplesIter = self->sampleMutations.find(param); - // Temporary data structure for parsing log files into (version, ) // Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted // mutationMap: Key is the unique identifier for a batch of mutation logs at the same version @@ -146,6 +134,21 @@ ACTOR Future _processLoadingParam(LoadingParam param, Reference, uint32_t> mutationPartMap; // Sanity check the data parsing is correct state NotifiedVersion processedFileOffset(0); state std::vector> fileParserFutures; + state std::map::iterator kvOpsPerLPIter = self->kvOpsPerLP.end(); + state std::map::iterator samplesIter = self->sampleMutations.end(); + + // Q: How to record the param's fields inside LoadingParam Refer to storageMetrics + TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString()); + ASSERT(param.blockSize > 0); + ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary. + ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end()); + + // NOTE: map's iterator is guaranteed to be stable, but pointer may not. + // state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param]; + self->kvOpsPerLP.emplace(param, VersionedMutationsMap()); + self->sampleMutations.emplace(param, MutationsVec()); + kvOpsPerLPIter = self->kvOpsPerLP.find(param); + samplesIter = self->sampleMutations.find(param); int64_t j; int64_t readOffset; @@ -188,7 +191,6 @@ ACTOR Future handleLoadFileRequest(RestoreLoadFileRequest req, ReferenceprocessedFileParams.find(req.param) != self->processedFileParams.end()); wait(self->processedFileParams[req.param]); // wait on the processing of the req.param. 
- // TODO: Send sampled mutations back to master req.reply.send(RestoreLoadFileReply(req.param, self->sampleMutations[req.param])); // TODO: clear self->sampleMutations[req.param] memory to save memory on loader return Void(); @@ -196,9 +198,9 @@ ACTOR Future handleLoadFileRequest(RestoreLoadFileRequest req, Reference handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req, Reference self) { - self->rangeToApplier = req.rangeToApplier; - state std::map::iterator item = self->kvOpsPerLP.begin(); + + self->rangeToApplier = req.rangeToApplier; for (; item != self->kvOpsPerLP.end(); item++) { if (item->first.isRangeFile == req.useRangeFile) { // Send the parsed mutation to applier who will apply the mutation to DB @@ -216,6 +218,7 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ ACTOR Future sendMutationsToApplier(Reference self, VersionedMutationsMap* pkvOps, bool isRangeFile, Version startVersion, Version endVersion, int fileIndex) { state VersionedMutationsMap& kvOps = *pkvOps; + state VersionedMutationsMap::iterator kvOp = kvOps.end(); state int kvCount = 0; state int splitMutationIndex = 0; state std::vector applierIDs = self->getWorkingApplierIDs(); @@ -236,21 +239,25 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver // applierMutationsBuffer is the mutation vector to be sent to each applier // applierMutationsSize is buffered mutation vector size for each applier - state std::map applierMutationsBuffer; - state std::map applierMutationsSize; + // state std::map applierMutationsBuffer; + // state std::map applierMutationsSize; splitMutationIndex = 0; kvCount = 0; - state VersionedMutationsMap::iterator kvOp = kvOps.begin(); + kvOp = kvOps.begin(); for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) { - applierMutationsBuffer.clear(); - applierMutationsSize.clear(); + // applierMutationsBuffer is the mutation vector to be sent to each applier + // applierMutationsSize is buffered mutation vector size for each 
applier + std::map applierMutationsBuffer; + std::map applierMutationsSize; + // applierMutationsBuffer.clear(); + // applierMutationsSize.clear(); for (auto& applierID : applierIDs) { applierMutationsBuffer[applierID] = MutationsVec(); applierMutationsSize[applierID] = 0.0; } - state Version commitVersion = kvOp->first; + Version commitVersion = kvOp->first; for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) { MutationRef kvm = kvOp->second[mIndex]; @@ -305,16 +312,16 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver .detail("CommitVersion", commitVersion) .detail("FileIndex", fileIndex); ASSERT(prevVersion < commitVersion); + prevVersion = commitVersion; wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests)); // Clear buffers - for (auto& applierID : applierIDs) { - applierMutationsBuffer[applierID] = MutationsVec(); - applierMutationsSize[applierID] = 0; - } + // for (auto& applierID : applierIDs) { + // applierMutationsBuffer[applierID] = MutationsVec(); + // applierMutationsSize[applierID] = 0; + // } requests.clear(); - ASSERT(prevVersion < commitVersion); - prevVersion = commitVersion; + } // all versions of mutations in the same file TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount); @@ -377,10 +384,10 @@ bool concatenateBackupMutationForLogFile(std::map, Standal std::map, uint32_t>& mutationPartMap = *pMutationPartMap; std::string prefix = "||\t"; std::stringstream ss; - StringRef val = val_input.contents(); + //StringRef val = val_input.contents(); const int key_prefix_len = sizeof(uint8_t) + sizeof(Version) + sizeof(uint32_t); - StringRefReaderMX reader(val, restore_corrupted_data()); + //StringRefReaderMX reader(val, restore_corrupted_data()); StringRefReaderMX readerKey(key_input, restore_corrupted_data()); // read key_input! 
int logRangeMutationFirstLength = key_input.size() - key_prefix_len; bool concatenated = false; diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index 6019fbe54f..036c61efdb 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -79,6 +79,9 @@ ACTOR Future startRestoreMaster(Reference masterWorker, // RestoreWorker that has restore master role: Recruite a role for each worker ACTOR Future recruitRestoreRoles(Reference masterWorker, Reference masterData) { + state int nodeIndex = 0; + state RestoreRole role = RestoreRole::Invalid; + TraceEvent("FastRestore") .detail("RecruitRestoreRoles", masterWorker->workerInterfaces.size()) .detail("NumLoaders", opConfig.num_loaders) @@ -91,8 +94,6 @@ ACTOR Future recruitRestoreRoles(Reference masterWorker ASSERT(opConfig.num_loaders + opConfig.num_appliers <= masterWorker->workerInterfaces.size()); // Assign a role to each worker - state int nodeIndex = 0; - state RestoreRole role; std::vector> requests; for (auto& workerInterf : masterWorker->workerInterfaces) { if (nodeIndex >= 0 && nodeIndex < opConfig.num_appliers) { @@ -158,11 +159,14 @@ ACTOR Future distributeRestoreSysInfo(Reference masterW // and ask all restore roles to quit. 
ACTOR Future startProcessRestoreRequests(Reference self, Database cx) { state UID randomUID = deterministicRandom()->randomUniqueID(); - TraceEvent("FastRestore").detail("RestoreMaster", "WaitOnRestoreRequests"); state Standalone> restoreRequests = wait(collectRestoreRequests(cx)); + state int numTries = 0; + state int restoreIndex = 0; + + TraceEvent("FastRestore").detail("RestoreMaster", "WaitOnRestoreRequests"); // lock DB for restore - state int numTries = 0; + numTries = 0; loop { try { wait(lockDatabase(cx, randomUID)); @@ -187,7 +191,6 @@ ACTOR Future startProcessRestoreRequests(Reference self wait(clearDB(cx)); // Step: Perform the restore requests - state int restoreIndex = 0; try { for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) { RestoreRequest& request = restoreRequests[restoreIndex]; @@ -195,7 +198,11 @@ ACTOR Future startProcessRestoreRequests(Reference self wait(success(processRestoreRequest(self, cx, request))); } } catch (Error& e) { - TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequest", restoreRequests[restoreIndex].toString()); + if (restoreIndex < restoreRequests.size()) { + TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequest", restoreRequests[restoreIndex].toString()); + } else { + TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequests", restoreRequests.size()).detail("RestoreIndex", restoreIndex); + } } // Step: Notify all restore requests have been handled by cleaning up the restore keys @@ -218,6 +225,7 @@ ACTOR static Future processRestoreRequest(Reference RestoreRequest request) { state std::vector files; state std::vector allFiles; + state std::map::iterator versionBatch = self->versionBatches.end(); self->initBackupContainer(request.url); @@ -225,7 +233,7 @@ ACTOR static Future processRestoreRequest(Reference wait(collectBackupFiles(self->bc, &files, cx, request)); self->buildVersionBatches(files, &self->versionBatches); // Divide files into version batches - state 
std::map::iterator versionBatch; + ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0 for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) { wait(initializeVersionBatch(self)); wait(distributeWorkloadPerVersionBatch(self, cx, request, versionBatch->second)); @@ -243,14 +251,14 @@ ACTOR static Future loadFilesOnLoaders(Reference self, .detail("BeginVersion", versionBatch.beginVersion) .detail("EndVersion", versionBatch.endVersion); - Key mutationLogPrefix; - std::vector* files; + // Key mutationLogPrefix; + std::vector* files = NULL; if (isRangeFile) { files = &versionBatch.rangeFiles; } else { files = &versionBatch.logFiles; - Reference restoreConfig(new RestoreConfigFR(request.randomUid)); - mutationLogPrefix = restoreConfig->mutationLogPrefix(); + // Reference restoreConfig(new RestoreConfigFR(request.randomUid)); + // mutationLogPrefix = restoreConfig->mutationLogPrefix(); } // sort files in increasing order of beginVersion @@ -331,7 +339,6 @@ ACTOR static Future sendMutationsFromLoaders(Reference ACTOR static Future distributeWorkloadPerVersionBatch(Reference self, Database cx, RestoreRequest request, VersionBatch versionBatch) { ASSERT(!versionBatch.isEmpty()); - ASSERT(self->loadersInterf.size() > 0); ASSERT(self->appliersInterf.size() > 0); @@ -355,6 +362,7 @@ ACTOR static Future distributeWorkloadPerVersionBatch(Reference self) { + ASSERT(self->samplesSize >= 0); int numAppliers = self->appliersInterf.size(); double slotSize = std::max(self->samplesSize / numAppliers, 1.0); std::vector keyrangeSplitter; @@ -392,15 +400,17 @@ void splitKeyRangeForAppliers(Reference self) { self->rangeToApplier[keyrangeSplitter[i]] = applier.first; i++; } + ASSERT(self->rangeToApplier.size() > 0); + ASSERT(self->sanityCheckApplierKeyRange()); self->logApplierKeyRange(); } ACTOR static Future>> collectRestoreRequests(Database cx) { state Standalone> restoreRequests; state 
Future watch4RestoreRequest; + state ReadYourWritesTransaction tr(cx); // wait for the restoreRequestTriggerKey to be set by the client/test workload - state ReadYourWritesTransaction tr(cx); loop { try { tr.reset(); @@ -517,6 +527,8 @@ ACTOR static Future notifyApplierToApplyMutations(Reference notifyRestoreCompleted(Reference self, Database cx) { + state Reference tr(new ReadYourWritesTransaction(cx)); + std::vector> requests; for (auto& loader : self->loadersInterf) { requests.push_back(std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex))); @@ -534,7 +546,6 @@ ACTOR static Future notifyRestoreCompleted(Reference se wait(delay(5.0)); // Give some time for loaders and appliers to exit // Notify tester that the restore has finished - state Reference tr(new ReadYourWritesTransaction(cx)); loop { try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); diff --git a/fdbserver/RestoreMaster.actor.h b/fdbserver/RestoreMaster.actor.h index aa7b6d0ba0..40c83eddb2 100644 --- a/fdbserver/RestoreMaster.actor.h +++ b/fdbserver/RestoreMaster.actor.h @@ -162,7 +162,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted= versionBatch.second.beginVersion); @@ -174,7 +174,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted applierToRange; + for (auto& applier : rangeToApplier) { + if (applierToRange.find(applier.second) == applierToRange.end()) { + applierToRange[applier.second] = applier.first; + } else { + TraceEvent(SevError, "FastRestore").detail("SanityCheckApplierKeyRange", applierToRange.size()).detail("ApplierID", applier.second).detail("Key1", applierToRange[applier.second]).detail("Key2", applier.first); + ret = false; + } + } + return ret; + } + void logApplierKeyRange() { TraceEvent("FastRestore").detail("ApplierKeyRangeNum", rangeToApplier.size()); for (auto& applier : rangeToApplier) { diff --git a/fdbserver/RestoreUtil.h b/fdbserver/RestoreUtil.h index 9ba86ec9b4..a16aadcd57 100644 --- 
a/fdbserver/RestoreUtil.h +++ b/fdbserver/RestoreUtil.h @@ -35,8 +35,8 @@ #include #include -//#define SevFRMutationInfo SevVerbose -#define SevFRMutationInfo SevInfo +#define SevFRMutationInfo SevVerbose +//#define SevFRMutationInfo SevInfo using MutationsVec = Standalone>; diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index b52d669272..99730c6bea 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -82,8 +82,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { backupRanges.push_back_deep(backupRanges.arena(), normalKeys); } else { // Add backup ranges - // Q: why the range endpoints (the range interval) are randomly generated? - // Won't this cause unbalanced range interval in backup? std::set rangeEndpoints; while (rangeEndpoints.size() < backupRangesCount * 2) { rangeEndpoints.insert(deterministicRandom()->randomAlphaNumeric( @@ -105,163 +103,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { } } - static void compareDBKVs(Standalone data, BackupAndParallelRestoreCorrectnessWorkload* self) { - bool hasDiff = false; - // Get the new KV pairs in the DB - std::map, Standalone> newDbKVs; - for (auto kvRef = data.contents().begin(); kvRef != data.contents().end(); kvRef++) { - newDbKVs.insert(std::make_pair(kvRef->key, kvRef->value)); - } - - if (self->dbKVs.empty()) { - printf("[CheckDB] set DB kv for the first time.\n"); - self->dbKVs = newDbKVs; - return; - } - - printf("[CheckDB] KV Number. 
Prev DB:%ld Current DB:%ld\n", self->dbKVs.size(), newDbKVs.size()); - // compare the KV pairs in the DB - printf("------------------Now print out the diff between the prev DB and current DB-------------------\n"); - if (self->dbKVs.size() >= newDbKVs.size()) { - for (auto kv = self->dbKVs.begin(); kv != self->dbKVs.end(); kv++) { - bool exist = (newDbKVs.find(kv->first) != newDbKVs.end()); - if (!exist) { - printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(), - getHexString(kv->second).c_str(), "[Not Exist]"); - hasDiff = true; - } - if (exist && (newDbKVs[kv->first] != self->dbKVs[kv->first])) { - printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(), - getHexString(kv->second).c_str(), getHexString(newDbKVs[kv->first]).c_str()); - hasDiff = true; - } - } - } else { - for (auto newKV = newDbKVs.begin(); newKV != newDbKVs.end(); newKV++) { - bool exist = (self->dbKVs.find(newKV->first) != self->dbKVs.end()); - if (!exist) { - printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", "[Not Exist]", getHexString(newKV->first).c_str(), - getHexString(newKV->second).c_str()); - hasDiff = true; - } - if (exist && (newDbKVs[newKV->first] != self->dbKVs[newKV->first])) { - printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(newKV->first).c_str(), - getHexString(self->dbKVs[newKV->first]).c_str(), - getHexString(newDbKVs[newKV->first]).c_str()); - hasDiff = true; - } - } - } - - int numEntries = 10; - int i = 0; - if (hasDiff) { - // print out the first and last 10 entries - printf("\t---Prev DB first and last %d entries\n", numEntries); - if (!self->dbKVs.empty()) { - auto kv = self->dbKVs.begin(); - for (; kv != self->dbKVs.end(); kv++) { - if (i >= numEntries) break; - - printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(), - getHexString(kv->second).c_str()); - } - - i = self->dbKVs.size(); - kv = self->dbKVs.end(); - for (--kv; kv != self->dbKVs.begin(); kv--) { - if (i <= 
self->dbKVs.size() - numEntries) break; - - printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(), - getHexString(kv->second).c_str()); - } - } - - printf("\t---Current DB first and last %d entries\n", numEntries); - if (!newDbKVs.empty()) { - auto kv = newDbKVs.begin(); - i = 0; - for (; kv != newDbKVs.end(); kv++) { - if (i >= numEntries) break; - - printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(), - getHexString(kv->second).c_str()); - } - - i = newDbKVs.size(); - kv = newDbKVs.end(); - for (--kv; kv != newDbKVs.begin(); kv--) { - if (i <= newDbKVs.size() - numEntries) break; - - printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(), - getHexString(kv->second).c_str()); - } - } - } - - self->dbKVs = newDbKVs; // update the dbKVs - } - - static void dumpDBKVs(Standalone data, BackupAndParallelRestoreCorrectnessWorkload* self) { - // bool hasDiff = false; - // Get the new KV pairs in the DB - std::map, Standalone> newDbKVs; - for (auto kvRef = data.contents().begin(); kvRef != data.contents().end(); kvRef++) { - newDbKVs.insert(std::make_pair(kvRef->key, kvRef->value)); - } - - printf("---------------------Now print out the KV in the current DB---------------------\n"); - for (auto newKV = newDbKVs.begin(); newKV != newDbKVs.end(); newKV++) { - printf("\tKey:%s Value:%s\n", getHexString(newKV->first).c_str(), getHexString(newKV->second).c_str()); - } - } - - ACTOR static Future checkDB(Database cx, std::string when, - BackupAndParallelRestoreCorrectnessWorkload* self) { - - state Key keyPrefix = LiteralStringRef(""); - state Transaction tr(cx); - state int retryCount = 0; - loop { - try { - state Version v = wait(tr.getReadVersion()); - state Standalone data = wait( - tr.getRange(firstGreaterOrEqual(doubleToTestKey(0.0, keyPrefix)), - firstGreaterOrEqual(doubleToTestKey(1.0, keyPrefix)), std::numeric_limits::max())); - // compareDBKVs(data, self); - break; - } catch (Error& e) { - 
retryCount++; - TraceEvent(retryCount > 20 ? SevWarnAlways : SevWarn, "CheckDBError").error(e); - wait(tr.onError(e)); - } - } - - return Void(); - } - - ACTOR static Future dumpDB(Database cx, std::string when, BackupAndParallelRestoreCorrectnessWorkload* self) { - state Key keyPrefix = LiteralStringRef(""); - // int numPrint = 20; //number of entries in the front and end to print out. - state Transaction tr(cx); - state int retryCount = 0; - loop { - try { - state Standalone data = wait( - tr.getRange(firstGreaterOrEqual(doubleToTestKey(0.0, keyPrefix)), - firstGreaterOrEqual(doubleToTestKey(1.0, keyPrefix)), std::numeric_limits::max())); - printf("dump DB, at %s. retryCount:%d Data size:%d, rangeResultInfo:%s\n", when.c_str(), retryCount, - data.size(), data.contents().toString().c_str()); - dumpDBKVs(data, self); - return Void(); - } catch (Error& e) { - retryCount++; - TraceEvent(retryCount > 20 ? SevWarnAlways : SevWarn, "dumpDBError").error(e); - wait(tr.onError(e)); - } - } - } - virtual std::string description() { return "BackupAndParallelRestoreCorrectness"; } virtual Future setup(Database const& cx) { return Void(); } @@ -310,10 +151,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { FileBackupAgent* backupAgent, Database cx, Key tag, Standalone> backupRanges, double stopDifferentialDelay, Promise submittted) { - state UID randomID = nondeterministicRandom()->randomUniqueID(); - state Future stopDifferentialFuture = delay(stopDifferentialDelay); + wait(delay(startDelay)); if (startDelay || BUGGIFY) { @@ -492,10 +332,10 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { state FileBackupAgent backupAgent; state Future extraBackup; state bool extraTasks = false; - state ReadYourWritesTransaction tr2(cx); state UID randomID = nondeterministicRandom()->randomUniqueID(); state int restoreIndex = 0; state bool restoreDone = false; + state ReadYourWritesTransaction tr2(cx); TraceEvent("BARW_Arguments") 
.detail("BackupTag", printable(self->backupTag)) @@ -520,8 +360,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { // backup wait(delay(self->backupAfter)); - wait(checkDB(cx, "BeforeStartBackup", self)); - TraceEvent("BARW_DoBackup1", randomID).detail("Tag", printable(self->backupTag)); state Promise submitted; state Future b = doBackup(self, 0, &backupAgent, cx, self->backupTag, self->backupRanges, @@ -550,8 +388,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { .detail("BackupTag", printable(self->backupTag)) .detail("AbortAndRestartAfter", self->abortAndRestartAfter); - wait(checkDB(cx, "BackupDone", self)); - state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx)); state UID logUid = uidFlag.first; @@ -616,14 +452,13 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { state std::vector> restoreTags; // Restore each range by calling backupAgent.restore() - printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size()); + TraceEvent("FastRestore").detail("PrepareRestores", self->backupRanges.size()); loop { state ReadYourWritesTransaction tr1(cx); tr1.reset(); tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr1.setOption(FDBTransactionOptions::LOCK_AWARE); try { - printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size()); // Note: we always lock DB here in case DB is modified at the bacupRanges boundary. 
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) { auto range = self->backupRanges[restoreIndex]; @@ -645,7 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { wait(tr1.onError(e)); } }; - printf("FastRestore:Test workload triggers the restore by setting up restoreRequestTriggerKey\n"); + TraceEvent("FastRestore").detail("TriggerRestore", "Setting up restoreRequestTriggerKey"); // Sometimes kill and restart the restore if (BUGGIFY) { @@ -698,7 +533,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { } TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished"); - wait(checkDB(cx, "FinishRestore", self)); + // wait(checkDB(cx, "FinishRestore", self)); for (auto& restore : restores) { ASSERT(!restore.isError()); From a6cfad6c3fd74c07af3bcbbd0c3d20125e5cb357 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 11 Dec 2019 16:39:17 -0800 Subject: [PATCH 06/11] FastRestore: Add ParallelRestoreCorrectnessAtomicOpTinyData test back --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index bf87b83c1e..39eafed6e2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -200,7 +200,7 @@ add_fdb_test(TEST_FILES slow/VersionStampSwitchover.txt) add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt) add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt) add_fdb_test(TEST_FILES slow/ddbalance.txt) -add_fdb_test(TEST_FILES ParallelRestoreCorrectnessAtomicOpTinyData.txt) +add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt) add_fdb_test(TEST_FILES ParallelRestoreCorrectnessCycle.txt) # Note that status tests are not deterministic. 
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt) From 9670d64fbdc26d9c662063fc56c02b4762ca7b91 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 11 Dec 2019 16:48:40 -0800 Subject: [PATCH 07/11] FastRestore:Remove commented code --- fdbserver/RestoreLoader.actor.cpp | 20 +------------------ fdbserver/RestoreMaster.actor.cpp | 2 -- ...kupAndParallelRestoreCorrectness.actor.cpp | 1 - 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 415c0043e0..dea3bb04a9 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -237,11 +237,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier } - // applierMutationsBuffer is the mutation vector to be sent to each applier - // applierMutationsSize is buffered mutation vector size for each applier - // state std::map applierMutationsBuffer; - // state std::map applierMutationsSize; - splitMutationIndex = 0; kvCount = 0; kvOp = kvOps.begin(); @@ -251,8 +246,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver // applierMutationsSize is buffered mutation vector size for each applier std::map applierMutationsBuffer; std::map applierMutationsSize; - // applierMutationsBuffer.clear(); - // applierMutationsSize.clear(); for (auto& applierID : applierIDs) { applierMutationsBuffer[applierID] = MutationsVec(); applierMutationsSize[applierID] = 0.0; @@ -267,8 +260,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver Standalone> nodeIDs; // Because using a vector of mutations causes overhead, and the range mutation should happen rarely; // We handle the range mutation and key mutation differently for the benefit of avoiding memory copy - // mvector.pop_front(mvector.size()); - // nodeIDs.pop_front(nodeIDs.size()); // WARNING: The splitMutation() may have bugs splitMutation(self, kvm, mvector.arena(), 
mvector.contents(), nodeIDs.arena(), nodeIDs.contents()); ASSERT(mvector.size() == nodeIDs.size()); @@ -303,8 +294,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver requests.push_back(std::make_pair( applierID, RestoreSendVersionedMutationsRequest(fileIndex, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID]))); - // applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size()); - // applierMutationsSize[applierID] = 0; } TraceEvent(SevDebug, "FastRestore_Debug") .detail("Loader", self->id()) @@ -314,12 +303,7 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver ASSERT(prevVersion < commitVersion); prevVersion = commitVersion; wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests)); - // Clear buffers - // for (auto& applierID : applierIDs) { - // applierMutationsBuffer[applierID] = MutationsVec(); - // applierMutationsSize[applierID] = 0; - // } - + requests.clear(); } // all versions of mutations in the same file @@ -384,10 +368,8 @@ bool concatenateBackupMutationForLogFile(std::map, Standal std::map, uint32_t>& mutationPartMap = *pMutationPartMap; std::string prefix = "||\t"; std::stringstream ss; - //StringRef val = val_input.contents(); const int key_prefix_len = sizeof(uint8_t) + sizeof(Version) + sizeof(uint32_t); - //StringRefReaderMX reader(val, restore_corrupted_data()); StringRefReaderMX readerKey(key_input, restore_corrupted_data()); // read key_input! 
int logRangeMutationFirstLength = key_input.size() - key_prefix_len; bool concatenated = false; diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index 036c61efdb..a32427d207 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -257,8 +257,6 @@ ACTOR static Future loadFilesOnLoaders(Reference self, files = &versionBatch.rangeFiles; } else { files = &versionBatch.logFiles; - // Reference restoreConfig(new RestoreConfigFR(request.randomUid)); - // mutationLogPrefix = restoreConfig->mutationLogPrefix(); } // sort files in increasing order of beginVersion diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 99730c6bea..93b9117895 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -533,7 +533,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { } TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished"); - // wait(checkDB(cx, "FinishRestore", self)); for (auto& restore : restores) { ASSERT(!restore.isError()); From b5d7890ce080f37e0474261852830fb69fb5b944 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 12 Dec 2019 07:44:57 -0800 Subject: [PATCH 08/11] FastRestore:Resolve review comments --- fdbserver/RestoreLoader.actor.cpp | 7 +++---- fdbserver/RestoreMaster.actor.cpp | 10 ++++++---- fdbserver/RestoreMaster.actor.h | 30 +++++++++++++++--------------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index dea3bb04a9..01d90b807a 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -218,7 +218,7 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ ACTOR Future sendMutationsToApplier(Reference self, 
VersionedMutationsMap* pkvOps, bool isRangeFile, Version startVersion, Version endVersion, int fileIndex) { state VersionedMutationsMap& kvOps = *pkvOps; - state VersionedMutationsMap::iterator kvOp = kvOps.end(); + state VersionedMutationsMap::iterator kvOp = kvOps.begin(); state int kvCount = 0; state int splitMutationIndex = 0; state std::vector applierIDs = self->getWorkingApplierIDs(); @@ -239,7 +239,6 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver splitMutationIndex = 0; kvCount = 0; - kvOp = kvOps.begin(); for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) { // applierMutationsBuffer is the mutation vector to be sent to each applier @@ -303,9 +302,9 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver ASSERT(prevVersion < commitVersion); prevVersion = commitVersion; wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests)); - + requests.clear(); - + } // all versions of mutations in the same file TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount); diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index a32427d207..cb215d7c29 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -199,9 +199,12 @@ ACTOR Future startProcessRestoreRequests(Reference self } } catch (Error& e) { if (restoreIndex < restoreRequests.size()) { - TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequest", restoreRequests[restoreIndex].toString()); + TraceEvent(SevError, "FastRestoreFailed") + .detail("RestoreRequest", restoreRequests[restoreIndex].toString()); } else { - TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequests", restoreRequests.size()).detail("RestoreIndex", restoreIndex); + TraceEvent(SevError, "FastRestoreFailed") + .detail("RestoreRequests", restoreRequests.size()) + .detail("RestoreIndex", restoreIndex); } } @@ -251,8 +254,7 @@ ACTOR static Future loadFilesOnLoaders(Reference self, 
.detail("BeginVersion", versionBatch.beginVersion) .detail("EndVersion", versionBatch.endVersion); - // Key mutationLogPrefix; - std::vector* files = NULL; + std::vector* files = nullptr; if (isRangeFile) { files = &versionBatch.rangeFiles; } else { diff --git a/fdbserver/RestoreMaster.actor.h b/fdbserver/RestoreMaster.actor.h index 40c83eddb2..8bd82c07d6 100644 --- a/fdbserver/RestoreMaster.actor.h +++ b/fdbserver/RestoreMaster.actor.h @@ -162,9 +162,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted= versionBatch.second.beginVersion); ASSERT(logFile.endVersion <= versionBatch.second.endVersion); ASSERT(prevVersion <= logFile.beginVersion); @@ -174,9 +172,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted= versionBatch.second.beginVersion); ASSERT(rangeFile.endVersion < versionBatch.second.endVersion); @@ -193,15 +189,19 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted applierToRange; - for (auto& applier : rangeToApplier) { - if (applierToRange.find(applier.second) == applierToRange.end()) { - applierToRange[applier.second] = applier.first; - } else { - TraceEvent(SevError, "FastRestore").detail("SanityCheckApplierKeyRange", applierToRange.size()).detail("ApplierID", applier.second).detail("Key1", applierToRange[applier.second]).detail("Key2", applier.first); - ret = false; - } - } - return ret; + for (auto& applier : rangeToApplier) { + if (applierToRange.find(applier.second) == applierToRange.end()) { + applierToRange[applier.second] = applier.first; + } else { + TraceEvent(SevError, "FastRestore") + .detail("SanityCheckApplierKeyRange", applierToRange.size()) + .detail("ApplierID", applier.second) + .detail("Key1", applierToRange[applier.second]) + .detail("Key2", applier.first); + ret = false; + } + } + return ret; } void logApplierKeyRange() { From 64abaaf02d77dc576ff4c91063de3bf24ce31c13 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 12 Dec 2019 07:57:28 -0800 Subject: [PATCH 09/11] 
FastRestore:Reenable tests for nightly test --- tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt | 4 ++-- tests/{ => slow}/ParallelRestoreCorrectnessCycle.txt | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename tests/{ => slow}/ParallelRestoreCorrectnessCycle.txt (100%) diff --git a/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt b/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt index 82f13193cf..d075180004 100644 --- a/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt +++ b/tests/slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt @@ -5,10 +5,10 @@ testTitle=BackupAndParallelRestoreWithAtomicOp ; nodeCount=100 ; transactionsPerSecond=2500.0 ; transactionsPerSecond=500.0 - transactionsPerSecond=50.0 + transactionsPerSecond=500.0 ; nodeCount=4 ; transactionsPerSecond=250.0 - testDuration=15.0 + testDuration=30.0 clearAfterTest=false ; Specify a type of atomicOp ; opType=0 diff --git a/tests/ParallelRestoreCorrectnessCycle.txt b/tests/slow/ParallelRestoreCorrectnessCycle.txt similarity index 100% rename from tests/ParallelRestoreCorrectnessCycle.txt rename to tests/slow/ParallelRestoreCorrectnessCycle.txt From 650be617f1e38f9ee2a98129e3e264429917c3f2 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 12 Dec 2019 10:32:13 -0800 Subject: [PATCH 10/11] FastRestore:Add tests to CMakefile --- fdbserver/RestoreMaster.actor.cpp | 2 +- tests/CMakeLists.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index cb215d7c29..3ccef0e4f6 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -228,7 +228,7 @@ ACTOR static Future processRestoreRequest(Reference RestoreRequest request) { state std::vector files; state std::vector allFiles; - state std::map::iterator versionBatch = self->versionBatches.end(); + state std::map::iterator versionBatch = self->versionBatches.begin(); 
self->initBackupContainer(request.url); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 39eafed6e2..d24ea3a9df 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -201,7 +201,7 @@ add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt) add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt) add_fdb_test(TEST_FILES slow/ddbalance.txt) add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt) -add_fdb_test(TEST_FILES ParallelRestoreCorrectnessCycle.txt) +add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt) # Note that status tests are not deterministic. add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt) add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt) From 97030d91683eea25127b623178dce1e9b9f721ec Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 12 Dec 2019 21:55:50 -0800 Subject: [PATCH 11/11] FastRestore:Revise and test SevFRMutationInfo Enabled SevFRMutationInfo for valgrind test, no error found, and disable it again. Revise debug trace message a bit. 
--- fdbclient/CommitTransaction.h | 2 +- fdbserver/RestoreUtil.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 5ebb245c72..540157e5c9 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -90,7 +90,7 @@ struct MutationRef { return format("code: %s param1: %s param2: %s", typeString[type], printable(param1).c_str(), printable(param2).c_str()); } else { - return format("code: %s param1: %s param2: %s", "Invalid", printable(param1).c_str(), printable(param2).c_str()); + return format("code: Invalid param1: %s param2: %s", printable(param1).c_str(), printable(param2).c_str()); } } diff --git a/fdbserver/RestoreUtil.h b/fdbserver/RestoreUtil.h index a16aadcd57..9ba86ec9b4 100644 --- a/fdbserver/RestoreUtil.h +++ b/fdbserver/RestoreUtil.h @@ -35,8 +35,8 @@ #include #include -#define SevFRMutationInfo SevVerbose -//#define SevFRMutationInfo SevInfo +//#define SevFRMutationInfo SevVerbose +#define SevFRMutationInfo SevInfo using MutationsVec = Standalone>;