Implement the core logic of grey failure triggered failover

This commit is contained in:
Zhe Wu 2021-09-09 20:29:28 -07:00
parent e2fa511036
commit c0fbe5471f
4 changed files with 367 additions and 58 deletions

View File

@ -465,6 +465,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
init( INITIAL_UPDATE_CROSS_DC_INFO_DELAY, 300 );
init( CHECK_REMOTE_HEALTH_INTERVAL, 60 );
init( FORCE_RECOVERY_CHECK_DELAY, 5.0 );
init( RATEKEEPER_FAILURE_TIME, 1.0 );
init( REPLACE_INTERFACE_DELAY, 60.0 );
@ -479,7 +481,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_MAX_EXCLUSION_DUE_TO_HEALTH, 2 );
init( CC_HEALTH_TRIGGER_RECOVERY, false );
init( CC_TRACKING_HEALTH_RECOVERY_INTERVAL, 3600.0 );
init( CC_MAX_HEALTH_RECOVERY_COUNT, 2 );
init( CC_MAX_HEALTH_RECOVERY_COUNT, 5 );
init( CC_HEALTH_TRIGGER_FAILOVER, false );
init( CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION, 5 );
init( CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION, 10 );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );

View File

@ -389,6 +389,8 @@ public:
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
double VERSION_LAG_METRIC_INTERVAL;
int64_t MAX_VERSION_DIFFERENCE;
double INITIAL_UPDATE_CROSS_DC_INFO_DELAY;
double CHECK_REMOTE_HEALTH_INTERVAL;
double FORCE_RECOVERY_CHECK_DELAY;
double RATEKEEPER_FAILURE_TIME;
double REPLACE_INTERFACE_DELAY;
@ -412,6 +414,11 @@ public:
// CC_MAX_HEALTH_RECOVERY_COUNT within
// CC_TRACKING_HEALTH_RECOVERY_INTERVAL.
int CC_MAX_HEALTH_RECOVERY_COUNT;
bool CC_HEALTH_TRIGGER_FAILOVER; // Whether to enable health triggered failover in CC.
int CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION; // The minimum number of degraded servers that can trigger a
// failover.
int CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION; // The maximum number of degraded servers that can trigger a
// failover.
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

View File

@ -2233,7 +2233,8 @@ public:
if (db.config.regions.size() > 1 && db.config.regions[0].priority > db.config.regions[1].priority &&
db.config.regions[0].dcId != clusterControllerDcId.get() && versionDifferenceUpdated &&
datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE) {
datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE &&
!remoteTransactionSystemContainsDegradedServers()) {
checkRegions(db.config.regions);
}
@ -2795,7 +2796,7 @@ public:
void updateWorkerHealth(const UpdateWorkerHealthRequest& req) {
std::string degradedPeersString;
for (int i = 0; i < req.degradedPeers.size(); ++i) {
degradedPeersString += i == 0 ? "" : " " + req.degradedPeers[i].toString();
degradedPeersString += (i == 0 ? "" : " ") + req.degradedPeers[i].toString();
}
TraceEvent("ClusterControllerUpdateWorkerHealth")
.detail("WorkerAddress", req.address)
@ -2929,24 +2930,9 @@ public:
return currentDegradedServersWithinLimit;
}
// Returns true when the cluster controller should trigger a recovery due to degraded servers are used in the
// transaction system in the primary data center.
bool shouldTriggerRecoveryDueToDegradedServers() {
if (degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
return false;
}
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
bool transactionSystemContainsDegradedServers() {
const ServerDBInfo dbi = db.serverInfo->get();
if (dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false;
}
// Do not trigger recovery if the cluster controller is excluded, since the master will change
// anyways once the cluster controller is moved
if (id_worker[clusterControllerProcessId].priorityInfo.isExcluded) {
return false;
}
for (const auto& excludedServer : degradedServers) {
if (dbi.master.addresses().contains(excludedServer)) {
return true;
@ -2985,6 +2971,78 @@ public:
return false;
}
// Whether transaction system in the remote DC, e.g. log router and tlogs in the remote DC, contains degraded
// servers.
bool remoteTransactionSystemContainsDegradedServers() {
if (db.config.usableRegions <= 1) {
return false;
}
const ServerDBInfo dbi = db.serverInfo->get();
for (const auto& excludedServer : degradedServers) {
for (auto& logSet : dbi.logSystemConfig.tLogs) {
if (logSet.isLocal || logSet.locality == tagLocalitySatellite) {
continue;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(excludedServer)) {
return true;
}
}
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(excludedServer)) {
return true;
}
}
}
}
return false;
}
// Returns true when the cluster controller should trigger a recovery due to degraded servers used in the
// transaction system in the primary data center.
bool shouldTriggerRecoveryDueToDegradedServers() {
if (degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
return false;
}
if (db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false;
}
// Do not trigger recovery if the cluster controller is excluded, since the master will change
// anyways once the cluster controller is moved
if (id_worker[clusterControllerProcessId].priorityInfo.isExcluded) {
return false;
}
return transactionSystemContainsDegradedServers();
}
// Returns true when the cluster controller should trigger a failover due to degraded servers used in the
// transaction system in the primary data center, and no degradation in the remote data center.
bool shouldTriggerFailoverDueToDegradedServers() {
if (db.config.usableRegions <= 1) {
return false;
}
if (degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
return false;
}
// Do not trigger recovery if the cluster controller is excluded, since the master will change
// anyways once the cluster controller is moved
if (id_worker[clusterControllerProcessId].priorityInfo.isExcluded) {
return false;
}
return transactionSystemContainsDegradedServers() && !remoteTransactionSystemContainsDegradedServers();
}
int recentRecoveryCountDueToHealth() {
while (!recentHealthTriggeredRecoveryTime.empty() &&
now() - recentHealthTriggeredRecoveryTime.front() > SERVER_KNOBS->CC_TRACKING_HEALTH_RECOVERY_INTERVAL) {
@ -3041,6 +3099,9 @@ public:
// We use AsyncVars to kill (i.e. halt) singletons that have been replaced.
AsyncVar<bool> recruitDistributor;
Optional<UID> recruitingDistributorID;
bool remoteTransactionSystemDegraded;
bool recruitingDistributor;
AsyncVar<bool> recruitRatekeeper;
Optional<UID> recruitingRatekeeperID;
@ -3080,7 +3141,7 @@ public:
clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()),
outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()),
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false),
recruitDistributor(false), recruitRatekeeper(false),
remoteTransactionSystemDegraded(false), recruitingDistributor(false), recruitRatekeeper(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
@ -4563,6 +4624,11 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData* self) {
ACTOR Future<Void> updateDatacenterVersionDifference(ClusterControllerData* self) {
state double lastLogTime = 0;
// The purpose of the initial delay is to wait for the cluster to achieve a steady state before calculating the DC
// version difference, since DC version difference may trigger a failover, and we don't want that to happen too
// frequently.
wait(delay(SERVER_KNOBS->INITIAL_UPDATE_CROSS_DC_INFO_DELAY));
loop {
self->versionDifferenceUpdated = false;
if (self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
@ -4645,6 +4711,22 @@ ACTOR Future<Void> updateDatacenterVersionDifference(ClusterControllerData* self
}
}
// A background actor that periodically checks remote DC health, and `checkOutstandingRequests` if remote DC recovers.
ACTOR Future<Void> updateRemoteDCHealth(ClusterControllerData* self) {
// The purpose of the initial delay is to wait for the cluster to achieve a steady state before checking remote DC
// health, since remote DC healthy may trigger a failover, and we don't want that to happen too frequently.
wait(delay(SERVER_KNOBS->INITIAL_UPDATE_CROSS_DC_INFO_DELAY));
loop {
bool oldRemoteTransactionSystemDegraded = self->remoteTransactionSystemDegraded;
self->remoteTransactionSystemDegraded = self->remoteTransactionSystemContainsDegradedServers();
if (oldRemoteTransactionSystemDegraded && !self->remoteTransactionSystemDegraded) {
checkOutstandingRequests(self);
}
wait(delay(SERVER_KNOBS->CHECK_REMOTE_HEALTH_INTERVAL));
}
}
ACTOR Future<Void> doEmptyCommit(Database cx) {
state Transaction tr(cx);
loop {
@ -4953,6 +5035,19 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
self->excludedDegradedServers.clear();
TraceEvent("DegradedServerDetectedAndSuggestRecovery").log();
}
} else if (self->shouldTriggerFailoverDueToDegradedServers()) {
if (SERVER_KNOBS->CC_HEALTH_TRIGGER_FAILOVER) {
TraceEvent("DegradedServerDetectedAndTriggerFailover").log();
vector<Optional<Key>> dcPriority;
auto remoteDcId = self->db.config.regions[0].dcId == self->clusterControllerDcId.get()
? self->db.config.regions[1].dcId
: self->db.config.regions[0].dcId;
dcPriority.push_back(remoteDcId);
dcPriority.push_back(self->clusterControllerDcId);
self->desiredDcIds.set(dcPriority);
} else {
TraceEvent("DegradedServerDetectedAndSuggestFailover").log();
}
}
}
@ -4990,6 +5085,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(updatedChangingDatacenters(&self));
self.addActor.send(updatedChangedDatacenters(&self));
self.addActor.send(updateDatacenterVersionDifference(&self));
self.addActor.send(updateRemoteDCHealth(&self));
self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
@ -5499,4 +5595,108 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
return Void();
}
TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServers") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
NetworkAddress master(IPAddress(0x01010101), 1);
NetworkAddress tlog(IPAddress(0x02020202), 1);
NetworkAddress satelliteTlog(IPAddress(0x03030303), 1);
NetworkAddress remoteTlog(IPAddress(0x04040404), 1);
NetworkAddress logRouter(IPAddress(0x05050505), 1);
NetworkAddress backup(IPAddress(0x06060606), 1);
NetworkAddress proxy(IPAddress(0x07070707), 1);
NetworkAddress proxy2(IPAddress(0x08080808), 1);
NetworkAddress resolver(IPAddress(0x09090909), 1);
data.db.config.usableRegions = 2;
// Create a ServerDBInfo using above addresses.
ServerDBInfo testDbInfo;
testDbInfo.master.changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ master }, UID(1, 2)));
TLogInterface localTLogInterf;
localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, UID(1, 2)));
TLogInterface localLogRouterInterf;
localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, UID(1, 2)));
BackupInterface backupInterf;
backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, UID(1, 2)));
TLogSet localTLogSet;
localTLogSet.isLocal = true;
localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf));
localTLogSet.logRouters.push_back(OptionalInterface(localLogRouterInterf));
localTLogSet.backupWorkers.push_back(OptionalInterface(backupInterf));
testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet);
TLogInterface sateTLogInterf;
sateTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTlog }, UID(1, 2)));
TLogSet sateTLogSet;
sateTLogSet.isLocal = true;
sateTLogSet.locality = tagLocalitySatellite;
sateTLogSet.tLogs.push_back(OptionalInterface(sateTLogInterf));
testDbInfo.logSystemConfig.tLogs.push_back(sateTLogSet);
TLogInterface remoteTLogInterf;
remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, UID(1, 2)));
TLogSet remoteTLogSet;
remoteTLogSet.isLocal = false;
remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf));
testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet);
GrvProxyInterface grvProxyInterf;
grvProxyInterf.getConsistentReadVersion =
RequestStream<struct GetReadVersionRequest>(Endpoint({ proxy }, UID(1, 2)));
testDbInfo.client.grvProxies.push_back(grvProxyInterf);
CommitProxyInterface commitProxyInterf;
commitProxyInterf.commit = RequestStream<struct CommitTransactionRequest>(Endpoint({ proxy2 }, UID(1, 2)));
testDbInfo.client.commitProxies.push_back(commitProxyInterf);
ResolverInterface resolverInterf;
resolverInterf.resolve = RequestStream<struct ResolveTransactionBatchRequest>(Endpoint({ resolver }, UID(1, 2)));
testDbInfo.resolvers.push_back(resolverInterf);
testDbInfo.recoveryState = RecoveryState::ACCEPTING_COMMITS;
// No failover when no degraded servers.
data.db.serverInfo->set(testDbInfo);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
// No failover when small number of degraded servers
data.degradedServers.insert(master);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
// Trigger failover when enough is degraded.
data.degradedServers.insert(master);
data.degradedServers.insert(tlog);
data.degradedServers.insert(proxy);
data.degradedServers.insert(proxy2);
data.degradedServers.insert(resolver);
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
// No failover when usable region is 1.
data.db.config.usableRegions = 1;
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.db.config.usableRegions = 2;
// No failover when remote is also degraded.
data.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
// No failover when some are not from transaction system
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
return Void();
}
} // namespace

View File

@ -723,6 +723,89 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
} // namespace
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's remote DC.
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
const auto& dbi = dbInfo->get();
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
if (logSet.isLocal || logSet.locality == tagLocalitySatellite) {
continue;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(address)) {
return true;
}
}
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(address)) {
return true;
}
}
}
return false;
}
bool addressesInDbAndRemoteDc(const NetworkAddressList& addresses, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return addressInDbAndRemoteDc(addresses.address, dbInfo) ||
(addresses.secondaryAddress.present() && addressInDbAndRemoteDc(addresses.secondaryAddress.get(), dbInfo));
}
namespace {
TEST_CASE("/fdbserver/worker/addressInDbAndRemoteDc") {
// Setup a ServerDBInfo for test.
ServerDBInfo testDbInfo;
LocalityData testLocal;
testLocal.set(LiteralStringRef("dcid"), StringRef(std::to_string(1)));
testDbInfo.master.locality = testLocal;
// First, create an empty TLogInterface, and check that it shouldn't be considered as in remote DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>());
ASSERT(!addressInDbAndRemoteDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
TLogInterface localTlog(testLocal);
localTlog.initEndpoints();
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(localTlog));
ASSERT(!addressInDbAndRemoteDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote TLog, and it should be considered as in remote DC.
LocalityData fakeRemote;
fakeRemote.set(LiteralStringRef("dcid"), StringRef(std::to_string(2)));
TLogInterface remoteTlog(fakeRemote);
remoteTlog.initEndpoints();
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = false;
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(remoteTlog));
ASSERT(addressInDbAndRemoteDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote log router, and it should be considered as in remote DC.
NetworkAddress logRouterAddress(IPAddress(0x26262626), 1);
TLogInterface remoteLogRouter(fakeRemote);
remoteLogRouter.initEndpoints();
remoteLogRouter.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouterAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.back().logRouters.push_back(OptionalInterface(remoteLogRouter));
ASSERT(addressInDbAndRemoteDc(logRouterAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a satellite tlog, and it shouldn't be considered as in remote DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().locality == tagLocalitySatellite;
NetworkAddress satelliteTLogAddress(IPAddress(0x13131313), 1);
TLogInterface satelliteTLog(fakeRemote);
satelliteTLog.initEndpoints();
satelliteTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTLogAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(satelliteTLog));
ASSERT(!addressInDbAndRemoteDc(satelliteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
return Void();
}
} // namespace
// The actor that actively monitors the health of local and peer servers, and reports anomaly to the cluster controller.
ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
@ -730,49 +813,63 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
loop {
Future<Void> nextHealthCheckDelay = Never();
if (dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
addressesInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
if (dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && ccInterface->get().present()) {
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
const auto& allPeers = FlowTransport::transport().getAllPeers();
UpdateWorkerHealthRequest req;
for (const auto& [address, peer] : allPeers) {
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
// TODO(zhewu): Currently, FlowTransport latency monitor clears ping latency samples on a regular
// basis, which may affect the measurement count. Currently,
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval, so
// it may be ok. If this ends to be a problem, we need to consider keep track of last
// ping latencies logged.
continue;
}
if (!addressInDbAndPrimaryDc(address, dbInfo)) {
// Ignore the servers that are not in the database's transaction system and not in the primary DC.
// Note that currently we are not monitor storage servers, since lagging in storage servers today
// already can trigger server exclusion by data distributor.
continue;
}
bool workerInDb = false;
bool workerInPrimary = false;
if (addressesInDbAndPrimaryDc(interf.addresses(), dbInfo)) {
workerInDb = true;
workerInPrimary = true;
} else if (addressesInDbAndRemoteDc(interf.addresses(), dbInfo)) {
workerInDb = true;
workerInPrimary = false;
}
if (peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
// This is a degraded peer.
TraceEvent("HealthMonitorDetectDegradedPeer")
.suppressFor(30)
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
.detail("CheckedPercentileLatency",
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("TimeoutCount", peer->timeoutCount);
if (workerInDb) {
for (const auto& [address, peer] : allPeers) {
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
// TODO(zhewu): Currently, FlowTransport latency monitor clears ping latency samples on a
// regular
// basis, which may affect the measurement count. Currently,
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval,
// so it may be ok. If this ends to be a problem, we need to consider keep track of
// last ping latencies logged.
continue;
}
req.degradedPeers.push_back(address);
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
// Ignore the servers that are not in the database's transaction system and not in the primary
// DC. Note that currently we are not monitor storage servers, since lagging in storage servers
// today already can trigger server exclusion by data distributor.
if (peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
// This is a degraded peer.
TraceEvent("HealthMonitorDetectDegradedPeer")
.suppressFor(30)
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
.detail(
"CheckedPercentileLatency",
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("TimeoutCount", peer->timeoutCount);
req.degradedPeers.push_back(address);
}
}
}
}