Avoid recruiting workers with different cluster ID
parent 43854c5ac8
commit 2394a9f4b9
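In outline: fdbd now seeds a shared Reference<AsyncVar<Optional<UID>>> clusterId from a "clusterId" file in the data folder, threads it through clusterController, workerServer, and registrationClient, and RegisterWorkerRequest carries the worker's Optional<UID> clusterId. On the controller side, registerWorker drops a registration when both sides already know their cluster ID and the IDs differ, logging a WorkerBelongsToExistingCluster event. Below is a minimal standalone sketch of just that rejection rule, with std::optional<uint64_t> standing in for FDB's Optional<UID>; the names are illustrative, not the real interfaces.

    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Simplified stand-in for FDB's Optional<UID>.
    using ClusterId = std::optional<uint64_t>;

    // Mirrors the new check in registerWorker(): reject only when BOTH sides
    // already know their cluster ID and the IDs disagree. A worker or a
    // controller that has not yet persisted an ID (nullopt) is still accepted.
    bool shouldRejectWorker(const ClusterId& workerId, const ClusterId& controllerId) {
        return workerId.has_value() && controllerId.has_value() && *workerId != *controllerId;
    }

    int main() {
        ClusterId controller = ClusterId{0xABCDULL};
        std::cout << shouldRejectWorker(std::nullopt, controller) << "\n";        // 0: worker has no durable ID yet
        std::cout << shouldRejectWorker(ClusterId{0xABCDULL}, controller) << "\n"; // 0: IDs match
        std::cout << shouldRejectWorker(ClusterId{0x1234ULL}, controller) << "\n"; // 1: mismatch, registration dropped
    }

The actual diff follows.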
@@ -1228,6 +1228,15 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
 std::vector<NetworkAddress> coordinatorAddresses = wait(cs.tryResolveHostnames());

+const WorkerInterface& w = req.wi;
+if (req.clusterId.present() && self->clusterId->get().present() && req.clusterId != self->clusterId->get()) {
+TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "WorkerBelongsToExistingCluster", self->id)
+.detail("WorkerClusterId", req.clusterId)
+.detail("ClusterControllerClusterId", self->clusterId->get())
+.detail("WorkerId", w.id())
+.detail("ProcessId", w.locality.processId());
+return Void();
+}

 ProcessClass newProcessClass = req.processClass;
 auto info = self->id_worker.find(w.locality.processId());
 ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo;
@@ -2968,8 +2977,9 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
 ServerCoordinators coordinators,
 LocalityData locality,
 ConfigDBType configDBType,
-Future<Void> recoveredDiskFiles) {
-state ClusterControllerData self(interf, locality, coordinators);
+Future<Void> recoveredDiskFiles,
+Reference<AsyncVar<Optional<UID>>> clusterId) {
+state ClusterControllerData self(interf, locality, coordinators, clusterId);
 state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
 state uint64_t step = 0;
 state Future<ErrorOr<Void>> error = errorOr(actorCollection(self.addActor.getFuture()));
@@ -3123,7 +3133,8 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
 LocalityData locality,
 ConfigDBType configDBType,
-Future<Void> recoveredDiskFiles) {
+Future<Void> recoveredDiskFiles,
+Reference<AsyncVar<Optional<UID>>> clusterId) {
 loop {
 state ClusterControllerFullInterface cci;
 state bool inRole = false;
@@ -3150,7 +3161,8 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
 startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
 inRole = true;

-wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType, recoveredDiskFiles));
+wait(clusterControllerCore(
+cci, leaderFail, coordinators, locality, configDBType, recoveredDiskFiles, clusterId));
 }
 } catch (Error& e) {
 if (inRole)
@@ -3174,7 +3186,8 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
 Future<Void> recoveredDiskFiles,
 LocalityData locality,
-ConfigDBType configDBType) {
+ConfigDBType configDBType,
+Reference<AsyncVar<Optional<UID>>> clusterId) {

 // Defer this wait optimization of cluster configuration has 'Encryption data at-rest' enabled.
 // Encryption depends on available of EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of encryption keys
@@ -3194,8 +3207,14 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
 loop {
 try {
 ServerCoordinators coordinators(connRecord, configDBType);
-wait(clusterController(
-coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType, recoveredDiskFiles));
+wait(clusterController(coordinators,
+currentCC,
+hasConnected,
+asyncPriorityInfo,
+locality,
+configDBType,
+recoveredDiskFiles,
+clusterId));
 hasConnected = true;
 } catch (Error& e) {
 if (e.code() != error_code_coordinators_changed)
@@ -3213,7 +3232,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
 state ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());
 state NetworkAddress workerAddress(IPAddress(0x01010101), 1);
 state NetworkAddress badPeer1(IPAddress(0x02020202), 1);
 state NetworkAddress badPeer2(IPAddress(0x03030303), 1);
@@ -3308,7 +3328,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
 ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());
 NetworkAddress worker1(IPAddress(0x01010101), 1);
 NetworkAddress worker2(IPAddress(0x11111111), 1);
 NetworkAddress badPeer1(IPAddress(0x02020202), 1);
@@ -3357,7 +3378,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
 ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());
 NetworkAddress worker(IPAddress(0x01010101), 1);
 NetworkAddress badPeer1(IPAddress(0x02020202), 1);
 NetworkAddress badPeer2(IPAddress(0x03030303), 1);
@@ -3510,7 +3532,8 @@ TEST_CASE("/fdbserver/clustercontroller/recentRecoveryCountDueToHealth") {
 ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());

 ASSERT_EQ(data.recentRecoveryCountDueToHealth(), 0);

@@ -3531,7 +3554,8 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
 ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());
 NetworkAddress master(IPAddress(0x01010101), 1);
 NetworkAddress tlog(IPAddress(0x02020202), 1);
 NetworkAddress satelliteTlog(IPAddress(0x03030303), 1);
@@ -3667,7 +3691,8 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
 ClusterControllerData data(ClusterControllerFullInterface(),
 LocalityData(),
 ServerCoordinators(Reference<IClusterConnectionRecord>(
-new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
+new ClusterConnectionMemoryRecord(ClusterConnectionString()))),
+makeReference<AsyncVar<Optional<UID>>>());
 NetworkAddress master(IPAddress(0x01010101), 1);
 NetworkAddress tlog(IPAddress(0x02020202), 1);
 NetworkAddress satelliteTlog(IPAddress(0x03030303), 1);

@@ -2619,6 +2619,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
 return Void();
 }

+// TODO: Remove all cluster ID logic from tlog and storage server
 ACTOR Future<Void> updateDurableClusterID(TLogData* self) {
 loop {
 // Persist cluster ID once cluster has recovered.
@@ -3607,9 +3608,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
 if (recovered.canBeSet())
 recovered.send(Void());

-if (!self.durableClusterId.isValid()) {
-self.sharedActors.send(updateDurableClusterID(&self));
-}
+// if (!self.durableClusterId.isValid()) {
+// self.sharedActors.send(updateDurableClusterID(&self));
+// }
 self.sharedActors.send(commitQueue(&self));
 self.sharedActors.send(updateStorageLoop(&self));
 self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));

@@ -3341,6 +3341,7 @@ public:
 AsyncVar<std::pair<bool, Optional<std::vector<Optional<Key>>>>>
 changedDcIds; // current DC priorities to change second, and whether the cluster controller has been changed
 UID id;
+Reference<AsyncVar<Optional<UID>>> clusterId;
 std::vector<Reference<RecruitWorkersInfo>> outstandingRecruitmentRequests;
 std::vector<Reference<RecruitRemoteWorkersInfo>> outstandingRemoteRecruitmentRequests;
 std::vector<std::pair<RecruitStorageRequest, double>> outstandingStorageRequests;
@@ -3412,15 +3413,16 @@ public:

 ClusterControllerData(ClusterControllerFullInterface const& ccInterface,
 LocalityData const& locality,
-ServerCoordinators const& coordinators)
+ServerCoordinators const& coordinators,
+Reference<AsyncVar<Optional<UID>>> clusterId)
 : gotProcessClasses(false), gotFullyRecoveredConfig(false), shouldCommitSuicide(false),
 clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()), id(ccInterface.id()),
-ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), startTime(now()),
-goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0),
-versionDifferenceUpdated(false), remoteDCMonitorStarted(false), remoteTransactionSystemDegraded(false),
-recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitBlobMigrator(false),
-recruitEncryptKeyProxy(false), recruitConsistencyScan(false),
-clusterControllerMetrics("ClusterController", id.toString()),
+clusterId(clusterId), ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()),
+startTime(now()), goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()),
+datacenterVersionDifference(0), versionDifferenceUpdated(false), remoteDCMonitorStarted(false),
+remoteTransactionSystemDegraded(false), recruitDistributor(false), recruitRatekeeper(false),
+recruitBlobManager(false), recruitBlobMigrator(false), recruitEncryptKeyProxy(false),
+recruitConsistencyScan(false), clusterControllerMetrics("ClusterController", id.toString()),
 openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
 registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
 getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),

@@ -445,6 +445,7 @@ struct RegisterWorkerRequest {
 bool requestDbInfo;
 bool recoveredDiskFiles;
 ConfigBroadcastInterface configBroadcastInterface;
+Optional<UID> clusterId;

 RegisterWorkerRequest()
 : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
@@ -463,13 +464,14 @@ struct RegisterWorkerRequest {
 Optional<Version> lastSeenKnobVersion,
 Optional<ConfigClassSet> knobConfigClassSet,
 bool recoveredDiskFiles,
-ConfigBroadcastInterface configBroadcastInterface)
+ConfigBroadcastInterface configBroadcastInterface,
+Optional<UID> clusterId)
 : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
 generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
 blobMigratorInterf(mgInterf), encryptKeyProxyInterf(ekpInterf), consistencyScanInterf(csInterf),
 degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet),
 requestDbInfo(false), recoveredDiskFiles(recoveredDiskFiles),
-configBroadcastInterface(configBroadcastInterface) {}
+configBroadcastInterface(configBroadcastInterface), clusterId(clusterId) {}

 template <class Ar>
 void serialize(Ar& ar) {
@@ -493,7 +495,8 @@ struct RegisterWorkerRequest {
 knobConfigClassSet,
 requestDbInfo,
 recoveredDiskFiles,
-configBroadcastInterface);
+configBroadcastInterface,
+clusterId);
 }
 };

@@ -1129,7 +1132,8 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
 Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
 Future<Void> recoveredDiskFiles,
 LocalityData locality,
-ConfigDBType configDBType);
+ConfigDBType configDBType,
+Reference<AsyncVar<Optional<UID>>> clusterId);

 ACTOR Future<Void> blobWorker(BlobWorkerInterface bwi,
 ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady,

@@ -572,7 +572,8 @@ ACTOR Future<Void> registrationClient(
 Reference<LocalConfiguration> localConfig,
 ConfigBroadcastInterface configBroadcastInterface,
 Reference<AsyncVar<ServerDBInfo>> dbInfo,
-Promise<Void> recoveredDiskFiles) {
+Promise<Void> recoveredDiskFiles,
+Reference<AsyncVar<Optional<UID>>> clusterId) {
 // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
 // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
 // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@@ -611,7 +612,8 @@ ACTOR Future<Void> registrationClient(
 localConfig.isValid() ? localConfig->lastSeenVersion() : Optional<Version>(),
 localConfig.isValid() ? localConfig->configClassSet() : Optional<ConfigClassSet>(),
 recoveredDiskFiles.isSet(),
-configBroadcastInterface);
+configBroadcastInterface,
+clusterId->get());

 for (auto const& i : issues->get()) {
 request.issues.push_back_deep(request.issues.arena(), i);
@@ -651,7 +653,8 @@ ACTOR Future<Void> registrationClient(
 TraceEvent("WorkerRegister")
 .detail("CCID", ccInterface->get().get().id())
 .detail("Generation", requestGeneration)
-.detail("RecoveredDiskFiles", recoveredDiskFiles.isSet());
+.detail("RecoveredDiskFiles", recoveredDiskFiles.isSet())
+.detail("ClusterId", clusterId->get());
 }
 state Future<RegisterWorkerReply> registrationReply =
 ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
@@ -1636,6 +1639,50 @@ ACTOR Future<Void> resetBlobManagerWhenDoneOrError(
 return Void();
 }

+static const std::string clusterIdFilename = "clusterId";
+
+ACTOR Future<Void> createClusterIdFile(std::string folder, UID clusterId) {
+state std::string clusterIdPath = joinPath(folder, clusterIdFilename);
+if (fileExists(clusterIdPath)) {
+return Void();
+}
+loop {
+try {
+state ErrorOr<Reference<IAsyncFile>> clusterIdFile =
+wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(
+clusterIdPath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
+
+if (clusterIdFile.isError() && clusterIdFile.getError().code() == error_code_file_not_found &&
+!fileExists(clusterIdPath)) {
+Reference<IAsyncFile> _clusterIdFile = wait(IAsyncFileSystem::filesystem()->open(
+clusterIdPath,
+IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK |
+IAsyncFile::OPEN_READWRITE,
+0600));
+clusterIdFile = _clusterIdFile;
+BinaryWriter wr(IncludeVersion());
+wr << clusterId;
+wait(clusterIdFile.get()->write(wr.getData(), wr.getLength(), 0));
+wait(clusterIdFile.get()->sync());
+return Void();
+} else {
+throw clusterIdFile.getError();
+}
+} catch (Error& e) {
+if (e.code() == error_code_actor_cancelled) {
+throw;
+}
+if (!e.isInjectedFault()) {
+fprintf(stderr,
+"ERROR: error creating or opening cluster id file `%s'.\n",
+joinPath(folder, clusterIdFilename).c_str());
+}
+TraceEvent(SevError, "OpenClusterIdError").error(e);
+throw;
+}
+}
+}
+
 ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
 LocalityData locality,
@@ -1652,7 +1699,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 Reference<AsyncVar<ServerDBInfo>> dbInfo,
 ConfigBroadcastInterface configBroadcastInterface,
 Reference<ConfigNode> configNode,
-Reference<LocalConfiguration> localConfig) {
+Reference<LocalConfiguration> localConfig,
+Reference<AsyncVar<Optional<UID>>> clusterId) {
 state PromiseStream<ErrorInfo> errors;
 state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
 new AsyncVar<Optional<DataDistributorInterface>>());
@@ -1997,7 +2045,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 localConfig,
 configBroadcastInterface,
 dbInfo,
-recoveredDiskFiles));
+recoveredDiskFiles,
+clusterId));

 if (configNode.isValid()) {
 errorForwarders.add(brokenPromiseToNever(localConfig->consume(configBroadcastInterface)));
@@ -2008,7 +2057,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 }

 loop choose {
-when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
+when(state UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
 ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
 req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
 localInfo.myLocality = locality;
@@ -2044,6 +2093,18 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 }
 errorForwarders.add(
 success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
+
+if (!clusterId->get().present() && localInfo.recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
+localInfo.client.clusterId.isValid()) {
+// Persist the cluster ID as a file in the data
+// directory once recovery has made the transaction
+// state store durable. The txnStateStore also stores
+// the cluster ID.
+// TODO: Does the txnStateStore need to store the cluster ID?
+state UID tmpClusterId = localInfo.client.clusterId;
+wait(createClusterIdFile(folder, tmpClusterId));
+clusterId->set(tmpClusterId);
+}
 }
 }
 when(RebootRequest req = waitNext(interf.clientInterface.reboot.getFuture())) {
@@ -3360,7 +3421,8 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
 Future<Void> recoveredDiskFiles,
 LocalityData locality,
 Reference<AsyncVar<ServerDBInfo>> dbInfo,
-ConfigDBType configDBType) {
+ConfigDBType configDBType,
+Reference<AsyncVar<Optional<UID>>> clusterId) {
 state Future<Void> monitor = monitorLeaderWithDelayedCandidacyImpl(connRecord, currentCC);
 state Future<Void> timeout;

@@ -3387,7 +3449,7 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
 when(wait(timeout.isValid() ? timeout : Never())) {
 monitor.cancel();
 wait(clusterController(
-connRecord, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, configDBType));
+connRecord, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, configDBType, clusterId));
 return Void();
 }
 }
@@ -3437,6 +3499,17 @@ ACTOR Future<Void> serveProcess() {
 }
 }

+Optional<UID> readClusterId(std::string filePath) {
+if (!fileExists(filePath)) {
+return Optional<UID>();
+}
+std::string contents(readFileBytes(filePath, 10000));
+BinaryReader br(StringRef(contents), IncludeVersion());
+UID clusterId;
+br >> clusterId;
+return clusterId;
+}
+
 ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 LocalityData localities,
 ProcessClass processClass,
@@ -3511,6 +3584,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 serverDBInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
 serverDBInfo.myLocality = localities;
 auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(serverDBInfo);
+Reference<AsyncVar<Optional<UID>>> clusterId(
+new AsyncVar<Optional<UID>>(readClusterId(joinPath(dataFolder, clusterIdFilename))));
 TraceEvent("MyLocality").detail("Locality", dbInfo->get().myLocality.toString());

 actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
@@ -3525,13 +3600,18 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 recoveredDiskFiles.getFuture(),
 localities,
 dbInfo,
-configDBType),
+configDBType,
+clusterId),
 "ClusterController"));
 } else {
-actors.push_back(reportErrors(
-clusterController(
-connRecord, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, configDBType),
-"ClusterController"));
+actors.push_back(reportErrors(clusterController(connRecord,
+cc,
+asyncPriorityInfo,
+recoveredDiskFiles.getFuture(),
+localities,
+configDBType,
+clusterId),
+"ClusterController"));
 }
 actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
 actors.push_back(reportErrorsExcept(workerServer(connRecord,
@@ -3550,7 +3630,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
 dbInfo,
 configBroadcastInterface,
 configNode,
-localConfig),
+localConfig,
+clusterId),
 "WorkerServer",
 UID(),
 &normalWorkerErrors()));
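Taken together, the worker-side lifecycle added above is: readClusterId seeds the in-memory value from the "clusterId" file at process start, workerServer persists the ID via createClusterIdFile once recoveryState reaches RecoveryState::ACCEPTING_COMMITS and ServerDBInfo carries a valid ID, and registrationClient then attaches clusterId->get() to every RegisterWorkerRequest. Below is a rough standalone sketch of that read-if-present / write-once round-trip, using std streams in place of FDB's IAsyncFile and versioned BinaryWriter encoding; the file name and on-disk format here are illustrative only.

    #include <cstdint>
    #include <filesystem>
    #include <fstream>
    #include <iostream>
    #include <optional>
    #include <string>

    // Hypothetical helpers mirroring readClusterId()/createClusterIdFile() in spirit.
    std::optional<uint64_t> readClusterIdFile(const std::string& path) {
        if (!std::filesystem::exists(path))
            return std::nullopt;               // fresh process: no durable ID yet
        std::ifstream in(path);
        uint64_t id = 0;
        in >> std::hex >> id;
        return id;
    }

    void writeClusterIdFileOnce(const std::string& path, uint64_t id) {
        if (std::filesystem::exists(path))
            return;                            // already persisted; never overwrite
        std::ofstream out(path);
        out << std::hex << id << "\n";         // the real actor writes a versioned binary blob and syncs it
    }

    int main() {
        const std::string path = "clusterId.example";
        auto current = readClusterIdFile(path);        // seed the in-memory value at startup
        if (!current) {
            uint64_t recoveredId = 0xDEADBEEFULL;      // would come from ServerDBInfo after recovery
            writeClusterIdFileOnce(path, recoveredId);
            current = recoveredId;
        }
        std::cout << "cluster id: " << std::hex << *current << "\n";
    }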