Merge remote-tracking branch 'origin/main' into features/private-request-streams

This commit is contained in:
Markus Pilman 2022-04-08 09:58:56 -06:00
commit 7631d299bf
12 changed files with 328 additions and 140 deletions

View File

@ -190,6 +190,11 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
case ConfigurationResult::DATABASE_CREATED:
printf("Database created\n");
break;
case ConfigurationResult::DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL:
printf("Database created\n");
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
fprintf(stderr, "ERROR: The database is unavailable\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
@ -250,6 +255,11 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
"storage_migration_type=gradual' to set the gradual migration type.\n");
ret = false;
break;
case ConfigurationResult::SUCCESS_WARN_ROCKSDB_EXPERIMENTAL:
printf("Configuration changed\n");
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
default:
ASSERT(false);
ret = false;
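For reference, an operator switching to the RocksDB engine would now see output along these lines (an illustrative transcript; the exact engine token fdbcli accepts, e.g. ssd-rocksdb-experimental, is assumed here rather than taken from this diff):

fdb> configure ssd-rocksdb-experimental
Configuration changed
WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.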

View File

@ -65,6 +65,8 @@ enum class ConfigurationResult {
LOCKED_NOT_NEW,
SUCCESS_WARN_PPW_GRADUAL,
SUCCESS,
SUCCESS_WARN_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL,
};
enum class CoordinatorsResult {
@ -290,6 +292,7 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
state bool oldReplicationUsesDcId = false;
state bool warnPPWGradual = false;
state bool warnChangeStorageNoMigrate = false;
state bool warnRocksDBIsExperimental = false;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -477,6 +480,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
} else if (newConfig.storageMigrationType == StorageMigrationType::GRADUAL &&
newConfig.perpetualStorageWiggleSpeed == 0) {
warnPPWGradual = true;
} else if (newConfig.storageServerStoreType != oldConfig.storageServerStoreType &&
newConfig.storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
warnRocksDBIsExperimental = true;
}
}
}
@ -525,6 +531,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
Optional<Value> v = wait(safeThreadFutureToFuture(vF));
if (v != m[initIdKey.toString()])
return ConfigurationResult::DATABASE_ALREADY_CREATED;
else if (m[configKeysPrefix.toString() + "storage_engine"] ==
std::to_string(KeyValueStoreType::SSD_ROCKSDB_V1))
return ConfigurationResult::DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL;
else
return ConfigurationResult::DATABASE_CREATED;
} catch (Error& e2) {
@ -538,6 +547,8 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
if (warnPPWGradual) {
return ConfigurationResult::SUCCESS_WARN_PPW_GRADUAL;
} else if (warnRocksDBIsExperimental) {
return ConfigurationResult::SUCCESS_WARN_ROCKSDB_EXPERIMENTAL;
} else {
return ConfigurationResult::SUCCESS;
}

View File

@ -1536,7 +1536,7 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
.detail("OldProtocolVersion", dbProtocolVersion);
// When the protocol version changes, clear the corresponding entry in the shared state map
// so it can be re-initialized. Only do so if there was a valid previous protocol version.
if (dbProtocolVersion.present()) {
if (dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) {
MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath);
}
@ -2333,9 +2333,14 @@ ThreadFuture<Void> MultiVersionApi::updateClusterSharedStateMap(std::string clus
void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath) {
MutexHolder holder(lock);
auto ssPtr = clusterSharedStateMap[clusterFilePath].get();
auto mapEntry = clusterSharedStateMap.find(clusterFilePath);
if (mapEntry == clusterSharedStateMap.end()) {
TraceEvent(SevError, "ClusterSharedStateMapEntryNotFound").detail("ClusterFilePath", clusterFilePath);
return;
}
auto ssPtr = mapEntry->second.get();
ssPtr->delRef(ssPtr);
clusterSharedStateMap.erase(clusterFilePath);
clusterSharedStateMap.erase(mapEntry);
}
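The fix above replaces an unchecked clusterSharedStateMap[clusterFilePath] lookup, which would default-construct an entry for a missing key, with an explicit find. A minimal standalone sketch of the pattern (illustrative only, not FDB code):

#include <cstdio>
#include <map>
#include <string>

std::map<std::string, int*> table;

void removeEntry(const std::string& key) {
	auto it = table.find(key);
	if (it == table.end()) {
		// operator[] would have silently inserted a default-constructed (null)
		// entry here; find() lets us log the anomaly and bail out instead.
		std::fprintf(stderr, "entry not found: %s\n", key.c_str());
		return;
	}
	delete it->second; // release the owned resource first
	table.erase(it); // erase by iterator, avoiding a second lookup
}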
std::vector<std::string> parseOptionValues(std::string valueStr) {

View File

@ -527,6 +527,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_HEALTH_TRIGGER_FAILOVER, false );
init( CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION, 5 );
init( CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION, 10 );
init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
@ -717,6 +720,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PEER_LATENCY_CHECK_MIN_POPULATION, 30 );
init( PEER_LATENCY_DEGRADATION_PERCENTILE, 0.90 );
init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
init( PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE, 0.90 );
init( PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE, 0.1 );
init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 1 );
init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );

View File

@ -464,6 +464,14 @@ public:
// failover.
int CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION; // The maximum number of degraded servers that can trigger a
// failover.
bool CC_ENABLE_ENTIRE_SATELLITE_MONITORING; // When enabled, gray failure tries to detect whether the entire
// satellite DC is degraded.
int CC_SATELLITE_DEGRADATION_MIN_COMPLAINER; // When the network between the primary and the satellite becomes bad,
// all the workers in the primary may have bad connectivity to the
// satellite. This is the minimum number of complainers required for a
// satellite worker to be considered degraded.
int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum number of degraded servers in the satellite DC for the
// satellite itself to be considered degraded.
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
@ -657,8 +665,12 @@ public:
bool ENABLE_WORKER_HEALTH_MONITOR;
double WORKER_HEALTH_MONITOR_INTERVAL; // Interval between two health monitor health checks.
int PEER_LATENCY_CHECK_MIN_POPULATION; // The minimum number of latency samples required to check a peer.
double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health.
double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health among workers inside
// the primary or remote DC.
double PEER_LATENCY_DEGRADATION_THRESHOLD; // The latency threshold to consider a peer degraded.
double PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE; // The percentile latency used to check peer health between
// the primary and the primary satellite.
double PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE; // The latency threshold to consider a satellite peer degraded.
double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
int PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; // The number of connection failures experienced during measurement
// period to consider a peer degraded.
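Taken together, the new satellite knobs implement a two-level vote, as the getDegradationInfo() changes later in this commit show: a satellite worker counts as bad once at least CC_SATELLITE_DEGRADATION_MIN_COMPLAINER workers complain about it, and the whole satellite is marked degraded once at least CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER such workers exist. A condensed sketch of that logic (illustrative, not FDB code; the input shape is hypothetical):

#include <vector>

// complainersPerSatelliteWorker[i] = number of workers complaining about satellite worker i.
bool satelliteDegraded(const std::vector<int>& complainersPerSatelliteWorker,
                       int minComplainer, // CC_SATELLITE_DEGRADATION_MIN_COMPLAINER
                       int minBadServer) { // CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER
	int badServers = 0;
	for (int complainers : complainersPerSatelliteWorker) {
		if (complainers >= minComplainer) {
			++badServers; // this satellite worker has enough complainers to count as bad
		}
	}
	return badServers >= minBadServer; // enough bad workers => the satellite itself is degraded
}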

View File

@ -35,11 +35,10 @@ void HealthMonitor::purgeOutdatedHistory() {
auto& count = peerClosedNum[p.second];
--count;
ASSERT(count >= 0);
peerClosedHistory.pop_front();
if (count == 0) {
peerClosedNum.erase(p.second);
}
peerClosedHistory.pop_front();
} else {
break;
}
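The reordering above matters because, judging from the surrounding code, p refers to the front element of peerClosedHistory; popping the deque before peerClosedNum.erase(p.second) would read through a dangling reference. A minimal sketch of the corrected ordering (assumed container types, not FDB code):

#include <deque>
#include <string>
#include <unordered_map>
#include <utility>

std::deque<std::pair<double, std::string>> peerClosedHistory;
std::unordered_map<std::string, int> peerClosedNum;

void purgeFrontEntry() {
	auto& p = peerClosedHistory.front();
	auto& count = peerClosedNum[p.second];
	--count;
	if (count == 0) {
		peerClosedNum.erase(p.second); // last use of p...
	}
	peerClosedHistory.pop_front(); // ...so the pop must come after it
}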

View File

@ -2410,24 +2410,26 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
wait(lowPriorityDelay(SERVER_KNOBS->CC_WORKER_HEALTH_CHECKING_INTERVAL));
}
self->degradedServers = self->getServersWithDegradedLink();
self->degradationInfo = self->getDegradationInfo();
// Compare `self->degradedServers` with `self->excludedDegradedServers` and remove those that have
// recovered.
for (auto it = self->excludedDegradedServers.begin(); it != self->excludedDegradedServers.end();) {
if (self->degradedServers.find(*it) == self->degradedServers.end()) {
if (self->degradationInfo.degradedServers.find(*it) == self->degradationInfo.degradedServers.end()) {
self->excludedDegradedServers.erase(it++);
} else {
++it;
}
}
if (!self->degradedServers.empty()) {
if (!self->degradationInfo.degradedServers.empty() || self->degradationInfo.degradedSatellite) {
std::string degradedServerString;
for (const auto& server : self->degradedServers) {
for (const auto& server : self->degradationInfo.degradedServers) {
degradedServerString += server.toString() + " ";
}
TraceEvent("ClusterControllerHealthMonitor").detail("DegradedServers", degradedServerString);
TraceEvent("ClusterControllerHealthMonitor")
.detail("DegradedServers", degradedServerString)
.detail("DegradedSatellite", self->degradationInfo.degradedSatellite);
// Check if the cluster controller should trigger a recovery to exclude any degraded servers from
// the transaction system.
@ -2435,7 +2437,7 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
if (SERVER_KNOBS->CC_HEALTH_TRIGGER_RECOVERY) {
if (self->recentRecoveryCountDueToHealth() < SERVER_KNOBS->CC_MAX_HEALTH_RECOVERY_COUNT) {
self->recentHealthTriggeredRecoveryTime.push(now());
self->excludedDegradedServers = self->degradedServers;
self->excludedDegradedServers = self->degradationInfo.degradedServers;
TraceEvent("DegradedServerDetectedAndTriggerRecovery")
.detail("RecentRecoveryCountDueToHealth", self->recentRecoveryCountDueToHealth());
self->db.forceMasterFailure.trigger();
@ -2784,7 +2786,7 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
return Void();
}
TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
@ -2800,18 +2802,18 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
// cluster controller.
{
data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() };
ASSERT(data.getServersWithDegradedLink().empty());
ASSERT(data.getDegradationInfo().degradedServers.empty());
data.workerHealth.clear();
}
// Test that when there is only one reported degraded link, getServersWithDegradedLink can return correct
// Test that when there is only one reported degraded link, getDegradationInfo returns the correct
// degraded server.
{
data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradedServers = data.getServersWithDegradedLink();
ASSERT(degradedServers.size() == 1);
ASSERT(degradedServers.find(badPeer1) != degradedServers.end());
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
data.workerHealth.clear();
}
@ -2821,10 +2823,10 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
now() };
data.workerHealth[badPeer1].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradedServers = data.getServersWithDegradedLink();
ASSERT(degradedServers.size() == 1);
ASSERT(degradedServers.find(worker) != degradedServers.end() ||
degradedServers.find(badPeer1) != degradedServers.end());
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end() ||
degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
data.workerHealth.clear();
}
@ -2839,9 +2841,9 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
now() };
data.workerHealth[badPeer2].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradedServers = data.getServersWithDegradedLink();
ASSERT(degradedServers.size() == 1);
ASSERT(degradedServers.find(worker) != degradedServers.end());
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end());
data.workerHealth.clear();
}
@ -2856,7 +2858,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
now() };
data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
ASSERT(data.getServersWithDegradedLink().empty());
ASSERT(data.getDegradationInfo().degradedServers.empty());
data.workerHealth.clear();
}
@ -2880,7 +2882,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
now() };
data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
ASSERT(data.getServersWithDegradedLink().empty());
ASSERT(data.getDegradationInfo().degradedServers.empty());
data.workerHealth.clear();
}
@ -2978,42 +2980,42 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
// Trigger recovery when master is degraded.
data.degradedServers.insert(master);
data.degradationInfo.degradedServers.insert(master);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// Trigger recovery when primary TLog is degraded.
data.degradedServers.insert(tlog);
data.degradationInfo.degradedServers.insert(tlog);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// No recovery when satellite Tlog is degraded.
data.degradedServers.insert(satelliteTlog);
data.degradationInfo.degradedServers.insert(satelliteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// No recovery when remote tlog is degraded.
data.degradedServers.insert(remoteTlog);
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// No recovery when log router is degraded.
data.degradedServers.insert(logRouter);
data.degradationInfo.degradedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// No recovery when backup worker is degraded.
data.degradedServers.insert(backup);
data.degradationInfo.degradedServers.insert(backup);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// Trigger recovery when proxy is degraded.
data.degradedServers.insert(proxy);
data.degradationInfo.degradedServers.insert(proxy);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// Trigger recovery when resolver is degraded.
data.degradedServers.insert(resolver);
data.degradationInfo.degradedServers.insert(resolver);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
return Void();
@ -3092,16 +3094,16 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
// No failover when small number of degraded servers
data.degradedServers.insert(master);
data.degradationInfo.degradedServers.insert(master);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// Trigger failover when enough servers in the txn system are degraded.
data.degradedServers.insert(master);
data.degradedServers.insert(tlog);
data.degradedServers.insert(proxy);
data.degradedServers.insert(proxy2);
data.degradedServers.insert(resolver);
data.degradationInfo.degradedServers.insert(master);
data.degradationInfo.degradedServers.insert(tlog);
data.degradationInfo.degradedServers.insert(proxy);
data.degradationInfo.degradedServers.insert(proxy2);
data.degradationInfo.degradedServers.insert(resolver);
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
// No failover when usable region is 1.
@ -3110,18 +3112,29 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
data.db.config.usableRegions = 2;
// No failover when remote is also degraded.
data.degradedServers.insert(remoteTlog);
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// No failover when some are not from transaction system
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradedServers.clear();
data.degradationInfo.degradedServers.clear();
// Trigger failover when satellite is degraded.
data.degradationInfo.degradedSatellite = true;
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
// No failover when satellite is degraded, but remote is not healthy.
data.degradationInfo.degradedSatellite = true;
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
return Void();
}

View File

@ -2981,9 +2981,16 @@ public:
}
}
struct DegradationInfo {
std::unordered_set<NetworkAddress>
degradedServers; // The servers that the cluster controller considers degraded. Servers in this
// list are not excluded unless they are also added to `excludedDegradedServers`.
bool degradedSatellite = false; // Indicates that the entire satellite DC is degraded.
};
// Returns a list of servers that are experiencing degraded links. These are candidates for exclusion. Note
// that only one endpoint of a bad link will be included in this list.
std::unordered_set<NetworkAddress> getServersWithDegradedLink() {
DegradationInfo getDegradationInfo() {
updateRecoveredWorkers();
// Build a map keyed by measured degraded peer. This map records who complains about a particular server.
@ -3014,7 +3021,11 @@ public:
//
// For example, if server A is already considered degraded and A complains about B, we won't add B as
// degraded, since A is already considered degraded.
//
// In the meantime, we also count the number of satellite workers that are complained about. If enough
// satellite workers are degraded, this may indicate that the whole network between the primary and the
// satellite is bad.
std::unordered_set<NetworkAddress> currentDegradedServers;
int satelliteBadServerCount = 0;
for (const auto& [complainerCount, badServer] : count2DegradedPeer) {
for (const auto& complainer : degradedLinkDst2Src[badServer]) {
if (currentDegradedServers.find(complainer) == currentDegradedServers.end()) {
@ -3022,23 +3033,36 @@ public:
break;
}
}
if (SERVER_KNOBS->CC_ENABLE_ENTIRE_SATELLITE_MONITORING &&
addressInDbAndPrimarySatelliteDc(badServer, db.serverInfo) &&
complainerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_COMPLAINER) {
++satelliteBadServerCount;
}
}
// For degraded servers that are complained about by more than SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE
// complainers, we don't know whether the server is hot or the network is bad, so we remove them from the
// returned degraded server list.
std::unordered_set<NetworkAddress> currentDegradedServersWithinLimit;
DegradationInfo currentDegradationInfo;
for (const auto& badServer : currentDegradedServers) {
if (degradedLinkDst2Src[badServer].size() <= SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE) {
currentDegradedServersWithinLimit.insert(badServer);
currentDegradationInfo.degradedServers.insert(badServer);
}
}
return currentDegradedServersWithinLimit;
// If enough satellite workers are bad, we mark the entire satellite as bad. Note that this needs to be
// used with caution (controlled by the CC_ENABLE_ENTIRE_SATELLITE_MONITORING knob), since slow workers may
// also be caused by workload.
if (satelliteBadServerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER) {
currentDegradationInfo.degradedSatellite = true;
}
return currentDegradationInfo;
}
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
bool transactionSystemContainsDegradedServers() {
const ServerDBInfo dbi = db.serverInfo->get();
for (const auto& excludedServer : degradedServers) {
for (const auto& excludedServer : degradationInfo.degradedServers) {
if (dbi.master.addresses().contains(excludedServer)) {
return true;
}
@ -3083,7 +3107,7 @@ public:
return false;
}
for (const auto& excludedServer : degradedServers) {
for (const auto& excludedServer : degradationInfo.degradedServers) {
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
return true;
}
@ -3121,7 +3145,7 @@ public:
// Returns true when the cluster controller should trigger a recovery due to degraded servers used in the
// transaction system in the primary data center.
bool shouldTriggerRecoveryDueToDegradedServers() {
if (degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
if (degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
return false;
}
@ -3154,8 +3178,14 @@ public:
return false;
}
if (degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
bool remoteIsHealthy = !remoteTransactionSystemContainsDegradedServers();
if (degradationInfo.degradedSatellite && remoteIsHealthy) {
// If the satellite DC is bad, a failover is desired regardless of the number of degraded servers.
return true;
}
if (degradationInfo.degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
return false;
}
@ -3165,7 +3195,7 @@ public:
return false;
}
return transactionSystemContainsDegradedServers() && !remoteTransactionSystemContainsDegradedServers();
return transactionSystemContainsDegradedServers() && remoteIsHealthy;
}
int recentRecoveryCountDueToHealth() {
@ -3248,9 +3278,7 @@ public:
// TODO(zhewu): Include disk and CPU signals.
};
std::unordered_map<NetworkAddress, WorkerHealth> workerHealth;
std::unordered_set<NetworkAddress>
degradedServers; // The servers that the cluster controller is considered as degraded. The servers in this list
// are not excluded unless they are added to `excludedDegradedServers`.
DegradationInfo degradationInfo;
std::unordered_set<NetworkAddress>
excludedDegradedServers; // The degraded servers to be excluded when assigning workers to roles.
std::queue<double> recentHealthTriggeredRecoveryTime;
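The failover logic in this file now short-circuits on a degraded satellite: if the entire satellite is marked bad and the remote region is healthy, a failover is triggered regardless of the degraded-server count. A condensed paraphrase of the decision order (illustrative, not FDB code; the other preconditions in shouldTriggerFailoverDueToDegradedServers are elided):

#include <unordered_set>

struct DegradationInfoSketch {
	std::unordered_set<int> degradedServers; // stand-in for NetworkAddress
	bool degradedSatellite = false;
};

// minDeg/maxDeg stand in for CC_FAILOVER_DUE_TO_HEALTH_MIN/MAX_DEGRADATION.
bool shouldTriggerFailover(const DegradationInfoSketch& info,
                           bool txnSystemDegraded,
                           bool remoteIsHealthy,
                           size_t minDeg,
                           size_t maxDeg) {
	if (info.degradedSatellite && remoteIsHealthy) {
		return true; // a bad satellite alone justifies failover
	}
	if (info.degradedServers.size() < minDeg || info.degradedServers.size() > maxDeg) {
		return false;
	}
	return txnSystemDegraded && remoteIsHealthy;
}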

View File

@ -1142,8 +1142,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
int8_t tagLocality = inputTag.locality;
if (isPseudoLocality(tagLocality)) {
if (logData->logSystem->get().isValid()) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to);
tagLocality = tagLocalityLogRouter;
// If the configuration changed from multi-region to single-region mode, the delayed pops created during
// the multi-region stage should be skipped. The same applies to the backup worker.
if (isPseudoLocality(inputTag.locality) &&
logData->logSystem->get()->hasPseudoLocality(inputTag.locality)) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to);
tagLocality = tagLocalityLogRouter;
} else {
ASSERT_WE_THINK(tagLocality == tagLocalityLogRouterMapped);
TraceEvent(SevWarn, "TLogPopNoPseudoLocality", self->dbgid)
.detail("Locality", tagLocality)
.detail("Version", upTo);
return Void();
}
} else {
TraceEvent(SevWarn, "TLogPopNoLogSystem", self->dbgid)
.detail("Locality", tagLocality)

View File

@ -1142,6 +1142,10 @@ ACTOR Future<Void> backupWorker(BackupInterface bi,
void registerThreadForProfiling();
// Returns true if `address` is used in the transaction system of the db (indicated by `dbInfo`) and is in the db's
// primary satellite DC.
bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
// Returns true if `address` is used in the transaction system of the db (indicated by `dbInfo`) and is in the db's remote DC.
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);

View File

@ -778,6 +778,82 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
} // namespace
// Returns true if `address` is used in the transaction system of the db (indicated by `dbInfo`) and is in the db's
// primary satellite DC.
bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
for (const auto& logSet : dbInfo->get().logSystemConfig.tLogs) {
if (logSet.isLocal && logSet.locality == tagLocalitySatellite) {
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(address)) {
return true;
}
}
}
}
return false;
}
bool addressesInDbAndPrimarySatelliteDc(const NetworkAddressList& addresses,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return addressInDbAndPrimarySatelliteDc(addresses.address, dbInfo) ||
(addresses.secondaryAddress.present() &&
addressInDbAndPrimarySatelliteDc(addresses.secondaryAddress.get(), dbInfo));
}
namespace {
TEST_CASE("/fdbserver/worker/addressInDbAndPrimarySatelliteDc") {
// Setup a ServerDBInfo for test.
ServerDBInfo testDbInfo;
LocalityData testLocal;
testLocal.set(LiteralStringRef("dcid"), StringRef(std::to_string(1)));
testDbInfo.master.locality = testLocal;
// First, create an empty TLogInterface, and check that it isn't considered to be in the satellite DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
testDbInfo.logSystemConfig.tLogs.back().locality = tagLocalitySatellite;
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>());
ASSERT(!addressInDbAndPrimarySatelliteDc(g_network->getLocalAddress(),
makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a satellite TLog; it should be considered to be in the primary satellite DC.
NetworkAddress satelliteTLogAddress(IPAddress(0x13131313), 1);
TLogInterface satelliteTLog(testLocal);
satelliteTLog.initEndpoints();
satelliteTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTLogAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(satelliteTLog));
ASSERT(addressInDbAndPrimarySatelliteDc(satelliteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a primary TLog; it shouldn't be considered to be in the primary satellite DC.
NetworkAddress primaryTLogAddress(IPAddress(0x26262626), 1);
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
TLogInterface primaryTLog(testLocal);
primaryTLog.initEndpoints();
primaryTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ primaryTLogAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(primaryTLog));
ASSERT(!addressInDbAndPrimarySatelliteDc(primaryTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote TLog; being in the remote DC, it shouldn't be considered to be in the primary satellite DC.
NetworkAddress remoteTLogAddress(IPAddress(0x37373737), 1);
LocalityData fakeRemote;
fakeRemote.set(LiteralStringRef("dcid"), StringRef(std::to_string(2)));
TLogInterface remoteTLog(fakeRemote);
remoteTLog.initEndpoints();
remoteTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTLogAddress }, UID(1, 2)));
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().isLocal = false;
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(remoteTLog));
ASSERT(!addressInDbAndPrimarySatelliteDc(remoteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
return Void();
}
} // namespace
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
const auto& dbi = dbInfo->get();
@ -872,17 +948,15 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
const auto& allPeers = FlowTransport::transport().getAllPeers();
UpdateWorkerHealthRequest req;
bool workerInDb = false;
bool workerInPrimary = false;
enum WorkerLocation { None, Primary, Remote };
WorkerLocation workerLocation = None;
if (addressesInDbAndPrimaryDc(interf.addresses(), dbInfo)) {
workerInDb = true;
workerInPrimary = true;
workerLocation = Primary;
} else if (addressesInDbAndRemoteDc(interf.addresses(), dbInfo)) {
workerInDb = true;
workerInPrimary = false;
workerLocation = Remote;
}
if (workerInDb) {
if (workerLocation != None) {
for (const auto& [address, peer] : allPeers) {
if (peer->connectFailedCount == 0 &&
peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
@ -895,37 +969,50 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
// last ping latencies logged.
continue;
}
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
// Only monitoring the servers that in the primary or remote DC's transaction systems.
// Note that currently we are not monitor storage servers, since lagging in storage servers
// today already can trigger server exclusion by data distributor.
bool degradedPeer = false;
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo))) {
// Monitors intra-DC latencies between servers in the primary or remote DC's transaction
// systems. Note that we currently do not monitor storage servers, since lagging storage
// servers can already trigger server exclusion by the data distributor.
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
// This is a degraded peer.
TraceEvent("HealthMonitorDetectDegradedPeer")
.suppressFor(30)
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
.detail(
"CheckedPercentileLatency",
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
.detail("PingCount", peer->pingLatencies.getPopulationSize())
.detail("PingTimeoutCount", peer->timeoutCount)
.detail("ConnectionFailureCount", peer->connectFailedCount);
req.degradedPeers.push_back(address);
degradedPeer = true;
}
} else if (workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) {
// Monitors inter-DC latencies between servers in the primary and the primary satellite DC. Note
// that TLog workers in the primary satellite DC are on the critical path of serving a commit.
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
peer->pingLatencies.percentile(
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE) >
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE ||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
degradedPeer = true;
}
}
if (degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.suppressFor(30)
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
.detail("CheckedPercentileLatency",
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
.detail("PingCount", peer->pingLatencies.getPopulationSize())
.detail("PingTimeoutCount", peer->timeoutCount)
.detail("ConnectionFailureCount", peer->connectFailedCount);
req.degradedPeers.push_back(address);
}
}
@ -941,8 +1028,9 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
continue;
}
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo)) ||
(workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo))) {
TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
req.degradedPeers.push_back(address);
}

View File

@ -690,7 +690,7 @@ TEST_CASE("/flow/Tracing/AddLinks") {
return Void();
};
uint64_t swapUint16BE(uint8_t* index) {
uint16_t swapUint16BE(uint8_t* index) {
uint16_t value;
memcpy(&value, index, sizeof(value));
return fromBigEndian16(value);
@ -718,6 +718,26 @@ std::string readMPString(uint8_t* index, int len) {
return reinterpret_cast<char*>(data);
}
std::string readMPString(uint8_t* index) {
auto len = 0;
switch (*index) {
case 0xda:
index++; // read the size in the next 2 bytes
len = swapUint16BE(index);
index += 2; // move index past the size bytes
break;
default:
// We mask out the bits that contain the length; the initial 3 higher-order bits signify
// that this is a fixstr of len <= 31 chars.
len = static_cast<uint8_t>(*index & 0b00011111);
index++;
}
uint8_t data[len + 1];
std::copy(index, index + len, data);
data[len] = '\0';
return reinterpret_cast<char*>(data);
}
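The two branches above correspond to the MessagePack string headers this tracer emits: fixstr (header byte 0xa0-0xbf, length in the low 5 bits) and str 16 (header byte 0xda followed by a big-endian 16-bit length). A self-contained sketch of the same decoding that avoids the variable-length array used above (illustrative, not FDB code):

#include <cstdint>
#include <string>

static uint16_t readBigEndian16(const uint8_t* p) {
	return static_cast<uint16_t>((uint16_t(p[0]) << 8) | p[1]);
}

std::string decodeMPString(const uint8_t* p) {
	size_t len;
	if (*p == 0xda) { // str 16: a 2-byte big-endian length follows the header
		len = readBigEndian16(p + 1);
		p += 3;
	} else { // fixstr: the low 5 bits of the header byte are the length
		len = *p & 0b00011111;
		p += 1;
	}
	return std::string(reinterpret_cast<const char*>(p), len);
}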
// Windows doesn't like lack of header and declaration of constructor for FastUDPTracer
#ifndef WIN32
TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
@ -754,9 +774,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(data[46] == 0xcf);
ASSERT(swapUint64BE(&data[47]) == 1);
// Read and verify span name
ASSERT(data[55] == (0b10100000 | strlen("encoded_span")));
ASSERT(strncmp(readMPString(&data[56], strlen("encoded_span")).c_str(), "encoded_span", strlen("encoded_span")) ==
0);
ASSERT(readMPString(&data[55]) == "encoded_span");
// Verify begin/end is encoded, we don't care about the values
ASSERT(data[68] == 0xcb);
ASSERT(data[77] == 0xcb);
@ -795,10 +813,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(data[0] == 0b10011110); // 14 element array.
// We don't care about the next 54 bytes as there is no parent and a randomly assigned Trace and SpanID
// Read and verify span name
ASSERT(data[55] == (0b10100000 | strlen("encoded_span_3")));
ASSERT(strncmp(readMPString(&data[56], strlen("encoded_span_3")).c_str(),
"encoded_span_3",
strlen("encoded_span_3")) == 0);
ASSERT(readMPString(&data[55]) == "encoded_span_3");
// Verify begin/end is encoded, we don't care about the values
ASSERT(data[70] == 0xcb);
ASSERT(data[79] == 0xcb);
@ -818,43 +833,32 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(swapUint64BE(&data[112]) == 400);
// Events
ASSERT(data[120] == 0b10010001); // empty
ASSERT(data[121] == (0b10100000 | strlen("event1")));
ASSERT(strncmp(readMPString(&data[122], strlen("event1")).c_str(), "event1", strlen("event1")) == 0);
ASSERT(readMPString(&data[121]) == "event1");
ASSERT(data[128] == 0xcb);
ASSERT(swapDoubleBE(&data[129]) == 100.101);
// Events Attributes
ASSERT(data[137] == 0b10000001); // single k/v pair
ASSERT(data[138] == 0b10100011); // length of key string "foo" == 3
ASSERT(strncmp(readMPString(&data[139], strlen("foo")).c_str(), "foo", strlen("foo")) == 0);
ASSERT(data[142] == 0b10100011); // length of key string "bar" == 3
ASSERT(strncmp(readMPString(&data[143], strlen("bar")).c_str(), "bar", strlen("bar")) == 0);
ASSERT(readMPString(&data[138]) == "foo");
ASSERT(readMPString(&data[142]) == "bar");
// Attributes
ASSERT(data[146] == 0b10000010); // two k/v pair
// Reconstruct map from MessagePack wire format data and verify.
std::unordered_map<std::string, std::string> attributes;
auto index = 147;
// We & out the bits here that contain the length the initial 4 higher order bits are
// to signify this is a string of len <= 31 chars.
auto firstKeyLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto firstKey = readMPString(&data[index], firstKeyLength);
index += firstKeyLength;
auto firstValueLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto firstValue = readMPString(&data[index], firstValueLength);
index += firstValueLength;
auto firstKey = readMPString(&data[index]);
index += firstKey.length() + 1; // +1 for control byte
auto firstValue = readMPString(&data[index]);
index += firstValue.length() + 1; // +1 for control byte
attributes[firstKey] = firstValue;
auto secondKeyLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto secondKey = readMPString(&data[index], secondKeyLength);
index += secondKeyLength;
auto secondValueLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto secondValue = readMPString(&data[index], secondValueLength);
auto secondKey = readMPString(&data[index]);
index += secondKey.length() + 1; // +1 for control byte
auto secondValue = readMPString(&data[index]);
attributes[secondKey] = secondValue;
// We don't know what the value for address will be, so just verify it is in the map.
ASSERT(attributes.find("address") != attributes.end());
ASSERT(strncmp(attributes["operation"].c_str(), "grv", strlen("grv")) == 0);
ASSERT(attributes["operation"] == "grv");
request.reset();
@ -876,9 +880,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
// We don't care about the next 54 bytes as there is no parent and a randomly assigned Trace and SpanID
// Read and verify span name
ASSERT(data[55] == 0xda);
auto locationLength = swapUint16BE(&data[56]);
ASSERT(locationLength == strlen(longString));
ASSERT(strncmp(readMPString(&data[58], locationLength).c_str(), longString, strlen(longString)) == 0);
ASSERT(readMPString(&data[55]) == longString);
return Void();
};
#endif