#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/zipf.h"
#include "flow/actorcompiler.h" // This must be the last #include.

enum {
	OP_GETREADVERSION,
	OP_GET,
	OP_GETRANGE,
	OP_SGET,
	OP_SGETRANGE,
	OP_UPDATE,
	OP_INSERT,
	OP_INSERTRANGE,
	OP_CLEAR,
	OP_SETCLEAR,
	OP_CLEARRANGE,
	OP_SETCLEARRANGE,
	OP_COMMIT,
	MAX_OP
};
enum { OP_COUNT, OP_RANGE };
constexpr int MAXKEYVALUESIZE = 1000;
constexpr int RANGELIMIT = 10000;

struct MakoWorkload : TestWorkload {
	uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes;
	double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant;
	bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf;
	PerfIntCounter xacts, retries, conflicts, commits, totalOps;
	std::vector<PerfIntCounter> opCounters;
	std::vector<uint64_t> insertionCountsToMeasure;
	std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
	std::string operationsSpec;
	// stores the operations to execute, parsed from operationsSpec
	int operations[MAX_OP][2];
	// used for periodic tracing
	std::vector<PerfMetric> periodicMetrics;
	// stores the latency of each operation, with sampling
	std::vector<ContinuousSample<double>> opLatencies;
	// fixed prefix of the populated keys; a full key looks like 'mako00000xxxxxxx'
	const std::string KEYPREFIX = "mako";
	const int KEYPREFIXLEN = KEYPREFIX.size();
	const std::array<std::string, MAX_OP> opNames = { "GRV",   "GET",      "GETRANGE",   "SGET",
		"SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE",
		"SETCLEARRANGE", "COMMIT" };

	MakoWorkload(WorkloadContext const& wcx)
	  : TestWorkload(wcx), xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"),
	    totalOps("Operations"), loadTime(0.0) {
		// init parameters from the test file
		// Number of rows populated
		rowCount = getOption(options, LiteralStringRef("rows"), 10000);
		// Test duration in seconds
		testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
		warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
		maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
		// Flag to control whether to populate data into the database
		populateData = getOption(options, LiteralStringRef("populateData"), true);
		// Flag to control whether to run the benchmark
		runBenchmark = getOption(options, LiteralStringRef("runBenchmark"), true);
		// Flag to control whether to preserve the data in the database when the workload finishes
		preserveData = getOption(options, LiteralStringRef("preserveData"), true);
		// If true, force a commit even for read-only transactions
		commitGet = getOption(options, LiteralStringRef("commitGet"), false);
		// Target total transactions per second (TPS) across all clients
		transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100000.0) / clientCount;
		actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), 16);
		// Sampling rate (1 sample per <sampleSize> ops) for latency stats
		sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100);
		// If true, record latency metrics every periodicLoggingInterval; for details, see tracePeriodically()
		enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
		periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
		// If true, the workload picks keys following a Zipfian distribution
		zipf = getOption(options, LiteralStringRef("zipf"), false);
		zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
		// Specified length of keys and length range of values
		keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 16);
		maxValueBytes = getOption(options, LiteralStringRef("valueBytes"), 16);
		minValueBytes = getOption(options, LiteralStringRef("minValueBytes"), maxValueBytes);
		ASSERT(minValueBytes <= maxValueBytes);
		// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
		// Assuming we want to insert 10000 rows with keyBytes set to 16,
		// the keys then run from 'mako00000xxxxxxx' to 'mako09999xxxxxxx'
		seqNumLen = digits(rowCount);
		// check that keyBytes and maxValueBytes are valid
		ASSERT(seqNumLen + KEYPREFIXLEN <= keyBytes);
		ASSERT(keyBytes <= MAXKEYVALUESIZE);
		ASSERT(maxValueBytes <= MAXKEYVALUESIZE);
		// user input: a sequence of operations to be executed; e.g. "g10i5" means to do GET 10 times and INSERT 5 times
		// One operation type is defined as "<Type><Count>" or "<Type><Count>:<Range>".
		// When Count is omitted, it is equivalent to setting it to 1. (e.g. "g" is equivalent to "g1")
		// Multiple operation types can be concatenated. (e.g. "g9u1" = 9 GETs and 1 UPDATE)
		// For RANGE operations, "Range" needs to be specified in addition to "Count".
		// Below are all the allowed inputs:
		// g - GET
		// gr - GET RANGE
		// sg - Snapshot GET
		// sgr - Snapshot GET RANGE
		// u - Update (= GET followed by SET)
		// i - Insert (= SET with a new key)
		// ir - Insert Range (sequential)
		// c - CLEAR
		// sc - SET & CLEAR
		// cr - CLEAR RANGE
		// scr - SET & CLEAR RANGE
		// grv - GetReadVersion()
		// Every transaction is committed unless it contains only GET / GET RANGE operations.
		operationsSpec =
		    getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
		// parse the sequence and extract the operations to be executed
		parseOperationsSpec();
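		// For illustration, a hypothetical spec "g10gr1:100i5" parses to:
		//   operations[OP_GET][OP_COUNT] = 10
		//   operations[OP_GETRANGE][OP_COUNT] = 1, operations[OP_GETRANGE][OP_RANGE] = 100
		//   operations[OP_INSERT][OP_COUNT] = 5
		// i.e. each transaction does 10 GETs, one GET RANGE over 100 keys, and 5 INSERTs, then commits.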
		for (int i = 0; i < MAX_OP; ++i) {
			// initialize the per-operation latency record; it keeps at most rowCount / sampleSize samples
			opLatencies.push_back(ContinuousSample<double>(rowCount / sampleSize));
			// initialize the per-operation counter
			opCounters.push_back(PerfIntCounter(opNames[i]));
		}
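		// Note: zipfian_generator3() below seeds the Zipfian generator from fdbclient/zipf.h over the
		// index range [0, rowCount - 1] with constant zipfConstant; getRandomKeyIndex() later draws
		// skewed key indices from it via zipfian_next().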
		if (zipf) {
			zipfian_generator3(0, (int)rowCount - 1, zipfConstant);
		}
	}

	std::string description() override {
		// Mako is a simple workload to measure the performance of FDB.
		// The primary purpose of this benchmark is to generate consistent performance results.
		return "Mako";
	}

	Future<Void> setup(Database const& cx) override {
		// use all the clients to populate data
		if (populateData)
			return _setup(cx, this);
		return Void();
	}

	Future<Void> start(Database const& cx) override {
		return _start(cx, this);
	}

	Future<bool> check(Database const& cx) override {
		return true;
	}

	// disable the default timeout setting
	double getCheckTimeout() override { return std::numeric_limits<double>::max(); }

	void getMetrics(std::vector<PerfMetric>& m) override {
		// metrics of the population process
		if (populateData) {
			m.push_back(PerfMetric("Mean load time (seconds)", loadTime, true));
			// The rate of importing keys, measured at the key counts given by "insertionCountsToMeasure"
			for (auto ratesItr = ratesAtKeyCounts.begin(); ratesItr != ratesAtKeyCounts.end(); ratesItr++) {
				m.push_back(
				    PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
			}
		}
		// benchmark
		if (runBenchmark) {
			m.push_back(PerfMetric("Measured Duration", testDuration, true));
			m.push_back(xacts.getMetric());
			m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
			m.push_back(totalOps.getMetric());
			m.push_back(PerfMetric("Operations/sec", totalOps.getValue() / testDuration, true));
			m.push_back(conflicts.getMetric());
			m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
			m.push_back(retries.getMetric());

			// count of each operation
			for (int i = 0; i < MAX_OP; ++i) {
				m.push_back(opCounters[i].getMetric());
			}

			// Meaningful latency metrics (operations that execute as a single call)
			const int opExecutedAtOnce[] = { OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT };
			for (const int& op : opExecutedAtOnce) {
				m.push_back(PerfMetric("Mean " + opNames[op] + " Latency (ms)", 1000 * opLatencies[op].mean(), true));
				m.push_back(PerfMetric("Max " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
				m.push_back(PerfMetric("Min " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
			}

			// insert the periodic logging metrics, if they exist
			m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
		}
	}

	static std::string randStr(int len) {
		std::string result(len, '.');
		for (int i = 0; i < len; ++i) {
			result[i] = deterministicRandom()->randomAlphaNumeric();
		}
		return result;
	}

	static void randStr(char* str, int len) {
		for (int i = 0; i < len; ++i) {
			str[i] = deterministicRandom()->randomAlphaNumeric();
		}
	}

	Value randomValue() {
		const int length = deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1);
		std::string valueString = randStr(length);
		// Safe to return: Value is a Standalone<StringRef>, so the implicit conversion deep-copies
		// the bytes of the local valueString into the returned value's arena.
		return StringRef(reinterpret_cast<const uint8_t*>(valueString.c_str()), length);
	}

	Key keyForIndex(uint64_t ind) {
		Key result = makeString(keyBytes);
		char* data = reinterpret_cast<char*>(mutateString(result));
		// cast to int: the '*' width and '%d' argument of format() expect int, not uint64_t
		format((KEYPREFIX + "%0*d").c_str(), (int)seqNumLen, (int)ind).copy(data, KEYPREFIXLEN + seqNumLen);
		for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
			data[i] = 'x';
		return result;
	}
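	// For example (using the defaults above), with rowCount = 10000 (so seqNumLen = 5) and
	// keyBytes = 16, keyForIndex(42) yields 'mako00042xxxxxxx'.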

	/* number of digits */
	static uint64_t digits(uint64_t num) {
		uint64_t digits = 0;
		while (num > 0) {
			num /= 10;
			digits++;
		}
		return digits;
	}
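	// e.g. digits(10000) == 5, which is why rowCount = 10000 gives the 5-digit sequence numbers
	// '00000' through '09999' in the populated keys.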

	Standalone<KeyValueRef> operator()(uint64_t n) {
		return KeyValueRef(keyForIndex(n), randomValue());
	}

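	// tracePeriodically() publishes one throughput sample per periodicLoggingInterval, with the
	// elapsed time baked into the metric name; e.g. with the default 5.0s interval the first sample
	// appears as "T=0005s: Transactions/sec" (from format("T=%04.0fs: ", elapsed)).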
	ACTOR static Future<Void> tracePeriodically(MakoWorkload* self) {
		state double start = now();
		state double elapsed = 0.0;
		state int64_t last_ops = 0;
		state int64_t last_xacts = 0;

		loop {
			elapsed += self->periodicLoggingInterval;
			wait(delayUntil(start + elapsed));
			TraceEvent((self->description() + "_CommitLatency").c_str())
			    .detail("Mean", self->opLatencies[OP_COMMIT].mean())
			    .detail("Median", self->opLatencies[OP_COMMIT].median())
			    .detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05))
			    .detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95))
			    .detail("Count", self->opCounters[OP_COMMIT].getValue())
			    .detail("Elapsed", elapsed);
			TraceEvent((self->description() + "_GRVLatency").c_str())
			    .detail("Mean", self->opLatencies[OP_GETREADVERSION].mean())
			    .detail("Median", self->opLatencies[OP_GETREADVERSION].median())
			    .detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05))
			    .detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95))
			    .detail("Count", self->opCounters[OP_GETREADVERSION].getValue());

			std::string ts = format("T=%04.0fs: ", elapsed);
			self->periodicMetrics.push_back(PerfMetric(
			    ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false));
			self->periodicMetrics.push_back(PerfMetric(
			    ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false));

			last_xacts = self->xacts.getValue();
			last_ops = self->totalOps.getValue();
		}
	}

	ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
		state Promise<double> loadTime;
		state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;

		wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
		               self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));

		// This is the setup time
		self->loadTime = loadTime.getFuture().get();
		// These are the rates of importing keys
		self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();

		return Void();
	}

	ACTOR Future<Void> _start(Database cx, MakoWorkload* self) {
		// TODO: Do I need to read data to warm the cache of the keySystem like ReadWrite.actor.cpp (line 465)?
		if (self->runBenchmark) {
			wait(self->_runBenchmark(cx, self));
		}
		if (!self->preserveData && self->clientId == 0) {
			wait(self->cleanup(cx, self));
		}
		return Void();
	}

	ACTOR Future<Void> _runBenchmark(Database cx, MakoWorkload* self) {
		std::vector<Future<Void>> clients;
		for (int c = 0; c < self->actorCountPerClient; ++c) {
			clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
		}
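		// Throttling arithmetic: each actor's mean inter-transaction delay is
		// actorCountPerClient / transactionsPerSecond; e.g. a per-client target of 10000 TPS with the
		// default 16 actors gives a 1.6 ms mean delay, i.e. 625 TPS per actor and 10000 TPS per client.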

		if (self->enableLogging)
			clients.push_back(tracePeriodically(self));

		wait(timeout(waitForAll(clients), self->testDuration, Void()));
		return Void();
	}

	ACTOR Future<Void> makoClient(Database cx, MakoWorkload* self, double delay, int actorIndex) {
		state Key rkey, rkey2;
		state Value rval;
		state ReadYourWritesTransaction tr(cx);
		state bool doCommit;
		state int i, count;
		state uint64_t range, indBegin, indEnd, rangeLen;
		state double lastTime = now();
		state double commitStart;
		state KeyRangeRef rkeyRangeRef;
		state std::vector<int> perOpCount(MAX_OP, 0);

		TraceEvent("ClientStarting")
		    .detail("ActorIndex", actorIndex)
		    .detail("ClientIndex", self->clientId)
		    .detail("NumActors", self->actorCountPerClient);

		loop {
			// used for throttling
			wait(poisson(&lastTime, delay));
			try {
				// user-defined option: whether to commit read-only transactions; default is false
				doCommit = self->commitGet;
				for (i = 0; i < MAX_OP; ++i) {
					if (i == OP_COMMIT)
						continue;
					for (count = 0; count < self->operations[i][0]; ++count) {
						range = std::min(RANGELIMIT, self->operations[i][1]);
						rangeLen = digits(range);
						// generate a random key-value pair for the operation
						indBegin = self->getRandomKeyIndex(self->rowCount);
						rkey = self->keyForIndex(indBegin);
						rval = self->randomValue();
						indEnd = std::min(indBegin + range, self->rowCount);
						rkey2 = self->keyForIndex(indEnd);
						// KeyRangeRef(min, maxPlusOne)
						rkeyRangeRef = KeyRangeRef(rkey, rkey2);

						if (i == OP_GETREADVERSION) {
							wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
						} else if (i == OP_GET) {
							wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
						} else if (i == OP_GETRANGE) {
							wait(logLatency(tr.getRange(rkeyRangeRef, RANGELIMIT, false), &self->opLatencies[i]));
						} else if (i == OP_SGET) {
							wait(logLatency(tr.get(rkey, true), &self->opLatencies[i]));
						} else if (i == OP_SGETRANGE) {
							// do a snapshot GET RANGE here
							wait(logLatency(tr.getRange(rkeyRangeRef, RANGELIMIT, true), &self->opLatencies[i]));
						} else if (i == OP_UPDATE) {
							wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
							tr.set(rkey, rval);
							doCommit = true;
						} else if (i == OP_INSERT) {
							// generate an (almost) unique key: it starts with 'mako' and is followed by randomly generated characters
							randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
							        self->keyBytes - self->KEYPREFIXLEN);
							tr.set(rkey, rval);
							doCommit = true;
						} else if (i == OP_INSERTRANGE) {
							char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
							randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
							for (int range_i = 0; range_i < range; ++range_i) {
								// cast to int: the '*' precision argument of format() expects int
								format("%0.*d", (int)rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
								tr.set(rkey, self->randomValue());
							}
							doCommit = true;
						} else if (i == OP_CLEAR) {
							tr.clear(rkey);
							doCommit = true;
						} else if (i == OP_SETCLEAR) {
							randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
							        self->keyBytes - self->KEYPREFIXLEN);
							tr.set(rkey, rval);
							// commit the change and update metrics
							commitStart = now();
							wait(tr.commit());
							self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
							++perOpCount[OP_COMMIT];
							tr.reset();
							tr.clear(rkey);
							doCommit = true;
						} else if (i == OP_CLEARRANGE) {
							tr.clear(rkeyRangeRef);
							doCommit = true;
						} else if (i == OP_SETCLEARRANGE) {
							char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
							randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
							state std::string scr_start_key;
							state std::string scr_end_key;
							for (int range_i = 0; range_i < range; ++range_i) {
								format("%0.*d", (int)rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
								tr.set(rkey, self->randomValue());
								if (range_i == 0)
									scr_start_key = rkey.toString();
							}
							scr_end_key = rkey.toString();
							commitStart = now();
							wait(tr.commit());
							self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
							++perOpCount[OP_COMMIT];
							tr.reset();
							tr.clear(KeyRangeRef(StringRef(scr_start_key), StringRef(scr_end_key)));
							doCommit = true;
						}
						++perOpCount[i];
					}
				}

				if (doCommit) {
					commitStart = now();
					wait(tr.commit());
					self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
					++perOpCount[OP_COMMIT];
				}
				// the transaction finished successfully, update the metrics
				++self->xacts;
				for (int op = 0; op < MAX_OP; ++op) {
					self->opCounters[op] += perOpCount[op];
					self->totalOps += perOpCount[op];
				}
			} catch (Error& e) {
				TraceEvent("FailedToExecOperations").error(e);
				if (e.code() == error_code_operation_cancelled)
					throw;
				else if (e.code() == error_code_not_committed)
					++self->conflicts;

				wait(tr.onError(e));
				++self->retries;
			}
			// reset all the operations' counters to 0
			std::fill(perOpCount.begin(), perOpCount.end(), 0);
			tr.reset();
		}
	}
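
	// cleanup() below removes everything the workload wrote: prefixRange(KEYPREFIX) expands to the
	// key range ["mako", "makp"), covering both the sequentially populated keys and the random
	// 'mako...'-prefixed keys written by the INSERT-style operations.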
	ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self) {
		// clear all data starting with 'mako' in the database
		state std::string keyPrefix(self->KEYPREFIX);
		state ReadYourWritesTransaction tr(cx);

		loop {
			try {
				tr.clear(prefixRange(keyPrefix));
				wait(tr.commit());
				break;
			} catch (Error& e) {
				TraceEvent("FailedToCleanData").error(e);
				wait(tr.onError(e));
			}
		}

		return Void();
	}

	ACTOR template <class T>
	static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies) {
		state double opBegin = now();
		wait(success(f));
		opLatencies->addSample(now() - opBegin);
		return Void();
	}

	int64_t getRandomKeyIndex(uint64_t rowCount) {
		int64_t randomKeyIndex;
		if (zipf) {
			randomKeyIndex = zipfian_next();
		} else {
			randomKeyIndex = deterministicRandom()->randomInt64(0, rowCount);
		}
		return randomKeyIndex;
	}
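	// e.g. with zipf = false every index in [0, rowCount) is equally likely; with zipf = true,
	// zipfian_next() returns low-ranked (hot) indices far more often, skewing the access pattern.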

	void parseOperationsSpec() {
		const char* ptr = operationsSpec.c_str();
		int op = 0;
		int rangeop = 0;
		int num;
		int error = 0;

		for (op = 0; op < MAX_OP; op++) {
			operations[op][OP_COUNT] = 0;
			operations[op][OP_RANGE] = 0;
		}

		op = 0;
		while (*ptr) {
			if (strncmp(ptr, "grv", 3) == 0) {
				op = OP_GETREADVERSION;
				ptr += 3;
			} else if (strncmp(ptr, "gr", 2) == 0) {
				op = OP_GETRANGE;
				rangeop = 1;
				ptr += 2;
			} else if (strncmp(ptr, "g", 1) == 0) {
				op = OP_GET;
				ptr++;
			} else if (strncmp(ptr, "sgr", 3) == 0) {
				op = OP_SGETRANGE;
				rangeop = 1;
				ptr += 3;
			} else if (strncmp(ptr, "sg", 2) == 0) {
				op = OP_SGET;
				ptr += 2;
			} else if (strncmp(ptr, "u", 1) == 0) {
				op = OP_UPDATE;
				ptr++;
			} else if (strncmp(ptr, "ir", 2) == 0) {
				op = OP_INSERTRANGE;
				rangeop = 1;
				ptr += 2;
			} else if (strncmp(ptr, "i", 1) == 0) {
				op = OP_INSERT;
				ptr++;
			} else if (strncmp(ptr, "cr", 2) == 0) {
				op = OP_CLEARRANGE;
				rangeop = 1;
				ptr += 2;
			} else if (strncmp(ptr, "c", 1) == 0) {
				op = OP_CLEAR;
				ptr++;
			} else if (strncmp(ptr, "scr", 3) == 0) {
				op = OP_SETCLEARRANGE;
				rangeop = 1;
				ptr += 3;
			} else if (strncmp(ptr, "sc", 2) == 0) {
				op = OP_SETCLEAR;
				ptr += 2;
			} else {
				error = 1;
				break;
			}

			/* count */
			num = 0;
			if ((*ptr < '0') || (*ptr > '9')) {
				num = 1; /* if omitted, set it to 1 */
			} else {
				while ((*ptr >= '0') && (*ptr <= '9')) {
					num = num * 10 + *ptr - '0';
					ptr++;
				}
			}
			/* set count */
			operations[op][OP_COUNT] = num;

			if (rangeop) {
				if (*ptr != ':') {
					error = 1;
					break;
				} else {
					ptr++; /* skip ':' */
					num = 0;
					if ((*ptr < '0') || (*ptr > '9')) {
						error = 1;
						break;
					}
					while ((*ptr >= '0') && (*ptr <= '9')) {
						num = num * 10 + *ptr - '0';
						ptr++;
					}
					/* set range */
					if (num > RANGELIMIT)
						TraceEvent(SevError, "RangeExceedLimit").detail("RangeLimit", RANGELIMIT).detail("Range", num);
					operations[op][OP_RANGE] = num;
				}
			}
			rangeop = 0;
		}

		if (error) {
			TraceEvent(SevError, "InvalidTransactionSpecification").detail("Operations", operationsSpec);
		}
	}
};

WorkloadFactory<MakoWorkload> MakoloadFactory("Mako");