refactor ReadWriteWorkload
This commit is contained in:
parent
b0c26e93b2
commit
4f3a7b7e7f
|
@ -267,6 +267,7 @@ set(FDBSERVER_SRCS
|
|||
workloads/ReadAfterWrite.actor.cpp
|
||||
workloads/ReadHotDetection.actor.cpp
|
||||
workloads/ReadWrite.actor.cpp
|
||||
workloads/ReadWriteWorkload.actor.h
|
||||
workloads/RemoveServersSafely.actor.cpp
|
||||
workloads/ReportConflictingKeys.actor.cpp
|
||||
workloads/RestoreBackup.actor.cpp
|
||||
|
|
|
@ -28,209 +28,13 @@
|
|||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbserver/workloads/ReadWriteWorkload.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const int sampleSize = 10000;
|
||||
static Future<Version> nextRV;
|
||||
static Version lastRV = invalidVersion;
|
||||
|
||||
ACTOR static Future<Version> getNextRV(Database db) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
Version v = wait(tr.getReadVersion());
|
||||
return v;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
static Future<Version> getInconsistentReadVersion(Database const& db) {
|
||||
if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running
|
||||
if (nextRV.isValid())
|
||||
lastRV = nextRV.get();
|
||||
nextRV = getNextRV(db);
|
||||
}
|
||||
if (lastRV == invalidVersion)
|
||||
return nextRV;
|
||||
else
|
||||
return lastRV;
|
||||
}
|
||||
|
||||
DESCR struct TransactionSuccessMetric {
|
||||
int64_t totalLatency; // ns
|
||||
int64_t startLatency; // ns
|
||||
int64_t commitLatency; // ns
|
||||
int64_t retries; // count
|
||||
};
|
||||
|
||||
DESCR struct TransactionFailureMetric {
|
||||
int64_t startLatency; // ns
|
||||
int64_t errorCode; // flow error code
|
||||
};
|
||||
|
||||
DESCR struct ReadMetric {
|
||||
int64_t readLatency; // ns
|
||||
};
|
||||
|
||||
struct ReadWriteWorkload : KVWorkload {
|
||||
// general test setting
|
||||
Standalone<StringRef> descriptionString;
|
||||
bool doSetup, cancelWorkersAtDuration;
|
||||
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
|
||||
double metricsStart, metricsDuration;
|
||||
std::vector<uint64_t> insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
|
||||
|
||||
// test log setting
|
||||
bool enableReadLatencyLogging;
|
||||
double periodicLoggingInterval;
|
||||
|
||||
// use ReadWrite as a ramp up workload
|
||||
bool rampUpLoad; // indicate this is a ramp up workload
|
||||
int rampSweepCount; // how many times of ramp up
|
||||
bool rampTransactionType; // choose transaction type based on client start time
|
||||
bool rampUpConcurrency; // control client concurrency
|
||||
|
||||
// transaction setting
|
||||
bool useRYW;
|
||||
bool batchPriority;
|
||||
bool rangeReads; // read operations are all single key range read
|
||||
bool dependentReads; // read operations are issued sequentially
|
||||
bool inconsistentReads; // read with previous read version
|
||||
bool adjacentReads; // keys are adjacent within a transaction
|
||||
bool adjacentWrites;
|
||||
double alpha; // probability for run TransactionA type
|
||||
// two type of transaction
|
||||
int readsPerTransactionA, writesPerTransactionA;
|
||||
int readsPerTransactionB, writesPerTransactionB;
|
||||
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
||||
std::string valueString;
|
||||
// hot traffic pattern
|
||||
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
|
||||
|
||||
// states of metric
|
||||
Int64MetricHandle totalReadsMetric;
|
||||
Int64MetricHandle totalRetriesMetric;
|
||||
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
|
||||
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
||||
EventMetricHandle<ReadMetric> readMetric;
|
||||
PerfIntCounter aTransactions, bTransactions, retries;
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||
double readLatencyTotal;
|
||||
int readLatencyCount;
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
|
||||
|
||||
// other internal states
|
||||
std::vector<Future<Void>> clients;
|
||||
double loadTime, clientBegin;
|
||||
|
||||
ReadWriteWorkload(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false),
|
||||
totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
|
||||
totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"),
|
||||
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
|
||||
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
|
||||
readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||
transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction"));
|
||||
transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction"));
|
||||
readMetric.init(LiteralStringRef("RWWorkload.Read"));
|
||||
|
||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
||||
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
||||
actorCount = ceil(transactionsPerSecond * allowedLatency);
|
||||
actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
|
||||
|
||||
readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
|
||||
writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
|
||||
readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
|
||||
writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
|
||||
alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
|
||||
|
||||
extraReadConflictRangesPerTransaction =
|
||||
getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0);
|
||||
extraWriteConflictRangesPerTransaction =
|
||||
getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0);
|
||||
|
||||
valueString = std::string(maxValueBytes, '.');
|
||||
if (nodePrefix > 0) {
|
||||
keyBytes += 16;
|
||||
}
|
||||
|
||||
metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
|
||||
metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
|
||||
if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
|
||||
// discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
|
||||
metricsStart += testDuration * 0.125;
|
||||
metricsDuration *= 0.75;
|
||||
}
|
||||
|
||||
dependentReads = getOption(options, LiteralStringRef("dependentReads"), false);
|
||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
|
||||
debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
|
||||
enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
|
||||
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
||||
cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
|
||||
inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false);
|
||||
adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false);
|
||||
adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false);
|
||||
rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false);
|
||||
useRYW = getOption(options, LiteralStringRef("useRYW"), false);
|
||||
rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1);
|
||||
rangeReads = getOption(options, LiteralStringRef("rangeReads"), false);
|
||||
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
|
||||
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
|
||||
doSetup = getOption(options, LiteralStringRef("setup"), true);
|
||||
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
|
||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
|
||||
|
||||
if (rampUpConcurrency)
|
||||
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
|
||||
|
||||
// Validate that keyForIndex() is monotonic
|
||||
for (int i = 0; i < 30; i++) {
|
||||
int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
if (a > b) {
|
||||
std::swap(a, b);
|
||||
}
|
||||
ASSERT(a <= b);
|
||||
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
|
||||
}
|
||||
|
||||
std::vector<std::string> insertionCountsToMeasureString =
|
||||
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
|
||||
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
|
||||
try {
|
||||
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
|
||||
insertionCountsToMeasure.push_back(count);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// with P(hotTrafficFraction) an access is directed to one of a fraction
|
||||
// of hot keys, else it is directed to a disjoint set of cold keys
|
||||
hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0);
|
||||
double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0);
|
||||
ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1);
|
||||
ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot!
|
||||
// p(Cold key) = (1-FHP) * (1-hkf)
|
||||
// p(Cold key) = (1-htf)
|
||||
// solving for FHP gives:
|
||||
forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction);
|
||||
}
|
||||
}
|
||||
|
||||
std::string description() const override { return descriptionString.toString(); }
|
||||
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
|
||||
Future<Void> start(Database const& cx) override { return _start(cx, this); }
|
||||
|
||||
struct ReadWriteCommonImpl {
|
||||
// trace methods
|
||||
ACTOR static Future<bool> traceDumpWorkers(Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
try {
|
||||
loop {
|
||||
|
@ -257,91 +61,7 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override {
|
||||
clients.clear();
|
||||
|
||||
if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration)
|
||||
metricsDuration = now() - metricsStart;
|
||||
|
||||
g_traceBatch.dump();
|
||||
if (clientId == 0)
|
||||
return traceDumpWorkers(dbInfo);
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
double duration = metricsDuration;
|
||||
int reads =
|
||||
(aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
|
||||
int writes =
|
||||
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
|
||||
m.emplace_back("Measured Duration", duration, Averaged::True);
|
||||
m.emplace_back(
|
||||
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
|
||||
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
|
||||
m.push_back(aTransactions.getMetric());
|
||||
m.push_back(bTransactions.getMetric());
|
||||
m.push_back(retries.getMetric());
|
||||
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
|
||||
m.emplace_back("Read rows", reads, Averaged::False);
|
||||
m.emplace_back("Write rows", writes, Averaged::False);
|
||||
|
||||
if (!rampUpLoad) {
|
||||
m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True);
|
||||
m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True);
|
||||
m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True);
|
||||
m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True);
|
||||
m.emplace_back(
|
||||
"Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
||||
}
|
||||
|
||||
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
|
||||
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
|
||||
m.emplace_back(
|
||||
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||
m.emplace_back("Bytes written/sec",
|
||||
(writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration,
|
||||
Averaged::False);
|
||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||
|
||||
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
|
||||
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
|
||||
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
|
||||
}
|
||||
|
||||
Value randomValue() {
|
||||
return StringRef((uint8_t*)valueString.c_str(),
|
||||
deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1));
|
||||
}
|
||||
|
||||
Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); }
|
||||
|
||||
template <class Trans>
|
||||
void setupTransaction(Trans* tr) {
|
||||
if (batchPriority) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tracePeriodically(ReadWriteWorkload* self) {
|
||||
ACTOR static Future<Void> tracePeriodically(ReadWriteCommon* self) {
|
||||
state double start = now();
|
||||
state double elapsed = 0.0;
|
||||
state int64_t last_ops = 0;
|
||||
|
@ -477,7 +197,6 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
self->readLatencyCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
|
||||
ContinuousSample<double>* latencies,
|
||||
double* totalLatency,
|
||||
|
@ -498,7 +217,6 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> logLatency(Future<RangeResult> f,
|
||||
ContinuousSample<double>* latencies,
|
||||
double* totalLatency,
|
||||
|
@ -520,52 +238,7 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Trans>
|
||||
Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, ReadWriteWorkload* self, bool shouldRecord) {
|
||||
if (!keys.size())
|
||||
return Void();
|
||||
if (!self->dependentReads) {
|
||||
std::vector<Future<Void>> readers;
|
||||
if (self->rangeReads) {
|
||||
for (int op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
readers.push_back(logLatency(
|
||||
tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))),
|
||||
GetRangeLimits(-1, 80000)),
|
||||
&self->readLatencies,
|
||||
&self->readLatencyTotal,
|
||||
&self->readLatencyCount,
|
||||
self->readMetric,
|
||||
shouldRecord));
|
||||
}
|
||||
} else {
|
||||
for (int op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])),
|
||||
&self->readLatencies,
|
||||
&self->readLatencyTotal,
|
||||
&self->readLatencyCount,
|
||||
self->readMetric,
|
||||
shouldRecord));
|
||||
}
|
||||
}
|
||||
wait(waitForAll(readers));
|
||||
} else {
|
||||
state int op;
|
||||
for (op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
wait(logLatency(tr->get(self->keyForIndex(keys[op])),
|
||||
&self->readLatencies,
|
||||
&self->readLatencyTotal,
|
||||
&self->readLatencyCount,
|
||||
self->readMetric,
|
||||
shouldRecord));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _setup(Database cx, ReadWriteWorkload* self) {
|
||||
ACTOR static Future<Void> setup(Database cx, ReadWriteCommon* self) {
|
||||
if (!self->doSetup)
|
||||
return Void();
|
||||
|
||||
|
@ -587,8 +260,232 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> _start(Database cx, ReadWriteWorkload* self) {
|
||||
Future<Void> ReadWriteCommon::tracePeriodically() {
|
||||
return ReadWriteCommonImpl::tracePeriodically(this);
|
||||
}
|
||||
|
||||
Future<Void> ReadWriteCommon::logLatency(Future<Optional<Value>> f, bool shouldRecord) {
|
||||
return ReadWriteCommonImpl::logLatency(
|
||||
f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord);
|
||||
}
|
||||
|
||||
Future<Void> ReadWriteCommon::logLatency(Future<RangeResult> f, bool shouldRecord) {
|
||||
return ReadWriteCommonImpl::logLatency(
|
||||
f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord);
|
||||
}
|
||||
|
||||
Future<Void> ReadWriteCommon::setup(Database const& cx) {
|
||||
return ReadWriteCommonImpl::setup(cx, this);
|
||||
}
|
||||
|
||||
Future<bool> ReadWriteCommon::check(Database const& cx) {
|
||||
clients.clear();
|
||||
|
||||
if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration)
|
||||
metricsDuration = now() - metricsStart;
|
||||
|
||||
g_traceBatch.dump();
|
||||
if (clientId == 0)
|
||||
return ReadWriteCommonImpl::traceDumpWorkers(dbInfo);
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
void ReadWriteCommon::getMetrics(std::vector<PerfMetric>& m) {
|
||||
double duration = metricsDuration;
|
||||
int reads = (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
|
||||
int writes =
|
||||
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
|
||||
m.emplace_back("Measured Duration", duration, Averaged::True);
|
||||
m.emplace_back(
|
||||
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
|
||||
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
|
||||
m.push_back(aTransactions.getMetric());
|
||||
m.push_back(bTransactions.getMetric());
|
||||
m.push_back(retries.getMetric());
|
||||
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
|
||||
m.emplace_back("Read rows", reads, Averaged::False);
|
||||
m.emplace_back("Write rows", writes, Averaged::False);
|
||||
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
|
||||
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
|
||||
m.emplace_back(
|
||||
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||
m.emplace_back(
|
||||
"Bytes written/sec", (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||
|
||||
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
|
||||
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
|
||||
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
|
||||
}
|
||||
|
||||
Value ReadWriteCommon::randomValue() {
|
||||
return StringRef((uint8_t*)valueString.c_str(), deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1));
|
||||
}
|
||||
|
||||
Standalone<KeyValueRef> ReadWriteCommon::operator()(uint64_t n) {
|
||||
return KeyValueRef(keyForIndex(n, false), randomValue());
|
||||
}
|
||||
|
||||
bool ReadWriteCommon::shouldRecord(double checkTime) {
|
||||
double timeSinceStart = checkTime - clientBegin;
|
||||
return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration);
|
||||
}
|
||||
|
||||
static Future<Version> nextRV;
|
||||
static Version lastRV = invalidVersion;
|
||||
|
||||
ACTOR static Future<Version> getNextRV(Database db) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
Version v = wait(tr.getReadVersion());
|
||||
return v;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Future<Version> getInconsistentReadVersion(Database const& db) {
|
||||
if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running
|
||||
if (nextRV.isValid())
|
||||
lastRV = nextRV.get();
|
||||
nextRV = getNextRV(db);
|
||||
}
|
||||
if (lastRV == invalidVersion)
|
||||
return nextRV;
|
||||
else
|
||||
return lastRV;
|
||||
}
|
||||
|
||||
struct ReadWriteWorkload : ReadWriteCommon {
|
||||
// use ReadWrite as a ramp up workload
|
||||
bool rampUpLoad; // indicate this is a ramp up workload
|
||||
int rampSweepCount; // how many times of ramp up
|
||||
bool rampTransactionType; // choose transaction type based on client start time
|
||||
bool rampUpConcurrency; // control client concurrency
|
||||
|
||||
// transaction setting
|
||||
bool batchPriority;
|
||||
bool rangeReads; // read operations are all single key range read
|
||||
bool dependentReads; // read operations are issued sequentially
|
||||
bool inconsistentReads; // read with previous read version
|
||||
bool adjacentReads; // keys are adjacent within a transaction
|
||||
bool adjacentWrites;
|
||||
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
||||
|
||||
// hot traffic pattern
|
||||
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
|
||||
|
||||
ReadWriteWorkload(WorkloadContext const& wcx)
|
||||
: ReadWriteCommon(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false) {
|
||||
extraReadConflictRangesPerTransaction =
|
||||
getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0);
|
||||
extraWriteConflictRangesPerTransaction =
|
||||
getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0);
|
||||
dependentReads = getOption(options, LiteralStringRef("dependentReads"), false);
|
||||
inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false);
|
||||
adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false);
|
||||
adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false);
|
||||
rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false);
|
||||
rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1);
|
||||
rangeReads = getOption(options, LiteralStringRef("rangeReads"), false);
|
||||
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
|
||||
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
|
||||
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
|
||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
|
||||
|
||||
if (rampUpConcurrency)
|
||||
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
|
||||
|
||||
{
|
||||
// with P(hotTrafficFraction) an access is directed to one of a fraction
|
||||
// of hot keys, else it is directed to a disjoint set of cold keys
|
||||
hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0);
|
||||
double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0);
|
||||
ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1);
|
||||
ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot!
|
||||
// p(Cold key) = (1-FHP) * (1-hkf)
|
||||
// p(Cold key) = (1-htf)
|
||||
// solving for FHP gives:
|
||||
forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction);
|
||||
}
|
||||
}
|
||||
|
||||
std::string description() const override { return descriptionString.toString(); }
|
||||
|
||||
template <class Trans>
|
||||
void setupTransaction(Trans* tr) {
|
||||
if (batchPriority) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
||||
}
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
ReadWriteCommon::getMetrics(m);
|
||||
if (!rampUpLoad) {
|
||||
m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True);
|
||||
m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True);
|
||||
m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True);
|
||||
m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True);
|
||||
m.emplace_back(
|
||||
"Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True);
|
||||
|
||||
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override { return _start(cx, this); }
|
||||
|
||||
ACTOR template <class Trans>
|
||||
static Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, ReadWriteWorkload* self, bool shouldRecord) {
|
||||
if (!keys.size())
|
||||
return Void();
|
||||
if (!self->dependentReads) {
|
||||
std::vector<Future<Void>> readers;
|
||||
if (self->rangeReads) {
|
||||
for (int op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
readers.push_back(self->logLatency(
|
||||
tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))),
|
||||
GetRangeLimits(-1, 80000)),
|
||||
shouldRecord));
|
||||
}
|
||||
} else {
|
||||
for (int op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord));
|
||||
}
|
||||
}
|
||||
wait(waitForAll(readers));
|
||||
} else {
|
||||
state int op;
|
||||
for (op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
wait(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _start(Database cx, ReadWriteWorkload* self) {
|
||||
// Read one record from the database to warm the cache of keyServers
|
||||
state std::vector<int64_t> keys;
|
||||
keys.push_back(deterministicRandom()->randomInt64(0, self->nodeCount));
|
||||
|
@ -610,7 +507,7 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
if (self->enableReadLatencyLogging)
|
||||
clients.push_back(tracePeriodically(self));
|
||||
clients.push_back(self->tracePeriodically());
|
||||
|
||||
self->clientBegin = now();
|
||||
for (int c = 0; c < self->actorCount; c++) {
|
||||
|
@ -632,13 +529,6 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
bool shouldRecord() { return shouldRecord(now()); }
|
||||
|
||||
bool shouldRecord(double checkTime) {
|
||||
double timeSinceStart = checkTime - clientBegin;
|
||||
return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration);
|
||||
}
|
||||
|
||||
int64_t getRandomKey(uint64_t nodeCount) {
|
||||
if (forceHotProbability && deterministicRandom()->random01() < forceHotProbability)
|
||||
return deterministicRandom()->randomInt64(0, nodeCount * hotKeyFraction) /
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* ReadWriteWorkload.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H)
|
||||
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H
|
||||
#include "fdbserver/workloads/ReadWriteWorkload.actor.g.h"
|
||||
#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H)
|
||||
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H
|
||||
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
DESCR struct TransactionSuccessMetric {
|
||||
int64_t totalLatency; // ns
|
||||
int64_t startLatency; // ns
|
||||
int64_t commitLatency; // ns
|
||||
int64_t retries; // count
|
||||
};
|
||||
|
||||
DESCR struct TransactionFailureMetric {
|
||||
int64_t startLatency; // ns
|
||||
int64_t errorCode; // flow error code
|
||||
};
|
||||
|
||||
DESCR struct ReadMetric {
|
||||
int64_t readLatency; // ns
|
||||
};
|
||||
|
||||
// Common ReadWrite test settings
|
||||
struct ReadWriteCommon : KVWorkload {
|
||||
static constexpr int sampleSize = 10000;
|
||||
friend struct ReadWriteCommonImpl;
|
||||
|
||||
// general test setting
|
||||
Standalone<StringRef> descriptionString;
|
||||
bool doSetup, cancelWorkersAtDuration;
|
||||
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
|
||||
double metricsStart, metricsDuration;
|
||||
std::vector<uint64_t> insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
|
||||
|
||||
// test log setting
|
||||
bool enableReadLatencyLogging;
|
||||
double periodicLoggingInterval;
|
||||
|
||||
// two type of transaction
|
||||
int readsPerTransactionA, writesPerTransactionA;
|
||||
int readsPerTransactionB, writesPerTransactionB;
|
||||
std::string valueString;
|
||||
double alpha; // probability for run TransactionA type
|
||||
// transaction setting
|
||||
bool useRYW;
|
||||
|
||||
// states of metric
|
||||
Int64MetricHandle totalReadsMetric;
|
||||
Int64MetricHandle totalRetriesMetric;
|
||||
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
|
||||
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
||||
EventMetricHandle<ReadMetric> readMetric;
|
||||
PerfIntCounter aTransactions, bTransactions, retries;
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||
double readLatencyTotal;
|
||||
int readLatencyCount;
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
|
||||
|
||||
// other internal states
|
||||
std::vector<Future<Void>> clients;
|
||||
double loadTime, clientBegin;
|
||||
|
||||
explicit ReadWriteCommon(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), totalReadsMetric(LiteralStringRef("ReadWrite.TotalReads")),
|
||||
totalRetriesMetric(LiteralStringRef("ReadWrite.TotalRetries")), aTransactions("A Transactions"),
|
||||
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
|
||||
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
|
||||
readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||
|
||||
transactionSuccessMetric.init(LiteralStringRef("ReadWrite.SuccessfulTransaction"));
|
||||
transactionFailureMetric.init(LiteralStringRef("ReadWrite.FailedTransaction"));
|
||||
readMetric.init(LiteralStringRef("ReadWrite.Read"));
|
||||
|
||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
||||
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
||||
actorCount = ceil(transactionsPerSecond * allowedLatency);
|
||||
actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
|
||||
|
||||
readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
|
||||
writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
|
||||
readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
|
||||
writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
|
||||
alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
|
||||
|
||||
valueString = std::string(maxValueBytes, '.');
|
||||
if (nodePrefix > 0) {
|
||||
keyBytes += 16;
|
||||
}
|
||||
|
||||
metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
|
||||
metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
|
||||
if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
|
||||
// discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
|
||||
metricsStart += testDuration * 0.125;
|
||||
metricsDuration *= 0.75;
|
||||
}
|
||||
|
||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
|
||||
debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
|
||||
enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
|
||||
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
||||
cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
|
||||
|
||||
useRYW = getOption(options, LiteralStringRef("useRYW"), false);
|
||||
doSetup = getOption(options, LiteralStringRef("setup"), true);
|
||||
|
||||
// Validate that keyForIndex() is monotonic
|
||||
for (int i = 0; i < 30; i++) {
|
||||
int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
if (a > b) {
|
||||
std::swap(a, b);
|
||||
}
|
||||
ASSERT(a <= b);
|
||||
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
|
||||
}
|
||||
|
||||
std::vector<std::string> insertionCountsToMeasureString =
|
||||
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
|
||||
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
|
||||
try {
|
||||
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
|
||||
insertionCountsToMeasure.push_back(count);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> tracePeriodically();
|
||||
Future<Void> logLatency(Future<Optional<Value>> f, bool shouldRecord);
|
||||
Future<Void> logLatency(Future<RangeResult> f, bool shouldRecord);
|
||||
|
||||
Future<Void> setup(Database const& cx) override;
|
||||
Future<bool> check(Database const& cx) override;
|
||||
void getMetrics(std::vector<PerfMetric>& m) override;
|
||||
|
||||
Standalone<KeyValueRef> operator()(uint64_t n);
|
||||
bool shouldRecord(double checkTime = now());
|
||||
Value randomValue();
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif // FDBSERVER_READWRITEWORKLOAD_ACTOR_H
|
|
@ -28,48 +28,13 @@
|
|||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbserver/workloads/ReadWriteWorkload.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "fdbclient/RunTransaction.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const int sampleSize = 10000;
|
||||
DESCR struct TransactionSuccessMetric {
|
||||
int64_t totalLatency; // ns
|
||||
int64_t startLatency; // ns
|
||||
int64_t commitLatency; // ns
|
||||
int64_t retries; // count
|
||||
};
|
||||
|
||||
DESCR struct TransactionFailureMetric {
|
||||
int64_t startLatency; // ns
|
||||
int64_t errorCode; // flow error code
|
||||
};
|
||||
|
||||
DESCR struct ReadMetric {
|
||||
int64_t readLatency; // ns
|
||||
};
|
||||
|
||||
struct SkewedReadWriteWorkload : KVWorkload {
|
||||
// general test setting
|
||||
Standalone<StringRef> descriptionString;
|
||||
bool doSetup, cancelWorkersAtDuration;
|
||||
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
|
||||
double metricsStart, metricsDuration;
|
||||
std::vector<uint64_t> insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
|
||||
|
||||
// test log setting
|
||||
bool enableReadLatencyLogging;
|
||||
double periodicLoggingInterval;
|
||||
|
||||
// transaction setting
|
||||
bool useRYW;
|
||||
double alpha; // probability for run TransactionA type
|
||||
// two type of transaction
|
||||
int readsPerTransactionA, writesPerTransactionA;
|
||||
int readsPerTransactionB, writesPerTransactionB;
|
||||
std::string valueString;
|
||||
|
||||
struct SkewedReadWriteWorkload : ReadWriteCommon {
|
||||
// server based hot traffic setting
|
||||
int skewRound = 0; // skewDuration = ceil(testDuration / skewRound)
|
||||
double hotServerFraction = 0, hotServerShardFraction = 1.0; // set > 0 to issue hot key based on shard map
|
||||
|
@ -83,184 +48,20 @@ struct SkewedReadWriteWorkload : KVWorkload {
|
|||
std::map<UID, StorageServerInterface> serverInterfaces;
|
||||
int hotServerCount = 0, currentHotRound = -1;
|
||||
|
||||
// states of metric
|
||||
Int64MetricHandle totalReadsMetric;
|
||||
Int64MetricHandle totalRetriesMetric;
|
||||
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
|
||||
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
||||
EventMetricHandle<ReadMetric> readMetric;
|
||||
PerfIntCounter aTransactions, bTransactions, retries;
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||
double readLatencyTotal;
|
||||
int readLatencyCount;
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
|
||||
|
||||
// other internal states
|
||||
std::vector<Future<Void>> clients;
|
||||
double loadTime, clientBegin;
|
||||
|
||||
SkewedReadWriteWorkload(WorkloadContext const& wcx)
|
||||
: KVWorkload(wcx), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
|
||||
totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"),
|
||||
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
|
||||
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
|
||||
readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||
|
||||
transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction"));
|
||||
transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction"));
|
||||
readMetric.init(LiteralStringRef("RWWorkload.Read"));
|
||||
|
||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
||||
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
||||
actorCount = ceil(transactionsPerSecond * allowedLatency);
|
||||
actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
|
||||
|
||||
readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
|
||||
writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
|
||||
readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
|
||||
writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
|
||||
alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
|
||||
|
||||
valueString = std::string(maxValueBytes, '.');
|
||||
if (nodePrefix > 0) {
|
||||
keyBytes += 16;
|
||||
}
|
||||
|
||||
metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
|
||||
metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
|
||||
if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
|
||||
// discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
|
||||
metricsStart += testDuration * 0.125;
|
||||
metricsDuration *= 0.75;
|
||||
}
|
||||
|
||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
|
||||
debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
|
||||
enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
|
||||
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
||||
cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
|
||||
useRYW = getOption(options, LiteralStringRef("useRYW"), false);
|
||||
doSetup = getOption(options, LiteralStringRef("setup"), true);
|
||||
SkewedReadWriteWorkload(WorkloadContext const& wcx) : ReadWriteCommon(wcx) {
|
||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("SkewedReadWrite"));
|
||||
|
||||
// Validate that keyForIndex() is monotonic
|
||||
for (int i = 0; i < 30; i++) {
|
||||
int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
|
||||
if (a > b) {
|
||||
std::swap(a, b);
|
||||
}
|
||||
ASSERT(a <= b);
|
||||
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
|
||||
}
|
||||
|
||||
std::vector<std::string> insertionCountsToMeasureString =
|
||||
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
|
||||
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
|
||||
try {
|
||||
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
|
||||
insertionCountsToMeasure.push_back(count);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2);
|
||||
hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0);
|
||||
hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0);
|
||||
skewRound = getOption(options, "skewRound"_sr, 1);
|
||||
hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8);
|
||||
hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0);
|
||||
ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) &&
|
||||
skewRound > 0);
|
||||
}
|
||||
hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2);
|
||||
hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0);
|
||||
hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0);
|
||||
skewRound = getOption(options, "skewRound"_sr, 1);
|
||||
hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8);
|
||||
hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0);
|
||||
ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && skewRound > 0);
|
||||
}
|
||||
|
||||
std::string description() const override { return descriptionString.toString(); }
|
||||
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
|
||||
Future<Void> start(Database const& cx) override { return _start(cx, this); }
|
||||
|
||||
ACTOR static Future<bool> traceDumpWorkers(Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
try {
|
||||
loop {
|
||||
choose {
|
||||
when(wait(db->onChange())) {}
|
||||
|
||||
when(ErrorOr<std::vector<WorkerDetails>> workerList =
|
||||
wait(db->get().clusterInterface.getWorkers.tryGetReply(GetWorkersRequest()))) {
|
||||
if (workerList.present()) {
|
||||
std::vector<Future<ErrorOr<Void>>> dumpRequests;
|
||||
dumpRequests.reserve(workerList.get().size());
|
||||
for (int i = 0; i < workerList.get().size(); i++)
|
||||
dumpRequests.push_back(workerList.get()[i].interf.traceBatchDumpRequest.tryGetReply(
|
||||
TraceBatchDumpRequest()));
|
||||
wait(waitForAll(dumpRequests));
|
||||
return true;
|
||||
}
|
||||
wait(delay(1.0));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FailedToDumpWorkers").error(e);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override {
|
||||
clients.clear();
|
||||
|
||||
if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration)
|
||||
metricsDuration = now() - metricsStart;
|
||||
|
||||
g_traceBatch.dump();
|
||||
if (clientId == 0)
|
||||
return traceDumpWorkers(dbInfo);
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
double duration = metricsDuration;
|
||||
int reads =
|
||||
(aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
|
||||
int writes =
|
||||
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
|
||||
m.emplace_back("Measured Duration", duration, Averaged::True);
|
||||
m.emplace_back(
|
||||
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
|
||||
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
|
||||
m.push_back(aTransactions.getMetric());
|
||||
m.push_back(bTransactions.getMetric());
|
||||
m.push_back(retries.getMetric());
|
||||
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
|
||||
m.emplace_back("Read rows", reads, Averaged::False);
|
||||
m.emplace_back("Write rows", writes, Averaged::False);
|
||||
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
|
||||
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
|
||||
m.emplace_back(
|
||||
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||
m.emplace_back("Bytes written/sec",
|
||||
(writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration,
|
||||
Averaged::False);
|
||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||
|
||||
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
|
||||
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
|
||||
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
|
||||
}
|
||||
|
||||
Value randomValue() {
|
||||
return StringRef((uint8_t*)valueString.c_str(),
|
||||
deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1));
|
||||
}
|
||||
|
||||
Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); }
|
||||
|
||||
void debugPrintServerShards() const {
|
||||
std::cout << std::hex;
|
||||
for (auto it : this->serverShards) {
|
||||
|
@ -375,164 +176,6 @@ struct SkewedReadWriteWorkload : KVWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tracePeriodically(SkewedReadWriteWorkload* self) {
|
||||
state double start = now();
|
||||
state double elapsed = 0.0;
|
||||
state int64_t last_ops = 0;
|
||||
|
||||
loop {
|
||||
elapsed += self->periodicLoggingInterval;
|
||||
wait(delayUntil(start + elapsed));
|
||||
|
||||
TraceEvent((self->description() + "_RowReadLatency").c_str())
|
||||
.detail("Mean", self->readLatencies.mean())
|
||||
.detail("Median", self->readLatencies.median())
|
||||
.detail("Percentile5", self->readLatencies.percentile(.05))
|
||||
.detail("Percentile95", self->readLatencies.percentile(.95))
|
||||
.detail("Percentile99", self->readLatencies.percentile(.99))
|
||||
.detail("Percentile99_9", self->readLatencies.percentile(.999))
|
||||
.detail("Max", self->readLatencies.max())
|
||||
.detail("Count", self->readLatencyCount)
|
||||
.detail("Elapsed", elapsed);
|
||||
|
||||
TraceEvent((self->description() + "_GRVLatency").c_str())
|
||||
.detail("Mean", self->GRVLatencies.mean())
|
||||
.detail("Median", self->GRVLatencies.median())
|
||||
.detail("Percentile5", self->GRVLatencies.percentile(.05))
|
||||
.detail("Percentile95", self->GRVLatencies.percentile(.95))
|
||||
.detail("Percentile99", self->GRVLatencies.percentile(.99))
|
||||
.detail("Percentile99_9", self->GRVLatencies.percentile(.999))
|
||||
.detail("Max", self->GRVLatencies.max());
|
||||
|
||||
TraceEvent((self->description() + "_CommitLatency").c_str())
|
||||
.detail("Mean", self->commitLatencies.mean())
|
||||
.detail("Median", self->commitLatencies.median())
|
||||
.detail("Percentile5", self->commitLatencies.percentile(.05))
|
||||
.detail("Percentile95", self->commitLatencies.percentile(.95))
|
||||
.detail("Percentile99", self->commitLatencies.percentile(.99))
|
||||
.detail("Percentile99_9", self->commitLatencies.percentile(.999))
|
||||
.detail("Max", self->commitLatencies.max());
|
||||
|
||||
TraceEvent((self->description() + "_TotalLatency").c_str())
|
||||
.detail("Mean", self->latencies.mean())
|
||||
.detail("Median", self->latencies.median())
|
||||
.detail("Percentile5", self->latencies.percentile(.05))
|
||||
.detail("Percentile95", self->latencies.percentile(.95))
|
||||
.detail("Percentile99", self->latencies.percentile(.99))
|
||||
.detail("Percentile99_9", self->latencies.percentile(.999))
|
||||
.detail("Max", self->latencies.max());
|
||||
|
||||
int64_t ops =
|
||||
(self->aTransactions.getValue() * (self->readsPerTransactionA + self->writesPerTransactionA)) +
|
||||
(self->bTransactions.getValue() * (self->readsPerTransactionB + self->writesPerTransactionB));
|
||||
bool recordBegin = self->shouldRecord(std::max(now() - self->periodicLoggingInterval, self->clientBegin));
|
||||
bool recordEnd = self->shouldRecord(now());
|
||||
if (recordBegin && recordEnd) {
|
||||
std::string ts = format("T=%04.0fs:", elapsed);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Operations/sec", (ops - last_ops) / self->periodicLoggingInterval, Averaged::False);
|
||||
|
||||
// if(self->rampUpLoad) {
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), Averaged::True);
|
||||
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "5% Row Read Latency (ms, averaged)",
|
||||
1000 * self->readLatencies.percentile(.05),
|
||||
Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "95% Row Read Latency (ms, averaged)",
|
||||
1000 * self->readLatencies.percentile(.95),
|
||||
Averaged::True);
|
||||
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "Median Total Read Latency (ms, averaged)",
|
||||
1000 * self->fullReadLatencies.median(),
|
||||
Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "5% Total Read Latency (ms, averaged)",
|
||||
1000 * self->fullReadLatencies.percentile(.05),
|
||||
Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "95% Total Read Latency (ms, averaged)",
|
||||
1000 * self->fullReadLatencies.percentile(.95),
|
||||
Averaged::True);
|
||||
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), Averaged::True);
|
||||
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "5% Commit Latency (ms, averaged)",
|
||||
1000 * self->commitLatencies.percentile(.05),
|
||||
Averaged::True);
|
||||
self->periodicMetrics.emplace_back(ts + "95% Commit Latency (ms, averaged)",
|
||||
1000 * self->commitLatencies.percentile(.95),
|
||||
Averaged::True);
|
||||
//}
|
||||
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), Averaged::True);
|
||||
self->periodicMetrics.emplace_back(
|
||||
ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), Averaged::True);
|
||||
}
|
||||
last_ops = ops;
|
||||
|
||||
// if(self->rampUpLoad) {
|
||||
self->latencies.clear();
|
||||
self->readLatencies.clear();
|
||||
self->fullReadLatencies.clear();
|
||||
self->GRVLatencies.clear();
|
||||
self->commitLatencies.clear();
|
||||
//}
|
||||
|
||||
self->readLatencyTotal = 0.0;
|
||||
self->readLatencyCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
|
||||
ContinuousSample<double>* latencies,
|
||||
double* totalLatency,
|
||||
int* latencyCount,
|
||||
EventMetricHandle<ReadMetric> readMetric,
|
||||
bool shouldRecord) {
|
||||
state double readBegin = now();
|
||||
Optional<Value> value = wait(f);
|
||||
|
||||
double latency = now() - readBegin;
|
||||
readMetric->readLatency = latency * 1e9;
|
||||
readMetric->log();
|
||||
|
||||
if (shouldRecord) {
|
||||
*totalLatency += latency;
|
||||
++*latencyCount;
|
||||
latencies->addSample(latency);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Trans>
|
||||
Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, SkewedReadWriteWorkload* self, bool shouldRecord) {
|
||||
if (!keys.size())
|
||||
|
@ -541,41 +184,13 @@ struct SkewedReadWriteWorkload : KVWorkload {
|
|||
std::vector<Future<Void>> readers;
|
||||
for (int op = 0; op < keys.size(); op++) {
|
||||
++self->totalReadsMetric;
|
||||
readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])),
|
||||
&self->readLatencies,
|
||||
&self->readLatencyTotal,
|
||||
&self->readLatencyCount,
|
||||
self->readMetric,
|
||||
shouldRecord));
|
||||
readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord));
|
||||
}
|
||||
|
||||
wait(waitForAll(readers));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _setup(Database cx, SkewedReadWriteWorkload* self) {
|
||||
if (!self->doSetup)
|
||||
return Void();
|
||||
|
||||
state Promise<double> loadTime;
|
||||
state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
|
||||
|
||||
wait(bulkSetup(cx,
|
||||
self,
|
||||
self->nodeCount,
|
||||
loadTime,
|
||||
self->insertionCountsToMeasure.empty(),
|
||||
self->warmingDelay,
|
||||
self->maxInsertRate,
|
||||
self->insertionCountsToMeasure,
|
||||
ratesAtKeyCounts));
|
||||
|
||||
self->loadTime = loadTime.getFuture().get();
|
||||
self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
void startReadWriteClients(Database cx, std::vector<Future<Void>>& clients) {
|
||||
clientBegin = now();
|
||||
for (int c = 0; c < actorCount; c++) {
|
||||
|
@ -592,7 +207,7 @@ struct SkewedReadWriteWorkload : KVWorkload {
|
|||
ACTOR static Future<Void> _start(Database cx, SkewedReadWriteWorkload* self) {
|
||||
state std::vector<Future<Void>> clients;
|
||||
if (self->enableReadLatencyLogging)
|
||||
clients.push_back(tracePeriodically(self));
|
||||
clients.push_back(self->tracePeriodically());
|
||||
|
||||
wait(updateServerShards(cx, self));
|
||||
for (self->currentHotRound = 0; self->currentHotRound < self->skewRound; ++self->currentHotRound) {
|
||||
|
@ -606,13 +221,6 @@ struct SkewedReadWriteWorkload : KVWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
bool shouldRecord() { return shouldRecord(now()); }
|
||||
|
||||
bool shouldRecord(double checkTime) {
|
||||
double timeSinceStart = checkTime - clientBegin;
|
||||
return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration);
|
||||
}
|
||||
|
||||
// calculate hot server count
|
||||
void setHotServers() {
|
||||
hotServerCount = ceil(hotServerFraction * serverShards.size());
|
||||
|
|
Loading…
Reference in New Issue