Merge pull request #195 from cie/throttle-trace-events
Increase message limit for throttling to 100,000 for circus runs.
This commit is contained in:
commit
e377d1985c
|
@ -21,13 +21,7 @@
|
|||
#include "Knobs.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
FlowKnobs const* FLOW_KNOBS = getFlowKnobs();
|
||||
|
||||
FlowKnobs const* getFlowKnobs() {
|
||||
if (!FLOW_KNOBS)
|
||||
FLOW_KNOBS = new FlowKnobs();
|
||||
return FLOW_KNOBS;
|
||||
}
|
||||
FlowKnobs const* FLOW_KNOBS = new FlowKnobs();
|
||||
|
||||
#define init( knob, value ) initKnob( knob, value, #knob )
|
||||
|
||||
|
|
|
@ -170,7 +170,4 @@ public:
|
|||
|
||||
extern FlowKnobs const* FLOW_KNOBS;
|
||||
|
||||
// This api should be used if FLOW_KNOBS is required during static initialization of global variables.
|
||||
FlowKnobs const* getFlowKnobs();
|
||||
|
||||
#endif
|
||||
|
|
|
@ -92,4 +92,82 @@ private:
|
|||
return metric;
|
||||
}
|
||||
};
|
||||
|
||||
template <class T>
|
||||
struct TransientThresholdMetricSample : MetricSample<T> {
|
||||
Deque< std::tuple<double, T, int64_t> > queue;
|
||||
IndexedSet<T, int64_t> thresholdCrossedSet;
|
||||
int64_t thresholdLimit;
|
||||
|
||||
TransientThresholdMetricSample(int64_t metricUnitsPerSample, int64_t threshold) : MetricSample<T>(metricUnitsPerSample), thresholdLimit(threshold) { }
|
||||
|
||||
bool roll(int64_t metric) {
|
||||
return g_nondeterministic_random->random01() < (double)metric / this->metricUnitsPerSample; //< SOMEDAY: Better randomInt64?
|
||||
}
|
||||
|
||||
template <class U>
|
||||
bool isAboveThreshold(const U& key) {
|
||||
auto i = thresholdCrossedSet.find(key);
|
||||
if (i == thresholdCrossedSet.end())
|
||||
return false;
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
// Returns the sampled metric value (possibly 0, possibly increased by the sampling factor)
|
||||
template <class T_>
|
||||
int64_t addAndExpire(T_&& key, int64_t metric, double expiration) {
|
||||
int64_t x = add(std::forward<T_>(key), metric);
|
||||
if (x)
|
||||
queue.push_back(std::make_tuple(expiration, *this->sample.find(key), -x));
|
||||
return x;
|
||||
}
|
||||
|
||||
void poll() {
|
||||
double now = ::now();
|
||||
while (queue.size() &&
|
||||
std::get<0>(queue.front()) <= now)
|
||||
{
|
||||
const T& key = std::get<1>(queue.front());
|
||||
int64_t delta = std::get<2>(queue.front());
|
||||
ASSERT(delta != 0);
|
||||
|
||||
int64_t val = this->sample.addMetric(T(key), delta);
|
||||
if (val < thresholdLimit && (val + std::abs(delta)) >= thresholdLimit) {
|
||||
auto iter = thresholdCrossedSet.find(key);
|
||||
ASSERT(iter != thresholdCrossedSet.end())
|
||||
thresholdCrossedSet.erase(iter);
|
||||
}
|
||||
if (val == 0)
|
||||
this->sample.erase(key);
|
||||
|
||||
queue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
template <class T_>
|
||||
int64_t add(T_&& key, int64_t metric) {
|
||||
if (!metric) return 0;
|
||||
int64_t mag = std::abs(metric);
|
||||
|
||||
if (mag < this->metricUnitsPerSample) {
|
||||
if (!roll(mag))
|
||||
return 0;
|
||||
|
||||
metric = metric<0 ? -this->metricUnitsPerSample : this->metricUnitsPerSample;
|
||||
}
|
||||
|
||||
int64_t val = this->sample.addMetric(T(key), metric);
|
||||
if (val >= thresholdLimit) {
|
||||
ASSERT((val - metric) < thresholdLimit ? thresholdCrossedSet.find(key) == thresholdCrossedSet.end() : thresholdCrossedSet.find(key) != thresholdCrossedSet.end());
|
||||
thresholdCrossedSet.insert(key, val);
|
||||
}
|
||||
|
||||
if (val == 0)
|
||||
this->sample.erase(key);
|
||||
|
||||
return metric;
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -132,9 +132,10 @@ IRandom* trace_random = NULL;
|
|||
LatestEventCache latestEventCache;
|
||||
SuppressionMap suppressedEvents;
|
||||
|
||||
static TransientMetricSample<const char *> traceEventThrottlerCache(getFlowKnobs()->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE);
|
||||
static TransientThresholdMetricSample<Standalone<StringRef>> *traceEventThrottlerCache;
|
||||
static const char *TRACE_EVENT_THROTTLE_STARTING_TYPE = "TraceEventThrottle_";
|
||||
|
||||
|
||||
struct TraceLog {
|
||||
Standalone< VectorRef<StringRef> > buffer;
|
||||
int file_length;
|
||||
|
@ -425,7 +426,7 @@ struct TraceLog {
|
|||
}
|
||||
|
||||
ThreadFuture<Void> flush() {
|
||||
traceEventThrottlerCache.poll();
|
||||
traceEventThrottlerCache->poll();
|
||||
|
||||
MutexHolder hold(mutex);
|
||||
bool roll = false;
|
||||
|
@ -863,16 +864,15 @@ TraceEvent& TraceEvent::backtrace(std::string prefix) {
|
|||
TraceEvent::~TraceEvent() {
|
||||
try {
|
||||
if (enabled) {
|
||||
|
||||
// TRACE_EVENT_THROTTLER
|
||||
if (severity > SevDebug) {
|
||||
if (traceEventThrottlerCache.getMetric(type) >= FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT) {
|
||||
if (severity > SevDebug && isNetworkThread()) {
|
||||
if (traceEventThrottlerCache->isAboveThreshold(StringRef((uint8_t *)type, strlen(type)))) {
|
||||
TraceEvent(SevWarnAlways, std::string(TRACE_EVENT_THROTTLE_STARTING_TYPE).append(type).c_str()).suppressFor(5);
|
||||
// Throttle Msg
|
||||
return;
|
||||
}
|
||||
else {
|
||||
traceEventThrottlerCache.addAndExpire(type, 1, now() + FLOW_KNOBS->TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY);
|
||||
traceEventThrottlerCache->addAndExpire(StringRef((uint8_t *)type, strlen(type)), 1, now() + FLOW_KNOBS->TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY);
|
||||
}
|
||||
} // End of Throttler
|
||||
|
||||
|
@ -980,6 +980,7 @@ void TraceEvent::writeEscapedfv( const char* format, va_list args ) {
|
|||
thread_local bool TraceEvent::networkThread = false;
|
||||
|
||||
void TraceEvent::setNetworkThread() {
|
||||
traceEventThrottlerCache = new TransientThresholdMetricSample<Standalone<StringRef>>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT);
|
||||
networkThread = true;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue