Merge pull request #2557 from alexmiller-apple/reduce-versionstamp-conflictranges
Narrow the unreadable range of keys after a versionstamped key operation
This commit is contained in:
commit
8a065b9da4
|
@ -142,7 +142,7 @@
|
|||
A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database. The last 2 bytes are monotonic in the serialization order for transactions.
|
||||
|
||||
.. |atomic-versionstamps-2| replace::
|
||||
A transaction is not permitted to read any transformed key or value previously set within that transaction, and an attempt to do so will result in an error.
|
||||
A transaction is not permitted to read any transformed key or value previously set within that transaction, and an attempt to do so will result in an ``accessed_unreadable`` error. The range of keys marked unreadable when setting a versionstamped key begins at the transactions's read version if it is known, otherwise a versionstamp of all ``0x00`` bytes is conservatively assumed. The upper bound of the unreadable range is a versionstamp of all ``0xFF`` bytes.
|
||||
|
||||
.. |atomic-versionstamps-tuple-warning-key| replace::
|
||||
At this time, versionstamped keys are not compatible with the Tuple layer except in Java, Python, and Go. Note that this implies versionstamped keys may not be used with the Subspace and Directory layers except in those languages.
|
||||
|
|
|
@ -229,10 +229,19 @@ static Optional<ValueRef> doCompareAndClear(const Optional<ValueRef>& existingVa
|
|||
return existingValueOptional; // No change required.
|
||||
}
|
||||
|
||||
static void placeVersionstamp( uint8_t* destination, Version version, uint16_t transactionNumber ) {
|
||||
version = bigEndian64(version);
|
||||
transactionNumber = bigEndian16(transactionNumber);
|
||||
static_assert( sizeof(version) == 8, "version size mismatch" );
|
||||
memcpy( destination, &version, sizeof(version) );
|
||||
static_assert( sizeof(transactionNumber) == 2, "txn num size mismatch");
|
||||
memcpy( destination + sizeof(version), &transactionNumber, sizeof(transactionNumber) );
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns the range corresponding to the specified versionstamp key.
|
||||
*/
|
||||
static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, const KeyRef &maxKey) {
|
||||
static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, Version minVersion, const KeyRef &maxKey) {
|
||||
KeyRef begin(arena, key);
|
||||
KeyRef end(arena, key);
|
||||
|
||||
|
@ -249,19 +258,23 @@ static KeyRangeRef getVersionstampKeyRange(Arena& arena, const KeyRef &key, cons
|
|||
if (pos < 0 || pos + 10 > begin.size())
|
||||
throw client_invalid_operation();
|
||||
|
||||
memset(mutateString(begin) + pos, 0, 10);
|
||||
placeVersionstamp(mutateString(begin)+pos, minVersion, 0);
|
||||
memset(mutateString(end) + pos, '\xff', 10);
|
||||
|
||||
return KeyRangeRef(begin, std::min(end, maxKey));
|
||||
}
|
||||
|
||||
static void placeVersionstamp( uint8_t* destination, Version version, uint16_t transactionNumber ) {
|
||||
version = bigEndian64(version);
|
||||
transactionNumber = bigEndian16(transactionNumber);
|
||||
static_assert( sizeof(version) == 8, "version size mismatch" );
|
||||
memcpy( destination, &version, sizeof(version) );
|
||||
static_assert( sizeof(transactionNumber) == 2, "txn num size mismatch");
|
||||
memcpy( destination + sizeof(version), &transactionNumber, sizeof(transactionNumber) );
|
||||
static void transformVersionstampKey( StringRef& key, Version version, uint16_t transactionNumber ) {
|
||||
if (key.size() < 4)
|
||||
throw client_invalid_operation();
|
||||
|
||||
int32_t pos;
|
||||
memcpy(&pos, key.end() - sizeof(int32_t), sizeof(int32_t));
|
||||
pos = littleEndian32(pos);
|
||||
if (pos < 0 || pos + 10 > key.size())
|
||||
throw client_invalid_operation();
|
||||
|
||||
placeVersionstamp( mutateString(key) + pos, version, transactionNumber );
|
||||
}
|
||||
|
||||
static void transformVersionstampMutation( MutationRef& mutation, StringRef MutationRef::* param, Version version, uint16_t transactionNumber ) {
|
||||
|
|
|
@ -3114,6 +3114,14 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
return readVersion;
|
||||
}
|
||||
|
||||
Optional<Version> Transaction::getCachedReadVersion() {
|
||||
if (readVersion.isValid() && readVersion.isReady() && !readVersion.isError()) {
|
||||
return readVersion.get();
|
||||
} else {
|
||||
return Optional<Version>();
|
||||
}
|
||||
}
|
||||
|
||||
Future<Standalone<StringRef>> Transaction::getVersionstamp() {
|
||||
if(committing.isValid()) {
|
||||
return transaction_invalid_version();
|
||||
|
|
|
@ -212,6 +212,7 @@ public:
|
|||
void setVersion( Version v );
|
||||
Future<Version> getReadVersion() { return getReadVersion(0); }
|
||||
Future<Version> getRawReadVersion();
|
||||
Optional<Version> getCachedReadVersion();
|
||||
|
||||
[[nodiscard]] Future<Optional<Value>> get(const Key& key, bool snapshot = false);
|
||||
[[nodiscard]] Future<Void> watch(Reference<Watch> watch);
|
||||
|
|
|
@ -1565,11 +1565,16 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
|
|||
}
|
||||
|
||||
if(operationType == MutationRef::SetVersionstampedKey) {
|
||||
KeyRangeRef range = getVersionstampKeyRange(arena, k, getMaxReadKey()); // this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
||||
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
|
||||
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
|
||||
if(!options.readYourWritesDisabled) {
|
||||
writeRangeToNativeTransaction(range);
|
||||
writes.addUnmodifiedAndUnreadableRange(range);
|
||||
}
|
||||
// k is the unversionstamped key provided by the user. If we've filled in a minimum bound
|
||||
// for the versionstamp, we need to make sure that's reflected when we insert it into the
|
||||
// WriteMap below.
|
||||
transformVersionstampKey( k, tr.getCachedReadVersion().orDefault(0), 0 );
|
||||
}
|
||||
|
||||
if(operationType == MutationRef::SetVersionstampedValue) {
|
||||
|
|
|
@ -69,6 +69,7 @@ public:
|
|||
|
||||
void setVersion( Version v ) { tr.setVersion(v); }
|
||||
Future<Version> getReadVersion();
|
||||
Optional<Version> getCachedReadVersion() { return tr.getCachedReadVersion(); }
|
||||
Future< Optional<Value> > get( const Key& key, bool snapshot = false );
|
||||
Future< Key > getKey( const KeySelector& key, bool snapshot = false );
|
||||
Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false );
|
||||
|
|
|
@ -344,7 +344,7 @@ struct UnreadableWorkload : TestWorkload {
|
|||
key = RandomTestImpl::getRandomVersionstampKey(arena);
|
||||
value = RandomTestImpl::getRandomValue(arena);
|
||||
tr.atomicOp(key, value, MutationRef::SetVersionstampedKey);
|
||||
range = getVersionstampKeyRange(arena, key, allKeys.end);
|
||||
range = getVersionstampKeyRange(arena, key, tr.getCachedReadVersion().orDefault(0), allKeys.end);
|
||||
unreadableMap.insert(range, true);
|
||||
//TraceEvent("RYWT_SetVersionstampKey").detail("Range", printable(range));
|
||||
}
|
||||
|
|
|
@ -724,7 +724,7 @@ struct WriteDuringReadWorkload : TestWorkload {
|
|||
if(!self->useSystemKeys && deterministicRandom()->random01() < 0.01) {
|
||||
Key versionStampKey = self->getRandomVersionStampKey();
|
||||
Value value = self->getRandomValue();
|
||||
KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), versionStampKey, normalKeys.end);
|
||||
KeyRangeRef range = getVersionstampKeyRange(versionStampKey.arena(), versionStampKey, tr.getCachedReadVersion().orDefault(0), normalKeys.end);
|
||||
self->changeCount.insert( range, changeNum++ );
|
||||
//TraceEvent("WDRVersionStamp").detail("VersionStampKey", printable(versionStampKey)).detail("Range", printable(range));
|
||||
tr.atomicOp( versionStampKey, value, MutationRef::SetVersionstampedKey );
|
||||
|
|
Loading…
Reference in New Issue