Merge remote-tracking branch 'origin/main' into authz-security-tests

Junhyun Shim 2022-11-17 22:44:31 +01:00
commit ce041576f5
62 changed files with 784 additions and 227 deletions

View File

@ -475,7 +475,7 @@ Deletes a tenant from the cluster. The tenant must be empty.
list
^^^^
``tenant list [BEGIN] [END] [LIMIT]``
``tenant list [BEGIN] [END] [limit=LIMIT] [offset=OFFSET] [state=<STATE1>,<STATE2>,...]``
Lists the tenants present in the cluster.
@ -485,6 +485,10 @@ Lists the tenants present in the cluster.
``LIMIT`` - the number of tenants to list. Defaults to 100.
``OFFSET`` - the number of items to skip over, starting from the beginning of the range. Defaults to 0.
``STATE`` - TenantState(s) to filter the list with. Defaults to no filters.
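For example, ``tenant list a z limit=10 offset=5 state=registering,ready`` would list up to 10 tenants in the ``registering`` or ``ready`` state from the range ``a`` - ``z``, skipping the first 5 matches (the range bounds and counts here are illustrative).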
get
^^^

View File

@ -87,6 +87,56 @@ parseTenantConfiguration(std::vector<StringRef> const& tokens, int startIndex, b
return configParams;
}
bool parseTenantListOptions(std::vector<StringRef> const& tokens,
int startIndex,
int& limit,
int& offset,
std::vector<TenantState>& filters) {
for (int tokenNum = startIndex; tokenNum < tokens.size(); ++tokenNum) {
Optional<Value> value;
StringRef token = tokens[tokenNum];
StringRef param;
bool foundEquals;
param = token.eat("=", &foundEquals);
if (!foundEquals) {
fmt::print(stderr,
"ERROR: invalid option string `{}'. String must specify a value using `='.\n",
param.toString().c_str());
return false;
}
value = token;
if (tokencmp(param, "limit")) {
int n = 0;
if (sscanf(value.get().toString().c_str(), "%d%n", &limit, &n) != 1 || n != value.get().size() ||
limit <= 0) {
fmt::print(stderr, "ERROR: invalid limit `{}'\n", token.toString().c_str());
return false;
}
} else if (tokencmp(param, "offset")) {
int n = 0;
if (sscanf(value.get().toString().c_str(), "%d%n", &offset, &n) != 1 || n != value.get().size() ||
offset < 0) {
fmt::print(stderr, "ERROR: invalid offset `{}'\n", token.toString().c_str());
return false;
}
} else if (tokencmp(param, "state")) {
auto filterStrings = value.get().splitAny(","_sr);
try {
for (auto sref : filterStrings) {
filters.push_back(TenantMapEntry::stringToTenantState(sref.toString()));
}
} catch (Error& e) {
fmt::print(stderr, "ERROR: unrecognized tenant state(s) `{}'.\n", value.get().toString());
return false;
}
} else {
fmt::print(stderr, "ERROR: unrecognized parameter `{}'.\n", param.toString().c_str());
return false;
}
}
return true;
}
Key makeConfigKey(TenantNameRef tenantName, StringRef configName) {
return tenantConfigSpecialKeyRange.begin.withSuffix(Tuple().append(tenantName).append(configName).pack());
}
@ -225,17 +275,21 @@ ACTOR Future<bool> tenantDeleteCommand(Reference<IDatabase> db, std::vector<Stri
// tenant list command
ACTOR Future<bool> tenantListCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() > 5) {
fmt::print("Usage: tenant list [BEGIN] [END] [LIMIT]\n\n");
if (tokens.size() > 7) {
fmt::print("Usage: tenant list [BEGIN] [END] [limit=LIMIT] [offset=OFFSET] [state=<STATE1>,<STATE2>,...]\n\n");
fmt::print("Lists the tenants in a cluster.\n");
fmt::print("Only tenants in the range BEGIN - END will be printed.\n");
fmt::print("An optional LIMIT can be specified to limit the number of results (default 100).\n");
fmt::print("Optionally skip over the first OFFSET results (default 0).\n");
fmt::print("Optional comma-separated tenant state(s) can be provided to filter the list.\n");
return false;
}
state StringRef beginTenant = ""_sr;
state StringRef endTenant = "\xff\xff"_sr;
state int limit = 100;
state int offset = 0;
state std::vector<TenantState> filters;
if (tokens.size() >= 3) {
beginTenant = tokens[2];
@ -243,14 +297,12 @@ ACTOR Future<bool> tenantListCommand(Reference<IDatabase> db, std::vector<String
if (tokens.size() >= 4) {
endTenant = tokens[3];
if (endTenant <= beginTenant) {
fmt::print(stderr, "ERROR: end must be larger than begin");
fmt::print(stderr, "ERROR: end must be larger than begin\n");
return false;
}
}
if (tokens.size() == 5) {
int n = 0;
if (sscanf(tokens[4].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[4].size() || limit <= 0) {
fmt::print(stderr, "ERROR: invalid limit `{}'\n", tokens[4].toString().c_str());
if (tokens.size() >= 5) {
if (!parseTenantListOptions(tokens, 4, limit, offset, filters)) {
return false;
}
}
@ -266,7 +318,7 @@ ACTOR Future<bool> tenantListCommand(Reference<IDatabase> db, std::vector<String
state std::vector<TenantName> tenantNames;
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
wait(MetaclusterAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
wait(MetaclusterAPI::listTenants(db, beginTenant, endTenant, limit, offset, filters));
for (auto tenant : tenants) {
tenantNames.push_back(tenant.first);
}
@ -613,8 +665,10 @@ std::vector<const char*> tenantHintGenerator(std::vector<StringRef> const& token
} else if (tokencmp(tokens[1], "delete") && tokens.size() < 3) {
static std::vector<const char*> opts = { "<NAME>" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "list") && tokens.size() < 5) {
static std::vector<const char*> opts = { "[BEGIN]", "[END]", "[LIMIT]" };
} else if (tokencmp(tokens[1], "list") && tokens.size() < 7) {
static std::vector<const char*> opts = {
"[BEGIN]", "[END]", "[limit=LIMIT]", "[offset=OFFSET]", "[state=<STATE1>,<STATE2>,...]"
};
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "get") && tokens.size() < 4) {
static std::vector<const char*> opts = { "<NAME>", "[JSON]" };

View File

@ -786,7 +786,7 @@ def tenant_list(logger):
output = run_fdbcli_command('tenant list')
assert output == '1. tenant\n 2. tenant2'
output = run_fdbcli_command('tenant list a z 1')
output = run_fdbcli_command('tenant list a z limit=1')
assert output == '1. tenant'
output = run_fdbcli_command('tenant list a tenant2')
@ -801,9 +801,15 @@ def tenant_list(logger):
output = run_fdbcli_command_and_get_error('tenant list b a')
assert output == 'ERROR: end must be larger than begin'
output = run_fdbcli_command_and_get_error('tenant list a b 12x')
output = run_fdbcli_command_and_get_error('tenant list a b limit=12x')
assert output == 'ERROR: invalid limit `12x\''
output = run_fdbcli_command_and_get_error('tenant list a b offset=13y')
assert output == 'ERROR: invalid offset `13y\''
output = run_fdbcli_command_and_get_error('tenant list a b state=14z')
assert output == 'ERROR: unrecognized tenant state(s) `14z\'.'
@enable_logging()
def tenant_get(logger):
setup_tenants(['tenant', 'tenant2 tenant_group=tenant_group2'])

View File

@ -57,11 +57,11 @@ BlobCipherMetrics::CounterSet::CounterSet(CounterCollection& cc, std::string nam
getCipherKeysLatency(name + "GetCipherKeysLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
getLatestCipherKeysLatency(name + "GetLatestCipherKeysLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE) {}
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY) {}
BlobCipherMetrics::BlobCipherMetrics()
: cc("BlobCipher"), cipherKeyCacheHit("CipherKeyCacheHit", cc), cipherKeyCacheMiss("CipherKeyCacheMiss", cc),
@ -71,15 +71,15 @@ BlobCipherMetrics::BlobCipherMetrics()
getCipherKeysLatency("GetCipherKeysLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
getLatestCipherKeysLatency("GetLatestCipherKeysLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
getBlobMetadataLatency("GetBlobMetadataLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY),
counterSets({ CounterSet(cc, "TLog"),
CounterSet(cc, "KVMemory"),
CounterSet(cc, "KVRedwood"),

View File

@ -578,7 +578,7 @@ void traceTSSErrors(const char* name, UID tssId, const std::unordered_map<int, u
Example:
GetValueLatencySSMean
*/
void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, ContinuousSample<double>& sample) {
void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, DDSketch<double>& sample) {
ev.detail(name + "Mean", sample.mean());
// don't log the larger percentiles unless we actually have enough samples to log the accurate percentile instead of
// the largest sample in this window
@ -595,8 +595,8 @@ void traceSSOrTSSPercentiles(TraceEvent& ev, const std::string name, ContinuousS
void traceTSSPercentiles(TraceEvent& ev,
const std::string name,
ContinuousSample<double>& ssSample,
ContinuousSample<double>& tssSample) {
DDSketch<double>& ssSample,
DDSketch<double>& tssSample) {
ASSERT(ssSample.getPopulationSize() == tssSample.getPopulationSize());
ev.detail(name + "Count", ssSample.getPopulationSize());
if (ssSample.getPopulationSize() > 0) {
@ -1534,17 +1534,16 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr),
lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0),
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
mutationsPerCommit(), bytesPerCommit(), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0),
cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
@ -1838,13 +1837,13 @@ DatabaseContext::DatabaseContext(const Error& err)
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(), bgGranulesPerRequest(), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), sharedStatePtr(nullptr),
transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(), readLatencies(), commitLatencies(), GRVLatencies(),
mutationsPerCommit(), bytesPerCommit(), sharedStatePtr(nullptr), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
// Static constructor used by server processes to create a DatabaseContext

View File

@ -955,8 +955,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );
init( FILE_LATENCY_SAMPLE_SIZE, 10000 );
init( LATENCY_SKETCH_ACCURACY, 0.01 );
init( FILE_LATENCY_SKETCH_ACCURACY, 0.01 );
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
// Cluster recovery

View File

@ -70,6 +70,7 @@ std::string TenantMapEntry::tenantStateToString(TenantState tenantState) {
}
TenantState TenantMapEntry::stringToTenantState(std::string stateStr) {
std::transform(stateStr.begin(), stateStr.end(), stateStr.begin(), [](unsigned char c) { return std::tolower(c); });
if (stateStr == "registering") {
return TenantState::REGISTERING;
} else if (stateStr == "ready") {
@ -86,7 +87,7 @@ TenantState TenantMapEntry::stringToTenantState(std::string stateStr) {
return TenantState::ERROR;
}
UNREACHABLE();
throw invalid_option();
}
std::string TenantMapEntry::tenantLockStateToString(TenantLockState tenantState) {
@ -103,6 +104,7 @@ std::string TenantMapEntry::tenantLockStateToString(TenantLockState tenantState)
}
TenantLockState TenantMapEntry::stringToTenantLockState(std::string stateStr) {
std::transform(stateStr.begin(), stateStr.end(), stateStr.begin(), [](unsigned char c) { return std::tolower(c); });
if (stateStr == "unlocked") {
return TenantLockState::UNLOCKED;
} else if (stateStr == "read only") {

View File

@ -75,8 +75,8 @@ struct BlobWorkerStats {
Reference<FlowLock> resnapshotLock,
Reference<FlowLock> deltaWritesLock,
double sampleLoggingInterval,
int fileOpLatencySampleSize,
int requestLatencySampleSize)
double fileOpLatencySketchAccuracy,
double requestLatencySketchAccuracy)
: cc("BlobWorkerStats", id.toString()),
s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
@ -95,10 +95,13 @@ struct BlobWorkerStats {
forceFlushCleanups("ForceFlushCleanups", cc), readDrivenCompactions("ReadDrivenCompactions", cc),
numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0),
minimumCFVersion(0), cfVersionLag(0), notAtLatestChangeFeeds(0), lastResidentMemory(0),
snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySampleSize),
snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics",
id,
sampleLoggingInterval,
fileOpLatencySketchAccuracy),
deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySketchAccuracy),
reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySketchAccuracy),
readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySketchAccuracy),
estimatedMaxResidentMemory(0), initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock),
deltaWritesLock(deltaWritesLock) {
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });

View File

@ -42,8 +42,8 @@
#include "fdbrpc/MultiInterface.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/EventTypes.actor.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/Smoother.h"
#include "fdbrpc/DDSketch.h"
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
public:
@ -567,7 +567,7 @@ public:
Counter bgReadRowsCleared;
Counter bgReadRowsInserted;
Counter bgReadRowsUpdated;
ContinuousSample<double> bgLatencies, bgGranulesPerRequest;
DDSketch<double> bgLatencies, bgGranulesPerRequest;
// Change Feed metrics. Omit change feed metrics from logging if not used
bool usedAnyChangeFeeds;
@ -579,8 +579,7 @@ public:
Counter feedPops;
Counter feedPopsFallback;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
bytesPerCommit;
DDSketch<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
int outstandingWatches;
int maxOutstandingWatches;

View File

@ -1561,26 +1561,64 @@ Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenantsTransactio
int limit) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
KeyBackedRangeResult<std::pair<TenantName, TenantMapEntry>> results =
state KeyBackedRangeResult<std::pair<TenantName, TenantMapEntry>> results =
wait(ManagementClusterMetadata::tenantMetadata().tenantMap.getRange(tr, begin, end, limit));
return results.results;
}
ACTOR template <class DB>
Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenants(Reference<DB> db,
TenantName begin,
TenantName end,
int limit) {
Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenants(
Reference<DB> db,
TenantName begin,
TenantName end,
int limit,
int offset = 0,
std::vector<TenantState> filters = std::vector<TenantState>()) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
wait(listTenantsTransaction(tr, begin, end, limit));
return tenants;
if (filters.empty()) {
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants;
wait(store(tenants, listTenantsTransaction(tr, begin, end, limit + offset)));
if (offset >= tenants.size()) {
tenants.clear();
} else if (offset > 0) {
tenants.erase(tenants.begin(), tenants.begin() + offset);
}
return tenants;
}
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state KeyBackedRangeResult<std::pair<TenantName, TenantMapEntry>> results =
wait(ManagementClusterMetadata::tenantMetadata().tenantMap.getRange(
tr, begin, end, std::max(limit + offset, 100)));
state std::vector<std::pair<TenantName, TenantMapEntry>> filterResults;
state int count = 0;
loop {
for (auto pair : results.results) {
if (filters.empty() || std::count(filters.begin(), filters.end(), pair.second.tenantState)) {
++count;
if (count > offset) {
filterResults.push_back(pair);
if (count - offset == limit) {
ASSERT(count - offset == filterResults.size());
return filterResults;
}
}
}
}
if (!results.more) {
return filterResults;
}
begin = keyAfter(results.results.back().first);
wait(store(results,
ManagementClusterMetadata::tenantMetadata().tenantMap.getRange(
tr, begin, end, std::max(limit + offset, 100))));
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}

View File

@ -924,8 +924,8 @@ public:
std::string REDWOOD_IO_PRIORITIES;
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;
int FILE_LATENCY_SAMPLE_SIZE;
double LATENCY_SKETCH_ACCURACY;
double FILE_LATENCY_SKETCH_ACCURACY;
double LATENCY_METRICS_LOGGING_INTERVAL;
// Cluster recovery

View File

@ -878,11 +878,11 @@ Peer::Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), compatible(true), outgoingConnectionIdle(true),
lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), peerReferences(-1),
bytesReceived(0), bytesSent(0), lastDataPacketSentTime(now()), outstandingReplies(0),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedTime(0.0),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SKETCH_ACCURACY : 0.1), lastLoggedTime(0.0),
lastLoggedBytesReceived(0), lastLoggedBytesSent(0), timeoutCount(0),
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())),
connectOutgoingCount(0), connectIncomingCount(0), connectFailedCount(0),
connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1) {
connectLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SKETCH_ACCURACY : 0.1) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}

View File

@ -62,15 +62,15 @@ public:
LatencySample readLatencySample = { "AsyncFileKAIOReadLatency",
UID(),
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
LatencySample writeLatencySample = { "AsyncFileKAIOWriteLatency",
UID(),
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
LatencySample syncLatencySample = { "AsyncFileKAIOSyncLatency",
UID(),
FLOW_KNOBS->KAIO_LATENCY_LOGGING_INTERVAL,
FLOW_KNOBS->KAIO_LATENCY_SAMPLE_SIZE };
FLOW_KNOBS->KAIO_LATENCY_SKETCH_ACCURACY };
};
static AsyncFileKAIOMetrics& getMetrics() {

View File

@ -0,0 +1,326 @@
/*
* DDSketch.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DDSKETCH_H
#define DDSKETCH_H
#include <iterator>
#include <limits>
#include <type_traits>
#pragma once
#include <vector>
#include <algorithm>
#include <cassert>
#include <cmath>
#include "flow/Error.h"
#include "flow/UnitTest.h"
// A namespace for fast log() computation.
namespace fastLogger {
// Basically, the goal is to compute log(x)/log(r).
// For double, it is represented as 2^e*(1+s) (0<=s<1), so our goal becomes
// e*log(2)/log(r) + log(1+s)/log(r), and we approximate log(1+s) with a cubic function.
// See more details on Datadog's paper, or CubicallyInterpolatedMapping.java in
// https://github.com/DataDog/sketches-java/
inline const double correctingFactor = 1.00988652862227438516; // = 7 / (10 * log(2));
constexpr inline const double A = 6.0 / 35.0, B = -3.0 / 5.0, C = 10.0 / 7.0;
inline double fastlog(double value) {
int e;
double s = frexp(value, &e);
s = s * 2 - 1;
return ((A * s + B) * s + C) * s + e - 1;
}
inline double reverseLog(double index) {
long exponent = floor(index);
// Derived from Cardano's formula
double d0 = B * B - 3 * A * C;
double d1 = 2 * B * B * B - 9 * A * B * C - 27 * A * A * (index - exponent);
double p = cbrt((d1 - sqrt(d1 * d1 - 4 * d0 * d0 * d0)) / 2);
double significandPlusOne = -(B + p + d0 / p) / (3 * A) + 1;
return ldexp(significandPlusOne / 2, exponent + 1);
}
}; // namespace fastLogger
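A minimal round-trip sketch of the two helpers above (illustrative only; it assumes this header is reachable as "fdbrpc/DDSketch.h", matching the includes added elsewhere in this commit): reverseLog solves the same cubic that fastlog evaluates, so reverseLog(fastlog(x)) should recover x up to floating-point error.

#include <cassert>
#include <cmath>
#include "fdbrpc/DDSketch.h" // assumed include path

inline void checkFastLogRoundTrip() {
    const double xs[] = { 0.001, 0.5, 1.0, 3.14159, 42.0, 1e6 };
    for (double x : xs) {
        // fastlog approximates log2(x) with a cubic on the mantissa; reverseLog inverts that cubic.
        double back = fastLogger::reverseLog(fastLogger::fastlog(x));
        assert(std::fabs(back - x) / x < 1e-6);
    }
}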
// DDSketch for non-negative numbers (those < EPS = 10^-18 are
// treated as 0, and huge numbers (>1/EPS) fail ASSERT). This is the base
// class without a concrete log() implementation.
template <class Impl, class T>
class DDSketchBase {
static constexpr T defaultMin() { return std::numeric_limits<T>::max(); }
static constexpr T defaultMax() {
if constexpr (std::is_floating_point_v<T>) {
return -std::numeric_limits<T>::max();
} else {
return std::numeric_limits<T>::min();
}
}
public:
explicit DDSketchBase(double errorGuarantee)
: errorGuarantee(errorGuarantee), populationSize(0), zeroPopulationSize(0), minValue(defaultMin()),
maxValue(defaultMax()), sum(T()) {
ASSERT(errorGuarantee > 0 && errorGuarantee < 1);
}
DDSketchBase<Impl, T>& addSample(T sample) {
// Keep the name addSample for now, even though the value recorded is no longer a sample
if (!populationSize)
minValue = maxValue = sample;
if (sample <= EPS) {
zeroPopulationSize++;
} else {
size_t index = static_cast<Impl*>(this)->getIndex(sample);
ASSERT(index >= 0 && index < buckets.size());
try {
buckets.at(index)++;
} catch (std::out_of_range const& e) {
fmt::print(stderr,
"ERROR: Invalid DDSketch bucket index ({}) at {}/{} for sample: {}\n",
e.what(),
index,
buckets.size(),
sample);
}
}
populationSize++;
sum += sample;
maxValue = std::max(maxValue, sample);
minValue = std::min(minValue, sample);
return *this;
}
double mean() const {
if (populationSize == 0)
return 0;
return (double)sum / populationSize;
}
T median() { return percentile(0.5); }
T percentile(double percentile) {
ASSERT(percentile >= 0 && percentile <= 1);
if (populationSize == 0)
return T();
uint64_t targetPercentilePopulation = percentile * (populationSize - 1);
// Now find the tPP-th (0-indexed) element
if (targetPercentilePopulation < zeroPopulationSize)
return T(0);
size_t index = 0;
[[maybe_unused]] bool found = false;
if (percentile <= 0.5) { // count up
uint64_t count = zeroPopulationSize;
for (size_t i = 0; i < buckets.size(); i++) {
if (targetPercentilePopulation < count + buckets[i]) {
// count + buckets[i] = # of numbers seen so far (from the smallest values up
// to and including this bucket), so if the target is in this bucket, then
// tPP < cnt + bck[i]
found = true;
index = i;
break;
}
count += buckets[i];
}
} else { // and count down
uint64_t count = 0;
for (auto rit = buckets.rbegin(); rit != buckets.rend(); rit++) {
if (targetPercentilePopulation + count + *rit >= populationSize) {
// cnt + bkt[i] is the # of numbers in this bucket and to its right (inclusive).
// If the target were not in this bucket (i.e., strictly to its left), it could
// be at most as far right as the rightmost number of the bucket to the left,
// so we would have tPP + cnt + bkt[i] < total population (tPP is 0-indexed).
// Hence the target is in this bucket exactly when that inequality fails,
// which is the condition checked above.
found = true;
index = std::distance(rit, buckets.rend()) - 1;
break;
}
count += *rit;
}
}
ASSERT(found);
if (!found)
return -1;
return static_cast<Impl*>(this)->getValue(index);
}
T min() const { return minValue; }
T max() const { return maxValue; }
void clear() {
std::fill(buckets.begin(), buckets.end(), 0);
populationSize = zeroPopulationSize = 0;
sum = 0;
minValue = defaultMin();
maxValue = defaultMax();
}
uint64_t getPopulationSize() const { return populationSize; }
double getErrorGuarantee() const { return errorGuarantee; }
size_t getBucketSize() const { return buckets.size(); }
DDSketchBase<Impl, T>& mergeWith(const DDSketchBase<Impl, T>& anotherSketch) {
// Must have the same guarantee
ASSERT(fabs(errorGuarantee - anotherSketch.errorGuarantee) < EPS &&
anotherSketch.buckets.size() == buckets.size());
for (size_t i = 0; i < anotherSketch.buckets.size(); i++) {
buckets[i] += anotherSketch.buckets[i];
}
populationSize += anotherSketch.populationSize;
zeroPopulationSize += anotherSketch.zeroPopulationSize;
minValue = std::min(minValue, anotherSketch.minValue);
maxValue = std::max(maxValue, anotherSketch.maxValue);
sum += anotherSketch.sum;
return *this;
}
constexpr static double EPS = 1e-18; // smaller numbers are considered as 0
protected:
double errorGuarantee; // As defined in the paper
uint64_t populationSize, zeroPopulationSize; // we need to separately count 0s
std::vector<uint64_t> buckets;
T minValue, maxValue, sum;
void setBucketSize(size_t capacity) { buckets.resize(capacity, 0); }
};
// DDSketch with fast log implementation for float numbers
template <class T>
class DDSketch : public DDSketchBase<DDSketch<T>, T> {
public:
explicit DDSketch(double errorGuarantee = 0.01)
: DDSketchBase<DDSketch<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) {
ASSERT(errorGuarantee > 0);
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS);
ASSERT(offset > 0);
this->setBucketSize(2 * offset);
}
size_t getIndex(T sample) {
static_assert(__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__, "Do not support non-little-endian systems");
return ceil(fastLogger::fastlog(sample) * multiplier) + offset;
}
T getValue(size_t index) { return fastLogger::reverseLog((index - offset) / multiplier) * 2.0 / (1 + gamma); }
private:
double gamma, multiplier;
size_t offset = 0;
};
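A minimal usage sketch of the class above (standalone and illustrative; it assumes the header is reachable as "fdbrpc/DDSketch.h"): two sketches constructed with the same error guarantee can be merged, and percentile queries on the result stay within that relative-error bound.

#include "fdbrpc/DDSketch.h" // assumed include path

inline double exampleMergedP99() {
    DDSketch<double> local(0.01), remote(0.01); // sketches must share the error guarantee to merge
    for (int i = 1; i <= 1000; ++i) {
        local.addSample(i * 0.001); // e.g. latencies between 1ms and 1s
        remote.addSample(i * 0.002);
    }
    local.mergeWith(remote); // adds bucket counts and combines min/max/sum
    return local.percentile(0.99); // approximate p99, within the configured relative error
}

This mirrors how the commit changes LatencySample and its call sites, which now pass a sketch accuracy (e.g. LATENCY_SKETCH_ACCURACY) instead of a sample size.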
// DDSketch with <cmath> log. Slow; only use this when the others don't work.
template <class T>
class DDSketchSlow : public DDSketchBase<DDSketchSlow<T>, T> {
public:
DDSketchSlow(double errorGuarantee = 0.1)
: DDSketchBase<DDSketchSlow<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
logGamma(log(gamma)) {
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS) + 5;
this->setBucketSize(2 * offset);
}
size_t getIndex(T sample) { return ceil(log(sample) / logGamma) + offset; }
T getValue(size_t index) { return (T)(2.0 * pow(gamma, (index - offset)) / (1 + gamma)); }
private:
double gamma, logGamma;
size_t offset = 0;
};
// DDSketch for unsigned int. Faster than the float version. Fixed accuracy.
class DDSketchFastUnsigned : public DDSketchBase<DDSketchFastUnsigned, unsigned> {
public:
DDSketchFastUnsigned() : DDSketchBase<DDSketchFastUnsigned, unsigned>(errorGuarantee) { this->setBucketSize(129); }
size_t getIndex(unsigned sample) {
__uint128_t v = sample;
v *= v;
v *= v; // sample^4
uint64_t low = (uint64_t)v, high = (uint64_t)(v >> 64);
return 128 - (high == 0 ? ((low == 0 ? 64 : __builtin_clzll(low)) + 64) : __builtin_clzll(high));
}
unsigned getValue(size_t index) {
double r = 1, g = gamma;
while (index) { // quick power method for power(gamma, index)
if (index & 1)
r *= g;
g *= g;
index >>= 1;
}
// 2.0 * pow(gamma, index) / (1 + gamma) is what we need
return (unsigned)(2.0 * r / (1 + gamma) + 0.5); // round to nearest int
}
private:
constexpr static double errorGuarantee = 0.08642723372;
// getIndex basically calculates floor(log_2(x^4)) + 1, which is almost
// ceil(log_2(x^4)): the two differ only when x is a power of 2, and that does
// not change the error bound. The original sketch asks for ceil(log_r(x)), so
// we know r = pow(2, 1/4) = 1.189207115. And r = (1 + eG) / (1 - eG), so
// eG = 0.08642723372.
constexpr static double gamma = 1.189207115;
};
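A small numeric cross-check of the constants above (illustrative only, using just the standard library): gamma should equal 2^(1/4), and the error guarantee should equal (gamma - 1) / (gamma + 1).

#include <cassert>
#include <cmath>

inline void checkFastUnsignedConstants() {
    double g = std::pow(2.0, 0.25); // r = 2^(1/4) = 1.189207115...
    assert(std::fabs(g - 1.189207115) < 1e-8);
    assert(std::fabs((g - 1.0) / (g + 1.0) - 0.08642723372) < 1e-8);
}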
#endif
TEST_CASE("/fdbrpc/ddsketch/accuracy") {
int TRY = 100, SIZE = 1e6;
const int totalPercentiles = 7;
double targetPercentiles[totalPercentiles] = { .0001, .01, .1, .50, .90, .99, .9999 };
double stat[totalPercentiles] = { 0 };
for (int t = 0; t < TRY; t++) {
DDSketch<double> dd;
std::vector<double> nums;
for (int i = 0; i < SIZE; i++) {
static double a = 1, b = 1; // a skewed distribution
auto y = deterministicRandom()->random01();
auto num = b / pow(1 - y, 1 / a);
nums.push_back(num);
dd.addSample(num);
}
std::sort(nums.begin(), nums.end());
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
double percentile = targetPercentiles[percentID];
double ground = nums[percentile * (SIZE - 1)], ddvalue = dd.percentile(percentile);
double relativeError = fabs(ground - ddvalue) / ground;
stat[percentID] += relativeError;
}
}
for (int percentID = 0; percentID < totalPercentiles; percentID++) {
printf("%.4lf per, relative error %.4lf\n", targetPercentiles[percentID], stat[percentID] / TRY);
}
return Void();
}

View File

@ -24,7 +24,7 @@
#include <algorithm>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbrpc/HealthMonitor.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
@ -159,7 +159,7 @@ struct Peer : public ReferenceCounted<Peer> {
int64_t bytesSent;
double lastDataPacketSentTime;
int outstandingReplies;
ContinuousSample<double> pingLatencies;
DDSketch<double> pingLatencies;
double lastLoggedTime;
int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
@ -171,7 +171,7 @@ struct Peer : public ReferenceCounted<Peer> {
int connectOutgoingCount;
int connectIncomingCount;
int connectFailedCount;
ContinuousSample<double> connectLatencies;
DDSketch<double> connectLatencies;
Promise<Void> disconnect;
explicit Peer(TransportData* transport, NetworkAddress const& destination);

View File

@ -38,7 +38,7 @@ MyCounters() : foo("foo", cc), bar("bar", cc), baz("baz", cc) {}
#include <cstddef>
#include "flow/flow.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
struct ICounter {
// All counters have a name and value
@ -216,40 +216,44 @@ public:
class LatencySample {
public:
LatencySample(std::string name, UID id, double loggingInterval, int sampleSize)
: name(name), id(id), sampleStart(now()), sample(sampleSize),
LatencySample(std::string name, UID id, double loggingInterval, double accuracy)
: name(name), id(id), sampleStart(now()), sketch(accuracy),
latencySampleEventHolder(makeReference<EventCacheHolder>(id.toString() + "/" + name)) {
assert(accuracy > 0);
if (accuracy <= 0) {
fmt::print(stderr, "ERROR: LatencySample {} has invalid accuracy ({})", name, accuracy);
}
logger = recurring([this]() { logSample(); }, loggingInterval);
}
void addMeasurement(double measurement) { sample.addSample(measurement); }
void addMeasurement(double measurement) { sketch.addSample(measurement); }
private:
std::string name;
UID id;
double sampleStart;
ContinuousSample<double> sample;
DDSketch<double> sketch;
Future<Void> logger;
Reference<EventCacheHolder> latencySampleEventHolder;
void logSample() {
TraceEvent(name.c_str(), id)
.detail("Count", sample.getPopulationSize())
.detail("Count", sketch.getPopulationSize())
.detail("Elapsed", now() - sampleStart)
.detail("Min", sample.min())
.detail("Max", sample.max())
.detail("Mean", sample.mean())
.detail("Median", sample.median())
.detail("P25", sample.percentile(0.25))
.detail("P90", sample.percentile(0.9))
.detail("P95", sample.percentile(0.95))
.detail("P99", sample.percentile(0.99))
.detail("P99.9", sample.percentile(0.999))
.detail("Min", sketch.min())
.detail("Max", sketch.max())
.detail("Mean", sketch.mean())
.detail("Median", sketch.median())
.detail("P25", sketch.percentile(0.25))
.detail("P90", sketch.percentile(0.9))
.detail("P95", sketch.percentile(0.95))
.detail("P99", sketch.percentile(0.99))
.detail("P99.9", sketch.percentile(0.999))
.trackLatest(latencySampleEventHolder->trackingKey);
sample.clear();
sketch.clear();
sampleStart = now();
}
};

View File

@ -25,7 +25,6 @@
#ifndef FDBRPC_TSS_COMPARISON_H
#define FDBRPC_TSS_COMPARISON_H
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/Stats.h"
// refcounted + noncopyable because both DatabaseContext and individual endpoints share ownership
@ -48,15 +47,15 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
Counter mismatches;
// We could probably just ignore getKey as it's seldom used?
ContinuousSample<double> SSgetValueLatency;
ContinuousSample<double> SSgetKeyLatency;
ContinuousSample<double> SSgetKeyValuesLatency;
ContinuousSample<double> SSgetMappedKeyValuesLatency;
DDSketch<double> SSgetValueLatency;
DDSketch<double> SSgetKeyLatency;
DDSketch<double> SSgetKeyValuesLatency;
DDSketch<double> SSgetMappedKeyValuesLatency;
ContinuousSample<double> TSSgetValueLatency;
ContinuousSample<double> TSSgetKeyLatency;
ContinuousSample<double> TSSgetKeyValuesLatency;
ContinuousSample<double> TSSgetMappedKeyValuesLatency;
DDSketch<double> TSSgetValueLatency;
DDSketch<double> TSSgetKeyLatency;
DDSketch<double> TSSgetKeyValuesLatency;
DDSketch<double> TSSgetMappedKeyValuesLatency;
std::unordered_map<int, uint64_t> ssErrorsByCode;
std::unordered_map<int, uint64_t> tssErrorsByCode;
@ -106,9 +105,9 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
TSSMetrics()
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000),
SSgetMappedKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000),
TSSgetKeyValuesLatency(1000), TSSgetMappedKeyValuesLatency(1000) {}
mismatches("Mismatches", cc), SSgetValueLatency(), SSgetKeyLatency(), SSgetKeyValuesLatency(),
SSgetMappedKeyValuesLatency(), TSSgetValueLatency(), TSSgetKeyLatency(), TSSgetKeyValuesLatency(),
TSSgetMappedKeyValuesLatency() {}
};
template <class Rep>

View File

@ -305,8 +305,8 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
resnapshotLock,
deltaWritesLock,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->FILE_LATENCY_SAMPLE_SIZE,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->FILE_LATENCY_SKETCH_ACCURACY,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
isEncryptionEnabled(isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION)) {}
bool managerEpochOk(int64_t epoch) {

View File

@ -241,15 +241,15 @@ public:
kmsLookupByIdsReqLatency("EKPKmsLookupByIdsReqLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
kmsLookupByDomainIdsReqLatency("EKPKmsLookupByDomainIdsReqLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
kmsBlobMetadataReqLatency("EKPKmsBlobMetadataReqLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE) {}
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY) {}
EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
const EncryptCipherDomainId domainId,

View File

@ -117,20 +117,20 @@ struct GrvProxyStats {
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencySample("GRVLatencyMetrics",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
grvBatchLatencySample("GRVBatchLatencyMetrics",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
grvConfirmEpochLiveDist(
@ -215,7 +215,7 @@ struct GrvProxyData {
versionVectorSizeOnGRVReply("VersionVectorSizeOnGRVReply",
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
updateCommitRequests(0), lastCommitTime(0), version(0), minKnownCommittedVersion(invalidVersion),
tagThrottler(SERVER_KNOBS->PROXY_MAX_TAG_THROTTLE_DURATION) {}
};

View File

@ -111,15 +111,15 @@ SharedRocksDBState::SharedRocksDBState(UID id)
readOptions(initialReadOptions()), commitLatency(LatencySample("RocksDBCommitLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)),
commitQueueLatency(LatencySample("RocksDBCommitQueueLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)),
dbWriteLatency(LatencySample("RocksDBWriteLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE)) {}
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {}
rocksdb::ColumnFamilyOptions SharedRocksDBState::initialCfOptions() {
rocksdb::ColumnFamilyOptions options;

View File

@ -28,6 +28,7 @@
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Histogram.h"
#include "flow/Trace.h"
#include "flow/network.h"
#include "flow/DebugTrace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -448,6 +449,14 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
state int sequence = -1;
state UID peekId;
DebugLogTraceEvent("LogRouterPeek0", self->dbgid)
.detail("ReturnIfBlocked", reqReturnIfBlocked)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Ver", self->version.get())
.detail("Begin", reqBegin);
if (reqSequence.present()) {
try {
peekId = reqSequence.get().first;
@ -481,6 +490,13 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
reqOnlySpilled = prevPeekData.second;
wait(yield());
} catch (Error& e) {
DebugLogTraceEvent("LogRouterPeekError", self->dbgid)
.error(e)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Begin", reqBegin);
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
@ -490,12 +506,6 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
}
DebugLogTraceEvent("LogRouterPeek0", self->dbgid)
.detail("ReturnIfBlocked", reqReturnIfBlocked)
.detail("Tag", reqTag.toString())
.detail("Ver", self->version.get())
.detail("Begin", reqBegin);
if (reqReturnIfBlocked && self->version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
@ -528,19 +538,22 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid)
.detail("Begin", reqBegin)
.detail("Popped", poppedVer)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Start", self->startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
}
replyPromise.send(Never());
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
if (std::is_same<PromiseType, ReplyPromise<TLogPeekReply>>::value) {
// Send an error to cover the case where the peer really is retrying;
// otherwise, the peer could be blocked forever.
replyPromise.sendError(operation_obsolete());
} else {
replyPromise.send(Never());
}
return Void();
}
@ -681,6 +694,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
if (!tagData) {
tagData = self->createTagData(req.tag, req.to, req.durableKnownCommittedVersion);
} else if (req.to > tagData->popped) {
DebugLogTraceEvent("LogRouterPop", self->dbgid).detail("Tag", req.tag.toString()).detail("PopVersion", req.to);
tagData->popped = req.to;
tagData->durableKnownCommittedVersion = req.durableKnownCommittedVersion;
wait(tagData->eraseMessagesBefore(req.to, self, TaskPriority::TLogPop));

View File

@ -62,6 +62,8 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
this->results.minKnownCommittedVersion = 0;
DebugLogTraceEvent(SevDebug, "SPC_Starting", randomID)
.detail("Tag", tag.toString())
.detail("Parallel", parallelGetMore)
.detail("Interf", interf && interf->get().present() ? interf->get().id() : UID())
.detail("UsePeekStream", usePeekStream)
.detail("Begin", begin)
.detail("End", end);
@ -111,7 +113,9 @@ bool ILogSystem::ServerPeekCursor::hasMessage() const {
}
void ILogSystem::ServerPeekCursor::nextMessage() {
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
DebugLogTraceEvent("SPC_NextMessage", randomID)
.detail("Tag", tag.toString())
.detail("MessageVersion", messageVersion.toString());
ASSERT(hasMsg);
if (rd.empty()) {
messageVersion.reset(std::min(results.end, end.version));
@ -143,11 +147,13 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
rd.rewind();
rd.readBytes(messageAndTags.getHeaderSize());
hasMsg = true;
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
DebugLogTraceEvent("SPC_NextMessageB", randomID)
.detail("Tag", tag.toString())
.detail("MessageVersion", messageVersion.toString());
}
StringRef ILogSystem::ServerPeekCursor::getMessage() {
//TraceEvent("SPC_GetMessage", randomID);
DebugLogTraceEvent("SPC_GetMessage", randomID).detail("Tag", tag.toString());
StringRef message = messageAndTags.getMessageWithoutTags();
rd.readBytes(message.size()); // Consumes the message.
return message;
@ -260,6 +266,14 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
}
loop {
DebugLogTraceEvent("SPC_GetMoreP", self->randomID)
.detail("Tag", self->tag.toString())
.detail("Has", self->hasMessage())
.detail("Begin", self->messageVersion.version)
.detail("Parallel", self->parallelGetMore)
.detail("Seq", self->sequence)
.detail("Sizes", self->futureResults.size())
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
state Version expectedBegin = self->messageVersion.version;
try {
if (self->parallelGetMore || self->onlySpilled) {
@ -294,7 +308,12 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
expectedBegin = res.end;
self->futureResults.pop_front();
updateCursorWithReply(self, res);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
DebugLogTraceEvent("SPC_GetMoreReply", self->randomID)
.detail("Has", self->hasMessage())
.detail("Tag", self->tag.toString())
.detail("End", res.end)
.detail("Size", self->futureResults.size())
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when(wait(self->interfaceChanged)) {
@ -306,11 +325,17 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
}
}
} catch (Error& e) {
DebugLogTraceEvent("PeekCursorError", self->randomID)
.error(e)
.detail("Tag", self->tag.toString())
.detail("Begin", self->messageVersion.version)
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
if (e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
return Void();
} else if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
TraceEvent ev("PeekCursorTimedOut", self->randomID);
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
// and thus we've likely just wasted 10min.
// timed_out() is sent by cleanupPeekTrackers as value PEEK_TRACKER_EXPIRATION_TIME
@ -326,6 +351,11 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
self->futureResults.clear();
ev.error(e)
.detail("Tag", self->tag.toString())
.detail("Begin", self->messageVersion.version)
.detail("NewID", self->randomID)
.detail("Interf", self->interf->get().present() ? self->interf->get().id() : UID());
} else {
throw e;
}
@ -415,7 +445,11 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
taskID))
: Never())) {
updateCursorWithReply(self, res);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
DebugLogTraceEvent("SPC_GetMoreB", self->randomID)
.detail("Tag", self->tag.toString())
.detail("Has", self->hasMessage())
.detail("End", res.end)
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when(wait(self->interf->onChange())) { self->onlySpilled = false; }
@ -431,11 +465,13 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
}
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
// TraceEvent("SPC_GetMore", randomID)
// .detail("HasMessage", hasMessage())
// .detail("More", !more.isValid() || more.isReady())
// .detail("MessageVersion", messageVersion.toString())
// .detail("End", end.toString());
DebugLogTraceEvent("SPC_GetMore", randomID)
.detail("Tag", tag.toString())
.detail("HasMessage", hasMessage())
.detail("More", !more.isValid() || more.isReady())
.detail("Parallel", parallelGetMore)
.detail("MessageVersion", messageVersion.toString())
.detail("End", end.toString());
if (hasMessage() && !parallelGetMore)
return Void();
if (!more.isValid() || more.isReady()) {

View File

@ -1824,8 +1824,11 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) {
UID ssID = nondeterministicRandom()->randomUniqueID();
std::string s = "BlockingPeekLatencies-" + reqTag.toString();
logData->blockingPeekLatencies.try_emplace(
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
logData->blockingPeekLatencies.try_emplace(reqTag,
s,
ssID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY);
}
LatencySample& sample = logData->blockingPeekLatencies.at(reqTag);
sample.addMeasurement(latency);
@ -2184,6 +2187,9 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
if (logData->logSystem->get() &&
(!logData->isPrimary || logData->logRouterPoppedVersion < logData->logRouterPopToVersion)) {
logData->logRouterPoppedVersion = ver;
DebugLogTraceEvent("LogPop", self->dbgid)
.detail("Tag", logData->remoteTag.toString())
.detail("Version", knownCommittedVersion);
logData->logSystem->get()->pop(ver, logData->remoteTag, knownCommittedVersion, logData->locality);
}

View File

@ -21,7 +21,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tuple.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/DeltaTree.h"
#include "fdbserver/IKeyValueStore.h"

View File

@ -121,20 +121,20 @@ struct ProxyStats {
commitLatencySample("CommitLatencyMetrics",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
commitBatchingEmptyMessageRatio("CommitBatchingEmptyMessageRatio",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
commitBatchingWindowSize("CommitBatchingWindowSize",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
computeLatency("ComputeLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
maxComputeNS(0), minComputeNS(1e12),
commitBatchQueuingDist(
Histogram::getHistogram("CommitProxy"_sr, "CommitBatchQueuing"_sr, Histogram::Unit::microseconds)),

View File

@ -25,6 +25,7 @@
#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H)
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H
#include "fdbrpc/DDSketch.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -46,7 +47,7 @@ DESCR struct ReadMetric {
// Common ReadWrite test settings
struct ReadWriteCommon : KVWorkload {
static constexpr int sampleSize = 10000;
static constexpr double sampleError = 0.01;
friend struct ReadWriteCommonImpl;
// general test setting
@ -75,7 +76,7 @@ struct ReadWriteCommon : KVWorkload {
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
EventMetricHandle<ReadMetric> readMetric;
PerfIntCounter aTransactions, bTransactions, retries;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
DDSketch<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
double readLatencyTotal;
int readLatencyCount;
std::vector<PerfMetric> periodicMetrics;
@ -87,9 +88,9 @@ struct ReadWriteCommon : KVWorkload {
explicit ReadWriteCommon(WorkloadContext const& wcx)
: KVWorkload(wcx), totalReadsMetric("ReadWrite.TotalReads"_sr), totalRetriesMetric("ReadWrite.TotalRetries"_sr),
aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize),
readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize),
readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) {
aTransactions("A Transactions"), bTransactions("B Transactions"), retries("Retries"), latencies(sampleError),
readLatencies(sampleError), commitLatencies(sampleError), GRVLatencies(sampleError),
fullReadLatencies(sampleError), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), clientBegin(0) {
transactionSuccessMetric.init("ReadWrite.SuccessfulTransaction"_sr);
transactionFailureMetric.init("ReadWrite.FailedTransaction"_sr);

View File

@ -102,17 +102,17 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
versionVectorTagUpdates("VersionVectorTagUpdates",
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
waitForPrevCommitRequests("WaitForPrevCommitRequests", cc),
nonWaitForPrevCommitRequests("NonWaitForPrevCommitRequests", cc),
versionVectorSizeOnCVReply("VersionVectorSizeOnCVReply",
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
waitForPrevLatencies("WaitForPrevLatencies",
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
addActor(addActor) {
logger = cc.traceCounters("MasterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, "MasterMetrics");
if (forceRecovery && !myInterface.locality.dcId().present()) {

View File

@ -1273,48 +1273,48 @@ public:
readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readKeyLatencySample("GetKeyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readValueLatencySample("GetValueMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readRangeLatencySample("GetRangeMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readVersionWaitSample("ReadVersionWaitMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readQueueWaitSample("ReadQueueWaitMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
mappedRangeSample("GetMappedRangeMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
mappedRangeRemoteSample("GetMappedRangeRemoteMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
mappedRangeLocalSample("GetMappedRangeLocalMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
kvReadRangeLatencySample("KVGetRangeMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY),
updateLatencySample("UpdateLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE) {
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY) {
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
specialCounter(cc, "Version", [self]() { return self->version.get(); });
specialCounter(cc, "StorageVersion", [self]() { return self->storageVersion(); });

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -34,11 +34,10 @@ struct BulkLoadWorkload : TestWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter transactions, retries;
ContinuousSample<double> latencies;
DDSketch<double> latencies;
BulkLoadWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), clientCount(wcx.clientCount), transactions("Transactions"), retries("Retries"),
latencies(2000) {
: TestWorkload(wcx), clientCount(wcx.clientCount), transactions("Transactions"), retries("Retries"), latencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
actorCount = getOption(options, "actorCount"_sr, 20);
writesPerTransaction = getOption(options, "writesPerTransaction"_sr, 10);

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -33,10 +33,10 @@ struct DDBalanceWorkload : TestWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter bin_shifts, operations, retries;
ContinuousSample<double> latencies;
DDSketch<double> latencies;
DDBalanceWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), bin_shifts("Bin_Shifts"), operations("Operations"), retries("Retries"), latencies(2000) {
: TestWorkload(wcx), bin_shifts("Bin_Shifts"), operations("Operations"), retries("Retries"), latencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
binCount = getOption(options, "binCount"_sr, 1000);
writesPerTransaction = getOption(options, "writesPerTransaction"_sr, 1);

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ReadYourWrites.h"

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -33,8 +33,8 @@ struct FileSystemWorkload : TestWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter queries, writes;
ContinuousSample<double> latencies;
ContinuousSample<double> writeLatencies;
DDSketch<double> latencies;
DDSketch<double> writeLatencies;
class FileSystemOp {
public:
@ -44,7 +44,7 @@ struct FileSystemWorkload : TestWorkload {
};
FileSystemWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), queries("Queries"), writes("Latency"), latencies(2500), writeLatencies(1000) {
: TestWorkload(wcx), queries("Queries"), writes("Latency"), latencies(), writeLatencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
transactionsPerSecond = getOption(options, "transactionsPerSecond"_sr, 5000.0) / clientCount;
double allowedLatency = getOption(options, "allowedLatency"_sr, 0.250);

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/IKnobCollection.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"

View File

@ -63,7 +63,7 @@ struct MakoWorkload : TestWorkload {
// used for periodic tracing
std::vector<PerfMetric> periodicMetrics;
// store latency of each operation with sampling
std::vector<ContinuousSample<double>> opLatencies;
std::vector<DDSketch<double>> opLatencies;
// key used to store checkSum for given key range
std::vector<Key> csKeys;
// key prefix for all generated keys
@ -142,7 +142,7 @@ struct MakoWorkload : TestWorkload {
parseOperationsSpec();
for (int i = 0; i < MAX_OP; ++i) {
// initialize per-operation latency record
opLatencies.push_back(ContinuousSample<double>(rowCount / sampleSize));
opLatencies.push_back(DDSketch<double>());
// initialize per-operation counter
opCounters.push_back(PerfIntCounter(opNames[i]));
}
@ -658,7 +658,7 @@ struct MakoWorkload : TestWorkload {
return Void();
}
ACTOR template <class T>
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies) {
static Future<Void> logLatency(Future<T> f, DDSketch<double>* opLatencies) {
state double opBegin = timer();
wait(success(f));
opLatencies->addSample(timer() - opBegin);

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "flow/DeterministicRandom.h"

View File

@ -392,6 +392,35 @@ struct MetaclusterManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> verifyListFilter(MetaclusterManagementWorkload* self, TenantName tenant) {
try {
state TenantMapEntry checkEntry = wait(MetaclusterAPI::getTenant(self->managementDb, tenant));
state TenantState checkState = checkEntry.tenantState;
state std::vector<TenantState> filters;
filters.push_back(checkState);
state std::vector<std::pair<TenantName, TenantMapEntry>> tenantList;
// The tenant state may have changed since the getTenant call above, so fetch the entry again alongside the list
state TenantMapEntry checkEntry2;
wait(store(checkEntry2, MetaclusterAPI::getTenant(self->managementDb, tenant)) &&
store(tenantList,
MetaclusterAPI::listTenants(self->managementDb, ""_sr, "\xff\xff"_sr, 10e6, 0, filters)));
bool found = false;
for (auto pair : tenantList) {
ASSERT(pair.second.tenantState == checkState);
if (pair.first == tenant) {
found = true;
}
}
ASSERT(found || checkEntry2.tenantState != checkState);
} catch (Error& e) {
if (e.code() != error_code_tenant_not_found) {
TraceEvent(SevError, "VerifyListFilterFailure").error(e).detail("Tenant", tenant);
throw;
}
}
return Void();
}
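// ---- Editorial illustration (not part of this commit) ----------------------
// Hypothetical helper showing the same state-filtered listTenants call outside
// the verification path: list up to 50 tenants that share a reference tenant's
// current state, skipping the first 10 matches. The helper's name and the
// limit/offset values are illustrative only.
ACTOR static Future<Void> listTenantsInSameState(MetaclusterManagementWorkload* self, TenantName reference) {
	state TenantMapEntry referenceEntry = wait(MetaclusterAPI::getTenant(self->managementDb, reference));
	state std::vector<TenantState> filters;
	filters.push_back(referenceEntry.tenantState);
	std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
	    wait(MetaclusterAPI::listTenants(self->managementDb, ""_sr, "\xff\xff"_sr, 50, 10, filters));
	for (auto const& pair : tenants) {
		// Every returned entry must be in the state we filtered on.
		ASSERT(pair.second.tenantState == referenceEntry.tenantState);
	}
	return Void();
}
// -----------------------------------------------------------------------------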
ACTOR static Future<Void> createTenant(MetaclusterManagementWorkload* self) {
state TenantName tenant = self->chooseTenantName();
state Optional<TenantGroupName> tenantGroup = self->chooseTenantGroup();
@ -433,6 +462,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
break;
} else {
retried = true;
wait(verifyListFilter(self, tenant));
}
} catch (Error& e) {
if (e.code() == error_code_tenant_already_exists && retried && !exists) {
@ -533,6 +563,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
break;
} else {
retried = true;
wait(verifyListFilter(self, tenant));
}
} catch (Error& e) {
if (e.code() == error_code_tenant_not_found && retried && exists) {
@ -622,6 +653,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
if (result.present()) {
break;
}
wait(verifyListFilter(self, tenant));
}
ASSERT(exists);
@ -716,6 +748,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
retried = true;
wait(verifyListFilter(self, tenant));
wait(verifyListFilter(self, newTenantName));
} catch (Error& e) {
// If we retry the rename after it had succeeded, we will get an error that we should ignore
if (e.code() == error_code_tenant_not_found && exists && !newTenantExists && retried) {

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "flow/TDMetric.actor.h"

View File

@ -19,7 +19,7 @@
*/
#include <vector>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -38,10 +38,10 @@ struct QueuePushWorkload : TestWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter transactions, retries;
ContinuousSample<double> commitLatencies, GRVLatencies;
DDSketch<double> commitLatencies, GRVLatencies;
QueuePushWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), transactions("Transactions"), retries("Retries"), commitLatencies(2000), GRVLatencies(2000) {
: TestWorkload(wcx), transactions("Transactions"), retries("Retries"), commitLatencies(), GRVLatencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
actorCount = getOption(options, "actorCount"_sr, 50);

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ReadYourWrites.h"

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ReadYourWrites.h"

View File

@ -25,8 +25,6 @@
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
static constexpr int SAMPLE_SIZE = 10000;
// If the log->storage propagation delay is longer than 1 second, then it's likely that our read
// will see a `future_version` error from the storage server. We need to retry the read until
// a value is returned, or a different error is thrown.
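// ---- Editorial illustration (not part of this commit) ----------------------
// A minimal sketch of the retry pattern described in the comment above,
// assuming it lives in an .actor.cpp file; the function name and the plain
// Transaction type are illustrative, not the workload's actual code.
ACTOR static Future<Optional<Value>> readWhenVisible(Database cx, Key key) {
	state Transaction tr(cx);
	loop {
		try {
			Optional<Value> value = wait(tr.get(key));
			if (value.present()) {
				return value;
			}
			// The write has not propagated yet; grab a fresh read version and retry.
			tr.reset();
		} catch (Error& e) {
			if (e.code() != error_code_future_version) {
				throw; // Any other error is surfaced to the caller.
			}
			// future_version: the storage server is behind our read version; back off and retry.
			wait(tr.onError(e));
		}
	}
}
// -----------------------------------------------------------------------------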
@ -51,9 +49,9 @@ struct ReadAfterWriteWorkload : KVWorkload {
static constexpr auto NAME = "ReadAfterWrite";
double testDuration;
ContinuousSample<double> propagationLatency;
DDSketch<double> propagationLatency;
ReadAfterWriteWorkload(WorkloadContext const& wcx) : KVWorkload(wcx), propagationLatency(SAMPLE_SIZE) {
ReadAfterWriteWorkload(WorkloadContext const& wcx) : KVWorkload(wcx), propagationLatency() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
}

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"

View File

@ -23,7 +23,7 @@
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -200,7 +200,7 @@ struct ReadWriteCommonImpl {
}
}
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
ContinuousSample<double>* latencies,
DDSketch<double>* latencies,
double* totalLatency,
int* latencyCount,
EventMetricHandle<ReadMetric> readMetric,
@ -220,7 +220,7 @@ struct ReadWriteCommonImpl {
return Void();
}
ACTOR static Future<Void> logLatency(Future<RangeResult> f,
ContinuousSample<double>* latencies,
DDSketch<double>* latencies,
double* totalLatency,
int* latencyCount,
EventMetricHandle<ReadMetric> readMetric,

View File

@ -22,7 +22,7 @@
#include <utility>
#include <vector>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -389,4 +389,4 @@ TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") {
ASSERT(parse == idx);
}
return Void();
}
}

View File

@ -23,7 +23,6 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/SimpleIni.h"
#include "fdbserver/Status.actor.h"
#include "fdbserver/TesterInterface.actor.h"

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -37,11 +37,11 @@ struct StreamingReadWorkload : TestWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter transactions, readKeys;
PerfIntCounter readValueBytes;
ContinuousSample<double> latencies;
DDSketch<double> latencies;
StreamingReadWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), transactions("Transactions"), readKeys("Keys Read"), readValueBytes("Value Bytes Read"),
latencies(2000) {
latencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
actorCount = getOption(options, "actorCount"_sr, 20);
readsPerTransaction = getOption(options, "readsPerTransaction"_sr, 10);

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -189,12 +189,11 @@ struct MeasureSinglePeriod : IMeasurer {
double delay, duration;
double startT;
ContinuousSample<double> totalLatency, grvLatency, rowReadLatency, commitLatency;
DDSketch<double> totalLatency, grvLatency, rowReadLatency, commitLatency;
ITransactor::Stats stats; // totalled over the period
MeasureSinglePeriod(double delay, double duration)
: delay(delay), duration(duration), totalLatency(2000), grvLatency(2000), rowReadLatency(2000),
commitLatency(2000) {}
: delay(delay), duration(duration), totalLatency(), grvLatency(), rowReadLatency(), commitLatency() {}
Future<Void> start() override {
startT = now();

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"

View File

@ -18,15 +18,13 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "flow/DeterministicRandom.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const int sampleSize = 10000;
struct WatchesWorkload : TestWorkload {
static constexpr auto NAME = "Watches";
@ -34,10 +32,10 @@ struct WatchesWorkload : TestWorkload {
double testDuration;
std::vector<Future<Void>> clients;
PerfIntCounter cycles;
ContinuousSample<double> cycleLatencies;
DDSketch<double> cycleLatencies;
std::vector<int> nodeOrder;
WatchesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), cycles("Cycles"), cycleLatencies(sampleSize) {
WatchesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), cycles("Cycles"), cycleLatencies() {
testDuration = getOption(options, "testDuration"_sr, 600.0);
nodes = getOption(options, "nodeCount"_sr, 100);
extraPerNode = getOption(options, "extraPerNode"_sr, 1000);

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"

View File

@ -20,7 +20,7 @@
#include <boost/lexical_cast.hpp>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -37,11 +37,11 @@ struct WriteBandwidthWorkload : KVWorkload {
std::vector<Future<Void>> clients;
PerfIntCounter transactions, retries;
ContinuousSample<double> commitLatencies, GRVLatencies;
DDSketch<double> commitLatencies, GRVLatencies;
WriteBandwidthWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), loadTime(0.0), transactions("Transactions"), retries("Retries"), commitLatencies(2000),
GRVLatencies(2000) {
: KVWorkload(wcx), loadTime(0.0), transactions("Transactions"), retries("Retries"), commitLatencies(),
GRVLatencies() {
testDuration = getOption(options, "testDuration"_sr, 10.0);
keysPerTransaction = getOption(options, "keysPerTransaction"_sr, 100);
valueString = std::string(maxValueBytes, '.');

View File

@ -26,7 +26,6 @@
#include "fdbclient/TagThrottle.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
constexpr int SAMPLE_SIZE = 10000;
// workload description:
// This workload aims to test whether we can throttle some bad clients that are doing penetrating writes on a write hot-spot
// range. There are several good clientActors that just randomly do read and write ops in transactions. Also, some bad
@ -41,8 +40,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
int badActorTrNum = 0, badActorRetries = 0, badActorTooOldRetries = 0, badActorCommitFailedRetries = 0;
int goodActorThrottleRetries = 0, badActorThrottleRetries = 0;
double badActorTotalLatency = 0.0, goodActorTotalLatency = 0.0;
ContinuousSample<double> badActorReadLatency, goodActorReadLatency;
ContinuousSample<double> badActorCommitLatency, goodActorCommitLatency;
DDSketch<double> badActorReadLatency, goodActorReadLatency;
DDSketch<double> badActorCommitLatency, goodActorCommitLatency;
// Test configuration
// KVWorkload::actorCount
int goodActorPerClient, badActorPerClient;
@ -64,8 +63,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
static constexpr int MIN_TRANSACTION_TAG_LENGTH = 2;
WriteTagThrottlingWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), badActorReadLatency(SAMPLE_SIZE), goodActorReadLatency(SAMPLE_SIZE),
badActorCommitLatency(SAMPLE_SIZE), goodActorCommitLatency(SAMPLE_SIZE) {
: KVWorkload(wcx), badActorReadLatency(), goodActorReadLatency(), badActorCommitLatency(),
goodActorCommitLatency() {
testDuration = getOption(options, "testDuration"_sr, 120.0);
badOpRate = getOption(options, "badOpRate"_sr, 0.9);
numWritePerTr = getOption(options, "numWritePerTr"_sr, 1);

View File

@ -112,8 +112,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( PING_LOGGING_INTERVAL, 3.0 );
init( PING_SAMPLE_AMOUNT, 100 );
init( NETWORK_CONNECT_SAMPLE_AMOUNT, 100 );
init( PING_SKETCH_ACCURACY, 0.1 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
@ -168,7 +167,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( MIN_SUBMIT, 10 );
init( SQLITE_DISK_METRIC_LOGGING_INTERVAL, 5.0 );
init( KAIO_LATENCY_LOGGING_INTERVAL, 30.0 );
init( KAIO_LATENCY_SAMPLE_SIZE, 30000 );
init( KAIO_LATENCY_SKETCH_ACCURACY, 0.01 );
init( PAGE_WRITE_CHECKSUM_HISTORY, 0 ); if( randomize && BUGGIFY ) PAGE_WRITE_CHECKSUM_HISTORY = 10000000;
init( DISABLE_POSIX_KERNEL_AIO, 0 );
@ -303,7 +302,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(2, 10); }
init( TOKEN_CACHE_SIZE, 100 );
init( ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, 5.0 );
init( ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE, 1000 );
init( ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY, 0.01 );
// Refer to EncryptUtil::EncryptAuthTokenAlgo for more details
init( ENCRYPT_HEADER_AUTH_TOKEN_ENABLED, true ); if ( randomize && BUGGIFY ) { ENCRYPT_HEADER_AUTH_TOKEN_ENABLED = !ENCRYPT_HEADER_AUTH_TOKEN_ENABLED; }
init( ENCRYPT_HEADER_AUTH_TOKEN_ALGO, 1 ); if ( randomize && BUGGIFY ) { ENCRYPT_HEADER_AUTH_TOKEN_ALGO = getRandomAuthTokenAlgo(); }

View File

@ -176,8 +176,7 @@ public:
int ACCEPT_BATCH_SIZE;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double PING_LOGGING_INTERVAL;
int PING_SAMPLE_AMOUNT;
int NETWORK_CONNECT_SAMPLE_AMOUNT;
double PING_SKETCH_ACCURACY;
int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
@ -231,7 +230,7 @@ public:
int MIN_SUBMIT;
double SQLITE_DISK_METRIC_LOGGING_INTERVAL;
double KAIO_LATENCY_LOGGING_INTERVAL;
int KAIO_LATENCY_SAMPLE_SIZE;
double KAIO_LATENCY_SKETCH_ACCURACY;
int PAGE_WRITE_CHECKSUM_HISTORY;
int DISABLE_POSIX_KERNEL_AIO;
@ -365,7 +364,7 @@ public:
int64_t ENCRYPT_CIPHER_KEY_CACHE_TTL;
int64_t ENCRYPT_KEY_REFRESH_INTERVAL;
double ENCRYPT_KEY_CACHE_LOGGING_INTERVAL;
double ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE;
double ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY;
bool ENCRYPT_HEADER_AUTH_TOKEN_ENABLED;
int ENCRYPT_HEADER_AUTH_TOKEN_ALGO;

View File

@ -22,8 +22,62 @@
#include "flow/IRandom.h"
#include "flowbench/GlobalData.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/DDSketch.h"
#include "fdbrpc/ContinuousSample.h"
#include "flow/Histogram.h"
static void bench_ddsketchUnsigned(benchmark::State& state) {
DDSketchFastUnsigned dds;
InputGenerator<unsigned> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
for (auto _ : state) {
dds.addSample(data.next());
}
state.SetItemsProcessed(state.iterations());
}
// DDSketchFastUnsigned has a fixed error margin (~8%)
BENCHMARK(bench_ddsketchUnsigned)->ReportAggregatesOnly(true);
static void bench_ddsketchInt(benchmark::State& state) {
DDSketch<int64_t> dds((double)state.range(0) / 100);
InputGenerator<int64_t> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
for (auto _ : state) {
dds.addSample(data.next());
}
state.SetItemsProcessed(state.iterations());
}
// Try with 10%, 5% and 1% error margins
BENCHMARK(bench_ddsketchInt)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
static void bench_ddsketchDouble(benchmark::State& state) {
DDSketch<double> dds((double)state.range(0) / 100);
InputGenerator<double> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });
for (auto _ : state) {
dds.addSample(data.next());
}
state.SetItemsProcessed(state.iterations());
}
// Try with 10%, 5% and 1% error margins
BENCHMARK(bench_ddsketchDouble)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
static void bench_ddsketchLatency(benchmark::State& state) {
DDSketch<double> dds((double)state.range(0) / 100);
InputGenerator<double> data(1e6, []() { return deterministicRandom()->random01() * 2.0; });
for (auto _ : state) {
dds.addSample(data.next());
}
state.SetItemsProcessed(state.iterations());
}
// Try with 10%, 5% and 1% error margins
BENCHMARK(bench_ddsketchLatency)->Arg(10)->Arg(5)->Arg(1)->ReportAggregatesOnly(true);
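// ---- Editorial illustration (not part of this commit) ----------------------
// What the error-margin argument above controls: a DD sketch buckets samples
// logarithmically, so each bucket's width is proportional to its value and any
// quantile read back is within the configured relative error of the true value.
// With a 1% margin, gamma = 1.01/0.99 ~= 1.02, and roughly a thousand buckets
// cover eight orders of magnitude of latency. The toy sketch below illustrates
// the bucketing rule only; it is not the fdbrpc/DDSketch.h implementation.
#include <cmath>
#include <cstdint>
#include <map>

struct ToyDDSketch {
	double gamma, logGamma;
	std::map<int, uint64_t> buckets;
	uint64_t total = 0;

	// alpha is the relative error guaranteed on reported quantiles.
	explicit ToyDDSketch(double alpha) : gamma((1 + alpha) / (1 - alpha)), logGamma(std::log(gamma)) {}

	void addSample(double v) {
		if (v <= 0)
			return; // A real sketch keeps separate buckets for zero/negative values.
		++buckets[(int)std::ceil(std::log(v) / logGamma)]; // Bucket i covers (gamma^(i-1), gamma^i].
		++total;
	}

	// Returns an estimate within a factor of (1 +/- alpha) of the true q-quantile.
	double percentile(double q) const {
		if (total == 0)
			return 0.0;
		uint64_t rank = (uint64_t)(q * (total - 1)), seen = 0;
		for (auto const& [index, count] : buckets) {
			seen += count;
			if (seen > rank)
				return 2.0 * std::pow(gamma, index) / (1 + gamma); // Representative value with balanced relative error.
		}
		return 0.0;
	}
};
// -----------------------------------------------------------------------------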
static void bench_continuousSampleInt(benchmark::State& state) {
ContinuousSample<int64_t> cs(state.range(0));
InputGenerator<int64_t> data(1e6, []() { return deterministicRandom()->randomInt64(0, 1e9); });

View File

@ -56,6 +56,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES BlobManagerUnit.toml)
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
add_fdb_test(TEST_FILES DDSketch.txt IGNORE)
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)
add_fdb_test(TEST_FILES DiskDurability.txt IGNORE)
add_fdb_test(TEST_FILES FileSystem.txt IGNORE)