foundationdb/fdbclient/GlobalConfig.actor.cpp

324 lines
11 KiB
C++
Raw Normal View History

/*
* GlobalConfig.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
2022-03-22 04:36:23 +08:00
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
2021-04-13 01:27:41 +08:00
// Global configuration keys for the client transaction profiling settings
// (sample rate and size limit).
const KeyRef fdbClientInfoTxnSampleRate = "config/fdb_client_info/client_txn_sample_rate"_sr;
const KeyRef fdbClientInfoTxnSizeLimit = "config/fdb_client_info/client_txn_size_limit"_sr;

// Global configuration keys for transaction tag sampling (rate and cost).
const KeyRef transactionTagSampleRate = "config/transaction_tag_sample_rate"_sr;
const KeyRef transactionTagSampleCost = "config/transaction_tag_sample_cost"_sr;

// Global configuration keys under "visibility/sampling/".
const KeyRef samplingFrequency = "visibility/sampling/frequency"_sr;
const KeyRef samplingWindow = "visibility/sampling/window"_sr;
2022-05-17 08:23:09 +08:00
GlobalConfig::GlobalConfig(DatabaseContext* cx) : cx(cx), lastUpdate(0) {}
Refactor profiling special keys to use GlobalConfig The special keys `\xff\xff/management/profiling/client_txn_sample_rate` and `\xff\xff/management/profiling/client_txn_size_limit` are deprecated in FDB 7.2. However, GlobalConfig was introduced in 7.0, and reading and writing these keys through the special key space was broken in 7.0+. This change modifies the profiling special keys to use GlobalConfig behind the scenes, fixing the broken special keys. The following Python script was used to make sure both GlobalConfig and the profiling special key can be used to read/write/clear profiling data: ``` import fdb import time fdb.api_version(710) @fdb.transactional def set_sample_rate(tr): tr.options.set_special_key_space_enable_writes() # Alternative way to write the key #tr[b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate'] = fdb.tuple.pack((5.0,)) tr[b'\xff\xff/management/profiling/client_txn_sample_rate'] = '5.0' @fdb.transactional def clear_sample_rate(tr): tr.options.set_special_key_space_enable_writes() # Alternative way to clear the key #tr.clear(b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate') tr[b'\xff\xff/management/profiling/client_txn_sample_rate'] = 'default' @fdb.transactional def get_sample_rate(tr): print(tr.get(b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate')) # Alternative way to read the key #print(tr.get(b'\xff\xff/management/profiling/client_txn_sample_rate')) fdb.options.set_trace_enable() fdb.options.set_trace_format('json') db = fdb.open() get_sample_rate(db) # None (or 'default') set_sample_rate(db) time.sleep(1) # Allow time for global config changes to propagate get_sample_rate(db) # 5.0 clear_sample_rate(db) time.sleep(1) get_sample_rate(db) # None (or 'default') ``` It can be run with `PYTHONPATH=./bindings/python/ python profiling.py`, and reads the `fdb.cluster` file in the current directory. ``` $ PYTHONPATH=./bindings/python/ python sps.py None 5.000000 None ```
2022-04-28 03:45:24 +08:00
// Stages a global configuration change on `tr`. This function does not commit
// the transaction.
//
// Every entry of `insertions` is set, and every range of `clears` is cleared,
// under globalConfigKeysPrefix. The same operations are also collected (with
// their unprefixed keys) into a VersionHistory that is written beneath
// globalConfigHistoryPrefix using a versionstamped key. Finally a
// versionstamped value is written to globalConfigVersionKey.
void GlobalConfig::applyChanges(Transaction& tr,
                                const VectorRef<KeyValueRef>& insertions,
                                const VectorRef<KeyRangeRef>& clears) {
	VersionHistory changes{ 0 };
	auto& recorded = changes.mutations;

	for (const auto& entry : insertions) {
		recorded.emplace_back_deep(recorded.arena(), MutationRef(MutationRef::SetValue, entry.key, entry.value));
		const Key storedKey = entry.key.withPrefix(globalConfigKeysPrefix);
		tr.set(storedKey, entry.value);
	}

	for (const auto& cleared : clears) {
		recorded.emplace_back_deep(recorded.arena(),
		                           MutationRef(MutationRef::ClearRange, cleared.begin, cleared.end));
		const Key storedBegin = cleared.begin.withPrefix(globalConfigKeysPrefix);
		const Key storedEnd = cleared.end.withPrefix(globalConfigKeysPrefix);
		tr.clear(KeyRangeRef(storedBegin, storedEnd));
	}

	// Add the mutations of this commit to the global configuration history.
	ObjectWriter serializer(IncludeVersion());
	serializer.serialize(changes);
	const Key versionstampedHistoryKey = addVersionStampAtEnd(globalConfigHistoryPrefix);
	tr.atomicOp(versionstampedHistoryKey, serializer.toStringRef(), MutationRef::SetVersionstampedKey);

	// Touching the version key is what triggers the update in the cluster
	// controller.
	tr.atomicOp(globalConfigVersionKey,
	            LiteralStringRef("0123456789\x00\x00\x00\x00"), // versionstamp
	            MutationRef::SetVersionstampedValue);
}
// Returns `key` prefixed with the beginning of the special key space's
// GLOBALCONFIG module range.
Key GlobalConfig::prefixedKey(KeyRef key) {
	const auto moduleRange = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG);
	return key.withPrefix(moduleRange.begin);
}
2021-03-20 04:28:03 +08:00
// Looks up a single key in the local copy of the global configuration.
// Returns a default-constructed Reference when the key is not stored.
const Reference<ConfigValue> GlobalConfig::get(KeyRef name) {
	const auto entry = data.find(name);
	return entry != data.end() ? entry->second : Reference<ConfigValue>();
}
2021-03-20 04:28:03 +08:00
// Collects every locally stored entry whose key lies inside `range` into an
// ordered map. Every stored entry is examined.
const std::map<KeyRef, Reference<ConfigValue>> GlobalConfig::get(KeyRangeRef range) {
	std::map<KeyRef, Reference<ConfigValue>> matches;
	for (auto entry = data.begin(); entry != data.end(); ++entry) {
		if (!range.contains(entry->first)) {
			continue;
		}
		matches[entry->first] = entry->second;
	}
	return matches;
}
// Returns the future of the `initialized` promise.
Future<Void> GlobalConfig::onInitialized() {
	Future<Void> ready = initialized.getFuture();
	return ready;
}
// Returns a future for the next trigger of `configChanged`.
Future<Void> GlobalConfig::onChange() {
	Future<Void> nextChange = configChanged.onTrigger();
	return nextChange;
}
// Registers `fn` as the callback for `key`. insert() invokes the callback with
// the newly stored value, and erase() invokes it with std::nullopt. Because
// emplace() is used, a callback already registered for `key` is left in place.
void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn) {
	auto callback = std::move(fn);
	callbacks.emplace(key, std::move(callback));
}
// Replaces the locally stored value of `key` with the value decoded from the
// tuple-encoded `value`, then invokes the callback registered for the key, if
// there is one.
//
// Only the first tuple element is read. It must be a UTF8 string, an int, a
// bool, a float or a double; any other element type hits ASSERT(false). An
// Error thrown while decoding is traced as GlobalConfigTupleParseError. The
// previous entry is erased before decoding, so the key stays absent in that
// case.
void GlobalConfig::insert(KeyRef key, ValueRef value) {
	// TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
	data.erase(key);

	// The key copy (and any decoded string) is allocated from this arena. The
	// arena is moved into the ConfigValue below.
	Arena arena(key.expectedSize() + value.expectedSize());
	KeyRef stableKey = KeyRef(arena, key);
	try {
		Tuple parsed = Tuple::unpack(value);
		std::any decoded;
		switch (parsed.getType(0)) {
		case Tuple::ElementType::UTF8:
			decoded = StringRef(arena, parsed.getString(0).contents());
			break;
		case Tuple::ElementType::INT:
			decoded = parsed.getInt(0);
			break;
		case Tuple::ElementType::BOOL:
			decoded = parsed.getBool(0);
			break;
		case Tuple::ElementType::FLOAT:
			decoded = parsed.getFloat(0);
			break;
		case Tuple::ElementType::DOUBLE:
			decoded = parsed.getDouble(0);
			break;
		default:
			ASSERT(false);
		}

		Reference<ConfigValue> stored = makeReference<ConfigValue>(std::move(arena), std::move(decoded));
		data[stableKey] = stored;

		auto callback = callbacks.find(stableKey);
		if (callback != callbacks.end()) {
			callback->second(stored->value);
		}
	} catch (Error& e) {
		TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what());
	}
}
// Erases a single key by delegating to the range overload with the range
// [key, keyAfter(key)).
void GlobalConfig::erase(Key key) {
	// Hold the end key in a named Key so the range below refers to live memory
	// for the whole call.
	const Key successor = keyAfter(key);
	erase(KeyRangeRef(key, successor));
}
// Removes every locally stored entry whose key lies inside `range`. Before an
// entry is removed, the callback registered for its key (if any) is invoked
// with std::nullopt.
void GlobalConfig::erase(KeyRangeRef range) {
	// TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
	for (auto entry = data.begin(); entry != data.end();) {
		if (!range.contains(entry->first)) {
			++entry;
			continue;
		}

		const auto callback = callbacks.find(entry->first);
		if (callback != callbacks.end()) {
			callback->second(std::nullopt);
		}
		// erase() returns the iterator following the removed entry.
		entry = data.erase(entry);
	}
}
2021-02-20 06:22:58 +08:00
// Stand-alone backoff helper: similar to tr.onError(), but usable without a
// DatabaseContext.
struct Backoff {
	// Returns a delay of a random fraction of the current backoff, then grows
	// the backoff by BACKOFF_GROWTH_RATE for the next call, capped at
	// DEFAULT_MAX_BACKOFF.
	Future<Void> onError() {
		const double waitBound = nextBackoff;
		nextBackoff = std::min(waitBound * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, CLIENT_KNOBS->DEFAULT_MAX_BACKOFF);
		return delay(waitBound * deterministicRandom()->random01());
	}

private:
	// Upper bound of the delay handed out by the next onError() call.
	double nextBackoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
};
// Older FDB versions stored client profiling data under different keys. This
// actor copies that data into the global configuration key space once. A
// marker key records that the migration has been done, so later runs return
// immediately.
//
// The actor always completes with Void(): errors that reach the outer handler
// are only traced.
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
	state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
	state Reference<ReadYourWritesTransaction> tr;

	try {
		state Backoff backoff;
		loop {
			// Each attempt runs on a fresh transaction.
			tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(self->cx)));
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

			try {
				state Optional<Value> migrationMarker = wait(tr->get(migratedKey));
				if (migrationMarker.present()) {
					// The migration has already been performed.
					return Void();
				}

				state Optional<Value> legacySampleRate =
				    wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
				state Optional<Value> legacySizeLimit =
				    wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));

				tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
				// Only the presence of the marker key matters, not its value.
				tr->set(migratedKey.contents(), "1"_sr);

				if (legacySampleRate.present()) {
					// The old value is a raw binary double; the new one is a
					// tuple-encoded double.
					const double rate =
					    BinaryReader::fromStringRef<double>(legacySampleRate.get().contents(), Unversioned());
					tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), Tuple().appendDouble(rate).pack());
				}
				if (legacySizeLimit.present()) {
					// The old value is a raw binary int64; the new one is a
					// tuple-encoded integer.
					const int64_t limit =
					    BinaryReader::fromStringRef<int64_t>(legacySizeLimit.get().contents(), Unversioned());
					tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), Tuple().append(limit).pack());
				}

				wait(tr->commit());
				break;
			} catch (Error& e) {
				// When several fdbserver processes start together they all
				// attempt this migration at once, so aborts due to conflicts
				// are expected. The error is handed to onError(); if that
				// completes, the loop retries with a new transaction. Errors
				// that onError() rethrows reach the outer handler.
				TraceEvent(SevInfo, "GlobalConfig_RetryableMigrationError").errorUnsuppressed(e).suppressFor(1.0);
				wait(tr->onError(e));
				tr.clear();
				// tr is cleared, so it won't backoff properly. Use custom
				// backoff logic here.
				wait(backoff.onError());
			}
		}
	} catch (Error& e) {
		// Non-retryable errors are traced and otherwise ignored.
		TraceEvent(SevWarnAlways, "GlobalConfig_MigrationError").error(e);
	}

	return Void();
}
// Updates local copy of global configuration by reading the entire key-range
// from storage.
2021-02-20 06:22:58 +08:00
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
2021-08-04 11:34:44 +08:00
// TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
state Backoff backoff;
2022-05-17 08:23:09 +08:00
state Reference<ReadYourWritesTransaction> tr;
2022-05-17 07:50:50 +08:00
loop {
try {
2022-05-17 08:23:09 +08:00
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(self->cx)));
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
RangeResult result = wait(tr->getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
2022-05-17 07:50:50 +08:00
for (const auto& kv : result) {
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
self->insert(systemKey, kv.value);
}
break;
} catch (Error& e) {
TraceEvent("GlobalConfigRefreshError").errorUnsuppressed(e).suppressFor(1.0);
2022-05-17 08:23:09 +08:00
wait(tr->onError(e));
2022-05-24 02:56:37 +08:00
tr.clear();
// tr is cleared, so it won't backoff properly. Use custom backoff logic here.
wait(backoff.onError());
2022-05-17 07:50:50 +08:00
}
2021-02-20 06:22:58 +08:00
}
return Void();
}
// Keeps the local copy of the global configuration up to date. On the first
// pass it waits for the connection, runs the migration, loads the whole
// configuration and marks `initialized`. After that, every trigger of
// dbInfoChanged applies the history carried in `dbInfo`, and configChanged is
// triggered once it has been applied.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) {
	loop {
		try {
			// Skipped on restarts after an error if initialization already
			// completed.
			if (self->initialized.canBeSet()) {
				wait(self->cx->onConnected());
				wait(self->migrate(self));

				wait(self->refresh(self));
				self->initialized.send(Void());
			}

			loop {
				wait(self->dbInfoChanged.onTrigger());

				auto& history = dbInfo->history;
				if (history.size() == 0) {
					continue;
				}

				if (self->lastUpdate < history[0].version) {
					// This process missed too many global configuration
					// history updates or the protocol version changed, so it
					// must re-read the entire configuration range.
					wait(self->refresh(self));
					// After the wait the history is read through dbInfo again
					// rather than through the reference taken above.
					if (dbInfo->history.size() > 0) {
						self->lastUpdate = dbInfo->history.back().version;
					}
				} else {
					// Apply the history from lowest to highest version. It
					// should already be stored in ascending version order.
					for (const auto& entry : history) {
						if (entry.version <= self->lastUpdate) {
							continue; // this version was applied earlier
						}

						for (const auto& mutation : entry.mutations.contents()) {
							if (mutation.type == MutationRef::SetValue) {
								self->insert(mutation.param1, mutation.param2);
							} else if (mutation.type == MutationRef::ClearRange) {
								self->erase(KeyRangeRef(mutation.param1, mutation.param2));
							} else {
								ASSERT(false);
							}
						}

						ASSERT(entry.version > self->lastUpdate);
						self->lastUpdate = entry.version;
					}
				}

				self->configChanged.trigger();
			}
		} catch (Error& e) {
			// There shouldn't be any uncaught errors that fall to this point,
			// but in case there are, catch them and restart the updater.
			TraceEvent("GlobalConfigUpdaterError").error(e);
			wait(delay(1.0));
		}
	}
}