Add test for smaller range read in dd-stats

This commit is contained in:
Chaoguang Lin 2020-05-19 20:49:15 -07:00
parent 7971be2a19
commit c230c23a3a
2 changed files with 14 additions and 6 deletions

View File

@ -822,13 +822,14 @@ ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, Ge
// list of metrics, regenerate on loop when full range unsuccessful
Standalone<VectorRef<DDMetricsRef>> result;
Future<Void> onChange;
for( auto t : self->shards.intersectingRanges( req.keys ) ) {
for( auto t : self->shards.containedRanges( req.keys ) ) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();
break;
}
result.push_back_deep(result.arena(), DDMetricsRef(stats->get().get().metrics.bytes, KeyRef(t.begin().toString())));
result.push_back_deep(result.arena(),
DDMetricsRef(stats->get().get().metrics.bytes, KeyRef(t.begin().toString())));
++shardNum;
if (shardNum >= req.shardLimit) {
break;

View File

@ -38,7 +38,9 @@ struct DataDistributionMetricsWorkload : KVWorkload {
writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 1000);
}
static Value getRandomValue() { return Standalone<StringRef>(format("Value/%08d", deterministicRandom()->randomInt(0, 10e6))); }
static Value getRandomValue() {
return Standalone<StringRef>(format("Value/%08d", deterministicRandom()->randomInt(0, 10e6)));
}
ACTOR static Future<Void> _start(Database cx, DataDistributionMetricsWorkload* self) {
state int tNum;
@ -66,19 +68,24 @@ struct DataDistributionMetricsWorkload : KVWorkload {
TraceEvent(SevError, "NoTransactionsCommitted");
return false;
}
Reference<ReadYourWritesTransaction> tr =
state Reference<ReadYourWritesTransaction> tr =
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
try {
Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, 100));
state Standalone<RangeResultRef> result = wait(tr->getRange(ddStatsRange, 100));
ASSERT(!result.more);
self->numShards = result.size();
if (self->numShards < 1) return false;
int64_t totalBytes = 0;
state int64_t totalBytes = 0;
for (int i = 0; i < result.size(); ++i) {
ASSERT(result[i].key.startsWith(ddStatsRange.begin));
totalBytes += std::stoi(result[i].value.toString());
}
self->avgBytes = totalBytes / self->numShards;
// fetch data-distribution stats for a smalller range
state int idx = deterministicRandom()->randomInt(0, result.size());
Standalone<RangeResultRef> res = wait(tr->getRange(
KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : normalKeys.end), 100));
ASSERT(res.size() == 1 && res[0] == result[idx]); // If the data distribtion stats is not changing, then this is fine
} catch (Error& e) {
TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what());
return false;