Merge pull request #3383 from vishesh/release-6.3
refactor: Remove dead failureDetectionServer code
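This removes the dead failureDetectionServer actor from the cluster controller (the diff appears to be against fdbserver/ClusterController.actor.cpp), along with the three counters only it updated (FailureMonitoringRequests, ServersFailed, ServersUnfailed), their initializers in the ClusterControllerData constructor, and the clusterControllerCore line that spawned the actor.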
commit 05220c045c
@@ -1349,10 +1349,6 @@ public:
 	Counter getClientWorkersRequests;
 	Counter registerMasterRequests;
 	Counter statusRequests;
-	Counter failureMonitoringRequests;
-
-	Counter serversFailed;
-	Counter serversUnfailed;
 
 	ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
 		: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
@@ -1366,10 +1362,7 @@ public:
 	    getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),
 	    getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
 	    registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
-	    statusRequests("StatusRequests", clusterControllerMetrics),
-	    failureMonitoringRequests("FailureMonitoringRequests", clusterControllerMetrics),
-	    serversFailed("ServersFailed", clusterControllerMetrics),
-	    serversUnfailed("ServersUnfailed", clusterControllerMetrics)
+	    statusRequests("StatusRequests", clusterControllerMetrics)
 	{
 		auto serverInfo = ServerDBInfo();
 		serverInfo.id = deterministicRandom()->randomUniqueID();
@@ -1798,137 +1791,6 @@ struct FailureStatusInfo {
 	}
 };
 
-//The failure monitor client relies on the fact that the failure detection server will not declare itself failed
-ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData* self, FutureStream< FailureMonitoringRequest > requests ) {
-	state Version currentVersion = 0;
-	state std::map<NetworkAddressList, FailureStatusInfo> currentStatus;	// The status at currentVersion
-	state std::deque<SystemFailureStatus> statusHistory;	// The last change in statusHistory is from currentVersion-1 to currentVersion
-	state Future<Void> periodically = Void();
-	state double lastT = 0;
-
-	loop choose {
-		when ( FailureMonitoringRequest req = waitNext( requests ) ) {
-			// TODO: Handling this request should no longer be necessary.
-			++self->failureMonitoringRequests;
-			if ( req.senderStatus.present() ) {
-				// Update the status of requester, if necessary
-				auto& stat = currentStatus[ req.addresses ];
-				auto& newStat = req.senderStatus.get();
-
-				ASSERT( !newStat.failed || req.addresses != g_network->getLocalAddresses() );
-
-				stat.insertRequest(now());
-				if (req.senderStatus != stat.status) {
-					if(newStat.failed) {
-						++self->serversFailed;
-					}
-					else {
-						++self->serversUnfailed;
-					}
-					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", req.addresses.toString()).detail("Status", newStat.failed ? "Failed" : "OK").detail("Why", "Request");
-					statusHistory.push_back( SystemFailureStatus( req.addresses, newStat ) );
-					++currentVersion;
-
-					if (req.senderStatus == FailureStatus()){
-						// failureMonitorClient reports explicitly that it is failed
-						ASSERT(false); // This can't happen at the moment; if that changes, make this a TEST instead
-						currentStatus.erase(req.addresses);
-					} else {
-						TEST(true);
-						stat.status = newStat;
-					}
-
-					while (statusHistory.size() > currentStatus.size())
-						statusHistory.pop_front();
-				}
-			}
-
-			// Return delta-compressed status changes to requester
-			Version reqVersion = req.failureInformationVersion;
-			if (reqVersion > currentVersion){
-				req.reply.sendError( future_version() );
-				ASSERT(false);
-			} else {
-				TEST(true); // failureDetectionServer sending failure data to requester
-				FailureMonitoringReply reply;
-				reply.failureInformationVersion = currentVersion;
-				if( req.senderStatus.present() ) {
-					reply.clientRequestIntervalMS = FLOW_KNOBS->SERVER_REQUEST_INTERVAL * 1000;
-					reply.considerServerFailedTimeoutMS = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY * 1000;
-				} else {
-					reply.clientRequestIntervalMS = FLOW_KNOBS->CLIENT_REQUEST_INTERVAL * 1000;
-					reply.considerServerFailedTimeoutMS = CLIENT_KNOBS->CLIENT_FAILURE_TIMEOUT_DELAY * 1000;
-				}
-
-				ASSERT( currentVersion >= (int64_t)statusHistory.size());
-
-				if (reqVersion < currentVersion - (int64_t)statusHistory.size() || reqVersion == 0) {
-					// Send everything
-					TEST(true); // failureDetectionServer sending all current data to requester
-					reply.allOthersFailed = true;
-					for(auto it = currentStatus.begin(); it != currentStatus.end(); ++it)
-						reply.changes.push_back( reply.arena, SystemFailureStatus( it->first, it->second.status ) );
-				} else {
-					TEST(true); // failureDetectionServer sending delta-compressed data to requester
-					// SOMEDAY: Send only the last change for a given address?
-					reply.allOthersFailed = false;
-					for(int v = reqVersion - currentVersion + statusHistory.size(); v < statusHistory.size(); v++) {
-						reply.changes.push_back( reply.arena, statusHistory[v] );
-					}
-				}
-				req.reply.send( reply );
-			}
-		}
-		when ( wait( periodically ) ) {
-			periodically = delay( FLOW_KNOBS->SERVER_REQUEST_INTERVAL );
-			double t = now();
-			if (lastT != 0 && t - lastT > 1)
-				TraceEvent("LongDelayOnClusterController").detail("Duration", t - lastT);
-			lastT = t;
-
-			// Adapt to global unresponsiveness
-			vector<double> delays;
-			for(auto it=currentStatus.begin(); it!=currentStatus.end(); it++)
-				if (it->second.penultimateRequestTime) {
-					delays.push_back(it->second.latency(t));
-					//TraceEvent("FDData", uniqueID).detail("S", it->first.toString()).detail("L", it->second.latency(t));
-				}
-			int pivot = std::max(0, (int)delays.size()-2);
-			double pivotDelay = 0;
-			if (delays.size()) {
-				std::nth_element(delays.begin(), delays.begin()+pivot, delays.end());
-				pivotDelay = *(delays.begin()+pivot);
-			}
-			pivotDelay = std::max(0.0, pivotDelay - FLOW_KNOBS->SERVER_REQUEST_INTERVAL);
-
-			//TraceEvent("FailureDetectionPoll", uniqueID).detail("PivotDelay", pivotDelay).detail("Clients", currentStatus.size());
-			//TraceEvent("FailureDetectionAcceptableDelay").detail("Delay", acceptableDelay1000);
-
-			bool useEmergencyDelay = (std::max(self->db.unfinishedRecoveries, self->db.logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS) ||
-									 (now() - self->startTime < CLIENT_KNOBS->FAILURE_EMERGENCY_DELAY);
-
-			for(auto it = currentStatus.begin(); it != currentStatus.end(); ) {
-				double delay = t - it->second.lastRequestTime;
-				if ( it->first != g_network->getLocalAddresses() && ( useEmergencyDelay ?
-					( delay > CLIENT_KNOBS->FAILURE_EMERGENCY_DELAY ) :
-					( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) ) {
-					//printf("Failure Detection Server: Status of '%s' is now '%s' after %f sec\n", it->first.toString().c_str(), "Failed", now() - it->second.lastRequestTime);
-					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", describe(it->first)).detail("Status","Failed").detail("Why", "Timeout").detail("LastRequestAge", delay)
-						.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", self->db.unfinishedRecoveries).detail("LogGenerations", self->db.logGenerations);
-					++self->serversFailed;
-					statusHistory.push_back( SystemFailureStatus( it->first, FailureStatus(true) ) );
-					++currentVersion;
-					it = currentStatus.erase(it);
-					while (statusHistory.size() > currentStatus.size())
-						statusHistory.pop_front();
-				} else {
-					++it;
-				}
-			}
-		}
-	}
-}
-
 ACTOR Future<vector<TLogInterface>> requireAll( vector<Future<Optional<vector<TLogInterface>>>> in ) {
 	state vector<TLogInterface> out;
 	state int i;
@@ -3077,7 +2939,6 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	state uint64_t step = 0;
 	state Future<ErrorOr<Void>> error = errorOr( actorCollection( self.addActor.getFuture() ) );
 
-	self.addActor.send( failureDetectionServer( self.id, &self, interf.clientInterface.failureMonitoring.getFuture() ) );
 	self.addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
 	self.addActor.send( self.updateWorkerList.init( self.db.db ) );
 	self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
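For reviewers who want the gist of what is being deleted: the actor kept a monotonically increasing currentVersion, the status map as of that version, and a bounded history of recent changes, and answered each FailureMonitoringRequest with either a full snapshot or only the changes after the requester's failureInformationVersion. The standalone C++ sketch below illustrates that delta-compression idea; the names (FailureBroker, StatusChange, Address) are hypothetical stand-ins, not FDB's Flow types.

// Minimal sketch of the removed server's version/delta scheme.
// Hypothetical types; illustration only, not FoundationDB code.
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <vector>

using Address = std::string;
struct StatusChange { Address addr; bool failed; };

struct FailureBroker {
	int64_t currentVersion = 0;
	std::map<Address, bool> currentStatus;   // status as of currentVersion
	std::deque<StatusChange> statusHistory;  // last entry took us to currentVersion

	void setStatus(const Address& a, bool failed) {
		statusHistory.push_back({ a, failed });
		++currentVersion;
		currentStatus[a] = failed;
		// Bound the history relative to the live set, as the removed actor did.
		while (statusHistory.size() > currentStatus.size())
			statusHistory.pop_front();
	}

	// Answer a poller that last saw reqVersion (assumed <= currentVersion):
	// a full snapshot if it has fallen behind the retained history,
	// otherwise only the changes it missed.
	std::vector<StatusChange> changesSince(int64_t reqVersion, bool& sentAll) const {
		std::vector<StatusChange> out;
		int64_t histSize = (int64_t)statusHistory.size();
		if (reqVersion == 0 || reqVersion < currentVersion - histSize) {
			sentAll = true; // too far behind: send everything
			for (const auto& [addr, failed] : currentStatus)
				out.push_back({ addr, failed });
		} else {
			sentAll = false; // delta-compressed reply
			for (int64_t v = reqVersion - currentVersion + histSize; v < histSize; ++v)
				out.push_back(statusHistory[v]);
		}
		return out;
	}
};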