Add cluster controller metrics
parent ca9fff8704
commit 31ce56eddf
@@ -1161,11 +1161,36 @@ public:
 	Optional<UID> recruitingRatekeeperID;
 	AsyncVar<bool> recruitRatekeeper;
 
+	CounterCollection clusterControllerMetrics;
+
+	Counter openDatabaseRequests;
+	Counter registerWorkerRequests;
+	Counter getWorkersRequests;
+	Counter getClientWorkersRequests;
+	Counter registerMasterRequests;
+	Counter getServerDBInfoRequests;
+	Counter statusRequests;
+	Counter failureMonitoringRequests;
+
+	Counter serversFailed;
+	Counter serversUnfailed;
+
 	ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
 		: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
 			id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false),
 			gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
-			versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false)
+			versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false),
+			clusterControllerMetrics("ClusterController", id.toString()),
+			openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
+			registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
+			getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),
+			getClientWorkersRequests("GetClientWorkersRequests", clusterControllerMetrics),
+			registerMasterRequests("RegisterMasterRequests", clusterControllerMetrics),
+			getServerDBInfoRequests("GetServerDBInfoRequests", clusterControllerMetrics),
+			statusRequests("StatusRequests", clusterControllerMetrics),
+			failureMonitoringRequests("FailureMonitoringRequests", clusterControllerMetrics),
+			serversFailed("ServersFailed", clusterControllerMetrics),
+			serversUnfailed("ServersUnfailed", clusterControllerMetrics)
 	{
 		CachedSerialization<ServerDBInfo> newInfoCache = db.serverInfo->get();
 		auto& serverInfo = newInfoCache.mutate();
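The hunk above relies on C++ member-initialization order: clusterControllerMetrics is declared before every Counter, so it is fully constructed by the time each counter registers itself against it in the constructor's initializer list. Below is a minimal standalone sketch of that self-registration pattern; SimpleCounter and SimpleCounterCollection are simplified stand-ins, not FoundationDB's actual Counter/CounterCollection, and only the constructor shapes are taken from the diff.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct SimpleCounter;

// Stand-in for CounterCollection: names the metric group and keeps a registry
// of every counter constructed against it.
struct SimpleCounterCollection {
    std::string name, id;
    std::vector<SimpleCounter*> counters;
    SimpleCounterCollection(std::string name, std::string id)
        : name(std::move(name)), id(std::move(id)) {}
};

// Stand-in for Counter: registers itself with its collection on construction,
// which is why declaration order inside ClusterControllerData matters.
struct SimpleCounter {
    std::string name;
    int64_t value = 0;
    SimpleCounter(std::string name, SimpleCounterCollection& cc) : name(std::move(name)) {
        cc.counters.push_back(this);
    }
    SimpleCounter& operator++() { ++value; return *this; }
};

int main() {
    SimpleCounterCollection metrics("ClusterController", "0");
    SimpleCounter statusRequests("StatusRequests", metrics);
    ++statusRequests;  // handlers bump counters exactly like ++self->statusRequests below
    return 0;
}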
@@ -1518,7 +1543,7 @@ struct FailureStatusInfo {
 };
 
 //The failure monitor client relies on the fact that the failure detection server will not declare itself failed
-ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::DBInfo* db, FutureStream< FailureMonitoringRequest > requests ) {
+ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData* self, FutureStream< FailureMonitoringRequest > requests ) {
 	state Version currentVersion = 0;
 	state std::map<NetworkAddressList, FailureStatusInfo> currentStatus; // The status at currentVersion
 	state std::deque<SystemFailureStatus> statusHistory; // The last change in statusHistory is from currentVersion-1 to currentVersion
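This signature change is what makes the new counters reachable: the actor previously received only a ClusterControllerData::DBInfo*, but the counters live on ClusterControllerData itself, so it now takes the whole object and reaches the DBInfo through self->db (see the tooManyLogGenerations and TraceEvent hunks below).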
@@ -1527,6 +1552,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 
 	loop choose {
 		when ( FailureMonitoringRequest req = waitNext( requests ) ) {
+			++self->failureMonitoringRequests;
 			if ( req.senderStatus.present() ) {
 				// Update the status of requester, if necessary
 				auto& stat = currentStatus[ req.addresses ];
@@ -1536,6 +1562,12 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 
 				stat.insertRequest(now());
 				if (req.senderStatus != stat.status) {
+					if(newStat.failed) {
+						++self->serversFailed;
+					}
+					else {
+						++self->serversUnfailed;
+					}
 					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", req.addresses.toString()).detail("Status", newStat.failed ? "Failed" : "OK").detail("Why", "Request");
 					statusHistory.push_back( SystemFailureStatus( req.addresses, newStat ) );
 					++currentVersion;
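Note that serversFailed and serversUnfailed are bumped only inside the req.senderStatus != stat.status branch, so they count status transitions rather than heartbeat volume; failureMonitoringRequests, incremented in the previous hunk, is the per-request counter. The timeout path further down also increments serversFailed when a silent server is declared failed.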
@@ -1615,7 +1647,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 			//TraceEvent("FailureDetectionPoll", uniqueID).detail("PivotDelay", pivotDelay).detail("Clients", currentStatus.size());
 			//TraceEvent("FailureDetectionAcceptableDelay").detail("Delay", acceptableDelay1000);
 
-			bool tooManyLogGenerations = std::max(db->unfinishedRecoveries, db->logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS;
+			bool tooManyLogGenerations = std::max(self->db.unfinishedRecoveries, self->db.logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS;
 
 			for(auto it = currentStatus.begin(); it != currentStatus.end(); ) {
 				double delay = t - it->second.lastRequestTime;
@@ -1624,7 +1656,8 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::
 					( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) ) {
 					//printf("Failure Detection Server: Status of '%s' is now '%s' after %f sec\n", it->first.toString().c_str(), "Failed", now() - it->second.lastRequestTime);
 					TraceEvent("FailureDetectionStatus", uniqueID).detail("System", describe(it->first)).detail("Status","Failed").detail("Why", "Timeout").detail("LastRequestAge", delay)
-						.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", db->unfinishedRecoveries).detail("LogGenerations", db->logGenerations);
+						.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", self->db.unfinishedRecoveries).detail("LogGenerations", self->db.logGenerations);
+					++self->serversFailed;
 					statusHistory.push_back( SystemFailureStatus( it->first, FailureStatus(true) ) );
 					++currentVersion;
 					it = currentStatus.erase(it);
@@ -2005,6 +2038,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
 		try {
 			// Wait til first request is ready
 			StatusRequest req = waitNext(requests);
+			++self->statusRequests;
 			requests_batch.push_back(req);
 
 			// Earliest time at which we may begin a new request
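Here statusRequests is incremented once per StatusRequest as it is pulled off the stream and before batching, so it counts incoming client requests rather than status documents actually generated.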
@@ -2584,7 +2618,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	state uint64_t step = 0;
 	state Future<ErrorOr<Void>> error = errorOr( actorCollection( self.addActor.getFuture() ) );
 
-	self.addActor.send( failureDetectionServer( self.id, &self.db, interf.clientInterface.failureMonitoring.getFuture() ) );
+	self.addActor.send( failureDetectionServer( self.id, &self, interf.clientInterface.failureMonitoring.getFuture() ) );
 	self.addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
 	self.addActor.send( self.updateWorkerList.init( self.db.db ) );
 	self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
@@ -2598,6 +2632,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 	self.addActor.send( handleForcedRecoveries(&self, interf) );
 	self.addActor.send( monitorDataDistributor(&self) );
 	self.addActor.send( monitorRatekeeper(&self) );
+	self.addActor.send( traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.clusterControllerMetrics, self.id.toString() + "/ClusterControllerMetrics") );
+
 	//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
 
 	loop choose {
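As wired up here, traceCounters presumably wakes on a fixed cadence (SERVER_KNOBS->STORAGE_LOGGING_DELAY) and emits the whole collection as a periodic trace event, with the trailing string naming a "latest" key. A rough sketch of that emit loop against the stand-ins from the earlier snippet; the one-event-per-interval shape is an assumption, not the actual fdbserver implementation, and a plain thread loop substitutes for FDB's actor.

#include <chrono>
#include <cstdio>
#include <thread>

// Hypothetical emitter: every `interval` seconds, log the current value of
// every counter registered with the collection as a single "event" line.
void traceCountersLoop(SimpleCounterCollection& cc, double interval) {
    for (;;) {
        std::this_thread::sleep_for(std::chrono::duration<double>(interval));
        std::printf("Event=%sMetrics ID=%s", cc.name.c_str(), cc.id.c_str());
        for (const SimpleCounter* c : cc.counters)
            std::printf(" %s=%lld", c->name.c_str(), static_cast<long long>(c->value));
        std::printf("\n");
    }
}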
@@ -2613,6 +2649,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			return Void();
 		}
 		when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
+			++self.openDatabaseRequests;
 			self.addActor.send(clusterOpenDatabase(&self.db, req));
 		}
 		when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
@@ -2625,9 +2662,11 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			clusterRecruitStorage( &self, req );
 		}
 		when( RegisterWorkerRequest req = waitNext( interf.registerWorker.getFuture() ) ) {
+			++self.registerWorkerRequests;
 			registerWorker( req, &self );
 		}
 		when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
+			++self.getWorkersRequests;
 			vector<WorkerDetails> workers;
 
 			for(auto& it : self.id_worker) {
@@ -2645,6 +2684,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			req.reply.send( workers );
 		}
 		when( GetClientWorkersRequest req = waitNext( interf.clientInterface.getClientWorkers.getFuture() ) ) {
+			++self.getClientWorkersRequests;
 			vector<ClientWorkerInterface> workers;
 			for(auto& it : self.id_worker) {
 				if (it.second.details.processClass.classType() != ProcessClass::TesterClass) {
@@ -2661,9 +2701,11 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 			TraceEvent("CoordinationPingSent", self.id).detail("TimeStep", message.timeStep);
 		}
 		when( RegisterMasterRequest req = waitNext( interf.registerMaster.getFuture() ) ) {
+			++self.registerMasterRequests;
 			clusterRegisterMaster( &self, req );
 		}
 		when( GetServerDBInfoRequest req = waitNext( interf.getServerDBInfo.getFuture() ) ) {
+			++self.getServerDBInfoRequests;
 			self.addActor.send(
 				clusterGetServerInfo(&self.db, req.knownServerInfoID, req.issues, req.incompatiblePeers, req.reply));
 		}