Collect actors waiting on disk and network

This commit is contained in:
Lukas Joswiak 2021-06-17 16:50:34 -07:00
parent 805c4200ce
commit 2d248a5926
8 changed files with 58 additions and 70 deletions

View File

@ -169,13 +169,8 @@ IALPCollectorBase::IALPCollectorBase() {
SampleCollector::instance().addCollector(this);
}
std::shared_ptr<Sample> SampleCollectorT::collect(ActorLineage* lineage) {
std::map<std::string_view, std::any> SampleCollectorT::collect(ActorLineage* lineage) {
ASSERT(lineage != nullptr);
auto sample = std::make_shared<Sample>();
double time = g_network->now();
sample->time = time;
Packer packer;
std::map<std::string_view, std::any> out;
for (auto& collector : collectors) {
auto val = collector->collect(lineage);
@ -183,37 +178,36 @@ std::shared_ptr<Sample> SampleCollectorT::collect(ActorLineage* lineage) {
out[collector->name()] = val.value();
}
}
if (!out.empty()) {
packer.pack(out);
sample->data[WaitState::Running] = packer.getbuf();
return out;
}
std::shared_ptr<Sample> SampleCollectorT::collect() {
auto sample = std::make_shared<Sample>();
double time = g_network->now();
sample->time = time;
for (auto& p : getSamples) {
Packer packer;
std::vector<std::map<std::string_view, std::any>> samples;
auto sampleVec = p.second();
for (auto& val : sampleVec) {
auto m = collect(val.getPtr());
if (!m.empty()) {
samples.emplace_back(std::move(m));
}
}
if (!samples.empty()) {
packer.pack(samples);
sample->data[p.first] = packer.getbuf();
}
}
return sample;
}
// TODO: Remove
void SampleCollection_t::refresh() {
if (data.empty()) {
return;
}
auto min = std::min(data.back()->time - windowSize, data.back()->time);
double oldest = data.front()->time;
// we don't need to check for data.empty() in this loop (or the inner loop) as we know that we will end
// up with at least one entry which is the most recent sample
while (oldest < min) {
Lock _{ mutex };
// we remove at most 10 elements at a time. This is so we don't block the main thread for too long.
for (int i = 0; i < 10 && oldest < min; ++i) {
data.pop_front();
oldest = data.front()->time;
}
}
}
void SampleCollection_t::collect(const Reference<ActorLineage>& lineage) {
if (!lineage.isValid()) {
return;
}
auto sample = _collector->collect(lineage.getPtr());
ASSERT(lineage.isValid());
_currentLineage = lineage;
auto sample = _collector->collect();
ASSERT(sample);
{
Lock _{ mutex };
data.emplace_back(sample);
@ -231,9 +225,7 @@ void SampleCollection_t::collect(const Reference<ActorLineage>& lineage) {
}
}
// TODO: Should only call ingest when deleting from memory
if (sample.get() != 0) {
config->ingest(sample);
}
config->ingest(sample);
}
std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0*/,
@ -280,7 +272,6 @@ struct ProfilerImpl {
if (ec) {
return;
}
// collection->refresh();
startSampling = true;
timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency));
timer.async_wait([this](auto const& ec) { profileHandler(ec); });
@ -297,20 +288,15 @@ struct ProfilerImpl {
}
};
// TODO: Remove
ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) {
// collection->collector()->addGetter(WaitState::Network,
// std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet())));
// collection->collector()->addGetter(
// WaitState::Disk,
// std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet())));
// collection->collector()->addGetter(WaitState::Running, []() {
// auto res = currentLineageThreadSafe.get();
// if (res.isValid()) {
// return std::vector<Reference<ActorLineage>>({ res });
// }
// return std::vector<Reference<ActorLineage>>();
// });
collection->collector()->addGetter(WaitState::Network,
std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet())));
collection->collector()->addGetter(
WaitState::Disk,
std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet())));
collection->collector()->addGetter(WaitState::Running, []() {
return std::vector<Reference<ActorLineage>>({ SampleCollection::instance().getCurrentLineage() });
});
}
ActorLineageProfilerT::~ActorLineageProfilerT() {

View File

@ -127,10 +127,11 @@ private:
std::vector<IALPCollectorBase*> collectors;
std::map<WaitState, Getter> getSamples;
SampleCollectorT() {}
std::map<std::string_view, std::any> collect(ActorLineage* lineage);
public:
void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); }
std::shared_ptr<Sample> collect(ActorLineage* lineage);
std::shared_ptr<Sample> collect();
void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; };
};
@ -146,6 +147,7 @@ class SampleCollection_t {
std::atomic<double> windowSize = 0.0;
std::deque<std::shared_ptr<Sample>> data;
ProfilerConfig config;
Reference<ActorLineage> _currentLineage;
public:
/**
@ -161,13 +163,10 @@ public:
* \param to The max age of all returned samples.
*/
std::vector<std::shared_ptr<Sample>> get(double from = 0.0, double to = std::numeric_limits<double>::max()) const;
/**
* Collects all new samples from the sample collector and stores them in the collection.
*/
void refresh();
void collect(const Reference<ActorLineage>& lineage);
const SampleCollector& collector() const { return _collector; }
SampleCollector& collector() { return _collector; }
Reference<ActorLineage> getCurrentLineage() { return _currentLineage; }
};
using SampleCollection = crossbow::singleton<SampleCollection_t>;

View File

@ -33,10 +33,10 @@ struct AnnotateActor {
AnnotateActor() : set(false) {}
AnnotateActor(Reference<ActorLineage> lineage) : set(true) {
index = g_network->getActorLineageSet().insert(lineage);
if (index == ActorLineageSet::npos) {
set = false;
AnnotateActor(LineageReference* lineage) : set(false) {
if (lineage->getPtr() != 0) {
index = g_network->getActorLineageSet().insert(*lineage);
set = (index != ActorLineageSet::npos);
}
}

View File

@ -33,7 +33,10 @@ struct NameLineageCollector : IALPCollector<NameLineage> {
NameLineageCollector() : IALPCollector() {}
std::optional<std::any> collect(ActorLineage* lineage) override {
auto str = lineage->get(&NameLineage::actorName);
ASSERT(str.has_value());
return std::string_view(*str, std::strlen(*str));
if (str.has_value()) {
return std::string_view(*str, std::strlen(*str));
} else {
return {};
}
}
};

View File

@ -3318,7 +3318,7 @@ ACTOR Future<RangeResult> getRange(Database cx,
throw deterministicRandom()->randomChoice(
std::vector<Error>{ transaction_too_old(), future_version() });
}
state AnnotateActor annotation(getCurrentLineage());
state AnnotateActor annotation(currentLineage);
GetKeyValuesReply _rep =
wait(loadBalance(cx.getPtr(),
beginServer.second,

View File

@ -242,11 +242,11 @@ public:
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
// auto& actorLineageSet = IAsyncFileSystem::filesystem()->getActorLineageSet();
// auto index = actorLineageSet.insert(currentLineage);
// ASSERT(index != ActorLineageSet::npos);
auto& actorLineageSet = IAsyncFileSystem::filesystem()->getActorLineageSet();
auto index = actorLineageSet.insert(*currentLineage);
ASSERT(index != ActorLineageSet::npos);
Future<Void> res = success(result);
// actorLineageSet.erase(index);
actorLineageSet.erase(index);
return res;
}
// TODO(alexmiller): Remove when we upgrade the dev docker image to >14.10

View File

@ -41,7 +41,7 @@ ActorLineage::~ActorLineage() {
}
}
Reference<ActorLineage> getCurrentLineage() {
LineageReference getCurrentLineage() {
if (!currentLineage->isValid() || !currentLineage->isAllocated()) {
currentLineage->allocate();
}

View File

@ -547,9 +547,9 @@ public:
// getCurrentLineage()).
class LineageReference : public Reference<ActorLineage> {
public:
LineageReference() : Reference<ActorLineage>(nullptr), allocated_(false) {}
explicit LineageReference(ActorLineage* ptr) : Reference<ActorLineage>(ptr), allocated_(false) {}
LineageReference(const LineageReference& r) : Reference<ActorLineage>(r), allocated_(false) {}
LineageReference() : Reference<ActorLineage>(nullptr), actorName_(""), allocated_(false) {}
explicit LineageReference(ActorLineage* ptr) : Reference<ActorLineage>(ptr), actorName_(""), allocated_(false) {}
LineageReference(const LineageReference& r) : Reference<ActorLineage>(r), actorName_(""), allocated_(false) {}
void setActorName(const char* name) { actorName_ = name; }
const char* actorName() { return actorName_; }
@ -570,7 +570,7 @@ private:
extern std::atomic<bool> startSampling;
extern thread_local LineageReference* currentLineage;
Reference<ActorLineage> getCurrentLineage();
LineageReference getCurrentLineage();
void replaceLineage(LineageReference* lineage);
struct StackLineage : LineageProperties<StackLineage> {