Allow reading conflict ranges after ryw disabled tx commits
This commit is contained in:
parent
0c514e91c7
commit
4a0925e9a6
|
@ -302,8 +302,12 @@ public:
|
|||
Reference<TransactionLogInfo> trLogInfo;
|
||||
|
||||
const vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
|
||||
const VectorRef<KeyRangeRef>& readConflictRanges() const { return tr.transaction.read_conflict_ranges; }
|
||||
const VectorRef<KeyRangeRef>& writeConflictRanges() const { return tr.transaction.write_conflict_ranges; }
|
||||
Standalone<VectorRef<KeyRangeRef>> readConflictRanges() const {
|
||||
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.read_conflict_ranges, tr.arena);
|
||||
}
|
||||
Standalone<VectorRef<KeyRangeRef>> writeConflictRanges() const {
|
||||
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
|
||||
}
|
||||
|
||||
private:
|
||||
Future<Version> getReadVersion(uint32_t flags);
|
||||
|
|
|
@ -1044,6 +1044,18 @@ public:
|
|||
wait( ryw->resetPromise.getFuture() || ready );
|
||||
|
||||
if( ryw->options.readYourWritesDisabled ) {
|
||||
|
||||
// Stash away conflict ranges to read after commit
|
||||
ryw->nativeReadRanges = ryw->tr.readConflictRanges();
|
||||
ryw->nativeWriteRanges = ryw->tr.writeConflictRanges();
|
||||
for (const auto& f : ryw->tr.getExtraReadConflictRanges()) {
|
||||
ASSERT(f.isReady() && f.get().first < f.get().second);
|
||||
ryw->nativeReadRanges.push_back(
|
||||
ryw->nativeReadRanges.arena(),
|
||||
KeyRangeRef(f.get().first, f.get().second)
|
||||
.withPrefix(readConflictRangeKeysRange.begin, ryw->nativeReadRanges.arena()));
|
||||
}
|
||||
|
||||
if (ryw->resetPromise.isSet())
|
||||
throw ryw->resetPromise.getFuture().getError();
|
||||
wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
|
||||
|
@ -1571,11 +1583,15 @@ Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeInters
|
|||
for (const auto& range : tr.readConflictRanges())
|
||||
readConflicts.insert(range.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
for (const auto& f : tr.getExtraReadConflictRanges())
|
||||
if (f.isReady() && f.get().first < f.get().second)
|
||||
readConflicts.insert(KeyRangeRef(f.get().first, f.get().second)
|
||||
.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
for (const auto& range : nativeReadRanges)
|
||||
readConflicts.insert(range.withPrefix(readConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
for (const auto& f : tr.getExtraReadConflictRanges()) {
|
||||
ASSERT(f.isReady() && f.get().first < f.get().second);
|
||||
readConflicts.insert(
|
||||
KeyRangeRef(f.get().first, f.get().second).withPrefix(readConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
}
|
||||
auto beginIter = readConflicts.rangeContaining(kr.begin);
|
||||
if (beginIter->begin() != kr.begin) ++beginIter;
|
||||
for (auto it = beginIter; it->begin() < kr.end; ++it) {
|
||||
|
@ -1592,6 +1608,27 @@ Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeInter
|
|||
// Memory owned by result
|
||||
CoalescedKeyRefRangeMap<ValueRef> writeConflicts{ LiteralStringRef("0"), specialKeys.end };
|
||||
|
||||
if (!options.readYourWritesDisabled) {
|
||||
KeyRangeRef strippedWriteRangePrefix = kr.removePrefix(writeConflictRangeKeysRange.begin);
|
||||
WriteMap::iterator it(&writes);
|
||||
it.skip(strippedWriteRangePrefix.begin);
|
||||
if (it.beginKey() > allKeys.begin) --it;
|
||||
for (; it.beginKey() < strippedWriteRangePrefix.end; ++it) {
|
||||
if (it.is_conflict_range())
|
||||
writeConflicts.insert(
|
||||
KeyRangeRef(it.beginKey().toArena(result.arena()), it.endKey().toArena(result.arena()))
|
||||
.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
}
|
||||
} else {
|
||||
for (const auto& range : tr.writeConflictRanges())
|
||||
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
for (const auto& range : nativeWriteRanges)
|
||||
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
}
|
||||
|
||||
for (const auto& k : versionStampKeys) {
|
||||
KeyRange range;
|
||||
if (versionStampFuture.isValid() && versionStampFuture.isReady() && !versionStampFuture.isError()) {
|
||||
|
@ -1613,23 +1650,6 @@ Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeInter
|
|||
LiteralStringRef("1"));
|
||||
}
|
||||
|
||||
if (!options.readYourWritesDisabled) {
|
||||
KeyRangeRef strippedWriteRangePrefix = kr.removePrefix(writeConflictRangeKeysRange.begin);
|
||||
WriteMap::iterator it(&writes);
|
||||
it.skip(strippedWriteRangePrefix.begin);
|
||||
for (; it.beginKey() < strippedWriteRangePrefix.end; ++it) {
|
||||
if (it.is_conflict_range()) {
|
||||
writeConflicts.insert(
|
||||
KeyRangeRef(it.beginKey().toArena(result.arena()), it.endKey().toArena(result.arena()))
|
||||
.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const auto& range : tr.writeConflictRanges())
|
||||
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||
LiteralStringRef("1"));
|
||||
}
|
||||
auto beginIter = writeConflicts.rangeContaining(kr.begin);
|
||||
if (beginIter->begin() != kr.begin) ++beginIter;
|
||||
for (auto it = beginIter; it->begin() < kr.end; ++it) {
|
||||
|
@ -2064,6 +2084,9 @@ void ReadYourWritesTransaction::resetRyow() {
|
|||
cache = SnapshotCache(&arena);
|
||||
writes = WriteMap(&arena);
|
||||
readConflicts = CoalescedKeyRefRangeMap<bool>();
|
||||
versionStampKeys = VectorRef<KeyRef>();
|
||||
nativeReadRanges = Standalone<VectorRef<KeyRangeRef>>();
|
||||
nativeWriteRanges = Standalone<VectorRef<KeyRangeRef>>();
|
||||
watchMap.clear();
|
||||
reading = AndFuture();
|
||||
approximateSize = 0;
|
||||
|
@ -2093,7 +2116,6 @@ void ReadYourWritesTransaction::reset() {
|
|||
persistentOptions.clear();
|
||||
options.reset(tr);
|
||||
transactionDebugInfo.clear();
|
||||
versionStampKeys = VectorRef<KeyRef>();
|
||||
tr.fullReset();
|
||||
versionStampFuture = tr.getVersionstamp();
|
||||
std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions));
|
||||
|
|
|
@ -165,6 +165,10 @@ private:
|
|||
// For reading conflict ranges from the special key space
|
||||
VectorRef<KeyRef> versionStampKeys;
|
||||
Future<Standalone<StringRef>> versionStampFuture;
|
||||
Standalone<VectorRef<KeyRangeRef>>
|
||||
nativeReadRanges; // Used to read conflict ranges after committing an ryw disabled transaction
|
||||
Standalone<VectorRef<KeyRangeRef>>
|
||||
nativeWriteRanges; // Used to read conflict ranges after committing an ryw disabled transaction
|
||||
|
||||
Reference<TransactionDebugInfo> transactionDebugInfo;
|
||||
|
||||
|
|
Loading…
Reference in New Issue