Merge pull request #5909 from sfc-gh-jfu/jfu-cc-request-dbinfo

Change dbinfo broadcast to be explicitly requested by the worker registration message
This commit is contained in:
Evan Tschannen 2021-11-16 15:01:42 -08:00 committed by GitHub
commit 557186ed17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 6 deletions

View File

@ -3876,8 +3876,6 @@ ACTOR Future<Void> workerAvailabilityWatch(WorkerInterface worker,
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME); : waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
cluster->updateWorkerList.set(worker.locality.processId(), cluster->updateWorkerList.set(worker.locality.processId(),
ProcessData(worker.locality, startingClass, worker.stableAddress())); ProcessData(worker.locality, startingClass, worker.stableAddress()));
cluster->updateDBInfoEndpoints.insert(worker.updateServerDBInfo.getEndpoint());
cluster->updateDBInfo.trigger();
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch // This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch
// fails for the worker. // fails for the worker.
wait(delay(0)); wait(delay(0));
@ -4301,6 +4299,8 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self->id_worker[w.locality.processId()].watcher, self->id_worker[w.locality.processId()].watcher,
self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface)); self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface));
} }
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger();
checkOutstandingRequests(self); checkOutstandingRequests(self);
} else if (info->second.details.interf.id() != w.id() || req.generation >= info->second.gen) { } else if (info->second.details.interf.id() != w.id() || req.generation >= info->second.gen) {
if (!info->second.reply.isSet()) { if (!info->second.reply.isSet()) {
@ -4319,6 +4319,10 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
info->second.details.interf = w; info->second.details.interf = w;
info->second.watcher = workerAvailabilityWatch(w, newProcessClass, self); info->second.watcher = workerAvailabilityWatch(w, newProcessClass, self);
} }
if (req.requestDbInfo) {
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger();
}
if (configBroadcaster != nullptr) { if (configBroadcaster != nullptr) {
self->addActor.send( self->addActor.send(
configBroadcaster->registerWorker(req.lastSeenKnobVersion, configBroadcaster->registerWorker(req.lastSeenKnobVersion,

View File

@ -415,6 +415,7 @@ struct RegisterWorkerRequest {
bool degraded; bool degraded;
Version lastSeenKnobVersion; Version lastSeenKnobVersion;
ConfigClassSet knobConfigClassSet; ConfigClassSet knobConfigClassSet;
bool requestDbInfo;
RegisterWorkerRequest() RegisterWorkerRequest()
: priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {} : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
@ -431,7 +432,8 @@ struct RegisterWorkerRequest {
ConfigClassSet knobConfigClassSet) ConfigClassSet knobConfigClassSet)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {} degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet),
requestDbInfo(false) {}
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
@ -449,7 +451,8 @@ struct RegisterWorkerRequest {
reply, reply,
degraded, degraded,
lastSeenKnobVersion, lastSeenKnobVersion,
knobConfigClassSet); knobConfigClassSet,
requestDbInfo);
} }
}; };

View File

@ -522,7 +522,8 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
Reference<AsyncVar<bool> const> degraded, Reference<AsyncVar<bool> const> degraded,
Reference<IClusterConnectionRecord> connRecord, Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<std::set<std::string>> const> issues, Reference<AsyncVar<std::set<std::string>> const> issues,
Reference<LocalConfiguration> localConfig) { Reference<LocalConfiguration> localConfig,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists. // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@ -533,6 +534,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
state Future<Void> cacheProcessFuture; state Future<Void> cacheProcessFuture;
state Future<Void> cacheErrorsFuture; state Future<Void> cacheErrorsFuture;
state Optional<double> incorrectTime; state Optional<double> incorrectTime;
state bool firstReg = true;
loop { loop {
state ClusterConnectionString storedConnectionString; state ClusterConnectionString storedConnectionString;
state bool upToDate = true; state bool upToDate = true;
@ -585,6 +587,13 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
} }
state bool ccInterfacePresent = ccInterface->get().present(); state bool ccInterfacePresent = ccInterface->get().present();
if (ccInterfacePresent) {
request.requestDbInfo = (ccInterface->get().get().id() != dbInfo->get().clusterInterface.id());
if (firstReg) {
request.requestDbInfo = true;
firstReg = false;
}
}
state Future<RegisterWorkerReply> registrationReply = state Future<RegisterWorkerReply> registrationReply =
ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request)) ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
: Never(); : Never();
@ -1613,7 +1622,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
degraded, degraded,
connRecord, connRecord,
issues, issues,
localConfig)); localConfig,
dbInfo));
if (configDBType != ConfigDBType::DISABLED) { if (configDBType != ConfigDBType::DISABLED) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface)); errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));