From 0eff74f2053bd91a37ec947f31edd55e2c29f681 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 26 Mar 2021 12:19:33 -0700 Subject: [PATCH] Add special keys for maintenance and datadistribution --- fdbclient/NativeAPI.actor.cpp | 10 + fdbclient/SpecialKeySpace.actor.cpp | 177 +++++++++++++++++- fdbclient/SpecialKeySpace.actor.h | 13 ++ .../SpecialKeySpaceCorrectness.actor.cpp | 174 +++++++++++++++++ 4 files changed, 373 insertions(+), 1 deletion(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index f5058df92b..73ad8abc07 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -970,6 +970,16 @@ DatabaseContext::DatabaseContext(Reference( KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 4a987238b3..1fc2dbf15f 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -79,7 +79,11 @@ std::unordered_map SpecialKeySpace::managementApiCommandT { "advanceversion", singleKeyRange(LiteralStringRef("min_required_commit_version")) .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "profile", KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0")) - .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "maintenance", KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0")) + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "datadistribution", KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0")) + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } }; std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; @@ -1679,6 +1683,7 @@ ACTOR static Future> ClientProfilingGetRangeActor(Rea return result; } +// TODO : add limitation on set operation Future> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { return ClientProfilingGetRangeActor(ryw, getKeyRange().begin, kr); } @@ -1734,3 +1739,173 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke ryw, "profile", "Clear operation is forbidden for profile client. You can set it to default to disable profiling."); } + +MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +ACTOR static Future> MaintenanceGetRangeActor(ReadYourWritesTransaction* ryw, KeyRef prefix, + KeyRangeRef kr) { + state Standalone result; + // zoneId + ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE); + Optional val = wait(ryw->getTransaction().get(healthyZoneKey)); + if (val.present()) { + TraceEvent(SevDebug, "MaintenanceDebug2").detail("KeyRange", kr.toString()); + auto healthyZone = decodeHealthyZoneValue(val.get()); + if ((healthyZone.first == ignoreSSFailuresZoneString) || + (healthyZone.second > ryw->getTransaction().getReadVersion().get())) { + Key zone_key = healthyZone.first.withPrefix(prefix); + int64_t seconds = healthyZone.first == ignoreSSFailuresZoneString + ? 0 + : (healthyZone.second - ryw->getTransaction().getReadVersion().get()) / + CLIENT_KNOBS->CORE_VERSIONSPERSECOND; + if (kr.contains(zone_key)) { + result.push_back_deep(result.arena(), + KeyValueRef(zone_key, Value(boost::lexical_cast(seconds)))); + } + } + } + return rywGetRange(ryw, kr, result); +} + +Future> MaintenanceImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + return MaintenanceGetRangeActor(ryw, getKeyRange().begin, kr); +} + +ACTOR static Future> maintenanceCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { + // read + ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE); + ryw->getTransaction().setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + Optional val = wait(ryw->getTransaction().get(healthyZoneKey)); + Optional> healthyZone = + val.present() ? decodeHealthyZoneValue(val.get()) : Optional>(); + + state RangeMap>, KeyRangeRef>::Ranges ranges = + ryw->getSpecialKeySpaceWriteMap().containedRanges(kr); + Key zoneId; + int64_t seconds; + bool isSet = false; + // Since maintenance only allows one zone at the same time, + // if a transaction has more than one set operation on different zone keys, + // the commit will throw an error + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (!iter->value().first) continue; + if (iter->value().second.present()) { + if (isSet) + return Optional(ManagementAPIError::toJsonString( + false, "maintenance", "Multiple zones given for maintenance, only one allowed at the same time")); + isSet = true; + zoneId = iter->begin().removePrefix(kr.begin); + seconds = boost::lexical_cast(iter->value().second.get().toString()); + } else { + // if we already have set operation, then all clear operations will be meaningless, thus skip + if (!isSet && healthyZone.present() && iter.range().contains(healthyZone.get().first.withPrefix(kr.begin))) + ryw->getTransaction().clear(healthyZoneKey); + } + } + + if (isSet) { + if (healthyZone.present() && healthyZone.get().first == ignoreSSFailuresZoneString) { + std::string msg = "Maintenance mode cannot be used while data distribution is disabled for storage " + "server failures."; + return Optional(ManagementAPIError::toJsonString(false, "maintenance", msg)); + } else { + TraceEvent(SevDebug, "SKSMaintenanceSet").detail("ZoneId", zoneId.toString()); + ryw->getTransaction().set(healthyZoneKey, + healthyZoneValue(zoneId, ryw->getTransaction().getReadVersion().get() + + (seconds * CLIENT_KNOBS->CORE_VERSIONSPERSECOND))); + } + } + return Optional(); +} + +Future> MaintenanceImpl::commit(ReadYourWritesTransaction* ryw) { + return maintenanceCommitActor(ryw, getKeyRange()); +} + +DataDistributionImpl::DataDistributionImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +ACTOR static Future> DataDistributionGetRangeActor(ReadYourWritesTransaction* ryw, + KeyRef prefix, KeyRangeRef kr) { + state Standalone result; + // dataDistributionModeKey + state Key modeKey = LiteralStringRef("mode").withPrefix(prefix); + if (kr.contains(modeKey)) { + auto entry = ryw->getSpecialKeySpaceWriteMap()[modeKey]; + if (ryw->readYourWritesDisabled() || !entry.first) { + Optional f = wait(ryw->getTransaction().get(dataDistributionModeKey)); + int mode = -1; + if (f.present()) { + mode = BinaryReader::fromStringRef(f.get(), Unversioned()); + } + result.push_back_deep(result.arena(), KeyValueRef(modeKey, Value(boost::lexical_cast(mode)))); + } + } + // rebalanceDDIgnoreKey + state Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(prefix); + if (kr.contains(rebalanceIgnoredKey)) { + auto entry = ryw->getSpecialKeySpaceWriteMap()[rebalanceIgnoredKey]; + if (ryw->readYourWritesDisabled() || !entry.first) { + Optional f = wait(ryw->getTransaction().get(rebalanceDDIgnoreKey)); + if (f.present()) { + result.push_back_deep(result.arena(), KeyValueRef(rebalanceIgnoredKey, Value())); + } + } + } + return rywGetRange(ryw, kr, result); +} + +Future> DataDistributionImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) const { + return DataDistributionGetRangeActor(ryw, getKeyRange().begin, kr); +} + +Future> DataDistributionImpl::commit(ReadYourWritesTransaction* ryw) { + // there are two valid keys in the range + // /mode -> dataDistributionModeKey, the value is only allowed to be set as "0"(disable) or "1"(enable) + // /rebalance_ignored -> rebalanceDDIgnoreKey, value is unused thus empty + Optional msg; + KeyRangeRef kr = getKeyRange(); + Key modeKey = LiteralStringRef("mode").withPrefix(kr.begin); + Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(kr.begin); + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(kr); + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (!iter->value().first) continue; + if (iter->value().second.present()) { + if (iter->range() == singleKeyRange(modeKey)) { + try { + int mode = boost::lexical_cast(iter->value().second.get().toString()); + Value modeVal = BinaryWriter::toValue(mode, Unversioned()); + if (mode == 0 || mode == 1) + ryw->getTransaction().set(dataDistributionModeKey, modeVal); + else + msg = ManagementAPIError::toJsonString(false, "datadistribution", + "Please set the value of the data_distribution/mode to " + "0(disable) or 1(enable), other values are not allowed"); + } catch (boost::bad_lexical_cast& e) { + msg = ManagementAPIError::toJsonString(false, "datadistribution", + "Invalid datadistribution mode(int): " + + iter->value().second.get().toString()); + } + } else if (iter->range() == singleKeyRange(rebalanceIgnoredKey)) { + if (iter->value().second.get().size()) + msg = + ManagementAPIError::toJsonString(false, "datadistribution", + "Value is unused for the data_distribution/rebalance_ignored " + "key, please set it to an empty value"); + else + ryw->getTransaction().set(rebalanceDDIgnoreKey, LiteralStringRef("on")); + } else { + msg = ManagementAPIError::toJsonString( + false, "datadistribution", + "Changing invalid keys, please read the documentation to check valid keys in the range"); + } + } else { + // clear + if (iter->range().contains(modeKey)) + ryw->getTransaction().clear(dataDistributionModeKey); + else if (iter->range().contains(rebalanceIgnoredKey)) + ryw->getTransaction().clear(rebalanceDDIgnoreKey); + } + } + return msg; +} diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 0c70e43525..0d46f51961 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -363,5 +363,18 @@ public: void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; }; +class MaintenanceImpl : public SpecialKeyRangeRWImpl { +public: + explicit MaintenanceImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; +class DataDistributionImpl : public SpecialKeyRangeRWImpl { +public: + explicit DataDistributionImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index f101a02360..ce41d98812 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -1229,6 +1229,180 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } } } + // data_distribution & maintenance get + loop { + try { + // maintenance + Standalone maintenanceKVs = wait( + tx->getRange(SpecialKeySpace::getManamentApiCommandRange("maintenance"), CLIENT_KNOBS->TOO_MANY)); + // By default, no maintenance is going on + ASSERT(!maintenanceKVs.more && !maintenanceKVs.size()); + // datadistribution + Standalone ddKVs = wait(tx->getRange( + SpecialKeySpace::getManamentApiCommandRange("datadistribution"), CLIENT_KNOBS->TOO_MANY)); + // By default, data_distribution/mode := "-1" + ASSERT(!ddKVs.more && ddKVs.size() == 1); + ASSERT(ddKVs[0].key == LiteralStringRef("mode").withPrefix( + SpecialKeySpace::getManagementApiCommandPrefix("datadistribution"))); + ASSERT(ddKVs[0].value == Value(boost::lexical_cast(-1))); + tx->reset(); + break; + } catch (Error& e) { + TraceEvent(SevDebug, "MaintenanceGet").error(e); + wait(tx->onError(e)); + } + } + // maintenance set + { + // Make sure setting more than one zone as maintenance will fail + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->set(Key(deterministicRandom()->randomAlphaNumeric(8)) + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")), + Value(boost::lexical_cast(deterministicRandom()->randomInt(1, 100)))); + // make sure this is a different zone id + tx->set(Key(deterministicRandom()->randomAlphaNumeric(9)) + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")), + Value(boost::lexical_cast(deterministicRandom()->randomInt(1, 100)))); + wait(tx->commit()); + ASSERT(false); + } catch (Error& e) { + TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone").error(e); + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool()); + TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone") + .detail("ErrorMessage", valueObj["message"].get_str()); + tx->reset(); + break; + } else { + wait(tx->onError(e)); + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } + } + // Disable DD for SS failures + state int ignoreSSFailuresRetry = 0; + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->set(ignoreSSFailuresZoneString.withPrefix( + SpecialKeySpace::getManagementApiCommandPrefix("maintenance")), + Value(boost::lexical_cast(0))); + wait(tx->commit()); + tx->reset(); + ignoreSSFailuresRetry++; + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } catch (Error& e) { + TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures").error(e); + // the second commit will fail since maintenance not allowed to use while DD disabled for SS + // failures + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool()); + ASSERT(ignoreSSFailuresRetry > 0); + TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures") + .detail("Retry", ignoreSSFailuresRetry) + .detail("ErrorMessage", valueObj["message"].get_str()); + tx->reset(); + break; + } else { + wait(tx->onError(e)); + } + ignoreSSFailuresRetry++; + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } + } + // set dd mode to 0 and disable DD for rebalance + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution"); + tx->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("0")); + tx->set(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix), Value()); + wait(tx->commit()); + tx->reset(); + break; + } catch (Error& e) { + TraceEvent(SevDebug, "DataDistributionDisableModeAndRebalance").error(e); + wait(tx->onError(e)); + } + } + // verify underlying system keys are consistent with the change + loop { + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + // check DD disabled for SS failures + Optional val1 = wait(tx->get(healthyZoneKey)); + ASSERT(val1.present()); + auto healthyZone = decodeHealthyZoneValue(val1.get()); + ASSERT(healthyZone.first == ignoreSSFailuresZoneString); + // check DD mode + Optional val2 = wait(tx->get(dataDistributionModeKey)); + ASSERT(val2.present()); + // mode should be set to 0 + ASSERT(BinaryReader::fromStringRef(val2.get(), Unversioned()) == 0); + // check DD disabled for rebalance + Optional val3 = wait(tx->get(rebalanceDDIgnoreKey)); + // default value "on" + ASSERT(val3.present() && val3.get() == LiteralStringRef("on")); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + // then, clear all changes + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->clear(ignoreSSFailuresZoneString.withPrefix( + SpecialKeySpace::getManagementApiCommandPrefix("maintenance"))); + KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution"); + tx->clear(LiteralStringRef("mode").withPrefix(ddPrefix)); + tx->clear(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix)); + wait(tx->commit()); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + // verify all changes are cleared + loop { + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + // check DD SSFailures key + Optional val1 = wait(tx->get(healthyZoneKey)); + ASSERT(!val1.present()); + // check DD mode + Optional val2 = wait(tx->get(dataDistributionModeKey)); + ASSERT(!val2.present()); + // check DD rebalance key + Optional val3 = wait(tx->get(rebalanceDDIgnoreKey)); + ASSERT(!val3.present()); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + } return Void(); } };