Fix DD stuck when remote DC is dead

When the remote DC is down, the remote team collection of DD can be stuck
initializing, waiting for the remote to recover (all_tlog_recruited state). However, the
getTeam request can already be served by the remote team collection. So, for
a RelocateShard (data movement such as split, move), it will get a team for
the remote DC. But the data movement can't make progress on the remote team
because the remote DC hasn't recovered yet. Because the data movement is
stuck, the primary cannot reach the "storage_recovered" state and stays in
the accepting_commit state.

The specific test failure: slow/ApiCorrectness.toml -s 339026305 -b on
at commit:  0edd899d65

In this test, primary DC has 1 SS killed, remote DC has 2 TLog and 2 SS killed.
So the remote is dead, the remaining 2 SSes can't make progress because of the
loss of 2 TLogs. The repairDeadDatacenter() can't reach the "storage_recovered"
state due to DD's failure of moving shards away from the killed SS in the
primary.

The fix is to exclude all remote servers in repairDeadDatacenter(), which tells
DD to mark all SSes in the remote as unhealthy. Another fix is to return empty
results for getTeam request if the remote team collection is not ready. This
will allow the data movement to continue; essentially the remote team is left
unchanged for the data movement.
This commit is contained in:
Jingyu Zhou 2023-02-08 23:15:09 -08:00
parent ce49bfb8ac
commit 6c4a9b5f23
8 changed files with 111 additions and 46 deletions

View File

@ -1136,45 +1136,6 @@ struct LogMessageVersion {
}
};
// An exclusion target for data distribution: either every process on a
// machine (ip only, port == 0) or one specific process (ip:port).
struct AddressExclusion {
IPAddress ip;
int port;
// Invalid/empty exclusion (ip 0, port 0); see isValid().
AddressExclusion() : ip(0), port(0) {}
// Whole-machine exclusion: matches any port on `ip`.
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
// Single-process exclusion: matches exactly ip:port.
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
// Orders by ip first, then port, so exclusions for one machine sort together.
bool operator<(AddressExclusion const& r) const {
if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
// port == 0 means the exclusion covers the whole machine.
bool isWholeMachine() const { return port == 0; }
bool isValid() const { return ip.isValid() || port != 0; }
// True if `addr` falls under this exclusion: same ip for a whole-machine
// exclusion, otherwise an exact ip:port match.
bool excludes(NetworkAddress const& addr) const {
if (isWholeMachine())
return ip == addr.ip;
return ip == addr.ip && port == addr.port;
}
// This is for debugging and IS NOT to be used for serialization to persistent state
std::string toString() const {
if (!isWholeMachine())
return formatIpPort(ip, port);
return ip.toString();
}
// Parses an exclusion from a string (presumably "ip" or "ip:port" — the
// implementation is not shown in this view).
static AddressExclusion parse(StringRef const&);
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ip, port);
}
};
inline bool addressExcluded(std::set<AddressExclusion> const& exclusions, NetworkAddress const& addr) {
return exclusions.count(AddressExclusion(addr.ip, addr.port)) || exclusions.count(AddressExclusion(addr.ip));
}

View File

@ -130,6 +130,7 @@ public:
KillType kt,
KillType* newKillType) const = 0;
virtual bool isAvailable() const = 0;
virtual std::vector<AddressExclusion> getExcludeDCAddresses(Optional<Standalone<StringRef>> dcId) const = 0;
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
virtual void displayWorkers() const;
ProtocolVersion protocolVersion() const override = 0;

View File

@ -1422,6 +1422,19 @@ public:
return canKillProcesses(processesLeft, processesDead, KillType::KillInstantly, nullptr);
}
// Collect AddressExclusion entries for every simulated process located in the
// given datacenter. Returns an empty vector when no datacenter id is supplied.
std::vector<AddressExclusion> getExcludeDCAddresses(Optional<Standalone<StringRef>> dcId) const override {
    std::vector<AddressExclusion> result;
    if (dcId.present()) {
        for (const auto& process : getAllProcesses()) {
            if (process->locality.dcId() == dcId) {
                const auto& addr = process->address;
                result.emplace_back(addr.ip, addr.port);
            }
        }
    }
    return result;
}
bool datacenterDead(Optional<Standalone<StringRef>> dcId) const override {
if (!dcId.present()) {
return false;

View File

@ -176,6 +176,9 @@ public:
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
try {
wait(self->checkBuildTeams());
TraceEvent("GetTeam", self->distributorId)
.detail("Primary", self->primary)
.detail("ZeroHealthy", self->zeroHealthyTeams->get());
// report the median available space
if (now() - self->lastMedianAvailableSpaceUpdate > SERVER_KNOBS->AVAILABLE_SPACE_UPDATE_DELAY) {
self->lastMedianAvailableSpaceUpdate = now();
@ -225,6 +228,9 @@ public:
// The situation happens rarely. We may want to eliminate this situation someday
if (!self->teams.size()) {
req.reply.send(std::make_pair(Optional<Reference<IDataDistributionTeam>>(), foundSrc));
TraceEvent("GetTeam2", self->distributorId)
.detail("Primary", self->primary)
.detail("ZeroHealthy", self->zeroHealthyTeams->get());
return Void();
}
@ -253,6 +259,10 @@ public:
if (found && teamList[j]->isHealthy()) {
bestOption = teamList[j];
req.reply.send(std::make_pair(bestOption, foundSrc));
TraceEvent("GetTeam3", self->distributorId)
.detail("Primary", self->primary)
.detail("Team", teamList[j]->getServerIDsStr())
.detail("ZeroHealthy", self->zeroHealthyTeams->get());
return Void();
}
}
@ -382,7 +392,19 @@ public:
// self->traceAllInfo(true);
// }
if (!self->readyToStart.isReady()) {
TraceEvent("GetTeam4", self->distributorId)
.detail("Primary", self->primary)
.detail("ZeroHealthy", self->zeroHealthyTeams->get());
req.reply.send(std::make_pair(Optional<Reference<IDataDistributionTeam>>(), foundSrc));
return Void();
}
req.reply.send(std::make_pair(bestOption, foundSrc));
TraceEvent("GetTeam5", self->distributorId)
.detail("Primary", self->primary)
.detail("Team", bestOption.present() ? describe(bestOption.get()->getServerIDs()) : "")
.detail("ZeroHealthy", self->zeroHealthyTeams->get());
return Void();
} catch (Error& e) {
@ -1138,6 +1160,10 @@ public:
NetworkAddress a = server->getLastKnownInterface().address();
AddressExclusion worstAddr(a.ip, a.port);
DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr);
TraceEvent("DDD", self->distributorId)
.detail("Address", worstAddr.toString())
.detail("ServerId", server->getId())
.detail("Status", (int)worstStatus);
if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) {
TraceEvent(SevInfo, "InvalidWiggleServer", self->distributorId)
@ -1395,7 +1421,6 @@ public:
state Error err = e;
TraceEvent("StorageServerTrackerCancelled", self->distributorId)
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("Primary", self->primary)
.detail("Server", server->getId());
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet()) {
@ -1852,6 +1877,7 @@ public:
// Fetch the list of excluded servers
state ReadYourWritesTransaction tr(self->dbContext());
loop {
TraceEvent("DDExcludedServersChanged000", self->distributorId).detail("Primary", self->isPrimary());
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
@ -1890,8 +1916,10 @@ public:
failed.insert(addr);
}
}
TraceEvent("DDExcludedServersChangedBefore", self->distributorId).detail("Primary", self->isPrimary());
wait(success(fworkers));
TraceEvent("DDExcludedServersChangedAfter", self->distributorId).detail("Primary", self->isPrimary());
std::vector<ProcessData> workers = fworkers.get();
for (const auto& r : excludedLocalityResults) {
std::string locality = decodeExcludedLocalityKey(r.key);
@ -1916,6 +1944,10 @@ public:
}
}
for (const auto& n : excluded) {
TraceEvent("DDExcludedServersChanged0", self->distributorId)
.detail("N", n.toString())
.detail("Primary", self->isPrimary())
.detail("Failed", failed.count(n));
if (!failed.count(n)) {
self->excludedServers.set(n, DDTeamCollection::Status::EXCLUDED);
}
@ -1929,6 +1961,7 @@ public:
.detail("AddressesExcluded", excludedResults.size())
.detail("AddressesFailed", failedResults.size())
.detail("LocalitiesExcluded", excludedLocalityResults.size())
.detail("Primary", self->isPrimary())
.detail("LocalitiesFailed", failedLocalityResults.size());
self->restartRecruiting.trigger();
@ -3023,6 +3056,7 @@ public:
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("ZeroHealthy", self->zeroOptimalTeams.get())
.detail("HighestPriority", highestPriority)
.trackLatest(self->primary ? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest

View File

@ -364,7 +364,9 @@ public:
wait(self->loadDatabaseConfiguration());
self->initDcInfo();
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());
TraceEvent("DDInitGotConfiguration", self->ddId)
.setMaxFieldLength(-1)
.detail("Conf", self->configuration.toString());
wait(self->updateReplicaKeys());
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();
@ -461,7 +463,7 @@ public:
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
}
if (traceShard) {
TraceEvent(SevDebug, "DDInitShard")
TraceEvent(SevDebug, "DDInitShard", self->ddId)
.detail("Keys", keys)
.detail("PrimarySrc", describe(iShard.primarySrc))
.detail("RemoteSrc", describe(iShard.remoteSrc))

View File

@ -802,13 +802,16 @@ ACTOR Future<Void> waitForShardReady(StorageServerInterface server,
try {
GetShardStateReply rep =
wait(server.getShardState.getReply(GetShardStateRequest(keys, mode), TaskPriority::MoveKeys));
TraceEvent("GetShardStateReadyDD").detail("RepVersion", rep.first).detail("MinVersion", rep.second).log();
TraceEvent("GetShardStateReadyDD", server.id())
.detail("RepVersion", rep.first)
.detail("MinVersion", rep.second)
.log();
if (rep.first >= minVersion) {
return Void();
}
wait(delayJittered(SERVER_KNOBS->SHARD_READY_DELAY, TaskPriority::MoveKeys));
} catch (Error& e) {
TraceEvent("GetShardStateReadyError").error(e).log();
TraceEvent("GetShardStateReadyError", server.id()).error(e).log();
if (e.code() != error_code_timed_out) {
if (e.code() != error_code_broken_promise)
throw e;
@ -1174,7 +1177,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId);
readyServersEv.detail("ReadyServers", count);
readyServersEv.detail("ReadyServers", count).detail("Dests", dest.size());
if (tssReady.size()) {
readyServersEv.detail("ReadyTSS", tssCount);
}

View File

@ -19,6 +19,7 @@
*/
#include <cinttypes>
#include <unordered_map>
#include <vector>
#include <type_traits>
@ -715,7 +716,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator->usableRegions > 1 && !g_simulator->quiesced) {
bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
state bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
bool remoteDead = g_simulator->datacenterDead(g_simulator->remoteDcId);
// FIXME: the primary and remote can both be considered dead because excludes are not handled properly by the
@ -731,6 +732,15 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
.detail("RemoteDead", remoteDead)
.detail("PrimaryDead", primaryDead);
g_simulator->usableRegions = 1;
state std::vector<AddressExclusion> servers =
g_simulator->getExcludeDCAddresses(primaryDead ? g_simulator->primaryDcId : g_simulator->remoteDcId);
wait(excludeServers(cx, servers, false));
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration")
.detail("Location", context)
.detail("Stage", "LocalityExcluded")
.detail("Localities", describe(servers));
wait(success(ManagementAPI::changeConfig(
cx.getReference(),
(primaryDead ? g_simulator->disablePrimary : g_simulator->disableRemote) + " repopulate_anti_quorum=1",

View File

@ -163,4 +163,45 @@ struct NetworkAddressList {
}
};
extern std::string formatIpPort(const IPAddress& ip, uint16_t port);
// Describes a set of excluded processes: a single process when a port is
// given, or all processes on a machine when port == 0.
struct AddressExclusion {
IPAddress ip;
int port;
// Default: not a valid exclusion (see isValid()).
AddressExclusion() : ip(0), port(0) {}
// Excludes the entire machine at `ip` (port 0 is the whole-machine marker).
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
// Excludes only the process listening at ip:port.
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
// Lexicographic order on (ip, port); required for use as a std::set key.
bool operator<(AddressExclusion const& r) const {
if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
// Whole-machine exclusions carry port == 0.
bool isWholeMachine() const { return port == 0; }
bool isValid() const { return ip.isValid() || port != 0; }
// Whether `addr` matches this exclusion: ip-only match for whole-machine
// exclusions, exact ip:port match otherwise.
bool excludes(NetworkAddress const& addr) const {
if (isWholeMachine())
return ip == addr.ip;
return ip == addr.ip && port == addr.port;
}
// This is for debugging and IS NOT to be used for serialization to persistent state
std::string toString() const {
if (!isWholeMachine())
return formatIpPort(ip, port);
return ip.toString();
}
// Parses an exclusion from text (presumably "ip" or "ip:port"; the
// implementation is outside this view).
static AddressExclusion parse(StringRef const&);
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ip, port);
}
};
#endif // FLOW_NETWORKADDRESS_H