Merge pull request #7087 from sfc-gh-xwang/features/read-skew
Add SkewedReadWriteWorkload
This commit is contained in:
commit
8e2a78bf3c
|
@ -302,7 +302,8 @@ std::pair<std::vector<std::pair<UID, NetworkAddress>>, std::vector<std::pair<UID
|
||||||
return std::make_pair(logs, oldLogs);
|
return std::make_pair(logs, oldLogs);
|
||||||
}
|
}
|
||||||
|
|
||||||
const KeyRef serverKeysPrefix = "\xff/serverKeys/"_sr;
|
const KeyRangeRef serverKeysRange = KeyRangeRef("\xff/serverKeys/"_sr, "\xff/serverKeys0"_sr);
|
||||||
|
const KeyRef serverKeysPrefix = serverKeysRange.begin;
|
||||||
const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTrue
|
const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTrue
|
||||||
serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
|
serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
|
||||||
serverKeysFalse;
|
serverKeysFalse;
|
||||||
|
@ -328,6 +329,18 @@ UID serverKeysDecodeServer(const KeyRef& key) {
|
||||||
rd >> server_id;
|
rd >> server_id;
|
||||||
return server_id;
|
return server_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
|
||||||
|
UID server_id;
|
||||||
|
BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned());
|
||||||
|
rd >> server_id;
|
||||||
|
rd.readBytes(1); // skip "/"
|
||||||
|
const auto remainingBytes = rd.remainingBytes();
|
||||||
|
KeyRef ref = KeyRef(rd.arenaRead(remainingBytes), remainingBytes);
|
||||||
|
// std::cout << ref.size() << " " << ref.toString() << std::endl;
|
||||||
|
return std::make_pair(server_id, Key(ref));
|
||||||
|
}
|
||||||
|
|
||||||
bool serverHasKey(ValueRef storedValue) {
|
bool serverHasKey(ValueRef storedValue) {
|
||||||
return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
|
return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,11 +99,13 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
|
||||||
// Using the serverID as a prefix, then followed by the beginning of the shard range
|
// Using the serverID as a prefix, then followed by the beginning of the shard range
|
||||||
// as the key, the value indicates whether the shard does or does not exist on the server.
|
// as the key, the value indicates whether the shard does or does not exist on the server.
|
||||||
// These values can be changed as data movement occurs.
|
// These values can be changed as data movement occurs.
|
||||||
|
extern const KeyRangeRef serverKeysRange;
|
||||||
extern const KeyRef serverKeysPrefix;
|
extern const KeyRef serverKeysPrefix;
|
||||||
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
|
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
|
||||||
const Key serverKeysKey(UID serverID, const KeyRef& keys);
|
const Key serverKeysKey(UID serverID, const KeyRef& keys);
|
||||||
const Key serverKeysPrefixFor(UID serverID);
|
const Key serverKeysPrefixFor(UID serverID);
|
||||||
UID serverKeysDecodeServer(const KeyRef& key);
|
UID serverKeysDecodeServer(const KeyRef& key);
|
||||||
|
// Decodes a key under serverKeysPrefix into (server UID, bytes following the one-byte
// separator after the UID). Unlike serverKeysDecodeServer(), the trailing part of the
// key is returned as well.
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key);
|
||||||
bool serverHasKey(ValueRef storedValue);
|
bool serverHasKey(ValueRef storedValue);
|
||||||
|
|
||||||
extern const KeyRangeRef conflictingKeysRange;
|
extern const KeyRangeRef conflictingKeysRange;
|
||||||
|
|
|
@ -267,6 +267,7 @@ set(FDBSERVER_SRCS
|
||||||
workloads/ReadAfterWrite.actor.cpp
|
workloads/ReadAfterWrite.actor.cpp
|
||||||
workloads/ReadHotDetection.actor.cpp
|
workloads/ReadHotDetection.actor.cpp
|
||||||
workloads/ReadWrite.actor.cpp
|
workloads/ReadWrite.actor.cpp
|
||||||
|
workloads/ReadWriteWorkload.actor.h
|
||||||
workloads/RemoveServersSafely.actor.cpp
|
workloads/RemoveServersSafely.actor.cpp
|
||||||
workloads/ReportConflictingKeys.actor.cpp
|
workloads/ReportConflictingKeys.actor.cpp
|
||||||
workloads/RestoreBackup.actor.cpp
|
workloads/RestoreBackup.actor.cpp
|
||||||
|
@ -281,6 +282,7 @@ set(FDBSERVER_SRCS
|
||||||
workloads/Sideband.actor.cpp
|
workloads/Sideband.actor.cpp
|
||||||
workloads/SidebandSingle.actor.cpp
|
workloads/SidebandSingle.actor.cpp
|
||||||
workloads/SimpleAtomicAdd.actor.cpp
|
workloads/SimpleAtomicAdd.actor.cpp
|
||||||
|
workloads/SkewedReadWrite.actor.cpp
|
||||||
workloads/SlowTaskWorkload.actor.cpp
|
workloads/SlowTaskWorkload.actor.cpp
|
||||||
workloads/SnapTest.actor.cpp
|
workloads/SnapTest.actor.cpp
|
||||||
workloads/SpecialKeySpaceCorrectness.actor.cpp
|
workloads/SpecialKeySpaceCorrectness.actor.cpp
|
||||||
|
|
|
@ -99,6 +99,20 @@ Key KVWorkload::keyForIndex(uint64_t index) const {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t KVWorkload::indexForKey(const KeyRef& key, bool absent) const {
|
||||||
|
int idx = 0;
|
||||||
|
if (nodePrefix > 0) {
|
||||||
|
ASSERT(keyBytes >= 32);
|
||||||
|
idx += 16;
|
||||||
|
}
|
||||||
|
ASSERT(keyBytes >= 16);
|
||||||
|
// extract int64_t index, the reverse process of emplaceIndex()
|
||||||
|
auto end = key.size() - idx - (absent ? 1 : 0);
|
||||||
|
std::string str((char*)key.begin() + idx, end);
|
||||||
|
int64_t res = std::stoll(str, nullptr, 16);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
Key KVWorkload::keyForIndex(uint64_t index, bool absent) const {
|
Key KVWorkload::keyForIndex(uint64_t index, bool absent) const {
|
||||||
int adjustedKeyBytes = (absent) ? (keyBytes + 1) : keyBytes;
|
int adjustedKeyBytes = (absent) ? (keyBytes + 1) : keyBytes;
|
||||||
Key result = makeString(adjustedKeyBytes);
|
Key result = makeString(adjustedKeyBytes);
|
||||||
|
@ -112,8 +126,8 @@ Key KVWorkload::keyForIndex(uint64_t index, bool absent) const {
|
||||||
idx += 16;
|
idx += 16;
|
||||||
}
|
}
|
||||||
ASSERT(keyBytes >= 16);
|
ASSERT(keyBytes >= 16);
|
||||||
double d = double(index) / nodeCount;
|
emplaceIndex(data, idx, (int64_t)index);
|
||||||
emplaceIndex(data, idx, *(int64_t*)&d);
|
// ASSERT(indexForKey(result) == (int64_t)index); // debug assert
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,202 +28,13 @@
|
||||||
#include "fdbserver/WorkerInterface.actor.h"
|
#include "fdbserver/WorkerInterface.actor.h"
|
||||||
#include "fdbserver/workloads/workloads.actor.h"
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||||
|
#include "fdbserver/workloads/ReadWriteWorkload.actor.h"
|
||||||
#include "fdbclient/ReadYourWrites.h"
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
#include "flow/TDMetric.actor.h"
|
#include "flow/TDMetric.actor.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
const int sampleSize = 10000;
|
struct ReadWriteCommonImpl {
|
||||||
static Future<Version> nextRV;
|
// trace methods
|
||||||
static Version lastRV = invalidVersion;
|
|
||||||
|
|
||||||
ACTOR static Future<Version> getNextRV(Database db) {
|
|
||||||
state Transaction tr(db);
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
Version v = wait(tr.getReadVersion());
|
|
||||||
return v;
|
|
||||||
} catch (Error& e) {
|
|
||||||
wait(tr.onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
static Future<Version> getInconsistentReadVersion(Database const& db) {
|
|
||||||
if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running
|
|
||||||
if (nextRV.isValid())
|
|
||||||
lastRV = nextRV.get();
|
|
||||||
nextRV = getNextRV(db);
|
|
||||||
}
|
|
||||||
if (lastRV == invalidVersion)
|
|
||||||
return nextRV;
|
|
||||||
else
|
|
||||||
return lastRV;
|
|
||||||
}
|
|
||||||
|
|
||||||
DESCR struct TransactionSuccessMetric {
|
|
||||||
int64_t totalLatency; // ns
|
|
||||||
int64_t startLatency; // ns
|
|
||||||
int64_t commitLatency; // ns
|
|
||||||
int64_t retries; // count
|
|
||||||
};
|
|
||||||
|
|
||||||
DESCR struct TransactionFailureMetric {
|
|
||||||
int64_t startLatency; // ns
|
|
||||||
int64_t errorCode; // flow error code
|
|
||||||
};
|
|
||||||
|
|
||||||
DESCR struct ReadMetric {
|
|
||||||
int64_t readLatency; // ns
|
|
||||||
};
|
|
||||||
|
|
||||||
struct ReadWriteWorkload : KVWorkload {
|
|
||||||
int readsPerTransactionA, writesPerTransactionA;
|
|
||||||
int readsPerTransactionB, writesPerTransactionB;
|
|
||||||
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
|
||||||
double testDuration, transactionsPerSecond, alpha, warmingDelay, loadTime, maxInsertRate, debugInterval, debugTime;
|
|
||||||
double metricsStart, metricsDuration, clientBegin;
|
|
||||||
std::string valueString;
|
|
||||||
|
|
||||||
bool dependentReads;
|
|
||||||
bool enableReadLatencyLogging;
|
|
||||||
double periodicLoggingInterval;
|
|
||||||
bool cancelWorkersAtDuration;
|
|
||||||
bool inconsistentReads;
|
|
||||||
bool adjacentReads;
|
|
||||||
bool adjacentWrites;
|
|
||||||
bool rampUpLoad;
|
|
||||||
int rampSweepCount;
|
|
||||||
double hotKeyFraction, forceHotProbability;
|
|
||||||
bool rangeReads;
|
|
||||||
bool useRYW;
|
|
||||||
bool rampTransactionType;
|
|
||||||
bool rampUpConcurrency;
|
|
||||||
bool batchPriority;
|
|
||||||
|
|
||||||
Standalone<StringRef> descriptionString;
|
|
||||||
|
|
||||||
Int64MetricHandle totalReadsMetric;
|
|
||||||
Int64MetricHandle totalRetriesMetric;
|
|
||||||
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
|
|
||||||
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
|
||||||
EventMetricHandle<ReadMetric> readMetric;
|
|
||||||
|
|
||||||
std::vector<Future<Void>> clients;
|
|
||||||
PerfIntCounter aTransactions, bTransactions, retries;
|
|
||||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
|
||||||
double readLatencyTotal;
|
|
||||||
int readLatencyCount;
|
|
||||||
|
|
||||||
std::vector<uint64_t> insertionCountsToMeasure;
|
|
||||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
|
|
||||||
|
|
||||||
std::vector<PerfMetric> periodicMetrics;
|
|
||||||
|
|
||||||
bool doSetup;
|
|
||||||
|
|
||||||
ReadWriteWorkload(WorkloadContext const& wcx)
|
|
||||||
: KVWorkload(wcx), loadTime(0.0), clientBegin(0), dependentReads(false), adjacentReads(false),
|
|
||||||
adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
|
|
||||||
totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"),
|
|
||||||
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
|
|
||||||
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
|
|
||||||
readLatencyCount(0) {
|
|
||||||
transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction"));
|
|
||||||
transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction"));
|
|
||||||
readMetric.init(LiteralStringRef("RWWorkload.Read"));
|
|
||||||
|
|
||||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
|
||||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
|
||||||
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
|
||||||
actorCount = ceil(transactionsPerSecond * allowedLatency);
|
|
||||||
actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
|
|
||||||
|
|
||||||
readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
|
|
||||||
writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
|
|
||||||
readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
|
|
||||||
writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
|
|
||||||
alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
|
|
||||||
|
|
||||||
extraReadConflictRangesPerTransaction =
|
|
||||||
getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0);
|
|
||||||
extraWriteConflictRangesPerTransaction =
|
|
||||||
getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0);
|
|
||||||
|
|
||||||
valueString = std::string(maxValueBytes, '.');
|
|
||||||
if (nodePrefix > 0) {
|
|
||||||
keyBytes += 16;
|
|
||||||
}
|
|
||||||
|
|
||||||
metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
|
|
||||||
metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
|
|
||||||
if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
|
|
||||||
// discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
|
|
||||||
metricsStart += testDuration * 0.125;
|
|
||||||
metricsDuration *= 0.75;
|
|
||||||
}
|
|
||||||
|
|
||||||
dependentReads = getOption(options, LiteralStringRef("dependentReads"), false);
|
|
||||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
|
||||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
|
||||||
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
|
|
||||||
debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
|
|
||||||
enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
|
|
||||||
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
|
||||||
cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
|
|
||||||
inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false);
|
|
||||||
adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false);
|
|
||||||
adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false);
|
|
||||||
rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false);
|
|
||||||
useRYW = getOption(options, LiteralStringRef("useRYW"), false);
|
|
||||||
rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1);
|
|
||||||
rangeReads = getOption(options, LiteralStringRef("rangeReads"), false);
|
|
||||||
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
|
|
||||||
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
|
|
||||||
doSetup = getOption(options, LiteralStringRef("setup"), true);
|
|
||||||
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
|
|
||||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
|
|
||||||
|
|
||||||
if (rampUpConcurrency)
|
|
||||||
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
|
|
||||||
|
|
||||||
// Validate that keyForIndex() is monotonic
|
|
||||||
for (int i = 0; i < 30; i++) {
|
|
||||||
int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
|
|
||||||
int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
|
|
||||||
if (a > b) {
|
|
||||||
std::swap(a, b);
|
|
||||||
}
|
|
||||||
ASSERT(a <= b);
|
|
||||||
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::string> insertionCountsToMeasureString =
|
|
||||||
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
|
|
||||||
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
|
|
||||||
try {
|
|
||||||
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
|
|
||||||
insertionCountsToMeasure.push_back(count);
|
|
||||||
} catch (...) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// with P(hotTrafficFraction) an access is directed to one of a fraction
|
|
||||||
// of hot keys, else it is directed to a disjoint set of cold keys
|
|
||||||
hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0);
|
|
||||||
double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0);
|
|
||||||
ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1);
|
|
||||||
ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot!
|
|
||||||
// p(Cold key) = (1-FHP) * (1-hkf)
|
|
||||||
// p(Cold key) = (1-htf)
|
|
||||||
// solving for FHP gives:
|
|
||||||
forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string description() const override { return descriptionString.toString(); }
|
|
||||||
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
|
|
||||||
Future<Void> start(Database const& cx) override { return _start(cx, this); }
|
|
||||||
|
|
||||||
ACTOR static Future<bool> traceDumpWorkers(Reference<AsyncVar<ServerDBInfo> const> db) {
|
ACTOR static Future<bool> traceDumpWorkers(Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||||
try {
|
try {
|
||||||
loop {
|
loop {
|
||||||
|
@ -250,91 +61,7 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ACTOR static Future<Void> tracePeriodically(ReadWriteCommon* self) {
|
||||||
Future<bool> check(Database const& cx) override {
|
|
||||||
clients.clear();
|
|
||||||
|
|
||||||
if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration)
|
|
||||||
metricsDuration = now() - metricsStart;
|
|
||||||
|
|
||||||
g_traceBatch.dump();
|
|
||||||
if (clientId == 0)
|
|
||||||
return traceDumpWorkers(dbInfo);
|
|
||||||
else
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
|
||||||
double duration = metricsDuration;
|
|
||||||
int reads =
|
|
||||||
(aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
|
|
||||||
int writes =
|
|
||||||
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
|
|
||||||
m.emplace_back("Measured Duration", duration, Averaged::True);
|
|
||||||
m.emplace_back(
|
|
||||||
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
|
|
||||||
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
|
|
||||||
m.push_back(aTransactions.getMetric());
|
|
||||||
m.push_back(bTransactions.getMetric());
|
|
||||||
m.push_back(retries.getMetric());
|
|
||||||
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
|
|
||||||
m.emplace_back("Read rows", reads, Averaged::False);
|
|
||||||
m.emplace_back("Write rows", writes, Averaged::False);
|
|
||||||
|
|
||||||
if (!rampUpLoad) {
|
|
||||||
m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True);
|
|
||||||
m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True);
|
|
||||||
m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True);
|
|
||||||
m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True);
|
|
||||||
m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True);
|
|
||||||
|
|
||||||
m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True);
|
|
||||||
m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True);
|
|
||||||
m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True);
|
|
||||||
|
|
||||||
m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True);
|
|
||||||
m.emplace_back(
|
|
||||||
"Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True);
|
|
||||||
m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True);
|
|
||||||
|
|
||||||
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True);
|
|
||||||
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True);
|
|
||||||
m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True);
|
|
||||||
|
|
||||||
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
|
||||||
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
|
||||||
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
|
||||||
}
|
|
||||||
|
|
||||||
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
|
|
||||||
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
|
|
||||||
m.emplace_back(
|
|
||||||
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
|
||||||
m.emplace_back("Bytes written/sec",
|
|
||||||
(writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration,
|
|
||||||
Averaged::False);
|
|
||||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
|
||||||
|
|
||||||
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
|
|
||||||
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
|
|
||||||
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
|
|
||||||
}
|
|
||||||
|
|
||||||
Value randomValue() {
|
|
||||||
return StringRef((uint8_t*)valueString.c_str(),
|
|
||||||
deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); }
|
|
||||||
|
|
||||||
template <class Trans>
|
|
||||||
void setupTransaction(Trans* tr) {
|
|
||||||
if (batchPriority) {
|
|
||||||
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> tracePeriodically(ReadWriteWorkload* self) {
|
|
||||||
state double start = now();
|
state double start = now();
|
||||||
state double elapsed = 0.0;
|
state double elapsed = 0.0;
|
||||||
state int64_t last_ops = 0;
|
state int64_t last_ops = 0;
|
||||||
|
@ -470,7 +197,6 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
self->readLatencyCount = 0;
|
self->readLatencyCount = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
|
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
|
||||||
ContinuousSample<double>* latencies,
|
ContinuousSample<double>* latencies,
|
||||||
double* totalLatency,
|
double* totalLatency,
|
||||||
|
@ -491,7 +217,6 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> logLatency(Future<RangeResult> f,
|
ACTOR static Future<Void> logLatency(Future<RangeResult> f,
|
||||||
ContinuousSample<double>* latencies,
|
ContinuousSample<double>* latencies,
|
||||||
double* totalLatency,
|
double* totalLatency,
|
||||||
|
@ -513,52 +238,7 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR template <class Trans>
|
ACTOR static Future<Void> setup(Database cx, ReadWriteCommon* self) {
|
||||||
Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, ReadWriteWorkload* self, bool shouldRecord) {
|
|
||||||
if (!keys.size())
|
|
||||||
return Void();
|
|
||||||
if (!self->dependentReads) {
|
|
||||||
std::vector<Future<Void>> readers;
|
|
||||||
if (self->rangeReads) {
|
|
||||||
for (int op = 0; op < keys.size(); op++) {
|
|
||||||
++self->totalReadsMetric;
|
|
||||||
readers.push_back(logLatency(
|
|
||||||
tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))),
|
|
||||||
GetRangeLimits(-1, 80000)),
|
|
||||||
&self->readLatencies,
|
|
||||||
&self->readLatencyTotal,
|
|
||||||
&self->readLatencyCount,
|
|
||||||
self->readMetric,
|
|
||||||
shouldRecord));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (int op = 0; op < keys.size(); op++) {
|
|
||||||
++self->totalReadsMetric;
|
|
||||||
readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])),
|
|
||||||
&self->readLatencies,
|
|
||||||
&self->readLatencyTotal,
|
|
||||||
&self->readLatencyCount,
|
|
||||||
self->readMetric,
|
|
||||||
shouldRecord));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wait(waitForAll(readers));
|
|
||||||
} else {
|
|
||||||
state int op;
|
|
||||||
for (op = 0; op < keys.size(); op++) {
|
|
||||||
++self->totalReadsMetric;
|
|
||||||
wait(logLatency(tr->get(self->keyForIndex(keys[op])),
|
|
||||||
&self->readLatencies,
|
|
||||||
&self->readLatencyTotal,
|
|
||||||
&self->readLatencyCount,
|
|
||||||
self->readMetric,
|
|
||||||
shouldRecord));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> _setup(Database cx, ReadWriteWorkload* self) {
|
|
||||||
if (!self->doSetup)
|
if (!self->doSetup)
|
||||||
return Void();
|
return Void();
|
||||||
|
|
||||||
|
@ -580,8 +260,232 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
ACTOR Future<Void> _start(Database cx, ReadWriteWorkload* self) {
|
// Member-function entry point that forwards to the static actor
// ReadWriteCommonImpl::tracePeriodically, passing this workload instance.
Future<Void> ReadWriteCommon::tracePeriodically() {
	ReadWriteCommon* const workload = this;
	return ReadWriteCommonImpl::tracePeriodically(workload);
}
|
||||||
|
|
||||||
|
// Point-read overload: forwards the pending read `f` to
// ReadWriteCommonImpl::logLatency together with this workload's read-latency
// sample, running total, counter and read metric.
// shouldRecord is passed through unchanged.
Future<Void> ReadWriteCommon::logLatency(Future<Optional<Value>> f, bool shouldRecord) {
	auto* const samples = &readLatencies;
	auto* const latencySum = &readLatencyTotal;
	auto* const latencyCount = &readLatencyCount;
	return ReadWriteCommonImpl::logLatency(f, samples, latencySum, latencyCount, readMetric, shouldRecord);
}
|
||||||
|
|
||||||
|
// Range-read overload: forwards the pending range read `f` to
// ReadWriteCommonImpl::logLatency together with this workload's read-latency
// sample, running total, counter and read metric.
// shouldRecord is passed through unchanged.
Future<Void> ReadWriteCommon::logLatency(Future<RangeResult> f, bool shouldRecord) {
	auto* const samples = &readLatencies;
	auto* const latencySum = &readLatencyTotal;
	auto* const latencyCount = &readLatencyCount;
	return ReadWriteCommonImpl::logLatency(f, samples, latencySum, latencyCount, readMetric, shouldRecord);
}
|
||||||
|
|
||||||
|
// Workload setup hook: delegates to the static actor ReadWriteCommonImpl::setup
// with the given database handle and this workload instance.
Future<Void> ReadWriteCommon::setup(Database const& cx) {
	ReadWriteCommon* const workload = this;
	return ReadWriteCommonImpl::setup(cx, workload);
}
|
||||||
|
|
||||||
|
// Workload check hook.
//  1. Drops all client futures held in `clients`.
//  2. If workers are not cancelled at the end of the test and the metrics window
//     has not elapsed yet, shrinks metricsDuration to the time elapsed since
//     metricsStart.
//  3. Dumps the global trace batch.
//  4. Client 0 returns the result of traceDumpWorkers(dbInfo); every other
//     client reports success immediately.
// The `cx` parameter is not used here.
Future<bool> ReadWriteCommon::check(Database const& cx) {
	clients.clear();

	if (!cancelWorkersAtDuration) {
		if (now() < metricsStart + metricsDuration) {
			metricsDuration = now() - metricsStart;
		}
	}

	g_traceBatch.dump();

	if (clientId != 0) {
		return true;
	}
	return ReadWriteCommonImpl::traceDumpWorkers(dbInfo);
}
|
||||||
|
|
||||||
|
void ReadWriteCommon::getMetrics(std::vector<PerfMetric>& m) {
|
||||||
|
double duration = metricsDuration;
|
||||||
|
int reads = (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
|
||||||
|
int writes =
|
||||||
|
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
|
||||||
|
m.emplace_back("Measured Duration", duration, Averaged::True);
|
||||||
|
m.emplace_back(
|
||||||
|
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
|
||||||
|
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
|
||||||
|
m.push_back(aTransactions.getMetric());
|
||||||
|
m.push_back(bTransactions.getMetric());
|
||||||
|
m.push_back(retries.getMetric());
|
||||||
|
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
|
||||||
|
m.emplace_back("Read rows", reads, Averaged::False);
|
||||||
|
m.emplace_back("Write rows", writes, Averaged::False);
|
||||||
|
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
|
||||||
|
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
|
||||||
|
m.emplace_back(
|
||||||
|
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||||
|
m.emplace_back(
|
||||||
|
"Bytes written/sec", (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
|
||||||
|
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||||
|
|
||||||
|
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
|
||||||
|
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
|
||||||
|
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Produces a value made of the leading bytes of valueString, with a length drawn from
// deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1).
Value ReadWriteCommon::randomValue() {
	const auto length = deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1);
	const auto* const bytes = reinterpret_cast<const uint8_t*>(valueString.c_str());
	return StringRef(bytes, length);
}
|
||||||
|
|
||||||
|
// Generator used to build the n-th key/value pair: the key is keyForIndex(n, false)
// and the value comes from randomValue().
Standalone<KeyValueRef> ReadWriteCommon::operator()(uint64_t n) {
	const Key rowKey = keyForIndex(n, false);
	const Value rowValue = randomValue();
	return KeyValueRef(rowKey, rowValue);
}
|
||||||
|
|
||||||
|
bool ReadWriteCommon::shouldRecord(double checkTime) {
|
||||||
|
double timeSinceStart = checkTime - clientBegin;
|
||||||
|
return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
// File-local state shared by getInconsistentReadVersion():
// the read-version fetch most recently started via getNextRV(). It is default-constructed
// (not valid) until the first call to getInconsistentReadVersion().
static Future<Version> nextRV;
|
||||||
|
// Result taken from nextRV once it became ready; stays invalidVersion until
// getInconsistentReadVersion() first observes a completed fetch.
static Version lastRV = invalidVersion;
|
||||||
|
|
||||||
|
// Actor that obtains a read version from `db`. It keeps retrying on the same
// transaction: whenever getReadVersion() throws, the error is handed to
// Transaction::onError() and the loop tries again once that wait completes.
ACTOR static Future<Version> getNextRV(Database db) {
	state Transaction txn(db);
	loop {
		try {
			Version readVersion = wait(txn.getReadVersion());
			return readVersion;
		} catch (Error& err) {
			wait(txn.onError(err));
		}
	}
}
|
||||||
|
|
||||||
|
// Returns a possibly stale read version without waiting for a fresh one when avoidable.
//  - If no fetch is currently running (nextRV unset or already ready), the result of a
//    completed fetch, if any, is stored in lastRV and a new getNextRV(db) is started.
//  - If no version has been stored yet, the pending fetch itself is returned;
//    otherwise the stored lastRV is returned immediately.
static Future<Version> getInconsistentReadVersion(Database const& db) {
	const bool fetchInFlight = nextRV.isValid() && !nextRV.isReady();
	if (!fetchInFlight) {
		if (nextRV.isValid()) {
			lastRV = nextRV.get();
		}
		nextRV = getNextRV(db);
	}

	if (lastRV != invalidVersion) {
		return lastRV;
	}
	return nextRV;
}
|
||||||
|
|
||||||
|
struct ReadWriteWorkload : ReadWriteCommon {
|
||||||
|
// use ReadWrite as a ramp up workload
|
||||||
|
bool rampUpLoad; // indicate this is a ramp up workload
|
||||||
|
int rampSweepCount; // how many times of ramp up
|
||||||
|
bool rampTransactionType; // choose transaction type based on client start time
|
||||||
|
bool rampUpConcurrency; // control client concurrency
|
||||||
|
|
||||||
|
// transaction setting
|
||||||
|
bool batchPriority;
|
||||||
|
bool rangeReads; // read operations are all single key range read
|
||||||
|
bool dependentReads; // read operations are issued sequentially
|
||||||
|
bool inconsistentReads; // read with previous read version
|
||||||
|
bool adjacentReads; // keys are adjacent within a transaction
|
||||||
|
bool adjacentWrites;
|
||||||
|
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
||||||
|
|
||||||
|
// hot traffic pattern
|
||||||
|
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
|
||||||
|
|
||||||
|
// Reads the ReadWrite-specific options. Every member assigned here is set unconditionally
// from the options, so no member-initializer defaults are required.
ReadWriteWorkload(WorkloadContext const& wcx) : ReadWriteCommon(wcx) {
	descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));

	// ramp-up behaviour
	rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false);
	rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1);
	rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
	rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
	if (rampUpConcurrency) {
		ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
	}

	// transaction shape
	batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
	rangeReads = getOption(options, LiteralStringRef("rangeReads"), false);
	dependentReads = getOption(options, LiteralStringRef("dependentReads"), false);
	inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false);
	adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false);
	adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false);
	extraReadConflictRangesPerTransaction =
	    getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0);
	extraWriteConflictRangesPerTransaction =
	    getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0);

	// Key-based hot traffic: with P(hotTrafficFraction) an access is directed to one of a
	// fraction of hot keys, else it is directed to a disjoint set of cold keys.
	hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0);
	const double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0);
	ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1);
	ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot!
	// p(Cold key) = (1-FHP) * (1-hkf)
	// p(Cold key) = (1-htf)
	// solving for FHP gives:
	forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction);
}
|
||||||
|
|
||||||
|
// Name of this workload instance, as stored in descriptionString.
std::string description() const override {
	return descriptionString.toString();
}
|
||||||
|
|
||||||
|
template <class Trans>
|
||||||
|
void setupTransaction(Trans* tr) {
|
||||||
|
if (batchPriority) {
|
||||||
|
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||||
|
ReadWriteCommon::getMetrics(m);
|
||||||
|
if (!rampUpLoad) {
|
||||||
|
m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True);
|
||||||
|
m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True);
|
||||||
|
m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True);
|
||||||
|
m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True);
|
||||||
|
m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True);
|
||||||
|
|
||||||
|
m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True);
|
||||||
|
m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True);
|
||||||
|
m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True);
|
||||||
|
|
||||||
|
m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True);
|
||||||
|
m.emplace_back(
|
||||||
|
"Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True);
|
||||||
|
m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True);
|
||||||
|
|
||||||
|
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True);
|
||||||
|
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True);
|
||||||
|
m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True);
|
||||||
|
|
||||||
|
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
||||||
|
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
||||||
|
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Workload entry point; the actual work is done by the _start actor.
Future<Void> start(Database const& cx) override {
	return _start(cx, this);
}
|
||||||
|
|
||||||
|
// Issues one read per entry of `keys` through `tr`, wrapping each in logLatency().
// With dependentReads the reads are plain gets awaited one after another; otherwise all
// reads (single-key range reads when rangeReads is set, gets otherwise) are started first
// and awaited together.
ACTOR template <class Trans>
static Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, ReadWriteWorkload* self, bool shouldRecord) {
	if (keys.empty()) {
		return Void();
	}

	if (self->dependentReads) {
		state int op;
		for (op = 0; op < keys.size(); op++) {
			++self->totalReadsMetric;
			wait(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord));
		}
		return Void();
	}

	std::vector<Future<Void>> readers;
	for (int i = 0; i < keys.size(); i++) {
		++self->totalReadsMetric;
		if (self->rangeReads) {
			// range covering exactly [key, strinc(key))
			readers.push_back(self->logLatency(
			    tr->getRange(KeyRangeRef(self->keyForIndex(keys[i]), Key(strinc(self->keyForIndex(keys[i])))),
			                 GetRangeLimits(-1, 80000)),
			    shouldRecord));
		} else {
			readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[i])), shouldRecord));
		}
	}
	wait(waitForAll(readers));
	return Void();
}
|
||||||
|
|
||||||
|
ACTOR static Future<Void> _start(Database cx, ReadWriteWorkload* self) {
|
||||||
// Read one record from the database to warm the cache of keyServers
|
// Read one record from the database to warm the cache of keyServers
|
||||||
state std::vector<int64_t> keys;
|
state std::vector<int64_t> keys;
|
||||||
keys.push_back(deterministicRandom()->randomInt64(0, self->nodeCount));
|
keys.push_back(deterministicRandom()->randomInt64(0, self->nodeCount));
|
||||||
|
@ -603,7 +507,7 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
|
|
||||||
std::vector<Future<Void>> clients;
|
std::vector<Future<Void>> clients;
|
||||||
if (self->enableReadLatencyLogging)
|
if (self->enableReadLatencyLogging)
|
||||||
clients.push_back(tracePeriodically(self));
|
clients.push_back(self->tracePeriodically());
|
||||||
|
|
||||||
self->clientBegin = now();
|
self->clientBegin = now();
|
||||||
for (int c = 0; c < self->actorCount; c++) {
|
for (int c = 0; c < self->actorCount; c++) {
|
||||||
|
@ -625,13 +529,6 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool shouldRecord() { return shouldRecord(now()); }
|
|
||||||
|
|
||||||
bool shouldRecord(double checkTime) {
|
|
||||||
double timeSinceStart = checkTime - clientBegin;
|
|
||||||
return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t getRandomKey(uint64_t nodeCount) {
|
int64_t getRandomKey(uint64_t nodeCount) {
|
||||||
if (forceHotProbability && deterministicRandom()->random01() < forceHotProbability)
|
if (forceHotProbability && deterministicRandom()->random01() < forceHotProbability)
|
||||||
return deterministicRandom()->randomInt64(0, nodeCount * hotKeyFraction) /
|
return deterministicRandom()->randomInt64(0, nodeCount * hotKeyFraction) /
|
||||||
|
|
|
@ -0,0 +1,171 @@
|
||||||
|
/*
|
||||||
|
* ReadWriteWorkload.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H)
|
||||||
|
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H
|
||||||
|
#include "fdbserver/workloads/ReadWriteWorkload.actor.g.h"
|
||||||
|
#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H)
|
||||||
|
#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H
|
||||||
|
|
||||||
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
|
#include "flow/TDMetric.actor.h"
|
||||||
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
// Fields of the event logged through ReadWriteCommon::transactionSuccessMetric.
// NOTE(review): the trailing field comments may be consumed by the DESCR code generator —
// confirm before rewording them.
DESCR struct TransactionSuccessMetric {
|
||||||
|
int64_t totalLatency; // ns
|
||||||
|
int64_t startLatency; // ns
|
||||||
|
int64_t commitLatency; // ns
|
||||||
|
int64_t retries; // count
|
||||||
|
};
|
||||||
|
|
||||||
|
// Fields of the event logged through ReadWriteCommon::transactionFailureMetric.
// NOTE(review): the trailing field comments may be consumed by the DESCR code generator —
// confirm before rewording them.
DESCR struct TransactionFailureMetric {
|
||||||
|
int64_t startLatency; // ns
|
||||||
|
int64_t errorCode; // flow error code
|
||||||
|
};
|
||||||
|
|
||||||
|
// Field of the event type behind ReadWriteCommon::readMetric.
// NOTE(review): the trailing field comment may be consumed by the DESCR code generator —
// confirm before rewording it.
DESCR struct ReadMetric {
|
||||||
|
int64_t readLatency; // ns
|
||||||
|
};
|
||||||
|
|
||||||
|
// Common ReadWrite test settings
|
||||||
|
struct ReadWriteCommon : KVWorkload {
|
||||||
|
static constexpr int sampleSize = 10000;
|
||||||
|
friend struct ReadWriteCommonImpl;
|
||||||
|
|
||||||
|
// general test setting
|
||||||
|
Standalone<StringRef> descriptionString;
|
||||||
|
bool doSetup, cancelWorkersAtDuration;
|
||||||
|
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
|
||||||
|
double metricsStart, metricsDuration;
|
||||||
|
std::vector<uint64_t> insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
|
||||||
|
|
||||||
|
// test log setting
|
||||||
|
bool enableReadLatencyLogging;
|
||||||
|
double periodicLoggingInterval;
|
||||||
|
|
||||||
|
// two type of transaction
|
||||||
|
int readsPerTransactionA, writesPerTransactionA;
|
||||||
|
int readsPerTransactionB, writesPerTransactionB;
|
||||||
|
std::string valueString;
|
||||||
|
double alpha; // probability for run TransactionA type
|
||||||
|
// transaction setting
|
||||||
|
bool useRYW;
|
||||||
|
|
||||||
|
// states of metric
|
||||||
|
Int64MetricHandle totalReadsMetric;
|
||||||
|
Int64MetricHandle totalRetriesMetric;
|
||||||
|
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
|
||||||
|
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
|
||||||
|
EventMetricHandle<ReadMetric> readMetric;
|
||||||
|
PerfIntCounter aTransactions, bTransactions, retries;
|
||||||
|
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
|
||||||
|
double readLatencyTotal;
|
||||||
|
int readLatencyCount;
|
||||||
|
std::vector<PerfMetric> periodicMetrics;
|
||||||
|
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
|
||||||
|
|
||||||
|
// other internal states
|
||||||
|
std::vector<Future<Void>> clients;
|
||||||
|
double loadTime, clientBegin;
|
||||||
|
|
||||||
|
explicit ReadWriteCommon(WorkloadContext const& wcx)
|
||||||
|
: KVWorkload(wcx), totalReadsMetric(LiteralStringRef("ReadWrite.TotalReads")),
|
||||||
|
totalRetriesMetric(LiteralStringRef("ReadWrite.TotalRetries")), aTransactions("A Transactions"),
|
||||||
|
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
|
||||||
|
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
|
||||||
|
readLatencyCount(0), loadTime(0.0), clientBegin(0) {
|
||||||
|
|
||||||
|
transactionSuccessMetric.init(LiteralStringRef("ReadWrite.SuccessfulTransaction"));
|
||||||
|
transactionFailureMetric.init(LiteralStringRef("ReadWrite.FailedTransaction"));
|
||||||
|
readMetric.init(LiteralStringRef("ReadWrite.Read"));
|
||||||
|
|
||||||
|
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||||
|
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
||||||
|
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
||||||
|
actorCount = ceil(transactionsPerSecond * allowedLatency);
|
||||||
|
actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
|
||||||
|
|
||||||
|
readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
|
||||||
|
writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
|
||||||
|
readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
|
||||||
|
writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
|
||||||
|
alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
|
||||||
|
|
||||||
|
valueString = std::string(maxValueBytes, '.');
|
||||||
|
if (nodePrefix > 0) {
|
||||||
|
keyBytes += 16;
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
|
||||||
|
metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
|
||||||
|
if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
|
||||||
|
// discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
|
||||||
|
metricsStart += testDuration * 0.125;
|
||||||
|
metricsDuration *= 0.75;
|
||||||
|
}
|
||||||
|
|
||||||
|
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||||
|
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||||
|
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
|
||||||
|
debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
|
||||||
|
enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
|
||||||
|
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
||||||
|
cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
|
||||||
|
|
||||||
|
useRYW = getOption(options, LiteralStringRef("useRYW"), false);
|
||||||
|
doSetup = getOption(options, LiteralStringRef("setup"), true);
|
||||||
|
|
||||||
|
// Validate that keyForIndex() is monotonic
|
||||||
|
for (int i = 0; i < 30; i++) {
|
||||||
|
int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
|
||||||
|
int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
|
||||||
|
if (a > b) {
|
||||||
|
std::swap(a, b);
|
||||||
|
}
|
||||||
|
ASSERT(a <= b);
|
||||||
|
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::string> insertionCountsToMeasureString =
|
||||||
|
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
|
||||||
|
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
|
||||||
|
try {
|
||||||
|
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
|
||||||
|
insertionCountsToMeasure.push_back(count);
|
||||||
|
} catch (...) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> tracePeriodically();
|
||||||
|
Future<Void> logLatency(Future<Optional<Value>> f, bool shouldRecord);
|
||||||
|
Future<Void> logLatency(Future<RangeResult> f, bool shouldRecord);
|
||||||
|
|
||||||
|
Future<Void> setup(Database const& cx) override;
|
||||||
|
Future<bool> check(Database const& cx) override;
|
||||||
|
void getMetrics(std::vector<PerfMetric>& m) override;
|
||||||
|
|
||||||
|
Standalone<KeyValueRef> operator()(uint64_t n);
|
||||||
|
bool shouldRecord(double checkTime = now());
|
||||||
|
Value randomValue();
|
||||||
|
};
|
||||||
|
|
||||||
|
#include "flow/unactorcompiler.h"
|
||||||
|
#endif // FDBSERVER_READWRITEWORKLOAD_ACTOR_H
|
|
@ -0,0 +1,386 @@
|
||||||
|
/*
|
||||||
|
* ReadWrite.actor.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <boost/lexical_cast.hpp>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "fdbrpc/ContinuousSample.h"
|
||||||
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
|
#include "fdbserver/TesterInterface.actor.h"
|
||||||
|
#include "fdbserver/WorkerInterface.actor.h"
|
||||||
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
|
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||||
|
#include "fdbserver/workloads/ReadWriteWorkload.actor.h"
|
||||||
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
|
#include "flow/TDMetric.actor.h"
|
||||||
|
#include "fdbclient/RunTransaction.actor.h"
|
||||||
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
|
struct SkewedReadWriteWorkload : ReadWriteCommon {
|
||||||
|
// server based hot traffic setting
|
||||||
|
int skewRound = 0; // skewDuration = ceil(testDuration / skewRound)
|
||||||
|
double hotServerFraction = 0, hotServerShardFraction = 1.0; // set > 0 to issue hot key based on shard map
|
||||||
|
double hotServerReadFrac, hotServerWriteFrac; // how much traffic goes to hot servers
|
||||||
|
double hotReadWriteServerOverlap; // fraction of overlap between the read-hot and write-hot server sets
|
||||||
|
|
||||||
|
// hot server state
|
||||||
|
typedef std::vector<std::pair<int64_t, int64_t>> IndexRangeVec;
|
||||||
|
// keyForIndex generate key from index. So for a shard range, recording the start and end is enough
|
||||||
|
std::vector<std::pair<UID, IndexRangeVec>> serverShards; // storage server and the shards it owns
|
||||||
|
std::map<UID, StorageServerInterface> serverInterfaces;
|
||||||
|
int hotServerCount = 0, currentHotRound = -1;
|
||||||
|
|
||||||
|
// Reads the server-skew options on top of the shared ReadWriteCommon settings.
SkewedReadWriteWorkload(WorkloadContext const& wcx) : ReadWriteCommon(wcx) {
	descriptionString = getOption(options, "description"_sr, "SkewedReadWrite"_sr);

	// how many rounds of hot-server selection to run
	skewRound = getOption(options, "skewRound"_sr, 1);

	// which servers / shards are hot
	hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2);
	hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0);
	hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0);

	// how much of the read / write traffic they receive
	hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8);
	hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0);

	ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && skewRound > 0);
}
|
||||||
|
|
||||||
|
// Name of this workload instance, as stored in descriptionString.
std::string description() const override {
	return descriptionString.toString();
}
|
||||||
|
// Workload entry point; the actual work is done by the _start actor.
Future<Void> start(Database const& cx) override {
	return _start(cx, this);
}
|
||||||
|
|
||||||
|
void debugPrintServerShards() const {
|
||||||
|
std::cout << std::hex;
|
||||||
|
for (auto it : this->serverShards) {
|
||||||
|
std::cout << serverInterfaces.at(it.first).address().toString() << ": [";
|
||||||
|
for (auto p : it.second) {
|
||||||
|
std::cout << "[" << p.first << "," << p.second << "], ";
|
||||||
|
}
|
||||||
|
std::cout << "] \n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// for each boundary except the last one in boundaries, found the first existed key generated from keyForIndex as
|
||||||
|
// beginIdx, found the last existed key generated from keyForIndex the endIdx.
|
||||||
|
ACTOR static Future<IndexRangeVec> convertKeyBoundaryToIndexShard(Database cx,
|
||||||
|
SkewedReadWriteWorkload* self,
|
||||||
|
Standalone<VectorRef<KeyRef>> boundaries) {
|
||||||
|
state IndexRangeVec res;
|
||||||
|
state int i = 0;
|
||||||
|
for (; i < boundaries.size() - 1; ++i) {
|
||||||
|
KeyRangeRef currentShard = KeyRangeRef(boundaries[i], boundaries[i + 1]);
|
||||||
|
// std::cout << currentShard.toString() << "\n";
|
||||||
|
std::vector<RangeResult> ranges = wait(runRYWTransaction(
|
||||||
|
cx, [currentShard](Reference<ReadYourWritesTransaction> tr) -> Future<std::vector<RangeResult>> {
|
||||||
|
std::vector<Future<RangeResult>> f;
|
||||||
|
f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::False));
|
||||||
|
f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::True));
|
||||||
|
return getAll(f);
|
||||||
|
}));
|
||||||
|
ASSERT(ranges[0].size() == 1 && ranges[1].size() == 1);
|
||||||
|
res.emplace_back(self->indexForKey(ranges[0][0].key), self->indexForKey(ranges[1][0].key));
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(res.size() == boundaries.size() - 1);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuilds self->serverInterfaces and self->serverShards from the system keyspace:
// reads the server list and the serverKeys range, collects the shard begin keys that fall
// inside the workload's key range together with the servers recorded at each of them, turns
// the boundaries into index ranges, and groups those ranges per server.
ACTOR static Future<Void> updateServerShards(Database cx, SkewedReadWriteWorkload* self) {
	// The server-list read is started first and awaited after the serverKeys read.
	state Future<RangeResult> serverList =
	    runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<RangeResult> {
		    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		    return tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
	    });
	state RangeResult range =
	    wait(runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<RangeResult> {
		    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		    return tr->getRange(serverKeysRange, CLIENT_KNOBS->TOO_MANY);
	    }));
	wait(success(serverList));

	// decode server interfaces
	self->serverInterfaces.clear();
	for (const auto& row : serverList.get()) {
		auto ssi = decodeServerListValue(row.value);
		self->serverInterfaces.emplace(ssi.id(), ssi);
	}
	self->serverShards.clear();

	// leftEdge < workloadBegin < workloadEnd
	Key workloadBegin = self->keyForIndex(0), workloadEnd = self->keyForIndex(self->nodeCount);
	Key leftEdge(allKeys.begin);
	std::vector<UID> leftServer; // left server owns the range [leftEdge, workloadBegin)
	KeyRangeRef workloadRange(workloadBegin, workloadEnd);
	state std::map<Key, std::vector<UID>> beginServers; // shard begin key to server IDs

	for (const auto& kv : range) {
		if (!serverHasKey(kv.value)) {
			continue;
		}
		auto [id, key] = serverKeysDecodeServerBegin(kv.key);

		if (workloadRange.contains(key)) {
			beginServers[key].push_back(id);
		} else if (workloadBegin > key && key > leftEdge) {
			// a begin key closer to workloadBegin: it becomes the new left boundary
			leftEdge = key;
			leftServer.clear();
		}

		if (key == leftEdge) {
			leftServer.push_back(id);
		}
	}
	ASSERT(beginServers.size() == 0 || beginServers.begin()->first >= workloadBegin);
	// handle the left boundary: if no shard begins exactly at workloadBegin, the servers
	// recorded at leftEdge cover the start of the workload range
	if (beginServers.size() == 0 || beginServers.begin()->first > workloadBegin) {
		beginServers[workloadBegin] = leftServer;
	}

	Standalone<VectorRef<KeyRef>> keyBegins;
	for (const auto& entry : beginServers) {
		keyBegins.push_back(keyBegins.arena(), entry.first);
	}
	// deep copy because the wait below will destruct workloadEnd
	keyBegins.push_back_deep(keyBegins.arena(), workloadEnd);

	IndexRangeVec indexShards = wait(convertKeyBoundaryToIndexShard(cx, self, keyBegins));
	ASSERT(beginServers.size() == indexShards.size());

	// Group the index ranges per server; beginServers and indexShards are walked in step,
	// starting from the left-most shard.
	std::map<UID, IndexRangeVec> serverShards;
	int shardPos = 0;
	for (const auto& entry : beginServers) {
		for (const UID& owner : entry.second) {
			serverShards[owner].emplace_back(indexShards[shardPos]);
		}
		++shardPos;
	}
	// self->serverShards is ordered by UID
	for (const auto& perServer : serverShards) {
		self->serverShards.emplace_back(perServer);
	}
	// if (self->clientId == 0) {
	// 	self->debugPrintServerShards();
	// }
	return Void();
}
|
||||||
|
|
||||||
|
// Starts one get() per entry of `keys` through `tr`, each wrapped in logLatency(), and waits
// for all of them. An empty key list returns immediately.
ACTOR template <class Trans>
Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, SkewedReadWriteWorkload* self, bool shouldRecord) {
	if (keys.empty()) {
		return Void();
	}

	std::vector<Future<Void>> pendingReads;
	for (int i = 0; i < keys.size(); i++) {
		++self->totalReadsMetric;
		pendingReads.push_back(self->logLatency(tr->get(self->keyForIndex(keys[i])), shouldRecord));
	}

	wait(waitForAll(pendingReads));
	return Void();
}
|
||||||
|
|
||||||
|
// Records clientBegin and appends actorCount randomReadWriteClient actors to `clients`,
// instantiated for ReadYourWritesTransaction when useRYW is set and for Transaction otherwise.
void startReadWriteClients(Database cx, std::vector<Future<Void>>& clients) {
	clientBegin = now();
	// Delay handed to every client; it does not depend on the client index.
	const double perClientDelay = actorCount / transactionsPerSecond;
	for (int clientIdx = 0; clientIdx < actorCount; clientIdx++) {
		if (useRYW) {
			clients.push_back(randomReadWriteClient<ReadYourWritesTransaction>(cx, this, perClientDelay, clientIdx));
		} else {
			clients.push_back(randomReadWriteClient<Transaction>(cx, this, perClientDelay, clientIdx));
		}
	}
}
|
||||||
|
|
||||||
|
// Runs skewRound rounds. Each round picks the hot servers, starts the read/write clients,
// lets them run for testDuration / skewRound, drops them, and then refreshes the shard map
// (the refresh is started together with a 5 second delay and awaited after it).
ACTOR static Future<Void> _start(Database cx, SkewedReadWriteWorkload* self) {
	state std::vector<Future<Void>> clients;
	if (self->enableReadLatencyLogging) {
		// NOTE(review): this future lives in `clients`, so the clear() at the end of the first
		// round drops it as well — confirm periodic tracing is meant to stop after round one.
		clients.push_back(self->tracePeriodically());
	}

	wait(updateServerShards(cx, self));

	self->currentHotRound = 0;
	while (self->currentHotRound < self->skewRound) {
		self->setHotServers();
		self->startReadWriteClients(cx, clients);
		wait(timeout(waitForAll(clients), self->testDuration / self->skewRound, Void()));
		clients.clear();
		wait(delay(5.0) >> updateServerShards(cx, self));
		++self->currentHotRound;
	}

	return Void();
}
|
||||||
|
|
||||||
|
// calculate hot server count
|
||||||
|
void setHotServers() {
|
||||||
|
hotServerCount = ceil(hotServerFraction * serverShards.size());
|
||||||
|
std::cout << "Choose " << hotServerCount << "/" << serverShards.size() << "/" << serverInterfaces.size()
|
||||||
|
<< " hot servers: [";
|
||||||
|
int begin = currentHotRound * hotServerCount;
|
||||||
|
for (int i = 0; i < hotServerCount; ++i) {
|
||||||
|
int idx = (begin + i) % serverShards.size();
|
||||||
|
std::cout << serverInterfaces.at(serverShards[idx].first).address().toString() << ",";
|
||||||
|
}
|
||||||
|
std::cout << "]\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t getRandomKeyFromHotServer(bool hotServerRead = true) {
|
||||||
|
ASSERT(hotServerCount > 0);
|
||||||
|
int begin = currentHotRound * hotServerCount;
|
||||||
|
if (!hotServerRead) {
|
||||||
|
begin += hotServerCount * (1.0 - hotReadWriteServerOverlap); // calculate non-overlap part offset
|
||||||
|
}
|
||||||
|
int idx = deterministicRandom()->randomInt(begin, begin + hotServerCount) % serverShards.size();
|
||||||
|
int shardMax = std::min(serverShards[idx].second.size(),
|
||||||
|
(size_t)ceil(serverShards[idx].second.size() * hotServerShardFraction));
|
||||||
|
int shardIdx = deterministicRandom()->randomInt(0, shardMax);
|
||||||
|
return deterministicRandom()->randomInt64(serverShards[idx].second[shardIdx].first,
|
||||||
|
serverShards[idx].second[shardIdx].second + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t getRandomKey(uint64_t nodeCount, bool hotServerRead = true) {
|
||||||
|
auto random = deterministicRandom()->random01();
|
||||||
|
if (hotServerFraction > 0) {
|
||||||
|
if ((hotServerRead && random < hotServerReadFrac) || (!hotServerRead && random < hotServerWriteFrac)) {
|
||||||
|
return getRandomKeyFromHotServer(hotServerRead);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return deterministicRandom()->randomInt64(0, nodeCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client loop of the skewed read/write workload. The actor never finishes on its own: every pass of
// the outer loop waits on poisson(&lastTime, delay), builds one transaction and drives it to
// completion through a retry loop.
//
// cx          - database every Trans is constructed against.
// self        - workload instance; provides the configuration read here (alpha, reads/writes per
//               transaction, nodeCount, debug window) and receives all metrics and latency samples.
// delay       - passed to poisson() together with lastTime to pace the transactions.
// clientIndex - not read by this actor.
ACTOR template <class Trans>
Future<Void> randomReadWriteClient(Database cx, SkewedReadWriteWorkload* self, double delay, int clientIndex) {
	state double startTime = now(); // not read anywhere below
	state double lastTime = now();
	state double GRVStartTime;
	state UID debugID;

	loop {
		wait(poisson(&lastTime, delay));

		state double tstart = now();
		// An "A" transaction is selected when the draw exceeds alpha, otherwise a "B" transaction.
		state bool aTransaction = deterministicRandom()->random01() > self->alpha;

		state std::vector<int64_t> keys;
		state std::vector<Value> values;
		int reads = aTransaction ? self->readsPerTransactionA : self->readsPerTransactionB;
		state int writes = aTransaction ? self->writesPerTransactionA : self->writesPerTransactionB;

		// Read keys and write values are generated once, before the retry loop, so every retry of
		// this transaction reuses them. Write targets, in contrast, are drawn again on each attempt.
		if (reads > 0)
			keys.reserve(reads);
		for (int op = 0; op < reads; op++)
			keys.push_back(self->getRandomKey(self->nodeCount));

		values.reserve(writes);
		for (int op = 0; op < writes; op++)
			values.push_back(self->randomValue());

		state Trans tr(cx);

		// Only transactions started inside (debugTime, debugTime + debugInterval] after clientBegin
		// get a debug ID; a default UID() marks "no debugging" for the check after the retry loop.
		if (tstart - self->clientBegin > self->debugTime &&
		    tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
			debugID = deterministicRandom()->randomUniqueID();
			tr.debugTransaction(debugID);
			g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
		} else {
			debugID = UID();
		}

		self->transactionSuccessMetric->retries = 0;
		self->transactionSuccessMetric->commitLatency = -1;

		loop {
			try {
				GRVStartTime = now();
				self->transactionFailureMetric->startLatency = -1;

				// NOTE(review): nothing is awaited between GRVStartTime and this point, so grvLatency
				// only spans the statements above - confirm whether a read-version wait is intended.
				double grvLatency = now() - GRVStartTime;
				self->transactionSuccessMetric->startLatency = grvLatency * 1e9;
				self->transactionFailureMetric->startLatency = grvLatency * 1e9;
				if (self->shouldRecord())
					self->GRVLatencies.addSample(grvLatency);

				state double readStart = now();
				wait(self->readOp(&tr, keys, self, self->shouldRecord()));

				double readLatency = now() - readStart;
				if (self->shouldRecord())
					self->fullReadLatencies.addSample(readLatency);

				// Read-only transactions are done here; nothing is committed.
				if (!writes)
					break;

				for (int op = 0; op < writes; op++)
					tr.set(self->keyForIndex(self->getRandomKey(self->nodeCount, false), false), values[op]);

				state double commitStart = now();
				wait(tr.commit());

				double commitLatency = now() - commitStart;
				self->transactionSuccessMetric->commitLatency = commitLatency * 1e9;
				if (self->shouldRecord())
					self->commitLatencies.addSample(commitLatency);

				break;
			} catch (Error& e) {
				// Log the failure first, then let the transaction decide whether a retry is possible.
				self->transactionFailureMetric->errorCode = e.code();
				self->transactionFailureMetric->log();

				wait(tr.onError(e));

				++self->transactionSuccessMetric->retries;
				++self->totalRetriesMetric;

				if (self->shouldRecord())
					++self->retries;
			}
		}

		if (debugID != UID())
			g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");

		// Replace the finished transaction with a default-constructed one before measuring.
		tr = Trans();

		double transactionLatency = now() - tstart;
		self->transactionSuccessMetric->totalLatency = transactionLatency * 1e9;
		self->transactionSuccessMetric->log();

		if (self->shouldRecord()) {
			if (aTransaction)
				++self->aTransactions;
			else
				++self->bTransactions;

			self->latencies.addSample(transactionLatency);
		}
	}
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Static factory object associating SkewedReadWriteWorkload with the name "SkewedReadWrite";
// presumably this is the name test specifications use to select the workload - verify against
// WorkloadFactory.
WorkloadFactory<SkewedReadWriteWorkload> SkewedReadWriteWorkloadFactory("SkewedReadWrite");
|
||||||
|
|
||||||
|
// Round-trip check for key encoding: for indexes drawn from randomInt64(0, nodeCount),
// indexForKey() must return the index that keyForIndex() was given. The first pass uses the
// single-argument overloads, the second pass passes absent = true to both calls.
TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") {
	auto workload = SkewedReadWriteWorkload(WorkloadContext());

	// Pass 1: default overloads.
	for (int remaining = 1000; remaining > 0; --remaining) {
		auto expected = deterministicRandom()->randomInt64(0, workload.nodeCount);
		Key encoded = workload.keyForIndex(expected);
		auto decoded = workload.indexForKey(encoded);
		ASSERT(decoded == expected);
	}

	// Pass 2: same round trip with absent = true on both sides.
	for (int remaining = 1000; remaining > 0; --remaining) {
		auto expected = deterministicRandom()->randomInt64(0, workload.nodeCount);
		Key encoded = workload.keyForIndex(expected, true);
		auto decoded = workload.indexForKey(encoded, true);
		ASSERT(decoded == expected);
	}

	return Void();
}
|
|
@ -131,6 +131,8 @@ struct KVWorkload : TestWorkload {
|
||||||
Key getRandomKey(bool absent) const;
|
Key getRandomKey(bool absent) const;
|
||||||
Key keyForIndex(uint64_t index) const;
|
Key keyForIndex(uint64_t index) const;
|
||||||
Key keyForIndex(uint64_t index, bool absent) const;
|
Key keyForIndex(uint64_t index, bool absent) const;
|
||||||
|
// the reverse process of keyForIndex() without division. Set absent=true to ignore the last byte in Key
|
||||||
|
int64_t indexForKey(const KeyRef& key, bool absent = false) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct IWorkloadFactory : ReferenceCounted<IWorkloadFactory> {
|
struct IWorkloadFactory : ReferenceCounted<IWorkloadFactory> {
|
||||||
|
|
|
@ -634,6 +634,8 @@ public:
|
||||||
check = nullptr;
|
check = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Number of bytes between `begin` and `end`, i.e. what is still left in the reader's buffer.
size_t remainingBytes() const { return static_cast<size_t>(end - begin); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
_Reader(const char* begin, const char* end) : begin(begin), end(end) {}
|
_Reader(const char* begin, const char* end) : begin(begin), end(end) {}
|
||||||
_Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {}
|
_Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {}
|
||||||
|
|
|
@ -211,6 +211,7 @@ if(WITH_PYTHON)
|
||||||
add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml)
|
add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml)
|
||||||
add_fdb_test(TEST_FILES rare/RYWDisable.toml)
|
add_fdb_test(TEST_FILES rare/RYWDisable.toml)
|
||||||
add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
|
add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
|
||||||
|
add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
|
||||||
add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
|
add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
|
||||||
add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)
|
add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)
|
||||||
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
|
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
[[test]]
|
||||||
|
testTitle = 'SkewedReadWriteTest'
|
||||||
|
connectionFailuresDisableDuration = 100000
|
||||||
|
# waitForQuiescenceBegin= false
|
||||||
|
# waitForQuiescenceEnd=false
|
||||||
|
clearAfterTest = true
|
||||||
|
runSetup = true # false
|
||||||
|
timeout = 3600.0
|
||||||
|
|
||||||
|
[[test.workload]]
|
||||||
|
testName = 'SkewedReadWrite'
|
||||||
|
transactionsPerSecond = 100
|
||||||
|
testDuration = 40.0
|
||||||
|
skewRound = 1
|
||||||
|
nodeCount = 3000 # 30000000
|
||||||
|
valueBytes = 100
|
||||||
|
readsPerTransactionA = 8
|
||||||
|
writesPerTransactionA = 0
|
||||||
|
alpha = 0
|
||||||
|
discardEdgeMeasurements = false
|
||||||
|
hotServerFraction = 0.2
|
||||||
|
hotServerReadFrac = 0.8
|
||||||
|
# hotServerShardFraction = 0.3
|
||||||
|
warmingDelay = 180.0
|
Loading…
Reference in New Issue