Merge pull request #2120 from xumengpanda/mengxu/6.2.4-DD-patch-no-simulation
Improve FDB reliability from misconfigured localities - Part 1
This commit is contained in:
@ -2,7 +2,7 @@
Release Notes
@ -46,6 +46,7 @@ Fixes
* The ``fileconfigure`` command in ``fdbcli`` could fail with an unknown error if the file did not contain a valid JSON object. `(PR #2017) <>`_.
* Configuring regions would fail with an internal error if the cluster contained storage servers that didn't set a datacenter ID. `(PR #2017) <>`_.
* Clients no longer prefer reading from servers with the same zone ID, because it could create hot shards. [6.2.3] `(PR #2019) <>`_.
* Data distribution could fail to start if any storage servers had misconfigured locality information. This problem could persist even after the offending storage servers were removed or fixed. [6.2.5] `(PR #2110) <>`_.
@ -42,6 +42,8 @@ class TCTeamInfo;
struct TCMachineInfo;
class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id;
StorageServerInterface lastKnownInterface;
@ -74,7 +76,10 @@ struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
machineID = server->lastKnownInterface.locality.zoneId().get();
LocalityData& locality = server->lastKnownInterface.locality;
machineID = locality.zoneId().get();
std::string getServersIDStr() {
@ -594,6 +599,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncVar<bool> zeroOptimalTeams;
AsyncMap< AddressExclusion, bool > excludedServers; // true if an address is in the excluded list in the database. Updated asynchronously (eventually)
std::set<AddressExclusion> invalidLocalityAddr; // These address have invalidLocality for the configured storagePolicy
std::vector<Optional<Key>> includedDCs;
Optional<std::vector<Optional<Key>>> otherTrackedDCs;
@ -605,6 +611,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Future<Void> badTeamRemover;
Future<Void> redundantMachineTeamRemover;
Future<Void> redundantServerTeamRemover;
Future<Void> checkInvalidLocalities;
Reference<LocalitySet> storageServerSet;
std::vector<LocalityEntry> forcedEntries, resultEntries;
@ -645,9 +652,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
Reference<AsyncVar<bool>> processingUnhealthy)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()),
badTeamRemover(Void()), redundantMachineTeamRemover(Void()), redundantServerTeamRemover(Void()),
configuration(configuration), readyToStart(readyToStart), clearHealthyZoneFuture(true),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), redundantMachineTeamRemover(Void()),
redundantServerTeamRemover(Void()), checkInvalidLocalities(Void()), configuration(configuration),
readyToStart(readyToStart), clearHealthyZoneFuture(true),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
@ -983,6 +991,17 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
if (self->shouldHandleServer(i->first)) {
if (!self->isValidLocality(self->configuration.storagePolicy, i->first.locality)) {
TraceEvent(SevWarnAlways, "MissingLocality")
.detail("Server", i->first.uniqueID)
.detail("Locality", i->first.locality.toString());
auto addr = i->first.address();
self->invalidLocalityAddr.insert(AddressExclusion(addr.ip, addr.port));
if (self->checkInvalidLocalities.isReady()) {
self->checkInvalidLocalities = checkAndRemoveInvalidLocalityAddr(self);
self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0);
@ -997,6 +1016,25 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return Void();
// Check if server or machine has a valid locality based on configured replication policy
bool isValidLocality(Reference<IReplicationPolicy> storagePolicy, const LocalityData& locality) {
// Future: Once we add simulation test that misconfigure a cluster, such as not setting some locality entries,
// DD_VALIDATE_LOCALITY should always be true. Otherwise, simulation test may fail.
// Disable the checking if locality is valid
return true;
std::set<std::string> replicationPolicyKeys = storagePolicy->attributeKeys();
for (auto& policy : replicationPolicyKeys) {
if (!locality.isPresent(policy)) {
return false;
return true;
void evaluateTeamQuality() {
int teamCount = teams.size(), serverCount = allServers.size();
double teamsPerServer = (double)teamCount * configuration.storageTeamSize / serverCount;
@ -1374,6 +1412,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<TCServerInfo> representativeServer = machine->second->serversOnMachine[0];
auto& locality = representativeServer->lastKnownInterface.locality;
if (!isValidLocality(configuration.storagePolicy, locality)) {
TraceEvent(SevWarn, "RebuildMachineLocalityMapError")
.detail("Machine", machine->second->machineID.toString())
.detail("InvalidLocality", locality.toString());
const LocalityEntry& localityEntry = machineLocalityMap.add(locality, &representativeServer->id);
machine->second->localityEntry = localityEntry;
@ -1411,6 +1455,11 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
ASSERT_WE_THINK(server_info.find(machine.second->serversOnMachine[0]->id) != server_info.end());
// Skip unhealthy machines
if (!isMachineHealthy(machine.second)) continue;
// Skip machine with incomplete locality
if (!isValidLocality(configuration.storagePolicy,
machine.second->serversOnMachine[0]->lastKnownInterface.locality)) {
// Invariant: We only create correct size machine teams.
// When configuration (e.g., team size) is changed, the DDTeamCollection will be destroyed and rebuilt
@ -1580,6 +1629,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& server : server_info) {
// Only pick healthy server, which is not failed or excluded.
if (server_status.get(server.first).isUnhealthy()) continue;
if (!isValidLocality(configuration.storagePolicy, server.second->lastKnownInterface.locality)) continue;
int numTeams = server.second->teams.size();
if (numTeams < minTeams) {
@ -1590,9 +1640,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
if (!leastUsedServers.empty()) {
return deterministicRandom()->randomChoice(leastUsedServers);
// If we cannot find a healthy server with valid locality
.detail("Servers", server_info.size())
.detail("UnhealthyServers", unhealthyServers);
return Reference<TCServerInfo>();
// Randomly choose one machine team that has chosenServer and has the correct size
// When configuration is changed, we may have machine teams with old storageTeamSize
@ -1882,6 +1938,9 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (int i = 0; i < maxAttempts && i < 100; ++i) {
// Step 2: Choose 1 least used server and then choose 1 least used machine team from the server
Reference<TCServerInfo> chosenServer = findOneLeastUsedServer();
if (!chosenServer.isValid()) {
break; // We cannot find a valid server
// Note: To avoid creating correlation of picked machine teams, we simply choose a random machine team
// instead of choosing the least used machine team.
// The correlation happens, for example, when we add two new machines, we may always choose the machine
@ -3119,8 +3178,13 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
//Returns the KeyValueStoreType of server if it is different from self->storeType
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskPriority::DataDistribution)));
if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end()) )
if (type == self->configuration.storageServerStoreType &&
(self->includedDCs.empty() ||
std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) !=
self->includedDCs.end()) &&
(self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality))) {
return type;
@ -3467,7 +3531,9 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString());
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
TEST(true); //KeyValueStore type changed
storeTracker = Never();
@ -3519,7 +3585,76 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) {
state double start = now();
loop {
try {
// Because worker's processId can be changed when its locality is changed, we cannot watch on the old
// processId; This actor is inactive most time, so iterating all workers incurs little performance overhead.
state vector<ProcessData> workers = wait(getWorkers(self->cx));
state std::set<AddressExclusion> existingAddrs;
for (int i = 0; i < workers.size(); i++) {
const ProcessData& workerData = workers[i];
AddressExclusion addr(workerData.address.ip, workerData.address.port);
if (self->invalidLocalityAddr.count(addr) &&
self->isValidLocality(self->configuration.storagePolicy, workerData.locality)) {
// The locality info on the addr has been corrected
TraceEvent("InvalidLocalityCorrected").detail("Addr", addr.toString());
// In case system operator permanently excludes workers on the address with invalid locality
for (auto addr = self->invalidLocalityAddr.begin(); addr != self->invalidLocalityAddr.end();) {
if (!existingAddrs.count(*addr)) {
// The address no longer has a worker
addr = self->invalidLocalityAddr.erase(addr);
TraceEvent("InvalidLocalityNoLongerExists").detail("Addr", addr->toString());
} else {
if (self->invalidLocalityAddr.empty()) {
if (now() - start > 300) { // Report warning if invalid locality is not corrected within 300 seconds
// The incorrect locality info has not been properly corrected in a reasonable time
TraceEvent(SevWarn, "PersistentInvalidLocality").detail("Addresses", self->invalidLocalityAddr.size());
start = now();
} catch (Error& e) {
TraceEvent("CheckAndRemoveInvalidLocalityAddrRetry", self->distributorId).detail("Error", e.what());
return Void();
ACTOR Future<Void> initializeStorage( DDTeamCollection* self, RecruitStorageReply candidateWorker ) {
// Exclude the worker that has invalid locality
if (!self->isValidLocality(self->configuration.storagePolicy, candidateWorker.worker.locality)) {
TraceEvent(SevWarn, "DDRecruiting")
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Addr", candidateWorker.worker.address());
auto addr = candidateWorker.worker.address();
self->invalidLocalityAddr.insert(AddressExclusion(addr.ip, addr.port));
if (self->checkInvalidLocalities.isReady()) {
self->checkInvalidLocalities = checkAndRemoveInvalidLocalityAddr(self);
return Void();
// SOMEDAY: Cluster controller waits for availability, retry quickly if a server's Locality changes
@ -3585,11 +3720,19 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
auto excl = self->excludedServers.getKeys();
for(auto& s : excl)
for (auto& s : excl) {
if (self->excludedServers.get(s)) {
TraceEvent(SevDebug, "DDRecruitExcl2").detail("Excluding", s.toString());
exclusions.insert( s );
// Exclude workers that have invalid locality
for (auto& addr : self->invalidLocalityAddr) {
TraceEvent(SevDebug, "DDRecruitExclInvalidAddr").detail("Excluding", addr.toString());
rsr.criticalRecruitment = self->healthyTeamCount == 0;
for(auto it : exclusions) {
@ -187,6 +187,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DD_OVERLAP_PENALTY, 10000 );
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
// TeamRemover
TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
@ -145,6 +145,8 @@ public:
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
Reference in New Issue