Coordinator lets client know if it cannot communicate with cluster controller
commit 3c1cabf041
parent 9379bab04e
@@ -842,6 +842,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
             clientInfo->set(ni);
             successIdx = idx;
         } else {
+            TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cannot talk to cluster controller
             idx = (idx + 1) % addrs.size();
             if (idx == successIdx) {
                 wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));

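The added TEST() line is a code-coverage probe in the client's coordinator-monitoring loop (monitorProxiesOneGeneration): a reply carrying error_code_failed_to_progress, the error introduced by this commit, is handled like any other coordinator failure, so the client rotates to the next coordinator address and backs off once it has cycled through all of them since the last success. Below is a minimal standalone sketch of that rotation logic; it is plain C++, not Flow actor code, and tryCoordinator, ReplyError and kReconnectionDelay are illustrative stand-ins rather than FoundationDB identifiers.

#include <chrono>
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

// Possible outcomes of one request to a coordinator in this sketch.
enum class ReplyError {
    None,             // got a usable reply
    ConnectionFailed, // could not reach the coordinator at all
    FailedToProgress  // coordinator is up, but cut off from the cluster controller
};

// Placeholder for one attempt against a coordinator; always fails in this sketch.
ReplyError tryCoordinator(const std::string& addr) {
    std::printf("contacting %s\n", addr.c_str());
    return ReplyError::FailedToProgress;
}

int main() {
    const std::vector<std::string> addrs = { "10.0.0.1:4500", "10.0.0.2:4500", "10.0.0.3:4500" };
    const auto kReconnectionDelay = std::chrono::milliseconds(50); // stands in for COORDINATOR_RECONNECTION_DELAY
    std::size_t idx = 0, successIdx = 0;

    for (int attempts = 0; attempts < 6; ++attempts) {
        ReplyError err = tryCoordinator(addrs[idx]);
        if (err == ReplyError::None) {
            successIdx = idx; // remember the last coordinator that answered usefully
        } else {
            // A coordinator that cannot reach the cluster controller is treated like any
            // other failed coordinator: rotate to the next address.
            idx = (idx + 1) % addrs.size();
            if (idx == successIdx) {
                // Every coordinator has been tried since the last success; back off before retrying.
                std::this_thread::sleep_for(kReconnectionDelay);
            }
        }
    }
}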
@@ -123,6 +123,14 @@ public:
     void sendError(const E& exc) const {
         sav->sendError(exc);
     }
+    template <class U>
+    void sendErrorOr(U&& value) const {
+        if (value.present()) {
+            sav->send(std::forward<U>(value).get());
+        } else {
+            sav->sendError(value.getError());
+        }
+    }
 
     Future<T> getFuture() const {
         sav->addFutureRef();

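The new sendErrorOr() takes an ErrorOr-style value and forwards either the contained payload or the contained error to whoever holds the corresponding future; this is what lets openDatabase() below answer a client with failed_to_progress instead of a ClientDBInfo. A minimal standalone sketch of the same idea, using stand-in ErrorOrLike and PromiseLike types rather than the Flow classes:

#include <cstdio>
#include <optional>
#include <string>
#include <utility>

struct Error { int code; };

// Stand-in for ErrorOr<T>: either a value or an error.
template <class T>
struct ErrorOrLike {
    std::optional<T> value;
    Error error{ 0 };
    bool present() const { return value.has_value(); }
    T get() const { return *value; }
    Error getError() const { return error; }
};

// Stand-in for a reply promise with separate value and error channels.
template <class T>
struct PromiseLike {
    void send(const T& v) const { std::printf("sent value: %s\n", v.c_str()); }
    void sendError(const Error& e) const { std::printf("sent error code: %d\n", e.code); }

    // Mirrors the new sendErrorOr(): one call forwards whichever side is present.
    template <class U>
    void sendErrorOr(U&& v) const {
        if (v.present()) {
            send(std::forward<U>(v).get());
        } else {
            sendError(v.getError());
        }
    }
};

int main() {
    PromiseLike<std::string> reply;
    reply.sendErrorOr(ErrorOrLike<std::string>{ "client info", {} });           // value path
    reply.sendErrorOr(ErrorOrLike<std::string>{ std::nullopt, Error{ 1216 } }); // error path, e.g. failed_to_progress
}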
@@ -37,6 +37,30 @@
 // This module implements coordinationServer() and the interfaces in CoordinationInterface.h
 
+namespace {
+
+class LivenessChecker {
+    double threshold;
+    AsyncVar<double> lastTime;
+    ACTOR static Future<Void> checkStuck(LivenessChecker const* self) {
+        loop {
+            choose {
+                when(wait(delayUntil(self->lastTime.get() + self->threshold))) { return Void(); }
+                when(wait(self->lastTime.onChange())) {}
+            }
+        }
+    }
+
+public:
+    explicit LivenessChecker(double threshold) : threshold(threshold), lastTime(now()) {}
+
+    void confirmLiveness() { lastTime.set(now()); }
+
+    Future<Void> checkStuck() const { return checkStuck(this); }
+};
+
+} // namespace
+
 struct GenerationRegVal {
     UniqueGeneration readGen, writeGen;
     Optional<Value> val;

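LivenessChecker implements a refreshable deadline: checkStuck() races a timer set to lastTime + threshold against lastTime.onChange(), so its future only becomes ready once confirmLiveness() has not been called for `threshold` seconds (20 seconds here, via leaderRegister below). A minimal standalone sketch of the same pattern, using std::chrono and blocking calls instead of Flow actors; LivenessCheckerSketch is an illustrative stand-in, not FoundationDB code:

#include <chrono>
#include <cstdio>
#include <thread>

using Clock = std::chrono::steady_clock;

class LivenessCheckerSketch {
    Clock::duration threshold;
    Clock::time_point lastTime;

public:
    explicit LivenessCheckerSketch(Clock::duration threshold) : threshold(threshold), lastTime(Clock::now()) {}

    // Called whenever evidence of progress arrives (in the coordinator: a heartbeat
    // from the currently elected leader).
    void confirmLiveness() { lastTime = Clock::now(); }

    // Blocks until no confirmation has been seen for `threshold`; this plays the role
    // of the checkStuck() future becoming ready.
    void waitUntilStuck() const {
        while (Clock::now() < lastTime + threshold) {
            std::this_thread::sleep_until(lastTime + threshold);
        }
    }
};

int main() {
    LivenessCheckerSketch checker(std::chrono::milliseconds(100));

    // Confirmations arriving faster than the threshold keep pushing the deadline out...
    for (int i = 0; i < 3; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        checker.confirmLiveness();
    }

    // ...but once they stop, the "stuck" condition fires after the threshold lapses.
    checker.waitUntilStuck();
    std::printf("no confirmation for the threshold interval: report failed_to_progress\n");
}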
@@ -179,7 +203,10 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
 ACTOR Future<Void> openDatabase(ClientData* db,
                                 int* clientCount,
                                 Reference<AsyncVar<bool>> hasConnectedClients,
-                                OpenDatabaseCoordRequest req) {
+                                OpenDatabaseCoordRequest req,
+                                Future<Void> checkStuck) {
+    state ErrorOr<CachedSerialization<ClientDBInfo>> replyContents;
+
     ++(*clientCount);
     hasConnectedClients->set(true);
 

@@ -191,18 +218,22 @@ ACTOR Future<Void> openDatabase(ClientData* db,
     while (db->clientInfo->get().read().id == req.knownClientInfoID &&
            !db->clientInfo->get().read().forward.present()) {
         choose {
+            when(wait(checkStuck)) {
+                replyContents = failed_to_progress();
+                break;
+            }
             when(wait(yieldedFuture(db->clientInfo->onChange()))) {}
             when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
+                if (req.supportedVersions.size() > 0) {
+                    db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
+                }
+                replyContents = db->clientInfo->get();
                 break;
             } // The client might be long gone!
         }
     }
 
-    if (req.supportedVersions.size() > 0) {
-        db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
-    }
-
-    req.reply.send(db->clientInfo->get());
+    req.reply.sendErrorOr(replyContents);
 
     if (--(*clientCount) == 0) {
         hasConnectedClients->set(false);

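The rewritten wait loop in openDatabase() races several events, and whichever fires first decides the reply: the checkStuck future (the coordinator has lost contact with the elected cluster controller) records failed_to_progress, while a client-info change or the register interval records the latest ClientDBInfo; a single sendErrorOr() then delivers whatever was recorded. The sketch below imitates that first-ready-wins control flow in plain C++ with std::async and polling, since standard C++ has no direct equivalent of Flow's choose/when:

#include <chrono>
#include <cstdio>
#include <future>
#include <thread>

int main() {
    using namespace std::chrono_literals;

    // Two competing events, as in `choose { when(checkStuck) ... when(clientInfo->onChange()) ... }`.
    std::future<void> stuck = std::async(std::launch::async, [] { std::this_thread::sleep_for(200ms); });
    std::future<void> infoChanged = std::async(std::launch::async, [] { std::this_thread::sleep_for(50ms); });

    bool repliedWithError = false;
    for (;;) {
        if (stuck.wait_for(0ms) == std::future_status::ready) {
            repliedWithError = true; // replyContents = failed_to_progress()
            break;
        }
        if (infoChanged.wait_for(0ms) == std::future_status::ready) {
            repliedWithError = false; // replyContents = db->clientInfo->get()
            break;
        }
        std::this_thread::sleep_for(5ms);
    }
    // A single reply at the end, like req.reply.sendErrorOr(replyContents).
    std::printf("reply: %s\n", repliedWithError ? "failed_to_progress" : "latest ClientDBInfo");
}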
@@ -255,6 +286,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
     state AsyncVar<Value> leaderInterface;
     state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader =
         makeReference<AsyncVar<Optional<LeaderInfo>>>();
+    state LivenessChecker canConnectToLeader(20.0);
 
     loop choose {
         when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {

@@ -266,7 +298,8 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
                     leaderMon =
                         monitorLeaderForProxies(req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
                 }
-                actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
+                actors.add(
+                    openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck()));
             }
         }
         when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {

@@ -320,8 +353,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
             // TODO: use notify to only send a heartbeat once per interval
             availableLeaders.erase(LeaderInfo(req.prevChangeID));
             availableLeaders.insert(req.myInfo);
-            req.reply.send(
-                LeaderHeartbeatReply{ currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) });
+            bool const isCurrentLeader = currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo);
+            if (isCurrentLeader) {
+                canConnectToLeader.confirmLiveness();
+            }
+            req.reply.send(LeaderHeartbeatReply{ isCurrentLeader });
         }
         when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
             LeaderInfo newInfo;

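Only a heartbeat from the process that is currently the nominated leader refreshes the liveness timer, so the coordinator's "stuck" signal specifically means "no recent contact with the elected cluster controller", not merely "no candidates are pinging". A minimal standalone sketch of that gating; HeartbeatRequest and LeaderRegisterSketch are illustrative stand-ins, not FoundationDB types:

#include <chrono>
#include <cstdio>
#include <optional>

struct HeartbeatRequest { int senderId; };

struct LeaderRegisterSketch {
    std::optional<int> currentNomineeId;                      // who the register believes is leader
    std::chrono::steady_clock::time_point lastLeaderContact = // refreshed only by the real leader
        std::chrono::steady_clock::now();

    bool onHeartbeat(const HeartbeatRequest& req) {
        const bool isCurrentLeader = currentNomineeId && *currentNomineeId == req.senderId;
        if (isCurrentLeader) {
            lastLeaderContact = std::chrono::steady_clock::now(); // plays the role of confirmLiveness()
        }
        return isCurrentLeader; // what LeaderHeartbeatReply{ isCurrentLeader } reports back
    }
};

int main() {
    LeaderRegisterSketch reg;
    reg.currentNomineeId = 7;
    std::printf("leader heartbeat acknowledged: %d\n", reg.onHeartbeat({ 7 })); // refreshes the timer
    std::printf("stale candidate acknowledged: %d\n", reg.onHeartbeat({ 3 }));  // does not refresh it
}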
@@ -99,6 +99,7 @@ ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup w
 ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
 ERROR( grv_proxy_failed, 1214, "Master terminating because a GRV CommitProxy failed" )
 ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
+ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )
 
 // 15xx Platform errors
 ERROR( platform_error, 1500, "Platform error" )