FastRestore:Introduce RestoreAsset

This commit is contained in:
Meng Xu 2019-12-19 16:50:39 -08:00
parent ffc8f76710
commit e98b2a0d1c
8 changed files with 178 additions and 94 deletions

View File

@ -284,7 +284,7 @@ public:
void clearApplyMutationsKeys(Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
// Clear add/remove prefix keys
tr->clear(uidPrefixKey(applyMutationsAddPrefixRange.begin, uid));
tr->clear(uidPrefixKey(applyMutationsRemovePrefixRange.begin, uid));

View File

@ -196,8 +196,49 @@ struct RestoreApplierInterface : RestoreRoleInterface {
std::string toString() { return nodeID.toString(); }
};
// RestoreAsset uniquely identifies the work unit done by restore roles;
// It is used to ensure exactly-once processing on restore loader and applier;
// By combining all RestoreAssets across all version batches, restore should process all mutations in
// backup range and log files up to the target restore version.
struct RestoreAsset {
	// Scalar members carry default initializers so that a default-constructed asset never exposes
	// indeterminate values to the comparison operators or to serialize().
	Version beginVersion = 0, endVersion = 0; // Only use mutation in [begin, end) versions;
	KeyRange range; // Only use mutations in range

	std::string filename;
	int fileIndex = 0;
	int64_t offset = 0;
	int64_t len = 0;

	RestoreAsset() = default;

	// Two assets are equal when filename, offset, len, version bounds and key range all match.
	// NOTE: fileIndex does not take part in equality or ordering.
	bool operator==(const RestoreAsset& r) const {
		return filename == r.filename && offset == r.offset && len == r.len && beginVersion == r.beginVersion &&
		       endVersion == r.endVersion && range == r.range;
	}
	// Defined as the negation of operator== so the two can never disagree.
	bool operator!=(const RestoreAsset& r) const { return !(*this == r); }
	// Lexicographic order over (filename, offset, len, beginVersion, endVersion, range.begin, range.end).
	// std::tie compares through references, so no string or key is copied per comparison.
	bool operator<(const RestoreAsset& r) const {
		return std::tie(filename, offset, len, beginVersion, endVersion, range.begin, range.end) <
		       std::tie(r.filename, r.offset, r.len, r.beginVersion, r.endVersion, r.range.begin, r.range.end);
	}

	// Serializes every field, including fileIndex.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, beginVersion, endVersion, range, filename, fileIndex, offset, len);
	}

	// Human-readable dump of all fields, used in trace output.
	std::string toString() {
		std::stringstream ss;
		ss << "begin:" << beginVersion << " end:" << endVersion << " range:" << range.toString()
		   << " filename:" << filename << " fileIndex:" << fileIndex << " offset:" << offset << " len:" << len;
		return ss.str();
	}
};
// TODO: It is probably better to specify the (beginVersion, endVersion] for each loadingParam.
// beginVersion (endVersion) is the version the applier is before (after) it receives the request.
// beginVersion (endVersion) is the version the applier is before (after) it receives the request.
struct LoadingParam {
constexpr static FileIdentifier file_identifier = 17023837;
@ -205,34 +246,37 @@ struct LoadingParam {
Key url;
Version prevVersion;
Version endVersion; // range file's mutations are all at the endVersion
int fileIndex;
std::string filename;
int64_t offset;
int64_t length;
int64_t blockSize;
KeyRange restoreRange;
// int fileIndex;
// std::string filename;
// int64_t offset;
// int64_t length;
// KeyRange restoreRange;
RestoreAsset asset;
LoadingParam() = default;
// TODO: Compare all fields for loadingParam
bool operator==(const LoadingParam& r) const { return isRangeFile == r.isRangeFile && filename == r.filename; }
bool operator!=(const LoadingParam& r) const { return isRangeFile != r.isRangeFile || filename != r.filename; }
bool operator==(const LoadingParam& r) const { return isRangeFile == r.isRangeFile && asset == r.asset; }
bool operator!=(const LoadingParam& r) const { return isRangeFile != r.isRangeFile || asset != r.asset; }
bool operator<(const LoadingParam& r) const {
return (isRangeFile < r.isRangeFile) || (isRangeFile == r.isRangeFile && filename < r.filename);
return (isRangeFile < r.isRangeFile) || (isRangeFile == r.isRangeFile && asset < r.asset);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, isRangeFile, url, prevVersion, endVersion, fileIndex, filename, offset, length, blockSize,
restoreRange);
// serializer(ar, isRangeFile, url, prevVersion, endVersion, fileIndex, filename, offset, length, blockSize,
// restoreRange);
serializer(ar, isRangeFile, url, prevVersion, endVersion, blockSize, asset);
}
std::string toString() {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
<< " endVersion:" << endVersion << " fileIndex:" << fileIndex << " filename:" << filename
<< " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString();
<< " endVersion:" << endVersion << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString();
return str.str();
}
};
@ -347,7 +391,7 @@ struct RestoreLoadFileRequest : TimedRequest {
ReplyPromise<RestoreLoadFileReply> reply;
RestoreLoadFileRequest() = default;
explicit RestoreLoadFileRequest(LoadingParam param) : param(param) {}
RestoreLoadFileRequest(LoadingParam& param): param(param) {};
template <class Ar>
void serialize(Ar& ar) {
@ -389,29 +433,29 @@ struct RestoreSendMutationsToAppliersRequest : TimedRequest {
struct RestoreSendVersionedMutationsRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 69764565;
RestoreAsset asset; // Unique identifier for the current restore asset
Version prevVersion, version; // version is the commitVersion of the mutation vector.
int fileIndex; // Unique index for a backup file
bool isRangeFile;
MutationsVec mutations; // All mutations at the same version parsed by one loader
ReplyPromise<RestoreCommonReply> reply;
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int fileIndex, Version prevVersion, Version version, bool isRangeFile,
MutationsVec mutations)
: fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
mutations(mutations) {}
explicit RestoreSendVersionedMutationsRequest(RestoreAsset asset, Version prevVersion, Version version,
bool isRangeFile, MutationsVec mutations)
: asset(asset), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile), mutations(mutations) {}
std::string toString() {
std::stringstream ss;
ss << "fileIndex:" << fileIndex << " prevVersion:" << prevVersion << " version:" << version
ss << "RestoreAsset:" << asset.toString() << " prevVersion:" << prevVersion << " version:" << version
<< " isRangeFile:" << isRangeFile << " mutations.size:" << mutations.size();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, fileIndex, prevVersion, version, isRangeFile, mutations, reply);
serializer(ar, asset, prevVersion, version, isRangeFile, mutations, reply);
}
};

View File

@ -96,11 +96,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
Reference<RestoreApplierData> self) {
// Assume: self->processedFileState[req.asset] will not be erased while the actor is active.
// Note: Inserting new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curFilePos = self->processedFileState[req.fileIndex];
state NotifiedVersion& curFilePos = self->processedFileState[req.asset];
TraceEvent("FastRestore")
.detail("ApplierNode", self->id())
.detail("FileIndex", req.fileIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedFileVersion", curFilePos.get())
.detail("Request", req.toString());
@ -109,6 +109,9 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
if (curFilePos.get() == req.prevVersion) {
Version commitVersion = req.version;
MutationsVec mutations(req.mutations);
// Sanity check
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion && commitVersion < req.asset.endVersion);
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
self->kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
}
@ -116,10 +119,19 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
MutationRef mutation = mutations[mIndex];
TraceEvent(SevFRMutationInfo, "FastRestore")
.detail("ApplierNode", self->id())
.detail("FileUID", req.fileIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("Version", commitVersion)
.detail("Index", mIndex)
.detail("MutationReceived", mutation.toString());
// Sanity check
if (g_network->isSimulated()) {
if (isRangeMutation(mutation)) {
ASSERT(mutation.param1 >= req.asset.range.begin &&
mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive
} else {
ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end);
}
}
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
// TODO: What if log file's mutations are delivered out-of-order (behind) the range file's mutations?!
}

View File

@ -41,8 +41,8 @@
#include "flow/actorcompiler.h" // has to be last include
struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreApplierData> {
// processedFileState: key: file unique index; value: largest version of mutation received on the applier
std::map<uint32_t, NotifiedVersion> processedFileState;
// processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier
std::map<RestoreAsset, NotifiedVersion> processedFileState;
Optional<Future<Void>> dbApplier;
// rangeToApplier is in master and loader. Loader uses it to determine which applier a mutation should be sent

View File

@ -33,29 +33,27 @@ typedef std::map<Standalone<StringRef>, Standalone<StringRef>> SerializedMutatio
// Key has the same semantics as SerializedMutationListMap; Value is the part number of the splitted mutation list
typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
bool isRangeMutation(MutationRef m);
// bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, bool isSampling = false);
std::map<LoadingParam, MutationsVec>::iterator samplesIter, RestoreAsset asset);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self,
bool isSampling = false);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* kvOps,
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex);
bool isRangeFile, Version startVersion, Version endVersion, RestoreAsset asset);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
Reference<IBackupContainer> bc, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange);
Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset_input, int64_t readLen_input, KeyRange restoreRange);
RestoreAsset asset);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
state Reference<RestoreLoaderData> self =
@ -79,7 +77,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) {
requestTypeStr = "loadFile";
self->initBackupContainer(req.param.url);
actors.add(handleLoadFileRequest(req, self, false));
actors.add(handleLoadFileRequest(req, self));
}
when(RestoreSendMutationsToAppliersRequest req = waitNext(loaderInterf.sendMutations.getFuture())) {
requestTypeStr = "sendMutations";
@ -140,7 +138,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
ASSERT(param.blockSize > 0);
ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary.
ASSERT(param.asset.offset % param.blockSize == 0); // Parse file must be at block bondary.
ASSERT(self->kvOpsPerLP.find(param) == self->kvOpsPerLP.end());
// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
@ -153,33 +151,32 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
int64_t j;
int64_t readOffset;
int64_t readLen;
for (j = param.offset; j < param.length; j += param.blockSize) {
for (j = param.asset.offset; j < param.asset.len; j += param.blockSize) {
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
readLen = std::min<int64_t>(param.blockSize, param.asset.len - j);
if (param.isRangeFile) {
// TODO: Sanity check the range file is within the restored version range
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
param.endVersion, param.filename, readOffset,
readLen, param.restoreRange));
param.endVersion, param.asset));
} else {
// TODO: Sanity check the log file's range is overlapped with the restored version range
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
&mutationPartMap, self->bc, param.filename,
readOffset, readLen, param.restoreRange));
&mutationPartMap, self->bc, param.asset));
}
}
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile) {
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter);
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter, param.asset);
}
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.filename);
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.asset.filename);
return Void();
}
// A loader can process multiple RestoreLoadFileRequest in parallel.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self,
bool isSampling) {
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
ASSERT(self->sampleMutations.find(req.param) == self->sampleMutations.end());
@ -205,7 +202,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
if (item->first.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
wait(sendMutationsToApplier(self, &item->second, item->first.isRangeFile, item->first.prevVersion,
item->first.endVersion, item->first.fileIndex));
item->first.endVersion, item->first.asset));
}
}
@ -216,7 +213,8 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* pkvOps,
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex) {
bool isRangeFile, Version startVersion, Version endVersion,
RestoreAsset asset) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
@ -230,7 +228,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
.detail("IsRangeFile", isRangeFile)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
.detail("FileIndex", fileIndex);
.detail("FileIndex", asset.filename);
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if (kvOps.find(endVersion) == kvOps.end()) {
@ -291,14 +289,14 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// Send the mutations to appliers for each version
for (auto& applierID : applierIDs) {
requests.push_back(std::make_pair(
applierID, RestoreSendVersionedMutationsRequest(fileIndex, prevVersion, commitVersion, isRangeFile,
applierID, RestoreSendVersionedMutationsRequest(asset, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
}
TraceEvent(SevDebug, "FastRestore_Debug")
.detail("Loader", self->id())
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion)
.detail("FileIndex", fileIndex);
.detail("FileIndex", asset.filename);
ASSERT(prevVersion < commitVersion);
prevVersion = commitVersion;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
@ -362,11 +360,12 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvec
// value_input: serialized binary of mutations at the same version
bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standalone<StringRef>>* pMutationMap,
std::map<Standalone<StringRef>, uint32_t>* pMutationPartMap,
Standalone<StringRef> key_input, Standalone<StringRef> val_input) {
Standalone<StringRef> key_input, Standalone<StringRef> val_input,
RestoreAsset asset) {
SerializedMutationListMap& mutationMap = *pMutationMap;
std::map<Standalone<StringRef>, uint32_t>& mutationPartMap = *pMutationPartMap;
std::string prefix = "||\t";
std::stringstream ss;
// std::string prefix = "||\t";
// std::stringstream ss;
const int key_prefix_len = sizeof(uint8_t) + sizeof(Version) + sizeof(uint32_t);
BackupStringRefReader readerKey(key_input, restore_corrupted_data()); // read key_input!
@ -382,6 +381,11 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
readerKey.consume<uint8_t>(); // uint8_t hashValue = readerKey.consume<uint8_t>()
Version commitVersion = readerKey.consumeNetworkUInt64();
// Skip mutations not in [asset.beginVersion, asset.endVersion), which is the only version range we are processing right now
if (commitVersion < asset.beginVersion || commitVersion >= asset.endVersion) {
return false;
}
uint32_t part = readerKey.consumeNetworkUInt32();
// Use commitVersion as id
Standalone<StringRef> id = StringRef((uint8_t*)&commitVersion, sizeof(Version));
@ -410,16 +414,6 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
return concatenated;
}
// Returns true when the mutation is a ClearRange, false otherwise.
// Any non-ClearRange mutation is asserted to be a SetValue or an atomic op.
bool isRangeMutation(MutationRef m) {
	const bool isClearRange = (m.type == MutationRef::Type::ClearRange);
	if (!isClearRange) {
		ASSERT(m.type == MutationRef::Type::SetValue || isAtomicOp((MutationRef::Type)m.type));
		return false;
	}
	ASSERT(m.type != MutationRef::Type::DebugKeyRange);
	return true;
}
// Parse the kv pair (version, serialized_mutation), which are the results parsed from log file, into
// (version, <K, V, mutationType>) pair;
// Put the parsed versioned mutations into *pkvOps.
@ -434,7 +428,7 @@ bool isRangeMutation(MutationRef m) {
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, bool isSampling) {
std::map<LoadingParam, MutationsVec>::iterator samplesIter, RestoreAsset asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
MutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
@ -445,6 +439,8 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
BackupStringRefReader kReader(k, restore_corrupted_data());
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
// We have already filtered out the commits not in [beginVersion, endVersion) when we concatenated kv pairs in the log file
ASSERT_WE_THINK(commitVersion >= asset.beginVersion && commitVersion < asset.endVersion);
kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
BackupStringRefReader vReader(val, restore_corrupted_data());
@ -468,6 +464,10 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
const uint8_t* v = vReader.consume(vLen);
MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
if (isRangeMutation(mutation)) {
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
}
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
@ -486,21 +486,30 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen, KeyRange restoreRange) {
RestoreAsset asset) {
state VersionedMutationsMap& kvOps = kvOpsIter->second;
state MutationsVec& sampleMutations = samplesIter->second;
TraceEvent("FastRestoreDecodedRangeFile")
.detail("Filename", asset.filename)
.detail("Version", version)
.detail("BeginVersion", asset.beginVersion)
.detail("EndVersion", asset.endVersion);
ASSERT_WE_THINK(version >= asset.beginVersion && version < asset.endVersion);
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
Standalone<VectorRef<KeyValueRef>> blockData =
wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedRangeFile", fileName).detail("DataSize", blockData.contents().size());
wait(parallelFileRestore::decodeRangeFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestore")
.detail("DecodedRangeFile", asset.filename)
.detail("DataSize", blockData.contents().size());
// First and last key are the range for this file
KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
// If fileRange doesn't intersect restore range then we're done.
if (!fileRange.intersects(restoreRange)) {
if (!fileRange.intersects(asset.range)) {
return Void();
}
@ -513,11 +522,11 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// Slide start from beginning, stop if something in range is found
// Move rangeStart and rangeEnd until they are within restoreRange
while (rangeStart < rangeEnd && !restoreRange.contains(blockData[rangeStart].key)) {
while (rangeStart < rangeEnd && !asset.range.contains(blockData[rangeStart].key)) {
++rangeStart;
}
// Slide end from back, stop if something at (rangeEnd-1) is found in range
while (rangeEnd > rangeStart && !restoreRange.contains(blockData[rangeEnd - 1].key)) {
while (rangeEnd > rangeStart && !asset.range.contains(blockData[rangeEnd - 1].key)) {
--rangeEnd;
}
@ -556,22 +565,21 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* pMutationMap,
SerializedMutationPartMap* pMutationPartMap,
Reference<IBackupContainer> bc, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange) {
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
Reference<IBackupContainer> bc, RestoreAsset asset) {
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, readOffset, readLen));
wait(parallelFileRestore::decodeLogFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestore")
.detail("DecodedLogFile", fileName)
.detail("Offset", readOffset)
.detail("Length", readLen)
.detail("DecodedLogFile", asset.filename)
.detail("Offset", asset.offset)
.detail("Length", asset.len)
.detail("DataSize", data.contents().size());
// Ensure data blocks in the same file are processed in order
wait(pProcessedFileOffset->whenAtLeast(readOffset));
wait(pProcessedFileOffset->whenAtLeast(asset.offset));
if (pProcessedFileOffset->get() == readOffset) {
if (pProcessedFileOffset->get() == asset.offset) {
int start = 0;
int end = data.size();
int numConcatenated = 0;
@ -580,10 +588,10 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
// ValueRef v = data[i].value;
// Concatenate the backuped param1 and param2 (KV) at the same version.
bool concatenated =
concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, data[i].key, data[i].value);
concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, data[i].key, data[i].value, asset);
numConcatenated += (concatenated ? 1 : 0);
}
pProcessedFileOffset->set(readOffset + readLen);
pProcessedFileOffset->set(asset.offset + asset.len);
}
return Void();

View File

@ -282,23 +282,31 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
param.prevVersion = 0; // Each file's NotifiedVersion starts from 0
param.endVersion = file.isRange ? file.version : file.endVersion;
param.fileIndex = file.fileIndex;
param.url = request.url;
param.isRangeFile = file.isRange;
param.filename = file.fileName;
param.offset = 0;
param.length = file.fileSize; // We load file by file, instead of data block by data block for now
param.blockSize = file.blockSize;
param.restoreRange = request.range;
param.asset.filename = file.fileName;
param.asset.fileIndex = file.fileIndex;
param.asset.offset = 0;
param.asset.len = file.fileSize;
param.asset.range = request.range;
param.asset.beginVersion = versionBatch.beginVersion;
param.asset.endVersion = versionBatch.endVersion;
// //param.fileIndex = file.fileIndex;
// param.filename = file.fileName;
// param.offset = 0;
// param.length = file.fileSize; // We load file by file, instead of data block by data block for now
// param.restoreRange = request.range;
prevVersion = param.endVersion;
// Log file to be loaded
TraceEvent("FastRestore").detail("LoadParam", param.toString()).detail("LoaderID", loader->first.toString());
ASSERT_WE_THINK(param.length >= 0); // we may load an empty file
ASSERT_WE_THINK(param.offset >= 0);
ASSERT_WE_THINK(param.offset <= file.fileSize);
ASSERT_WE_THINK(param.asset.len >= 0); // we may load an empty file
ASSERT_WE_THINK(param.asset.offset >= 0);
ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
ASSERT_WE_THINK(param.prevVersion <= param.endVersion);
requests.emplace_back(loader->first, RestoreLoadFileRequest(param));

View File

@ -33,4 +33,14 @@ std::string getRoleStr(RestoreRole role) {
return "[Unset]";
}
return RestoreRoleStr[(int)role];
}
// Returns true when the mutation is a ClearRange, false otherwise.
// Any non-ClearRange mutation is asserted to be a SetValue or an atomic op.
bool isRangeMutation(MutationRef m) {
	const bool isClearRange = (m.type == MutationRef::Type::ClearRange);
	if (!isClearRange) {
		ASSERT(m.type == MutationRef::Type::SetValue || isAtomicOp((MutationRef::Type)m.type));
		return false;
	}
	ASSERT(m.type != MutationRef::Type::DebugKeyRange);
	return true;
}

View File

@ -96,4 +96,6 @@ struct RestoreSimpleRequest : TimedRequest {
}
};
bool isRangeMutation(MutationRef m);
#endif // FDBSERVER_RESTOREUTIL_ACTOR_H