foundationdb/fdbclient/GlobalConfig.actor.cpp

272 lines
9.4 KiB
C++
Raw Normal View History

/*
* GlobalConfig.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
2022-03-22 04:36:23 +08:00
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
2021-04-13 01:27:41 +08:00
// Client transaction profiling settings. These names are relative keys; see
// GlobalConfig::prefixedKey() for how they are mapped into the special key
// space.
const KeyRef fdbClientInfoTxnSampleRate = "config/fdb_client_info/client_txn_sample_rate"_sr;
const KeyRef fdbClientInfoTxnSizeLimit = "config/fdb_client_info/client_txn_size_limit"_sr;

// Transaction tag sampling settings.
const KeyRef transactionTagSampleRate = "config/transaction_tag_sample_rate"_sr;
const KeyRef transactionTagSampleCost = "config/transaction_tag_sample_cost"_sr;

// Visibility sampling settings.
const KeyRef samplingFrequency = "visibility/sampling/frequency"_sr;
const KeyRef samplingWindow = "visibility/sampling/window"_sr;
2022-05-05 07:36:04 +08:00
// Creates an empty local configuration cache bound to the database `cx`.
// `lastUpdate` starts at 0; it is advanced by the updater actor as history
// versions are applied.
GlobalConfig::GlobalConfig(const Database& cx)
  : cx(cx),
    lastUpdate(0) {
}
Refactor profiling special keys to use GlobalConfig The special keys `\xff\xff/management/profiling/client_txn_sample_rate` and `\xff\xff/management/profiling/client_txn_size_limit` are deprecated in FDB 7.2. However, GlobalConfig was introduced in 7.0, and reading and writing these keys through the special key space was broken in 7.0+. This change modifies the profiling special keys to use GlobalConfig behind the scenes, fixing the broken special keys. The following Python script was used to make sure both GlobalConfig and the profiling special key can be used to read/write/clear profiling data: ``` import fdb import time fdb.api_version(710) @fdb.transactional def set_sample_rate(tr): tr.options.set_special_key_space_enable_writes() # Alternative way to write the key #tr[b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate'] = fdb.tuple.pack((5.0,)) tr[b'\xff\xff/management/profiling/client_txn_sample_rate'] = '5.0' @fdb.transactional def clear_sample_rate(tr): tr.options.set_special_key_space_enable_writes() # Alternative way to clear the key #tr.clear(b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate') tr[b'\xff\xff/management/profiling/client_txn_sample_rate'] = 'default' @fdb.transactional def get_sample_rate(tr): print(tr.get(b'\xff\xff/global_config/config/fdb_client_info/client_txn_sample_rate')) # Alternative way to read the key #print(tr.get(b'\xff\xff/management/profiling/client_txn_sample_rate')) fdb.options.set_trace_enable() fdb.options.set_trace_format('json') db = fdb.open() get_sample_rate(db) # None (or 'default') set_sample_rate(db) time.sleep(1) # Allow time for global config changes to propagate get_sample_rate(db) # 5.0 clear_sample_rate(db) time.sleep(1) get_sample_rate(db) # None (or 'default') ``` It can be run with `PYTHONPATH=./bindings/python/ python profiling.py`, and reads the `fdb.cluster` file in the current directory. ``` $ PYTHONPATH=./bindings/python/ python sps.py None 5.000000 None ```
2022-04-28 03:45:24 +08:00
// Stages a batch of global configuration changes on `tr` (the transaction is
// not committed here).
//
//   tr         - transaction that receives all writes
//   insertions - key/value pairs to set; keys are given without the
//                globalConfigKeysPrefix
//   clears     - key ranges to clear; bounds are given without the
//                globalConfigKeysPrefix
//
// Besides the prefixed data writes, the same mutations (with unprefixed keys)
// are serialized into a VersionHistory entry stored under a versionstamped
// history key, and the version key is rewritten.
void GlobalConfig::applyChanges(Transaction& tr,
                                const VectorRef<KeyValueRef>& insertions,
                                const VectorRef<KeyRangeRef>& clears) {
	VersionHistory changes{ 0 };
	auto& log = changes.mutations;

	for (const auto& entry : insertions) {
		log.emplace_back_deep(log.arena(), MutationRef(MutationRef::SetValue, entry.key, entry.value));
		const Key storedKey = entry.key.withPrefix(globalConfigKeysPrefix);
		tr.set(storedKey, entry.value);
	}

	for (const auto& cleared : clears) {
		log.emplace_back_deep(log.arena(), MutationRef(MutationRef::ClearRange, cleared.begin, cleared.end));
		const Key storedBegin = cleared.begin.withPrefix(globalConfigKeysPrefix);
		const Key storedEnd = cleared.end.withPrefix(globalConfigKeysPrefix);
		tr.clear(KeyRangeRef(storedBegin, storedEnd));
	}

	// Record the mutations in this commit into the global configuration history.
	ObjectWriter serializer(IncludeVersion());
	serializer.serialize(changes);
	Key versionstampedHistoryKey = addVersionStampAtEnd(globalConfigHistoryPrefix);
	tr.atomicOp(versionstampedHistoryKey, serializer.toStringRef(), MutationRef::SetVersionstampedKey);

	// Write version key to trigger update in cluster controller.
	tr.atomicOp(globalConfigVersionKey,
	            LiteralStringRef("0123456789\x00\x00\x00\x00"), // versionstamp
	            MutationRef::SetVersionstampedValue);
}
// Returns `key` prepended with the start of the GLOBALCONFIG module range of
// the special key space.
Key GlobalConfig::prefixedKey(KeyRef key) {
	const auto& moduleRange = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG);
	return key.withPrefix(moduleRange.begin);
}
2021-03-20 04:28:03 +08:00
// Looks up a single configuration entry in the local cache.
// Returns the stored value for `name`, or a default-constructed (empty)
// Reference when no entry with that key exists.
const Reference<ConfigValue> GlobalConfig::get(KeyRef name) {
	const auto entry = data.find(name);
	if (entry != data.end()) {
		return entry->second;
	}
	return Reference<ConfigValue>();
}
2021-03-20 04:28:03 +08:00
// Collects every cached entry whose key lies inside `range`.
// Returns a map from key to value; the map is empty when nothing matches.
// Every cached entry is examined.
const std::map<KeyRef, Reference<ConfigValue>> GlobalConfig::get(KeyRangeRef range) {
	std::map<KeyRef, Reference<ConfigValue>> matches;
	for (auto entry = data.begin(); entry != data.end(); ++entry) {
		if (!range.contains(entry->first)) {
			continue;
		}
		matches[entry->first] = entry->second;
	}
	return matches;
}
// Returns the future of the `initialized` promise, which the updater actor
// fulfils once its first refresh has completed.
Future<Void> GlobalConfig::onInitialized() {
	Future<Void> ready = initialized.getFuture();
	return ready;
}
// Returns a future tied to the `configChanged` trigger, which the updater
// actor fires after it has processed a database info change.
Future<Void> GlobalConfig::onChange() {
	Future<Void> changed = configChanged.onTrigger();
	return changed;
}
// Registers `fn` as the callback for `key`. The callback is invoked with the
// new value when `key` is inserted and with std::nullopt when it is erased.
// If a callback is already registered for `key`, the existing one is kept and
// `fn` is discarded.
// NOTE(review): the KeyRef is stored as given, not copied into owned memory --
// confirm that callers only pass keys with sufficiently long lifetime.
void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn) {
	callbacks.insert({ key, std::move(fn) });
}
// Stores `value` under `key` in the local cache, replacing any previous entry.
//
//   key   - configuration key (copied into memory owned by the new entry)
//   value - tuple-encoded value; only the first tuple element is read, and it
//           must be a UTF8 string, int, bool, float or double
//
// If a callback is registered for the key it is invoked with the decoded
// value. Any Error raised while decoding is logged and swallowed; in that case
// the previous entry has already been removed and no new entry is stored.
void GlobalConfig::insert(KeyRef key, ValueRef value) {
	// TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
	data.erase(key);

	// The arena backs both the key and (for strings) the decoded value, and is
	// handed over to the ConfigValue so the memory lives as long as the entry.
	Arena arena(key.expectedSize() + value.expectedSize());
	KeyRef stableKey = KeyRef(arena, key);
	try {
		Tuple unpacked = Tuple::unpack(value);
		std::any decoded;
		switch (unpacked.getType(0)) {
		case Tuple::ElementType::UTF8:
			decoded = StringRef(arena, unpacked.getString(0).contents());
			break;
		case Tuple::ElementType::INT:
			decoded = unpacked.getInt(0);
			break;
		case Tuple::ElementType::BOOL:
			decoded = unpacked.getBool(0);
			break;
		case Tuple::ElementType::FLOAT:
			decoded = unpacked.getFloat(0);
			break;
		case Tuple::ElementType::DOUBLE:
			decoded = unpacked.getDouble(0);
			break;
		default:
			ASSERT(false);
		}

		Reference<ConfigValue> stored = makeReference<ConfigValue>(std::move(arena), std::move(decoded));
		data[stableKey] = stored;

		auto listener = callbacks.find(stableKey);
		if (listener != callbacks.end()) {
			listener->second(stored->value);
		}
	} catch (Error& e) {
		TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what());
	}
}
// Removes the single entry stored under `key` by erasing the range
// [key, keyAfter(key)).
void GlobalConfig::erase(Key key) {
	const Key upperBound = keyAfter(key);
	erase(KeyRangeRef(key, upperBound));
}
// Removes every cached entry whose key lies inside `range`. For each removed
// key that has a registered callback, the callback is invoked with
// std::nullopt before the entry is dropped.
void GlobalConfig::erase(KeyRangeRef range) {
	// TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
	for (auto entry = data.begin(); entry != data.end();) {
		if (!range.contains(entry->first)) {
			++entry;
			continue;
		}

		auto listener = callbacks.find(entry->first);
		if (listener != callbacks.end()) {
			listener->second(std::nullopt);
		}
		// erase() hands back the iterator following the removed element.
		entry = data.erase(entry);
	}
}
2021-02-20 06:22:58 +08:00
// Older FDB versions used different keys for client profiling data. This
// function performs a one-time migration of data in these keys to the new
// global configuration key space.
//
// The migration is best-effort: it is attempted once, without retries, and
// any Error raised while reading the old keys or committing the new ones is
// logged and swallowed so that the caller can carry on. The reads are inside
// the try block for that reason -- a failed read must not propagate out of
// this actor any more than a failed commit does.
//
//   self - the GlobalConfig whose database handle is used for the transaction
//
// Returns Void once the migration has been performed, was found to be already
// done, or has failed.
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
	state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

	state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);

	try {
		state Optional<Value> migrated = wait(tr->get(migratedKey));
		if (migrated.present()) {
			// Already performed migration.
			return Void();
		}

		state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
		state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));

		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
		// The value doesn't matter too much, as long as the key is set.
		tr->set(migratedKey.contents(), "1"_sr);
		if (sampleRate.present()) {
			// The old key holds a raw (unversioned) binary double; the new key
			// holds a tuple-encoded double.
			const double sampleRateDbl =
			    BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
			Tuple rate = Tuple().appendDouble(sampleRateDbl);
			tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
		}
		if (sizeLimit.present()) {
			// The old key holds a raw (unversioned) binary int64; the new key
			// holds a tuple-encoded integer.
			const int64_t sizeLimitInt =
			    BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
			Tuple size = Tuple().append(sizeLimitInt);
			tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
		}

		wait(tr->commit());
	} catch (Error& e) {
		// If multiple fdbserver processes are started at once, they will all
		// attempt this migration at the same time, sometimes resulting in
		// aborts due to conflicts. Purposefully avoid retrying, making this
		// migration best-effort.
		TraceEvent(SevInfo, "GlobalConfig_MigrationError").detail("What", e.what());
	}

	return Void();
}
// Updates local copy of global configuration by reading the entire key-range
// from storage.
2021-02-20 06:22:58 +08:00
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
2021-08-04 11:34:44 +08:00
// TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
2021-02-20 06:22:58 +08:00
Transaction tr(self->cx);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
2021-05-04 04:14:16 +08:00
RangeResult result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
2021-02-20 06:22:58 +08:00
for (const auto& kv : result) {
2021-02-20 16:43:54 +08:00
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
2021-02-20 06:22:58 +08:00
self->insert(systemKey, kv.value);
}
return Void();
}
// Keeps the local copy of the global configuration up to date.
//
// On start-up the actor waits for the database connection, runs the one-time
// migration and performs a full refresh, then fulfils `initialized`. After
// that it loops forever: every time `dbInfoChanged` fires it brings the local
// copy up to date from the mutation history in `dbInfo` and fires
// `configChanged`.
//
//   self   - the GlobalConfig being maintained
//   dbInfo - source of the mutation history; read again after every wait
//
// Any Error raised inside the loop propagates out of the actor.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) {
	wait(self->cx->onConnected());
	wait(self->migrate(self));

	wait(self->refresh(self));
	self->initialized.send(Void());

	loop {
		wait(self->dbInfoChanged.onTrigger());

		auto& history = dbInfo->history;
		if (history.size() == 0) {
			continue;
		}

		if (self->lastUpdate >= history[0].version) {
			// Apply history in order, from lowest version to highest
			// version. Mutation history should already be stored in
			// ascending version order.
			for (const auto& vh : history) {
				if (vh.version > self->lastUpdate) {
					for (const auto& mutation : vh.mutations.contents()) {
						switch (mutation.type) {
						case MutationRef::SetValue:
							self->insert(mutation.param1, mutation.param2);
							break;
						case MutationRef::ClearRange:
							self->erase(KeyRangeRef(mutation.param1, mutation.param2));
							break;
						default:
							ASSERT(false);
						}
					}

					ASSERT(vh.version > self->lastUpdate);
					self->lastUpdate = vh.version;
				}
				// Otherwise this mutation was already applied.
			}
		} else {
			// This process missed too many global configuration
			// history updates or the protocol version changed, so it
			// must re-read the entire configuration range.
			wait(self->refresh(self));
			// The history may have changed during the wait, so read it again
			// through dbInfo.
			if (dbInfo->history.size() > 0) {
				self->lastUpdate = dbInfo->history.back().version;
			}
		}

		self->configChanged.trigger();
	}
}