From 08e8a0693305d3762a768a2a977cfe180f19310e Mon Sep 17 00:00:00 2001 From: Russell Sears Date: Fri, 6 Nov 2020 19:13:45 -0800 Subject: [PATCH] PR comments; add missing addRef when re-using Histogram in HistogramRegistry; Instrument UnsentPacketQueue in FlowTransport --- flow/Histogram.cpp | 7 +++++-- flow/Histogram.h | 37 +++++++++++++++++++++++++++---------- flow/Net2Packet.cpp | 2 ++ flow/Net2Packet.h | 15 +++++++++++++-- flow/serialize.h | 5 +++-- 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/flow/Histogram.cpp b/flow/Histogram.cpp index 6fa0f14384..e5281093c9 100644 --- a/flow/Histogram.cpp +++ b/flow/Histogram.cpp @@ -51,6 +51,7 @@ HistogramRegistry& GetHistogramRegistry() { // avoid link order issues where the registry hasn't been initialized, but we're // instantiating a histogram if (globalHistograms == nullptr) { + // Note: This will show up as a leak on shutdown, but we're OK with that. globalHistograms = new HistogramRegistry(); } return *globalHistograms; @@ -65,10 +66,12 @@ void HistogramRegistry::registerHistogram(Histogram* h) { } void HistogramRegistry::unregisterHistogram(Histogram* h) { - if (histograms.find(h->name()) == histograms.end()) { + std::string name = h->name(); + if (histograms.find(name) == histograms.end()) { TraceEvent(SevError, "HistogramNotRegistered").detail("group", h->group).detail("op", h->op); } - ASSERT(histograms.erase(h->name()) == 1); + int count = histograms.erase(name); + ASSERT(count == 1); } Histogram* HistogramRegistry::lookupHistogram(std::string name) { diff --git a/flow/Histogram.h b/flow/Histogram.h index 93dd508686..fd765f4d86 100644 --- a/flow/Histogram.h +++ b/flow/Histogram.h @@ -42,22 +42,27 @@ public: void logReport(); private: + // This map is ordered by key so that ops within the same group end up + // next to each other in the trace log. std::map histograms; }; -// TODO: This should be scoped properly for simulation (instead of just having all the "machines" share one histogram -// namespace) HistogramRegistry& GetHistogramRegistry(); -class Histogram : public ReferenceCounted { +/* + * A fast histogram with power-of-two spaced buckets. + * + * For more information about this technique, see: + * https://www.fsl.cs.stonybrook.edu/project-osprof.html + */ +class Histogram sealed : public ReferenceCounted { public: enum class Unit { microseconds, bytes }; private: - Histogram(std::string group, std::string op, Unit unit) - : group(group), op(op), unit(unit), registry(GetHistogramRegistry()) { + Histogram(std::string group, std::string op, Unit unit, HistogramRegistry& registry) + : group(group), op(op), unit(unit), registry(registry), ReferenceCounted() { clear(); - registry.registerHistogram(this); } static std::string generateName(std::string group, std::string op) { return group + ":" + op; } @@ -72,18 +77,30 @@ public: HistogramRegistry& registry = GetHistogramRegistry(); Histogram* h = registry.lookupHistogram(name); if (!h) { - h = new Histogram(group_str, op_str, unit); + h = new Histogram(group_str, op_str, unit, registry); + registry.registerHistogram(h); + return Reference(h); + } else { + return Reference::addRef(h); } - return Reference(h); } + // This histogram buckets samples into powers of two. inline void sample(uint32_t sample) { + size_t idx; #ifdef _WIN32 unsigned long index; - buckets[_BitScanReverse(&index, sample) ? index : 0]++; + // _BitScanReverse sets index to the position of the first non-zero bit, so + // _BitScanReverse(sample) ~= log_2(sample). _BitScanReverse returns false if + // sample is zero. + idx = _BitScanReverse(&index, sample) ? index : 0; #else - buckets[sample ? (31 - __builtin_clz(sample)) : 0]++; + // __builtin_clz counts the leading zeros in its uint32_t argument. So, 31-clz ~= log_2(sample). + // __builtin_clz(0) is undefined. + idx = sample ? (31 - __builtin_clz(sample)) : 0; #endif + ASSERT(idx < 32); + buckets[idx]++; } inline void sampleSeconds(double delta) { diff --git a/flow/Net2Packet.cpp b/flow/Net2Packet.cpp index 74ce689be6..c126a5271d 100644 --- a/flow/Net2Packet.cpp +++ b/flow/Net2Packet.cpp @@ -150,6 +150,8 @@ void UnsentPacketQueue::sent(int bytes) { bytes -= b->bytes_written - b->bytes_sent; b->bytes_sent = b->bytes_written; ASSERT(b->bytes_written <= b->size()); + double queue_time = now() - b->enqueue_time; + sendQueueLatencyHistogram->sampleSeconds(queue_time); unsent_first = b->nextPacketBuffer(); if (!unsent_first) unsent_last = NULL; b->delref(); diff --git a/flow/Net2Packet.h b/flow/Net2Packet.h index 3d9a0f99b3..a1588f7d9e 100644 --- a/flow/Net2Packet.h +++ b/flow/Net2Packet.h @@ -23,6 +23,7 @@ #pragma once #include "flow/flow.h" +#include "flow/Histogram.h" // PacketWriter and PacketBuffer are in serialize.h because they are needed by the SerializeSource<> template @@ -40,8 +41,17 @@ struct ReliablePacket : FastAllocated { class UnsentPacketQueue : NonCopyable { public: - UnsentPacketQueue() : unsent_first(0), unsent_last(0) {} - ~UnsentPacketQueue() { discardAll(); } + UnsentPacketQueue() + : unsent_first(0), unsent_last(0), + sendQueueLatencyHistogram(Histogram::getHistogram( + LiteralStringRef("UnsentPacketQueue"), LiteralStringRef("QueueWait"), Histogram::Unit::microseconds)) {} + + ~UnsentPacketQueue() { + discardAll(); + unsent_first = (PacketBuffer*)0xDEADBEEF; + unsent_last = (PacketBuffer*)0xCAFEBABE; + sendQueueLatencyHistogram = Reference(nullptr); + } // Get a PacketBuffer to write new packets into PacketBuffer* getWriteBuffer() { @@ -70,6 +80,7 @@ public: private: PacketBuffer *unsent_first, *unsent_last; // Both NULL, or inclusive range of PacketBuffers that haven't been sent. The last one may have space for more packets to be written. + Reference sendQueueLatencyHistogram; }; class ReliablePacketList : NonCopyable { diff --git a/flow/serialize.h b/flow/serialize.h index 4028df16e7..ec250c1421 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -667,12 +667,13 @@ private: struct PacketBuffer : SendBuffer { private: + static constexpr size_t PACKET_BUFFER_OVERHEAD = 40; int reference_count; uint32_t const size_; - double const enqueue_time; - static constexpr size_t PACKET_BUFFER_OVERHEAD = 40; public: + double const enqueue_time; + uint8_t* data() { return const_cast(static_cast(this)->data); } size_t size() { return size_; }