FastRestore:Refactor RestoreLoader and fix bugs

Refactor RestoreLoader code and
Fix a bug in notifying restore finish.
This commit is contained in:
Meng Xu 2019-06-04 11:40:23 -07:00
parent 477fd152c0
commit 3fcb6ec0a1
9 changed files with 88 additions and 107 deletions

View File

@ -3759,9 +3759,7 @@ ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx,
wait( tr.commit() );
}
// The clear transaction may fail in uncertain state, which may already clear the restoreRequestDoneKey
if ( !restoreRequestDone ) {
wait(watch4RestoreRequestDone);
}
if ( restoreRequestDone ) break;
} catch( Error &e ) {
wait( tr.onError(e) );
}

View File

@ -36,7 +36,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self);
ACTOR Future<Void> handleApplyToDBRequest(RestoreSimpleRequest req, Reference<RestoreApplierData> self, Database cx);
ACTOR Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx);
ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, RestoreApplierInterface applierInterf, Database cx) {
state ActorCollection actors(false);
@ -62,7 +62,7 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
requestTypeStr = "sendMutationVector";
actors.add( handleSendMutationVectorRequest(req, self) );
}
when ( RestoreSimpleRequest req = waitNext(applierInterf.applyToDB.getFuture()) ) {
when ( RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture()) ) {
requestTypeStr = "applyToDB";
actors.add( handleApplyToDBRequest(req, self, cx) );
}
@ -70,12 +70,12 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
requestTypeStr = "initVersionBatch";
actors.add(handleInitVersionBatchRequest(req, self));
}
when ( RestoreSimpleRequest req = waitNext(applierInterf.finishRestore.getFuture()) ) {
when ( RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture()) ) {
requestTypeStr = "finishRestore";
exitRole = handlerFinishRestoreRequest(req, self, cx);
exitRole = handleFinishRestoreRequest(req, self, cx);
}
when ( wait(exitRole) ) {
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole");
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id());
break;
}
}
@ -114,7 +114,7 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
state Version commitVersion = req.version;
VectorRef<MutationRef> mutations(req.mutations);
printf("[DEBUG] Node:%s receive %d mutations at version:%ld\n", self->describeNode().c_str(), mutations.size(), commitVersion);
// printf("[DEBUG] Node:%s receive %d mutations at version:%ld\n", self->describeNode().c_str(), mutations.size(), commitVersion);
if ( self->kvOps.find(commitVersion) == self->kvOps.end() ) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
}
@ -124,8 +124,8 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
numMutations++;
//if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
self->describeNode().c_str(), numMutations, mutation.toString().c_str());
// printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
// self->describeNode().c_str(), numMutations, mutation.toString().c_str());
//}
}
@ -148,8 +148,14 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
// Assume the process will not crash when it apply mutations to DB. The reply message can be lost though
if (self->kvOps.empty()) {
printf("Node:%s kvOps is empty. No-op for apply to DB\n", self->describeNode().c_str());
TraceEvent("FastRestore").detail("ApplierApplyToDBEmpty", self->id());
return Void();
}
std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator begin = self->kvOps.begin();
std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator end = self->kvOps.end();
end--;
ASSERT_WE_THINK(end != self->kvOps.end());
TraceEvent("FastRestore").detail("ApplierApplyToDB", self->id()).detail("FromVersion", begin->first).detail("EndVersion", end->first);
self->sanityCheckMutationOps();
@ -174,6 +180,7 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
for ( ; it != self->kvOps.end(); ++it ) {
numVersion++;
//TraceEvent("FastRestore").detail("Applier", self->id()).detail("ApplyKVsToDBVersion", it->first);
if ( debug_verbose ) {
TraceEvent("ApplyKVOPsToDB\t").detail("Version", it->first).detail("OpNum", it->second.size());
}
@ -263,20 +270,17 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
return Void();
}
ACTOR Future<Void> handleApplyToDBRequest(RestoreSimpleRequest req, Reference<RestoreApplierData> self, Database cx) {
ACTOR Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx) {
TraceEvent("FastRestore").detail("ApplierApplyToDB", self->id()).detail("DBApplierPresent", self->dbApplier.present());
if ( !self->dbApplier.present() ) {
self->dbApplier = Never();
//self->dbApplier = Never();
self->dbApplier = applyToDB(self, cx);
wait( self->dbApplier.get() );
} else {
ASSERT( self->dbApplier.present() );
wait( self->dbApplier.get() );
}
ASSERT(self->dbApplier.present());
wait( self->dbApplier.get() );
req.reply.send(RestoreCommonReply(self->id()));
return Void();
}
}

View File

@ -80,12 +80,12 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
requestTypeStr = "initVersionBatch";
actors.add( handleInitVersionBatchRequest(req, self) );
}
when ( RestoreSimpleRequest req = waitNext(loaderInterf.finishRestore.getFuture()) ) {
when ( RestoreVersionBatchRequest req = waitNext(loaderInterf.finishRestore.getFuture()) ) {
requestTypeStr = "finishRestore";
exitRole = handlerFinishRestoreRequest(req, self, cx);
exitRole = handleFinishRestoreRequest(req, self, cx);
}
when ( wait(exitRole) ) {
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole");
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id());
break;
}
}
@ -109,21 +109,18 @@ ACTOR Future<Void> handleSetApplierKeyRangeVectorRequest(RestoreSetApplierKeyRan
}
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoaderData> self) {
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
ASSERT( param.blockSize > 0 );
ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block bondary.
// Temporary data structure for parsing range and log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
state VersionedMutationsMap kvOps;
state SerializedMutationListMap mutationMap; // Key is the unique identifier for a batch of mutation logs at the same version
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartLoadingFile", param.filename);
ASSERT( param.blockSize > 0 );
state std::vector<Future<Void>> fileParserFutures;
if (param.offset % param.blockSize != 0) {
fprintf(stderr, "[WARNING] Parse file not at block boundary! param.offset:%ld param.blocksize:%ld, remainder:%ld\n",
param.offset, param.blockSize, param.offset % param.blockSize);
}
state int64_t j;
state int64_t readOffset;
state int64_t readLen;
@ -151,7 +148,7 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling) {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
//printf("self->processedFileParams.size:%d Process param:%s\n", self->processedFileParams.size(), req.param.toString().c_str());
TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
self->processedFileParams[req.param] = Never();
self->processedFileParams[req.param] = _processLoadingParam(req.param, self);
}
@ -177,7 +174,6 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
if ( kvOps.find(endVersion) == kvOps.end() ) {
kvOps[endVersion] = VectorRef<MutationRef>(); // Empty mutation vector will be handled by applier
}
//self->printAppliersKeyRange();
state std::map<UID, Standalone<VectorRef<MutationRef>>> applierMutationsBuffer; // The mutation vector to be sent to each applier
state std::map<UID, double> applierMutationsSize; // buffered mutation vector size for each applier
@ -193,7 +189,6 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
state VersionedMutationsMap::iterator kvOp;
for ( kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
// In case try-catch has error and loop back
applierMutationsBuffer.clear();
applierMutationsSize.clear();
for (auto &applierID : applierIDs) {
@ -213,32 +208,24 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
nodeIDs.pop_front(nodeIDs.size());
// WARNING: The splitMutation() may have bugs
splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
ASSERT(mvector.size() == nodeIDs.size());
for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++ ) {
MutationRef mutation = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
printf("SPLITTED MUTATION: %d: mutation:%s applierID:%s\n", splitMutationIndex, mutation.toString().c_str(), applierID.toString().c_str());
//printf("SPLITTED MUTATION: %d: mutation:%s applierID:%s\n", splitMutationIndex, mutation.toString().c_str(), applierID.toString().c_str());
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
applierMutationsSize[applierID] += mutation.expectedSize();
kvCount++;
}
} else { // mutation operates on a particular key
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->range2Applier.lower_bound(kvm.param1); // lower_bound returns the iterator that is >= m.param1
// make sure itlow->first <= m.param1
if ( itlow == self->range2Applier.end() || itlow->first > kvm.param1 ) {
if ( itlow == self->range2Applier.begin() ) {
fprintf(stderr, "KV-Applier: SHOULD NOT HAPPEN. kvm.param1:%s\n", kvm.param1.toString().c_str());
}
--itlow;
}
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->range2Applier.upper_bound(kvm.param1); // lower_bound returns the iterator that is > m.param1
--itlow; // make sure itlow->first <= m.param1
ASSERT( itlow->first <= kvm.param1 );
MutationRef mutation = kvm;
UID applierID = itlow->second;
printf("KV--Applier: K:%s ApplierID:%s\n", kvm.param1.toString().c_str(), applierID.toString().c_str());
//printf("KV--Applier: K:%s ApplierID:%s\n", kvm.param1.toString().c_str(), applierID.toString().c_str());
kvCount++;
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
@ -269,7 +256,7 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mve
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
printf("SPLITMUTATION: orignal mutation:%s\n", m.toString().c_str());
// printf("SPLITMUTATION: orignal mutation:%s\n", m.toString().c_str());
std::map<Standalone<KeyRef>, UID>::iterator itlow, itup; //we will return [itlow, itup)
itlow = self->range2Applier.lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if ( itlow->first > m.param1 ) {
@ -279,7 +266,7 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mve
}
itup = self->range2Applier.upper_bound(m.param2); // upper_bound returns the iterator that is > m.param2; return rmap::end if no keys are considered to go after m.param2.
printf("SPLITMUTATION: itlow_key:%s itup_key:%s\n", itlow->first.toString().c_str(), itup == self->range2Applier.end() ? "[end]" : itup->first.toString().c_str());
// printf("SPLITMUTATION: itlow_key:%s itup_key:%s\n", itlow->first.toString().c_str(), itup == self->range2Applier.end() ? "[end]" : itup->first.toString().c_str());
ASSERT( itup == self->range2Applier.end() || itup->first > m.param2 );
std::map<Standalone<KeyRef>, UID>::iterator itApplier;
@ -303,13 +290,13 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mve
} else {
curm.param2 = itlow->first;
}
printf("SPLITMUTATION: mvector.push_back:%s\n", curm.toString().c_str());
// printf("SPLITMUTATION: mvector.push_back:%s\n", curm.toString().c_str());
ASSERT( curm.param1 <= curm.param2 );
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
// printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
return;
}
@ -430,6 +417,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsM
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
state Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedRangeFile", fileName).detail("DataSize", blockData.contents().size());
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
@ -457,8 +445,6 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsM
// Now data only contains the kv mutation within restoreRange
state VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
printf("[INFO] RangeFile:%s blockData entry size:%d recovered data size:%d\n", fileName.c_str(), blockData.size(), data.size()); // TO_DELETE
state int start = 0;
state int end = data.size();
@ -488,14 +474,10 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsM
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix) {
state Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
printf("Parse log file:%s readOffset:%d readLen:%ld\n", fileName.c_str(), readOffset, readLen);
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data = wait(parallelFileRestore::decodeLogFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedLogFileName", fileName).detail("DataSize", data.contents().size());
TraceEvent("FastRestore").detail("DecodedLogFile", fileName).detail("DataSize", data.contents().size());
state int start = 0;
state int end = data.size();

View File

@ -114,19 +114,6 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
bcUrl = url;
bc = IBackupContainer::openContainer(url.toString());
}
void printAppliersKeyRange() {
printf("[INFO] The mapping of KeyRange_start --> Applier ID: getHexString\n");
// applier type: std::map<Standalone<KeyRef>, UID>
for (auto &applier : range2Applier) {
printf("\t[INFO]%s -> %s\n", getHexString(applier.first).c_str(), applier.second.toString().c_str());
}
printf("[INFO] The mapping of KeyRange_start --> Applier ID: toString\n");
// applier type: std::map<Standalone<KeyRef>, UID>
for (auto &applier : range2Applier) {
printf("\t[INFO]%s -> %s\n", applier.first.toString().c_str(), applier.second.toString().c_str());
}
}
};

View File

@ -66,7 +66,7 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreMasterData> self, Databas
state Standalone<VectorRef<RestoreRequest>> restoreRequests = wait( collectRestoreRequests(cx) );
// lock DB for restore
wait(lockDatabase(cx,randomUID));
wait( lockDatabase(cx,randomUID) );
wait( _clearDB(cx) );
// Step: Perform the restore requests
@ -76,8 +76,14 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreMasterData> self, Databas
}
// Step: Notify all restore requests have been handled by cleaning up the restore keys
wait( notifyRestoreCompleted(self, cx) );
wait(unlockDatabase(cx,randomUID));
wait( notifyRestoreCompleted(self, cx) );
try {
wait( unlockDatabase(cx,randomUID) );
} catch(Error &e) {
printf(" unlockDB fails. uid:%s\n", randomUID.toString().c_str());
}
TraceEvent("FastRestore").detail("RestoreMasterComplete", self->id());
@ -118,9 +124,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
for (auto &file : *files) {
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to drive applier's NotifiedVersion (e.g., logVersion and rangeVersion)
// if (file.fileSize <= 0) {
// continue;
// }
if ( loader == self->loadersInterf.end() ) {
loader = self->loadersInterf.begin();
}
@ -159,17 +162,12 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
}
ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMasterData> self, Database cx, RestoreRequest request, VersionBatch versionBatch) {
if ( self->isBackupEmpty() ) { // TODO: Change to the version batch files
printf("[WARNING] Node:%s distributeWorkloadPerVersionBatch() load an empty batch of backup. Print out the empty backup files info.\n", self->describeNode().c_str());
self->printBackupFilesInfo();
return Void();
}
ASSERT( !versionBatch.isEmpty() );
ASSERT( self->loadersInterf.size() > 0 );
ASSERT( self->appliersInterf.size() > 0 );
dummySampleWorkload(self);
wait( notifyLoaderAppliersKeyRange(self) );
// Parse log files and send mutations to appliers before we parse range files
@ -201,6 +199,7 @@ void dummySampleWorkload(Reference<RestoreMasterData> self) {
self->range2Applier[StringRef(keyrangeSplitter[i].toString())] = applier.first;
}
}
self->logApplierKeyRange();
}
ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequests(Database cx) {
@ -308,12 +307,13 @@ ACTOR static Future<Void> initializeVersionBatch(Reference<RestoreMasterData> se
// Ask each applier to apply its received mutations to DB
ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<RestoreMasterData> self) {
// Prepare the applyToDB requests
std::vector<std::pair<UID, RestoreSimpleRequest>> requests;
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for (auto& applier : self->appliersInterf) {
requests.push_back( std::make_pair(applier.first, RestoreSimpleRequest()) );
requests.push_back( std::make_pair(applier.first, RestoreVersionBatchRequest(self->batchIndex)) );
}
wait( sendBatchRequests(&RestoreApplierInterface::applyToDB, self->appliersInterf, requests) );
TraceEvent("FastRestore").detail("Master", self->id()).detail("ApplyToDB", "Completed");
return Void();
}
@ -331,15 +331,15 @@ ACTOR static Future<Void> notifyLoaderAppliersKeyRange(Reference<RestoreMasterDa
// Ask all loaders and appliers to perform housecleaning at the end of restore and
// Register the restoreRequestDoneKey to signal the end of restore
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, Database cx) {
std::vector<std::pair<UID, RestoreSimpleRequest>> requests;
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for ( auto &loader : self->loadersInterf ) {
requests.push_back( std::make_pair(loader.first, RestoreSimpleRequest()) );
requests.push_back( std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex)) );
}
wait( sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests) );
std::vector<std::pair<UID, RestoreSimpleRequest>> requests;
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for ( auto &applier : self->appliersInterf ) {
requests.push_back( std::make_pair(applier.first, RestoreSimpleRequest()) );
requests.push_back( std::make_pair(applier.first, RestoreVersionBatchRequest(self->batchIndex)) );
}
wait( sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, requests) );

View File

@ -46,6 +46,10 @@ struct VersionBatch {
Version endVersion; // Exclusive
std::vector<RestoreFileFR> logFiles;
std::vector<RestoreFileFR> rangeFiles;
bool isEmpty() {
return logFiles.empty() && rangeFiles.empty();
}
};
struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMasterData> {
@ -178,11 +182,10 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
}
}
void printAppliersKeyRange() {
printf("[INFO] The mapping of KeyRange_start --> Applier ID\n");
// applier type: std::map<Standalone<KeyRef>, UID>
void logApplierKeyRange() {
TraceEvent("FastRestore").detail("ApplierKeyRangeNum", range2Applier.size());
for (auto &applier : range2Applier) {
printf("\t[INFO]%s -> %s\n", getHexString(applier.first).c_str(), applier.second.toString().c_str());
TraceEvent("FastRestore").detail("KeyRangeLowerBound", applier.first).detail("Applier", applier.second);
}
}

View File

@ -43,11 +43,14 @@ ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
return Void();
}
ACTOR Future<Void> handlerFinishRestoreRequest(RestoreSimpleRequest req, Reference<RestoreRoleData> self, Database cx) {
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self, Database cx) {
if ( self->versionBatchStart ) {
self->versionBatchStart = false;
}
TraceEvent("FastRestore").detail("FinishRestoreRequest", req.batchID)
.detail("Role", getRoleStr(self->role)).detail("Node", self->id());
req.reply.send( RestoreCommonReply(self->id()) );
return Void();
@ -57,6 +60,13 @@ ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req,
if ( !self->versionBatchStart ) {
self->versionBatchStart = true;
self->resetPerVersionBatch();
// if ( self->role == RestoreRole::Applier) {
// RestoreApplierData* applier = (RestoreApplierData*) self.getPtr();
// applier->dbApplier = Optional<Future<Void>>(); // reset dbApplier for next version batch
// // if ( applier->dbApplier.present() ) {
// // applier->dbApplier.~Optional(); // reset dbApplier for next version batch
// // }
// }
}
TraceEvent("FastRestore").detail("InitVersionBatch", req.batchID)
.detail("Role", getRoleStr(self->role)).detail("Node", self->id());
@ -183,11 +193,3 @@ void printLowerBounds(std::vector<Standalone<KeyRef>> lowerBounds) {
}
}
void printApplierKeyRangeInfo(std::map<UID, Standalone<KeyRangeRef>> appliers) {
printf("[INFO] appliers num:%ld\n", appliers.size());
int index = 0;
for(auto &applier : appliers) {
printf("\t[INFO][Applier:%d] ID:%s --> KeyRange:%s\n", index, applier.first.toString().c_str(), applier.second.toString().c_str());
}
}

View File

@ -55,7 +55,7 @@ typedef std::map<Version, Standalone<VectorRef<MutationRef>>> VersionedMutations
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
ACTOR Future<Void> handlerFinishRestoreRequest(RestoreSimpleRequest req, Reference<RestoreRoleData> self, Database cx);
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self, Database cx);
// Helper class for reading restore data from a buffer and throwing the right errors.
@ -186,7 +186,6 @@ public:
};
void printLowerBounds(std::vector<Standalone<KeyRef>> lowerBounds);
void printApplierKeyRangeInfo(std::map<UID, Standalone<KeyRangeRef>> appliers);
#include "flow/unactorcompiler.h"
#endif

View File

@ -78,6 +78,12 @@ struct RestoreSimpleRequest : TimedRequest {
void serialize( Ar& ar ) {
serializer(ar, reply);
}
std::string toString() const {
std::stringstream ss;
ss << "RestoreSimpleRequest";
return ss.str();
}
};
#endif //FDBSERVER_RESTOREUTIL_ACTOR_H