slowly send notifications to clients to clear the list of dead clients

This commit is contained in:
Evan Tschannen 2018-08-08 17:29:32 -07:00
parent 0ca11aabe6
commit 7f7755165c
3 changed files with 15 additions and 8 deletions

View File

@ -204,11 +204,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
state std::set<LeaderInfo> availableCandidates;
state std::set<LeaderInfo> availableLeaders;
state Optional<LeaderInfo> currentNominee;
state vector<ReplyPromise<Optional<LeaderInfo>>> notify;
state Deque<ReplyPromise<Optional<LeaderInfo>>> notify;
state Future<Void> nextInterval = delay( 0 );
state double candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
state int leaderIntervalCount = 0;
state double lastNotifiedCleared = now();
state Future<Void> notifyCheck = delay(SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / SERVER_KNOBS->MIN_NOTIFICATIONS);
loop choose {
when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
@ -217,11 +217,10 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
notify.push_back( req.reply );
if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
TraceEvent(now() - lastNotifiedCleared < 100000 ? SevWarnAlways : SevWarn, "TooManyNotifications").detail("Amount", notify.size());
TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
for(int i=0; i<notify.size(); i++)
notify[i].send( currentNominee.get() );
notify.clear();
lastNotifiedCleared = now();
}
}
}
@ -234,11 +233,10 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
notify.push_back( req.reply );
if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
TraceEvent(now() - lastNotifiedCleared < 100000 ? SevWarnAlways : SevWarn, "TooManyNotifications").detail("Amount", notify.size());
TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
for(int i=0; i<notify.size(); i++)
notify[i].send( currentNominee.get() );
notify.clear();
lastNotifiedCleared = now();
}
}
}
@ -255,7 +253,6 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
for(int i=0; i<notify.size(); i++)
notify[i].send( newInfo );
notify.clear();
lastNotifiedCleared = now();
req.reply.send( Void() );
return Void();
}
@ -284,7 +281,6 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
for(int i=0; i<notify.size(); i++)
notify[i].send( nextNominee );
notify.clear();
lastNotifiedCleared = now();
currentNominee = nextNominee;
} else if (currentNominee.present() && nextNominee.present() && currentNominee.get().equalInternalId(nextNominee.get())) {
// leader becomes better
@ -306,6 +302,13 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
availableCandidates.clear();
}
}
when( Void _ = wait(notifyCheck) ) {
notifyCheck = delay( SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / std::max<double>(SERVER_KNOBS->MIN_NOTIFICATIONS, notify.size()) );
if(!notify.empty() && currentNominee.present()) {
notify.front().send( currentNominee.get() );
notify.pop_front();
}
}
}
}

View File

@ -212,6 +212,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
init( MAX_NOTIFICATIONS, 100000 );
init( MIN_NOTIFICATIONS, 100 );
init( NOTIFICATION_FULL_CLEAR_TIME, 10000.0 );
init( CANDIDATE_MIN_DELAY, 0.05 );
init( CANDIDATE_MAX_DELAY, 1.0 );
init( CANDIDATE_GROWTH_RATE, 1.2 );

View File

@ -158,6 +158,8 @@ public:
// Leader election
int MAX_NOTIFICATIONS;
int MIN_NOTIFICATIONS;
double NOTIFICATION_FULL_CLEAR_TIME;
double CANDIDATE_MIN_DELAY;
double CANDIDATE_MAX_DELAY;
double CANDIDATE_GROWTH_RATE;