FastRestore: Refactor LoadFileRequest

1) Remove global map to buffer the parsed mutations on loader.
   Use local map instead to increase parallelism.
2) Use std::map<LoadingParam, Future<Void>> to hold the actor
that parses a backup file and to de-duplicate requests.
3) Remove unused code.
This commit is contained in:
Meng Xu 2019-05-27 18:39:30 -07:00
parent 8daea823d8
commit d56837ba16
4 changed files with 239 additions and 508 deletions

View File

@ -81,7 +81,7 @@ ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> lea
ACTOR Future<Void> startRestoreWorkerLeader(Reference<RestoreWorkerData> self, RestoreWorkerInterface workerInterf, Database cx);
ACTOR Future<Void> handleRestoreSysInfoRequest(RestoreSysInfoRequest req, Reference<RestoreWorkerData> self);
bool debug_verbose = true;
bool debug_verbose = false;
void printGlobalNodeStatus(Reference<RestoreWorkerData>);
@ -428,9 +428,9 @@ ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> self)
ASSERT( self->masterData.isValid() );
ASSERT( !self->masterData->loadersInterf.empty() );
RestoreSysInfo sysInfo(self->masterData->appliersInterf);
std::map<UID, RestoreSysInfoRequest> requests;
std::vector<std::pair<UID, RestoreSysInfoRequest>> requests;
for (auto &worker : self->workerInterfaces) {
requests[worker.first] = RestoreSysInfoRequest(sysInfo);
requests.push_back( std::make_pair(worker.first, RestoreSysInfoRequest(sysInfo)) );
}
printf("Master: distributeRestoreSysInfo\n");
wait( sendBatchRequests(&RestoreWorkerInterface::updateRestoreSysInfo, self->workerInterfaces, requests) );

View File

@ -26,22 +26,26 @@
#include "flow/actorcompiler.h" // This must be the last #include.
typedef std::map<Version, Standalone<VectorRef<MutationRef>>> VersionedMutationsMap;
ACTOR Future<Void> handleSetApplierKeyRangeVectorRequest(RestoreSetApplierKeyRangeVectorRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling = false);
ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling = false);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(Reference<RestoreLoaderData> self,
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling = false);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(std::map<Standalone<StringRef>, Standalone<StringRef>> *mutationMap,
std::map<Standalone<StringRef>, uint32_t> *mutationPartMap,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLoaderData> self,
Key mutationLogPrefix);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(std::map<Version, Standalone<VectorRef<MutationRef>>> *kvOps,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset_input, int64_t readLen_input,
KeyRange restoreRange, Key addPrefix, Key removePrefix);
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self, bool isRangeFile, Version prevVersion, Version endVersion);
void parseSerializedMutation(Reference<RestoreLoaderData> self, bool isSampling);
KeyRange restoreRange, Key addPrefix, Key removePrefix);
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
std::map<Version, Standalone<VectorRef<MutationRef>>> *kvOps,
bool isRangeFile, Version startVersion, Version endVersion);
void _parseSerializedMutation(std::map<Version, Standalone<VectorRef<MutationRef>>> *kvOps,
std::map<Standalone<StringRef>, Standalone<StringRef>> *mutationMap,
bool isSampling = false);
bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) ;
@ -71,17 +75,11 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
requestTypeStr = "setApplierKeyRangeVectorRequest";
actors.add(handleSetApplierKeyRangeVectorRequest(req, self));
}
when ( RestoreLoadFileRequest req = waitNext(loaderInterf.loadRangeFile.getFuture()) ) {
requestTypeStr = "loadRangeFile";
when ( RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture()) ) {
requestTypeStr = "loadFile";
self->initBackupContainer(req.param.url);
actors.add( handleLoadRangeFileRequest(req, self, false) );
actors.add( handleLoadFileRequest(req, self, false) );
}
when ( RestoreLoadFileRequest req = waitNext(loaderInterf.loadLogFile.getFuture()) ) {
requestTypeStr = "loadLogFile";
self->initBackupContainer(req.param.url);
actors.add( handleLoadLogFileRequest(req, self, false) );
}
when ( RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture()) ) {
requestTypeStr = "initVersionBatch";
actors.add( handleInitVersionBatchRequest(req, self) );
@ -136,57 +134,16 @@ ACTOR Future<Void> handleSetApplierKeyRangeVectorRequest(RestoreSetApplierKeyRan
return Void();
}
ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling) {
//printf("[INFO] Worker Node:%s starts handleLoadRangeFileRequest\n", self->describeNode().c_str());
// TODO: MX:
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoaderData> self) {
// Temporary data structure for parsing range and log files into (version, <K, V, mutationType>)
state std::map<Version, Standalone<VectorRef<MutationRef>>> kvOps;
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
state std::map<Standalone<StringRef>, Standalone<StringRef>> mutationMap; // Key is the unique identifier for a batch of mutation logs at the same version
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
state LoadingParam param;
state int64_t beginBlock = 0;
state int64_t j = 0;
state int64_t readLen = 0;
state int64_t readOffset = 0;
state Reference<IBackupContainer> bc;
param = req.param;
beginBlock = 0;
j = 0;
readLen = 0;
readOffset = 0;
readOffset = param.offset;
state RestoreCommandEnum cmdType = RestoreCommandEnum::Init;
if ( isSampling ) {
cmdType = RestoreCommandEnum::Sample_Range_File;
} else {
cmdType = RestoreCommandEnum::Assign_Loader_Range_File;
}
while (self->isInProgress(cmdType)) {
printf("[DEBUG] NODE:%s handleLoadRangeFileRequest wait for 5s\n", self->describeNode().c_str());
wait(delay(1.0));
}
//Note: handle duplicate message delivery
if (self->processedFiles.find(param.filename) != self->processedFiles.end() ||
self->isCmdProcessed(req.cmdID)) {
// printf("[WARNING]Node:%s, CMDUID:%s file:%s is delivered more than once! Reply directly without loading the file\n",
// self->describeNode().c_str(), req.cmdID.toString().c_str(),
// param.filename.c_str());
req.reply.send(RestoreCommonReply(self->id(),req.cmdID));
return Void();
}
self->setInProgressFlag(cmdType);
printf("[INFO][Loader] Node:%s, CMDUID:%s Execute: handleLoadRangeFileRequest, loading param:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.toString().c_str());
bc = self->bc;
self->kvOps.clear(); //Clear kvOps so that kvOps only hold mutations for the current data block. We will send all mutations in kvOps to applier
self->mutationMap.clear();
self->mutationPartMap.clear();
printf("[INFO][Loader] Node:%s, Execute: handleLoadFileRequest, loading param:%s\n",
self->describeNode().c_str(), param.toString().c_str());
ASSERT( param.blockSize > 0 );
//state std::vector<Future<Void>> fileParserFutures;
@ -194,155 +151,61 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
printf("[WARNING] Parse file not at block boundary! param.offset:%ld param.blocksize:%ld, remainder:%ld\n",
param.offset, param.blockSize, param.offset % param.blockSize);
}
state int64_t j;
state int64_t readOffset;
state int64_t readLen;
for (j = param.offset; j < param.length; j += param.blockSize) {
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
printf("[DEBUG_TMP] _parseRangeFileToMutationsOnLoader starts\n");
wait( _parseRangeFileToMutationsOnLoader(self, bc, param.version, param.filename, readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix) );
if ( param.isRangeFile ) {
wait( _parseRangeFileToMutationsOnLoader(&kvOps, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix) );
} else {
wait( _parseLogFileToMutationsOnLoader(&mutationMap, &mutationPartMap, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix) );
}
printf("[DEBUG_TMP] _parseRangeFileToMutationsOnLoader ends\n");
++beginBlock;
}
printf("[INFO][Loader] Node:%s CMDUID:%s finishes process Range file:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str());
// TODO: Send to applier to apply the mutations
// printf("[INFO][Loader] Node:%s CMDUID:%s will send range mutations to applier\n",
// // self->describeNode().c_str(), self->cmdID.toString().c_str());
// if ( isSampling ) {
// wait( registerMutationsToMasterApplier(self) );
// } else {
// wait( registerMutationsToApplier(self, true, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
// }
wait( registerMutationsToApplier(self, true, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
printf("[INFO][Loader] Finishes process Range file:%s\n", param.filename.c_str());
// wait ( delay(1.0) );
if ( !isSampling ) {
self->processedFiles[param.filename] = 1;
if ( !param.isRangeFile ) {
_parseSerializedMutation(&kvOps, &mutationMap);
}
self->processedCmd[req.cmdID] = 1;
wait( registerMutationsToApplier(self, &kvOps, true, param.prevVersion, param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
return Void();
}
self->clearInProgressFlag(cmdType);
printf("[INFO][Loader] Node:%s CMDUID:%s clear inProgressFlag :%lx for Assign_Loader_Range_File.\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(), self->inProgressFlag);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling) {
try {
if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
// Deduplicate the same requests
printf("self->processedFileParams.size:%d Process param:%s\n", self->processedFileParams.size(), req.param.toString().c_str());
self->processedFileParams[req.param] = Never();
self->processedFileParams[req.param] = _processLoadingParam(req.param, self);
printf("processedFileParam.size:%d\n", self->processedFileParams.size());
printf("processedFileParam[req.param].ready:%d\n", self->processedFileParams[req.param].isReady());
ASSERT(self->processedFileParams.find(req.param) != self->processedFileParams.end());
wait(self->processedFileParams[req.param]);
} else {
ASSERT(self->processedFileParams.find(req.param) != self->processedFileParams.end());
printf("Process param that is being processed:%s\n", req.param.toString().c_str());
wait(self->processedFileParams[req.param]);
}
} catch (Error &e) {
fprintf(stdout, "[ERROR] handleLoadFileRequest Node:%s, error. error code:%d, error message:%s\n", self->describeNode().c_str(),
e.code(), e.what());
}
//Send ack to master that loader has finished loading the data
printf("[INFO][Loader] Node:%s CMDUID:%s send ack.\n",
self->describeNode().c_str(), self->cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
return Void();
}
ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling) {
printf("[INFO] Worker Node:%s starts handleLoadLogFileRequest\n", self->describeNode().c_str());
state LoadingParam param;
state int64_t beginBlock = 0;
state int64_t j = 0;
state int64_t readLen = 0;
state int64_t readOffset = 0;
state Reference<IBackupContainer> bc;
param = req.param;
beginBlock = 0;
j = 0;
readLen = 0;
readOffset = 0;
readOffset = param.offset;
state RestoreCommandEnum cmdType = isSampling ? RestoreCommandEnum::Sample_Log_File : RestoreCommandEnum::Assign_Loader_Log_File;
while (self->isInProgress(cmdType)) {
printf("[DEBUG] NODE:%s loadLogFile wait for 5s\n", self->describeNode().c_str());
wait(delay(1.0));
}
//Note: handle duplicate message delivery
if (self->processedFiles.find(param.filename) != self->processedFiles.end()
|| self->isCmdProcessed(req.cmdID)) {
printf("[WARNING] Node:%s CMDUID:%s file:%s is delivered more than once! Reply directly without loading the file\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str());
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
return Void();
}
self->setInProgressFlag(cmdType);
printf("[INFO][Loader] Node:%s CMDUID:%s Assign_Loader_Log_File loading param:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.toString().c_str());
bc = self->bc;
printf("[INFO][Loader] Node:%s CMDUID:%s open backup container for url:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.url.toString().c_str());
printf("[INFO][Loader] Node:%s CMDUID:%s filename:%s blockSize:%ld\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str(), param.blockSize);
self->kvOps.clear(); //Clear kvOps so that kvOps only hold mutations for the current data block. We will send all mutations in kvOps to applier
self->mutationMap.clear();
self->mutationPartMap.clear();
ASSERT( param.blockSize > 0 );
//state std::vector<Future<Void>> fileParserFutures;
if (param.offset % param.blockSize != 0) {
printf("[WARNING] Parse file not at block boundary! param.offset:%ld param.blocksize:%ld, remainder:%ld\n",
param.offset, param.blockSize, param.offset % param.blockSize);
}
for (j = param.offset; j < param.length; j += param.blockSize) {
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
// NOTE: Log file holds set of blocks of data. We need to parse the data block by block and get the kv pair(version, serialized_mutations)
// The set of mutations at the same version may be splitted into multiple kv pairs ACROSS multiple data blocks when the size of serialized_mutations is larger than 20000.
wait( _parseLogFileToMutationsOnLoader(self, bc, param.version, param.filename, readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix) );
++beginBlock;
}
printf("[INFO][Loader] Node:%s CMDUID:%s finishes parsing the data block into kv pairs (version, serialized_mutations) for file:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str());
parseSerializedMutation(self, isSampling);
printf("[INFO][Loader] Node:%s CMDUID:%s finishes process Log file:%s\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str());
printf("[INFO][Loader] Node:%s CMDUID:%s will send log mutations to applier\n",
self->describeNode().c_str(), req.cmdID.toString().c_str());
// if ( isSampling ) {
// wait( registerMutationsToMasterApplier(self) );
// } else {
// wait( registerMutationsToApplier(self, false, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
// }
wait( registerMutationsToApplier(self, false, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
// TODO: NOTE: If we parse log file, the DB status will be incorrect.
if ( !isSampling ) {
self->processedFiles[param.filename] = 1;
}
self->processedCmd[req.cmdID] = 1;
self->clearInProgressFlag(cmdType);
printf("[INFO][Loader] Node:%s CMDUID:%s clear inProgressFlag :%lx for Assign_Log_Range_File.\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(), self->inProgressFlag);
//Send ack to master that loader has finished loading the data
printf("[INFO][Loader] Node:%s CMDUID:%s send ack.\n",
self->describeNode().c_str(), self->cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); // master node is waiting
return Void();
}
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self, bool isRangeFile, Version startVersion, Version endVersion) {
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
VersionedMutationsMap *pkvOps,
bool isRangeFile, Version startVersion, Version endVersion) {
state VersionedMutationsMap &kvOps = *pkvOps;
printf("[INFO][Loader] Node:%s self->masterApplierInterf:%s, registerMutationsToApplier\n",
self->describeNode().c_str(), self->masterApplierInterf.toString().c_str());
@ -354,8 +217,8 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
state int splitMutationIndex = 0;
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if ( self->kvOps.find(endVersion) == self->kvOps.end() ) {
self->kvOps[endVersion] = VectorRef<MutationRef>();
if ( kvOps.find(endVersion) == kvOps.end() ) {
kvOps[endVersion] = VectorRef<MutationRef>();
}
self->printAppliersKeyRange();
@ -367,7 +230,7 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
state Standalone<VectorRef<UID>> nodeIDs;
// Initialize the above two maps
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::map<UID, RestoreSendMutationVectorVersionedRequest> requestsToAppliers;
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state Version prevVersion = startVersion;
loop {
try {
@ -378,7 +241,7 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
// MX: NEED TO A WAY TO GENERATE NON_DUPLICATE CMDUID across loaders
self->cmdID.setPhase(RestoreCommandEnum::Loader_Send_Mutations_To_Applier); //MX: THIS MAY BE WRONG! CMDID may duplicate across loaders
for ( kvOp = self->kvOps.begin(); kvOp != self->kvOps.end(); kvOp++) {
for ( kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
// In case try-catch has error and loop back
applierMutationsBuffer.clear();
applierMutationsSize.clear();
@ -422,21 +285,6 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
kvCount++;
}
// for (auto &applierID : applierIDs) {
// if ( applierMutationsSize[applierID] >= mutationVectorThreshold ) {
// state int tmpNumMutations = applierMutationsBuffer[applierID].size();
// self->cmdID.nextCmd();
// cmdReplies.push_back(self->appliersInterf[applierID].sendMutationVector.getReply(
// RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
// applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
// applierMutationsSize[applierID] = 0;
// printf("[INFO][Loader] Waits for applier:%s to receive %ld range mutations\n", applierID.toString().c_str(), tmpNumMutations);
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// cmdReplies.clear();
// }
// }
} else { // mutation operates on a particular key
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->range2Applier.lower_bound(kvm.param1); // lower_bound returns the iterator that is >= m.param1
// make sure itlow->first <= m.param1
@ -454,17 +302,6 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
applierMutationsSize[applierID] += mutation.expectedSize();
// if ( applierMutationsSize[applierID] >= mutationVectorThreshold ) {
// self->cmdID.nextCmd();
// cmdReplies.push_back(self->appliersInterf[applierID].sendMutationVector.getReply(
// RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
// printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", applierMutationsBuffer[applierID].size());
// applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
// applierMutationsSize[applierID] = 0;
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// cmdReplies.clear();
// }
}
} // Mutations at the same version
@ -474,28 +311,22 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
for (auto &applierID : applierIDs) {
printf("[DEBUG][Loader] sendMutationVector size:%d for applierID:%s\n", applierMutationsBuffer[applierID].size(), applierID.toString().c_str());
self->cmdID.nextCmd(); // no-use
requestsToAppliers[applierID] = RestoreSendMutationVectorVersionedRequest(self->cmdID, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID]);
requests.push_back( std::make_pair(applierID, RestoreSendMutationVectorVersionedRequest(self->cmdID, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID])) );
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
//std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); // Q: We need to wait for each reply, otherwise, correctness has error. Why?
//cmdReplies.clear();
}
wait( sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requestsToAppliers) );
requestsToAppliers.clear();
wait( sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests) );
requests.clear();
ASSERT( prevVersion < commitVersion );
prevVersion = commitVersion;
} // all versions of mutations
// if (!cmdReplies.empty()) {
// printf("[INFO][Loader] Last Waits for applier to receive %ld range mutations\n", cmdReplies.size());
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// //std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
// cmdReplies.clear();
// }
printf("[Summary][Loader] Node:%s Last CMDUID:%s produces %d mutation operations\n",
self->describeNode().c_str(), self->cmdID.toString().c_str(), kvCount);
self->kvOps.clear();
//kvOps.clear();
break;
} catch (Error &e) {
@ -507,37 +338,6 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
return Void();
}
// std::map<Standalone<MutationRef>, UID> splitMutationv2(Reference<RestoreLoaderData> self, MutationRef m) {
// std::map<Standalone<MutationRef>, UID> m2appliers;
// // key range [m->param1, m->param2)
// //std::map<Standalone<KeyRef>, UID>;
// printf("SPLITMUTATION: mutation:%s\n", m.toString().c_str());
// std::map<Standalone<KeyRef>, UID>::iterator itlow, itup; //we will return [itlow, itup)
// itlow = self->range2Applier.lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
// itup = self->range2Applier.upper_bound(m.param2); // upper_bound returns the iterator that is > m.param2; return rmap::end if no keys are considered to go after m.param2.
// printf("SPLITMUTATION: itlow_key:%s itup_key:%s\n", itlow->first.toString().c_str(), itup->first.toString().c_str());
// ASSERT( itup == self->range2Applier.end() || itup->first >= m.param2 );
// while (itlow != itup) {
// MutationRef curm; //current mutation
// curm.type = m.type;
// curm.param1 = itlow->first;
// itlow++;
// if (itlow == self->range2Applier.end()) {
// curm.param2 = normalKeys.end;
// } else {
// curm.param2 = itlow->first;
// }
// printf("SPLITMUTATION: m2appliers.push_back:%s\n", curm.toString().c_str());
// m2appliers[curm] = itlow->second;
// }
// printf("SPLITMUTATION: m2appliers.size:%d\n", m2appliers.size());
// return m2appliers;
// }
// TODO: Add a unit test for this function
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
@ -608,7 +408,11 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mve
//key_input format: [logRangeMutation.first][hash_value_of_commit_version:1B][bigEndian64(commitVersion)][bigEndian32(part)]
bool concatenateBackupMutationForLogFile(Reference<RestoreLoaderData> self, Standalone<StringRef> val_input, Standalone<StringRef> key_input) {
bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standalone<StringRef>> *pMutationMap,
std::map<Standalone<StringRef>, uint32_t> *pMutationPartMap,
Standalone<StringRef> val_input, Standalone<StringRef> key_input) {
std::map<Standalone<StringRef>, Standalone<StringRef>> &mutationMap = *pMutationMap;
std::map<Standalone<StringRef>, uint32_t> &mutationPartMap = *pMutationPartMap;
std::string prefix = "||\t";
std::stringstream ss;
// const int version_size = 12;
@ -663,26 +467,26 @@ bool concatenateBackupMutationForLogFile(Reference<RestoreLoaderData> self, Stan
key_input.size(), longRangeMutationFirst.printable().c_str(), hashValue,
commitVersion, commitVersionBE,
part, partBE,
part_direct, self->mutationMap.size());
part_direct, mutationMap.size());
}
if ( self->mutationMap.find(id) == self->mutationMap.end() ) {
self->mutationMap.insert(std::make_pair(id, val_input));
if ( mutationMap.find(id) == mutationMap.end() ) {
mutationMap.insert(std::make_pair(id, val_input));
if ( part_direct != 0 ) {
printf("[ERROR]!!! part:%d != 0 for key_input:%s\n", part_direct, getHexString(key_input).c_str());
}
self->mutationPartMap.insert(std::make_pair(id, part_direct));
mutationPartMap.insert(std::make_pair(id, part_direct));
} else { // concatenate the val string
// printf("[INFO] Concatenate the log's val string at version:%ld\n", id.toString().c_str());
self->mutationMap[id] = self->mutationMap[id].contents().withSuffix(val_input.contents()); //Assign the new Areana to the map's value
if ( part_direct != (self->mutationPartMap[id] + 1) ) {
printf("[ERROR]!!! current part id:%d new part_direct:%d is not the next integer of key_input:%s\n", self->mutationPartMap[id], part_direct, getHexString(key_input).c_str());
mutationMap[id] = mutationMap[id].contents().withSuffix(val_input.contents()); //Assign the new Areana to the map's value
if ( part_direct != (mutationPartMap[id] + 1) ) {
fprintf(stderr, "[ERROR]!!! current part id:%d new part_direct:%d is not the next integer of key_input:%s\n", mutationPartMap[id], part_direct, getHexString(key_input).c_str());
printf("[HINT] Check if the same range or log file has been processed more than once!\n");
}
if ( part_direct != part ) {
printf("part_direct:%08x != part:%08x\n", part_direct, part);
}
self->mutationPartMap[id] = part_direct;
mutationPartMap[id] = part_direct;
concatenated = true;
}
@ -707,8 +511,13 @@ bool isRangeMutation(MutationRef m) {
// Parse the kv pair (version, serialized_mutation), which are the results parsed from log file.
void parseSerializedMutation(Reference<RestoreLoaderData> self, bool isSampling) {
void _parseSerializedMutation(VersionedMutationsMap *pkvOps,
std::map<Standalone<StringRef>, Standalone<StringRef>> *pmutationMap,
bool isSampling) {
// Step: Parse the concatenated KV pairs into (version, <K, V, mutationType>) pair
VersionedMutationsMap &kvOps = *pkvOps;
std::map<Standalone<StringRef>, Standalone<StringRef>> &mutationMap = *pmutationMap;
printf("[INFO] Parse the concatenated log data\n");
std::string prefix = "||\t";
std::stringstream ss;
@ -716,7 +525,7 @@ bool isRangeMutation(MutationRef m) {
// const int header_size = 12;
int kvCount = 0;
for ( auto& m : self->mutationMap ) {
for ( auto& m : mutationMap ) {
StringRef k = m.first.contents();
StringRefReaderMX readerVersion(k, restore_corrupted_data());
uint64_t commitVersion = readerVersion.consume<uint64_t>(); // Consume little Endian data
@ -734,9 +543,7 @@ bool isRangeMutation(MutationRef m) {
uint32_t val_length_decode = reader.consume<uint32_t>(); //Parse little endian value, confirmed it is correct!
count_size += 4;
if ( self->kvOps.find(commitVersion) == self->kvOps.end() ) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
}
kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
if ( debug_verbose ) {
printf("----------------------------------------------------------Register Backup Mutation into KVOPs version:0x%08lx (%08ld)\n", commitVersion, commitVersion);
@ -777,7 +584,7 @@ bool isRangeMutation(MutationRef m) {
count_size += 4 * 3 + kLen + vLen;
MutationRef mutation((MutationRef::Type) type, KeyRef(k, kLen), KeyRef(v, vLen));
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
kvCount++;
if ( kLen < 0 || kLen > val.size() || vLen < 0 || vLen > val.size() ) {
@ -802,32 +609,33 @@ bool isRangeMutation(MutationRef m) {
}
// Parsing log file, which is the same for sampling and loading phases
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLoaderData> self,
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(VersionedMutationsMap *pkvOps,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset_input, int64_t readLen_input,
KeyRange restoreRange, Key addPrefix, Key removePrefix) {
state VersionedMutationsMap &kvOps = *pkvOps;
state int64_t readOffset = readOffset_input;
state int64_t readLen = readLen_input;
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Parse range file and get mutations 1, bc:%lx\n", bc.getPtr());
}
// if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Parse range file and get mutations 1, bc:%lx\n", bc.getPtr());
// }
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Parse range file and get mutations 2\n");
}
// if ( debug_verbose ) {
// printf("[VERBOSE_DEBUG] Parse range file and get mutations 2\n");
// }
state Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Parse range file and get mutations 3\n");
int tmpi = 0;
for (tmpi = 0; tmpi < blockData.size(); tmpi++) {
printf("\t[VERBOSE_DEBUG] mutation: key:%s value:%s\n", blockData[tmpi].key.toString().c_str(), blockData[tmpi].value.toString().c_str());
}
}
// if ( debug_verbose ) {
// printf("[VERBOSE_DEBUG] Parse range file and get mutations 3\n");
// int tmpi = 0;
// for (tmpi = 0; tmpi < blockData.size(); tmpi++) {
// printf("\t[VERBOSE_DEBUG] mutation: key:%s value:%s\n", blockData[tmpi].key.toString().c_str(), blockData[tmpi].value.toString().c_str());
// }
// }
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
@ -845,26 +653,26 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
// The blockData's first and last entries are metadata, not the real data
int rangeStart = 1; //1
int rangeEnd = blockData.size() -1; //blockData.size() - 1 // Q: the rangeStart and rangeEnd is [,)?
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Range file decoded blockData\n");
for (auto& data : blockData ) {
printf("\t[VERBOSE_DEBUG] data key:%s val:%s\n", data.key.toString().c_str(), data.value.toString().c_str());
}
}
// if ( debug_verbose ) {
// printf("[VERBOSE_DEBUG] Range file decoded blockData\n");
// for (auto& data : blockData ) {
// printf("\t[VERBOSE_DEBUG] data key:%s val:%s\n", data.key.toString().c_str(), data.value.toString().c_str());
// }
// }
// Slide start forwaself, stop if something in range is found
// Slide start from begining, stop if something in range is found
// Move rangeStart and rangeEnd until they is within restoreRange
while(rangeStart < rangeEnd && !restoreRange.contains(blockData[rangeStart].key)) {
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] rangeStart:%d key:%s is not in the range:%s\n", rangeStart, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
}
// if ( debug_verbose ) {
// printf("[VERBOSE_DEBUG] rangeStart:%d key:%s is not in the range:%s\n", rangeStart, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
// }
++rangeStart;
}
// Side end backwaself, stop if something in range is found
while(rangeEnd > rangeStart && !restoreRange.contains(blockData[rangeEnd - 1].key)) {
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] (rangeEnd:%d - 1) key:%s is not in the range:%s\n", rangeEnd, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
}
// if ( debug_verbose ) {
// printf("[VERBOSE_DEBUG] (rangeEnd:%d - 1) key:%s is not in the range:%s\n", rangeEnd, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
// }
--rangeEnd;
}
@ -916,20 +724,15 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
//// getHexString(data[i].key.c_str(), getHexString(data[i].value).c_str(), data[i].key.size(), data[i].value.size());
//NOTE: Should NOT removePrefix and addPrefix for the backup data!
// In other woselfs, the following operation is wrong: data[i].key.removePrefix(removePrefix).withPrefix(addPrefix)
// In other words, the following operation is wrong: data[i].key.removePrefix(removePrefix).withPrefix(addPrefix)
MutationRef m(MutationRef::Type::SetValue, data[i].key, data[i].value); //ASSUME: all operation in range file is set.
++kvCount;
// TODO: we can commit the kv operation into DB.
// Right now, we cache all kv operations into kvOps, and apply all kv operations later in one place
if ( self->kvOps.find(version) == self->kvOps.end() ) { // Create the map's key if mutation m is the first on to be inserted
//kvOps.insert(std::make_pair(rangeFile.version, Standalone<VectorRef<MutationRef>>(VectorRef<MutationRef>())));
self->kvOps.insert(std::make_pair(version, VectorRef<MutationRef>()));
}
ASSERT(self->kvOps.find(version) != self->kvOps.end());
self->kvOps[version].push_back_deep(self->kvOps[version].arena(), m);
// We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps.insert(std::make_pair(version, VectorRef<MutationRef>()));
ASSERT(kvOps.find(version) != kvOps.end());
kvOps[version].push_back_deep(kvOps[version].arena(), m);
}
// Commit succeeded, so advance starting point
@ -937,15 +740,14 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
if(start == end) {
//TraceEvent("ExtraApplyRangeFileToDB_MX").detail("Progress", "DoneApplyKVToDB");
printf("[INFO][Loader] NodeID:%s Parse RangeFile:%s: the number of kv operations = %d\n",
self->describeNode().c_str(), fileName.c_str(), kvCount);
printf("[INFO][Loader] Parse RangeFile:%s: the number of kv operations = %d\n", fileName.c_str(), kvCount);
return Void();
}
}
}
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(Reference<RestoreLoaderData> self,
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(std::map<Standalone<StringRef>, Standalone<StringRef>> *pMutationMap,
std::map<Standalone<StringRef>, uint32_t> *pMutationPartMap,
Reference<IBackupContainer> bc, Version version,
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
@ -990,7 +792,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
// printf("LogFile [KEY:%s, VALUE:%s, VERSION:%ld, op:NoOp]\n", getHexString(k).c_str(), getHexString(v).c_str(), logFile.version);
// printBackupMutationRefValueHex(v, " |\t");
// printf("[DEBUG]||Concatenate backup mutation:fileInfo:%s, data:%d\n", logFile.toString().c_str(), i);
bool concatenated = concatenateBackupMutationForLogFile(self, data[i].value, data[i].key);
bool concatenated = concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, data[i].value, data[i].key);
numConcatenated += ( concatenated ? 1 : 0);
// //TODO: Decode the value to get the mutation type. Use NoOp to distinguish from range kv for now.
// MutationRef m(MutationRef::Type::NoOp, data[i].key, data[i].value); //ASSUME: all operation in log file is NoOp.
@ -1009,7 +811,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
}
}
printf("[INFO] raw kv number:%d parsed from log file, concatenated:%d kv, num_log_versions:%d\n", data.size(), numConcatenated, self->mutationMap.size());
printf("[INFO] raw kv number:%d parsed from log file, concatenated:%d kv, num_log_versions:%d\n", data.size(), numConcatenated, pMutationMap->size());
return Void();
}

View File

@ -44,22 +44,13 @@
#include "flow/actorcompiler.h" // has to be last include
struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoaderData> {
public:
std::map<LoadingParam, Future<Void>> processedFileParams;
// range2Applier is in master and loader node. Loader node uses this to determine which applier a mutation should be sent
std::map<Standalone<KeyRef>, UID> range2Applier; // KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Standalone<KeyRef>, int> keyOpsCount; // The number of operations per key which is used to determine the key-range boundary for appliers
int numSampledMutations; // The total number of mutations received from sampled data.
// Loader's state to handle the duplicate delivery of loading commands
std::map<std::string, int> processedFiles; //first is filename of processed file, second is not used
// Temporary data structure for parsing range and log files into (version, <K, V, mutationType>)
std::map<Version, Standalone<VectorRef<MutationRef>>> kvOps;
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
std::map<Standalone<StringRef>, Standalone<StringRef>> mutationMap; // Key is the unique identifier for a batch of mutation logs at the same version
std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Recoself the most recent
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc
@ -95,11 +86,7 @@ public:
keyOpsCount.clear();
numSampledMutations = 0;
processedFiles.clear();
kvOps.clear();
mutationMap.clear();
mutationPartMap.clear();
processedFileParams.clear();
curWorkloadSize = 0;
}

View File

@ -239,18 +239,10 @@ ACTOR static Future<Version> processRestoreRequest(RestoreRequest request, Refer
return request.targetVersion;
}
enum RestoreFileType { RangeFileType = 0, LogFileType = 1 };
// Distribution workload per version batch
ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMasterData> self, Database cx, RestoreRequest request, Reference<RestoreConfig> restoreConfig) {
// state Key tagName = request.tagName;
// state Key url = request.url;
// state bool waitForComplete = request.waitForComplete;
// state Version targetVersion = request.targetVersion;
// state bool verbose = request.verbose;
// state KeyRange restoreRange = request.range;
// state Key addPrefix = request.addPrefix;
// state Key removePrefix = request.removePrefix;
// state bool lockDB = request.lockDB;
// state UID randomUid = request.randomUid;
state Key mutationLogPrefix = restoreConfig->mutationLogPrefix();
if ( self->isBackupEmpty() ) {
@ -313,165 +305,117 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
state int loadSizeB = loadingSizeMB * 1024 * 1024;
state int loadingCmdIndex = 0;
state int checkpointCurFileIndex = 0;
state long checkpointCurOffset = 0;
startTime = now();
// We should load log file before we do range file
state RestoreCommandEnum phaseType = RestoreCommandEnum::Assign_Loader_Log_File;
state std::vector<Future<RestoreCommonReply>> cmdReplies;
state int typeOfFilesProcessed = 0;
state RestoreFileType processedFileType = RestoreFileType::LogFileType;
state int curFileIndex;
state long curOffset;
state bool allLoadReqsSent;
state Version prevVersion;
loop {
state int curFileIndex = 0; // The smallest index of the files that has not been FULLY loaded
state long curOffset = 0;
state bool allLoadReqsSent = false;
state Version prevVersion = 0; // Start version for range or log file is 0
curFileIndex = 0; // The smallest index of the files that has not been FULLY loaded
curOffset = 0;
allLoadReqsSent = false;
prevVersion = 0; // Start version for range or log file is 0
std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
loop {
try {
if ( allLoadReqsSent ) {
break; // All load requests have been handled
if ( allLoadReqsSent ) {
break; // All load requests have been handled
}
printf("[INFO] Number of backup files:%ld\n", self->files.size());
for (auto &loader : self->loadersInterf) {
UID loaderID = loader.first;
RestoreLoaderInterface loaderInterf = loader.second;
while ( curFileIndex < self->files.size() && self->files[curFileIndex].fileSize == 0 ) {
// NOTE: && self->files[curFileIndex].cursor >= self->files[curFileIndex].fileSize
printf("[INFO] File %ld:%s filesize:%ld skip the file\n", curFileIndex,
self->files[curFileIndex].fileName.c_str(), self->files[curFileIndex].fileSize);
curFileIndex++;
curOffset = 0;
}
// wait(delay(1.0));
if ( curFileIndex >= self->files.size() ) {
allLoadReqsSent = true;
break;
}
LoadingParam param;
param.url = request.url;
param.version = self->files[curFileIndex].version;
param.filename = self->files[curFileIndex].fileName;
param.offset = 0; //curOffset; //self->files[curFileIndex].cursor;
//param.length = std::min(self->files[curFileIndex].fileSize - curOffset, self->files[curFileIndex].blockSize);
//param.cursor = 0;
param.length = self->files[curFileIndex].fileSize;
loadSizeB = param.length;
param.blockSize = self->files[curFileIndex].blockSize;
param.restoreRange = request.range;
param.addPrefix = request.addPrefix;
param.removePrefix = request.removePrefix;
param.mutationLogPrefix = mutationLogPrefix;
param.isRangeFile = self->files[curFileIndex].isRange;
if ( !(param.length > 0 && param.offset >= 0 && param.offset < self->files[curFileIndex].fileSize) ) {
printf("[ERROR] param: length:%ld offset:%ld fileSize:%ld for %ldth filename:%s\n",
param.length, param.offset, self->files[curFileIndex].fileSize, curFileIndex,
self->files[curFileIndex].fileName.c_str());
}
ASSERT( param.length > 0 );
ASSERT( param.offset >= 0 );
ASSERT( param.offset < self->files[curFileIndex].fileSize );
cmdReplies.clear();
printf("[INFO] Number of backup files:%ld\n", self->files.size());
self->cmdID.initPhase(phaseType);
for (auto &loader : self->loadersInterf) {
UID loaderID = loader.first;
RestoreLoaderInterface loaderInterf = loader.second;
if ( (processedFileType == RestoreFileType::LogFileType && self->files[curFileIndex].isRange)
|| (processedFileType == RestoreFileType::RangeFileType && !self->files[curFileIndex].isRange) ) {
printf("Skip fileIndex:%d processedFileType:%d file.isRange:%d\n", curFileIndex, processedFileType, self->files[curFileIndex].isRange);
self->files[curFileIndex].cursor = 0;
curFileIndex++;
curOffset = 0;
} else { // Create the request
param.prevVersion = prevVersion;
prevVersion = self->files[curFileIndex].isRange ? self->files[curFileIndex].version : self->files[curFileIndex].endVersion;
param.endVersion = prevVersion;
requests.push_back( std::make_pair(loader.first, RestoreLoadFileRequest(self->cmdID, param)) );
printf("[CMD] Loading fileIndex:%ld fileInfo:%s loadingParam:%s on node %s\n",
curFileIndex, self->files[curFileIndex].toString().c_str(),
param.toString().c_str(), loaderID.toString().c_str()); // VERY USEFUL INFO
printf("[INFO] Node:%s CMDUID:%s isRange:%d loaderNode:%s\n", self->describeNode().c_str(), self->cmdID.toString().c_str(),
(int) self->files[curFileIndex].isRange, loaderID.toString().c_str());
//curOffset += param.length;
while ( curFileIndex < self->files.size() && self->files[curFileIndex].fileSize == 0 ) {
// NOTE: && self->files[curFileIndex].cursor >= self->files[curFileIndex].fileSize
printf("[INFO] File %ld:%s filesize:%ld skip the file\n", curFileIndex,
self->files[curFileIndex].fileName.c_str(), self->files[curFileIndex].fileSize);
// Reach the end of the file
if ( param.length + param.offset >= self->files[curFileIndex].fileSize ) {
curFileIndex++;
curOffset = 0;
}
if ( curFileIndex >= self->files.size() ) {
allLoadReqsSent = true;
break;
}
LoadingParam param;
//self->files[curFileIndex].cursor = 0; // This is a hacky way to make sure cursor is correct in current version when we load 1 file at a time
// MX: May Need to specify endVersion as well because the
param.url = request.url;
param.version = self->files[curFileIndex].version;
param.filename = self->files[curFileIndex].fileName;
param.offset = curOffset; //self->files[curFileIndex].cursor;
param.length = std::min(self->files[curFileIndex].fileSize - curOffset, self->files[curFileIndex].blockSize);
//param.length = self->files[curFileIndex].fileSize;
loadSizeB = param.length;
param.blockSize = self->files[curFileIndex].blockSize;
param.restoreRange = request.range;
param.addPrefix = request.addPrefix;
param.removePrefix = request.removePrefix;
param.mutationLogPrefix = mutationLogPrefix;
if ( !(param.length > 0 && param.offset >= 0 && param.offset < self->files[curFileIndex].fileSize) ) {
printf("[ERROR] param: length:%ld offset:%ld fileSize:%ld for %ldth filename:%s\n",
param.length, param.offset, self->files[curFileIndex].fileSize, curFileIndex,
self->files[curFileIndex].fileName.c_str());
}
ASSERT( param.length > 0 );
ASSERT( param.offset >= 0 );
ASSERT( param.offset < self->files[curFileIndex].fileSize );
self->files[curFileIndex].cursor = self->files[curFileIndex].cursor + param.length;
RestoreCommandEnum cmdType = RestoreCommandEnum::Assign_Loader_Range_File;
if (self->files[curFileIndex].isRange) {
cmdType = RestoreCommandEnum::Assign_Loader_Range_File;
self->cmdID.setPhase(RestoreCommandEnum::Assign_Loader_Range_File);
} else {
cmdType = RestoreCommandEnum::Assign_Loader_Log_File;
self->cmdID.setPhase(RestoreCommandEnum::Assign_Loader_Log_File);
}
if ( (phaseType == RestoreCommandEnum::Assign_Loader_Log_File && self->files[curFileIndex].isRange)
|| (phaseType == RestoreCommandEnum::Assign_Loader_Range_File && !self->files[curFileIndex].isRange) ) {
self->files[curFileIndex].cursor = 0;
curFileIndex++;
curOffset = 0;
} else { // load the type of file in the phaseType
self->cmdID.nextCmd();
param.prevVersion = prevVersion;
prevVersion = self->files[curFileIndex].isRange ? self->files[curFileIndex].version : self->files[curFileIndex].endVersion;
param.endVersion = prevVersion;
printf("[CMD] Loading fileIndex:%ld fileInfo:%s loadingParam:%s on node %s\n",
curFileIndex, self->files[curFileIndex].toString().c_str(),
param.toString().c_str(), loaderID.toString().c_str()); // VERY USEFUL INFO
printf("[INFO] Node:%s CMDUID:%s cmdType:%d isRange:%d loaderNode:%s\n", self->describeNode().c_str(), self->cmdID.toString().c_str(),
(int) cmdType, (int) self->files[curFileIndex].isRange, loaderID.toString().c_str());
if (self->files[curFileIndex].isRange) {
cmdReplies.push_back( loaderInterf.loadRangeFile.getReply(RestoreLoadFileRequest(self->cmdID, param)) );
} else {
cmdReplies.push_back( loaderInterf.loadLogFile.getReply(RestoreLoadFileRequest(self->cmdID, param)) );
}
curOffset += param.length;
// Reach the end of the file
if ( param.length + param.offset >= self->files[curFileIndex].fileSize ) {
curFileIndex++;
curOffset = 0;
}
// if (param.length <= loadSizeB) { // Reach the end of the file
// ASSERT( self->files[curFileIndex].cursor == self->files[curFileIndex].fileSize );
// curFileIndex++;
// }
}
if ( curFileIndex >= self->files.size() ) {
allLoadReqsSent = true;
break;
}
//++loadingCmdIndex; // Replaced by cmdUID
}
printf("[INFO] Wait for %ld loaders to accept the cmd Assign_Loader_File\n", cmdReplies.size());
// Question: How to set reps to different value based on cmdReplies.empty()?
if ( !cmdReplies.empty() ) {
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); //TODO: change to getAny. NOTE: need to keep the still-waiting replies
//std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
cmdReplies.clear();
for (int i = 0; i < reps.size(); ++i) {
printf("[INFO] Get Ack reply:%s for Assign_Loader_File\n",
reps[i].toString().c_str());
}
checkpointCurFileIndex = curFileIndex; // Save the previous success point
checkpointCurOffset = curOffset;
if ( curFileIndex >= self->files.size() ) {
allLoadReqsSent = true;
break;
}
}
// TODO: Let master print all nodes status. Note: We need a function to print out all nodes status
if (allLoadReqsSent) {
printf("[INFO] allLoadReqsSent has finished.\n");
break; // NOTE: need to change when change to wait on any cmdReplies
}
} catch (Error &e) {
fprintf(stdout, "[ERROR] Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", self->describeNode().c_str(),
self->cmdID.toString().c_str(), e.code(), e.what());
curFileIndex = checkpointCurFileIndex;
curOffset = checkpointCurOffset;
if (allLoadReqsSent) {
printf("[INFO] allLoadReqsSent has finished.\n");
break; // NOTE: need to change when change to wait on any cmdReplies
}
}
// Wait on the batch of load files or log files
++typeOfFilesProcessed;
wait( sendBatchRequests(&RestoreLoaderInterface::loadFile, self->loadersInterf, requests) );
if (phaseType == RestoreCommandEnum::Assign_Loader_Log_File) {
phaseType = RestoreCommandEnum::Assign_Loader_Range_File;
} else if (phaseType == RestoreCommandEnum::Assign_Loader_Range_File) {
processedFileType = RestoreFileType::RangeFileType; // The second batch is RangeFile
if ( typeOfFilesProcessed == 2 ) { // We only have 2 types of files
break;
}
}
printf("[Progress] distributeWorkloadPerVersionBatch loadFiles time:%.2f seconds\n", now() - startTime);
ASSERT( cmdReplies.empty() );
// Notify the applier to applly mutation to DB
startTime = now();
wait( notifyApplierToApplyMutations(self) );
printf("[Progress] distributeWorkloadPerVersionBatch applyToDB time:%.2f seconds\n", now() - startTime);
@ -484,7 +428,6 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
runningTime, endTime - startTimeAfterSampling);
return Void();
}
// Placehold for sample workload
@ -675,34 +618,33 @@ ACTOR static Future<Void> _clearDB(Reference<ReadYourWritesTransaction> tr) {
ACTOR Future<Void> initializeVersionBatch(Reference<RestoreMasterData> self) {
self->cmdID.initPhase(RestoreCommandEnum::Reset_VersionBatch);
std::map<UID, RestoreVersionBatchRequest> applierRequests;
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for (auto &applier : self->appliersInterf) {
self->cmdID.nextCmd();
applierRequests[applier.first] = RestoreVersionBatchRequest(self->cmdID, self->batchIndex);
requests.push_back( std::make_pair(applier.first, RestoreVersionBatchRequest(self->cmdID, self->batchIndex)) );
}
wait( sendBatchRequests(&RestoreApplierInterface::initVersionBatch, self->appliersInterf, applierRequests) );
wait( sendBatchRequests(&RestoreApplierInterface::initVersionBatch, self->appliersInterf, requests) );
std::map<UID, RestoreVersionBatchRequest> loaderRequests;
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
for (auto &loader : self->loadersInterf) {
self->cmdID.nextCmd();
loaderRequests[loader.first] = RestoreVersionBatchRequest(self->cmdID, self->batchIndex);
requests.push_back( std::make_pair(loader.first, RestoreVersionBatchRequest(self->cmdID, self->batchIndex)) );
}
wait( sendBatchRequests(&RestoreLoaderInterface::initVersionBatch, self->loadersInterf, loaderRequests) );
wait( sendBatchRequests(&RestoreLoaderInterface::initVersionBatch, self->loadersInterf, requests) );
return Void();
}
ACTOR Future<Void> notifyApplierToApplyMutations(Reference<RestoreMasterData> self) {
state std::vector<Future<RestoreCommonReply>> cmdReplies;
loop {
try {
self->cmdID.initPhase( RestoreCommandEnum::Apply_Mutation_To_DB );
// Prepare the applyToDB requests
std::map<UID, RestoreSimpleRequest> requests;
std::vector<std::pair<UID, RestoreSimpleRequest>> requests;
for (auto& applier : self->appliersInterf) {
self->cmdID.nextCmd();
requests[applier.first] = RestoreSimpleRequest(self->cmdID);
requests.push_back( std::make_pair(applier.first, RestoreSimpleRequest(self->cmdID)) );
}
wait( sendBatchRequests(&RestoreApplierInterface::applyToDB, self->appliersInterf, requests) );