Cancel pending special key space reads on destroy

Previously the special key space would take a reference to the
transaction, but this doesn't make sense since the transaction might not
be refcounted
This commit is contained in:
Andrew Noyes 2020-05-20 20:55:07 +00:00
parent 83e9c841a7
commit a2fd6d46a0
7 changed files with 117 additions and 107 deletions

View File

@ -20,7 +20,7 @@ Consequently, the special-key-space framework wants to integrate all client func
If your feature is exposing information to clients and the results are easily formatted as key-value pairs, then you can use special-key-space to implement your client function.
## How
If you choose to use, you need to implement a function class that inherits from `SpecialKeyRangeBaseImpl`, which has an abstract method `Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw, KeyRangeRef kr)`.
If you choose to use, you need to implement a function class that inherits from `SpecialKeyRangeBaseImpl`, which has an abstract method `Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr)`.
This method can be treated as a callback, whose implementation details are determined by the developer.
Once you fill out the method, register the function class to the corresponding key range.
Below is a detailed example.
@ -38,7 +38,7 @@ public:
CountryToCapitalCity[LiteralStringRef("China")] = LiteralStringRef("Beijing");
}
// Implement the getRange interface
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const override {
Standalone<RangeResultRef> result;
@ -95,4 +95,4 @@ In addition, all singleKeyRanges are formatted as modules and cannot be used aga
- STATUSJSON : `\xff\xff/status/json`
- CONNECTIONSTRING : `\xff\xff/connection_string`
- CLUSTERFILEPATH : `\xff\xff/cluster_file_path`
- CLUSTERFILEPATH : `\xff\xff/cluster_file_path`

View File

@ -503,8 +503,7 @@ ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces(Reference<ClusterCo
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Key prefix = Key(getKeyRange().begin);
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionFile()),
@ -527,8 +526,7 @@ struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
};
struct SingleSpecialKeyImpl : SpecialKeyRangeBaseImpl {
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
ASSERT(kr.contains(k));
return map(f(ryw), [k = k](Optional<Value> v) {
Standalone<RangeResultRef> result;
@ -539,13 +537,12 @@ struct SingleSpecialKeyImpl : SpecialKeyRangeBaseImpl {
});
}
SingleSpecialKeyImpl(KeyRef k,
const std::function<Future<Optional<Value>>(Reference<ReadYourWritesTransaction>)>& f)
SingleSpecialKeyImpl(KeyRef k, const std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)>& f)
: SpecialKeyRangeBaseImpl(singleKeyRange(k)), k(k), f(f) {}
private:
Key k;
std::function<Future<Optional<Value>>(Reference<ReadYourWritesTransaction>)> f;
std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)> f;
};
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
@ -611,43 +608,49 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::WORKERINTERFACE, std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::STATUSJSON, std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/status/json"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::CLUSTERFILEPATH, std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(ryw->getDatabase()->getConnectionFile()->getFilename());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::STATUSJSON,
std::make_unique<SingleSpecialKeyImpl>(LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionFile()) {
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CLUSTERFILEPATH,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(ryw->getDatabase()->getConnectionFile()->getFilename());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::CONNECTIONSTRING, std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/connection_string"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = ryw->getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONNECTIONSTRING,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/connection_string"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = ryw->getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
}
throttleExpirer = recurring([this](){ expireThrottles(); }, CLIENT_KNOBS->TAG_THROTTLE_EXPIRATION_INTERVAL);

View File

@ -1230,7 +1230,7 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(key)) {
TEST(true); // Special keys get
return getDatabase()->specialKeySpace->get(Reference<ReadYourWritesTransaction>::addRef(this), key);
return getDatabase()->specialKeySpace->get(this, key);
}
} else {
if (key == LiteralStringRef("\xff\xff/status/json")) {
@ -1312,8 +1312,7 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end) {
TEST(true); // Special key space get range
return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>::addRef(this), begin,
end, limits, reverse);
return getDatabase()->specialKeySpace->getRange(this, begin, end, limits, reverse);
}
} else {
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")) {

View File

@ -124,6 +124,8 @@ public:
// Wait for all reads that are currently pending to complete
Future<Void> pendingReads() { return resetPromise.getFuture() || reading; }
// Throws before the lifetime of this transaction ends
Future<Void> resetFuture() { return resetPromise.getFuture(); }
// Used by ThreadSafeTransaction for exceptions thrown in void methods
Error deferredError;

View File

@ -37,8 +37,8 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
// This function will move the given KeySelector as far as possible to the standard form:
// orEqual == false && offset == 1 (Standard form)
// If the corresponding key is not in the underlying key range, it will move over the range
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl,
Reference<ReadYourWritesTransaction> ryw, KeySelector* ks) {
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw,
KeySelector* ks) {
ASSERT(!ks->orEqual); // should be removed before calling
ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
@ -91,7 +91,7 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl*
return Void();
}
void onModuleRead(const Reference<ReadYourWritesTransaction>& ryw, SpecialKeySpace::MODULE module,
void onModuleRead(ReadYourWritesTransaction* ryw, SpecialKeySpace::MODULE module,
Optional<SpecialKeySpace::MODULE>& lastModuleRead) {
if (ryw && !ryw->specialKeySpaceRelaxed() && lastModuleRead.present() && lastModuleRead.get() != module) {
throw special_keys_cross_module_read();
@ -105,9 +105,9 @@ void onModuleRead(const Reference<ReadYourWritesTransaction>& ryw, SpecialKeySpa
// It does have overhead here since we query all keys twice in the worst case.
// However, moving the KeySelector while handling other parameters like limits makes the code much more complex and hard
// to maintain; Thus, separate each part to make the code easy to understand and more compact
ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
KeySelector* ks, Optional<SpecialKeySpace::MODULE>* lastModuleRead,
int* actualOffset, Standalone<RangeResultRef>* result) {
ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector* ks,
Optional<SpecialKeySpace::MODULE>* lastModuleRead, int* actualOffset,
Standalone<RangeResultRef>* result) {
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter =
ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
: sks->getImpls().rangeContaining(ks->getKey());
@ -140,23 +140,27 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, Reference<Rea
}
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkModuleFound(SpecialKeySpace* sks,
Reference<ReadYourWritesTransaction> ryw,
ReadYourWritesTransaction* ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse) {
std::pair<Standalone<RangeResultRef>, Optional<SpecialKeySpace::MODULE>> result =
wait(SpecialKeySpace::getRangeAggregationActor(sks, ryw, begin, end, limits, reverse));
if (ryw && !ryw->specialKeySpaceRelaxed()) {
auto module = result.second;
if (!module.present() || module.get() == SpecialKeySpace::MODULE::UNKNOWN) {
throw special_keys_no_module_found();
choose {
when(std::pair<Standalone<RangeResultRef>, Optional<SpecialKeySpace::MODULE>> result =
wait(SpecialKeySpace::getRangeAggregationActor(sks, ryw, begin, end, limits, reverse))) {
if (ryw && !ryw->specialKeySpaceRelaxed()) {
auto module = result.second;
if (!module.present() || module.get() == SpecialKeySpace::MODULE::UNKNOWN) {
throw special_keys_no_module_found();
}
}
return result.first;
}
when(wait(ryw ? ryw->resetFuture() : Never())) { throw internal_error(); }
}
return result.first;
}
ACTOR Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeySpace::MODULE>>>
SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse) {
SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits, bool reverse) {
// This function handles ranges which cover more than one keyrange and aggregates all results
// KeySelector, GetRangeLimits and reverse are all handled here
state Standalone<RangeResultRef> result;
@ -234,9 +238,8 @@ SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks, Reference<ReadYo
return std::make_pair(result, lastModuleRead);
}
Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end, GetRangeLimits limits,
bool reverse) {
Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(ReadYourWritesTransaction* ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits, bool reverse) {
// validate limits here
if (!limits.isValid()) return range_limits_invalid();
if (limits.isReached()) {
@ -255,7 +258,7 @@ Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourW
return checkModuleFound(this, ryw, begin, end, limits, reverse);
}
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw,
KeyRef key) {
// use getRange to workaround this
Standalone<RangeResultRef> result =
@ -269,34 +272,32 @@ ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* sks, Re
}
}
Future<Optional<Value>> SpecialKeySpace::get(Reference<ReadYourWritesTransaction> ryw, const Key& key) {
Future<Optional<Value>> SpecialKeySpace::get(ReadYourWritesTransaction* ryw, const Key& key) {
return getActor(this, ryw, key);
}
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(Reference<ReadYourWritesTransaction> ryw,
KeyRange kr) {
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(ReadYourWritesTransaction* ryw, KeyRange kr) {
wait(ryw->pendingReads());
return ryw->getReadConflictRangeIntersecting(kr);
}
Future<Standalone<RangeResultRef>> ReadConflictRangeImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
Future<Standalone<RangeResultRef>> ReadConflictRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return getReadConflictRangeImpl(ryw, kr);
}
WriteConflictRangeImpl::WriteConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
Future<Standalone<RangeResultRef>> WriteConflictRangeImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
Future<Standalone<RangeResultRef>> WriteConflictRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return ryw->getWriteConflictRangeIntersecting(kr);
}
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const {
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
if (ryw->getTransactionInfo().conflictingKeys) {
auto krMapPtr = ryw->getTransactionInfo().conflictingKeys.get();
@ -361,8 +362,7 @@ public:
Key getKeyForIndex(int idx) { return Key(prefix + format("%010d", idx)).withPrefix(range.begin); }
int getSize() { return size; }
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
int startIndex = 0, endIndex = size;
while (startIndex < size && kvs[startIndex].key < kr.begin) ++startIndex;
while (endIndex > startIndex && kvs[endIndex - 1].key >= kr.end) --endIndex;
@ -386,14 +386,13 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
sks.registerKeyRange(SpecialKeySpace::MODULE::TESTONLY, pkr1.getKeyRange(), &pkr1);
sks.registerKeyRange(SpecialKeySpace::MODULE::TESTONLY, pkr2.getKeyRange(), &pkr2);
sks.registerKeyRange(SpecialKeySpace::MODULE::TESTONLY, pkr3.getKeyRange(), &pkr3);
auto nullRef = Reference<ReadYourWritesTransaction>();
// get
{
auto resultFuture = sks.get(nullRef, LiteralStringRef("/cat/small0000000009"));
auto resultFuture = sks.get(nullptr, LiteralStringRef("/cat/small0000000009"));
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue().get();
ASSERT(result == pkr1.getKeyValueForIndex(9).value);
auto emptyFuture = sks.get(nullRef, LiteralStringRef("/cat/small0000000010"));
auto emptyFuture = sks.get(nullptr, LiteralStringRef("/cat/small0000000010"));
ASSERT(emptyFuture.isReady());
auto emptyResult = emptyFuture.getValue();
ASSERT(!emptyResult.present());
@ -402,7 +401,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
{
KeySelector start = KeySelectorRef(LiteralStringRef("/elepant"), false, -9);
KeySelector end = KeySelectorRef(LiteralStringRef("/frog"), false, +11);
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits());
auto resultFuture = sks.getRange(nullptr, start, end, GetRangeLimits());
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
ASSERT(result.size() == 20);
@ -413,7 +412,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
{
KeySelector start = KeySelectorRef(pkr3.getKeyForIndex(999), true, -1110);
KeySelector end = KeySelectorRef(pkr1.getKeyForIndex(0), false, +1112);
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits());
auto resultFuture = sks.getRange(nullptr, start, end, GetRangeLimits());
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
ASSERT(result.size() == 1110);
@ -424,7 +423,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
{
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(2));
auto resultFuture = sks.getRange(nullptr, start, end, GetRangeLimits(2));
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
ASSERT(result.size() == 2);
@ -435,7 +434,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
{
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(10, 100));
auto resultFuture = sks.getRange(nullptr, start, end, GetRangeLimits(10, 100));
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
int bytes = 0;
@ -447,7 +446,7 @@ TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
{
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(999), true, +1);
auto resultFuture = sks.getRange(nullRef, start, end, GetRangeLimits(1100), true);
auto resultFuture = sks.getRange(nullptr, start, end, GetRangeLimits(1100), true);
ASSERT(resultFuture.isReady());
auto result = resultFuture.getValue();
for (int i = 0; i < pkr3.getSize(); ++i) ASSERT(result[i] == pkr3.getKeyValueForIndex(pkr3.getSize() - 1 - i));

View File

@ -36,8 +36,7 @@
class SpecialKeyRangeBaseImpl {
public:
// Each derived class only needs to implement this simple version of getRange
virtual Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const = 0;
virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
KeyRangeRef getKeyRange() const { return range; }
@ -61,10 +60,10 @@ public:
WORKERINTERFACE,
};
Future<Optional<Value>> get(Reference<ReadYourWritesTransaction> ryw, const Key& key);
Future<Optional<Value>> get(ReadYourWritesTransaction* ryw, const Key& key);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits, bool reverse = false);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse = false);
SpecialKeySpace(KeyRef spaceStartKey = Key(), KeyRef spaceEndKey = normalKeys.end, bool testOnly = true) {
// Default value is nullptr, begin of KeyRangeMap is Key()
@ -108,16 +107,15 @@ public:
KeyRangeRef getKeyRange() const { return range; }
private:
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw,
KeyRef key);
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeyRef key);
ACTOR static Future<Standalone<RangeResultRef>> checkModuleFound(SpecialKeySpace* sks,
Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
ReadYourWritesTransaction* ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits,
bool reverse);
ACTOR static Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeySpace::MODULE>>>
getRangeAggregationActor(SpecialKeySpace* sks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits, bool reverse);
getRangeAggregationActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
KeyRangeMap<SpecialKeyRangeBaseImpl*> impls;
KeyRangeMap<SpecialKeySpace::MODULE> modules;
@ -135,22 +133,19 @@ private:
class ConflictingKeysImpl : public SpecialKeyRangeBaseImpl {
public:
explicit ConflictingKeysImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class ReadConflictRangeImpl : public SpecialKeyRangeBaseImpl {
public:
explicit ReadConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class WriteConflictRangeImpl : public SpecialKeyRangeBaseImpl {
public:
explicit WriteConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl {

View File

@ -28,8 +28,7 @@
class SKSCTestImpl : public SpecialKeyRangeBaseImpl {
public:
explicit SKSCTestImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
virtual Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const {
virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
ASSERT(range.contains(kr));
auto resultFuture = ryw->getRange(kr, CLIENT_KNOBS->TOO_MANY);
// all keys are written to RYW, since GRV is set, the read should happen locally
@ -108,12 +107,25 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> _start(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
testRywLifetime(cx);
wait(timeout(self->testModuleRangeReadErrors(cx, self) && self->getRangeCallActor(cx, self) &&
testConflictRanges(cx, /*read*/ true, self) && testConflictRanges(cx, /*read*/ false, self),
self->testDuration, Void()));
return Void();
}
// This would be a unit test except we need a Database to create an ryw transaction
static void testRywLifetime(Database cx) {
Future<Void> f;
{
ReadYourWritesTransaction ryw{ cx->clone() };
f = success(ryw.get(LiteralStringRef("\xff\xff/status/json")));
TEST(!f.isReady());
}
ASSERT(f.isError());
ASSERT(f.getError().code() == error_code_transaction_cancelled);
}
ACTOR Future<Void> getRangeCallActor(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
state double lastTime = now();
loop {
@ -125,7 +137,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
auto correctResultFuture = self->ryw->getRange(begin, end, limit, false, reverse);
ASSERT(correctResultFuture.isReady());
auto correctResult = correctResultFuture.getValue();
auto testResultFuture = cx->specialKeySpace->getRange(self->ryw, begin, end, limit, reverse);
auto testResultFuture = cx->specialKeySpace->getRange(self->ryw.getPtr(), begin, end, limit, reverse);
ASSERT(testResultFuture.isReady());
auto testResult = testResultFuture.getValue();