FastRestore:Resolve review comment

1) Sort logfiles by endVersion

2) Exit program early when restore will not succeed

3) Do not increase nextVersion unncessarily when
calculate version batches.

4) Change assert condition that ensures progress in
calculating version batches.
This commit is contained in:
Meng Xu 2020-01-09 11:31:27 -08:00
parent dba85d28fc
commit f436ea806e
5 changed files with 30 additions and 25 deletions

View File

@ -251,7 +251,7 @@ struct LoadingParam {
bool isRangeFile;
Key url;
Version rangeVersion; // range file's version
Optional<Version> rangeVersion; // range file's version
int64_t blockSize;
RestoreAsset asset;
@ -273,7 +273,8 @@ struct LoadingParam {
std::string toString() {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
<< " rangeVersion:" << rangeVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
<< " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize
<< " RestoreAsset:" << asset.toString();
return str.str();
}
};

View File

@ -44,8 +44,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* kvOps,
bool isRangeFile, Version endVersion,
RestoreAsset asset);
bool isRangeFile, RestoreAsset asset);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
@ -154,7 +153,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j);
if (param.isRangeFile) {
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
param.rangeVersion, subAsset));
param.rangeVersion.get(), subAsset));
} else {
// TODO: Sanity check the log file's range is overlapped with the restored version range
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
@ -198,8 +197,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
for (; item != self->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile,
item->first.asset.endVersion, item->first.asset));
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.asset));
}
}
@ -210,8 +208,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, Version endVersion,
RestoreAsset asset) {
bool isRangeFile, RestoreAsset asset) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
@ -223,12 +220,12 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
TraceEvent("FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("EndVersion", endVersion)
.detail("EndVersion", asset.endVersion)
.detail("RestoreAsset", asset.toString());
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(endVersion) == kvOps.end()) {
kvOps[endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
if (kvOps.find(asset.endVersion) == kvOps.end()) {
kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
}
splitMutationIndex = 0;

View File

@ -237,7 +237,10 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request));
std::sort(rangeFiles.begin(), rangeFiles.end());
std::sort(logFiles.begin(), logFiles.end());
std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool {
return std::tie(f1.endVersion, f1.beginVersion, f1.fileIndex) <
std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex);
});
self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches); // Divide files into version batches
self->dumpVersionBatches(self->versionBatches);
@ -273,7 +276,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
Version prevVersion = 0;
for (auto& file : *files) {
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to
// drive applier's NotifiedVersion.
@ -296,8 +298,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
param.asset.beginVersion = versionBatch.beginVersion;
param.asset.endVersion = versionBatch.endVersion;
prevVersion = param.asset.endVersion;
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
ASSERT_WE_THINK(param.asset.len > 0);
ASSERT_WE_THINK(param.asset.offset >= 0);

View File

@ -29,6 +29,7 @@
#include <sstream>
#include "flow/Stats.h"
#include "flow/Platform.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
@ -49,6 +50,8 @@ struct VersionBatch {
std::vector<RestoreFileFR> rangeFiles;
double size; // size of data in range and log files
VersionBatch() = default;
bool isEmpty() { return logFiles.empty() && rangeFiles.empty(); }
void reset() {
beginVersion = 0;
@ -159,7 +162,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
Version begin = std::max(prevVersion, logFiles[logIdx].beginVersion);
Version end = std::min(nextVersion, logFiles[logIdx].endVersion);
if (begin < end) { // logIdx file overlap in [prevVersion, nextVersion)
double ratio = (end - begin) / (logFiles[logIdx].endVersion - logFiles[logIdx].beginVersion);
double ratio = (end - begin) * 1.0 / (logFiles[logIdx].endVersion - logFiles[logIdx].beginVersion);
size += logFiles[logIdx].fileSize * ratio;
retLogs.push_back(logFiles[logIdx]);
}
@ -174,6 +177,8 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
// and each mutation in backup files is included in the version batches exactly once.
// Assumption 1: input files has no empty files;
// Assumption 2: range files at one version <= batchSizeThreshold.
// Note: We do not allow a versoinBatch size larger than the batchSizeThreshold because the range file size at
// a version depends on the number of backupAgents and its upper bound is hard to get.
void buildVersionBatches(const std::vector<RestoreFileFR>& rangeFiles, const std::vector<RestoreFileFR>& logFiles,
std::map<Version, VersionBatch>* versionBatches) {
bool rewriteNextVersion = false;
@ -231,36 +236,39 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
.detail("NextRangeIndex", nextRangeIdx)
.detail("UsedLogFiles", curLogFiles.size());
ASSERT(prevEndVersion < nextVersion); // Ensure progress
if (vb.size + nextVersionSize <= opConfig.batchSizeThreshold) {
// nextVersion should be included in this batch
bool advanced = false;
vb.size += nextVersionSize;
while (rangeIdx < nextRangeIdx) {
ASSERT(rangeFiles[rangeIdx].fileSize > 0);
vb.rangeFiles.push_back(rangeFiles[rangeIdx]);
++rangeIdx;
advanced = true;
}
for (auto& log : curLogFiles) {
ASSERT(log.beginVersion < nextVersion);
ASSERT(log.endVersion > prevEndVersion);
ASSERT(log.fileSize > 0);
vb.logFiles.push_back(log);
advanced = true;
}
ASSERT(advanced == true || nextVersionSize > 0); // Ensure progress
vb.endVersion = nextVersion;
prevEndVersion = vb.endVersion;
} else {
if (vb.size < 1) {
// [vb.endVersion, nextVersion) > opConfig.batchSizeThreshold. We should split the version range
if (prevEndVersion >= nextVersion) {
// If range files at one version > batchSizeThreshold, DBA should increase batchSizeThreshold
// If range files at one version > batchSizeThreshold, DBA should increase batchSizeThreshold to
// some value larger than nextVersion
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
.detail("NextVersion", nextVersion)
.detail("PreviousEndVersion", prevEndVersion)
.detail("NextVersionIntervalSize", nextVersionSize)
.detail("BatchSizeThreshold", opConfig.batchSizeThreshold);
.detail("BatchSizeThreshold", opConfig.batchSizeThreshold)
.detail("SuggestedMinimumBatchSizeThreshold", nextVersion);
// Exit restore early if it won't succeed
flushAndExit(FDB_EXIT_ERROR);
}
ASSERT(prevEndVersion < nextVersion); // Ensure progress
nextVersion = (prevEndVersion + nextVersion) / 2;
@ -275,7 +283,6 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
vb.reset();
vb.size = 0;
vb.beginVersion = prevEndVersion;
nextVersion = prevEndVersion + 1;
}
}
// The last wip version batch has some files

View File

@ -56,7 +56,7 @@ struct FastRestoreOpConfig {
// transactionBatchSizeThreshold is used when applier applies multiple mutations in a transaction to DB
double transactionBatchSizeThreshold = 512; // 512 in Bytes
// batchSizeThreshold is the maximum data size in each version batch
double batchSizeThreshold = 10.0 * 1024.0 * 1024.0 * 1024.0; // 1 GB
double batchSizeThreshold = 10.0 * 1024.0 * 1024.0 * 1024.0; // 10 GB
};
extern FastRestoreOpConfig opConfig;