FastRestore: Initiliaze version batch as a request
This commit is contained in:
parent
6a86492e6e
commit
4e0b01cc10
|
@ -2557,9 +2557,11 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
|
|||
state Database cx = cx_input;
|
||||
state RestoreInterface interf;
|
||||
interf.initEndpoints();
|
||||
interf.initNodeID();
|
||||
state Optional<RestoreInterface> leaderInterf;
|
||||
//Global data for the worker
|
||||
state Reference<RestoreData> rd = Reference<RestoreData>(new RestoreData());
|
||||
rd->localNodeStatus.nodeID = interf.id();
|
||||
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
|
@ -2872,16 +2874,13 @@ ACTOR Future<Void> initializeVersionBatch(Reference<RestoreData> rd, int batchIn
|
|||
RestoreRole role = rd->globalNodeStatus[index].role;
|
||||
UID nodeID = rd->globalNodeStatus[index].nodeID;
|
||||
rd->cmdID.nextCmd();
|
||||
printf("[CMD:%s] Node:%s Set role (%s) to node (index=%d uid=%s)\n", rd->cmdID.toString().c_str(), rd->describeNode().c_str(),
|
||||
getRoleStr(role).c_str(), index, nodeID.toString().c_str());
|
||||
printf("[CMD:%s] Node:%s Initialize version batch %d\n", rd->cmdID.toString().c_str(), rd->describeNode().c_str(),
|
||||
batchIndex);
|
||||
cmdReplies.push_back( cmdInterf.initVersionBatch.getReply(RestoreVersionBatchRequest(rd->cmdID, batchIndex)) );
|
||||
index++;
|
||||
}
|
||||
std::vector<RestoreCommonReply> reps = wait( timeoutError(getAll(cmdReplies), FastRestore_Failure_Timeout) );
|
||||
for (int i = 0; i < reps.size(); ++i) {
|
||||
printf("[INFO] Node:%s, CMDReply for CMD:%s, node:%s\n", rd->describeNode().c_str(), reps[i].cmdID.toString().c_str(),
|
||||
reps[i].id.toString().c_str());
|
||||
}
|
||||
printf("Initilaize Version Batch done\n");
|
||||
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
|
@ -3834,6 +3833,17 @@ ACTOR Future<std::string> RestoreConfig::getProgress_impl(Reference<RestoreConfi
|
|||
|
||||
//// -- New implementation of restore following storage server example
|
||||
|
||||
|
||||
ACTOR Future<Void> handleVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreData> rd, RestoreInterface interf) {
|
||||
printf("[Batch:%d] Node:%s Start...\n", req.batchID, rd->describeNode().c_str());
|
||||
rd->resetPerVersionBatch();
|
||||
rd->processedFiles.clear();
|
||||
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
|
||||
|
||||
// This actor never returns. You may cancel it in master
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleSetRoleRequest(RestoreSetRoleRequest req, Reference<RestoreData> rd, RestoreInterface interf) {
|
||||
|
||||
//ASSERT(req.cmdID.phase == RestoreCommandEnum::Set_Role);
|
||||
|
@ -4439,9 +4449,7 @@ ACTOR Future<Void> workerCore(Reference<RestoreData> rd, RestoreInterface ri, Da
|
|||
}
|
||||
|
||||
when ( RestoreVersionBatchRequest req = waitNext(ri.initVersionBatch.getFuture()) ) {
|
||||
printf("[Batch:%d] Node:%s Start...\n", req.batchID, rd->describeNode().c_str());
|
||||
rd->resetPerVersionBatch();
|
||||
rd->processedFiles.clear();
|
||||
wait(handleVersionBatchRequest(req, rd, ri));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue