Move GlobalConfig to DatabaseContext
This commit is contained in:
parent
7578d5ebc7
commit
9173e2e19b
|
@ -48,7 +48,7 @@ ACTOR Future<bool> profileCommandActor(Database db,
|
|||
fprintf(stderr, "ERROR: Usage: profile client <get|set>\n");
|
||||
return false;
|
||||
}
|
||||
wait(GlobalConfig::globalConfig(db).onInitialized());
|
||||
wait(db->globalConfig->onInitialized());
|
||||
if (tokencmp(tokens[2], "get")) {
|
||||
if (tokens.size() != 3) {
|
||||
fprintf(stderr, "ERROR: Addtional arguments to `get` are not supported.\n");
|
||||
|
@ -56,12 +56,12 @@ ACTOR Future<bool> profileCommandActor(Database db,
|
|||
}
|
||||
std::string sampleRateStr = "default";
|
||||
std::string sizeLimitStr = "default";
|
||||
const double sampleRateDbl = GlobalConfig::globalConfig(db).get<double>(
|
||||
const double sampleRateDbl = db->globalConfig->get<double>(
|
||||
fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
|
||||
if (!std::isinf(sampleRateDbl)) {
|
||||
sampleRateStr = std::to_string(sampleRateDbl);
|
||||
}
|
||||
const int64_t sizeLimit = GlobalConfig::globalConfig(db).get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
|
||||
const int64_t sizeLimit = db->globalConfig->get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
|
||||
if (sizeLimit != -1) {
|
||||
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
|
||||
}
|
||||
|
|
|
@ -222,6 +222,8 @@ struct KeyRangeLocationInfo {
|
|||
: tenantEntry(tenantEntry), range(range), locations(locations) {}
|
||||
};
|
||||
|
||||
class GlobalConfig;
|
||||
|
||||
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
|
||||
public:
|
||||
static DatabaseContext* allocateOnForeignThread() {
|
||||
|
@ -627,6 +629,7 @@ public:
|
|||
using TransactionT = ReadYourWritesTransaction;
|
||||
Reference<TransactionT> createTransaction();
|
||||
|
||||
std::unique_ptr<GlobalConfig> globalConfig;
|
||||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
|
|
|
@ -37,19 +37,7 @@ const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag
|
|||
const KeyRef samplingFrequency = LiteralStringRef("visibility/sampling/frequency");
|
||||
const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window");
|
||||
|
||||
GlobalConfig::GlobalConfig(Database& cx) : cx(cx), lastUpdate(0) {}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig(const Database& cx) {
|
||||
return GlobalConfig::globalConfig(cx->dbId);
|
||||
}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig(UID dbid) {
|
||||
ConfigMap* config_map = reinterpret_cast<ConfigMap*>(g_network->global(INetwork::enGlobalConfig));
|
||||
auto res = config_map->find(dbid);
|
||||
ASSERT(res != config_map->end());
|
||||
ASSERT(res->second);
|
||||
return *reinterpret_cast<GlobalConfig*>(res->second);
|
||||
}
|
||||
GlobalConfig::GlobalConfig(const Database& cx) : cx(cx), lastUpdate(0) {}
|
||||
|
||||
Key GlobalConfig::prefixedKey(KeyRef key) {
|
||||
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_G_H)
|
||||
#define FDBCLIENT_GLOBALCONFIG_ACTOR_G_H
|
||||
#include "fdbclient/GlobalConfig.actor.g.h"
|
||||
|
@ -69,46 +70,24 @@ class GlobalConfig : NonCopyable {
|
|||
typedef std::unordered_map<UID, GlobalConfig*> ConfigMap;
|
||||
|
||||
public:
|
||||
// Creates a GlobalConfig singleton, accessed by calling
|
||||
// GlobalConfig::globalConfig(). This function requires a database object
|
||||
// to allow global configuration to run transactions on the database, and
|
||||
// an AsyncVar object to watch for changes on. The ClientDBInfo pointer
|
||||
// Requires a database object to allow global configuration to run
|
||||
// transactions on the database.
|
||||
explicit GlobalConfig(const Database& cx);
|
||||
|
||||
// Requires an AsyncVar object to watch for changes on. The ClientDBInfo pointer
|
||||
// should point to a ClientDBInfo object which will contain the updated
|
||||
// global configuration history when the given AsyncVar changes. This
|
||||
// function should be called whenever the database object changes, in order
|
||||
// to allow global configuration to run transactions on the latest
|
||||
// database.
|
||||
template <class T>
|
||||
static void create(Database& cx, Reference<AsyncVar<T> const> db, const ClientDBInfo* dbInfo) {
|
||||
auto config_map =
|
||||
reinterpret_cast<ConfigMap*>(g_network->global(INetwork::enGlobalConfig));
|
||||
if (config_map == nullptr) {
|
||||
auto m = new ConfigMap();
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, m);
|
||||
config_map = m;
|
||||
}
|
||||
|
||||
auto it = config_map->find(cx.dbId());
|
||||
if (it == config_map->end()) {
|
||||
auto config = new GlobalConfig{ cx };
|
||||
config_map->emplace(cx.dbId(), config);
|
||||
config->_updater = updater(config, dbInfo);
|
||||
// Bind changes in `db` to the `dbInfoChanged` AsyncTrigger.
|
||||
// TODO: Change AsyncTrigger to a Reference
|
||||
forward(db, std::addressof(config->dbInfoChanged));
|
||||
} else {
|
||||
GlobalConfig* config = it->second;
|
||||
config->cx = cx;
|
||||
config->_updater = updater(config, dbInfo);
|
||||
}
|
||||
void init(Reference<AsyncVar<T> const> db, const ClientDBInfo* dbInfo) {
|
||||
_updater = updater(this, dbInfo);
|
||||
// Bind changes in `db` to the `dbInfoChanged` AsyncTrigger.
|
||||
// TODO: Change AsyncTrigger to a Reference
|
||||
_forward = forward(db, std::addressof(dbInfoChanged));
|
||||
}
|
||||
|
||||
// Returns a reference to the global GlobalConfig object. Clients should
|
||||
// call this function whenever they need to read a value out of the global
|
||||
// configuration.
|
||||
static GlobalConfig& globalConfig(const Database& cx);
|
||||
static GlobalConfig& globalConfig(UID dbid);
|
||||
|
||||
// Use this function to turn a global configuration key defined above into
|
||||
// the full path needed to set the value in the database.
|
||||
//
|
||||
|
@ -164,8 +143,6 @@ public:
|
|||
void trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn);
|
||||
|
||||
private:
|
||||
GlobalConfig(Database& cx);
|
||||
|
||||
// The functions below only affect the local copy of the global
|
||||
// configuration keyspace! To insert or remove values across all nodes you
|
||||
// must use a transaction (see the note above).
|
||||
|
@ -187,6 +164,7 @@ private:
|
|||
|
||||
Database cx;
|
||||
AsyncTrigger dbInfoChanged;
|
||||
Future<Void> _forward;
|
||||
Future<Void> _updater;
|
||||
Promise<Void> initialized;
|
||||
AsyncTrigger configChanged;
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <algorithm>
|
||||
#include <cstdio>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <regex>
|
||||
#include <unordered_set>
|
||||
#include <tuple>
|
||||
|
@ -809,12 +810,12 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
|
|||
}
|
||||
}
|
||||
cx->clientStatusUpdater.outStatusQ.clear();
|
||||
wait(GlobalConfig::globalConfig(cx->dbId).onInitialized());
|
||||
double sampleRate = GlobalConfig::globalConfig(cx->dbId).get<double>(
|
||||
wait(cx->globalConfig->onInitialized());
|
||||
double sampleRate = cx->globalConfig->get<double>(
|
||||
fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
|
||||
double clientSamplingProbability =
|
||||
std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
|
||||
int64_t sizeLimit = GlobalConfig::globalConfig(cx->dbId).get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
|
||||
int64_t sizeLimit = cx->globalConfig->get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
|
||||
int64_t clientTxnInfoSizeLimit = sizeLimit == -1 ? CLIENT_KNOBS->CSI_SIZE_LIMIT : sizeLimit;
|
||||
if (!trChunksQ.empty() && deterministicRandom()->random01() < clientSamplingProbability)
|
||||
wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));
|
||||
|
@ -1481,6 +1482,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
cacheListMonitor = monitorCacheList(this);
|
||||
|
||||
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
|
||||
globalConfig = std::make_unique<GlobalConfig>(Database(this));
|
||||
|
||||
if (apiVersionAtLeast(710)) {
|
||||
registerSpecialKeysImpl(
|
||||
|
@ -1554,7 +1556,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::GLOBALCONFIG,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<GlobalConfigImpl>(dbId, SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
|
||||
std::make_unique<GlobalConfigImpl>(globalConfig.get(),
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::TRACING,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
|
@ -1937,13 +1940,12 @@ Future<Void> DatabaseContext::onProxiesChanged() const {
|
|||
}
|
||||
|
||||
bool DatabaseContext::sampleReadTags() const {
|
||||
double sampleRate = GlobalConfig::globalConfig(dbId).get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
|
||||
double sampleRate = globalConfig->get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
|
||||
return sampleRate > 0 && deterministicRandom()->random01() <= sampleRate;
|
||||
}
|
||||
|
||||
bool DatabaseContext::sampleOnCost(uint64_t cost) const {
|
||||
double sampleCost =
|
||||
GlobalConfig::globalConfig(dbId).get<double>(transactionTagSampleCost, CLIENT_KNOBS->COMMIT_SAMPLE_COST);
|
||||
double sampleCost = globalConfig->get<double>(transactionTagSampleCost, CLIENT_KNOBS->COMMIT_SAMPLE_COST);
|
||||
if (sampleCost <= 0)
|
||||
return false;
|
||||
return deterministicRandom()->random01() <= (double)cost / sampleCost;
|
||||
|
@ -2219,10 +2221,9 @@ Database Database::createDatabase(Reference<IClusterConnectionRecord> connRecord
|
|||
}
|
||||
|
||||
auto database = Database(db);
|
||||
GlobalConfig::create(
|
||||
database, Reference<AsyncVar<ClientDBInfo> const>(clientInfo), std::addressof(clientInfo->get()));
|
||||
GlobalConfig::globalConfig(database).trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
GlobalConfig::globalConfig(database).trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
database->globalConfig->init(Reference<AsyncVar<ClientDBInfo> const>(clientInfo), std::addressof(clientInfo->get()));
|
||||
database->globalConfig->trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
database->globalConfig->trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
|
||||
TraceEvent("ConnectToDatabase", database->dbId)
|
||||
.detail("Version", FDB_VT_VERSION)
|
||||
|
@ -7948,7 +7949,7 @@ void Transaction::checkDeferredError() const {
|
|||
|
||||
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
|
||||
if (!cx->isError()) {
|
||||
double clientSamplingProbability = GlobalConfig::globalConfig(cx).get<double>(
|
||||
double clientSamplingProbability = cx->globalConfig->get<double>(
|
||||
fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
|
||||
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
|
||||
deterministicRandom()->random01() < clientSamplingProbability &&
|
||||
|
|
|
@ -1454,7 +1454,8 @@ Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransac
|
|||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
GlobalConfigImpl::GlobalConfigImpl(UID dbId, KeyRangeRef kr) : dbId(dbId), SpecialKeyRangeRWImpl(kr) {}
|
||||
GlobalConfigImpl::GlobalConfigImpl(GlobalConfig* config, KeyRangeRef kr)
|
||||
: globalConfig(config), SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
// Returns key-value pairs for each value stored in the global configuration
|
||||
// framework within the range specified. The special-key-space getrange
|
||||
|
@ -1464,11 +1465,9 @@ Future<RangeResult> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw,
|
|||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
RangeResult result;
|
||||
|
||||
auto& globalConfig = GlobalConfig::globalConfig(dbId);
|
||||
KeyRangeRef modified =
|
||||
KeyRangeRef(kr.begin.removePrefix(getKeyRange().begin), kr.end.removePrefix(getKeyRange().begin));
|
||||
std::map<KeyRef, Reference<ConfigValue>> values = globalConfig.get(modified);
|
||||
std::map<KeyRef, Reference<ConfigValue>> values = globalConfig->get(modified);
|
||||
for (const auto& [key, config] : values) {
|
||||
Key prefixedKey = key.withPrefix(getKeyRange().begin);
|
||||
if (config.isValid() && config->value.has_value()) {
|
||||
|
|
|
@ -425,9 +425,10 @@ public:
|
|||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class GlobalConfig;
|
||||
class GlobalConfigImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit GlobalConfigImpl(UID dbId, KeyRangeRef kr);
|
||||
explicit GlobalConfigImpl(GlobalConfig* config, KeyRangeRef kr);
|
||||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
|
@ -437,7 +438,7 @@ public:
|
|||
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
|
||||
|
||||
private:
|
||||
UID dbId;
|
||||
GlobalConfig* globalConfig;
|
||||
};
|
||||
|
||||
class TracingOptionsImpl : public SpecialKeyRangeRWImpl {
|
||||
|
|
|
@ -167,9 +167,9 @@ Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
|
|||
enableLocalityLoadBalance,
|
||||
taskID,
|
||||
lockAware);
|
||||
GlobalConfig::create(cx, db, std::addressof(db->get().client));
|
||||
GlobalConfig::globalConfig(cx).trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
GlobalConfig::globalConfig(cx).trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
cx->globalConfig->init(db, std::addressof(db->get().client));
|
||||
cx->globalConfig->trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
cx->globalConfig->trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
return cx;
|
||||
}
|
||||
|
||||
|
@ -1606,7 +1606,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
state Database db =
|
||||
Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, IsInternal::True, locality);
|
||||
metricsLogger = runMetrics(db, KeyRef(metricsPrefix));
|
||||
GlobalConfig::globalConfig(db).trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
db->globalConfig->trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
|
||||
}
|
||||
|
@ -1614,7 +1614,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
auto lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff' ? LockAware::True : LockAware::False;
|
||||
auto database = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, lockAware);
|
||||
metricsLogger = runMetrics(database, KeyRef(metricsPrefix));
|
||||
GlobalConfig::globalConfig(database).trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
database->globalConfig->trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -707,7 +707,7 @@ private:
|
|||
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
|
||||
// the AsyncTrigger is triggered.
|
||||
ACTOR template <class T>
|
||||
void forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
|
||||
Future<Void> forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
|
||||
loop {
|
||||
wait(from->onChange());
|
||||
to->trigger();
|
||||
|
|
Loading…
Reference in New Issue