PR comments; add missing addRef when re-using Histogram in HistogramRegistry; Instrument UnsentPacketQueue in FlowTransport

This commit is contained in:
Russell Sears 2020-11-06 19:13:45 -08:00
parent df18f20efd
commit 08e8a06933
5 changed files with 50 additions and 16 deletions

View File

@ -51,6 +51,7 @@ HistogramRegistry& GetHistogramRegistry() {
// avoid link order issues where the registry hasn't been initialized, but we're
// instantiating a histogram
if (globalHistograms == nullptr) {
// Note: This will show up as a leak on shutdown, but we're OK with that.
globalHistograms = new HistogramRegistry();
}
return *globalHistograms;
@ -65,10 +66,12 @@ void HistogramRegistry::registerHistogram(Histogram* h) {
}
void HistogramRegistry::unregisterHistogram(Histogram* h) {
if (histograms.find(h->name()) == histograms.end()) {
std::string name = h->name();
if (histograms.find(name) == histograms.end()) {
TraceEvent(SevError, "HistogramNotRegistered").detail("group", h->group).detail("op", h->op);
}
ASSERT(histograms.erase(h->name()) == 1);
int count = histograms.erase(name);
ASSERT(count == 1);
}
Histogram* HistogramRegistry::lookupHistogram(std::string name) {

View File

@ -42,22 +42,27 @@ public:
void logReport();
private:
// This map is ordered by key so that ops within the same group end up
// next to each other in the trace log.
std::map<std::string, Histogram*> histograms;
};
// TODO: This should be scoped properly for simulation (instead of just having all the "machines" share one histogram
// namespace)
HistogramRegistry& GetHistogramRegistry();
class Histogram : public ReferenceCounted<Histogram> {
/*
* A fast histogram with power-of-two spaced buckets.
*
* For more information about this technique, see:
* https://www.fsl.cs.stonybrook.edu/project-osprof.html
*/
class Histogram sealed : public ReferenceCounted<Histogram> {
public:
enum class Unit { microseconds, bytes };
private:
Histogram(std::string group, std::string op, Unit unit)
: group(group), op(op), unit(unit), registry(GetHistogramRegistry()) {
Histogram(std::string group, std::string op, Unit unit, HistogramRegistry& registry)
: group(group), op(op), unit(unit), registry(registry), ReferenceCounted<Histogram>() {
clear();
registry.registerHistogram(this);
}
static std::string generateName(std::string group, std::string op) { return group + ":" + op; }
@ -72,18 +77,30 @@ public:
HistogramRegistry& registry = GetHistogramRegistry();
Histogram* h = registry.lookupHistogram(name);
if (!h) {
h = new Histogram(group_str, op_str, unit);
h = new Histogram(group_str, op_str, unit, registry);
registry.registerHistogram(h);
return Reference<Histogram>(h);
} else {
return Reference<Histogram>::addRef(h);
}
return Reference(h);
}
// This histogram buckets samples into powers of two.
inline void sample(uint32_t sample) {
size_t idx;
#ifdef _WIN32
unsigned long index;
buckets[_BitScanReverse(&index, sample) ? index : 0]++;
// _BitScanReverse sets index to the position of the first non-zero bit, so
// _BitScanReverse(sample) ~= log_2(sample). _BitScanReverse returns false if
// sample is zero.
idx = _BitScanReverse(&index, sample) ? index : 0;
#else
buckets[sample ? (31 - __builtin_clz(sample)) : 0]++;
// __builtin_clz counts the leading zeros in its uint32_t argument. So, 31-clz ~= log_2(sample).
// __builtin_clz(0) is undefined.
idx = sample ? (31 - __builtin_clz(sample)) : 0;
#endif
ASSERT(idx < 32);
buckets[idx]++;
}
inline void sampleSeconds(double delta) {

View File

@ -150,6 +150,8 @@ void UnsentPacketQueue::sent(int bytes) {
bytes -= b->bytes_written - b->bytes_sent;
b->bytes_sent = b->bytes_written;
ASSERT(b->bytes_written <= b->size());
double queue_time = now() - b->enqueue_time;
sendQueueLatencyHistogram->sampleSeconds(queue_time);
unsent_first = b->nextPacketBuffer();
if (!unsent_first) unsent_last = NULL;
b->delref();

View File

@ -23,6 +23,7 @@
#pragma once
#include "flow/flow.h"
#include "flow/Histogram.h"
// PacketWriter and PacketBuffer are in serialize.h because they are needed by the SerializeSource<> template
@ -40,8 +41,17 @@ struct ReliablePacket : FastAllocated<ReliablePacket> {
class UnsentPacketQueue : NonCopyable {
public:
UnsentPacketQueue() : unsent_first(0), unsent_last(0) {}
~UnsentPacketQueue() { discardAll(); }
UnsentPacketQueue()
: unsent_first(0), unsent_last(0),
sendQueueLatencyHistogram(Histogram::getHistogram(
LiteralStringRef("UnsentPacketQueue"), LiteralStringRef("QueueWait"), Histogram::Unit::microseconds)) {}
~UnsentPacketQueue() {
discardAll();
unsent_first = (PacketBuffer*)0xDEADBEEF;
unsent_last = (PacketBuffer*)0xCAFEBABE;
sendQueueLatencyHistogram = Reference<Histogram>(nullptr);
}
// Get a PacketBuffer to write new packets into
PacketBuffer* getWriteBuffer() {
@ -70,6 +80,7 @@ public:
private:
PacketBuffer *unsent_first, *unsent_last; // Both NULL, or inclusive range of PacketBuffers that haven't been sent. The last one may have space for more packets to be written.
Reference<Histogram> sendQueueLatencyHistogram;
};
class ReliablePacketList : NonCopyable {

View File

@ -667,12 +667,13 @@ private:
struct PacketBuffer : SendBuffer {
private:
static constexpr size_t PACKET_BUFFER_OVERHEAD = 40;
int reference_count;
uint32_t const size_;
double const enqueue_time;
static constexpr size_t PACKET_BUFFER_OVERHEAD = 40;
public:
double const enqueue_time;
uint8_t* data() { return const_cast<uint8_t*>(static_cast<SendBuffer*>(this)->data); }
size_t size() { return size_; }