Merge pull request #15 from sfc-gh-ljoswiak/features/actor-lineage-interface
Add API to read samples from worker
This commit is contained in:
commit
3400ab5b36
|
@ -63,13 +63,14 @@ class Packer : public msgpack::packer<msgpack::sbuffer> {
|
|||
std::string_view,
|
||||
std::vector<std::any>,
|
||||
std::map<std::string, std::any>,
|
||||
std::map<std::string_view, std::any>>::populate(visitorMap);
|
||||
std::map<std::string_view, std::any>,
|
||||
std::vector<std::map<std::string_view, std::any>>>::populate(visitorMap);
|
||||
}
|
||||
|
||||
void visit(const std::any& val, Packer& packer) {
|
||||
auto iter = visitorMap.find(val.type());
|
||||
if (iter == visitorMap.end()) {
|
||||
// TODO: trace error
|
||||
TraceEvent(SevError, "PackerTypeNotFound").detail("Type", val.type().name());
|
||||
} else {
|
||||
iter->second(val, packer);
|
||||
}
|
||||
|
@ -149,12 +150,9 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Sample> done(double time) {
|
||||
auto res = std::make_shared<Sample>();
|
||||
res->time = time;
|
||||
res->size = sbuffer.size();
|
||||
res->data = sbuffer.release();
|
||||
return res;
|
||||
std::pair<char*, unsigned> getbuf() {
|
||||
unsigned size = sbuffer.size();
|
||||
return std::make_pair(sbuffer.release(), size);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -174,11 +172,11 @@ std::map<std::string_view, std::any> SampleCollectorT::collect(ActorLineage* lin
|
|||
}
|
||||
|
||||
std::shared_ptr<Sample> SampleCollectorT::collect() {
|
||||
Packer packer;
|
||||
std::map<std::string_view, std::any> res;
|
||||
auto sample = std::make_shared<Sample>();
|
||||
double time = g_network->now();
|
||||
res["time"sv] = time;
|
||||
sample->time = time;
|
||||
for (auto& p : getSamples) {
|
||||
Packer packer;
|
||||
std::vector<std::map<std::string_view, std::any>> samples;
|
||||
auto sampleVec = p.second();
|
||||
for (auto& val : sampleVec) {
|
||||
|
@ -188,16 +186,16 @@ std::shared_ptr<Sample> SampleCollectorT::collect() {
|
|||
}
|
||||
}
|
||||
if (!samples.empty()) {
|
||||
res[to_string(p.first)] = samples;
|
||||
packer.pack(samples);
|
||||
sample->data[p.first] = packer.getbuf();
|
||||
}
|
||||
}
|
||||
packer.pack(res);
|
||||
return packer.done(time);
|
||||
return sample;
|
||||
}
|
||||
|
||||
void SampleCollection_t::refresh() {
|
||||
auto sample = _collector->collect();
|
||||
auto min = std::max(sample->time - windowSize, sample->time);
|
||||
auto min = std::min(sample->time - windowSize, sample->time);
|
||||
{
|
||||
Lock _{ mutex };
|
||||
data.emplace_back(std::move(sample));
|
||||
|
@ -237,7 +235,7 @@ ActorLineageProfilerT::ActorLineageProfilerT() {
|
|||
std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet())));
|
||||
collection->collector()->addGetter(WaitState::Running, []() {
|
||||
auto res = currentLineageThreadSafe.get();
|
||||
return std::vector<Reference<ActorLineage>>({ currentLineageThreadSafe.get() });
|
||||
return std::vector<Reference<ActorLineage>>({ res });
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -47,9 +47,12 @@ struct IALPCollector : IALPCollectorBase {
|
|||
|
||||
struct Sample : std::enable_shared_from_this<Sample> {
|
||||
double time = 0.0;
|
||||
unsigned size = 0u;
|
||||
char* data = nullptr;
|
||||
~Sample() { ::free(data); }
|
||||
std::unordered_map<WaitState, std::pair<char*, unsigned>> data;
|
||||
~Sample() {
|
||||
std::for_each(data.begin(), data.end(), [](std::pair<WaitState, std::pair<char*, unsigned>> entry) {
|
||||
::free(entry.second.first);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
class SampleCollectorT {
|
||||
|
|
|
@ -123,7 +123,7 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
|||
}
|
||||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRef key) {
|
||||
void GlobalConfig::erase(Key key) {
|
||||
erase(KeyRangeRef(key, keyAfter(key)));
|
||||
}
|
||||
|
||||
|
@ -187,9 +187,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|||
// Updates local copy of global configuration by reading the entire key-range
|
||||
// from storage.
|
||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||
for (const auto& [key, _] : self->data) {
|
||||
self->erase(key);
|
||||
}
|
||||
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
||||
|
||||
Transaction tr(self->cx);
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
|
|
|
@ -150,7 +150,7 @@ private:
|
|||
void insert(KeyRef key, ValueRef value);
|
||||
// Removes the given key (and associated value) from the local copy of the
|
||||
// global configuration keyspace.
|
||||
void erase(KeyRef key);
|
||||
void erase(Key key);
|
||||
// Removes the given key range (and associated values) from the local copy
|
||||
// of the global configuration keyspace.
|
||||
void erase(KeyRangeRef range);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/AnnotateActor.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
|
||||
|
@ -26,11 +27,11 @@ constexpr UID WLTOKEN_PROCESS(-1, 11);
|
|||
struct ProcessInterface {
|
||||
constexpr static FileIdentifier file_identifier = 985636;
|
||||
RequestStream<struct GetProcessInterfaceRequest> getInterface;
|
||||
RequestStream<struct EchoRequest> echo;
|
||||
RequestStream<struct ActorLineageRequest> actorLineage;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, echo);
|
||||
serializer(ar, actorLineage);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -44,14 +45,37 @@ struct GetProcessInterfaceRequest {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO: Used for demonstration purposes, remove in later PR
|
||||
struct EchoRequest {
|
||||
constexpr static FileIdentifier file_identifier = 10624019;
|
||||
std::string message;
|
||||
ReplyPromise<std::string> reply;
|
||||
// This type is used to send serialized sample data over the network.
|
||||
struct SerializedSample {
|
||||
constexpr static FileIdentifier file_identifier = 15785634;
|
||||
|
||||
double time;
|
||||
std::unordered_map<WaitState, std::string> data;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, message, reply);
|
||||
serializer(ar, time, data);
|
||||
}
|
||||
};
|
||||
|
||||
struct ActorLineageReply {
|
||||
constexpr static FileIdentifier file_identifier = 1887656;
|
||||
std::vector<SerializedSample> samples;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, samples);
|
||||
}
|
||||
};
|
||||
|
||||
struct ActorLineageRequest {
|
||||
constexpr static FileIdentifier file_identifier = 11654765;
|
||||
WaitState waitStateStart, waitStateEnd;
|
||||
time_t timeStart, timeEnd;
|
||||
ReplyPromise<ActorLineageReply> reply;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, waitStateStart, waitStateEnd, timeStart, timeEnd, reply);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -21,6 +21,11 @@
|
|||
#include "boost/lexical_cast.hpp"
|
||||
#include "boost/algorithm/string.hpp"
|
||||
|
||||
#include <time.h>
|
||||
#include <msgpack.hpp>
|
||||
|
||||
#include <exception>
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/ProcessInterface.h"
|
||||
#include "fdbclient/GlobalConfig.actor.h"
|
||||
|
@ -102,6 +107,15 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
|
||||
{ "state",
|
||||
KeyRangeRef(LiteralStringRef("state/"), LiteralStringRef("state0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) },
|
||||
{ "time",
|
||||
KeyRangeRef(LiteralStringRef("time/"), LiteralStringRef("time0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }
|
||||
};
|
||||
|
||||
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
|
||||
|
||||
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
|
||||
|
@ -1932,26 +1946,197 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke
|
|||
|
||||
ActorLineageImpl::ActorLineageImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
||||
|
||||
void parse(StringRef& val, int& i) {
|
||||
i = std::stoi(val.toString());
|
||||
}
|
||||
|
||||
void parse(StringRef& val, double& d) {
|
||||
d = std::stod(val.toString());
|
||||
}
|
||||
|
||||
void parse(StringRef& val, WaitState& w) {
|
||||
if (val == LiteralStringRef("disk")) {
|
||||
w = WaitState::Disk;
|
||||
} else if (val == LiteralStringRef("network")) {
|
||||
w = WaitState::Network;
|
||||
} else if (val == LiteralStringRef("running")) {
|
||||
w = WaitState::Running;
|
||||
} else {
|
||||
throw std::range_error("failed to parse run state");
|
||||
}
|
||||
}
|
||||
|
||||
void parse(StringRef& val, time_t& t) {
|
||||
struct tm tm = { 0 };
|
||||
if (strptime(val.toString().c_str(), "%FT%T%z", &tm) == nullptr) {
|
||||
throw std::invalid_argument("failed to parse ISO 8601 datetime");
|
||||
}
|
||||
|
||||
long timezone = tm.tm_gmtoff;
|
||||
t = timegm(&tm);
|
||||
if (t == -1) {
|
||||
throw std::runtime_error("failed to convert ISO 8601 datetime");
|
||||
}
|
||||
t -= timezone;
|
||||
}
|
||||
|
||||
void parse(StringRef& val, NetworkAddress& a) {
|
||||
auto address = NetworkAddress::parse(val.toString());
|
||||
if (!address.isValid()) {
|
||||
throw std::invalid_argument("invalid host");
|
||||
}
|
||||
a = address;
|
||||
}
|
||||
|
||||
// Base case function for parsing function below.
|
||||
template <typename T>
|
||||
void parse(std::vector<StringRef>::iterator it, std::vector<StringRef>::iterator end, T& t1) {
|
||||
if (it == end) {
|
||||
return;
|
||||
}
|
||||
parse(*it, t1);
|
||||
}
|
||||
|
||||
// Given an iterator into a vector of string tokens, an iterator to the end of
|
||||
// the search space in the vector (exclusive), and a list of references to
|
||||
// types, parses each token in the vector into the associated type according to
|
||||
// the order of the arguments.
|
||||
//
|
||||
// For example, given the vector ["1", "1.5", "127.0.0.1:4000"] and the
|
||||
// argument list int a, double b, NetworkAddress c, after this function returns
|
||||
// each parameter passed in will hold the parsed value from the token list.
|
||||
//
|
||||
// The appropriate parsing function must be implemented for the type you wish
|
||||
// to parse. See the existing parsing functions above, and add your own if
|
||||
// necessary.
|
||||
template <typename T, typename... Types>
|
||||
void parse(std::vector<StringRef>::iterator it, std::vector<StringRef>::iterator end, T& t1, Types&... remaining) {
|
||||
// Return as soon as all tokens have been parsed. This allows parameters
|
||||
// passed at the end to act as optional parameters -- they will only be set
|
||||
// if the value exists.
|
||||
if (it == end) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
parse(*it, t1);
|
||||
parse(++it, end, remaining...);
|
||||
} catch (Error& e) {
|
||||
throw e;
|
||||
} catch (std::exception& e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> actorLineageGetRangeActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRef prefix,
|
||||
KeyRangeRef kr) {
|
||||
state Standalone<RangeResultRef> result;
|
||||
Standalone<StringRef> addressString = kr.begin.removePrefix(prefix);
|
||||
|
||||
// Set default values for all fields. The default will be used if the field
|
||||
// is missing in the key.
|
||||
state NetworkAddress host;
|
||||
state WaitState waitStateStart = WaitState{ 0 };
|
||||
state WaitState waitStateEnd = WaitState{ 2 };
|
||||
state time_t timeStart = 0;
|
||||
state time_t timeEnd = std::numeric_limits<time_t>::max();
|
||||
state int seqStart = 0;
|
||||
state int seqEnd = std::numeric_limits<int>::max();
|
||||
|
||||
state std::vector<StringRef> beginValues = kr.begin.removePrefix(prefix).splitAny("/"_sr);
|
||||
state std::vector<StringRef> endValues = kr.end.removePrefix(prefix).splitAny("/"_sr);
|
||||
// Require index (either "state" or "time") and address:port.
|
||||
if (beginValues.size() < 2 || endValues.size() < 2) {
|
||||
ryw->setSpecialKeySpaceErrorMsg("missing required parameters (index, host)");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
try {
|
||||
auto address = NetworkAddress::parse(addressString.contents().toString());
|
||||
|
||||
state ProcessInterface process;
|
||||
process.getInterface = RequestStream<GetProcessInterfaceRequest>(Endpoint({ address }, WLTOKEN_PROCESS));
|
||||
ProcessInterface p = wait(retryBrokenPromise(process.getInterface, GetProcessInterfaceRequest{}));
|
||||
process = p;
|
||||
|
||||
EchoRequest echoRequest;
|
||||
echoRequest.message = "Hello";
|
||||
std::string response = wait(process.echo.getReply(echoRequest));
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, response));
|
||||
state NetworkAddress endRangeHost;
|
||||
if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) {
|
||||
// For the range \xff\xff/actor_lineage/state/ip:port/wait-state/time/seq
|
||||
parse(beginValues.begin() + 1, beginValues.end(), host, waitStateStart, timeStart, seqStart);
|
||||
if (kr.begin != kr.end) {
|
||||
parse(endValues.begin() + 1, endValues.end(), endRangeHost, waitStateEnd, timeEnd, seqEnd);
|
||||
}
|
||||
} else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) {
|
||||
// For the range \xff\xff/actor_lineage/time/ip:port/time/wait-state/seq
|
||||
parse(beginValues.begin() + 1, beginValues.end(), host, timeStart, waitStateStart, seqStart);
|
||||
if (kr.begin != kr.end) {
|
||||
parse(endValues.begin() + 1, endValues.end(), endRangeHost, timeEnd, waitStateEnd, seqEnd);
|
||||
}
|
||||
} else {
|
||||
ryw->setSpecialKeySpaceErrorMsg("invalid index in actor_lineage");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "SpecialKeysNetworkParseError").error(e);
|
||||
if (e.code() != special_keys_api_failure().code()) {
|
||||
ryw->setSpecialKeySpaceErrorMsg("failed to parse key");
|
||||
throw special_keys_api_failure();
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
if (kr.begin != kr.end && host != endRangeHost) {
|
||||
// The client doesn't know about all the hosts, so a get range covering
|
||||
// multiple hosts has no way of knowing which IP:port combos to use.
|
||||
ryw->setSpecialKeySpaceErrorMsg("the host must remain the same on both ends of the range");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
// Open endpoint to target process on each call. This can be optimized at
|
||||
// some point...
|
||||
state ProcessInterface process;
|
||||
process.getInterface = RequestStream<GetProcessInterfaceRequest>(Endpoint({ host }, WLTOKEN_PROCESS));
|
||||
ProcessInterface p = wait(retryBrokenPromise(process.getInterface, GetProcessInterfaceRequest{}));
|
||||
process = p;
|
||||
|
||||
ActorLineageRequest actorLineageRequest;
|
||||
actorLineageRequest.waitStateStart = waitStateStart;
|
||||
actorLineageRequest.waitStateEnd = waitStateEnd;
|
||||
actorLineageRequest.timeStart = timeStart;
|
||||
actorLineageRequest.timeEnd = timeEnd;
|
||||
ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest));
|
||||
|
||||
time_t dt = 0;
|
||||
int seq = -1;
|
||||
for (const auto& sample : reply.samples) {
|
||||
for (const auto& [waitState, data] : sample.data) {
|
||||
time_t datetime = (time_t)sample.time;
|
||||
seq = dt == datetime ? seq + 1 : 0;
|
||||
dt = datetime;
|
||||
|
||||
if (seq < seqStart) { continue; }
|
||||
else if (seq >= seqEnd) { break; }
|
||||
|
||||
char buf[50];
|
||||
struct tm* tm;
|
||||
tm = localtime(&datetime);
|
||||
size_t size = strftime(buf, 50, "%FT%T%z", tm);
|
||||
std::string date(buf, size);
|
||||
|
||||
std::ostringstream streamKey;
|
||||
if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) {
|
||||
streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("state").toString() << host.toString()
|
||||
<< "/" << to_string(waitState) << "/" << date;
|
||||
} else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) {
|
||||
streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("time").toString() << host.toString()
|
||||
<< "/" << date << "/" << to_string(waitState);
|
||||
;
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
streamKey << "/" << seq;
|
||||
|
||||
msgpack::object_handle oh = msgpack::unpack(data.data(), data.size());
|
||||
msgpack::object deserialized = oh.get();
|
||||
|
||||
std::ostringstream stream;
|
||||
stream << deserialized;
|
||||
|
||||
result.push_back_deep(result.arena(), KeyValueRef(streamKey.str(), stream.str()));
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -200,6 +200,12 @@ public:
|
|||
static KeyRef getManagementApiCommandPrefix(const std::string& command) {
|
||||
return managementApiCommandToRange.at(command).begin;
|
||||
}
|
||||
static KeyRangeRef getActorLineageApiCommandRange(const std::string& command) {
|
||||
return actorLineageApiCommandToRange.at(command);
|
||||
}
|
||||
static KeyRef getActorLineageApiCommandPrefix(const std::string& command) {
|
||||
return actorLineageApiCommandToRange.at(command).begin;
|
||||
}
|
||||
static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option);
|
||||
static const std::set<std::string>& getManagementApiOptionsSet() { return options; }
|
||||
static const std::set<std::string>& getTracingOptions() { return tracingOptions; }
|
||||
|
@ -228,6 +234,7 @@ private:
|
|||
static std::unordered_map<SpecialKeySpace::MODULE, KeyRange> moduleToBoundary;
|
||||
static std::unordered_map<std::string, KeyRange>
|
||||
managementApiCommandToRange; // management command to its special keys' range
|
||||
static std::unordered_map<std::string, KeyRange> actorLineageApiCommandToRange;
|
||||
static std::set<std::string> options; // "<command>/<option>"
|
||||
static std::set<std::string> tracingOptions;
|
||||
|
||||
|
|
|
@ -2040,6 +2040,8 @@ ACTOR Future<Void> serveProtocolInfo() {
|
|||
}
|
||||
}
|
||||
|
||||
// Handles requests from ProcessInterface, an interface meant for direct
|
||||
// communication between the client and FDB processes.
|
||||
ACTOR Future<Void> serveProcess() {
|
||||
state ProcessInterface process;
|
||||
process.getInterface.makeWellKnownEndpoint(WLTOKEN_PROCESS, TaskPriority::DefaultEndpoint);
|
||||
|
@ -2048,7 +2050,23 @@ ACTOR Future<Void> serveProcess() {
|
|||
when(GetProcessInterfaceRequest req = waitNext(process.getInterface.getFuture())) {
|
||||
req.reply.send(process);
|
||||
}
|
||||
when(EchoRequest req = waitNext(process.echo.getFuture())) { req.reply.send(req.message); }
|
||||
when(ActorLineageRequest req = waitNext(process.actorLineage.getFuture())) {
|
||||
state SampleCollection sampleCollector;
|
||||
auto samples = sampleCollector->get(req.timeStart, req.timeEnd);
|
||||
|
||||
std::vector<SerializedSample> serializedSamples;
|
||||
for (const auto& samplePtr : samples) {
|
||||
auto serialized = SerializedSample{ .time = samplePtr->time };
|
||||
for (const auto& [waitState, pair] : samplePtr->data) {
|
||||
if (waitState >= req.waitStateStart && waitState <= req.waitStateEnd) {
|
||||
serialized.data[waitState] = std::string(pair.first, pair.second);
|
||||
}
|
||||
}
|
||||
serializedSamples.push_back(std::move(serialized));
|
||||
}
|
||||
ActorLineageReply reply{ serializedSamples };
|
||||
req.reply.send(reply);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue