fix bugs, update naming and comments, refine functions
This commit is contained in:
parent
6788c8eb7d
commit
12a51b2d39
|
@ -130,7 +130,6 @@ set(FDBSERVER_SRCS
|
||||||
workloads/LowLatency.actor.cpp
|
workloads/LowLatency.actor.cpp
|
||||||
workloads/MachineAttrition.actor.cpp
|
workloads/MachineAttrition.actor.cpp
|
||||||
workloads/Mako.actor.cpp
|
workloads/Mako.actor.cpp
|
||||||
workloads/Mako.h
|
|
||||||
workloads/MemoryKeyValueStore.cpp
|
workloads/MemoryKeyValueStore.cpp
|
||||||
workloads/MemoryKeyValueStore.h
|
workloads/MemoryKeyValueStore.h
|
||||||
workloads/MemoryLifetime.actor.cpp
|
workloads/MemoryLifetime.actor.cpp
|
||||||
|
|
|
@ -2,21 +2,24 @@
|
||||||
#include "fdbserver/TesterInterface.actor.h"
|
#include "fdbserver/TesterInterface.actor.h"
|
||||||
#include "workloads.actor.h"
|
#include "workloads.actor.h"
|
||||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||||
#include "fdbserver/workloads/Mako.h"
|
|
||||||
#include "flow/actorcompiler.h"
|
#include "flow/actorcompiler.h"
|
||||||
|
|
||||||
static const char* opNames[MAX_OP] = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"};
|
|
||||||
|
|
||||||
|
enum {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_UPDATE, OP_INSERT, OP_INSERTRANGE, OP_CLEAR, OP_SETCLEAR, OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, MAX_OP};
|
||||||
|
enum {OP_COUNT, OP_RANGE};
|
||||||
|
static constexpr int BUFFERSIZE = 10000;
|
||||||
|
static constexpr int KEYPREFIXLEN = 4;
|
||||||
|
static const char* opNames[MAX_OP] = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"};
|
||||||
|
static const std::string KEYPREFIX = "mako";
|
||||||
struct MakoWorkload : KVWorkload {
|
struct MakoWorkload : KVWorkload {
|
||||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient;
|
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient;
|
||||||
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval;
|
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval;
|
||||||
double metricsStart, metricsDuration;
|
bool enableLogging, commitGet, populateData, runBenchmark, preserveData;
|
||||||
bool enableLogging, commitGet;
|
|
||||||
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
||||||
vector<PerfIntCounter> opCounters;
|
vector<PerfIntCounter> opCounters;
|
||||||
vector<uint64_t> insertionCountsToMeasure;
|
vector<uint64_t> insertionCountsToMeasure;
|
||||||
vector<pair<uint64_t, double>> ratesAtKeyCounts;
|
vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
|
||||||
std::string valueString, unparsedTxStr, mode;
|
std::string valueString, operationsSpec;
|
||||||
|
|
||||||
//store operations to execute
|
//store operations to execute
|
||||||
int operations[MAX_OP][2];
|
int operations[MAX_OP][2];
|
||||||
|
@ -37,13 +40,16 @@ struct MakoWorkload : KVWorkload {
|
||||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
|
testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
|
||||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||||
// One of the following modes must be specified:
|
// Flag to control whether to populate data into database
|
||||||
// clean : Clean up existing data; build : Populate data; run : Run the benchmark
|
populateData = getOption(options, LiteralStringRef("populateData"), true);
|
||||||
mode = getOption(options, LiteralStringRef("mode"), LiteralStringRef("build")).contents().toString();
|
// Flag to control whether to run benchmark
|
||||||
|
runBenchmark = getOption(options, LiteralStringRef("runBenchmark"), true);
|
||||||
|
// Flag to control whether to clean data in the database
|
||||||
|
preserveData = getOption(options, LiteralStringRef("preserveData"), true);
|
||||||
// If true, force commit for read-only transactions
|
// If true, force commit for read-only transactions
|
||||||
commitGet = getOption(options, LiteralStringRef("commitGet"), false);
|
commitGet = getOption(options, LiteralStringRef("commitGet"), false);
|
||||||
// Target total transaction-per-second (TPS) of all clients
|
// Target total transaction-per-second (TPS) of all clients
|
||||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 1000.0) / clientCount;
|
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100000.0) / clientCount;
|
||||||
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
|
||||||
actorCountPerClient = ceil(transactionsPerSecond * allowedLatency);
|
actorCountPerClient = ceil(transactionsPerSecond * allowedLatency);
|
||||||
actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), actorCountPerClient);
|
actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), actorCountPerClient);
|
||||||
|
@ -52,24 +58,38 @@ struct MakoWorkload : KVWorkload {
|
||||||
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
||||||
periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 );
|
periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 );
|
||||||
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
||||||
// Start time of benchmark
|
|
||||||
metricsStart = getOption( options, LiteralStringRef("metricsStart"), 0.0 );
|
|
||||||
// The duration of benchmark
|
|
||||||
metricsDuration = getOption( options, LiteralStringRef("metricsDuration"), testDuration );
|
|
||||||
// used to store randomly generated value
|
// used to store randomly generated value
|
||||||
valueString = std::string(maxValueBytes, '.');
|
valueString = std::string(maxValueBytes, '.');
|
||||||
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
|
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
|
||||||
// assume we want to insert 10000 rows with keyBytes set to 16,
|
// assume we want to insert 10000 rows with keyBytes set to 16,
|
||||||
// then the key goes from 'mako00000xxxxxxx' to 'mako09999xxxxxxx'
|
// then the key goes from 'mako00000xxxxxxx' to 'mako09999xxxxxxx'
|
||||||
seqNumLen = digits(rowCount) + KEYPREFIXLEN;
|
seqNumLen = digits(rowCount);
|
||||||
// check keyBytes, maxValueBytes is valid
|
// check keyBytes, maxValueBytes is valid
|
||||||
ASSERT(seqNumLen <= keyBytes);
|
ASSERT(seqNumLen + KEYPREFIXLEN <= keyBytes);
|
||||||
ASSERT(keyBytes <= BUFFERSIZE);
|
ASSERT(keyBytes <= BUFFERSIZE);
|
||||||
ASSERT(maxValueBytes <= BUFFERSIZE);
|
ASSERT(maxValueBytes <= BUFFERSIZE);
|
||||||
// user input: a sequence of operations to be executed; e.g. "g10i5" means to do GET 10 times and Insert 5 times
|
// user input: a sequence of operations to be executed; e.g. "g10i5" means to do GET 10 times and Insert 5 times
|
||||||
unparsedTxStr = getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
|
// One operation type is defined as "<Type><Count>" or "<Type><Count>:<Range>".
|
||||||
|
// When Count is omitted, it's equivalent to setting it to 1. (e.g. "g" is equivalent to "g1")
|
||||||
|
// Multiple operation types can be concatenated. (e.g. "g9u1" = 9 GETs and 1 update)
|
||||||
|
// For RANGE operations, "Range" needs to be specified in addition to "Count".
|
||||||
|
// Below are all allowed inputs:
|
||||||
|
// g – GET
|
||||||
|
// gr – GET RANGE
|
||||||
|
// sg – Snapshot GET
|
||||||
|
// sgr – Snapshot GET RANGE
|
||||||
|
// u – Update (= GET followed by SET)
|
||||||
|
// i – Insert (= SET with a new key)
|
||||||
|
// ir – Insert Range (Sequential)
|
||||||
|
// c – CLEAR
|
||||||
|
// sc – SET & CLEAR
|
||||||
|
// cr – CLEAR RANGE
|
||||||
|
// scr – SET & CLEAR RANGE
|
||||||
|
// grv – GetReadVersion()
|
||||||
|
// Every transaction is committed unless it contains only GET / GET RANGE operations.
|
||||||
|
operationsSpec = getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
|
||||||
// parse the sequence and extract operations to be executed
|
// parse the sequence and extract operations to be executed
|
||||||
parse_transaction();
|
parseOperationsSpec();
|
||||||
for (int i = 0; i < MAX_OP; ++i) {
|
for (int i = 0; i < MAX_OP; ++i) {
|
||||||
// initilize per-operation latency record
|
// initilize per-operation latency record
|
||||||
opLatencies.push_back(opLatencies.arena(), ContinuousSample<double>(rowCount / sampleSize));
|
opLatencies.push_back(opLatencies.arena(), ContinuousSample<double>(rowCount / sampleSize));
|
||||||
|
@ -85,21 +105,13 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> setup(Database const& cx) override {
|
Future<Void> setup(Database const& cx) override {
|
||||||
// populate data when we are in "build" mode and use one client to do it
|
// use all the clients to populate data
|
||||||
if (mode.compare("build") == 0 && clientId == 0)
|
if (populateData)
|
||||||
return _setup(cx, this);
|
return _setup(cx, this);
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> start(Database const& cx) override {
|
Future<Void> start(Database const& cx) override {
|
||||||
// TODO: Do I need to read data to warm the cache of the keySystem like ReadWrite.actor.cpp (line 465)?
|
|
||||||
if (mode == "clean"){
|
|
||||||
if (clientId == 0)
|
|
||||||
return cleanup(cx, this);
|
|
||||||
else
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
return _start(cx, this);
|
return _start(cx, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,11 +120,8 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
void getMetrics(vector<PerfMetric>& m) override {
|
void getMetrics(vector<PerfMetric>& m) override {
|
||||||
if (mode == "clean"){
|
// metrics of population process
|
||||||
return;
|
if (populateData){
|
||||||
}
|
|
||||||
// Add metrics of population process
|
|
||||||
if (mode == "build" && clientId == 0){
|
|
||||||
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
|
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
|
||||||
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
|
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
|
||||||
auto ratesItr = ratesAtKeyCounts.begin();
|
auto ratesItr = ratesAtKeyCounts.begin();
|
||||||
|
@ -120,42 +129,45 @@ struct MakoWorkload : KVWorkload {
|
||||||
m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
|
m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.push_back(PerfMetric("Measured Duration", metricsDuration, true));
|
// benchmark
|
||||||
m.push_back(xacts.getMetric());
|
if (runBenchmark){
|
||||||
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
|
m.push_back(PerfMetric("Measured Duration", testDuration, true));
|
||||||
m.push_back(totalOps.getMetric());
|
m.push_back(xacts.getMetric());
|
||||||
m.push_back(PerfMetric("Operations/sec", totalOps.getValue() / testDuration, true));
|
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
|
||||||
m.push_back(conflicts.getMetric());
|
m.push_back(totalOps.getMetric());
|
||||||
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
|
m.push_back(PerfMetric("Operations/sec", totalOps.getValue() / testDuration, true));
|
||||||
m.push_back(retries.getMetric());
|
m.push_back(conflicts.getMetric());
|
||||||
|
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
|
||||||
|
m.push_back(retries.getMetric());
|
||||||
|
|
||||||
// count of each operation
|
// count of each operation
|
||||||
for (int i = 0; i < MAX_OP; ++i){
|
for (int i = 0; i < MAX_OP; ++i){
|
||||||
m.push_back(opCounters[i].getMetric());
|
m.push_back(opCounters[i].getMetric());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Meaningful Latency metrics
|
||||||
|
const int opExecutedAtOnce[] = {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT};
|
||||||
|
for (const int& op : opExecutedAtOnce){
|
||||||
|
m.push_back(PerfMetric("Mean " + std::string(opNames[op]) +" Latency (ms)", 1000 * opLatencies[op].mean(), true));
|
||||||
|
m.push_back(PerfMetric("Max " + std::string(opNames[op]) + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
|
||||||
|
m.push_back(PerfMetric("Min " + std::string(opNames[op]) + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
|
||||||
|
}
|
||||||
|
|
||||||
|
//insert logging metrics if exists
|
||||||
|
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Meaningful Latency metrics
|
|
||||||
const int opExecutedAtOnce[] = {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT};
|
|
||||||
for (const int& op : opExecutedAtOnce){
|
|
||||||
m.push_back(PerfMetric("Mean " + std::string(opNames[op]) +" Latency (ms)", 1000 * opLatencies[op].mean(), true));
|
|
||||||
m.push_back(PerfMetric("Max " + std::string(opNames[op]) + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
|
|
||||||
m.push_back(PerfMetric("Min " + std::string(opNames[op]) + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
|
|
||||||
}
|
|
||||||
|
|
||||||
//insert logging metrics if exists
|
|
||||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
|
||||||
}
|
}
|
||||||
static void randStr(std::string& str, const int& len) {
|
static void randStr(std::string& str, const int& len) {
|
||||||
static char randomStr[BUFFERSIZE];
|
static char randomStr[BUFFERSIZE];
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
randomStr[i] = '!' + g_random->randomInt(0, 'z' - '!'); /* generage a char from '!' to 'z' */
|
randomStr[i] = g_random->randomAlphaNumeric();
|
||||||
}
|
}
|
||||||
str.assign(randomStr, len);
|
str.assign(randomStr, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void randStr(char *str, const int& len){
|
static void randStr(char *str, const int& len){
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
str[i] = '!' + g_random->randomInt(0, 'z'-'!'); /* generage a char from '!' to 'z' */
|
str[i] = g_random->randomAlphaNumeric();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,12 +179,13 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
Key ind2key(const uint64_t& ind) {
|
Key ind2key(const uint64_t& ind) {
|
||||||
Key result = makeString(keyBytes);
|
// allocate one more byte for potential null-character added by snprintf
|
||||||
|
Key result = makeString(keyBytes+1);
|
||||||
uint8_t* data = mutateString(result);
|
uint8_t* data = mutateString(result);
|
||||||
sprintf((char*)data, "mako%0*d", seqNumLen, ind);
|
sprintf((char*)data, (KEYPREFIX + "%0*d").c_str(), seqNumLen, ind);
|
||||||
for (int i = 4 + seqNumLen; i < keyBytes; ++i)
|
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
||||||
data[i] = 'x';
|
data[i] = 'x';
|
||||||
return result;
|
return StringRef(result.begin(), keyBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* number of digits */
|
/* number of digits */
|
||||||
|
@ -212,7 +225,7 @@ struct MakoWorkload : KVWorkload {
|
||||||
ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
|
ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
|
||||||
|
|
||||||
state Promise<double> loadTime;
|
state Promise<double> loadTime;
|
||||||
state Promise<vector<pair<uint64_t, double>>> ratesAtKeyCounts;
|
state Promise<vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
|
||||||
|
|
||||||
wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
|
wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
|
||||||
self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));
|
self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));
|
||||||
|
@ -226,6 +239,17 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> _start(Database cx, MakoWorkload* self) {
|
ACTOR Future<Void> _start(Database cx, MakoWorkload* self) {
|
||||||
|
// TODO: Do I need to read data to warm the cache of the keySystem like ReadWrite.actor.cpp (line 465)?
|
||||||
|
if (self->runBenchmark) {
|
||||||
|
wait(self->_runBenchMark(cx, self));
|
||||||
|
}
|
||||||
|
if (!self->preserveData && self->clientId == 0){
|
||||||
|
wait(self->cleanup(cx, self));
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> _runBenchMark(Database cx, MakoWorkload* self){
|
||||||
vector<Future<Void>> clients;
|
vector<Future<Void>> clients;
|
||||||
for (int c = 0; c < self->actorCountPerClient; ++c) {
|
for (int c = 0; c < self->actorCountPerClient; ++c) {
|
||||||
clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
|
clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
|
||||||
|
@ -275,9 +299,7 @@ struct MakoWorkload : KVWorkload {
|
||||||
rkeyRangeRef = KeyRangeRef(self->ind2key(indBegin), self->ind2key(indEnd));
|
rkeyRangeRef = KeyRangeRef(self->ind2key(indBegin), self->ind2key(indEnd));
|
||||||
|
|
||||||
if (i == OP_GETREADVERSION){
|
if (i == OP_GETREADVERSION){
|
||||||
state double getReadVersionStart = now();
|
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
|
||||||
Version v = wait(tr.getReadVersion());
|
|
||||||
self->opLatencies[i].addSample(now() - getReadVersionStart);
|
|
||||||
}
|
}
|
||||||
else if (i == OP_GET){
|
else if (i == OP_GET){
|
||||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
|
wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
|
||||||
|
@ -295,52 +317,47 @@ struct MakoWorkload : KVWorkload {
|
||||||
tr.set(rkey, rval, true);
|
tr.set(rkey, rval, true);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERT){
|
} else if (i == OP_INSERT){
|
||||||
// generate a unique key here
|
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
||||||
std::string unique_key;
|
randStr((char*) mutateString(rkey) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
randStr(unique_key, self->keyBytes-KEYPREFIXLEN);
|
tr.set(rkey, rval, true);
|
||||||
tr.set(StringRef(unique_key), rval, true);
|
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERTRANGE){
|
} else if (i == OP_INSERTRANGE){
|
||||||
Key tempKey = makeString(self->keyBytes-KEYPREFIXLEN);
|
uint8_t *rkeyPtr = mutateString(rkey);
|
||||||
uint8_t *keyPtr = mutateString(tempKey);
|
randStr((char*) rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
randStr((char*) keyPtr, self->keyBytes-KEYPREFIXLEN);
|
|
||||||
for (int range_i = 0; range_i < range; ++range_i){
|
for (int range_i = 0; range_i < range; ++range_i){
|
||||||
sprintf((char*) keyPtr + self->keyBytes - KEYPREFIXLEN - rangeLen, "%0.*d", rangeLen, range_i);
|
sprintf((char*) rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
||||||
tr.set(tempKey, self->randomValue(), true);
|
tr.set(rkey, self->randomValue(), true);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEAR){
|
} else if (i == OP_CLEAR){
|
||||||
tr.clear(rkey, true);
|
tr.clear(rkey, true);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if(i == OP_SETCLEAR){
|
} else if(i == OP_SETCLEAR){
|
||||||
state std::string st_key;
|
randStr((char*) mutateString(rkey) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
randStr(st_key, self->keyBytes-KEYPREFIXLEN);
|
tr.set(rkey, rval, true);
|
||||||
tr.set(StringRef(st_key), rval, true);
|
|
||||||
// commit the change and update metrics
|
// commit the change and update metrics
|
||||||
commitStart = now();
|
commitStart = now();
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||||
++perOpCount[OP_COMMIT];
|
++perOpCount[OP_COMMIT];
|
||||||
tr.reset();
|
tr.reset();
|
||||||
tr.clear(st_key, true);
|
tr.clear(rkey, true);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEARRANGE){
|
} else if (i == OP_CLEARRANGE){
|
||||||
tr.clear(rkeyRangeRef, true);
|
tr.clear(rkeyRangeRef, true);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_SETCLEARRANGE){
|
} else if (i == OP_SETCLEARRANGE){
|
||||||
// Key tempKey[self->keyBytes-KEYPREFIXLEN+1];
|
uint8_t *rkeyPtr = mutateString(rkey);
|
||||||
Key tempKey = makeString(self->keyBytes-KEYPREFIXLEN);
|
randStr((char*) rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
uint8_t *keyPtr = mutateString(tempKey);
|
|
||||||
randStr((char*) keyPtr, self->keyBytes-KEYPREFIXLEN);
|
|
||||||
state std::string scr_start_key;
|
state std::string scr_start_key;
|
||||||
state std::string scr_end_key;
|
state std::string scr_end_key;
|
||||||
for (int range_i = 0; range_i < range; ++range_i){
|
for (int range_i = 0; range_i < range; ++range_i){
|
||||||
sprintf((char*) keyPtr + self->keyBytes - KEYPREFIXLEN - rangeLen, "%0.*d", rangeLen, range_i);
|
sprintf((char*) rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
||||||
tr.set(tempKey, self->randomValue(), true);
|
tr.set(rkey, self->randomValue(), true);
|
||||||
if (range_i == 0)
|
if (range_i == 0)
|
||||||
scr_start_key = tempKey.toString();
|
scr_start_key = rkey.toString();
|
||||||
}
|
}
|
||||||
scr_end_key = tempKey.toString();
|
scr_end_key = rkey.toString();
|
||||||
commitStart = now();
|
commitStart = now();
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||||
|
@ -382,9 +399,9 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
||||||
// clear all data in the database
|
// clear all data starts with 'mako' in the database
|
||||||
state std::string startKey("");
|
state std::string startKey("mako");
|
||||||
state std::string endKey("\xff");
|
state std::string endKey("mako\xff");
|
||||||
state Transaction tr(cx);
|
state Transaction tr(cx);
|
||||||
|
|
||||||
loop{
|
loop{
|
||||||
|
@ -400,22 +417,11 @@ struct MakoWorkload : KVWorkload {
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
ACTOR template<class T>
|
||||||
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f, ContinuousSample<double>* opLatencies) {
|
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies){
|
||||||
state double opBegin = now();
|
state double opBegin = now();
|
||||||
Optional<Value> value = wait(f);
|
T value = wait(f);
|
||||||
|
opLatencies->addSample(now() - opBegin);
|
||||||
double latency = now() - opBegin;
|
|
||||||
opLatencies->addSample(latency);
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> logLatency(Future<Standalone<RangeResultRef>> f, ContinuousSample<double>* opLatencies){
|
|
||||||
state double opBegin = now();
|
|
||||||
Standalone<RangeResultRef> value = wait(f);
|
|
||||||
|
|
||||||
double latency = now() - opBegin;
|
|
||||||
opLatencies->addSample(latency);
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,8 +429,8 @@ struct MakoWorkload : KVWorkload {
|
||||||
// TODO: support other distribution like zipf
|
// TODO: support other distribution like zipf
|
||||||
return g_random->randomInt64(0, rowCount);
|
return g_random->randomInt64(0, rowCount);
|
||||||
}
|
}
|
||||||
int parse_transaction() {
|
int parseOperationsSpec() {
|
||||||
const char *ptr = unparsedTxStr.c_str();
|
const char *ptr = operationsSpec.c_str();
|
||||||
int op = 0;
|
int op = 0;
|
||||||
int rangeop = 0;
|
int rangeop = 0;
|
||||||
int num;
|
int num;
|
||||||
|
|
|
@ -1,33 +0,0 @@
|
||||||
#ifndef MAKO_H
|
|
||||||
#define MAKO_H
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#define VERBOSE_NONE 0
|
|
||||||
#define VERBOSE_DEFAULT 1
|
|
||||||
#define VERBOSE_ANNOYING 2
|
|
||||||
#define VERBOSE_DEBUG 3
|
|
||||||
|
|
||||||
/* transaction specification */
|
|
||||||
#define OP_GETREADVERSION 0
|
|
||||||
#define OP_GET 1
|
|
||||||
#define OP_GETRANGE 2
|
|
||||||
#define OP_SGET 3
|
|
||||||
#define OP_SGETRANGE 4
|
|
||||||
#define OP_UPDATE 5
|
|
||||||
#define OP_INSERT 6
|
|
||||||
#define OP_INSERTRANGE 7
|
|
||||||
#define OP_CLEAR 8
|
|
||||||
#define OP_SETCLEAR 9
|
|
||||||
#define OP_CLEARRANGE 10
|
|
||||||
#define OP_SETCLEARRANGE 11
|
|
||||||
#define OP_COMMIT 12
|
|
||||||
#define MAX_OP 13 /* update this when adding a new operation */
|
|
||||||
|
|
||||||
#define OP_COUNT 0
|
|
||||||
#define OP_RANGE 1
|
|
||||||
|
|
||||||
#define KEYPREFIXLEN 4
|
|
||||||
|
|
||||||
#define BUFFERSIZE 10000
|
|
||||||
|
|
||||||
#endif /* MAKO_H */
|
|
|
@ -1,13 +1,15 @@
|
||||||
testTitle=MakoTest
|
testTitle=MakoTest
|
||||||
testName=Mako
|
testName=Mako
|
||||||
testDuration=10.0
|
testDuration=10.0
|
||||||
transactionsPerSecond=1000
|
transactionsPerSecond=100000
|
||||||
rows=10000
|
rows=1000000
|
||||||
sampling=100
|
sampling=100
|
||||||
valueBytes=16
|
valueBytes=16
|
||||||
keyBytes=16
|
keyBytes=16
|
||||||
operations=scr10:10
|
operations=g5gr5:10i5ir5:10grv5
|
||||||
actorCountPerClient=100
|
actorCountPerClient=256
|
||||||
enableLogging=false
|
enableLogging=false
|
||||||
commitGet=false
|
commitGet=false
|
||||||
mode=build
|
populateData=true
|
||||||
|
runBenchmark=true
|
||||||
|
preserveData=true
|
||||||
|
|
Loading…
Reference in New Issue