diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index af9ba32a31..2768a3e4c1 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -302,7 +302,8 @@ std::pair>, std::vector> server_id;
 	return server_id;
 }
+
+// Splits a key from the serverKeys space into its two components.
+// The serverKeysPrefix is removed, a UID is deserialized from the front of what is left,
+// one separator byte is skipped, and all remaining bytes are returned as a Key.
+// Returns {server id, remaining key bytes}.
+// NOTE(review): no validation is done here; the key is presumably expected to start with
+// serverKeysPrefix and to contain the UID plus separator - confirm against callers.
+std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
+	UID server_id;
+	BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned());
+	rd >> server_id;
+	rd.readBytes(1); // skip "/"
+	// Whatever follows the separator becomes the second element of the result.
+	const auto remainingBytes = rd.remainingBytes();
+	const KeyRef ref(rd.arenaRead(remainingBytes), remainingBytes);
+	return std::make_pair(server_id, Key(ref));
+}
+
 bool serverHasKey(ValueRef storedValue) {
 	return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
 }
diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h
index 705caf98f1..0dcf845ee6 100644
--- a/fdbclient/SystemData.h
+++ b/fdbclient/SystemData.h
@@ -99,11 +99,13 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector& serve
 // Using the serverID as a prefix, then followed by the beginning of the shard range
 // as the key, the value indicates whether the shard does or does not exist on the server.
 // These values can be changed as data movement occurs.
+extern const KeyRangeRef serverKeysRange; extern const KeyRef serverKeysPrefix; extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse; const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysPrefixFor(UID serverID); UID serverKeysDecodeServer(const KeyRef& key); +std::pair serverKeysDecodeServerBegin(const KeyRef& key); bool serverHasKey(ValueRef storedValue); extern const KeyRangeRef conflictingKeysRange; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index f47e003a67..18a9e10b6d 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -267,6 +267,7 @@ set(FDBSERVER_SRCS workloads/ReadAfterWrite.actor.cpp workloads/ReadHotDetection.actor.cpp workloads/ReadWrite.actor.cpp + workloads/ReadWriteWorkload.actor.h workloads/RemoveServersSafely.actor.cpp workloads/ReportConflictingKeys.actor.cpp workloads/RestoreBackup.actor.cpp @@ -281,6 +282,7 @@ set(FDBSERVER_SRCS workloads/Sideband.actor.cpp workloads/SidebandSingle.actor.cpp workloads/SimpleAtomicAdd.actor.cpp + workloads/SkewedReadWrite.actor.cpp workloads/SlowTaskWorkload.actor.cpp workloads/SnapTest.actor.cpp workloads/SpecialKeySpaceCorrectness.actor.cpp diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 11851f100e..41b39605b5 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -99,6 +99,20 @@ Key KVWorkload::keyForIndex(uint64_t index) const { } } +int64_t KVWorkload::indexForKey(const KeyRef& key, bool absent) const { + int idx = 0; + if (nodePrefix > 0) { + ASSERT(keyBytes >= 32); + idx += 16; + } + ASSERT(keyBytes >= 16); + // extract int64_t index, the reverse process of emplaceIndex() + auto end = key.size() - idx - (absent ? 1 : 0); + std::string str((char*)key.begin() + idx, end); + int64_t res = std::stoll(str, nullptr, 16); + return res; +} + Key KVWorkload::keyForIndex(uint64_t index, bool absent) const { int adjustedKeyBytes = (absent) ? 
(keyBytes + 1) : keyBytes; Key result = makeString(adjustedKeyBytes); @@ -112,8 +126,8 @@ Key KVWorkload::keyForIndex(uint64_t index, bool absent) const { idx += 16; } ASSERT(keyBytes >= 16); - double d = double(index) / nodeCount; - emplaceIndex(data, idx, *(int64_t*)&d); + emplaceIndex(data, idx, (int64_t)index); + // ASSERT(indexForKey(result) == (int64_t)index); // debug assert return result; } diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index 4741a01371..475c3a023c 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -28,202 +28,13 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/BulkSetup.actor.h" +#include "fdbserver/workloads/ReadWriteWorkload.actor.h" #include "fdbclient/ReadYourWrites.h" #include "flow/TDMetric.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. -const int sampleSize = 10000; -static Future nextRV; -static Version lastRV = invalidVersion; - -ACTOR static Future getNextRV(Database db) { - state Transaction tr(db); - loop { - try { - Version v = wait(tr.getReadVersion()); - return v; - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} -static Future getInconsistentReadVersion(Database const& db) { - if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running - if (nextRV.isValid()) - lastRV = nextRV.get(); - nextRV = getNextRV(db); - } - if (lastRV == invalidVersion) - return nextRV; - else - return lastRV; -} - -DESCR struct TransactionSuccessMetric { - int64_t totalLatency; // ns - int64_t startLatency; // ns - int64_t commitLatency; // ns - int64_t retries; // count -}; - -DESCR struct TransactionFailureMetric { - int64_t startLatency; // ns - int64_t errorCode; // flow error code -}; - -DESCR struct ReadMetric { - int64_t readLatency; // ns -}; - -struct ReadWriteWorkload : KVWorkload { - int readsPerTransactionA, 
writesPerTransactionA; - int readsPerTransactionB, writesPerTransactionB; - int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; - double testDuration, transactionsPerSecond, alpha, warmingDelay, loadTime, maxInsertRate, debugInterval, debugTime; - double metricsStart, metricsDuration, clientBegin; - std::string valueString; - - bool dependentReads; - bool enableReadLatencyLogging; - double periodicLoggingInterval; - bool cancelWorkersAtDuration; - bool inconsistentReads; - bool adjacentReads; - bool adjacentWrites; - bool rampUpLoad; - int rampSweepCount; - double hotKeyFraction, forceHotProbability; - bool rangeReads; - bool useRYW; - bool rampTransactionType; - bool rampUpConcurrency; - bool batchPriority; - - Standalone descriptionString; - - Int64MetricHandle totalReadsMetric; - Int64MetricHandle totalRetriesMetric; - EventMetricHandle transactionSuccessMetric; - EventMetricHandle transactionFailureMetric; - EventMetricHandle readMetric; - - std::vector> clients; - PerfIntCounter aTransactions, bTransactions, retries; - ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; - double readLatencyTotal; - int readLatencyCount; - - std::vector insertionCountsToMeasure; - std::vector> ratesAtKeyCounts; - - std::vector periodicMetrics; - - bool doSetup; - - ReadWriteWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), loadTime(0.0), clientBegin(0), dependentReads(false), adjacentReads(false), - adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), - totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), - bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), - commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), - readLatencyCount(0) { - transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); 
- transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); - readMetric.init(LiteralStringRef("RWWorkload.Read")); - - testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); - transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount; - double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250); - actorCount = ceil(transactionsPerSecond * allowedLatency); - actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount); - - readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10); - writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0); - readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1); - writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9); - alpha = getOption(options, LiteralStringRef("alpha"), 0.1); - - extraReadConflictRangesPerTransaction = - getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0); - extraWriteConflictRangesPerTransaction = - getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0); - - valueString = std::string(maxValueBytes, '.'); - if (nodePrefix > 0) { - keyBytes += 16; - } - - metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0); - metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration); - if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) { - // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test - metricsStart += testDuration * 0.125; - metricsDuration *= 0.75; - } - - dependentReads = getOption(options, LiteralStringRef("dependentReads"), false); - warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0); - maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12); - 
debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0); - debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0); - enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false); - periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); - cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true); - inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false); - adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false); - adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false); - rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false); - useRYW = getOption(options, LiteralStringRef("useRYW"), false); - rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1); - rangeReads = getOption(options, LiteralStringRef("rangeReads"), false); - rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false); - rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false); - doSetup = getOption(options, LiteralStringRef("setup"), true); - batchPriority = getOption(options, LiteralStringRef("batchPriority"), false); - descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite")); - - if (rampUpConcurrency) - ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down - - // Validate that keyForIndex() is monotonic - for (int i = 0; i < 30; i++) { - int64_t a = deterministicRandom()->randomInt64(0, nodeCount); - int64_t b = deterministicRandom()->randomInt64(0, nodeCount); - if (a > b) { - std::swap(a, b); - } - ASSERT(a <= b); - ASSERT((keyForIndex(a, false) <= keyForIndex(b, false))); - } - - std::vector insertionCountsToMeasureString = - getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector()); - for (int 
i = 0; i < insertionCountsToMeasureString.size(); i++) { - try { - uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]); - insertionCountsToMeasure.push_back(count); - } catch (...) { - } - } - - { - // with P(hotTrafficFraction) an access is directed to one of a fraction - // of hot keys, else it is directed to a disjoint set of cold keys - hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0); - double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0); - ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1); - ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot! - // p(Cold key) = (1-FHP) * (1-hkf) - // p(Cold key) = (1-htf) - // solving for FHP gives: - forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction); - } - } - - std::string description() const override { return descriptionString.toString(); } - Future setup(Database const& cx) override { return _setup(cx, this); } - Future start(Database const& cx) override { return _start(cx, this); } - +struct ReadWriteCommonImpl { + // trace methods ACTOR static Future traceDumpWorkers(Reference const> db) { try { loop { @@ -250,91 +61,7 @@ struct ReadWriteWorkload : KVWorkload { throw; } } - - Future check(Database const& cx) override { - clients.clear(); - - if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) - metricsDuration = now() - metricsStart; - - g_traceBatch.dump(); - if (clientId == 0) - return traceDumpWorkers(dbInfo); - else - return true; - } - - void getMetrics(std::vector& m) override { - double duration = metricsDuration; - int reads = - (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); - int writes = - (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); - m.emplace_back("Measured Duration", duration, Averaged::True); - m.emplace_back( - 
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); - m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); - m.push_back(aTransactions.getMetric()); - m.push_back(bTransactions.getMetric()); - m.push_back(retries.getMetric()); - m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); - m.emplace_back("Read rows", reads, Averaged::False); - m.emplace_back("Write rows", writes, Averaged::False); - - if (!rampUpLoad) { - m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True); - m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True); - m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True); - m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True); - m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True); - - m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True); - m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True); - m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True); - - m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True); - m.emplace_back( - "Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True); - m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True); - - m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True); - m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True); - m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True); - - m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True); - 
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True); - m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True); - } - - m.emplace_back("Read rows/sec", reads / duration, Averaged::False); - m.emplace_back("Write rows/sec", writes / duration, Averaged::False); - m.emplace_back( - "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); - m.emplace_back("Bytes written/sec", - (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, - Averaged::False); - m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); - - std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); - for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) - m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False); - } - - Value randomValue() { - return StringRef((uint8_t*)valueString.c_str(), - deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); - } - - Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); } - - template - void setupTransaction(Trans* tr) { - if (batchPriority) { - tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); - } - } - - ACTOR static Future tracePeriodically(ReadWriteWorkload* self) { + ACTOR static Future tracePeriodically(ReadWriteCommon* self) { state double start = now(); state double elapsed = 0.0; state int64_t last_ops = 0; @@ -470,7 +197,6 @@ struct ReadWriteWorkload : KVWorkload { self->readLatencyCount = 0; } } - ACTOR static Future logLatency(Future> f, ContinuousSample* latencies, double* totalLatency, @@ -491,7 +217,6 @@ struct ReadWriteWorkload : KVWorkload { } return Void(); } - ACTOR static Future logLatency(Future f, ContinuousSample* latencies, double* totalLatency, @@ -513,52 +238,7 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } - ACTOR template - Future 
readOp(Trans* tr, std::vector keys, ReadWriteWorkload* self, bool shouldRecord) { - if (!keys.size()) - return Void(); - if (!self->dependentReads) { - std::vector> readers; - if (self->rangeReads) { - for (int op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - readers.push_back(logLatency( - tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))), - GetRangeLimits(-1, 80000)), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } else { - for (int op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } - wait(waitForAll(readers)); - } else { - state int op; - for (op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - wait(logLatency(tr->get(self->keyForIndex(keys[op])), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } - return Void(); - } - - ACTOR Future _setup(Database cx, ReadWriteWorkload* self) { + ACTOR static Future setup(Database cx, ReadWriteCommon* self) { if (!self->doSetup) return Void(); @@ -580,8 +260,232 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } +}; - ACTOR Future _start(Database cx, ReadWriteWorkload* self) { +Future ReadWriteCommon::tracePeriodically() { + return ReadWriteCommonImpl::tracePeriodically(this); +} + +Future ReadWriteCommon::logLatency(Future> f, bool shouldRecord) { + return ReadWriteCommonImpl::logLatency( + f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord); +} + +Future ReadWriteCommon::logLatency(Future f, bool shouldRecord) { + return ReadWriteCommonImpl::logLatency( + f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord); +} + +Future 
ReadWriteCommon::setup(Database const& cx) { + return ReadWriteCommonImpl::setup(cx, this); +} + +Future ReadWriteCommon::check(Database const& cx) { + clients.clear(); + + if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) + metricsDuration = now() - metricsStart; + + g_traceBatch.dump(); + if (clientId == 0) + return ReadWriteCommonImpl::traceDumpWorkers(dbInfo); + else + return true; +} + +void ReadWriteCommon::getMetrics(std::vector& m) { + double duration = metricsDuration; + int reads = (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); + int writes = + (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); + m.emplace_back("Measured Duration", duration, Averaged::True); + m.emplace_back( + "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); + m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); + m.push_back(aTransactions.getMetric()); + m.push_back(bTransactions.getMetric()); + m.push_back(retries.getMetric()); + m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); + m.emplace_back("Read rows", reads, Averaged::False); + m.emplace_back("Write rows", writes, Averaged::False); + m.emplace_back("Read rows/sec", reads / duration, Averaged::False); + m.emplace_back("Write rows/sec", writes / duration, Averaged::False); + m.emplace_back( + "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); + m.emplace_back( + "Bytes written/sec", (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); + m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); + + std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); + for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) + m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), 
ratesItr->second, Averaged::False); +} + +Value ReadWriteCommon::randomValue() { + return StringRef((uint8_t*)valueString.c_str(), deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); +} + +Standalone ReadWriteCommon::operator()(uint64_t n) { + return KeyValueRef(keyForIndex(n, false), randomValue()); +} + +bool ReadWriteCommon::shouldRecord(double checkTime) { + double timeSinceStart = checkTime - clientBegin; + return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration); +} + +static Future nextRV; +static Version lastRV = invalidVersion; + +ACTOR static Future getNextRV(Database db) { + state Transaction tr(db); + loop { + try { + Version v = wait(tr.getReadVersion()); + return v; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +static Future getInconsistentReadVersion(Database const& db) { + if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running + if (nextRV.isValid()) + lastRV = nextRV.get(); + nextRV = getNextRV(db); + } + if (lastRV == invalidVersion) + return nextRV; + else + return lastRV; +} + +struct ReadWriteWorkload : ReadWriteCommon { + // use ReadWrite as a ramp up workload + bool rampUpLoad; // indicate this is a ramp up workload + int rampSweepCount; // how many times of ramp up + bool rampTransactionType; // choose transaction type based on client start time + bool rampUpConcurrency; // control client concurrency + + // transaction setting + bool batchPriority; + bool rangeReads; // read operations are all single key range read + bool dependentReads; // read operations are issued sequentially + bool inconsistentReads; // read with previous read version + bool adjacentReads; // keys are adjacent within a transaction + bool adjacentWrites; + int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; + + // hot traffic pattern + double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting + + ReadWriteWorkload(WorkloadContext 
const& wcx) + : ReadWriteCommon(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false) { + extraReadConflictRangesPerTransaction = + getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0); + extraWriteConflictRangesPerTransaction = + getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0); + dependentReads = getOption(options, LiteralStringRef("dependentReads"), false); + inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false); + adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false); + adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false); + rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false); + rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1); + rangeReads = getOption(options, LiteralStringRef("rangeReads"), false); + rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false); + rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false); + batchPriority = getOption(options, LiteralStringRef("batchPriority"), false); + descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite")); + + if (rampUpConcurrency) + ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down + + { + // with P(hotTrafficFraction) an access is directed to one of a fraction + // of hot keys, else it is directed to a disjoint set of cold keys + hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0); + double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0); + ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1); + ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot! 
+ // p(Cold key) = (1-FHP) * (1-hkf) + // p(Cold key) = (1-htf) + // solving for FHP gives: + forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction); + } + } + + std::string description() const override { return descriptionString.toString(); } + + template + void setupTransaction(Trans* tr) { + if (batchPriority) { + tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); + } + } + + void getMetrics(std::vector& m) override { + ReadWriteCommon::getMetrics(m); + if (!rampUpLoad) { + m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True); + m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True); + m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True); + m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True); + m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True); + + m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True); + m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True); + m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True); + + m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True); + m.emplace_back( + "Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True); + m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True); + + m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True); + m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True); + m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True); + + m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True); + m.emplace_back("Median 
Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True); + m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True); + } + } + + Future start(Database const& cx) override { return _start(cx, this); } + + ACTOR template + static Future readOp(Trans* tr, std::vector keys, ReadWriteWorkload* self, bool shouldRecord) { + if (!keys.size()) + return Void(); + if (!self->dependentReads) { + std::vector> readers; + if (self->rangeReads) { + for (int op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + readers.push_back(self->logLatency( + tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))), + GetRangeLimits(-1, 80000)), + shouldRecord)); + } + } else { + for (int op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord)); + } + } + wait(waitForAll(readers)); + } else { + state int op; + for (op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + wait(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord)); + } + } + return Void(); + } + + ACTOR static Future _start(Database cx, ReadWriteWorkload* self) { // Read one record from the database to warm the cache of keyServers state std::vector keys; keys.push_back(deterministicRandom()->randomInt64(0, self->nodeCount)); @@ -603,7 +507,7 @@ struct ReadWriteWorkload : KVWorkload { std::vector> clients; if (self->enableReadLatencyLogging) - clients.push_back(tracePeriodically(self)); + clients.push_back(self->tracePeriodically()); self->clientBegin = now(); for (int c = 0; c < self->actorCount; c++) { @@ -625,13 +529,6 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } - bool shouldRecord() { return shouldRecord(now()); } - - bool shouldRecord(double checkTime) { - double timeSinceStart = checkTime - clientBegin; - return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + 
metricsDuration); - } - int64_t getRandomKey(uint64_t nodeCount) { if (forceHotProbability && deterministicRandom()->random01() < forceHotProbability) return deterministicRandom()->randomInt64(0, nodeCount * hotKeyFraction) / diff --git a/fdbserver/workloads/ReadWriteWorkload.actor.h b/fdbserver/workloads/ReadWriteWorkload.actor.h new file mode 100644 index 0000000000..fe33be0213 --- /dev/null +++ b/fdbserver/workloads/ReadWriteWorkload.actor.h @@ -0,0 +1,171 @@ +/* + * ReadWriteWorkload.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H) +#define FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H +#include "fdbserver/workloads/ReadWriteWorkload.actor.g.h" +#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H) +#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H + +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/TDMetric.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. 
+// Event fields for a transaction that finished. In this patch they are filled in and logged by
+// SkewedReadWriteWorkload::randomReadWriteClient.
+DESCR struct TransactionSuccessMetric {
+    int64_t totalLatency; // ns
+    int64_t startLatency; // ns
+    int64_t commitLatency; // ns
+    int64_t retries; // count
+};
+
+// Event fields for a failed transaction attempt (logged from the client's catch block).
+DESCR struct TransactionFailureMetric {
+    int64_t startLatency; // ns
+    int64_t errorCode; // flow error code
+};
+
+// Event fields for a single read.
+DESCR struct ReadMetric {
+    int64_t readLatency; // ns
+};
+
+// Common ReadWrite test settings: option parsing plus the metric and latency-sample state that
+// ReadWrite-style workloads share. SkewedReadWriteWorkload derives from this struct.
+// NOTE(review): several declarations below look like they lost their template argument lists
+// (e.g. `Standalone descriptionString`, `std::vector> clients`, `boost::lexical_cast(...)`),
+// presumably stripped when this patch text was extracted - restore them from the original commit.
+struct ReadWriteCommon : KVWorkload {
+    static constexpr int sampleSize = 10000; // size passed to every latency sample in the constructor
+    friend struct ReadWriteCommonImpl;
+
+    // general test setting
+    Standalone descriptionString;
+    bool doSetup, cancelWorkersAtDuration;
+    double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
+    double metricsStart, metricsDuration;
+    std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
+
+    // test log setting
+    bool enableReadLatencyLogging;
+    double periodicLoggingInterval;
+
+    // two types of transaction (A and B), each with its own read/write counts
+    int readsPerTransactionA, writesPerTransactionA;
+    int readsPerTransactionB, writesPerTransactionB;
+    std::string valueString;
+    double alpha; // NOTE(review): SkewedReadWriteWorkload runs type A when random01() > alpha, so alpha acts as the probability of a type B transaction - confirm intent
+    // transaction setting
+    bool useRYW;
+
+    // states of metric
+    Int64MetricHandle totalReadsMetric;
+    Int64MetricHandle totalRetriesMetric;
+    EventMetricHandle transactionSuccessMetric;
+    EventMetricHandle transactionFailureMetric;
+    EventMetricHandle readMetric;
+    PerfIntCounter aTransactions, bTransactions, retries;
+    ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
+    double readLatencyTotal;
+    int readLatencyCount;
+    std::vector periodicMetrics;
+    std::vector> ratesAtKeyCounts; // sequential insertion speed
+
+    // other internal states
+    std::vector> clients;
+    double loadTime, clientBegin;
+
+    // Initialises the metric handles and latency samples, then reads every shared option from
+    // `options`. The default for each option is the literal passed to getOption() below.
+    explicit ReadWriteCommon(WorkloadContext const& wcx)
+      : KVWorkload(wcx), totalReadsMetric(LiteralStringRef("ReadWrite.TotalReads")),
+        totalRetriesMetric(LiteralStringRef("ReadWrite.TotalRetries")), aTransactions("A Transactions"),
+        bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
+        commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
+        readLatencyCount(0), loadTime(0.0), clientBegin(0) {
+
+        transactionSuccessMetric.init(LiteralStringRef("ReadWrite.SuccessfulTransaction"));
+        transactionFailureMetric.init(LiteralStringRef("ReadWrite.FailedTransaction"));
+        readMetric.init(LiteralStringRef("ReadWrite.Read"));
+
+        testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
+        // the configured rate is divided by clientCount, so this member holds the per-client share
+        transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
+        double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
+        // default actor count is ceil(rate * allowedLatency); "actorCountPerTester" overrides it when set
+        actorCount = ceil(transactionsPerSecond * allowedLatency);
+        actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);
+
+        readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
+        writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
+        readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
+        writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
+        alpha = getOption(options, LiteralStringRef("alpha"), 0.1);
+
+        valueString = std::string(maxValueBytes, '.');
+        // keys grow by 16 bytes when a node prefix is configured
+        if (nodePrefix > 0) {
+            keyBytes += 16;
+        }
+
+        metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
+        metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
+        if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
+            // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
+            metricsStart += testDuration * 0.125;
+            metricsDuration *= 0.75;
+        }
+
+        warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
+        maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
+        debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
+        debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
+        enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
+        periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
+        cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
+
+        useRYW = getOption(options, LiteralStringRef("useRYW"), false);
+        doSetup = getOption(options, LiteralStringRef("setup"), true);
+
+        // Validate that keyForIndex() is monotonic (spot check on 30 random index pairs)
+        for (int i = 0; i < 30; i++) {
+            int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
+            int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
+            if (a > b) {
+                std::swap(a, b);
+            }
+            ASSERT(a <= b);
+            ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
+        }
+
+        // "insertionCountsToMeasure" arrives as strings; each entry is converted with lexical_cast
+        std::vector insertionCountsToMeasureString =
+            getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector());
+        for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
+            try {
+                uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]);
+                insertionCountsToMeasure.push_back(count);
+            } catch (...) {
+                // NOTE(review): an entry that fails to convert is dropped silently - confirm this is intended
+            }
+        }
+    }
+
+    // The members below are only declared here; their definitions are not part of this chunk.
+    Future tracePeriodically();
+    Future logLatency(Future> f, bool shouldRecord);
+    Future logLatency(Future f, bool shouldRecord);
+
+    Future setup(Database const& cx) override;
+    Future check(Database const& cx) override;
+    void getMetrics(std::vector& m) override;
+
+    Standalone operator()(uint64_t n);
+    bool shouldRecord(double checkTime = now());
+    Value randomValue();
+};
+
+#include "flow/unactorcompiler.h"
+#endif // FDBSERVER_READWRITEWORKLOAD_ACTOR_H
diff --git a/fdbserver/workloads/SkewedReadWrite.actor.cpp b/fdbserver/workloads/SkewedReadWrite.actor.cpp
new file mode 100644
index 0000000000..78576f957f
--- /dev/null
+++ b/fdbserver/workloads/SkewedReadWrite.actor.cpp
@@ -0,0 +1,386 @@
+/*
+ * SkewedReadWrite.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// NOTE(review): the three bare #include lines below have lost their header names, presumably
+// stripped when this patch text was extracted - restore them from the original commit.
+#include
+#include
+#include
+
+#include "fdbrpc/ContinuousSample.h"
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbserver/TesterInterface.actor.h"
+#include "fdbserver/WorkerInterface.actor.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "fdbserver/workloads/BulkSetup.actor.h"
+#include "fdbserver/workloads/ReadWriteWorkload.actor.h"
+#include "fdbclient/ReadYourWrites.h"
+#include "flow/TDMetric.actor.h"
+#include "fdbclient/RunTransaction.actor.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+// ReadWrite-style workload that steers a configurable share of reads and writes to a rotating
+// subset of "hot" storage servers, chosen from the shard-to-server map read from the system keys.
+// NOTE(review): many declarations in this struct look like they lost their template argument lists
+// (e.g. `std::vector> serverShards`, `ACTOR template`), presumably stripped during extraction.
+struct SkewedReadWriteWorkload : ReadWriteCommon {
+    // server based hot traffic setting
+    int skewRound = 0; // number of hot-server rounds; _start gives each round testDuration / skewRound
+    double hotServerFraction = 0, hotServerShardFraction = 1.0; // set > 0 to issue hot key based on shard map
+    double hotServerReadFrac, hotServerWriteFrac; // how much of the read / write traffic goes to hot servers
+    double hotReadWriteServerOverlap; // the portion of intersection of write and hot server
+
+    // hot server state
+    typedef std::vector> IndexRangeVec;
+    // keyForIndex generate key from index. So for a shard range, recording the start and end is enough
+    std::vector> serverShards; // storage server and the shards it owns
+    std::map serverInterfaces; // filled by updateServerShards(), keyed by the decoded interface's id()
+    int hotServerCount = 0, currentHotRound = -1;
+
+    // Reads the skew-specific options (defaults are the literals passed to getOption) on top of the
+    // ones parsed by ReadWriteCommon. Asserts that at least one of the hot read/write fractions is
+    // >= hotServerFraction and that skewRound is positive.
+    SkewedReadWriteWorkload(WorkloadContext const& wcx) : ReadWriteCommon(wcx) {
+        descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("SkewedReadWrite"));
+        hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2);
+        hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0);
+        hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0);
+        skewRound = getOption(options, "skewRound"_sr, 1);
+        hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8);
+        hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0);
+        ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && skewRound > 0);
+    }
+
+    std::string description() const override { return descriptionString.toString(); }
+    Future start(Database const& cx) override { return _start(cx, this); }
+
+    // Debug helper: prints each server's address followed by its [first,second] index ranges.
+    // NOTE(review): std::hex is left set on std::cout when this returns, and both range-for loops
+    // copy their elements (`auto it`, `auto p`).
+    void debugPrintServerShards() const {
+        std::cout << std::hex;
+        for (auto it : this->serverShards) {
+            std::cout << serverInterfaces.at(it.first).address().toString() << ": [";
+            for (auto p : it.second) {
+                std::cout << "[" << p.first << "," << p.second << "], ";
+            }
+            std::cout << "] \n";
+        }
+    }
+
+    // For each boundary except the last one in boundaries, find the first existing key in
+    // [boundaries[i], boundaries[i + 1]) and use its index as beginIdx, and find the last existing key
+    // in that range and use its index as endIdx (both converted with indexForKey).
+    // Asserts that every such range contains at least one key.
+    ACTOR static Future convertKeyBoundaryToIndexShard(Database cx,
+                                                       SkewedReadWriteWorkload* self,
+                                                       Standalone> boundaries) {
+        state IndexRangeVec res;
+        state int i = 0;
+        for (; i < boundaries.size() - 1; ++i) {
+            KeyRangeRef currentShard = KeyRangeRef(boundaries[i], boundaries[i + 1]);
+            // std::cout << currentShard.toString() << "\n";
+            // two limit-1 range reads in one transaction: forward gives the first key, reverse the last
+            std::vector ranges = wait(runRYWTransaction(
+                cx, [currentShard](Reference tr) -> Future> {
+                    std::vector> f;
+                    f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::False));
+                    f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::True));
+                    return getAll(f);
+                }));
+            ASSERT(ranges[0].size() == 1 && ranges[1].size() == 1);
+            res.emplace_back(self->indexForKey(ranges[0][0].key), self->indexForKey(ranges[1][0].key));
+        }
+
+        ASSERT(res.size() == boundaries.size() - 1);
+        return res;
+    }
+
+    // Rebuilds self->serverInterfaces and self->serverShards from the serverListKeys and
+    // serverKeysRange system ranges, restricted to [keyForIndex(0), keyForIndex(nodeCount)).
+    // NOTE(review): the two ranges are read by two separate runRYWTransaction calls, and neither
+    // result is checked for truncation at the TOO_MANY limit - confirm both are acceptable here.
+    ACTOR static Future updateServerShards(Database cx, SkewedReadWriteWorkload* self) {
+        // the server-list read is started first and only awaited after the server-keys read completes
+        state Future serverList =
+            runRYWTransaction(cx, [](Reference tr) -> Future {
+                tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+                return tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
+            });
+        state RangeResult range =
+            wait(runRYWTransaction(cx, [](Reference tr) -> Future {
+                tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+                return tr->getRange(serverKeysRange, CLIENT_KNOBS->TOO_MANY);
+            }));
+        wait(success(serverList));
+        // decode server interfaces
+        self->serverInterfaces.clear();
+        for (int i = 0; i < serverList.get().size(); i++) {
+            auto ssi = decodeServerListValue(serverList.get()[i].value);
+            self->serverInterfaces.emplace(ssi.id(), ssi);
+        }
+        // clear self->serverShards
+        self->serverShards.clear();
+
+        // leftEdge < workloadBegin < workloadEnd
+        Key workloadBegin = self->keyForIndex(0), workloadEnd = self->keyForIndex(self->nodeCount);
+        Key leftEdge(allKeys.begin);
+        std::vector leftServer; // left server owns the range [leftEdge, workloadBegin)
+        KeyRangeRef workloadRange(workloadBegin, workloadEnd);
+        state std::map> beginServers; // shard begin key -> IDs of the servers that hold it
+
+        // only entries for which serverHasKey() is true are considered
+        for (auto kv = range.begin(); kv != range.end(); kv++) {
+            if (serverHasKey(kv->value)) {
+                auto [id, key] = serverKeysDecodeServerBegin(kv->key);
+
+                if (workloadRange.contains(key)) {
+                    beginServers[key].push_back(id);
+                } else if (workloadBegin > key && key > leftEdge) { // update left boundary
+                    leftEdge = key;
+                    leftServer.clear();
+                }
+
+                if (key == leftEdge) {
+                    leftServer.push_back(id);
+                }
+            }
+        }
+        ASSERT(beginServers.size() == 0 || beginServers.begin()->first >= workloadBegin);
+        // handle the left boundary: if no shard begins exactly at workloadBegin, the servers that own
+        // the shard starting at leftEdge also cover the start of the workload range
+        if (beginServers.size() == 0 || beginServers.begin()->first > workloadBegin) {
+            beginServers[workloadBegin] = leftServer;
+        }
+        Standalone> keyBegins;
+        for (auto p = beginServers.begin(); p != beginServers.end(); ++p) {
+            keyBegins.push_back(keyBegins.arena(), p->first);
+        }
+        // deep copy because wait below will destruct workloadEnd
+        keyBegins.push_back_deep(keyBegins.arena(), workloadEnd);
+
+        IndexRangeVec indexShards = wait(convertKeyBoundaryToIndexShard(cx, self, keyBegins));
+        ASSERT(beginServers.size() == indexShards.size());
+        // no explicit sort is needed: beginServers is a std::map, so it iterates in key order, the
+        // same order in which keyBegins (and therefore indexShards) was built
+        // build self->serverShards, starting from the left shard
+        std::map serverShards;
+        int i = 0;
+        for (auto p = beginServers.begin(); p != beginServers.end(); ++p) {
+            for (int j = 0; j < p->second.size(); ++j) {
+                serverShards[p->second[j]].emplace_back(indexShards[i]);
+            }
+            ++i;
+        }
+        // self->serverShards is ordered by UID
+        // NOTE(review): `auto it` copies every map entry before it is copied again into the vector
+        for (auto it : serverShards) {
+            self->serverShards.emplace_back(it);
+        }
+        // if (self->clientId == 0) {
+        // 	self->debugPrintServerShards();
+        // }
+        return Void();
+    }
+
+    // Issues one get per entry of `keys` (each entry is an index turned into a key by keyForIndex),
+    // wraps every get in logLatency, bumps totalReadsMetric per read and waits for all of them.
+    // Returns immediately when `keys` is empty.
+    ACTOR template
+    Future readOp(Trans* tr, std::vector keys, SkewedReadWriteWorkload* self, bool shouldRecord) {
+        if (!keys.size())
+            return Void();
+
+        std::vector> readers;
+        for (int op = 0; op < keys.size(); op++) {
+            ++self->totalReadsMetric;
+            readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord));
+        }
+
+        wait(waitForAll(readers));
+        return Void();
+    }
+
+    // Records clientBegin and appends actorCount randomReadWriteClient actors to `clients`, each
+    // started with a delay argument of actorCount / transactionsPerSecond.
+    // NOTE(review): both branches of `if (useRYW)` read the same in this text; presumably they
+    // differed only in a template argument that was stripped - confirm against the original commit.
+    void startReadWriteClients(Database cx, std::vector>& clients) {
+        clientBegin = now();
+        for (int c = 0; c < actorCount; c++) {
+            Future worker;
+            if (useRYW)
+                worker =
+                    randomReadWriteClient(cx, this, actorCount / transactionsPerSecond, c);
+            else
+                worker = randomReadWriteClient(cx, this, actorCount / transactionsPerSecond, c);
+            clients.push_back(worker);
+        }
+    }
+
+    // Workload driver: loads the shard map, then for each of skewRound rounds picks the hot servers,
+    // starts the clients, waits at most testDuration / skewRound, drops the clients and refreshes the
+    // shard map after a delay(5.0).
+    // NOTE(review): clients.clear() also drops the tracePeriodically() future that was added before
+    // the loop, so rounds after the first presumably run without periodic tracing - confirm.
+    ACTOR static Future _start(Database cx, SkewedReadWriteWorkload* self) {
+        state std::vector> clients;
+        if (self->enableReadLatencyLogging)
+            clients.push_back(self->tracePeriodically());
+
+        wait(updateServerShards(cx, self));
+        for (self->currentHotRound = 0; self->currentHotRound < self->skewRound; ++self->currentHotRound) {
+            self->setHotServers();
+            self->startReadWriteClients(cx, clients);
+            wait(timeout(waitForAll(clients), self->testDuration / self->skewRound, Void()));
+            clients.clear();
+            wait(delay(5.0) >> updateServerShards(cx, self));
+        }
+
+        return Void();
+    }
+
+    // calculate hot server count as ceil(hotServerFraction * serverShards.size()) and print the
+    // addresses of the servers selected for the current round
+    void setHotServers() {
+        hotServerCount = ceil(hotServerFraction * serverShards.size());
+        std::cout << "Choose " << hotServerCount << "/" << serverShards.size() << "/" << serverInterfaces.size()
+                  << " hot servers: [";
+        // the hot window starts at currentHotRound * hotServerCount and wraps around serverShards
+        int begin = currentHotRound * hotServerCount;
+        for (int i = 0; i < hotServerCount; ++i) {
+            int idx = (begin + i) % serverShards.size();
+            std::cout << serverInterfaces.at(serverShards[idx].first).address().toString() << ",";
+        }
+        std::cout << "]\n";
+    }
+
+    // Returns a random key index taken from a shard of one of the current round's hot servers.
+    // For writes (hotServerRead == false) the server window is shifted by
+    // hotServerCount * (1.0 - hotReadWriteServerOverlap). Requires hotServerCount > 0.
+    // NOTE(review): shardMax is 0 when hotServerShardFraction is 0, which makes the call
+    // randomInt(0, 0) - presumably invalid, confirm the option is always > 0.
+    int64_t getRandomKeyFromHotServer(bool hotServerRead = true) {
+        ASSERT(hotServerCount > 0);
+        int begin = currentHotRound * hotServerCount;
+        if (!hotServerRead) {
+            begin += hotServerCount * (1.0 - hotReadWriteServerOverlap); // calculate non-overlap part offset
+        }
+        int idx = deterministicRandom()->randomInt(begin, begin + hotServerCount) % serverShards.size();
+        // only the first shardMax shards of the chosen server are eligible
+        int shardMax = std::min(serverShards[idx].second.size(),
+                                (size_t)ceil(serverShards[idx].second.size() * hotServerShardFraction));
+        int shardIdx = deterministicRandom()->randomInt(0, shardMax);
+        // `second + 1`: presumably randomInt64's upper bound is exclusive, so the stored end index is included
+        return deterministicRandom()->randomInt64(serverShards[idx].second[shardIdx].first,
+                                                  serverShards[idx].second[shardIdx].second + 1);
+    }
+
+    // Returns a key index: when hotServerFraction > 0, a hot-server key is chosen if random01() falls
+    // below hotServerReadFrac (reads) or hotServerWriteFrac (writes); otherwise the result is
+    // randomInt64(0, nodeCount).
+    int64_t getRandomKey(uint64_t nodeCount, bool hotServerRead = true) {
+        auto random = deterministicRandom()->random01();
+        if (hotServerFraction > 0) {
+            if ((hotServerRead && random < hotServerReadFrac) || (!hotServerRead && random < hotServerWriteFrac)) {
+                return getRandomKeyFromHotServer(hotServerRead);
+            }
+        }
+        return deterministicRandom()->randomInt64(0, nodeCount);
+    }
+
+    // Client loop: after each poisson(&lastTime, delay) wait it builds one type A or type B
+    // transaction (type A when random01() > alpha), performs its reads and, if any, its writes and
+    // commit, retrying through tr.onError(), and records latency metrics.
+    // NOTE(review): `startTime`, `extra_ranges` and the `clientIndex` parameter are never used in
+    // this body.
+    ACTOR template
+    Future randomReadWriteClient(Database cx, SkewedReadWriteWorkload* self, double delay, int clientIndex) {
+        state double startTime = now();
+        state double lastTime = now();
+        state double GRVStartTime;
+        state UID debugID;
+
+        loop {
+            wait(poisson(&lastTime, delay));
+
+            state double tstart = now();
+            state bool aTransaction = deterministicRandom()->random01() > self->alpha;
+
+            state std::vector keys;
+            state std::vector values;
+            state std::vector extra_ranges;
+            int reads = aTransaction ? self->readsPerTransactionA : self->readsPerTransactionB;
+            state int writes = aTransaction ? self->writesPerTransactionA : self->writesPerTransactionB;
+            // read indices are drawn with hotServerRead left at its default (true)
+            for (int op = 0; op < reads; op++)
+                keys.push_back(self->getRandomKey(self->nodeCount));
+
+            values.reserve(writes);
+            for (int op = 0; op < writes; op++)
+                values.push_back(self->randomValue());
+
+            state Trans tr(cx);
+
+            // transactions started within (debugTime, debugTime + debugInterval] of clientBegin get a debug ID
+            if (tstart - self->clientBegin > self->debugTime &&
+                tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
+                debugID = deterministicRandom()->randomUniqueID();
+                tr.debugTransaction(debugID);
+                g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
+            } else {
+                debugID = UID();
+            }
+
+            self->transactionSuccessMetric->retries = 0;
+            self->transactionSuccessMetric->commitLatency = -1;
+
+            loop {
+                try {
+                    GRVStartTime = now();
+                    self->transactionFailureMetric->startLatency = -1;
+
+                    // NOTE(review): nothing is awaited between GRVStartTime and this line, so the
+                    // "GRV latency" recorded below presumably stays at about zero - confirm intent
+                    double grvLatency = now() - GRVStartTime;
+                    self->transactionSuccessMetric->startLatency = grvLatency * 1e9;
+                    self->transactionFailureMetric->startLatency = grvLatency * 1e9;
+                    if (self->shouldRecord())
+                        self->GRVLatencies.addSample(grvLatency);
+
+                    state double readStart = now();
+                    wait(self->readOp(&tr, keys, self, self->shouldRecord()));
+
+                    double readLatency = now() - readStart;
+                    if (self->shouldRecord())
+                        self->fullReadLatencies.addSample(readLatency);
+
+                    // read-only transactions finish here without a commit
+                    if (!writes)
+                        break;
+
+                    // write targets are drawn with hotServerRead == false
+                    for (int op = 0; op < writes; op++)
+                        tr.set(self->keyForIndex(self->getRandomKey(self->nodeCount, false), false), values[op]);
+
+                    state double commitStart = now();
+                    wait(tr.commit());
+
+                    double commitLatency = now() - commitStart;
+                    self->transactionSuccessMetric->commitLatency = commitLatency * 1e9;
+                    if (self->shouldRecord())
+                        self->commitLatencies.addSample(commitLatency);
+
+                    break;
+                } catch (Error& e) {
+                    // log the failure, wait on tr.onError(e), then count the retry and loop again
+                    self->transactionFailureMetric->errorCode = e.code();
+                    self->transactionFailureMetric->log();
+
+                    wait(tr.onError(e));
+
+                    ++self->transactionSuccessMetric->retries;
+                    ++self->totalRetriesMetric;
+
+                    if (self->shouldRecord())
+                        ++self->retries;
+                }
+            }
+
+            if (debugID != UID())
+                g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");
+
+            // replace the finished transaction with a default-constructed one
+            tr = Trans();
+
+            double transactionLatency = now() - tstart;
+            self->transactionSuccessMetric->totalLatency = transactionLatency * 1e9;
+            self->transactionSuccessMetric->log();
+
+            if (self->shouldRecord()) {
+                if (aTransaction)
+                    ++self->aTransactions;
+                else
+                    ++self->bTransactions;
+
+                self->latencies.addSample(transactionLatency);
+            }
+        }
+    }
+};
+
+// Registers the workload under the name "SkewedReadWrite".
+WorkloadFactory SkewedReadWriteWorkloadFactory("SkewedReadWrite");
+
+// Round-trip check: indexForKey(keyForIndex(idx)) must return idx for 1000 random indices, both
+// with the default arguments and with absent == true.
+TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") {
+    auto wk = SkewedReadWriteWorkload(WorkloadContext());
+    for (int i = 0; i < 1000; ++i) {
+        auto idx = deterministicRandom()->randomInt64(0, wk.nodeCount);
+        Key k = wk.keyForIndex(idx);
+        auto parse = wk.indexForKey(k);
+        // std::cout << parse << " " << idx << "\n";
+        ASSERT(parse == idx);
+    }
+    for (int i = 0; i < 1000; ++i) {
+        auto idx = deterministicRandom()->randomInt64(0, wk.nodeCount);
+        Key k = wk.keyForIndex(idx, true);
+        auto parse = wk.indexForKey(k, true);
+        ASSERT(parse == idx);
+    }
+    return Void();
+}
\ No newline at end of file
diff --git a/fdbserver/workloads/workloads.actor.h b/fdbserver/workloads/workloads.actor.h
index bdbbd5707c..df08911fc7 100644
--- a/fdbserver/workloads/workloads.actor.h
+++ b/fdbserver/workloads/workloads.actor.h
@@ -131,6 +131,8 @@ struct KVWorkload : TestWorkload {
     Key getRandomKey(bool absent) const;
     Key keyForIndex(uint64_t index) const;
     Key keyForIndex(uint64_t index, bool absent) const;
+    // the reverse process of keyForIndex() without division.
Set absent=true to ignore the last byte in Key
+    int64_t indexForKey(const KeyRef& key, bool absent = false) const;
 };
 
 struct IWorkloadFactory : ReferenceCounted {
diff --git a/flow/serialize.h b/flow/serialize.h
index db58461eed..5c218b9bc6 100644
--- a/flow/serialize.h
+++ b/flow/serialize.h
@@ -634,6 +634,8 @@ public:
         check = nullptr;
     }
 
+    size_t remainingBytes() const { return end - begin; }; // size of [begin, end); presumably the bytes not yet consumed - confirm that `begin` advances on reads. NOTE(review): the ';' after the body is redundant
+
 protected:
     _Reader(const char* begin, const char* end) : begin(begin), end(end) {}
     _Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index e358bbf1bc..f9695703e6 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -211,6 +211,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml)
   add_fdb_test(TEST_FILES rare/RYWDisable.toml)
   add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
+  add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
   add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
   add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)
   add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
diff --git a/tests/rare/ReadSkewReadWrite.toml b/tests/rare/ReadSkewReadWrite.toml
new file mode 100644
index 0000000000..207ecd6c63
--- /dev/null
+++ b/tests/rare/ReadSkewReadWrite.toml
@@ -0,0 +1,24 @@
+[[test]]
+testTitle = 'SkewedReadWriteTest'
+connectionFailuresDisableDuration = 100000
+# waitForQuiescenceBegin= false
+# waitForQuiescenceEnd=false
+clearAfterTest = true
+runSetup = true # false
+timeout = 3600.0
+
+[[test.workload]]
+testName = 'SkewedReadWrite'
+transactionsPerSecond = 100
+testDuration = 40.0
+skewRound = 1
+nodeCount = 3000 # 30000000
+valueBytes = 100
+readsPerTransactionA = 8
+writesPerTransactionA = 0
+alpha = 0
+discardEdgeMeasurements = false
+hotServerFraction = 0.2
+hotServerReadFrac = 0.8
+# hotServerShardFraction = 0.3
+warmingDelay = 180.0