Change LoadingParam struct and endVersion definition
1) Remove endVersion field because it has been included in RestoreAsset; 2) Ensure endVersion in VersionBatch and RestoreAsset is always exclusive; 3) Revise ASSERT in laoder and applier in situations when the dummy commit version is endVersion, to avoid false positive ASSERT failure.
This commit is contained in:
parent
c3f8f3b445
commit
67e913c3d5
|
@ -236,7 +236,7 @@ struct RestoreAsset {
|
|||
|
||||
std::string toString() {
|
||||
std::stringstream ss;
|
||||
ss << "UID" << uid.toString() << "begin:" << beginVersion << " end:" << endVersion << " range:" << range.toString()
|
||||
ss << "UID:" << uid.toString() << " begin:" << beginVersion << " end:" << endVersion << " range:" << range.toString()
|
||||
<< " filename:" << filename << " fileIndex:" << fileIndex << " offset:" << offset << " len:" << len;
|
||||
return ss.str();
|
||||
}
|
||||
|
@ -255,7 +255,8 @@ struct LoadingParam {
|
|||
bool isRangeFile;
|
||||
Key url;
|
||||
Version prevVersion;
|
||||
Version endVersion; // range file's mutations are all at the endVersion
|
||||
Version rangeVersion; // range file's version
|
||||
//Version endVersion; // range file's mutations are all at the endVersion
|
||||
|
||||
int64_t blockSize;
|
||||
RestoreAsset asset;
|
||||
|
@ -271,13 +272,16 @@ struct LoadingParam {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset);
|
||||
//serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset);
|
||||
serializer(ar, isRangeFile, url, prevVersion, rangeVersion, blockSize, asset);
|
||||
}
|
||||
|
||||
std::string toString() {
|
||||
std::stringstream str;
|
||||
// str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
|
||||
// << " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
||||
str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
|
||||
<< " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
||||
<< " rangeVersion:" << rangeVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
|
||||
return str.str();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -112,7 +112,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
// Sanity check: mutations in range file is in [beginVersion, endVersion);
|
||||
// mutations in log file is in [beginVersion, endVersion], both inclusive.
|
||||
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
|
||||
ASSERT_WE_THINK((req.isRangeFile && commitVersion < req.asset.endVersion) ||
|
||||
// Loader sends the endVersion to ensure all useful versions are sent
|
||||
ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) ||
|
||||
(!req.isRangeFile && commitVersion <= req.asset.endVersion));
|
||||
|
||||
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
|
||||
|
|
|
@ -154,7 +154,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
|
|||
subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j);
|
||||
if (param.isRangeFile) {
|
||||
fileParserFutures.push_back(
|
||||
_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc, param.endVersion, subAsset));
|
||||
_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc, param.rangeVersion, subAsset));
|
||||
} else {
|
||||
// TODO: Sanity check the log file's range is overlapped with the restored version range
|
||||
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
|
||||
|
@ -199,7 +199,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
|
|||
if (item->first.isRangeFile == req.useRangeFile) {
|
||||
// Send the parsed mutation to applier who will apply the mutation to DB
|
||||
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.prevVersion,
|
||||
item->first.endVersion, item->first.asset));
|
||||
item->first.asset.endVersion, item->first.asset));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -245,11 +245,11 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
|
|||
applierMutationsSize[applierID] = 0.0;
|
||||
}
|
||||
Version commitVersion = kvOp->first;
|
||||
if (!(commitVersion >= asset.beginVersion && commitVersion < asset.endVersion)) { // Debug purpose
|
||||
if (!(commitVersion >= asset.beginVersion && commitVersion <= asset.endVersion)) { // Debug purpose
|
||||
TraceEvent(SevError, "FastRestore_SendMutationsToApplier").detail("CommitVersion", commitVersion).detail("RestoreAsset", asset.toString());
|
||||
}
|
||||
ASSERT(commitVersion >= asset.beginVersion);
|
||||
ASSERT(commitVersion < asset.endVersion);
|
||||
ASSERT(commitVersion <= asset.endVersion); // endVersion is an empty commit to ensure progress
|
||||
|
||||
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
|
||||
MutationRef kvm = kvOp->second[mIndex];
|
||||
|
|
|
@ -288,9 +288,10 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
|
|||
LoadingParam param;
|
||||
|
||||
param.prevVersion = 0; // Each file's NotifiedVersion starts from 0
|
||||
param.endVersion = file.isRange ? file.version : file.endVersion;
|
||||
//param.endVersion = file.isRange ? file.version : file.endVersion;
|
||||
param.url = request.url;
|
||||
param.isRangeFile = file.isRange;
|
||||
param.rangeVersion = file.isRange ? file.version : -1;
|
||||
param.blockSize = file.blockSize;
|
||||
|
||||
param.asset.filename = file.fileName;
|
||||
|
@ -301,14 +302,14 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
|
|||
param.asset.beginVersion = versionBatch.beginVersion;
|
||||
param.asset.endVersion = versionBatch.endVersion;
|
||||
|
||||
prevVersion = param.endVersion;
|
||||
prevVersion = param.asset.endVersion;
|
||||
|
||||
// Log file to be loaded
|
||||
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
|
||||
ASSERT_WE_THINK(param.asset.len >= 0); // we may load an empty file
|
||||
ASSERT_WE_THINK(param.asset.offset >= 0);
|
||||
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
|
||||
ASSERT_WE_THINK(param.prevVersion <= param.endVersion);
|
||||
ASSERT_WE_THINK(param.prevVersion <= param.asset.endVersion);
|
||||
|
||||
requests.emplace_back(loader->first, RestoreLoadFileRequest(param));
|
||||
loader++;
|
||||
|
|
|
@ -122,7 +122,7 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
}
|
||||
}
|
||||
|
||||
// Input: Get the size of data at nextVersion in rangeFiles from rangeIdx and logFiles from logIdx
|
||||
// Input: Get the size of data in backup files in version range [prevVersion, nextVersion)
|
||||
// Return: param1: the size of data at nextVersion, param2: the minimum range file index whose version >
|
||||
// nextVersion, param3: log files with data in [prevVersion, nextVersion]
|
||||
std::tuple<double, int, std::vector<RestoreFileFR>> getVersionSize(Version prevVersion, Version nextVersion,
|
||||
|
@ -139,12 +139,10 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
ASSERT(prevVersion <= nextVersion);
|
||||
while (rangeIdx < rangeFiles.size()) {
|
||||
TraceEvent(SevDebug, "FastRestoreGetVersionSize").detail("RangeFile", rangeFiles[rangeIdx].toString());
|
||||
if (rangeFiles[rangeIdx].version <= nextVersion) {
|
||||
if (rangeFiles[rangeIdx].version < nextVersion) {
|
||||
ASSERT(rangeFiles[rangeIdx].version >= prevVersion);
|
||||
size += rangeFiles[rangeIdx].fileSize;
|
||||
} else {
|
||||
//TraceEvent("FastRestoreGetVersionSize").detail("RangeIdx", rangeIdx).detail("FileVersion", rangeFiles[rangeIdx].version).detail("NextVersion", nextVersion);
|
||||
// ASSERT(rangeFiles[rangeIdx].version > nextVersion);
|
||||
break;
|
||||
}
|
||||
++rangeIdx;
|
||||
|
@ -155,8 +153,8 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
// For example, we do not assume each version range only exists in one log file
|
||||
while (logIdx < logFiles.size()) {
|
||||
Version begin = std::max(prevVersion, logFiles[logIdx].beginVersion);
|
||||
Version end = std::min(nextVersion + 1, logFiles[logIdx].endVersion);
|
||||
if (begin < end) { // logIdx file overlap in [prevVersion, nextVersion]
|
||||
Version end = std::min(nextVersion, logFiles[logIdx].endVersion);
|
||||
if (begin < end) { // logIdx file overlap in [prevVersion, nextVersion)
|
||||
double ratio = (end - begin) / (logFiles[logIdx].endVersion - logFiles[logIdx].beginVersion);
|
||||
size += logFiles[logIdx].fileSize * ratio;
|
||||
retLogs.push_back(logFiles[logIdx]);
|
||||
|
@ -170,26 +168,28 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
// Input: sorted range files, sorted log files;
|
||||
// Output: a set of version batches whose size is less than opConfig.batchSizeThreshold
|
||||
// and each mutation data in backup files is included in the version batches exactly once
|
||||
// Assume: input files has no empty files
|
||||
// Assumption 1: input files has no empty files
|
||||
// Assumption 2: range files at one version > batchSizeThreshold
|
||||
void buildVersionBatches(const std::vector<RestoreFileFR>& rangeFiles, const std::vector<RestoreFileFR>& logFiles,
|
||||
std::map<Version, VersionBatch>* versionBatches) {
|
||||
// Version batch range [beginVersion, endVersion)
|
||||
Version beginVersion = 0;
|
||||
Version endVersion = 0;
|
||||
Version prevEndVersion = 0;
|
||||
double batchSize = 0;
|
||||
double batchSize = 0; // TODO: Can be deleted
|
||||
int rangeIdx = 0;
|
||||
int logIdx = 0;
|
||||
Version nextVersion = 0;
|
||||
Version nextVersion = 0; // Used to calculate the batch's endVersion
|
||||
VersionBatch vb;
|
||||
vb.beginVersion = beginVersion;
|
||||
bool rewriteNextVersion = false;
|
||||
while (rangeIdx < rangeFiles.size() || logIdx < logFiles.size()) {
|
||||
if (!rewriteNextVersion) {
|
||||
if (rangeIdx < rangeFiles.size() && logIdx < logFiles.size()) {
|
||||
nextVersion = std::max(rangeFiles[rangeIdx].version, nextVersion);
|
||||
// nextVersion as endVersion is exclusive in the version range
|
||||
nextVersion = std::max(rangeFiles[rangeIdx].version + 1, nextVersion);
|
||||
} else if (rangeIdx < rangeFiles.size()) { // i.e., logIdx >= logFiles.size()
|
||||
nextVersion = rangeFiles[rangeIdx].version;
|
||||
nextVersion = rangeFiles[rangeIdx].version + 1;
|
||||
} else if (logIdx < logFiles.size()) {
|
||||
while (logIdx < logFiles.size() && logFiles[logIdx].endVersion <= nextVersion) {
|
||||
logIdx++;
|
||||
|
@ -241,14 +241,14 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
}
|
||||
|
||||
for (auto& log : curLogFiles) {
|
||||
ASSERT(log.beginVersion <= nextVersion);
|
||||
ASSERT(log.beginVersion < nextVersion);
|
||||
ASSERT(log.endVersion > prevEndVersion);
|
||||
vb.logFiles.push_back(log);
|
||||
advanced = true;
|
||||
}
|
||||
|
||||
ASSERT(advanced == true || nextVersionSize > 0); // Ensure progress
|
||||
vb.endVersion = nextVersion + 1;
|
||||
vb.endVersion = nextVersion;
|
||||
prevEndVersion = vb.endVersion;
|
||||
} else {
|
||||
if (batchSize < 1) {
|
||||
|
@ -270,18 +270,19 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
|
|||
continue;
|
||||
}
|
||||
// Finalize the current version batch
|
||||
vb.endVersion = nextVersion;
|
||||
//vb.endVersion = nextVersion;
|
||||
vb.size = batchSize;
|
||||
versionBatches->emplace(vb.beginVersion, vb); // copy vb to versionBatch
|
||||
// start finding the next version batch
|
||||
vb.reset();
|
||||
batchSize = 0;
|
||||
vb.beginVersion = nextVersion;
|
||||
prevEndVersion = nextVersion; //ensure prevEndVersion is in the batch's version range
|
||||
vb.beginVersion = prevEndVersion;
|
||||
nextVersion = prevEndVersion + 1;
|
||||
}
|
||||
}
|
||||
// The last wip version batch has some files
|
||||
if (batchSize > 0) {
|
||||
vb.endVersion = nextVersion + 1;
|
||||
vb.endVersion = nextVersion;
|
||||
vb.size = batchSize;
|
||||
versionBatches->emplace(vb.beginVersion, vb);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue