From 5078bc5728384505006bb85b4f99f2ff1bf04bbb Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 1 Sep 2020 17:28:32 -0700 Subject: [PATCH] clang-format MakoWorkload --- fdbserver/workloads/Mako.actor.cpp | 354 ++++++++++++++++------------- 1 file changed, 199 insertions(+), 155 deletions(-) diff --git a/fdbserver/workloads/Mako.actor.cpp b/fdbserver/workloads/Mako.actor.cpp index 2b1b660d24..d7ab755cc8 100644 --- a/fdbserver/workloads/Mako.actor.cpp +++ b/fdbserver/workloads/Mako.actor.cpp @@ -7,20 +7,36 @@ #include "flow/crc32c.h" #include "flow/actorcompiler.h" - - -enum {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_UPDATE, OP_INSERT, OP_INSERTRANGE, OP_CLEAR, OP_SETCLEAR, OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, MAX_OP}; -enum {OP_COUNT, OP_RANGE}; +enum { + OP_GETREADVERSION, + OP_GET, + OP_GETRANGE, + OP_SGET, + OP_SGETRANGE, + OP_UPDATE, + OP_INSERT, + OP_INSERTRANGE, + OP_CLEAR, + OP_SETCLEAR, + OP_CLEARRANGE, + OP_SETCLEARRANGE, + OP_COMMIT, + MAX_OP +}; +enum { OP_COUNT, OP_RANGE }; struct MakoWorkload : TestWorkload { - uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize, csCount, csPartitionSize, csStepSizeInPartition; - double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant; - bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, doChecksumVerificationOnly, latencyForLocalOperation; + uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize, + csCount, csPartitionSize, csStepSizeInPartition; + double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, + periodicLoggingInterval, zipfConstant; + bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, + doChecksumVerificationOnly, latencyForLocalOperation; PerfIntCounter xacts, retries, conflicts, commits, totalOps; std::vector opCounters; std::vector insertionCountsToMeasure; std::vector> ratesAtKeyCounts; std::string operationsSpec; - //store operations to execute + // store operations to execute int operations[MAX_OP][2]; // used for periodically tracing std::vector periodicMetrics; @@ -31,12 +47,13 @@ struct MakoWorkload : TestWorkload { // key prefix of for all generated keys std::string keyPrefix; int KEYPREFIXLEN; - const std::array opNames = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"}; + const std::array opNames = { "GRV", "GET", "GETRANGE", "SGET", + "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", + "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", + "COMMIT" }; MakoWorkload(WorkloadContext const& wcx) - : TestWorkload(wcx), - xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"), totalOps("Operations"), - loadTime(0.0) - { + : TestWorkload(wcx), xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"), + totalOps("Operations"), loadTime(0.0) { // init parameters from test file // Number of rows populated rowCount = getOption(options, LiteralStringRef("rows"), 10000); @@ -61,17 +78,17 @@ struct MakoWorkload : TestWorkload { sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100); // If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically() enableLogging = getOption(options, LiteralStringRef("enableLogging"), false); - periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 ); + periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); // All the generated keys will start with the specified prefix - keyPrefix = getOption( options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString(); + keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString(); KEYPREFIXLEN = keyPrefix.size(); // If true, the workload will picking up keys which are zipfian distributed zipf = getOption(options, LiteralStringRef("zipf"), false); zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99); // Specified length of keys and length range of values - keyBytes = std::max( getOption( options, LiteralStringRef("keyBytes"), 16 ), 16); - maxValueBytes = getOption( options, LiteralStringRef("valueBytes"), 16 ); - minValueBytes = getOption( options, LiteralStringRef("minValueBytes"), maxValueBytes); + keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 16); + maxValueBytes = getOption(options, LiteralStringRef("valueBytes"), 16); + minValueBytes = getOption(options, LiteralStringRef("minValueBytes"), maxValueBytes); ASSERT(minValueBytes <= maxValueBytes); // The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x') // assume we want to insert 10000 rows with keyBytes set to 16, @@ -85,20 +102,21 @@ struct MakoWorkload : TestWorkload { // Multiple operation types can be concatenated. (e.g. "g9u1" = 9 GETs and 1 update) // For RANGE operations, "Range" needs to be specified in addition to "Count". // Below are all allowed inputs: - // g – GET - // gr – GET RANGE - // sg – Snapshot GET - // sgr – Snapshot GET RANGE - // u – Update (= GET followed by SET) - // i – Insert (= SET with a new key) - // ir – Insert Range (Sequential) - // c – CLEAR - // sc – SET & CLEAR - // cr – CLEAR RANGE - // scr – SET & CLEAR RANGE - // grv – GetReadVersion() + // g – GET + // gr – GET RANGE + // sg – Snapshot GET + // sgr – Snapshot GET RANGE + // u – Update (= GET followed by SET) + // i – Insert (= SET with a new key) + // ir – Insert Range (Sequential) + // c – CLEAR + // sc – SET & CLEAR + // cr – CLEAR RANGE + // scr – SET & CLEAR RANGE + // grv – GetReadVersion() // Every transaction is committed unless it contains only GET / GET RANGE operations. - operationsSpec = getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString(); + operationsSpec = + getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString(); // parse the sequence and extract operations to be executed parseOperationsSpec(); for (int i = 0; i < MAX_OP; ++i) { @@ -107,8 +125,8 @@ struct MakoWorkload : TestWorkload { // initialize per-operation counter opCounters.push_back(PerfIntCounter(opNames[i])); } - if (zipf){ - zipfian_generator3(0, (int)rowCount-1, zipfConstant); + if (zipf) { + zipfian_generator3(0, (int)rowCount - 1, zipfConstant); } // Added for checksum verification csSize = getOption(options, LiteralStringRef("csSize"), rowCount / 100); @@ -122,7 +140,7 @@ struct MakoWorkload : TestWorkload { csPartitionSize = rowCount / csSize; ASSERT(csCount <= csPartitionSize); csStepSizeInPartition = csPartitionSize / csCount; - for (int i= 0; i < csCount; ++i) { + for (int i = 0; i < csCount; ++i) { csKeys.emplace_back(format((keyPrefix + "_crc32c_%u_%u").c_str(), i, rowCount)); } } @@ -135,19 +153,17 @@ struct MakoWorkload : TestWorkload { } Future setup(Database const& cx) override { - if (doChecksumVerificationOnly) - return Void(); + if (doChecksumVerificationOnly) return Void(); return _setup(cx, this); } Future start(Database const& cx) override { - if (doChecksumVerificationOnly) - return Void(); + if (doChecksumVerificationOnly) return Void(); return _start(cx, this); } Future check(Database const& cx) override { - if (!checksumVerification){ + if (!checksumVerification) { return true; } // verify checksum consistency @@ -155,20 +171,21 @@ struct MakoWorkload : TestWorkload { } // disable the default timeout setting - double getCheckTimeout() override {return std::numeric_limits::max();} + double getCheckTimeout() override { return std::numeric_limits::max(); } void getMetrics(std::vector& m) override { // metrics of population process - if (populateData){ - m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) ); + if (populateData) { + m.push_back(PerfMetric("Mean load time (seconds)", loadTime, true)); // The importing rate of keys, controlled by parameter "insertionCountsToMeasure" auto ratesItr = ratesAtKeyCounts.begin(); - for(; ratesItr != ratesAtKeyCounts.end(); ratesItr++){ - m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false)); + for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) { + m.push_back( + PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false)); } } // benchmark - if (runBenchmark){ + if (runBenchmark) { m.push_back(PerfMetric("Measured Duration", testDuration, true)); m.push_back(xacts.getMetric()); m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true)); @@ -179,31 +196,36 @@ struct MakoWorkload : TestWorkload { m.push_back(retries.getMetric()); // count of each operation - for (int i = 0; i < MAX_OP; ++i){ + for (int i = 0; i < MAX_OP; ++i) { m.push_back(opCounters[i].getMetric()); } // Meaningful Latency metrics - const int opExecutedAtOnce[] = {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT}; - for (const int& op : opExecutedAtOnce){ - m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (us)", 1e6 * opLatencies[op].mean(), true)); - m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true)); - m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true)); + const int opExecutedAtOnce[] = { OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT }; + for (const int& op : opExecutedAtOnce) { + m.push_back(PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true)); + m.push_back( + PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true)); + m.push_back( + PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true)); } // Latency for local operations if needed if (latencyForLocalOperation) { - const int localOp[] = {OP_INSERT, OP_CLEAR, OP_CLEARRANGE}; - for (const int& op : localOp){ + const int localOp[] = { OP_INSERT, OP_CLEAR, OP_CLEARRANGE }; + for (const int& op : localOp) { TraceEvent(SevDebug, "LocalLatency") - .detail("Name", opNames[op]) - .detail("Size", opLatencies[op].getPopulationSize()); - m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (us)", 1e6 * opLatencies[op].mean(), true)); - m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].max(), true)); - m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", 1e6 * opLatencies[op].min(), true)); + .detail("Name", opNames[op]) + .detail("Size", opLatencies[op].getPopulationSize()); + m.push_back( + PerfMetric("Mean " + opNames[op] + " Latency (us)", 1e6 * opLatencies[op].mean(), true)); + m.push_back(PerfMetric("Max " + opNames[op] + " Latency (us, averaged)", + 1e6 * opLatencies[op].max(), true)); + m.push_back(PerfMetric("Min " + opNames[op] + " Latency (us, averaged)", + 1e6 * opLatencies[op].min(), true)); } } - //insert logging metrics if exists + // insert logging metrics if exists m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); } } @@ -215,7 +237,7 @@ struct MakoWorkload : TestWorkload { return result; } - static void randStr(char *str, int len){ + static void randStr(char* str, int len) { for (int i = 0; i < len; ++i) { str[i] = deterministicRandom()->randomAlphaNumeric(); } @@ -231,8 +253,7 @@ struct MakoWorkload : TestWorkload { Key result = makeString(keyBytes); char* data = reinterpret_cast(mutateString(result)); format((keyPrefix + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen); - for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i) - data[i] = 'x'; + for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i) data[i] = 'x'; return result; } @@ -246,11 +267,11 @@ struct MakoWorkload : TestWorkload { return digits; } - static void updateCSFlags(MakoWorkload* self, std::vector& flags, uint64_t startIdx, uint64_t endIdx){ + static void updateCSFlags(MakoWorkload* self, std::vector& flags, uint64_t startIdx, uint64_t endIdx) { // We deal with cases where rowCount % csCount != 0 and csPartitionSize % csSize != 0; - // In particular, all keys with index in range [csSize * csPartitionSize, rowCount) will not be used for checksum - // By the same way, for any i in range [0, csSize): - // keys with index in range [ i*csPartitionSize, i*csPartitionSize + csCount*csStepSizeInPartition) will not be used for checksum + // In particular, all keys with index in range [csSize * csPartitionSize, rowCount) will not be used for + // checksum By the same way, for any i in range [0, csSize): keys with index in range [ i*csPartitionSize, + // i*csPartitionSize + csCount*csStepSizeInPartition) will not be used for checksum uint64_t boundary = self->csSize * self->csPartitionSize; if (startIdx >= boundary) return; @@ -258,17 +279,16 @@ struct MakoWorkload : TestWorkload { endIdx = boundary; // If all checksums need to be updated, just return - if (std::all_of(flags.begin(), flags.end(), [](bool flag){return flag;})) - return; + if (std::all_of(flags.begin(), flags.end(), [](bool flag) { return flag; })) return; - if (startIdx + 1 == endIdx){ + if (startIdx + 1 == endIdx) { // single key case startIdx = startIdx % self->csPartitionSize; - if ((startIdx < self->csCount * self->csStepSizeInPartition) && (startIdx % self->csStepSizeInPartition == 0)){ + if ((startIdx < self->csCount * self->csStepSizeInPartition) && + (startIdx % self->csStepSizeInPartition == 0)) { flags.at(startIdx / self->csStepSizeInPartition) = true; } - } - else { + } else { // key range case uint64_t count = self->csCount; uint64_t base = (startIdx / self->csPartitionSize) * self->csPartitionSize; @@ -277,29 +297,28 @@ struct MakoWorkload : TestWorkload { uint64_t startStepIdx = std::min(startIdx / self->csStepSizeInPartition, self->csCount - 1); // if changed range size is more than one csPartitionSize, which means every checksum needs to be updated - if ((endIdx - startIdx) < self->csPartitionSize){ + if ((endIdx - startIdx) < self->csPartitionSize) { uint64_t endStepIdx; - if (endIdx > self->csPartitionSize){ - endStepIdx = self->csCount + std::min((endIdx - 1 - self->csPartitionSize) / self->csStepSizeInPartition, self->csCount); + if (endIdx > self->csPartitionSize) { + endStepIdx = + self->csCount + + std::min((endIdx - 1 - self->csPartitionSize) / self->csStepSizeInPartition, self->csCount); } else { endStepIdx = std::min((endIdx - 1) / self->csStepSizeInPartition, self->csCount - 1); } // All the left boundary of csStep should be updated // Also, check the startIdx whether it is the left boundary of a csStep - if (startIdx == self->csStepSizeInPartition * startStepIdx) - flags[startStepIdx] = true; + if (startIdx == self->csStepSizeInPartition * startStepIdx) flags[startStepIdx] = true; count = endStepIdx - startStepIdx; } - for (int i = 1; i <= count; ++i){ - flags[ (startStepIdx+i) % self->csCount] = true; + for (int i = 1; i <= count; ++i) { + flags[(startStepIdx + i) % self->csCount] = true; } } } - Standalone operator()(uint64_t n) { - return KeyValueRef(keyForIndex(n), randomValue()); - } + Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n), randomValue()); } - ACTOR static Future tracePeriodically( MakoWorkload *self){ + ACTOR static Future tracePeriodically(MakoWorkload* self) { state double start = timer(); state double elapsed = 0.0; state int64_t last_ops = 0; @@ -307,13 +326,26 @@ struct MakoWorkload : TestWorkload { loop { elapsed += self->periodicLoggingInterval; - wait( delayUntil(start + elapsed)); - TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->opLatencies[OP_COMMIT].mean()).detail("Median", self->opLatencies[OP_COMMIT].median()).detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)).detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)).detail("Count", self->opCounters[OP_COMMIT].getValue()).detail("Elapsed", elapsed); - TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()).detail("Median", self->opLatencies[OP_GETREADVERSION].median()).detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)).detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)).detail("Count", self->opCounters[OP_GETREADVERSION].getValue()); + wait(delayUntil(start + elapsed)); + TraceEvent((self->description() + "_CommitLatency").c_str()) + .detail("Mean", self->opLatencies[OP_COMMIT].mean()) + .detail("Median", self->opLatencies[OP_COMMIT].median()) + .detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)) + .detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)) + .detail("Count", self->opCounters[OP_COMMIT].getValue()) + .detail("Elapsed", elapsed); + TraceEvent((self->description() + "_GRVLatency").c_str()) + .detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()) + .detail("Median", self->opLatencies[OP_GETREADVERSION].median()) + .detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)) + .detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)) + .detail("Count", self->opCounters[OP_GETREADVERSION].getValue()); std::string ts = format("T=%04.0fs: ", elapsed); - self->periodicMetrics.push_back(PerfMetric(ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false)); - self->periodicMetrics.push_back(PerfMetric(ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false)); + self->periodicMetrics.push_back(PerfMetric( + ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false)); + self->periodicMetrics.push_back(PerfMetric( + ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false)); last_xacts = self->xacts.getValue(); last_ops = self->totalOps.getValue(); @@ -325,8 +357,8 @@ struct MakoWorkload : TestWorkload { state Promise loadTime; state Promise>> ratesAtKeyCounts; - wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay, - self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts)); + wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), + self->warmingDelay, self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts)); // This is the setup time self->loadTime = loadTime.getFuture().get(); @@ -334,7 +366,7 @@ struct MakoWorkload : TestWorkload { self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get(); } // Use one client to initialize checksums - if (self->checksumVerification && self->clientId == 0){ + if (self->checksumVerification && self->clientId == 0) { wait(generateChecksum(cx, self)); } @@ -346,22 +378,21 @@ struct MakoWorkload : TestWorkload { if (self->runBenchmark) { wait(self->_runBenchmark(cx, self)); } - if (!self->preserveData && self->clientId == 0){ + if (!self->preserveData && self->clientId == 0) { wait(self->cleanup(cx, self)); } return Void(); } - ACTOR Future _runBenchmark(Database cx, MakoWorkload* self){ + ACTOR Future _runBenchmark(Database cx, MakoWorkload* self) { std::vector> clients; for (int c = 0; c < self->actorCountPerClient; ++c) { clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c)); } - if (self->enableLogging) - clients.push_back(tracePeriodically(self)); + if (self->enableLogging) clients.push_back(tracePeriodically(self)); - wait( timeout( waitForAll( clients ), self->testDuration, Void() ) ); + wait(timeout(waitForAll(clients), self->testDuration, Void())); return Void(); } @@ -380,17 +411,19 @@ struct MakoWorkload : TestWorkload { state double lastTime = timer(); state double commitStart; - TraceEvent("ClientStarting").detail("ActorIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient); + TraceEvent("ClientStarting") + .detail("ActorIndex", actorIndex) + .detail("ClientIndex", self->clientId) + .detail("NumActors", self->actorCountPerClient); loop { // used for throttling wait(poisson(&lastTime, delay)); - try{ + try { // user-defined value: whether commit read-only ops or not; default is false doCommit = self->commitGet; for (i = 0; i < MAX_OP; ++i) { - if (i == OP_COMMIT) - continue; + if (i == OP_COMMIT) continue; for (count = 0; count < self->operations[i][0]; ++count) { range = self->operations[i][1]; rangeLen = digits(range); @@ -404,29 +437,28 @@ struct MakoWorkload : TestWorkload { rkeyRangeRef = KeyRangeRef(rkey, rkey2); // used for mako-level consistency check - if (self->checksumVerification){ + if (self->checksumVerification) { if (i == OP_INSERT | i == OP_UPDATE | i == OP_CLEAR) { updateCSFlags(self, csChangedFlags, indBegin, indBegin + 1); - } - else if (i == OP_CLEARRANGE) { + } else if (i == OP_CLEARRANGE) { updateCSFlags(self, csChangedFlags, indBegin, indEnd); } } - if (i == OP_GETREADVERSION){ + if (i == OP_GETREADVERSION) { wait(logLatency(tr.getReadVersion(), &self->opLatencies[i])); - } - else if (i == OP_GET){ + } else if (i == OP_GET) { wait(logLatency(tr.get(rkey, false), &self->opLatencies[i])); - } else if (i == OP_GETRANGE){ - wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, false), &self->opLatencies[i])); - } - else if (i == OP_SGET){ + } else if (i == OP_GETRANGE) { + wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, false), + &self->opLatencies[i])); + } else if (i == OP_SGET) { wait(logLatency(tr.get(rkey, true), &self->opLatencies[i])); - } else if (i == OP_SGETRANGE){ - //do snapshot get range here - wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, true), &self->opLatencies[i])); - } else if (i == OP_UPDATE){ + } else if (i == OP_SGETRANGE) { + // do snapshot get range here + wait(logLatency(tr.getRange(rkeyRangeRef, CLIENT_KNOBS->TOO_MANY, true), + &self->opLatencies[i])); + } else if (i == OP_UPDATE) { wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET])); if (self->latencyForLocalOperation) { double opBegin = timer(); @@ -436,9 +468,11 @@ struct MakoWorkload : TestWorkload { tr.set(rkey, rval); } doCommit = true; - } else if (i == OP_INSERT){ - // generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters - randStr(reinterpret_cast(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN); + } else if (i == OP_INSERT) { + // generate an (almost) unique key here, it starts with 'mako' and then comes with randomly + // generated characters + randStr(reinterpret_cast(mutateString(rkey)) + self->KEYPREFIXLEN, + self->keyBytes - self->KEYPREFIXLEN); if (self->latencyForLocalOperation) { double opBegin = timer(); tr.set(rkey, rval); @@ -447,9 +481,9 @@ struct MakoWorkload : TestWorkload { tr.set(rkey, rval); } doCommit = true; - } else if (i == OP_INSERTRANGE){ - char *rkeyPtr = reinterpret_cast(mutateString(rkey)); - randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN); + } else if (i == OP_INSERTRANGE) { + char* rkeyPtr = reinterpret_cast(mutateString(rkey)); + randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN); for (int range_i = 0; range_i < range; ++range_i) { format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen); if (self->latencyForLocalOperation) { @@ -461,7 +495,7 @@ struct MakoWorkload : TestWorkload { } } doCommit = true; - } else if (i == OP_CLEAR){ + } else if (i == OP_CLEAR) { if (self->latencyForLocalOperation) { double opBegin = timer(); tr.clear(rkey); @@ -470,8 +504,9 @@ struct MakoWorkload : TestWorkload { tr.clear(rkey); } doCommit = true; - } else if(i == OP_SETCLEAR){ - randStr(reinterpret_cast(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN); + } else if (i == OP_SETCLEAR) { + randStr(reinterpret_cast(mutateString(rkey)) + self->KEYPREFIXLEN, + self->keyBytes - self->KEYPREFIXLEN); if (self->latencyForLocalOperation) { double opBegin = timer(); tr.set(rkey, rval); @@ -494,7 +529,7 @@ struct MakoWorkload : TestWorkload { tr.clear(rkey); } doCommit = true; - } else if (i == OP_CLEARRANGE){ + } else if (i == OP_CLEARRANGE) { if (self->latencyForLocalOperation) { double opBegin = timer(); tr.clear(rkeyRangeRef); @@ -503,13 +538,13 @@ struct MakoWorkload : TestWorkload { tr.clear(rkeyRangeRef); } doCommit = true; - } else if (i == OP_SETCLEARRANGE){ - char *rkeyPtr = reinterpret_cast(mutateString(rkey)); - randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN); + } else if (i == OP_SETCLEARRANGE) { + char* rkeyPtr = reinterpret_cast(mutateString(rkey)); + randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes - self->KEYPREFIXLEN); state std::string scr_start_key; state std::string scr_end_key; state KeyRangeRef scr_key_range_ref; - for (int range_i = 0; range_i < range; ++range_i){ + for (int range_i = 0; range_i < range; ++range_i) { format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen); if (self->latencyForLocalOperation) { double opBegin = timer(); @@ -518,8 +553,7 @@ struct MakoWorkload : TestWorkload { } else { tr.set(rkey, self->randomValue()); } - if (range_i == 0) - scr_start_key = rkey.toString(); + if (range_i == 0) scr_start_key = rkey.toString(); } scr_end_key = rkey.toString(); scr_key_range_ref = KeyRangeRef(KeyRef(scr_start_key), KeyRef(scr_end_key)); @@ -551,7 +585,7 @@ struct MakoWorkload : TestWorkload { } // successfully finish the transaction, update metrics ++self->xacts; - for (int op = 0; op < MAX_OP; ++op){ + for (int op = 0; op < MAX_OP; ++op) { self->opCounters[op] += perOpCount[op]; self->totalOps += perOpCount[op]; } @@ -571,18 +605,18 @@ struct MakoWorkload : TestWorkload { } } - ACTOR Future cleanup(Database cx, MakoWorkload* self){ + ACTOR Future cleanup(Database cx, MakoWorkload* self) { // clear all data starts with 'mako' in the database state std::string keyPrefix(self->keyPrefix); state ReadYourWritesTransaction tr(cx); - loop{ + loop { try { tr.clear(prefixRange(keyPrefix)); wait(tr.commit()); TraceEvent("CleanUpMakoRelatedData").detail("KeyPrefix", self->keyPrefix); break; - } catch (Error &e){ + } catch (Error& e) { TraceEvent("FailedToCleanData").error(e); wait(tr.onError(e)); } @@ -590,8 +624,8 @@ struct MakoWorkload : TestWorkload { return Void(); } - ACTOR template - static Future logLatency(Future f, ContinuousSample* opLatencies){ + ACTOR template + static Future logLatency(Future f, ContinuousSample* opLatencies) { state double opBegin = timer(); wait(success(f)); opLatencies->addSample(timer() - opBegin); @@ -600,7 +634,7 @@ struct MakoWorkload : TestWorkload { int64_t getRandomKeyIndex(uint64_t rowCount) { int64_t randomKeyIndex; - if (zipf){ + if (zipf) { randomKeyIndex = zipfian_next(); } else { randomKeyIndex = deterministicRandom()->randomInt64(0, rowCount); @@ -608,7 +642,7 @@ struct MakoWorkload : TestWorkload { return randomKeyIndex; } void parseOperationsSpec() { - const char *ptr = operationsSpec.c_str(); + const char* ptr = operationsSpec.c_str(); int op = 0; int rangeop = 0; int num; @@ -703,19 +737,21 @@ struct MakoWorkload : TestWorkload { } if (error) { - TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidTransactionSpecification").detail("operations", operationsSpec); + TraceEvent(SevError, "TestFailure") + .detail("Reason", "InvalidTransactionSpecification") + .detail("operations", operationsSpec); } } - ACTOR static Future calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex){ + ACTOR static Future calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex) { state uint32_t result = 0; state int i; state Key csKey; - for( i = 0; i < self->csSize; ++i){ + for (i = 0; i < self->csSize; ++i) { int idx = csIndex * self->csStepSizeInPartition + i * self->csPartitionSize; csKey = self->keyForIndex(idx); Optional temp = wait(tr->get(csKey)); - if (temp.present()){ + if (temp.present()) { Value val = temp.get(); result = crc32c_append(result, val.begin(), val.size()); } else { @@ -736,23 +772,31 @@ struct MakoWorkload : TestWorkload { tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); for (csIdx = 0; csIdx < self->csCount; ++csIdx) { Optional temp = wait(tr.get(self->csKeys[csIdx])); - if (!temp.present()){ - TraceEvent(SevError, "TestFailure").detail("Reason", "NoExistingChecksum").detail("missedChecksumIndex", csIdx); + if (!temp.present()) { + TraceEvent(SevError, "TestFailure") + .detail("Reason", "NoExistingChecksum") + .detail("missedChecksumIndex", csIdx); return false; } else { csValue = temp.get(); ASSERT(csValue.size() == sizeof(uint32_t)); uint32_t calculatedCS = wait(calcCheckSum(&tr, self, csIdx)); uint32_t existingCS = *(reinterpret_cast(csValue.begin())); - if (existingCS != calculatedCS){ - TraceEvent(SevError, "TestFailure").detail("Reason", "ChecksumVerificationFailure").detail("ChecksumIndex", csIdx).detail("ExistingChecksum", existingCS).detail("CurrentChecksum", calculatedCS); + if (existingCS != calculatedCS) { + TraceEvent(SevError, "TestFailure") + .detail("Reason", "ChecksumVerificationFailure") + .detail("ChecksumIndex", csIdx) + .detail("ExistingChecksum", existingCS) + .detail("CurrentChecksum", calculatedCS); return false; } - TraceEvent("ChecksumVerificationPass").detail("ChecksumIndex", csIdx).detail("ChecksumValue", existingCS); + TraceEvent("ChecksumVerificationPass") + .detail("ChecksumIndex", csIdx) + .detail("ChecksumValue", existingCS); } } return true; - } catch(Error& e) { + } catch (Error& e) { TraceEvent("FailedToCalculateChecksum").detail("ChecksumIndex", csIdx).error(e); wait(tr.onError(e)); } @@ -772,7 +816,7 @@ struct MakoWorkload : TestWorkload { } wait(tr.commit()); break; - } catch (Error &e) { + } catch (Error& e) { TraceEvent("FailedToGenerateChecksumForPopulatedData").error(e); wait(tr.onError(e)); } @@ -780,20 +824,20 @@ struct MakoWorkload : TestWorkload { return Void(); } - ACTOR static Future updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx){ + ACTOR static Future updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx) { state uint32_t csVal = wait(calcCheckSum(tr, self, csIdx)); TraceEvent("UpdateCheckSum").detail("ChecksumIndex", csIdx).detail("Checksum", csVal); tr->set(self->csKeys[csIdx], ValueRef(reinterpret_cast(&csVal), sizeof(uint32_t))); return Void(); } - ACTOR static Future updateCSBeforeCommit(ReadYourWritesTransaction* tr, MakoWorkload* self, std::vector* flags){ - if (!self->checksumVerification) - return Void(); + ACTOR static Future updateCSBeforeCommit(ReadYourWritesTransaction* tr, MakoWorkload* self, + std::vector* flags) { + if (!self->checksumVerification) return Void(); state int csIdx; - for (csIdx = 0; csIdx < self->csCount; ++csIdx){ - if ((*flags)[csIdx]){ + for (csIdx = 0; csIdx < self->csCount; ++csIdx) { + if ((*flags)[csIdx]) { wait(updateCheckSum(tr, self, csIdx)); (*flags)[csIdx] = false; }