FastRestore: Filter out empty files before distributing workload
and clean up unused code
This commit is contained in:
parent
c29e380076
commit
a2b26906e8
|
@ -246,16 +246,12 @@ struct RestoreAsset {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: It is probably better to specify the (beginVersion, endVersion] for each loadingParam.
|
|
||||||
// beginVersion (endVersion) is the version the applier is before (after) it receives the request.
|
|
||||||
struct LoadingParam {
|
struct LoadingParam {
|
||||||
constexpr static FileIdentifier file_identifier = 17023837;
|
constexpr static FileIdentifier file_identifier = 17023837;
|
||||||
|
|
||||||
bool isRangeFile;
|
bool isRangeFile;
|
||||||
Key url;
|
Key url;
|
||||||
//Version prevVersion;
|
|
||||||
Version rangeVersion; // range file's version
|
Version rangeVersion; // range file's version
|
||||||
// Version endVersion; // range file's mutations are all at the endVersion
|
|
||||||
|
|
||||||
int64_t blockSize;
|
int64_t blockSize;
|
||||||
RestoreAsset asset;
|
RestoreAsset asset;
|
||||||
|
@ -271,14 +267,11 @@ struct LoadingParam {
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
// serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset);
|
|
||||||
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
|
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string toString() {
|
std::string toString() {
|
||||||
std::stringstream str;
|
std::stringstream str;
|
||||||
// str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
|
|
||||||
// << " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
|
||||||
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
|
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
|
||||||
<< " rangeVersion:" << rangeVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
<< " rangeVersion:" << rangeVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
||||||
return str.str();
|
return str.str();
|
||||||
|
|
|
@ -273,9 +273,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
|
||||||
std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
|
std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
|
||||||
std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
|
std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
|
||||||
|
|
||||||
// TODO: Remove files that are empty before proceed
|
|
||||||
// ASSERT(files->size() > 0); // files should not be empty
|
|
||||||
|
|
||||||
Version prevVersion = 0;
|
Version prevVersion = 0;
|
||||||
for (auto& file : *files) {
|
for (auto& file : *files) {
|
||||||
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to
|
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to
|
||||||
|
@ -285,9 +282,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
|
||||||
}
|
}
|
||||||
// Prepare loading
|
// Prepare loading
|
||||||
LoadingParam param;
|
LoadingParam param;
|
||||||
|
|
||||||
//param.prevVersion = 0; // Each file's NotifiedVersion starts from 0
|
|
||||||
// param.endVersion = file.isRange ? file.version : file.endVersion;
|
|
||||||
param.url = request.url;
|
param.url = request.url;
|
||||||
param.isRangeFile = file.isRange;
|
param.isRangeFile = file.isRange;
|
||||||
param.rangeVersion = file.isRange ? file.version : -1;
|
param.rangeVersion = file.isRange ? file.version : -1;
|
||||||
|
@ -304,9 +298,8 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
|
||||||
|
|
||||||
prevVersion = param.asset.endVersion;
|
prevVersion = param.asset.endVersion;
|
||||||
|
|
||||||
// Log file to be loaded
|
|
||||||
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
|
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
|
||||||
ASSERT_WE_THINK(param.asset.len >= 0); // we may load an empty file
|
ASSERT_WE_THINK(param.asset.len > 0); // TODO: ensure empty files are not included here
|
||||||
ASSERT_WE_THINK(param.asset.offset >= 0);
|
ASSERT_WE_THINK(param.asset.offset >= 0);
|
||||||
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
|
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
|
||||||
ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion);
|
ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion);
|
||||||
|
|
|
@ -127,7 +127,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
||||||
|
|
||||||
// Input: Get the size of data in backup files in version range [prevVersion, nextVersion)
|
// Input: Get the size of data in backup files in version range [prevVersion, nextVersion)
|
||||||
// Return: param1: the size of data at nextVersion, param2: the minimum range file index whose version >
|
// Return: param1: the size of data at nextVersion, param2: the minimum range file index whose version >
|
||||||
// nextVersion, param3: log files with data in [prevVersion, nextVersion]
|
// nextVersion, param3: log files with data in [prevVersion, nextVersion)
|
||||||
std::tuple<double, int, std::vector<RestoreFileFR>> getVersionSize(Version prevVersion, Version nextVersion,
|
std::tuple<double, int, std::vector<RestoreFileFR>> getVersionSize(Version prevVersion, Version nextVersion,
|
||||||
const std::vector<RestoreFileFR>& rangeFiles,
|
const std::vector<RestoreFileFR>& rangeFiles,
|
||||||
int rangeIdx,
|
int rangeIdx,
|
||||||
|
@ -170,9 +170,9 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
||||||
// Split backup files into version batches, each of which has similar data size
|
// Split backup files into version batches, each of which has similar data size
|
||||||
// Input: sorted range files, sorted log files;
|
// Input: sorted range files, sorted log files;
|
||||||
// Output: a set of version batches whose size is less than opConfig.batchSizeThreshold
|
// Output: a set of version batches whose size is less than opConfig.batchSizeThreshold
|
||||||
// and each mutation data in backup files is included in the version batches exactly once
|
// and each mutation data in backup files is included in the version batches exactly once.
|
||||||
// Assumption 1: input files has no empty files
|
// Assumption 1: input files have no empty files;
|
||||||
// Assumption 2: range files at one version > batchSizeThreshold
|
// Assumption 2: range files at one version <= batchSizeThreshold.
|
||||||
void buildVersionBatches(const std::vector<RestoreFileFR>& rangeFiles, const std::vector<RestoreFileFR>& logFiles,
|
void buildVersionBatches(const std::vector<RestoreFileFR>& rangeFiles, const std::vector<RestoreFileFR>& logFiles,
|
||||||
std::map<Version, VersionBatch>* versionBatches) {
|
std::map<Version, VersionBatch>* versionBatches) {
|
||||||
// Version batch range [beginVersion, endVersion)
|
// Version batch range [beginVersion, endVersion)
|
||||||
|
@ -204,9 +204,9 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
|
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
|
||||||
.detail("RangeIdx", rangeIdx)
|
.detail("RangeIndex", rangeIdx)
|
||||||
.detail("RangeFiles", rangeFiles.size())
|
.detail("RangeFiles", rangeFiles.size())
|
||||||
.detail("LogIdx", logIdx)
|
.detail("LogIndex", logIdx)
|
||||||
.detail("LogFiles", logFiles.size());
|
.detail("LogFiles", logFiles.size());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -257,8 +257,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
||||||
if (batchSize < 1) {
|
if (batchSize < 1) {
|
||||||
// [vb.endVersion, nextVersion) > opConfig.batchSizeThreshold. We should split the version range
|
// [vb.endVersion, nextVersion) > opConfig.batchSizeThreshold. We should split the version range
|
||||||
if (prevEndVersion >= nextVersion) {
|
if (prevEndVersion >= nextVersion) {
|
||||||
// If range files at one version > batchSizeThreshold, DBA should increase the
|
// If range files at one version > batchSizeThreshold, DBA should increase batchSizeThreshold
|
||||||
// batchSizeThreshold
|
|
||||||
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
|
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
|
||||||
.detail("NextVersion", nextVersion)
|
.detail("NextVersion", nextVersion)
|
||||||
.detail("PrevEndVersion", prevEndVersion)
|
.detail("PrevEndVersion", prevEndVersion)
|
||||||
|
@ -273,7 +272,6 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Finalize the current version batch
|
// Finalize the current version batch
|
||||||
// vb.endVersion = nextVersion;
|
|
||||||
vb.size = batchSize;
|
vb.size = batchSize;
|
||||||
versionBatches->emplace(vb.beginVersion, vb); // copy vb to versionBatch
|
versionBatches->emplace(vb.beginVersion, vb); // copy vb to versionBatch
|
||||||
// start finding the next version batch
|
// start finding the next version batch
|
||||||
|
|
Loading…
Reference in New Issue