Cancel watch when the key is not being waited
Currently, there is a cyclic reference situation in DatabaseContext -> WatchMetadata -> watchStorageServerResp -> DatabaseContext If there is a watch created in the DatabaseContext, even the corresponding wait ACTOR is cancelled, the WatchMetadata will still hold a reference to watchStorageServerResp ACTOR, which holds a reference to DatabaseContext. In this situation, any DatabaseContext who held a watch will not be automatically destructed since its reference count will never reduce to 0 until the watch value is changed. Every time the cluster recoveries, several watches are created, and when the cluster restarts, the DatabaseContext which not being used, will not be able to destructed due to these watches. With this patch, each wait to the watch will be counted. Either the watch is triggered or cancelled, the corresponding count will be reduced. If a watch is not being waited, the watch will be cancelled, effectively reduce the reference count of DatabaseContext. This will hopefully fix the issue mentioned above. The code is tested by 1) Manually change the number of logs of a local cluster, see the cluster recovery and previous DatabaseContext being destructed; 2) 100K joshua run, with 1 failure, the same test will fail on the current git main branch.
This commit is contained in:
parent
03b102d86a
commit
639afbe62c
|
@ -2163,14 +2163,14 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::addWatch() {
|
||||
void DatabaseContext::addWatchCounter() {
|
||||
if (outstandingWatches >= maxOutstandingWatches)
|
||||
throw too_many_watches();
|
||||
|
||||
++outstandingWatches;
|
||||
}
|
||||
|
||||
void DatabaseContext::removeWatch() {
|
||||
void DatabaseContext::removeWatchCounter() {
|
||||
--outstandingWatches;
|
||||
ASSERT(outstandingWatches >= 0);
|
||||
}
|
||||
|
@ -2390,15 +2390,44 @@ Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, Key
|
|||
}
|
||||
|
||||
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
watchMap[std::make_pair(metadata->parameters->tenant.tenantId, metadata->parameters->key)] = metadata;
|
||||
const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key);
|
||||
watchMap[key] = metadata;
|
||||
// NOTE Here we do *NOT* update/reset the reference count for the key, see the source code in getWatchFuture
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
if (watchCounterMap.count(mapKey) == 0) {
|
||||
watchCounterMap[mapKey] = 0;
|
||||
}
|
||||
const auto count = ++watchCounterMap[mapKey];
|
||||
return count;
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
if (watchCounterMap.count(mapKey) == 0) {
|
||||
// Key does not exist. The metadata might be removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
const auto count = --watchCounterMap[mapKey];
|
||||
ASSERT(watchCounterMap[mapKey] >= 0);
|
||||
if (watchCounterMap[mapKey] == 0) {
|
||||
getWatchMetadata(tenantID, key)->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
watchMap.erase(std::make_pair(tenantId, key));
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
}
|
||||
|
||||
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
watchCounterMap.clear();
|
||||
}
|
||||
|
||||
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
|
||||
|
@ -3913,6 +3942,56 @@ Future<Void> getWatchFuture(Database cx, Reference<WatchParameters> parameters)
|
|||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: Since an ACTOR could receive multiple exceptions for a single catch clause, e.g. broken promise together with
|
||||
// operation cancelled, If the decreaseWatchRefCount is placed at the catch clause, it might be triggered for multiple
|
||||
// times. One could check if the SAV isSet, but seems a more intuitive way is to use RAII-style constructor/destructor
|
||||
// pair. Yet the object has to be constructed after a wait statement, so it must be trivially-constructible. This
|
||||
// requires move-assignment operator implemented.
|
||||
class WatchRefCountUpdater {
|
||||
Database cx;
|
||||
int64_t tenantID;
|
||||
KeyRef key;
|
||||
|
||||
void tryAddRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->increaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
|
||||
void tryDelRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
WatchRefCountUpdater() {}
|
||||
|
||||
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_) {
|
||||
tryAddRefCount();
|
||||
}
|
||||
|
||||
WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) {
|
||||
tryDelRefCount();
|
||||
|
||||
// NOTE: Do not use move semantic, this copy allows other delete the reference count properly.
|
||||
cx = other.cx;
|
||||
tenantID = other.tenantID;
|
||||
key = other.key;
|
||||
|
||||
tryAddRefCount();
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~WatchRefCountUpdater() { tryDelRefCount(); }
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> watchValueMap(Future<Version> version,
|
||||
TenantInfo tenant,
|
||||
Key key,
|
||||
|
@ -3924,6 +4003,7 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies) {
|
||||
state Version ver = wait(version);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key);
|
||||
|
||||
wait(getWatchFuture(cx,
|
||||
makeReference<WatchParameters>(
|
||||
|
@ -5456,11 +5536,11 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
cx->removeWatch();
|
||||
cx->removeWatchCounter();
|
||||
throw;
|
||||
}
|
||||
|
||||
cx->removeWatch();
|
||||
cx->removeWatchCounter();
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -5471,7 +5551,7 @@ Future<Version> Transaction::getRawReadVersion() {
|
|||
Future<Void> Transaction::watch(Reference<Watch> watch) {
|
||||
++trState->cx->transactionWatchRequests;
|
||||
|
||||
trState->cx->addWatch();
|
||||
trState->cx->addWatchCounter();
|
||||
watches.push_back(watch);
|
||||
return ::watch(
|
||||
watch,
|
||||
|
|
|
@ -156,6 +156,8 @@ public:
|
|||
|
||||
WatchMetadata(Reference<const WatchParameters> parameters)
|
||||
: watchFuture(watchPromise.getFuture()), parameters(parameters) {}
|
||||
|
||||
~WatchMetadata() { watchFutureSS.cancel(); }
|
||||
};
|
||||
|
||||
struct MutationAndVersionStream {
|
||||
|
@ -328,14 +330,32 @@ public:
|
|||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
|
||||
|
||||
// Update the watch counter for the database
|
||||
void addWatch();
|
||||
void removeWatch();
|
||||
// Increases the counter of the number of watches in this DatabaseContext by 1. If the number of watches is too
|
||||
// many, throws too_many_watches.
|
||||
void addWatchCounter();
|
||||
|
||||
// Decrease the counter of the number of watches in this DatabaseContext by 1
|
||||
void removeWatchCounter();
|
||||
|
||||
// watch map operations
|
||||
|
||||
// Gets the watch metadata per tenant id and key
|
||||
Reference<WatchMetadata> getWatchMetadata(int64_t tenantId, KeyRef key) const;
|
||||
|
||||
// Refreshes the watch metadata. If the same watch is used (this is determined by the tenant id and the key), the
|
||||
// metadata will be updated.
|
||||
void setWatchMetadata(Reference<WatchMetadata> metadata);
|
||||
|
||||
// Removes the watch metadata
|
||||
void deleteWatchMetadata(int64_t tenant, KeyRef key);
|
||||
|
||||
// Increases reference count to the given watch. Returns the number of references to the watch.
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
|
||||
// Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be
|
||||
// removed. Returns the number of references to the watch.
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
|
||||
void clearWatchMetadata();
|
||||
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
|
||||
|
@ -703,8 +723,16 @@ public:
|
|||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
std::unordered_map<std::pair<int64_t, Key>, Reference<WatchMetadata>, boost::hash<std::pair<int64_t, Key>>>
|
||||
watchMap;
|
||||
using WatchMapKey = std::pair<int64_t, Key>;
|
||||
using WatchMapKeyHasher = boost::hash<WatchMapKey>;
|
||||
using WatchMapValue = Reference<WatchMetadata>;
|
||||
using WatchMap_t = std::unordered_map<WatchMapKey, WatchMapValue, WatchMapKeyHasher>;
|
||||
|
||||
WatchMap_t watchMap;
|
||||
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, int32_t, WatchMapKeyHasher>;
|
||||
// Maps the number of the WatchMapKey being used.
|
||||
WatchCounterMap_t watchCounterMap;
|
||||
};
|
||||
|
||||
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
||||
|
|
|
@ -567,44 +567,106 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
namespace {
|
||||
|
||||
// NOTE: This vector may not be initialized here as the keys might be initialized *AFTER* this vector, causing all
|
||||
// keys empty. The reason is that the order of the initialization of keys and this vector might not be ordered as wished
|
||||
// so the vector might be initialized before the keys receives values; thus, all values inside the vector are copied
|
||||
// from uninitialized KeyRefs..
|
||||
// See C++11 standard 3.6.2 for more info.
|
||||
std::vector<KeyRef> configurationMonitorWatchKeys;
|
||||
|
||||
ACTOR Future<Void> configurationMonitorImpl(Reference<ClusterRecoveryData> self,
|
||||
Database cx,
|
||||
Optional<int64_t>* pTenantId) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
}
|
||||
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
}
|
||||
|
||||
state Future<Void> watchFuture =
|
||||
tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) ||
|
||||
tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) ||
|
||||
tr.watch(failedLocalityVersionKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> watchFutures;
|
||||
std::transform(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
std::back_inserter(watchFutures),
|
||||
[this](KeyRef key) { return tr.watch(key); });
|
||||
// Only after this stage, where getKeyLocation is called indirectly, the tenant information is updated and
|
||||
// set to the transaction state.
|
||||
(*pTenantId) = tr.getTransactionState()->tenantId();
|
||||
state Future<Void> watchFuture = waitForAny(watchFutures);
|
||||
|
||||
wait(tr.commit());
|
||||
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
state Optional<int64_t> tenantId;
|
||||
|
||||
// The keys cannot be initialized where it is defined. see comments at the definition.
|
||||
if (configurationMonitorWatchKeys.empty()) {
|
||||
configurationMonitorWatchKeys = std::vector<KeyRef>{ moveKeysLockOwnerKey,
|
||||
excludedServersVersionKey,
|
||||
failedServersVersionKey,
|
||||
excludedLocalityVersionKey,
|
||||
failedLocalityVersionKey };
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
wait(configurationMonitorImpl(self, cx, &tenantId));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Cancel all watches created by configurationMonitorImpl. Due to a circular reference issue, if the watches are
|
||||
// not cancelled manually, the DatabaseContext object in cx will not be able to destructed properly, see
|
||||
//
|
||||
// https://github.com/apple/foundationdb/issues/8321
|
||||
//
|
||||
// for more detailed discussion.
|
||||
|
||||
if (!tenantId.present()) {
|
||||
// Tenant ID not set, no watches are created in this case, no cleanup required.
|
||||
return Void();
|
||||
}
|
||||
std::for_each(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
[this](KeyRef key) { cx->decreaseWatchRefCount(tenantId.get(), key); });
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<Version>> getMinBackupVersion(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
|
|
Loading…
Reference in New Issue