Merge pull request #2442 from xumengpanda/mengxu/fastrestore-code-cleanup-PR

Performant restore [11/XX]: Clean up unused fields in LoadingParam
This commit is contained in:
Jingyu Zhou 2019-12-11 08:47:26 -08:00 committed by GitHub
commit 8312be9c2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 41 deletions

View File

@ -48,7 +48,7 @@ struct RestoreSysInfoRequest;
struct RestoreLoadFileRequest;
struct RestoreVersionBatchRequest;
struct RestoreSendMutationsToAppliersRequest;
struct RestoreSendMutationVectorVersionedRequest;
struct RestoreSendVersionedMutationsRequest;
struct RestoreSysInfo;
struct RestoreApplierInterface;
@ -162,7 +162,7 @@ struct RestoreApplierInterface : RestoreRoleInterface {
constexpr static FileIdentifier file_identifier = 54253048;
RequestStream<RestoreSimpleRequest> heartbeat;
RequestStream<RestoreSendMutationVectorVersionedRequest> sendMutationVector;
RequestStream<RestoreSendVersionedMutationsRequest> sendMutationVector;
RequestStream<RestoreVersionBatchRequest> applyToDB;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces;
@ -204,17 +204,13 @@ struct LoadingParam {
bool isRangeFile;
Key url;
Version prevVersion;
Version endVersion;
Version endVersion; // range file's mutations are all at the endVersion
int fileIndex;
Version version;
std::string filename;
int64_t offset;
int64_t length;
int64_t blockSize;
KeyRange restoreRange;
Key addPrefix;
Key removePrefix;
Key mutationLogPrefix;
LoadingParam() = default;
@ -227,16 +223,16 @@ struct LoadingParam {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, isRangeFile, url, prevVersion, endVersion, fileIndex, version, filename, offset, length,
blockSize, restoreRange, addPrefix, removePrefix, mutationLogPrefix);
serializer(ar, isRangeFile, url, prevVersion, endVersion, fileIndex, filename, offset, length, blockSize,
restoreRange);
}
std::string toString() {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString() << " prevVersion:" << prevVersion
<< " fileIndex:" << fileIndex << " endVersion:" << endVersion << " version:" << version
<< " filename:" << filename << " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString() << " addPrefix:" << addPrefix.toString();
<< " endVersion:" << endVersion << " fileIndex:" << fileIndex << " filename:" << filename
<< " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString();
return str.str();
}
};
@ -390,7 +386,7 @@ struct RestoreSendMutationsToAppliersRequest : TimedRequest {
}
};
struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
struct RestoreSendVersionedMutationsRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 69764565;
Version prevVersion, version; // version is the commitVersion of the mutation vector.
@ -400,9 +396,9 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
ReplyPromise<RestoreCommonReply> reply;
RestoreSendMutationVectorVersionedRequest() = default;
explicit RestoreSendMutationVectorVersionedRequest(int fileIndex, Version prevVersion, Version version,
bool isRangeFile, VectorRef<MutationRef> mutations)
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int fileIndex, Version prevVersion, Version version, bool isRangeFile,
VectorRef<MutationRef> mutations)
: fileIndex(fileIndex), prevVersion(prevVersion), version(version), isRangeFile(isRangeFile),
mutations(mutations) {}

View File

@ -34,7 +34,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req,
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
Reference<RestoreApplierData> self);
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
Database cx);
@ -54,7 +54,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
requestTypeStr = "heartbeat";
actors.add(handleHeartbeat(req, applierInterf.id()));
}
when(RestoreSendMutationVectorVersionedRequest req =
when(RestoreSendVersionedMutationsRequest req =
waitNext(applierInterf.sendMutationVector.getFuture())) {
requestTypeStr = "sendMutationVector";
actors.add(handleSendMutationVectorRequest(req, self));
@ -92,7 +92,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
// No race condition as long as we do not wait or yield when operate the shared data.
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
// Only one actor can process mutations from the same file
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req,
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
Reference<RestoreApplierData> self) {
// Assume: self->processedFileState[req.fileIndex] will not be erased while the actor is active.
// Note: Insert new items into processedFileState will not invalidate the reference.

View File

@ -47,10 +47,11 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, VersionedMutationsMap* kvOps,
bool isRangeFile, Version startVersion, Version endVersion, int fileIndex);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(
NotifiedVersion* pProcessedFileOffset, SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap, Reference<IBackupContainer> bc, Version version, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange, Key addPrefix, Key removePrefix, Key mutationLogPrefix);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
Reference<IBackupContainer> bc, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version,
@ -154,12 +155,12 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
readLen = std::min<int64_t>(param.blockSize, param.length - j);
if (param.isRangeFile) {
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
param.version, param.filename, readOffset,
param.endVersion, param.filename, readOffset,
readLen, param.restoreRange));
} else {
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(
&processedFileOffset, &mutationMap, &mutationPartMap, self->bc, param.version, param.filename,
readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix));
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
&mutationPartMap, self->bc, param.filename,
readOffset, readLen, param.restoreRange));
}
}
wait(waitForAll(fileParserFutures));
@ -218,7 +219,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
state Version prevVersion = startVersion;
TraceEvent("FastRestore_SendMutationToApplier")
@ -293,8 +294,8 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// Send the mutations to appliers for each version
for (auto& applierID : applierIDs) {
requests.push_back(std::make_pair(
applierID, RestoreSendMutationVectorVersionedRequest(fileIndex, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
applierID, RestoreSendVersionedMutationsRequest(fileIndex, prevVersion, commitVersion, isRangeFile,
applierMutationsBuffer[applierID])));
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
}
@ -534,9 +535,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// Convert KV in data into mutations in kvOps
for (int i = start; i < end; ++i) {
// NOTE: The KV pairs in range files are the real KV pairs in original DB.
// Should NOT removePrefix and addPrefix for the backup data!
// In other words, the following operation is wrong:
// data[i].key.removePrefix(removePrefix).withPrefix(addPrefix)
// Should NOT add prefix or remove surfix for the backup data!
MutationRef m(MutationRef::Type::SetValue, data[i].key,
data[i].value); // ASSUME: all operation in range file is set.
@ -563,10 +562,8 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* pMutationMap,
SerializedMutationPartMap* pMutationPartMap,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix) {
Reference<IBackupContainer> bc, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange) {
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =

View File

@ -278,15 +278,11 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
param.url = request.url;
param.isRangeFile = file.isRange;
param.version = file.version;
param.filename = file.fileName;
param.offset = 0;
param.length = file.fileSize; // We load file by file, instead of data block by data block for now
param.blockSize = file.blockSize;
param.restoreRange = request.range;
param.addPrefix = request.addPrefix;
param.removePrefix = request.removePrefix;
param.mutationLogPrefix = mutationLogPrefix;
prevVersion = param.endVersion;