Add special keys for command options

This commit is contained in:
Chaoguang Lin 2020-07-16 18:15:35 -07:00
parent 52778bdf84
commit 5fa6785fb8
4 changed files with 127 additions and 35 deletions

View File

@ -1282,8 +1282,12 @@ ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers,
loop {
try{
ryw.setOption( FDBTransactionOptions::SPECIAL_KEY_SPACE_CHANGE_CONFIGURATION);
Key optionKey = failed ? LiteralStringRef("\xff\xff/conf/options/failed/force")
: LiteralStringRef("\xff\xff/conf/options/exclude/force");
ryw.set(optionKey, ValueRef());
for(auto& s : servers) {
Key addr = failed ? SpecialKeySpace::getCommandPrefix("failed").withSuffix(s.toString()) : SpecialKeySpace::getCommandPrefix("exclude").withSuffix(s.toString());
Key addr = failed ? SpecialKeySpace::getCommandPrefix("failed").withSuffix(s.toString())
: SpecialKeySpace::getCommandPrefix("exclude").withSuffix(s.toString());
ryw.set(addr, ValueRef());
}
TraceEvent("ExcludeServersSpecialKeySpaceCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);

View File

@ -907,8 +907,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}
));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
std::make_unique<ManagementCommandsOptionsImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/conf/options/"), LiteralStringRef("\xff\xff/conf/options0")
)), true);
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
std::make_unique<ExcludeServersRangeImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/conf/excluded"), LiteralStringRef("\xff\xff/conf/excluded0")
LiteralStringRef("\xff\xff/conf/excluded/"), LiteralStringRef("\xff\xff/conf/excluded0")
)), true);
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
std::make_unique<FailedServersRangeImpl>(KeyRangeRef(

View File

@ -352,6 +352,10 @@ void SpecialKeySpace::set(ReadYourWritesTransaction* ryw, const KeyRef& key, con
auto impl = writeImpls[key];
// TODO : do we need the separate error here to differentiate from read?
if (impl == nullptr) throw special_keys_no_write_module_found();
TraceEvent(SevDebug, "SpecialKeySpaceSet")
.detail("Key", key.toString())
.detail("Value", value.toString())
.detail("Impl", impl->getKeyRange().toString());
return impl->set(ryw, key, value);
}
@ -411,7 +415,11 @@ ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction*
while (iter != ranges.end()) {
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
writeModulePtrs.insert(sks->getRWImpls().rangeContaining(iter->begin())->value());
auto modulePtr = sks->getRWImpls().rangeContaining(iter->begin())->value();
writeModulePtrs.insert(modulePtr);
TraceEvent(SevDebug, "SpecialKeySpaceWrites")
.detail("Key", iter->begin())
.detail("Range", modulePtr->getKeyRange().toString());
}
++iter;
}
@ -420,7 +428,9 @@ ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction*
Optional<std::string> msg = wait((*it)->commit(ryw));
if (msg.present()) {
ryw->setSpecialKeySpaceErrorMsg(msg.get());
TraceEvent(SevDebug, "SpecialKeySpaceManagemetnAPIError").detail("Reason", msg.get());
TraceEvent(SevDebug, "SpecialKeySpaceManagemetnAPIError")
.detail("Reason", msg.get())
.detail("Range", (*it)->getKeyRange().toString());
throw special_keys_management_api_failure();
}
}
@ -500,14 +510,70 @@ Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTran
return ddMetricsGetRangeActor(ryw, kr);
}
std::unordered_set<std::string> ManagementCommandsOptionsImpl::options = { "exclude/force", "failed/force" };
// Optional<Key> ManagementCommandsOptionsImpl::getOptionSpecialKey(const std::string& command, const std::string&
// option) { auto pair = command + "/" + option; if (options.find(pair) != options.end()) { return
// Optional<Key>(getKeyRange().begin.withSuffix(pair)); } else return Optional<Key>();
// }
ManagementCommandsOptionsImpl::ManagementCommandsOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<Standalone<RangeResultRef>> ManagementCommandsOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
// Since we only have limit number of options, a brute force loop here is enough
for (const auto& option : options) {
auto key = getKeyRange().begin.withSuffix(option);
// ignore all invalid keys
auto r = ryw->getSpecialKeySpaceWriteMap()[key];
if (kr.contains(key) && r.first && r.second.present())
result.push_back(result.arena(), KeyValueRef(key, ValueRef()));
}
return result;
}
void ManagementCommandsOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
std::string option = key.removePrefix(getKeyRange().begin).toString();
// ignore all invalid keys
if (options.find(option) != options.end()) {
TraceEvent(SevDebug, "ManagementAPIOption").detail("Option", option).detail("Key", key);
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
}
}
void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
// Since we only have limit number of options, a brute force loop here is enough
for (const auto& option : options) {
auto key = getKeyRange().begin.withSuffix(option);
// ignore all invalid keys
if (range.contains(key))
// ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
ryw->getSpecialKeySpaceWriteMap().rawErase(singleKeyRange(key));
}
}
void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
std::string option = key.removePrefix(getKeyRange().begin).toString();
// ignore all invalid keys
if (options.find(option) != options.end()) {
// ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
ryw->getSpecialKeySpaceWriteMap().rawErase(singleKeyRange(key));
}
}
Future<Optional<std::string>> ManagementCommandsOptionsImpl::commit(ReadYourWritesTransaction* ryw) {
// Nothing to do, keys should be used by other impls' commit callback
return Optional<std::string>();
}
// read from rwModule
ACTOR Future<Standalone<RangeResultRef>> rwModuleGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef range,
KeyRangeRef kr) {
KeyRangeRef krWithoutPrefix = kr.removePrefix(normalKeys.end);
KeyRangeRef krWithoutPrefix = kr.removePrefix(normalKeys.end); // TODO : need to update
Standalone<RangeResultRef> resultWithoutPrefix = wait(ryw->getRange(krWithoutPrefix, CLIENT_KNOBS->TOO_MANY));
ASSERT(!resultWithoutPrefix.more && resultWithoutPrefix.size() < CLIENT_KNOBS->TOO_MANY);
Standalone<RangeResultRef> result;
// TODO : add support for readYourWritesDisabled
if (ryw->readYourWritesDisabled()) {
for (const KeyValueRef& kv : resultWithoutPrefix) {
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
@ -565,7 +631,7 @@ Future<Standalone<RangeResultRef>> ExcludeServersRangeImpl::getRange(ReadYourWri
void ExcludeServersRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
// TODO : check key / value valid
Value val(value);
Value val(value); // TODO : update this one
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(val)));
}
@ -621,9 +687,9 @@ ACTOR Future<bool> checkExclusion(Database db, std::vector<AddressExclusion>* ad
}
if (!safe) {
std::string temp = "ERROR: It is unsafe to exclude the specified servers at this time.\n"
"Please check that this exclusion does not bring down an entire storage team.\n"
"Please also ensure that the exclusion will keep a majority of coordinators alive.\n"
"You may add more storage processes or coordinators to make the operation safe.\n";
"Please check that this exclusion does not bring down an entire storage team.\n"
"Please also ensure that the exclusion will keep a majority of coordinators alive.\n"
"You may add more storage processes or coordinators to make the operation safe.\n";
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", temp);
return false;
}
@ -678,19 +744,22 @@ ACTOR Future<bool> checkExclusion(Database db, std::vector<AddressExclusion>* ad
if (!excluded) {
StatusObjectReader disk;
if (!process.get("disk", disk)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
int64_t total_bytes;
if (!disk.get("total_bytes", total_bytes)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
int64_t free_bytes;
if (!disk.get("free_bytes", free_bytes)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
@ -706,7 +775,8 @@ ACTOR Future<bool> checkExclusion(Database db, std::vector<AddressExclusion>* ad
if (ssExcludedCount == ssTotalCount ||
(1 - worstFreeSpaceRatio) * ssTotalCount / (ssTotalCount - ssExcludedCount) > 0.9) {
std::string temp = "ERROR: This exclude may cause the total free space in the cluster to drop below 10%.";
// "\nType `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n"; // TODO : update message here
// "\nType `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n"; // TODO : update message
// here
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", temp);
return false;
}
@ -751,23 +821,35 @@ void includeServers(ReadYourWritesTransaction* ryw) {
}
}
ACTOR Future<Optional<std::string>> excludeCommitActor(ReadYourWritesTransaction* ryw) {
ACTOR Future<Optional<std::string>> excludeCommitActor(ReadYourWritesTransaction* ryw, bool failed) {
// parse network addresses
state Optional<std::string> result;
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
if (!parseNetWorkAddrFromKeys(ryw, excludedServersKeys.withPrefix(normalKeys.end), addresses, exclusions, result))
if (!parseNetWorkAddrFromKeys(
ryw, failed ? failedServersKeys.withPrefix(normalKeys.end) : excludedServersKeys.withPrefix(normalKeys.end),
addresses, exclusions, result))
return result;
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, false, &result));
if (!safe) return result;
excludeServers(ryw->getTransaction(), addresses, false);
// If force option is not set, we need to do safety check
auto force = ryw->getSpecialKeySpaceWriteMap()[failed ? LiteralStringRef("\xff\xff/conf/options/failed/force")
: LiteralStringRef("\xff\xff/conf/options/exclude/force")];
TraceEvent(SevDebug, "ExclusionCommit")
.detail("Failed", failed)
.detail("Force", force.first)
.detail("Addresses", describe(addresses));
// only do safety check when we have servers to be excluded and the force option key is not set
if (addresses.size() && !(force.first && force.second.present())) {
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result));
if (!safe) return result;
}
excludeServers(ryw->getTransaction(), addresses, failed);
includeServers(ryw);
return result;
}
Future<Optional<std::string>> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return excludeCommitActor(ryw);
return excludeCommitActor(ryw, false);
}
FailedServersRangeImpl::FailedServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
@ -791,22 +873,8 @@ void FailedServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRang
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
}
ACTOR Future<Optional<std::string>> failedServerCommitActor(ReadYourWritesTransaction* ryw) {
// parse network addresses
state Optional<std::string> result;
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
if (!parseNetWorkAddrFromKeys(ryw, failedServersKeys.withPrefix(normalKeys.end), addresses, exclusions, result))
return result;
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, true, &result));
if (!safe) return result;
excludeServers(ryw->getTransaction(), addresses, true);
return result;
}
Future<Optional<std::string>> FailedServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return failedServerCommitActor(ryw);
return excludeCommitActor(ryw, true);
}
ACTOR Future<Standalone<RangeResultRef>> ExclusionInProgressActor(ReadYourWritesTransaction* ryw, KeyRef prefix,

View File

@ -209,6 +209,22 @@ public:
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class ManagementCommandsOptionsImpl : public SpecialKeyRangeRWImpl {
public:
explicit ManagementCommandsOptionsImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
// // Given a command name and its option name, returns the special key we use to set.
// // No key will be returned if there is no such a mapping
// Optional<Key> getOptionSpecialKey(const std::string& command, const std::string& option);
private:
static std::unordered_set<std::string> options; // "<command>/<option>"
};
class ExcludeServersRangeImpl : public SpecialKeyRangeRWImpl {
public:
explicit ExcludeServersRangeImpl(KeyRangeRef kr);