clang-format MakoWorkload
This commit is contained in:
parent
83b09784c7
commit
5078bc5728
|
@ -7,20 +7,36 @@
|
||||||
#include "flow/crc32c.h"
|
#include "flow/crc32c.h"
|
||||||
#include "flow/actorcompiler.h"
|
#include "flow/actorcompiler.h"
|
||||||
|
|
||||||
|
enum {
|
||||||
|
OP_GETREADVERSION,
|
||||||
enum {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_UPDATE, OP_INSERT, OP_INSERTRANGE, OP_CLEAR, OP_SETCLEAR, OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, MAX_OP};
|
OP_GET,
|
||||||
enum {OP_COUNT, OP_RANGE};
|
OP_GETRANGE,
|
||||||
|
OP_SGET,
|
||||||
|
OP_SGETRANGE,
|
||||||
|
OP_UPDATE,
|
||||||
|
OP_INSERT,
|
||||||
|
OP_INSERTRANGE,
|
||||||
|
OP_CLEAR,
|
||||||
|
OP_SETCLEAR,
|
||||||
|
OP_CLEARRANGE,
|
||||||
|
OP_SETCLEARRANGE,
|
||||||
|
OP_COMMIT,
|
||||||
|
MAX_OP
|
||||||
|
};
|
||||||
|
enum { OP_COUNT, OP_RANGE };
|
||||||
struct MakoWorkload : TestWorkload {
|
struct MakoWorkload : TestWorkload {
|
||||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize, csCount, csPartitionSize, csStepSizeInPartition;
|
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize,
|
||||||
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant;
|
csCount, csPartitionSize, csStepSizeInPartition;
|
||||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, doChecksumVerificationOnly, latencyForLocalOperation;
|
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency,
|
||||||
|
periodicLoggingInterval, zipfConstant;
|
||||||
|
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification,
|
||||||
|
doChecksumVerificationOnly, latencyForLocalOperation;
|
||||||
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
||||||
std::vector<PerfIntCounter> opCounters;
|
std::vector<PerfIntCounter> opCounters;
|
||||||
std::vector<uint64_t> insertionCountsToMeasure;
|
std::vector<uint64_t> insertionCountsToMeasure;
|
||||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
|
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
|
||||||
std::string operationsSpec;
|
std::string operationsSpec;
|
||||||
//store operations to execute
|
// store operations to execute
|
||||||
int operations[MAX_OP][2];
|
int operations[MAX_OP][2];
|
||||||
// used for periodically tracing
|
// used for periodically tracing
|
||||||
std::vector<PerfMetric> periodicMetrics;
|
std::vector<PerfMetric> periodicMetrics;
|
||||||
|
@ -31,12 +47,13 @@ struct MakoWorkload : TestWorkload {
|
||||||
// key prefix of for all generated keys
|
// key prefix of for all generated keys
|
||||||
std::string keyPrefix;
|
std::string keyPrefix;
|
||||||
int KEYPREFIXLEN;
|
int KEYPREFIXLEN;
|
||||||
const std::array<std::string, MAX_OP> opNames = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"};
|
const std::array<std::string, MAX_OP> opNames = { "GRV", "GET", "GETRANGE", "SGET",
|
||||||
|
"SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE",
|
||||||
|
"CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE",
|
||||||
|
"COMMIT" };
|
||||||
MakoWorkload(WorkloadContext const& wcx)
|
MakoWorkload(WorkloadContext const& wcx)
|
||||||
: TestWorkload(wcx),
|
: TestWorkload(wcx), xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"),
|
||||||
xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"), totalOps("Operations"),
|
totalOps("Operations"), loadTime(0.0) {
|
||||||
loadTime(0.0)
|
|
||||||
{
|
|
||||||
// init parameters from test file
|
// init parameters from test file
|
||||||
// Number of rows populated
|
// Number of rows populated
|
||||||
rowCount = getOption(options, LiteralStringRef("rows"), 10000);
|
rowCount = getOption(options, LiteralStringRef("rows"), 10000);
|
||||||
|
@ -61,17 +78,17 @@ struct MakoWorkload : TestWorkload {
|
||||||
sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100);
|
sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100);
|
||||||
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
||||||
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
||||||
periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 );
|
periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
|
||||||
// All the generated keys will start with the specified prefix
|
// All the generated keys will start with the specified prefix
|
||||||
keyPrefix = getOption( options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString();
|
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString();
|
||||||
KEYPREFIXLEN = keyPrefix.size();
|
KEYPREFIXLEN = keyPrefix.size();
|
||||||
// If true, the workload will picking up keys which are zipfian distributed
|
// If true, the workload will picking up keys which are zipfian distributed
|
||||||
zipf = getOption(options, LiteralStringRef("zipf"), false);
|
zipf = getOption(options, LiteralStringRef("zipf"), false);
|
||||||
zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
|
zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
|
||||||
// Specified length of keys and length range of values
|
// Specified length of keys and length range of values
|
||||||
keyBytes = std::max( getOption( options, LiteralStringRef("keyBytes"), 16 ), 16);
|
keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 16);
|
||||||
maxValueBytes = getOption( options, LiteralStringRef("valueBytes"), 16 );
|
maxValueBytes = getOption(options, LiteralStringRef("valueBytes"), 16);
|
||||||
minValueBytes = getOption( options, LiteralStringRef("minValueBytes"), maxValueBytes);
|
minValueBytes = getOption(options, LiteralStringRef("minValueBytes"), maxValueBytes);
|
||||||
ASSERT(minValueBytes <= maxValueBytes);
|
ASSERT(minValueBytes <= maxValueBytes);
|
||||||
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
|
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
|
||||||
// assume we want to insert 10000 rows with keyBytes set to 16,
|
// assume we want to insert 10000 rows with keyBytes set to 16,
|
||||||
|
@ -98,7 +115,8 @@ struct MakoWorkload : TestWorkload {
|
||||||
// scr – SET & CLEAR RANGE
|
// scr – SET & CLEAR RANGE
|
||||||
// grv – GetReadVersion()
|
// grv – GetReadVersion()
|
||||||
// Every transaction is committed unless it contains only GET / GET RANGE operations.
|
// Every transaction is committed unless it contains only GET / GET RANGE operations.
|
||||||
operationsSpec = getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
|
operationsSpec =
|
||||||
|
getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
|
||||||
// parse the sequence and extract operations to be executed
|
// parse the sequence and extract operations to be executed
|
||||||
parseOperationsSpec();
|
parseOperationsSpec();
|
||||||
for (int i = 0; i < MAX_OP; ++i) {
|
for (int i = 0; i < MAX_OP; ++i) {
|
||||||
|
@ -107,8 +125,8 @@ struct MakoWorkload : TestWorkload {
|
||||||
// initialize per-operation counter
|
// initialize per-operation counter
|
||||||
opCounters.push_back(PerfIntCounter(opNames[i]));
|
opCounters.push_back(PerfIntCounter(opNames[i]));
|
||||||
}
|
}
|
||||||
if (zipf){
|
if (zipf) {
|
||||||
zipfian_generator3(0, (int)rowCount-1, zipfConstant);
|
zipfian_generator3(0, (int)rowCount - 1, zipfConstant);
|
||||||
}
|
}
|
||||||
// Added for checksum verification
|
// Added for checksum verification
|
||||||
csSize = getOption(options, LiteralStringRef("csSize"), rowCount / 100);
|
csSize = getOption(options, LiteralStringRef("csSize"), rowCount / 100);
|
||||||
|
@ -122,7 +140,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
csPartitionSize = rowCount / csSize;
|
csPartitionSize = rowCount / csSize;
|
||||||
ASSERT(csCount <= csPartitionSize);
|
ASSERT(csCount <= csPartitionSize);
|
||||||
csStepSizeInPartition = csPartitionSize / csCount;
|
csStepSizeInPartition = csPartitionSize / csCount;
|
||||||
for (int i= 0; i < csCount; ++i) {
|
for (int i = 0; i < csCount; ++i) {
|
||||||
csKeys.emplace_back(format((keyPrefix + "_crc32c_%u_%u").c_str(), i, rowCount));
|
csKeys.emplace_back(format((keyPrefix + "_crc32c_%u_%u").c_str(), i, rowCount));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -135,19 +153,17 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> setup(Database const& cx) override {
|
Future<Void> setup(Database const& cx) override {
|
||||||
if (doChecksumVerificationOnly)
|
if (doChecksumVerificationOnly) return Void();
|
||||||
return Void();
|
|
||||||
return _setup(cx, this);
|
return _setup(cx, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> start(Database const& cx) override {
|
Future<Void> start(Database const& cx) override {
|
||||||
if (doChecksumVerificationOnly)
|
if (doChecksumVerificationOnly) return Void();
|
||||||
return Void();
|
|
||||||
return _start(cx, this);
|
return _start(cx, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<bool> check(Database const& cx) override {
|
Future<bool> check(Database const& cx) override {
|
||||||
if (!checksumVerification){
|
if (!checksumVerification) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// verify checksum consistency
|
// verify checksum consistency
|
||||||
|
@ -155,20 +171,21 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
// disable the default timeout setting
|
// disable the default timeout setting
|
||||||
double getCheckTimeout() override {return std::numeric_limits<double>::max();}
|
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
|
||||||
|
|
||||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||||
// metrics of population process
|
// metrics of population process
|
||||||
if (populateData){
|
if (populateData) {
|
||||||
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
|
m.push_back(PerfMetric("Mean load time (seconds)", loadTime, true));
|
||||||
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
|
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
|
||||||
auto ratesItr = ratesAtKeyCounts.begin();
|
auto ratesItr = ratesAtKeyCounts.begin();
|
||||||
for(; ratesItr != ratesAtKeyCounts.end(); ratesItr++){
|
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) {
|
||||||
m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
|
m.push_back(
|
||||||
|
PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// benchmark
|
// benchmark
|
||||||
if (runBenchmark){
|
if (runBenchmark) {
|
||||||
m.push_back(PerfMetric("Measured Duration", testDuration, true));
|
m.push_back(PerfMetric("Measured Duration", testDuration, true));
|
||||||
m.push_back(xacts.getMetric());
|
m.push_back(xacts.getMetric());
|
||||||
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
|
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
|
||||||
|
@ -179,31 +196,36 @@ struct MakoWorkload : TestWorkload {
|
||||||
m.push_back(retries.getMetric());
|
m.push_back(retries.getMetric());
|
||||||
|
|
||||||
// count of each operation
|
// count of each operation
|
||||||
for (int i = 0; i < MAX_OP; ++i){
|
for (int i = 0; i < MAX_OP; ++i) {
|
||||||
m.push_back(opCounters[i].getMetric());
|
m.push_back(opCounters[i].getMetric());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Meaningful Latency metrics
|
// Meaningful Latency metrics
|
||||||
const int opExecutedAtOnce[] = {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT};
|
const int opExecutedAtOnce[] = { OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT };
|
||||||
for (const int& op : opExecutedAtOnce){
|
for (const int& op : opExecutedAtOnce) {
|
||||||
m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (us)", 1e6 * opLatencies[op].mean(), true));
|
m.push_back(PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true));
|
||||||
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true));
|
m.push_back(
|
||||||
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true));
|
PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true));
|
||||||
|
m.push_back(
|
||||||
|
PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true));
|
||||||
}
|
}
|
||||||
// Latency for local operations if needed
|
// Latency for local operations if needed
|
||||||
if (latencyForLocalOperation) {
|
if (latencyForLocalOperation) {
|
||||||
const int localOp[] = {OP_INSERT, OP_CLEAR, OP_CLEARRANGE};
|
const int localOp[] = { OP_INSERT, OP_CLEAR, OP_CLEARRANGE };
|
||||||
for (const int& op : localOp){
|
for (const int& op : localOp) {
|
||||||
TraceEvent(SevDebug, "LocalLatency")
|
TraceEvent(SevDebug, "LocalLatency")
|
||||||
.detail("Name", opNames[op])
|
.detail("Name", opNames[op])
|
||||||
.detail("Size", opLatencies[op].getPopulationSize());
|
.detail("Size", opLatencies[op].getPopulationSize());
|
||||||
m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (us)", 1e6 * opLatencies[op].mean(), true));
|
m.push_back(
|
||||||
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true));
|
PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true));
|
||||||
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true));
|
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)",
|
||||||
|
1e6 * opLatencies[op].max(), true));
|
||||||
|
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)",
|
||||||
|
1e6 * opLatencies[op].min(), true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//insert logging metrics if exists
|
// insert logging metrics if exists
|
||||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,7 +237,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void randStr(char *str, int len){
|
static void randStr(char* str, int len) {
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
str[i] = deterministicRandom()->randomAlphaNumeric();
|
str[i] = deterministicRandom()->randomAlphaNumeric();
|
||||||
}
|
}
|
||||||
|
@ -231,8 +253,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
Key result = makeString(keyBytes);
|
Key result = makeString(keyBytes);
|
||||||
char* data = reinterpret_cast<char*>(mutateString(result));
|
char* data = reinterpret_cast<char*>(mutateString(result));
|
||||||
format((keyPrefix + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
|
format((keyPrefix + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
|
||||||
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i) data[i] = 'x';
|
||||||
data[i] = 'x';
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,11 +267,11 @@ struct MakoWorkload : TestWorkload {
|
||||||
return digits;
|
return digits;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateCSFlags(MakoWorkload* self, std::vector<bool>& flags, uint64_t startIdx, uint64_t endIdx){
|
static void updateCSFlags(MakoWorkload* self, std::vector<bool>& flags, uint64_t startIdx, uint64_t endIdx) {
|
||||||
// We deal with cases where rowCount % csCount != 0 and csPartitionSize % csSize != 0;
|
// We deal with cases where rowCount % csCount != 0 and csPartitionSize % csSize != 0;
|
||||||
// In particular, all keys with index in range [csSize * csPartitionSize, rowCount) will not be used for checksum
|
// In particular, all keys with index in range [csSize * csPartitionSize, rowCount) will not be used for
|
||||||
// By the same way, for any i in range [0, csSize):
|
// checksum By the same way, for any i in range [0, csSize): keys with index in range [ i*csPartitionSize,
|
||||||
// keys with index in range [ i*csPartitionSize, i*csPartitionSize + csCount*csStepSizeInPartition) will not be used for checksum
|
// i*csPartitionSize + csCount*csStepSizeInPartition) will not be used for checksum
|
||||||
uint64_t boundary = self->csSize * self->csPartitionSize;
|
uint64_t boundary = self->csSize * self->csPartitionSize;
|
||||||
if (startIdx >= boundary)
|
if (startIdx >= boundary)
|
||||||
return;
|
return;
|
||||||
|
@ -258,17 +279,16 @@ struct MakoWorkload : TestWorkload {
|
||||||
endIdx = boundary;
|
endIdx = boundary;
|
||||||
|
|
||||||
// If all checksums need to be updated, just return
|
// If all checksums need to be updated, just return
|
||||||
if (std::all_of(flags.begin(), flags.end(), [](bool flag){return flag;}))
|
if (std::all_of(flags.begin(), flags.end(), [](bool flag) { return flag; })) return;
|
||||||
return;
|
|
||||||
|
|
||||||
if (startIdx + 1 == endIdx){
|
if (startIdx + 1 == endIdx) {
|
||||||
// single key case
|
// single key case
|
||||||
startIdx = startIdx % self->csPartitionSize;
|
startIdx = startIdx % self->csPartitionSize;
|
||||||
if ((startIdx < self->csCount * self->csStepSizeInPartition) && (startIdx % self->csStepSizeInPartition == 0)){
|
if ((startIdx < self->csCount * self->csStepSizeInPartition) &&
|
||||||
|
(startIdx % self->csStepSizeInPartition == 0)) {
|
||||||
flags.at(startIdx / self->csStepSizeInPartition) = true;
|
flags.at(startIdx / self->csStepSizeInPartition) = true;
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
// key range case
|
// key range case
|
||||||
uint64_t count = self->csCount;
|
uint64_t count = self->csCount;
|
||||||
uint64_t base = (startIdx / self->csPartitionSize) * self->csPartitionSize;
|
uint64_t base = (startIdx / self->csPartitionSize) * self->csPartitionSize;
|
||||||
|
@ -277,29 +297,28 @@ struct MakoWorkload : TestWorkload {
|
||||||
uint64_t startStepIdx = std::min(startIdx / self->csStepSizeInPartition, self->csCount - 1);
|
uint64_t startStepIdx = std::min(startIdx / self->csStepSizeInPartition, self->csCount - 1);
|
||||||
|
|
||||||
// if changed range size is more than one csPartitionSize, which means every checksum needs to be updated
|
// if changed range size is more than one csPartitionSize, which means every checksum needs to be updated
|
||||||
if ((endIdx - startIdx) < self->csPartitionSize){
|
if ((endIdx - startIdx) < self->csPartitionSize) {
|
||||||
uint64_t endStepIdx;
|
uint64_t endStepIdx;
|
||||||
if (endIdx > self->csPartitionSize){
|
if (endIdx > self->csPartitionSize) {
|
||||||
endStepIdx = self->csCount + std::min((endIdx - 1 - self->csPartitionSize) / self->csStepSizeInPartition, self->csCount);
|
endStepIdx =
|
||||||
|
self->csCount +
|
||||||
|
std::min((endIdx - 1 - self->csPartitionSize) / self->csStepSizeInPartition, self->csCount);
|
||||||
} else {
|
} else {
|
||||||
endStepIdx = std::min((endIdx - 1) / self->csStepSizeInPartition, self->csCount - 1);
|
endStepIdx = std::min((endIdx - 1) / self->csStepSizeInPartition, self->csCount - 1);
|
||||||
}
|
}
|
||||||
// All the left boundary of csStep should be updated
|
// All the left boundary of csStep should be updated
|
||||||
// Also, check the startIdx whether it is the left boundary of a csStep
|
// Also, check the startIdx whether it is the left boundary of a csStep
|
||||||
if (startIdx == self->csStepSizeInPartition * startStepIdx)
|
if (startIdx == self->csStepSizeInPartition * startStepIdx) flags[startStepIdx] = true;
|
||||||
flags[startStepIdx] = true;
|
|
||||||
count = endStepIdx - startStepIdx;
|
count = endStepIdx - startStepIdx;
|
||||||
}
|
}
|
||||||
for (int i = 1; i <= count; ++i){
|
for (int i = 1; i <= count; ++i) {
|
||||||
flags[ (startStepIdx+i) % self->csCount] = true;
|
flags[(startStepIdx + i) % self->csCount] = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Standalone<KeyValueRef> operator()(uint64_t n) {
|
Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n), randomValue()); }
|
||||||
return KeyValueRef(keyForIndex(n), randomValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> tracePeriodically( MakoWorkload *self){
|
ACTOR static Future<Void> tracePeriodically(MakoWorkload* self) {
|
||||||
state double start = timer();
|
state double start = timer();
|
||||||
state double elapsed = 0.0;
|
state double elapsed = 0.0;
|
||||||
state int64_t last_ops = 0;
|
state int64_t last_ops = 0;
|
||||||
|
@ -307,13 +326,26 @@ struct MakoWorkload : TestWorkload {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
elapsed += self->periodicLoggingInterval;
|
elapsed += self->periodicLoggingInterval;
|
||||||
wait( delayUntil(start + elapsed));
|
wait(delayUntil(start + elapsed));
|
||||||
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->opLatencies[OP_COMMIT].mean()).detail("Median", self->opLatencies[OP_COMMIT].median()).detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)).detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)).detail("Count", self->opCounters[OP_COMMIT].getValue()).detail("Elapsed", elapsed);
|
TraceEvent((self->description() + "_CommitLatency").c_str())
|
||||||
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()).detail("Median", self->opLatencies[OP_GETREADVERSION].median()).detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)).detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)).detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
|
.detail("Mean", self->opLatencies[OP_COMMIT].mean())
|
||||||
|
.detail("Median", self->opLatencies[OP_COMMIT].median())
|
||||||
|
.detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05))
|
||||||
|
.detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95))
|
||||||
|
.detail("Count", self->opCounters[OP_COMMIT].getValue())
|
||||||
|
.detail("Elapsed", elapsed);
|
||||||
|
TraceEvent((self->description() + "_GRVLatency").c_str())
|
||||||
|
.detail("Mean", self->opLatencies[OP_GETREADVERSION].mean())
|
||||||
|
.detail("Median", self->opLatencies[OP_GETREADVERSION].median())
|
||||||
|
.detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05))
|
||||||
|
.detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95))
|
||||||
|
.detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
|
||||||
|
|
||||||
std::string ts = format("T=%04.0fs: ", elapsed);
|
std::string ts = format("T=%04.0fs: ", elapsed);
|
||||||
self->periodicMetrics.push_back(PerfMetric(ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false));
|
self->periodicMetrics.push_back(PerfMetric(
|
||||||
self->periodicMetrics.push_back(PerfMetric(ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false));
|
ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false));
|
||||||
|
self->periodicMetrics.push_back(PerfMetric(
|
||||||
|
ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false));
|
||||||
|
|
||||||
last_xacts = self->xacts.getValue();
|
last_xacts = self->xacts.getValue();
|
||||||
last_ops = self->totalOps.getValue();
|
last_ops = self->totalOps.getValue();
|
||||||
|
@ -325,8 +357,8 @@ struct MakoWorkload : TestWorkload {
|
||||||
state Promise<double> loadTime;
|
state Promise<double> loadTime;
|
||||||
state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
|
state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
|
||||||
|
|
||||||
wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
|
wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(),
|
||||||
self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));
|
self->warmingDelay, self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));
|
||||||
|
|
||||||
// This is the setup time
|
// This is the setup time
|
||||||
self->loadTime = loadTime.getFuture().get();
|
self->loadTime = loadTime.getFuture().get();
|
||||||
|
@ -334,7 +366,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
|
self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
|
||||||
}
|
}
|
||||||
// Use one client to initialize checksums
|
// Use one client to initialize checksums
|
||||||
if (self->checksumVerification && self->clientId == 0){
|
if (self->checksumVerification && self->clientId == 0) {
|
||||||
wait(generateChecksum(cx, self));
|
wait(generateChecksum(cx, self));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,22 +378,21 @@ struct MakoWorkload : TestWorkload {
|
||||||
if (self->runBenchmark) {
|
if (self->runBenchmark) {
|
||||||
wait(self->_runBenchmark(cx, self));
|
wait(self->_runBenchmark(cx, self));
|
||||||
}
|
}
|
||||||
if (!self->preserveData && self->clientId == 0){
|
if (!self->preserveData && self->clientId == 0) {
|
||||||
wait(self->cleanup(cx, self));
|
wait(self->cleanup(cx, self));
|
||||||
}
|
}
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> _runBenchmark(Database cx, MakoWorkload* self){
|
ACTOR Future<Void> _runBenchmark(Database cx, MakoWorkload* self) {
|
||||||
std::vector<Future<Void>> clients;
|
std::vector<Future<Void>> clients;
|
||||||
for (int c = 0; c < self->actorCountPerClient; ++c) {
|
for (int c = 0; c < self->actorCountPerClient; ++c) {
|
||||||
clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
|
clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (self->enableLogging)
|
if (self->enableLogging) clients.push_back(tracePeriodically(self));
|
||||||
clients.push_back(tracePeriodically(self));
|
|
||||||
|
|
||||||
wait( timeout( waitForAll( clients ), self->testDuration, Void() ) );
|
wait(timeout(waitForAll(clients), self->testDuration, Void()));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,17 +411,19 @@ struct MakoWorkload : TestWorkload {
|
||||||
state double lastTime = timer();
|
state double lastTime = timer();
|
||||||
state double commitStart;
|
state double commitStart;
|
||||||
|
|
||||||
TraceEvent("ClientStarting").detail("ActorIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient);
|
TraceEvent("ClientStarting")
|
||||||
|
.detail("ActorIndex", actorIndex)
|
||||||
|
.detail("ClientIndex", self->clientId)
|
||||||
|
.detail("NumActors", self->actorCountPerClient);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// used for throttling
|
// used for throttling
|
||||||
wait(poisson(&lastTime, delay));
|
wait(poisson(&lastTime, delay));
|
||||||
try{
|
try {
|
||||||
// user-defined value: whether commit read-only ops or not; default is false
|
// user-defined value: whether commit read-only ops or not; default is false
|
||||||
doCommit = self->commitGet;
|
doCommit = self->commitGet;
|
||||||
for (i = 0; i < MAX_OP; ++i) {
|
for (i = 0; i < MAX_OP; ++i) {
|
||||||
if (i == OP_COMMIT)
|
if (i == OP_COMMIT) continue;
|
||||||
continue;
|
|
||||||
for (count = 0; count < self->operations[i][0]; ++count) {
|
for (count = 0; count < self->operations[i][0]; ++count) {
|
||||||
range = self->operations[i][1];
|
range = self->operations[i][1];
|
||||||
rangeLen = digits(range);
|
rangeLen = digits(range);
|
||||||
|
@ -404,29 +437,28 @@ struct MakoWorkload : TestWorkload {
|
||||||
rkeyRangeRef = KeyRangeRef(rkey, rkey2);
|
rkeyRangeRef = KeyRangeRef(rkey, rkey2);
|
||||||
|
|
||||||
// used for mako-level consistency check
|
// used for mako-level consistency check
|
||||||
if (self->checksumVerification){
|
if (self->checksumVerification) {
|
||||||
if (i == OP_INSERT | i == OP_UPDATE | i == OP_CLEAR) {
|
if (i == OP_INSERT | i == OP_UPDATE | i == OP_CLEAR) {
|
||||||
updateCSFlags(self, csChangedFlags, indBegin, indBegin + 1);
|
updateCSFlags(self, csChangedFlags, indBegin, indBegin + 1);
|
||||||
}
|
} else if (i == OP_CLEARRANGE) {
|
||||||
else if (i == OP_CLEARRANGE) {
|
|
||||||
updateCSFlags(self, csChangedFlags, indBegin, indEnd);
|
updateCSFlags(self, csChangedFlags, indBegin, indEnd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i == OP_GETREADVERSION){
|
if (i == OP_GETREADVERSION) {
|
||||||
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
|
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
|
||||||
}
|
} else if (i == OP_GET) {
|
||||||
else if (i == OP_GET){
|
|
||||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
|
wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
|
||||||
} else if (i == OP_GETRANGE){
|
} else if (i == OP_GETRANGE) {
|
||||||
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, false), &self->opLatencies[i]));
|
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, false),
|
||||||
}
|
&self->opLatencies[i]));
|
||||||
else if (i == OP_SGET){
|
} else if (i == OP_SGET) {
|
||||||
wait(logLatency(tr.get(rkey, true), &self->opLatencies[i]));
|
wait(logLatency(tr.get(rkey, true), &self->opLatencies[i]));
|
||||||
} else if (i == OP_SGETRANGE){
|
} else if (i == OP_SGETRANGE) {
|
||||||
//do snapshot get range here
|
// do snapshot get range here
|
||||||
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, true), &self->opLatencies[i]));
|
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, true),
|
||||||
} else if (i == OP_UPDATE){
|
&self->opLatencies[i]));
|
||||||
|
} else if (i == OP_UPDATE) {
|
||||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
|
@ -436,9 +468,11 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.set(rkey, rval);
|
tr.set(rkey, rval);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERT){
|
} else if (i == OP_INSERT) {
|
||||||
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly
|
||||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
// generated characters
|
||||||
|
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
|
||||||
|
self->keyBytes - self->KEYPREFIXLEN);
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
tr.set(rkey, rval);
|
tr.set(rkey, rval);
|
||||||
|
@ -447,9 +481,9 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.set(rkey, rval);
|
tr.set(rkey, rval);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERTRANGE){
|
} else if (i == OP_INSERTRANGE) {
|
||||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
|
||||||
for (int range_i = 0; range_i < range; ++range_i) {
|
for (int range_i = 0; range_i < range; ++range_i) {
|
||||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
|
@ -461,7 +495,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEAR){
|
} else if (i == OP_CLEAR) {
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
tr.clear(rkey);
|
tr.clear(rkey);
|
||||||
|
@ -470,8 +504,9 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.clear(rkey);
|
tr.clear(rkey);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if(i == OP_SETCLEAR){
|
} else if (i == OP_SETCLEAR) {
|
||||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN,
|
||||||
|
self->keyBytes - self->KEYPREFIXLEN);
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
tr.set(rkey, rval);
|
tr.set(rkey, rval);
|
||||||
|
@ -494,7 +529,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.clear(rkey);
|
tr.clear(rkey);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEARRANGE){
|
} else if (i == OP_CLEARRANGE) {
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
tr.clear(rkeyRangeRef);
|
tr.clear(rkeyRangeRef);
|
||||||
|
@ -503,13 +538,13 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.clear(rkeyRangeRef);
|
tr.clear(rkeyRangeRef);
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_SETCLEARRANGE){
|
} else if (i == OP_SETCLEARRANGE) {
|
||||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
char* rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN);
|
||||||
state std::string scr_start_key;
|
state std::string scr_start_key;
|
||||||
state std::string scr_end_key;
|
state std::string scr_end_key;
|
||||||
state KeyRangeRef scr_key_range_ref;
|
state KeyRangeRef scr_key_range_ref;
|
||||||
for (int range_i = 0; range_i < range; ++range_i){
|
for (int range_i = 0; range_i < range; ++range_i) {
|
||||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||||
if (self->latencyForLocalOperation) {
|
if (self->latencyForLocalOperation) {
|
||||||
double opBegin = timer();
|
double opBegin = timer();
|
||||||
|
@ -518,8 +553,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
} else {
|
} else {
|
||||||
tr.set(rkey, self->randomValue());
|
tr.set(rkey, self->randomValue());
|
||||||
}
|
}
|
||||||
if (range_i == 0)
|
if (range_i == 0) scr_start_key = rkey.toString();
|
||||||
scr_start_key = rkey.toString();
|
|
||||||
}
|
}
|
||||||
scr_end_key = rkey.toString();
|
scr_end_key = rkey.toString();
|
||||||
scr_key_range_ref = KeyRangeRef(KeyRef(scr_start_key), KeyRef(scr_end_key));
|
scr_key_range_ref = KeyRangeRef(KeyRef(scr_start_key), KeyRef(scr_end_key));
|
||||||
|
@ -551,7 +585,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
// successfully finish the transaction, update metrics
|
// successfully finish the transaction, update metrics
|
||||||
++self->xacts;
|
++self->xacts;
|
||||||
for (int op = 0; op < MAX_OP; ++op){
|
for (int op = 0; op < MAX_OP; ++op) {
|
||||||
self->opCounters[op] += perOpCount[op];
|
self->opCounters[op] += perOpCount[op];
|
||||||
self->totalOps += perOpCount[op];
|
self->totalOps += perOpCount[op];
|
||||||
}
|
}
|
||||||
|
@ -571,18 +605,18 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self) {
|
||||||
// clear all data starts with 'mako' in the database
|
// clear all data starts with 'mako' in the database
|
||||||
state std::string keyPrefix(self->keyPrefix);
|
state std::string keyPrefix(self->keyPrefix);
|
||||||
state ReadYourWritesTransaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
|
|
||||||
loop{
|
loop {
|
||||||
try {
|
try {
|
||||||
tr.clear(prefixRange(keyPrefix));
|
tr.clear(prefixRange(keyPrefix));
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
TraceEvent("CleanUpMakoRelatedData").detail("KeyPrefix", self->keyPrefix);
|
TraceEvent("CleanUpMakoRelatedData").detail("KeyPrefix", self->keyPrefix);
|
||||||
break;
|
break;
|
||||||
} catch (Error &e){
|
} catch (Error& e) {
|
||||||
TraceEvent("FailedToCleanData").error(e);
|
TraceEvent("FailedToCleanData").error(e);
|
||||||
wait(tr.onError(e));
|
wait(tr.onError(e));
|
||||||
}
|
}
|
||||||
|
@ -590,8 +624,8 @@ struct MakoWorkload : TestWorkload {
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
ACTOR template<class T>
|
ACTOR template <class T>
|
||||||
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies){
|
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies) {
|
||||||
state double opBegin = timer();
|
state double opBegin = timer();
|
||||||
wait(success(f));
|
wait(success(f));
|
||||||
opLatencies->addSample(timer() - opBegin);
|
opLatencies->addSample(timer() - opBegin);
|
||||||
|
@ -600,7 +634,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
|
|
||||||
int64_t getRandomKeyIndex(uint64_t rowCount) {
|
int64_t getRandomKeyIndex(uint64_t rowCount) {
|
||||||
int64_t randomKeyIndex;
|
int64_t randomKeyIndex;
|
||||||
if (zipf){
|
if (zipf) {
|
||||||
randomKeyIndex = zipfian_next();
|
randomKeyIndex = zipfian_next();
|
||||||
} else {
|
} else {
|
||||||
randomKeyIndex = deterministicRandom()->randomInt64(0, rowCount);
|
randomKeyIndex = deterministicRandom()->randomInt64(0, rowCount);
|
||||||
|
@ -608,7 +642,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
return randomKeyIndex;
|
return randomKeyIndex;
|
||||||
}
|
}
|
||||||
void parseOperationsSpec() {
|
void parseOperationsSpec() {
|
||||||
const char *ptr = operationsSpec.c_str();
|
const char* ptr = operationsSpec.c_str();
|
||||||
int op = 0;
|
int op = 0;
|
||||||
int rangeop = 0;
|
int rangeop = 0;
|
||||||
int num;
|
int num;
|
||||||
|
@ -703,19 +737,21 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (error) {
|
if (error) {
|
||||||
TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidTransactionSpecification").detail("operations", operationsSpec);
|
TraceEvent(SevError, "TestFailure")
|
||||||
|
.detail("Reason", "InvalidTransactionSpecification")
|
||||||
|
.detail("operations", operationsSpec);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<uint32_t> calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex){
|
ACTOR static Future<uint32_t> calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex) {
|
||||||
state uint32_t result = 0;
|
state uint32_t result = 0;
|
||||||
state int i;
|
state int i;
|
||||||
state Key csKey;
|
state Key csKey;
|
||||||
for( i = 0; i < self->csSize; ++i){
|
for (i = 0; i < self->csSize; ++i) {
|
||||||
int idx = csIndex * self->csStepSizeInPartition + i * self->csPartitionSize;
|
int idx = csIndex * self->csStepSizeInPartition + i * self->csPartitionSize;
|
||||||
csKey = self->keyForIndex(idx);
|
csKey = self->keyForIndex(idx);
|
||||||
Optional<Value> temp = wait(tr->get(csKey));
|
Optional<Value> temp = wait(tr->get(csKey));
|
||||||
if (temp.present()){
|
if (temp.present()) {
|
||||||
Value val = temp.get();
|
Value val = temp.get();
|
||||||
result = crc32c_append(result, val.begin(), val.size());
|
result = crc32c_append(result, val.begin(), val.size());
|
||||||
} else {
|
} else {
|
||||||
|
@ -736,23 +772,31 @@ struct MakoWorkload : TestWorkload {
|
||||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||||
for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
|
for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
|
||||||
Optional<Value> temp = wait(tr.get(self->csKeys[csIdx]));
|
Optional<Value> temp = wait(tr.get(self->csKeys[csIdx]));
|
||||||
if (!temp.present()){
|
if (!temp.present()) {
|
||||||
TraceEvent(SevError, "TestFailure").detail("Reason", "NoExistingChecksum").detail("missedChecksumIndex", csIdx);
|
TraceEvent(SevError, "TestFailure")
|
||||||
|
.detail("Reason", "NoExistingChecksum")
|
||||||
|
.detail("missedChecksumIndex", csIdx);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
csValue = temp.get();
|
csValue = temp.get();
|
||||||
ASSERT(csValue.size() == sizeof(uint32_t));
|
ASSERT(csValue.size() == sizeof(uint32_t));
|
||||||
uint32_t calculatedCS = wait(calcCheckSum(&tr, self, csIdx));
|
uint32_t calculatedCS = wait(calcCheckSum(&tr, self, csIdx));
|
||||||
uint32_t existingCS = *(reinterpret_cast<const uint32_t*>(csValue.begin()));
|
uint32_t existingCS = *(reinterpret_cast<const uint32_t*>(csValue.begin()));
|
||||||
if (existingCS != calculatedCS){
|
if (existingCS != calculatedCS) {
|
||||||
TraceEvent(SevError, "TestFailure").detail("Reason", "ChecksumVerificationFailure").detail("ChecksumIndex", csIdx).detail("ExistingChecksum", existingCS).detail("CurrentChecksum", calculatedCS);
|
TraceEvent(SevError, "TestFailure")
|
||||||
|
.detail("Reason", "ChecksumVerificationFailure")
|
||||||
|
.detail("ChecksumIndex", csIdx)
|
||||||
|
.detail("ExistingChecksum", existingCS)
|
||||||
|
.detail("CurrentChecksum", calculatedCS);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
TraceEvent("ChecksumVerificationPass").detail("ChecksumIndex", csIdx).detail("ChecksumValue", existingCS);
|
TraceEvent("ChecksumVerificationPass")
|
||||||
|
.detail("ChecksumIndex", csIdx)
|
||||||
|
.detail("ChecksumValue", existingCS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
} catch(Error& e) {
|
} catch (Error& e) {
|
||||||
TraceEvent("FailedToCalculateChecksum").detail("ChecksumIndex", csIdx).error(e);
|
TraceEvent("FailedToCalculateChecksum").detail("ChecksumIndex", csIdx).error(e);
|
||||||
wait(tr.onError(e));
|
wait(tr.onError(e));
|
||||||
}
|
}
|
||||||
|
@ -772,7 +816,7 @@ struct MakoWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
break;
|
break;
|
||||||
} catch (Error &e) {
|
} catch (Error& e) {
|
||||||
TraceEvent("FailedToGenerateChecksumForPopulatedData").error(e);
|
TraceEvent("FailedToGenerateChecksumForPopulatedData").error(e);
|
||||||
wait(tr.onError(e));
|
wait(tr.onError(e));
|
||||||
}
|
}
|
||||||
|
@ -780,20 +824,20 @@ struct MakoWorkload : TestWorkload {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx){
|
ACTOR static Future<Void> updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx) {
|
||||||
state uint32_t csVal = wait(calcCheckSum(tr, self, csIdx));
|
state uint32_t csVal = wait(calcCheckSum(tr, self, csIdx));
|
||||||
TraceEvent("UpdateCheckSum").detail("ChecksumIndex", csIdx).detail("Checksum", csVal);
|
TraceEvent("UpdateCheckSum").detail("ChecksumIndex", csIdx).detail("Checksum", csVal);
|
||||||
tr->set(self->csKeys[csIdx], ValueRef(reinterpret_cast<const uint8_t*>(&csVal), sizeof(uint32_t)));
|
tr->set(self->csKeys[csIdx], ValueRef(reinterpret_cast<const uint8_t*>(&csVal), sizeof(uint32_t)));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> updateCSBeforeCommit(ReadYourWritesTransaction* tr, MakoWorkload* self, std::vector<bool>* flags){
|
ACTOR static Future<Void> updateCSBeforeCommit(ReadYourWritesTransaction* tr, MakoWorkload* self,
|
||||||
if (!self->checksumVerification)
|
std::vector<bool>* flags) {
|
||||||
return Void();
|
if (!self->checksumVerification) return Void();
|
||||||
|
|
||||||
state int csIdx;
|
state int csIdx;
|
||||||
for (csIdx = 0; csIdx < self->csCount; ++csIdx){
|
for (csIdx = 0; csIdx < self->csCount; ++csIdx) {
|
||||||
if ((*flags)[csIdx]){
|
if ((*flags)[csIdx]) {
|
||||||
wait(updateCheckSum(tr, self, csIdx));
|
wait(updateCheckSum(tr, self, csIdx));
|
||||||
(*flags)[csIdx] = false;
|
(*flags)[csIdx] = false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue