Rework to make ElectionResult code similar to OpenDatabase code.
Also restore and fix the delayed cluster controller code.
parent 43a63452d8
commit 1117eae2b5
@@ -680,7 +680,7 @@ ACTOR Future<Void> getClientInfoFromLeader( Reference<AsyncVar<Optional<ClusterC
	}
}

ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddress> coordinators, ClientData* clientData ) {
ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddress> coordinators, ClientData* clientData, Reference<AsyncVar<Optional<LeaderInfo>>> leaderInfo ) {
	state vector< ClientLeaderRegInterface > clientLeaderServers;
	state AsyncTrigger nomineeChange;
	state std::vector<Optional<LeaderInfo>> nominees;
@@ -719,6 +719,9 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
				ClusterControllerClientInterface res;
				reader.deserialize(res);
				knownLeader->set(res);
				if (leader.get().second) {
					leaderInfo->set(leader.get().first);
				}
			}
		}
		wait( nomineeChange.onTrigger() || allActors );
@@ -55,7 +55,7 @@ struct ClientData {
template <class LeaderInterface>
Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader );

Future<Void> monitorLeaderForProxies( Value const& key, vector<NetworkAddress> const& coordinators, ClientData* const& clientData );
Future<Void> monitorLeaderForProxies( Value const& key, vector<NetworkAddress> const& coordinators, ClientData* const& clientData, Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo );

Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile, Reference<AsyncVar<ClientDBInfo>> const& clientInfo, Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> const& supportedVersions, Key const& traceLogGroup );
@@ -238,13 +238,27 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
	return Void();
}

ACTOR Future<Void> remoteMonitorLeader( Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader, ReplyPromise<Optional<LeaderInfo>> reply ) {
	loop {
		wait( currentElectedLeader->onChange() );
		if (currentElectedLeader->get().present())
			break;
ACTOR Future<Void> remoteMonitorLeader( int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader, ElectionResultRequest req ) {
	if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) {
		req.reply.send( currentElectedLeader->get() );
	}
	reply.send( currentElectedLeader->get() );

	++(*clientCount);
	hasConnectedClients->set(true);

	while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) {
		choose {
			when (wait( yieldedFuture(currentElectedLeader->onChange()) ) ) {}
			when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; }
		}
	}

	req.reply.send( currentElectedLeader->get() );

	if(--(*clientCount) == 0) {
		hasConnectedClients->set(false);
	}

	return Void();
}
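The new remoteMonitorLeader above follows the same bookkeeping as openDatabase: answer immediately if the elected leader already differs from the one the client reported, otherwise count the waiting client, set hasConnectedClients, long-poll until the leader changes or CLIENT_REGISTER_INTERVAL elapses, reply with whatever is current, and clear the flag when the last client leaves. The sketch below models that long-poll shape with standard C++ primitives rather than flow actors; LeaderState, the int changeID stand-in, and the timings are illustrative only, not FDB code.

	#include <chrono>
	#include <condition_variable>
	#include <iostream>
	#include <mutex>
	#include <optional>
	#include <string>
	#include <thread>

	// Shared state standing in for leaderRegister's clientCount / hasConnectedClients /
	// currentElectedLeader trio.
	struct LeaderState {
		std::mutex m;
		std::condition_variable changed;   // stand-in for currentElectedLeader->onChange()
		std::optional<int> leader;         // stand-in for Optional<LeaderInfo> (int plays the changeID)
		int clientCount = 0;
		bool hasConnectedClients = false;
	};

	// One waiting client, mirroring the shape of the new remoteMonitorLeader: register,
	// wait until the elected leader is present and differs from what the client already
	// knows (or the register interval expires), reply, then deregister.
	std::optional<int> remoteMonitorLeader(LeaderState& s, std::optional<int> knownLeader,
	                                       std::chrono::milliseconds registerInterval) {
		std::unique_lock<std::mutex> lk(s.m);
		++s.clientCount;
		s.hasConnectedClients = true;

		s.changed.wait_for(lk, registerInterval,
		                   [&] { return s.leader.has_value() && s.leader != knownLeader; });

		std::optional<int> reply = s.leader;   // may still equal knownLeader after a timeout
		if (--s.clientCount == 0)
			s.hasConnectedClients = false;
		return reply;
	}

	int main() {
		LeaderState s;
		std::thread client([&] {
			auto r = remoteMonitorLeader(s, std::nullopt, std::chrono::seconds(5));
			std::cout << "client saw leader " << (r ? std::to_string(*r) : "none") << "\n";
		});

		std::this_thread::sleep_for(std::chrono::milliseconds(100));
		{	// an election completes: publish the new leader and wake waiters
			std::lock_guard<std::mutex> lk(s.m);
			s.leader = 42;
		}
		s.changed.notify_all();
		client.join();
	}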
@@ -264,26 +278,22 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
	state int clientCount = 0;
	state Reference<AsyncVar<bool>> hasConnectedClients = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
	state ActorCollection actors(false);
	state Future<Void> clientLeaderMon;
	state Future<Void> leaderMon;
	state AsyncVar<Value> leaderInterface;
	state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader = Reference<AsyncVar<Optional<LeaderInfo>>>( new AsyncVar<Optional<LeaderInfo>>() );
	state Future<Void> fullLeaderMon;

	loop choose {
		when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) { if(!clientLeaderMon.isValid()) {
			clientLeaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData);
		when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) {
			if(!leaderMon.isValid()) {
				leaderMon = monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
			}
			actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
		}
		when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
			if (!fullLeaderMon.isValid()) {
				fullLeaderMon = monitorLeaderInternal2( ClientCoordinators(req.key, req.coordinators), currentElectedLeader );
			}
			if (currentElectedLeader->get().present() && currentElectedLeader->get().get().changeID != req.knownLeader) {
				req.reply.send( currentElectedLeader->get() );
			} else {
				actors.add( remoteMonitorLeader( currentElectedLeader, req.reply ) );
			if(!leaderMon.isValid()) {
				leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader);
			}
			actors.add( remoteMonitorLeader( &clientCount, hasConnectedClients, currentElectedLeader, req ) );
		}
		when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
			if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
@@ -18,6 +18,7 @@
 * limitations under the License.
 */

#include <tuple>
#include <boost/lexical_cast.hpp>

#include "flow/ActorCollection.h"
@@ -1557,35 +1558,12 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
	}
}

/*
ACTOR Future<Void> monitorLeaderWithDelayedCandidacy( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles, LocalityData locality ) {
	state Future<Void> leader = monitorLeader( connFile, currentCC );
	state Future<Void> timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
	state Future<Void> failed = Never();
	state Future<Void> leader = Never();

	loop choose {
		when( wait(currentCC->onChange()) ) {
			failed = IFailureMonitor::failureMonitor().onFailed( currentCC->get().get().getWorkers.getEndpoint() );
			timeout = Never();
		}
		when ( wait(timeout) ) {
			leader = clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality );
		}
		when ( wait(failed) ) {
			timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
		}
	}

	return Void();
}
*/

ACTOR Future<ClusterConnectionString> monitorLeaderRemotelyOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> result ) {
ACTOR Future<std::pair<ClusterConnectionString, UID>> monitorLeaderRemotelyOneGeneration( Reference<ClusterConnectionFile> connFile, UID changeID, Reference<AsyncVar<Value>> result ) {
	state ClusterConnectionString ccf = connFile->getConnectionString();
	state ElectionResultRequest request;
	request.key = ccf.clusterKey();
	request.coordinators = ccf.coordinators();
	request.knownLeader = changeID;
	state int index = 0;

	loop {
@@ -1594,9 +1572,11 @@ ACTOR Future<ClusterConnectionString> monitorLeaderRemotelyOneGeneration( Refere
		try {
			ErrorOr<Optional<LeaderInfo>> leader = wait( interf.electionResult.tryGetReply( request ) );
			if (leader.isError()) throw leader.getError();
			if (!leader.get().present()) continue;

			request.knownLeader = leader.get().get().changeID;
			if( leader.get().get().forward ) {
				return ClusterConnectionString( leader.get().get().serializedInfo.toString() );
				return std::make_pair(ClusterConnectionString( leader.get().get().serializedInfo.toString() ), request.knownLeader);
			}

			if (leader.get().get().serializedInfo.size()) {
@@ -1613,9 +1593,12 @@ ACTOR Future<ClusterConnectionString> monitorLeaderRemotelyOneGeneration( Refere
	}
}

ACTOR Future<Void> monitorLeaderRemotelyInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo ) {
ACTOR Future<Void> monitorLeaderRemotelyInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedInterface ) {
	state UID changeID = UID(-1, -1);
	state ClusterConnectionString ccs;
	loop {
		ClusterConnectionString ccs = wait( monitorLeaderRemotelyOneGeneration( connFile, outSerializedLeaderInfo ) );
		std::pair<ClusterConnectionString, UID> result = wait( monitorLeaderRemotelyOneGeneration( connFile, changeID, outSerializedInterface ) );
		std::tie(ccs, changeID) = result;
		connFile->setConnectionString(ccs);
	}
}
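monitorLeaderRemotelyOneGeneration now returns both the connection string and the changeID of the last leader it saw, and monitorLeaderRemotelyInternal threads that changeID into the next generation's ElectionResultRequest instead of starting from UID(-1, -1) each time. A minimal stand-alone illustration of that hand-off follows; monitorOneGeneration is a hypothetical stub, not the FDB function.

	#include <cstdint>
	#include <iostream>
	#include <string>
	#include <tuple>
	#include <utility>

	// Stand-in for FDB's UID; (-1, -1) plays the role of "no known leader yet".
	using UID = std::pair<uint64_t, uint64_t>;

	// Illustrative stub: pretend each generation elects a leader with a new changeID and
	// hands back the (possibly forwarded) connection string.
	std::pair<std::string, UID> monitorOneGeneration(const std::string& connStr, UID knownLeader) {
		UID elected{knownLeader.first + 1, 0};
		return {connStr, elected};
	}

	int main() {
		std::string ccs = "test:cluster@127.0.0.1:4500";
		UID changeID{uint64_t(-1), uint64_t(-1)};   // start with no known leader, as in the patch

		for (int generation = 0; generation < 3; ++generation) {
			// Each generation returns both the connection string and the changeID it last saw;
			// the next request starts from that leader instead of resetting.
			std::tie(ccs, changeID) = monitorOneGeneration(ccs, changeID);
			std::cout << "generation " << generation << ": leader changeID " << changeID.first << "\n";
		}
	}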
@@ -1629,6 +1612,28 @@ Future<Void> monitorLeaderRemotely(Reference<ClusterConnectionFile> const& connF
	return m || deserializer( serializedInfo, outKnownLeader );
}

ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles, LocalityData locality ) {
	state Future<Void> monitor = monitorLeaderRemotely( connFile, currentCC );
	state Future<Void> timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
	state Future<Void> leader = Never();
	state Future<Void> failed = Never();

	loop choose {
		when( wait(currentCC->onChange()) ) {
			failed = IFailureMonitor::failureMonitor().onFailed( currentCC->get().get().getWorkers.getEndpoint() );
			timeout = Never();
		}
		when ( wait(timeout) ) {
			wait( clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality ) );
			return Void();
		}
		when ( wait(failed) ) {
			failed = Never();
			timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
		}
	}
}

ACTOR Future<Void> fdbd(
	Reference<ClusterConnectionFile> connFile,
	LocalityData localities,
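monitorLeaderRemotelyWithDelayedCandidacy restores the delayed cluster controller behaviour for storage-class processes: keep monitoring the elected controller remotely, cancel the candidacy timer while a controller is known, re-arm it when that controller is reported failed, and only run clusterController locally if the timer expires with no healthy controller. A self-contained sketch of that timer discipline follows, in plain C++ with simulated events; the real code reacts to currentCC->onChange() and IFailureMonitor rather than wall-clock polling.

	#include <chrono>
	#include <iostream>
	#include <optional>
	#include <thread>

	using Clock = std::chrono::steady_clock;

	int main() {
		const auto candidacyDelay = std::chrono::seconds(2);   // stand-in for DELAY_STORAGE_CANDIDACY_SECONDS
		std::optional<Clock::time_point> deadline = Clock::now() + candidacyDelay;

		// Simulated events: a controller is elected at t=1s and fails at t=3s; nobody replaces it.
		const auto controllerElected = Clock::now() + std::chrono::seconds(1);
		const auto controllerFails   = Clock::now() + std::chrono::seconds(3);
		bool sawController = false, sawFailure = false;

		while (true) {
			const auto now = Clock::now();
			if (!sawController && now >= controllerElected) {
				sawController = true;
				deadline.reset();                  // a controller exists: stop the countdown, just watch it
				std::cout << "controller elected, candidacy timer cancelled\n";
			}
			if (sawController && !sawFailure && now >= controllerFails) {
				sawFailure = true;
				deadline = now + candidacyDelay;   // controller failed: restart the countdown
				std::cout << "controller failed, candidacy timer restarted\n";
			}
			if (deadline && now >= *deadline) {
				std::cout << "no healthy controller for the full delay: run clusterController here\n";
				break;
			}
			std::this_thread::sleep_for(std::chrono::milliseconds(50));
		}
	}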
@@ -1673,8 +1678,7 @@ ACTOR Future<Void> fdbd(
		if (processClass == ProcessClass::TesterClass) {
			actors.push_back( reportErrors( monitorLeader( connFile, cc ), "ClusterController" ) );
		} else if (processClass == ProcessClass::StorageClass && SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS) {
			actors.push_back( reportErrors( monitorLeaderRemotely( connFile, cc ), "ClusterController" ) );
				//monitorLeaderWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController" ) );
			actors.push_back( reportErrors( monitorLeaderRemotelyWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController" ) );
		} else {
			actors.push_back( reportErrors( clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
		}