Fix fetchShardMetricsList_impl and add read cache in special key space

Chaoguang Lin 2020-06-11 12:22:19 -07:00
parent cc10aef548
commit 980bee1d13
5 changed files with 203 additions and 59 deletions


@@ -38,12 +38,13 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
// orEqual == false && offset == 1 (Standard form)
// If the corresponding key is not in the underlying key range, it will move over the range
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw,
KeySelector* ks) {
KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) {
ASSERT(!ks->orEqual); // should be removed before calling
ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
state Key startKey(skrImpl->getKeyRange().begin);
state Key endKey(skrImpl->getKeyRange().end);
state Standalone<RangeResultRef> result;
if (ks->offset < 1) {
// less than the given key
@@ -60,7 +61,15 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl*
.detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
.detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
Standalone<RangeResultRef> result = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
if (skrImpl->isAsync()) {
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(skrImpl);
Standalone<RangeResultRef> result_ = wait(ptr->getRange(ryw, KeyRangeRef(startKey, endKey), cache));
result = result_;
} else {
Standalone<RangeResultRef> result_ = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
result = result_;
}
if (result.size() == 0) {
TraceEvent(SevDebug, "ZeroElementsIntheRange").detail("Start", startKey).detail("End", endKey);
return Void();
@@ -107,7 +116,8 @@ void onModuleRead(ReadYourWritesTransaction* ryw, SpecialKeySpace::MODULE module
// to maintain; Thus, separate each part to make the code easy to understand and more compact
ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector* ks,
Optional<SpecialKeySpace::MODULE>* lastModuleRead, int* actualOffset,
Standalone<RangeResultRef>* result) {
Standalone<RangeResultRef>* result,
Optional<Standalone<RangeResultRef>>* cache) {
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
: sks->getImpls().rangeContaining(ks->getKey());
@@ -115,7 +125,7 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
(ks->offset > 1 && iter != sks->getImpls().ranges().end())) {
onModuleRead(ryw, sks->getModules().rangeContaining(iter->begin())->value(), *lastModuleRead);
if (iter->value() != nullptr) {
wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks));
wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks, cache));
}
ks->offset < 1 ? --iter : ++iter;
}
@@ -164,13 +174,16 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
// This function handles ranges which cover more than one keyrange and aggregates all results
// KeySelector, GetRangeLimits and reverse are all handled here
state Standalone<RangeResultRef> result;
state Standalone<RangeResultRef> pairs;
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter;
state int actualBeginOffset;
state int actualEndOffset;
state Optional<SpecialKeySpace::MODULE> lastModuleRead;
// used to cache result from potential first read
state Optional<Standalone<RangeResultRef>> cache;
wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result));
wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result));
wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result, &cache));
wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result, &cache));
// Handle all corner cases like what RYW does
// return if range inverted
if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
@@ -195,7 +208,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
KeyRangeRef kr = iter->range();
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
if (iter->value()->isAsync() && cache.present()) {
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
pairs = pairs_;
} else {
Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
pairs = pairs_;
}
result.arena().dependsOn(pairs.arena());
// limits handler
for (int i = pairs.size() - 1; i >= 0; --i) {
@@ -218,7 +238,14 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTr
KeyRangeRef kr = iter->range();
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
if (iter->value()->isAsync() && cache.present()) {
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
pairs = pairs_;
} else {
Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
pairs = pairs_;
}
result.arena().dependsOn(pairs.arena());
// limits handler
for (int i = 0; i < pairs.size(); ++i) {
@@ -316,7 +343,7 @@ Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesT
return result;
}
ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
ACTOR Future<Standalone<RangeResultRef>> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
try {
auto keys = kr.removePrefix(ddStatsRange.begin);
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
@@ -339,10 +366,10 @@ ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(ReadYourWritesTran
}
}
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return ddStatsGetRangeActor(ryw, kr);
return ddMetricsGetRangeActor(ryw, kr);
}
class SpecialKeyRangeTestImpl : public SpecialKeyRangeBaseImpl {

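The branch added above appears three times in this file: when the implementation reports isAsync(), the caller downcasts to SpecialKeyRangeAsyncImpl and passes a shared cache through the cache-aware getRange overload; otherwise it takes the plain path. Below is a minimal standalone sketch of that dispatch, not FoundationDB code: ReadBase, ReadAsync and the Cache alias are invented stand-ins, and std::optional plays the role of Optional<Standalone<RangeResultRef>>.

#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using RangeResult = std::vector<std::pair<std::string, std::string>>;
using Cache = std::optional<RangeResult>;

struct ReadBase {
    explicit ReadBase(bool async = false) : async_(async) {}
    virtual ~ReadBase() = default;
    virtual RangeResult getRange(const std::string& begin, const std::string& end) const = 0;
    bool isAsync() const { return async_; }

private:
    bool async_;
};

struct ReadAsync : ReadBase {
    ReadAsync() : ReadBase(true) {}
    RangeResult getRange(const std::string& begin, const std::string& end) const override {
        std::cout << "simulated RPC for [" << begin << ", " << end << ")\n";
        return { { "a", "1" }, { "b", "2" }, { "c", "3" } };
    }
    // Cache-aware overload: the first call pays the "RPC", later calls in the same range read reuse it.
    RangeResult getRange(const std::string& begin, const std::string& end, Cache* cache) const {
        if (!cache->has_value()) *cache = getRange(begin, end);
        return cache->value();
    }
};

int main() {
    std::unique_ptr<ReadBase> impl = std::make_unique<ReadAsync>();
    Cache cache; // shared across the reads of one range read, like the actor's state variable
    for (int i = 0; i < 2; ++i) {
        RangeResult pairs;
        if (impl->isAsync()) {
            auto* ptr = dynamic_cast<const ReadAsync*>(impl.get());
            pairs = ptr->getRange("a", "c", &cache); // the second iteration is served from the cache
        } else {
            pairs = impl->getRange("a", "c");
        }
        std::cout << "got " << pairs.size() << " pairs\n";
    }
    return 0;
}

Keeping the cache in the caller, one per range read, is what lets repeated sub-range reads within the same request see a single consistent snapshot of the expensive result.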

@@ -38,13 +38,55 @@ public:
// Each derived class only needs to implement this simple version of getRange
virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr, bool async = false) : range(kr), async(async) {}
KeyRangeRef getKeyRange() const { return range; }
bool isAsync() const { return async; }
virtual ~SpecialKeyRangeBaseImpl() {}
protected:
KeyRange range; // underlying key range for this function
bool async; // true if serving the range read issues an RPC call; results are then cached so that repeated reads
// within the same range read stay consistent
};
class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeBaseImpl {
public:
explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr, true) {}
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
// called with a cache object so that results stay consistent when an RPC call is needed
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr,
Optional<Standalone<RangeResultRef>>* cache) const {
return getRangeAsyncActor(this, ryw, kr, cache);
}
ACTOR static Future<Standalone<RangeResultRef>> getRangeAsyncActor(const SpecialKeyRangeBaseImpl* skrAyncImpl,
ReadYourWritesTransaction* ryw, KeyRangeRef kr,
Optional<Standalone<RangeResultRef>>* cache) {
ASSERT(skrAyncImpl->getKeyRange().contains(kr));
if (cache == nullptr) {
// a nullptr means we want to read directly and do not need to cache them
Standalone<RangeResultRef> result = wait(skrAyncImpl->getRange(ryw, kr));
return result;
} else if (!cache->present()) {
// For simplicity, every time we need to cache, we read the whole range
// TODO : improvements are needed if we have expensive rpc calls
Standalone<RangeResultRef> result_ = wait(skrAyncImpl->getRange(ryw, skrAyncImpl->getKeyRange()));
*cache = result_;
}
const auto& allResults = cache->get();
int start = 0, end = allResults.size();
while (start < allResults.size() && allResults[start].key < kr.begin) ++start;
while (end > 0 && allResults[end - 1].key >= kr.end) --end;
if (start < end) {
Standalone<RangeResultRef> result = RangeResultRef(allResults.slice(start, end), false);
result.arena().dependsOn(allResults.arena());
return result;
} else
return Standalone<RangeResultRef>();
}
};
class SpecialKeySpace {
@@ -148,7 +190,7 @@ public:
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl {
class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl {
public:
explicit DDStatsRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;

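In the header changes above, SpecialKeyRangeAsyncImpl::getRangeAsyncActor reads the implementation's whole key range on the first cache miss and then serves narrower queries by slicing the cached rows with two boundary scans. The sketch below reproduces just that cache-then-slice step with plain C++ containers; KV, Rows, readWholeRange and getRangeCached are invented names standing in for the flow/arena types.

#include <cassert>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct KV { std::string key, value; };
using Rows = std::vector<KV>;

// Stand-in for the expensive, RPC-backed read of the impl's whole key range.
Rows readWholeRange() {
    return { { "stats/a", "1" }, { "stats/c", "2" }, { "stats/e", "3" }, { "stats/g", "4" } };
}

// Answer a sub-range query [begin, end) from the cache, filling it on first use.
// (In the real actor a nullptr cache means "read directly, do not cache"; that branch is omitted here.)
Rows getRangeCached(const std::string& begin, const std::string& end, std::optional<Rows>* cache) {
    if (!cache->has_value()) *cache = readWholeRange(); // the single "RPC" covers the whole range
    const Rows& all = cache->value();
    // The same two boundary scans the actor uses to pick the slice for [begin, end).
    int start = 0, stop = static_cast<int>(all.size());
    while (start < static_cast<int>(all.size()) && all[start].key < begin) ++start;
    while (stop > 0 && all[stop - 1].key >= end) --stop;
    return start < stop ? Rows(all.begin() + start, all.begin() + stop) : Rows{};
}

int main() {
    std::optional<Rows> cache;
    Rows first = getRangeCached("stats/b", "stats/f", &cache);  // fills the cache
    Rows second = getRangeCached("stats/a", "stats/c", &cache); // served from the cached rows
    assert(first.size() == 2 && second.size() == 1);
    std::cout << first[0].key << " .. " << first[1].key << "\n"; // stats/c .. stats/e
    return 0;
}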

@@ -822,7 +822,8 @@ ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, Ge
// list of metrics, regenerate on loop when full range unsuccessful
Standalone<VectorRef<DDMetricsRef>> result;
Future<Void> onChange;
for (auto t : self->shards.containedRanges(req.keys)) {
for (auto t = self->shards.containedRanges(req.keys).begin();
t != self->shards.intersectingRanges(req.keys).end(); ++t) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();

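The loop-bound change above matters when req.keys begins or ends inside a shard: iterating only containedRanges(req.keys) skips shards that merely overlap the request boundaries, while intersectingRanges(req.keys) includes them, which appears to be what this fix addresses. A toy illustration of the difference follows; a plain vector of ranges stands in for the RangeMap, and none of these names are the FoundationDB API.

#include <iostream>
#include <string>
#include <vector>

struct Range { std::string begin, end; }; // half-open [begin, end)

bool contains(const Range& outer, const Range& inner) {
    return outer.begin <= inner.begin && inner.end <= outer.end;
}
bool intersects(const Range& a, const Range& b) {
    return a.begin < b.end && b.begin < a.end;
}

int main() {
    // Shard boundaries, analogous to the ranges tracked by the data distribution tracker.
    std::vector<Range> shards = { { "", "c" }, { "c", "g" }, { "g", "m" }, { "m", "zz" } };
    Range req = { "d", "k" }; // a request that starts and ends in the middle of shards

    std::cout << "contained:";
    for (const auto& s : shards)
        if (contains(req, s)) std::cout << " [" << s.begin << "," << s.end << ")";
    std::cout << "\nintersecting:";
    for (const auto& s : shards)
        if (intersects(req, s)) std::cout << " [" << s.begin << "," << s.end << ")";
    std::cout << "\n";
    // Prints:
    //   contained:            (no shard lies entirely inside [d,k))
    //   intersecting: [c,g) [g,m)
    return 0;
}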

@@ -26,52 +26,110 @@
struct DataDistributionMetricsWorkload : KVWorkload {
int numTransactions;
int writesPerTransaction;
int transactionsCommitted;
int numShards;
int numShards, readPerTx, writePerTx;
int64_t avgBytes;
double testDuration;
std::string keyPrefix;
PerfIntCounter commits, errors;
DataDistributionMetricsWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), transactionsCommitted(0), numShards(0), avgBytes(0) {
numTransactions = getOption(options, LiteralStringRef("numTransactions"), 100);
writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 1000);
: KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") {
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString();
readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1);
writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx);
ASSERT(nodeCount > 1);
}
static Value getRandomValue() {
return Standalone<StringRef>(format("Value/%08d", deterministicRandom()->randomInt(0, 10e6)));
}
ACTOR static Future<Void> _start(Database cx, DataDistributionMetricsWorkload* self) {
state int tNum;
for (tNum = 0; tNum < self->numTransactions; ++tNum) {
loop {
state ReadYourWritesTransaction tr(cx);
try {
state int i;
for (i = 0; i < self->writesPerTransaction; ++i) {
tr.set(StringRef(format("Key/%08d", tNum * self->writesPerTransaction + i)), getRandomValue());
Key keyForIndex(int n) { return doubleToTestKey((double)n / nodeCount, keyPrefix); }
ACTOR static Future<Void> ddRWClient(Database cx, DataDistributionMetricsWorkload* self) {
loop {
state ReadYourWritesTransaction tr(cx);
state int i;
try {
for (i = 0; i < self->readPerTx; ++i)
wait(success(tr.get(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount))))); // read
for (i = 0; i < self->writePerTx; ++i)
tr.set(self->keyForIndex(deterministicRandom()->randomInt(0, self->nodeCount)), getRandomValue()); // write
wait(tr.commit());
++self->commits;
} catch (Error& e) {
wait(tr.onError(e));
}
tr.reset();
}
}
ACTOR Future<Void> resultConsistencyCheckClient(Database cx, DataDistributionMetricsWorkload* self) {
state Reference<ReadYourWritesTransaction> tr =
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
loop {
try {
int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1);
int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount);
state Key startKey = self->keyForIndex(startIndex);
state Key endKey = self->keyForIndex(endIndex);
// lastLessOrEqual
state KeySelector begin = KeySelectorRef(startKey.withPrefix(ddStatsRange.begin, startKey.arena()), true, 0);
state KeySelector end = KeySelectorRef(endKey.withPrefix(ddStatsRange.begin, endKey.arena()), false, 2);
Standalone<RangeResultRef> result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT)));
if (result.size() > 1) {
if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) {
TraceEvent(SevDebug, "DDMetricsConsistencyTest")
.detail("Size", result.size())
.detail("FirstKey", result[0].key.toString())
.detail("SecondKey", result[1].key.toString())
.detail("BeginKeySelector", begin.toString());
} else {
++self->errors;
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Result mismatches the given begin selector")
.detail("Size", result.size())
.detail("FirstKey", result[0].key.toString())
.detail("SecondKey", result[1].key.toString())
.detail("BeginKeySelector", begin.toString());
}
if (result[result.size()-1].key >= end.getKey() && result[result.size()-2].key < end.getKey()) {
TraceEvent(SevDebug, "DDMetricsConsistencyTest")
.detail("Size", result.size())
.detail("LastKey", result[result.size()-1].key.toString())
.detail("SecondLastKey", result[result.size()-2].key.toString())
.detail("EndKeySelector", end.toString());
} else {
++self->errors;
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Result mismatches the given end selector")
.detail("Size", result.size())
.detail("FirstKey", result[result.size()-1].key.toString())
.detail("SecondKey", result[result.size()-2].key.toString())
.detail("EndKeySelector", end.toString());
}
wait(tr.commit());
++self->transactionsCommitted;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
} catch (Error& e) {
// Ignore timed_out error and cross_module_read, the end key may potentially point outside the range
if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue;
TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what());
wait(tr->onError(e));
}
}
return Void();
}
ACTOR static Future<bool> _check(Database cx, DataDistributionMetricsWorkload* self) {
if (self->transactionsCommitted == 0) {
TraceEvent(SevError, "NoTransactionsCommitted");
if (self->errors.getValue() > 0) {
TraceEvent(SevError, "TestFailure").detail("Reason", "GetRange Results Inconsistent");
return false;
}
// TODO : find out why this does not work
// wait(quietDatabase(cx, self->dbInfo, "PopulateTPCC"));
state Reference<ReadYourWritesTransaction> tr =
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
try {
state Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, 100));
state Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, CLIENT_KNOBS->SHARD_COUNT_LIMIT));
ASSERT(!result.more);
self->numShards = result.size();
if (self->numShards < 1) return false;
@@ -81,19 +139,31 @@ struct DataDistributionMetricsWorkload : KVWorkload {
totalBytes += readJSONStrictly(result[i].value.toString()).get_obj()["ShardBytes"].get_int64();
}
self->avgBytes = totalBytes / self->numShards;
// fetch data-distribution stats for a smalller range
// fetch data-distribution stats for a smaller range
ASSERT(result.size());
state int idx = deterministicRandom()->randomInt(0, result.size());
Standalone<RangeResultRef> res = wait(tr->getRange(
KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100));
KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end),
100));
ASSERT_WE_THINK(res.size() == 1 &&
res[0] == result[idx]); // It works good now. However, not sure in any case of data-distribution, the number changes
res[0] == result[idx]); // It works good now. However, not sure in any
// case of data-distribution, the number changes
} catch (Error& e) {
TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what());
return false;
throw;
}
return true;
}
ACTOR Future<Void> _start(Database cx, DataDistributionMetricsWorkload* self) {
std::vector<Future<Void>> clients;
clients.push_back(self->resultConsistencyCheckClient(cx, self));
for (int i = 0; i < self->actorCount; ++i) clients.push_back(self->ddRWClient(cx, self));
wait(timeout(waitForAll(clients), self->testDuration, Void()));
wait(delay(5.0));
return Void();
}
virtual std::string description() { return "DataDistributionMetrics"; }
virtual Future<Void> setup(Database const& cx) { return Void(); }
virtual Future<Void> start(Database const& cx) { return _start(cx, this); }
@@ -102,6 +172,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
virtual void getMetrics(vector<PerfMetric>& m) {
m.push_back(PerfMetric("NumShards", numShards, true));
m.push_back(PerfMetric("AvgBytes", avgBytes, true));
m.push_back(commits.getMetric());
}
};

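The consistency check in resultConsistencyCheckClient builds its begin selector as lastLessOrEqual(startKey) (orEqual true, offset 0) and its end selector one position past firstGreaterOrEqual(endKey) (orEqual false, offset 2), so a correct result must straddle both boundary keys. Below is a small sketch of how such selectors resolve against a sorted key list; the selector arithmetic is paraphrased from the FDB documentation and everything else is invented for illustration.

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Resolve KeySelector(key, orEqual, offset) to an index into sorted `keys`:
// start from the last key < key (or <= key when orEqual is true), then move by offset.
int resolve(const std::vector<std::string>& keys, const std::string& key, bool orEqual, int offset) {
    auto it = orEqual ? std::upper_bound(keys.begin(), keys.end(), key)
                      : std::lower_bound(keys.begin(), keys.end(), key);
    int base = static_cast<int>(it - keys.begin()) - 1; // last key <(=) the reference key
    return base + offset;
}

int main() {
    std::vector<std::string> keys = { "k00", "k10", "k20", "k30", "k40" };
    std::string startKey = "k15", endKey = "k25";

    int b = resolve(keys, startKey, /*orEqual=*/true, /*offset=*/0);  // lastLessOrEqual(startKey)
    int e = resolve(keys, endKey, /*orEqual=*/false, /*offset=*/2);   // firstGreaterOrEqual(endKey) + 1

    // The range [b, e) straddles both boundaries, which is what the workload asserts
    // about result[0], result[1] and the last two rows it gets back.
    assert(keys[b] <= startKey && keys[b + 1] > startKey);
    assert(keys[e - 1] >= endKey && keys[e - 2] < endKey);
    return 0;
}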

@@ -1,21 +1,24 @@
testTitle=DataDistributionMetrics
testTitle=DataDistributionMetricsCorrectness
testName=DataDistributionMetrics
testDuration=10.0
nodeCount=100000
actorCount=64
keyPrefix=DDMetrics
testName=Cycle
transactionsPerSecond=2500.0
testDuration=10.0
expectedRate=0.025
testName=DataDistributionMetrics
numTransactions=100
writesPerTransaction=1000
testName=Attrition
machinesToKill=1
machinesToLeave=3
reboot=true
testDuration=10.0
testName=Attrition
machinesToKill=1
machinesToLeave=3
reboot=true
testDuration=10.0
testName=Mako
testDuration=10.0
transactionsPerSecond=2500
rows=100000
sampleSize=100
valueBytes=16
keyBytes=16
operations=u8i
actorCountPerClient=64
populateData=true
runBenchmark=true
preserveData=false