Use approximate write range for set_versionstamped_key
This commit is contained in:
parent
b7cd6b20e8
commit
0c514e91c7
|
@ -2433,7 +2433,7 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
|
||||||
|
|
||||||
t.mutations.push_back( req.arena, MutationRef( operationType, r.begin, v ) );
|
t.mutations.push_back( req.arena, MutationRef( operationType, r.begin, v ) );
|
||||||
|
|
||||||
if( addConflictRange )
|
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
|
||||||
t.write_conflict_ranges.push_back( req.arena, r );
|
t.write_conflict_ranges.push_back( req.arena, r );
|
||||||
|
|
||||||
TEST(true); //NativeAPI atomic operation
|
TEST(true); //NativeAPI atomic operation
|
||||||
|
|
|
@ -1136,7 +1136,7 @@ public:
|
||||||
|
|
||||||
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
|
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
|
||||||
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
|
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
|
||||||
options(tr), deferredError(cx->deferredError) {
|
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()) {
|
||||||
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
|
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
|
||||||
std::back_inserter(persistentOptions));
|
std::back_inserter(persistentOptions));
|
||||||
applyPersistentOptions();
|
applyPersistentOptions();
|
||||||
|
@ -1586,57 +1586,55 @@ Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeInters
|
||||||
}
|
}
|
||||||
|
|
||||||
Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeIntersecting(KeyRangeRef kr) {
|
Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeIntersecting(KeyRangeRef kr) {
|
||||||
if (writeConflictRangeUnknown) {
|
|
||||||
throw accessed_unreadable();
|
|
||||||
}
|
|
||||||
ASSERT(writeConflictRangeKeysRange.contains(kr));
|
ASSERT(writeConflictRangeKeysRange.contains(kr));
|
||||||
Standalone<RangeResultRef> result;
|
Standalone<RangeResultRef> result;
|
||||||
|
|
||||||
|
// Memory owned by result
|
||||||
|
CoalescedKeyRefRangeMap<ValueRef> writeConflicts{ LiteralStringRef("0"), specialKeys.end };
|
||||||
|
|
||||||
|
for (const auto& k : versionStampKeys) {
|
||||||
|
KeyRange range;
|
||||||
|
if (versionStampFuture.isValid() && versionStampFuture.isReady() && !versionStampFuture.isError()) {
|
||||||
|
const auto& stamp = versionStampFuture.get();
|
||||||
|
StringRef key(range.arena(), k); // Copy
|
||||||
|
ASSERT(k.size() >= 4);
|
||||||
|
int32_t pos;
|
||||||
|
memcpy(&pos, k.end() - sizeof(int32_t), sizeof(int32_t));
|
||||||
|
pos = littleEndian32(pos);
|
||||||
|
ASSERT(pos >= 0 && pos + stamp.size() <= key.size());
|
||||||
|
memcpy(mutateString(key) + pos, stamp.begin(), stamp.size());
|
||||||
|
*(mutateString(key) + key.size() - 4) = '\x00';
|
||||||
|
// singleKeyRange, but share begin and end's memory
|
||||||
|
range = KeyRangeRef(key.substr(0, key.size() - 4), key.substr(0, key.size() - 3));
|
||||||
|
} else {
|
||||||
|
range = getVersionstampKeyRange(result.arena(), k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
|
||||||
|
}
|
||||||
|
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||||
|
LiteralStringRef("1"));
|
||||||
|
}
|
||||||
|
|
||||||
if (!options.readYourWritesDisabled) {
|
if (!options.readYourWritesDisabled) {
|
||||||
kr = kr.removePrefix(writeConflictRangeKeysRange.begin);
|
KeyRangeRef strippedWriteRangePrefix = kr.removePrefix(writeConflictRangeKeysRange.begin);
|
||||||
WriteMap::iterator it(&writes);
|
WriteMap::iterator it(&writes);
|
||||||
it.skip(kr.begin);
|
it.skip(strippedWriteRangePrefix.begin);
|
||||||
if (it.beginKey() != allKeys.begin) --it;
|
for (; it.beginKey() < strippedWriteRangePrefix.end; ++it) {
|
||||||
|
if (it.is_conflict_range()) {
|
||||||
bool inConflictRange = false;
|
writeConflicts.insert(
|
||||||
ExtStringRef conflictBegin;
|
KeyRangeRef(it.beginKey().toArena(result.arena()), it.endKey().toArena(result.arena()))
|
||||||
|
.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||||
for (; it.beginKey() < kr.end; ++it) {
|
LiteralStringRef("1"));
|
||||||
if (it.is_conflict_range() && !inConflictRange) {
|
|
||||||
conflictBegin = std::max(ExtStringRef(allKeys.begin), it.beginKey());
|
|
||||||
inConflictRange = true;
|
|
||||||
} else if (!it.is_conflict_range() && inConflictRange) {
|
|
||||||
KeyRangeRef keyrange(conflictBegin.toArena(result.arena()), it.beginKey().toArena(result.arena()));
|
|
||||||
if (kr.contains(keyrange.begin))
|
|
||||||
result.push_back(result.arena(), KeyValueRef(keyrange.begin.withPrefix(
|
|
||||||
writeConflictRangeKeysRange.begin, result.arena()),
|
|
||||||
LiteralStringRef("1")));
|
|
||||||
if (kr.contains(keyrange.end))
|
|
||||||
result.push_back(result.arena(), KeyValueRef(keyrange.end.withPrefix(
|
|
||||||
writeConflictRangeKeysRange.begin, result.arena()),
|
|
||||||
LiteralStringRef("0")));
|
|
||||||
inConflictRange = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inConflictRange) {
|
|
||||||
KeyRef begin = conflictBegin.toArena(result.arena());
|
|
||||||
if (kr.contains(begin))
|
|
||||||
result.push_back(result.arena(),
|
|
||||||
KeyValueRef(begin.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
|
||||||
LiteralStringRef("1")));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
CoalescedKeyRefRangeMap<ValueRef> writeConflicts{ LiteralStringRef("0"), specialKeys.end };
|
|
||||||
for (const auto& range : tr.writeConflictRanges())
|
for (const auto& range : tr.writeConflictRanges())
|
||||||
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
writeConflicts.insert(range.withPrefix(writeConflictRangeKeysRange.begin, result.arena()),
|
||||||
LiteralStringRef("1"));
|
LiteralStringRef("1"));
|
||||||
|
}
|
||||||
auto beginIter = writeConflicts.rangeContaining(kr.begin);
|
auto beginIter = writeConflicts.rangeContaining(kr.begin);
|
||||||
if (beginIter->begin() != kr.begin) ++beginIter;
|
if (beginIter->begin() != kr.begin) ++beginIter;
|
||||||
for (auto it = beginIter; it->begin() < kr.end; ++it) {
|
for (auto it = beginIter; it->begin() < kr.end; ++it) {
|
||||||
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
|
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -1686,9 +1684,10 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
|
||||||
}
|
}
|
||||||
|
|
||||||
if(operationType == MutationRef::SetVersionstampedKey) {
|
if(operationType == MutationRef::SetVersionstampedKey) {
|
||||||
writeConflictRangeUnknown = true;
|
|
||||||
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
||||||
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
|
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
|
||||||
|
versionStampKeys.push_back_deep(arena, k);
|
||||||
|
addWriteConflict = false;
|
||||||
if(!options.readYourWritesDisabled) {
|
if(!options.readYourWritesDisabled) {
|
||||||
writeRangeToNativeTransaction(range);
|
writeRangeToNativeTransaction(range);
|
||||||
writes.addUnmodifiedAndUnreadableRange(range);
|
writes.addUnmodifiedAndUnreadableRange(range);
|
||||||
|
@ -2094,7 +2093,9 @@ void ReadYourWritesTransaction::reset() {
|
||||||
persistentOptions.clear();
|
persistentOptions.clear();
|
||||||
options.reset(tr);
|
options.reset(tr);
|
||||||
transactionDebugInfo.clear();
|
transactionDebugInfo.clear();
|
||||||
|
versionStampKeys = VectorRef<KeyRef>();
|
||||||
tr.fullReset();
|
tr.fullReset();
|
||||||
|
versionStampFuture = tr.getVersionstamp();
|
||||||
std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions));
|
std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions));
|
||||||
resetRyow();
|
resetRyow();
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,9 +162,9 @@ private:
|
||||||
double creationTime;
|
double creationTime;
|
||||||
bool commitStarted;
|
bool commitStarted;
|
||||||
|
|
||||||
// If true, then this transactions write conflict range is not known until commit time.
|
// For reading conflict ranges from the special key space
|
||||||
// Currently only set if this transaction has a SetVersionstampedKey mutation
|
VectorRef<KeyRef> versionStampKeys;
|
||||||
bool writeConflictRangeUnknown = false;
|
Future<Standalone<StringRef>> versionStampFuture;
|
||||||
|
|
||||||
Reference<TransactionDebugInfo> transactionDebugInfo;
|
Reference<TransactionDebugInfo> transactionDebugInfo;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue