From 3c1cabf04125e80d0f8ad3aef9e910b2854790e7 Mon Sep 17 00:00:00 2001
From: sfc-gh-tclinkenbeard
Date: Tue, 13 Jul 2021 16:43:09 -0700
Subject: [PATCH] Coordinator lets client know if it cannot communicate with
 cluster controller

---
 fdbclient/MonitorLeader.actor.cpp |  1 +
 fdbrpc/fdbrpc.h                   |  8 +++++
 fdbserver/Coordination.actor.cpp  | 54 +++++++++++++++++++++++++------
 flow/error_definitions.h          |  1 +
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp
index 920a09e247..f8c1769384 100644
--- a/fdbclient/MonitorLeader.actor.cpp
+++ b/fdbclient/MonitorLeader.actor.cpp
@@ -842,6 +842,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
 			clientInfo->set(ni);
 			successIdx = idx;
 		} else {
+			TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cannot talk to cluster controller
 			idx = (idx + 1) % addrs.size();
 			if (idx == successIdx) {
 				wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h
index df13a7fa0c..53eb6b13d5 100644
--- a/fdbrpc/fdbrpc.h
+++ b/fdbrpc/fdbrpc.h
@@ -123,6 +123,14 @@ public:
 	void sendError(const E& exc) const { sav->sendError(exc); }

+	template <class U>
+	void sendErrorOr(U&& value) const {
+		if (value.present()) {
+			sav->send(std::forward<U>(value).get());
+		} else {
+			sav->sendError(value.getError());
+		}
+	}

 	Future<T> getFuture() const {
 		sav->addFutureRef();
diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp
index eeffd7b4d7..c284c2eabd 100644
--- a/fdbserver/Coordination.actor.cpp
+++ b/fdbserver/Coordination.actor.cpp
@@ -37,6 +37,30 @@

 // This module implements coordinationServer() and the interfaces in CoordinationInterface.h

+namespace {
+
+class LivenessChecker {
+	double threshold;
+	AsyncVar<double> lastTime;
+
+	ACTOR static Future<Void> checkStuck(LivenessChecker const* self) {
+		loop {
+			choose {
+				when(wait(delayUntil(self->lastTime.get() + self->threshold))) { return Void(); }
+				when(wait(self->lastTime.onChange())) {}
+			}
+		}
+	}
+
+public:
+	explicit LivenessChecker(double threshold) : threshold(threshold), lastTime(now()) {}
+
+	void confirmLiveness() { lastTime.set(now()); }
+
+	Future<Void> checkStuck() const { return checkStuck(this); }
+};
+
+} // namespace
+
 struct GenerationRegVal {
 	UniqueGeneration readGen, writeGen;
 	Optional<Value> val;
@@ -179,7 +203,10 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
 ACTOR Future<Void> openDatabase(ClientData* db,
                                 int* clientCount,
                                 Reference<AsyncVar<bool>> hasConnectedClients,
-                                OpenDatabaseCoordRequest req) {
+                                OpenDatabaseCoordRequest req,
+                                Future<Void> checkStuck) {
+	state ErrorOr<CachedSerialization<ClientDBInfo>> replyContents;
+
 	++(*clientCount);
 	hasConnectedClients->set(true);

@@ -191,18 +218,22 @@ ACTOR Future<Void> openDatabase(ClientData* db,
 	while (db->clientInfo->get().read().id == req.knownClientInfoID &&
 	       !db->clientInfo->get().read().forward.present()) {
 		choose {
+			when(wait(checkStuck)) {
+				replyContents = failed_to_progress();
+				break;
+			}
 			when(wait(yieldedFuture(db->clientInfo->onChange()))) {}
 			when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
+				if (req.supportedVersions.size() > 0) {
+					db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
+				}
+				replyContents = db->clientInfo->get();
 				break;
 			} // The client might be long gone!
 		}
 	}

-	if (req.supportedVersions.size() > 0) {
-		db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
-	}
-
-	req.reply.send(db->clientInfo->get());
+	req.reply.sendErrorOr(replyContents);

 	if (--(*clientCount) == 0) {
 		hasConnectedClients->set(false);
@@ -255,6 +286,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 	state AsyncVar<Value> leaderInterface;
 	state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader =
 	    makeReference<AsyncVar<Optional<LeaderInfo>>>();
+	state LivenessChecker canConnectToLeader(20.0);

 	loop choose {
 		when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {
@@ -266,7 +298,8 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 				leaderMon =
 				    monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
 			}
-			actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
+			actors.add(
+			    openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck()));
 		}
 		when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
@@ -320,8 +353,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 			// TODO: use notify to only send a heartbeat once per interval
 			availableLeaders.erase(LeaderInfo(req.prevChangeID));
 			availableLeaders.insert(req.myInfo);
-			req.reply.send(
-			    LeaderHeartbeatReply{ currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) });
+			bool const isCurrentLeader = currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo);
+			if (isCurrentLeader) {
+				canConnectToLeader.confirmLiveness();
+			}
+			req.reply.send(LeaderHeartbeatReply{ isCurrentLeader });
 		}
 		when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
 			LeaderInfo newInfo;
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index 8ffb54f290..b69801cfd7 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -99,6 +99,7 @@ ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup w
 ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
 ERROR( grv_proxy_failed, 1214, "Master terminating because a GRV CommitProxy failed" )
 ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
+ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )

 // 15xx Platform errors
 ERROR( platform_error, 1500, "Platform error" )