FastRestore: Actors execution seq. can be weird
This commit is contained in:
parent
010b069da6
commit
d32a1489f0
|
@ -768,6 +768,7 @@ struct RestoreData : NonCopyable, public ReferenceCounted<RestoreData> {
|
|||
mutationMap.clear();
|
||||
mutationPartMap.clear();
|
||||
processedCmd.clear();
|
||||
inProgressApplyToDB = false;
|
||||
}
|
||||
|
||||
vector<UID> getBusyAppliers() {
|
||||
|
@ -2384,6 +2385,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(RestoreInterface int
|
|||
|
||||
// We should load log file before we do range file
|
||||
state RestoreCommandEnum phaseType = RestoreCommandEnum::Assign_Loader_Log_File;
|
||||
state std::vector<Future<RestoreCommonReply>> cmdReplies;
|
||||
loop {
|
||||
state int curFileIndex = 0; // The smallest index of the files that has not been FULLY loaded
|
||||
state bool allLoadReqsSent = false;
|
||||
|
@ -2394,7 +2396,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(RestoreInterface int
|
|||
}
|
||||
wait(delay(1.0));
|
||||
|
||||
state std::vector<Future<RestoreCommonReply>> cmdReplies;
|
||||
cmdReplies.clear();
|
||||
printf("[INFO] Number of backup files:%ld\n", rd->files.size());
|
||||
rd->cmdID.initPhase(phaseType);
|
||||
for (auto &loaderID : loaderIDs) {
|
||||
|
@ -2482,8 +2484,10 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(RestoreInterface int
|
|||
// Question: How to set reps to different value based on cmdReplies.empty()?
|
||||
if ( !cmdReplies.empty() ) {
|
||||
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); //TODO: change to getAny. NOTE: need to keep the still-waiting replies
|
||||
//std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
|
||||
|
||||
finishedLoaderIDs.clear();
|
||||
cmdReplies.clear();
|
||||
for (int i = 0; i < reps.size(); ++i) {
|
||||
printf("[INFO] Get Ack reply:%s for Assign_Loader_File\n",
|
||||
reps[i].toString().c_str());
|
||||
|
@ -2519,7 +2523,10 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(RestoreInterface int
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT( cmdReplies.empty() );
|
||||
|
||||
wait( delay(5.0) );
|
||||
// Notify the applier to applly mutation to DB
|
||||
wait( notifyApplierToApplyMutations(rd) );
|
||||
|
||||
|
@ -2547,7 +2554,8 @@ ACTOR Future<Void> notifyApplierToApplyMutations(Reference<RestoreData> rd) {
|
|||
cmdReplies.push_back( cmdInterf.applyToDB.getReply(RestoreSimpleRequest(rd->cmdID)) );
|
||||
}
|
||||
printf("[INFO] Wait for %ld appliers to apply mutations to DB\n", appliers.size());
|
||||
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
|
||||
//std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
|
||||
std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
|
||||
printf("[INFO] %ld appliers finished applying mutations to DB\n", appliers.size());
|
||||
|
||||
cmdReplies.clear();
|
||||
|
@ -3628,7 +3636,7 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreData> rd) {
|
|||
kvCount++;
|
||||
if (packMutationNum >= packMutationThreshold) {
|
||||
ASSERT( packMutationNum == packMutationThreshold );
|
||||
printf("[INFO][Loader] Waits for applier to receive %ld mutations\n", cmdReplies.size());
|
||||
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", cmdReplies.size());
|
||||
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
|
||||
cmdReplies.clear();
|
||||
packMutationNum = 0;
|
||||
|
@ -3663,7 +3671,8 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreData> rd) {
|
|||
}
|
||||
|
||||
if (!cmdReplies.empty()) {
|
||||
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
|
||||
//std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
|
||||
std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
|
||||
cmdReplies.clear();
|
||||
}
|
||||
printf("[Summary][Loader] Node:%s Last CMDUID:%s produces %d mutation operations\n",
|
||||
|
@ -3679,8 +3688,8 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreData> rd) {
|
|||
fprintf(stderr, "[ERROR] Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", rd->describeNode().c_str(),
|
||||
rd->cmdID.toString().c_str(), e.code(), e.what());
|
||||
}
|
||||
fprintf(stderr, "[ERROR] WE STOP HERE FOR DEBUG\n");
|
||||
break;
|
||||
//fprintf(stderr, "[ERROR] WE STOP HERE FOR DEBUG\n");
|
||||
//break;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -4053,7 +4062,7 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
|
|||
// printf("[WARNING]Node:%s, CMDUID:%s file:%s is delivered more than once! Reply directly without loading the file\n",
|
||||
// rd->describeNode().c_str(), req.cmdID.toString().c_str(),
|
||||
// param.filename.c_str());
|
||||
//req.reply.send(RestoreCommonReply(interf.id(),req.cmdID));
|
||||
req.reply.send(RestoreCommonReply(interf.id(),req.cmdID));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -4093,6 +4102,8 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
|
|||
// rd->describeNode().c_str(), rd->cmdID.toString().c_str());
|
||||
wait( registerMutationsToApplier(rd) ); // Send the parsed mutation to applier who will apply the mutation to DB
|
||||
|
||||
printf("[INFO][Loader] Node:%s CMDUID:%s send ack.\n",
|
||||
rd->describeNode().c_str(), rd->cmdID.toString().c_str());
|
||||
//Send ack to master that loader has finished loading the data
|
||||
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
|
||||
|
||||
|
@ -4186,7 +4197,7 @@ ACTOR Future<Void> handleSendMutationRequest(RestoreSendMutationRequest req, Ref
|
|||
state int numMutations = 0;
|
||||
|
||||
//ASSERT(req.cmdID.phase == RestoreCommandEnum::Loader_Send_Mutations_To_Applier);
|
||||
if ( debug_verbose ) {
|
||||
if ( debug_verbose || true ) {
|
||||
printf("[VERBOSE_DEBUG] Node:%s receive mutation:%s\n", rd->describeNode().c_str(), req.mutation.toString().c_str());
|
||||
}
|
||||
// Handle duplicat cmd
|
||||
|
@ -4254,27 +4265,6 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
|
|||
return Void();
|
||||
}
|
||||
|
||||
|
||||
// ACTOR Future<Void> handleApplyToDBRequest(Reference<RestoreData> rd, Database cx) {
|
||||
// if ( rd->isCmdProcessed(req.cmdID) ) {
|
||||
// printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
|
||||
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
|
||||
// return Void();
|
||||
// }
|
||||
// sanityCheckMutationOps(rd);
|
||||
// // Applier apply mutations to DB
|
||||
// printf("[INFO][Applier] apply KV ops to DB starts...\n");
|
||||
// wait( applyKVOpsToDB(rd, cx) );
|
||||
// printf("[INFO][Applier] apply KV ops to DB finishes...\n");
|
||||
// req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
|
||||
// printf("[INFO][Applier] Node: %s, At the end of its functionality! Hang here to make sure master proceeds!\n",
|
||||
// rd->describeNode().c_str());
|
||||
// rd->processedCmd[req.cmdID] = 1;
|
||||
|
||||
// return Void();
|
||||
// }
|
||||
|
||||
|
||||
ACTOR Future<Void> handleApplyToDBRequest(RestoreSimpleRequest req, Reference<RestoreData> rd, RestoreInterface interf, Database cx) {
|
||||
state bool isPrint = false; //Debug message
|
||||
state std::string typeStr = "";
|
||||
|
@ -4292,6 +4282,7 @@ ACTOR Future<Void> handleSendSampleMutationRequest(RestoreSendMutationRequest re
|
|||
}
|
||||
|
||||
if (rd->kvOps.empty()) {
|
||||
printf("Node:%s kvOps is empty. No-op for apply to DB\n", rd->describeNode().c_str());
|
||||
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
|
||||
rd->processedCmd[req.cmdID] = 1;
|
||||
return Void();
|
||||
|
|
|
@ -115,12 +115,12 @@ struct CycleWorkload : TestWorkload {
|
|||
tr.set( self->key(r), self->value(r3) );
|
||||
tr.set( self->key(r2), self->value(r4) );
|
||||
tr.set( self->key(r3), self->value(r2) );
|
||||
// TraceEvent("CyclicTestMX").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString());
|
||||
// TraceEvent("CyclicTestMX").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString());
|
||||
// TraceEvent("CyclicTestMX").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString());
|
||||
TraceEvent("CyclicTestMX").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString());
|
||||
TraceEvent("CyclicTestMX").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString());
|
||||
TraceEvent("CyclicTestMX").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString());
|
||||
|
||||
wait( tr.commit() );
|
||||
//TraceEvent("CycleCommit");
|
||||
TraceEvent("CycleCommit");
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_transaction_too_old) ++self->tooOldRetries;
|
||||
|
|
Loading…
Reference in New Issue