Merge pull request #8170 from sfc-gh-sgwydir/ddsketch
Use DDSketch for sample data
This commit is contained in:
commit
8e8c4b4489
|
@ -57,11 +57,11 @@ BlobCipherMetrics::CounterSet::CounterSet(CounterCollection& cc, std::string nam
|
|||
getCipherKeysLatency(name + "GetCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
|
||||
getLatestCipherKeysLatency(name + "GetLatestCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE) {}
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY) {}
|
||||
|
||||
BlobCipherMetrics::BlobCipherMetrics()
|
||||
: cc("BlobCipher"), cipherKeyCacheHit("CipherKeyCacheHit", cc), cipherKeyCacheMiss("CipherKeyCacheMiss", cc),
|
||||
|
@ -71,15 +71,15 @@ BlobCipherMetrics::BlobCipherMetrics()
|
|||
getCipherKeysLatency("GetCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
|
||||
getLatestCipherKeysLatency("GetLatestCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
|
||||
getBlobMetadataLatency("GetBlobMetadataLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
|
||||
counterSets({ CounterSet(cc, "TLog"),
|
||||
CounterSet(cc, "KVMemory"),
|
||||
CounterSet(cc, "KVRedwood"),
|
||||
|
|
|
@ -578,7 +578,7 @@ void traceTSSErrors(const char* name, UID tssId, const std::unordered_map<int, u
|
|||
Example:
|
||||
GetValueLatencySSMean
|
||||
*/
|
||||
void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, ContinuousSample<double>& sample) {
|
||||
void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, DDSketch<double>& sample) {
|
||||
ev.detail(name + "Mean", sample.mean());
|
||||
// don't log the larger percentiles unless we actually have enough samples to log the accurate percentile instead of
|
||||
// the largest sample in this window
|
||||
|
@ -595,8 +595,8 @@ void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, ContinuousS
|
|||
|
||||
void traceTSSPercentiles(TraceEvent& ev,
|
||||
const std::string name,
|
||||
ContinuousSample<double>& ssSample,
|
||||
ContinuousSample<double>& tssSample) {
|
||||
DDSketch<double>& ssSample,
|
||||
DDSketch<double>& tssSample) {
|
||||
ASSERT(ssSample.getPopulationSize() == tssSample.getPopulationSize());
|
||||
ev.detail(name + "Count", ssSample.getPopulationSize());
|
||||
if (ssSample.getPopulationSize() > 0) {
|
||||
|
@ -1534,17 +1534,16 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
|
||||
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
|
||||
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
|
||||
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
|
||||
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
|
||||
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
|
||||
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr),
|
||||
lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0),
|
||||
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
|
||||
mutationsPerCommit(), bytesPerCommit(), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0),
|
||||
cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
|
||||
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
|
||||
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
|
||||
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
||||
|
||||
|
@ -1838,13 +1837,13 @@ DatabaseContext::DatabaseContext(const Error& err)
|
|||
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
|
||||
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
|
||||
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
|
||||
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
|
||||
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
|
||||
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
|
||||
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), sharedStatePtr(nullptr),
|
||||
transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
|
||||
mutationsPerCommit(), bytesPerCommit(), sharedStatePtr(nullptr), transactionTracingSample(false),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
|
||||
|
||||
// Static constructor used by server processes to create a DatabaseContext
|
||||
|
|
|
@ -955,8 +955,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false );
|
||||
|
||||
// Server request latency measurement
|
||||
init( LATENCY_SAMPLE_SIZE, 100000 );
|
||||
init( FILE_LATENCY_SAMPLE_SIZE, 10000 );
|
||||
init( LATENCY_SKETCH_ACCURACY, 0.01 );
|
||||
init( FILE_LATENCY_SKETCH_ACCURACY, 0.01 );
|
||||
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
|
||||
|
||||
// Cluster recovery
|
||||
|
|
|
@ -75,8 +75,8 @@ struct BlobWorkerStats {
|
|||
Reference<FlowLock> resnapshotLock,
|
||||
Reference<FlowLock> deltaWritesLock,
|
||||
double sampleLoggingInterval,
|
||||
int fileOpLatencySampleSize,
|
||||
int requestLatencySampleSize)
|
||||
double fileOpLatencySketchAccuracy,
|
||||
double requestLatencySketchAccuracy)
|
||||
: cc("BlobWorkerStats", id.toString()),
|
||||
|
||||
s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
|
||||
|
@ -95,10 +95,13 @@ struct BlobWorkerStats {
|
|||
forceFlushCleanups("ForceFlushCleanups", cc), readDrivenCompactions("ReadDrivenCompactions", cc),
|
||||
numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0),
|
||||
minimumCFVersion(0), cfVersionLag(0), notAtLatestChangeFeeds(0), lastResidentMemory(0),
|
||||
snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySampleSize),
|
||||
snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics",
|
||||
id,
|
||||
sampleLoggingInterval,
|
||||
fileOpLatencySketchAccuracy),
|
||||
deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySketchAccuracy),
|
||||
reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySketchAccuracy),
|
||||
readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySketchAccuracy),
|
||||
estimatedMaxResidentMemory(0), initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock),
|
||||
deltaWritesLock(deltaWritesLock) {
|
||||
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
|
||||
|
|
|
@ -42,8 +42,8 @@
|
|||
#include "fdbrpc/MultiInterface.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "fdbclient/EventTypes.actor.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/Smoother.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
|
||||
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
|
||||
public:
|
||||
|
@ -567,7 +567,7 @@ public:
|
|||
Counter bgReadRowsCleared;
|
||||
Counter bgReadRowsInserted;
|
||||
Counter bgReadRowsUpdated;
|
||||
ContinuousSample<double> bgLatencies, bgGranulesPerRequest;
|
||||
DDSketch<double> bgLatencies, bgGranulesPerRequest;
|
||||
|
||||
// Change Feed metrics. Omit change feed metrics from logging if not used
|
||||
bool usedAnyChangeFeeds;
|
||||
|
@ -579,8 +579,7 @@ public:
|
|||
Counter feedPops;
|
||||
Counter feedPopsFallback;
|
||||
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
|
||||
bytesPerCommit;
|
||||
DDSketch<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
|
||||
|
||||
int outstandingWatches;
|
||||
int maxOutstandingWatches;
|
||||
|
|
|
@ -924,8 +924,8 @@ public:
|
|||
std::string REDWOOD_IO_PRIORITIES;
|
||||
|
||||
// Server request latency measurement
|
||||
int LATENCY_SAMPLE_SIZE;
|
||||
int FILE_LATENCY_SAMPLE_SIZE;
|
||||
double LATENCY_SKETCH_ACCURACY;
|
||||
double FILE_LATENCY_SKETCH_ACCURACY;
|
||||
double LATENCY_METRICS_LOGGING_INTERVAL;
|
||||
|
||||
// Cluster recovery
|
||||
|
|
|
@ -878,11 +878,11 @@ Peer::Peer(TransportData* transport, NetworkAddress const& destination)
|
|||
: transport(transport), destination(destination), compatible(true), outgoingConnectionIdle(true),
|
||||
lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), peerReferences(-1),
|
||||
bytesReceived(0), bytesSent(0), lastDataPacketSentTime(now()), outstandingReplies(0),
|
||||
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedTime(0.0),
|
||||
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SKETCH_ACCURACY : 0.1), lastLoggedTime(0.0),
|
||||
lastLoggedBytesReceived(0), lastLoggedBytesSent(0), timeoutCount(0),
|
||||
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())),
|
||||
connectOutgoingCount(0), connectIncomingCount(0), connectFailedCount(0),
|
||||
connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1) {
|
||||
connectLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SKETCH_ACCURACY : 0.1) {
|
||||
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
|
||||
}
|
||||
|
||||
|
|
|
@ -62,15 +62,15 @@ public:
|
|||
LatencySample readLatencySample = { "AsyncFileKAIOReadLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
|
||||
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
|
||||
LatencySample writeLatencySample = { "AsyncFileKAIOWriteLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
|
||||
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
|
||||
LatencySample syncLatencySample = { "AsyncFileKAIOSyncLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
|
||||
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
|
||||
};
|
||||
|
||||
static AsyncFileKAIOMetrics& getMetrics() {
|
||||
|
|
|
@ -0,0 +1,326 @@
|
|||
/*
|
||||
* DDSketch.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef DDSKETCH_H
|
||||
#define DDSKETCH_H
|
||||
#include <iterator>
|
||||
#include <limits>
|
||||
#include <type_traits>
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <cmath>
|
||||
#include "flow/Error.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
// A namespace for fast log() computation.
|
||||
namespace fastLogger {
|
||||
// Basically, the goal is to compute log(x)/log(r).
|
||||
// For double, it is represented as 2^e*(1+s) (0<=s<1), so our goal becomes
|
||||
// e*log(2)/log(r)*log(1+s), and we approximate log(1+s) with a cubic function.
|
||||
// See more details on Datadog's paper, or CubicallyInterpolatedMapping.java in
|
||||
// https://github.com/DataDog/sketches-java/
|
||||
inline const double correctingFactor = 1.00988652862227438516; // = 7 / (10 * log(2));
|
||||
constexpr inline const double A = 6.0 / 35.0, B = -3.0 / 5.0, C = 10.0 / 7.0;
|
||||
|
||||
inline double fastlog(double value) {
|
||||
int e;
|
||||
double s = frexp(value, &e);
|
||||
s = s * 2 - 1;
|
||||
return ((A * s + B) * s + C) * s + e - 1;
|
||||
}
|
||||
|
||||
inline double reverseLog(double index) {
|
||||
long exponent = floor(index);
|
||||
// Derived from Cardano's formula
|
||||
double d0 = B * B - 3 * A * C;
|
||||
double d1 = 2 * B * B * B - 9 * A * B * C - 27 * A * A * (index - exponent);
|
||||
double p = cbrt((d1 - sqrt(d1 * d1 - 4 * d0 * d0 * d0)) / 2);
|
||||
double significandPlusOne = -(B + p + d0 / p) / (3 * A) + 1;
|
||||
return ldexp(significandPlusOne / 2, exponent + 1);
|
||||
}
|
||||
}; // namespace fastLogger
|
||||
|
||||
// DDSketch for non-negative numbers (those < EPS = 10^-18 are
|
||||
// treated as 0, and huge numbers (>1/EPS) fail ASSERT). This is the base
|
||||
// class without a concrete log() implementation.
|
||||
template <class Impl, class T>
|
||||
class DDSketchBase {
|
||||
|
||||
static constexpr T defaultMin() { return std::numeric_limits<T>::max(); }
|
||||
|
||||
static constexpr T defaultMax() {
|
||||
if constexpr (std::is_floating_point_v<T>) {
|
||||
return -std::numeric_limits<T>::max();
|
||||
} else {
|
||||
return std::numeric_limits<T>::min();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit DDSketchBase(double errorGuarantee)
|
||||
: errorGuarantee(errorGuarantee), populationSize(0), zeroPopulationSize(0), minValue(defaultMin()),
|
||||
maxValue(defaultMax()), sum(T()) {
|
||||
ASSERT(errorGuarantee > 0 && errorGuarantee < 1);
|
||||
}
|
||||
|
||||
DDSketchBase<Impl, T>& addSample(T sample) {
|
||||
// Call it addSample for now, while it is not a sample anymore
|
||||
if (!populationSize)
|
||||
minValue = maxValue = sample;
|
||||
|
||||
if (sample <= EPS) {
|
||||
zeroPopulationSize++;
|
||||
} else {
|
||||
size_t index = static_cast<Impl*>(this)->getIndex(sample);
|
||||
ASSERT(index >= 0 && index < buckets.size());
|
||||
try {
|
||||
buckets.at(index)++;
|
||||
} catch (std::out_of_range const& e) {
|
||||
fmt::print(stderr,
|
||||
"ERROR: Invalid DDSketch bucket index ({}) at {}/{} for sample: {}\n",
|
||||
e.what(),
|
||||
index,
|
||||
buckets.size(),
|
||||
sample);
|
||||
}
|
||||
}
|
||||
|
||||
populationSize++;
|
||||
sum += sample;
|
||||
maxValue = std::max(maxValue, sample);
|
||||
minValue = std::min(minValue, sample);
|
||||
return *this;
|
||||
}
|
||||
|
||||
double mean() const {
|
||||
if (populationSize == 0)
|
||||
return 0;
|
||||
return (double)sum / populationSize;
|
||||
}
|
||||
|
||||
T median() { return percentile(0.5); }
|
||||
|
||||
T percentile(double percentile) {
|
||||
ASSERT(percentile >= 0 && percentile <= 1);
|
||||
|
||||
if (populationSize == 0)
|
||||
return T();
|
||||
uint64_t targetPercentilePopulation = percentile * (populationSize - 1);
|
||||
// Now find the tPP-th (0-indexed) element
|
||||
if (targetPercentilePopulation < zeroPopulationSize)
|
||||
return T(0);
|
||||
|
||||
size_t index = 0;
|
||||
[[maybe_unused]] bool found = false;
|
||||
if (percentile <= 0.5) { // count up
|
||||
uint64_t count = zeroPopulationSize;
|
||||
for (size_t i = 0; i < buckets.size(); i++) {
|
||||
if (targetPercentilePopulation < count + buckets[i]) {
|
||||
// count + buckets[i] = # of numbers so far (from the rightmost to
|
||||
// this bucket, inclusive), so if target is in this bucket, it should
|
||||
// means tPP < cnt + bck[i]
|
||||
found = true;
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
count += buckets[i];
|
||||
}
|
||||
} else { // and count down
|
||||
uint64_t count = 0;
|
||||
for (auto rit = buckets.rbegin(); rit != buckets.rend(); rit++) {
|
||||
if (targetPercentilePopulation + count + *rit >= populationSize) {
|
||||
// cnt + bkt[i] is # of numbers to the right of this bucket (incl.),
|
||||
// so if target is not in this bucket (i.e., to the left of this
|
||||
// bucket), it would be as right as the left bucket's rightmost
|
||||
// number, so we would have tPP + cnt + bkt[i] < total population (tPP
|
||||
// is 0-indexed), that means target is in this bucket if this
|
||||
// condition is not satisfied.
|
||||
found = true;
|
||||
index = std::distance(rit, buckets.rend()) - 1;
|
||||
break;
|
||||
}
|
||||
count += *rit;
|
||||
}
|
||||
}
|
||||
ASSERT(found);
|
||||
if (!found)
|
||||
return -1;
|
||||
return static_cast<Impl*>(this)->getValue(index);
|
||||
}
|
||||
|
||||
T min() const { return minValue; }
|
||||
T max() const { return maxValue; }
|
||||
|
||||
void clear() {
|
||||
std::fill(buckets.begin(), buckets.end(), 0);
|
||||
populationSize = zeroPopulationSize = 0;
|
||||
sum = 0;
|
||||
minValue = defaultMin();
|
||||
maxValue = defaultMax();
|
||||
}
|
||||
|
||||
uint64_t getPopulationSize() const { return populationSize; }
|
||||
|
||||
double getErrorGuarantee() const { return errorGuarantee; }
|
||||
|
||||
size_t getBucketSize() const { return buckets.size(); }
|
||||
|
||||
DDSketchBase<Impl, T>& mergeWith(const DDSketchBase<Impl, T>& anotherSketch) {
|
||||
// Must have the same guarantee
|
||||
ASSERT(fabs(errorGuarantee - anotherSketch.errorGuarantee) < EPS &&
|
||||
anotherSketch.buckets.size() == buckets.size());
|
||||
for (size_t i = 0; i < anotherSketch.buckets.size(); i++) {
|
||||
buckets[i] += anotherSketch.buckets[i];
|
||||
}
|
||||
populationSize += anotherSketch.populationSize;
|
||||
zeroPopulationSize += anotherSketch.zeroPopulationSize;
|
||||
minValue = std::min(minValue, anotherSketch.minValue);
|
||||
maxValue = std::max(maxValue, anotherSketch.maxValue);
|
||||
sum += anotherSketch.sum;
|
||||
return *this;
|
||||
}
|
||||
|
||||
constexpr static double EPS = 1e-18; // smaller numbers are considered as 0
|
||||
protected:
|
||||
double errorGuarantee; // As defined in the paper
|
||||
|
||||
uint64_t populationSize, zeroPopulationSize; // we need to separately count 0s
|
||||
std::vector<uint64_t> buckets;
|
||||
T minValue, maxValue, sum;
|
||||
void setBucketSize(size_t capacity) { buckets.resize(capacity, 0); }
|
||||
};
|
||||
|
||||
// DDSketch with fast log implementation for float numbers
|
||||
template <class T>
|
||||
class DDSketch : public DDSketchBase<DDSketch<T>, T> {
|
||||
public:
|
||||
explicit DDSketch(double errorGuarantee = 0.01)
|
||||
: DDSketchBase<DDSketch<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
|
||||
multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) {
|
||||
ASSERT(errorGuarantee > 0);
|
||||
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS);
|
||||
ASSERT(offset > 0);
|
||||
this->setBucketSize(2 * offset);
|
||||
}
|
||||
|
||||
size_t getIndex(T sample) {
|
||||
static_assert(__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__, "Do not support non-little-endian systems");
|
||||
return ceil(fastLogger::fastlog(sample) * multiplier) + offset;
|
||||
}
|
||||
|
||||
T getValue(size_t index) { return fastLogger::reverseLog((index - offset) / multiplier) * 2.0 / (1 + gamma); }
|
||||
|
||||
private:
|
||||
double gamma, multiplier;
|
||||
size_t offset = 0;
|
||||
};
|
||||
|
||||
// DDSketch with <cmath> log. Slow and only use this when others doesn't work.
|
||||
template <class T>
|
||||
class DDSketchSlow : public DDSketchBase<DDSketchSlow<T>, T> {
|
||||
public:
|
||||
DDSketchSlow(double errorGuarantee = 0.1)
|
||||
: DDSketchBase<DDSketchSlow<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
|
||||
logGamma(log(gamma)) {
|
||||
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS) + 5;
|
||||
this->setBucketSize(2 * offset);
|
||||
}
|
||||
|
||||
size_t getIndex(T sample) { return ceil(log(sample) / logGamma) + offset; }
|
||||
|
||||
T getValue(size_t index) { return (T)(2.0 * pow(gamma, (index - offset)) / (1 + gamma)); }
|
||||
|
||||
private:
|
||||
double gamma, logGamma;
|
||||
size_t offset = 0;
|
||||
};
|
||||
|
||||
// DDSketch for unsigned int. Faster than the float version. Fixed accuracy.
|
||||
class DDSketchFastUnsigned : public DDSketchBase<DDSketchFastUnsigned, unsigned> {
|
||||
public:
|
||||
DDSketchFastUnsigned() : DDSketchBase<DDSketchFastUnsigned, unsigned>(errorGuarantee) { this->setBucketSize(129); }
|
||||
|
||||
size_t getIndex(unsigned sample) {
|
||||
__uint128_t v = sample;
|
||||
v *= v;
|
||||
v *= v; // sample^4
|
||||
uint64_t low = (uint64_t)v, high = (uint64_t)(v >> 64);
|
||||
|
||||
return 128 - (high == 0 ? ((low == 0 ? 64 : __builtin_clzll(low)) + 64) : __builtin_clzll(high));
|
||||
}
|
||||
|
||||
unsigned getValue(size_t index) {
|
||||
double r = 1, g = gamma;
|
||||
while (index) { // quick power method for power(gamma, index)
|
||||
if (index & 1)
|
||||
r *= g;
|
||||
g *= g;
|
||||
index >>= 1;
|
||||
}
|
||||
// 2.0 * pow(gamma, index) / (1 + gamma) is what we need
|
||||
return (unsigned)(2.0 * r / (1 + gamma) + 0.5); // round to nearest int
|
||||
}
|
||||
|
||||
private:
|
||||
constexpr static double errorGuarantee = 0.08642723372;
|
||||
// getIndex basically calc floor(log_2(x^4)) + 1,
|
||||
// which is almost ceil(log_2(x^4)) as it only matters when x is a power of 2,
|
||||
// and it does not change the error bound. Original sketch asks for
|
||||
// ceil(log_r(x)), so we know r = pow(2, 1/4) = 1.189207115. And r = (1 + eG)
|
||||
// / (1 - eG) so eG = 0.08642723372.
|
||||
constexpr static double gamma = 1.189207115;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
TEST_CASE("/fdbrpc/ddsketch/accuracy") {
|
||||
|
||||
int TRY = 100, SIZE = 1e6;
|
||||
const int totalPercentiles = 7;
|
||||
double targetPercentiles[totalPercentiles] = { .0001, .01, .1, .50, .90, .99, .9999 };
|
||||
double stat[totalPercentiles] = { 0 };
|
||||
for (int t = 0; t < TRY; t++) {
|
||||
DDSketch<double> dd;
|
||||
std::vector<double> nums;
|
||||
for (int i = 0; i < SIZE; i++) {
|
||||
static double a = 1, b = 1; // a skewed distribution
|
||||
auto y = deterministicRandom()->random01();
|
||||
auto num = b / pow(1 - y, 1 / a);
|
||||
nums.push_back(num);
|
||||
dd.addSample(num);
|
||||
}
|
||||
std::sort(nums.begin(), nums.end());
|
||||
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
|
||||
double percentile = targetPercentiles[percentID];
|
||||
double ground = nums[percentile * (SIZE - 1)], ddvalue = dd.percentile(percentile);
|
||||
double relativeError = fabs(ground - ddvalue) / ground;
|
||||
stat[percentID] += relativeError;
|
||||
}
|
||||
}
|
||||
|
||||
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
|
||||
printf("%.4lf per, relative error %.4lf\n", targetPercentiles[percentID], stat[percentID] / TRY);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
#include <algorithm>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbrpc/HealthMonitor.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/network.h"
|
||||
|
@ -159,7 +159,7 @@ struct Peer : public ReferenceCounted<Peer> {
|
|||
int64_t bytesSent;
|
||||
double lastDataPacketSentTime;
|
||||
int outstandingReplies;
|
||||
ContinuousSample<double> pingLatencies;
|
||||
DDSketch<double> pingLatencies;
|
||||
double lastLoggedTime;
|
||||
int64_t lastLoggedBytesReceived;
|
||||
int64_t lastLoggedBytesSent;
|
||||
|
@ -171,7 +171,7 @@ struct Peer : public ReferenceCounted<Peer> {
|
|||
int connectOutgoingCount;
|
||||
int connectIncomingCount;
|
||||
int connectFailedCount;
|
||||
ContinuousSample<double> connectLatencies;
|
||||
DDSketch<double> connectLatencies;
|
||||
Promise<Void> disconnect;
|
||||
|
||||
explicit Peer(TransportData* transport, NetworkAddress const& destination);
|
||||
|
|
|
@ -38,7 +38,7 @@ MyCounters() : foo("foo", cc), bar("bar", cc), baz("baz", cc) {}
|
|||
#include <cstddef>
|
||||
#include "flow/flow.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
|
||||
struct ICounter {
|
||||
// All counters have a name and value
|
||||
|
@ -216,40 +216,44 @@ public:
|
|||
|
||||
class LatencySample {
|
||||
public:
|
||||
LatencySample(std::string name, UID id, double loggingInterval, int sampleSize)
|
||||
: name(name), id(id), sampleStart(now()), sample(sampleSize),
|
||||
LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
|
||||
: name(name), id(id), sampleStart(now()), sketch(accuracy),
|
||||
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
|
||||
assert(accuracy > 0);
|
||||
if (accuracy <= 0) {
|
||||
fmt::print(stderr, "ERROR: LatencySample {} has invalid accuracy ({})", name, accuracy);
|
||||
}
|
||||
logger = recurring([this]() { logSample(); }, loggingInterval);
|
||||
}
|
||||
|
||||
void addMeasurement(double measurement) { sample.addSample(measurement); }
|
||||
void addMeasurement(double measurement) { sketch.addSample(measurement); }
|
||||
|
||||
private:
|
||||
std::string name;
|
||||
UID id;
|
||||
double sampleStart;
|
||||
|
||||
ContinuousSample<double> sample;
|
||||
DDSketch<double> sketch;
|
||||
Future<Void> logger;
|
||||
|
||||
Reference<EventCacheHolder> latencySampleEventHolder;
|
||||
|
||||
void logSample() {
|
||||
TraceEvent(name.c_str(), id)
|
||||
.detail("Count", sample.getPopulationSize())
|
||||
.detail("Count", sketch.getPopulationSize())
|
||||
.detail("Elapsed", now() - sampleStart)
|
||||
.detail("Min", sample.min())
|
||||
.detail("Max", sample.max())
|
||||
.detail("Mean", sample.mean())
|
||||
.detail("Median", sample.median())
|
||||
.detail("P25", sample.percentile(0.25))
|
||||
.detail("P90", sample.percentile(0.9))
|
||||
.detail("P95", sample.percentile(0.95))
|
||||
.detail("P99", sample.percentile(0.99))
|
||||
.detail("P99.9", sample.percentile(0.999))
|
||||
.detail("Min", sketch.min())
|
||||
.detail("Max", sketch.max())
|
||||
.detail("Mean", sketch.mean())
|
||||
.detail("Median", sketch.median())
|
||||
.detail("P25", sketch.percentile(0.25))
|
||||
.detail("P90", sketch.percentile(0.9))
|
||||
.detail("P95", sketch.percentile(0.95))
|
||||
.detail("P99", sketch.percentile(0.99))
|
||||
.detail("P99.9", sketch.percentile(0.999))
|
||||
.trackLatest(latencySampleEventHolder->trackingKey);
|
||||
|
||||
sample.clear();
|
||||
sketch.clear();
|
||||
sampleStart = now();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#ifndef FDBRPC_TSS_COMPARISON_H
|
||||
#define FDBRPC_TSS_COMPARISON_H
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/Stats.h"
|
||||
|
||||
// refcounted + noncopyable because both DatabaseContext and individual endpoints share ownership
|
||||
|
@ -48,15 +47,15 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||
Counter mismatches;
|
||||
|
||||
// We could probably just ignore getKey as it's seldom used?
|
||||
ContinuousSample<double> SSgetValueLatency;
|
||||
ContinuousSample<double> SSgetKeyLatency;
|
||||
ContinuousSample<double> SSgetKeyValuesLatency;
|
||||
ContinuousSample<double> SSgetMappedKeyValuesLatency;
|
||||
DDSketch<double> SSgetValueLatency;
|
||||
DDSketch<double> SSgetKeyLatency;
|
||||
DDSketch<double> SSgetKeyValuesLatency;
|
||||
DDSketch<double> SSgetMappedKeyValuesLatency;
|
||||
|
||||
ContinuousSample<double> TSSgetValueLatency;
|
||||
ContinuousSample<double> TSSgetKeyLatency;
|
||||
ContinuousSample<double> TSSgetKeyValuesLatency;
|
||||
ContinuousSample<double> TSSgetMappedKeyValuesLatency;
|
||||
DDSketch<double> TSSgetValueLatency;
|
||||
DDSketch<double> TSSgetKeyLatency;
|
||||
DDSketch<double> TSSgetKeyValuesLatency;
|
||||
DDSketch<double> TSSgetMappedKeyValuesLatency;
|
||||
|
||||
std::unordered_map<int, uint64_t> ssErrorsByCode;
|
||||
std::unordered_map<int, uint64_t> tssErrorsByCode;
|
||||
|
@ -106,9 +105,9 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||
TSSMetrics()
|
||||
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
|
||||
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
|
||||
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000),
|
||||
SSgetMappedKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000),
|
||||
TSSgetKeyValuesLatency(1000), TSSgetMappedKeyValuesLatency(1000) {}
|
||||
mismatches("Mismatches", cc), SSgetValueLatency(), SSgetKeyLatency(), SSgetKeyValuesLatency(),
|
||||
SSgetMappedKeyValuesLatency(), TSSgetValueLatency(), TSSgetKeyLatency(), TSSgetKeyValuesLatency(),
|
||||
TSSgetMappedKeyValuesLatency() {}
|
||||
};
|
||||
|
||||
template <class Rep>
|
||||
|
|
|
@ -305,8 +305,8 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
|
|||
resnapshotLock,
|
||||
deltaWritesLock,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->FILE_LATENCY_SAMPLE_SIZE,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY,
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
isEncryptionEnabled(isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION)) {}
|
||||
|
||||
bool managerEpochOk(int64_t epoch) {
|
||||
|
|
|
@ -241,15 +241,15 @@ public:
|
|||
kmsLookupByIdsReqLatency("EKPKmsLookupByIdsReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
kmsLookupByDomainIdsReqLatency("EKPKmsLookupByDomainIdsReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
kmsBlobMetadataReqLatency("EKPKmsBlobMetadataReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE) {}
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY) {}
|
||||
|
||||
EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
|
||||
const EncryptCipherDomainId domainId,
|
||||
|
|
|
@ -117,20 +117,20 @@ struct GrvProxyStats {
|
|||
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
|
||||
grvLatencySample("GRVLatencyMetrics",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
grvBatchLatencySample("GRVBatchLatencyMetrics",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
recentRequests(0), lastBucketBegin(now()),
|
||||
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
|
||||
grvConfirmEpochLiveDist(
|
||||
|
@ -215,7 +215,7 @@ struct GrvProxyData {
|
|||
versionVectorSizeOnGRVReply("VersionVectorSizeOnGRVReply",
|
||||
dbgid,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
updateCommitRequests(0), lastCommitTime(0), version(0), minKnownCommittedVersion(invalidVersion),
|
||||
tagThrottler(SERVER_KNOBS->PROXY_MAX_TAG_THROTTLE_DURATION) {}
|
||||
};
|
||||
|
|
|
@ -111,15 +111,15 @@ SharedRocksDBState::SharedRocksDBState(UID id)
|
|||
readOptions(initialReadOptions()), commitLatency(LatencySample("RocksDBCommitLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)),
|
||||
commitQueueLatency(LatencySample("RocksDBCommitQueueLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)),
|
||||
dbWriteLatency(LatencySample("RocksDBWriteLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)) {}
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {}
|
||||
|
||||
rocksdb::ColumnFamilyOptions SharedRocksDBState::initialCfOptions() {
|
||||
rocksdb::ColumnFamilyOptions options;
|
||||
|
|
|
@ -1824,8 +1824,11 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
|
|||
if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) {
|
||||
UID ssID = nondeterministicRandom()->randomUniqueID();
|
||||
std::string s = "BlockingPeekLatencies-" + reqTag.toString();
|
||||
logData->blockingPeekLatencies.try_emplace(
|
||||
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
|
||||
logData->blockingPeekLatencies.try_emplace(reqTag,
|
||||
s,
|
||||
ssID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY);
|
||||
}
|
||||
LatencySample& sample = logData->blockingPeekLatencies.at(reqTag);
|
||||
sample.addMeasurement(latency);
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/DeltaTree.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
|
|
|
@ -121,20 +121,20 @@ struct ProxyStats {
|
|||
commitLatencySample("CommitLatencyMetrics",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
|
||||
commitBatchingEmptyMessageRatio("CommitBatchingEmptyMessageRatio",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
commitBatchingWindowSize("CommitBatchingWindowSize",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
computeLatency("ComputeLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
maxComputeNS(0), minComputeNS(1e12),
|
||||
commitBatchQueuingDist(
|
||||
Histogram::getHistogram("CommitProxy"_sr, "CommitBatchQueuing"_sr, Histogram::Unit::microseconds)),
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H)
|
||||
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H
|
||||
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -46,7 +47,7 @@ DESCR struct ReadMetric {
|
|||
|
||||
// Common ReadWrite test settings
|
||||
struct ReadWriteCommon : KVWorkload {
|
||||
static constexpr int sampleSize = 10000;
|
||||
static constexpr double sampleError = 0.01;
|
||||
friend struct ReadWriteCommonImpl;
|
||||
|
||||
// general test setting
|
||||
|
@ -75,7 +76,7 @@ struct ReadWriteCommon : KVWorkload {
|
|||
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
||||
EventMetricHandle<ReadMetric> readMetric;
|
||||
PerfIntCounter aTransactions, bTransactions, retries;
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||
DDSketch<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||
double readLatencyTotal;
|
||||
int readLatencyCount;
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
|
@ -87,9 +88,9 @@ struct ReadWriteCommon : KVWorkload {
|
|||
|
||||
explicit ReadWriteCommon(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), totalReadsMetric("ReadWrite.TotalReads"_sr), totalRetriesMetric("ReadWrite.TotalRetries"_sr),
|
||||
aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize),
|
||||
readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize),
|
||||
readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||
aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleError),
|
||||
readLatencies(sampleError), commitLatencies(sampleError), GRVLatencies(sampleError),
|
||||
fullReadLatencies(sampleError), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||
|
||||
transactionSuccessMetric.init("ReadWrite.SuccessfulTransaction"_sr);
|
||||
transactionFailureMetric.init("ReadWrite.FailedTransaction"_sr);
|
||||
|
|
|
@ -102,17 +102,17 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
versionVectorTagUpdates("VersionVectorTagUpdates",
|
||||
dbgid,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
waitForPrevCommitRequests("WaitForPrevCommitRequests", cc),
|
||||
nonWaitForPrevCommitRequests("NonWaitForPrevCommitRequests", cc),
|
||||
versionVectorSizeOnCVReply("VersionVectorSizeOnCVReply",
|
||||
dbgid,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
waitForPrevLatencies("WaitForPrevLatencies",
|
||||
dbgid,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
addActor(addActor) {
|
||||
logger = cc.traceCounters("MasterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, "MasterMetrics");
|
||||
if (forceRecovery && !myInterface.locality.dcId().present()) {
|
||||
|
|
|
@ -1273,48 +1273,48 @@ public:
|
|||
readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readKeyLatencySample("GetKeyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readValueLatencySample("GetValueMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readRangeLatencySample("GetRangeMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readVersionWaitSample("ReadVersionWaitMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readQueueWaitSample("ReadQueueWaitMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
|
||||
mappedRangeSample("GetMappedRangeMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
mappedRangeRemoteSample("GetMappedRangeRemoteMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
mappedRangeLocalSample("GetMappedRangeLocalMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
kvReadRangeLatencySample("KVGetRangeMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
|
||||
updateLatencySample("UpdateLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE) {
|
||||
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY) {
|
||||
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
|
||||
specialCounter(cc, "Version", [self]() { return self->version.get(); });
|
||||
specialCounter(cc, "StorageVersion", [self]() { return self->storageVersion(); });
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
@ -34,11 +34,10 @@ struct BulkLoadWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter transactions, retries;
|
||||
ContinuousSample<double> latencies;
|
||||
DDSketch<double> latencies;
|
||||
|
||||
BulkLoadWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), clientCount(wcx.clientCount), transactions("Transactions"), retries("Retries"),
|
||||
latencies(2000) {
|
||||
: TestWorkload(wcx), clientCount(wcx.clientCount), transactions("Transactions"), retries("Retries"), latencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
actorCount = getOption(options, "actorCount"_sr, 20);
|
||||
writesPerTransaction = getOption(options, "writesPerTransaction"_sr, 10);
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
@ -33,10 +33,10 @@ struct DDBalanceWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter bin_shifts, operations, retries;
|
||||
ContinuousSample<double> latencies;
|
||||
DDSketch<double> latencies;
|
||||
|
||||
DDBalanceWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), bin_shifts("Bin_Shifts"), operations("Operations"), retries("Retries"), latencies(2000) {
|
||||
: TestWorkload(wcx), bin_shifts("Bin_Shifts"), operations("Operations"), retries("Retries"), latencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
binCount = getOption(options, "binCount"_sr, 1000);
|
||||
writesPerTransaction = getOption(options, "writesPerTransaction"_sr, 1);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
@ -33,8 +33,8 @@ struct FileSystemWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter queries, writes;
|
||||
ContinuousSample<double> latencies;
|
||||
ContinuousSample<double> writeLatencies;
|
||||
DDSketch<double> latencies;
|
||||
DDSketch<double> writeLatencies;
|
||||
|
||||
class FileSystemOp {
|
||||
public:
|
||||
|
@ -44,7 +44,7 @@ struct FileSystemWorkload : TestWorkload {
|
|||
};
|
||||
|
||||
FileSystemWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), queries("Queries"), writes("Latency"), latencies(2500), writeLatencies(1000) {
|
||||
: TestWorkload(wcx), queries("Queries"), writes("Latency"), latencies(), writeLatencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
transactionsPerSecond = getOption(options, "transactionsPerSecond"_sr, 5000.0) / clientCount;
|
||||
double allowedLatency = getOption(options, "allowedLatency"_sr, 0.250);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/IKnobCollection.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
|
|
|
@ -63,7 +63,7 @@ struct MakoWorkload : TestWorkload {
|
|||
// used for periodically tracing
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
// store latency of each operation with sampling
|
||||
std::vector<ContinuousSample<double>> opLatencies;
|
||||
std::vector<DDSketch<double>> opLatencies;
|
||||
// key used to store checkSum for given key range
|
||||
std::vector<Key> csKeys;
|
||||
// key prefix of for all generated keys
|
||||
|
@ -142,7 +142,7 @@ struct MakoWorkload : TestWorkload {
|
|||
parseOperationsSpec();
|
||||
for (int i = 0; i < MAX_OP; ++i) {
|
||||
// initilize per-operation latency record
|
||||
opLatencies.push_back(ContinuousSample<double>(rowCount / sampleSize));
|
||||
opLatencies.push_back(DDSketch<double>());
|
||||
// initialize per-operation counter
|
||||
opCounters.push_back(PerfIntCounter(opNames[i]));
|
||||
}
|
||||
|
@ -658,7 +658,7 @@ struct MakoWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
ACTOR template <class T>
|
||||
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies) {
|
||||
static Future<Void> logLatency(Future<T> f, DDSketch<double>* opLatencies) {
|
||||
state double opBegin = timer();
|
||||
wait(success(f));
|
||||
opLatencies->addSample(timer() - opBegin);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
*/
|
||||
#include <vector>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
@ -38,10 +38,10 @@ struct QueuePushWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter transactions, retries;
|
||||
ContinuousSample<double> commitLatencies, GRVLatencies;
|
||||
DDSketch<double> commitLatencies, GRVLatencies;
|
||||
|
||||
QueuePushWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), transactions("Transactions"), retries("Retries"), commitLatencies(2000), GRVLatencies(2000) {
|
||||
: TestWorkload(wcx), transactions("Transactions"), retries("Retries"), commitLatencies(), GRVLatencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
actorCount = getOption(options, "actorCount"_sr, 50);
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
|
|
|
@ -25,8 +25,6 @@
|
|||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
static constexpr int SAMPLE_SIZE = 10000;
|
||||
|
||||
// If the log->storage propagation delay is longer than 1 second, then it's likely that our read
|
||||
// will see a `future_version` error from the storage server. We need to retry the read until
|
||||
// a value is returned, or a different error is thrown.
|
||||
|
@ -51,9 +49,9 @@ struct ReadAfterWriteWorkload : KVWorkload {
|
|||
static constexpr auto NAME = "ReadAfterWrite";
|
||||
|
||||
double testDuration;
|
||||
ContinuousSample<double> propagationLatency;
|
||||
DDSketch<double> propagationLatency;
|
||||
|
||||
ReadAfterWriteWorkload(WorkloadContext const& wcx) : KVWorkload(wcx), propagationLatency(SAMPLE_SIZE) {
|
||||
ReadAfterWriteWorkload(WorkloadContext const& wcx) : KVWorkload(wcx), propagationLatency() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
@ -200,7 +200,7 @@ struct ReadWriteCommonImpl {
|
|||
}
|
||||
}
|
||||
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
|
||||
ContinuousSample<double>* latencies,
|
||||
DDSketch<double>* latencies,
|
||||
double* totalLatency,
|
||||
int* latencyCount,
|
||||
EventMetricHandle<ReadMetric> readMetric,
|
||||
|
@ -220,7 +220,7 @@ struct ReadWriteCommonImpl {
|
|||
return Void();
|
||||
}
|
||||
ACTOR static Future<Void> logLatency(Future<RangeResult> f,
|
||||
ContinuousSample<double>* latencies,
|
||||
DDSketch<double>* latencies,
|
||||
double* totalLatency,
|
||||
int* latencyCount,
|
||||
EventMetricHandle<ReadMetric> readMetric,
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
@ -389,4 +389,4 @@ TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") {
|
|||
ASSERT(parse == idx);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/SimpleIni.h"
|
||||
#include "fdbserver/Status.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
@ -37,11 +37,11 @@ struct StreamingReadWorkload : TestWorkload {
|
|||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter transactions, readKeys;
|
||||
PerfIntCounter readValueBytes;
|
||||
ContinuousSample<double> latencies;
|
||||
DDSketch<double> latencies;
|
||||
|
||||
StreamingReadWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), transactions("Transactions"), readKeys("Keys Read"), readValueBytes("Value Bytes Read"),
|
||||
latencies(2000) {
|
||||
latencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
actorCount = getOption(options, "actorCount"_sr, 20);
|
||||
readsPerTransaction = getOption(options, "readsPerTransaction"_sr, 10);
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
@ -189,12 +189,11 @@ struct MeasureSinglePeriod : IMeasurer {
|
|||
double delay, duration;
|
||||
double startT;
|
||||
|
||||
ContinuousSample<double> totalLatency, grvLatency, rowReadLatency, commitLatency;
|
||||
DDSketch<double> totalLatency, grvLatency, rowReadLatency, commitLatency;
|
||||
ITransactor::Stats stats; // totalled over the period
|
||||
|
||||
MeasureSinglePeriod(double delay, double duration)
|
||||
: delay(delay), duration(duration), totalLatency(2000), grvLatency(2000), rowReadLatency(2000),
|
||||
commitLatency(2000) {}
|
||||
: delay(delay), duration(duration), totalLatency(), grvLatency(), rowReadLatency(), commitLatency() {}
|
||||
|
||||
Future<Void> start() override {
|
||||
startT = now();
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/ClusterConnectionMemoryRecord.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
|
|
|
@ -18,15 +18,13 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const int sampleSize = 10000;
|
||||
|
||||
struct WatchesWorkload : TestWorkload {
|
||||
static constexpr auto NAME = "Watches";
|
||||
|
||||
|
@ -34,10 +32,10 @@ struct WatchesWorkload : TestWorkload {
|
|||
double testDuration;
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter cycles;
|
||||
ContinuousSample<double> cycleLatencies;
|
||||
DDSketch<double> cycleLatencies;
|
||||
std::vector<int> nodeOrder;
|
||||
|
||||
WatchesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), cycles("Cycles"), cycleLatencies(sampleSize) {
|
||||
WatchesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), cycles("Cycles"), cycleLatencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 600.0);
|
||||
nodes = getOption(options, "nodeCount"_sr, 100);
|
||||
extraPerNode = getOption(options, "extraPerNode"_sr, 1000);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
@ -37,11 +37,11 @@ struct WriteBandwidthWorkload : KVWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter transactions, retries;
|
||||
ContinuousSample<double> commitLatencies, GRVLatencies;
|
||||
DDSketch<double> commitLatencies, GRVLatencies;
|
||||
|
||||
WriteBandwidthWorkload(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), loadTime(0.0), transactions("Transactions"), retries("Retries"), commitLatencies(2000),
|
||||
GRVLatencies(2000) {
|
||||
: KVWorkload(wcx), loadTime(0.0), transactions("Transactions"), retries("Retries"), commitLatencies(),
|
||||
GRVLatencies() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
keysPerTransaction = getOption(options, "keysPerTransaction"_sr, 100);
|
||||
valueString = std::string(maxValueBytes, '.');
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
constexpr int SAMPLE_SIZE = 10000;
|
||||
// workload description:
|
||||
// This workload aims to test whether we can throttling some bad clients that doing penetrating write on write hot-spot
|
||||
// range. There are several good clientActor just randomly do read and write ops in transaction. Also, some bad
|
||||
|
@ -41,8 +40,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
|
|||
int badActorTrNum = 0, badActorRetries = 0, badActorTooOldRetries = 0, badActorCommitFailedRetries = 0;
|
||||
int goodActorThrottleRetries = 0, badActorThrottleRetries = 0;
|
||||
double badActorTotalLatency = 0.0, goodActorTotalLatency = 0.0;
|
||||
ContinuousSample<double> badActorReadLatency, goodActorReadLatency;
|
||||
ContinuousSample<double> badActorCommitLatency, goodActorCommitLatency;
|
||||
DDSketch<double> badActorReadLatency, goodActorReadLatency;
|
||||
DDSketch<double> badActorCommitLatency, goodActorCommitLatency;
|
||||
// Test configuration
|
||||
// KVWorkload::actorCount
|
||||
int goodActorPerClient, badActorPerClient;
|
||||
|
@ -64,8 +63,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
|
|||
static constexpr int MIN_TRANSACTION_TAG_LENGTH = 2;
|
||||
|
||||
WriteTagThrottlingWorkload(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), badActorReadLatency(SAMPLE_SIZE), goodActorReadLatency(SAMPLE_SIZE),
|
||||
badActorCommitLatency(SAMPLE_SIZE), goodActorCommitLatency(SAMPLE_SIZE) {
|
||||
: KVWorkload(wcx), badActorReadLatency(), goodActorReadLatency(), badActorCommitLatency(),
|
||||
goodActorCommitLatency() {
|
||||
testDuration = getOption(options, "testDuration"_sr, 120.0);
|
||||
badOpRate = getOption(options, "badOpRate"_sr, 0.9);
|
||||
numWritePerTr = getOption(options, "numWritePerTr"_sr, 1);
|
||||
|
|
|
@ -112,8 +112,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
|
||||
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
|
||||
init( PING_LOGGING_INTERVAL, 3.0 );
|
||||
init( PING_SAMPLE_AMOUNT, 100 );
|
||||
init( NETWORK_CONNECT_SAMPLE_AMOUNT, 100 );
|
||||
init( PING_SKETCH_ACCURACY, 0.1 );
|
||||
|
||||
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
|
||||
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
|
||||
|
@ -168,7 +167,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
init( MIN_SUBMIT, 10 );
|
||||
init( SQLITE_DISK_METRIC_LOGGING_INTERVAL, 5.0 );
|
||||
init( KAIO_LATENCY_LOGGING_INTERVAL, 30.0 );
|
||||
init( KAIO_LATENCY_SAMPLE_SIZE, 30000 );
|
||||
init( KAIO_LATENCY_SKETCH_ACCURACY, 0.01 );
|
||||
|
||||
init( PAGE_WRITE_CHECKSUM_HISTORY, 0 ); if( randomize && BUGGIFY ) PAGE_WRITE_CHECKSUM_HISTORY = 10000000;
|
||||
init( DISABLE_POSIX_KERNEL_AIO, 0 );
|
||||
|
@ -303,7 +302,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(2, 10); }
|
||||
init( TOKEN_CACHE_SIZE, 100 );
|
||||
init( ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, 5.0 );
|
||||
init( ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE, 1000 );
|
||||
init( ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY, 0.01 );
|
||||
// Refer to EncryptUtil::EncryptAuthTokenAlgo for more details
|
||||
init( ENCRYPT_HEADER_AUTH_TOKEN_ENABLED, true ); if ( randomize && BUGGIFY ) { ENCRYPT_HEADER_AUTH_TOKEN_ENABLED = !ENCRYPT_HEADER_AUTH_TOKEN_ENABLED; }
|
||||
init( ENCRYPT_HEADER_AUTH_TOKEN_ALGO, 1 ); if ( randomize && BUGGIFY ) { ENCRYPT_HEADER_AUTH_TOKEN_ALGO = getRandomAuthTokenAlgo(); }
|
||||
|
|
|
@ -176,8 +176,7 @@ public:
|
|||
int ACCEPT_BATCH_SIZE;
|
||||
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
|
||||
double PING_LOGGING_INTERVAL;
|
||||
int PING_SAMPLE_AMOUNT;
|
||||
int NETWORK_CONNECT_SAMPLE_AMOUNT;
|
||||
double PING_SKETCH_ACCURACY;
|
||||
|
||||
int TLS_CERT_REFRESH_DELAY_SECONDS;
|
||||
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
|
||||
|
@ -231,7 +230,7 @@ public:
|
|||
int MIN_SUBMIT;
|
||||
double SQLITE_DISK_METRIC_LOGGING_INTERVAL;
|
||||
double KAIO_LATENCY_LOGGING_INTERVAL;
|
||||
int KAIO_LATENCY_SAMPLE_SIZE;
|
||||
double KAIO_LATENCY_SKETCH_ACCURACY;
|
||||
|
||||
int PAGE_WRITE_CHECKSUM_HISTORY;
|
||||
int DISABLE_POSIX_KERNEL_AIO;
|
||||
|
@ -365,7 +364,7 @@ public:
|
|||
int64_t ENCRYPT_CIPHER_KEY_CACHE_TTL;
|
||||
int64_t ENCRYPT_KEY_REFRESH_INTERVAL;
|
||||
double ENCRYPT_KEY_CACHE_LOGGING_INTERVAL;
|
||||
double ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE;
|
||||
double ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY;
|
||||
bool ENCRYPT_HEADER_AUTH_TOKEN_ENABLED;
|
||||
int ENCRYPT_HEADER_AUTH_TOKEN_ALGO;
|
||||
|
||||
|
|
|
@ -22,8 +22,62 @@
|
|||
#include "flow/IRandom.h"
|
||||
#include "flowbench/GlobalData.h"
|
||||
#include "fdbrpc/Stats.h"
|
||||
#include "fdbrpc/DDSketch.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "flow/Histogram.h"
|
||||
|
||||
static void bench_ddsketchUnsigned(benchmark::State& state) {
|
||||
DDSketchFastUnsigned dds;
|
||||
InputGenerator<unsigned> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
|
||||
|
||||
for (auto _ : state) {
|
||||
dds.addSample(data.next());
|
||||
}
|
||||
|
||||
state.SetItemsProcessed(state.iterations());
|
||||
}
|
||||
// DDSketchFastUnsigned has a fixed error margin (~8%)
|
||||
BENCHMARK(bench_ddsketchUnsigned)->ReportAggregatesOnly(true);
|
||||
|
||||
static void bench_ddsketchInt(benchmark::State& state) {
|
||||
DDSketch<int64_t> dds((double)state.range(0) / 100);
|
||||
InputGenerator<int64_t> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
|
||||
|
||||
for (auto _ : state) {
|
||||
dds.addSample(data.next());
|
||||
}
|
||||
|
||||
state.SetItemsProcessed(state.iterations());
|
||||
}
|
||||
// Try with 10%, 5% and 1% error margins
|
||||
BENCHMARK(bench_ddsketchInt)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
|
||||
|
||||
static void bench_ddsketchDouble(benchmark::State& state) {
|
||||
DDSketch<double> dds((double)state.range(0) / 100);
|
||||
InputGenerator<double> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
|
||||
|
||||
for (auto _ : state) {
|
||||
dds.addSample(data.next());
|
||||
}
|
||||
|
||||
state.SetItemsProcessed(state.iterations());
|
||||
}
|
||||
// Try with 10%, 5% and 1% error margins
|
||||
BENCHMARK(bench_ddsketchDouble)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
|
||||
|
||||
static void bench_ddsketchLatency(benchmark::State& state) {
|
||||
DDSketch<double> dds((double)state.range(0) / 100);
|
||||
InputGenerator<double> data(1e6, []() { return deterministicRandom()->random01() * 2.0; });
|
||||
|
||||
for (auto _ : state) {
|
||||
dds.addSample(data.next());
|
||||
}
|
||||
|
||||
state.SetItemsProcessed(state.iterations());
|
||||
}
|
||||
// Try with 10%, 5% and 1% error margins
|
||||
BENCHMARK(bench_ddsketchLatency)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
|
||||
|
||||
static void bench_continuousSampleInt(benchmark::State& state) {
|
||||
ContinuousSample<int64_t> cs(state.range(0));
|
||||
InputGenerator<int64_t> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
|
||||
|
|
|
@ -56,6 +56,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES BlobManagerUnit.toml)
|
||||
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES DDSketch.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES DiskDurability.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES FileSystem.txt IGNORE)
|
||||
|
|
Loading…
Reference in New Issue