choose leader on the perferred process class

This commit is contained in:
Yichi Chiang 2017-08-28 14:41:04 -07:00
parent 9e2b0debcd
commit 9fe927127f
8 changed files with 42 additions and 17 deletions

View File

@ -27,7 +27,7 @@ const StringRef LocalityData::keyDcId = LiteralStringRef("dcid");
const StringRef LocalityData::keyMachineId = LiteralStringRef("machineid");
const StringRef LocalityData::keyDataHallId = LiteralStringRef("data_hall");
ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) {
ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) const {
switch( role ) {
case ProcessClass::Storage:
switch( _class ) {
@ -108,6 +108,23 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) {
default:
return ProcessClass::WorstFit;
}
case ProcessClass::ClusterController:
switch( _class ) {
case ProcessClass::StatelessClass:
return ProcessClass::BestFit;
case ProcessClass::MasterClass:
return ProcessClass::GoodFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::ProxyClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
default:
return ProcessClass::NeverAssign;
}

View File

@ -28,7 +28,7 @@ struct ProcessClass {
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, InvalidClass = -1 };
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign };
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver };
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, ClusterController };
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
int16_t _class;
int16_t _source;
@ -100,7 +100,7 @@ public:
}
}
Fitness machineClassFitness( ClusterRole role );
Fitness machineClassFitness( ClusterRole role ) const ;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -1560,14 +1560,14 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
}
}
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected ) {
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
cci.initEndpoints();
try {
//Register as a possible leader; wait to be elected
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected );
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass );
while (!currentCC->get().present() || currentCC->get().get() != cci) {
choose {
@ -1591,12 +1591,12 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
}
}
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC ) {
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass) {
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators( connFile );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected ) );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass ) );
} catch( Error &e ) {
if( e.code() != error_code_coordinators_changed )
throw; // Expected to terminate fdbserver

View File

@ -247,8 +247,8 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
return Void();
} else {
Optional<LeaderInfo> nextNominee =
availableLeaders.size() ? *availableLeaders.rbegin() :
availableCandidates.size() ? *availableCandidates.rbegin() : Optional<LeaderInfo>();
availableLeaders.size() ? *availableLeaders.begin() :
availableCandidates.size() ? *availableCandidates.begin() : Optional<LeaderInfo>();
if (nextNominee != currentNominee || !availableLeaders.size()) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())

View File

@ -20,6 +20,7 @@
#include "flow/actorcompiler.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Locality.h"
#include "ClusterRecruitmentInterface.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/MonitorLeader.h"
@ -74,7 +75,7 @@ ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Va
return Void();
}
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected ) {
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) {
state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
state LeaderInfo myInfo;
state Future<Void> candidacies;
@ -90,7 +91,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
while (!iAmLeader) {
state Future<Void> badCandidateTimeout;
myInfo.changeID = g_random->randomUniqueID();
UID randomID = g_random->randomUniqueID();
int64_t mask = 15ll << 60;
int64_t modifiedFirstPart = (randomID.first() & ~mask) | ((int64_t)asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController) << 60);
myInfo.changeID = UID(modifiedFirstPart, randomID.second());
vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++)

View File

@ -23,6 +23,7 @@
#pragma once
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
class ServerCoordinators;
@ -30,7 +31,8 @@ template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected);
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass);
// Participates in the given coordination group's leader election process, nominating the given
// LeaderInterface (presumed to be a local interface) as leader. The leader election process is
@ -46,16 +48,17 @@ Future<Void> changeLeaderCoordinators( ServerCoordinators const& coordinators, V
#pragma region Implementation
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected );
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected)
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass)
{
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass );
return m || asyncDeserialize( serializedInfo, outKnownLeader );
}

View File

@ -273,7 +273,7 @@ Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct Cluster
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, ProcessClass const& processClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
// These servers are started by workerServer
Future<Void> storageServer(

View File

@ -907,10 +907,11 @@ ACTOR Future<Void> fdbd(
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)));
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc ), "clusterController") );
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );