Change the workload to a more controlled test like ConflictRange test
This commit is contained in:
parent
6e92716be7
commit
aef9b515de
|
@ -48,9 +48,10 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
|
||||
readConflictRangeCount = getOption(options, LiteralStringRef("readConflictRangeCountPerTx"), 1);
|
||||
writeConflictRangeCount = getOption(options, LiteralStringRef("writeConflictRangeCountPerTx"), 1);
|
||||
// modeled by geometric distribution: (1 - prob) / prob = mean
|
||||
addReadConflictRangeProb = readConflictRangeCount / (readConflictRangeCount + 1.0);
|
||||
addWriteConflictRangeProb = writeConflictRangeCount / (writeConflictRangeCount + 1.0);
|
||||
ASSERT(readConflictRangeCount >= 1 && writeConflictRangeCount >= 1);
|
||||
// modeled by geometric distribution: (1 - prob) / prob = mean - 1, since we add at least one conflictRange to each tx
|
||||
addReadConflictRangeProb = (readConflictRangeCount - 1.0) / readConflictRangeCount;
|
||||
addWriteConflictRangeProb = (writeConflictRangeCount - 1.0) / writeConflictRangeCount;
|
||||
ASSERT(keyPrefix.size() + 16 <= keyBytes); // make sure the string format is valid
|
||||
nodeCount = getOption(options, LiteralStringRef("nodeCount"), 100);
|
||||
}
|
||||
|
@ -93,29 +94,32 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
.withPrefix(keyPrefix);
|
||||
}
|
||||
|
||||
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>& readConflictRanges) {
|
||||
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* readConflictRanges) {
|
||||
int startIdx, endIdx;
|
||||
Key startKey, endKey;
|
||||
while (deterministicRandom()->random01() < addReadConflictRangeProb) {
|
||||
do { // add at least one
|
||||
startIdx = deterministicRandom()->randomInt(0, nodeCount);
|
||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount);
|
||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount+1);
|
||||
startKey = keyForIndex(startIdx);
|
||||
endKey = keyForIndex(endIdx);
|
||||
tr->addReadConflictRange(KeyRangeRef(startKey, endKey));
|
||||
readConflictRanges.push_back(KeyRangeRef(startKey, endKey));
|
||||
}
|
||||
if (readConflictRanges)
|
||||
readConflictRanges->push_back(KeyRangeRef(startKey, endKey));
|
||||
} while (deterministicRandom()->random01() < addReadConflictRangeProb);
|
||||
}
|
||||
|
||||
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr) {
|
||||
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* writeConflictRanges) {
|
||||
int startIdx, endIdx;
|
||||
Key startKey, endKey;
|
||||
while (deterministicRandom()->random01() < addWriteConflictRangeProb) {
|
||||
do { // add at least one
|
||||
startIdx = deterministicRandom()->randomInt(0, nodeCount);
|
||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount);
|
||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount+1);
|
||||
startKey = keyForIndex(startIdx);
|
||||
endKey = keyForIndex(endIdx);
|
||||
tr->addWriteConflictRange(KeyRangeRef(startKey, endKey));
|
||||
}
|
||||
if (writeConflictRanges)
|
||||
writeConflictRanges->push_back(KeyRangeRef(startKey, endKey));
|
||||
} while (deterministicRandom()->random01() < addWriteConflictRangeProb);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> conflictingClient(Database cx, ReportConflictingKeysWorkload* self) {
|
||||
|
@ -123,38 +127,47 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
state ReadYourWritesTransaction tr(cx);
|
||||
state ReadYourWritesTransaction tr2(cx);
|
||||
state std::vector<KeyRange> readConflictRanges;
|
||||
state std::vector<KeyRange> writeConflictRanges;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
|
||||
tr2.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
|
||||
// If READ_YOUR_WRITES_DISABLE set, it behaves like native transaction object
|
||||
// where overlapped conflict ranges are not merged.
|
||||
if (deterministicRandom()->random01() < 0.5)
|
||||
tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
|
||||
self->addRandomReadConflictRange(&tr, readConflictRanges);
|
||||
self->addRandomWriteConflictRange(&tr);
|
||||
if (deterministicRandom()->random01() < 0.5)
|
||||
tr2.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
|
||||
Version readVersion = wait( tr.getReadVersion() );
|
||||
tr2.setVersion(readVersion);
|
||||
self->addRandomReadConflictRange(&tr, nullptr);
|
||||
self->addRandomWriteConflictRange(&tr, &writeConflictRanges);
|
||||
++self->commits;
|
||||
wait(tr.commit());
|
||||
++self->xacts;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("FailedToCommitTx").error(e);
|
||||
state bool isConflict = false;
|
||||
if (e.code() == error_code_operation_cancelled)
|
||||
throw;
|
||||
else if (e.code() == error_code_not_committed) {
|
||||
|
||||
state bool foundConflict = false;
|
||||
try {
|
||||
self->addRandomReadConflictRange(&tr2, &readConflictRanges);
|
||||
self->addRandomWriteConflictRange(&tr2, nullptr);
|
||||
++self->commits;
|
||||
wait(tr2.commit());
|
||||
++self->xacts;
|
||||
} catch (Error& e) {
|
||||
if( e.code() != error_code_not_committed )
|
||||
throw e;
|
||||
foundConflict = true;
|
||||
++self->conflicts;
|
||||
isConflict = true;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
// check API correctness
|
||||
if (isConflict) {
|
||||
if (foundConflict) {
|
||||
// \xff\xff/transaction/conflicting_keys is always false, we skip it here for simplicity
|
||||
state KeyRange ckr = KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysAbsolutePrefix)),
|
||||
LiteralStringRef("\xff\xff").withPrefix(conflictingKeysAbsolutePrefix));
|
||||
// The getRange here using the special key prefix "\xff\xff/transaction/conflicting_keys/" happens
|
||||
// locally Thus, the error handling is not needed here
|
||||
Future<Standalone<RangeResultRef>> conflictingKeyRangesFuture =
|
||||
tr.getRange(ckr, readConflictRanges.size() * 2);
|
||||
tr2.getRange(ckr, readConflictRanges.size() * 2);
|
||||
ASSERT(conflictingKeyRangesFuture.isReady());
|
||||
const Standalone<RangeResultRef> conflictingKeyRanges = conflictingKeyRangesFuture.get();
|
||||
ASSERT(conflictingKeyRanges.size() && (conflictingKeyRanges.size() % 2 == 0));
|
||||
|
@ -177,12 +190,37 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||
++self->invalidReports;
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidKeyRangeReturned");
|
||||
}
|
||||
else if (!std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(), [&kr](KeyRange wCR) {
|
||||
// Returned key range should be conflicting with at least one writeConflictRange
|
||||
return kr.intersects(wCR);
|
||||
})) {
|
||||
++self->invalidReports;
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "Returned keyranges are not conflicting with any write ranges");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// make sure no conflicts between readConflictRange and writeConflictRange
|
||||
for (const KeyRange& rCR : readConflictRanges) {
|
||||
if (std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(), [&rCR](KeyRange wCR){
|
||||
bool result = wCR.intersects(rCR);
|
||||
if (result)
|
||||
TraceEvent(SevError, "TestFailure").detail("WriteRange", wCR.toString()).detail("ReadRange", rCR.toString());
|
||||
return result;
|
||||
})) {
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "No conflicts but should be");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
++self->retries;
|
||||
} catch (Error& e) {
|
||||
state Error e2 = e;
|
||||
wait(tr.onError(e2));
|
||||
wait(tr2.onError(e2));
|
||||
}
|
||||
readConflictRanges.clear();
|
||||
writeConflictRanges.clear();
|
||||
tr.reset();
|
||||
tr2.reset();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
testTitle=ReportConflictingKeysTest
|
||||
testName=ReportConflictingKeys
|
||||
testDuration=10.0
|
||||
testDuration=20.0
|
||||
nodeCount=10000
|
||||
keyPrefix=RCK
|
||||
keyBytes=64
|
||||
readConflictRangeCountPerTx=1
|
||||
writeConflictRangeCountPerTx=1
|
||||
readConflictRangeCountPerTx=10
|
||||
writeConflictRangeCountPerTx=10
|
||||
connectionFailuresDisableDuration=100000
|
||||
buggify=off
|
Loading…
Reference in New Issue