FastRestore:Recruit exact number of restore worker

We can configure 1 loader and 1 applier to simplify
the debug process.
This commit is contained in:
Meng Xu 2019-05-13 22:06:44 -07:00
parent c115e3ceb1
commit 3fcdc39b93
4 changed files with 76 additions and 9 deletions

View File

@ -51,8 +51,10 @@
#include "flow/actorcompiler.h" // This must be the last #include.
// These configurations for restore workers will be set in initRestoreWorkerConfig() later.
int MIN_NUM_WORKERS = 3; //10; // TODO: This can become a configuration param later
int MIN_NUM_WORKERS = 2; //10; // TODO: This can become a configuration param later
int ratio_loader_to_applier = 1; // the ratio of loader over applier. The loader number = total worker * (ratio / (ratio + 1) )
int NUM_LOADERS = 1;
int NUM_APPLIERS = 1;
int FastRestore_Failure_Timeout = 3600; // seconds
double loadBatchSizeMB = 1; // MB
double loadBatchSizeThresholdB = loadBatchSizeMB * 1024 * 1024;
@ -71,7 +73,7 @@ ACTOR Future<Void> handlerTerminateWorkerRequest(RestoreSimpleRequest req, Refer
ACTOR Future<Void> monitorWorkerLiveness(Reference<RestoreWorkerData> self);
ACTOR Future<Void> commitRestoreRoleInterfaces(Reference<RestoreWorkerData> self, Database cx);
ACTOR Future<Void> handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWorkerData> self, ActorCollection *actors, Database cx);
ACTOR Future<Void> collectRestoreWorkerInterface(Reference<RestoreWorkerData> self, Database cx, int min_num_workers);
ACTOR Future<Void> collectRestoreWorkerInterface(Reference<RestoreWorkerData> self, Database cx, int min_num_workers = 2);
ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> self);
bool debug_verbose = true;
@ -230,6 +232,8 @@ ACTOR Future<Void> handlerTerminateWorkerRequest(RestoreSimpleRequest req, Refer
void initRestoreWorkerConfig() {
MIN_NUM_WORKERS = g_network->isSimulated() ? 3 : 120; //10; // TODO: This can become a configuration param later
ratio_loader_to_applier = 1; // the ratio of loader over applier. The loader number = total worker * (ratio / (ratio + 1) )
NUM_LOADERS = 1;
NUM_APPLIERS = 1;
FastRestore_Failure_Timeout = 3600; // seconds
loadBatchSizeMB = g_network->isSimulated() ? 1 : 10 * 1000.0; // MB
loadBatchSizeThresholdB = loadBatchSizeMB * 1024 * 1024;
@ -356,6 +360,65 @@ ACTOR Future<Void> handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Refer
}
// Keep only k restore workers and remove redundant restore workers
ACTOR Future<Void> removeRedundantRestoreWorkers(Reference<RestoreWorkerData> self, Database cx) {
printf("%s:Start configuring roles for workers\n", self->describeNode().c_str());
ASSERT( self->masterData.isValid() );
// Set up the role, and the global status for each node
int numNodes = self->workers_workerInterface.size();
state int numLoader = NUM_LOADERS; //numNodes * ratio_loader_to_applier / (ratio_loader_to_applier + 1);
int numApplier = NUM_APPLIERS; //numNodes - numLoader;
state int numWorkers = numLoader + numApplier;
if ( numNodes == numWorkers ) {
return Void();
} else if ( numNodes < numWorkers ) {
fprintf(stderr, "actual number_of_workers:%d < expected number_of_workers:%d\n", numNodes, numWorkers);
}
state int nodeIndex = 0;
state UID nodeID;
loop {
try {
std::vector<Future<RestoreCommonReply>> cmdReplies;
nodeIndex = 0;
printf("Node:%s Start remove %d redundant restore worker\n", self->describeNode().c_str(), self->workers_workerInterface.size() - numWorkers);
self->cmdID.initPhase(RestoreCommandEnum::Remove_Redundant_Worker);
for (auto &workerInterf : self->workers_workerInterface) {
if ( nodeIndex < numWorkers ) {
nodeIndex++;
continue;
}
nodeID = workerInterf.first;
self->cmdID.nextCmd();
printf("[CMD:%s] Node:%s Remove restore worker(index=%d uid=%s)\n", self->cmdID.toString().c_str(), self->describeNode().c_str(),
nodeIndex, nodeID.toString().c_str());
cmdReplies.push_back( workerInterf.second.terminateWorker.getReply(RestoreSimpleRequest(self->cmdID)) );
nodeIndex++;
}
std::vector<RestoreCommonReply> reps = wait( timeoutError(getAll(cmdReplies), FastRestore_Failure_Timeout) );
// Get the updated key-value for restore worker interfaces
self->workers_workerInterface.clear();
wait( collectRestoreWorkerInterface(self, cx) );
printf("[RemoveRedundantWorkers] Finished\n");
break;
} catch (Error &e) {
// Handle the command reply timeout error
fprintf(stdout, "[ERROR] Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", self->describeNode().c_str(),
self->cmdID.toString().c_str(), e.code(), e.what());
printf("Node:%s waits on replies time out. Current phase: removeRedundantRestoreWorkers, Retry all commands.\n", self->describeNode().c_str());
wait( delay(5.0) );
self->workers_workerInterface.clear();
wait( collectRestoreWorkerInterface(self, cx) );
}
}
return Void();
}
// RestoreWorker that has restore master role: Recruite a role for each worker
ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> self) {
printf("%s:Start configuring roles for workers\n", self->describeNode().c_str());
@ -363,8 +426,8 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> self) {
// Set up the role, and the global status for each node
int numNodes = self->workers_workerInterface.size();
state int numLoader = numNodes * ratio_loader_to_applier / (ratio_loader_to_applier + 1);
int numApplier = numNodes - numLoader;
state int numLoader = NUM_LOADERS; //numNodes * ratio_loader_to_applier / (ratio_loader_to_applier + 1);
state int numApplier = NUM_APPLIERS; //numNodes - numLoader;
if (numLoader <= 0 || numApplier <= 0) {
ASSERT( numLoader > 0 ); // Quick check in correctness
ASSERT( numApplier > 0 );
@ -382,6 +445,8 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> self) {
try {
std::vector<Future<RestoreCommonReply>> cmdReplies;
self->cmdID.initPhase(RestoreCommandEnum::Recruit_Role_On_Worker);
printf("numLoader:%d, numApplier:%d, self->workers_workerInterface.size:%d\n", numLoader, numApplier, self->workers_workerInterface.size());
ASSERT( numLoader + numApplier == self->workers_workerInterface.size() ); // We assign 1 role per worker for now
for (auto &workerInterf : self->workers_workerInterface) {
if ( nodeIndex < numLoader ) {
role = RestoreRole::Loader;
@ -438,7 +503,7 @@ ACTOR Future<Void> startRestoreWorker(Reference<RestoreWorkerData> self, Restore
// Destroy the worker at the end of the restore
// TODO: Cancel its own actors
requestTypeStr = "terminateWorker";
actors.add( handlerTerminateWorkerRequest(req, self, interf, cx) );
wait( handlerTerminateWorkerRequest(req, self, interf, cx) );
return Void();
}
}
@ -524,6 +589,8 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
wait( collectRestoreWorkerInterface(self, cx, MIN_NUM_WORKERS) );
wait( removeRedundantRestoreWorkers(self, cx) );
state Future<Void> workersFailureMonitor = monitorWorkerLiveness(self);
// configureRoles must be after collectWorkerInterface

View File

@ -164,11 +164,11 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
}
//ASSERT(lowerBounds.size() <= numAppliers + 1); // We may have at most numAppliers + 1 key ranges
if ( lowerBounds.size() >= numAppliers ) {
if ( lowerBounds.size() > numAppliers ) {
printf("[WARNING] Key ranges number:%ld > numAppliers:%d. Merge the last ones\n", lowerBounds.size(), numAppliers);
}
while ( lowerBounds.size() >= numAppliers ) {
while ( lowerBounds.size() > numAppliers ) {
printf("[WARNING] Key ranges number:%ld > numAppliers:%d. Merge the last ones\n", lowerBounds.size(), numAppliers);
lowerBounds.pop_back();
}

View File

@ -695,7 +695,7 @@ ACTOR static Future<Void> sampleWorkload(Reference<RestoreMasterData> self, Rest
printf("[Sampling][CMDRep] number of key ranges calculated by master applier:%d\n", rep.keyRangeNum);
numKeyRanges = rep.keyRangeNum;
if (numKeyRanges <= 0 || numKeyRanges >= self->appliersInterf.size() ) {
if (numKeyRanges <= 0 || numKeyRanges > self->appliersInterf.size() ) {
printf("[WARNING] Calculate_Applier_KeyRange receives wrong reply (numKeyRanges:%ld) from other phases. appliersInterf.size:%d Retry Calculate_Applier_KeyRange\n", numKeyRanges, self->appliersInterf.size());
UNREACHABLE();
}

View File

@ -45,7 +45,7 @@ enum class RestoreCommandEnum : uint32_t {Init = 0,
Loader_Notify_Appler_To_Apply_Mutation,
Notify_Loader_ApplierKeyRange, Notify_Loader_ApplierKeyRange_Done, //20
Finish_Restore, Reset_VersionBatch, Set_WorkerInterface, Collect_RestoreRoleInterface, // 24
Heart_Beat, Recruit_Role_On_Worker};
Heart_Beat, Recruit_Role_On_Worker, Remove_Redundant_Worker};
BINARY_SERIALIZABLE(RestoreCommandEnum);
enum class RestoreRole {Invalid = 0, Master = 1, Loader, Applier};