From 7f9ee224a4e849a7f5f1320357110110cadb1df7 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Mon, 26 Apr 2021 22:50:44 -0700 Subject: [PATCH] Refactor samples to include wait state --- fdbclient/ActorLineageProfiler.cpp | 21 ++++++++--------- fdbclient/ActorLineageProfiler.h | 9 +++++--- fdbclient/ProcessInterface.h | 5 ++-- fdbclient/SpecialKeySpace.actor.cpp | 36 +++++++++++++++++++++++------ fdbserver/worker.actor.cpp | 17 +++++++++----- 5 files changed, 57 insertions(+), 31 deletions(-) diff --git a/fdbclient/ActorLineageProfiler.cpp b/fdbclient/ActorLineageProfiler.cpp index 8bb2910001..e0a2e1bdf5 100644 --- a/fdbclient/ActorLineageProfiler.cpp +++ b/fdbclient/ActorLineageProfiler.cpp @@ -150,12 +150,9 @@ public: } } - std::shared_ptr done(double time) { - auto res = std::make_shared(); - res->time = time; - res->size = sbuffer.size(); - res->data = sbuffer.release(); - return res; + std::pair getbuf() { + unsigned size = sbuffer.size(); + return std::make_pair(sbuffer.release(), size); } }; @@ -175,11 +172,11 @@ std::map SampleCollectorT::collect(ActorLineage* lin } std::shared_ptr SampleCollectorT::collect() { - Packer packer; - std::map res; + auto sample = std::make_shared(); double time = g_network->now(); - res["time"sv] = time; + sample->time = time; for (auto& p : getSamples) { + Packer packer; std::vector> samples; auto sampleVec = p.second(); for (auto& val : sampleVec) { @@ -189,11 +186,11 @@ std::shared_ptr SampleCollectorT::collect() { } } if (!samples.empty()) { - res[to_string(p.first)] = samples; + packer.pack(samples); + sample->data[p.first] = packer.getbuf(); } } - packer.pack(res); - return packer.done(time); + return sample; } void SampleCollection_t::refresh() { diff --git a/fdbclient/ActorLineageProfiler.h b/fdbclient/ActorLineageProfiler.h index c612274133..67c6c83ff3 100644 --- a/fdbclient/ActorLineageProfiler.h +++ b/fdbclient/ActorLineageProfiler.h @@ -47,9 +47,12 @@ struct IALPCollector : IALPCollectorBase { struct Sample : std::enable_shared_from_this { double time = 0.0; - unsigned size = 0u; - char* data = nullptr; - ~Sample() { ::free(data); } + std::unordered_map> data; + ~Sample() { + std::for_each(data.begin(), data.end(), [](std::pair> entry) { + ::free(entry.second.first); + }); + } }; class SampleCollectorT { diff --git a/fdbclient/ProcessInterface.h b/fdbclient/ProcessInterface.h index 0c57107106..04ecf76181 100644 --- a/fdbclient/ProcessInterface.h +++ b/fdbclient/ProcessInterface.h @@ -62,14 +62,13 @@ struct EchoRequest { struct SerializedSample { constexpr static FileIdentifier file_identifier = 15785634; - WaitState waitState; double time; int seq; - std::string data; + std::unordered_map data; template void serialize(Ar& ar) { - serializer(ar, waitState, time, seq, data); + serializer(ar, time, seq, data); } }; diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index d2f0e57d55..ac200e20eb 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -2095,15 +2095,37 @@ ACTOR static Future> actorLineageGetRangeActor(ReadYo actorLineageRequest.seqEnd = seqEnd; ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest)); + time_t dt = 0; + int seq = -1; for (const auto& sample : reply.samples) { - msgpack::object_handle oh = msgpack::unpack(sample.data.data(), sample.data.size()); - msgpack::object deserialized = oh.get(); + for (const auto& [waitState, data] : sample.data) { + time_t datetime = (time_t)sample.time; + seq = dt == datetime ? seq + 1 : 0; + dt = datetime; - std::ostringstream stream; - stream << deserialized; - // TODO: Fix return value for ranges - Key returnKey = prefix.withSuffix(host.toString() + "/" + std::to_string(sample.seq)); - result.push_back_deep(result.arena(), KeyValueRef(returnKey, stream.str())); + if (seq < seqStart) { + continue; + } else if (seq >= seqEnd) { + break; + } + + char buf[200]; + struct tm* tm; + tm = localtime(&datetime); + size_t size = strftime(buf, 200, "%FT%T%z", tm); + std::string date(buf, size); + + msgpack::object_handle oh = msgpack::unpack(data.data(), data.size()); + msgpack::object deserialized = oh.get(); + + std::ostringstream stream; + stream << deserialized; + + // TODO: Fix return value for time range + Key returnKey = prefix.withSuffix(host.toString() + "/" + std::string(to_string(waitState)) + "/" + date + + "/" + std::to_string(seq)); + result.push_back_deep(result.arena(), KeyValueRef(returnKey, stream.str())); + } } return result; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 4c5dfecb16..c897b80354 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -2060,13 +2060,18 @@ ACTOR Future serveProcess() { int maxSeq = std::min(req.seqEnd, static_cast(samples.size())); std::vector serializedSamples; - for (int i = req.seqStart; i < maxSeq; ++i) { - auto samplePtr = samples.at(i); - auto serialized = SerializedSample{ .waitState = WaitState::Network, // TODO: Currently unused - .time = samplePtr->time, - .seq = i, - .data = std::string(samplePtr->data, samplePtr->size) }; + for (const auto& samplePtr : samples) { + int seq = 0; + auto serialized = SerializedSample{ .time = samplePtr->time, .seq = seq }; + for (const auto& [waitState, pair] : samplePtr->data) { + serialized.data[waitState] = std::string(pair.first, pair.second); + } serializedSamples.push_back(std::move(serialized)); + + // TODO: Don't need to transmit seq over the network anymore + if (++seq >= maxSeq) { + continue; + }; } ActorLineageReply reply{ serializedSamples }; req.reply.send(reply);