Log latency for set, clear, clearrange
This commit is contained in:
parent
ad31de6b8e
commit
ce7a35a4be
|
@ -14,7 +14,7 @@ enum {OP_COUNT, OP_RANGE};
|
|||
struct MakoWorkload : TestWorkload {
|
||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize, csCount, csPartitionSize, csStepSizeInPartition;
|
||||
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant;
|
||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, doChecksumVerificationOnly;
|
||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, doChecksumVerificationOnly, latencyForLocalOperation;
|
||||
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
||||
std::vector<PerfIntCounter> opCounters;
|
||||
std::vector<uint64_t> insertionCountsToMeasure;
|
||||
|
@ -52,6 +52,8 @@ struct MakoWorkload : TestWorkload {
|
|||
preserveData = getOption(options, LiteralStringRef("preserveData"), true);
|
||||
// If true, force commit for read-only transactions
|
||||
commitGet = getOption(options, LiteralStringRef("commitGet"), false);
|
||||
// If true, log latency for set, clear and clearrange
|
||||
latencyForLocalOperation = getOption(options, LiteralStringRef("latencyForLocalOperation"), false);
|
||||
// Target total transaction-per-second (TPS) of all clients
|
||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100000.0) / clientCount;
|
||||
actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), 16);
|
||||
|
@ -188,6 +190,18 @@ struct MakoWorkload : TestWorkload {
|
|||
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
|
||||
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
|
||||
}
|
||||
// Latency for local operations if needed
|
||||
if (latencyForLocalOperation) {
|
||||
const int localOp[] = {OP_INSERT, OP_CLEAR, OP_CLEARRANGE};
|
||||
for (const int& op : localOp){
|
||||
TraceEvent(SevDebug, "LocalLatency")
|
||||
.detail("Name", opNames[op])
|
||||
.detail("Size", opLatencies[op].getPopulationSize());
|
||||
m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (ms)", 1000 * opLatencies[op].mean(), true));
|
||||
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
|
||||
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
|
||||
}
|
||||
}
|
||||
|
||||
//insert logging metrics if exists
|
||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||
|
@ -414,27 +428,57 @@ struct MakoWorkload : TestWorkload {
|
|||
wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, true), &self->opLatencies[i]));
|
||||
} else if (i == OP_UPDATE){
|
||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
||||
tr.set(rkey, rval);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.set(rkey, rval);
|
||||
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.set(rkey, rval);
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_INSERT){
|
||||
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
tr.set(rkey, rval);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.set(rkey, rval);
|
||||
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.set(rkey, rval);
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_INSERTRANGE){
|
||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
for (int range_i = 0; range_i < range; ++range_i){
|
||||
for (int range_i = 0; range_i < range; ++range_i) {
|
||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||
tr.set(rkey, self->randomValue());
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.set(rkey, self->randomValue());
|
||||
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.set(rkey, self->randomValue());
|
||||
}
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_CLEAR){
|
||||
tr.clear(rkey);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.clear(rkey);
|
||||
self->opLatencies[OP_CLEAR].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.clear(rkey);
|
||||
}
|
||||
doCommit = true;
|
||||
} else if(i == OP_SETCLEAR){
|
||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
tr.set(rkey, rval);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.set(rkey, rval);
|
||||
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.set(rkey, rval);
|
||||
}
|
||||
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
|
||||
// commit the change and update metrics
|
||||
commitStart = now();
|
||||
|
@ -442,10 +486,22 @@ struct MakoWorkload : TestWorkload {
|
|||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
tr.reset();
|
||||
tr.clear(rkey);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.clear(rkey);
|
||||
self->opLatencies[OP_CLEAR].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.clear(rkey);
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_CLEARRANGE){
|
||||
tr.clear(rkeyRangeRef);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.clear(rkeyRangeRef);
|
||||
self->opLatencies[OP_CLEARRANGE].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.clear(rkeyRangeRef);
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_SETCLEARRANGE){
|
||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||
|
@ -455,7 +511,13 @@ struct MakoWorkload : TestWorkload {
|
|||
state KeyRangeRef scr_key_range_ref;
|
||||
for (int range_i = 0; range_i < range; ++range_i){
|
||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||
tr.set(rkey, self->randomValue());
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.set(rkey, self->randomValue());
|
||||
self->opLatencies[OP_INSERT].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.set(rkey, self->randomValue());
|
||||
}
|
||||
if (range_i == 0)
|
||||
scr_start_key = rkey.toString();
|
||||
}
|
||||
|
@ -467,7 +529,13 @@ struct MakoWorkload : TestWorkload {
|
|||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
tr.reset();
|
||||
tr.clear(scr_key_range_ref);
|
||||
if (self->latencyForLocalOperation) {
|
||||
double opBegin = timer();
|
||||
tr.clear(scr_key_range_ref);
|
||||
self->opLatencies[OP_CLEARRANGE].addSample(timer() - opBegin);
|
||||
} else {
|
||||
tr.clear(scr_key_range_ref);
|
||||
}
|
||||
doCommit = true;
|
||||
}
|
||||
++perOpCount[i];
|
||||
|
|
Loading…
Reference in New Issue