Move UID generation and add initialClass

This commit is contained in:
Yichi Chiang 2017-10-13 13:46:37 -07:00
parent 12edd27281
commit 5bcdd37c0d
4 changed files with 7 additions and 9 deletions

View File

@ -1335,7 +1335,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
}
// Notify the worker to register again with new process class
if (classIter != self->id_class.end() && classIter->second != req.processClass && !req.reply.isSet()) {
if (newProcessClass != req.processClass && !req.reply.isSet()) {
req.reply.send( newProcessClass );
}
}

View File

@ -89,11 +89,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
state Future<Void> buggifyDelay = (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar( outSerializedLeader ) : Void();
myInfo.changeID = g_random->randomUniqueID();
while (!iAmLeader) {
state Future<Void> badCandidateTimeout;
myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID;
myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController));

View File

@ -272,7 +272,7 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
// These servers are started by workerServer

View File

@ -248,11 +248,10 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
return result;
}
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) {
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass ) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0;
state ProcessClass initialClass = asyncProcessClass->get();
loop {
Future<ProcessClass> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), requestGeneration++) ) ) : Never();
@ -464,7 +463,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
Reference<AsyncVar<ProcessClass>> asyncProcessClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors;
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
@ -627,7 +626,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( interf.id(), interf.id(), "Worker", details );
Void _ = wait(waitForAll(recoveries));
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass ) );
TraceEvent("RecoveriesComplete", interf.id());
@ -931,7 +930,7 @@ ACTOR Future<Void> fdbd(
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) );