Merge pull request #2443 from xumengpanda/mengxu/fast-restore-fix-valgrind-PR

Performant restore [12/XX]: Code clean up
This commit is contained in:
Jingyu Zhou 2019-12-13 14:35:20 -08:00 committed by GitHub
commit ded2a301e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 100 additions and 264 deletions

View File

@ -90,7 +90,7 @@ struct MutationRef {
return format("code: %s param1: %s param2: %s", typeString[type], printable(param1).c_str(), printable(param2).c_str());
}
else {
return format("code: %s param1: %s param2: %s", "Invalid", printable(param1).c_str(), printable(param2).c_str());
return format("code: Invalid param1: %s param2: %s", printable(param1).c_str(), printable(param2).c_str());
}
}

View File

@ -398,7 +398,7 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int fileIndex, Version prevVersion, Version version, bool isRangeFile,
VectorRef<MutationRef> mutations)
MutationsVec mutations)
: fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
mutations(mutations) {}
@ -453,17 +453,12 @@ struct RestoreRequest {
bool lockDB;
UID randomUid;
int testData;
std::vector<int> restoreRequests;
// Key restoreTag;
ReplyPromise<struct RestoreCommonReply> reply;
RestoreRequest() : testData(0) {}
explicit RestoreRequest(int testData) : testData(testData) {}
explicit RestoreRequest(int testData, std::vector<int>& restoreRequests)
: testData(testData), restoreRequests(restoreRequests) {}
RestoreRequest() = default;
explicit RestoreRequest(const int index, const Key& tagName, const Key& url, bool waitForComplete,
Version targetVersion, bool verbose, const KeyRange& range, const Key& addPrefix,
const Key& removePrefix, bool lockDB, const UID& randomUid)
@ -474,7 +469,7 @@ struct RestoreRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index, tagName, url, waitForComplete, targetVersion, verbose, range, addPrefix, removePrefix,
lockDB, randomUid, testData, restoreRequests, reply);
lockDB, randomUid, restoreRequests, reply);
}
std::string toString() const {

View File

@ -108,13 +108,13 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
if (curFilePos.get() == req.prevVersion) {
Version commitVersion = req.version;
VectorRef<MutationRef> mutations(req.mutations);
MutationsVec mutations(req.mutations);
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
self->kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
}
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
TraceEvent(SevDebug, "FastRestore")
TraceEvent(SevFRMutationInfo, "FastRestore")
.detail("ApplierNode", self->id())
.detail("FileUID", req.fileIndex)
.detail("Version", commitVersion)
@ -338,7 +338,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type);
}
TraceEvent(SevDebug, "FastRestore_Debug")
TraceEvent(SevFRMutationInfo, "FastRestore")
.detail("ApplierApplyToDB", self->describeNode())
.detail("Version", progress.curItInCurTxn->first)
.detail("Index", progress.curIndexInCurTxn)

View File

@ -47,12 +47,12 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
// rangeToApplier is in master and loader. Loader uses it to determine which applier a mutation should be sent
// KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Standalone<KeyRef>, UID> rangeToApplier;
std::map<Key, UID> rangeToApplier;
// keyOpsCount is the number of operations per key that is used to determine the key-range boundary for appliers
std::map<Standalone<KeyRef>, int> keyOpsCount;
std::map<Key, int> keyOpsCount;
// For master applier to hold the lower bound of key ranges for each appliers
std::vector<Standalone<KeyRef>> keyRangeLowerBounds;
std::vector<Key> keyRangeLowerBounds;
// TODO: This block of variables may be moved to RestoreRoleData
bool inProgressApplyToDB = false;

View File

@ -127,18 +127,6 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
}
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoaderData> self) {
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
ASSERT(param.blockSize > 0);
ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary.
ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end());
// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
// state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param];
self->kvOpsPerLP.emplace(param, VersionedMutationsMap());
self->sampleMutations.emplace(param, MutationsVec());
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = self->kvOpsPerLP.find(param);
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = self->sampleMutations.find(param);
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
@ -146,6 +134,21 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = self->kvOpsPerLP.end();
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = self->sampleMutations.end();
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
ASSERT(param.blockSize > 0);
ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary.
ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end());
// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
// state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param];
self->kvOpsPerLP.emplace(param, VersionedMutationsMap());
self->sampleMutations.emplace(param, MutationsVec());
kvOpsPerLPIter = self->kvOpsPerLP.find(param);
samplesIter = self->sampleMutations.find(param);
int64_t j;
int64_t readOffset;
@ -188,7 +191,6 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ASSERT(self->processedFileParams.find(req.param) != self->processedFileParams.end());
wait(self->processedFileParams[req.param]); // wait on the processing of the req.param.
// TODO: Send sampled mutations back to master
req.reply.send(RestoreLoadFileReply(req.param, self->sampleMutations[req.param]));
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
return Void();
@ -196,9 +198,9 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
self->rangeToApplier = req.rangeToApplier;
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = self->kvOpsPerLP.begin();
self->rangeToApplier = req.rangeToApplier;
for (; item != self->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
@ -216,6 +218,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
@ -231,37 +234,31 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(endVersion) == kvOps.end()) {
kvOps[endVersion] = VectorRef<MutationRef>(); // Empty mutation vector will be handled by applier
kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
}
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
state std::map<UID, MutationsVec> applierMutationsBuffer;
state std::map<UID, double> applierMutationsSize;
state MutationsVec mvector;
state Standalone<VectorRef<UID>> nodeIDs;
splitMutationIndex = 0;
kvCount = 0;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
applierMutationsBuffer.clear();
applierMutationsSize.clear();
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
std::map<UID, MutationsVec> applierMutationsBuffer;
std::map<UID, double> applierMutationsSize;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec(VectorRef<MutationRef>());
applierMutationsBuffer[applierID] = MutationsVec();
applierMutationsSize[applierID] = 0.0;
}
state Version commitVersion = kvOp->first;
Version commitVersion = kvOp->first;
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef kvm = kvOp->second[mIndex];
// Send the mutation to applier
if (isRangeMutation(kvm)) {
MutationsVec mvector;
Standalone<VectorRef<UID>> nodeIDs;
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
mvector.pop_front(mvector.size());
nodeIDs.pop_front(nodeIDs.size());
// WARNING: The splitMutation() may have bugs
splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
@ -296,8 +293,6 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
requests.push_back(std::make_pair(
applierID, RestoreSendVersionedMutationsRequest(fileIndex, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
}
TraceEvent(SevDebug, "FastRestore_Debug")
.detail("Loader", self->id())
@ -305,10 +300,11 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
.detail("CommitVersion", commitVersion)
.detail("FileIndex", fileIndex);
ASSERT(prevVersion < commitVersion);
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
requests.clear();
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
requests.clear();
} // all versions of mutations in the same file
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
@ -371,10 +367,8 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
std::map<Standalone<StringRef>, uint32_t>& mutationPartMap = *pMutationPartMap;
std::string prefix = "||\t";
std::stringstream ss;
StringRef val = val_input.contents();
const int key_prefix_len = sizeof(uint8_t) + sizeof(Version) + sizeof(uint32_t);
StringRefReaderMX reader(val, restore_corrupted_data());
StringRefReaderMX readerKey(key_input, restore_corrupted_data()); // read key_input!
int logRangeMutationFirstLength = key_input.size() - key_prefix_len;
bool concatenated = false;
@ -451,7 +445,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
StringRefReaderMX kReader(k, restore_corrupted_data());
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
StringRefReaderMX vReader(val, restore_corrupted_data());
vReader.consume<uint64_t>(); // Consume the includeVersion
@ -540,7 +534,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
data[i].value); // ASSUME: all operation in range file is set.
// We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps.insert(std::make_pair(version, VectorRef<MutationRef>()));
kvOps.insert(std::make_pair(version, MutationsVec()));
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
.detail("CommitVersion", version)
.detail("ParsedMutationKV", m.toString());

View File

@ -79,6 +79,9 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
// RestoreWorker that has restore master role: Recruite a role for each worker
ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker,
Reference<RestoreMasterData> masterData) {
state int nodeIndex = 0;
state RestoreRole role = RestoreRole::Invalid;
TraceEvent("FastRestore")
.detail("RecruitRestoreRoles", masterWorker->workerInterfaces.size())
.detail("NumLoaders", opConfig.num_loaders)
@ -91,8 +94,6 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
ASSERT(opConfig.num_loaders + opConfig.num_appliers <= masterWorker->workerInterfaces.size());
// Assign a role to each worker
state int nodeIndex = 0;
state RestoreRole role;
std::vector<std::pair<UID, RestoreRecruitRoleRequest>> requests;
for (auto& workerInterf : masterWorker->workerInterfaces) {
if (nodeIndex >= 0 && nodeIndex < opConfig.num_appliers) {
@ -158,11 +159,14 @@ ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterW
// and ask all restore roles to quit.
ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self, Database cx) {
state UID randomUID = deterministicRandom()->randomUniqueID();
TraceEvent("FastRestore").detail("RestoreMaster", "WaitOnRestoreRequests");
state Standalone<VectorRef<RestoreRequest>> restoreRequests = wait(collectRestoreRequests(cx));
state int numTries = 0;
state int restoreIndex = 0;
TraceEvent("FastRestore").detail("RestoreMaster", "WaitOnRestoreRequests");
// lock DB for restore
state int numTries = 0;
numTries = 0;
loop {
try {
wait(lockDatabase(cx, randomUID));
@ -187,7 +191,6 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
wait(clearDB(cx));
// Step: Perform the restore requests
state int restoreIndex = 0;
try {
for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) {
RestoreRequest& request = restoreRequests[restoreIndex];
@ -195,7 +198,14 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
wait(success(processRestoreRequest(self, cx, request)));
}
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreFailed").detail("RestoreRequest", restoreRequests[restoreIndex].toString());
if (restoreIndex < restoreRequests.size()) {
TraceEvent(SevError, "FastRestoreFailed")
.detail("RestoreRequest", restoreRequests[restoreIndex].toString());
} else {
TraceEvent(SevError, "FastRestoreFailed")
.detail("RestoreRequests", restoreRequests.size())
.detail("RestoreIndex", restoreIndex);
}
}
// Step: Notify all restore requests have been handled by cleaning up the restore keys
@ -218,6 +228,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
RestoreRequest request) {
state std::vector<RestoreFileFR> files;
state std::vector<RestoreFileFR> allFiles;
state std::map<Version, VersionBatch>::iterator versionBatch = self->versionBatches.begin();
self->initBackupContainer(request.url);
@ -225,7 +236,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
wait(collectBackupFiles(self->bc, &files, cx, request));
self->buildVersionBatches(files, &self->versionBatches); // Divide files into version batches
state std::map<Version, VersionBatch>::iterator versionBatch;
ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0
for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) {
wait(initializeVersionBatch(self));
wait(distributeWorkloadPerVersionBatch(self, cx, request, versionBatch->second));
@ -243,14 +254,11 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
.detail("BeginVersion", versionBatch.beginVersion)
.detail("EndVersion", versionBatch.endVersion);
Key mutationLogPrefix;
std::vector<RestoreFileFR>* files;
std::vector<RestoreFileFR>* files = nullptr;
if (isRangeFile) {
files = &versionBatch.rangeFiles;
} else {
files = &versionBatch.logFiles;
Reference<RestoreConfigFR> restoreConfig(new RestoreConfigFR(request.randomUid));
mutationLogPrefix = restoreConfig->mutationLogPrefix();
}
// sort files in increasing order of beginVersion
@ -331,7 +339,6 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<RestoreMasterData>
ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMasterData> self, Database cx,
RestoreRequest request, VersionBatch versionBatch) {
ASSERT(!versionBatch.isEmpty());
ASSERT(self->loadersInterf.size() > 0);
ASSERT(self->appliersInterf.size() > 0);
@ -355,6 +362,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
// Decide which key range should be taken by which applier
void splitKeyRangeForAppliers(Reference<RestoreMasterData> self) {
ASSERT(self->samplesSize >= 0);
int numAppliers = self->appliersInterf.size();
double slotSize = std::max(self->samplesSize / numAppliers, 1.0);
std::vector<Key> keyrangeSplitter;
@ -392,15 +400,17 @@ void splitKeyRangeForAppliers(Reference<RestoreMasterData> self) {
self->rangeToApplier[keyrangeSplitter[i]] = applier.first;
i++;
}
ASSERT(self->rangeToApplier.size() > 0);
ASSERT(self->sanityCheckApplierKeyRange());
self->logApplierKeyRange();
}
ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequests(Database cx) {
state Standalone<VectorRef<RestoreRequest>> restoreRequests;
state Future<Void> watch4RestoreRequest;
state ReadYourWritesTransaction tr(cx);
// wait for the restoreRequestTriggerKey to be set by the client/test workload
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.reset();
@ -517,6 +527,8 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<RestoreMasterD
// Ask all loaders and appliers to perform housecleaning at the end of restore and
// Register the restoreRequestDoneKey to signal the end of restore
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, Database cx) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for (auto& loader : self->loadersInterf) {
requests.push_back(std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex)));
@ -534,7 +546,6 @@ ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> se
wait(delay(5.0)); // Give some time for loaders and appliers to exit
// Notify tester that the restore has finished
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

View File

@ -162,9 +162,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
for (auto& versionBatch : *versionBatches) {
Version prevVersion = 0;
for (auto& logFile : versionBatch.second.logFiles) {
TraceEvent("FastRestore_Debug")
.detail("PrevVersion", prevVersion)
.detail("LogFile", logFile.toString());
TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("LogFile", logFile.toString());
ASSERT(logFile.beginVersion >= versionBatch.second.beginVersion);
ASSERT(logFile.endVersion <= versionBatch.second.endVersion);
ASSERT(prevVersion <= logFile.beginVersion);
@ -174,9 +172,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
}
prevVersion = 0;
for (auto& rangeFile : versionBatch.second.rangeFiles) {
TraceEvent("FastRestore_Debug")
.detail("PrevVersion", prevVersion)
.detail("RangeFile", rangeFile.toString());
TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("RangeFile", rangeFile.toString());
ASSERT(rangeFile.beginVersion == rangeFile.endVersion);
ASSERT(rangeFile.beginVersion >= versionBatch.second.beginVersion);
ASSERT(rangeFile.endVersion < versionBatch.second.endVersion);
@ -188,6 +184,26 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
}
}
// Return true if pass the sanity check
bool sanityCheckApplierKeyRange() {
bool ret = true;
// An applier should only appear once in rangeToApplier
std::map<UID, Key> applierToRange;
for (auto& applier : rangeToApplier) {
if (applierToRange.find(applier.second) == applierToRange.end()) {
applierToRange[applier.second] = applier.first;
} else {
TraceEvent(SevError, "FastRestore")
.detail("SanityCheckApplierKeyRange", applierToRange.size())
.detail("ApplierID", applier.second)
.detail("Key1", applierToRange[applier.second])
.detail("Key2", applier.first);
ret = false;
}
}
return ret;
}
void logApplierKeyRange() {
TraceEvent("FastRestore").detail("ApplierKeyRangeNum", rangeToApplier.size());
for (auto& applier : rangeToApplier) {

View File

@ -82,8 +82,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
} else {
// Add backup ranges
// Q: why the range endpoints (the range interval) are randomly generated?
// Won't this cause unbalanced range interval in backup?
std::set<std::string> rangeEndpoints;
while (rangeEndpoints.size() < backupRangesCount * 2) {
rangeEndpoints.insert(deterministicRandom()->randomAlphaNumeric(
@ -105,163 +103,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
}
static void compareDBKVs(Standalone<RangeResultRef> data, BackupAndParallelRestoreCorrectnessWorkload* self) {
bool hasDiff = false;
// Get the new KV pairs in the DB
std::map<Standalone<KeyRef>, Standalone<ValueRef>> newDbKVs;
for (auto kvRef = data.contents().begin(); kvRef != data.contents().end(); kvRef++) {
newDbKVs.insert(std::make_pair(kvRef->key, kvRef->value));
}
if (self->dbKVs.empty()) {
printf("[CheckDB] set DB kv for the first time.\n");
self->dbKVs = newDbKVs;
return;
}
printf("[CheckDB] KV Number. Prev DB:%ld Current DB:%ld\n", self->dbKVs.size(), newDbKVs.size());
// compare the KV pairs in the DB
printf("------------------Now print out the diff between the prev DB and current DB-------------------\n");
if (self->dbKVs.size() >= newDbKVs.size()) {
for (auto kv = self->dbKVs.begin(); kv != self->dbKVs.end(); kv++) {
bool exist = (newDbKVs.find(kv->first) != newDbKVs.end());
if (!exist) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(),
getHexString(kv->second).c_str(), "[Not Exist]");
hasDiff = true;
}
if (exist && (newDbKVs[kv->first] != self->dbKVs[kv->first])) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(),
getHexString(kv->second).c_str(), getHexString(newDbKVs[kv->first]).c_str());
hasDiff = true;
}
}
} else {
for (auto newKV = newDbKVs.begin(); newKV != newDbKVs.end(); newKV++) {
bool exist = (self->dbKVs.find(newKV->first) != self->dbKVs.end());
if (!exist) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", "[Not Exist]", getHexString(newKV->first).c_str(),
getHexString(newKV->second).c_str());
hasDiff = true;
}
if (exist && (newDbKVs[newKV->first] != self->dbKVs[newKV->first])) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(newKV->first).c_str(),
getHexString(self->dbKVs[newKV->first]).c_str(),
getHexString(newDbKVs[newKV->first]).c_str());
hasDiff = true;
}
}
}
int numEntries = 10;
int i = 0;
if (hasDiff) {
// print out the first and last 10 entries
printf("\t---Prev DB first and last %d entries\n", numEntries);
if (!self->dbKVs.empty()) {
auto kv = self->dbKVs.begin();
for (; kv != self->dbKVs.end(); kv++) {
if (i >= numEntries) break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(),
getHexString(kv->second).c_str());
}
i = self->dbKVs.size();
kv = self->dbKVs.end();
for (--kv; kv != self->dbKVs.begin(); kv--) {
if (i <= self->dbKVs.size() - numEntries) break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(),
getHexString(kv->second).c_str());
}
}
printf("\t---Current DB first and last %d entries\n", numEntries);
if (!newDbKVs.empty()) {
auto kv = newDbKVs.begin();
i = 0;
for (; kv != newDbKVs.end(); kv++) {
if (i >= numEntries) break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(),
getHexString(kv->second).c_str());
}
i = newDbKVs.size();
kv = newDbKVs.end();
for (--kv; kv != newDbKVs.begin(); kv--) {
if (i <= newDbKVs.size() - numEntries) break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(),
getHexString(kv->second).c_str());
}
}
}
self->dbKVs = newDbKVs; // update the dbKVs
}
static void dumpDBKVs(Standalone<RangeResultRef> data, BackupAndParallelRestoreCorrectnessWorkload* self) {
// bool hasDiff = false;
// Get the new KV pairs in the DB
std::map<Standalone<KeyRef>, Standalone<ValueRef>> newDbKVs;
for (auto kvRef = data.contents().begin(); kvRef != data.contents().end(); kvRef++) {
newDbKVs.insert(std::make_pair(kvRef->key, kvRef->value));
}
printf("---------------------Now print out the KV in the current DB---------------------\n");
for (auto newKV = newDbKVs.begin(); newKV != newDbKVs.end(); newKV++) {
printf("\tKey:%s Value:%s\n", getHexString(newKV->first).c_str(), getHexString(newKV->second).c_str());
}
}
ACTOR static Future<Void> checkDB(Database cx, std::string when,
BackupAndParallelRestoreCorrectnessWorkload* self) {
state Key keyPrefix = LiteralStringRef("");
state Transaction tr(cx);
state int retryCount = 0;
loop {
try {
state Version v = wait(tr.getReadVersion());
state Standalone<RangeResultRef> data = wait(
tr.getRange(firstGreaterOrEqual(doubleToTestKey(0.0, keyPrefix)),
firstGreaterOrEqual(doubleToTestKey(1.0, keyPrefix)), std::numeric_limits<int>::max()));
// compareDBKVs(data, self);
break;
} catch (Error& e) {
retryCount++;
TraceEvent(retryCount > 20 ? SevWarnAlways : SevWarn, "CheckDBError").error(e);
wait(tr.onError(e));
}
}
return Void();
}
ACTOR static Future<Void> dumpDB(Database cx, std::string when, BackupAndParallelRestoreCorrectnessWorkload* self) {
state Key keyPrefix = LiteralStringRef("");
// int numPrint = 20; //number of entries in the front and end to print out.
state Transaction tr(cx);
state int retryCount = 0;
loop {
try {
state Standalone<RangeResultRef> data = wait(
tr.getRange(firstGreaterOrEqual(doubleToTestKey(0.0, keyPrefix)),
firstGreaterOrEqual(doubleToTestKey(1.0, keyPrefix)), std::numeric_limits<int>::max()));
printf("dump DB, at %s. retryCount:%d Data size:%d, rangeResultInfo:%s\n", when.c_str(), retryCount,
data.size(), data.contents().toString().c_str());
dumpDBKVs(data, self);
return Void();
} catch (Error& e) {
retryCount++;
TraceEvent(retryCount > 20 ? SevWarnAlways : SevWarn, "dumpDBError").error(e);
wait(tr.onError(e));
}
}
}
virtual std::string description() { return "BackupAndParallelRestoreCorrectness"; }
virtual Future<Void> setup(Database const& cx) { return Void(); }
@ -310,10 +151,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
FileBackupAgent* backupAgent, Database cx, Key tag,
Standalone<VectorRef<KeyRangeRef>> backupRanges, double stopDifferentialDelay,
Promise<Void> submittted) {
state UID randomID = nondeterministicRandom()->randomUniqueID();
state Future<Void> stopDifferentialFuture = delay(stopDifferentialDelay);
wait(delay(startDelay));
if (startDelay || BUGGIFY) {
@ -492,11 +332,10 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state FileBackupAgent backupAgent;
state Future<Void> extraBackup;
state bool extraTasks = false;
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
state UID randomID = nondeterministicRandom()->randomUniqueID();
state int restoreIndex = 0;
state bool restoreDone = false;
state ReadYourWritesTransaction tr2(cx);
TraceEvent("BARW_Arguments")
.detail("BackupTag", printable(self->backupTag))
@ -521,8 +360,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// backup
wait(delay(self->backupAfter));
wait(checkDB(cx, "BeforeStartBackup", self));
TraceEvent("BARW_DoBackup1", randomID).detail("Tag", printable(self->backupTag));
state Promise<Void> submitted;
state Future<Void> b = doBackup(self, 0, &backupAgent, cx, self->backupTag, self->backupRanges,
@ -551,8 +388,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag))
.detail("AbortAndRestartAfter", self->abortAndRestartAfter);
wait(checkDB(cx, "BackupDone", self));
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx));
state UID logUid = uidFlag.first;
@ -617,13 +452,13 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state std::vector<Standalone<StringRef>> restoreTags;
// Restore each range by calling backupAgent.restore()
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
TraceEvent("FastRestore").detail("PrepareRestores", self->backupRanges.size());
loop {
state ReadYourWritesTransaction tr1(cx);
tr1.reset();
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
// Note: we always lock DB here in case DB is modified at the bacupRanges boundary.
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) {
auto range = self->backupRanges[restoreIndex];
@ -645,7 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
wait(tr1.onError(e));
}
};
printf("FastRestore:Test workload triggers the restore by setting up restoreRequestTriggerKey\n");
TraceEvent("FastRestore").detail("TriggerRestore", "Setting up restoreRequestTriggerKey");
// Sometimes kill and restart the restore
if (BUGGIFY) {
@ -698,7 +533,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished");
wait(checkDB(cx, "FinishRestore", self));
for (auto& restore : restores) {
ASSERT(!restore.isError());

View File

@ -200,8 +200,8 @@ add_fdb_test(TEST_FILES slow/VersionStampSwitchover.txt)
add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt)
add_fdb_test(TEST_FILES slow/ddbalance.txt)
add_fdb_test(TEST_FILES ParallelRestoreCorrectnessAtomicOpTinyData.txt)
add_fdb_test(TEST_FILES ParallelRestoreCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)

View File

@ -3,9 +3,9 @@ testTitle=BackupAndParallelRestoreWithAtomicOp
nodeCount=30000
; Make ops space only 1 key per group
; nodeCount=100
transactionsPerSecond=2500.0
; transactionsPerSecond=2500.0
; transactionsPerSecond=500.0
; transactionsPerSecond=100.0
transactionsPerSecond=500.0
; nodeCount=4
; transactionsPerSecond=250.0
testDuration=30.0
@ -14,20 +14,6 @@ testTitle=BackupAndParallelRestoreWithAtomicOp
; opType=0
; actorsPerClient=1
; AtomicBackupCorrectness.txt does not mix Cycle and AtomicOps workloads
; testName=Cycle
;; nodeCount=30000
;; nodeCount=1000
; nodeCount=4
;; transactionsPerSecond=2.0
;; transactionsPerSecond=10.0
;; transactionsPerSecond=20.0
; transactionsPerSecond=2500.0
; testDuration=30.0
; expectedRate=0
; clearAfterTest=false
; keyPrefix=a
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload