FastRestore: Code cleanup

This commit is contained in:
Meng Xu 2019-04-10 18:50:12 -07:00
parent e6dae4d1bf
commit 010b069da6
1 changed files with 28 additions and 101 deletions

View File

@ -2745,20 +2745,13 @@ ACTOR static Future<Void> finishRestore(Database cx, Standalone<VectorRef<Restor
};
// TODO: Clean up the fields in restore data structure
//state RestoreConfig restore(task);
// state RestoreConfig restore(uid);
// restore.stateEnum().set(tr, ERestoreState::COMPLETED);
// Clear the file map now since it could be huge.
// restore.fileSet().clear(tr);
// TODO: Validate that the range version map has exactly the restored ranges in it. This means that for any restore operation
// the ranges to restore must be within the backed up ranges, otherwise from the restore perspective it will appear that some
// key ranges were missing and so the backup set is incomplete and the restore has failed.
// This validation cannot be done currently because Restore only supports a single restore range but backups can have many ranges.
// Clear the applyMutations stuff, including any unapplied mutations from versions beyond the restored version.
// restore.clearApplyMutationsKeys(tr);
// restore.clearApplyMutationsKeys(tr);
printf("[INFO] Notify the end of the restore\n");
TraceEvent("NotifyRestoreFinished");
@ -3765,79 +3758,6 @@ ACTOR Future<Void> registerMutationsToMasterApplier(Reference<RestoreData> rd) {
return Void();
}
// Master applier: Receive sampled mutations sent from loader
// ACTOR Future<Void> receiveSampledMutations(Reference<RestoreData> rd, RestoreInterface interf) {
// if ( rd->localNodeStatus.role != RestoreRole::Applier) {
// printf("[ERROR] non-applier node:%s (role:%d) is waiting for cmds for appliers\n",
// rd->describeNode().c_str(), rd->localNodeStatus.role);
// } else {
// printf("[Sampling][Loader_Send_Sample_Mutation_To_Applier] nodeID:%s starts \n",
// rd->describeNode().c_str());
// }
// state int numMutations = 0;
// rd->numSampledMutations = 0;
// loop {
// choose {
// when(RestoreCommand req = waitNext(interf.cmd.getFuture())) {
// //printf("[INFO][Applier] Got Restore Command: cmd:%d UID:%s\n",
// // req.cmd, req.id.toString().c_str());
// if ( rd->localNodeStatus.nodeID != req.id ) {
// printf("[ERROR]CMDID:%s Node:%s receive request with a different nodeId:%s\n",
// req.cmdID.toString().c_str(), rd->describeNode().c_str(), req.id.toString().c_str());
// }
// if ( req.cmd == RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier ) {
// ASSERT(req.cmd == (RestoreCommandEnum) req.cmdID.phase);
// // Handle duplicate message
// if (rd->isCmdProcessed(req.cmdID)) {
// printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
// continue;
// }
// // Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
// state uint64_t commitVersion = req.commitVersion;
// // TODO: Change the req.mutation to a vector of mutations
// MutationRef mutation(req.mutation);
// if ( rd->keyOpsCount.find(mutation.param1) == rd->keyOpsCount.end() ) {
// rd->keyOpsCount.insert(std::make_pair(mutation.param1, 0));
// }
// // NOTE: We may receive the same mutation more than once due to network package lost.
// // Since sampling is just an estimation and the network should be stable enough, we do NOT handle the duplication for now
// // In a very unreliable network, we may get many duplicate messages and get a bad key-range splits for appliers. But the restore should still work except for running slower.
// rd->keyOpsCount[mutation.param1]++;
// rd->numSampledMutations++;
// if ( rd->numSampledMutations % 1000 == 1 ) {
// printf("[Sampling][Applier] Node:%s Receives %d sampled mutations. cur_mutation:%s\n",
// rd->describeNode().c_str(), rd->numSampledMutations, mutation.toString().c_str());
// }
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
// rd->processedCmd[req.cmdID] = 1;
// } else if ( req.cmd == RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier_Done ) {
// printf("[Sampling][Applier] NodeID:%s receive all sampled mutations, num_of_total_sampled_muations:%d\n",
// rd->describeNode().c_str(), rd->numSampledMutations);
// ASSERT(req.cmd == (RestoreCommandEnum) req.cmdID.phase);
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
// break;
// } else {
// if ( IsCmdInPreviousPhase(RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier_Done, req.cmd) ) {
// logExpectedOldCmd(rd, RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier_Done, req.cmd, req.cmdID);
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
// } else {
// logUnexpectedCmd(rd, RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier_Done, req.cmd, req.cmdID);
// }
// }
// }
// }
// }
// return Void();
// }
////---------------Helper Functions and Class copied from old file---------------
@ -3936,7 +3856,7 @@ ACTOR Future<Void> handleSampleRangeFileRequest(RestoreLoadFileRequest req, Refe
// Handle duplicate, assuming cmdUID is always unique for the same workload
if ( rd->isCmdProcessed(req.cmdID) ) {
printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
@ -3992,7 +3912,7 @@ ACTOR Future<Void> handleSampleLogFileRequest(RestoreLoadFileRequest req, Refere
// Handle duplicate message
if ( rd->isCmdProcessed(req.cmdID) ) {
printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
@ -4128,14 +4048,18 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
param.toString().c_str());
//Note: handle duplicate message delivery
if (rd->processedFiles.find(param.filename) != rd->processedFiles.end()) {
if (rd->processedFiles.find(param.filename) != rd->processedFiles.end() ||
rd->isCmdProcessed(req.cmdID)) {
// printf("[WARNING]Node:%s, CMDUID:%s file:%s is delivered more than once! Reply directly without loading the file\n",
// rd->describeNode().c_str(), req.cmdID.toString().c_str(),
// param.filename.c_str());
req.reply.send(RestoreCommonReply(interf.id(),req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(),req.cmdID));
return Void();
}
rd->processedFiles[param.filename] = 1;
rd->processedCmd[req.cmdID] = 1;
bc = IBackupContainer::openContainer(param.url.toString());
// printf("[INFO] Node:%s CMDUID:%s open backup container for url:%s\n",
// rd->describeNode().c_str(), req.cmdID.toString().c_str(),
@ -4171,8 +4095,7 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
//Send ack to master that loader has finished loading the data
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
rd->processedFiles[param.filename] = 1;
rd->processedCmd[req.cmdID] = 1;
return Void();
@ -4203,14 +4126,18 @@ ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Referenc
//ASSERT(req.cmd == (RestoreCommandEnum) req.cmdID.phase);
//Note: handle duplicate message delivery
if (rd->processedFiles.find(param.filename) != rd->processedFiles.end()) {
if (rd->processedFiles.find(param.filename) != rd->processedFiles.end()
|| rd->isCmdProcessed(req.cmdID)) {
printf("[WARNING] Node:%s CMDUID:%s file:%s is delivered more than once! Reply directly without loading the file\n",
rd->describeNode().c_str(), req.cmdID.toString().c_str(),
param.filename.c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
rd->processedFiles[param.filename] = 1;
rd->processedCmd[req.cmdID] = 1;
bc = IBackupContainer::openContainer(param.url.toString());
printf("[INFO][Loader] Node:%s CMDUID:%s open backup container for url:%s\n",
rd->describeNode().c_str(), req.cmdID.toString().c_str(),
@ -4250,9 +4177,7 @@ ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Referenc
wait( registerMutationsToApplier(rd) ); // Send the parsed mutation to applier who will apply the mutation to DB
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID)); // master node is waiting
rd->processedFiles[param.filename] = 1;
rd->processedCmd[req.cmdID] = 1;
return Void();
}
@ -4268,9 +4193,12 @@ ACTOR Future<Void> handleSendMutationRequest(RestoreSendMutationRequest req, Ref
if ( rd->isCmdProcessed(req.cmdID) ) {
//printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
//printf("[DEBUG] Skipped mutation:%s\n", req.mutation.toString().c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
// Avoid race condition when this actor is called twice on the same command
rd->processedCmd[req.cmdID] = 1;
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
state uint64_t commitVersion = req.commitVersion;
MutationRef mutation(req.mutation);
@ -4285,7 +4213,6 @@ ACTOR Future<Void> handleSendMutationRequest(RestoreSendMutationRequest req, Ref
}
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
rd->processedCmd[req.cmdID] = 1;
return Void();
}
@ -4297,10 +4224,12 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
// Handle duplicate message
if (rd->isCmdProcessed(req.cmdID)) {
printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
rd->processedCmd[req.cmdID] = 1;
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
state uint64_t commitVersion = req.commitVersion;
// TODO: Change the req.mutation to a vector of mutations
@ -4321,7 +4250,6 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
}
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
rd->processedCmd[req.cmdID] = 1;
return Void();
}
@ -4356,11 +4284,10 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
wait(delay(5.0));
}
rd->inProgressApplyToDB = true;
rd->processedCmd[req.cmdID] = 1;
if ( rd->isCmdProcessed(req.cmdID) ) {
printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
//req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
return Void();
}
@ -4370,9 +4297,10 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
return Void();
}
rd->processedCmd[req.cmdID] = 1;
sanityCheckMutationOps(rd);
if ( debug_verbose == false ) {
if ( debug_verbose ) {
TraceEvent("ApplyKVOPsToDB").detail("MapSize", rd->kvOps.size());
printf("ApplyKVOPsToDB num_of_version:%ld\n", rd->kvOps.size());
}
@ -4385,7 +4313,7 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
if ( debug_verbose ) {
TraceEvent("ApplyKVOPsToDB\t").detail("Version", it->first).detail("OpNum", it->second.size());
}
printf("ApplyKVOPsToDB numVersion:%d Version:%08lx num_of_ops:%d, \n", numVersion, it->first, it->second.size());
//printf("ApplyKVOPsToDB numVersion:%d Version:%08lx num_of_ops:%d, \n", numVersion, it->first, it->second.size());
state MutationRef m;
@ -4410,7 +4338,6 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] Node:%s apply mutation:%s\n", rd->describeNode().c_str(), m.toString().c_str());
}
printf("[VERBOSE_DEBUG] Node:%s apply mutation:%s\n", rd->describeNode().c_str(), m.toString().c_str());
loop {
try {