FastRestore: Use setWorkerInterface request

Use setWorkerInterface request to trigger each worker to
read all workers interface from DB
This commit is contained in:
Meng Xu 2019-04-09 21:07:16 -07:00
parent 84707e6a50
commit 699c95ea1d
2 changed files with 17 additions and 6 deletions

View File

@ -1435,7 +1435,7 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
}
ACTOR Future<Void> setWorkerInterface(Reference<RestoreData> rd, Database cx) {
ACTOR Future<Void> setWorkerInterface(RestoreSimpleRequest req, Reference<RestoreData> rd, RestoreInterface interf, Database cx) {
state Transaction tr(cx);
state vector<RestoreInterface> agents; // agents is cmdsInterf
@ -1456,6 +1456,7 @@ ACTOR Future<Void> setWorkerInterface(Reference<RestoreData> rd, Database cx) {
break;
}
wait( delay(5.0) );
req.reply.send(RestoreCommonReply(interf.id(), req.cmdID));
} catch( Error &e ) {
printf("[WARNING] Node:%s setWorkerInterface() transaction error:%s\n", rd->describeNode().c_str(), e.what());
wait( tr.onError(e) );
@ -1463,6 +1464,7 @@ ACTOR Future<Void> setWorkerInterface(Reference<RestoreData> rd, Database cx) {
printf("[WARNING] Node:%s setWorkerInterface should always succeed in the first loop! Something goes wrong!\n", rd->describeNode().c_str());
};
return Void();
}
@ -2602,12 +2604,8 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
//we are not the leader, so put our interface in the agent list
if(leaderInterf.present()) {
// Initialize the node's UID
rd->localNodeStatus.nodeID = interf.id();
//rd->localNodeStatus.nodeID = interf.id();
// Step: Find other worker's interfaces
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their interfaces into DB before we can read the interface.
// TODO: Wait until all workers have registered their interface.
wait( setWorkerInterface(rd, cx) );
wait( workerCore(rd, interf, cx) );
}
@ -3606,6 +3604,8 @@ ACTOR Future<Void> registerMutationsToMasterApplier(Reference<RestoreData> rd) {
rd->workers_interface.find(rd->masterApplier) != rd->workers_interface.end());
//printAppliersKeyRange(rd);
ASSERT(rd->workers_interface.find(rd->masterApplier) != rd->workers_interface.end());
state RestoreInterface applierCmdInterf = rd->workers_interface[rd->masterApplier];
state UID applierID = rd->masterApplier;
state int packMutationNum = 0;
@ -4413,6 +4413,15 @@ ACTOR Future<Void> workerCore(Reference<RestoreData> rd, RestoreInterface ri, Da
wait(handleVersionBatchRequest(req, rd, ri));
}
when ( RestoreSimpleRequest req = waitNext(ri.setWorkerInterface.getFuture()) ) {
// Step: Find other worker's interfaces
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their interfaces into DB before we can read the interface.
// TODO: Wait until all workers have registered their interface.
wait( setWorkerInterface(req, rd, ri, cx) );
}
}
} catch (Error &e) {

View File

@ -131,6 +131,8 @@ struct RestoreInterface {
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> setWorkerInterface;
// ToDelete
// RequestStream< struct RestoreCommand > cmd; // Restore commands from master to loader and applier
// RequestStream< struct RestoreRequest > request; // Restore requests used by loader and applier