From b6e7961b789a98e8cac9fb29851dedeb6fc75f2c Mon Sep 17 00:00:00 2001 From: Nim Wijetunga Date: Tue, 2 Feb 2021 20:29:03 -0500 Subject: [PATCH] change to references --- fdbclient/DatabaseContext.h | 24 ++++++- fdbclient/NativeAPI.actor.cpp | 124 ++++++++++++++++++---------------- fdbclient/NativeAPI.actor.h | 19 ------ 3 files changed, 88 insertions(+), 79 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index ab2debfe4c..4521085459 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -140,6 +140,20 @@ public: } }; +class WatchMetadata: public ReferenceCounted { + public: + Optional value; + Version version; + Promise watchPromise; + Future watchFuture; + Future watchFutureSS; + + TransactionInfo info; + TagSet tags; + + WatchMetadata(Optional value, Version version, Future watchFutureSS, TransactionInfo info, TagSet tags); +}; + class DatabaseContext : public ReferenceCounted, public FastAllocated, NonCopyable { public: static DatabaseContext* allocateOnForeignThread() { @@ -175,7 +189,12 @@ public: // Update the watch counter for the database void addWatch(); void removeWatch(); - + + // watch map operations + Reference getWatchMetadata(Key key) const; + void setWatchMetadata(Key key, Reference metadata); + void deleteWatchMetadata(Key key); + void setOption( FDBDatabaseOptions::Option option, Optional value ); Error deferredError; @@ -364,7 +383,8 @@ public: std::unique_ptr &&impl); static bool debugUseTags; - static const std::vector debugTransactionTagChoices; + static const std::vector debugTransactionTagChoices; + std::map> watchMap; }; #endif diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 1a48a002cf..d2a0d868a6 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1372,17 +1372,17 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, boo return Database::createDatabase(rccf, apiVersion, internal, clientLocality); } -Optional Database::getWatchMetadata(Key key) const { +Reference DatabaseContext::getWatchMetadata(Key key) const { const auto it = watchMap.find(key); - if (it == watchMap.end()) return Optional(); + if (it == watchMap.end()) return Reference(); return it->second; } -void Database::setWatchMetadata(Key key, WatchMetadata metadata) { +void DatabaseContext::setWatchMetadata(Key key, Reference metadata) { watchMap[key] = metadata; } -void Database::deleteWatchMetadata(Key key) { +void DatabaseContext::deleteWatchMetadata(Key key) { watchMap.erase(key); } @@ -1391,8 +1391,6 @@ WatchMetadata::WatchMetadata(Optional value, Version version, Future& Database::getTransactionDefaults() const { ASSERT(db); return db->transactionDefaults; @@ -2169,6 +2167,7 @@ ACTOR Future watchValue(Future version, Key key, OptionalgetCurrentTask()); } state WatchValueReply resp; + TraceEvent("Nim_WatcherCommitted Started"); choose { when(WatchValueReply r = wait( loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue, @@ -2187,11 +2186,14 @@ ACTOR Future watchValue(Future version, Key key, OptionalMAX_VERSIONS_IN_FLIGHT - if (v - resp.version < 50000000) return resp.version; + if (v - resp.version < 50000000) { + TraceEvent("Nim_WatcherCommitted Done!"); + return resp.version; + } ver = v; } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { @@ -2214,75 +2216,81 @@ ACTOR Future watchValue(Future version, Key key, Optional watchStorageServerResp(Key key, Database cx) { - state Optional metadata = cx.getWatchMetadata(key); - if (!metadata.present()) return Void(); + state Reference metadata = cx->getWatchMetadata(key); + if (!metadata) return Void(); loop { try { - Version watchVersion = wait(metadata.get().watchFutureSS); - - if (watchVersion >= metadata.get().version) { // case 1: version_1 (SS) >= version_2 (map) - cx.deleteWatchMetadata(key); - metadata.get().watchPromise.send(watchVersion); + TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); + Version watchVersion = wait(metadata->watchFutureSS); + TraceEvent("Nim_Case SS done wait"); + if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map) + TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); + cx->deleteWatchMetadata(key); + metadata->watchPromise.send(watchVersion); } else { - // 2 because we have one future ref in map and one in the current function - if (metadata.get().watchPromise.getFutureReferenceCount() == 2) { // case 2: version_1 < version_2 and future_count == 2 - cx.deleteWatchMetadata(key); - } else { // case 3: future_count > 2 - metadata.get().watchFutureSS = watchValue(Future(metadata.get().version), key, metadata.get().value, cx, metadata.get().info, metadata.get().tags); - cx.setWatchMetadata(key, metadata.get()); + if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1 + TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); + cx->deleteWatchMetadata(key); + } else { // case 3: future_count > 1 + TraceEvent("Nim_Case SS 3").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount()); + metadata->watchFutureSS = watchValue(Future(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags); } } // if there is no key in the map then destroy actor - metadata = cx.getWatchMetadata(key); - if (!metadata.present()) return Void(); + metadata = cx->getWatchMetadata(key); + if (!metadata) return Void(); } catch(Error &e) { // no more watches for the key so shutdown actor - metadata = cx.getWatchMetadata(key); - if (!metadata.present()) return Void(); + metadata = cx->getWatchMetadata(key); + if (!metadata) return Void(); } } } ACTOR Future watchValueMap(Future version, Key key, Optional value, Database cx, TransactionInfo info, TagSet tags) { + TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value); state Version ver = wait( version ); - state Optional metadata = cx.getWatchMetadata(key); + state Reference metadata = cx->getWatchMetadata(key); - if (!metadata.present()) { // case 1: key not in map + if (!metadata) { // case 1: key not in map + TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value); Future watchFutureSS = watchValue(version, key, value, cx, info, tags); - WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags); - cx.setWatchMetadata(key, newMetadata); + metadata = Reference(new WatchMetadata(value, ver, watchFutureSS, info, tags)); + cx->setWatchMetadata(key, metadata); + + Future watchFuture = success(metadata->watchPromise.getFuture()); - Future watchFuture = success(newMetadata.watchPromise.getFuture()); - Future SSResp = watchStorageServerResp(key, cx); - wait(watchFuture); - } else if (metadata.get().value == value) { // case 2: val_1 == val_2 - if (ver > metadata.get().version) { - metadata.get().version = ver; - metadata.get().info = info; - metadata.get().tags = tags; - cx.setWatchMetadata(key, metadata.get()); + wait(SSResp && watchFuture); + } + else if (metadata->value == value) { // case 2: val_1 == val_2 + TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value); + if (ver > metadata->version) { + metadata->version = ver; + metadata->info = info; + metadata->tags = tags; } - Future watchFuture = success(metadata.get().watchPromise.getFuture()); + Future watchFuture = success(metadata->watchPromise.getFuture()); wait(watchFuture); - } else if(ver > metadata.get().version) { // case 3: val_1 != val_2 && version_2 > version_1 - + } else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1 + TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value); Future watchFutureSS = watchValue(version, key, value, cx, info, tags); - WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags); - cx.setWatchMetadata(key, newMetadata); + Reference newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags)); + cx->setWatchMetadata(key, newMetadata); - metadata.get().watchPromise.send(ver); - metadata.get().watchFutureSS.cancel(); + metadata->watchPromise.send(ver); + metadata->watchFutureSS.cancel(); - Future watchFuture = success(newMetadata.watchPromise.getFuture()); + Future watchFuture = success(newMetadata->watchPromise.getFuture()); wait(watchFuture); - } else if (metadata.get().value != value && metadata.get().version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 + } else if (metadata->value != value && metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 + TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value); state ReadYourWritesTransaction tr(cx); try { @@ -2290,24 +2298,24 @@ ACTOR Future watchValueMap(Future version, Key key, Optional watchFutureSS = watchValue(version, key, value, cx, info, tags); - WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags); - cx.setWatchMetadata(key, newMetadata); + Reference newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags)); + cx->setWatchMetadata(key, newMetadata); } - if (valSS != metadata.get().value) { // val_3 != val_1 - if (valSS != value) cx.deleteWatchMetadata(key); + if (valSS != metadata->value) { // val_3 != val_1 + if (valSS != value) cx->deleteWatchMetadata(key); - metadata.get().watchPromise.send(ver); - metadata.get().watchFutureSS.cancel(); + metadata->watchPromise.send(ver); + metadata->watchFutureSS.cancel(); } wait(tr.commit()); if (valSS != value) return Void(); - metadata = cx.getWatchMetadata(key); - if (metadata.present()) { // if val_3 == val_2 - Future watchFuture = success(metadata.get().watchPromise.getFuture()); + metadata = cx->getWatchMetadata(key); + if (metadata) { // if val_3 == val_2 + Future watchFuture = success(metadata->watchPromise.getFuture()); wait(watchFuture); } } @@ -2315,7 +2323,7 @@ ACTOR Future watchValueMap(Future version, Key key, Optional watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion(); - + TraceEvent("Nim_calling watchValueMap"); for(int i = 0; i < watches.size(); ++i) watches[i]->setWatch(watchValueMap(watchVersion, watches[i]->key, watches[i]->value, cx, info, options.readTags)); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 97796f2e03..033bc2883b 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -94,13 +94,8 @@ public: const UniqueOrderedOptionList& getTransactionDefaults() const; - Optional getWatchMetadata(Key key) const; - void setWatchMetadata(Key key, WatchMetadata metadata); - void deleteWatchMetadata(Key key); - private: Reference db; - std::map watchMap; }; void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional() ); @@ -174,20 +169,6 @@ struct TransactionInfo { : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {} }; -struct WatchMetadata { - Optional value; - Version version; - Promise watchPromise; - Future watchFuture; - Future watchFutureSS; - - TransactionInfo info; - TagSet tags; - - WatchMetadata(); - WatchMetadata(Optional value, Version version, Future watchFutureSS, TransactionInfo info, TagSet tags); -}; - struct TransactionLogInfo : public ReferenceCounted, NonCopyable { enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };