Clean up cluster controller's wait on recoveredDiskFiles (#9105)

`recoveredDiskFiles` is a promise that is fulfilled once all of a process's local TLog and storage files have been initialized. It was added to make a process wait on it before joining the cluster, so that a slowly recovering TLog could not join the cluster and slow down cluster recovery.

With #7510, a process is allowed to join the cluster in a stateless role while still being prevented from joining in a stateful role before its TLog and storage files are recovered. As such, the `recoveredDiskFiles` wait is no longer needed. This PR cleans up that logic.
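
For context, the worker-side wiring that remains after this cleanup is sketched below. This is a simplified illustration based on the diff in this PR, not a verbatim excerpt; `recoveries`, `errorForwarders`, and `interf` are existing worker-server variables, and the `registrationClient` call is elided.

```cpp
// Simplified sketch: recoveredDiskFiles becomes a worker-local promise instead of
// one plumbed through fdbd and the cluster controller.
Promise<Void> recoveredDiskFiles;

// Fulfill the promise once every local TLog/storage file recovery completes.
Future<Void> recoverDiskFiles = trigger(
    [=]() {
        TraceEvent("DiskFileRecoveriesComplete", interf.id());
        recoveredDiskFiles.send(Void());
        return Void();
    },
    waitForAll(recoveries));
errorForwarders.add(recoverDiskFiles);

// registrationClient (elided) registers the worker for stateless roles immediately
// and re-registers it as a stateful (SS/TLog-capable) process once the promise
// fires, so the cluster controller no longer has to wait on disk recovery.
```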
Author: Yi Wu (2023-01-09 16:26:32 -08:00, committed by GitHub)
parent 97d810cdba
commit 0849f60ef1
3 changed files with 20 additions and 56 deletions

@@ -93,8 +93,7 @@ ACTOR Future<Optional<Value>> getPreviousCoordinators(ClusterControllerData* sel
 ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
 ClusterControllerData::DBInfo* db,
-ServerCoordinators coordinators,
-Future<Void> recoveredDiskFiles) {
+ServerCoordinators coordinators) {
 state MasterInterface iMaster;
 state Reference<ClusterRecoveryData> recoveryData;
 state PromiseStream<Future<Void>> addActor;
@@ -2903,7 +2902,6 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
 ServerCoordinators coordinators,
 LocalityData locality,
 ConfigDBType configDBType,
-Future<Void> recoveredDiskFiles,
 Reference<AsyncVar<Optional<UID>>> clusterId) {
 state ClusterControllerData self(interf, locality, coordinators, clusterId);
 state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
@@ -2916,8 +2914,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
 // EncryptKeyProxy is necessary for TLog recovery, recruit it as the first process
 self.addActor.send(monitorEncryptKeyProxy(&self));
-self.addActor.send(
-clusterWatchDatabase(&self, &self.db, coordinators, recoveredDiskFiles)); // Start the master database
+self.addActor.send(clusterWatchDatabase(&self, &self.db, coordinators)); // Start the master database
 self.addActor.send(self.updateWorkerList.init(self.db.db));
 self.addActor.send(statusServer(interf.clientInterface.databaseStatus.getFuture(),
 &self,
@@ -3061,7 +3058,6 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
 LocalityData locality,
 ConfigDBType configDBType,
-Future<Void> recoveredDiskFiles,
 Reference<AsyncVar<Optional<UID>>> clusterId) {
 loop {
 state ClusterControllerFullInterface cci;
@@ -3091,8 +3087,7 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
 startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
 inRole = true;
-wait(clusterControllerCore(
-cci, leaderFail, coordinators, locality, configDBType, recoveredDiskFiles, clusterId));
+wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType, clusterId));
 }
 } catch (Error& e) {
 if (inRole)
@@ -3114,7 +3109,6 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
 ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRecord,
 Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
-Future<Void> recoveredDiskFiles,
 LocalityData locality,
 ConfigDBType configDBType,
 Reference<AsyncVar<Optional<UID>>> clusterId) {
@@ -3122,14 +3116,8 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
 loop {
 try {
 ServerCoordinators coordinators(connRecord, configDBType);
-wait(clusterController(coordinators,
-currentCC,
-hasConnected,
-asyncPriorityInfo,
-locality,
-configDBType,
-recoveredDiskFiles,
-clusterId));
+wait(clusterController(
+coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType, clusterId));
 hasConnected = true;
 } catch (Error& e) {
 if (e.code() != error_code_coordinators_changed)

@@ -1156,7 +1156,6 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> ccr,
 ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
 Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
-Future<Void> recoveredDiskFiles,
 LocalityData locality,
 ConfigDBType configDBType,
 Reference<AsyncVar<Optional<UID>>> clusterId);

@@ -1732,7 +1732,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 int64_t memoryLimit,
 std::string metricsConnFile,
 std::string metricsPrefix,
-Promise<Void> recoveredDiskFiles,
 int64_t memoryProfileThreshold,
 std::string _coordFolder,
 std::string whitelistBinPaths,
@@ -2064,24 +2063,18 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 // However, the worker server can still serve stateless roles, and if encryption is on, it is crucial to have
 // some worker available to serve the EncryptKeyProxy role, before opening encrypted storage files.
 //
-// When encryption-at-rest is enabled, the follow code allows a worker to first register with the
-// cluster controller to be recruited only as a stateless process i.e. it can't be recruited as a SS or TLog
-// process; once the local disk recovery is complete (if applicable), the process re-registers with cluster
-// controller as a stateful process role.
-//
-// TODO(yiwu): Unify behavior for encryption and non-encryption once the change is stable.
+// To achieve it, registrationClient allows a worker to first register with the cluster controller to be
+// recruited only as a stateless process i.e. it can't be recruited as a SS or TLog process; once the local disk
+// recovery is complete (if applicable), the process re-registers with cluster controller as a stateful process
+// role.
+Promise<Void> recoveredDiskFiles;
 Future<Void> recoverDiskFiles = trigger(
 [=]() {
-TraceEvent("RecoveriesComplete", interf.id());
+TraceEvent("DiskFileRecoveriesComplete", interf.id());
 recoveredDiskFiles.send(Void());
 return Void();
 },
 waitForAll(recoveries));
-if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
-wait(recoverDiskFiles);
-} else {
-errorForwarders.add(recoverDiskFiles);
-}
+errorForwarders.add(recoverDiskFiles);
 errorForwarders.add(registrationClient(ccInterface,
 interf,
@@ -3489,7 +3482,6 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
 Reference<IClusterConnectionRecord> connRecord,
 Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
-Future<Void> recoveredDiskFiles,
 LocalityData locality,
 Reference<AsyncVar<ServerDBInfo>> dbInfo,
 ConfigDBType configDBType,
@@ -3497,8 +3489,6 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
 state Future<Void> monitor = monitorLeaderWithDelayedCandidacyImpl(connRecord, currentCC);
 state Future<Void> timeout;
-wait(recoveredDiskFiles);
 loop {
 if (currentCC->get().present() && dbInfo->get().clusterInterface == currentCC->get().get() &&
 IFailureMonitor::failureMonitor()
@@ -3519,8 +3509,7 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
 : Never())) {}
 when(wait(timeout.isValid() ? timeout : Never())) {
 monitor.cancel();
-wait(clusterController(
-connRecord, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, configDBType, clusterId));
+wait(clusterController(connRecord, currentCC, asyncPriorityInfo, locality, configDBType, clusterId));
 return Void();
 }
 }
@@ -3595,7 +3584,6 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 std::map<std::string, std::string> manualKnobOverrides,
 ConfigDBType configDBType) {
 state std::vector<Future<Void>> actors;
-state Promise<Void> recoveredDiskFiles;
 state Reference<ConfigNode> configNode;
 state Reference<LocalConfiguration> localConfig;
 if (configDBType != ConfigDBType::DISABLED) {
@@ -3664,24 +3652,14 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 actors.push_back(reportErrors(monitorLeader(connRecord, cc), "ClusterController"));
 } else if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::WorstFit &&
 SERVER_KNOBS->MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS > 0) {
-actors.push_back(reportErrors(monitorLeaderWithDelayedCandidacy(connRecord,
-cc,
-asyncPriorityInfo,
-recoveredDiskFiles.getFuture(),
-localities,
-dbInfo,
-configDBType,
-clusterId),
-"ClusterController"));
+actors.push_back(
+reportErrors(monitorLeaderWithDelayedCandidacy(
+connRecord, cc, asyncPriorityInfo, localities, dbInfo, configDBType, clusterId),
+"ClusterController"));
 } else {
-actors.push_back(reportErrors(clusterController(connRecord,
-cc,
-asyncPriorityInfo,
-recoveredDiskFiles.getFuture(),
-localities,
-configDBType,
-clusterId),
-"ClusterController"));
+actors.push_back(
+reportErrors(clusterController(connRecord, cc, asyncPriorityInfo, localities, configDBType, clusterId),
+"ClusterController"));
 }
 actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
 actors.push_back(reportErrorsExcept(workerServer(connRecord,
@@ -3693,7 +3671,6 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 memoryLimit,
 metricsConnFile,
 metricsPrefix,
-recoveredDiskFiles,
 memoryProfileThreshold,
 coordFolder,
 whitelistBinPaths,