BugFix: master should wait until at least 2 workers have registered their interfaces

otherwise, when master proceeds to distribute workload, it will find 0 loader or applier, which violates the invariant
This commit is contained in:
Meng Xu 2019-01-10 19:56:17 -08:00
parent db3f1a9663
commit c91d143504
3 changed files with 51 additions and 71 deletions

View File

@ -606,6 +606,8 @@ std::pair<int, int> getNumLoaderAndApplier(Reference<RestoreData> restoreData){
numLoaders++;
} else if (restoreData->globalNodeStatus[i].role == RestoreRole::Applier) {
numAppliers++;
} else {
printf("[ERROR] unknown role: %d\n", restoreData->globalNodeStatus[i].role);
}
}
@ -1231,13 +1233,13 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> restoreDa
ACTOR Future<Void> setWorkerInterface(Reference<RestoreData> restoreData, Database cx) {
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state vector<RestoreCommandInterface> agents; // agents is cmdsInterf
printf("[INFO][Master] Start configuring roles for workers\n");
printf("[INFO][Worker] Node:%s Get the interface for all workers\n", restoreData->getNodeID().c_str());
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> agentValues = wait(tr.getRange(restoreWorkersKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!agentValues.more);
if(agentValues.size()) {
@ -1265,16 +1267,18 @@ ACTOR Future<Void> setWorkerInterface(Reference<RestoreData> restoreData, Databa
// The master node's localNodeStatus has been set outside of this function
ACTOR Future<Void> configureRoles(Reference<RestoreData> restoreData, Database cx) { //, VectorRef<RestoreInterface> ret_agents
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state int min_num_workers = 2; // TODO: This can become a configuration param later
state vector<RestoreCommandInterface> agents; // agents is cmdsInterf
printf("[INFO][Master] Start configuring roles for workers\n");
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> agentValues = wait(tr.getRange(restoreWorkersKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!agentValues.more);
if(agentValues.size()) {
// If agentValues.size() < min_num_workers, we should wait for coming workers to register their interface before we read them once for all
if(agentValues.size() >= min_num_workers) {
for(auto& it : agentValues) {
agents.push_back(BinaryReader::fromStringRef<RestoreCommandInterface>(it.value, IncludeVersion()));
// Save the RestoreCommandInterface for the later operations
@ -1288,11 +1292,14 @@ ACTOR Future<Void> configureRoles(Reference<RestoreData> restoreData, Database c
wait( tr.onError(e) );
}
}
ASSERT(agents.size() >= min_num_workers); // ASSUMPTION: We must have at least 1 loader and 1 applier
// Set up the role, and the global status for each node
int numNodes = agents.size();
int numLoader = numNodes / 2;
int numApplier = numNodes - numLoader;
if (numLoader <= 0 || numApplier <= 0) {
ASSERT( numLoader > 0 ); // Quick check in correctness
ASSERT( numApplier > 0 );
fprintf(stderr, "[ERROR] not enough nodes for loader and applier. numLoader:%d, numApplier:%d\n", numLoader, numApplier);
} else {
printf("[INFO] numWorkders:%d numLoader:%d numApplier:%d\n", numNodes, numLoader, numApplier);
@ -1361,7 +1368,13 @@ ACTOR Future<Void> configureRoles(Reference<RestoreData> restoreData, Database c
break;
}
//Sanity check roles configuration
std::pair<int, int> numWorkers = getNumLoaderAndApplier(restoreData);
int numLoaders = numWorkers.first;
int numAppliers = numWorkers.second;
ASSERT( restoreData->globalNodeStatus.size() > 0 );
ASSERT( numLoaders > 0 );
ASSERT( numAppliers > 0 );
printf("Role:%s finish configure roles\n", getRoleStr(restoreData->localNodeStatus.role).c_str());
return Void();
@ -1370,6 +1383,7 @@ ACTOR Future<Void> configureRoles(Reference<RestoreData> restoreData, Database c
// Handle restore command request on workers
ACTOR Future<Void> configureRolesHandler(Reference<RestoreData> restoreData, RestoreCommandInterface interf) {
printf("[INFO][Worker] Node: ID_unset yet, starts configureRolesHandler\n");
loop {
choose {
when(RestoreCommand req = waitNext(interf.cmd.getFuture())) {
@ -1385,11 +1399,11 @@ ACTOR Future<Void> configureRolesHandler(Reference<RestoreData> restoreData, Res
restoreData->localNodeStatus.init(req.role);
restoreData->localNodeStatus.nodeID = interf.id();
restoreData->masterApplier = req.masterApplier;
printf("[INFO][Worker] Set localNodeID to %s, set role to %s\n",
printf("[INFO][Worker] Set_Role localNodeID to %s, set role to %s\n",
restoreData->localNodeStatus.nodeID.toString().c_str(), getRoleStr(restoreData->localNodeStatus.role).c_str());
req.reply.send(RestoreCommandReply(interf.id()));
} else if (req.cmd == RestoreCommandEnum::Set_Role_Done) {
printf("[INFO][Worker] NodeID:%s (interf ID:%s) set to role:%s Done.\n",
printf("[INFO][Worker] Set_Role_Done NodeID:%s (interf ID:%s) set to role:%s Done.\n",
restoreData->localNodeStatus.nodeID.toString().c_str(),
interf.id().toString().c_str(),
getRoleStr(restoreData->localNodeStatus.role).c_str());
@ -1927,6 +1941,7 @@ ACTOR static Future<Void> distributeWorkload(RestoreCommandInterface interf, Ref
std::pair<int, int> numWorkers = getNumLoaderAndApplier(restoreData);
int numLoaders = numWorkers.first;
int numAppliers = numWorkers.second;
ASSERT( restoreData->globalNodeStatus.size() > 0 );
ASSERT( numLoaders > 0 );
ASSERT( numAppliers > 0 );
@ -1978,7 +1993,7 @@ ACTOR static Future<Void> distributeWorkload(RestoreCommandInterface interf, Ref
// We need to concatenate the related KVs to a big KV before we can parse the value into a vector of mutations at that version
// (2) The backuped KV are arranged in blocks in range file.
// For simplicity, we distribute at the granularity of files for now.
int loadingSizeMB = 10;
int loadingSizeMB = 10000; //NOTE: We want to load the entire file in the first version, so we want to make this as large as possible
state int loadSizeB = loadingSizeMB * 1024 * 1024;
state int loadingCmdIndex = 0;
state int curFileIndex = 0; // The smallest index of the files that has not been FULLY loaded
@ -2014,11 +2029,17 @@ ACTOR static Future<Void> distributeWorkload(RestoreCommandInterface interf, Ref
param.offset = restoreData->files[curFileIndex].cursor;
//param.length = std::min(restoreData->files[curFileIndex].fileSize - restoreData->files[curFileIndex].cursor, loadSizeB);
param.length = restoreData->files[curFileIndex].fileSize;
loadSizeB = param.length;
param.blockSize = restoreData->files[curFileIndex].blockSize;
param.restoreRange = restoreRange;
param.addPrefix = addPrefix;
param.removePrefix = removePrefix;
param.mutationLogPrefix = mutationLogPrefix;
if ( !(param.length > 0 && param.offset >= 0 && param.offset < restoreData->files[curFileIndex].fileSize) ) {
printf("[ERROR] param: length:%d offset:%d fileSize:%d for %dth filename:%s\n",
param.length, param.offset, restoreData->files[curFileIndex].fileSize, curFileIndex,
restoreData->files[curFileIndex].fileName.c_str());
}
ASSERT( param.length > 0 );
ASSERT( param.offset >= 0 );
ASSERT( param.offset < restoreData->files[curFileIndex].fileSize );
@ -2415,7 +2436,7 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
wait(tr.commit());
break;
} catch( Error &e ) {
printf("restoreWorker select leader error\n");
printf("restoreWorker select leader error, error code:%d error info:%s\n", e.code(), e.what());
wait( tr.onError(e) );
}
}
@ -2424,25 +2445,29 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
if(leaderInterf.present()) {
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
//tr.set(restoreWorkerKeyFor(interf.id()), BinaryWriter::toValue(interf, IncludeVersion()));
printf("[Worker] Worker restore interface id:%s\n", interf.id().toString().c_str());
tr.set(restoreWorkerKeyFor(interf.id()), restoreCommandInterfaceValue(interf));
wait(tr.commit());
break;
} catch( Error &e ) {
printf("[WARNING][Worker] Transaction of register worker interface fails for worker:%s\n", interf.id().toString().c_str());
wait( tr.onError(e) );
}
}
//Find other worker's interfaces
wait( setWorkerInterface(restoreData, cx) );
// Step: configure its role
printf("[INFO][Worker] Configure its role\n");
wait( configureRolesHandler(restoreData, interf) );
printf("[INFO][Worker] NodeID:%s is configure to %s\n",
restoreData->localNodeStatus.nodeID.toString().c_str(), getRoleStr(restoreData->localNodeStatus.role).c_str());
// Step: Find other worker's interfaces
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their interfaces into DB before we can read the interface.
wait( setWorkerInterface(restoreData, cx) );
// Step: prepare restore info: applier waits for the responsible keyRange,
// loader waits for the info of backup block it needs to load
if ( restoreData->localNodeStatus.role == RestoreRole::Applier ) {
@ -2491,7 +2516,8 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
}
//we are the leader
wait( delay(5.0) );
// We must wait for enough time to make sure all restore workers have registered their interfaces into the DB
wait( delay(10.0) );
//state vector<RestoreInterface> agents;
state VectorRef<RestoreInterface> agents;
@ -2560,16 +2586,17 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
tr3.set(restoreRequestDoneKey, restoreRequestDoneValue(restoreRequests.size()));
wait(tr3.commit());
TraceEvent("LeaderFinishRestoreRequest");
printf("[INFO] RestoreLeader write restoreRequestDoneKey\n");
printf("[INFO] RestoreLeader write restoreRequestDoneKey, restoreRequests.size:%d\n", restoreRequests.size());
// Verify by reading the key
tr3.reset();
tr3.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr3.setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<Value> numFinished = wait(tr3.get(restoreRequestDoneKey));
ASSERT(numFinished.present());
int num = decodeRestoreRequestDoneValue(numFinished.get());
printf("[INFO] RestoreLeader read restoreRequestDoneKey, numFinished:%d\n", num);
//NOTE: The restoreRequestDoneKey may be cleared by restore requester. Can NOT read this.
// tr3.reset();
// tr3.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// tr3.setOption(FDBTransactionOptions::LOCK_AWARE);
// state Optional<Value> numFinished = wait(tr3.get(restoreRequestDoneKey));
// ASSERT(numFinished.present());
// int num = decodeRestoreRequestDoneValue(numFinished.get());
// printf("[INFO] RestoreLeader read restoreRequestDoneKey, numFinished:%d\n", num);
break;
} catch( Error &e ) {
TraceEvent("RestoreAgentLeaderErrorTr3").detail("ErrorCode", e.code()).detail("ErrorName", e.name());
@ -2657,50 +2684,6 @@ ACTOR static Future<Version> restoreMX(RestoreCommandInterface interf, Reference
lockDB = true;
}
/*
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
printf("Backup Description\n%s", desc.toString().c_str());
printf("MX: Restore for url:%s, lockDB:%d\n", url.toString().c_str(), lockDB);
if(targetVersion == invalidVersion && desc.maxRestorableVersion.present())
targetVersion = desc.maxRestorableVersion.get();
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
//Above is the restore master code
//Below is the agent code
TraceEvent("RestoreMX").detail("StartRestoreForRequest", request.toString());
printf("RestoreMX: start restore for request: %s\n", request.toString().c_str());
if(!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
.detail("BackupContainer", bc->getURL())
.detail("TargetVersion", targetVersion);
fprintf(stderr, "ERROR: Restore version %lld is not possible from %s\n", targetVersion, bc->getURL().c_str());
throw restore_invalid_version();
} else {
printf("---To restore from the following files: num_logs_file:%d num_range_files:%d---\n",
restoreSet.get().logs.size(), restoreSet.get().ranges.size());
for (int i = 0; i < restoreSet.get().logs.size(); ++i) {
printf("log file:%s\n", restoreSet.get().logs[i].toString().c_str());
}
for (int i = 0; i < restoreSet.get().ranges.size(); ++i) {
printf("range file:%s\n", restoreSet.get().ranges[i].toString().c_str());
}
}
if (verbose) {
printf("Restoring backup to version: %lld\n", (long long) targetVersion);
TraceEvent("RestoreBackupMX").detail("TargetVersion", (long long) targetVersion);
}
*/
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Reference<RestoreConfig> restoreConfig(new RestoreConfig(randomUid));

View File

@ -660,6 +660,7 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
if( spec.phases & TestWorkload::EXECUTION ) {
TraceEvent("TestStarting").detail("WorkloadTitle", printable(spec.title));
printf("running test...\n");
printf("test WorkloadTitle:%s\n", printable(spec.title).c_str());
std::vector< Future<Void> > starts;
for(int i= 0; i < workloads.size(); i++)
starts.push_back( workloads[i].start.template getReply<Void>() );

View File

@ -9,10 +9,6 @@ testTitle=BackupAndRestore
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
testName=RunRestoreWorkerWorkload
testName=RunRestoreWorkerWorkload
testName=RunRestoreWorkerWorkload
testName=RunRestoreWorkerWorkload
; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness