Make a smaller range inaccessable after writing a versionstamped key
A transaction's read version is the lower bound of what a transaction's commit version could be. Thus, we can narrow the conflict range of a versionstamped key, and thus reduce the amount of the keyspace that is rendered inaccessable, by filling in the read version on the versionstamped key and using that as the lower bound of the conflict range. This allows reads to still be done to versionstampled keys lower than the read version of the current transaction.
This commit is contained in:
parent
725ab1b996
commit
eb64eede8d
|
@ -229,10 +229,19 @@ static Optional<ValueRef> doCompareAndClear(const Optional<ValueRef>& existingVa
|
|||
return existingValueOptional; // No change required.
|
||||
}
|
||||
|
||||
static void placeVersionstamp( uint8_t* destination, Version version, uint16_t transactionNumber ) {
|
||||
version = bigEndian64(version);
|
||||
transactionNumber = bigEndian16(transactionNumber);
|
||||
static_assert( sizeof(version) == 8, "version size mismatch" );
|
||||
memcpy( destination, &version, sizeof(version) );
|
||||
static_assert( sizeof(transactionNumber) == 2, "txn num size mismatch");
|
||||
memcpy( destination + sizeof(version), &transactionNumber, sizeof(transactionNumber) );
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns the range corresponding to the specified versionstamp key.
|
||||
*/
|
||||
static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, const KeyRef &maxKey) {
|
||||
static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, Version minVersion, const KeyRef &maxKey) {
|
||||
KeyRef begin(arena, key);
|
||||
KeyRef end(arena, key);
|
||||
|
||||
|
@ -249,21 +258,12 @@ static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, cons
|
|||
if (pos < 0 || pos + 10 > begin.size())
|
||||
throw client_invalid_operation();
|
||||
|
||||
memset(mutateString(begin) + pos, 0, 10);
|
||||
placeVersionstamp(mutateString(begin)+pos, minVersion, 0);
|
||||
memset(mutateString(end) + pos, '\xff', 10);
|
||||
|
||||
return KeyRangeRef(begin, std::min(end, maxKey));
|
||||
}
|
||||
|
||||
static void placeVersionstamp( uint8_t* destination, Version version, uint16_t transactionNumber ) {
|
||||
version = bigEndian64(version);
|
||||
transactionNumber = bigEndian16(transactionNumber);
|
||||
static_assert( sizeof(version) == 8, "version size mismatch" );
|
||||
memcpy( destination, &version, sizeof(version) );
|
||||
static_assert( sizeof(transactionNumber) == 2, "txn num size mismatch");
|
||||
memcpy( destination + sizeof(version), &transactionNumber, sizeof(transactionNumber) );
|
||||
}
|
||||
|
||||
static void transformVersionstampMutation( MutationRef& mutation, StringRef MutationRef::* param, Version version, uint16_t transactionNumber ) {
|
||||
if ((mutation.*param).size() >= 4) {
|
||||
int32_t pos;
|
||||
|
|
|
@ -3114,6 +3114,14 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
return readVersion;
|
||||
}
|
||||
|
||||
Optional<Version> Transaction::getCachedReadVersion() {
|
||||
if (readVersion.isValid() && readVersion.isReady() && !readVersion.isError()) {
|
||||
return readVersion.get();
|
||||
} else {
|
||||
return Optional<Version>();
|
||||
}
|
||||
}
|
||||
|
||||
Future<Standalone<StringRef>> Transaction::getVersionstamp() {
|
||||
if(committing.isValid()) {
|
||||
return transaction_invalid_version();
|
||||
|
|
|
@ -212,6 +212,7 @@ public:
|
|||
void setVersion( Version v );
|
||||
Future<Version> getReadVersion() { return getReadVersion(0); }
|
||||
Future<Version> getRawReadVersion();
|
||||
Optional<Version> getCachedReadVersion();
|
||||
|
||||
[[nodiscard]] Future<Optional<Value>> get(const Key& key, bool snapshot = false);
|
||||
[[nodiscard]] Future<Void> watch(Reference<Watch> watch);
|
||||
|
|
|
@ -1565,11 +1565,16 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
|
|||
}
|
||||
|
||||
if(operationType == MutationRef::SetVersionstampedKey) {
|
||||
KeyRangeRef range = getVersionstampKeyRange(arena, k, getMaxReadKey()); // this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
||||
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
||||
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
|
||||
if(!options.readYourWritesDisabled) {
|
||||
writeRangeToNativeTransaction(range);
|
||||
writes.addUnmodifiedAndUnreadableRange(range);
|
||||
}
|
||||
// k is the unversionstamped key provided by the user. If we've filled in a minimum bound
|
||||
// for the versionstamp, we need to make sure that's reflected when we insert it into the
|
||||
// WriteMap below.
|
||||
k = range.begin;
|
||||
}
|
||||
|
||||
if(operationType == MutationRef::SetVersionstampedValue) {
|
||||
|
|
|
@ -69,6 +69,7 @@ public:
|
|||
|
||||
void setVersion( Version v ) { tr.setVersion(v); }
|
||||
Future<Version> getReadVersion();
|
||||
Optional<Version> getCachedReadVersion() { return tr.getCachedReadVersion(); }
|
||||
Future< Optional<Value> > get( const Key& key, bool snapshot = false );
|
||||
Future< Key > getKey( const KeySelector& key, bool snapshot = false );
|
||||
Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false );
|
||||
|
|
|
@ -344,7 +344,7 @@ struct UnreadableWorkload : TestWorkload {
|
|||
key = RandomTestImpl::getRandomVersionstampKey(arena);
|
||||
value = RandomTestImpl::getRandomValue(arena);
|
||||
tr.atomicOp(key, value, MutationRef::SetVersionstampedKey);
|
||||
range = getVersionstampKeyRange(arena, key, allKeys.end);
|
||||
range = getVersionstampKeyRange(arena, key, tr.getCachedReadVersion().orDefault(0), allKeys.end);
|
||||
unreadableMap.insert(range, true);
|
||||
//TraceEvent("RYWT_SetVersionstampKey").detail("Range", printable(range));
|
||||
}
|
||||
|
|
|
@ -724,7 +724,7 @@ struct WriteDuringReadWorkload : TestWorkload {
|
|||
if(!self->useSystemKeys && deterministicRandom()->random01() < 0.01) {
|
||||
Key versionStampKey = self->getRandomVersionStampKey();
|
||||
Value value = self->getRandomValue();
|
||||
KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), versionStampKey, normalKeys.end);
|
||||
KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), versionStampKey, tr.getCachedReadVersion().orDefault(0), normalKeys.end);
|
||||
self->changeCount.insert( range, changeNum++ );
|
||||
//TraceEvent("WDRVersionStamp").detail("VersionStampKey", printable(versionStampKey)).detail("Range", printable(range));
|
||||
tr.atomicOp( versionStampKey, value, MutationRef::SetVersionstampedKey );
|
||||
|
|
Loading…
Reference in New Issue