Fix an off-by-one error in determining whether to include the entire range in the conflict ranges when a reverse range read returns early due to limit.

This commit is contained in:
A.J. Beamon 2022-06-06 15:22:55 -07:00
parent e579038018
commit 4f308b34fc
2 changed files with 37 additions and 12 deletions

View File

@ -253,6 +253,8 @@ public:
if (read.begin.getKey() < read.end.getKey()) {
rangeBegin = read.begin.getKey();
// If the end offset is 1 (first greater than / first greater or equal) or more, then no changes to the
// range after the returned results can change the outcome.
rangeEnd = read.end.offset > 0 && result.more ? read.begin.getKey() : read.end.getKey();
} else {
rangeBegin = read.end.getKey();
@ -289,7 +291,9 @@ public:
bool endInArena = false;
if (read.begin.getKey() < read.end.getKey()) {
rangeBegin = read.begin.offset <= 0 && result.more ? read.end.getKey() : read.begin.getKey();
// If the begin offset is 1 (first greater than / first greater or equal) or less, then no changes to the
// range prior to the returned results can change the outcome.
rangeBegin = read.begin.offset <= 1 && result.more ? read.end.getKey() : read.begin.getKey();
rangeEnd = read.end.getKey();
} else {
rangeBegin = read.end.getKey();

View File

@ -80,6 +80,7 @@ struct ConflictRangeWorkload : TestWorkload {
state int offsetA;
state int offsetB;
state int randomLimit;
state Reverse reverse = Reverse::False;
state bool randomSets = false;
state std::set<int> insertedSet;
state RangeResult originalResults;
@ -159,10 +160,13 @@ struct ConflictRangeWorkload : TestWorkload {
offsetA = deterministicRandom()->randomInt(-1 * self->maxOffset, self->maxOffset);
offsetB = deterministicRandom()->randomInt(-1 * self->maxOffset, self->maxOffset);
randomLimit = deterministicRandom()->randomInt(1, self->maxKeySpace);
reverse.set(deterministicRandom()->coinflip());
RangeResult res = wait(tr1.getRange(KeySelectorRef(StringRef(myKeyA), onEqualA, offsetA),
KeySelectorRef(StringRef(myKeyB), onEqualB, offsetB),
randomLimit));
randomLimit,
Snapshot::False,
reverse));
if (res.size()) {
originalResults = res;
break;
@ -225,13 +229,17 @@ struct ConflictRangeWorkload : TestWorkload {
StringRef(format("%010d", clearedEnd))));
RangeResult res = wait(trRYOW.getRange(KeySelectorRef(StringRef(myKeyA), onEqualA, offsetA),
KeySelectorRef(StringRef(myKeyB), onEqualB, offsetB),
randomLimit));
randomLimit,
Snapshot::False,
reverse));
wait(trRYOW.commit());
} else {
tr3.clear(StringRef(format("%010d", self->maxKeySpace + 1)));
RangeResult res = wait(tr3.getRange(KeySelectorRef(StringRef(myKeyA), onEqualA, offsetA),
KeySelectorRef(StringRef(myKeyB), onEqualB, offsetB),
randomLimit));
randomLimit,
Snapshot::False,
reverse));
wait(tr3.commit());
}
} catch (Error& e) {
@ -252,7 +260,9 @@ struct ConflictRangeWorkload : TestWorkload {
RangeResult res = wait(tr4.getRange(KeySelectorRef(StringRef(myKeyA), onEqualA, offsetA),
KeySelectorRef(StringRef(myKeyB), onEqualB, offsetB),
randomLimit));
randomLimit,
Snapshot::False,
reverse));
++self->withConflicts;
if (res.size() == originalResults.size()) {
@ -261,20 +271,27 @@ struct ConflictRangeWorkload : TestWorkload {
throw not_committed();
// Discard known cases where conflicts do not change the results
if (originalResults.size() == randomLimit && offsetB <= 0) {
// Hit limit but end offset goes backwards, so changes could effect results even though in
// this instance they did not
if (originalResults.size() == randomLimit &&
((offsetB <= 0 && !reverse) || (offsetA > 1 && reverse))) {
// Hit limit but end offset goes into the range, so changes could effect results even though
// in this instance they did not
throw not_committed();
}
if (originalResults[originalResults.size() - 1].key >= sentinelKey) {
KeyRef smallestResult = originalResults[0].key;
KeyRef largestResult = originalResults[originalResults.size() - 1].key;
if (reverse) {
std::swap(smallestResult, largestResult);
}
if (largestResult >= sentinelKey) {
// Results go into server keyspace, so if a key selector does not fully resolve offset, a
// change won't effect results
throw not_committed();
}
if ((originalResults[0].key == firstElement ||
originalResults[0].key == StringRef(format("%010d", *(insertedSet.begin())))) &&
if ((smallestResult == firstElement ||
smallestResult == StringRef(format("%010d", *(insertedSet.begin())))) &&
offsetA < 0) {
// Results return the first element, and the begin offset is negative, so if a key selector
// does not fully resolve the offset, a change won't effect results
@ -308,6 +325,7 @@ struct ConflictRangeWorkload : TestWorkload {
.detail("OffsetA", offsetA)
.detail("OffsetB", offsetB)
.detail("RandomLimit", randomLimit)
.detail("Reverse", reverse)
.detail("Size", originalResults.size())
.detail("Results", keyStr1)
.detail("Original", keyStr2);
@ -328,7 +346,9 @@ struct ConflictRangeWorkload : TestWorkload {
// If the commit is successful, check that the result matches the first execution.
RangeResult res = wait(tr4.getRange(KeySelectorRef(StringRef(myKeyA), onEqualA, offsetA),
KeySelectorRef(StringRef(myKeyB), onEqualB, offsetB),
randomLimit));
randomLimit,
Snapshot::False,
reverse));
++self->withoutConflicts;
if (res.size() == originalResults.size()) {
@ -366,6 +386,7 @@ struct ConflictRangeWorkload : TestWorkload {
.detail("OffsetA", offsetA)
.detail("OffsetB", offsetB)
.detail("RandomLimit", randomLimit)
.detail("Reverse", reverse)
.detail("Size", originalResults.size())
.detail("Results", keyStr1)
.detail("Original", keyStr2);