Add tests for special_keys_cross_module_read cases, fixed one corner case
This commit is contained in:
parent
47f4f7085e
commit
032bc59cdb
|
@ -45,11 +45,12 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl*
|
|||
|
||||
if (ks->offset < 1) {
|
||||
// less than the given key
|
||||
if (skrImpl->getKeyRange().contains(ks->getKey())) endKey = ks->getKey(); // TODO : add test for startKey equals endKey
|
||||
if (skrImpl->getKeyRange().contains(ks->getKey())) endKey = ks->getKey();
|
||||
} else {
|
||||
// greater than the given key
|
||||
if (skrImpl->getKeyRange().contains(ks->getKey())) startKey = ks->getKey();
|
||||
}
|
||||
ASSERT(startKey < endKey); // Note : startKey never equals endKey here
|
||||
|
||||
TraceEvent(SevDebug, "NormalizeKeySelector")
|
||||
.detail("OriginalKey", ks->getKey())
|
||||
|
@ -106,7 +107,8 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, Reference<Rea
|
|||
KeySelector* ks, Optional<SpecialKeySpace::MODULE>* lastModuleRead,
|
||||
int* actualOffset, Standalone<RangeResultRef>* result) {
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
|
||||
sks->getImpls().rangeContaining(ks->getKey());
|
||||
ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
|
||||
: sks->getImpls().rangeContaining(ks->getKey());
|
||||
while ((ks->offset < 1 && iter != sks->getImpls().ranges().begin()) ||
|
||||
(ks->offset > 1 && iter != sks->getImpls().ranges().end())) {
|
||||
onModuleRead(ryw, sks->getModules().rangeContaining(iter->begin())->value(), *lastModuleRead);
|
||||
|
@ -244,7 +246,7 @@ Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourW
|
|||
begin.removeOrEqual(begin.arena());
|
||||
end.removeOrEqual(end.arena());
|
||||
|
||||
if( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) {
|
||||
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
|
||||
TEST(true); // range inverted
|
||||
return Standalone<RangeResultRef>();
|
||||
}
|
||||
|
|
|
@ -227,6 +227,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
ACTOR Future<Void> testCrossModuleRead(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
|
||||
Database cx = cx_->clone();
|
||||
state Reference<ReadYourWritesTransaction> tx = Reference(new ReadYourWritesTransaction(cx));
|
||||
// begin key outside module range
|
||||
try {
|
||||
wait(success(tx->getRange(
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transactio"), LiteralStringRef("\xff\xff/transaction0")),
|
||||
|
@ -235,7 +236,9 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
ASSERT(e.code() == error_code_special_keys_cross_module_read);
|
||||
tx->reset();
|
||||
}
|
||||
// end key outside module range
|
||||
try {
|
||||
wait(success(tx->getRange(
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction1")),
|
||||
|
@ -244,7 +247,9 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
ASSERT(e.code() == error_code_special_keys_cross_module_read);
|
||||
tx->reset();
|
||||
}
|
||||
// both begin and end outside module range
|
||||
try {
|
||||
wait(success(tx->getRange(
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transaction"), LiteralStringRef("\xff\xff/transaction1")),
|
||||
|
@ -253,16 +258,19 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
ASSERT(e.code() == error_code_special_keys_cross_module_read);
|
||||
tx->reset();
|
||||
}
|
||||
// legal range read using the module range
|
||||
try {
|
||||
wait(success(tx->getRange(
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")),
|
||||
CLIENT_KNOBS->TOO_MANY)));
|
||||
ASSERT(true);
|
||||
TEST(true);
|
||||
tx->reset();
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
|
||||
// begin keySelector outside module range
|
||||
try {
|
||||
const KeyRef key = LiteralStringRef("\xff\xff/cluster_file_path");
|
||||
KeySelector begin = KeySelectorRef(key, false, 0);
|
||||
|
@ -272,6 +280,18 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
ASSERT(e.code() == error_code_special_keys_cross_module_read);
|
||||
tx->reset();
|
||||
}
|
||||
// end keySelector inside module range, a tricky corner case
|
||||
try {
|
||||
tx->addReadConflictRange(singleKeyRange(LiteralStringRef("testKey")));
|
||||
KeySelector begin = KeySelectorRef(readConflictRangeKeysRange.begin, false, 1);
|
||||
KeySelector end = KeySelectorRef(LiteralStringRef("\xff\xff/transaction0"), false, 0);
|
||||
wait(success(tx->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->TOO_MANY))));
|
||||
TEST(true);
|
||||
tx->reset();
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue