Add comments, refine traces, add delay to dd rpc calls in the test

This commit is contained in:
Chaoguang Lin 2020-06-15 15:31:55 -07:00
parent 5bc2e2e595
commit 7a5fe3800d
2 changed files with 20 additions and 16 deletions

View File

@ -37,6 +37,9 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
// This function will move the given KeySelector as far as possible to the standard form: // This function will move the given KeySelector as far as possible to the standard form:
// orEqual == false && offset == 1 (Standard form) // orEqual == false && offset == 1 (Standard form)
// If the corresponding key is not in the underlying key range, it will move over the range // If the corresponding key is not in the underlying key range, it will move over the range
// The cache object is used to cache the first read result from the rpc call during the key resolution,
// then when we need to do key resolution or result filtering,
// we, instead of rpc call, read from this cache object have consistent results
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw, ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw,
KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) { KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) {
ASSERT(!ks->orEqual); // should be removed before calling ASSERT(!ks->orEqual); // should be removed before calling

View File

@ -31,6 +31,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
double testDuration; double testDuration;
std::string keyPrefix; std::string keyPrefix;
PerfIntCounter commits, errors; PerfIntCounter commits, errors;
double delayPerLoop;
DataDistributionMetricsWorkload(WorkloadContext const& wcx) DataDistributionMetricsWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") { : KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") {
@ -38,6 +39,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString(); keyPrefix = getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("DDMetrics")).toString();
readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1); readPerTx = getOption(options, LiteralStringRef("readPerTransaction"), 1);
writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx); writePerTx = getOption(options, LiteralStringRef("writePerTransaction"), 5 * readPerTx);
delayPerLoop = getOption(options, LiteralStringRef("delayPerLoop"), 0.1); // throttling dd rpc calls
ASSERT(nodeCount > 1); ASSERT(nodeCount > 1);
} }
@ -63,7 +65,6 @@ struct DataDistributionMetricsWorkload : KVWorkload {
} catch (Error& e) { } catch (Error& e) {
wait(tr.onError(e)); wait(tr.onError(e));
} }
tr.reset();
} }
} }
@ -72,6 +73,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx)); Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
loop { loop {
try { try {
wait(delay(self->delayPerLoop));
int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1); int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1);
int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount); int endIndex = deterministicRandom()->randomInt(startIndex + 1, self->nodeCount);
state Key startKey = self->keyForIndex(startIndex); state Key startKey = self->keyForIndex(startIndex);
@ -88,13 +90,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT))); wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT)));
// Condition #1 and #2 can be broken if multiple rpc calls happened in one getRange // Condition #1 and #2 can be broken if multiple rpc calls happened in one getRange
if (result.size() > 1) { if (result.size() > 1) {
if (result[0].key <= begin.getKey() && result[1].key > begin.getKey()) { if (result[0].key > begin.getKey() || result[1].key <= begin.getKey()) {
TraceEvent(SevDebug, "DDMetricsConsistencyTest")
.detail("Size", result.size())
.detail("FirstKey", result[0].key.toString())
.detail("SecondKey", result[1].key.toString())
.detail("BeginKeySelector", begin.toString());
} else {
++self->errors; ++self->errors;
TraceEvent(SevError, "TestFailure") TraceEvent(SevError, "TestFailure")
.detail("Reason", "Result mismatches the given begin selector") .detail("Reason", "Result mismatches the given begin selector")
@ -103,13 +99,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
.detail("SecondKey", result[1].key.toString()) .detail("SecondKey", result[1].key.toString())
.detail("BeginKeySelector", begin.toString()); .detail("BeginKeySelector", begin.toString());
} }
if (result[result.size() - 1].key >= end.getKey() && result[result.size() - 2].key < end.getKey()) { if (result[result.size() - 1].key < end.getKey() || result[result.size() - 2].key >= end.getKey()) {
TraceEvent(SevDebug, "DDMetricsConsistencyTest")
.detail("Size", result.size())
.detail("LastKey", result[result.size() - 1].key.toString())
.detail("SecondLastKey", result[result.size() - 2].key.toString())
.detail("EndKeySelector", end.toString());
} else {
++self->errors; ++self->errors;
TraceEvent(SevError, "TestFailure") TraceEvent(SevError, "TestFailure")
.detail("Reason", "Result mismatches the given end selector") .detail("Reason", "Result mismatches the given end selector")
@ -118,11 +108,22 @@ struct DataDistributionMetricsWorkload : KVWorkload {
.detail("SecondKey", result[result.size() - 2].key.toString()) .detail("SecondKey", result[result.size() - 2].key.toString())
.detail("EndKeySelector", end.toString()); .detail("EndKeySelector", end.toString());
} }
// Debugging traces
// TraceEvent(SevDebug, "DDMetricsConsistencyTest")
// .detail("Size", result.size())
// .detail("FirstKey", result[0].key.toString())
// .detail("SecondKey", result[1].key.toString())
// .detail("BeginKeySelector", begin.toString());
// TraceEvent(SevDebug, "DDMetricsConsistencyTest")
// .detail("Size", result.size())
// .detail("LastKey", result[result.size() - 1].key.toString())
// .detail("SecondLastKey", result[result.size() - 2].key.toString())
// .detail("EndKeySelector", end.toString());
} }
} catch (Error& e) { } catch (Error& e) {
// Ignore timed_out error and cross_module_read, the end key selector may read through the end // Ignore timed_out error and cross_module_read, the end key selector may read through the end
if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue; if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read) continue;
TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").detail("Error", e.what()); TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").error(e);
wait(tr->onError(e)); wait(tr->onError(e));
} }
} }