Merge pull request #2519 from xumengpanda/mengxu/fast-restore-versionBatch-fixSize-PR

Performant restore [14/XX]: Ensure each version-batch does not exceed a configured size
This commit is contained in:
A.J. Beamon 2020-01-23 16:49:01 -08:00 committed by GitHub
commit b2c8a4a34c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 265 additions and 140 deletions

View File

@ -209,6 +209,8 @@ struct RestoreAsset {
int64_t offset;
int64_t len;
UID uid;
RestoreAsset() = default;
bool operator==(const RestoreAsset& r) const {
@ -227,30 +229,29 @@ struct RestoreAsset {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, beginVersion, endVersion, range, filename, fileIndex, offset, len);
serializer(ar, beginVersion, endVersion, range, filename, fileIndex, offset, len, uid);
}
std::string toString() {
std::stringstream ss;
ss << "begin:" << beginVersion << " end:" << endVersion << " range:" << range.toString()
<< " filename:" << filename << " fileIndex:" << fileIndex << " offset:" << offset << " len:" << len;
ss << "UID:" << uid.toString() << " begin:" << beginVersion << " end:" << endVersion
<< " range:" << range.toString() << " filename:" << filename << " fileIndex:" << fileIndex
<< " offset:" << offset << " len:" << len;
return ss.str();
}
// endVersion is exclusive for RestoreAsset, matching VersionBatch's convention.
bool isInVersionRange(Version commitVersion) const {
	return beginVersion <= commitVersion && commitVersion < endVersion;
}
};
// TODO: It is probably better to specify the (beginVersion, endVersion] for each loadingParam.
// beginVersion (endVersion) is the version the applier is before (after) it receives the request.
struct LoadingParam {
constexpr static FileIdentifier file_identifier = 17023837;
bool isRangeFile;
Key url;
Version prevVersion;
Version endVersion; // range file's mutations are all at the endVersion
Optional<Version> rangeVersion; // range file's version
int64_t blockSize;
RestoreAsset asset;
@ -266,13 +267,14 @@ struct LoadingParam {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset);
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
}
std::string toString() {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
<< " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
<< " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize
<< " RestoreAsset:" << asset.toString();
return str.str();
}
};

View File

@ -112,7 +112,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Sanity check: mutations in range file is in [beginVersion, endVersion);
// mutations in log file is in [beginVersion, endVersion], both inclusive.
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
ASSERT_WE_THINK((req.isRangeFile && commitVersion < req.asset.endVersion) ||
// Loader sends the endVersion to ensure all useful versions are sent
ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) ||
(!req.isRangeFile && commitVersion <= req.asset.endVersion));
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {

View File

@ -44,8 +44,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* kvOps,
bool isRangeFile, Version startVersion, Version endVersion,
RestoreAsset asset);
bool isRangeFile, RestoreAsset asset);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
@ -153,8 +152,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
subAsset.offset = j;
subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j);
if (param.isRangeFile) {
fileParserFutures.push_back(
_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc, param.endVersion, subAsset));
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
param.rangeVersion.get(), subAsset));
} else {
// TODO: Sanity check the log file's range is overlapped with the restored version range
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
@ -198,8 +197,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
for (; item != self->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.prevVersion,
item->first.endVersion, item->first.asset));
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.asset));
}
}
@ -210,26 +208,24 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, Version startVersion, Version endVersion,
RestoreAsset asset) {
bool isRangeFile, RestoreAsset asset) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
state Version prevVersion = startVersion;
state Version prevVersion = 0; // startVersion
TraceEvent("FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
.detail("FileIndex", asset.filename);
.detail("EndVersion", asset.endVersion)
.detail("RestoreAsset", asset.toString());
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(endVersion) == kvOps.end()) {
kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
if (kvOps.find(asset.endVersion) == kvOps.end()) {
kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
}
splitMutationIndex = 0;
@ -245,6 +241,13 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
applierMutationsSize[applierID] = 0.0;
}
Version commitVersion = kvOp->first;
if (!(commitVersion >= asset.beginVersion && commitVersion <= asset.endVersion)) { // Debug purpose
TraceEvent(SevError, "FastRestore_SendMutationsToApplier")
.detail("CommitVersion", commitVersion)
.detail("RestoreAsset", asset.toString());
}
ASSERT(commitVersion >= asset.beginVersion);
ASSERT(commitVersion <= asset.endVersion); // endVersion is an empty commit to ensure progress
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef kvm = kvOp->second[mIndex];
@ -289,11 +292,11 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
applierID, RestoreSendVersionedMutationsRequest(asset, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
}
TraceEvent(SevDebug, "FastRestore_Debug")
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion)
.detail("Filename", asset.filename);
.detail("RestoreAsset", asset.toString());
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));

View File

@ -36,8 +36,8 @@
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> clearDB(Database cx);
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* files,
Database cx, RestoreRequest request);
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx, RestoreRequest request);
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData> self, Database cx, RestoreRequest request);
ACTOR static Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self, Database cx);
@ -226,15 +226,24 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData> self, Database cx,
RestoreRequest request) {
state std::vector<RestoreFileFR> files;
state std::vector<RestoreFileFR> rangeFiles;
state std::vector<RestoreFileFR> logFiles;
state std::vector<RestoreFileFR> allFiles;
state std::map<Version, VersionBatch>::iterator versionBatch = self->versionBatches.begin();
self->initBackupContainer(request.url);
// Get all backup files' description and save them to files
wait(collectBackupFiles(self->bc, &files, cx, request));
self->buildVersionBatches(files, &self->versionBatches); // Divide files into version batches
wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request));
std::sort(rangeFiles.begin(), rangeFiles.end());
std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool {
return std::tie(f1.endVersion, f1.beginVersion, f1.fileIndex) <
std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex);
});
self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches); // Divide files into version batches
self->dumpVersionBatches(self->versionBatches);
ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0
for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) {
@ -267,10 +276,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
// TODO: Remove files that are empty before proceed
// ASSERT(files->size() > 0); // files should not be empty
Version prevVersion = 0;
for (auto& file : *files) {
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to
// drive applier's NotifiedVersion.
@ -279,13 +284,12 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
}
// Prepare loading
LoadingParam param;
param.prevVersion = 0; // Each file's NotifiedVersion starts from 0
param.endVersion = file.isRange ? file.version : file.endVersion;
param.url = request.url;
param.isRangeFile = file.isRange;
param.rangeVersion = file.isRange ? file.version : -1;
param.blockSize = file.blockSize;
param.asset.uid = deterministicRandom()->randomUniqueID();
param.asset.filename = file.fileName;
param.asset.fileIndex = file.fileIndex;
param.asset.offset = 0;
@ -294,14 +298,11 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
param.asset.beginVersion = versionBatch.beginVersion;
param.asset.endVersion = versionBatch.endVersion;
prevVersion = param.endVersion;
// Log file to be loaded
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
ASSERT_WE_THINK(param.asset.len >= 0); // we may load an empty file
ASSERT_WE_THINK(param.asset.len > 0);
ASSERT_WE_THINK(param.asset.offset >= 0);
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
ASSERT_WE_THINK(param.prevVersion <= param.endVersion);
ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion);
requests.emplace_back(loader->first, RestoreLoadFileRequest(param));
loader++;
@ -444,16 +445,18 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
}
// Collect the backup files' description into output_files by reading the backupContainer bc.
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* files,
Database cx, RestoreRequest request) {
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx,
RestoreRequest request) {
state BackupDescription desc = wait(bc->describeBackup());
// Convert version to real time for operators to read the BackupDescription desc.
wait(desc.resolveVersionTimes(cx));
TraceEvent("FastRestore").detail("BackupDesc", desc.toString());
if (request.targetVersion == invalidVersion && desc.maxRestorableVersion.present())
if (request.targetVersion == invalidVersion && desc.maxRestorableVersion.present()) {
request.targetVersion = desc.maxRestorableVersion.get();
}
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(request.targetVersion));
@ -462,22 +465,26 @@ ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std
throw restore_missing_data();
}
if (!files->empty()) {
TraceEvent(SevError, "FastRestore").detail("ClearOldFiles", files->size());
files->clear();
}
ASSERT(rangeFiles->empty());
ASSERT(logFiles->empty());
for (const RangeFile& f : restorable.get().ranges) {
TraceEvent("FastRestore").detail("RangeFile", f.toString());
if (f.fileSize <= 0) {
continue;
}
RestoreFileFR file(f.version, f.fileName, true, f.blockSize, f.fileSize, f.version, f.version);
TraceEvent("FastRestore").detail("RangeFileFR", file.toString());
files->push_back(file);
rangeFiles->push_back(file);
}
for (const LogFile& f : restorable.get().logs) {
TraceEvent("FastRestore").detail("LogFile", f.toString());
if (f.fileSize <= 0) {
continue;
}
RestoreFileFR file(f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion, f.beginVersion);
TraceEvent("FastRestore").detail("LogFileFR", file.toString());
files->push_back(file);
logFiles->push_back(file);
}
return Void();

View File

@ -29,6 +29,7 @@
#include <sstream>
#include "flow/Stats.h"
#include "flow/Platform.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
@ -44,11 +45,24 @@ extern int restoreStatusIndex;
struct VersionBatch {
Version beginVersion; // Inclusive
Version endVersion; // Inclusive if it has log files, exclusive if it has only range file
Version endVersion; // exclusive
std::vector<RestoreFileFR> logFiles;
std::vector<RestoreFileFR> rangeFiles;
double size; // size of data in range and log files
VersionBatch() = default;
// True when no backup files of either kind have been assigned to this batch.
bool isEmpty() { return rangeFiles.empty() && logFiles.empty(); }
// Return the batch to a pristine state so it can accumulate the next version range.
void reset() {
	size = 0;
	beginVersion = 0;
	endVersion = 0;
	rangeFiles.clear();
	logFiles.clear();
}
// endVersion is exclusive here, consistent with RestoreAsset's version-range convention.
bool isInVersionRange(Version version) const { return beginVersion <= version && version < endVersion; }
};
struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMasterData> {
@ -90,97 +104,191 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
return ss.str();
}
// Split allFiles into multiple versionBatches based on files' version
void buildVersionBatches(const std::vector<RestoreFileFR>& allFiles,
std::map<Version, VersionBatch>* versionBatches) {
// A version batch includes a log file; Because log file's verion range does not overlap,
// we use log file's version range as the version range of a version batch.
Version beginVersion = 0;
Version maxVersion = 0;
for (int i = 0; i < allFiles.size(); ++i) {
if (!allFiles[i].isRange) {
ASSERT(versionBatches->find(allFiles[i].beginVersion) == versionBatches->end());
VersionBatch vb;
vb.beginVersion = beginVersion;
vb.endVersion = allFiles[i].endVersion;
versionBatches->insert(std::make_pair(vb.beginVersion, vb));
//(*versionBatches)[vb.beginVersion] = vb; // Ensure continuous version range across version batches
beginVersion = allFiles[i].endVersion;
void dumpVersionBatches(const std::map<Version, VersionBatch>& versionBatches) {
int i = 0;
for (auto& vb : versionBatches) {
TraceEvent("FastRestoreVersionBatches")
.detail("BatchIndex", i)
.detail("BeginVersion", vb.second.beginVersion)
.detail("EndVersion", vb.second.endVersion)
.detail("Size", vb.second.size);
for (auto& f : vb.second.rangeFiles) {
bool invalidVersion = (f.beginVersion != f.endVersion) || (f.beginVersion >= vb.second.endVersion ||
f.beginVersion < vb.second.beginVersion);
TraceEvent(invalidVersion ? SevError : SevInfo, "FastRestoreVersionBatches")
.detail("BatchIndex", i)
.detail("RangeFile", f.toString());
}
if (maxVersion < allFiles[i].endVersion) {
maxVersion = allFiles[i].endVersion;
for (auto& f : vb.second.logFiles) {
bool outOfRange = (f.beginVersion >= vb.second.endVersion || f.endVersion <= vb.second.beginVersion);
TraceEvent(outOfRange ? SevError : SevInfo, "FastRestoreVersionBatches")
.detail("BatchIndex", i)
.detail("LogFile", f.toString());
}
++i;
}
// In case there is no log file
if (versionBatches->empty()) {
VersionBatch vb;
vb.beginVersion = 0;
vb.endVersion = maxVersion + 1; // version batch's endVersion is exclusive
versionBatches->insert(std::make_pair(vb.beginVersion, vb));
//(*versionBatches)[vb.beginVersion] = vb; // We ensure the version range are continuous across version batches
}
// Put range and log files into its version batch
for (int i = 0; i < allFiles.size(); ++i) {
// vbiter's beginVersion > allFiles[i].beginVersion.
std::map<Version, VersionBatch>::iterator vbIter = versionBatches->upper_bound(allFiles[i].beginVersion);
--vbIter;
ASSERT_WE_THINK(vbIter != versionBatches->end());
if (allFiles[i].isRange) {
vbIter->second.rangeFiles.push_back(allFiles[i]);
}
// Input: Get the size of data in backup files in version range [prevVersion, nextVersion)
// Return: param1: the size of data at nextVersion, param2: the minimum range file index whose version >
// nextVersion, param3: log files with data in [prevVersion, nextVersion)
std::tuple<double, int, std::vector<RestoreFileFR>> getVersionSize(Version prevVersion, Version nextVersion,
const std::vector<RestoreFileFR>& rangeFiles,
int rangeIdx,
const std::vector<RestoreFileFR>& logFiles) {
double size = 0;
TraceEvent("FastRestoreGetVersionSize")
.detail("PreviousVersion", prevVersion)
.detail("NextVersion", nextVersion)
.detail("RangeFiles", rangeFiles.size())
.detail("RangeIndex", rangeIdx)
.detail("LogFiles", logFiles.size());
ASSERT(prevVersion <= nextVersion);
while (rangeIdx < rangeFiles.size()) {
TraceEvent(SevDebug, "FastRestoreGetVersionSize").detail("RangeFile", rangeFiles[rangeIdx].toString());
if (rangeFiles[rangeIdx].version < nextVersion) {
ASSERT(rangeFiles[rangeIdx].version >= prevVersion);
size += rangeFiles[rangeIdx].fileSize;
} else {
vbIter->second.logFiles.push_back(allFiles[i]);
break;
}
++rangeIdx;
}
int logIdx = 0;
std::vector<RestoreFileFR> retLogs;
// Scan all logFiles every time to avoid assumption on log files' version ranges.
// For example, we do not assume each version range only exists in one log file
while (logIdx < logFiles.size()) {
Version begin = std::max(prevVersion, logFiles[logIdx].beginVersion);
Version end = std::min(nextVersion, logFiles[logIdx].endVersion);
if (begin < end) { // logIdx file overlap in [prevVersion, nextVersion)
double ratio = (end - begin) * 1.0 / (logFiles[logIdx].endVersion - logFiles[logIdx].beginVersion);
size += logFiles[logIdx].fileSize * ratio;
retLogs.push_back(logFiles[logIdx]);
}
++logIdx;
}
return std::make_tuple(size, rangeIdx, retLogs);
}
// Split backup files into version batches, each of which has similar data size
// Input: sorted range files, sorted log files;
// Output: a set of version batches whose size is less than opConfig.batchSizeThreshold
// and each mutation in backup files is included in the version batches exactly once.
// Assumption 1: input files has no empty files;
// Assumption 2: range files at one version <= batchSizeThreshold.
// Note: We do not allow a versionBatch size larger than the batchSizeThreshold because the range file size at
// a version depends on the number of backupAgents and its upper bound is hard to get.
void buildVersionBatches(const std::vector<RestoreFileFR>& rangeFiles, const std::vector<RestoreFileFR>& logFiles,
std::map<Version, VersionBatch>* versionBatches) {
bool rewriteNextVersion = false;
int rangeIdx = 0;
int logIdx = 0; // Ensure each log file is included in version batch
Version prevEndVersion = 0;
Version nextVersion = 0; // Used to calculate the batch's endVersion
VersionBatch vb;
vb.beginVersion = 0; // Version batch range [beginVersion, endVersion)
while (rangeIdx < rangeFiles.size() || logIdx < logFiles.size()) {
if (!rewriteNextVersion) {
if (rangeIdx < rangeFiles.size() && logIdx < logFiles.size()) {
// nextVersion as endVersion is exclusive in the version range
nextVersion = std::max(rangeFiles[rangeIdx].version + 1, nextVersion);
} else if (rangeIdx < rangeFiles.size()) { // i.e., logIdx >= logFiles.size()
nextVersion = rangeFiles[rangeIdx].version + 1;
} else if (logIdx < logFiles.size()) {
while (logIdx < logFiles.size() && logFiles[logIdx].endVersion <= nextVersion) {
logIdx++;
}
if (logIdx < logFiles.size()) {
nextVersion = logFiles[logIdx].endVersion;
} else {
break; // Finished all log files
}
} else {
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
.detail("RangeIndex", rangeIdx)
.detail("RangeFiles", rangeFiles.size())
.detail("LogIndex", logIdx)
.detail("LogFiles", logFiles.size());
}
} else {
rewriteNextVersion = false;
}
double nextVersionSize;
int nextRangeIdx;
std::vector<RestoreFileFR> curLogFiles;
std::tie(nextVersionSize, nextRangeIdx, curLogFiles) =
getVersionSize(prevEndVersion, nextVersion, rangeFiles, rangeIdx, logFiles);
TraceEvent("FastRestoreBuildVersionBatch")
.detail("VersionBatchBeginVersion", vb.beginVersion)
.detail("PreviousEndVersion", prevEndVersion)
.detail("NextVersion", nextVersion)
.detail("RangeIndex", rangeIdx)
.detail("RangeFiles", rangeFiles.size())
.detail("LogIndex", logIdx)
.detail("LogFiles", logFiles.size())
.detail("BatchSizeThreshold", opConfig.batchSizeThreshold)
.detail("CurrentBatchSize", vb.size)
.detail("NextVersionIntervalSize", nextVersionSize)
.detail("NextRangeIndex", nextRangeIdx)
.detail("UsedLogFiles", curLogFiles.size());
ASSERT(prevEndVersion < nextVersion); // Ensure progress
if (vb.size + nextVersionSize <= opConfig.batchSizeThreshold) {
// nextVersion should be included in this batch
vb.size += nextVersionSize;
while (rangeIdx < nextRangeIdx) {
ASSERT(rangeFiles[rangeIdx].fileSize > 0);
vb.rangeFiles.push_back(rangeFiles[rangeIdx]);
++rangeIdx;
}
for (auto& log : curLogFiles) {
ASSERT(log.beginVersion < nextVersion);
ASSERT(log.endVersion > prevEndVersion);
ASSERT(log.fileSize > 0);
vb.logFiles.push_back(log);
}
vb.endVersion = nextVersion;
prevEndVersion = vb.endVersion;
} else {
if (vb.size < 1) {
// [vb.endVersion, nextVersion) > opConfig.batchSizeThreshold. We should split the version range
if (prevEndVersion >= nextVersion) {
// If range files at one version > batchSizeThreshold, DBA should increase batchSizeThreshold to
// some value larger than nextVersion
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
.detail("NextVersion", nextVersion)
.detail("PreviousEndVersion", prevEndVersion)
.detail("NextVersionIntervalSize", nextVersionSize)
.detail("BatchSizeThreshold", opConfig.batchSizeThreshold)
.detail("SuggestedMinimumBatchSizeThreshold", nextVersion);
// Exit restore early if it won't succeed
flushAndExit(FDB_EXIT_ERROR);
}
ASSERT(prevEndVersion < nextVersion); // Ensure progress
nextVersion = (prevEndVersion + nextVersion) / 2;
rewriteNextVersion = true;
TraceEvent("FastRestoreBuildVersionBatch")
.detail("NextVersionIntervalSize", nextVersionSize); // Duplicate Trace
continue;
}
// Finalize the current version batch
versionBatches->emplace(vb.beginVersion, vb); // copy vb to versionBatch
// start finding the next version batch
vb.reset();
vb.size = 0;
vb.beginVersion = prevEndVersion;
}
}
// Sort files in each of versionBatches and set fileIndex, which is used in deduplicating mutations sent from
// loader to applier.
// Assumption: fileIndex starts at 1. Each loader's initized fileIndex (NotifiedVersion type) starts at 0
int fileIndex = 0; // fileIndex must be unique; ideally it continuously increase across verstionBatches for
// easier progress tracking
int versionBatchId = 1;
for (auto versionBatch = versionBatches->begin(); versionBatch != versionBatches->end(); versionBatch++) {
std::sort(versionBatch->second.rangeFiles.begin(), versionBatch->second.rangeFiles.end());
std::sort(versionBatch->second.logFiles.begin(), versionBatch->second.logFiles.end());
for (auto& logFile : versionBatch->second.logFiles) {
logFile.fileIndex = ++fileIndex;
TraceEvent("FastRestore")
.detail("VersionBatchId", versionBatchId)
.detail("LogFile", logFile.toString());
}
for (auto& rangeFile : versionBatch->second.rangeFiles) {
rangeFile.fileIndex = ++fileIndex;
TraceEvent("FastRestore")
.detail("VersionBatchId", versionBatchId)
.detail("RangeFile", rangeFile.toString());
}
versionBatchId++;
}
TraceEvent("FastRestore").detail("VersionBatches", versionBatches->size());
// Sanity check
std::set<uint32_t> fIndexSet;
for (auto& versionBatch : *versionBatches) {
Version prevVersion = 0;
for (auto& logFile : versionBatch.second.logFiles) {
TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("LogFile", logFile.toString());
ASSERT(logFile.beginVersion >= versionBatch.second.beginVersion);
ASSERT(logFile.endVersion <= versionBatch.second.endVersion);
ASSERT(prevVersion <= logFile.beginVersion);
prevVersion = logFile.endVersion;
ASSERT(fIndexSet.find(logFile.fileIndex) == fIndexSet.end());
fIndexSet.insert(logFile.fileIndex);
}
prevVersion = 0;
for (auto& rangeFile : versionBatch.second.rangeFiles) {
TraceEvent("FastRestore").detail("PrevVersion", prevVersion).detail("RangeFile", rangeFile.toString());
ASSERT(rangeFile.beginVersion == rangeFile.endVersion);
ASSERT(rangeFile.beginVersion >= versionBatch.second.beginVersion);
ASSERT(rangeFile.endVersion < versionBatch.second.endVersion);
ASSERT(prevVersion <= rangeFile.beginVersion);
prevVersion = rangeFile.beginVersion;
ASSERT(fIndexSet.find(rangeFile.fileIndex) == fIndexSet.end());
fIndexSet.insert(rangeFile.fileIndex);
}
// The last wip version batch has some files
if (vb.size > 0) {
vb.endVersion = nextVersion;
versionBatches->emplace(vb.beginVersion, vb);
}
}
@ -224,4 +332,4 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker, Database cx);
#include "flow/unactorcompiler.h"
#endif
#endif

View File

@ -55,6 +55,8 @@ struct FastRestoreOpConfig {
int num_appliers = 40;
// transactionBatchSizeThreshold is used when applier applies multiple mutations in a transaction to DB
double transactionBatchSizeThreshold = 512; // 512 in Bytes
// batchSizeThreshold is the maximum data size in each version batch
double batchSizeThreshold = 10.0 * 1024.0 * 1024.0 * 1024.0; // 10 GB
};
extern FastRestoreOpConfig opConfig;

View File

@ -185,8 +185,10 @@ ACTOR Future<Void> monitorWorkerLiveness(Reference<RestoreWorkerData> self) {
void initRestoreWorkerConfig() {
opConfig.num_loaders = g_network->isSimulated() ? 3 : opConfig.num_loaders;
opConfig.num_appliers = g_network->isSimulated() ? 3 : opConfig.num_appliers;
// TODO: Set the threshold to a random value in a range
opConfig.transactionBatchSizeThreshold =
g_network->isSimulated() ? 512 : opConfig.transactionBatchSizeThreshold; // Byte
opConfig.batchSizeThreshold = g_network->isSimulated() ? 10 * 1024 * 1024 : opConfig.batchSizeThreshold; // Byte
TraceEvent("FastRestore")
.detail("InitOpConfig", "Result")
.detail("NumLoaders", opConfig.num_loaders)