Refactor special-key-space code that handles the normalization of keySelectors
This commit is contained in:
parent
03e395b353
commit
944f87e17b
|
@ -22,34 +22,30 @@
|
|||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// This function will normalize the given KeySelector to a standard KeySelector:
|
||||
// This function will move the given KeySelector as far as possible to the standard form:
|
||||
// orEqual == false && offset == 1 (Standard form)
|
||||
// If the corresponding key is not in this special key range, it will move as far as possible to adjust the offset to 1
|
||||
// It does have overhead here since we query all keys twice in the worst case.
|
||||
// However, moving the KeySelector while handling other parameters like limits makes the code much more complex and hard
|
||||
// to maintain; Thus, separate each part to make the code easy to understand and more compact
|
||||
ACTOR Future<Void> SpecialKeyRangeBaseImpl::normalizeKeySelectorActor(const SpecialKeyRangeBaseImpl* skrImpl,
|
||||
Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector* ks) {
|
||||
// If the corresponding key is not in the underlying key range, it will move over the range
|
||||
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl,
|
||||
Reference<ReadYourWritesTransaction> ryw, KeySelector* ks) {
|
||||
ASSERT(!ks->orEqual); // should be removed before calling
|
||||
ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
|
||||
|
||||
state Key startKey(skrImpl->range.begin);
|
||||
state Key endKey(skrImpl->range.end);
|
||||
state Key startKey(skrImpl->getKeyRange().begin);
|
||||
state Key endKey(skrImpl->getKeyRange().end);
|
||||
|
||||
if (ks->offset < 1) {
|
||||
// less than the given key
|
||||
if (skrImpl->range.contains(ks->getKey())) endKey = keyAfter(ks->getKey());
|
||||
if (skrImpl->getKeyRange().contains(ks->getKey())) endKey = keyAfter(ks->getKey());
|
||||
} else {
|
||||
// greater than the given key
|
||||
if (skrImpl->range.contains(ks->getKey())) startKey = ks->getKey();
|
||||
if (skrImpl->getKeyRange().contains(ks->getKey())) startKey = ks->getKey();
|
||||
}
|
||||
|
||||
TraceEvent(SevDebug, "NormalizeKeySelector")
|
||||
.detail("OriginalKey", ks->getKey())
|
||||
.detail("OriginalOffset", ks->offset)
|
||||
.detail("SpecialKeyRangeStart", skrImpl->range.begin)
|
||||
.detail("SpecialKeyRangeEnd", skrImpl->range.end);
|
||||
.detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
|
||||
.detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
|
||||
|
||||
Standalone<RangeResultRef> result = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
|
||||
if (result.size() == 0) {
|
||||
|
@ -77,8 +73,8 @@ ACTOR Future<Void> SpecialKeyRangeBaseImpl::normalizeKeySelectorActor(const Spec
|
|||
TraceEvent(SevDebug, "NormalizeKeySelector")
|
||||
.detail("NormalizedKey", ks->getKey())
|
||||
.detail("NormalizedOffset", ks->offset)
|
||||
.detail("SpecialKeyRangeStart", skrImpl->range.begin)
|
||||
.detail("SpecialKeyRangeEnd", skrImpl->range.end);
|
||||
.detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
|
||||
.detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -90,12 +86,51 @@ void onModuleRead(const Reference<ReadYourWritesTransaction>& ryw, SpecialKeyRan
|
|||
lastModuleRead = module;
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkModuleFound(SpecialKeySpace* pks,
|
||||
// This function will normalize the given KeySelector to a standard KeySelector:
|
||||
// orEqual == false && offset == 1 (Standard form)
|
||||
// If the corresponding key is outside the whole space, it will move to the begin or the end
|
||||
// It does have overhead here since we query all keys twice in the worst case.
|
||||
// However, moving the KeySelector while handling other parameters like limits makes the code much more complex and hard
|
||||
// to maintain; Thus, separate each part to make the code easy to understand and more compact
|
||||
ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector* ks, Optional<SpecialKeyRangeBaseImpl*>* lastModuleRead,
|
||||
int* actualOffset, Standalone<RangeResultRef>* result) {
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
|
||||
sks->getImpls().rangeContaining(ks->getKey());
|
||||
while ((ks->offset < 1 && iter != sks->getImpls().ranges().begin()) ||
|
||||
(ks->offset > 1 && iter != sks->getImpls().ranges().end())) {
|
||||
onModuleRead(ryw, iter->value(), *lastModuleRead);
|
||||
if (iter->value() != nullptr) {
|
||||
wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks));
|
||||
}
|
||||
ks->offset < 1 ? --iter : ++iter;
|
||||
}
|
||||
*actualOffset = ks->offset;
|
||||
if (iter == sks->getImpls().ranges().begin())
|
||||
ks->setKey(sks->getKeyRange().begin);
|
||||
else if (iter == sks->getImpls().ranges().end())
|
||||
ks->setKey(sks->getKeyRange().end);
|
||||
|
||||
if (!ks->isFirstGreaterOrEqual()) {
|
||||
// The Key Selector points to key outside the whole special key space
|
||||
TraceEvent(SevInfo, "KeySelectorPointsOutside")
|
||||
.detail("TerminateKey", ks->getKey())
|
||||
.detail("TerminateOffset", ks->offset);
|
||||
if (ks->offset < 1 && iter == sks->getImpls().ranges().begin())
|
||||
result->readToBegin = true;
|
||||
else
|
||||
result->readThroughEnd = true;
|
||||
ks->offset = 1;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkModuleFound(SpecialKeySpace* sks,
|
||||
Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector begin, KeySelector end,
|
||||
GetRangeLimits limits, bool reverse) {
|
||||
std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>> result =
|
||||
wait(SpecialKeySpace::getRangeAggregationActor(pks, ryw, begin, end, limits, reverse));
|
||||
wait(SpecialKeySpace::getRangeAggregationActor(sks, ryw, begin, end, limits, reverse));
|
||||
if (ryw && !ryw->specialKeySpaceRelaxed()) {
|
||||
auto* module = result.second.orDefault(nullptr);
|
||||
if (module == nullptr) {
|
||||
|
@ -106,7 +141,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkModuleFound(Speci
|
|||
}
|
||||
|
||||
ACTOR Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>>>
|
||||
SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw,
|
||||
SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse) {
|
||||
// This function handles ranges which cover more than one keyrange and aggregates all results
|
||||
// KeySelector, GetRangeLimits and reverse are all handled here
|
||||
|
@ -116,63 +151,8 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* pks, Reference<ReadYo
|
|||
state int actualEndOffset;
|
||||
state Optional<SpecialKeyRangeBaseImpl*> lastModuleRead;
|
||||
|
||||
// make sure offset == 1
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator beginIter =
|
||||
pks->impls.rangeContaining(begin.getKey());
|
||||
while ((begin.offset < 1 && beginIter != pks->impls.ranges().begin()) ||
|
||||
(begin.offset > 1 && beginIter != pks->impls.ranges().end())) {
|
||||
onModuleRead(ryw, beginIter->value(), lastModuleRead);
|
||||
if (beginIter->value() != nullptr) {
|
||||
wait(beginIter->value()->normalizeKeySelectorActor(beginIter->value(), ryw, &begin));
|
||||
}
|
||||
begin.offset < 1 ? --beginIter : ++beginIter;
|
||||
}
|
||||
|
||||
actualBeginOffset = begin.offset;
|
||||
if (beginIter == pks->impls.ranges().begin())
|
||||
begin.setKey(pks->range.begin);
|
||||
else if (beginIter == pks->impls.ranges().end())
|
||||
begin.setKey(pks->range.end);
|
||||
|
||||
if (!begin.isFirstGreaterOrEqual()) {
|
||||
// The Key Selector points to key outside the whole special key space
|
||||
TraceEvent(SevInfo, "BeginKeySelectorPointsOutside")
|
||||
.detail("TerminateKey", begin.getKey())
|
||||
.detail("TerminateOffset", begin.offset);
|
||||
if (begin.offset < 1 && beginIter == pks->impls.ranges().begin())
|
||||
result.readToBegin = true;
|
||||
else
|
||||
result.readThroughEnd = true;
|
||||
begin.offset = 1;
|
||||
}
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator endIter =
|
||||
pks->impls.rangeContaining(end.getKey());
|
||||
while ((end.offset < 1 && endIter != pks->impls.ranges().begin()) ||
|
||||
(end.offset > 1 && endIter != pks->impls.ranges().end())) {
|
||||
onModuleRead(ryw, endIter->value(), lastModuleRead);
|
||||
if (endIter->value() != nullptr) {
|
||||
wait(endIter->value()->normalizeKeySelectorActor(endIter->value(), ryw, &end));
|
||||
}
|
||||
end.offset < 1 ? --endIter : ++endIter;
|
||||
}
|
||||
|
||||
actualEndOffset = end.offset;
|
||||
if (endIter == pks->impls.ranges().begin())
|
||||
end.setKey(pks->range.begin);
|
||||
else if (endIter == pks->impls.ranges().end())
|
||||
end.setKey(pks->range.end);
|
||||
|
||||
if (!end.isFirstGreaterOrEqual()) {
|
||||
// The Key Selector points to key outside the whole special key space
|
||||
TraceEvent(SevInfo, "EndKeySelectorPointsOutside")
|
||||
.detail("TerminateKey", end.getKey())
|
||||
.detail("TerminateOffset", end.offset);
|
||||
if (end.offset < 1 && endIter == pks->impls.ranges().begin())
|
||||
result.readToBegin = true;
|
||||
else
|
||||
result.readThroughEnd = true;
|
||||
end.offset = 1;
|
||||
}
|
||||
wait(normalizeKeySelectorActor(sks, ryw, &begin, &lastModuleRead, &actualBeginOffset, &result));
|
||||
wait(normalizeKeySelectorActor(sks, ryw, &end, &lastModuleRead, &actualEndOffset, &result));
|
||||
// Handle all corner cases like what RYW does
|
||||
// return if range inverted
|
||||
if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
|
||||
|
@ -180,12 +160,12 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* pks, Reference<ReadYo
|
|||
return std::make_pair(RangeResultRef(false, false), lastModuleRead);
|
||||
}
|
||||
// If touches begin or end, return with readToBegin and readThroughEnd flags
|
||||
if (beginIter == pks->impls.ranges().end() || endIter == pks->impls.ranges().begin()) {
|
||||
if (begin.getKey() == sks->range.end || end.getKey() == sks->range.begin) {
|
||||
TEST(true);
|
||||
return std::make_pair(result, lastModuleRead);
|
||||
}
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Ranges ranges =
|
||||
pks->impls.intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
|
||||
sks->impls.intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
|
||||
// TODO : workaround to write this two together to make the code compact
|
||||
// The issue here is boost::iterator_range<> doest not provide rbegin(), rend()
|
||||
iter = reverse ? ranges.end() : ranges.begin();
|
||||
|
@ -256,11 +236,11 @@ Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourW
|
|||
return checkModuleFound(this, ryw, begin, end, limits, reverse);
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw,
|
||||
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRef key) {
|
||||
// use getRange to workaround this
|
||||
Standalone<RangeResultRef> result =
|
||||
wait(pks->getRange(ryw, KeySelector(firstGreaterOrEqual(key)), KeySelector(firstGreaterOrEqual(keyAfter(key))),
|
||||
wait(sks->getRange(ryw, KeySelector(firstGreaterOrEqual(key)), KeySelector(firstGreaterOrEqual(keyAfter(key))),
|
||||
GetRangeLimits(CLIENT_KNOBS->TOO_MANY), false));
|
||||
ASSERT(result.size() <= 1);
|
||||
if (result.size()) {
|
||||
|
@ -349,21 +329,21 @@ private:
|
|||
};
|
||||
|
||||
TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
||||
SpecialKeySpace pks(normalKeys.begin, normalKeys.end);
|
||||
SpecialKeySpace sks(normalKeys.begin, normalKeys.end);
|
||||
SpecialKeyRangeTestImpl pkr1(KeyRangeRef(LiteralStringRef("/cat/"), LiteralStringRef("/cat/\xff")), "small", 10);
|
||||
SpecialKeyRangeTestImpl pkr2(KeyRangeRef(LiteralStringRef("/dog/"), LiteralStringRef("/dog/\xff")), "medium", 100);
|
||||
SpecialKeyRangeTestImpl pkr3(KeyRangeRef(LiteralStringRef("/pig/"), LiteralStringRef("/pig/\xff")), "large", 1000);
|
||||
pks.registerKeyRange(pkr1.getKeyRange(), &pkr1);
|
||||
pks.registerKeyRange(pkr2.getKeyRange(), &pkr2);
|
||||
pks.registerKeyRange(pkr3.getKeyRange(), &pkr3);
|
||||
sks.registerKeyRange(pkr1.getKeyRange(), &pkr1);
|
||||
sks.registerKeyRange(pkr2.getKeyRange(), &pkr2);
|
||||
sks.registerKeyRange(pkr3.getKeyRange(), &pkr3);
|
||||
auto nullRef = Reference<ReadYourWritesTransaction>();
|
||||
// get
|
||||
{
|
||||
auto resultFuture = pks.get(nullRef, LiteralStringRef("/cat/small0000000009"));
|
||||
auto resultFuture = sks.get(nullRef, LiteralStringRef("/cat/small0000000009"));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue().get();
|
||||
ASSERT(result == pkr1.getKeyValueForIndex(9).value);
|
||||
auto emptyFuture = pks.get(nullRef, LiteralStringRef("/cat/small0000000010"));
|
||||
auto emptyFuture = sks.get(nullRef, LiteralStringRef("/cat/small0000000010"));
|
||||
ASSERT(emptyFuture.isReady());
|
||||
auto emptyResult = emptyFuture.getValue();
|
||||
ASSERT(!emptyResult.present());
|
||||
|
@ -372,7 +352,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
|||
{
|
||||
KeySelector start = KeySelectorRef(LiteralStringRef("/elepant"), false, -9);
|
||||
KeySelector end = KeySelectorRef(LiteralStringRef("/frog"), false, +11);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 20);
|
||||
|
@ -383,7 +363,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
|||
{
|
||||
KeySelector start = KeySelectorRef(pkr3.getKeyForIndex(999), true, -1110);
|
||||
KeySelector end = KeySelectorRef(pkr1.getKeyForIndex(0), false, +1112);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 1110);
|
||||
|
@ -394,7 +374,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
|||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(2));
|
||||
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(2));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 2);
|
||||
|
@ -405,7 +385,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
|||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(10, 100));
|
||||
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(10, 100));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
int bytes = 0;
|
||||
|
@ -417,7 +397,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
|||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(999), true, +1);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(1100), true);
|
||||
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(1100), true);
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
for (int i = 0; i < pkr3.getSize(); ++i) ASSERT(result[i] == pkr3.getKeyValueForIndex(pkr3.getSize() - 1 - i));
|
||||
|
|
|
@ -41,8 +41,6 @@ public:
|
|||
|
||||
explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
|
||||
KeyRangeRef getKeyRange() const { return range; }
|
||||
ACTOR Future<Void> normalizeKeySelectorActor(const SpecialKeyRangeBaseImpl* pkrImpl,
|
||||
Reference<ReadYourWritesTransaction> ryw, KeySelector* ks);
|
||||
|
||||
virtual ~SpecialKeyRangeBaseImpl() {}
|
||||
|
||||
|
@ -71,17 +69,20 @@ public:
|
|||
ASSERT(impls.rangeContaining(kr.begin) == impls.rangeContaining(kr.end) && impls[kr.begin] == nullptr);
|
||||
impls.insert(kr, impl);
|
||||
}
|
||||
KeyRangeMap<SpecialKeyRangeBaseImpl*>& getImpls() { return impls; }
|
||||
KeyRangeRef getKeyRange() const { return range; }
|
||||
|
||||
private:
|
||||
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw, KeyRef key);
|
||||
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRef key);
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> checkModuleFound(SpecialKeySpace* sks,
|
||||
Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector begin, KeySelector end, GetRangeLimits limits,
|
||||
bool reverse);
|
||||
ACTOR static Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>>> getRangeAggregationActor(
|
||||
SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin, KeySelector end,
|
||||
GetRangeLimits limits, bool reverse);
|
||||
Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector begin, KeySelector end,
|
||||
GetRangeLimits limits, bool reverse);
|
||||
ACTOR static Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>>>
|
||||
getRangeAggregationActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin,
|
||||
KeySelector end, GetRangeLimits limits, bool reverse);
|
||||
|
||||
KeyRangeMap<SpecialKeyRangeBaseImpl*> impls;
|
||||
KeyRange range;
|
||||
|
|
Loading…
Reference in New Issue