// foundationdb/fdbserver/workloads/Mako.actor.cpp
// Mako benchmark workload: populates data, drives a configurable operation mix,
// and optionally verifies data integrity via stored CRC32C checksums.
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
2019-05-31 15:27:30 +08:00
#include "fdbserver/workloads/workloads.actor.h"
2019-05-10 06:35:30 +08:00
#include "fdbserver/workloads/BulkSetup.actor.h"
2019-05-22 07:22:02 +08:00
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/zipf.h"
#include "flow/crc32c.h"
#include "flow/actorcompiler.h"
// Operation codes for the Mako workload. The order must match `opNames` below,
// and MAX_OP doubles as the count of real operations.
enum {
    OP_GETREADVERSION,
    OP_GET,
    OP_GETRANGE,
    OP_SGET,
    OP_SGETRANGE,
    OP_UPDATE,
    OP_INSERT,
    OP_INSERTRANGE,
    OP_CLEAR,
    OP_SETCLEAR,
    OP_CLEARRANGE,
    OP_SETCLEARRANGE,
    OP_COMMIT,
    MAX_OP
};
// Column indices into `operations[op][...]`: per-transaction count and range width.
enum { OP_COUNT, OP_RANGE };
struct MakoWorkload : TestWorkload {
    // Sizing / checksum-partition parameters parsed from the test spec.
    uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize,
        csCount, csPartitionSize, csStepSizeInPartition;
    // Timing / rate parameters. NOTE(review): `allowedLatency` is declared but not
    // assigned or read in this file — confirm whether it is still used elsewhere.
    double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency,
        periodicLoggingInterval, zipfConstant;
    // Behavior flags parsed from the test spec.
    bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification,
        doChecksumVerificationOnly, latencyForLocalOperation;
    // Aggregate counters. NOTE(review): `commits` is never incremented in this file;
    // commit counts are tracked via opCounters[OP_COMMIT] instead.
    PerfIntCounter xacts, retries, conflicts, commits, totalOps;
    // per-operation success counters, indexed by the OP_* enum
    std::vector<PerfIntCounter> opCounters;
    std::vector<uint64_t> insertionCountsToMeasure;
    std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
    // raw user-supplied operations string, e.g. "g10i5"
    std::string operationsSpec;
    // store operations to execute: [op][OP_COUNT] and [op][OP_RANGE]
    int operations[MAX_OP][2];
    // used for periodic tracing
    std::vector<PerfMetric> periodicMetrics;
    // store latency of each operation with sampling
    std::vector<ContinuousSample<double>> opLatencies;
    // keys used to store checksums for the configured key ranges
    std::vector<Key> csKeys;
    // key prefix for all generated keys
    std::string keyPrefix;
    int KEYPREFIXLEN;
    // Display names, parallel to the OP_* enum above.
    const std::array<std::string, MAX_OP> opNames = { "GRV", "GET", "GETRANGE", "SGET",
                                                      "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE",
                                                      "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE",
                                                      "COMMIT" };
MakoWorkload(WorkloadContext const& wcx)
2020-09-02 08:28:32 +08:00
: TestWorkload(wcx), xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"),
totalOps("Operations"), loadTime(0.0) {
2019-05-16 03:58:12 +08:00
// init parameters from test file
// Number of rows populated
2019-05-10 06:35:30 +08:00
rowCount = getOption(options, LiteralStringRef("rows"), 10000);
2019-05-16 03:58:12 +08:00
// Test duration in seconds
2019-05-10 06:35:30 +08:00
testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
// Flag to control whether to populate data into database
populateData = getOption(options, LiteralStringRef("populateData"), true);
// Flag to control whether to run benchmark
runBenchmark = getOption(options, LiteralStringRef("runBenchmark"), true);
// Flag to control whether to clean data in the database
preserveData = getOption(options, LiteralStringRef("preserveData"), true);
2019-05-16 03:58:12 +08:00
// If true, force commit for read-only transactions
2019-05-15 01:13:13 +08:00
commitGet = getOption(options, LiteralStringRef("commitGet"), false);
2020-09-02 07:14:43 +08:00
// If true, log latency for set, clear and clearrange
latencyForLocalOperation = getOption(options, LiteralStringRef("latencyForLocalOperation"), false);
2019-05-16 03:58:12 +08:00
// Target total transaction-per-second (TPS) of all clients
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100000.0) / clientCount;
actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), 16);
2019-05-16 03:58:12 +08:00
// Sampling rate (1 sample / <sampleSize> ops) for latency stats
2019-05-31 15:27:30 +08:00
sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100);
2019-05-16 03:58:12 +08:00
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
2020-09-02 08:28:32 +08:00
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
// All the generated keys will start with the specified prefix
2020-09-02 08:28:32 +08:00
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString();
KEYPREFIXLEN = keyPrefix.size();
// If true, the workload will picking up keys which are zipfian distributed
zipf = getOption(options, LiteralStringRef("zipf"), false);
zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
2019-06-03 14:16:39 +08:00
// Specified length of keys and length range of values
2020-09-02 08:28:32 +08:00
keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 16);
maxValueBytes = getOption(options, LiteralStringRef("valueBytes"), 16);
minValueBytes = getOption(options, LiteralStringRef("minValueBytes"), maxValueBytes);
2019-06-03 14:16:39 +08:00
ASSERT(minValueBytes <= maxValueBytes);
2019-05-16 03:58:12 +08:00
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
// assume we want to insert 10000 rows with keyBytes set to 16,
2019-05-16 03:58:12 +08:00
// then the key goes from 'mako00000xxxxxxx' to 'mako09999xxxxxxx'
seqNumLen = digits(rowCount);
2019-05-16 03:58:12 +08:00
// check keyBytes, maxValueBytes is valid
ASSERT(seqNumLen + KEYPREFIXLEN <= keyBytes);
2019-05-16 03:58:12 +08:00
// user input: a sequence of operations to be executed; e.g. "g10i5" means to do GET 10 times and Insert 5 times
// One operation type is defined as "<Type><Count>" or "<Type><Count>:<Range>".
// When Count is omitted, it's equivalent to setting it to 1. (e.g. "g" is equivalent to "g1")
// Multiple operation types can be concatenated. (e.g. "g9u1" = 9 GETs and 1 update)
// For RANGE operations, "Range" needs to be specified in addition to "Count".
// Below are all allowed inputs:
2020-09-02 08:28:32 +08:00
// g GET
// gr GET RANGE
// sg Snapshot GET
// sgr Snapshot GET RANGE
// u Update (= GET followed by SET)
// i Insert (= SET with a new key)
// ir Insert Range (Sequential)
// c CLEAR
// sc SET & CLEAR
// cr CLEAR RANGE
// scr SET & CLEAR RANGE
// grv GetReadVersion()
// Every transaction is committed unless it contains only GET / GET RANGE operations.
2020-09-02 08:28:32 +08:00
operationsSpec =
getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
2019-05-16 03:58:12 +08:00
// parse the sequence and extract operations to be executed
parseOperationsSpec();
2019-05-10 06:35:30 +08:00
for (int i = 0; i < MAX_OP; ++i) {
// initilize per-operation latency record
2019-05-31 15:27:30 +08:00
opLatencies.push_back(ContinuousSample<double>(rowCount / sampleSize));
2019-05-10 06:35:30 +08:00
// initialize per-operation counter
opCounters.push_back(PerfIntCounter(opNames[i]));
}
2020-09-02 08:28:32 +08:00
if (zipf) {
zipfian_generator3(0, (int)rowCount - 1, zipfConstant);
}
// Added for checksum verification
csSize = getOption(options, LiteralStringRef("csSize"), rowCount / 100);
ASSERT(csSize <= rowCount);
csCount = getOption(options, LiteralStringRef("csCount"), 0);
checksumVerification = (csCount != 0);
doChecksumVerificationOnly = getOption(options, LiteralStringRef("doChecksumVerificationOnly"), false);
if (doChecksumVerificationOnly)
ASSERT(checksumVerification); // csCount should be non-zero when you do checksum verification only
if (csCount) {
csPartitionSize = rowCount / csSize;
ASSERT(csCount <= csPartitionSize);
csStepSizeInPartition = csPartitionSize / csCount;
2020-09-02 08:28:32 +08:00
for (int i = 0; i < csCount; ++i) {
csKeys.emplace_back(format((keyPrefix + "_crc32c_%u_%u").c_str(), i, rowCount));
}
}
2019-05-10 06:35:30 +08:00
}
    // Workload name reported in trace events and test output.
    std::string description() const override {
        // Mako is a simple workload to measure the performance of FDB.
        // The primary purpose of this benchmark is to generate consistent performance results
        return "Mako";
    }
2019-05-15 01:13:13 +08:00
Future<Void> setup(Database const& cx) override {
if (doChecksumVerificationOnly)
return Void();
return _setup(cx, this);
2019-05-15 01:13:13 +08:00
}
2019-05-10 06:35:30 +08:00
Future<Void> start(Database const& cx) override {
if (doChecksumVerificationOnly)
return Void();
2019-05-15 01:13:13 +08:00
return _start(cx, this);
2019-05-10 06:35:30 +08:00
}
Future<bool> check(Database const& cx) override {
2020-09-02 08:28:32 +08:00
if (!checksumVerification) {
return true;
}
// verify checksum consistency
return dochecksumVerification(cx, this);
2019-05-10 06:35:30 +08:00
}
    // Disable the default check-phase timeout: checksum verification over a large
    // key space can legitimately take a long time.
    double getCheckTimeout() const override { return std::numeric_limits<double>::max(); }
void getMetrics(std::vector<PerfMetric>& m) override {
// metrics of population process
2020-09-02 08:28:32 +08:00
if (populateData) {
m.push_back(PerfMetric("Mean load time (seconds)", loadTime, true));
2019-05-16 03:58:12 +08:00
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
2019-05-15 01:13:13 +08:00
auto ratesItr = ratesAtKeyCounts.begin();
2020-09-02 08:28:32 +08:00
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) {
m.push_back(
PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
2019-05-16 03:58:12 +08:00
}
2019-05-15 01:13:13 +08:00
}
// benchmark
2020-09-02 08:28:32 +08:00
if (runBenchmark) {
m.push_back(PerfMetric("Measured Duration", testDuration, true));
m.push_back(xacts.getMetric());
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
m.push_back(totalOps.getMetric());
m.push_back(PerfMetric("Operations/sec", totalOps.getValue() / testDuration, true));
m.push_back(conflicts.getMetric());
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
m.push_back(retries.getMetric());
// count of each operation
2020-09-02 08:28:32 +08:00
for (int i = 0; i < MAX_OP; ++i) {
m.push_back(opCounters[i].getMetric());
}
2019-05-10 06:35:30 +08:00
// Meaningful Latency metrics
2020-09-02 08:28:32 +08:00
const int opExecutedAtOnce[] = { OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT };
for (const int& op : opExecutedAtOnce) {
m.push_back(PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true));
m.push_back(
PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true));
m.push_back(
PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true));
}
2020-09-02 07:14:43 +08:00
// Latency for local operations if needed
if (latencyForLocalOperation) {
2020-09-02 08:28:32 +08:00
const int localOp[] = { OP_INSERT, OP_CLEAR, OP_CLEARRANGE };
for (const int& op : localOp) {
2020-09-02 07:14:43 +08:00
TraceEvent(SevDebug, "LocalLatency")
2020-09-02 08:28:32 +08:00
.detail("Name", opNames[op])
.detail("Size", opLatencies[op].getPopulationSize());
m.push_back(
PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true));
m.push_back(PerfMetric(
"Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true));
m.push_back(PerfMetric(
"Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true));
2020-09-02 07:14:43 +08:00
}
}
2019-05-10 06:35:30 +08:00
2020-09-02 08:28:32 +08:00
// insert logging metrics if exists
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
}
2019-05-10 06:35:30 +08:00
}
2019-05-31 15:44:07 +08:00
static std::string randStr(int len) {
std::string result(len, '.');
2019-05-15 01:13:13 +08:00
for (int i = 0; i < len; ++i) {
2019-05-31 15:44:07 +08:00
result[i] = deterministicRandom()->randomAlphaNumeric();
2019-05-10 06:35:30 +08:00
}
2019-05-31 15:44:07 +08:00
return result;
2019-05-10 06:35:30 +08:00
}
2019-05-15 01:13:13 +08:00
2020-09-02 08:28:32 +08:00
static void randStr(char* str, int len) {
2019-05-16 03:58:12 +08:00
for (int i = 0; i < len; ++i) {
str[i] = deterministicRandom()->randomAlphaNumeric();
2019-05-15 01:13:13 +08:00
}
}
    // Generate a random value whose length is uniform in [minValueBytes, maxValueBytes].
    Value randomValue() {
        const int length = deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1);
        std::string valueString = randStr(length);
        // Converting the StringRef to the returned Value (Standalone) copies the
        // bytes, so referencing the local string here is safe.
        return StringRef(reinterpret_cast<const uint8_t*>(valueString.c_str()), length);
    }
2019-06-03 14:16:39 +08:00
Key keyForIndex(uint64_t ind) {
Key result = makeString(keyBytes);
2019-05-22 07:22:02 +08:00
char* data = reinterpret_cast<char*>(mutateString(result));
format((keyPrefix + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
data[i] = 'x';
return result;
2019-05-10 06:35:30 +08:00
}
/* number of digits */
static uint64_t digits(uint64_t num) {
uint64_t digits = 0;
while (num > 0) {
num /= 10;
digits++;
}
return digits;
}
    // Mark which checksum slots are invalidated by a write to keys [startIdx, endIdx).
    // Partition math is intricate and order-sensitive; code left byte-identical.
    static void updateCSFlags(MakoWorkload* self, std::vector<bool>& flags, uint64_t startIdx, uint64_t endIdx) {
        // We deal with cases where rowCount % csCount != 0 and csPartitionSize % csSize != 0;
        // In particular, all keys with index in range [csSize * csPartitionSize, rowCount) will not be used for
        // checksum. Likewise, within each partition only indices that fall on csStep boundaries in
        // [0, csCount * csStepSizeInPartition) participate in a checksum.
        uint64_t boundary = self->csSize * self->csPartitionSize;
        // writes entirely beyond the checksummed region never touch a checksum
        if (startIdx >= boundary)
            return;
        else if (endIdx > boundary)
            endIdx = boundary;
        // If all checksums are already marked dirty, nothing more to do
        if (std::all_of(flags.begin(), flags.end(), [](bool flag) { return flag; }))
            return;
        if (startIdx + 1 == endIdx) {
            // single key case: map the index into its partition and check whether it
            // lands exactly on a checksum step boundary
            startIdx = startIdx % self->csPartitionSize;
            if ((startIdx < self->csCount * self->csStepSizeInPartition) &&
                (startIdx % self->csStepSizeInPartition == 0)) {
                flags.at(startIdx / self->csStepSizeInPartition) = true;
            }
        } else {
            // key range case: translate the range into step indices relative to the
            // partition containing startIdx
            uint64_t count = self->csCount;
            uint64_t base = (startIdx / self->csPartitionSize) * self->csPartitionSize;
            startIdx -= base;
            endIdx -= base;
            uint64_t startStepIdx = std::min(startIdx / self->csStepSizeInPartition, self->csCount - 1);
            // if the changed range spans a whole csPartitionSize, every checksum needs updating
            // (count stays csCount and the modulo loop below marks them all)
            if ((endIdx - startIdx) < self->csPartitionSize) {
                uint64_t endStepIdx;
                if (endIdx > self->csPartitionSize) {
                    // range wraps into the next partition
                    endStepIdx =
                        self->csCount +
                        std::min((endIdx - 1 - self->csPartitionSize) / self->csStepSizeInPartition, self->csCount);
                } else {
                    endStepIdx = std::min((endIdx - 1) / self->csStepSizeInPartition, self->csCount - 1);
                }
                // All the left boundaries of csSteps inside the range should be updated.
                // Also check whether startIdx itself is the left boundary of a csStep.
                if (startIdx == self->csStepSizeInPartition * startStepIdx)
                    flags[startStepIdx] = true;
                count = endStepIdx - startStepIdx;
            }
            for (int i = 1; i <= count; ++i) {
                flags[(startStepIdx + i) % self->csCount] = true;
            }
        }
    }
    // BulkSetup hook: produce the n-th key/value pair for data population.
    Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n), randomValue()); }
    // Emit commit/GRV latency trace events and record throughput snapshots every
    // periodicLoggingInterval seconds. Runs until cancelled by the benchmark timeout.
    ACTOR static Future<Void> tracePeriodically(MakoWorkload* self) {
        state double start = timer();
        state double elapsed = 0.0;
        state int64_t last_ops = 0;
        state int64_t last_xacts = 0;
        loop {
            elapsed += self->periodicLoggingInterval;
            wait(delayUntil(start + elapsed));
            TraceEvent((self->description() + "_CommitLatency").c_str())
                .detail("Mean", self->opLatencies[OP_COMMIT].mean())
                .detail("Median", self->opLatencies[OP_COMMIT].median())
                .detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05))
                .detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95))
                .detail("Count", self->opCounters[OP_COMMIT].getValue())
                .detail("Elapsed", elapsed);
            TraceEvent((self->description() + "_GRVLatency").c_str())
                .detail("Mean", self->opLatencies[OP_GETREADVERSION].mean())
                .detail("Median", self->opLatencies[OP_GETREADVERSION].median())
                .detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05))
                .detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95))
                .detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
            // throughput over the last interval, labeled with elapsed test time
            std::string ts = format("T=%04.0fs: ", elapsed);
            self->periodicMetrics.push_back(PerfMetric(
                ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false));
            self->periodicMetrics.push_back(PerfMetric(
                ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false));
            last_xacts = self->xacts.getValue();
            last_ops = self->totalOps.getValue();
        }
    }
    // Population phase: every client participates in bulkSetup; then client 0 alone
    // seeds the checksum keys when checksum verification is enabled.
    ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
        // use all the clients to populate data
        if (self->populateData) {
            state Promise<double> loadTime;
            state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
            wait(bulkSetup(cx,
                           self,
                           self->rowCount,
                           loadTime,
                           self->insertionCountsToMeasure.empty(),
                           self->warmingDelay,
                           self->maxInsertRate,
                           self->insertionCountsToMeasure,
                           ratesAtKeyCounts));
            // This is the setup time
            self->loadTime = loadTime.getFuture().get();
            // This is the rates of importing keys
            self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
        }
        // Use one client to initialize checksums
        if (self->checksumVerification && self->clientId == 0) {
            wait(generateChecksum(cx, self));
        }
        return Void();
    }
    // Benchmark phase: run the workload, then (client 0 only) clear the generated
    // keyspace unless preserveData is set.
    ACTOR Future<Void> _start(Database cx, MakoWorkload* self) {
        // TODO: Do I need to read data to warm the cache of the keySystem like ReadWrite.actor.cpp (line 465)?
        if (self->runBenchmark) {
            wait(self->_runBenchmark(cx, self));
        }
        if (!self->preserveData && self->clientId == 0) {
            wait(self->cleanup(cx, self));
        }
        return Void();
    }
    // Spawn actorCountPerClient makoClient actors (plus the periodic tracer when
    // logging is enabled) and let them run for testDuration seconds.
    ACTOR Future<Void> _runBenchmark(Database cx, MakoWorkload* self) {
        std::vector<Future<Void>> clients;
        clients.reserve(self->actorCountPerClient);
        for (int c = 0; c < self->actorCountPerClient; ++c) {
            // each actor gets an equal share of this client's TPS budget
            clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
        }
        if (self->enableLogging)
            clients.push_back(tracePeriodically(self));

        // the actors loop forever; the timeout ends the benchmark
        wait(timeout(waitForAll(clients), self->testDuration, Void()));
        return Void();
    }
2019-05-15 01:13:13 +08:00
ACTOR Future<Void> makoClient(Database cx, MakoWorkload* self, double delay, int actorIndex) {
2019-05-10 06:35:30 +08:00
state Key rkey, rkey2;
2019-05-10 06:35:30 +08:00
state Value rval;
2019-05-22 07:22:02 +08:00
state ReadYourWritesTransaction tr(cx);
2019-05-10 06:35:30 +08:00
state bool doCommit;
state int i, count;
2019-05-16 03:58:12 +08:00
state uint64_t range, indBegin, indEnd, rangeLen;
2019-05-10 06:35:30 +08:00
state KeyRangeRef rkeyRangeRef;
2019-05-31 15:27:30 +08:00
state std::vector<int> perOpCount(MAX_OP, 0);
// flag at index-i indicates whether checksum-i need to be updated
state std::vector<bool> csChangedFlags(self->csCount, false);
2020-09-02 08:20:53 +08:00
state double lastTime = timer();
state double commitStart;
2019-05-10 06:35:30 +08:00
2020-09-02 08:28:32 +08:00
TraceEvent("ClientStarting")
.detail("ActorIndex", actorIndex)
.detail("ClientIndex", self->clientId)
.detail("NumActors", self->actorCountPerClient);
2019-05-15 01:13:13 +08:00
2019-05-10 06:35:30 +08:00
loop {
2019-05-15 01:13:13 +08:00
// used for throttling
wait(poisson(&lastTime, delay));
2020-09-02 08:28:32 +08:00
try {
2019-05-15 01:13:13 +08:00
// user-defined value: whether commit read-only ops or not; default is false
doCommit = self->commitGet;
2019-05-10 06:35:30 +08:00
for (i = 0; i < MAX_OP; ++i) {
if (i == OP_COMMIT)
continue;
2019-05-10 06:35:30 +08:00
for (count = 0; count < self->operations[i][0]; ++count) {
2020-03-18 04:41:14 +08:00
range = self->operations[i][1];
2019-05-31 15:27:30 +08:00
rangeLen = digits(range);
2019-05-15 01:13:13 +08:00
// generate random key-val pair for operation
indBegin = self->getRandomKeyIndex(self->rowCount);
2019-06-03 14:16:39 +08:00
rkey = self->keyForIndex(indBegin);
2019-05-10 06:35:30 +08:00
rval = self->randomValue();
indEnd = std::min(indBegin + range, self->rowCount);
2019-06-03 14:16:39 +08:00
rkey2 = self->keyForIndex(indEnd);
// KeyRangeRef(min, maxPlusOne)
rkeyRangeRef = KeyRangeRef(rkey, rkey2);
2019-05-10 06:35:30 +08:00
// used for mako-level consistency check
2020-09-02 08:28:32 +08:00
if (self->checksumVerification) {
if (i == OP_INSERT | i == OP_UPDATE | i == OP_CLEAR) {
updateCSFlags(self, csChangedFlags, indBegin, indBegin + 1);
2020-09-02 08:28:32 +08:00
} else if (i == OP_CLEARRANGE) {
updateCSFlags(self, csChangedFlags, indBegin, indEnd);
}
}
2020-09-02 08:28:32 +08:00
if (i == OP_GETREADVERSION) {
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
2020-09-02 08:28:32 +08:00
} else if (i == OP_GET) {
wait(logLatency(tr.get(rkey, Snapshot::False), &self->opLatencies[i]));
2020-09-02 08:28:32 +08:00
} else if (i == OP_GETRANGE) {
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, Snapshot::False),
2020-09-02 08:28:32 +08:00
&self->opLatencies[i]));
} else if (i == OP_SGET) {
wait(logLatency(tr.get(rkey, Snapshot::True), &self->opLatencies[i]));
2020-09-02 08:28:32 +08:00
} else if (i == OP_SGETRANGE) {
// do snapshot get range here
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, Snapshot::True),
2020-09-02 08:28:32 +08:00
&self->opLatencies[i]));
} else if (i == OP_UPDATE) {
wait(logLatency(tr.get(rkey, Snapshot::False), &self->opLatencies[OP_GET]));
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.set(rkey, rval);
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
} else {
tr.set(rkey, rval);
}
2019-05-10 06:35:30 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_INSERT) {
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly
// generated characters
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
self->keyBytes - self->KEYPREFIXLEN);
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.set(rkey, rval);
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
} else {
tr.set(rkey, rval);
}
2019-05-15 01:13:13 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_INSERTRANGE) {
char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
2020-09-02 07:14:43 +08:00
for (int range_i = 0; range_i < range; ++range_i) {
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.set(rkey, self->randomValue());
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
} else {
tr.set(rkey, self->randomValue());
}
2019-05-15 01:13:13 +08:00
}
2019-05-10 06:35:30 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_CLEAR) {
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.clear(rkey);
self->opLatencies[OP_CLEAR].addSample(timer() - opBegin);
} else {
tr.clear(rkey);
}
2019-05-10 06:35:30 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_SETCLEAR) {
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
self->keyBytes - self->KEYPREFIXLEN);
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.set(rkey, rval);
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
} else {
tr.set(rkey, rval);
}
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
2019-05-16 03:58:12 +08:00
// commit the change and update metrics
2020-09-02 08:20:53 +08:00
commitStart = timer();
2019-05-15 01:13:13 +08:00
wait(tr.commit());
2020-09-02 08:20:53 +08:00
self->opLatencies[OP_COMMIT].addSample(timer() - commitStart);
2019-05-16 03:58:12 +08:00
++perOpCount[OP_COMMIT];
2019-05-15 01:13:13 +08:00
tr.reset();
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.clear(rkey);
self->opLatencies[OP_CLEAR].addSample(timer() - opBegin);
} else {
tr.clear(rkey);
}
2019-05-15 01:13:13 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_CLEARRANGE) {
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.clear(rkeyRangeRef);
self->opLatencies[OP_CLEARRANGE].addSample(timer() - opBegin);
} else {
tr.clear(rkeyRangeRef);
}
2019-05-10 06:35:30 +08:00
doCommit = true;
2020-09-02 08:28:32 +08:00
} else if (i == OP_SETCLEARRANGE) {
char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
2019-05-15 01:13:13 +08:00
state std::string scr_start_key;
state std::string scr_end_key;
state KeyRangeRef scr_key_range_ref;
2020-09-02 08:28:32 +08:00
for (int range_i = 0; range_i < range; ++range_i) {
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.set(rkey, self->randomValue());
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
} else {
tr.set(rkey, self->randomValue());
}
if (range_i == 0)
scr_start_key = rkey.toString();
2019-05-15 01:13:13 +08:00
}
scr_end_key = rkey.toString();
scr_key_range_ref = KeyRangeRef(KeyRef(scr_start_key), KeyRef(scr_end_key));
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
2020-09-02 08:20:53 +08:00
commitStart = timer();
2019-05-15 01:13:13 +08:00
wait(tr.commit());
2020-09-02 08:20:53 +08:00
self->opLatencies[OP_COMMIT].addSample(timer() - commitStart);
2019-05-16 03:58:12 +08:00
++perOpCount[OP_COMMIT];
2019-05-15 01:13:13 +08:00
tr.reset();
2020-09-02 07:14:43 +08:00
if (self->latencyForLocalOperation) {
double opBegin = timer();
tr.clear(scr_key_range_ref);
self->opLatencies[OP_CLEARRANGE].addSample(timer() - opBegin);
} else {
tr.clear(scr_key_range_ref);
}
2019-05-15 01:13:13 +08:00
doCommit = true;
2019-05-10 06:35:30 +08:00
}
++perOpCount[i];
}
}
2019-05-15 01:13:13 +08:00
2019-05-10 06:35:30 +08:00
if (doCommit) {
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
2020-09-02 08:20:53 +08:00
commitStart = timer();
2019-05-10 06:35:30 +08:00
wait(tr.commit());
2020-09-02 08:20:53 +08:00
self->opLatencies[OP_COMMIT].addSample(timer() - commitStart);
2019-05-10 06:35:30 +08:00
++perOpCount[OP_COMMIT];
}
// successfully finish the transaction, update metrics
++self->xacts;
2020-09-02 08:28:32 +08:00
for (int op = 0; op < MAX_OP; ++op) {
2019-05-10 06:35:30 +08:00
self->opCounters[op] += perOpCount[op];
self->totalOps += perOpCount[op];
}
} catch (Error& e) {
2019-05-31 15:27:30 +08:00
TraceEvent("FailedToExecOperations").error(e);
2019-05-10 06:35:30 +08:00
if (e.code() == error_code_operation_cancelled)
throw;
2019-05-15 01:13:13 +08:00
else if (e.code() == error_code_not_committed)
2019-05-10 06:35:30 +08:00
++self->conflicts;
2019-05-10 06:35:30 +08:00
wait(tr.onError(e));
2019-05-15 01:13:13 +08:00
++self->retries;
2019-05-10 06:35:30 +08:00
}
// reset all the operations' counters to 0
std::fill(perOpCount.begin(), perOpCount.end(), 0);
tr.reset();
}
}
    // Clear every key starting with the workload's prefix ('mako' by default),
    // retrying on retryable transaction errors.
    ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self) {
        // clear all data starting with 'mako' in the database
        state std::string keyPrefix(self->keyPrefix);
        state ReadYourWritesTransaction tr(cx);

        loop {
            try {
                tr.clear(prefixRange(keyPrefix));
                wait(tr.commit());
                TraceEvent("CleanUpMakoRelatedData").detail("KeyPrefix", self->keyPrefix);
                break;
            } catch (Error& e) {
                TraceEvent("FailedToCleanData").error(e);
                wait(tr.onError(e));
            }
        }
        return Void();
    }
    // Await `f` and record the wall-clock time it took into `opLatencies`.
    // The future's value is discarded; only its completion latency matters.
    ACTOR template <class T>
    static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies) {
        state double opBegin = timer();
        wait(success(f));
        opLatencies->addSample(timer() - opBegin);
        return Void();
    }
int64_t getRandomKeyIndex(uint64_t rowCount) {
int64_t randomKeyIndex;
2020-09-02 08:28:32 +08:00
if (zipf) {
randomKeyIndex = zipfian_next();
} else {
randomKeyIndex = deterministicRandom()->randomInt64(0, rowCount);
}
return randomKeyIndex;
2019-05-15 01:13:13 +08:00
}
    // Parse `operationsSpec` (e.g. "g10i5" or "gr9:100") into the `operations`
    // table. Token matching is longest-prefix-first, so e.g. "grv" must be tested
    // before "gr", and "gr" before "g". Code left byte-identical: the pointer
    // walk and match ordering are order-sensitive.
    void parseOperationsSpec() {
        const char* ptr = operationsSpec.c_str();
        int op = 0;
        int rangeop = 0;
        int num;
        int error = 0;
        // reset the whole table before parsing
        for (op = 0; op < MAX_OP; op++) {
            operations[op][OP_COUNT] = 0;
            operations[op][OP_RANGE] = 0;
        }
        op = 0;
        while (*ptr) {
            // operation token — longest match first
            if (strncmp(ptr, "grv", 3) == 0) {
                op = OP_GETREADVERSION;
                ptr += 3;
            } else if (strncmp(ptr, "gr", 2) == 0) {
                op = OP_GETRANGE;
                rangeop = 1;
                ptr += 2;
            } else if (strncmp(ptr, "g", 1) == 0) {
                op = OP_GET;
                ptr++;
            } else if (strncmp(ptr, "sgr", 3) == 0) {
                op = OP_SGETRANGE;
                rangeop = 1;
                ptr += 3;
            } else if (strncmp(ptr, "sg", 2) == 0) {
                op = OP_SGET;
                ptr += 2;
            } else if (strncmp(ptr, "u", 1) == 0) {
                op = OP_UPDATE;
                ptr++;
            } else if (strncmp(ptr, "ir", 2) == 0) {
                op = OP_INSERTRANGE;
                rangeop = 1;
                ptr += 2;
            } else if (strncmp(ptr, "i", 1) == 0) {
                op = OP_INSERT;
                ptr++;
            } else if (strncmp(ptr, "cr", 2) == 0) {
                op = OP_CLEARRANGE;
                rangeop = 1;
                ptr += 2;
            } else if (strncmp(ptr, "c", 1) == 0) {
                op = OP_CLEAR;
                ptr++;
            } else if (strncmp(ptr, "scr", 3) == 0) {
                op = OP_SETCLEARRANGE;
                rangeop = 1;
                ptr += 3;
            } else if (strncmp(ptr, "sc", 2) == 0) {
                op = OP_SETCLEAR;
                ptr += 2;
            } else {
                error = 1;
                break;
            }
            /* count */
            num = 0;
            if ((*ptr < '0') || (*ptr > '9')) {
                num = 1; /* if omitted, set it to 1 */
            } else {
                while ((*ptr >= '0') && (*ptr <= '9')) {
                    num = num * 10 + *ptr - '0';
                    ptr++;
                }
            }
            /* set count */
            operations[op][OP_COUNT] = num;
            // range ops additionally require ":<Range>"
            if (rangeop) {
                if (*ptr != ':') {
                    error = 1;
                    break;
                } else {
                    ptr++; /* skip ':' */
                    num = 0;
                    if ((*ptr < '0') || (*ptr > '9')) {
                        error = 1;
                        break;
                    }
                    while ((*ptr >= '0') && (*ptr <= '9')) {
                        num = num * 10 + *ptr - '0';
                        ptr++;
                    }
                    /* set range */
                    operations[op][OP_RANGE] = num;
                }
            }
            rangeop = 0;
        }
        if (error) {
            // NOTE(review): on error, entries parsed before the failure remain set
            TraceEvent(SevError, "TestFailure")
                .detail("Reason", "InvalidTransactionSpecification")
                .detail("operations", operationsSpec);
        }
    }
    // Compute the CRC32C checksum for checksum slot `csIndex` by folding in one
    // key per partition (the slot's step offset within each of the csSize partitions).
    ACTOR static Future<uint32_t> calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex) {
        state uint32_t result = 0;
        state int i;
        state Key csKey;
        for (i = 0; i < self->csSize; ++i) {
            // index narrows to int — assumes row indices fit in int; TODO confirm for very large rowCount
            int idx = csIndex * self->csStepSizeInPartition + i * self->csPartitionSize;
            csKey = self->keyForIndex(idx);
            Optional<Value> temp = wait(tr->get(csKey));
            if (temp.present()) {
                Value val = temp.get();
                result = crc32c_append(result, val.begin(), val.size());
            } else {
                // If the key does not exist, fold in the key bytes instead of a value
                result = crc32c_append(result, csKey.begin(), csKey.size());
            }
        }
        return result;
    }
    // Recompute every checksum and compare against the stored values.
    // Returns false (and traces SevError) on any missing or mismatched checksum.
    ACTOR static Future<bool> dochecksumVerification(Database cx, MakoWorkload* self) {
        state ReadYourWritesTransaction tr(cx);
        state int csIdx;
        state Value csValue;
        loop {
            try {
                tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
                for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
                    Optional<Value> temp = wait(tr.get(self->csKeys[csIdx]));
                    if (!temp.present()) {
                        TraceEvent(SevError, "TestFailure")
                            .detail("Reason", "NoExistingChecksum")
                            .detail("missedChecksumIndex", csIdx);
                        return false;
                    } else {
                        csValue = temp.get();
                        // stored checksum is a raw little-endian uint32_t
                        ASSERT(csValue.size() == sizeof(uint32_t));
                        uint32_t calculatedCS = wait(calcCheckSum(&tr, self, csIdx));
                        uint32_t existingCS = *(reinterpret_cast<const uint32_t*>(csValue.begin()));
                        if (existingCS != calculatedCS) {
                            TraceEvent(SevError, "TestFailure")
                                .detail("Reason", "ChecksumVerificationFailure")
                                .detail("ChecksumIndex", csIdx)
                                .detail("ExistingChecksum", existingCS)
                                .detail("CurrentChecksum", calculatedCS);
                            return false;
                        }
                        TraceEvent("ChecksumVerificationPass")
                            .detail("ChecksumIndex", csIdx)
                            .detail("ChecksumValue", existingCS);
                    }
                }
                return true;
            } catch (Error& e) {
                TraceEvent("FailedToCalculateChecksum").detail("ChecksumIndex", csIdx).error(e);
                wait(tr.onError(e));
            }
        }
    }
    // Compute and store the initial checksum values after population (run by
    // client 0 only, from _setup). Retries the whole batch on retryable errors.
    ACTOR static Future<Void> generateChecksum(Database cx, MakoWorkload* self) {
        state ReadYourWritesTransaction tr(cx);
        state int csIdx;
        loop {
            try {
                for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
                    Optional<Value> temp = wait(tr.get(self->csKeys[csIdx]));
                    // a pre-existing checksum key means a previous run populated the same prefix
                    if (temp.present())
                        TraceEvent("DuplicatePopulationOnSamePrefix").detail("KeyPrefix", self->keyPrefix);
                    wait(self->updateCheckSum(&tr, self, csIdx));
                }
                wait(tr.commit());
                break;
            } catch (Error& e) {
                TraceEvent("FailedToGenerateChecksumForPopulatedData").error(e);
                wait(tr.onError(e));
            }
        }
        return Void();
    }
    // Recompute checksum `csIdx` and buffer the new value into the transaction
    // (stored as a raw uint32_t; caller is responsible for committing).
    ACTOR static Future<Void> updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx) {
        state uint32_t csVal = wait(calcCheckSum(tr, self, csIdx));
        TraceEvent("UpdateCheckSum").detail("ChecksumIndex", csIdx).detail("Checksum", csVal);
        tr->set(self->csKeys[csIdx], ValueRef(reinterpret_cast<const uint8_t*>(&csVal), sizeof(uint32_t)));
        return Void();
    }
    // Refresh every checksum slot whose dirty flag is set, clearing each flag as it
    // is handled. No-op when checksum verification is disabled.
    ACTOR static Future<Void> updateCSBeforeCommit(ReadYourWritesTransaction* tr,
                                                   MakoWorkload* self,
                                                   std::vector<bool>* flags) {
        if (!self->checksumVerification)
            return Void();
        state int csIdx;
        for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
            if ((*flags)[csIdx]) {
                wait(updateCheckSum(tr, self, csIdx));
                (*flags)[csIdx] = false;
            }
        }
        return Void();
    }
};
// Register the workload under the name used in test specification files.
WorkloadFactory<MakoWorkload> MakoloadFactory("Mako");