Implements CompareAndClear AtomicOp

Adds CompareAndClear mutation. If the given parameter is equal to the
current value of the key, the key is cleared. At client, the mutation
is added to the operation stack. Hence if the mutation evaluates to
clear, we only get to know so when `read()` evaluates the stack in
`RYWIterator::kv()`, which is unlike what we currently do for typical
ClearRange.
This commit is contained in:
Vishesh Yadav 2019-01-31 01:23:32 -08:00
parent 0e5db1e7b7
commit c532d5c277
10 changed files with 317 additions and 101 deletions

View File

@ -220,6 +220,15 @@ static ValueRef doByteMin(const Optional<ValueRef>& existingValueOptional, const
return otherOperand;
}
static Optional<ValueRef> doCompareAndClear(const Optional<ValueRef>& existingValueOptional,
const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present() || existingValueOptional.get() == otherOperand) {
// Clear the value.
return Optional<ValueRef>();
}
return existingValueOptional; // No change required.
}
/*
* Returns the range corresponding to the specified versionstamp key.
*/

View File

@ -24,11 +24,54 @@
#include "fdbclient/FDBTypes.h"
static const char * typeString[] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
static const char* typeString[] = { "SetValue",
"ClearRange",
"AddValue",
"DebugKeyRange",
"DebugKey",
"NoOp",
"And",
"Or",
"Xor",
"AppendIfFits",
"AvailableForReuse",
"Reserved_For_LogProtocolMessage",
"Max",
"Min",
"SetVersionstampedKey",
"SetVersionstampedValue",
"ByteMin",
"ByteMax",
"MinV2",
"AndV2",
"CompareAndClear" };
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
enum Type : uint8_t {
SetValue = 0,
ClearRange,
AddValue,
DebugKeyRange,
DebugKey,
NoOp,
And,
Or,
Xor,
AppendIfFits,
AvailableForReuse,
Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */,
Max,
Min,
SetVersionstampedKey,
SetVersionstampedValue,
ByteMin,
ByteMax,
MinV2,
AndV2,
CompareAndClear,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;
@ -54,10 +97,14 @@ struct MutationRef {
}
// These masks define which mutation types have particular properties (they are used to implement isSingleKeyMutation() etc)
enum {
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << ByteMin) | (1 << ByteMax) | (1 << MinV2) | (1 << AndV2),
SINGLE_KEY_MASK = ATOMIC_MASK | (1<<SetValue),
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << MinV2)
enum {
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) |
(1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << ByteMin) |
(1 << ByteMax) | (1 << MinV2) | (1 << AndV2) | (1 << CompareAndClear),
SINGLE_KEY_MASK = ATOMIC_MASK | (1 << SetValue),
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) |
(1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << MinV2) |
(1 << CompareAndClear)
};
};

View File

@ -48,16 +48,24 @@ bool RYWIterator::is_unreadable() { return writes.is_unreadable(); }
ExtStringRef RYWIterator::beginKey() { return begin_key_cmp <= 0 ? writes.beginKey() : cache.beginKey(); }
ExtStringRef RYWIterator::endKey() { return end_key_cmp <= 0 ? cache.endKey() : writes.endKey(); }
KeyValueRef const& RYWIterator::kv( Arena& arena ) {
const KeyValueRef* RYWIterator::kv(Arena& arena) {
if(is_unreadable())
throw accessed_unreadable();
if (writes.is_unmodified_range())
if (writes.is_unmodified_range()) {
return cache.kv( arena );
else if (writes.is_independent() || cache.is_empty_range())
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), Optional<ValueRef>(), arena ).value.get() );
else
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), cache.kv(arena).value, arena ).value.get() );
}
auto result = (writes.is_independent() || cache.is_empty_range())
? WriteMap::coalesceUnder(writes.op(), Optional<ValueRef>(), arena)
: WriteMap::coalesceUnder(writes.op(), cache.kv(arena)->value, arena);
if (!result.value.present()) {
// Key is now deleted, which can happen because of CompareAndClear.
return nullptr;
}
temp = KeyValueRef(writes.beginKey().assertRef(), result.value.get());
return &temp;
}
RYWIterator& RYWIterator::operator++() {
@ -208,7 +216,11 @@ void testSnapshotCache() {
RYWIterator it(&cache, &writes);
it.skip(searchKeys.begin);
while (true) {
fprintf(stderr, "b: '%s' e: '%s' type: %s value: '%s'\n", printable(it.beginKey().toStandaloneStringRef()).c_str(), printable(it.endKey().toStandaloneStringRef()).c_str(), it.is_empty_range() ? "empty" : ( it.is_kv() ? "keyvalue" : "unknown" ), it.is_kv() ? printable(it.kv(arena).value).c_str() : "");
fprintf(stderr, "b: '%s' e: '%s' type: %s value: '%s'\n",
printable(it.beginKey().toStandaloneStringRef()).c_str(),
printable(it.endKey().toStandaloneStringRef()).c_str(),
it.is_empty_range() ? "empty" : (it.is_kv() ? "keyvalue" : "unknown"),
it.is_kv() ? printable(it.kv(arena)->value).c_str() : "");
if (it.endKey() >= searchKeys.end) break;
++it;
}
@ -216,7 +228,11 @@ void testSnapshotCache() {
it.skip(searchKeys.end);
while (true) {
fprintf(stderr, "b: '%s' e: '%s' type: %s value: '%s'\n", printable(it.beginKey().toStandaloneStringRef()).c_str(), printable(it.endKey().toStandaloneStringRef()).c_str(), it.is_empty_range() ? "empty" : ( it.is_kv() ? "keyvalue" : "unknown" ), it.is_kv() ? printable(it.kv(arena).value).c_str() : "" );
fprintf(stderr, "b: '%s' e: '%s' type: %s value: '%s'\n",
printable(it.beginKey().toStandaloneStringRef()).c_str(),
printable(it.endKey().toStandaloneStringRef()).c_str(),
it.is_empty_range() ? "empty" : (it.is_kv() ? "keyvalue" : "unknown"),
it.is_kv() ? printable(it.kv(arena)->value).c_str() : "");
if (it.beginKey() <= searchKeys.begin) break;
--it;
}

View File

@ -43,7 +43,7 @@ public:
ExtStringRef beginKey();
ExtStringRef endKey();
KeyValueRef const& kv( Arena& arena );
const KeyValueRef* kv(Arena& arena);
RYWIterator& operator++();

View File

@ -32,12 +32,18 @@ public:
it.skip(allKeys.begin);
Arena arena;
while( true ) {
TraceEvent("RYWDump").detail("Begin", printable(it.beginKey().toStandaloneStringRef()))
.detail("End", printable(it.endKey().toStandaloneStringRef()))
.detail("Unknown", it.is_unknown_range())
.detail("Empty", it.is_empty_range())
.detail("KV", it.is_kv())
.detail("Key", printable(it.is_kv() ? it.kv(arena).key : StringRef()));
Optional<StringRef> key = StringRef();
if (it.is_kv()) {
auto kv = it.kv(arena);
if (kv) key = kv->key;
}
TraceEvent("RYWDump")
.detail("Begin", printable(it.beginKey().toStandaloneStringRef()))
.detail("End", printable(it.endKey().toStandaloneStringRef()))
.detail("Unknown", it.is_unknown_range())
.detail("Empty", it.is_empty_range())
.detail("KV", it.is_kv())
.detail("Key", printable(key.get()));
if( it.endKey() == allKeys.end )
break;
++it;
@ -74,13 +80,18 @@ public:
it->skip(read.key);
state bool dependent = it->is_dependent();
if( it->is_kv() ) {
return it->kv(ryw->arena).value;
const KeyValueRef* result = it->kv(ryw->arena);
if (result != nullptr) {
return result->value;
} else {
return Optional<Value>();
}
} else if( it->is_empty_range() ) {
return Optional<Value>();
} else {
Optional<Value> res = wait( ryw->tr.get( read.key, true ) );
KeyRef k( ryw->arena, read.key );
if( res.present() ) {
if( ryw->cache.insert( k, res.get() ) )
ryw->arena.dependsOn(res.get().arena());
@ -96,7 +107,12 @@ public:
it->skip(k);
ASSERT( it->is_kv() );
return it->kv(ryw->arena).value;
const KeyValueRef* result = it->kv(ryw->arena);
if (result != nullptr) {
return result->value;
} else {
return Optional<Value>();
}
}
}
@ -605,10 +621,14 @@ public:
resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
} else if (it.is_kv()) {
KeyValueRef const* start = &it.kv(ryw->arena);
KeyValueRef const* start = it.kv(ryw->arena);
if (start == nullptr) {
++it;
continue;
}
it.skipContiguous( end.isFirstGreaterOrEqual() ? end.getKey() : ryw->getMaxReadKey() ); //not technically correct since this would add end.getKey(), but that is protected above
int maxCount = &it.kv(ryw->arena) - start + 1;
int maxCount = it.kv(ryw->arena) - start + 1;
int count = 0;
for(; count < maxCount && !limits.isReached(); count++ ) {
limits.decrement(start[count]);
@ -884,10 +904,11 @@ public:
resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
} else {
if (it.is_kv()) {
KeyValueRef const* end = &it.kv(ryw->arena);
KeyValueRef const* end = it.is_kv() ? it.kv(ryw->arena) : nullptr;
if (end != nullptr) {
it.skipContiguousBack( begin.isFirstGreaterOrEqual() ? begin.getKey() : allKeys.begin );
KeyValueRef const* start = &it.kv(ryw->arena);
KeyValueRef const* start = it.kv(ryw->arena);
ASSERT(start != nullptr);
int maxCount = end - start + 1;
int count = 0;
@ -1409,7 +1430,11 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
for( int i = 0; i < op.size(); ++i) {
switch(op[i].type) {
case MutationRef::SetValue:
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
if (op[i].value.present()) {
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
} else {
tr.clear( it.beginKey().assertRef(), false );
}
break;
case MutationRef::AddValue:
case MutationRef::AppendIfFits:
@ -1424,7 +1449,8 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
case MutationRef::ByteMax:
case MutationRef::MinV2:
case MutationRef::AndV2:
tr.atomicOp( it.beginKey().assertRef(), op[i].value.get(), op[i].type, false );
case MutationRef::CompareAndClear:
tr.atomicOp(it.beginKey().assertRef(), op[i].value.get(), op[i].type, false);
break;
default:
break;

View File

@ -190,8 +190,8 @@ public:
return ExtStringRef( it->values[ (offset-1)>>1 ].key, 1-(offset&1) );
}
KeyValueRef const& kv( Arena& arena ){ // only if is_kv()
return it->values[ (offset-2)>>1 ];
const KeyValueRef* kv(Arena& arena) { // only if is_kv()
return &it->values[(offset - 2) >> 1];
}
iterator& operator++() {
@ -342,4 +342,4 @@ public:
// segment is not invalidated
};
#endif
#endif

View File

@ -171,7 +171,7 @@ public:
coalesceOver( e.stack, RYWMutation( param, operation ), *arena );
else
e.stack.push( RYWMutation( param, operation ) );
it.tree.clear();
PTreeImpl::remove( writes, ver, e.key ); // FIXME: Make PTreeImpl::insert do this automatically (see also VersionedMap.h FIXME)
PTreeImpl::insert( writes, ver, std::move(e) );
@ -394,8 +394,18 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::AppendIfFits) {
} else if (newEntry.type == MutationRef::CompareAndClear) {
switch (existingEntry.type) {
case MutationRef::SetValue:
if (doCompareAndClear(existingEntry.value, newEntry.value.get(), arena).present()) {
return existingEntry;
} else {
return RYWMutation(Optional<ValueRef>(), MutationRef::SetValue);
}
default:
throw operation_failed();
}
} else if (newEntry.type == MutationRef::AppendIfFits) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -404,8 +414,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::And) {
} else if (newEntry.type == MutationRef::And) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAnd(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -414,8 +423,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::Or) {
} else if (newEntry.type == MutationRef::Or) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doOr(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -424,8 +432,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::Xor) {
} else if (newEntry.type == MutationRef::Xor) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doXor(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -434,8 +441,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::Max) {
} else if (newEntry.type == MutationRef::Max) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -444,8 +450,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::Min) {
} else if (newEntry.type == MutationRef::Min) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -454,8 +459,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMin) {
} else if (newEntry.type == MutationRef::ByteMin) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -464,8 +468,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMax) {
} else if (newEntry.type == MutationRef::ByteMax) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -474,8 +477,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::MinV2) {
} else if (newEntry.type == MutationRef::MinV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMinV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -484,8 +486,7 @@ public:
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::AndV2) {
} else if (newEntry.type == MutationRef::AndV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAndV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
@ -494,8 +495,8 @@ public:
default:
throw operation_failed();
}
}
else throw operation_failed();
} else
throw operation_failed();
}
static void coalesceOver(OperationStack& stack, RYWMutation newEntry, Arena& arena) {

View File

@ -254,6 +254,9 @@ description is not currently required but encouraged.
<Option name="byte_max" code="17"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the larger of the two values is then stored in the database."/>
<Option name="compare_and_clear" code="20"
paramType="Bytes" paramDescription="Value to compare with"
description="Performs an atomic ``compare and clear`` operation. If the existing value in the database is equal to the given value, then given key is cleared."/>
</Scope>
<Scope name="ConflictRangeType">

View File

@ -203,6 +203,8 @@ struct UpdateEagerReadInfo {
vector<pair<KeyRef, int>> keys;
vector<Optional<Value>> value;
Arena arena;
void addMutations( VectorRef<MutationRef> const& mutations ) {
for(auto& m : mutations)
addMutation(m);
@ -212,7 +214,11 @@ struct UpdateEagerReadInfo {
// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
keyBegin.push_back( m.param2 );
else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) || (m.type == MutationRef::ByteMax))
else if (m.type == MutationRef::CompareAndClear) {
keyBegin.push_back(keyAfter(m.param1, arena));
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size() + 1));
} else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) ||
(m.type == MutationRef::ByteMax))
keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
else if (isAtomicOp((MutationRef::Type) m.type))
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
@ -1420,13 +1426,6 @@ ACTOR Future<Void> doEagerReads( StorageServer* data, UpdateEagerReadInfo* eager
return Void();
}
void singleEagerReadFromVector( UpdateEagerReadInfo& eager, KeyRef const& key, VectorRef<KeyValueRef> data ) {
eager.keyBegin.clear(); eager.keyEnd.clear();
eager.keyBegin.push_back( key );
auto e = std::lower_bound( data.begin(), data.end(), key, KeyValueRef::OrderByKey() );
eager.keyEnd.push_back( e != data.end() ? e->key : allKeys.end );
}
bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion ) {
// Remove entries from the latest version of data->versionedData that haven't changed since they were inserted
// before or at desiredDurableVersion, to maintain the invariants for versionedData.
@ -1549,39 +1548,46 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
}
switch(m.type) {
case MutationRef::AddValue:
m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
break;
case MutationRef::And:
m.param2 = doAnd(oldVal, m.param2, ar);
break;
case MutationRef::Or:
m.param2 = doOr(oldVal, m.param2, ar);
break;
case MutationRef::Xor:
m.param2 = doXor(oldVal, m.param2, ar);
break;
case MutationRef::AppendIfFits:
m.param2 = doAppendIfFits(oldVal, m.param2, ar);
break;
case MutationRef::Max:
m.param2 = doMax(oldVal, m.param2, ar);
break;
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
case MutationRef::AddValue:
m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
break;
case MutationRef::And:
m.param2 = doAnd(oldVal, m.param2, ar);
break;
case MutationRef::Or:
m.param2 = doOr(oldVal, m.param2, ar);
break;
case MutationRef::Xor:
m.param2 = doXor(oldVal, m.param2, ar);
break;
case MutationRef::AppendIfFits:
m.param2 = doAppendIfFits(oldVal, m.param2, ar);
break;
case MutationRef::Max:
m.param2 = doMax(oldVal, m.param2, ar);
break;
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
case MutationRef::CompareAndClear:
if (oldVal.present() && m.param2 == oldVal.get()) {
m.type = MutationRef::ClearRange;
m.param2 = keyAfter(m.param1, ar);
return expandMutation(m, data, eager, eagerTrustedEnd, ar);
}
return false;
}
m.type = MutationRef::SetValue;
}
@ -2565,6 +2571,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
case MutationRef::MinV2:
case MutationRef::Or:
case MutationRef::Xor:
case MutationRef::CompareAndClear:
++data->counters.atomicMutations;
break;
}

View File

@ -57,7 +57,7 @@ public:
virtual Future<Void> start(Database const& cx) {
if (opType == -1)
opType = sharedRandomNumber % 8;
opType = sharedRandomNumber % 9;
switch (opType) {
case 0:
@ -84,6 +84,9 @@ public:
case 7:
TEST(true); //Testing atomic Add
return testAdd(cx->clone(), this);
case 8:
TEST(true); // Testing atomic CompareAndClear
return testCompareAndClear(cx->clone(), this);
default:
ASSERT(false);
}
@ -268,6 +271,102 @@ public:
return Void();
}
ACTOR Future<Void> testCompareAndClearAtomicOpApi(Database cx, AtomicOpsApiCorrectnessWorkload* self, Key key,
bool keySet) {
state uint64_t opType = MutationRef::CompareAndClear;
state uint64_t intValue1 = g_random->randomInt(0, 10000000);
state uint64_t intValue2 = g_random->coinflip() ? intValue1 : g_random->randomInt(0, 10000000);
state Value val1 = StringRef((const uint8_t*)&intValue1, sizeof(intValue1));
state Value val2 = StringRef((const uint8_t*)&intValue2, sizeof(intValue2));
state std::function<Optional<uint64_t>(uint64_t, uint64_t)> opFunc = [keySet](uint64_t val1, uint64_t val2) {
if (!keySet || val1 == val2) {
return Optional<uint64_t>();
} else {
return Optional<uint64_t>(val1);
}
};
// Do operation on Storage Server
loop {
try {
// Set the key to a random value
wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
if (keySet) {
tr->set(key, val1);
} else {
tr->clear(key);
}
return Void();
}));
// Do atomic op
wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
tr->atomicOp(key, val2, opType);
return Void();
}));
break;
} catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("ErrCode", e.code());
wait(delay(1));
}
}
// Compare result
Optional<Value> outputVal = wait(runRYWTransaction(
cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
state Optional<uint64_t> expectedOutput = opFunc(intValue1, intValue2);
ASSERT(outputVal.present() == expectedOutput.present());
if (outputVal.present()) {
uint64_t output = 0;
ASSERT(outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != expectedOutput.get()) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput")
.detail("OpOn", "StorageServer")
.detail("InValue1", intValue1)
.detail("InValue2", intValue2)
.detail("AtomicOp", opType)
.detail("ExpectedOutput", expectedOutput.get())
.detail("ActualOutput", output);
self->testFailed = true;
}
}
// Do operation at RYW layer
Optional<Value> outputVal =
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> {
if (keySet) {
tr->set(key, val1);
} else {
tr->clear(key);
}
tr->atomicOp(key, val2, opType);
return tr->get(key);
}));
// Compare result
ASSERT(outputVal.present() == expectedOutput.present());
if (outputVal.present()) {
uint64_t output = 0;
ASSERT(outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != expectedOutput.get()) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput")
.detail("OpOn", "RYWLayer")
.detail("InValue1", intValue1)
.detail("InValue2", intValue2)
.detail("AtomicOp", opType)
.detail("ExpectedOutput", expectedOutput.get())
.detail("ActualOutput", output);
self->testFailed = true;
}
}
return Void();
}
ACTOR Future<Void> testMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state int currentApiVersion = getApiVersion(cx);
state Key key = self->getTestKey("test_key_min_");
@ -355,6 +454,14 @@ public:
return Void();
}
ACTOR Future<Void> testCompareAndClear(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_compare_and_clear_");
TraceEvent(SevInfo, "Running Atomic Op COMPARE_AND_CLEAR Correctness Current Api Version");
wait(self->testCompareAndClearAtomicOpApi(cx, self, key, true));
wait(self->testCompareAndClearAtomicOpApi(cx, self, key, false));
return Void();
}
ACTOR Future<Void> testByteMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_byte_min_");