diff --git a/fdbclient/ActorLineageProfiler.cpp b/fdbclient/ActorLineageProfiler.cpp index 7ceedf144d..31478cb418 100644 --- a/fdbclient/ActorLineageProfiler.cpp +++ b/fdbclient/ActorLineageProfiler.cpp @@ -169,13 +169,8 @@ IALPCollectorBase::IALPCollectorBase() { SampleCollector::instance().addCollector(this); } -std::shared_ptr SampleCollectorT::collect(ActorLineage* lineage) { +std::map SampleCollectorT::collect(ActorLineage* lineage) { ASSERT(lineage != nullptr); - auto sample = std::make_shared(); - double time = g_network->now(); - sample->time = time; - - Packer packer; std::map out; for (auto& collector : collectors) { auto val = collector->collect(lineage); @@ -183,37 +178,36 @@ std::shared_ptr SampleCollectorT::collect(ActorLineage* lineage) { out[collector->name()] = val.value(); } } - if (!out.empty()) { - packer.pack(out); - sample->data[WaitState::Running] = packer.getbuf(); + return out; +} + +std::shared_ptr SampleCollectorT::collect() { + auto sample = std::make_shared(); + double time = g_network->now(); + sample->time = time; + for (auto& p : getSamples) { + Packer packer; + std::vector> samples; + auto sampleVec = p.second(); + for (auto& val : sampleVec) { + auto m = collect(val.getPtr()); + if (!m.empty()) { + samples.emplace_back(std::move(m)); + } + } + if (!samples.empty()) { + packer.pack(samples); + sample->data[p.first] = packer.getbuf(); + } } return sample; } -// TODO: Remove -void SampleCollection_t::refresh() { - if (data.empty()) { - return; - } - auto min = std::min(data.back()->time - windowSize, data.back()->time); - double oldest = data.front()->time; - // we don't need to check for data.empty() in this loop (or the inner loop) as we know that we will end - // up with at least one entry which is the most recent sample - while (oldest < min) { - Lock _{ mutex }; - // we remove at most 10 elements at a time. This is so we don't block the main thread for too long. - for (int i = 0; i < 10 && oldest < min; ++i) { - data.pop_front(); - oldest = data.front()->time; - } - } -} - void SampleCollection_t::collect(const Reference& lineage) { - if (!lineage.isValid()) { - return; - } - auto sample = _collector->collect(lineage.getPtr()); + ASSERT(lineage.isValid()); + _currentLineage = lineage; + auto sample = _collector->collect(); + ASSERT(sample); { Lock _{ mutex }; data.emplace_back(sample); @@ -231,9 +225,7 @@ void SampleCollection_t::collect(const Reference& lineage) { } } // TODO: Should only call ingest when deleting from memory - if (sample.get() != 0) { - config->ingest(sample); - } + config->ingest(sample); } std::vector> SampleCollection_t::get(double from /*= 0.0*/, @@ -280,7 +272,6 @@ struct ProfilerImpl { if (ec) { return; } - // collection->refresh(); startSampling = true; timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency)); timer.async_wait([this](auto const& ec) { profileHandler(ec); }); @@ -297,20 +288,15 @@ struct ProfilerImpl { } }; -// TODO: Remove ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) { - // collection->collector()->addGetter(WaitState::Network, - // std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet()))); - // collection->collector()->addGetter( - // WaitState::Disk, - // std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet()))); - // collection->collector()->addGetter(WaitState::Running, []() { - // auto res = currentLineageThreadSafe.get(); - // if (res.isValid()) { - // return std::vector>({ res }); - // } - // return std::vector>(); - // }); + collection->collector()->addGetter(WaitState::Network, + std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet()))); + collection->collector()->addGetter( + WaitState::Disk, + std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet()))); + collection->collector()->addGetter(WaitState::Running, []() { + return std::vector>({ SampleCollection::instance().getCurrentLineage() }); + }); } ActorLineageProfilerT::~ActorLineageProfilerT() { diff --git a/fdbclient/ActorLineageProfiler.h b/fdbclient/ActorLineageProfiler.h index 60d5c9f815..9827d8f009 100644 --- a/fdbclient/ActorLineageProfiler.h +++ b/fdbclient/ActorLineageProfiler.h @@ -127,10 +127,11 @@ private: std::vector collectors; std::map getSamples; SampleCollectorT() {} + std::map collect(ActorLineage* lineage); public: void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); } - std::shared_ptr collect(ActorLineage* lineage); + std::shared_ptr collect(); void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; }; }; @@ -146,6 +147,7 @@ class SampleCollection_t { std::atomic windowSize = 0.0; std::deque> data; ProfilerConfig config; + Reference _currentLineage; public: /** @@ -161,13 +163,10 @@ public: * \param to The max age of all returned samples. */ std::vector> get(double from = 0.0, double to = std::numeric_limits::max()) const; - /** - * Collects all new samples from the sample collector and stores them in the collection. - */ - void refresh(); void collect(const Reference& lineage); const SampleCollector& collector() const { return _collector; } SampleCollector& collector() { return _collector; } + Reference getCurrentLineage() { return _currentLineage; } }; using SampleCollection = crossbow::singleton; diff --git a/fdbclient/AnnotateActor.h b/fdbclient/AnnotateActor.h index dfc944fd02..fd9e965a7d 100644 --- a/fdbclient/AnnotateActor.h +++ b/fdbclient/AnnotateActor.h @@ -33,10 +33,10 @@ struct AnnotateActor { AnnotateActor() : set(false) {} - AnnotateActor(Reference lineage) : set(true) { - index = g_network->getActorLineageSet().insert(lineage); - if (index == ActorLineageSet::npos) { - set = false; + AnnotateActor(LineageReference* lineage) : set(false) { + if (lineage->getPtr() != 0) { + index = g_network->getActorLineageSet().insert(*lineage); + set = (index != ActorLineageSet::npos); } } diff --git a/fdbclient/NameLineage.h b/fdbclient/NameLineage.h index 691fde4baa..f949aeb13a 100644 --- a/fdbclient/NameLineage.h +++ b/fdbclient/NameLineage.h @@ -33,7 +33,10 @@ struct NameLineageCollector : IALPCollector { NameLineageCollector() : IALPCollector() {} std::optional collect(ActorLineage* lineage) override { auto str = lineage->get(&NameLineage::actorName); - ASSERT(str.has_value()); - return std::string_view(*str, std::strlen(*str)); + if (str.has_value()) { + return std::string_view(*str, std::strlen(*str)); + } else { + return {}; + } } }; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a8028f2c13..4273ff0571 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -3318,7 +3318,7 @@ ACTOR Future getRange(Database cx, throw deterministicRandom()->randomChoice( std::vector{ transaction_too_old(), future_version() }); } - state AnnotateActor annotation(getCurrentLineage()); + state AnnotateActor annotation(currentLineage); GetKeyValuesReply _rep = wait(loadBalance(cx.getPtr(), beginServer.second, diff --git a/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/AsyncFileKAIO.actor.h index e4abeb1ea5..878128be75 100644 --- a/fdbrpc/AsyncFileKAIO.actor.h +++ b/fdbrpc/AsyncFileKAIO.actor.h @@ -242,11 +242,11 @@ public: // result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; }); #endif - // auto& actorLineageSet = IAsyncFileSystem::filesystem()->getActorLineageSet(); - // auto index = actorLineageSet.insert(currentLineage); - // ASSERT(index != ActorLineageSet::npos); + auto& actorLineageSet = IAsyncFileSystem::filesystem()->getActorLineageSet(); + auto index = actorLineageSet.insert(*currentLineage); + ASSERT(index != ActorLineageSet::npos); Future res = success(result); - // actorLineageSet.erase(index); + actorLineageSet.erase(index); return res; } // TODO(alexmiller): Remove when we upgrade the dev docker image to >14.10 diff --git a/flow/flow.cpp b/flow/flow.cpp index ee95c14c80..257f035928 100644 --- a/flow/flow.cpp +++ b/flow/flow.cpp @@ -41,7 +41,7 @@ ActorLineage::~ActorLineage() { } } -Reference getCurrentLineage() { +LineageReference getCurrentLineage() { if (!currentLineage->isValid() || !currentLineage->isAllocated()) { currentLineage->allocate(); } diff --git a/flow/flow.h b/flow/flow.h index d080546597..b85de76077 100644 --- a/flow/flow.h +++ b/flow/flow.h @@ -547,9 +547,9 @@ public: // getCurrentLineage()). class LineageReference : public Reference { public: - LineageReference() : Reference(nullptr), allocated_(false) {} - explicit LineageReference(ActorLineage* ptr) : Reference(ptr), allocated_(false) {} - LineageReference(const LineageReference& r) : Reference(r), allocated_(false) {} + LineageReference() : Reference(nullptr), actorName_(""), allocated_(false) {} + explicit LineageReference(ActorLineage* ptr) : Reference(ptr), actorName_(""), allocated_(false) {} + LineageReference(const LineageReference& r) : Reference(r), actorName_(""), allocated_(false) {} void setActorName(const char* name) { actorName_ = name; } const char* actorName() { return actorName_; } @@ -570,7 +570,7 @@ private: extern std::atomic startSampling; extern thread_local LineageReference* currentLineage; -Reference getCurrentLineage(); +LineageReference getCurrentLineage(); void replaceLineage(LineageReference* lineage); struct StackLineage : LineageProperties {