Revert "Cancel watch when the key is not being waited"
This reverts commit 639afbe62c.
This commit is contained in:
parent
fbe9802be5
commit
dc60f63f9b
|
@ -2163,14 +2163,14 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::addWatchCounter() {
|
||||
void DatabaseContext::addWatch() {
|
||||
if (outstandingWatches >= maxOutstandingWatches)
|
||||
throw too_many_watches();
|
||||
|
||||
++outstandingWatches;
|
||||
}
|
||||
|
||||
void DatabaseContext::removeWatchCounter() {
|
||||
void DatabaseContext::removeWatch() {
|
||||
--outstandingWatches;
|
||||
ASSERT(outstandingWatches >= 0);
|
||||
}
|
||||
|
@ -2390,44 +2390,15 @@ Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, Key
|
|||
}
|
||||
|
||||
// Stores `metadata` in watchMap under the (tenant id, key) pair read from metadata->parameters.
// An entry that already exists for the same pair is overwritten by the assignment.
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key);
|
||||
watchMap[key] = metadata;
|
||||
// NOTE: The reference count for the key is deliberately *NOT* updated/reset here; see the source code in getWatchFuture.
|
||||
}
|
||||
|
||||
// Increments the reference count kept in watchCounterMap for the (tenantID, key) pair.
// Returns the count after the increment.
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
// First reference to this pair: create the counter entry starting at 0.
if (watchCounterMap.count(mapKey) == 0) {
|
||||
watchCounterMap[mapKey] = 0;
|
||||
}
|
||||
const auto count = ++watchCounterMap[mapKey];
|
||||
return count;
|
||||
}
|
||||
|
||||
// Decrements the reference count kept in watchCounterMap for the (tenantID, key) pair and returns the
// decremented value. Returns 0 without modifying anything when the pair has no counter entry.
// When the count reaches 0, the watchFutureSS of the pair's watch metadata is cancelled and
// deleteWatchMetadata is called for the pair.
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
if (watchCounterMap.count(mapKey) == 0) {
|
||||
// Key does not exist. The metadata might have been removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
const auto count = --watchCounterMap[mapKey];
|
||||
// The count must never drop below zero.
ASSERT(watchCounterMap[mapKey] >= 0);
|
||||
if (watchCounterMap[mapKey] == 0) {
|
||||
// NOTE(review): dereferences the result of getWatchMetadata without a check; presumably the metadata
// always exists while a counter entry exists — confirm.
getWatchMetadata(tenantID, key)->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
return count;
|
||||
// NOTE(review): the statement below follows the return and refers to `metadata`, which this function does
// not declare. It looks like the setWatchMetadata body from the other revision, placed here by the diff
// rendering — confirm against the repository.
watchMap[std::make_pair(metadata->parameters->tenant.tenantId, metadata->parameters->key)] = metadata;
|
||||
}
|
||||
|
||||
// Erases the entry for the (tenantId, key) pair from watchMap and from watchCounterMap.
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
// NOTE(review): this appears to erase the same (tenantId, key) entry from watchMap a second time;
// presumably it is the other revision's line shown by the diff rendering — confirm.
watchMap.erase(std::make_pair(tenantId, key));
|
||||
}
|
||||
|
||||
// Removes every entry from both watchMap and watchCounterMap.
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
watchCounterMap.clear();
|
||||
}
|
||||
|
||||
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
|
||||
|
@ -3942,56 +3913,6 @@ Future<Void> getWatchFuture(Database cx, Reference<WatchParameters> parameters)
|
|||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: An ACTOR can receive multiple exceptions in a single catch clause, e.g. broken promise together with
|
||||
// operation cancelled. If decreaseWatchRefCount were placed in the catch clause, it might be triggered multiple
|
||||
// times. One could check whether the SAV isSet, but a more intuitive way is to use an RAII-style constructor/destructor
|
||||
// pair. Yet the object has to be constructed after a wait statement, so it must be default-constructible. This
|
||||
// requires a move-assignment operator to be implemented.
|
||||
// Ties one watch reference count to the lifetime of an object: the three-argument constructor calls
// increaseWatchRefCount on the database for (tenantID, key) and the destructor calls decreaseWatchRefCount.
// A default-constructed object holds no database reference, so its helpers do nothing.
class WatchRefCountUpdater {
|
||||
Database cx;
|
||||
int64_t tenantID;
|
||||
KeyRef key;
|
||||
|
||||
// Increases the reference count for (tenantID, key), but only when cx holds a reference.
void tryAddRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->increaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
|
||||
// Decreases the reference count for (tenantID, key), but only when cx holds a reference.
void tryDelRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
// Leaves tenantID without an initializer; NOTE(review): presumably cx is default-constructed empty so the
// helpers above never read it — confirm.
WatchRefCountUpdater() {}
|
||||
|
||||
// Records the database, tenant id and key, then takes one reference on that watch.
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_) {
|
||||
tryAddRefCount();
|
||||
}
|
||||
|
||||
// Releases the reference currently held (if any), copies the fields of `other`, and takes a new reference
// for the copied (tenantID, key).
WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) {
|
||||
tryDelRefCount();
|
||||
|
||||
// NOTE: Deliberately copy instead of move: `other` keeps its fields, so its own destructor can still
// decrease the reference count it holds.
cx = other.cx;
|
||||
tenantID = other.tenantID;
|
||||
key = other.key;
|
||||
|
||||
tryAddRefCount();
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Releases the reference held by this object, if any.
~WatchRefCountUpdater() { tryDelRefCount(); }
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> watchValueMap(Future<Version> version,
|
||||
TenantInfo tenant,
|
||||
Key key,
|
||||
|
@ -4003,7 +3924,6 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies) {
|
||||
state Version ver = wait(version);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key);
|
||||
|
||||
wait(getWatchFuture(cx,
|
||||
makeReference<WatchParameters>(
|
||||
|
@ -5536,11 +5456,11 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
cx->removeWatchCounter();
|
||||
cx->removeWatch();
|
||||
throw;
|
||||
}
|
||||
|
||||
cx->removeWatchCounter();
|
||||
cx->removeWatch();
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -5551,7 +5471,7 @@ Future<Version> Transaction::getRawReadVersion() {
|
|||
Future<Void> Transaction::watch(Reference<Watch> watch) {
|
||||
++trState->cx->transactionWatchRequests;
|
||||
|
||||
trState->cx->addWatchCounter();
|
||||
trState->cx->addWatch();
|
||||
watches.push_back(watch);
|
||||
return ::watch(
|
||||
watch,
|
||||
|
|
|
@ -156,8 +156,6 @@ public:
|
|||
|
||||
// Keeps `parameters` and initializes watchFuture with the future obtained from watchPromise.
WatchMetadata(Reference<const WatchParameters> parameters)
|
||||
: watchFuture(watchPromise.getFuture()), parameters(parameters) {}
|
||||
|
||||
// Cancels watchFutureSS when the metadata object is destroyed.
~WatchMetadata() { watchFutureSS.cancel(); }
|
||||
};
|
||||
|
||||
struct MutationAndVersionStream {
|
||||
|
@ -330,32 +328,14 @@ public:
|
|||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
|
||||
|
||||
// Increases the number of outstanding watches in this DatabaseContext by 1. If the limit on outstanding watches has
|
||||
// already been reached, throws too_many_watches.
|
||||
void addWatchCounter();
|
||||
|
||||
// Decrease the counter of the number of watches in this DatabaseContext by 1
|
||||
void removeWatchCounter();
|
||||
// Update the watch counter for the database
|
||||
void addWatch();
|
||||
void removeWatch();
|
||||
|
||||
// watch map operations
|
||||
|
||||
// Gets the watch metadata per tenant id and key
|
||||
Reference<WatchMetadata> getWatchMetadata(int64_t tenantId, KeyRef key) const;
|
||||
|
||||
// Refreshes the watch metadata. If the same watch is used (this is determined by the tenant id and the key), the
|
||||
// metadata will be updated.
|
||||
void setWatchMetadata(Reference<WatchMetadata> metadata);
|
||||
|
||||
// Removes the watch metadata
|
||||
void deleteWatchMetadata(int64_t tenant, KeyRef key);
|
||||
|
||||
// Increases reference count to the given watch. Returns the number of references to the watch.
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
|
||||
// Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be
|
||||
// removed. Returns the number of references to the watch.
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
|
||||
void clearWatchMetadata();
|
||||
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
|
||||
|
@ -723,16 +703,8 @@ public:
|
|||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
using WatchMapKey = std::pair<int64_t, Key>;
|
||||
using WatchMapKeyHasher = boost::hash<WatchMapKey>;
|
||||
using WatchMapValue = Reference<WatchMetadata>;
|
||||
using WatchMap_t = std::unordered_map<WatchMapKey, WatchMapValue, WatchMapKeyHasher>;
|
||||
|
||||
WatchMap_t watchMap;
|
||||
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, int32_t, WatchMapKeyHasher>;
|
||||
// Maps the number of the WatchMapKey being used.
|
||||
WatchCounterMap_t watchCounterMap;
|
||||
std::unordered_map<std::pair<int64_t, Key>, Reference<WatchMetadata>, boost::hash<std::pair<int64_t, Key>>>
|
||||
watchMap;
|
||||
};
|
||||
|
||||
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
||||
|
|
|
@ -563,106 +563,44 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: This vector must not be initialized here, as the keys might be initialized *AFTER* this vector, leaving all
|
||||
// keys empty. The initialization order of the keys and this vector is not guaranteed to be the desired one,
|
||||
// so the vector might be initialized before the keys receive values; thus, all values inside the vector would be copied
|
||||
// from uninitialized KeyRefs.
|
||||
// See C++11 standard 3.6.2 for more info.
|
||||
std::vector<KeyRef> configurationMonitorWatchKeys;
|
||||
|
||||
ACTOR Future<Void> configurationMonitorImpl(Reference<ClusterRecoveryData> self,
|
||||
Database cx,
|
||||
Optional<int64_t>* pTenantId) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
}
|
||||
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
}
|
||||
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
state Future<Void> watchFuture =
|
||||
tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) ||
|
||||
tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) ||
|
||||
tr.watch(failedLocalityVersionKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> watchFutures;
|
||||
std::transform(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
std::back_inserter(watchFutures),
|
||||
[this](KeyRef key) { return tr.watch(key); });
|
||||
// Only after this stage, where getKeyLocation is called indirectly, the tenant information is updated and
|
||||
// set to the transaction state.
|
||||
(*pTenantId) = tr.getTransactionState()->tenantId();
|
||||
state Future<Void> watchFuture = waitForAny(watchFutures);
|
||||
|
||||
wait(tr.commit());
|
||||
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
// Runs configurationMonitorImpl in an endless loop and cleans up when the actor is cancelled.
// On first use it fills configurationMonitorWatchKeys with the five watched keys. Any error other than
// actor_cancelled is rethrown. On actor_cancelled, if a tenant id was recorded through the pointer handed to
// configurationMonitorImpl, the watch reference count of every key in configurationMonitorWatchKeys is
// decreased for that tenant id.
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
// Filled in by configurationMonitorImpl through the pointer passed below.
state Optional<int64_t> tenantId;
|
||||
|
||||
// The keys cannot be initialized where the vector is defined; see the comments at its definition.
|
||||
if (configurationMonitorWatchKeys.empty()) {
|
||||
configurationMonitorWatchKeys = std::vector<KeyRef>{ moveKeysLockOwnerKey,
|
||||
excludedServersVersionKey,
|
||||
failedServersVersionKey,
|
||||
excludedLocalityVersionKey,
|
||||
failedLocalityVersionKey };
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
wait(configurationMonitorImpl(self, cx, &tenantId));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// Only cancellation falls through to the cleanup below; every other error propagates to the caller.
if (e.code() != error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Cancel all watches created by configurationMonitorImpl. Due to a circular reference issue, if the watches are
|
||||
// not cancelled manually, the DatabaseContext object in cx cannot be destructed properly, see
|
||||
//
|
||||
// https://github.com/apple/foundationdb/issues/8321
|
||||
//
|
||||
// for a more detailed discussion.
|
||||
|
||||
if (!tenantId.present()) {
|
||||
// Tenant ID not set: no watches were created in this case, so no cleanup is required.
|
||||
return Void();
|
||||
}
|
||||
std::for_each(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
[this](KeyRef key) { cx->decreaseWatchRefCount(tenantId.get(), key); });
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<Version>> getMinBackupVersion(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
|
|
Loading…
Reference in New Issue