Return the source team if remote DC is dead

Also refactor the code with findTeamFromServers().
This commit is contained in:
Jingyu Zhou 2023-02-09 17:47:23 -08:00
parent 9aa15b459c
commit 622520bd2d
5 changed files with 55 additions and 54 deletions

View File

@ -130,7 +130,7 @@ public:
KillType kt,
KillType* newKillType) const = 0;
virtual bool isAvailable() const = 0;
virtual std::vector<AddressExclusion> getExcludeDCAddresses(Optional<Standalone<StringRef>> dcId) const = 0;
virtual std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const = 0;
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
virtual void displayWorkers() const;
ProtocolVersion protocolVersion() const override = 0;

View File

@ -1422,7 +1422,7 @@ public:
return canKillProcesses(processesLeft, processesDead, KillType::KillInstantly, nullptr);
}
std::vector<AddressExclusion> getExcludeDCAddresses(Optional<Standalone<StringRef>> dcId) const override {
std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const override {
std::vector<AddressExclusion> addresses;
if (!dcId.present()) {
return addresses;

View File

@ -176,6 +176,20 @@ public:
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
try {
wait(self->checkBuildTeams());
if (!self->primary && !self->readyToStart.isReady()) {
// When remote DC is not ready, DD shouldn't reply with a new team because
// a data movement to that team can't be completed and such a move
// may block the primary DC from reaching "storage_recovered".
auto team = self->findTeamFromServers(req.completeSources, /*wantHealthy=*/false);
TraceEvent("GetTeamNotReady", self->distributorId)
.suppressFor(1.0)
.detail("Primary", self->primary)
.detail("Team", team.present() ? describe(team.get()->getServerIDs()) : "");
req.reply.send(std::make_pair(team, true));
return Void();
}
// report the median available space
if (now() - self->lastMedianAvailableSpaceUpdate > SERVER_KNOBS->AVAILABLE_SPACE_UPDATE_DELAY) {
self->lastMedianAvailableSpaceUpdate = now();
@ -232,30 +246,13 @@ public:
bool wigglingBestOption = false; // best option contains server in paused wiggle state
Optional<Reference<IDataDistributionTeam>> bestOption;
std::vector<Reference<TCTeamInfo>> randomTeams;
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
// Note: this block does not apply any filters from the request
if (!req.wantsNewServers) {
for (int i = 0; i < req.completeSources.size(); i++) {
if (!self->server_info.count(req.completeSources[i])) {
continue;
}
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
for (int j = 0; j < teamList.size(); j++) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for (int k = 0; k < teamList[j]->size(); k++) {
if (!completeSources.count(serverIDs[k])) {
found = false;
break;
}
}
if (found && teamList[j]->isHealthy()) {
bestOption = teamList[j];
req.reply.send(std::make_pair(bestOption, foundSrc));
return Void();
}
}
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/true);
if (healthyTeam.present()) {
req.reply.send(std::make_pair(healthyTeam, foundSrc));
return Void();
}
}
@ -355,26 +352,10 @@ public:
// Note: this block does not apply any filters from the request
if (!bestOption.present() && self->zeroHealthyTeams->get()) {
// Attempt to find the unhealthy source server team and return it
for (int i = 0; i < req.completeSources.size(); i++) {
if (!self->server_info.count(req.completeSources[i])) {
continue;
}
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
for (int j = 0; j < teamList.size(); j++) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for (int k = 0; k < teamList[j]->size(); k++) {
if (!completeSources.count(serverIDs[k])) {
found = false;
break;
}
}
if (found) {
bestOption = teamList[j];
req.reply.send(std::make_pair(bestOption, foundSrc));
return Void();
}
}
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/false);
if (healthyTeam.present()) {
req.reply.send(std::make_pair(healthyTeam, foundSrc));
return Void();
}
}
// if (!bestOption.present()) {
@ -382,15 +363,6 @@ public:
// self->traceAllInfo(true);
// }
if (!self->readyToStart.isReady()) {
// When remote DC is not ready, DD shouldn't reply with a team because
// a data movement to that team can't be completed and such a move
// may block the primary DC from reaching "storage_recovered".
TraceEvent("GetTeamNotReady", self->distributorId).detail("Primary", self->primary);
req.reply.send(std::make_pair(Optional<Reference<IDataDistributionTeam>>(), foundSrc));
return Void();
}
req.reply.send(std::make_pair(bestOption, foundSrc));
return Void();
} catch (Error& e) {
@ -3410,6 +3382,31 @@ bool DDTeamCollection::teamContainsFailedServer(Reference<TCTeamInfo> team) cons
return false;
}
Optional<Reference<IDataDistributionTeam>> DDTeamCollection::findTeamFromServers(const std::vector<UID>& servers,
bool wantHealthy) {
const std::set<UID> completeSources(servers.begin(), servers.end());
for (const auto& server : servers) {
if (!server_info.count(server)) {
continue;
}
auto const& teamList = server_info[server]->getTeams();
for (const auto& team : teamList) {
bool found = true;
for (const UID& s : team->getServerIDs()) {
if (!completeSources.count(s)) {
found = false;
break;
}
}
if (found && (!wantHealthy || team->isHealthy())) {
return team;
}
}
}
return Optional<Reference<IDataDistributionTeam>>();
}
Future<Void> DDTeamCollection::logOnCompletion(Future<Void> signal) {
return DDTeamCollectionImpl::logOnCompletion(this, signal);
}

View File

@ -731,8 +731,8 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
.detail("PrimaryDead", primaryDead);
g_simulator->usableRegions = 1;
state std::vector<AddressExclusion> servers =
g_simulator->getExcludeDCAddresses(primaryDead ? g_simulator->primaryDcId : g_simulator->remoteDcId);
state std::vector<AddressExclusion> servers = g_simulator->getAllAddressesInDCToExclude(
primaryDead ? g_simulator->primaryDcId : g_simulator->remoteDcId);
wait(excludeServers(cx, servers, false));
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration")
.detail("Location", context)

View File

@ -312,6 +312,10 @@ protected:
// When configuration is changed, we may have machine teams with old storageTeamSize
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const;
// Returns a server team from given "servers", empty team if not found.
// When "wantHealthy" is true, only return if the team is healthy.
Optional<Reference<IDataDistributionTeam>> findTeamFromServers(const std::vector<UID>& servers, bool wantHealthy);
Future<Void> logOnCompletion(Future<Void> signal);
void resetLocalitySet();