metrics: Make LatencySample and Counter inherit from IMetric
This commit is contained in:
parent
c96cb24af2
commit
30744c1310
|
@ -22,8 +22,8 @@
|
||||||
#include "flow/actorcompiler.h" // has to be last include
|
#include "flow/actorcompiler.h" // has to be last include
|
||||||
|
|
||||||
Counter::Counter(std::string const& name, CounterCollection& collection)
|
Counter::Counter(std::string const& name, CounterCollection& collection)
|
||||||
: name(name), interval_start(0), last_event(0), interval_sq_time(0), roughness_interval_start(0), interval_delta(0),
|
: IMetric(name), interval_start(0), last_event(0), interval_sq_time(0), roughness_interval_start(0),
|
||||||
interval_start_value(0) {
|
interval_delta(0), interval_start_value(0) {
|
||||||
metric.init(collection.getName() + "." + (char)toupper(name.at(0)) + name.substr(1), collection.getId());
|
metric.init(collection.getName() + "." + (char)toupper(name.at(0)) + name.substr(1), collection.getId());
|
||||||
collection.addCounter(this);
|
collection.addCounter(this);
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,8 @@ void Counter::clear() {
|
||||||
metric = 0;
|
metric = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Counter::flush(MetricBatch& batch) {}
|
||||||
|
|
||||||
void CounterCollection::logToTraceEvent(TraceEvent& te) const {
|
void CounterCollection::logToTraceEvent(TraceEvent& te) const {
|
||||||
for (ICounter* c : counters) {
|
for (ICounter* c : counters) {
|
||||||
te.detail(c->getName().c_str(), c);
|
te.detail(c->getName().c_str(), c);
|
||||||
|
|
|
@ -100,7 +100,7 @@ public:
|
||||||
std::function<void(TraceEvent&)> const& decorator = [](auto& te) {});
|
std::function<void(TraceEvent&)> const& decorator = [](auto& te) {});
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Counter final : ICounter, NonCopyable {
|
struct Counter final : ICounter, NonCopyable, IMetric {
|
||||||
public:
|
public:
|
||||||
typedef int64_t Value;
|
typedef int64_t Value;
|
||||||
|
|
||||||
|
@ -134,8 +134,9 @@ public:
|
||||||
bool hasRate() const override { return true; }
|
bool hasRate() const override { return true; }
|
||||||
bool hasRoughness() const override { return true; }
|
bool hasRoughness() const override { return true; }
|
||||||
|
|
||||||
|
void flush(MetricBatch& batch) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string name;
|
|
||||||
double interval_start, last_event, interval_sq_time, roughness_interval_start;
|
double interval_start, last_event, interval_sq_time, roughness_interval_start;
|
||||||
Value interval_delta, interval_start_value;
|
Value interval_delta, interval_start_value;
|
||||||
Int64MetricHandle metric;
|
Int64MetricHandle metric;
|
||||||
|
@ -214,10 +215,10 @@ public:
|
||||||
~LatencyBands();
|
~LatencyBands();
|
||||||
};
|
};
|
||||||
|
|
||||||
class LatencySample {
|
class LatencySample : public IMetric {
|
||||||
public:
|
public:
|
||||||
LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
|
LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
|
||||||
: name(name), id(id), sampleStart(now()), sketch(accuracy),
|
: IMetric(name), id(id), sampleStart(now()), sketch(accuracy),
|
||||||
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
|
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
|
||||||
assert(accuracy > 0);
|
assert(accuracy > 0);
|
||||||
if (accuracy <= 0) {
|
if (accuracy <= 0) {
|
||||||
|
@ -227,9 +228,9 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
void addMeasurement(double measurement) { sketch.addSample(measurement); }
|
void addMeasurement(double measurement) { sketch.addSample(measurement); }
|
||||||
|
void flush(MetricBatch& batch) override {}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string name;
|
|
||||||
UID id;
|
UID id;
|
||||||
double sampleStart;
|
double sampleStart;
|
||||||
|
|
||||||
|
|
|
@ -198,7 +198,7 @@ void DynamicEventMetric::flushData(MetricKeyRef const& mk, uint64_t rollTime, Me
|
||||||
for (auto& [name, field] : fields)
|
for (auto& [name, field] : fields)
|
||||||
field->flushField(mk, rollTime, batch);
|
field->flushField(mk, rollTime, batch);
|
||||||
if (!latestRecorded) {
|
if (!latestRecorded) {
|
||||||
batch.updates.emplace_back(mk.packLatestKey(), StringRef());
|
batch.scope.updates.emplace_back(mk.packLatestKey(), StringRef());
|
||||||
latestRecorded = true;
|
latestRecorded = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,6 +168,16 @@ struct MetricBatch {
|
||||||
FDBScope scope;
|
FDBScope scope;
|
||||||
std::string statsd_message;
|
std::string statsd_message;
|
||||||
|
|
||||||
|
MetricBatch() {}
|
||||||
|
|
||||||
|
MetricBatch(FDBScope* in) {
|
||||||
|
assert(in != nullptr);
|
||||||
|
scope.inserts = std::move(in->inserts);
|
||||||
|
scope.appends = std::move(in->appends);
|
||||||
|
scope.updates = std::move(in->updates);
|
||||||
|
scope.callbacks = std::move(in->callbacks);
|
||||||
|
}
|
||||||
|
|
||||||
void clear() {
|
void clear() {
|
||||||
scope.clear();
|
scope.clear();
|
||||||
statsd_message.clear();
|
statsd_message.clear();
|
||||||
|
@ -618,7 +628,7 @@ public:
|
||||||
IMetricDB* db,
|
IMetricDB* db,
|
||||||
Standalone<MetricKeyRef> mk,
|
Standalone<MetricKeyRef> mk,
|
||||||
uint64_t rollTime,
|
uint64_t rollTime,
|
||||||
MetricBatch* batch) {
|
FDBScope* scope) {
|
||||||
|
|
||||||
Optional<Standalone<StringRef>> block = wait(db->getLastBlock(mk.packDataKey(-1)));
|
Optional<Standalone<StringRef>> block = wait(db->getLastBlock(mk.packDataKey(-1)));
|
||||||
|
|
||||||
|
@ -647,7 +657,8 @@ public:
|
||||||
|
|
||||||
// Now flush the level data up to the rollTime argument and patch anything older than
|
// Now flush the level data up to the rollTime argument and patch anything older than
|
||||||
// lastTimeRequiringHeaderPatch
|
// lastTimeRequiringHeaderPatch
|
||||||
self->flushUpdates(mk, rollTime, *batch);
|
MetricBatch batch{ scope };
|
||||||
|
self->flushUpdates(mk, rollTime, batch);
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
@ -666,8 +677,8 @@ public:
|
||||||
Standalone<MetricKeyRef> mkCopy = mk;
|
Standalone<MetricKeyRef> mkCopy = mk;
|
||||||
|
|
||||||
// Previous header is not present so queue a callback which will update it
|
// Previous header is not present so queue a callback which will update it
|
||||||
batch.scope.callbacks.push_back([=](IMetricDB* db, MetricBatch* batch) mutable -> Future<Void> {
|
batch.scope.callbacks.push_back([=](IMetricDB* db, FDBScope* s) mutable -> Future<Void> {
|
||||||
return updatePreviousHeader(this, db, mkCopy, rollTime, batch);
|
return updatePreviousHeader(this, db, mkCopy, rollTime, s);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -1433,11 +1444,12 @@ template <typename E>
|
||||||
using EventMetricHandle = MetricHandle<EventMetric<E>>;
|
using EventMetricHandle = MetricHandle<EventMetric<E>>;
|
||||||
|
|
||||||
class IMetric {
|
class IMetric {
|
||||||
|
protected:
|
||||||
std::string name;
|
std::string name;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
IMetric(const std::string& n) : name{ n } {}
|
IMetric(const std::string& n) : name{ n } {}
|
||||||
virtual ~IMetric() = 0;
|
virtual ~IMetric() {}
|
||||||
virtual void flush(MetricBatch&) = 0;
|
virtual void flush(MetricBatch&) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue