Add documentation
This commit is contained in:
parent
85035c9269
commit
2078a0055a
|
@ -104,6 +104,9 @@ bool ClusterControllerData::transactionSystemContainsDegradedServers() {
|
|||
if (dbi.master.addresses().contains(server)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Getting the current log system config to check if any TLogs are degraded. recoveryData->logSystem has the
|
||||
// most up to date log system and we should use it whenever available.
|
||||
auto logSystemConfig =
|
||||
recoveryData->logSystem.isValid() ? recoveryData->logSystem->getLogSystemConfig() : dbi.logSystemConfig;
|
||||
for (const auto& logSet : logSystemConfig.tLogs) {
|
||||
|
@ -132,14 +135,20 @@ bool ClusterControllerData::transactionSystemContainsDegradedServers() {
|
|||
}
|
||||
|
||||
if (recoveryData->recoveryState < RecoveryState::ACCEPTING_COMMITS) {
|
||||
for (const auto& tlog : recoveryData->recruitment.tLogs) {
|
||||
// During recovery, TLogs may not be able to pull data from previous generation TLogs due to gray
|
||||
// failures. In this case, we rely on the latest recruitment information and see if any newly recruited
|
||||
// TLogs are degraded.
|
||||
for (const auto& tlog : recoveryData->primaryRecruitment.tLogs) {
|
||||
if (tlog.addresses().contains(server)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for (const auto& satelliteLog : recoveryData->recruitment.satelliteTLogs) {
|
||||
if (satelliteLog.addresses().contains(server)) {
|
||||
return true;
|
||||
|
||||
if (!skipSatellite) {
|
||||
for (const auto& satelliteLog : recoveryData->primaryRecruitment.satelliteTLogs) {
|
||||
if (satelliteLog.addresses().contains(server)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -174,6 +183,26 @@ bool ClusterControllerData::transactionSystemContainsDegradedServers() {
|
|||
/*skipRemote=*/!SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING);
|
||||
}
|
||||
|
||||
// Whether the transaction system in the remote DC contains degraded or disconnected servers.
// Returns false when the configuration has at most one usable region; otherwise returns true as soon as any
// address in degradationInfo.degradedServers or degradationInfo.disconnectedServers satisfies
// addressInDbAndRemoteDc() against the current db.serverInfo.
bool ClusterControllerData::remoteTransactionSystemContainsDegradedServers() {
|
||||
// With a single usable region this check never reports a degraded remote transaction system.
if (db.config.usableRegions <= 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// First pass: servers reported as degraded.
for (const auto& excludedServer : degradationInfo.degradedServers) {
|
||||
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: servers reported as disconnected; checked the same way as the degraded ones.
for (const auto& excludedServer : degradationInfo.disconnectedServers) {
|
||||
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// No degraded or disconnected address matched addressInDbAndRemoteDc().
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
|
||||
ClusterControllerData::DBInfo* db,
|
||||
ServerCoordinators coordinators) {
|
||||
|
@ -2912,6 +2941,8 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
|
|||
}
|
||||
|
||||
if (hasRecoveredServer) {
|
||||
// The best transaction system might have changed after a server is recovered. Check outstanding requests
|
||||
// and check if a better transaction system exists.
|
||||
checkOutstandingRequests(self);
|
||||
}
|
||||
|
||||
|
|
|
@ -1015,7 +1015,9 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
|
|||
}
|
||||
self->backupWorkers.swap(recruits.backupWorkers);
|
||||
|
||||
self->recruitment = recruits;
|
||||
// Store the recruitment result, which may be used to check the transaction system currently being recruited in gray
|
||||
// failure detection.
|
||||
self->primaryRecruitment = recruits;
|
||||
|
||||
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid)
|
||||
.detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
|
||||
|
|
|
@ -375,6 +375,7 @@ struct TLogData : NonCopyable {
|
|||
// and ends when the data is flushed and durable.
|
||||
Reference<Histogram> timeUntilDurableDist;
|
||||
|
||||
// Controls whether the health monitor running in this TLog is forced to check whether other processes are degraded.
|
||||
Reference<AsyncVar<bool>> enablePrimaryTxnSystemHealthCheck;
|
||||
|
||||
TLogData(UID dbgid,
|
||||
|
@ -3627,12 +3628,16 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
|
|||
logData->logRouterPopToVersion = recoverAt;
|
||||
std::vector<Tag> tags;
|
||||
tags.push_back(logData->remoteTag);
|
||||
|
||||
// Force gray failure monitoring during recovery.
|
||||
self->enablePrimaryTxnSystemHealthCheck->set(true);
|
||||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, recoverAt, true) ||
|
||||
logData->removed || logData->stopCommit.onTrigger());
|
||||
self->enablePrimaryTxnSystemHealthCheck->set(false);
|
||||
} else if (!req.recoverTags.empty()) {
|
||||
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
|
||||
|
||||
// Force gray failure monitoring during recovery.
|
||||
self->enablePrimaryTxnSystemHealthCheck->set(true);
|
||||
wait(pullAsyncData(
|
||||
self, logData, req.recoverTags, req.knownCommittedVersion + 1, recoverAt, false) ||
|
||||
|
|
|
@ -3198,25 +3198,7 @@ public:
|
|||
|
||||
// Whether transaction system in the remote DC, e.g. log router and tlogs in the remote DC, contains degraded
|
||||
// servers.
|
||||
bool remoteTransactionSystemContainsDegradedServers() {
|
||||
if (db.config.usableRegions <= 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const auto& excludedServer : degradationInfo.degradedServers) {
|
||||
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& excludedServer : degradationInfo.disconnectedServers) {
|
||||
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
bool remoteTransactionSystemContainsDegradedServers();
|
||||
|
||||
// Returns true if remote DC is healthy and can failover to.
|
||||
bool remoteDCIsHealthy() {
|
||||
|
|
|
@ -206,7 +206,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
|
|||
std::map<Optional<Value>, int8_t> dcId_locality;
|
||||
std::vector<Tag> allTags;
|
||||
|
||||
RecruitFromConfigurationReply recruitment;
|
||||
RecruitFromConfigurationReply primaryRecruitment;
|
||||
|
||||
int8_t getNextLocality() {
|
||||
int8_t maxLocality = -1;
|
||||
|
|
|
@ -1273,6 +1273,8 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
|
|||
}
|
||||
} else if (enablePrimaryTxnSystemHealthCheck->get() &&
|
||||
(addressInDbAndPrimaryDc(address, dbInfo) || addressInDbAndPrimarySatelliteDc(address, dbInfo))) {
|
||||
// For force checking, we only detect connection timeout. Currently this should only be used during recovery
|
||||
// and only used in TLogs.
|
||||
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT) {
|
||||
TraceEvent("HealthMonitorDetectDegradedPeer")
|
||||
.detail("WorkerLocation", workerLocation)
|
||||
|
@ -2130,6 +2132,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
|
||||
|
||||
state Future<Void> updateClusterIdFuture;
|
||||
|
||||
// When set to true, the health monitor running in this worker starts monitoring other transaction processes in this
|
||||
// cluster.
|
||||
state Reference<AsyncVar<bool>> enablePrimaryTxnSystemHealthCheck = makeReference<AsyncVar<bool>>(false);
|
||||
|
||||
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
|
||||
|
|
|
@ -176,11 +176,13 @@ struct ClogTlogWorkload : TestWorkload {
|
|||
|
||||
state bool useGrayFailureToRecover = false;
|
||||
if (deterministicRandom()->coinflip() && self->useDisconnection) {
|
||||
// Use gray failure instead of exclusion to recover the cluster.
|
||||
TraceEvent("ClogTlogUseGrayFailreToRecover").log();
|
||||
useGrayFailureToRecover = true;
|
||||
}
|
||||
|
||||
// start exclusion and wait for fully recovery
|
||||
// Start exclusion and wait for full recovery. When using gray failure, the cluster should recover by itself
|
||||
// eventually.
|
||||
state Future<Void> excludeLog = useGrayFailureToRecover ? Never() : excludeFailedLog(self, cx);
|
||||
state Future<Void> onChange = self->dbInfo->onChange();
|
||||
loop choose {
|
||||
|
|
Loading…
Reference in New Issue