Merge pull request #9890 from halfprice/zhewu/log-router-gray-failure

Gray failure detects a disconnected remote log router and recovers from high DC lag
Zhe Wu 2023-04-07 16:25:11 -07:00 committed by GitHub
commit 10a6f3d2d0
5 changed files with 134 additions and 26 deletions
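
At a glance, the change teaches the gray-failure machinery to treat a remote log router that has lost connectivity to the primary/satellite side as grounds for triggering recovery, while still ignoring remote peers that are merely slow. The standalone C++ sketch below is illustration only and is not part of the patch: DegradationSets, remoteLogRouters, and shouldRecoverForRemoteLogRouter are hypothetical names; the real logic lives in ClusterControllerData and the worker health monitor shown in the hunks that follow.

// Minimal sketch (hypothetical names) of the decision this commit enables.
#include <string>
#include <unordered_set>

struct DegradationSets {
    std::unordered_set<std::string> degradedServers;     // peers with high ping latency
    std::unordered_set<std::string> disconnectedServers; // peers with repeated connection failures
};

// A remote log router that is merely slow is ignored (WAN latency to the remote
// region is expected to be noisy), but one that is disconnected is treated as a
// reason to trigger recovery so it can be re-recruited and DC lag can drain.
bool shouldRecoverForRemoteLogRouter(const DegradationSets& info,
                                     const std::unordered_set<std::string>& remoteLogRouters,
                                     bool monitoringEnabled /* CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING */) {
    if (!monitoringEnabled) {
        return false;
    }
    for (const auto& server : info.disconnectedServers) {
        if (remoteLogRouters.count(server) > 0) {
            return true;
        }
    }
    return false;
}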


@@ -663,6 +663,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
init( CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING, true );
init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;


@@ -613,6 +613,8 @@ public:
// be determined as degraded worker.
int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum amount of degraded server in satellite DC to be
// determined as degraded satellite.
bool CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING; // When enabled, gray failure tries to detect whether the remote log
// router is degraded and may trigger recovery to recover from it.
double CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL; // The interval to prevent re-recruiting the same singleton if a
// recruiting fight between two cluster controllers occurs.
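
CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING defaults to true (see the ServerKnobs::initialize hunk above), so the new remote-log-router disconnection check is active out of the box. Like other server knobs it can presumably be overridden at process startup (e.g. an fdbserver --knob_cc_enable_remote_log_router_monitoring false argument) to restore the previous behavior; the override mechanism itself is not part of this diff.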


@@ -3468,14 +3468,11 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
TLogInterface localTLogInterf;
localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, testUID));
TLogInterface localLogRouterInterf;
localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, testUID));
BackupInterface backupInterf;
backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, testUID));
TLogSet localTLogSet;
localTLogSet.isLocal = true;
localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf));
localTLogSet.logRouters.push_back(OptionalInterface(localLogRouterInterf));
localTLogSet.backupWorkers.push_back(OptionalInterface(backupInterf));
testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet);
@@ -3489,9 +3486,12 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
TLogInterface remoteTLogInterf;
remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, testUID));
TLogInterface remoteLogRouterInterf;
remoteLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, testUID));
TLogSet remoteTLogSet;
remoteTLogSet.isLocal = false;
remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf));
remoteTLogSet.logRouters.push_back(OptionalInterface(remoteLogRouterInterf));
testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet);
GrvProxyInterface proxyInterf;
@@ -3543,12 +3543,14 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when log router is degraded.
// No recovery when remote log router is degraded.
data.degradationInfo.degradedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
// Trigger recovery when remote log router is disconnected.
data.degradationInfo.disconnectedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when backup worker is degraded.
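
The updated assertions above make the new asymmetry explicit: a remote log router that merely shows up in degradedServers still does not trigger recovery, whereas one in disconnectedServers now does.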


@@ -3168,25 +3168,34 @@ public:
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
bool transactionSystemContainsDegradedServers() {
const ServerDBInfo& dbi = db.serverInfo->get();
auto transactionWorkerInList = [&dbi](const std::unordered_set<NetworkAddress>& serverList,
bool skipSatellite) -> bool {
auto transactionWorkerInList =
[&dbi](const std::unordered_set<NetworkAddress>& serverList, bool skipSatellite, bool skipRemote) -> bool {
for (const auto& server : serverList) {
if (dbi.master.addresses().contains(server)) {
return true;
}
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
if (!logSet.isLocal) {
// We don't check server degradation for remote TLogs since it is not on the transaction system
// critical path.
continue;
}
if (skipSatellite && logSet.locality == tagLocalitySatellite) {
continue;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(server)) {
return true;
if (skipRemote && !logSet.isLocal) {
continue;
}
if (!logSet.isLocal) {
// Only check log routers in the remote region.
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(server)) {
return true;
}
}
} else {
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(server)) {
return true;
}
}
}
}
@@ -3213,10 +3222,13 @@ public:
return false;
};
// Check if transaction system contains degraded/disconnected servers. For satellite, we only check for
// disconnection since the latency between primary and satellite is across WAN and may not be very stable.
return transactionWorkerInList(degradationInfo.degradedServers, /*skipSatellite=*/true) ||
transactionWorkerInList(degradationInfo.disconnectedServers, /*skipSatellite=*/false);
// Check if transaction system contains degraded/disconnected servers. For satellite and remote regions, we only
// check for disconnection since the latency between primary and satellite is across WAN and may not be very
// stable.
return transactionWorkerInList(degradationInfo.degradedServers, /*skipSatellite=*/true, /*skipRemote=*/true) ||
transactionWorkerInList(degradationInfo.disconnectedServers,
/*skipSatellite=*/false,
/*skipRemote=*/!SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING);
}
// Whether transaction system in the remote DC, e.g. log router and tlogs in the remote DC, contains degraded
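
Design note: only the disconnectedServers check considers remote log routers, and only when CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING is on (it defaults to true). Latency-based degradation is still skipped for both satellite and remote processes because those links cross the WAN and their latencies are expected to be noisy.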


@@ -1036,6 +1036,67 @@ TEST_CASE("/fdbserver/worker/addressInDbAndRemoteDc") {
} // namespace
bool addressIsRemoteLogRouter(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
const auto& dbi = dbInfo->get();
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
if (!logSet.isLocal) {
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(address)) {
return true;
}
}
}
}
return false;
}
namespace {
TEST_CASE("/fdbserver/worker/addressIsRemoteLogRouter") {
// Setup a ServerDBInfo for test.
ServerDBInfo testDbInfo;
LocalityData testLocal;
testLocal.set("dcid"_sr, StringRef(std::to_string(1)));
testDbInfo.master.locality = testLocal;
// First, create an empty TLogInterface, and check that it shouldn't be considered as remote log router.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
testDbInfo.logSystemConfig.tLogs.back().logRouters.push_back(OptionalInterface<TLogInterface>());
ASSERT(!addressIsRemoteLogRouter(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a local log router, and it shouldn't be considered as remote log router.
TLogInterface localLogRouter(testLocal);
localLogRouter.initEndpoints();
testDbInfo.logSystemConfig.tLogs.back().logRouters.push_back(OptionalInterface(localLogRouter));
ASSERT(!addressIsRemoteLogRouter(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote TLog, and it shouldn't be considered as remote log router.
LocalityData fakeRemote;
fakeRemote.set("dcid"_sr, StringRef(std::to_string(2)));
TLogInterface remoteTlog(fakeRemote);
remoteTlog.initEndpoints();
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = false;
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(remoteTlog));
ASSERT(!addressIsRemoteLogRouter(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote log router, and it should be considered as remote log router.
NetworkAddress logRouterAddress(IPAddress(0x26262626), 1);
TLogInterface remoteLogRouter(fakeRemote);
remoteLogRouter.initEndpoints();
remoteLogRouter.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouterAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.back().logRouters.push_back(OptionalInterface(remoteLogRouter));
ASSERT(addressIsRemoteLogRouter(logRouterAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
return Void();
}
} // namespace
// The actor that actively monitors the health of local and peer servers, and reports anomalies to the cluster controller.
ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
@@ -1046,14 +1107,20 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
if (dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && ccInterface->get().present()) {
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
const auto& allPeers = FlowTransport::transport().getAllPeers();
// Check remote log router connectivity only when remote TLogs are recruited and in use.
bool checkRemoteLogRouterConnectivity = dbInfo->get().recoveryState == RecoveryState::ALL_LOGS_RECRUITED ||
dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED;
UpdateWorkerHealthRequest req;
enum WorkerLocation { None, Primary, Remote };
enum WorkerLocation { None, Primary, Satellite, Remote };
WorkerLocation workerLocation = None;
if (addressesInDbAndPrimaryDc(interf.addresses(), dbInfo)) {
workerLocation = Primary;
} else if (addressesInDbAndRemoteDc(interf.addresses(), dbInfo)) {
workerLocation = Remote;
} else if (addressesInDbAndPrimarySatelliteDc(interf.addresses(), dbInfo)) {
workerLocation = Satellite;
}
if (workerLocation != None) {
@@ -1062,11 +1129,10 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
// TODO(zhewu): Currently, FlowTransport latency monitor clears ping latency samples on a
// regular
// basis, which may affect the measurement count. Currently,
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval,
// so it may be ok. If this ends to be a problem, we need to consider keep track of
// last ping latencies logged.
// regular basis, which may affect the measurement count. Currently,
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval, so it may be
// ok. If this turns out to be a problem, we need to consider keeping track of the last ping
// latencies logged.
continue;
}
bool degradedPeer = false;
@@ -1137,6 +1203,28 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
.detail("PingTimeoutCount", peer->timeoutCount)
.detail("ConnectionFailureCount", peer->connectFailedCount);
}
} else if (checkRemoteLogRouterConnectivity &&
(workerLocation == Primary || workerLocation == Satellite) &&
addressIsRemoteLogRouter(address, dbInfo)) {
// Monitor remote log router's connectivity to the primary DCs' transaction system. We ignore
// latency-based degradation between the primary and remote regions because the remote region
// may be distant from the primary region.
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("WorkerLocation", workerLocation)
.detail("Peer", address)
.detail("RemoteLogRouter", true)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", true)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("PingCount", peer->pingLatencies.getPopulationSize())
.detail("PingTimeoutCount", peer->timeoutCount)
.detail("ConnectionFailureCount", peer->connectFailedCount);
disconnectedPeer = true;
}
}
if (disconnectedPeer) {
@@ -1160,7 +1248,10 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo)) ||
(workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo))) {
(workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) ||
(checkRemoteLogRouterConnectivity &&
(workerLocation == Primary || workerLocation == Satellite) &&
addressIsRemoteLogRouter(address, dbInfo))) {
TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
req.disconnectedPeers.push_back(address);
}
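
For remote log routers the health monitor relies solely on peer->connectFailedCount crossing PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; the ping-latency statistics are logged in the trace event but are not used as a degradation signal for these peers, so a slow-but-reachable remote log router is not reported. A disconnected one is added to UpdateWorkerHealthRequest::disconnectedPeers, which is how the cluster controller learns about it and, per the ClusterController hunks above, decides whether to trigger recovery.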