change logic

This commit is contained in:
Nim Wijetunga 2021-02-03 20:01:26 -05:00
parent b6e7961b78
commit ec6226bde9
2 changed files with 79 additions and 66 deletions

View File

@ -146,12 +146,12 @@ class WatchMetadata: public ReferenceCounted<WatchMetadata> {
Version version;
Promise<Version> watchPromise;
Future<Version> watchFuture;
Future<Version> watchFutureSS;
Future<Void> watchFutureSS;
TransactionInfo info;
TagSet tags;
WatchMetadata(Optional<Value> value, Version version, Future<Version> watchFutureSS, TransactionInfo info, TagSet tags);
WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {

View File

@ -1386,7 +1386,7 @@ void DatabaseContext::deleteWatchMetadata(Key key) {
watchMap.erase(key);
}
WatchMetadata::WatchMetadata(Optional<Value> value, Version version, Future<Version> watchFutureSS, TransactionInfo info, TagSet tags): value(value), version(version), watchFutureSS(watchFutureSS), info(info), tags(tags) {
WatchMetadata::WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags): value(value), version(version), info(info), tags(tags) {
// create dummy future
watchFuture = watchPromise.getFuture();
}
@ -2216,14 +2216,15 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
}
ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
loop {
try {
try {
loop {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) {TraceEvent("Nim_Case SS exit normal"); return Void();}
TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
Version watchVersion = wait(metadata->watchFutureSS);
state Future<Version> watchFutureSS = watchValue(Future<Version>(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags);
Version watchVersion = wait(watchFutureSS);
TraceEvent("Nim_Case SS done wait");
if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map)
TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
@ -2233,98 +2234,110 @@ ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1
TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
cx->deleteWatchMetadata(key);
} else { // case 3: future_count > 1
TraceEvent("Nim_Case SS 3").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
metadata->watchFutureSS = watchValue(Future<Version>(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags);
}
}
// if there is no key in the map then destroy actor
metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
} catch(Error &e) {
// no more watches for the key so shutdown actor
metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
}
} catch(Error &e) {
TraceEvent("Nim_Case SS exit exception");
if (e.code() == error_code_actor_cancelled) {
TraceEvent("Nim_Case SS exit actor cancelled");
}
throw e;
}
}
ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Value> value, Database cx,
ACTOR Future<Void> sameVersionDifKey(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
state ReadYourWritesTransaction tr(cx);
try {
state Optional<Value> valSS = wait(tr.get(key));
if (valSS == value) { // val_3 == val_2
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, info, tags));
cx->setWatchMetadata(key, newMetadata);
newMetadata->watchFutureSS = watchStorageServerResp(key, cx);
}
if (valSS != metadata->value) { // val_3 != val_1
if (valSS != value) cx->deleteWatchMetadata(key);
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
}
wait(tr.commit());
metadata = cx->getWatchMetadata(key);
if (metadata) { // if val_3 == val_2
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
}
return Void(); // if val_3 != val_2
}
catch(Error& e) {
state Error err = e;
wait(tr.onError(e));
throw err;
}
}
Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value);
state Version ver = wait( version );
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) { // case 1: key not in map
TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value);
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, watchFutureSS, info, tags));
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
cx->setWatchMetadata(key, metadata);
metadata->watchFutureSS = watchStorageServerResp(key, cx);
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
Future<Void> SSResp = watchStorageServerResp(key, cx);
wait(SSResp && watchFuture);
return watchFuture;
}
else if (metadata->value == value) { // case 2: val_1 == val_2
TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value);
if (ver > metadata->version) {
TraceEvent("Nim_Case 2 replace version").detail("Key", key).detail("Value", value);
metadata->version = ver;
metadata->info = info;
metadata->tags = tags;
}
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
return watchFuture;
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1
TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value);
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags));
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, info, tags));
cx->setWatchMetadata(key, newMetadata);
newMetadata->watchFutureSS = watchStorageServerResp(key, cx);
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
Future<Void> watchFuture = success(newMetadata->watchPromise.getFuture());
wait(watchFuture);
} else if (metadata->value != value && metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
return watchFuture;
} else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value);
state ReadYourWritesTransaction tr(cx);
try {
state Optional<Value> valSS = wait(tr.get(key));
if (valSS == value) { // val_3 == val_2
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags));
cx->setWatchMetadata(key, newMetadata);
}
if (valSS != metadata->value) { // val_3 != val_1
if (valSS != value) cx->deleteWatchMetadata(key);
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
}
wait(tr.commit());
if (valSS != value) return Void();
metadata = cx->getWatchMetadata(key);
if (metadata) { // if val_3 == val_2
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
}
}
catch(Error& e) {
wait(tr.onError(e));
}
return sameVersionDifKey(ver, key, value, cx, info, tags);
}
TraceEvent("Nim_Case return").detail("Key", key).detail("Value", value);
// Note: We will reach here for case 3: val_1 != val_2 && version_2 < version_1
return Future<Void>();
}
ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Version ver = wait(version);
wait(getWatchFuture(ver, key, value, cx, info, tags));
return Void();
}