Merge pull request #3566 from sfc-gh-anoyes/anoyes/issue-3565

Use IPAddress instead of NetworkAddress for exclude
This commit is contained in:
Meng Xu 2020-08-02 10:53:09 -07:00 committed by GitHub
commit e1a5988c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 27 deletions

View File

@ -2154,8 +2154,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
return false;
} else {
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
state std::vector<AddressExclusion> exclusionVector;
state std::set<AddressExclusion> exclusionSet;
bool force = false;
state bool waitForAllExcluded = true;
state bool markFailed = false;
@ -2174,8 +2174,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
addresses.push_back( a );
exclusions.insert( a );
exclusionVector.push_back(a);
exclusionSet.insert(a);
}
}
@ -2183,7 +2183,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
if (markFailed) {
state bool safe;
try {
bool _safe = wait(makeInterruptable(checkSafeExclusions(db, addresses)));
bool _safe = wait(makeInterruptable(checkSafeExclusions(db, exclusionVector)));
safe = _safe;
} catch (Error& e) {
TraceEvent("CheckSafeExclusionsError").error(e);
@ -2244,7 +2244,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
return true;
}
NetworkAddress addr = NetworkAddress::parse(addrStr);
bool excluded = (process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusions, addr);
bool excluded =
(process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusionSet, addr);
ssTotalCount++;
if (excluded)
ssExcludedCount++;
@ -2285,7 +2286,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
}
wait(makeInterruptable(excludeServers(db, addresses, markFailed)));
wait(makeInterruptable(excludeServers(db, exclusionVector, markFailed)));
if (waitForAllExcluded) {
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
@ -2296,7 +2297,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
warn.cancel();
state std::set<NetworkAddress> notExcludedServers =
wait(makeInterruptable(checkForExcludingServers(db, addresses, waitForAllExcluded)));
wait(makeInterruptable(checkForExcludingServers(db, exclusionVector, waitForAllExcluded)));
std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
std::map<IPAddress, std::set<uint16_t>> workerPorts;
for(auto addr : workers)
@ -2304,7 +2305,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
// Print a list of all excluded addresses that don't have a corresponding worker
std::set<AddressExclusion> absentExclusions;
for(auto addr : addresses) {
for (const auto& addr : exclusionVector) {
auto worker = workerPorts.find(addr.ip);
if(worker == workerPorts.end())
absentExclusions.insert(addr);
@ -2312,43 +2313,46 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
absentExclusions.insert(addr);
}
for (auto addr : addresses) {
NetworkAddress _addr(addr.ip, addr.port);
if (absentExclusions.find(addr) != absentExclusions.end()) {
if(addr.port == 0)
for (const auto& exclusion : exclusionVector) {
if (absentExclusions.find(exclusion) != absentExclusions.end()) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
"correct machines before removing them from the cluster!\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
"before removing them from the cluster!\n",
addr.toString().c_str());
} else if (notExcludedServers.find(_addr) != notExcludedServers.end()) {
if (addr.port == 0)
exclusion.toString().c_str());
}
} else if (std::any_of(notExcludedServers.begin(), notExcludedServers.end(),
[&](const NetworkAddress& a) { return addressExcluded({ exclusion }, a); })) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
"machine from the cluster\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
"cluster\n",
addr.toString().c_str());
exclusion.toString().c_str());
}
} else {
if (addr.port == 0)
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- Successfully excluded. It is now safe to remove this machine "
"from the cluster.\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(
" %s ---- Successfully excluded. It is now safe to remove this process from the cluster.\n",
addr.toString().c_str());
exclusion.toString().c_str());
}
}
}
bool foundCoordinator = false;
auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
for( auto& c : ccs.coordinators()) {
if (std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip, c.port) ) ||
std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip) )) {
if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
printf("WARNING: %s is a coordinator!\n", c.toString().c_str());
foundCoordinator = true;
}