DD:IsValidLocality:Consider configured replica policy

This commit is contained in:
Meng Xu 2019-09-13 11:55:04 -07:00
parent 52f6297b52
commit 90d6a27a0d
1 changed files with 20 additions and 10 deletions

View File

@ -983,7 +983,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// we preferentially mark the least used server as undesirable? // we preferentially mark the least used server as undesirable?
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) { for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
if (self->shouldHandleServer(i->first)) { if (self->shouldHandleServer(i->first)) {
if (!self->isValidLocality(i->first.locality)) { if (!self->isValidLocality(self->configuration.storagePolicy, i->first.locality)) {
TraceEvent(SevWarnAlways, "MissingLocality").detail("Server", i->first.uniqueID).detail("Locality", i->first.locality.toString()); TraceEvent(SevWarnAlways, "MissingLocality").detail("Server", i->first.uniqueID).detail("Locality", i->first.locality.toString());
} }
self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0); self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0);
@ -1000,10 +1000,20 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return Void(); return Void();
} }
bool isValidLocality(LocalityData &locality) { // Check if server or machine has a valid locality based on configured replication policy
return locality.isValidProcesId() && locality.isValidMachineId() && bool isValidLocality(Reference<IReplicationPolicy> storagePolicy, LocalityData &locality) {
locality.isValidZoneId() && locality.isValidDcId() && if (!locality.isValidZoneId()) {
locality.isValidDataHallId(); // zoneId is used for machine_id. Must have no matter what policy is used
return false;
}
std::set<std::string> replicationPolicyKeys = storagePolicy->attributeKeys();
if (replicationPolicyKeys.count("dcid") && !locality.isValidDcId()) {
return false;
}
if (replicationPolicyKeys.count("data_hall") && !locality.isValidDataHallId()) {
return false;
}
return true;
} }
void evaluateTeamQuality() { void evaluateTeamQuality() {
@ -1383,7 +1393,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
} }
Reference<TCServerInfo> representativeServer = machine->second->serversOnMachine[0]; Reference<TCServerInfo> representativeServer = machine->second->serversOnMachine[0];
auto& locality = representativeServer->lastKnownInterface.locality; auto& locality = representativeServer->lastKnownInterface.locality;
if (!isValidLocality(locality)) { if (!isValidLocality(configuration.storagePolicy, locality)) {
TraceEvent(SevWarn, "RebuildMachineLocalityMapError") TraceEvent(SevWarn, "RebuildMachineLocalityMapError")
.detail("Machine", machine->second->machineID.toString()) .detail("Machine", machine->second->machineID.toString())
.detail("InvalidLocality", locality.toString()); .detail("InvalidLocality", locality.toString());
@ -1427,7 +1437,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Skip unhealthy machines // Skip unhealthy machines
if (!isMachineHealthy(machine.second)) continue; if (!isMachineHealthy(machine.second)) continue;
// Skip machine with incomplete locality // Skip machine with incomplete locality
if (!isValidLocality(machine.second->serversOnMachine[0]->lastKnownInterface.locality))continue; if (!isValidLocality(configuration.storagePolicy, machine.second->serversOnMachine[0]->lastKnownInterface.locality))continue;
// Invariant: We only create correct size machine teams. // Invariant: We only create correct size machine teams.
// When configuration (e.g., team size) is changed, the DDTeamCollection will be destroyed and rebuilt // When configuration (e.g., team size) is changed, the DDTeamCollection will be destroyed and rebuilt
@ -1597,7 +1607,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& server : server_info) { for (auto& server : server_info) {
// Only pick healthy server, which is not failed or excluded. // Only pick healthy server, which is not failed or excluded.
if (server_status.get(server.first).isUnhealthy()) continue; if (server_status.get(server.first).isUnhealthy()) continue;
if (!isValidLocality(server.second->lastKnownInterface.locality)) continue; if (!isValidLocality(configuration.storagePolicy, server.second->lastKnownInterface.locality)) continue;
int numTeams = server.second->teams.size(); int numTeams = server.second->teams.size();
if (numTeams < minTeams) { if (numTeams < minTeams) {
@ -3147,7 +3157,7 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) { ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskPriority::DataDistribution))); state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskPriority::DataDistribution)));
if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end()) if(type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end())
&& (self->isValidLocality(server->lastKnownInterface.locality)) ) && (self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality)) )
wait(Future<Void>(Never())); wait(Future<Void>(Never()));
return type; return type;
@ -3496,7 +3506,7 @@ ACTOR Future<Void> storageServerTracker(
.detail("ServerID", server->id) .detail("ServerID", server->id)
.detail("StoreType", type.toString()) .detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString()) .detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(server->lastKnownInterface.locality)); .detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality));
TEST(true); //KeyValueStore type changed TEST(true); //KeyValueStore type changed
storeTracker = Never(); storeTracker = Never();