From 1117eae2b52fa720e6ea5027c20973802126a747 Mon Sep 17 00:00:00 2001 From: Alex Miller Date: Tue, 5 May 2020 01:00:17 -0700 Subject: [PATCH] Rework to make ElectionResult code similar to OpenDatabase code. And also restore and fix the delayed cluster controller code. --- fdbclient/MonitorLeader.actor.cpp | 5 ++- fdbclient/MonitorLeader.h | 2 +- fdbserver/Coordination.actor.cpp | 44 +++++++++++++-------- fdbserver/worker.actor.cpp | 64 ++++++++++++++++--------------- 4 files changed, 66 insertions(+), 49 deletions(-) diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 5646a7b9ba..4269d286ab 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -680,7 +680,7 @@ ACTOR Future getClientInfoFromLeader( Reference monitorLeaderForProxies( Key clusterKey, vector coordinators, ClientData* clientData ) { +ACTOR Future monitorLeaderForProxies( Key clusterKey, vector coordinators, ClientData* clientData, Reference>> leaderInfo ) { state vector< ClientLeaderRegInterface > clientLeaderServers; state AsyncTrigger nomineeChange; state std::vector> nominees; @@ -719,6 +719,9 @@ ACTOR Future monitorLeaderForProxies( Key clusterKey, vectorset(res); + if (leader.get().second) { + leaderInfo->set(leader.get().first); + } } } wait( nomineeChange.onTrigger() || allActors ); diff --git a/fdbclient/MonitorLeader.h b/fdbclient/MonitorLeader.h index eb196d1110..3a9b988b10 100644 --- a/fdbclient/MonitorLeader.h +++ b/fdbclient/MonitorLeader.h @@ -55,7 +55,7 @@ struct ClientData { template Future monitorLeader( Reference const& connFile, Reference>> const& outKnownLeader ); -Future monitorLeaderForProxies( Value const& key, vector const& coordinators, ClientData* const& clientData ); +Future monitorLeaderForProxies( Value const& key, vector const& coordinators, ClientData* const& clientData, Reference>> const& leaderInfo ); Future monitorProxies( Reference>> const& connFile, Reference> const& clientInfo, Reference>>> const& supportedVersions, Key const& traceLogGroup ); diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index f8d131d331..6d96dac4ea 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -238,13 +238,27 @@ ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference remoteMonitorLeader( Reference>> currentElectedLeader, ReplyPromise> reply ) { - loop { - wait( currentElectedLeader->onChange() ); - if (currentElectedLeader->get().present()) - break; +ACTOR Future remoteMonitorLeader( int* clientCount, Reference> hasConnectedClients, Reference>> currentElectedLeader, ElectionResultRequest req ) { + if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) { + req.reply.send( currentElectedLeader->get() ); } - reply.send( currentElectedLeader->get() ); + + ++(*clientCount); + hasConnectedClients->set(true); + + while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) { + choose { + when (wait( yieldedFuture(currentElectedLeader->onChange()) ) ) {} + when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; } + } + } + + req.reply.send( currentElectedLeader->get() ); + + if(--(*clientCount) == 0) { + hasConnectedClients->set(false); + } + return Void(); } @@ -264,26 +278,22 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { state int clientCount = 0; state Reference> hasConnectedClients = Reference>( new AsyncVar(false) ); state ActorCollection actors(false); - state Future clientLeaderMon; + state Future leaderMon; state AsyncVar leaderInterface; state Reference>> currentElectedLeader = Reference>>( new AsyncVar>() ); - state Future fullLeaderMon; loop choose { - when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) { if(!clientLeaderMon.isValid()) { - clientLeaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData); + when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) { + if(!leaderMon.isValid()) { + leaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader); } actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req)); } when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) { - if (!fullLeaderMon.isValid()) { - fullLeaderMon = monitorLeaderInternal2( ClientCoordinators(req.key, req.coordinators), currentElectedLeader ); - } - if (currentElectedLeader->get().present() && currentElectedLeader->get().get().changeID != req.knownLeader) { - req.reply.send( currentElectedLeader->get() ); - } else { - actors.add( remoteMonitorLeader( currentElectedLeader, req.reply ) ); + if(!leaderMon.isValid()) { + leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader); } + actors.add( remoteMonitorLeader( &clientCount, hasConnectedClients, currentElectedLeader, req ) ); } when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) { if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) { diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 74c4ff17f7..f8fa145d72 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include #include #include "flow/ActorCollection.h" @@ -1557,35 +1558,12 @@ ACTOR Future createAndLockProcessIdFile(std::string folder) { } } -/* -ACTOR Future monitorLeaderWithDelayedCandidacy( Reference connFile, Reference>> currentCC, Reference> asyncPriorityInfo, Future recoveredDiskFiles, LocalityData locality ) { - state Future leader = monitorLeader( connFile, currentCC ); - state Future timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS ); - state Future failed = Never(); - state Future leader = Never(); - - loop choose { - when( wait(currentCC->onChange()) ) { - failed = IFailureMonitor::failureMonitor().onFailed( currentCC->get().get().getWorkers.getEndpoint() ); - timeout = Never(); - } - when ( wait(timeout) ) { - leader = clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality ); - } - when ( wait(failed) ) { - timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS ); - } - } - - return Void(); -} -*/ - -ACTOR Future monitorLeaderRemotelyOneGeneration( Reference connFile, Reference> result ) { +ACTOR Future> monitorLeaderRemotelyOneGeneration( Reference connFile, UID changeID, Reference> result ) { state ClusterConnectionString ccf = connFile->getConnectionString(); state ElectionResultRequest request; request.key = ccf.clusterKey(); request.coordinators = ccf.coordinators(); + request.knownLeader = changeID; state int index = 0; loop { @@ -1594,9 +1572,11 @@ ACTOR Future monitorLeaderRemotelyOneGeneration( Refere try { ErrorOr> leader = wait( interf.electionResult.tryGetReply( request ) ); if (leader.isError()) throw leader.getError(); + if (!leader.get().present()) continue; + request.knownLeader = leader.get().get().changeID; if( leader.get().get().forward ) { - return ClusterConnectionString( leader.get().get().serializedInfo.toString() ); + return std::make_pair(ClusterConnectionString( leader.get().get().serializedInfo.toString() ), request.knownLeader); } if (leader.get().get().serializedInfo.size()) { @@ -1613,9 +1593,12 @@ ACTOR Future monitorLeaderRemotelyOneGeneration( Refere } } -ACTOR Future monitorLeaderRemotelyInternal( Reference connFile, Reference> outSerializedLeaderInfo ) { +ACTOR Future monitorLeaderRemotelyInternal( Reference connFile, Reference> outSerializedInterface ) { + state UID changeID = UID(-1, -1); + state ClusterConnectionString ccs; loop { - ClusterConnectionString ccs = wait( monitorLeaderRemotelyOneGeneration( connFile, outSerializedLeaderInfo ) ); + std::pair result = wait( monitorLeaderRemotelyOneGeneration( connFile, changeID, outSerializedInterface ) ); + std::tie(ccs, changeID) = result; connFile->setConnectionString(ccs); } } @@ -1629,6 +1612,28 @@ Future monitorLeaderRemotely(Reference const& connF return m || deserializer( serializedInfo, outKnownLeader ); } +ACTOR Future monitorLeaderRemotelyWithDelayedCandidacy( Reference connFile, Reference>> currentCC, Reference> asyncPriorityInfo, Future recoveredDiskFiles, LocalityData locality ) { + state Future monitor = monitorLeaderRemotely( connFile, currentCC ); + state Future timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS ); + state Future leader = Never(); + state Future failed = Never(); + + loop choose { + when( wait(currentCC->onChange()) ) { + failed = IFailureMonitor::failureMonitor().onFailed( currentCC->get().get().getWorkers.getEndpoint() ); + timeout = Never(); + } + when ( wait(timeout) ) { + wait( clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality ) ); + return Void(); + } + when ( wait(failed) ) { + failed = Never(); + timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS ); + } + } +} + ACTOR Future fdbd( Reference connFile, LocalityData localities, @@ -1673,8 +1678,7 @@ ACTOR Future fdbd( if (processClass == ProcessClass::TesterClass) { actors.push_back( reportErrors( monitorLeader( connFile, cc ), "ClusterController" ) ); } else if (processClass == ProcessClass::StorageClass && SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS) { - actors.push_back( reportErrors( monitorLeaderRemotely( connFile, cc ), "ClusterController" ) ); - //monitorLeaderWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController" ) ); + actors.push_back( reportErrors( monitorLeaderRemotelyWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController" ) ); } else { actors.push_back( reportErrors( clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") ); }