Record the version of each watch
In the case 1. A watch to key A is set, the watchValueMap ACTOR, noted as X, starts waiting. 2. All watches are cleared due to connection string change. 3. The watch to key A is restarted with watchValueMap ACTOR Y. 4. X receives the cancel exception, and tries to dereference the counter. This causes Y gets cancelled. the reference count will cause watch prematurely terminate. Recording the versions of each watch would help preventing this issue
This commit is contained in:
parent
ab0f827058
commit
4bd24e4d64
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* DatabaseContext.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
|
||||
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
|
||||
const auto it = watchMap.find(std::make_pair(tenantId, key));
|
||||
if (it == watchMap.end())
|
||||
return Reference<WatchMetadata>();
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key);
|
||||
watchMap[key] = metadata;
|
||||
// NOTE Here we do *NOT* update/reset the reference count for the key, see the source code in getWatchFuture.
|
||||
// Basically the reference count could be increased, or the same watch is refreshed, or the watch might be cancelled
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
watchCounterMap[mapKey].insert(version);
|
||||
return watchCounterMap[mapKey].size();
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
auto mapKeyIter = watchCounterMap.find(mapKey);
|
||||
if (mapKeyIter == std::end(watchCounterMap)) {
|
||||
// Key does not exist. The metadata might be removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto& versionSet = mapKeyIter->second;
|
||||
auto versionIter = versionSet.find(version);
|
||||
|
||||
if (versionIter == std::end(versionSet)) {
|
||||
// Version not found, the watch might be cleared before.
|
||||
return versionSet.size();
|
||||
}
|
||||
versionSet.erase(versionIter);
|
||||
|
||||
const auto count = versionSet.size();
|
||||
// The metadata might be deleted somewhere else, before calling this decreaseWatchRefCount
|
||||
if (auto metadata = getWatchMetadata(tenantID, key); metadata.isValid() && versionSet.size() == 0) {
|
||||
// It is a *must* to cancel the watchFutureSS manually. watchFutureSS waits for watchStorageServerResp, which
|
||||
// holds a reference to the metadata. If the ACTOR is not cancelled, it indirectly holds a Future waiting for
|
||||
// itself.
|
||||
metadata->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
}
|
||||
|
||||
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
watchCounterMap.clear();
|
||||
}
|
|
@ -2382,54 +2382,6 @@ Database Database::createSimulatedExtraDatabase(std::string connectionString, Op
|
|||
return db;
|
||||
}
|
||||
|
||||
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
|
||||
const auto it = watchMap.find(std::make_pair(tenantId, key));
|
||||
if (it == watchMap.end())
|
||||
return Reference<WatchMetadata>();
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key);
|
||||
watchMap[key] = metadata;
|
||||
// NOTE Here we do *NOT* update/reset the reference count for the key, see the source code in getWatchFuture
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
if (watchCounterMap.count(mapKey) == 0) {
|
||||
watchCounterMap[mapKey] = 0;
|
||||
}
|
||||
const auto count = ++watchCounterMap[mapKey];
|
||||
return count;
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
if (watchCounterMap.count(mapKey) == 0) {
|
||||
// Key does not exist. The metadata might be removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
const auto count = --watchCounterMap[mapKey];
|
||||
ASSERT(watchCounterMap[mapKey] >= 0);
|
||||
if (watchCounterMap[mapKey] == 0) {
|
||||
getWatchMetadata(tenantID, key)->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
}
|
||||
|
||||
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
watchCounterMap.clear();
|
||||
}
|
||||
|
||||
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
|
||||
ASSERT(db);
|
||||
return db->transactionDefaults;
|
||||
|
@ -3953,41 +3905,35 @@ class WatchRefCountUpdater {
|
|||
Database cx;
|
||||
int64_t tenantID;
|
||||
KeyRef key;
|
||||
|
||||
void tryAddRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->increaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
|
||||
void tryDelRefCount() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key);
|
||||
}
|
||||
}
|
||||
Version version;
|
||||
|
||||
public:
|
||||
WatchRefCountUpdater() {}
|
||||
|
||||
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_) {
|
||||
tryAddRefCount();
|
||||
}
|
||||
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_, const Version& ver)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_), version(ver) {}
|
||||
|
||||
WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) {
|
||||
tryDelRefCount();
|
||||
// Since this class is only used by watchValueMap, and it is used *AFTER* a wait statement, this class is first
|
||||
// initialized by default constructor, then, after the wait, this function is called to assign the actual
|
||||
// database, key, etc., to re-initialize this object. At this stage, the reference count can be increased. And
|
||||
// since the database object is moved, the rvalue will have null reference to the DatabaseContext and will not
|
||||
// reduce the reference count.
|
||||
cx = std::move(other.cx);
|
||||
tenantID = std::move(other.tenantID);
|
||||
key = std::move(other.key);
|
||||
version = std::move(other.version);
|
||||
|
||||
// NOTE: Do not use move semantic, this copy allows other delete the reference count properly.
|
||||
cx = other.cx;
|
||||
tenantID = other.tenantID;
|
||||
key = other.key;
|
||||
|
||||
tryAddRefCount();
|
||||
cx->increaseWatchRefCount(tenantID, key, version);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~WatchRefCountUpdater() { tryDelRefCount(); }
|
||||
~WatchRefCountUpdater() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key, version);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
@ -4003,7 +3949,7 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies) {
|
||||
state Version ver = wait(version);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key, ver);
|
||||
|
||||
wait(getWatchFuture(cx,
|
||||
makeReference<WatchParameters>(
|
||||
|
@ -5480,17 +5426,26 @@ ACTOR Future<Void> restartWatch(Database cx,
|
|||
tenantInfo.tenantId = locationInfo.tenantEntry.id;
|
||||
}
|
||||
|
||||
wait(watchValueMap(cx->minAcceptableReadVersion,
|
||||
tenantInfo,
|
||||
key,
|
||||
value,
|
||||
cx,
|
||||
tags,
|
||||
spanContext,
|
||||
taskID,
|
||||
debugID,
|
||||
useProvisionalProxies));
|
||||
if (key == "anotherKey"_sr)
|
||||
std::cout << cx->dbId.toString() << " restartWatch" << std::endl;
|
||||
|
||||
try {
|
||||
wait(watchValueMap(cx->minAcceptableReadVersion,
|
||||
tenantInfo,
|
||||
key,
|
||||
value,
|
||||
cx,
|
||||
tags,
|
||||
spanContext,
|
||||
taskID,
|
||||
debugID,
|
||||
useProvisionalProxies));
|
||||
} catch (Error& err) {
|
||||
std::cout << cx->dbId.toString() << " restartWatch fail " << err.code() << std::endl;
|
||||
return Void();
|
||||
}
|
||||
|
||||
std::cout << cx->dbId.toString() << " restartWatch pass" << std::endl;
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -157,7 +157,7 @@ public:
|
|||
WatchMetadata(Reference<const WatchParameters> parameters)
|
||||
: watchFuture(watchPromise.getFuture()), parameters(parameters) {}
|
||||
|
||||
~WatchMetadata() { watchFutureSS.cancel(); }
|
||||
~WatchMetadata() { /*watchFutureSS.cancel();*/ }
|
||||
};
|
||||
|
||||
struct MutationAndVersionStream {
|
||||
|
@ -350,11 +350,11 @@ public:
|
|||
void deleteWatchMetadata(int64_t tenant, KeyRef key);
|
||||
|
||||
// Increases reference count to the given watch. Returns the number of references to the watch.
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
// Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be
|
||||
// removed. Returns the number of references to the watch.
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key);
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
void clearWatchMetadata();
|
||||
|
||||
|
@ -730,7 +730,17 @@ private:
|
|||
|
||||
WatchMap_t watchMap;
|
||||
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, int32_t, WatchMapKeyHasher>;
|
||||
// The reason of using a multiset of Versions as counter instead of a simpler integer counter is due to the
|
||||
// possible race condition:
|
||||
//
|
||||
// 1. A watch to key A is set, the watchValueMap ACTOR, noted as X, starts waiting.
|
||||
// 2. All watches are cleared due to connection string change.
|
||||
// 3. The watch to key A is restarted with watchValueMap ACTOR Y.
|
||||
// 4. X receives the cancel exception, and tries to dereference the counter. This causes Y gets cancelled.
|
||||
//
|
||||
// By introducing versions, this race condition is solved.
|
||||
using WatchCounterMapValue = std::multiset<Version>;
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, WatchCounterMapValue, WatchMapKeyHasher>;
|
||||
// Maps the number of the WatchMapKey being used.
|
||||
WatchCounterMap_t watchCounterMap;
|
||||
};
|
||||
|
|
|
@ -788,7 +788,7 @@ public:
|
|||
T const& get() const { return sav->get(); }
|
||||
T getValue() const { return get(); }
|
||||
|
||||
bool isValid() const { return sav != 0; }
|
||||
bool isValid() const { return sav != nullptr; }
|
||||
bool isReady() const { return sav->isSet(); }
|
||||
bool isError() const { return sav->isError(); }
|
||||
// returns true if get can be called on this future (counterpart of canBeSet on Promises)
|
||||
|
@ -798,12 +798,12 @@ public:
|
|||
return sav->error_state;
|
||||
}
|
||||
|
||||
Future() : sav(0) {}
|
||||
Future() : sav(nullptr) {}
|
||||
Future(const Future<T>& rhs) : sav(rhs.sav) {
|
||||
if (sav)
|
||||
sav->addFutureRef();
|
||||
}
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) { rhs.sav = nullptr; }
|
||||
Future(const T& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(presentValue); }
|
||||
Future(T&& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(std::move(presentValue)); }
|
||||
Future(Never) : sav(new SAV<T>(1, 0)) { sav->send(Never()); }
|
||||
|
@ -830,7 +830,7 @@ public:
|
|||
if (sav)
|
||||
sav->delFutureRef();
|
||||
sav = rhs.sav;
|
||||
rhs.sav = 0;
|
||||
rhs.sav = nullptr;
|
||||
}
|
||||
}
|
||||
bool operator==(const Future& rhs) { return rhs.sav == sav; }
|
||||
|
@ -843,17 +843,17 @@ public:
|
|||
|
||||
void addCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
void addYieldedCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addYieldedCallbackAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
void addCallbackChainAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackChainAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
|
||||
|
|
Loading…
Reference in New Issue