Add global configuration framework implementation
This commit is contained in:
parent
eeee15f524
commit
f1415412f1
|
@ -28,6 +28,8 @@ set(FDBCLIENT_SRCS
|
|||
FDBOptions.h
|
||||
FDBTypes.h
|
||||
FileBackupAgent.actor.cpp
|
||||
GlobalConfig.actor.h
|
||||
GlobalConfig.actor.cpp
|
||||
GrvProxyInterface.h
|
||||
HTTP.actor.cpp
|
||||
IClientApi.h
|
||||
|
|
|
@ -113,6 +113,7 @@ struct ClientDBInfo {
|
|||
vector<CommitProxyInterface> commitProxies;
|
||||
Optional<CommitProxyInterface>
|
||||
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
|
||||
vector<Standalone<std::pair<Version, VectorRef<MutationRef>>>> history;
|
||||
double clientTxnInfoSampleRate;
|
||||
int64_t clientTxnInfoSizeLimit;
|
||||
Optional<Value> forward;
|
||||
|
@ -132,15 +133,8 @@ struct ClientDBInfo {
|
|||
if constexpr (!is_fb_function<Archive>) {
|
||||
ASSERT(ar.protocolVersion().isValid());
|
||||
}
|
||||
serializer(ar,
|
||||
grvProxies,
|
||||
commitProxies,
|
||||
id,
|
||||
clientTxnInfoSampleRate,
|
||||
clientTxnInfoSizeLimit,
|
||||
forward,
|
||||
transactionTagSampleRate,
|
||||
transactionTagSampleCost);
|
||||
serializer(ar, grvProxies, commitProxies, id, history, clientTxnInfoSampleRate, clientTxnInfoSizeLimit,
|
||||
forward, transactionTagSampleRate, transactionTagSampleCost);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* GlobalConfig.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
|
||||
|
||||
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
auto config = new GlobalConfig{}; // TODO: memory leak?
|
||||
config->cx = Database(cx);
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||
config->_updater = updater(config, dbInfo);
|
||||
}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig() {
|
||||
void* res = g_network->global(INetwork::enGlobalConfig);
|
||||
ASSERT(res);
|
||||
return *reinterpret_cast<GlobalConfig*>(res);
|
||||
}
|
||||
|
||||
const std::any GlobalConfig::get(StringRef name) {
|
||||
auto it = data.find(name);
|
||||
if (it == data.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
Future<Void> GlobalConfig::onInitialized() {
|
||||
return initialized.getFuture();
|
||||
}
|
||||
|
||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||
Tuple t = Tuple::unpack(value);
|
||||
// TODO: Add more Tuple types
|
||||
if (t.getType(0) == Tuple::ElementType::UTF8) {
|
||||
data[key] = t.getString(0);
|
||||
} else if (t.getType(0) == Tuple::ElementType::INT) {
|
||||
data[key] = t.getInt(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* GlobalConfig.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_G_H)
|
||||
#define FDBCLIENT_GLOBALCONFIG_ACTOR_G_H
|
||||
#include "fdbclient/GlobalConfig.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_H)
|
||||
#define FDBCLIENT_GLOBALCONFIG_ACTOR_H
|
||||
|
||||
#include <any>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/Knobs.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
class GlobalConfig {
|
||||
public:
|
||||
GlobalConfig();
|
||||
GlobalConfig(const GlobalConfig&) = delete;
|
||||
GlobalConfig& operator=(const GlobalConfig&) = delete;
|
||||
|
||||
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
static GlobalConfig& globalConfig();
|
||||
const std::any get(StringRef name);
|
||||
Future<Void> onInitialized();
|
||||
|
||||
private:
|
||||
void insert(KeyRef key, ValueRef value);
|
||||
|
||||
ACTOR static Future<Void> refresh(GlobalConfig* self) {
|
||||
Transaction tr(self->cx);
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
for (const auto& kv : result) {
|
||||
KeyRef systemKey = kv.key.removePrefix(globalConfigDataPrefix);
|
||||
self->insert(systemKey, kv.value);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
wait(refresh(self));
|
||||
self->initialized.send(Void());
|
||||
|
||||
loop {
|
||||
try {
|
||||
wait(dbInfo->onChange());
|
||||
|
||||
auto& history = dbInfo->get().history;
|
||||
if (history.size() == 0 || (self->lastUpdate < history[0].first && self->lastUpdate != 0)) {
|
||||
// This process missed too many global configuration
|
||||
// history updates or the protocol version changed, so it
|
||||
// must re-read the entire configuration range.
|
||||
wait(refresh(self));
|
||||
self->lastUpdate = dbInfo->get().history.back().contents().first;
|
||||
} else {
|
||||
// Apply history in order, from lowest version to highest
|
||||
// version. Mutation history should already be stored in
|
||||
// ascending version order.
|
||||
for (int i = 0; i < history.size(); ++i) {
|
||||
std::pair<Version, VectorRef<MutationRef>> pair = history[i].contents();
|
||||
|
||||
Version version = pair.first;
|
||||
if (version <= self->lastUpdate) {
|
||||
continue; // already applied this mutation
|
||||
}
|
||||
|
||||
VectorRef<MutationRef>& mutations = pair.second;
|
||||
for (const auto& mutation : mutations) {
|
||||
if (mutation.type == MutationRef::SetValue) {
|
||||
self->insert(mutation.param1, mutation.param2);
|
||||
} else if (mutation.type == MutationRef::ClearRange) {
|
||||
// TODO: Could be optimized if using std::map..
|
||||
KeyRangeRef range(mutation.param1, mutation.param2);
|
||||
auto it = self->data.begin();
|
||||
while (it != self->data.end()) {
|
||||
if (range.contains(it->first)) {
|
||||
it = self->data.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(version > self->lastUpdate);
|
||||
self->lastUpdate = version;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Database cx;
|
||||
Future<Void> _updater;
|
||||
Promise<Void> initialized;
|
||||
// TODO: Arena to store all data in
|
||||
// TODO: Change to std::map for faster range access
|
||||
std::unordered_map<StringRef, std::any> data;
|
||||
Version lastUpdate;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -36,6 +36,7 @@
|
|||
#include "fdbclient/ClusterInterface.h"
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
#include "fdbclient/JsonBuilder.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
|
@ -962,6 +963,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
|
||||
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
|
||||
|
||||
GlobalConfig::create(this, clientInfo);
|
||||
|
||||
if (apiVersionAtLeast(700)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG,
|
||||
SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
@ -1018,9 +1021,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::TRACING,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
|
||||
SpecialKeySpace::MODULE::GLOBALCONFIG, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<GlobalConfigImpl>(
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<TracingOptionsImpl>(
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::CONFIGURATION,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "boost/lexical_cast.hpp"
|
||||
#include "boost/algorithm/string.hpp"
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
@ -64,6 +64,8 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
|
|||
{ SpecialKeySpace::MODULE::ERRORMSG, singleKeyRange(LiteralStringRef("\xff\xff/error_message")) },
|
||||
{ SpecialKeySpace::MODULE::CONFIGURATION,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/"), LiteralStringRef("\xff\xff/configuration0")) },
|
||||
{ SpecialKeySpace::MODULE::GLOBALCONFIG,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) },
|
||||
{ SpecialKeySpace::MODULE::TRACING,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }
|
||||
};
|
||||
|
@ -1369,11 +1371,128 @@ Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransac
|
|||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {
|
||||
TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr);
|
||||
GlobalConfigImpl::GlobalConfigImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
|
||||
// if (kr.begin != kr.end) {
|
||||
// ryw->setSpecialKeySpaceErrorMsg("get range disabled, please fetch a single key");
|
||||
// throw special_keys_api_failure();
|
||||
// }
|
||||
|
||||
auto& globalConfig = GlobalConfig::globalConfig();
|
||||
KeyRef key = kr.begin.removePrefix(getKeyRange().begin);
|
||||
const std::any& any = globalConfig.get(key);
|
||||
if (any.has_value()) {
|
||||
if (any.type() == typeid(Standalone<StringRef>)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, std::any_cast<Standalone<StringRef>>(globalConfig.get(key)).contents()));
|
||||
} else if (any.type() == typeid(int64_t)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, std::to_string(std::any_cast<int64_t>(globalConfig.get(key)))));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
void GlobalConfigImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
|
||||
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* globalConfig, ReadYourWritesTransaction* ryw) {
|
||||
state Transaction& tr = ryw->getTransaction();
|
||||
|
||||
// History should only contain three most recent updates. If it currently
|
||||
// has three items, remove the oldest to make room for a new item.
|
||||
Standalone<RangeResultRef> history = wait(tr.getRange(globalConfigHistoryKeys, CLIENT_KNOBS->TOO_MANY, false, true));
|
||||
constexpr int kGlobalConfigMaxHistorySize = 3;
|
||||
if (history.size() > kGlobalConfigMaxHistorySize - 1) {
|
||||
std::vector<KeyRef> keys;
|
||||
for (const auto& kv : history) {
|
||||
keys.push_back(kv.key);
|
||||
}
|
||||
// Fix ordering of returned keys. This will ensure versions are ordered
|
||||
// numerically; for example \xff/globalConfig/h/1000 should come after
|
||||
// \xff/globalConfig/h/999.
|
||||
std::sort(keys.begin(), keys.end(), [](const KeyRef& lhs, const KeyRef& rhs) {
|
||||
if (lhs.size() != rhs.size()) {
|
||||
return lhs.size() < rhs.size();
|
||||
}
|
||||
return lhs.compare(rhs) < 0;
|
||||
});
|
||||
|
||||
// Cannot use a range clear because of how keys are ordered in FDB.
|
||||
// \xff/globalConfig/h/999 -> ...
|
||||
// \xff/globalConfig/h/1000 -> ...
|
||||
// \xff/globalConfig/h/1001 -> ...
|
||||
//
|
||||
// clear_range(\xff/globalConfig/h, \xff/globalConfig/h/1000) results
|
||||
// in zero key-value pairs being deleted (999 is lexicographically
|
||||
// larger than 1000, and the range is exclusive).
|
||||
for (int i = 0; i < keys.size() - (kGlobalConfigMaxHistorySize - 1); ++i) {
|
||||
tr.clear(keys[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Should probably be using the commit version...
|
||||
Version readVersion = wait(ryw->getReadVersion());
|
||||
BinaryWriter wr = BinaryWriter(AssumeVersion(g_network->protocolVersion()));
|
||||
|
||||
Arena arena;
|
||||
VectorRef<MutationRef> mutations;
|
||||
|
||||
// Transform writes from special-key-space (\xff\xff/global_config/) to
|
||||
// system key space (\xff/globalConfig/).
|
||||
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
|
||||
ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
|
||||
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
|
||||
while (iter != ranges.end()) {
|
||||
Key bareKey = iter->begin().removePrefix(globalConfig->getKeyRange().begin);
|
||||
Key systemKey = bareKey.withPrefix(globalConfigDataPrefix);
|
||||
std::pair<bool, Optional<Value>> entry = iter->value();
|
||||
if (entry.first) {
|
||||
if (entry.second.present()) {
|
||||
mutations.emplace_back_deep(arena, MutationRef(MutationRef::SetValue, bareKey, entry.second.get()));
|
||||
tr.set(systemKey, entry.second.get());
|
||||
} else {
|
||||
mutations.emplace_back_deep(arena, MutationRef(MutationRef::ClearRange, bareKey, keyAfter(bareKey)));
|
||||
tr.clear(systemKey);
|
||||
}
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
|
||||
wr << std::make_pair(readVersion, mutations);
|
||||
|
||||
// Record the mutations in this commit into the global configuration history.
|
||||
Key historyVersionKey = globalConfigHistoryPrefix.withSuffix(std::to_string(readVersion));
|
||||
tr.set(historyVersionKey, wr.toValue());
|
||||
|
||||
ProtocolVersion protocolVersion = g_network->protocolVersion();
|
||||
BinaryWriter versionWriter = BinaryWriter(AssumeVersion(protocolVersion));
|
||||
versionWriter << readVersion << protocolVersion;
|
||||
tr.set(globalConfigVersionKey, versionWriter.toValue());
|
||||
|
||||
return Optional<std::string>();
|
||||
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> GlobalConfigImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
return globalConfigCommitActor(this, ryw);
|
||||
}
|
||||
|
||||
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
|
||||
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
for (const auto& option : SpecialKeySpace::getTracingOptions()) {
|
||||
auto key = getKeyRange().begin.withSuffix(option);
|
||||
|
|
|
@ -146,6 +146,7 @@ public:
|
|||
CONFIGURATION, // Configuration of the cluster
|
||||
CONNECTIONSTRING,
|
||||
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
|
||||
GLOBALCONFIG, // Global configuration options synchronized to all nodes
|
||||
MANAGEMENT, // Management-API
|
||||
METRICS, // data-distribution metrics
|
||||
TESTONLY, // only used by correctness tests
|
||||
|
@ -336,6 +337,16 @@ public:
|
|||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class GlobalConfigImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit GlobalConfigImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
|
||||
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
|
||||
};
|
||||
|
||||
class TracingOptionsImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit TracingOptionsImpl(KeyRangeRef kr);
|
||||
|
|
|
@ -632,7 +632,18 @@ std::string encodeFailedServersKey(AddressExclusion const& addr) {
|
|||
return failedServersPrefix.toString() + addr.toString();
|
||||
}
|
||||
|
||||
const KeyRangeRef workerListKeys(LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0"));
|
||||
const KeyRangeRef globalConfigKeys( LiteralStringRef("\xff/globalConfig/"), LiteralStringRef("\xff/globalConfig0") );
|
||||
const KeyRef globalConfigPrefix = globalConfigKeys.begin;
|
||||
|
||||
const KeyRangeRef globalConfigDataKeys( LiteralStringRef("\xff/globalConfig/k/"), LiteralStringRef("\xff/globalConfig/k0") );
|
||||
const KeyRef globalConfigDataPrefix = globalConfigDataKeys.begin;
|
||||
|
||||
const KeyRangeRef globalConfigHistoryKeys( LiteralStringRef("\xff/globalConfig/h/"), LiteralStringRef("\xff/globalConfig/h0") );
|
||||
const KeyRef globalConfigHistoryPrefix = globalConfigHistoryKeys.begin;
|
||||
|
||||
const KeyRef globalConfigVersionKey = LiteralStringRef("\xff/globalConfig/v");
|
||||
|
||||
const KeyRangeRef workerListKeys( LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0") );
|
||||
const KeyRef workerListPrefix = workerListKeys.begin;
|
||||
|
||||
const Key workerListKeyFor(StringRef processID) {
|
||||
|
|
|
@ -230,6 +230,30 @@ extern const KeyRef failedServersVersionKey; // The value of this key shall be c
|
|||
const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix)
|
||||
std::string encodeFailedServersKey(AddressExclusion const&);
|
||||
|
||||
// "\xff/globalConfig/[[option]]" := "value"
|
||||
// An umbrella prefix for global configuration data synchronized to all nodes.
|
||||
extern const KeyRangeRef globalConfigData;
|
||||
extern const KeyRef globalConfigDataPrefix;
|
||||
|
||||
// "\xff/globalConfig/k/[[key]]" := "value"
|
||||
// Key-value pairs that have been set. The range this keyspace represents
|
||||
// contains all globally configured options.
|
||||
extern const KeyRangeRef globalConfigDataKeys;
|
||||
extern const KeyRef globalConfigDataPrefix;
|
||||
|
||||
// "\xff/globalConfig/h/[[version]]" := "value"
|
||||
// Maps a commit version to a list of mutations made to the global
|
||||
// configuration at that commit. Shipped to nodes periodically. In general,
|
||||
// clients should not write to keys in this keyspace; it will be written
|
||||
// automatically when updating global configuration keys.
|
||||
extern const KeyRangeRef globalConfigHistoryKeys;
|
||||
extern const KeyRef globalConfigHistoryPrefix;
|
||||
|
||||
// "\xff/globalConfig/v" := "version,protocol"
|
||||
// Read-only key which returns the version and protocol of the most recent
|
||||
// data written to the global configuration keyspace.
|
||||
extern const KeyRef globalConfigVersionKey;
|
||||
|
||||
// "\xff/workers/[[processID]]" := ""
|
||||
// Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster
|
||||
// and are currently (recently) available
|
||||
|
|
|
@ -3198,26 +3198,65 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
|
|||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
||||
state Optional<Value> rateVal = wait(tr.get(fdbClientInfoTxnSampleRate));
|
||||
state Optional<Value> limitVal = wait(tr.get(fdbClientInfoTxnSizeLimit));
|
||||
ClientDBInfo clientInfo = db->clientInfo->get();
|
||||
double sampleRate = rateVal.present()
|
||||
? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned())
|
||||
: std::numeric_limits<double>::infinity();
|
||||
int64_t sizeLimit =
|
||||
limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
|
||||
if (sampleRate != clientInfo.clientTxnInfoSampleRate ||
|
||||
sizeLimit != clientInfo.clientTxnInfoSampleRate) {
|
||||
state ClientDBInfo clientInfo = db->clientInfo->get();
|
||||
|
||||
if (globalConfigVersion.present()) {
|
||||
BinaryReader versionReader = BinaryReader(globalConfigVersion.get(), AssumeVersion(g_network->protocolVersion()));
|
||||
Version version;
|
||||
ProtocolVersion protocolVersion;
|
||||
versionReader >> version >> protocolVersion;
|
||||
|
||||
state Arena arena;
|
||||
if (protocolVersion == g_network->protocolVersion()) {
|
||||
Standalone<RangeResultRef> globalConfigHistory = wait(tr.getRange(globalConfigHistoryKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
// If the global configuration version key has been
|
||||
// set, the history should contain at least one item.
|
||||
ASSERT(globalConfigHistory.size() > 0);
|
||||
clientInfo.history.clear();
|
||||
|
||||
for (const auto& kv : globalConfigHistory) {
|
||||
BinaryReader rd = BinaryReader(kv.value, AssumeVersion(g_network->protocolVersion()));
|
||||
Standalone<std::pair<Version, VectorRef<MutationRef>>> data;
|
||||
rd >> data >> arena;
|
||||
clientInfo.history.push_back(data);
|
||||
}
|
||||
|
||||
// History should be ordered by version, ascending.
|
||||
std::sort(clientInfo.history.begin(), clientInfo.history.end(), [](const auto& lhs, const auto& rhs) {
|
||||
return lhs.first < rhs.first;
|
||||
});
|
||||
} else {
|
||||
// If the protocol version has changed, the
|
||||
// GlobalConfig actor should refresh its view by
|
||||
// reading the entire global configuration key range.
|
||||
// An empty mutation list will signal the actor to
|
||||
// refresh.
|
||||
clientInfo.history.clear();
|
||||
}
|
||||
|
||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||
db->clientInfo->set(clientInfo);
|
||||
}
|
||||
|
||||
// TODO: Remove this and move to global config space
|
||||
double sampleRate = rateVal.present() ? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned()) : std::numeric_limits<double>::infinity();
|
||||
int64_t sizeLimit = limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
|
||||
if (sampleRate != clientInfo.clientTxnInfoSampleRate || sizeLimit != clientInfo.clientTxnInfoSampleRate) {
|
||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||
clientInfo.clientTxnInfoSampleRate = sampleRate;
|
||||
clientInfo.clientTxnInfoSizeLimit = sizeLimit;
|
||||
db->clientInfo->set(clientInfo);
|
||||
}
|
||||
|
||||
state Future<Void> globalConfigFuture = tr.watch(globalConfigVersionKey);
|
||||
state Future<Void> watchRateFuture = tr.watch(fdbClientInfoTxnSampleRate);
|
||||
state Future<Void> watchLimitFuture = tr.watch(fdbClientInfoTxnSizeLimit);
|
||||
wait(tr.commit());
|
||||
choose {
|
||||
when (wait(globalConfigFuture)) { break; }
|
||||
when(wait(watchRateFuture)) { break; }
|
||||
when(wait(watchLimitFuture)) { break; }
|
||||
}
|
||||
|
|
|
@ -481,7 +481,8 @@ public:
|
|||
enBlobCredentialFiles = 10,
|
||||
enNetworkAddressesFunc = 11,
|
||||
enClientFailureMonitor = 12,
|
||||
enSQLiteInjectedError = 13
|
||||
enSQLiteInjectedError = 13,
|
||||
enGlobalConfig = 14
|
||||
};
|
||||
|
||||
virtual void longTaskCheck(const char* name) {}
|
||||
|
|
Loading…
Reference in New Issue