Implemented configuration
This commit is contained in:
parent
3e18b857a8
commit
2d6fafde64
|
@ -360,3 +360,11 @@ void ProfilerConfigT::reset(std::map<std::string, std::string> const& config) {
|
||||||
useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::TCP, address));
|
useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::TCP, address));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::map<std::string, std::string> ProfilerConfigT::getConfig() const {
|
||||||
|
std::map<std::string, std::string> res;
|
||||||
|
if (ingestor) {
|
||||||
|
ingestor->getConfig(res);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
|
@ -55,11 +55,13 @@ class SampleIngestor : std::enable_shared_from_this<SampleIngestor> {
|
||||||
public:
|
public:
|
||||||
virtual ~SampleIngestor();
|
virtual ~SampleIngestor();
|
||||||
virtual void ingest(std::shared_ptr<Sample> const& sample) = 0;
|
virtual void ingest(std::shared_ptr<Sample> const& sample) = 0;
|
||||||
|
virtual void getConfig(std::map<std::string, std::string>&) const = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class NoneIngestor : public SampleIngestor {
|
class NoneIngestor : public SampleIngestor {
|
||||||
public:
|
public:
|
||||||
void ingest(std::shared_ptr<Sample> const& sample) override {}
|
void ingest(std::shared_ptr<Sample> const& sample) override {}
|
||||||
|
void getConfig(std::map<std::string, std::string>& res) const override { res["ingestor"] = "none"; }
|
||||||
};
|
};
|
||||||
|
|
||||||
// The FluentD ingestor uses the pimp idiom. This is to make compilation less heavy weight as this implementation has
|
// The FluentD ingestor uses the pimp idiom. This is to make compilation less heavy weight as this implementation has
|
||||||
|
@ -76,6 +78,7 @@ private: // members
|
||||||
public: // interface
|
public: // interface
|
||||||
void ingest(std::shared_ptr<Sample> const& sample) override;
|
void ingest(std::shared_ptr<Sample> const& sample) override;
|
||||||
FluentDIngestor(Protocol protocol, NetworkAddress& endpoint);
|
FluentDIngestor(Protocol protocol, NetworkAddress& endpoint);
|
||||||
|
void getConfig(std::map<std::string, std::string>& res) const override;
|
||||||
~FluentDIngestor();
|
~FluentDIngestor();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -99,6 +102,7 @@ private: // construction
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void reset(std::map<std::string, std::string> const& config);
|
void reset(std::map<std::string, std::string> const& config);
|
||||||
|
std::map<std::string, std::string> getConfig() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
using ProfilerConfig = crossbow::singleton<ProfilerConfigT>;
|
using ProfilerConfig = crossbow::singleton<ProfilerConfigT>;
|
||||||
|
|
|
@ -170,3 +170,9 @@ void FluentDIngestor::ingest(const std::shared_ptr<Sample>& sample) {
|
||||||
impl->socket->send(sample);
|
impl->socket->send(sample);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FluentDIngestor::getConfig(std::map<std::string, std::string>& res) const {
|
||||||
|
res["ingestor"] = "fluentd";
|
||||||
|
res["collector_endpoint"] = impl->endpoint.toString();
|
||||||
|
res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp";
|
||||||
|
}
|
||||||
|
|
|
@ -1060,6 +1060,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
||||||
SpecialKeySpace::MODULE::ACTORLINEAGE,
|
SpecialKeySpace::MODULE::ACTORLINEAGE,
|
||||||
SpecialKeySpace::IMPLTYPE::READONLY,
|
SpecialKeySpace::IMPLTYPE::READONLY,
|
||||||
std::make_unique<ActorLineageImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
|
std::make_unique<ActorLineageImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
|
||||||
|
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
|
||||||
|
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||||
|
std::make_unique<ActorProfilerConf>(
|
||||||
|
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
|
||||||
}
|
}
|
||||||
if (apiVersionAtLeast(630)) {
|
if (apiVersionAtLeast(630)) {
|
||||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
|
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "boost/lexical_cast.hpp"
|
#include "boost/lexical_cast.hpp"
|
||||||
#include "boost/algorithm/string.hpp"
|
#include "boost/algorithm/string.hpp"
|
||||||
|
|
||||||
|
#include "fdbclient/ActorLineageProfiler.h"
|
||||||
#include "fdbclient/Knobs.h"
|
#include "fdbclient/Knobs.h"
|
||||||
#include "fdbclient/ProcessInterface.h"
|
#include "fdbclient/ProcessInterface.h"
|
||||||
#include "fdbclient/GlobalConfig.actor.h"
|
#include "fdbclient/GlobalConfig.actor.h"
|
||||||
|
@ -71,7 +72,10 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
|
||||||
{ SpecialKeySpace::MODULE::TRACING,
|
{ SpecialKeySpace::MODULE::TRACING,
|
||||||
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) },
|
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) },
|
||||||
{ SpecialKeySpace::MODULE::ACTORLINEAGE,
|
{ SpecialKeySpace::MODULE::ACTORLINEAGE,
|
||||||
KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }
|
KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) },
|
||||||
|
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
|
||||||
|
KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"),
|
||||||
|
LiteralStringRef("\xff\xff/actor_profiler_conf0")) }
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
|
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
|
||||||
|
@ -1953,3 +1957,64 @@ ACTOR static Future<Standalone<RangeResultRef>> actorLineageGetRangeActor(ReadYo
|
||||||
Future<Standalone<RangeResultRef>> ActorLineageImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
Future<Standalone<RangeResultRef>> ActorLineageImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||||
return actorLineageGetRangeActor(ryw, getKeyRange().begin, kr);
|
return actorLineageGetRangeActor(ryw, getKeyRange().begin, kr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
std::string_view to_string_view(StringRef sr) {
|
||||||
|
return std::string_view(reinterpret_cast<const char*>(sr.begin()), sr.size());
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
ActorProfilerConf::ActorProfilerConf(KeyRangeRef kr)
|
||||||
|
: SpecialKeyRangeRWImpl(kr), config(ProfilerConfig::instance().getConfig()) {}
|
||||||
|
|
||||||
|
Future<Standalone<RangeResultRef>> ActorProfilerConf::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||||
|
Standalone<RangeResultRef> res;
|
||||||
|
std::string_view begin(to_string_view(kr.begin.removePrefix(range.begin))),
|
||||||
|
end(to_string_view(kr.end.removePrefix(range.begin)));
|
||||||
|
for (auto& p : config) {
|
||||||
|
if (p.first > end) {
|
||||||
|
break;
|
||||||
|
} else if (p.first > begin) {
|
||||||
|
KeyValueRef kv;
|
||||||
|
kv.key = StringRef(res.arena(), p.first);
|
||||||
|
kv.value = StringRef(res.arena(), p.second);
|
||||||
|
res.push_back(res.arena(), kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ActorProfilerConf::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
|
||||||
|
config[key.removePrefix(range.begin).toString()] = value.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& kr) {
|
||||||
|
std::string begin(kr.begin.removePrefix(range.begin).toString()), end(kr.end.removePrefix(range.begin).toString());
|
||||||
|
auto first = config.lower_bound(begin);
|
||||||
|
if (first == config.end()) {
|
||||||
|
// nothing to clear
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto last = config.upper_bound(end);
|
||||||
|
config.erase(first, last);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
|
||||||
|
std::string k = key.removePrefix(range.begin).toString();
|
||||||
|
auto iter = config.find(k);
|
||||||
|
if (iter != config.end()) {
|
||||||
|
config.erase(iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Optional<std::string>> ActorProfilerConf::commit(ReadYourWritesTransaction* ryw) {
|
||||||
|
Optional<std::string> res{};
|
||||||
|
try {
|
||||||
|
if (didWrite) {
|
||||||
|
ProfilerConfig::instance().reset(config);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
} catch (ConfigError& err) {
|
||||||
|
return Optional<std::string>{ err.description };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -143,6 +143,7 @@ class SpecialKeySpace {
|
||||||
public:
|
public:
|
||||||
enum class MODULE {
|
enum class MODULE {
|
||||||
ACTORLINEAGE, // Sampling data
|
ACTORLINEAGE, // Sampling data
|
||||||
|
ACTOR_PROFILER_CONF, // profiler configuration
|
||||||
CLUSTERFILEPATH,
|
CLUSTERFILEPATH,
|
||||||
CONFIGURATION, // Configuration of the cluster
|
CONFIGURATION, // Configuration of the cluster
|
||||||
CONNECTIONSTRING,
|
CONNECTIONSTRING,
|
||||||
|
@ -395,5 +396,18 @@ public:
|
||||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class ActorProfilerConf : public SpecialKeyRangeRWImpl {
|
||||||
|
bool didWrite = false;
|
||||||
|
std::map<std::string, std::string> config;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ActorProfilerConf(KeyRangeRef kr);
|
||||||
|
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||||
|
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
|
||||||
|
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
|
||||||
|
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
|
||||||
|
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||||
|
};
|
||||||
|
|
||||||
#include "flow/unactorcompiler.h"
|
#include "flow/unactorcompiler.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue