removed verbose trace messages
parent 2434d06726 · commit 8b73a1c998
@@ -42,6 +42,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
 	init( FAILURE_EMERGENCY_DELAY, 30.0 );
 	init( FAILURE_MAX_GENERATIONS, 10 );
 
+	init( COORDINATOR_RECONNECTION_DELAY, 1.0 );
+
 	// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
 	init( WRONG_SHARD_SERVER_DELAY, .01 ); if( randomize && BUGGIFY ) WRONG_SHARD_SERVER_DELAY = deterministicRandom()->random01(); // FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
@@ -41,6 +41,8 @@ public:
 	double FAILURE_EMERGENCY_DELAY;
 	double FAILURE_MAX_GENERATIONS;
 
+	double COORDINATOR_RECONNECTION_DELAY;
+
 	// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
 	double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
 	double FUTURE_VERSION_RETRY_DELAY;
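Both knob hunks above add COORDINATOR_RECONNECTION_DELAY (default 1.0 seconds); the retry loop in monitorProxiesOneGeneration further down switches from a hard-coded wait(delay(1.0)) to this knob. As a reading aid, here is a minimal standalone C++ sketch of the default-plus-randomize idiom the knob constructor uses; ClientKnobsSketch and std::rand are illustrative stand-ins, not the real flow Knobs/BUGGIFY machinery.

#include <cstdlib>
#include <iostream>

struct ClientKnobsSketch {
	double COORDINATOR_RECONNECTION_DELAY;
	double WRONG_SHARD_SERVER_DELAY;

	explicit ClientKnobsSketch(bool randomize) {
		// Defaults mirror the values in the hunks above.
		COORDINATOR_RECONNECTION_DELAY = 1.0;
		WRONG_SHARD_SERVER_DELAY = 0.01;
		if (randomize) {
			// Stand-in for the BUGGIFY'd deterministicRandom()->random01() override,
			// which deliberately perturbs timing to shake out spin/retry bugs.
			WRONG_SHARD_SERVER_DELAY = (double)std::rand() / RAND_MAX;
		}
	}
};

int main() {
	ClientKnobsSketch knobs(/*randomize=*/true);
	std::cout << "coordinator reconnection delay: " << knobs.COORDINATOR_RECONNECTION_DELAY << "s\n";
	std::cout << "wrong shard server delay: " << knobs.WRONG_SHARD_SERVER_DELAY << "s\n";
}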
@@ -538,10 +538,9 @@ ACTOR Future<Void> getClientInfoFromLeader( Reference<AsyncVar<Optional<ClusterC
 	}
 
 	loop {
-		TraceEvent("SendMessageToCC", knownLeader->get().get().clientInterface.id()).detail("ClientID", clientData->clientInfo->get().id);
 		choose {
 			when( ClientDBInfo ni = wait( brokenPromiseToNever( knownLeader->get().get().clientInterface.openDatabase.getReply( clientData->getRequest() ) ) ) ) {
-				TraceEvent("GotClientInfo", knownLeader->get().get().clientInterface.id()).detail("Proxy0", ni.proxies.size() ? ni.proxies[0].id() : UID()).detail("ClientID", ni.id);
+				TraceEvent("MonitorLeaderForProxiesGotClientInfo", knownLeader->get().get().clientInterface.id()).detail("Proxy0", ni.proxies.size() ? ni.proxies[0].id() : UID()).detail("ClientID", ni.id);
 				clientData->clientInfo->set(ni);
 			}
 			when( wait( knownLeader->onChange() ) ) {}
@@ -556,54 +555,47 @@ ACTOR Future<Void> monitorLeaderForProxies( Value serializedInfo, ClientData* cl
 	state Future<Void> allActors;
 	state Reference<AsyncVar<Optional<ClusterControllerClientInterface>>> knownLeader(new AsyncVar<Optional<ClusterControllerClientInterface>>{});
 	state ClusterConnectionString cs(serializedInfo.toString());
-	try {
 	for(auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
 		clientLeaderServers.push_back( ClientLeaderRegInterface( *s ) );
 	}
 
 	nominees.resize(clientLeaderServers.size());
 
 	std::vector<Future<Void>> actors;
 	// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
 	for(int i=0; i<clientLeaderServers.size(); i++) {
-		TraceEvent("MonitorLeaderForProxiesMon").detail("Addr", clientLeaderServers[i].openDatabase.getEndpoint().getPrimaryAddress()).detail("Key", cs.clusterKey().printable());
 		actors.push_back( monitorNominee( cs.clusterKey(), clientLeaderServers[i], &nomineeChange, &nominees[i] ) );
 	}
 	actors.push_back( getClientInfoFromLeader( knownLeader, clientData ) );
 	allActors = waitForAll(actors);
 
 	loop {
 		Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
 		TraceEvent("MonitorLeaderForProxiesChange").detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1,1)).detail("Key", cs.clusterKey().printable());
 		if (leader.present()) {
 			if( leader.get().first.forward ) {
 				ClientDBInfo outInfo;
 				outInfo.id = deterministicRandom()->randomUniqueID();
 				outInfo.forward = leader.get().first.serializedInfo;
 				clientData->clientInfo->set(outInfo);
 				TraceEvent("MonitorLeaderForProxiesForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString()).detail("OldConnStr", serializedInfo.toString());
 				return Void();
 			}
 
 			if (leader.get().first.serializedInfo.size()) {
 				if (g_network->useObjectSerializer()) {
 					ObjectReader reader(leader.get().first.serializedInfo.begin());
 					ClusterControllerClientInterface res;
 					reader.deserialize(res);
-					TraceEvent("MonitorLeaderForProxiesParse1", res.clientInterface.id()).detail("Key", cs.clusterKey().printable());
 					knownLeader->set(res);
 				} else {
 					ClusterControllerClientInterface res = BinaryReader::fromStringRef<ClusterControllerClientInterface>( leader.get().first.serializedInfo, IncludeVersion() );
-					TraceEvent("MonitorLeaderForProxiesParse2", res.clientInterface.id()).detail("Key", cs.clusterKey().printable());
 					knownLeader->set(res);
 				}
 			}
 		}
 		wait( nomineeChange.onTrigger() || allActors );
 	}
-	} catch( Error &e ) {
-		TraceEvent("MonitorLeaderForProxiesError").error(e,true).detail("Key", cs.clusterKey().printable()).backtrace();
-		throw e;
-	}
 }
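The monitorLeaderForProxies hunk drops the per-coordinator (MonitorLeaderForProxiesMon) and per-parse (Parse1/Parse2) traces plus the try/catch wrapper, but keeps the forwarding logic intact: a leader record either names a leader to follow or redirects clients to a new connection string, which ends this monitor. A plain C++ sketch of that decision, with simplified hypothetical types:

#include <optional>
#include <string>
#include <iostream>

struct LeaderInfoSketch {
	bool forward = false;        // true => serializedInfo holds a connection string to move to
	std::string serializedInfo;  // otherwise: a serialized cluster-controller interface
};

// Returns the connection string to forward to, or nullopt to keep following this leader.
std::optional<std::string> handleLeader(const LeaderInfoSketch& leader) {
	if (leader.forward) {
		return leader.serializedInfo;  // the real actor publishes this and returns
	}
	if (!leader.serializedInfo.empty()) {
		// The real actor deserializes a ClusterControllerClientInterface here
		// (ObjectReader or BinaryReader, depending on g_network->useObjectSerializer()).
	}
	return std::nullopt;
}

int main() {
	LeaderInfoSketch l{true, "cluster:id@10.0.0.1:4500"};
	if (auto conn = handleLeader(l)) std::cout << "forward to " << *conn << "\n";
}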
@@ -614,46 +606,39 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
 	state int idx = 0;
 	state int successIdx = 0;
 	deterministicRandom()->randomShuffle(addrs);
-	try {
 	loop {
 		state ClientLeaderRegInterface clientLeaderServer( addrs[idx] );
 		state OpenDatabaseCoordRequest req;
 		req.key = cs.clusterKey();
 		req.serializedInfo = info.intermediateConnFile->getConnectionString().toString();
 		req.knownClientInfoID = clientInfo->get().id;
 
-		TraceEvent("MPOG_Start").detail("Addr", addrs[idx]).detail("Key", cs.clusterKey().printable());
 		state ErrorOr<ClientDBInfo> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) );
-		TraceEvent("MPOG_Reply").detail("Addr", addrs[idx]).detail("Present", rep.present()).detail("Key", cs.clusterKey().printable()).detail("Proxy0", rep.present() && rep.get().proxies.size() ? rep.get().proxies[0].id() : UID());
 		if (rep.present()) {
 			if( rep.get().forward.present() ) {
 				TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
 				info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().forward.get().toString())));
 				return info;
 			}
 			if(connFile != info.intermediateConnFile) {
 				if(!info.hasConnected) {
 					TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection").detail("Filename", connFile->getFilename())
 						.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
 						.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
 				}
 				connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
 				info.intermediateConnFile = connFile;
 			}
 
 			info.hasConnected = true;
 			connFile->notifyConnected();
 
 			clientInfo->set( rep.get() );
 			successIdx = idx;
 		} else if(idx == successIdx) {
-			wait(delay(1.0));
+			wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
 		}
 		idx = (idx+1)%addrs.size();
 	}
-	} catch (Error &e) {
-		TraceEvent("MPOG_Error").error(e,true).detail("Key", cs.clusterKey().printable());
-		throw e;
-	}
 }
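Besides removing the MPOG_* traces, this hunk replaces the literal one-second retry delay with the new knob. The retry schedule itself is unchanged and worth noting: addresses are tried round-robin, and the loop only sleeps when it has wrapped back to the last index that answered (idx == successIdx), i.e. after a full unsuccessful pass rather than after every failure. A runnable sketch of that pattern, with a hypothetical tryConnect standing in for openDatabase.tryGetReply:

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

bool tryConnect(const std::string& addr) { return addr == "10.0.0.3:4500"; }  // stand-in

int main() {
	std::vector<std::string> addrs = {"10.0.0.1:4500", "10.0.0.2:4500", "10.0.0.3:4500"};
	const double reconnectionDelay = 1.0;  // plays the role of COORDINATOR_RECONNECTION_DELAY
	size_t idx = 0, successIdx = 0;
	for (int attempts = 0; attempts < 10; ++attempts) {
		if (tryConnect(addrs[idx])) {
			std::cout << "connected via " << addrs[idx] << "\n";
			successIdx = idx;
			break;  // the real actor keeps long-polling; a sketch can stop here
		} else if (idx == successIdx) {
			// Only back off once every reachable address has been tried.
			std::this_thread::sleep_for(std::chrono::duration<double>(reconnectionDelay));
		}
		idx = (idx + 1) % addrs.size();
	}
}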
@@ -209,38 +209,31 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
 }
 
 ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, OpenDatabaseCoordRequest req) {
-	try {
 	if(db->clientInfo->get().id != req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
-		TraceEvent("OpenDatabaseCoordReply").detail("Forward", db->clientInfo->get().forward.present()).detail("Key", req.key.printable());
 		req.reply.send( db->clientInfo->get() );
 		return Void();
 	}
 	++(*clientCount);
 	hasConnectedClients->set(true);
 
 	db->clientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup.toString(), req.supportedVersions, req.issues);
 
-	TraceEvent("OpenDatabaseCoordWait").detail("Key", req.key.printable());
 	while (db->clientInfo->get().id == req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
 		choose {
 			when (wait( db->clientInfo->onChange() )) {}
 			when (wait( delayJittered( 300 ) )) { break; } // The client might be long gone!
 		}
 	}
 
 	db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
 
-	if(db->clientInfo->get().id != req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
-		req.reply.send( db->clientInfo->get() );
-	}
+	req.reply.send( db->clientInfo->get() );
+
 	if(--(*clientCount) == 0) {
 		hasConnectedClients->set(false);
 	}
-	TraceEvent("OpenDatabaseCoordReply").detail("Forward", db->clientInfo->get().forward.present()).detail("Key", req.key.printable()).detail("Proxy0", db->clientInfo->get().proxies.size() ? db->clientInfo->get().proxies[0].id() : UID());
+
 	return Void();
-	} catch( Error &e ) {
-		TraceEvent("OpenDatabaseCoordError").error(e,true).detail("Key", req.key.printable());
-		throw;
-	}
 }
 
 // This actor implements a *single* leader-election register (essentially, it ignores
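openDatabase is a long poll: if the client already has stale info it is answered immediately; otherwise the request is parked until clientInfo changes or a jittered timeout fires, and a client counter drives the hasConnectedClients flag that lets the register shut down. A simplified, thread-based C++ sketch of that bookkeeping (the real code is a flow actor, not threads):

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

struct ClientInfoCache {
	std::mutex m;
	std::condition_variable cv;
	std::uint64_t id = 1;  // stand-in for ClientDBInfo::id
	int clientCount = 0;
	bool hasConnectedClients = false;
};

// Returns the (possibly unchanged) info id to send back to the client.
std::uint64_t openDatabaseSketch(ClientInfoCache& db, std::uint64_t knownId) {
	std::unique_lock<std::mutex> lk(db.m);
	if (db.id != knownId) return db.id;  // info is already newer: reply immediately
	++db.clientCount;
	db.hasConnectedClients = true;
	// Park until the info changes or the timeout fires (the client might be long gone).
	db.cv.wait_for(lk, std::chrono::seconds(300), [&] { return db.id != knownId; });
	if (--db.clientCount == 0) db.hasConnectedClients = false;
	return db.id;
}

int main() {
	ClientInfoCache db;
	std::thread updater([&] {
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
		{ std::lock_guard<std::mutex> g(db.m); db.id = 2; }
		db.cv.notify_all();
	});
	std::cout << "replied with id " << openDatabaseSketch(db, 1) << "\n";
	updater.join();
}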
@@ -261,134 +254,129 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 	state ActorCollection actors(false);
 	state Future<Void> leaderMon;
 
-	try {
 	loop choose {
 		when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) {
 			if(!leaderMon.isValid()) {
 				leaderMon = monitorLeaderForProxies(req.serializedInfo, &clientData);
 			}
 			actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
 		}
 		when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
 			if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
 				req.reply.send( currentNominee.get() );
 			} else {
 				notify.push_back( req.reply );
 				if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
 					TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
 					for (uint32_t i=0; i<notify.size(); i++)
 						notify[i].send( currentNominee.get() );
 					notify.clear();
 				} else if(!nextInterval.isValid()) {
 					nextInterval = delay(0);
 				}
 			}
 		}
 		when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
 			if(!nextInterval.isValid()) {
 				nextInterval = delay(0);
 			}
 			//TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
 			availableCandidates.erase( LeaderInfo(req.prevChangeID) );
 			availableCandidates.insert( req.myInfo );
 			if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
 				req.reply.send( currentNominee.get() );
 			} else {
 				notify.push_back( req.reply );
 				if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
 					TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
 					for (uint32_t i=0; i<notify.size(); i++)
 						notify[i].send( currentNominee.get() );
 					notify.clear();
 				}
 			}
 		}
 		when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
 			if(!nextInterval.isValid()) {
 				nextInterval = delay(0);
 			}
 			//TODO: use notify to only send a heartbeat once per interval
 			availableLeaders.erase( LeaderInfo(req.prevChangeID) );
 			availableLeaders.insert( req.myInfo );
 			req.reply.send( currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) );
 		}
 		when (ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
 			LeaderInfo newInfo;
 			newInfo.forward = true;
 			newInfo.serializedInfo = req.conn.toString();
 			for(unsigned int i=0; i<notify.size(); i++)
 				notify[i].send( newInfo );
 			notify.clear();
 			ClientDBInfo outInfo;
 			outInfo.id = deterministicRandom()->randomUniqueID();
 			outInfo.forward = req.conn.toString();
 			clientData.clientInfo->set(outInfo);
 			req.reply.send( Void() );
 			ASSERT(!hasConnectedClients->get());
 			return Void();
 		}
 		when ( wait(nextInterval.isValid() ? nextInterval : Never()) ) {
 			if (!availableLeaders.size() && !availableCandidates.size() && !notify.size() &&
 				!currentNominee.present())
 			{
 				// Our state is back to the initial state, so we can safely stop this actor
 				TraceEvent("EndingLeaderNomination").detail("Key", key).detail("HasConnectedClients", hasConnectedClients->get());
 				if(!hasConnectedClients->get()) {
 					return Void();
 				} else {
 					nextInterval = Future<Void>();
 				}
 			} else {
 				Optional<LeaderInfo> nextNominee;
 				if( availableCandidates.size() && (!availableLeaders.size() || availableLeaders.begin()->leaderChangeRequired(*availableCandidates.begin())) ) {
 					nextNominee = *availableCandidates.begin();
 				} else if( availableLeaders.size() ) {
 					nextNominee = *availableLeaders.begin();
 				}
 
 				if( !currentNominee.present() || !nextNominee.present() || !currentNominee.get().equalInternalId(nextNominee.get()) || nextNominee.get() > currentNominee.get() ) {
 					TraceEvent("NominatingLeader").detail("NextNominee", nextNominee.present() ? nextNominee.get().changeID : UID())
 						.detail("CurrentNominee", currentNominee.present() ? currentNominee.get().changeID : UID()).detail("Key", printable(key));
 					for(unsigned int i=0; i<notify.size(); i++)
 						notify[i].send( nextNominee );
 					notify.clear();
 				}
 
 				currentNominee = nextNominee;
 
 				if( availableLeaders.size() ) {
 					nextInterval = delay( SERVER_KNOBS->POLLING_FREQUENCY );
 					if(leaderIntervalCount++ > 5) {
 						candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
 					}
 				} else {
 					nextInterval = delay( candidateDelay );
 					candidateDelay = std::min(SERVER_KNOBS->CANDIDATE_MAX_DELAY, candidateDelay * SERVER_KNOBS->CANDIDATE_GROWTH_RATE);
 					leaderIntervalCount = 0;
 				}
 
 				availableLeaders.clear();
 				availableCandidates.clear();
 			}
 		}
 		when( wait(notifyCheck) ) {
 			notifyCheck = delay( SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / std::max<double>(SERVER_KNOBS->MIN_NOTIFICATIONS, notify.size()) );
 			if(!notify.empty() && currentNominee.present()) {
 				notify.front().send( currentNominee.get() );
 				notify.pop_front();
 			}
 		}
 		when( wait(hasConnectedClients->onChange()) ) {
 			if(!hasConnectedClients->get() && !nextInterval.isValid()) {
 				TraceEvent("LeaderRegisterUnneeded").detail("Key", key);
 				return Void();
 			}
 		}
 		when( wait(actors.getResult()) ) {}
 	}
-	} catch (Error &e ) {
-		TraceEvent("LeaderRegisterError").error(e,true);
-		throw e;
-	}
 }
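leaderRegister's notify queue keeps both of its safety valves after this commit: a hard cap (SERVER_KNOBS->MAX_NOTIFICATIONS) that flushes every parked waiter at once, and the periodic notifyCheck drip that answers one waiter per interval. A small sketch of the cap-and-flush part, using a std::deque of ints in place of the queue of reply promises:

#include <cstddef>
#include <deque>
#include <iostream>

const std::size_t MAX_NOTIFICATIONS = 100000;  // illustrative value, not the real knob default

void enqueueWaiter(std::deque<int>& notify, int replyHandle, int currentNominee) {
	notify.push_back(replyHandle);
	if (notify.size() > MAX_NOTIFICATIONS) {
		// Too many parked waiters: answer all of them with the current nominee
		// rather than letting the queue grow without bound.
		for (int h : notify) {
			std::cout << "reply " << h << " -> nominee " << currentNominee << "\n";
		}
		notify.clear();
	}
}

int main() {
	std::deque<int> notify;
	for (int i = 0; i < 5; ++i) enqueueWaiter(notify, i, 42);
	std::cout << notify.size() << " waiters still parked\n";
}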
@@ -485,7 +473,6 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
 	loop choose {
 		when ( OpenDatabaseCoordRequest req = waitNext( interf.openDatabase.getFuture() ) ) {
 			Optional<LeaderInfo> forward = regs.getForward(req.key);
-			TraceEvent("OpenDatabaseCoordReq").detail("Forward", forward.present()).detail("Key", req.key.printable());
 			if( forward.present() ) {
 				ClientDBInfo info;
 				info.forward = forward.get().serializedInfo;