Merge pull request #4582 from sfc-gh-clin/add-dd-and-maintenance
Add dd and maintenance
This commit is contained in:
commit
5489de985c
|
@ -949,6 +949,12 @@ that process, and wait for necessary data to be moved away.
|
|||
#. ``\xff\xff/management/options/failed/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
|
||||
#. ``\xff\xff/management/min_required_commit_version`` Read/write. Changing this key will change the corresponding system key ``\xff/minRequiredCommitVersion = [[Version]]``. The value of this special key is the literal text of the underlying ``Version``, which is ``int64_t``. If you set the key to a value that cannot be parsed as ``int64_t``, ``special_keys_api_failure`` will be thrown. In addition, the given ``Version`` should be larger than the current read version and smaller than the upper bound (``2**63-1-version_per_second*3600*24*365*1000``). Otherwise, ``special_keys_api_failure`` is thrown. For more details, see help text of ``fdbcli`` command ``advanceversion``.
|
||||
#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/profiling/client_txn_sample_rate`` is the literal text of a ``double``, and the value of ``\xff\xff/management/profiling/client_txn_size_limit`` is the literal text of an ``int64_t``. A special value ``default`` can be set to or read from these two keys, indicating that client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see help text of ``fdbcli`` command ``profile client``.
|
||||
#. ``\xff\xff/management/maintenance/<zone_id> := <seconds>`` Read/write. Setting or clearing a key in this range will change the corresponding system key ``\xff\x02/healthyZone``. The value is the literal text of a non-negative ``double`` which represents the remaining time for the zone to be in maintenance. Committing with an invalid value will throw ``special_keys_api_failure``. Only one zone is allowed to be in maintenance at a time. Setting a new key in the range will override the old one, and the transaction will throw a ``special_keys_api_failure`` error if more than one zone is given. For more details, see help text of ``fdbcli`` command ``maintenance``.
|
||||
In addition, a special key ``\xff\xff/management/maintenance/IgnoreSSFailures`` in the range, if set, will disable data distribution for storage server failures.
|
||||
This has the same effect as the fdbcli command ``datadistribution disable ssfailure``.
|
||||
Maintenance mode cannot be used until the key is cleared; clearing it has the same effect as the fdbcli command ``datadistribution enable ssfailure``.
|
||||
While the key is set, any commit that tries to set a key in the range will fail with the ``special_keys_api_failure`` error.
|
||||
#. ``\xff\xff/management/data_distribution/<mode|rebalance_ignored>`` Read/write. Changing these two keys will change the two corresponding system keys ``\xff/dataDistributionMode`` and ``\xff\x02/rebalanceDDIgnored``. The value of ``\xff\xff/management/data_distribution/mode`` is a literal text of ``0`` (disable) or ``1`` (enable). Transactions committed with invalid values will throw ``special_keys_api_failure`` . The value of ``\xff\xff/management/data_distribution/rebalance_ignored`` is empty. If present, it means data distribution is disabled for rebalance. Any transaction committed with non-empty value for this key will throw ``special_keys_api_failure``. For more details, see help text of ``fdbcli`` command ``datadistribution``.
|
||||
|
||||
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
|
||||
an ip address and port (e.g. ``127.0.0.1:4500``). If no port is specified,
|
||||
|
|
|
@ -1053,6 +1053,16 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
std::make_unique<ClientProfilingImpl>(
|
||||
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<MaintenanceImpl>(
|
||||
KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<DataDistributionImpl>(
|
||||
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
|
||||
|
|
|
@ -89,6 +89,12 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "profile",
|
||||
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "maintenance",
|
||||
KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "datadistribution",
|
||||
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
|
@ -1859,6 +1865,7 @@ ACTOR static Future<Standalone<RangeResultRef>> ClientProfilingGetRangeActor(Rea
|
|||
return result;
|
||||
}
|
||||
|
||||
// TODO : add limitation on set operation
|
||||
Future<Standalone<RangeResultRef>> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
return ClientProfilingGetRangeActor(ryw, getKeyRange().begin, kr);
|
||||
}
|
||||
|
@ -1915,3 +1922,196 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke
|
|||
"profile",
|
||||
"Clear operation is forbidden for profile client. You can set it to default to disable profiling.");
|
||||
}
|
||||
|
||||
// Constructs the maintenance module over `kr`; the range is simply forwarded to SpecialKeyRangeRWImpl.
MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr)
  : SpecialKeyRangeRWImpl(kr) {
}
|
||||
|
||||
// Used to read the healthZoneKey
|
||||
// If the key is persisted and the delayed read version is still larger than current read version,
|
||||
// we will calculate the remaining time(truncated to integer, the same as fdbcli) and return back as the value
|
||||
// If the zoneId is the special one `ignoreSSFailuresZoneString`,
|
||||
// value will be 0 (same as fdbcli)
|
||||
ACTOR static Future<Standalone<RangeResultRef>> MaintenanceGetRangeActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRef prefix,
|
||||
KeyRangeRef kr) {
|
||||
state Standalone<RangeResultRef> result;
|
||||
// zoneId
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(healthyZoneKey));
|
||||
if (val.present()) {
|
||||
auto healthyZone = decodeHealthyZoneValue(val.get());
|
||||
if ((healthyZone.first == ignoreSSFailuresZoneString) ||
|
||||
(healthyZone.second > ryw->getTransaction().getReadVersion().get())) {
|
||||
Key zone_key = healthyZone.first.withPrefix(prefix);
|
||||
double seconds = healthyZone.first == ignoreSSFailuresZoneString
|
||||
? 0
|
||||
: (healthyZone.second - ryw->getTransaction().getReadVersion().get()) /
|
||||
CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
|
||||
if (kr.contains(zone_key)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(zone_key, Value(boost::lexical_cast<std::string>(seconds))));
|
||||
}
|
||||
}
|
||||
}
|
||||
return rywGetRange(ryw, kr, result);
|
||||
}
|
||||
|
||||
// Read entry point for the maintenance special keys: delegates to MaintenanceGetRangeActor,
// passing the first key of this module's range as the key prefix.
Future<Standalone<RangeResultRef>> MaintenanceImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
	const KeyRef modulePrefix = getKeyRange().begin;
	return MaintenanceGetRangeActor(ryw, modulePrefix, kr);
}
|
||||
|
||||
// Commit the change to healthZoneKey
|
||||
// We do not allow more than one zone to be set in maintenance in one transaction
|
||||
// In addition, if the zoneId now is 'ignoreSSFailuresZoneString',
|
||||
// which means the data distribution is disabled for storage failures.
|
||||
// Only clear this specific key is allowed, any other operations will throw error
|
||||
ACTOR static Future<Optional<std::string>> maintenanceCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
// read
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
ryw->getTransaction().setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
Optional<Value> val = wait(ryw->getTransaction().get(healthyZoneKey));
|
||||
Optional<std::pair<Key, Version>> healthyZone =
|
||||
val.present() ? decodeHealthyZoneValue(val.get()) : Optional<std::pair<Key, Version>>();
|
||||
|
||||
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
|
||||
ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
|
||||
Key zoneId;
|
||||
double seconds;
|
||||
bool isSet = false;
|
||||
// Since maintenance only allows one zone at the same time,
|
||||
// if a transaction has more than one set operation on different zone keys,
|
||||
// the commit will throw an error
|
||||
for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
|
||||
if (!iter->value().first)
|
||||
continue;
|
||||
if (iter->value().second.present()) {
|
||||
if (isSet)
|
||||
return Optional<std::string>(ManagementAPIError::toJsonString(
|
||||
false, "maintenance", "Multiple zones given for maintenance, only one allowed at the same time"));
|
||||
isSet = true;
|
||||
zoneId = iter->begin().removePrefix(kr.begin);
|
||||
seconds = boost::lexical_cast<double>(iter->value().second.get().toString());
|
||||
} else {
|
||||
// if we already have set operation, then all clear operations will be meaningless, thus skip
|
||||
if (!isSet && healthyZone.present() && iter.range().contains(healthyZone.get().first.withPrefix(kr.begin)))
|
||||
ryw->getTransaction().clear(healthyZoneKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (isSet) {
|
||||
if (healthyZone.present() && healthyZone.get().first == ignoreSSFailuresZoneString) {
|
||||
std::string msg = "Maintenance mode cannot be used while data distribution is disabled for storage "
|
||||
"server failures.";
|
||||
return Optional<std::string>(ManagementAPIError::toJsonString(false, "maintenance", msg));
|
||||
} else if (seconds < 0) {
|
||||
std::string msg =
|
||||
"The specified maintenance time " + boost::lexical_cast<std::string>(seconds) + " is a negative value";
|
||||
return Optional<std::string>(ManagementAPIError::toJsonString(false, "maintenance", msg));
|
||||
} else {
|
||||
TraceEvent(SevDebug, "SKSMaintenanceSet").detail("ZoneId", zoneId.toString());
|
||||
ryw->getTransaction().set(healthyZoneKey,
|
||||
healthyZoneValue(zoneId,
|
||||
ryw->getTransaction().getReadVersion().get() +
|
||||
(seconds * CLIENT_KNOBS->CORE_VERSIONSPERSECOND)));
|
||||
}
|
||||
}
|
||||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
// Commit entry point for the maintenance special keys: delegates to maintenanceCommitActor
// over this module's whole key range.
Future<Optional<std::string>> MaintenanceImpl::commit(ReadYourWritesTransaction* ryw) {
	const KeyRangeRef moduleRange = getKeyRange();
	return maintenanceCommitActor(ryw, moduleRange);
}
|
||||
|
||||
// Constructs the data_distribution module over `kr`; the range is simply forwarded to SpecialKeyRangeRWImpl.
DataDistributionImpl::DataDistributionImpl(KeyRangeRef kr)
  : SpecialKeyRangeRWImpl(kr) {
}
|
||||
|
||||
// Read the system keys dataDistributionModeKey and rebalanceDDIgnoreKey
|
||||
ACTOR static Future<Standalone<RangeResultRef>> DataDistributionGetRangeActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRef prefix,
|
||||
KeyRangeRef kr) {
|
||||
state Standalone<RangeResultRef> result;
|
||||
// dataDistributionModeKey
|
||||
state Key modeKey = LiteralStringRef("mode").withPrefix(prefix);
|
||||
if (kr.contains(modeKey)) {
|
||||
auto entry = ryw->getSpecialKeySpaceWriteMap()[modeKey];
|
||||
if (ryw->readYourWritesDisabled() || !entry.first) {
|
||||
Optional<Value> f = wait(ryw->getTransaction().get(dataDistributionModeKey));
|
||||
int mode = -1;
|
||||
if (f.present()) {
|
||||
mode = BinaryReader::fromStringRef<int>(f.get(), Unversioned());
|
||||
}
|
||||
result.push_back_deep(result.arena(), KeyValueRef(modeKey, Value(boost::lexical_cast<std::string>(mode))));
|
||||
}
|
||||
}
|
||||
// rebalanceDDIgnoreKey
|
||||
state Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(prefix);
|
||||
if (kr.contains(rebalanceIgnoredKey)) {
|
||||
auto entry = ryw->getSpecialKeySpaceWriteMap()[rebalanceIgnoredKey];
|
||||
if (ryw->readYourWritesDisabled() || !entry.first) {
|
||||
Optional<Value> f = wait(ryw->getTransaction().get(rebalanceDDIgnoreKey));
|
||||
if (f.present()) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(rebalanceIgnoredKey, Value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return rywGetRange(ryw, kr, result);
|
||||
}
|
||||
|
||||
// Read entry point for the data_distribution special keys: delegates to
// DataDistributionGetRangeActor, passing the first key of this module's range as the key prefix.
Future<Standalone<RangeResultRef>> DataDistributionImpl::getRange(ReadYourWritesTransaction* ryw,
                                                                  KeyRangeRef kr) const {
	const KeyRef modulePrefix = getKeyRange().begin;
	return DataDistributionGetRangeActor(ryw, modulePrefix, kr);
}
|
||||
|
||||
// Translates the writes recorded for this module into writes on the underlying system keys.
// there are two valid keys in the range
// <prefix>/mode -> dataDistributionModeKey, the value is only allowed to be set as "0"(disable) or "1"(enable)
// <prefix>/rebalance_ignored -> rebalanceDDIgnoreKey, value is unused thus empty
//
// Returns an empty Optional when every recorded write was valid, otherwise a ManagementAPIError
// JSON string for the last invalid write encountered.
Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransaction* ryw) {
	Optional<std::string> msg;
	KeyRangeRef kr = getKeyRange();
	Key modeKey = LiteralStringRef("mode").withPrefix(kr.begin);
	Key rebalanceIgnoredKey = LiteralStringRef("rebalance_ignored").withPrefix(kr.begin);
	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(kr);
	for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
		// skip ranges that were not written in this transaction
		if (!iter->value().first)
			continue;
		if (iter->value().second.present()) {
			// set
			if (iter->range() == singleKeyRange(modeKey)) {
				try {
					int mode = boost::lexical_cast<int>(iter->value().second.get().toString());
					Value modeVal = BinaryWriter::toValue(mode, Unversioned());
					if (mode == 0 || mode == 1)
						ryw->getTransaction().set(dataDistributionModeKey, modeVal);
					else
						msg = ManagementAPIError::toJsonString(false,
						                                       "datadistribution",
						                                       "Please set the value of the data_distribution/mode to "
						                                       "0(disable) or 1(enable), other values are not allowed");
				} catch (boost::bad_lexical_cast& e) {
					msg = ManagementAPIError::toJsonString(false,
					                                       "datadistribution",
					                                       "Invalid datadistribution mode(int): " +
					                                           iter->value().second.get().toString());
				}
			} else if (iter->range() == singleKeyRange(rebalanceIgnoredKey)) {
				if (iter->value().second.get().size())
					msg =
					    ManagementAPIError::toJsonString(false,
					                                     "datadistribution",
					                                     "Value is unused for the data_distribution/rebalance_ignored "
					                                     "key, please set it to an empty value");
				else
					ryw->getTransaction().set(rebalanceDDIgnoreKey, LiteralStringRef("on"));
			} else {
				msg = ManagementAPIError::toJsonString(
				    false,
				    "datadistribution",
				    "Changing invalid keys, please read the documentation to check valid keys in the range");
			}
		} else {
			// clear
			// A cleared range may cover one of the two keys or both of them, so each key is
			// checked on its own rather than stopping at the first match.
			if (iter->range().contains(modeKey))
				ryw->getTransaction().clear(dataDistributionModeKey);
			if (iter->range().contains(rebalanceIgnoredKey))
				ryw->getTransaction().clear(rebalanceDDIgnoreKey);
		}
	}
	return msg;
}
|
||||
|
|
|
@ -388,5 +388,18 @@ public:
|
|||
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
|
||||
};
|
||||
|
||||
class MaintenanceImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit MaintenanceImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
class DataDistributionImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit DataDistributionImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -1253,6 +1253,180 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
}
|
||||
// data_distribution & maintenance get
|
||||
loop {
|
||||
try {
|
||||
// maintenance
|
||||
Standalone<RangeResultRef> maintenanceKVs = wait(
|
||||
tx->getRange(SpecialKeySpace::getManamentApiCommandRange("maintenance"), CLIENT_KNOBS->TOO_MANY));
|
||||
// By default, no maintenance is going on
|
||||
ASSERT(!maintenanceKVs.more && !maintenanceKVs.size());
|
||||
// datadistribution
|
||||
Standalone<RangeResultRef> ddKVs = wait(tx->getRange(
|
||||
SpecialKeySpace::getManamentApiCommandRange("datadistribution"), CLIENT_KNOBS->TOO_MANY));
|
||||
// By default, data_distribution/mode := "-1"
|
||||
ASSERT(!ddKVs.more && ddKVs.size() == 1);
|
||||
ASSERT(ddKVs[0].key == LiteralStringRef("mode").withPrefix(
|
||||
SpecialKeySpace::getManagementApiCommandPrefix("datadistribution")));
|
||||
ASSERT(ddKVs[0].value == Value(boost::lexical_cast<std::string>(-1)));
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "MaintenanceGet").error(e);
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// maintenance set
|
||||
{
|
||||
// Make sure setting more than one zone as maintenance will fail
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->set(Key(deterministicRandom()->randomAlphaNumeric(8))
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
|
||||
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
|
||||
// make sure this is a different zone id
|
||||
tx->set(Key(deterministicRandom()->randomAlphaNumeric(9))
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
|
||||
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
|
||||
wait(tx->commit());
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone").error(e);
|
||||
if (e.code() == error_code_special_keys_api_failure) {
|
||||
Optional<Value> errorMsg =
|
||||
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
|
||||
ASSERT(errorMsg.present());
|
||||
std::string errorStr;
|
||||
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
|
||||
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
|
||||
// special_key_space_management_api_error_msg schema validation
|
||||
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
|
||||
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
|
||||
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone")
|
||||
.detail("ErrorMessage", valueObj["message"].get_str());
|
||||
tx->reset();
|
||||
break;
|
||||
} else {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
}
|
||||
}
|
||||
// Disable DD for SS failures
|
||||
state int ignoreSSFailuresRetry = 0;
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->set(ignoreSSFailuresZoneString.withPrefix(
|
||||
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
|
||||
Value(boost::lexical_cast<std::string>(0)));
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
ignoreSSFailuresRetry++;
|
||||
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures").error(e);
|
||||
// the second commit will fail since maintenance not allowed to use while DD disabled for SS
|
||||
// failures
|
||||
if (e.code() == error_code_special_keys_api_failure) {
|
||||
Optional<Value> errorMsg =
|
||||
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
|
||||
ASSERT(errorMsg.present());
|
||||
std::string errorStr;
|
||||
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
|
||||
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
|
||||
// special_key_space_management_api_error_msg schema validation
|
||||
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
|
||||
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
|
||||
ASSERT(ignoreSSFailuresRetry > 0);
|
||||
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures")
|
||||
.detail("Retry", ignoreSSFailuresRetry)
|
||||
.detail("ErrorMessage", valueObj["message"].get_str());
|
||||
tx->reset();
|
||||
break;
|
||||
} else {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
ignoreSSFailuresRetry++;
|
||||
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
}
|
||||
}
|
||||
// set dd mode to 0 and disable DD for rebalance
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
|
||||
tx->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("0"));
|
||||
tx->set(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix), Value());
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "DataDistributionDisableModeAndRebalance").error(e);
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// verify underlying system keys are consistent with the change
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
// check DD disabled for SS failures
|
||||
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
|
||||
ASSERT(val1.present());
|
||||
auto healthyZone = decodeHealthyZoneValue(val1.get());
|
||||
ASSERT(healthyZone.first == ignoreSSFailuresZoneString);
|
||||
// check DD mode
|
||||
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
|
||||
ASSERT(val2.present());
|
||||
// mode should be set to 0
|
||||
ASSERT(BinaryReader::fromStringRef<int>(val2.get(), Unversioned()) == 0);
|
||||
// check DD disabled for rebalance
|
||||
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
|
||||
// default value "on"
|
||||
ASSERT(val3.present() && val3.get() == LiteralStringRef("on"));
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// then, clear all changes
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->clear(ignoreSSFailuresZoneString.withPrefix(
|
||||
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")));
|
||||
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
|
||||
tx->clear(LiteralStringRef("mode").withPrefix(ddPrefix));
|
||||
tx->clear(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix));
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// verify all changes are cleared
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
// check DD SSFailures key
|
||||
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
|
||||
ASSERT(!val1.present());
|
||||
// check DD mode
|
||||
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
|
||||
ASSERT(!val2.present());
|
||||
// check DD rebalance key
|
||||
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
|
||||
ASSERT(!val3.present());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue