Refactor samples to include wait state

This commit is contained in:
Lukas Joswiak 2021-04-26 22:50:44 -07:00
parent 76acb0fcb9
commit 7f9ee224a4
5 changed files with 57 additions and 31 deletions

View File

@ -150,12 +150,9 @@ public:
} }
} }
std::shared_ptr<Sample> done(double time) { std::pair<char*, unsigned> getbuf() {
auto res = std::make_shared<Sample>(); unsigned size = sbuffer.size();
res->time = time; return std::make_pair(sbuffer.release(), size);
res->size = sbuffer.size();
res->data = sbuffer.release();
return res;
} }
}; };
@ -175,11 +172,11 @@ std::map<std::string_view, std::any> SampleCollectorT::collect(ActorLineage* lin
} }
std::shared_ptr<Sample> SampleCollectorT::collect() { std::shared_ptr<Sample> SampleCollectorT::collect() {
Packer packer; auto sample = std::make_shared<Sample>();
std::map<std::string_view, std::any> res;
double time = g_network->now(); double time = g_network->now();
res["time"sv] = time; sample->time = time;
for (auto& p : getSamples) { for (auto& p : getSamples) {
Packer packer;
std::vector<std::map<std::string_view, std::any>> samples; std::vector<std::map<std::string_view, std::any>> samples;
auto sampleVec = p.second(); auto sampleVec = p.second();
for (auto& val : sampleVec) { for (auto& val : sampleVec) {
@ -189,11 +186,11 @@ std::shared_ptr<Sample> SampleCollectorT::collect() {
} }
} }
if (!samples.empty()) { if (!samples.empty()) {
res[to_string(p.first)] = samples; packer.pack(samples);
sample->data[p.first] = packer.getbuf();
} }
} }
packer.pack(res); return sample;
return packer.done(time);
} }
void SampleCollection_t::refresh() { void SampleCollection_t::refresh() {

View File

@ -47,9 +47,12 @@ struct IALPCollector : IALPCollectorBase {
struct Sample : std::enable_shared_from_this<Sample> { struct Sample : std::enable_shared_from_this<Sample> {
double time = 0.0; double time = 0.0;
unsigned size = 0u; std::unordered_map<WaitState, std::pair<char*, unsigned>> data;
char* data = nullptr; ~Sample() {
~Sample() { ::free(data); } std::for_each(data.begin(), data.end(), [](std::pair<WaitState, std::pair<char*, unsigned>> entry) {
::free(entry.second.first);
});
}
}; };
class SampleCollectorT { class SampleCollectorT {

View File

@ -62,14 +62,13 @@ struct EchoRequest {
struct SerializedSample { struct SerializedSample {
constexpr static FileIdentifier file_identifier = 15785634; constexpr static FileIdentifier file_identifier = 15785634;
WaitState waitState;
double time; double time;
int seq; int seq;
std::string data; std::unordered_map<WaitState, std::string> data;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, waitState, time, seq, data); serializer(ar, time, seq, data);
} }
}; };

View File

@ -2095,15 +2095,37 @@ ACTOR static Future<Standalone<RangeResultRef>> actorLineageGetRangeActor(ReadYo
actorLineageRequest.seqEnd = seqEnd; actorLineageRequest.seqEnd = seqEnd;
ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest)); ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest));
time_t dt = 0;
int seq = -1;
for (const auto& sample : reply.samples) { for (const auto& sample : reply.samples) {
msgpack::object_handle oh = msgpack::unpack(sample.data.data(), sample.data.size()); for (const auto& [waitState, data] : sample.data) {
msgpack::object deserialized = oh.get(); time_t datetime = (time_t)sample.time;
seq = dt == datetime ? seq + 1 : 0;
dt = datetime;
std::ostringstream stream; if (seq < seqStart) {
stream << deserialized; continue;
// TODO: Fix return value for ranges } else if (seq >= seqEnd) {
Key returnKey = prefix.withSuffix(host.toString() + "/" + std::to_string(sample.seq)); break;
result.push_back_deep(result.arena(), KeyValueRef(returnKey, stream.str())); }
char buf[200];
struct tm* tm;
tm = localtime(&datetime);
size_t size = strftime(buf, 200, "%FT%T%z", tm);
std::string date(buf, size);
msgpack::object_handle oh = msgpack::unpack(data.data(), data.size());
msgpack::object deserialized = oh.get();
std::ostringstream stream;
stream << deserialized;
// TODO: Fix return value for time range
Key returnKey = prefix.withSuffix(host.toString() + "/" + std::string(to_string(waitState)) + "/" + date +
"/" + std::to_string(seq));
result.push_back_deep(result.arena(), KeyValueRef(returnKey, stream.str()));
}
} }
return result; return result;

View File

@ -2060,13 +2060,18 @@ ACTOR Future<Void> serveProcess() {
int maxSeq = std::min(req.seqEnd, static_cast<int>(samples.size())); int maxSeq = std::min(req.seqEnd, static_cast<int>(samples.size()));
std::vector<SerializedSample> serializedSamples; std::vector<SerializedSample> serializedSamples;
for (int i = req.seqStart; i < maxSeq; ++i) { for (const auto& samplePtr : samples) {
auto samplePtr = samples.at(i); int seq = 0;
auto serialized = SerializedSample{ .waitState = WaitState::Network, // TODO: Currently unused auto serialized = SerializedSample{ .time = samplePtr->time, .seq = seq };
.time = samplePtr->time, for (const auto& [waitState, pair] : samplePtr->data) {
.seq = i, serialized.data[waitState] = std::string(pair.first, pair.second);
.data = std::string(samplePtr->data, samplePtr->size) }; }
serializedSamples.push_back(std::move(serialized)); serializedSamples.push_back(std::move(serialized));
// TODO: Don't need to transmit seq over the network anymore
if (++seq >= maxSeq) {
continue;
};
} }
ActorLineageReply reply{ serializedSamples }; ActorLineageReply reply{ serializedSamples };
req.reply.send(reply); req.reply.send(reply);