address pr comments
This commit is contained in:
parent
b8f595cbc7
commit
8c3b292b55
|
@ -18,9 +18,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <string>
|
||||
#include <atomic>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
|
@ -32,7 +29,6 @@
|
|||
struct WatchesSameKeyWorkload : TestWorkload {
|
||||
int numWatches;
|
||||
std::vector<Future<Void>> cases;
|
||||
static std::atomic<int32_t> uid;
|
||||
|
||||
WatchesSameKeyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
numWatches = getOption(options, LiteralStringRef("numWatches"), 3);
|
||||
|
@ -61,16 +57,13 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
return ok;
|
||||
}
|
||||
|
||||
static int getId() {
|
||||
return uid++;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> setKeyRandomValue(Database cx, Key key, Optional<Value> val) {
|
||||
// set value at key to val if provided (random otherwise)
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
Value valS;
|
||||
if (!val.present()) valS = Value(std::to_string(getId()));
|
||||
if (!val.present()) valS = Value(deterministicRandom()->randomUniqueID().toString());
|
||||
else valS = val.get();
|
||||
tr.set(key, valS);
|
||||
wait(tr.commit());
|
||||
|
@ -82,6 +75,7 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR static Future<Future<Void>> watchKey(Database cx, Key key) {
|
||||
// sets a watch on a key and returns future
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
|
@ -105,13 +99,13 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
state std::vector<Future<Void>> watchFutures;
|
||||
state int i;
|
||||
|
||||
tr.set(key, Value(std::to_string(getId())));
|
||||
for ( i = 0; i < self->numWatches; i++ ) {
|
||||
tr.set(key, Value(deterministicRandom()->randomUniqueID().toString()));
|
||||
for ( i = 0; i < self->numWatches; i++ ) { // set watches for a given k/v pair set above
|
||||
watchFutures.push_back(tr.watch(key));
|
||||
}
|
||||
wait(tr.commit());
|
||||
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>()));
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>())); // trigger all watches created above
|
||||
for (i = 0; i < watchFutures.size(); i++) {
|
||||
wait(watchFutures[i]);
|
||||
}
|
||||
|
@ -134,13 +128,13 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
state Future<Void> watch1 = wait(watchKey(cx, key));
|
||||
state int i;
|
||||
|
||||
tr.set(key, Value( std::to_string(getId()) ));
|
||||
for ( i = 0; i < self->numWatches; i++ ) {
|
||||
tr.set(key, Value( deterministicRandom()->randomUniqueID().toString() ));
|
||||
for ( i = 0; i < self->numWatches; i++ ) { // set watches for a given k/v pair set above
|
||||
watchFutures.push_back(tr.watch(key));
|
||||
}
|
||||
wait(tr.commit());
|
||||
wait(watch1);
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>()));
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>())); // trigger remaining watches
|
||||
for (i = 0; i < watchFutures.size(); i++) {
|
||||
wait(watchFutures[i]);
|
||||
}
|
||||
|
@ -160,7 +154,7 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
state ReadYourWritesTransaction tr2(cx);
|
||||
loop {
|
||||
try {
|
||||
state Value val = std::to_string(getId());
|
||||
state Value val = deterministicRandom()->randomUniqueID().toString();
|
||||
tr2.set(key, val);
|
||||
state Future<Void> watch1 = tr2.watch(key);
|
||||
wait( tr2.commit() );
|
||||
|
@ -188,16 +182,17 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
state ReadYourWritesTransaction tr2(cx);
|
||||
loop {
|
||||
try {
|
||||
state Value val = std::to_string(getId());
|
||||
// watch1 and watch2 are set on the same k/v pair
|
||||
state Value val = deterministicRandom()->randomUniqueID().toString();
|
||||
tr2.set(key, val);
|
||||
state Future<Void> watch1 = tr2.watch(key);
|
||||
wait( tr2.commit() );
|
||||
wait ( setKeyRandomValue(cx, key, Optional<Value>()) );
|
||||
tr.set(key, val);
|
||||
tr.set(key, val); // trigger ABA (line above changes value and this line changes it back)
|
||||
state Future<Void> watch2 = tr.watch(key);
|
||||
wait(tr.commit());
|
||||
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>()));
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>())); // since ABA has occured we need to trigger the watches with a new value
|
||||
wait(watch1);
|
||||
wait(watch2);
|
||||
return Void();
|
||||
|
@ -218,14 +213,15 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
try {
|
||||
tr1.setOption( FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE );
|
||||
tr2.setOption( FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE );
|
||||
tr1.set(key, Value( std::to_string(getId()) ));
|
||||
tr2.set(key, Value( std::to_string(getId()) ));
|
||||
tr1.set(key, Value( deterministicRandom()->randomUniqueID().toString() ));
|
||||
tr2.set(key, Value( deterministicRandom()->randomUniqueID().toString() ));
|
||||
state Future<Void> watch1 = tr1.watch(key);
|
||||
state Future<Void> watch2 = tr2.watch(key);
|
||||
// each watch commits with a different value but (hopefully) the same version since there is no write conflict range
|
||||
wait(tr1.commit() && tr2.commit());
|
||||
|
||||
wait(watch1 || watch2); // since we enter case 5 at least one of the watches should be fired
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>()));
|
||||
wait(setKeyRandomValue(cx, key, Optional<Value>())); // fire the watch that possibly wasn't triggered
|
||||
wait(watch1 && watch2);
|
||||
|
||||
return Void();
|
||||
|
@ -238,6 +234,4 @@ struct WatchesSameKeyWorkload : TestWorkload {
|
|||
void getMetrics(vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
std::atomic<int32_t> WatchesSameKeyWorkload::uid = 0;
|
||||
|
||||
WorkloadFactory<WatchesSameKeyWorkload> WatchesSameKeyWorkloadFactory("WatchesSameKeyCorrectness");
|
||||
|
|
Loading…
Reference in New Issue