Status json schema update, includelocalities back for consistency check, review comments.

This commit is contained in:
Neethu Haneesha Bingi 2021-06-17 00:27:26 -07:00
parent 4ad5926a25
commit cbe714acd0
6 changed files with 125 additions and 267 deletions

View File

@ -2391,7 +2391,7 @@ ACTOR Future<bool> coordinators(Database db, std::vector<StringRef> tokens, bool
return err;
}
// Includes the servers that could be ipaddresses or localities back to the cluster.
// Includes the servers that could be IP addresses or localities back to the cluster.
ACTOR Future<bool> include(Database db, std::vector<StringRef> tokens) {
std::vector<AddressExclusion> addresses;
state std::vector<std::string> localities;
@ -2423,14 +2423,14 @@ ACTOR Future<bool> include(Database db, std::vector<StringRef> tokens) {
std::vector<AddressExclusion> includeAll;
includeAll.push_back(AddressExclusion());
wait(makeInterruptable(includeServers(db, includeAll, failed)));
wait(makeInterruptable(includeLocalities(db, &localities, failed, all)));
wait(makeInterruptable(includeLocalities(db, localities, failed, all)));
} else {
if (!addresses.empty()) {
wait(makeInterruptable(includeServers(db, addresses, failed)));
}
if (!localities.empty()) {
// includes the servers that belong to given localities.
wait(makeInterruptable(includeLocalities(db, &localities, failed, all)));
wait(makeInterruptable(includeLocalities(db, localities, failed, all)));
}
}
return false;
@ -2492,7 +2492,7 @@ ACTOR Future<bool> exclude(Database db,
noMatchLocalities.push_back(t->toString());
} else {
// add all the server ipaddresses that belong to the given localities to the exclusionSet.
std::copy(localityAddresses.begin(), localityAddresses.end(), back_inserter(exclusionVector));
exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end());
exclusionSet.insert(localityAddresses.begin(), localityAddresses.end());
}
exclusionLocalities.insert(t->toString());
@ -2632,7 +2632,7 @@ ACTOR Future<bool> exclude(Database db,
wait(makeInterruptable(excludeServers(db, exclusionAddresses, markFailed)));
}
if (!exclusionLocalities.empty()) {
wait(makeInterruptable(excludeLocalities(db, &exclusionLocalities, markFailed)));
wait(makeInterruptable(excludeLocalities(db, exclusionLocalities, markFailed)));
}
if (waitForAllExcluded) {

View File

@ -1597,7 +1597,7 @@ ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers,
}
// excludes localities by setting the keys in api version below 7.0
void excludeLocalities(Transaction& tr, std::unordered_set<std::string>* localities, bool failed) {
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed) {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
@ -1606,35 +1606,35 @@ void excludeLocalities(Transaction& tr, std::unordered_set<std::string>* localit
auto localityVersionKey = failed ? failedLocalityVersionKey : excludedLocalityVersionKey;
tr.addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities
tr.set(localityVersionKey, excludeVersionKey);
for (const auto& l : *localities) {
for (const auto& l : localities) {
if (failed) {
tr.set(encodeFailedLocalityKey(l), StringRef());
} else {
tr.set(encodeExcludedLocalityKey(l), StringRef());
}
}
TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(*localities)).detail("ExcludeFailed", failed);
TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(localities)).detail("ExcludeFailed", failed);
}
// Exclude the servers matching the given set of localities from use as state servers.
// excludes localities by setting the keys.
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string>* localities, bool failed) {
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string> localities, bool failed) {
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
loop {
try {
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
failed ? "failedlocality" : "excludedlocality", "force"),
failed ? "failed_locality" : "excluded_locality", "force"),
ValueRef());
for (const auto& l : *localities) {
for (const auto& l : localities) {
Key addr = failed
? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l)
: SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l);
ryw.set(addr, ValueRef());
}
TraceEvent("ExcludeLocalitiesSpecialKeySpaceCommit")
.detail("Localities", describe(*localities))
.detail("Localities", describe(localities))
.detail("ExcludeFailed", failed);
wait(ryw.commit());
@ -1762,7 +1762,7 @@ ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers,
// Remove the given localities from the exclusion list.
// include localities by clearing the keys.
ACTOR Future<Void> includeLocalities(Database cx, vector<std::string>* localities, bool failed, bool includeAll) {
ACTOR Future<Void> includeLocalities(Database cx, vector<std::string> localities, bool failed, bool includeAll) {
state std::string versionKey = deterministicRandom()->randomUniqueID().toString();
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
@ -1776,7 +1776,7 @@ ACTOR Future<Void> includeLocalities(Database cx, vector<std::string>* localitie
ryw.clear(SpecialKeySpace::getManamentApiCommandRange("excludedlocality"));
}
} else {
for (const auto& l : *localities) {
for (const auto& l : localities) {
Key locality =
failed ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l)
: SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l);
@ -1784,7 +1784,7 @@ ACTOR Future<Void> includeLocalities(Database cx, vector<std::string>* localitie
}
}
TraceEvent("IncludeLocalitiesCommit")
.detail("Localities", describe(*localities))
.detail("Localities", describe(localities))
.detail("Failed", failed)
.detail("IncludeAll", includeAll);
@ -1822,7 +1822,7 @@ ACTOR Future<Void> includeLocalities(Database cx, vector<std::string>* localitie
tr.clear(excludedLocalityKeys);
}
} else {
for (const auto& l : *localities) {
for (const auto& l : localities) {
if (failed) {
tr.clear(encodeFailedLocalityKey(l));
} else {
@ -1832,7 +1832,7 @@ ACTOR Future<Void> includeLocalities(Database cx, vector<std::string>* localitie
}
TraceEvent("IncludeLocalitiesCommit")
.detail("Localities", describe(*localities))
.detail("Localities", describe(localities))
.detail("Failed", failed)
.detail("IncludeAll", includeAll);
@ -1955,7 +1955,7 @@ ACTOR Future<vector<std::string>> getExcludedLocalities(Database cx) {
// Decodes the locality string to a pair of locality prefix and its value.
// The prefix could be dcid, processid, machineid, processid.
std::pair<std::string, std::string> decodeLocality(std::string& locality) {
std::pair<std::string, std::string> decodeLocality(const std::string& locality) {
StringRef localityRef(locality.c_str());
if (localityRef.startsWith(ExcludeLocalityKeyDcIdPrefix)) {
return std::make_pair(LocalityData::keyDcId.toString(),
@ -1976,7 +1976,8 @@ std::pair<std::string, std::string> decodeLocality(std::string& locality) {
// Returns the list of IPAddresses of the workers that match the given locality.
// Example: locality="dcid:primary" returns all the ip addresses of the workers in the primary dc.
std::set<AddressExclusion> getAddressesByLocality(std::vector<ProcessData>& workers, std::string locality) {
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers,
const std::string& locality) {
std::pair<std::string, std::string> localityKeyValue = decodeLocality(locality);
std::set<AddressExclusion> localityAddresses;

View File

@ -165,8 +165,8 @@ void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool fai
// Exclude the servers matching the given set of localities from use as state servers. Returns as soon as the change
// is durable, without necessarily waiting for the servers to be evacuated.
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string>* localities, bool failed = false);
void excludeLocalities(Transaction& tr, std::unordered_set<std::string>* localities, bool failed = false);
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string> localities, bool failed = false);
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed = false);
// Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given
// IP. A NetworkAddress() means all servers (don't exclude anything)
@ -174,7 +174,7 @@ ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers,
// Remove the given localities from the exclusion list.
ACTOR Future<Void> includeLocalities(Database cx,
vector<std::string>* localities,
vector<std::string> localities,
bool failed = false,
bool includeAll = false);
@ -190,7 +190,7 @@ ACTOR Future<vector<AddressExclusion>> getExcludedServers(Transaction* tr);
ACTOR Future<vector<std::string>> getExcludedLocalities(Database cx);
ACTOR Future<vector<std::string>> getExcludedLocalities(Transaction* tr);
std::set<AddressExclusion> getAddressesByLocality(std::vector<ProcessData>& workers, std::string locality);
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers, const std::string& locality);
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and

View File

@ -747,7 +747,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"coordinators_count":1,
"excluded_servers":[
{
"address":"10.0.4.1"
"address":"10.0.4.1",
"locality":"processid:e9816ca4a89ff64ddb7ba2a5ec10b75b"
}
],
"auto_commit_proxies":3,

View File

@ -106,8 +106,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
"failed/force",
"excludedlocality/force",
"failedlocality/force" };
"excluded_locality/force",
"failed_locality/force" };
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
@ -2211,7 +2211,7 @@ ACTOR Future<Optional<std::string>> excludeLocalityCommitActor(ReadYourWritesTra
return result;
// If force option is not set, we need to do safety check
auto force = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
failed ? "failedlocality" : "excludedlocality", "force")];
failed ? "failed_locality" : "excluded_locality", "force")];
// only do safety check when we have localities to be excluded and the force option key is not set
if (localities.size() && !(force.first && force.second.present())) {
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result));
@ -2219,7 +2219,7 @@ ACTOR Future<Optional<std::string>> excludeLocalityCommitActor(ReadYourWritesTra
return result;
}
excludeLocalities(ryw->getTransaction(), &localities, failed);
excludeLocalities(ryw->getTransaction(), localities, failed);
includeLocalities(ryw);
return result;

View File

@ -35,7 +35,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
bool enabled, killProcesses;
int minMachinesToKill, maxMachinesToKill, maxSafetyCheckRetries;
double minDelay, maxDelay;
double kill1Timeout, kill2Timeout, kill3Timeout;
double kill1Timeout, kill2Timeout;
std::set<AddressExclusion> toKill1, toKill2;
std::map<AddressExclusion, Optional<Standalone<StringRef>>> machine_ids; // ip -> Locality Zone id
@ -52,7 +52,6 @@ struct RemoveServersSafelyWorkload : TestWorkload {
maxDelay = getOption(options, LiteralStringRef("maxDelay"), 60.0);
kill1Timeout = getOption(options, LiteralStringRef("kill1Timeout"), 60.0);
kill2Timeout = getOption(options, LiteralStringRef("kill2Timeout"), 6000.0);
kill3Timeout = getOption(options, LiteralStringRef("kill3Timeout"), 6000.0);
killProcesses = deterministicRandom()->random01() < 0.5;
if (g_network->isSimulated()) {
g_simulator.allowLogSetKills = false;
@ -144,7 +143,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
if (!enabled)
return Void();
double delay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay;
return workloadMain(this, cx, delay, toKill1, toKill2, minMachinesToKill, maxMachinesToKill);
return workloadMain(this, cx, delay, toKill1, toKill2);
}
Future<bool> check(Database const& cx) override { return true; }
@ -338,9 +337,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
Database cx,
double waitSeconds,
std::set<AddressExclusion> toKill1,
std::set<AddressExclusion> toKill2,
int minMachinesToKill,
int maxMachinesToKill) {
std::set<AddressExclusion> toKill2) {
wait(updateProcessIds(cx));
wait(delay(waitSeconds));
@ -381,6 +378,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
// Include the servers, if unable to exclude
// Reinclude when buggify is on to increase the surface area of the next set of excludes
state bool failed = true;
if (!bClearedFirst || BUGGIFY) {
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill")
@ -389,6 +387,8 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("ToKill", describe(toKill1))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
self->includeAddresses(toKill1);
}
@ -425,85 +425,16 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("ToKill", describe(toKill2))
.detail("ClusterAvailable", g_simulator.isAvailable());
// Reinclude all of the machine, if buggified
if (BUGGIFY) {
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill")
.detail("Step", "include all second")
.detail("KillTotal", toKill2.size())
.detail("ToKill", describe(toKill2))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(includeServers(cx, vector<AddressExclusion>(1)));
self->includeAddresses(toKill2);
}
std::unordered_set<std::string> localities;
auto processes = getServers();
// collecting all localities
for (const auto& it : processes) {
if (it->locality.dcId().present())
localities.insert(ExcludeLocalityKeyDcIdPrefix.toString() + it->locality.dcId().get().toString());
if (it->locality.zoneId().present())
localities.insert(ExcludeLocalityKeyZoneIdPrefix.toString() + it->locality.zoneId().get().toString());
if (it->locality.machineId().present())
localities.insert(ExcludeLocalityKeyMachineIdPrefix.toString() +
it->locality.machineId().get().toString());
if (it->locality.processId().present())
localities.insert(ExcludeLocalityKeyProcessIdPrefix.toString() +
it->locality.processId().get().toString());
}
int localitiesCount = localities.size();
int nToKill3 = deterministicRandom()->randomInt(std::min(localitiesCount, minMachinesToKill),
std::min(localitiesCount, maxMachinesToKill) + 1);
// get random subset of localities.
state std::set<std::string> toKill3 = random_subset(localities, nToKill3);
TraceEvent("RemoveAndKillLocalities")
.detail("LocalitiesSize", localities.size())
.detail("Kill3Size", toKill3.size())
.detail("ToKill3", describe(toKill3));
// toKill3 may kill too many servers to make cluster unavailable.
// Get the processes in toKill3 that are safe to kill
state std::set<AddressExclusion> toKill3Addresses = wait(self->getLocalitiesAddresses(cx, toKill3));
killProcArray = self->protectServers(toKill3Addresses);
// Update the kill networks to the killable localities
toKill3 = self->getSafeLocalitiesToKill(killProcArray);
state bool failed =
deterministicRandom()->randomInt(0, 2) ? true : false; // excluding with random failed option
TraceEvent("RemoveAndKillLocalities")
.detail("Step", "exclude third list")
.detail("ToKill3AddressesSize", toKill3Addresses.size())
.detail("KillProcArraySize", killProcArray.size())
.detail("Kill3Size", toKill3.size())
.detail("ToKill3", describe(toKill3))
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill")
.detail("Step", "include all second")
.detail("KillTotal", toKill2.size())
.detail("ToKill", describe(toKill2))
.detail("ClusterAvailable", g_simulator.isAvailable());
// exclude localities
wait(reportErrors(
timeoutError(removeAndKillLocalities(self, cx, toKill3, bClearedFirst ? &toKill2 : nullptr, failed),
self->kill3Timeout),
"RemoveServersSafelyError",
UID()));
TraceEvent("RemoveAndKillLocalities")
.detail("Step", "excluded thrid list")
.detail("Kill3Size", toKill3.size())
.detail("ToKill3", describe(toKill3))
.detail("ClusterAvailable", g_simulator.isAvailable());
// Reinclude all of the machine, if buggified
if (BUGGIFY) {
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKillLocalities")
.detail("Step", "include all third")
.detail("Kill3Size", toKill3.size())
.detail("ToKill3", describe(toKill3))
.detail("ClusterAvailable", g_simulator.isAvailable());
vector<std::string> emptyLocalities;
wait(includeLocalities(cx, &emptyLocalities, failed, true));
}
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
self->includeAddresses(toKill2);
return Void();
}
@ -591,7 +522,10 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("Step", "Including all")
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
state bool failed = true;
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Included all")
.detail("ClusterAvailable", g_simulator.isAvailable())
@ -692,20 +626,60 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("FailedAddresses", describe(toKillMarkFailedArray))
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
state bool excludeLocalitiesInsteadOfServers = deterministicRandom()->coinflip();
if (markExcludeAsFailed) {
wait(excludeServers(cx, toKillMarkFailedArray, true));
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalitiesFailed =
self->getLocalitiesFromAddresses(toKillMarkFailedArray);
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities with failed option")
.detail("FailedAddressesSize", toKillMarkFailedArray.size())
.detail("FailedAddresses", describe(toKillMarkFailedArray))
.detail("FailedLocaitiesSize", toKillLocalitiesFailed.size())
.detail("FailedLocaities", describe(toKillLocalitiesFailed));
wait(excludeLocalities(cx, toKillLocalitiesFailed, true));
} else {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding servers with failed option")
.detail("FailedAddressesSize", toKillMarkFailedArray.size())
.detail("FailedAddresses", describe(toKillMarkFailedArray));
wait(excludeServers(cx, toKillMarkFailedArray, true));
}
}
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities without failed option")
.detail("AddressesSize", toKillArray.size())
.detail("Addresses", describe(toKillArray))
.detail("LocaitiesSize", toKillLocalities.size())
.detail("Locaities", describe(toKillLocalities));
wait(excludeLocalities(cx, toKillLocalities, false));
} else {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding servers without failed option")
.detail("AddressesSize", toKillArray.size())
.detail("Addresses", describe(toKillArray));
wait(excludeServers(cx, toKillArray));
}
wait(excludeServers(cx, toKillArray));
// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough
// servers left alive to do a coordinators auto (?)
if (toKill.size()) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Wait For Server Exclusion")
.detail("Addresses", describe(toKill))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
if (!excludeLocalitiesInsteadOfServers) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Wait For Server Exclusion")
.detail("Addresses", describe(toKill))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
}
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "coordinators auto")
@ -768,181 +742,63 @@ struct RemoveServersSafelyWorkload : TestWorkload {
return subset;
}
// creates a random set of size n from given unordered_set.
template <class T>
static std::set<T> random_subset(std::unordered_set<T>& s, int n) {
return random_subset(std::vector<T>(s.begin(), s.end()), n);
}
// creates a random set of size n from given set.
template <class T>
static std::set<T> random_subset(std::set<T>& s, int n) {
return random_subset(std::vector<T>(s.begin(), s.end()), n);
}
bool killContainsProcess(AddressExclusion kill, NetworkAddress process) {
return kill.excludes(process) || (machineProcesses.find(kill) != machineProcesses.end() &&
machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0);
}
// Returns the list of IPAddresses of the workers that match the given list of localities.
ACTOR Future<std::set<AddressExclusion>> getLocalitiesAddresses(Database cx, std::set<std::string> localities) {
state Transaction tr(cx);
state std::vector<ProcessData> workers = wait(getWorkers(&tr));
state std::set<AddressExclusion> addressesSet;
for (const auto& l : localities) {
std::set<AddressExclusion> localityAddresses = getAddressesByLocality(workers, l);
addressesSet.insert(localityAddresses.begin(), localityAddresses.end());
}
return addressesSet;
}
// Finds the safe localities list that can be excluded from the killable safeProcesses list.
// Finds the localities list that can be excluded from the safe killable addresses list.
// If excluding based on a particular locality of the safe process, kills any other process, that
// particular locality is not included in the killable safeLocalities list.
std::set<std::string> getSafeLocalitiesToKill(std::vector<ISimulator::ProcessInfo*> const& safeProcesses) {
std::unordered_map<std::string, int> safeLocalitiesCount;
for (const auto& processInfo : safeProcesses) {
if (processInfo->locality.dcId().present())
safeLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() +
processInfo->locality.dcId().get().toString()]++;
if (processInfo->locality.machineId().present())
safeLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() +
processInfo->locality.machineId().get().toString()]++;
if (processInfo->locality.processId().present())
safeLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() +
processInfo->locality.processId().get().toString()]++;
if (processInfo->locality.zoneId().present())
safeLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() +
processInfo->locality.zoneId().get().toString()]++;
}
auto processes = getServers();
// particular locality is not included in the killable localities list.
std::unordered_set<std::string> getLocalitiesFromAddresses(const std::vector<AddressExclusion>& addresses) {
std::unordered_map<std::string, int> allLocalitiesCount;
std::unordered_map<std::string, int> killableLocalitiesCount;
auto processes = getServers();
for (const auto& processInfo : processes) {
if (processInfo->locality.dcId().present())
allLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() +
processInfo->locality.dcId().get().toString()]++;
if (processInfo->locality.zoneId().present())
allLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() +
processInfo->locality.zoneId().get().toString()]++;
if (processInfo->locality.machineId().present())
allLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() +
processInfo->locality.machineId().get().toString()]++;
if (processInfo->locality.processId().present())
allLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() +
processInfo->locality.processId().get().toString()]++;
if (processInfo->locality.zoneId().present())
allLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() +
processInfo->locality.zoneId().get().toString()]++;
AddressExclusion pAddr(processInfo->address.ip, processInfo->address.port);
if (std::find(addresses.begin(), addresses.end(), pAddr) != addresses.end()) {
if (processInfo->locality.dcId().present())
killableLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() +
processInfo->locality.dcId().get().toString()]++;
if (processInfo->locality.zoneId().present())
killableLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() +
processInfo->locality.zoneId().get().toString()]++;
if (processInfo->locality.machineId().present())
killableLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() +
processInfo->locality.machineId().get().toString()]++;
if (processInfo->locality.processId().present())
killableLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() +
processInfo->locality.processId().get().toString()]++;
}
}
std::set<std::string> safeLocalities;
for (const auto& l : safeLocalitiesCount) {
std::unordered_set<std::string> toKillLocalities;
for (const auto& l : killableLocalitiesCount) {
if (l.second == allLocalitiesCount[l.first]) {
safeLocalities.insert(l.first);
toKillLocalities.insert(l.first);
}
}
return safeLocalities;
}
// Attempts to exclude a set of localities, and once the exclusion is successful it kills them.
// If markExcludeAsFailed is true, then it is an error if we cannot complete the exclusion.
ACTOR static Future<Void> removeAndKillLocalities(RemoveServersSafelyWorkload* self,
Database cx,
std::set<std::string> toKill,
std::set<AddressExclusion>* pIncAddrs,
bool markExcludeAsFailed) {
state UID functionId = nondeterministicRandom()->randomUniqueID();
// First clear the exclusion list and exclude the given list
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "Including all")
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
wait(includeServers(cx, vector<AddressExclusion>(1)));
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "Included all")
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
// Reinclude the addresses that were excluded, if present
if (pIncAddrs) {
self->includeAddresses(*pIncAddrs);
}
state std::set<std::string> toKillMarkFailedSet;
// if markExcludedasFailed is true, get random subset of tokill, check for
// safe exclusions and exclude them with failed option.
if (markExcludeAsFailed) {
state int retries = 0;
loop {
state bool safe = false;
toKillMarkFailedSet = random_subset(toKill, deterministicRandom()->randomInt(0, toKill.size() + 1));
state std::set<AddressExclusion> toKillMarkFailedAddressesSet =
wait(self->getLocalitiesAddresses(cx, toKillMarkFailedSet));
state std::vector<AddressExclusion> toKillMarkFailedAddressesVector(
toKillMarkFailedAddressesSet.begin(), toKillMarkFailedAddressesSet.end());
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "SafetyCheck")
.detail("Exclusion Localities", describe(toKillMarkFailedSet))
.detail("Exclusion Addresses", describe(toKillMarkFailedAddressesSet));
choose {
when(bool _safe = wait(checkSafeExclusions(cx, toKillMarkFailedAddressesVector))) {
safe = _safe && self->protectServers(toKillMarkFailedAddressesSet).size() ==
toKillMarkFailedAddressesSet.size();
}
when(wait(delay(5.0))) {
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "SafetyCheckTimedOut")
.detail("Exclusion Localities", describe(toKillMarkFailedSet))
.detail("Exclusion Addresses", describe(toKillMarkFailedAddressesSet));
}
}
if (retries == self->maxSafetyCheckRetries) {
// Do not mark as failed if limit is reached
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "SafetyCheckLimitReached")
.detail("Retries", retries);
markExcludeAsFailed = false;
safe = true;
}
if (safe)
break;
retries++;
}
}
state std::unordered_set<std::string> toKillUnorderedSet;
state std::unordered_set<std::string> toKillMarkFailedUnorderedSet;
toKillUnorderedSet.insert(toKill.begin(), toKill.end());
toKillMarkFailedUnorderedSet.insert(toKillMarkFailedSet.begin(), toKillMarkFailedSet.end());
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "Activate Localities Exclusion")
.detail("KillLocalitiesSize", toKill.size())
.detail("ToKill", describe(toKill))
.detail("Localities", describe(toKillUnorderedSet))
.detail("FailedLocalities", describe(toKillMarkFailedUnorderedSet))
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
// exclude localities with failed option as true
if (markExcludeAsFailed) {
wait(excludeLocalities(cx, &toKillMarkFailedUnorderedSet, true));
}
// exclude localities with failed option as false.
wait(excludeLocalities(cx, &toKillUnorderedSet));
TraceEvent("RemoveAndKillLocalities", functionId)
.detail("Step", "done")
.detail("ClusterAvailable", g_simulator.isAvailable());
return Void();
return toKillLocalities;
}
// Update the g_simulator processes list with the process ids
// of the workers, that are generated as part of worker creation.
ACTOR static Future<Void> updateProcessIds(Database cx) {
Transaction tr(cx);
std::vector<ProcessData> workers = wait(getWorkers(&tr));
std::vector<ProcessData> workers = wait(getWorkers(cx));
std::unordered_map<NetworkAddress, int> addressToIndexMap;
for (int i = 0; i < workers.size(); i++) {
addressToIndexMap[workers[i].address] = i;