Merge pull request #2408 from xumengpanda/mengxu/fast-restore-sampling-PR

Performant restore [10/XX]: Add sampling phase
This commit is contained in:
Jingyu Zhou 2019-12-05 09:29:05 -08:00 committed by GitHub
commit c75b7f63c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 175 additions and 88 deletions

View File

@ -216,6 +216,8 @@ struct LoadingParam {
Key removePrefix;
Key mutationLogPrefix;
LoadingParam() = default;
// TODO: Compare all fields for loadingParam
bool operator==(const LoadingParam& r) const { return isRangeFile == r.isRangeFile && filename == r.filename; }
bool operator!=(const LoadingParam& r) const { return isRangeFile != r.isRangeFile || filename != r.filename; }
@ -319,13 +321,34 @@ struct RestoreSysInfoRequest : TimedRequest {
}
};
struct RestoreLoadFileReply : TimedRequest {
constexpr static FileIdentifier file_identifier = 34077902;
LoadingParam param;
MutationsVec samples; // sampled mutations
RestoreLoadFileReply() = default;
explicit RestoreLoadFileReply(LoadingParam param, MutationsVec samples) : param(param), samples(samples) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, samples);
}
std::string toString() {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " samples.size:" << samples.size();
return ss.str();
}
};
// Sample_Range_File and Assign_Loader_Range_File, Assign_Loader_Log_File
struct RestoreLoadFileRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 26557364;
LoadingParam param;
ReplyPromise<RestoreCommonReply> reply;
ReplyPromise<RestoreLoadFileReply> reply;
RestoreLoadFileRequest() = default;
explicit RestoreLoadFileRequest(LoadingParam param) : param(param) {}
@ -373,7 +396,7 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
Version prevVersion, version; // version is the commitVersion of the mutation vector.
int fileIndex; // Unique index for a backup file
bool isRangeFile;
Standalone<VectorRef<MutationRef>> mutations; // All mutations are at version
MutationsVec mutations; // All mutations at the same version parsed by one loader
ReplyPromise<RestoreCommonReply> reply;

View File

@ -531,6 +531,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
// Fast Restore
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
// clang-format on
if(clientKnobs)

View File

@ -471,6 +471,7 @@ public:
// Fast Restore
int64_t FASTRESTORE_FAILURE_TIMEOUT;
int64_t FASTRESTORE_HEARTBEAT_INTERVAL;
double FASTRESTORE_SAMPLING_PERCENT;
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
};

View File

@ -1776,7 +1776,7 @@ ACTOR Future<Void> masterProxyServerCore(
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Standalone<VectorRef<MutationRef>> mutations;
MutationsVec mutations;
std::vector<std::pair<MapPair<Key,ServerCacheInfo>,int>> keyInfoData;
vector<UID> src, dest;
ServerCacheInfo info;

View File

@ -107,14 +107,12 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVec
wait(curFilePos.whenAtLeast(req.prevVersion));
if (curFilePos.get() == req.prevVersion) {
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
state Version commitVersion = req.version;
Version commitVersion = req.version;
VectorRef<MutationRef> mutations(req.mutations);
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
}
state int mIndex = 0;
for (mIndex = 0; mIndex < mutations.size(); mIndex++) {
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
TraceEvent(SevDebug, "FastRestore")
.detail("ApplierNode", self->id())
@ -161,7 +159,7 @@ struct DBApplyProgress {
Reference<RestoreApplierData> self;
DBApplyProgress() = default;
DBApplyProgress(Reference<RestoreApplierData> self)
explicit DBApplyProgress(Reference<RestoreApplierData> self)
: self(self), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0), uncommittedTxnId(0),
lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), transactionSize(0) {
curItInCurTxn = self->kvOps.begin();
@ -245,7 +243,10 @@ struct DBApplyProgress {
};
ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
// state variables must be defined at the start of actor to be initialized in the actor constructor
state std::string typeStr = "";
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state DBApplyProgress progress(self);
// Assume the process will not crash when it apply mutations to DB. The reply message can be lost though
if (self->kvOps.empty()) {
@ -262,8 +263,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
self->sanityCheckMutationOps();
state DBApplyProgress progress(self);
if (progress.isDone()) {
TraceEvent("FastRestore_ApplierTxn")
.detail("ApplierApplyToDBFinished", self->id())
@ -271,7 +270,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
return Void();
}
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// Sanity check the restoreApplierKeys, which should be empty at this point
loop {
try {
@ -399,8 +397,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
TraceEvent("FastRestore_ApplierTxn")
.detail("ApplierApplyToDBFinished", self->id())
.detail("CleanupCurTxnIds", progress.curTxnId);
// House cleaning
self->kvOps.clear();
// clean up txn ids
loop {
try {
@ -416,6 +412,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
wait(tr->onError(e));
}
}
// House cleaning
self->kvOps.clear();
TraceEvent("FastRestore_ApplierTxn").detail("ApplierApplyToDBFinished", self->id());
return Void();

View File

@ -293,7 +293,7 @@ Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel, std::
// This actor can be combined with sendBatchRequests(...)
ACTOR template <class Interface, class Request>
Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::map<UID, Request> requests, std::vector<REPLY_TYPE(Request)>* replies) {
std::vector<std::pair<UID, Request>> requests, std::vector<REPLY_TYPE(Request)>* replies) {
if (requests.empty()) {
return Void();

View File

@ -37,7 +37,8 @@ bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap, bool isSampling = false);
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, bool isSampling = false);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self,
@ -51,7 +52,8 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(
SerializedMutationPartMap* mutationPartMap, Reference<IBackupContainer> bc, Version version, std::string fileName,
int64_t readOffset, int64_t readLen, KeyRange restoreRange, Key addPrefix, Key removePrefix, Key mutationLogPrefix);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, Reference<IBackupContainer> bc, Version version,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset_input, int64_t readLen_input, KeyRange restoreRange);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
@ -132,7 +134,9 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
// NOTE: map's iterator is guaranteed to be stable, but pointer may not.
// state VersionedMutationsMap* kvOps = &self->kvOpsPerLP[param];
self->kvOpsPerLP.emplace(param, VersionedMutationsMap());
self->sampleMutations.emplace(param, MutationsVec());
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = self->kvOpsPerLP.find(param);
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = self->sampleMutations.find(param);
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
@ -142,15 +146,16 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state int64_t j;
state int64_t readOffset;
state int64_t readLen;
int64_t j;
int64_t readOffset;
int64_t readLen;
for (j = param.offset; j < param.length; j += param.blockSize) {
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
if (param.isRangeFile) {
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(
kvOpsPerLPIter, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange));
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, self->bc,
param.version, param.filename, readOffset,
readLen, param.restoreRange));
} else {
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(
&processedFileOffset, &mutationMap, &mutationPartMap, self->bc, param.version, param.filename,
@ -160,7 +165,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile) {
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap);
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter);
}
TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.filename);
@ -173,6 +178,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
bool isSampling) {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
ASSERT(self->sampleMutations.find(req.param) == self->sampleMutations.end());
self->processedFileParams[req.param] = Never();
self->processedFileParams[req.param] = _processLoadingParam(req.param, self);
} else {
@ -182,19 +188,15 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
wait(self->processedFileParams[req.param]); // wait on the processing of the req.param.
// TODO: Send sampled mutations back to master
req.reply.send(RestoreCommonReply(self->id()));
req.reply.send(RestoreLoadFileReply(req.param, self->sampleMutations[req.param]));
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
return Void();
}
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
if (self->rangeToApplier.empty()) {
self->rangeToApplier = req.rangeToApplier;
} else {
ASSERT(self->rangeToApplier == req.rangeToApplier);
}
self->rangeToApplier = req.rangeToApplier;
// Send mutations from log files first to ensure log mutation at the same version is before the range kv
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = self->kvOpsPerLP.begin();
for (; item != self->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
@ -215,9 +217,12 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
state VersionedMutationsMap& kvOps = *pkvOps;
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state Version prevVersion = startVersion;
TraceEvent("FastRestore")
.detail("SendMutationToApplier", self->id())
TraceEvent("FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
@ -230,31 +235,26 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
state std::map<UID, Standalone<VectorRef<MutationRef>>> applierMutationsBuffer;
state std::map<UID, MutationsVec> applierMutationsBuffer;
state std::map<UID, double> applierMutationsSize;
state Standalone<VectorRef<MutationRef>> mvector;
state MutationsVec mvector;
state Standalone<VectorRef<UID>> nodeIDs;
// Initialize the above two maps
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state Version prevVersion = startVersion;
splitMutationIndex = 0;
kvCount = 0;
state VersionedMutationsMap::iterator kvOp;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
applierMutationsBuffer.clear();
applierMutationsSize.clear();
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = Standalone<VectorRef<MutationRef>>(VectorRef<MutationRef>());
applierMutationsBuffer[applierID] = MutationsVec(VectorRef<MutationRef>());
applierMutationsSize[applierID] = 0.0;
}
state Version commitVersion = kvOp->first;
state int mIndex;
state MutationRef kvm;
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
kvm = kvOp->second[mIndex];
for (int mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef kvm = kvOp->second[mIndex];
// Send the mutation to applier
if (isRangeMutation(kvm)) {
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
@ -276,7 +276,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
kvCount++;
}
} else { // mutation operates on a particular key
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->rangeToApplier.upper_bound(kvm.param1);
std::map<Key, UID>::iterator itlow = self->rangeToApplier.upper_bound(kvm.param1);
--itlow; // make sure itlow->first <= m.param1
ASSERT(itlow->first <= kvm.param1);
MutationRef mutation = kvm;
@ -438,8 +438,10 @@ bool isRangeMutation(MutationRef m) {
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap, bool isSampling) {
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, bool isSampling) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
MutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
for (auto& m : mutationMap) {
@ -475,6 +477,10 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
samples.push_back_deep(samples.arena(), mutation);
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
}
@ -483,18 +489,20 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
// Parsing the data blocks in a range file
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, Reference<IBackupContainer> bc, Version version,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen, KeyRange restoreRange) {
state VersionedMutationsMap& kvOps = kvOpsIter->second;
state MutationsVec& sampleMutations = samplesIter->second;
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
state Standalone<VectorRef<KeyValueRef>> blockData =
Standalone<VectorRef<KeyValueRef>> blockData =
wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedRangeFile", fileName).detail("DataSize", blockData.contents().size());
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
// If fileRange doesn't intersect restore range then we're done.
if (!fileRange.intersects(restoreRange)) {
@ -519,9 +527,9 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
}
// Now data only contains the kv mutation within restoreRange
state VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
state int start = 0;
state int end = data.size();
VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
int start = 0;
int end = data.size();
// Convert KV in data into mutations in kvOps
for (int i = start; i < end; ++i) {
@ -540,6 +548,10 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
ASSERT_WE_THINK(kvOps.find(version) != kvOps.end());
kvOps[version].push_back_deep(kvOps[version].arena(), m);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
sampleMutations.push_back_deep(sampleMutations.arena(), m);
}
}
return Void();
@ -555,7 +567,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix) {
state Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, readOffset, readLen));
@ -569,9 +581,9 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
wait(pProcessedFileOffset->whenAtLeast(readOffset));
if (pProcessedFileOffset->get() == readOffset) {
state int start = 0;
state int end = data.size();
state int numConcatenated = 0;
int start = 0;
int end = data.size();
int numConcatenated = 0;
for (int i = start; i < end; ++i) {
// Key k = data[i].key.withPrefix(mutationLogPrefix);
// ValueRef v = data[i].value;

View File

@ -47,8 +47,11 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
std::map<LoadingParam, VersionedMutationsMap> kvOpsPerLP; // Buffered kvOps for each loading param
// rangeToApplier is in master and loader. Loader uses this to determine which applier a mutation should be sent
// KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Standalone<KeyRef>, UID> rangeToApplier;
// Key is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Key, UID> rangeToApplier;
// Sampled mutations to be sent back to restore master
std::map<LoadingParam, MutationsVec> sampleMutations;
// keyOpsCount is the number of operations per key which is used to determine the key-range boundary for appliers
std::map<Standalone<KeyRef>, int> keyOpsCount;
int numSampledMutations; // The total number of mutations received from sampled data.
@ -81,6 +84,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
numSampledMutations = 0;
processedFileParams.clear();
kvOpsPerLP.clear();
sampleMutations.clear();
}
// Only get the appliers that are responsible for a range

View File

@ -54,7 +54,7 @@ ACTOR static Future<Void> initializeVersionBatch(Reference<RestoreMasterData> se
ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<RestoreMasterData> self);
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, Database cx);
void dummySampleWorkload(Reference<RestoreMasterData> self);
void splitKeyRangeForAppliers(Reference<RestoreMasterData> self);
ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker, Database cx) {
state Reference<RestoreMasterData> self = Reference<RestoreMasterData>(new RestoreMasterData());
@ -93,7 +93,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
// Assign a role to each worker
state int nodeIndex = 0;
state RestoreRole role;
std::map<UID, RestoreRecruitRoleRequest> requests;
std::vector<std::pair<UID, RestoreRecruitRoleRequest>> requests;
for (auto& workerInterf : masterWorker->workerInterfaces) {
if (nodeIndex >= 0 && nodeIndex < opConfig.num_appliers) {
// [0, numApplier) are appliers
@ -109,7 +109,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
.detail("Role", getRoleStr(role))
.detail("NodeIndex", nodeIndex)
.detail("WorkerNode", workerInterf.first);
requests[workerInterf.first] = RestoreRecruitRoleRequest(role, nodeIndex);
requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex));
nodeIndex++;
}
@ -297,12 +297,22 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
ASSERT_WE_THINK(param.offset <= file.fileSize);
ASSERT_WE_THINK(param.prevVersion <= param.endVersion);
requests.push_back(std::make_pair(loader->first, RestoreLoadFileRequest(param)));
requests.emplace_back(loader->first, RestoreLoadFileRequest(param));
loader++;
}
state std::vector<RestoreLoadFileReply> replies;
// Wait on the batch of load files or log files
wait(sendBatchRequests(&RestoreLoaderInterface::loadFile, self->loadersInterf, requests));
wait(getBatchReplies(&RestoreLoaderInterface::loadFile, self->loadersInterf, requests, &replies));
TraceEvent("FastRestore").detail("VersionBatch", self->batchIndex).detail("SamplingReplies", replies.size());
for (auto& reply : replies) {
TraceEvent("FastRestore").detail("VersionBatch", self->batchIndex).detail("SamplingReplies", reply.toString());
for (int i = 0; i < reply.samples.size(); ++i) {
MutationRef mutation = reply.samples[i];
self->samples.addMetric(mutation.param1, mutation.totalSize());
self->samplesSize += mutation.totalSize();
}
}
return Void();
}
@ -329,13 +339,13 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
ASSERT(self->loadersInterf.size() > 0);
ASSERT(self->appliersInterf.size() > 0);
dummySampleWorkload(self); // TODO: Delete
// Parse log files and send mutations to appliers before we parse range files
// TODO: Allow loading both range and log files in parallel
wait(loadFilesOnLoaders(self, cx, request, versionBatch, false));
wait(loadFilesOnLoaders(self, cx, request, versionBatch, true));
splitKeyRangeForAppliers(self);
// Loaders should ensure log files' mutations sent to appliers before range files' mutations
// TODO: Let applier buffer mutations from log and range files differently so that loaders can send mutations in
// parallel
@ -347,25 +357,43 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
return Void();
}
// Placehold for sample workload
// Produce the key-range for each applier
void dummySampleWorkload(Reference<RestoreMasterData> self) {
// Decide which key range should be taken by which applier
void splitKeyRangeForAppliers(Reference<RestoreMasterData> self) {
int numAppliers = self->appliersInterf.size();
double slotSize = std::max(self->samplesSize / numAppliers, 1.0);
std::vector<Key> keyrangeSplitter;
// We will use the splitter at [1, numAppliers - 1]. The first splitter is normalKeys.begin
int i;
for (i = 0; i < numAppliers; i++) {
keyrangeSplitter.push_back(Key(deterministicRandom()->randomUniqueID().toString()));
keyrangeSplitter.push_back(normalKeys.begin); // First slot
double cumulativeSize = slotSize;
TraceEvent("FastRestore").detail("VersionBatch", self->batchIndex).detail("SamplingSize", self->samplesSize);
while (cumulativeSize < self->samplesSize) {
IndexedSet<Key, int64_t>::iterator lowerBound = self->samples.index(cumulativeSize);
if (lowerBound == self->samples.end()) {
break;
}
keyrangeSplitter.push_back(*lowerBound);
TraceEvent("FastRestore")
.detail("VersionBatch", self->batchIndex)
.detail("CumulativeSize", cumulativeSize)
.detail("SlotSize", slotSize);
cumulativeSize += slotSize;
}
std::sort(keyrangeSplitter.begin(), keyrangeSplitter.end());
i = 0;
if (keyrangeSplitter.size() < numAppliers) {
TraceEvent(SevWarnAlways, "FastRestore")
.detail("NotAllAppliersAreUsed", keyrangeSplitter.size())
.detail("NumAppliers", numAppliers);
} else if (keyrangeSplitter.size() > numAppliers) {
TraceEvent(SevError, "FastRestore")
.detail("TooManySlotsThanAppliers", keyrangeSplitter.size())
.detail("NumAppliers", numAppliers);
}
// std::sort(keyrangeSplitter.begin(), keyrangeSplitter.end());
int i = 0;
self->rangeToApplier.clear();
for (auto& applier : self->appliersInterf) {
if (i == 0) {
self->rangeToApplier[normalKeys.begin] = applier.first;
} else {
self->rangeToApplier[keyrangeSplitter[i]] = applier.first;
if (i >= keyrangeSplitter.size()) {
break; // Not all appliers will be used
}
self->rangeToApplier[keyrangeSplitter[i]] = applier.first;
i++;
}
self->logApplierKeyRange();
@ -472,6 +500,8 @@ ACTOR static Future<Void> initializeVersionBatch(Reference<RestoreMasterData> se
}
wait(sendBatchRequests(&RestoreLoaderInterface::initVersionBatch, self->loadersInterf, requestsToLoaders));
self->resetPerVersionBatch();
return Void();
}

View File

@ -62,6 +62,9 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc
IndexedSet<Key, int64_t> samples; // sample of range and log files
double samplesSize; // sum of the metric of all samples
void addref() { return ReferenceCounted<RestoreMasterData>::addref(); }
void delref() { return ReferenceCounted<RestoreMasterData>::delref(); }
@ -73,7 +76,13 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
~RestoreMasterData() = default;
void resetPerVersionBatch() {}
void resetPerVersionBatch() {
TraceEvent("FastRestore")
.detail("RestoreMaster", "ResetPerVersionBatch")
.detail("VersionBatchIndex", batchIndex);
samplesSize = 0;
samples.clear();
}
std::string describeNode() {
std::stringstream ss;

View File

@ -52,7 +52,7 @@ struct RestoreMasterData;
struct RestoreSimpleRequest;
typedef std::map<Version, Standalone<VectorRef<MutationRef>>> VersionedMutationsMap;
using VersionedMutationsMap = std::map<Version, MutationsVec>;
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
@ -111,8 +111,8 @@ public:
UID nodeID;
int nodeIndex;
std::map<UID, RestoreLoaderInterface> loadersInterf;
std::map<UID, RestoreApplierInterface> appliersInterf;
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
RestoreApplierInterface masterApplierInterf;
NotifiedVersion versionBatchId; // Continuously increase for each versionBatch

View File

@ -26,6 +26,7 @@
#pragma once
#include "fdbclient/Tuple.h"
#include "fdbclient/CommitTransaction.h"
#include "flow/flow.h"
#include "flow/Stats.h"
#include "fdbrpc/TimedRequest.h"
@ -37,6 +38,8 @@
//#define SevFRMutationInfo SevVerbose
#define SevFRMutationInfo SevInfo
using MutationsVec = Standalone<VectorRef<MutationRef>>;
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);
std::string getRoleStr(RestoreRole role);

View File

@ -105,7 +105,8 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx));
TraceEvent("FastRestore").detail("RecruitedLoaderNodeIndex", req.nodeIndex);
req.reply.send(RestoreRecruitRoleReply(self->id(), RestoreRole::Loader, self->loaderInterf.get()));
req.reply.send(
RestoreRecruitRoleReply(self->loaderInterf.get().id(), RestoreRole::Loader, self->loaderInterf.get()));
} else if (req.role == RestoreRole::Applier) {
ASSERT(!self->applierInterf.present());
self->applierInterf = RestoreApplierInterface();
@ -118,7 +119,8 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreApplierCore(self->applierInterf.get(), req.nodeIndex, cx));
TraceEvent("FastRestore").detail("RecruitedApplierNodeIndex", req.nodeIndex);
req.reply.send(RestoreRecruitRoleReply(self->id(), RestoreRole::Applier, self->applierInterf.get()));
req.reply.send(
RestoreRecruitRoleReply(self->applierInterf.get().id(), RestoreRole::Applier, self->applierInterf.get()));
} else {
TraceEvent(SevError, "FastRestore")
.detail("HandleRecruitRoleRequest", "UnknownRole"); //.detail("Request", req.printable());

View File

@ -492,6 +492,12 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state FileBackupAgent backupAgent;
state Future<Void> extraBackup;
state bool extraTasks = false;
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
state UID randomID = nondeterministicRandom()->randomUniqueID();
state int restoreIndex = 0;
state bool restoreDone = false;
TraceEvent("BARW_Arguments")
.detail("BackupTag", printable(self->backupTag))
.detail("PerformRestore", self->performRestore)
@ -500,7 +506,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter)
.detail("DifferentialAfter", self->stopDifferentialAfter);
state UID randomID = nondeterministicRandom()->randomUniqueID();
if (self->allowPauses && BUGGIFY) {
state Future<Void> cp = changePaused(cx, &backupAgent);
}
@ -610,12 +615,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state std::vector<Future<Version>> restores;
state std::vector<Standalone<StringRef>> restoreTags;
state int restoreIndex;
// Restore each range by calling backupAgent.restore()
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
state Transaction tr1(cx);
loop {
tr1.reset();
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
@ -667,8 +671,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// We should wait on all restore before proceeds
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
state bool restoreDone = false;
state ReadYourWritesTransaction tr2(cx);
restoreDone = false;
state Future<Void> watchForRestoreRequestDone;
loop {
try {