Add special keys for maintenance and datadistribution

This commit is contained in:
Chaoguang Lin 2021-03-26 12:19:33 -07:00
parent d4940d9598
commit 0eff74f205
4 changed files with 373 additions and 1 deletions

View File

@ -970,6 +970,16 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
std::make_unique<ClientProfilingImpl>(
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<MaintenanceImpl>(
KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<DataDistributionImpl>(
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,

View File

@ -79,6 +79,10 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
{ "advanceversion", singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "profile", KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "maintenance", KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "datadistribution", KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
@ -1679,6 +1683,7 @@ ACTOR static Future<Standalone<RangeResultRef>> ClientProfilingGetRangeActor(Rea
return result;
}
// TODO : add limitation on set operation
Future<Standalone<RangeResultRef>> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return ClientProfilingGetRangeActor(ryw, getKeyRange().begin, kr);
}
@ -1734,3 +1739,173 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke
ryw, "profile",
"Clear operation is forbidden for profile client. You can set it to default to disable profiling.");
}
MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> MaintenanceGetRangeActor(ReadYourWritesTransaction* ryw, KeyRef prefix,
KeyRangeRef kr) {
state Standalone<RangeResultRef> result;
// zoneId
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(ryw->getTransaction().get(healthyZoneKey));
if (val.present()) {
TraceEvent(SevDebug, "MaintenanceDebug2").detail("KeyRange", kr.toString());
auto healthyZone = decodeHealthyZoneValue(val.get());
if ((healthyZone.first == ignoreSSFailuresZoneString) ||
(healthyZone.second > ryw->getTransaction().getReadVersion().get())) {
Key zone_key = healthyZone.first.withPrefix(prefix);
int64_t seconds = healthyZone.first == ignoreSSFailuresZoneString
? 0
: (healthyZone.second - ryw->getTransaction().getReadVersion().get()) /
CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
if (kr.contains(zone_key)) {
result.push_back_deep(result.arena(),
KeyValueRef(zone_key, Value(boost::lexical_cast<std::string>(seconds))));
}
}
}
return rywGetRange(ryw, kr, result);
}
Future<Standalone<RangeResultRef>> MaintenanceImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return MaintenanceGetRangeActor(ryw, getKeyRange().begin, kr);
}
ACTOR static Future<Optional<std::string>> maintenanceCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
// read
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->getTransaction().setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> val = wait(ryw->getTransaction().get(healthyZoneKey));
Optional<std::pair<Key, Version>> healthyZone =
val.present() ? decodeHealthyZoneValue(val.get()) : Optional<std::pair<Key, Version>>();
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
Key zoneId;
int64_t seconds;
bool isSet = false;
// Since maintenance only allows one zone at the same time,
// if a transaction has more than one set operation on different zone keys,
// the commit will throw an error
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first) continue;
if (iter->value().second.present()) {
if (isSet)
return Optional<std::string>(ManagementAPIError::toJsonString(
false, "maintenance", "Multiple zones given for maintenance, only one allowed at the same time"));
isSet = true;
zoneId = iter->begin().removePrefix(kr.begin);
seconds = boost::lexical_cast<int64_t>(iter->value().second.get().toString());
} else {
// if we already have set operation, then all clear operations will be meaningless, thus skip
if (!isSet && healthyZone.present() && iter.range().contains(healthyZone.get().first.withPrefix(kr.begin)))
ryw->getTransaction().clear(healthyZoneKey);
}
}
if (isSet) {
if (healthyZone.present() && healthyZone.get().first == ignoreSSFailuresZoneString) {
std::string msg = "Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures.";
return Optional<std::string>(ManagementAPIError::toJsonString(false, "maintenance", msg));
} else {
TraceEvent(SevDebug, "SKSMaintenanceSet").detail("ZoneId", zoneId.toString());
ryw->getTransaction().set(healthyZoneKey,
healthyZoneValue(zoneId, ryw->getTransaction().getReadVersion().get() +
(seconds * CLIENT_KNOBS->CORE_VERSIONSPERSECOND)));
}
}
return Optional<std::string>();
}
Future<Optional<std::string>> MaintenanceImpl::commit(ReadYourWritesTransaction* ryw) {
return maintenanceCommitActor(ryw, getKeyRange());
}
DataDistributionImpl::DataDistributionImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> DataDistributionGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix, KeyRangeRef kr) {
state Standalone<RangeResultRef> result;
// dataDistributionModeKey
state Key modeKey = LiteralStringRef("mode").withPrefix(prefix);
if (kr.contains(modeKey)) {
auto entry = ryw->getSpecialKeySpaceWriteMap()[modeKey];
if (ryw->readYourWritesDisabled() || !entry.first) {
Optional<Value> f = wait(ryw->getTransaction().get(dataDistributionModeKey));
int mode = -1;
if (f.present()) {
mode = BinaryReader::fromStringRef<int>(f.get(), Unversioned());
}
result.push_back_deep(result.arena(), KeyValueRef(modeKey, Value(boost::lexical_cast<std::string>(mode))));
}
}
// rebalanceDDIgnoreKey
state Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(prefix);
if (kr.contains(rebalanceIgnoredKey)) {
auto entry = ryw->getSpecialKeySpaceWriteMap()[rebalanceIgnoredKey];
if (ryw->readYourWritesDisabled() || !entry.first) {
Optional<Value> f = wait(ryw->getTransaction().get(rebalanceDDIgnoreKey));
if (f.present()) {
result.push_back_deep(result.arena(), KeyValueRef(rebalanceIgnoredKey, Value()));
}
}
}
return rywGetRange(ryw, kr, result);
}
Future<Standalone<RangeResultRef>> DataDistributionImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return DataDistributionGetRangeActor(ryw, getKeyRange().begin, kr);
}
Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransaction* ryw) {
// there are two valid keys in the range
// <prefix>/mode -> dataDistributionModeKey, the value is only allowed to be set as "0"(disable) or "1"(enable)
// <prefix>/rebalance_ignored -> rebalanceDDIgnoreKey, value is unused thus empty
Optional<std::string> msg;
KeyRangeRef kr = getKeyRange();
Key modeKey = LiteralStringRef("mode").withPrefix(kr.begin);
Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(kr.begin);
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
if (!iter->value().first) continue;
if (iter->value().second.present()) {
if (iter->range() == singleKeyRange(modeKey)) {
try {
int mode = boost::lexical_cast<int>(iter->value().second.get().toString());
Value modeVal = BinaryWriter::toValue(mode, Unversioned());
if (mode == 0 || mode == 1)
ryw->getTransaction().set(dataDistributionModeKey, modeVal);
else
msg = ManagementAPIError::toJsonString(false, "datadistribution",
"Please set the value of the data_distribution/mode to "
"0(disable) or 1(enable), other values are not allowed");
} catch (boost::bad_lexical_cast& e) {
msg = ManagementAPIError::toJsonString(false, "datadistribution",
"Invalid datadistribution mode(int): " +
iter->value().second.get().toString());
}
} else if (iter->range() == singleKeyRange(rebalanceIgnoredKey)) {
if (iter->value().second.get().size())
msg =
ManagementAPIError::toJsonString(false, "datadistribution",
"Value is unused for the data_distribution/rebalance_ignored "
"key, please set it to an empty value");
else
ryw->getTransaction().set(rebalanceDDIgnoreKey, LiteralStringRef("on"));
} else {
msg = ManagementAPIError::toJsonString(
false, "datadistribution",
"Changing invalid keys, please read the documentation to check valid keys in the range");
}
} else {
// clear
if (iter->range().contains(modeKey))
ryw->getTransaction().clear(dataDistributionModeKey);
else if (iter->range().contains(rebalanceIgnoredKey))
ryw->getTransaction().clear(rebalanceDDIgnoreKey);
}
}
return msg;
}

View File

@ -363,5 +363,18 @@ public:
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class MaintenanceImpl : public SpecialKeyRangeRWImpl {
public:
explicit MaintenanceImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class DataDistributionImpl : public SpecialKeyRangeRWImpl {
public:
explicit DataDistributionImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -1229,6 +1229,180 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
}
}
// data_distribution & maintenance get
loop {
try {
// maintenance
Standalone<RangeResultRef> maintenanceKVs = wait(
tx->getRange(SpecialKeySpace::getManamentApiCommandRange("maintenance"), CLIENT_KNOBS->TOO_MANY));
// By default, no maintenance is going on
ASSERT(!maintenanceKVs.more && !maintenanceKVs.size());
// datadistribution
Standalone<RangeResultRef> ddKVs = wait(tx->getRange(
SpecialKeySpace::getManamentApiCommandRange("datadistribution"), CLIENT_KNOBS->TOO_MANY));
// By default, data_distribution/mode := "-1"
ASSERT(!ddKVs.more && ddKVs.size() == 1);
ASSERT(ddKVs[0].key == LiteralStringRef("mode").withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("datadistribution")));
ASSERT(ddKVs[0].value == Value(boost::lexical_cast<std::string>(-1)));
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceGet").error(e);
wait(tx->onError(e));
}
}
// maintenance set
{
// Make sure setting more than one zone as maintenance will fail
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(Key(deterministicRandom()->randomAlphaNumeric(8))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
// make sure this is a different zone id
tx->set(Key(deterministicRandom()->randomAlphaNumeric(9))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone").error(e);
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone")
.detail("ErrorMessage", valueObj["message"].get_str());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// Disable DD for SS failures
state int ignoreSSFailuresRetry = 0;
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(ignoreSSFailuresZoneString.withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(0)));
wait(tx->commit());
tx->reset();
ignoreSSFailuresRetry++;
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures").error(e);
// the second commit will fail since maintenance not allowed to use while DD disabled for SS
// failures
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
ASSERT(ignoreSSFailuresRetry > 0);
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures")
.detail("Retry", ignoreSSFailuresRetry)
.detail("ErrorMessage", valueObj["message"].get_str());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
ignoreSSFailuresRetry++;
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// set dd mode to 0 and disable DD for rebalance
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
tx->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("0"));
tx->set(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix), Value());
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "DataDistributionDisableModeAndRebalance").error(e);
wait(tx->onError(e));
}
}
// verify underlying system keys are consistent with the change
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// check DD disabled for SS failures
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
ASSERT(val1.present());
auto healthyZone = decodeHealthyZoneValue(val1.get());
ASSERT(healthyZone.first == ignoreSSFailuresZoneString);
// check DD mode
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
ASSERT(val2.present());
// mode should be set to 0
ASSERT(BinaryReader::fromStringRef<int>(val2.get(), Unversioned()) == 0);
// check DD disabled for rebalance
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
// default value "on"
ASSERT(val3.present() && val3.get() == LiteralStringRef("on"));
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// then, clear all changes
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->clear(ignoreSSFailuresZoneString.withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")));
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
tx->clear(LiteralStringRef("mode").withPrefix(ddPrefix));
tx->clear(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// verify all changes are cleared
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// check DD SSFailures key
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
ASSERT(!val1.present());
// check DD mode
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
ASSERT(!val2.present());
// check DD rebalance key
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
ASSERT(!val3.present());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
}
return Void();
}
};