Add more comments to the code

This commit is contained in:
Zhe Wu 2021-09-10 14:51:55 -07:00
parent c0fbe5471f
commit 62197faa46
4 changed files with 36 additions and 41 deletions

View File

@ -389,8 +389,10 @@ public:
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL; double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
double VERSION_LAG_METRIC_INTERVAL; double VERSION_LAG_METRIC_INTERVAL;
int64_t MAX_VERSION_DIFFERENCE; int64_t MAX_VERSION_DIFFERENCE;
double INITIAL_UPDATE_CROSS_DC_INFO_DELAY; double INITIAL_UPDATE_CROSS_DC_INFO_DELAY; // The intial delay in a new Cluster Controller just started to refresh
double CHECK_REMOTE_HEALTH_INTERVAL; // the info of remote DC, such as remote DC version difference and
// remote DC health.
double CHECK_REMOTE_HEALTH_INTERVAL; // Remote DC health refresh interval.
double FORCE_RECOVERY_CHECK_DELAY; double FORCE_RECOVERY_CHECK_DELAY;
double RATEKEEPER_FAILURE_TIME; double RATEKEEPER_FAILURE_TIME;
double REPLACE_INTERFACE_DELAY; double REPLACE_INTERFACE_DELAY;
@ -413,7 +415,8 @@ public:
double CC_TRACKING_HEALTH_RECOVERY_INTERVAL; // The number of recovery count should not exceed double CC_TRACKING_HEALTH_RECOVERY_INTERVAL; // The number of recovery count should not exceed
// CC_MAX_HEALTH_RECOVERY_COUNT within // CC_MAX_HEALTH_RECOVERY_COUNT within
// CC_TRACKING_HEALTH_RECOVERY_INTERVAL. // CC_TRACKING_HEALTH_RECOVERY_INTERVAL.
int CC_MAX_HEALTH_RECOVERY_COUNT; int CC_MAX_HEALTH_RECOVERY_COUNT; // The max number of recoveries can be triggered due to worker health within
// CC_TRACKING_HEALTH_RECOVERY_INTERVAL
bool CC_HEALTH_TRIGGER_FAILOVER; // Whether to enable health triggered failover in CC. bool CC_HEALTH_TRIGGER_FAILOVER; // Whether to enable health triggered failover in CC.
int CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION; // The minimum number of degraded servers that can trigger a int CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION; // The minimum number of degraded servers that can trigger a
// failover. // failover.

View File

@ -2980,22 +2980,8 @@ public:
const ServerDBInfo dbi = db.serverInfo->get(); const ServerDBInfo dbi = db.serverInfo->get();
for (const auto& excludedServer : degradedServers) { for (const auto& excludedServer : degradedServers) {
for (auto& logSet : dbi.logSystemConfig.tLogs) { if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
if (logSet.isLocal || logSet.locality == tagLocalitySatellite) { return true;
continue;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(excludedServer)) {
return true;
}
}
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(excludedServer)) {
return true;
}
}
} }
} }
@ -3101,7 +3087,6 @@ public:
Optional<UID> recruitingDistributorID; Optional<UID> recruitingDistributorID;
bool remoteTransactionSystemDegraded; bool remoteTransactionSystemDegraded;
bool recruitingDistributor;
AsyncVar<bool> recruitRatekeeper; AsyncVar<bool> recruitRatekeeper;
Optional<UID> recruitingRatekeeperID; Optional<UID> recruitingRatekeeperID;
@ -3141,7 +3126,7 @@ public:
clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()),
outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()), outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()),
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false),
remoteTransactionSystemDegraded(false), recruitingDistributor(false), recruitRatekeeper(false), remoteTransactionSystemDegraded(false), recruitDistributor(false), recruitRatekeeper(false),
clusterControllerMetrics("ClusterController", id.toString()), clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics), openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics), registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
@ -5042,6 +5027,9 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
auto remoteDcId = self->db.config.regions[0].dcId == self->clusterControllerDcId.get() auto remoteDcId = self->db.config.regions[0].dcId == self->clusterControllerDcId.get()
? self->db.config.regions[1].dcId ? self->db.config.regions[1].dcId
: self->db.config.regions[0].dcId; : self->db.config.regions[0].dcId;
// Switch the current primary DC and remote DC in desiredDcIds, so that the remote DC becomes
// the new primary, and the primary DC becomes the new remote.
dcPriority.push_back(remoteDcId); dcPriority.push_back(remoteDcId);
dcPriority.push_back(self->clusterControllerDcId); dcPriority.push_back(self->clusterControllerDcId);
self->desiredDcIds.set(dcPriority); self->desiredDcIds.set(dcPriority);
@ -5505,18 +5493,19 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
NetworkAddress backup(IPAddress(0x06060606), 1); NetworkAddress backup(IPAddress(0x06060606), 1);
NetworkAddress proxy(IPAddress(0x07070707), 1); NetworkAddress proxy(IPAddress(0x07070707), 1);
NetworkAddress resolver(IPAddress(0x08080808), 1); NetworkAddress resolver(IPAddress(0x08080808), 1);
UID testUID(1, 2);
// Create a ServerDBInfo using above addresses. // Create a ServerDBInfo using above addresses.
ServerDBInfo testDbInfo; ServerDBInfo testDbInfo;
testDbInfo.master.changeCoordinators = testDbInfo.master.changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ master }, UID(1, 2))); RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ master }, testUID));
TLogInterface localTLogInterf; TLogInterface localTLogInterf;
localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, UID(1, 2))); localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, testUID));
TLogInterface localLogRouterInterf; TLogInterface localLogRouterInterf;
localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, UID(1, 2))); localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, testUID));
BackupInterface backupInterf; BackupInterface backupInterf;
backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, UID(1, 2))); backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, testUID));
TLogSet localTLogSet; TLogSet localTLogSet;
localTLogSet.isLocal = true; localTLogSet.isLocal = true;
localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf)); localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf));
@ -5525,7 +5514,7 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet);
TLogInterface sateTLogInterf; TLogInterface sateTLogInterf;
sateTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTlog }, UID(1, 2))); sateTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTlog }, testUID));
TLogSet sateTLogSet; TLogSet sateTLogSet;
sateTLogSet.isLocal = true; sateTLogSet.isLocal = true;
sateTLogSet.locality = tagLocalitySatellite; sateTLogSet.locality = tagLocalitySatellite;
@ -5533,18 +5522,18 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
testDbInfo.logSystemConfig.tLogs.push_back(sateTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(sateTLogSet);
TLogInterface remoteTLogInterf; TLogInterface remoteTLogInterf;
remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, UID(1, 2))); remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, testUID));
TLogSet remoteTLogSet; TLogSet remoteTLogSet;
remoteTLogSet.isLocal = false; remoteTLogSet.isLocal = false;
remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf)); remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf));
testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet);
GrvProxyInterface proxyInterf; GrvProxyInterface proxyInterf;
proxyInterf.getConsistentReadVersion = RequestStream<struct GetReadVersionRequest>(Endpoint({ proxy }, UID(1, 2))); proxyInterf.getConsistentReadVersion = RequestStream<struct GetReadVersionRequest>(Endpoint({ proxy }, testUID));
testDbInfo.client.grvProxies.push_back(proxyInterf); testDbInfo.client.grvProxies.push_back(proxyInterf);
ResolverInterface resolverInterf; ResolverInterface resolverInterf;
resolverInterf.resolve = RequestStream<struct ResolveTransactionBatchRequest>(Endpoint({ resolver }, UID(1, 2))); resolverInterf.resolve = RequestStream<struct ResolveTransactionBatchRequest>(Endpoint({ resolver }, testUID));
testDbInfo.resolvers.push_back(resolverInterf); testDbInfo.resolvers.push_back(resolverInterf);
testDbInfo.recoveryState = RecoveryState::ACCEPTING_COMMITS; testDbInfo.recoveryState = RecoveryState::ACCEPTING_COMMITS;
@ -5609,20 +5598,21 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
NetworkAddress proxy(IPAddress(0x07070707), 1); NetworkAddress proxy(IPAddress(0x07070707), 1);
NetworkAddress proxy2(IPAddress(0x08080808), 1); NetworkAddress proxy2(IPAddress(0x08080808), 1);
NetworkAddress resolver(IPAddress(0x09090909), 1); NetworkAddress resolver(IPAddress(0x09090909), 1);
UID testUID(1, 2);
data.db.config.usableRegions = 2; data.db.config.usableRegions = 2;
// Create a ServerDBInfo using above addresses. // Create a ServerDBInfo using above addresses.
ServerDBInfo testDbInfo; ServerDBInfo testDbInfo;
testDbInfo.master.changeCoordinators = testDbInfo.master.changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ master }, UID(1, 2))); RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ master }, testUID));
TLogInterface localTLogInterf; TLogInterface localTLogInterf;
localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, UID(1, 2))); localTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ tlog }, testUID));
TLogInterface localLogRouterInterf; TLogInterface localLogRouterInterf;
localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, UID(1, 2))); localLogRouterInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ logRouter }, testUID));
BackupInterface backupInterf; BackupInterface backupInterf;
backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, UID(1, 2))); backupInterf.waitFailure = RequestStream<ReplyPromise<Void>>(Endpoint({ backup }, testUID));
TLogSet localTLogSet; TLogSet localTLogSet;
localTLogSet.isLocal = true; localTLogSet.isLocal = true;
localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf)); localTLogSet.tLogs.push_back(OptionalInterface(localTLogInterf));
@ -5631,7 +5621,7 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(localTLogSet);
TLogInterface sateTLogInterf; TLogInterface sateTLogInterf;
sateTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTlog }, UID(1, 2))); sateTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTlog }, testUID));
TLogSet sateTLogSet; TLogSet sateTLogSet;
sateTLogSet.isLocal = true; sateTLogSet.isLocal = true;
sateTLogSet.locality = tagLocalitySatellite; sateTLogSet.locality = tagLocalitySatellite;
@ -5639,23 +5629,22 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
testDbInfo.logSystemConfig.tLogs.push_back(sateTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(sateTLogSet);
TLogInterface remoteTLogInterf; TLogInterface remoteTLogInterf;
remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, UID(1, 2))); remoteTLogInterf.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTlog }, testUID));
TLogSet remoteTLogSet; TLogSet remoteTLogSet;
remoteTLogSet.isLocal = false; remoteTLogSet.isLocal = false;
remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf)); remoteTLogSet.tLogs.push_back(OptionalInterface(remoteTLogInterf));
testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet); testDbInfo.logSystemConfig.tLogs.push_back(remoteTLogSet);
GrvProxyInterface grvProxyInterf; GrvProxyInterface grvProxyInterf;
grvProxyInterf.getConsistentReadVersion = grvProxyInterf.getConsistentReadVersion = RequestStream<struct GetReadVersionRequest>(Endpoint({ proxy }, testUID));
RequestStream<struct GetReadVersionRequest>(Endpoint({ proxy }, UID(1, 2)));
testDbInfo.client.grvProxies.push_back(grvProxyInterf); testDbInfo.client.grvProxies.push_back(grvProxyInterf);
CommitProxyInterface commitProxyInterf; CommitProxyInterface commitProxyInterf;
commitProxyInterf.commit = RequestStream<struct CommitTransactionRequest>(Endpoint({ proxy2 }, UID(1, 2))); commitProxyInterf.commit = RequestStream<struct CommitTransactionRequest>(Endpoint({ proxy2 }, testUID));
testDbInfo.client.commitProxies.push_back(commitProxyInterf); testDbInfo.client.commitProxies.push_back(commitProxyInterf);
ResolverInterface resolverInterf; ResolverInterface resolverInterf;
resolverInterf.resolve = RequestStream<struct ResolveTransactionBatchRequest>(Endpoint({ resolver }, UID(1, 2))); resolverInterf.resolve = RequestStream<struct ResolveTransactionBatchRequest>(Endpoint({ resolver }, testUID));
testDbInfo.resolvers.push_back(resolverInterf); testDbInfo.resolvers.push_back(resolverInterf);
testDbInfo.recoveryState = RecoveryState::ACCEPTING_COMMITS; testDbInfo.recoveryState = RecoveryState::ACCEPTING_COMMITS;
@ -5669,7 +5658,7 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers()); ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear(); data.degradedServers.clear();
// Trigger failover when enough is degraded. // Trigger failover when enough servers in the txn system are degraded.
data.degradedServers.insert(master); data.degradedServers.insert(master);
data.degradedServers.insert(tlog); data.degradedServers.insert(tlog);
data.degradedServers.insert(proxy); data.degradedServers.insert(proxy);

View File

@ -966,6 +966,10 @@ ACTOR Future<Void> backupWorker(BackupInterface bi,
Reference<AsyncVar<ServerDBInfo> const> db); Reference<AsyncVar<ServerDBInfo> const> db);
void registerThreadForProfiling(); void registerThreadForProfiling();
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's remote DC.
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
void updateCpuProfiler(ProfilerRequest req); void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog_4_6 { namespace oldTLog_4_6 {

View File

@ -723,7 +723,6 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
} // namespace } // namespace
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's remote DC.
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) { bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
const auto& dbi = dbInfo->get(); const auto& dbi = dbInfo->get();