Revert "Fix global config not triggering changes on server processes"
This commit is contained in:
parent
ccfc48d89a
commit
cdf98f987d
|
@ -296,6 +296,17 @@ boost::asio::io_context& ActorLineageProfilerT::context() {
|
|||
|
||||
SampleIngestor::~SampleIngestor() {}
|
||||
|
||||
// Callback used to update the sampling profilers run frequency whenever the
|
||||
// frequency changes.
|
||||
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
|
||||
double frequency = 0;
|
||||
if (freq.has_value()) {
|
||||
frequency = std::any_cast<double>(freq.value());
|
||||
}
|
||||
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
|
||||
ActorLineageProfiler::instance().setFrequency(frequency);
|
||||
}
|
||||
|
||||
void ProfilerConfigT::reset(std::map<std::string, std::string> const& config) {
|
||||
bool expectNoMore = false, useFluentD = false, useTCP = false;
|
||||
std::string endpoint;
|
||||
|
@ -359,17 +370,6 @@ std::map<std::string, std::string> ProfilerConfigT::getConfig() const {
|
|||
return res;
|
||||
}
|
||||
|
||||
// Callback used to update the sampling profilers run frequency whenever the
|
||||
// frequency changes.
|
||||
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
|
||||
double frequency = 0;
|
||||
if (freq.has_value()) {
|
||||
frequency = std::any_cast<double>(freq.value());
|
||||
}
|
||||
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
|
||||
ActorLineageProfiler::instance().setFrequency(frequency);
|
||||
}
|
||||
|
||||
// Callback used to update the sample collector window size.
|
||||
void samplingProfilerUpdateWindow(std::optional<std::any> window) {
|
||||
double duration = 0;
|
||||
|
|
|
@ -39,12 +39,26 @@ const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window");
|
|||
|
||||
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
|
||||
|
||||
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
|
||||
auto config = new GlobalConfig{};
|
||||
config->cx = Database(cx);
|
||||
config->dbInfo = dbInfo;
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||
config->_updater = updater(config);
|
||||
}
|
||||
}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig() {
|
||||
void* res = g_network->global(INetwork::enGlobalConfig);
|
||||
ASSERT(res);
|
||||
return *reinterpret_cast<GlobalConfig*>(res);
|
||||
}
|
||||
|
||||
void GlobalConfig::updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
// this->dbInfo = dbInfo;
|
||||
}
|
||||
|
||||
Key GlobalConfig::prefixedKey(KeyRef key) {
|
||||
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
|
||||
}
|
||||
|
@ -106,7 +120,7 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
|||
callbacks[stableKey](data[stableKey]->value);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what());
|
||||
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,32 +159,29 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|||
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
||||
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
||||
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// The value doesn't matter too much, as long as the key is set.
|
||||
tr->set(migratedKey.contents(), "1"_sr);
|
||||
if (sampleRate.present()) {
|
||||
const double sampleRateDbl =
|
||||
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
||||
Tuple rate = Tuple().appendDouble(sampleRateDbl);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
||||
}
|
||||
if (sizeLimit.present()) {
|
||||
const int64_t sizeLimitInt =
|
||||
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
||||
Tuple size = Tuple().append(sizeLimitInt);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// The value doesn't matter too much, as long as the key is set.
|
||||
tr->set(migratedKey.contents(), "1"_sr);
|
||||
if (sampleRate.present()) {
|
||||
const double sampleRateDbl =
|
||||
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
||||
Tuple rate = Tuple().appendDouble(sampleRateDbl);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
||||
}
|
||||
if (sizeLimit.present()) {
|
||||
const int64_t sizeLimitInt =
|
||||
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
||||
Tuple size = Tuple().append(sizeLimitInt);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
||||
}
|
||||
|
||||
wait(tr->commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
// If multiple fdbserver processes are started at once, they will all
|
||||
// attempt this migration at the same time, sometimes resulting in
|
||||
// aborts due to conflicts. Purposefully avoid retrying, making this
|
||||
// migration best-effort.
|
||||
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
|
||||
throw;
|
||||
wait(tr->commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,8 +201,8 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
|||
|
||||
// Applies updates to the local copy of the global configuration when this
|
||||
// process receives an updated history.
|
||||
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) {
|
||||
wait(self->cx->onConnected());
|
||||
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
|
||||
// wait(self->cx->onConnected());
|
||||
wait(self->migrate(self));
|
||||
|
||||
wait(self->refresh(self));
|
||||
|
@ -199,9 +210,9 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
|
|||
|
||||
loop {
|
||||
try {
|
||||
wait(self->dbInfoChanged.onTrigger());
|
||||
wait(self->dbInfo->onChange());
|
||||
|
||||
auto& history = dbInfo->history;
|
||||
auto& history = self->dbInfo->get().history;
|
||||
if (history.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -211,8 +222,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
|
|||
// history updates or the protocol version changed, so it
|
||||
// must re-read the entire configuration range.
|
||||
wait(self->refresh(self));
|
||||
if (dbInfo->history.size() > 0) {
|
||||
self->lastUpdate = dbInfo->history.back().version;
|
||||
if (self->dbInfo->get().history.size() > 0) {
|
||||
self->lastUpdate = self->dbInfo->get().history.back().version;
|
||||
}
|
||||
} else {
|
||||
// Apply history in order, from lowest version to highest
|
||||
|
|
|
@ -67,31 +67,23 @@ struct ConfigValue : ReferenceCounted<ConfigValue> {
|
|||
|
||||
class GlobalConfig : NonCopyable {
|
||||
public:
|
||||
// Creates a GlobalConfig singleton, accessed by calling globalConfig().
|
||||
// This function requires a database context object to allow global
|
||||
// configuration to run transactions on the database, and an AsyncVar
|
||||
// object to watch for changes on. The ClientDBInfo pointer should point to
|
||||
// a ClientDBInfo object which will contain the updated global
|
||||
// configuration history when the given AsyncVar changes. This function
|
||||
// should only be called once (however, it is idempotent and calling it
|
||||
// multiple times will have no effect).
|
||||
template <class T>
|
||||
static void create(DatabaseContext* cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) {
|
||||
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
|
||||
auto config = new GlobalConfig{};
|
||||
config->cx = Database(cx);
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||
config->_updater = updater(config, dbInfo);
|
||||
// Bind changes in `db` to the `dbInfoChanged` AsyncTrigger.
|
||||
forward(db, std::addressof(config->dbInfoChanged));
|
||||
}
|
||||
}
|
||||
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
|
||||
// This function should only be called once by each process (however, it is
|
||||
// idempotent and calling it multiple times will have no effect).
|
||||
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
|
||||
// Returns a reference to the global GlobalConfig object. Clients should
|
||||
// call this function whenever they need to read a value out of the global
|
||||
// configuration.
|
||||
static GlobalConfig& globalConfig();
|
||||
|
||||
// Updates the ClientDBInfo object used by global configuration to read new
|
||||
// data. For server processes, this value needs to be set by the cluster
|
||||
// controller, but global config is initialized before the cluster
|
||||
// controller is, so this function provides a mechanism to update the
|
||||
// object after initialization.
|
||||
void updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
|
||||
// Use this function to turn a global configuration key defined above into
|
||||
// the full path needed to set the value in the database.
|
||||
//
|
||||
|
@ -164,10 +156,10 @@ private:
|
|||
|
||||
ACTOR static Future<Void> migrate(GlobalConfig* self);
|
||||
ACTOR static Future<Void> refresh(GlobalConfig* self);
|
||||
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
||||
ACTOR static Future<Void> updater(GlobalConfig* self);
|
||||
|
||||
Database cx;
|
||||
AsyncTrigger dbInfoChanged;
|
||||
Reference<AsyncVar<ClientDBInfo>> dbInfo;
|
||||
Future<Void> _updater;
|
||||
Promise<Void> initialized;
|
||||
AsyncTrigger configChanged;
|
||||
|
|
|
@ -962,6 +962,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
|
||||
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
|
||||
|
||||
GlobalConfig::create(this, clientInfo);
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
|
||||
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
cacheListMonitor = monitorCacheList(this);
|
||||
|
@ -1564,9 +1568,6 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
|
|||
/*switchable*/ true);
|
||||
}
|
||||
|
||||
GlobalConfig::create(db, clientInfo, std::addressof(clientInfo->get()));
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
return Database(db);
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,9 @@ public:
|
|||
true,
|
||||
TaskPriority::DefaultEndpoint,
|
||||
true)) // SOMEDAY: Locality!
|
||||
{}
|
||||
{
|
||||
GlobalConfig::globalConfig().updateDBInfo(clientInfo);
|
||||
}
|
||||
|
||||
void setDistributor(const DataDistributorInterface& interf) {
|
||||
auto newInfo = serverInfo->get();
|
||||
|
@ -3769,7 +3771,7 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
|||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
||||
state ClientDBInfo clientInfo = db->serverInfo->get().client;
|
||||
state ClientDBInfo clientInfo = db->clientInfo->get();
|
||||
|
||||
if (globalConfigVersion.present()) {
|
||||
// Since the history keys end with versionstamps, they
|
||||
|
@ -3827,15 +3829,6 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
|||
}
|
||||
|
||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||
|
||||
// Update ServerDBInfo so fdbserver processes receive updated history.
|
||||
ServerDBInfo serverInfo = db->serverInfo->get();
|
||||
serverInfo.id = deterministicRandom()->randomUniqueID();
|
||||
serverInfo.infoGeneration = ++db->dbInfoCount;
|
||||
serverInfo.client = clientInfo;
|
||||
db->serverInfo->set(serverInfo);
|
||||
|
||||
// Update ClientDBInfo so client processes receive updated history.
|
||||
db->clientInfo->set(clientInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -147,16 +147,12 @@ Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
|
|||
bool enableLocalityLoadBalance,
|
||||
bool lockAware) {
|
||||
auto info = makeReference<AsyncVar<ClientDBInfo>>();
|
||||
auto cx = DatabaseContext::create(info,
|
||||
extractClientInfo(db, info),
|
||||
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
|
||||
enableLocalityLoadBalance,
|
||||
taskID,
|
||||
lockAware);
|
||||
GlobalConfig::create(cx.getPtr(), db, std::addressof(db->get().client));
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
|
||||
return cx;
|
||||
return DatabaseContext::create(info,
|
||||
extractClientInfo(db, info),
|
||||
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
|
||||
enableLocalityLoadBalance,
|
||||
taskID,
|
||||
lockAware);
|
||||
}
|
||||
|
||||
struct ErrorInfo {
|
||||
|
@ -1053,6 +1049,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
metricsLogger = runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, lockAware),
|
||||
KeyRef(metricsPrefix));
|
||||
}
|
||||
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
}
|
||||
|
||||
errorForwarders.add(resetAfter(degraded,
|
||||
|
|
|
@ -697,16 +697,6 @@ private:
|
|||
AsyncVar<Void> v;
|
||||
};
|
||||
|
||||
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
|
||||
// the AsyncTrigger is triggered.
|
||||
ACTOR template <class T>
|
||||
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) {
|
||||
loop {
|
||||
wait(from->onChange());
|
||||
to->trigger();
|
||||
}
|
||||
}
|
||||
|
||||
class Debouncer : NonCopyable {
|
||||
public:
|
||||
explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }
|
||||
|
|
Loading…
Reference in New Issue