fix: the cluster controller did not pass in its own locality when creating its database object, so it was not using locality-aware load balancing
parent b560b94ebc
commit 22e6afbb18
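The bug in one sentence: the ClusterControllerData constructor built its ServerDBInfo and opened a database handle via openDBOnServer without ever filling in serverInfo.myLocality, so the load balancer had no way to tell which endpoints were near the cluster controller. Below is a minimal sketch of why that matters; the types and the pickNearest helper are simplified, hypothetical stand-ins, not FoundationDB's actual load-balancing API:

// Hypothetical, simplified sketch of locality-aware endpoint selection.
// These stand-in types are NOT FoundationDB's real LoadBalance machinery.
#include <optional>
#include <string>
#include <vector>

struct LocalityData {
	std::optional<std::string> dcId;  // datacenter identifier, if known
};

struct Endpoint {
	std::string address;
	LocalityData locality;
};

// Prefer an endpoint in the caller's own datacenter; otherwise fall back
// to the first candidate. If the caller's locality was never filled in
// (the bug fixed by this commit), no candidate can ever match and
// selection silently degrades to locality-unaware behavior.
const Endpoint* pickNearest(const LocalityData& self,
                            const std::vector<Endpoint>& candidates) {
	if (candidates.empty())
		return nullptr;
	if (self.dcId) {
		for (const auto& e : candidates)
			if (e.locality.dcId == self.dcId)
				return &e;  // same-DC match
	}
	return &candidates.front();  // no locality info, or no match
}

With self.dcId left empty, the same-DC branch never fires; threading the real locality down to the constructor, as the diff below does, is what makes the preference take effect.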
@@ -1008,13 +1008,14 @@ public:
 	Version datacenterVersionDifference;
 	bool versionDifferenceUpdated;

-	explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
+	ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
 		: id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0), versionDifferenceUpdated(false)
 	{
 		auto serverInfo = db.serverInfo->get();
 		serverInfo.id = g_random->randomUniqueID();
 		serverInfo.masterLifetime.ccID = id;
 		serverInfo.clusterInterface = ccInterface;
+		serverInfo.myLocality = locality;
 		db.serverInfo->set( serverInfo );
 		cx = openDBOnServer(db.serverInfo, TaskDefaultEndpoint, true, true);
 	}
@@ -2146,8 +2147,8 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
 	}
 }

-ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators ) {
-	state ClusterControllerData self( interf );
+ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators, LocalityData locality ) {
+	state ClusterControllerData self( interf, locality );
 	state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
 	state uint64_t step = 0;
 	state PromiseStream<Future<Void>> addActor;
@@ -2254,7 +2255,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	}
 }

-ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo ) {
+ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, LocalityData locality ) {
 	loop {
 		state ClusterControllerFullInterface cci;
 		state bool inRole = false;
@@ -2274,7 +2275,7 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
 			startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
 			inRole = true;

-			Void _ = wait( clusterControllerCore( cci, leaderFail, coordinators ) );
+			Void _ = wait( clusterControllerCore( cci, leaderFail, coordinators, locality ) );
 		} catch(Error& e) {
 			if (inRole)
 				endRole(Role::CLUSTER_CONTROLLER, cci.id(), "Error", e.code() == error_code_actor_cancelled || e.code() == error_code_coordinators_changed, e);
@@ -2285,13 +2286,13 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
 	}
 }

-ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles ) {
+ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles, LocalityData locality ) {
 	Void _ = wait(recoveredDiskFiles);
 	state bool hasConnected = false;
 	loop {
 		try {
 			ServerCoordinators coordinators( connFile );
-			Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncPriorityInfo ) );
+			Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncPriorityInfo, locality ) );
 		} catch( Error &e ) {
 			if( e.code() != error_code_coordinators_changed )
 				throw; // Expected to terminate fdbserver
@@ -319,8 +319,7 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
 Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );

 Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
-Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix, Promise<Void> const& recoveredDiskFiles );
-Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo, Future<Void> const& recoveredDiskFiles );
+Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo, Future<Void> const& recoveredDiskFiles, LocalityData const& locality );

 // These servers are started by workerServer
 Future<Void> storageServer(
@@ -486,9 +486,8 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
 	}
 }

-ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
-	Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix, Promise<Void> recoveredDiskFiles,
-	UID processIDUid) {
+ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData locality,
+	Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix, Promise<Void> recoveredDiskFiles) {
 	state PromiseStream< ErrorInfo > errors;
 	state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
 	state ActorCollection errorForwarders(false);
@@ -502,9 +501,6 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 	state PromiseStream<InitializeTLogRequest> tlogRequests;
 	state Future<Void> tlog = Void();

-	state Standalone<StringRef> processID = processIDUid.toString();
-	state LocalityData locality = localities;
-	locality.set(LocalityData::keyProcessId, processID);
 	state WorkerInterface interf( locality );

 	if(metricsPrefix.size() > 0) {
@@ -1022,6 +1018,7 @@ ACTOR Future<Void> fdbd(
 		v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files

 		state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
+		localities.set(LocalityData::keyProcessId, processIDUid.toString());
 		// Only one process can execute on a dataFolder from this point onwards

 		std::string fitnessFilePath = joinPath(dataFolder, "fitness");
@@ -1031,10 +1028,10 @@ ACTOR Future<Void> fdbd(
 		Promise<Void> recoveredDiskFiles;

 		v.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo"));
-		v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture()), "ClusterController") );
+		v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
 		v.push_back( reportErrors(extractClusterInterface( cc, ci ), "ExtractClusterInterface") );
 		v.push_back( reportErrors(failureMonitorClient( ci, true ), "FailureMonitorClient") );
-		v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, processIDUid), "WorkerServer", UID(), &normalWorkerErrors()) );
+		v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles), "WorkerServer", UID(), &normalWorkerErrors()) );
 		state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );

 		Void _ = wait( quorum(v,1) );
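Taken together, the locality now flows down the full call chain, fdbd → clusterController → clusterControllerCore → ClusterControllerData, where it is recorded in serverInfo.myLocality before openDBOnServer creates the cluster controller's database handle. The processId locality key is likewise set once in fdbd, right after the process ID file is locked, rather than inside workerServer, which is why the processIDUid parameter disappears from workerServer's signature.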