Bug fixes to get it actually doing the right thing:

* Intialize electionResult when constructing with NetworkAddress.
* Return after sending a reply.
* Reset the reply promise on each new request.
This commit is contained in:
Alex Miller 2020-05-08 01:00:18 -07:00
parent 5f51d97444
commit 383099aef3
3 changed files with 17 additions and 3 deletions

View File

@ -66,6 +66,7 @@ GenerationRegInterface::GenerationRegInterface( INetwork* local )
LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote)
: ClientLeaderRegInterface(remote),
candidacy( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_CANDIDACY) ),
electionResult( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT) ),
leaderHeartbeat( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT) ),
forward( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_FORWARD) )
{
@ -245,19 +246,24 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
ACTOR Future<Void> remoteMonitorLeader( int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader, ElectionResultRequest req ) {
if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) {
TraceEvent("ElectionResultQuickReply").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
req.reply.send( currentElectedLeader->get() );
return Void();
}
++(*clientCount);
hasConnectedClients->set(true);
while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) {
TraceEvent("ElectionResultWaitBefore").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
choose {
when (wait( yieldedFuture(currentElectedLeader->onChange()) ) ) {}
when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; }
}
TraceEvent("ElectionResultWaitAfter").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
}
TraceEvent("ElectionResultReply").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
req.reply.send( currentElectedLeader->get() );
if(--(*clientCount) == 0) {
@ -295,6 +301,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
}
when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
TraceEvent("ElectionResultRequestReceivedRegister").detail("RequestID", req.requestID);
if(!leaderMon.isValid()) {
leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader);
}
@ -525,11 +532,13 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
}
}
when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
TraceEvent("ElectionResultRequestReceivedServer").detail("RequestID", req.requestID);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if( forward.present() )
if( forward.present() ) {
req.reply.send( forward.get() );
else
} else {
regs.getInterface(req.key, id).electionResult.send( req );
}
}
when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
Optional<LeaderInfo> forward = regs.getForward(req.key);

View File

@ -142,6 +142,7 @@ struct ElectionResultRequest {
Key key;
vector<NetworkAddress> coordinators;
UID knownLeader;
UID requestID;
ReplyPromise<Optional<LeaderInfo>> reply;
ElectionResultRequest() = default;
@ -149,7 +150,7 @@ struct ElectionResultRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, coordinators, knownLeader, reply);
serializer(ar, key, coordinators, knownLeader, requestID, reply);
}
};

View File

@ -1584,8 +1584,12 @@ ACTOR Future<std::pair<ClusterConnectionString, UID>> monitorLeaderRemotelyOneGe
loop {
LeaderElectionRegInterface interf( request.coordinators[index] );
request.reply = ReplyPromise<Optional<LeaderInfo>>();
try {
UID requestID = deterministicRandom()->randomUniqueID();
TraceEvent("ElectionResultRequest").detail("RequestID", requestID).detail("Destination", request.coordinators[index].toString());
request.requestID = requestID;
ErrorOr<Optional<LeaderInfo>> leader = wait( interf.electionResult.tryGetReply( request ) );
if (leader.isError()) throw leader.getError();
if (!leader.get().present()) continue;