FastRestore:fix bug due to non-unique cmdid

This commit identifies the bug
why DB may be restored to an inconsistent state.

The cmdid is used to achieve exact once delivery even when
network can deliver a request twice.
This is under assumption that cmdid is unique for each request!

However, this assumption may not hold for
the phase Loader_Send_Mutations_To_Applier, when loaders send parsed
mutations to appliers:
1) When the same loader loads multiple files, we reset the cmdid
for the phase;
2) When different loaders load files, each loader's cmdid starts from
0 for the phase.
Both situations can break the assumption, which causes appliers to
miss some mutations to apply. This breaks the cycle test.
This commit is contained in:
Meng Xu 2019-05-14 01:49:44 -07:00
parent 85227c1233
commit 6c4c807801
2 changed files with 14 additions and 9 deletions

View File

@ -209,6 +209,7 @@ ACTOR Future<Void> handleSetApplierKeyRangeRequest(RestoreSetApplierKeyRangeRequ
self->range2Applier[req.range.begin] = req.applierID; self->range2Applier[req.range.begin] = req.applierID;
self->processedCmd.clear(); // The Loader_Register_Mutation_to_Applier command can be sent in both sampling and actual loading phases
self->processedCmd[req.cmdID] = 1; self->processedCmd[req.cmdID] = 1;
self->clearInProgressFlag(RestoreCommandEnum::Assign_Applier_KeyRange); self->clearInProgressFlag(RestoreCommandEnum::Assign_Applier_KeyRange);
@ -236,8 +237,8 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorRequ
// Handle duplicat cmd // Handle duplicat cmd
if ( self->isCmdProcessed(req.cmdID) ) { if ( self->isCmdProcessed(req.cmdID) ) {
//printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", self->describeNode().c_str(), req.cmdID.toString().c_str()); printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", self->describeNode().c_str(), req.cmdID.toString().c_str());
//printf("[DEBUG] Skipped mutation:%s\n", req.mutation.toString().c_str()); //printf("[DEBUG] Skipped duplicate cmd:%s\n", req.cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
return Void(); return Void();
} }

View File

@ -338,9 +338,9 @@ ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Referenc
req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); // master node is waiting req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); // master node is waiting
// TODO: NOTE: If we parse log file, the DB status will be incorrect. // TODO: NOTE: If we parse log file, the DB status will be incorrect.
//if ( !isSampling ) { if ( !isSampling ) {
self->processedFiles[param.filename] = 1; self->processedFiles[param.filename] = 1;
//} }
self->processedCmd[req.cmdID] = 1; self->processedCmd[req.cmdID] = 1;
self->clearInProgressFlag(cmdType); self->clearInProgressFlag(cmdType);
@ -443,7 +443,7 @@ ACTOR Future<Void> registerMutationsToMasterApplier(Reference<RestoreLoaderData>
} }
// TODO: ATTENTION: Different loaders may generate the same CMDUID, which may let applier miss some mutations
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self) { ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self) {
printf("[INFO][Loader] Node:%s self->masterApplierInterf:%s, registerMutationsToApplier\n", printf("[INFO][Loader] Node:%s self->masterApplierInterf:%s, registerMutationsToApplier\n",
self->describeNode().c_str(), self->masterApplierInterf.toString().c_str()); self->describeNode().c_str(), self->masterApplierInterf.toString().c_str());
@ -471,7 +471,8 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
splitMutationIndex = 0; splitMutationIndex = 0;
kvCount = 0; kvCount = 0;
state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator kvOp; state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator kvOp;
self->cmdID.initPhase(RestoreCommandEnum::Loader_Send_Mutations_To_Applier); // MX: NEED TO A WAY TO GENERATE NON_DUPLICATE CMDUID across loaders
self->cmdID.setPhase(RestoreCommandEnum::Loader_Send_Mutations_To_Applier); //MX: THIS MAY BE WRONG! CMDID may duplicate across loaders
// In case try-catch has error and loop back // In case try-catch has error and loop back
applierMutationsBuffer.clear(); applierMutationsBuffer.clear();
applierMutationsSize.clear(); applierMutationsSize.clear();
@ -550,10 +551,10 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
self->cmdID.nextCmd(); self->cmdID.nextCmd();
cmdReplies.push_back(applierCmdInterf.sendMutationVector.getReply( cmdReplies.push_back(applierCmdInterf.sendMutationVector.getReply(
RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID]))); RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", applierMutationsBuffer[applierID].size());
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size()); applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0; applierMutationsSize[applierID] = 0;
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", cmdReplies.size());
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
cmdReplies.clear(); cmdReplies.clear();
} }
@ -572,9 +573,9 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
self->cmdID.nextCmd(); self->cmdID.nextCmd();
cmdReplies.push_back(applierCmdInterf.sendMutationVector.getReply( cmdReplies.push_back(applierCmdInterf.sendMutationVector.getReply(
RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID]))); RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", applierMutationsBuffer[applierID].size());
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size()); applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0; applierMutationsSize[applierID] = 0;
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", cmdReplies.size());
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); // Q: We need to wait for each reply, otherwise, correctness has error. Why? std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); // Q: We need to wait for each reply, otherwise, correctness has error. Why?
cmdReplies.clear(); cmdReplies.clear();
} }
@ -816,7 +817,7 @@ bool isRangeMutation(MutationRef m) {
} }
if ( debug_verbose ) { if ( debug_verbose ) {
printf("----------------------------------------------------------Register Backup Mutation into KVOPs version:%08lx\n", commitVersion); printf("----------------------------------------------------------Register Backup Mutation into KVOPs version:0x%08lx (%08ld)\n", commitVersion, commitVersion);
printf("To decode value:%s\n", getHexString(val).c_str()); printf("To decode value:%s\n", getHexString(val).c_str());
} }
// In sampling, the last mutation vector may be not complete, we do not concatenate for performance benefit // In sampling, the last mutation vector may be not complete, we do not concatenate for performance benefit
@ -865,6 +866,9 @@ bool isRangeMutation(MutationRef m) {
printf("%s---LogFile parsed mutations. Prefix:[%d]: Version:%016lx Type:%d K:%s V:%s k_size:%d v_size:%d\n", prefix.c_str(), printf("%s---LogFile parsed mutations. Prefix:[%d]: Version:%016lx Type:%d K:%s V:%s k_size:%d v_size:%d\n", prefix.c_str(),
kvCount, kvCount,
commitVersion, type, getHexString(KeyRef(k, kLen)).c_str(), getHexString(KeyRef(v, vLen)).c_str(), kLen, vLen); commitVersion, type, getHexString(KeyRef(k, kLen)).c_str(), getHexString(KeyRef(v, vLen)).c_str(), kLen, vLen);
printf("%s[PrintAgain]---LogFile parsed mutations. Prefix:[%d]: Version:%016lx (%016ld) Type:%d K:%s V:%s k_size:%d v_size:%d\n", prefix.c_str(),
kvCount,
commitVersion, commitVersion, type, KeyRef(k, kLen).toString().c_str(), KeyRef(v, vLen).toString().c_str(), kLen, vLen);
} }
} }