diff --git a/fdbclient/RestoreWorkerInterface.actor.h b/fdbclient/RestoreWorkerInterface.actor.h index 5871d1483e..cc008cad2d 100644 --- a/fdbclient/RestoreWorkerInterface.actor.h +++ b/fdbclient/RestoreWorkerInterface.actor.h @@ -209,6 +209,8 @@ struct RestoreAsset { int64_t offset; int64_t len; + UID uid; + RestoreAsset() = default; bool operator==(const RestoreAsset& r) const { @@ -227,30 +229,29 @@ struct RestoreAsset { template void serialize(Ar& ar) { - serializer(ar, beginVersion, endVersion, range, filename, fileIndex, offset, len); + serializer(ar, beginVersion, endVersion, range, filename, fileIndex, offset, len, uid); } std::string toString() { std::stringstream ss; - ss << "begin:" << beginVersion << " end:" << endVersion << " range:" << range.toString() - << " filename:" << filename << " fileIndex:" << fileIndex << " offset:" << offset << " len:" << len; + ss << "UID:" << uid.toString() << " begin:" << beginVersion << " end:" << endVersion + << " range:" << range.toString() << " filename:" << filename << " fileIndex:" << fileIndex + << " offset:" << offset << " len:" << len; return ss.str(); } + // RestoreAsset and VersionBatch both use endVersion as exclusive in version range bool isInVersionRange(Version commitVersion) const { return commitVersion >= beginVersion && commitVersion < endVersion; } }; -// TODO: It is probably better to specify the (beginVersion, endVersion] for each loadingParam. -// beginVersion (endVersion) is the version the applier is before (after) it receives the request. struct LoadingParam { constexpr static FileIdentifier file_identifier = 17023837; bool isRangeFile; Key url; - Version prevVersion; - Version endVersion; // range file's mutations are all at the endVersion + Optional rangeVersion; // range file's version int64_t blockSize; RestoreAsset asset; @@ -266,13 +267,14 @@ struct LoadingParam { template void serialize(Ar& ar) { - serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset); + serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset); } std::string toString() { std::stringstream str; - str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion - << " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString(); + str << "isRangeFile:" << isRangeFile << " url:" << url.toString() + << " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize + << " RestoreAsset:" << asset.toString(); return str.str(); } }; diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index af359981bb..caf1f635f3 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -112,7 +112,8 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu // Sanity check: mutations in range file is in [beginVersion, endVersion); // mutations in log file is in [beginVersion, endVersion], both inclusive. ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); - ASSERT_WE_THINK((req.isRangeFile && commitVersion < req.asset.endVersion) || + // Loader sends the endVersion to ensure all useful versions are sent + ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || (!req.isRangeFile && commitVersion <= req.asset.endVersion)); if (self->kvOps.find(commitVersion) == self->kvOps.end()) { diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index fe568e1fc6..1906958444 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -44,8 +44,7 @@ ACTOR Future handleLoadFileRequest(RestoreLoadFileRequest req, Reference handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req, Reference self); ACTOR Future sendMutationsToApplier(Reference self, VersionedMutationsMap* kvOps, - bool isRangeFile, Version startVersion, Version endVersion, - RestoreAsset asset); + bool isRangeFile, RestoreAsset asset); ACTOR static Future _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset, SerializedMutationListMap* mutationMap, SerializedMutationPartMap* mutationPartMap, @@ -153,8 +152,8 @@ ACTOR Future _processLoadingParam(LoadingParam param, Reference(param.blockSize, param.asset.len - j); if (param.isRangeFile) { - fileParserFutures.push_back( - _parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc, param.endVersion, subAsset)); + fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc, + param.rangeVersion.get(), subAsset)); } else { // TODO: Sanity check the log file's range is overlapped with the restored version range fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, @@ -198,8 +197,7 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ for (; item != self->kvOpsPerLP.end(); item++) { if (item->first.isRangeFile == req.useRangeFile) { // Send the parsed mutation to applier who will apply the mutation to DB - wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.prevVersion, - item->first.endVersion, item->first.asset)); + wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.asset)); } } @@ -210,26 +208,24 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ // TODO: This function can be revised better // Assume: kvOps data are from the same file. ACTOR Future sendMutationsToApplier(Reference self, VersionedMutationsMap* pkvOps, - bool isRangeFile, Version startVersion, Version endVersion, - RestoreAsset asset) { + bool isRangeFile, RestoreAsset asset) { state VersionedMutationsMap& kvOps = *pkvOps; state VersionedMutationsMap::iterator kvOp = kvOps.begin(); state int kvCount = 0; state int splitMutationIndex = 0; state std::vector applierIDs = self->getWorkingApplierIDs(); state std::vector> requests; - state Version prevVersion = startVersion; + state Version prevVersion = 0; // startVersion TraceEvent("FastRestore_SendMutationToApplier") .detail("Loader", self->id()) .detail("IsRangeFile", isRangeFile) - .detail("StartVersion", startVersion) - .detail("EndVersion", endVersion) - .detail("FileIndex", asset.filename); + .detail("EndVersion", asset.endVersion) + .detail("RestoreAsset", asset.toString()); // Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion - if (kvOps.find(endVersion) == kvOps.end()) { - kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier + if (kvOps.find(asset.endVersion) == kvOps.end()) { + kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier } splitMutationIndex = 0; @@ -245,6 +241,13 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver applierMutationsSize[applierID] = 0.0; } Version commitVersion = kvOp->first; + if (!(commitVersion >= asset.beginVersion && commitVersion <= asset.endVersion)) { // Debug purpose + TraceEvent(SevError, "FastRestore_SendMutationsToApplier") + .detail("CommitVersion", commitVersion) + .detail("RestoreAsset", asset.toString()); + } + ASSERT(commitVersion >= asset.beginVersion); + ASSERT(commitVersion <= asset.endVersion); // endVersion is an empty commit to ensure progress for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) { MutationRef kvm = kvOp->second[mIndex]; @@ -289,11 +292,11 @@ ACTOR Future sendMutationsToApplier(Reference self, Ver applierID, RestoreSendVersionedMutationsRequest(asset, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID]))); } - TraceEvent(SevDebug, "FastRestore_Debug") + TraceEvent(SevDebug, "FastRestore_SendMutationToApplier") .detail("Loader", self->id()) .detail("PrevVersion", prevVersion) .detail("CommitVersion", commitVersion) - .detail("Filename", asset.filename); + .detail("RestoreAsset", asset.toString()); ASSERT(prevVersion < commitVersion); prevVersion = commitVersion; wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests)); diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index c11803615b..0d8a18f477 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -36,8 +36,8 @@ #include "flow/actorcompiler.h" // This must be the last #include. ACTOR static Future clearDB(Database cx); -ACTOR static Future collectBackupFiles(Reference bc, std::vector* files, - Database cx, RestoreRequest request); +ACTOR static Future collectBackupFiles(Reference bc, std::vector* rangeFiles, + std::vector* logFiles, Database cx, RestoreRequest request); ACTOR static Future processRestoreRequest(Reference self, Database cx, RestoreRequest request); ACTOR static Future startProcessRestoreRequests(Reference self, Database cx); @@ -226,15 +226,24 @@ ACTOR Future startProcessRestoreRequests(Reference self ACTOR static Future processRestoreRequest(Reference self, Database cx, RestoreRequest request) { - state std::vector files; + state std::vector rangeFiles; + state std::vector logFiles; state std::vector allFiles; state std::map::iterator versionBatch = self->versionBatches.begin(); self->initBackupContainer(request.url); // Get all backup files' description and save them to files - wait(collectBackupFiles(self->bc, &files, cx, request)); - self->buildVersionBatches(files, &self->versionBatches); // Divide files into version batches + wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request)); + + std::sort(rangeFiles.begin(), rangeFiles.end()); + std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool { + return std::tie(f1.endVersion, f1.beginVersion, f1.fileIndex) < + std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex); + }); + + self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches); // Divide files into version batches + self->dumpVersionBatches(self->versionBatches); ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0 for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) { @@ -267,10 +276,6 @@ ACTOR static Future loadFilesOnLoaders(Reference self, std::vector> requests; std::map::iterator loader = self->loadersInterf.begin(); - // TODO: Remove files that are empty before proceed - // ASSERT(files->size() > 0); // files should not be empty - - Version prevVersion = 0; for (auto& file : *files) { // NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to // drive applier's NotifiedVersion. @@ -279,13 +284,12 @@ ACTOR static Future loadFilesOnLoaders(Reference self, } // Prepare loading LoadingParam param; - - param.prevVersion = 0; // Each file's NotifiedVersion starts from 0 - param.endVersion = file.isRange ? file.version : file.endVersion; param.url = request.url; param.isRangeFile = file.isRange; + param.rangeVersion = file.isRange ? file.version : -1; param.blockSize = file.blockSize; + param.asset.uid = deterministicRandom()->randomUniqueID(); param.asset.filename = file.fileName; param.asset.fileIndex = file.fileIndex; param.asset.offset = 0; @@ -294,14 +298,11 @@ ACTOR static Future loadFilesOnLoaders(Reference self, param.asset.beginVersion = versionBatch.beginVersion; param.asset.endVersion = versionBatch.endVersion; - prevVersion = param.endVersion; - - // Log file to be loaded TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString()); - ASSERT_WE_THINK(param.asset.len >= 0); // we may load an empty file + ASSERT_WE_THINK(param.asset.len > 0); ASSERT_WE_THINK(param.asset.offset >= 0); ASSERT_WE_THINK(param.asset.offset <= file.fileSize); - ASSERT_WE_THINK(param.prevVersion <= param.endVersion); + ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion); requests.emplace_back(loader->first, RestoreLoadFileRequest(param)); loader++; @@ -444,16 +445,18 @@ ACTOR static Future>> collectRestoreRequest } // Collect the backup files' description into output_files by reading the backupContainer bc. -ACTOR static Future collectBackupFiles(Reference bc, std::vector* files, - Database cx, RestoreRequest request) { +ACTOR static Future collectBackupFiles(Reference bc, std::vector* rangeFiles, + std::vector* logFiles, Database cx, + RestoreRequest request) { state BackupDescription desc = wait(bc->describeBackup()); // Convert version to real time for operators to read the BackupDescription desc. wait(desc.resolveVersionTimes(cx)); TraceEvent("FastRestore").detail("BackupDesc", desc.toString()); - if (request.targetVersion == invalidVersion && desc.maxRestorableVersion.present()) + if (request.targetVersion == invalidVersion && desc.maxRestorableVersion.present()) { request.targetVersion = desc.maxRestorableVersion.get(); + } Optional restorable = wait(bc->getRestoreSet(request.targetVersion)); @@ -462,22 +465,26 @@ ACTOR static Future collectBackupFiles(Reference bc, std throw restore_missing_data(); } - if (!files->empty()) { - TraceEvent(SevError, "FastRestore").detail("ClearOldFiles", files->size()); - files->clear(); - } + ASSERT(rangeFiles->empty()); + ASSERT(logFiles->empty()); for (const RangeFile& f : restorable.get().ranges) { TraceEvent("FastRestore").detail("RangeFile", f.toString()); + if (f.fileSize <= 0) { + continue; + } RestoreFileFR file(f.version, f.fileName, true, f.blockSize, f.fileSize, f.version, f.version); TraceEvent("FastRestore").detail("RangeFileFR", file.toString()); - files->push_back(file); + rangeFiles->push_back(file); } for (const LogFile& f : restorable.get().logs) { TraceEvent("FastRestore").detail("LogFile", f.toString()); + if (f.fileSize <= 0) { + continue; + } RestoreFileFR file(f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion, f.beginVersion); TraceEvent("FastRestore").detail("LogFileFR", file.toString()); - files->push_back(file); + logFiles->push_back(file); } return Void(); diff --git a/fdbserver/RestoreMaster.actor.h b/fdbserver/RestoreMaster.actor.h index 8bd82c07d6..7b9e741dd2 100644 --- a/fdbserver/RestoreMaster.actor.h +++ b/fdbserver/RestoreMaster.actor.h @@ -29,6 +29,7 @@ #include #include "flow/Stats.h" +#include "flow/Platform.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" @@ -44,11 +45,24 @@ extern int restoreStatusIndex; struct VersionBatch { Version beginVersion; // Inclusive - Version endVersion; // Inclusive if it has log files, exclusive if it has only range file + Version endVersion; // exclusive std::vector logFiles; std::vector rangeFiles; + double size; // size of data in range and log files + + VersionBatch() = default; bool isEmpty() { return logFiles.empty() && rangeFiles.empty(); } + void reset() { + beginVersion = 0; + endVersion = 0; + logFiles.clear(); + rangeFiles.clear(); + size = 0; + } + + // RestoreAsset and VersionBatch both use endVersion as exclusive in version range + bool isInVersionRange(Version version) const { return version >= beginVersion && version < endVersion; } }; struct RestoreMasterData : RestoreRoleData, public ReferenceCounted { @@ -90,97 +104,191 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted& allFiles, - std::map* versionBatches) { - // A version batch includes a log file; Because log file's verion range does not overlap, - // we use log file's version range as the version range of a version batch. - Version beginVersion = 0; - Version maxVersion = 0; - for (int i = 0; i < allFiles.size(); ++i) { - if (!allFiles[i].isRange) { - ASSERT(versionBatches->find(allFiles[i].beginVersion) == versionBatches->end()); - VersionBatch vb; - vb.beginVersion = beginVersion; - vb.endVersion = allFiles[i].endVersion; - versionBatches->insert(std::make_pair(vb.beginVersion, vb)); - //(*versionBatches)[vb.beginVersion] = vb; // Ensure continuous version range across version batches - beginVersion = allFiles[i].endVersion; + void dumpVersionBatches(const std::map& versionBatches) { + int i = 0; + for (auto& vb : versionBatches) { + TraceEvent("FastRestoreVersionBatches") + .detail("BatchIndex", i) + .detail("BeginVersion", vb.second.beginVersion) + .detail("EndVersion", vb.second.endVersion) + .detail("Size", vb.second.size); + for (auto& f : vb.second.rangeFiles) { + bool invalidVersion = (f.beginVersion != f.endVersion) || (f.beginVersion >= vb.second.endVersion || + f.beginVersion < vb.second.beginVersion); + TraceEvent(invalidVersion ? SevError : SevInfo, "FastRestoreVersionBatches") + .detail("BatchIndex", i) + .detail("RangeFile", f.toString()); } - if (maxVersion < allFiles[i].endVersion) { - maxVersion = allFiles[i].endVersion; + for (auto& f : vb.second.logFiles) { + bool outOfRange = (f.beginVersion >= vb.second.endVersion || f.endVersion <= vb.second.beginVersion); + TraceEvent(outOfRange ? SevError : SevInfo, "FastRestoreVersionBatches") + .detail("BatchIndex", i) + .detail("LogFile", f.toString()); } + ++i; } - // In case there is no log file - if (versionBatches->empty()) { - VersionBatch vb; - vb.beginVersion = 0; - vb.endVersion = maxVersion + 1; // version batch's endVersion is exclusive - versionBatches->insert(std::make_pair(vb.beginVersion, vb)); - //(*versionBatches)[vb.beginVersion] = vb; // We ensure the version range are continuous across version batches - } - // Put range and log files into its version batch - for (int i = 0; i < allFiles.size(); ++i) { - // vbiter's beginVersion > allFiles[i].beginVersion. - std::map::iterator vbIter = versionBatches->upper_bound(allFiles[i].beginVersion); - --vbIter; - ASSERT_WE_THINK(vbIter != versionBatches->end()); - if (allFiles[i].isRange) { - vbIter->second.rangeFiles.push_back(allFiles[i]); + } + + // Input: Get the size of data in backup files in version range [prevVersion, nextVersion) + // Return: param1: the size of data at nextVersion, param2: the minimum range file index whose version > + // nextVersion, param3: log files with data in [prevVersion, nextVersion) + std::tuple> getVersionSize(Version prevVersion, Version nextVersion, + const std::vector& rangeFiles, + int rangeIdx, + const std::vector& logFiles) { + double size = 0; + TraceEvent("FastRestoreGetVersionSize") + .detail("PreviousVersion", prevVersion) + .detail("NextVersion", nextVersion) + .detail("RangeFiles", rangeFiles.size()) + .detail("RangeIndex", rangeIdx) + .detail("LogFiles", logFiles.size()); + ASSERT(prevVersion <= nextVersion); + while (rangeIdx < rangeFiles.size()) { + TraceEvent(SevDebug, "FastRestoreGetVersionSize").detail("RangeFile", rangeFiles[rangeIdx].toString()); + if (rangeFiles[rangeIdx].version < nextVersion) { + ASSERT(rangeFiles[rangeIdx].version >= prevVersion); + size += rangeFiles[rangeIdx].fileSize; } else { - vbIter->second.logFiles.push_back(allFiles[i]); + break; + } + ++rangeIdx; + } + int logIdx = 0; + std::vector retLogs; + // Scan all logFiles every time to avoid assumption on log files' version ranges. + // For example, we do not assume each version range only exists in one log file + while (logIdx < logFiles.size()) { + Version begin = std::max(prevVersion, logFiles[logIdx].beginVersion); + Version end = std::min(nextVersion, logFiles[logIdx].endVersion); + if (begin < end) { // logIdx file overlap in [prevVersion, nextVersion) + double ratio = (end - begin) * 1.0 / (logFiles[logIdx].endVersion - logFiles[logIdx].beginVersion); + size += logFiles[logIdx].fileSize * ratio; + retLogs.push_back(logFiles[logIdx]); + } + ++logIdx; + } + return std::make_tuple(size, rangeIdx, retLogs); + } + + // Split backup files into version batches, each of which has similar data size + // Input: sorted range files, sorted log files; + // Output: a set of version batches whose size is less than opConfig.batchSizeThreshold + // and each mutation in backup files is included in the version batches exactly once. + // Assumption 1: input files has no empty files; + // Assumption 2: range files at one version <= batchSizeThreshold. + // Note: We do not allow a versionBatch size larger than the batchSizeThreshold because the range file size at + // a version depends on the number of backupAgents and its upper bound is hard to get. + void buildVersionBatches(const std::vector& rangeFiles, const std::vector& logFiles, + std::map* versionBatches) { + bool rewriteNextVersion = false; + int rangeIdx = 0; + int logIdx = 0; // Ensure each log file is included in version batch + Version prevEndVersion = 0; + Version nextVersion = 0; // Used to calculate the batch's endVersion + VersionBatch vb; + vb.beginVersion = 0; // Version batch range [beginVersion, endVersion) + + while (rangeIdx < rangeFiles.size() || logIdx < logFiles.size()) { + if (!rewriteNextVersion) { + if (rangeIdx < rangeFiles.size() && logIdx < logFiles.size()) { + // nextVersion as endVersion is exclusive in the version range + nextVersion = std::max(rangeFiles[rangeIdx].version + 1, nextVersion); + } else if (rangeIdx < rangeFiles.size()) { // i.e., logIdx >= logFiles.size() + nextVersion = rangeFiles[rangeIdx].version + 1; + } else if (logIdx < logFiles.size()) { + while (logIdx < logFiles.size() && logFiles[logIdx].endVersion <= nextVersion) { + logIdx++; + } + if (logIdx < logFiles.size()) { + nextVersion = logFiles[logIdx].endVersion; + } else { + break; // Finished all log files + } + } else { + TraceEvent(SevError, "FastRestoreBuildVersionBatch") + .detail("RangeIndex", rangeIdx) + .detail("RangeFiles", rangeFiles.size()) + .detail("LogIndex", logIdx) + .detail("LogFiles", logFiles.size()); + } + } else { + rewriteNextVersion = false; + } + + double nextVersionSize; + int nextRangeIdx; + std::vector curLogFiles; + std::tie(nextVersionSize, nextRangeIdx, curLogFiles) = + getVersionSize(prevEndVersion, nextVersion, rangeFiles, rangeIdx, logFiles); + + TraceEvent("FastRestoreBuildVersionBatch") + .detail("VersionBatchBeginVersion", vb.beginVersion) + .detail("PreviousEndVersion", prevEndVersion) + .detail("NextVersion", nextVersion) + .detail("RangeIndex", rangeIdx) + .detail("RangeFiles", rangeFiles.size()) + .detail("LogIndex", logIdx) + .detail("LogFiles", logFiles.size()) + .detail("BatchSizeThreshold", opConfig.batchSizeThreshold) + .detail("CurrentBatchSize", vb.size) + .detail("NextVersionIntervalSize", nextVersionSize) + .detail("NextRangeIndex", nextRangeIdx) + .detail("UsedLogFiles", curLogFiles.size()); + + ASSERT(prevEndVersion < nextVersion); // Ensure progress + if (vb.size + nextVersionSize <= opConfig.batchSizeThreshold) { + // nextVersion should be included in this batch + vb.size += nextVersionSize; + while (rangeIdx < nextRangeIdx) { + ASSERT(rangeFiles[rangeIdx].fileSize > 0); + vb.rangeFiles.push_back(rangeFiles[rangeIdx]); + ++rangeIdx; + } + + for (auto& log : curLogFiles) { + ASSERT(log.beginVersion < nextVersion); + ASSERT(log.endVersion > prevEndVersion); + ASSERT(log.fileSize > 0); + vb.logFiles.push_back(log); + } + + vb.endVersion = nextVersion; + prevEndVersion = vb.endVersion; + } else { + if (vb.size < 1) { + // [vb.endVersion, nextVersion) > opConfig.batchSizeThreshold. We should split the version range + if (prevEndVersion >= nextVersion) { + // If range files at one version > batchSizeThreshold, DBA should increase batchSizeThreshold to + // some value larger than nextVersion + TraceEvent(SevError, "FastRestoreBuildVersionBatch") + .detail("NextVersion", nextVersion) + .detail("PreviousEndVersion", prevEndVersion) + .detail("NextVersionIntervalSize", nextVersionSize) + .detail("BatchSizeThreshold", opConfig.batchSizeThreshold) + .detail("SuggestedMinimumBatchSizeThreshold", nextVersion); + // Exit restore early if it won't succeed + flushAndExit(FDB_EXIT_ERROR); + } + ASSERT(prevEndVersion < nextVersion); // Ensure progress + nextVersion = (prevEndVersion + nextVersion) / 2; + rewriteNextVersion = true; + TraceEvent("FastRestoreBuildVersionBatch") + .detail("NextVersionIntervalSize", nextVersionSize); // Duplicate Trace + continue; + } + // Finalize the current version batch + versionBatches->emplace(vb.beginVersion, vb); // copy vb to versionBatch + // start finding the next version batch + vb.reset(); + vb.size = 0; + vb.beginVersion = prevEndVersion; } } - - // Sort files in each of versionBatches and set fileIndex, which is used in deduplicating mutations sent from - // loader to applier. - // Assumption: fileIndex starts at 1. Each loader's initized fileIndex (NotifiedVersion type) starts at 0 - int fileIndex = 0; // fileIndex must be unique; ideally it continuously increase across verstionBatches for - // easier progress tracking - int versionBatchId = 1; - for (auto versionBatch = versionBatches->begin(); versionBatch != versionBatches->end(); versionBatch++) { - std::sort(versionBatch->second.rangeFiles.begin(), versionBatch->second.rangeFiles.end()); - std::sort(versionBatch->second.logFiles.begin(), versionBatch->second.logFiles.end()); - for (auto& logFile : versionBatch->second.logFiles) { - logFile.fileIndex = ++fileIndex; - TraceEvent("FastRestore") - .detail("VersionBatchId", versionBatchId) - .detail("LogFile", logFile.toString()); - } - for (auto& rangeFile : versionBatch->second.rangeFiles) { - rangeFile.fileIndex = ++fileIndex; - TraceEvent("FastRestore") - .detail("VersionBatchId", versionBatchId) - .detail("RangeFile", rangeFile.toString()); - } - versionBatchId++; - } - - TraceEvent("FastRestore").detail("VersionBatches", versionBatches->size()); - // Sanity check - std::set fIndexSet; - for (auto& versionBatch : *versionBatches) { - Version prevVersion = 0; - for (auto& logFile : versionBatch.second.logFiles) { - TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("LogFile", logFile.toString()); - ASSERT(logFile.beginVersion >= versionBatch.second.beginVersion); - ASSERT(logFile.endVersion <= versionBatch.second.endVersion); - ASSERT(prevVersion <= logFile.beginVersion); - prevVersion = logFile.endVersion; - ASSERT(fIndexSet.find(logFile.fileIndex) == fIndexSet.end()); - fIndexSet.insert(logFile.fileIndex); - } - prevVersion = 0; - for (auto& rangeFile : versionBatch.second.rangeFiles) { - TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("RangeFile", rangeFile.toString()); - ASSERT(rangeFile.beginVersion == rangeFile.endVersion); - ASSERT(rangeFile.beginVersion >= versionBatch.second.beginVersion); - ASSERT(rangeFile.endVersion < versionBatch.second.endVersion); - ASSERT(prevVersion <= rangeFile.beginVersion); - prevVersion = rangeFile.beginVersion; - ASSERT(fIndexSet.find(rangeFile.fileIndex) == fIndexSet.end()); - fIndexSet.insert(rangeFile.fileIndex); - } + // The last wip version batch has some files + if (vb.size > 0) { + vb.endVersion = nextVersion; + versionBatches->emplace(vb.beginVersion, vb); } } @@ -224,4 +332,4 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted startRestoreMaster(Reference masterWorker, Database cx); #include "flow/unactorcompiler.h" -#endif \ No newline at end of file +#endif diff --git a/fdbserver/RestoreUtil.h b/fdbserver/RestoreUtil.h index 3d38634163..5f2a2b2bbb 100644 --- a/fdbserver/RestoreUtil.h +++ b/fdbserver/RestoreUtil.h @@ -55,6 +55,8 @@ struct FastRestoreOpConfig { int num_appliers = 40; // transactionBatchSizeThreshold is used when applier applies multiple mutations in a transaction to DB double transactionBatchSizeThreshold = 512; // 512 in Bytes + // batchSizeThreshold is the maximum data size in each version batch + double batchSizeThreshold = 10.0 * 1024.0 * 1024.0 * 1024.0; // 10 GB }; extern FastRestoreOpConfig opConfig; diff --git a/fdbserver/RestoreWorker.actor.cpp b/fdbserver/RestoreWorker.actor.cpp index 43dd007467..93756b9c70 100644 --- a/fdbserver/RestoreWorker.actor.cpp +++ b/fdbserver/RestoreWorker.actor.cpp @@ -185,8 +185,10 @@ ACTOR Future monitorWorkerLiveness(Reference self) { void initRestoreWorkerConfig() { opConfig.num_loaders = g_network->isSimulated() ? 3 : opConfig.num_loaders; opConfig.num_appliers = g_network->isSimulated() ? 3 : opConfig.num_appliers; + // TODO: Set the threshold to a random value in a range opConfig.transactionBatchSizeThreshold = g_network->isSimulated() ? 512 : opConfig.transactionBatchSizeThreshold; // Byte + opConfig.batchSizeThreshold = g_network->isSimulated() ? 10 * 1024 * 1024 : opConfig.batchSizeThreshold; // Byte TraceEvent("FastRestore") .detail("InitOpConfig", "Result") .detail("NumLoaders", opConfig.num_loaders)