Merge pull request #3046 from atn34/atn34/conflict-ranges

Add read/write conflict ranges to special key space
This commit is contained in:
Meng Xu 2020-05-07 22:36:18 -07:00 committed by GitHub
commit a08bbcc539
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 350 additions and 13 deletions

View File

@ -231,6 +231,8 @@ public:
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
std::shared_ptr<SpecialKeySpace> specialKeySpace;
std::shared_ptr<ConflictingKeysImpl> cKImpl;
std::shared_ptr<ReadConflictRangeImpl> rCRImpl;
std::shared_ptr<WriteConflictRangeImpl> wCRImpl;
};
#endif

View File

@ -268,6 +268,10 @@ struct KeyRangeRef {
return KeyRangeRef( begin.withPrefix(prefix), end.withPrefix(prefix) );
}
KeyRangeRef withPrefix(const StringRef& prefix, Arena& arena) const {
return KeyRangeRef(begin.withPrefix(prefix, arena), end.withPrefix(prefix, arena));
}
KeyRangeRef removePrefix( const StringRef& prefix ) const {
return KeyRangeRef( begin.removePrefix(prefix), end.removePrefix(prefix) );
}

View File

@ -528,7 +528,9 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
specialKeySpace(std::make_shared<SpecialKeySpace>(normalKeys.begin, specialKeys.end)),
cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeysRange)) {
cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeysRange)),
rCRImpl(std::make_shared<ReadConflictRangeImpl>(readConflictRangeKeysRange)),
wCRImpl(std::make_shared<WriteConflictRangeImpl>(writeConflictRangeKeysRange)) {
dbId = deterministicRandom()->randomUniqueID();
connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
@ -548,6 +550,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
specialKeySpace->registerKeyRange(conflictingKeysRange, cKImpl.get());
specialKeySpace->registerKeyRange(readConflictRangeKeysRange, rCRImpl.get());
specialKeySpace->registerKeyRange(writeConflictRangeKeysRange, wCRImpl.get());
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
@ -2427,7 +2431,7 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
t.mutations.push_back( req.arena, MutationRef( operationType, r.begin, v ) );
if( addConflictRange )
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
t.write_conflict_ranges.push_back( req.arena, r );
TEST(true); //NativeAPI atomic operation

View File

@ -301,6 +301,15 @@ public:
TransactionOptions options;
double startTime;
Reference<TransactionLogInfo> trLogInfo;
const vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
Standalone<VectorRef<KeyRangeRef>> readConflictRanges() const {
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.read_conflict_ranges, tr.arena);
}
Standalone<VectorRef<KeyRangeRef>> writeConflictRanges() const {
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
}
private:
Future<Version> getReadVersion(uint32_t flags);
void setPriority(uint32_t priorityFlag);

View File

@ -1040,6 +1040,18 @@ public:
wait( ryw->resetPromise.getFuture() || ready );
if( ryw->options.readYourWritesDisabled ) {
// Stash away conflict ranges to read after commit
ryw->nativeReadRanges = ryw->tr.readConflictRanges();
ryw->nativeWriteRanges = ryw->tr.writeConflictRanges();
for (const auto& f : ryw->tr.getExtraReadConflictRanges()) {
if (f.isReady() && f.get().first < f.get().second)
ryw->nativeReadRanges.push_back(
ryw->nativeReadRanges.arena(),
KeyRangeRef(f.get().first, f.get().second)
.withPrefix(readConflictRangeKeysRange.begin, ryw->nativeReadRanges.arena()));
}
if (ryw->resetPromise.isSet())
throw ryw->resetPromise.getFuture().getError();
wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
@ -1132,7 +1144,7 @@ public:
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
options(tr), deferredError(cx->deferredError) {
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
std::back_inserter(persistentOptions));
applyPersistentOptions();
@ -1290,7 +1302,7 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
}
// special key space are only allowed to query if both begin and end are in \xff\xff, \xff\xff\xff
if (specialKeys.contains(begin.getKey()) && specialKeys.contains(end.getKey()))
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end)
return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>::addRef(this), begin, end,
limits, reverse);
@ -1545,6 +1557,104 @@ void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
}
}
Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeIntersecting(KeyRangeRef kr) {
ASSERT(readConflictRangeKeysRange.contains(kr));
ASSERT(!tr.options.checkWritesEnabled)
Standalone<RangeResultRef> result;
if (!options.readYourWritesDisabled) {
kr = kr.removePrefix(readConflictRangeKeysRange.begin);
auto iter = readConflicts.rangeContainingKeyBefore(kr.begin);
if (iter->begin() == allKeys.begin && !iter->value()) {
++iter; // Conventionally '' is missing from the result range if it's not part of a read conflict
}
for (; iter->begin() < kr.end; ++iter) {
if (kr.begin <= iter->begin() && iter->begin() < kr.end) {
result.push_back(result.arena(),
KeyValueRef(iter->begin().withPrefix(readConflictRangeKeysRange.begin, result.arena()),
iter->value() ? LiteralStringRef("1") : LiteralStringRef("0")));
}
}
} else {
CoalescedKeyRefRangeMap<ValueRef> readConflicts{ LiteralStringRef("0"), specialKeys.end };
for (const auto& range : tr.readConflictRanges())
readConflicts.insert(range.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
for (const auto& range : nativeReadRanges)
readConflicts.insert(range.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
for (const auto& f : tr.getExtraReadConflictRanges()) {
if (f.isReady() && f.get().first < f.get().second)
readConflicts.insert(KeyRangeRef(f.get().first, f.get().second)
.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
}
auto beginIter = readConflicts.rangeContaining(kr.begin);
if (beginIter->begin() != kr.begin) ++beginIter;
for (auto it = beginIter; it->begin() < kr.end; ++it) {
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
}
}
return result;
}
Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeIntersecting(KeyRangeRef kr) {
ASSERT(writeConflictRangeKeysRange.contains(kr));
Standalone<RangeResultRef> result;
// Memory owned by result
CoalescedKeyRefRangeMap<ValueRef> writeConflicts{ LiteralStringRef("0"), specialKeys.end };
if (!options.readYourWritesDisabled) {
KeyRangeRef strippedWriteRangePrefix = kr.removePrefix(writeConflictRangeKeysRange.begin);
WriteMap::iterator it(&writes);
it.skip(strippedWriteRangePrefix.begin);
if (it.beginKey() > allKeys.begin) --it;
for (; it.beginKey() < strippedWriteRangePrefix.end; ++it) {
if (it.is_conflict_range())
writeConflicts.insert(
KeyRangeRef(it.beginKey().toArena(result.arena()), it.endKey().toArena(result.arena()))
.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
}
} else {
for (const auto& range : tr.writeConflictRanges())
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
for (const auto& range : nativeWriteRanges)
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
}
for (const auto& k : versionStampKeys) {
KeyRange range;
if (versionStampFuture.isValid() && versionStampFuture.isReady() && !versionStampFuture.isError()) {
const auto& stamp = versionStampFuture.get();
StringRef key(range.arena(), k); // Copy
ASSERT(k.size() >= 4);
int32_t pos;
memcpy(&pos, k.end() - sizeof(int32_t), sizeof(int32_t));
pos = littleEndian32(pos);
ASSERT(pos >= 0 && pos + stamp.size() <= key.size());
memcpy(mutateString(key) + pos, stamp.begin(), stamp.size());
*(mutateString(key) + key.size() - 4) = '\x00';
// singleKeyRange, but share begin and end's memory
range = KeyRangeRef(key.substr(0, key.size() - 4), key.substr(0, key.size() - 3));
} else {
range = getVersionstampKeyRange(result.arena(), k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
}
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
LiteralStringRef("1"));
}
auto beginIter = writeConflicts.rangeContaining(kr.begin);
if (beginIter->begin() != kr.begin) ++beginIter;
for (auto it = beginIter; it->begin() < kr.end; ++it) {
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
}
return result;
}
void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType ) {
bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
@ -1593,6 +1703,8 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
TEST(options.readYourWritesDisabled); // SetVersionstampedKey without ryw enabled
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
versionStampKeys.push_back(arena, k);
addWriteConflict = false;
if(!options.readYourWritesDisabled) {
writeRangeToNativeTransaction(range);
writes.addUnmodifiedAndUnreadableRange(range);
@ -1911,6 +2023,9 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
cache.arena = &arena;
writes.arena = &arena;
persistentOptions = std::move(r.persistentOptions);
nativeReadRanges = std::move(r.nativeReadRanges);
nativeWriteRanges = std::move(r.nativeWriteRanges);
versionStampKeys = std::move(r.versionStampKeys);
}
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT :
@ -1935,6 +2050,9 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
watchMap = std::move( r.watchMap );
r.resetPromise = Promise<Void>();
persistentOptions = std::move(r.persistentOptions);
nativeReadRanges = std::move(r.nativeReadRanges);
nativeWriteRanges = std::move(r.nativeWriteRanges);
versionStampKeys = std::move(r.versionStampKeys);
}
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
@ -1969,6 +2087,9 @@ void ReadYourWritesTransaction::resetRyow() {
cache = SnapshotCache(&arena);
writes = WriteMap(&arena);
readConflicts = CoalescedKeyRefRangeMap<bool>();
versionStampKeys = VectorRef<KeyRef>();
nativeReadRanges = Standalone<VectorRef<KeyRangeRef>>();
nativeWriteRanges = Standalone<VectorRef<KeyRangeRef>>();
watchMap.clear();
reading = AndFuture();
approximateSize = 0;
@ -1999,6 +2120,7 @@ void ReadYourWritesTransaction::reset() {
options.reset(tr);
transactionDebugInfo.clear();
tr.fullReset();
versionStampFuture = tr.getVersionstamp();
std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions));
resetRyow();
}

View File

@ -119,7 +119,10 @@ public:
void reset();
void debugTransaction(UID dID) { tr.debugTransaction(dID); }
Future<Void> debug_onIdle() { return reading; }
Future<Void> debug_onIdle() { return reading; }
// Wait for all reads that are currently pending to complete
Future<Void> pendingReads() { return resetPromise.getFuture() || reading; }
// Used by ThreadSafeTransaction for exceptions thrown in void methods
Error deferredError;
@ -135,6 +138,12 @@ public:
const TransactionInfo& getTransactionInfo() const {
return tr.info;
}
// Read from the special key space readConflictRangeKeysRange
Standalone<RangeResultRef> getReadConflictRangeIntersecting(KeyRangeRef kr);
// Read from the special key space writeConflictRangeKeysRange
Standalone<RangeResultRef> getWriteConflictRangeIntersecting(KeyRangeRef kr);
private:
friend class RYWImpl;
@ -152,6 +161,14 @@ private:
double creationTime;
bool commitStarted;
// For reading conflict ranges from the special key space
VectorRef<KeyRef> versionStampKeys;
Future<Standalone<StringRef>> versionStampFuture;
Standalone<VectorRef<KeyRangeRef>>
nativeReadRanges; // Used to read conflict ranges after committing an ryw disabled transaction
Standalone<VectorRef<KeyRangeRef>>
nativeWriteRanges; // Used to read conflict ranges after committing an ryw disabled transaction
Reference<TransactionDebugInfo> transactionDebugInfo;
void resetTimeout();

View File

@ -243,6 +243,26 @@ Future<Optional<Value>> SpecialKeySpace::get(Reference<ReadYourWritesTransaction
return getActor(this, ryw, key);
}
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(Reference<ReadYourWritesTransaction> ryw,
KeyRange kr) {
wait(ryw->pendingReads());
return ryw->getReadConflictRangeIntersecting(kr);
}
Future<Standalone<RangeResultRef>> ReadConflictRangeImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const {
return getReadConflictRangeImpl(ryw, kr);
}
WriteConflictRangeImpl::WriteConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
Future<Standalone<RangeResultRef>> WriteConflictRangeImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const {
return ryw->getWriteConflictRangeIntersecting(kr);
}
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(Reference<ReadYourWritesTransaction> ryw,

View File

@ -95,5 +95,19 @@ public:
KeyRangeRef kr) const override;
};
class ReadConflictRangeImpl : public SpecialKeyRangeBaseImpl {
public:
explicit ReadConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
};
class WriteConflictRangeImpl : public SpecialKeyRangeBaseImpl {
public:
explicit WriteConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -108,11 +108,20 @@ void decodeKeyServersValue( Standalone<RangeResultRef> result, const ValueRef& v
std::sort(dest.begin(), dest.end());
}
const KeyRangeRef conflictingKeysRange = KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"),
LiteralStringRef("\xff\xff/transaction/conflicting_keys/\xff"));
const KeyRangeRef conflictingKeysRange =
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"),
LiteralStringRef("\xff\xff/transaction/conflicting_keys/\xff\xff"));
const ValueRef conflictingKeysTrue = LiteralStringRef("1");
const ValueRef conflictingKeysFalse = LiteralStringRef("0");
const KeyRangeRef readConflictRangeKeysRange =
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/read_conflict_range/"),
LiteralStringRef("\xff\xff/transaction/read_conflict_range/\xff\xff"));
const KeyRangeRef writeConflictRangeKeysRange =
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/write_conflict_range/"),
LiteralStringRef("\xff\xff/transaction/write_conflict_range/\xff\xff"));
// "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
const KeyRangeRef storageCacheKeys( LiteralStringRef("\xff/storageCache/"), LiteralStringRef("\xff/storageCache0") );
const KeyRef storageCachePrefix = storageCacheKeys.begin;

View File

@ -77,6 +77,8 @@ bool serverHasKey( ValueRef storedValue );
extern const KeyRangeRef conflictingKeysRange;
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
extern const KeyRangeRef writeConflictRangeKeysRange;
extern const KeyRangeRef readConflictRangeKeysRange;
extern const KeyRef cacheKeysPrefix;

View File

@ -652,7 +652,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.contains(keysel2.getKey());
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0) ),
@ -685,7 +685,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
keysel2 = makeKeySel();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.contains(keysel2.getKey());
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf( !limits.isReached() && !limits.isValid()) ),
@ -729,7 +729,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.contains(key2);
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),
@ -764,7 +764,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key2 = makeKey();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.contains(key2);
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),

View File

@ -42,7 +42,7 @@ public:
struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
int actorCount, minKeysPerRange, maxKeysPerRange, rangeCount, keyBytes, valBytes;
int actorCount, minKeysPerRange, maxKeysPerRange, rangeCount, keyBytes, valBytes, conflictRangeSizeFactor;
double testDuration, absoluteRandomProb, transactionsPerSecond;
PerfIntCounter wrongResults, keysCount;
Reference<ReadYourWritesTransaction> ryw; // used to store all populated data
@ -60,6 +60,9 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100.0);
actorCount = getOption(options, LiteralStringRef("actorCount"), 1);
absoluteRandomProb = getOption(options, LiteralStringRef("absoluteRandomProb"), 0.5);
// Controls the relative size of read/write conflict ranges and the number of random getranges
conflictRangeSizeFactor = getOption(options, LiteralStringRef("conflictRangeSizeFactor"), 10);
ASSERT(conflictRangeSizeFactor >= 1);
}
virtual std::string description() { return "SpecialKeySpaceCorrectness"; }
@ -72,6 +75,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
Future<Void> _setup(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
cx->specialKeySpace = std::make_shared<SpecialKeySpace>();
if (self->clientId == 0) {
self->ryw = Reference(new ReadYourWritesTransaction(cx));
self->ryw->setVersion(100);
@ -97,7 +101,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> _start(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
if (self->clientId == 0) wait(timeout(self->getRangeCallActor(cx, self), self->testDuration, Void()));
if (self->clientId == 0) {
wait(timeout(self->getRangeCallActor(cx, self) && testConflictRanges(cx, /*read*/ true, self) &&
testConflictRanges(cx, /*read*/ false, self),
self->testDuration, Void()));
}
return Void();
}
@ -161,6 +169,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
.detail("TestValue", printable(res2[i].value));
return false;
}
TEST(true); // Special key space keys equal
}
return true;
}
@ -201,6 +210,131 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
return GetRangeLimits(rowLimits, byteLimits);
}
ACTOR static Future<Void> testConflictRanges(Database cx_, bool read, SpecialKeySpaceCorrectnessWorkload* self) {
state StringRef prefix = read ? readConflictRangeKeysRange.begin : writeConflictRangeKeysRange.begin;
TEST(read); // test read conflict range special key implementation
TEST(!read); // test write conflict range special key implementation
// Get a default special key range instance
Database cx = cx_->clone();
state Reference<ReadYourWritesTransaction> tx = Reference(new ReadYourWritesTransaction(cx));
state Reference<ReadYourWritesTransaction> referenceTx = Reference(new ReadYourWritesTransaction(cx));
state bool ryw = deterministicRandom()->coinflip();
if (!ryw) {
tx->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
}
referenceTx->setVersion(100); // Prevent this from doing a GRV or committing
referenceTx->clear(normalKeys);
referenceTx->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
int numKeys = deterministicRandom()->randomInt(1, self->conflictRangeSizeFactor) * 4;
state std::vector<std::string> keys; // Must all be distinct
keys.resize(numKeys);
int lastKey = 0;
for (auto& key : keys) {
key = std::to_string(lastKey++);
}
if (deterministicRandom()->coinflip()) {
// Include beginning of keyspace
keys.push_back("");
}
if (deterministicRandom()->coinflip()) {
// Include end of keyspace
keys.push_back("\xff");
}
std::mt19937 g(deterministicRandom()->randomUInt32());
std::shuffle(keys.begin(), keys.end(), g);
// First half of the keys will be ranges, the other keys will mix in some read boundaries that aren't range
// boundaries
std::sort(keys.begin(), keys.begin() + keys.size() / 2);
for (auto iter = keys.begin(); iter + 1 < keys.begin() + keys.size() / 2; iter += 2) {
Standalone<KeyRangeRef> range = KeyRangeRef(*iter, *(iter + 1));
if (read) {
tx->addReadConflictRange(range);
// Add it twice so that we can observe the de-duplication that should get done
tx->addReadConflictRange(range);
} else {
tx->addWriteConflictRange(range);
tx->addWriteConflictRange(range);
}
// TODO test that fails if we don't wait on tx->pendingReads()
referenceTx->set(range.begin, LiteralStringRef("1"));
referenceTx->set(range.end, LiteralStringRef("0"));
}
if (!read && deterministicRandom()->coinflip()) {
try {
wait(tx->commit());
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
return Void();
}
TEST(true); // Read write conflict range of committed transaction
}
for (int i = 0; i < self->conflictRangeSizeFactor; ++i) {
GetRangeLimits limit;
KeySelector begin;
KeySelector end;
loop {
begin = firstGreaterOrEqual(deterministicRandom()->randomChoice(keys));
end = firstGreaterOrEqual(deterministicRandom()->randomChoice(keys));
if (begin.getKey() <= end.getKey()) break;
}
bool reverse = deterministicRandom()->coinflip();
auto correctResultFuture = referenceTx->getRange(begin, end, limit, false, reverse);
ASSERT(correctResultFuture.isReady());
begin.setKey(begin.getKey().withPrefix(prefix, begin.arena()));
end.setKey(end.getKey().withPrefix(prefix, begin.arena()));
auto testResultFuture = tx->getRange(begin, end, limit, false, reverse);
ASSERT(testResultFuture.isReady());
auto correct_iter = correctResultFuture.get().begin();
auto test_iter = testResultFuture.get().begin();
bool had_error = false;
while (correct_iter != correctResultFuture.get().end() && test_iter != testResultFuture.get().end()) {
if (correct_iter->key != test_iter->key.removePrefix(prefix) ||
correct_iter->value != test_iter->value) {
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Mismatched keys")
.detail("ConflictType", read ? "read" : "write")
.detail("CorrectKey", correct_iter->key)
.detail("TestKey", test_iter->key)
.detail("CorrectValue", correct_iter->value)
.detail("TestValue", test_iter->value)
.detail("Begin", begin.toString())
.detail("End", end.toString())
.detail("Ryw", ryw);
had_error = true;
}
++correct_iter;
++test_iter;
}
while (correct_iter != correctResultFuture.get().end()) {
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Extra correct key")
.detail("ConflictType", read ? "read" : "write")
.detail("CorrectKey", correct_iter->key)
.detail("CorrectValue", correct_iter->value)
.detail("Begin", begin.toString())
.detail("End", end.toString())
.detail("Ryw", ryw);
++correct_iter;
had_error = true;
}
while (test_iter != testResultFuture.get().end()) {
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Extra test key")
.detail("ConflictType", read ? "read" : "write")
.detail("TestKey", test_iter->key)
.detail("TestValue", test_iter->value)
.detail("Begin", begin.toString())
.detail("End", end.toString())
.detail("Ryw", ryw);
++test_iter;
had_error = true;
}
if (had_error) break;
}
return Void();
}
};
WorkloadFactory<SpecialKeySpaceCorrectnessWorkload> SpecialKeySpaceCorrectnessFactory("SpecialKeySpaceCorrectness");