resolve pr comments
This commit is contained in:
parent
e3ce3b049d
commit
3b48942f9a
|
@ -141,17 +141,17 @@ public:
|
|||
};
|
||||
|
||||
class WatchMetadata: public ReferenceCounted<WatchMetadata> {
|
||||
public:
|
||||
Optional<Value> value;
|
||||
Version version;
|
||||
Promise<Version> watchPromise;
|
||||
Future<Version> watchFuture;
|
||||
Future<Void> watchFutureSS;
|
||||
public:
|
||||
Optional<Value> value;
|
||||
Version version;
|
||||
Promise<Version> watchPromise;
|
||||
Future<Version> watchFuture;
|
||||
Future<Void> watchFutureSS;
|
||||
|
||||
TransactionInfo info;
|
||||
TagSet tags;
|
||||
TransactionInfo info;
|
||||
TagSet tags;
|
||||
|
||||
WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
|
||||
WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
|
||||
};
|
||||
|
||||
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
|
||||
|
@ -385,8 +385,7 @@ public:
|
|||
|
||||
static bool debugUseTags;
|
||||
static const std::vector<std::string> debugTransactionTagChoices;
|
||||
private:
|
||||
std::map<Key, Reference<WatchMetadata>> watchMap;
|
||||
std::map<Key, Reference<WatchMetadata>> watchMap;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -2234,7 +2234,8 @@ ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
|
|||
if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map)
|
||||
cx->deleteWatchMetadata(key);
|
||||
if(metadata->watchPromise.canBeSet()) metadata->watchPromise.send(watchVersion);
|
||||
} else {
|
||||
} else { // ABA happens
|
||||
TEST(true); // ABA issue where the version returned from the server is less than the version in the map
|
||||
if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1
|
||||
cx->deleteWatchMetadata(key);
|
||||
}
|
||||
|
@ -2262,17 +2263,17 @@ ACTOR Future<Void> sameVersionDifKey(Version ver, Key key, Optional<Value> value
|
|||
|
||||
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
|
||||
state Optional<Value> valSS = wait(tr.get(key));
|
||||
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
|
||||
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
|
||||
|
||||
if (metadata.isValid() && valSS != metadata->value) { // val_3 != val_1
|
||||
if (metadata.isValid() && valSS != metadata->value) { // val_3 != val_1 (storage server value doesnt match value in map)
|
||||
cx->deleteWatchMetadata(key);
|
||||
|
||||
metadata->watchPromise.send(ver);
|
||||
metadata->watchFutureSS.cancel();
|
||||
}
|
||||
|
||||
if (valSS == value) { // val_3 == val_2
|
||||
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
|
||||
if (valSS == value) { // val_3 == val_2 (storage server value matches value passed into the function -> new watch)
|
||||
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
|
||||
cx->setWatchMetadata(key, metadata);
|
||||
|
||||
metadata->watchFutureSS = watchStorageServerResp(key, cx);
|
||||
|
@ -2298,14 +2299,14 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
|
|||
TransactionInfo info, TagSet tags) {
|
||||
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
|
||||
|
||||
if (!metadata.isValid()) { // case 1: key not in map
|
||||
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
|
||||
if (!metadata.isValid()) { // case 1: key not in map
|
||||
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
|
||||
cx->setWatchMetadata(key, metadata);
|
||||
|
||||
metadata->watchFutureSS = watchStorageServerResp(key, cx);
|
||||
return success(metadata->watchPromise.getFuture());
|
||||
}
|
||||
else if (metadata->value == value) { // case 2: val_1 == val_2
|
||||
else if (metadata->value == value) { // case 2: val_1 == val_2 (received watch with same value as key already in the map so just update)
|
||||
if (ver > metadata->version) {
|
||||
metadata->version = ver;
|
||||
metadata->info = info;
|
||||
|
@ -2313,22 +2314,24 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
|
|||
}
|
||||
|
||||
return success(metadata->watchPromise.getFuture());
|
||||
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1
|
||||
|
||||
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1 (recived watch with different value and a higher version so recreate in SS)
|
||||
TEST(true); // Setting a watch that has a different value than the one in the map but a higher version (newer)
|
||||
cx->deleteWatchMetadata(key);
|
||||
|
||||
metadata->watchPromise.send(ver);
|
||||
metadata->watchFutureSS.cancel();
|
||||
|
||||
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
|
||||
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
|
||||
cx->setWatchMetadata(key, metadata);
|
||||
|
||||
metadata->watchFutureSS = watchStorageServerResp(key, cx);
|
||||
|
||||
return success(metadata->watchPromise.getFuture());
|
||||
} else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
|
||||
} else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 (recived watch with different value but same version)
|
||||
TEST(true); // Setting a watch which has a different value than the one in the map but the same version
|
||||
return sameVersionDifKey(ver, key, value, cx, info, tags);
|
||||
}
|
||||
TEST(true); // Setting a watch which has a different value than the one in the map but a lower version (older)
|
||||
// case 4: val_1 != val_2 && version_2 < version_1
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -6,3 +6,4 @@ testTitle = 'WatchesTest'
|
|||
|
||||
[[test.workload]]
|
||||
testName = 'WatchesSameKeyCorrectness'
|
||||
numWatches = 3
|
||||
|
|
Loading…
Reference in New Issue