diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 4521085459..53043f551c 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -146,12 +146,12 @@ class WatchMetadata: public ReferenceCounted { Version version; Promise watchPromise; Future watchFuture; - Future watchFutureSS; + Future watchFutureSS; TransactionInfo info; TagSet tags; - WatchMetadata(Optional value, Version version, Future watchFutureSS, TransactionInfo info, TagSet tags); + WatchMetadata(Optional value, Version version, TransactionInfo info, TagSet tags); }; class DatabaseContext : public ReferenceCounted, public FastAllocated, NonCopyable { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index d2a0d868a6..91c034c275 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1386,7 +1386,7 @@ void DatabaseContext::deleteWatchMetadata(Key key) { watchMap.erase(key); } -WatchMetadata::WatchMetadata(Optional value, Version version, Future watchFutureSS, TransactionInfo info, TagSet tags): value(value), version(version), watchFutureSS(watchFutureSS), info(info), tags(tags) { +WatchMetadata::WatchMetadata(Optional value, Version version, TransactionInfo info, TagSet tags): value(value), version(version), info(info), tags(tags) { // create dummy future watchFuture = watchPromise.getFuture(); } @@ -2216,14 +2216,15 @@ ACTOR Future watchValue(Future version, Key key, Optional watchStorageServerResp(Key key, Database cx) { - state Reference metadata = cx->getWatchMetadata(key); - if (!metadata) return Void(); - - loop { - - try { + try { + loop { + state Reference metadata = cx->getWatchMetadata(key); + if (!metadata) {TraceEvent("Nim_Case SS exit normal"); return Void();} TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); - Version watchVersion = wait(metadata->watchFutureSS); + + state Future watchFutureSS = watchValue(Future(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags); + Version watchVersion = wait(watchFutureSS); + TraceEvent("Nim_Case SS done wait"); if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map) TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); @@ -2233,98 +2234,110 @@ ACTOR Future watchStorageServerResp(Key key, Database cx) { if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1 TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); cx->deleteWatchMetadata(key); - } else { // case 3: future_count > 1 - TraceEvent("Nim_Case SS 3").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); - metadata->watchFutureSS = watchValue(Future(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags); } - } - - // if there is no key in the map then destroy actor - metadata = cx->getWatchMetadata(key); - if (!metadata) return Void(); - - } catch(Error &e) { - // no more watches for the key so shutdown actor - metadata = cx->getWatchMetadata(key); - if (!metadata) return Void(); + } } + } catch(Error &e) { + TraceEvent("Nim_Case SS exit exception"); + if (e.code() == error_code_actor_cancelled) { + TraceEvent("Nim_Case SS exit actor cancelled"); + } + throw e; } } -ACTOR Future watchValueMap(Future version, Key key, Optional value, Database cx, +ACTOR Future sameVersionDifKey(Version ver, Key key, Optional value, Database cx, + TransactionInfo info, TagSet tags) { + state Reference metadata = cx->getWatchMetadata(key); + state ReadYourWritesTransaction tr(cx); + + try { + state Optional valSS = wait(tr.get(key)); + + if (valSS == value) { // val_3 == val_2 + Reference newMetadata(new WatchMetadata(value, ver, info, tags)); + cx->setWatchMetadata(key, newMetadata); + + newMetadata->watchFutureSS = watchStorageServerResp(key, cx); + } + + if (valSS != metadata->value) { // val_3 != val_1 + if (valSS != value) cx->deleteWatchMetadata(key); + + metadata->watchPromise.send(ver); + metadata->watchFutureSS.cancel(); + } + + wait(tr.commit()); + + metadata = cx->getWatchMetadata(key); + if (metadata) { // if val_3 == val_2 + Future watchFuture = success(metadata->watchPromise.getFuture()); + wait(watchFuture); + } + + return Void(); // if val_3 != val_2 + } + catch(Error& e) { + state Error err = e; + wait(tr.onError(e)); + throw err; + } +} + +Future getWatchFuture(Version ver, Key key, Optional value, Database cx, TransactionInfo info, TagSet tags) { TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value); - state Version ver = wait( version ); - state Reference metadata = cx->getWatchMetadata(key); + Reference metadata = cx->getWatchMetadata(key); if (!metadata) { // case 1: key not in map TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value); - Future watchFutureSS = watchValue(version, key, value, cx, info, tags); - metadata = Reference(new WatchMetadata(value, ver, watchFutureSS, info, tags)); + + metadata = Reference(new WatchMetadata(value, ver, info, tags)); cx->setWatchMetadata(key, metadata); + metadata->watchFutureSS = watchStorageServerResp(key, cx); Future watchFuture = success(metadata->watchPromise.getFuture()); - Future SSResp = watchStorageServerResp(key, cx); - wait(SSResp && watchFuture); + return watchFuture; } else if (metadata->value == value) { // case 2: val_1 == val_2 TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value); if (ver > metadata->version) { + TraceEvent("Nim_Case 2 replace version").detail("Key", key).detail("Value", value); metadata->version = ver; metadata->info = info; metadata->tags = tags; } Future watchFuture = success(metadata->watchPromise.getFuture()); - wait(watchFuture); + return watchFuture; } else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1 TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value); - Future watchFutureSS = watchValue(version, key, value, cx, info, tags); - Reference newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags)); + + Reference newMetadata(new WatchMetadata(value, ver, info, tags)); cx->setWatchMetadata(key, newMetadata); + newMetadata->watchFutureSS = watchStorageServerResp(key, cx); + metadata->watchPromise.send(ver); metadata->watchFutureSS.cancel(); Future watchFuture = success(newMetadata->watchPromise.getFuture()); - wait(watchFuture); - } else if (metadata->value != value && metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 + return watchFuture; + } else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value); - state ReadYourWritesTransaction tr(cx); - - try { - state Optional valSS = wait(tr.get(key)); - - if (valSS == value) { // val_3 == val_2 - Future watchFutureSS = watchValue(version, key, value, cx, info, tags); - Reference newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags)); - cx->setWatchMetadata(key, newMetadata); - } - - if (valSS != metadata->value) { // val_3 != val_1 - if (valSS != value) cx->deleteWatchMetadata(key); - - metadata->watchPromise.send(ver); - metadata->watchFutureSS.cancel(); - } - - wait(tr.commit()); - - if (valSS != value) return Void(); - - metadata = cx->getWatchMetadata(key); - if (metadata) { // if val_3 == val_2 - Future watchFuture = success(metadata->watchPromise.getFuture()); - wait(watchFuture); - } - } - catch(Error& e) { - wait(tr.onError(e)); - } + return sameVersionDifKey(ver, key, value, cx, info, tags); } TraceEvent("Nim_Case return").detail("Key", key).detail("Value", value); // Note: We will reach here for case 3: val_1 != val_2 && version_2 < version_1 + return Future(); +} + +ACTOR Future watchValueMap(Future version, Key key, Optional value, Database cx, + TransactionInfo info, TagSet tags) { + state Version ver = wait(version); + wait(getWatchFuture(ver, key, value, cx, info, tags)); return Void(); }