Merge remote-tracking branch 'sfc/features/actor-lineage' into features/actor-lineage-fluentd
This commit is contained in:
commit
3fcbed1fbd
|
@ -213,7 +213,7 @@ void SampleCollection_t::refresh() {
|
|||
oldest = data.front()->time;
|
||||
}
|
||||
}
|
||||
config->ingest(sample);
|
||||
//config->ingest(sample);
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0*/,
|
||||
|
@ -251,19 +251,30 @@ void ActorLineageProfilerT::stop() {
|
|||
}
|
||||
|
||||
void ActorLineageProfilerT::setFrequency(unsigned frequency) {
|
||||
unsigned oldFrequency = this->frequency;
|
||||
bool change = this->frequency != frequency;
|
||||
this->frequency = frequency;
|
||||
if (frequency != 0 && !profilerThread.joinable()) {
|
||||
profilerThread = std::thread(std::bind(&ActorLineageProfilerT::profile, this));
|
||||
} else if (change) {
|
||||
|
||||
if (change) {
|
||||
// Profiler thread will automatically switch to new frequency after
|
||||
// being triggered by the the condition variable. Only need to start a
|
||||
// new profiler thread if the old one has been stopped due to the
|
||||
// profiler thread returning (frequency set to 0).
|
||||
if (oldFrequency == 0 && frequency != 0) {
|
||||
std::thread(&ActorLineageProfilerT::profile, this).detach();
|
||||
}
|
||||
cond.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
void ActorLineageProfilerT::profile() {
|
||||
static std::atomic_int profileThreadCount = 0;
|
||||
ASSERT(++profileThreadCount == 1);
|
||||
|
||||
for (;;) {
|
||||
collection->refresh();
|
||||
if (frequency == 0) {
|
||||
profileThreadCount--;
|
||||
return;
|
||||
}
|
||||
{
|
||||
|
@ -272,9 +283,21 @@ void ActorLineageProfilerT::profile() {
|
|||
// cond.wait_until(lock, lastSample + std::chrono::milliseconds)
|
||||
}
|
||||
if (frequency == 0) {
|
||||
profileThreadCount--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SampleIngestor::~SampleIngestor() {}
|
||||
|
||||
// Callback used to update the sampling profilers run frequency whenever the
|
||||
// frequency changes.
|
||||
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
|
||||
double frequency = 0;
|
||||
if (freq.has_value()) {
|
||||
frequency = std::any_cast<double>(freq.value());
|
||||
}
|
||||
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
|
||||
ActorLineageProfiler::instance().setFrequency(frequency);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/AnnotateActor.h"
|
||||
|
||||
#include <optional>
|
||||
|
@ -30,6 +31,8 @@
|
|||
#include "flow/singleton.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
void samplingProfilerUpdateFrequency(std::optional<std::any> freq);
|
||||
|
||||
struct IALPCollectorBase {
|
||||
virtual std::optional<std::any> collect(ActorLineage*) = 0;
|
||||
virtual const std::string_view& name() = 0;
|
||||
|
@ -109,7 +112,7 @@ private:
|
|||
public:
|
||||
void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); }
|
||||
std::shared_ptr<Sample> collect();
|
||||
void addGetter(WaitState waitState, Getter const& getter);
|
||||
void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; };
|
||||
};
|
||||
|
||||
using SampleCollector = crossbow::singleton<SampleCollectorT>;
|
||||
|
@ -164,3 +167,5 @@ public:
|
|||
void setFrequency(unsigned frequency);
|
||||
void stop();
|
||||
};
|
||||
|
||||
using ActorLineageProfiler = crossbow::singleton<ActorLineageProfilerT>;
|
||||
|
|
|
@ -34,6 +34,8 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf
|
|||
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
|
||||
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
|
||||
|
||||
const KeyRef samplingFrequency = LiteralStringRef("visibility/sampling/frequency");
|
||||
|
||||
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
|
||||
|
||||
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
|
@ -45,6 +47,10 @@ void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>>
|
|||
}
|
||||
}
|
||||
|
||||
void GlobalConfig::updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
_updater = updater(&GlobalConfig::globalConfig(), dbInfo);
|
||||
}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig() {
|
||||
void* res = g_network->global(INetwork::enGlobalConfig);
|
||||
ASSERT(res);
|
||||
|
@ -77,6 +83,14 @@ Future<Void> GlobalConfig::onInitialized() {
|
|||
return initialized.getFuture();
|
||||
}
|
||||
|
||||
Future<Void> GlobalConfig::onChange() {
|
||||
return configChanged.onTrigger();
|
||||
}
|
||||
|
||||
void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn) {
|
||||
callbacks.emplace(key, std::move(fn));
|
||||
}
|
||||
|
||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||
data.erase(key);
|
||||
|
||||
|
@ -89,6 +103,8 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
|||
any = StringRef(arena, t.getString(0).contents());
|
||||
} else if (t.getType(0) == Tuple::ElementType::INT) {
|
||||
any = t.getInt(0);
|
||||
} else if (t.getType(0) == Tuple::ElementType::BOOL) {
|
||||
any = t.getBool(0);
|
||||
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
|
||||
any = t.getFloat(0);
|
||||
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
|
||||
|
@ -97,19 +113,26 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
|||
ASSERT(false);
|
||||
}
|
||||
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
|
||||
|
||||
if (callbacks.find(stableKey) != callbacks.end()) {
|
||||
callbacks[stableKey](data[stableKey]->value);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRef key) {
|
||||
data.erase(key);
|
||||
erase(KeyRangeRef(key, keyAfter(key)));
|
||||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRangeRef range) {
|
||||
auto it = data.begin();
|
||||
while (it != data.end()) {
|
||||
if (range.contains(it->first)) {
|
||||
if (callbacks.find(it->first) != callbacks.end()) {
|
||||
callbacks[it->first](std::nullopt);
|
||||
}
|
||||
it = data.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
|
@ -163,7 +186,9 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|||
// Updates local copy of global configuration by reading the entire key-range
|
||||
// from storage.
|
||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||
self->data.clear();
|
||||
for (const auto& [key, _] : self->data) {
|
||||
self->erase(key);
|
||||
}
|
||||
|
||||
Transaction tr(self->cx);
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
|
@ -222,6 +247,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<
|
|||
self->lastUpdate = vh.version;
|
||||
}
|
||||
}
|
||||
|
||||
self->configChanged.trigger();
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
|
|
|
@ -27,7 +27,9 @@
|
|||
#define FDBCLIENT_GLOBALCONFIG_ACTOR_H
|
||||
|
||||
#include <any>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
|
||||
|
@ -49,6 +51,8 @@ extern const KeyRef fdbClientInfoTxnSizeLimit;
|
|||
extern const KeyRef transactionTagSampleRate;
|
||||
extern const KeyRef transactionTagSampleCost;
|
||||
|
||||
extern const KeyRef samplingFrequency;
|
||||
|
||||
// Structure used to hold the values stored by global configuration. The arena
|
||||
// is used as memory to store both the key and the value (the value is only
|
||||
// stored in the arena if it is an object; primitives are just copied).
|
||||
|
@ -78,6 +82,14 @@ public:
|
|||
// For example, given "config/a", returns "\xff\xff/global_config/config/a".
|
||||
static Key prefixedKey(KeyRef key);
|
||||
|
||||
// Update the ClientDBInfo object used internally to check for updates to
|
||||
// global configuration. The ClientDBInfo reference must be the same one
|
||||
// used in the cluster controller, but fdbserver requires initial creation
|
||||
// of the GlobalConfig class before the cluster controller is initialized.
|
||||
// This function allows the ClientDBInfo object to be updated after create
|
||||
// was called.
|
||||
void updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
|
||||
// Get a value from the framework. Values are returned as a ConfigValue
|
||||
// reference which also contains the arena holding the object. As long as
|
||||
// the caller keeps the ConfigValue reference, the value is guaranteed to
|
||||
|
@ -114,6 +126,16 @@ public:
|
|||
// been created and is ready.
|
||||
Future<Void> onInitialized();
|
||||
|
||||
// Triggers the returned future when any key-value pair in the global
|
||||
// configuration changes.
|
||||
Future<Void> onChange();
|
||||
|
||||
// Calls \ref fn when the value associated with \ref key is changed. \ref
|
||||
// fn is passed the updated value for the key, or an empty optional if the
|
||||
// key has been cleared. If the value is an allocated object, its memory
|
||||
// remains in the control of the global configuration.
|
||||
void trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn);
|
||||
|
||||
private:
|
||||
GlobalConfig();
|
||||
|
||||
|
@ -139,8 +161,10 @@ private:
|
|||
Database cx;
|
||||
Future<Void> _updater;
|
||||
Promise<Void> initialized;
|
||||
AsyncTrigger configChanged;
|
||||
std::unordered_map<StringRef, Reference<ConfigValue>> data;
|
||||
Version lastUpdate;
|
||||
std::unordered_map<KeyRef, std::function<void(std::optional<std::any>)>> callbacks;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbrpc/MultiInterface.h"
|
||||
|
||||
#include "fdbclient/ActorLineageProfiler.h"
|
||||
#include "fdbclient/AnnotateActor.h"
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "fdbclient/ClusterInterface.h"
|
||||
|
@ -960,6 +961,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
|
||||
|
||||
GlobalConfig::create(this, clientInfo);
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
|
||||
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
|
|
|
@ -1397,6 +1397,9 @@ Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTran
|
|||
} else if (config->value.type() == typeid(int64_t)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
|
||||
} else if (config->value.type() == typeid(bool)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(prefixedKey, std::to_string(std::any_cast<bool>(config->value))));
|
||||
} else if (config->value.type() == typeid(float)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
|
||||
|
|
|
@ -71,6 +71,8 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
|
|||
i += sizeof(float) + 1;
|
||||
} else if (data[i] == 0x21) {
|
||||
i += sizeof(double) + 1;
|
||||
} else if (data[i] == 0x26 || data[i] == 0x27) {
|
||||
i += 1;
|
||||
} else if (data[i] == '\x00') {
|
||||
i += 1;
|
||||
} else {
|
||||
|
@ -144,6 +146,16 @@ Tuple& Tuple::append(int64_t value) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
Tuple& Tuple::appendBool(bool value) {
|
||||
offsets.push_back(data.size());
|
||||
if (value) {
|
||||
data.push_back(data.arena(), 0x27);
|
||||
} else {
|
||||
data.push_back(data.arena(), 0x26);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
Tuple& Tuple::appendFloat(float value) {
|
||||
offsets.push_back(data.size());
|
||||
float swap = bigEndianFloat(value);
|
||||
|
@ -192,6 +204,8 @@ Tuple::ElementType Tuple::getType(size_t index) const {
|
|||
return ElementType::FLOAT;
|
||||
} else if (code == 0x21) {
|
||||
return ElementType::DOUBLE;
|
||||
} else if (code == 0x26 || code == 0x27) {
|
||||
return ElementType::BOOL;
|
||||
} else {
|
||||
throw invalid_tuple_data_type();
|
||||
}
|
||||
|
@ -287,6 +301,21 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
|
|||
}
|
||||
|
||||
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
|
||||
bool Tuple::getBool(size_t index) const {
|
||||
if (index >= offsets.size()) {
|
||||
throw invalid_tuple_index();
|
||||
}
|
||||
ASSERT_LT(offsets[index], data.size());
|
||||
uint8_t code = data[offsets[index]];
|
||||
if (code == 0x26) {
|
||||
return false;
|
||||
} else if (code == 0x27) {
|
||||
return true;
|
||||
} else {
|
||||
throw invalid_tuple_data_type();
|
||||
}
|
||||
}
|
||||
|
||||
float Tuple::getFloat(size_t index) const {
|
||||
if (index >= offsets.size()) {
|
||||
throw invalid_tuple_index();
|
||||
|
|
|
@ -40,6 +40,7 @@ struct Tuple {
|
|||
Tuple& append(int64_t);
|
||||
// There are some ambiguous append calls in fdbclient, so to make it easier
|
||||
// to add append for floats and doubles, name them differently for now.
|
||||
Tuple& appendBool(bool);
|
||||
Tuple& appendFloat(float);
|
||||
Tuple& appendDouble(double);
|
||||
Tuple& appendNull();
|
||||
|
@ -51,7 +52,7 @@ struct Tuple {
|
|||
return append(t);
|
||||
}
|
||||
|
||||
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE };
|
||||
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE };
|
||||
|
||||
// this is number of elements, not length of data
|
||||
size_t size() const { return offsets.size(); }
|
||||
|
@ -59,6 +60,7 @@ struct Tuple {
|
|||
ElementType getType(size_t index) const;
|
||||
Standalone<StringRef> getString(size_t index) const;
|
||||
int64_t getInt(size_t index, bool allow_incomplete = false) const;
|
||||
bool getBool(size_t index) const;
|
||||
float getFloat(size_t index) const;
|
||||
double getDouble(size_t index) const;
|
||||
|
||||
|
|
|
@ -135,7 +135,9 @@ public:
|
|||
true,
|
||||
TaskPriority::DefaultEndpoint,
|
||||
true)) // SOMEDAY: Locality!
|
||||
{}
|
||||
{
|
||||
GlobalConfig::globalConfig().updateDBInfo(clientInfo);
|
||||
}
|
||||
|
||||
void setDistributor(const DataDistributorInterface& interf) {
|
||||
auto newInfo = serverInfo->get();
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
#include "fdbclient/ProcessInterface.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -1038,6 +1039,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
metricsLogger = runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, lockAware),
|
||||
KeyRef(metricsPrefix));
|
||||
}
|
||||
|
||||
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
|
||||
}
|
||||
|
||||
errorForwarders.add(resetAfter(degraded,
|
||||
|
|
Loading…
Reference in New Issue