Implement check if locality is already excluded in exclude locality command
This commit is contained in:
parent
bfd0c3a8e5
commit
50a20946d1
|
@ -1678,24 +1678,36 @@ ACTOR Future<Void> excludeServers(Database cx, std::vector<AddressExclusion> ser
|
|||
}
|
||||
|
||||
// excludes localities by setting the keys in api version below 7.0
|
||||
// TODO(zhewu): update this function that if the locality is already excluded, don't need to update the system metadata.
|
||||
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed) {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
|
||||
std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString();
|
||||
auto localityVersionKey = failed ? failedLocalityVersionKey : excludedLocalityVersionKey;
|
||||
tr.addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities
|
||||
tr.set(localityVersionKey, excludeVersionKey);
|
||||
ACTOR Future<Void> excludeLocalities(Transaction* tr, std::unordered_set<std::string> localities, bool failed) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
|
||||
std::vector<std::string> excl = wait(failed ? getExcludedFailedLocalityList(tr) : getExcludedLocalityList(tr));
|
||||
std::set<std::string> exclusion(excl.begin(), excl.end());
|
||||
bool containNewExclusion = false;
|
||||
for (const auto& l : localities) {
|
||||
if (exclusion.find(l) != exclusion.end()) {
|
||||
continue;
|
||||
}
|
||||
containNewExclusion = true;
|
||||
if (failed) {
|
||||
tr.set(encodeFailedLocalityKey(l), StringRef());
|
||||
tr->set(encodeFailedLocalityKey(l), StringRef());
|
||||
} else {
|
||||
tr.set(encodeExcludedLocalityKey(l), StringRef());
|
||||
tr->set(encodeExcludedLocalityKey(l), StringRef());
|
||||
}
|
||||
}
|
||||
TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(localities)).detail("ExcludeFailed", failed);
|
||||
if (containNewExclusion) {
|
||||
std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString();
|
||||
auto localityVersionKey = failed ? failedLocalityVersionKey : excludedLocalityVersionKey;
|
||||
tr->addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities
|
||||
tr->set(localityVersionKey, excludeVersionKey);
|
||||
}
|
||||
TraceEvent("ExcludeLocalitiesCommit")
|
||||
.detail("Localities", describe(localities))
|
||||
.detail("ExcludeFailed", failed)
|
||||
.detail("ExclusionUpdated", containNewExclusion);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Exclude the servers matching the given set of localities from use as state servers.
|
||||
|
@ -1731,7 +1743,7 @@ ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string
|
|||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
excludeLocalities(tr, localities, failed);
|
||||
wait(excludeLocalities(&tr, localities, failed));
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
|
@ -2018,19 +2030,25 @@ ACTOR Future<std::vector<AddressExclusion>> getAllExcludedServers(Database cx) {
|
|||
}
|
||||
}
|
||||
|
||||
// Get the current list of excluded localities by reading the keys.
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalities(Transaction* tr) {
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalityList(Transaction* tr) {
|
||||
state RangeResult r = wait(tr->getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!r.more && r.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
state RangeResult r2 = wait(tr->getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!r2.more && r2.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
std::vector<std::string> excludedLocalities;
|
||||
for (const auto& i : r) {
|
||||
auto a = decodeExcludedLocalityKey(i.key);
|
||||
excludedLocalities.push_back(a);
|
||||
}
|
||||
for (const auto& i : r2) {
|
||||
uniquify(excludedLocalities);
|
||||
return excludedLocalities;
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<std::string>> getExcludedFailedLocalityList(Transaction* tr) {
|
||||
state RangeResult r = wait(tr->getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!r.more && r.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
std::vector<std::string> excludedLocalities;
|
||||
for (const auto& i : r) {
|
||||
auto a = decodeFailedLocalityKey(i.key);
|
||||
excludedLocalities.push_back(a);
|
||||
}
|
||||
|
@ -2038,15 +2056,25 @@ ACTOR Future<std::vector<std::string>> getExcludedLocalities(Transaction* tr) {
|
|||
return excludedLocalities;
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<std::string>> getAllExcludedLocalities(Transaction* tr) {
|
||||
state std::vector<std::string> exclusions;
|
||||
std::vector<std::string> excludedLocalities = wait(getExcludedLocalityList(tr));
|
||||
exclusions.insert(exclusions.end(), excludedLocalities.begin(), excludedLocalities.end());
|
||||
std::vector<std::string> failedLocalities = wait(getExcludedFailedLocalityList(tr));
|
||||
exclusions.insert(exclusions.end(), failedLocalities.begin(), failedLocalities.end());
|
||||
uniquify(exclusions);
|
||||
return exclusions;
|
||||
}
|
||||
|
||||
// Get the list of excluded localities by reading the keys.
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalities(Database cx) {
|
||||
ACTOR Future<std::vector<std::string>> getAllExcludedLocalities(Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
std::vector<std::string> exclusions = wait(getExcludedLocalities(&tr));
|
||||
std::vector<std::string> exclusions = wait(getAllExcludedLocalities(&tr));
|
||||
return exclusions;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -2766,7 +2766,7 @@ ACTOR Future<Optional<std::string>> excludeLocalityCommitActor(ReadYourWritesTra
|
|||
return result;
|
||||
}
|
||||
|
||||
excludeLocalities(ryw->getTransaction(), localities, failed);
|
||||
wait(excludeLocalities(&ryw->getTransaction(), localities, failed));
|
||||
includeLocalities(ryw);
|
||||
|
||||
return result;
|
||||
|
|
|
@ -72,7 +72,7 @@ ACTOR Future<Void> excludeServers(Transaction* tr, std::vector<AddressExclusion>
|
|||
// Exclude the servers matching the given set of localities from use as state servers. Returns as soon as the change
|
||||
// is durable, without necessarily waiting for the servers to be evacuated.
|
||||
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string> localities, bool failed = false);
|
||||
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed = false);
|
||||
ACTOR Future<Void> excludeLocalities(Transaction* tr, std::unordered_set<std::string> localities, bool failed = false);
|
||||
|
||||
// Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given
|
||||
// IP. A NetworkAddress() means all servers (don't exclude anything)
|
||||
|
@ -99,8 +99,14 @@ ACTOR Future<std::vector<AddressExclusion>> getExcludedServerList(Transaction* t
|
|||
ACTOR Future<std::vector<AddressExclusion>> getExcludedFailedServerList(Transaction* tr);
|
||||
|
||||
// Get the current list of excluded localities
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalities(Database cx);
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalities(Transaction* tr);
|
||||
ACTOR Future<std::vector<std::string>> getAllExcludedLocalities(Database cx);
|
||||
ACTOR Future<std::vector<std::string>> getAllExcludedLocalities(Transaction* tr);
|
||||
|
||||
// Get the current list of excluded localities.
|
||||
ACTOR Future<std::vector<std::string>> getExcludedLocalityList(Transaction* tr);
|
||||
|
||||
// Get the current list of failed localities.
|
||||
ACTOR Future<std::vector<std::string>> getExcludedFailedLocalityList(Transaction* tr);
|
||||
|
||||
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers, const std::string& locality);
|
||||
|
||||
|
|
Loading…
Reference in New Issue