diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp
index b23b5b8d87..56f47f69b5 100644
--- a/fdbclient/SpecialKeySpace.actor.cpp
+++ b/fdbclient/SpecialKeySpace.actor.cpp
@@ -38,12 +38,13 @@ std::unordered_map SpecialKeySpace::moduleToB
 // orEqual == false && offset == 1 (Standard form)
 // If the corresponding key is not in the underlying key range, it will move over the range
 ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw,
-                                                 KeySelector* ks) {
+                                                 KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) {
     ASSERT(!ks->orEqual); // should be removed before calling
     ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
 
     state Key startKey(skrImpl->getKeyRange().begin);
     state Key endKey(skrImpl->getKeyRange().end);
+    state Standalone<RangeResultRef> result;
 
     if (ks->offset < 1) {
         // less than the given key
@@ -60,7 +61,15 @@ ACTOR Future moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl*
         .detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
         .detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
 
-    Standalone<RangeResultRef> result = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
+    if (skrImpl->isAsync()) {
+        const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(skrImpl);
+        Standalone<RangeResultRef> result_ = wait(ptr->getRange(ryw, KeyRangeRef(startKey, endKey), cache));
+        result = result_;
+    } else {
+        Standalone<RangeResultRef> result_ = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
+        result = result_;
+    }
+
     if (result.size() == 0) {
         TraceEvent(SevDebug, "ZeroElementsIntheRange").detail("Start", startKey).detail("End", endKey);
         return Void();
@@ -107,7 +116,8 @@ void onModuleRead(ReadYourWritesTransaction* ryw, SpecialKeySpace::MODULE module
 // to maintain; Thus, separate each part to make the code easy to understand and more compact
 ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector* ks,
                                              Optional<SpecialKeySpace::MODULE>* lastModuleRead, int* actualOffset,
-                                             Standalone<RangeResultRef>* result) {
+                                             Standalone<RangeResultRef>* result,
+                                             Optional<Standalone<RangeResultRef>>* cache) {
     state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
         ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
                        : sks->getImpls().rangeContaining(ks->getKey());
@@ -115,7 +125,7 @@ ACTOR Future normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
            (ks->offset > 1 && iter != sks->getImpls().ranges().end())) {
         onModuleRead(ryw, sks->getModules().rangeContaining(iter->begin())->value(), *lastModuleRead);
         if (iter->value() != nullptr) {
-            wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks));
+            wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks, cache));
         }
         ks->offset < 1 ? --iter : ++iter;
     }
 
@@ -164,13 +174,16 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
     // This function handles ranges which cover more than one keyrange and aggregates all results
     // KeySelector, GetRangeLimits and reverse are all handled here
     state Standalone<RangeResultRef> result;
+    state Standalone<RangeResultRef> pairs;
     state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter;
     state int actualBeginOffset;
     state int actualEndOffset;
     state Optional<SpecialKeySpace::MODULE> lastModuleRead;
+    // used to cache the result of a potential first read
+    state Optional<Standalone<RangeResultRef>> cache;
 
-    wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result));
-    wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result));
+    wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result, &cache));
+    wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result, &cache));
     // Handle all corner cases like what RYW does
     // return if range inverted
     if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
@@ -195,7 +208,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
             KeyRangeRef kr = iter->range();
             KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
             KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
-            Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
+            if (iter->value()->isAsync() && cache.present()) {
+                const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
+                Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
+                pairs = pairs_;
+            } else {
+                Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
+                pairs = pairs_;
+            }
             result.arena().dependsOn(pairs.arena());
             // limits handler
             for (int i = pairs.size() - 1; i >= 0; --i) {
@@ -218,7 +238,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
             KeyRangeRef kr = iter->range();
             KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
             KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
-            Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
+            if (iter->value()->isAsync() && cache.present()) {
+                const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
+                Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
+                pairs = pairs_;
+            } else {
+                Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
+                pairs = pairs_;
+            }
             result.arena().dependsOn(pairs.arena());
             // limits handler
             for (int i = 0; i < pairs.size(); ++i) {
@@ -316,7 +343,7 @@ Future> ConflictingKeysImpl::getRange(ReadYourWritesT
     return result;
 }
 
-ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
+ACTOR Future<Standalone<RangeResultRef>> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
     try {
         auto keys = kr.removePrefix(ddStatsRange.begin);
         Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
@@ -339,10 +366,10 @@ ACTOR Future> ddStatsGetRangeActor(ReadYourWritesTran
     }
 }
 
-DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
+DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
 
 Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
-    return ddStatsGetRangeActor(ryw, kr);
+    return ddMetricsGetRangeActor(ryw, kr);
 }
 
 class SpecialKeyRangeTestImpl : public SpecialKeyRangeBaseImpl {
diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h
index a33ff666a4..4c6b127cbc 100644
--- a/fdbclient/SpecialKeySpace.actor.h
+++ b/fdbclient/SpecialKeySpace.actor.h
@@ -38,13 +38,55 @@ public:
     // Each derived class only needs to implement this simple version of getRange
     virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
 
-    explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
+    explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr, bool async = false) : range(kr), async(async) {}
     KeyRangeRef getKeyRange() const { return range; }
+    bool isAsync() const { return async; }
     virtual ~SpecialKeyRangeBaseImpl() {}
 
 protected:
     KeyRange range; // underlying key range for this function
+    bool async; // true if the range read issues an RPC call; in that case we cache the results to keep
+                // consistency within the same range read
+};
+
+class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeBaseImpl {
+public:
+    explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr, true) {}
+
+    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
+
+    // called with a cache object to keep results consistent if we need to make an RPC call
+    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr,
+                                                Optional<Standalone<RangeResultRef>>* cache) const {
+        return getRangeAsyncActor(this, ryw, kr, cache);
+    }
+
+    ACTOR static Future<Standalone<RangeResultRef>> getRangeAsyncActor(const SpecialKeyRangeBaseImpl* skrAsyncImpl,
+                                                                       ReadYourWritesTransaction* ryw, KeyRangeRef kr,
+                                                                       Optional<Standalone<RangeResultRef>>* cache) {
+        ASSERT(skrAsyncImpl->getKeyRange().contains(kr));
+        if (cache == nullptr) {
+            // a nullptr means we want to read directly and do not need to cache the results
+            Standalone<RangeResultRef> result = wait(skrAsyncImpl->getRange(ryw, kr));
+            return result;
+        } else if (!cache->present()) {
+            // For simplicity, every time we need to cache, we read the whole range
+            // TODO : improvements are needed if we have expensive RPC calls
+            Standalone<RangeResultRef> result_ = wait(skrAsyncImpl->getRange(ryw, skrAsyncImpl->getKeyRange()));
+            *cache = result_;
+        }
+        const auto& allResults = cache->get();
+        int start = 0, end = allResults.size();
+        while (start < allResults.size() && allResults[start].key < kr.begin) ++start;
+        while (end > 0 && allResults[end - 1].key >= kr.end) --end;
+        if (start < end) {
+            Standalone<RangeResultRef> result = RangeResultRef(allResults.slice(start, end), false);
+            result.arena().dependsOn(allResults.arena());
+            return result;
+        } else
+            return Standalone<RangeResultRef>();
+    }
 };
 
 class SpecialKeySpace {
@@ -148,7 +190,7 @@ public:
     Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
 };
 
-class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl {
+class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl {
 public:
     explicit DDStatsRangeImpl(KeyRangeRef kr);
     Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp
index 01d2a34ab9..3d96ed4408 100644
--- a/fdbserver/DataDistributionTracker.actor.cpp
+++ b/fdbserver/DataDistributionTracker.actor.cpp
@@ -822,7 +822,8 @@ ACTOR Future fetchShardMetricsList_impl( DataDistributionTracker* self, Ge
     // list of metrics, regenerate on loop when full range unsuccessful
     Standalone<VectorRef<DDMetricsRef>> result;
     Future<Void> onChange;
-    for (auto t : self->shards.containedRanges(req.keys)) {
+    for (auto t = self->shards.containedRanges(req.keys).begin();
+         t != self->shards.intersectingRanges(req.keys).end(); ++t) {
         auto &stats = t.value().stats;
         if( !stats->get().present() ) {
             onChange = stats->onChange();
diff --git a/fdbserver/workloads/DataDistributionMetrics.actor.cpp b/fdbserver/workloads/DataDistributionMetrics.actor.cpp
index 96a0d37510..222b96ccff 100644
--- a/fdbserver/workloads/DataDistributionMetrics.actor.cpp
+++ b/fdbserver/workloads/DataDistributionMetrics.actor.cpp
@@ -26,52 +26,110 @@ struct DataDistributionMetricsWorkload : KVWorkload {
-    int numTransactions;
-    int writesPerTransaction;
-    int transactionsCommitted;
-    int numShards;
+    int numShards, readPerTx, writePerTx;
     int64_t avgBytes;
+    double testDuration;
+    std::string keyPrefix;
+    PerfIntCounter commits, errors;
 
     DataDistributionMetricsWorkload(WorkloadContext const& wcx)
-      : KVWorkload(wcx), transactionsCommitted(0), numShards(0), avgBytes(0) {
-        numTransactions = getOption(options, LiteralStringRef("numTransactions"), 100);
-        writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 1000);
+      : KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") {
+        testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
+        keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString();
+        readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1);
+        writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx);
+        ASSERT(nodeCount > 1);
     }
 
     static Value getRandomValue() {
        return Standalone<StringRef>(format("Value/%08d", deterministicRandom()->randomInt(0, 10e6)));
     }
 
-    ACTOR static Future<Void> _start(Database cx, DataDistributionMetricsWorkload* self) {
-        state int tNum;
-        for (tNum = 0; tNum < self->numTransactions; ++tNum) {
-            loop {
-                state ReadYourWritesTransaction tr(cx);
-                try {
-                    state int i;
-                    for (i = 0; i < self->writesPerTransaction; ++i) {
-                        tr.set(StringRef(format("Key/%08d", tNum * self->writesPerTransaction + i)), getRandomValue());
+    Key keyForIndex(int n) { return doubleToTestKey((double)n / nodeCount, keyPrefix); }
+
+    ACTOR static Future<Void> ddRWClient(Database cx, DataDistributionMetricsWorkload* self) {
+        loop {
+            state ReadYourWritesTransaction tr(cx);
+            state int i;
+            try {
+                for (i = 0; i < self->readPerTx; ++i)
+                    wait(success(tr.get(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount))))); // read
+                for (i = 0; i < self->writePerTx; ++i)
+                    tr.set(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount)), getRandomValue()); // write
+                wait(tr.commit());
+                ++self->commits;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+            tr.reset();
+        }
+    }
+
+    ACTOR Future<Void> resultConsistencyCheckClient(Database cx, DataDistributionMetricsWorkload* self) {
+        state Reference<ReadYourWritesTransaction> tr =
+            Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
+        loop {
+            try {
+                int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1);
+                int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount);
+                state Key startKey = self->keyForIndex(startIndex);
+                state Key endKey = self->keyForIndex(endIndex);
+                // lastLessOrEqual
+                state KeySelector begin = KeySelectorRef(startKey.withPrefix(ddStatsRange.begin, startKey.arena()), true, 0);
+                state KeySelector end = KeySelectorRef(endKey.withPrefix(ddStatsRange.begin, endKey.arena()), false, 2);
+                Standalone<RangeResultRef> result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT)));
+                if (result.size() > 1) {
+                    if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) {
+                        TraceEvent(SevDebug, "DDMetricsConsistencyTest")
+                            .detail("Size", result.size())
+                            .detail("FirstKey", result[0].key.toString())
+                            .detail("SecondKey", result[1].key.toString())
+                            .detail("BeginKeySelector", begin.toString());
+                    } else {
+                        ++self->errors;
+                        TraceEvent(SevError, "TestFailure")
+                            .detail("Reason", "Result mismatches the given begin selector")
+                            .detail("Size", result.size())
+                            .detail("FirstKey", result[0].key.toString())
+                            .detail("SecondKey", result[1].key.toString())
+                            .detail("BeginKeySelector", begin.toString());
+                    }
+                    if (result[result.size() - 1].key >= end.getKey() && result[result.size() - 2].key < end.getKey()) {
+                        TraceEvent(SevDebug, "DDMetricsConsistencyTest")
+                            .detail("Size", result.size())
+                            .detail("LastKey", result[result.size() - 1].key.toString())
+                            .detail("SecondLastKey", result[result.size() - 2].key.toString())
+                            .detail("EndKeySelector", end.toString());
+                    } else {
+                        ++self->errors;
+                        TraceEvent(SevError, "TestFailure")
+                            .detail("Reason", "Result mismatches the given end selector")
+                            .detail("Size", result.size())
+                            .detail("LastKey", result[result.size() - 1].key.toString())
+                            .detail("SecondLastKey", result[result.size() - 2].key.toString())
+                            .detail("EndKeySelector", end.toString());
                     }
-                    wait(tr.commit());
-                    ++self->transactionsCommitted;
-                    break;
-                } catch (Error& e) {
-                    wait(tr.onError(e));
                 }
+            } catch (Error& e) {
+                // Ignore timed_out and cross_module_read errors; the end key may potentially point outside the range
+                if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue;
+                TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what());
+                wait(tr->onError(e));
             }
         }
-        return Void();
     }
 
     ACTOR static Future<bool> _check(Database cx, DataDistributionMetricsWorkload* self) {
-        if (self->transactionsCommitted == 0) {
-            TraceEvent(SevError, "NoTransactionsCommitted");
+        if (self->errors.getValue() > 0) {
+            TraceEvent(SevError, "TestFailure").detail("Reason", "GetRange Results Inconsistent");
             return false;
         }
+        // TODO : find out why this does not work
+        // wait(quietDatabase(cx, self->dbInfo, "PopulateTPCC"));
         state Reference<ReadYourWritesTransaction> tr =
             Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
         try {
-            state Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, 100));
+            state Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, CLIENT_KNOBS->SHARD_COUNT_LIMIT));
             ASSERT(!result.more);
             self->numShards = result.size();
             if (self->numShards < 1) return false;
@@ -81,19 +139,31 @@ struct DataDistributionMetricsWorkload : KVWorkload {
                 totalBytes += readJSONStrictly(result[i].value.toString()).get_obj()["ShardBytes"].get_int64();
             }
             self->avgBytes = totalBytes / self->numShards;
-            // fetch data-distribution stats for a smalller range
+            // fetch data-distribution stats for a smaller range
+            ASSERT(result.size());
             state int idx = deterministicRandom()->randomInt(0, result.size());
             Standalone<RangeResultRef> res = wait(tr->getRange(
-                KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100));
+                KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end),
+                100));
             ASSERT_WE_THINK(res.size() == 1 &&
-                            res[0] == result[idx]); // It works good now. However, not sure in any case of data-distribution, the number changes
+                            res[0] == result[idx]); // It works for now; however, it is not certain that the
+                                                    // number stays the same in every case of data-distribution
         } catch (Error& e) {
             TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what());
-            return false;
+            throw;
         }
         return true;
     }
+
+    ACTOR Future<Void> _start(Database cx, DataDistributionMetricsWorkload* self) {
+        std::vector<Future<Void>> clients;
+        clients.push_back(self->resultConsistencyCheckClient(cx, self));
+        for (int i = 0; i < self->actorCount; ++i) clients.push_back(self->ddRWClient(cx, self));
+        wait(timeout(waitForAll(clients), self->testDuration, Void()));
+        wait(delay(5.0));
+        return Void();
+    }
 
     virtual std::string description() { return "DataDistributionMetrics"; }
     virtual Future<Void> setup(Database const& cx) { return Void(); }
     virtual Future<Void> start(Database const& cx) { return _start(cx, this); }
@@ -102,6 +172,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
     virtual void getMetrics(vector<PerfMetric>& m) {
         m.push_back(PerfMetric("NumShards", numShards, true));
         m.push_back(PerfMetric("AvgBytes", avgBytes, true));
+        m.push_back(commits.getMetric());
     }
 };
diff --git a/tests/DataDistributionMetrics.txt b/tests/DataDistributionMetrics.txt
index 77c83b0eb6..b9a98ab782 100644
--- a/tests/DataDistributionMetrics.txt
+++ b/tests/DataDistributionMetrics.txt
@@ -1,21 +1,24 @@
-testTitle=DataDistributionMetrics
+testTitle=DataDistributionMetricsCorrectness
+    testName=DataDistributionMetrics
+    testDuration=10.0
+    nodeCount=100000
+    actorCount=64
+    keyPrefix=DDMetrics
+
     testName=Cycle
     transactionsPerSecond=2500.0
     testDuration=10.0
     expectedRate=0.025
 
-    testName=DataDistributionMetrics
-    numTransactions=100
-    writesPerTransaction=1000
-
-    testName=Attrition
-    machinesToKill=1
-    machinesToLeave=3
-    reboot=true
-    testDuration=10.0
-
-    testName=Attrition
-    machinesToKill=1
-    machinesToLeave=3
-    reboot=true
-    testDuration=10.0
\ No newline at end of file
+    testName=Mako
+    testDuration=10.0
+    transactionsPerSecond=2500
+    rows=100000
+    sampleSize=100
+    valueBytes=16
+    keyBytes=16
+    operations=u8i
+    actorCountPerClient=64
+    populateData=true
+    runBenchmark=true
+    preserveData=false
\ No newline at end of file
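
Reviewer note: the sketch below is illustrative only and not part of the patch. It assumes the usual fdbclient headers and the actor compiler, an already-opened Database, and a made-up helper name (printDDMetrics). It shows the kind of client-side read the new SpecialKeyRangeAsyncImpl path serves: a range read over ddStatsRange goes through DDStatsRangeImpl::getRange, and within a single aggregated range read the results are served from the per-read cache introduced above.

// Hypothetical usage sketch (not part of this patch)
ACTOR Future<Void> printDDMetrics(Database cx) {
    state ReadYourWritesTransaction tr(cx);
    loop {
        try {
            // ddStatsRange is the data-distribution stats module exposed by DDStatsRangeImpl
            Standalone<RangeResultRef> stats = wait(tr.getRange(ddStatsRange, CLIENT_KNOBS->SHARD_COUNT_LIMIT));
            for (const auto& kv : stats)
                printf("%s -> %s\n", printable(kv.key).c_str(), printable(kv.value).c_str());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}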