From 639afbe62cc157a3428261bf8783088becc9ac13 Mon Sep 17 00:00:00 2001 From: Xiaoge Su Date: Mon, 17 Oct 2022 20:45:02 -0700 Subject: [PATCH] Cancel watch when the key is not being waited Currently, there is a cyclic reference: DatabaseContext -> WatchMetadata -> watchStorageServerResp -> DatabaseContext. If a watch is created in the DatabaseContext, then even after the corresponding wait ACTOR is cancelled, the WatchMetadata still holds a reference to the watchStorageServerResp ACTOR, which in turn holds a reference to the DatabaseContext. In this situation, any DatabaseContext that holds a watch will not be destructed automatically, since its reference count never drops to 0 until the watched value changes. Every time the cluster recovers, several watches are created, and when the cluster restarts, the DatabaseContext that is no longer in use cannot be destructed because of these watches. With this patch, each wait on a watch is counted. When the watch is either triggered or cancelled, the corresponding count is decreased. If a watch is no longer being waited on, it is cancelled, which effectively reduces the reference count of the DatabaseContext. This will hopefully fix the issue described above. The code was tested by 1) manually changing the number of logs of a local cluster and observing the cluster recovery and the previous DatabaseContext being destructed; 2) a 100K joshua run with 1 failure; the same test also fails on the current git main branch. 
--- fdbclient/NativeAPI.actor.cpp | 94 ++++++++++++- fdbclient/include/fdbclient/DatabaseContext.h | 38 +++++- fdbserver/ClusterRecovery.actor.cpp | 124 +++++++++++++----- 3 files changed, 213 insertions(+), 43 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index fa301ef15c..fcef80270a 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2163,14 +2163,14 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional= maxOutstandingWatches) throw too_many_watches(); ++outstandingWatches; } -void DatabaseContext::removeWatch() { +void DatabaseContext::removeWatchCounter() { --outstandingWatches; ASSERT(outstandingWatches >= 0); } @@ -2390,15 +2390,44 @@ Reference DatabaseContext::getWatchMetadata(int64_t tenantId, Key } void DatabaseContext::setWatchMetadata(Reference metadata) { - watchMap[std::make_pair(metadata->parameters->tenant.tenantId, metadata->parameters->key)] = metadata; + const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key); + watchMap[key] = metadata; + // NOTE Here we do *NOT* update/reset the reference count for the key, see the source code in getWatchFuture +} + +int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key) { + const WatchMapKey mapKey(tenantID, key); + if (watchCounterMap.count(mapKey) == 0) { + watchCounterMap[mapKey] = 0; + } + const auto count = ++watchCounterMap[mapKey]; + return count; +} + +int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key) { + const WatchMapKey mapKey(tenantID, key); + if (watchCounterMap.count(mapKey) == 0) { + // Key does not exist. The metadata might be removed by deleteWatchMetadata already. 
+ return 0; + } + const auto count = --watchCounterMap[mapKey]; + ASSERT(watchCounterMap[mapKey] >= 0); + if (watchCounterMap[mapKey] == 0) { + getWatchMetadata(tenantID, key)->watchFutureSS.cancel(); + deleteWatchMetadata(tenantID, key); + } + return count; } void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) { - watchMap.erase(std::make_pair(tenantId, key)); + const WatchMapKey mapKey(tenantId, key); + watchMap.erase(mapKey); + watchCounterMap.erase(mapKey); } void DatabaseContext::clearWatchMetadata() { watchMap.clear(); + watchCounterMap.clear(); } const UniqueOrderedOptionList& Database::getTransactionDefaults() const { @@ -3913,6 +3942,56 @@ Future getWatchFuture(Database cx, Reference parameters) return Void(); } +namespace { + +// NOTE: Since an ACTOR could receive multiple exceptions for a single catch clause, e.g. broken promise together with +// operation cancelled, If the decreaseWatchRefCount is placed at the catch clause, it might be triggered for multiple +// times. One could check if the SAV isSet, but seems a more intuitive way is to use RAII-style constructor/destructor +// pair. Yet the object has to be constructed after a wait statement, so it must be trivially-constructible. This +// requires move-assignment operator implemented. +class WatchRefCountUpdater { + Database cx; + int64_t tenantID; + KeyRef key; + + void tryAddRefCount() { + if (cx.getReference()) { + cx->increaseWatchRefCount(tenantID, key); + } + } + + void tryDelRefCount() { + if (cx.getReference()) { + cx->decreaseWatchRefCount(tenantID, key); + } + } + +public: + WatchRefCountUpdater() {} + + WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_) + : cx(cx_), tenantID(tenantID_), key(key_) { + tryAddRefCount(); + } + + WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) { + tryDelRefCount(); + + // NOTE: Do not use move semantic, this copy allows other delete the reference count properly. 
+ cx = other.cx; + tenantID = other.tenantID; + key = other.key; + + tryAddRefCount(); + + return *this; + } + + ~WatchRefCountUpdater() { tryDelRefCount(); } +}; + +} // namespace + ACTOR Future watchValueMap(Future version, TenantInfo tenant, Key key, @@ -3924,6 +4003,7 @@ ACTOR Future watchValueMap(Future version, Optional debugID, UseProvisionalProxies useProvisionalProxies) { state Version ver = wait(version); + state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key); wait(getWatchFuture(cx, makeReference( @@ -5456,11 +5536,11 @@ ACTOR Future watch(Reference watch, } } } catch (Error& e) { - cx->removeWatch(); + cx->removeWatchCounter(); throw; } - cx->removeWatch(); + cx->removeWatchCounter(); return Void(); } @@ -5471,7 +5551,7 @@ Future Transaction::getRawReadVersion() { Future Transaction::watch(Reference watch) { ++trState->cx->transactionWatchRequests; - trState->cx->addWatch(); + trState->cx->addWatchCounter(); watches.push_back(watch); return ::watch( watch, diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index 390873e0ef..33c31bf76e 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -156,6 +156,8 @@ public: WatchMetadata(Reference parameters) : watchFuture(watchPromise.getFuture()), parameters(parameters) {} + + ~WatchMetadata() { watchFutureSS.cancel(); } }; struct MutationAndVersionStream { @@ -328,14 +330,32 @@ public: // Note: this will never return if the server is running a protocol from FDB 5.0 or older Future getClusterProtocol(Optional expectedVersion = Optional()); - // Update the watch counter for the database - void addWatch(); - void removeWatch(); + // Increases the counter of the number of watches in this DatabaseContext by 1. If the number of watches is too + // many, throws too_many_watches. 
+ void addWatchCounter(); + + // Decrease the counter of the number of watches in this DatabaseContext by 1 + void removeWatchCounter(); // watch map operations + + // Gets the watch metadata per tenant id and key Reference getWatchMetadata(int64_t tenantId, KeyRef key) const; + + // Refreshes the watch metadata. If the same watch is used (this is determined by the tenant id and the key), the + // metadata will be updated. void setWatchMetadata(Reference metadata); + + // Removes the watch metadata void deleteWatchMetadata(int64_t tenant, KeyRef key); + + // Increases reference count to the given watch. Returns the number of references to the watch. + int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key); + + // Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be + // removed. Returns the number of references to the watch. + int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key); + void clearWatchMetadata(); void setOption(FDBDatabaseOptions::Option option, Optional value); @@ -703,8 +723,16 @@ public: EventCacheHolder connectToDatabaseEventCacheHolder; private: - std::unordered_map, Reference, boost::hash>> - watchMap; + using WatchMapKey = std::pair; + using WatchMapKeyHasher = boost::hash; + using WatchMapValue = Reference; + using WatchMap_t = std::unordered_map; + + WatchMap_t watchMap; + + using WatchCounterMap_t = std::unordered_map; + // Maps the number of the WatchMapKey being used. + WatchCounterMap_t watchCounterMap; }; // Similar to tr.onError(), but doesn't require a DatabaseContext. 
diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index afc6d85bfa..77d3261a53 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -567,44 +567,106 @@ ACTOR Future changeCoordinators(Reference self) { } } -ACTOR Future configurationMonitor(Reference self, Database cx) { +namespace { + +// NOTE: This vector may not be initialized here as the keys might be initialized *AFTER* this vector, causing all +// keys empty. The reason is that the order of the initialization of keys and this vector might not be ordered as wished +// so the vector might be initialized before the keys receives values; thus, all values inside the vector are copied +// from uninitialized KeyRefs.. +// See C++11 standard 3.6.2 for more info. +std::vector configurationMonitorWatchKeys; + +ACTOR Future configurationMonitorImpl(Reference self, + Database cx, + Optional* pTenantId) { + state ReadYourWritesTransaction tr(cx); loop { - state ReadYourWritesTransaction tr(cx); + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - - DatabaseConfiguration conf; - conf.fromKeyValues((VectorRef)results); - TraceEvent("ConfigurationMonitor", self->dbgid) - .detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), - self->recoveryState); - if (conf != self->configuration) { - if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED && - self->recoveryState != RecoveryState::FULLY_RECOVERED) { - self->controllerData->shouldCommitSuicide = true; - throw restart_cluster_controller(); - } - - self->configuration = 
conf; - self->registrationTrigger.trigger(); + DatabaseConfiguration conf; + conf.fromKeyValues((VectorRef)results); + TraceEvent("ConfigurationMonitor", self->dbgid) + .detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), + self->recoveryState); + if (conf != self->configuration) { + if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED && + self->recoveryState != RecoveryState::FULLY_RECOVERED) { + self->controllerData->shouldCommitSuicide = true; + throw restart_cluster_controller(); } - state Future watchFuture = - tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || - tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || - tr.watch(failedLocalityVersionKey); - wait(tr.commit()); - wait(watchFuture); - break; - } catch (Error& e) { - wait(tr.onError(e)); + self->configuration = conf; + self->registrationTrigger.trigger(); } + + std::vector> watchFutures; + std::transform(std::begin(configurationMonitorWatchKeys), + std::end(configurationMonitorWatchKeys), + std::back_inserter(watchFutures), + [this](KeyRef key) { return tr.watch(key); }); + // Only after this stage, where getKeyLocation is called indirectly, the tenant information is updated and + // set to the transaction state. + (*pTenantId) = tr.getTransactionState()->tenantId(); + state Future watchFuture = waitForAny(watchFutures); + + wait(tr.commit()); + + wait(watchFuture); + break; + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw e; + } + wait(tr.onError(e)); } } + + return Void(); +} + +} // anonymous namespace + +ACTOR Future configurationMonitor(Reference self, Database cx) { + state Optional tenantId; + + // The keys cannot be initialized where it is defined. see comments at the definition. 
+ if (configurationMonitorWatchKeys.empty()) { + configurationMonitorWatchKeys = std::vector{ moveKeysLockOwnerKey, + excludedServersVersionKey, + failedServersVersionKey, + excludedLocalityVersionKey, + failedLocalityVersionKey }; + } + + try { + while (true) { + wait(configurationMonitorImpl(self, cx, &tenantId)); + } + } catch (Error& e) { + if (e.code() != error_code_actor_cancelled) { + throw e; + } + + // Cancel all watches created by configurationMonitorImpl. Due to a circular reference issue, if the watches are + // not cancelled manually, the DatabaseContext object in cx will not be able to destructed properly, see + // + // https://github.com/apple/foundationdb/issues/8321 + // + // for more detailed discussion. + + if (!tenantId.present()) { + // Tenant ID not set, no watches are created in this case, no cleanup required. + return Void(); + } + std::for_each(std::begin(configurationMonitorWatchKeys), + std::end(configurationMonitorWatchKeys), + [this](KeyRef key) { cx->decreaseWatchRefCount(tenantId.get(), key); }); + } + + return Void(); } ACTOR static Future> getMinBackupVersion(Reference self, Database cx) {