Revert "Merge pull request #8602 from apple/revert-8498-mmmm"
This reverts commit4d789b2fd9
, reversing changes made to9625efd5b9
.
This commit is contained in:
parent
3d2124df80
commit
b35d1b614e
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* DatabaseContext.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
|
||||
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
|
||||
const auto it = watchMap.find(std::make_pair(tenantId, key));
|
||||
if (it == watchMap.end())
|
||||
return Reference<WatchMetadata>();
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
const WatchMapKey key(metadata->parameters->tenant.tenantId, metadata->parameters->key);
|
||||
watchMap[key] = metadata;
|
||||
// NOTE Here we do *NOT* update/reset the reference count for the key, see the source code in getWatchFuture.
|
||||
// Basically the reference count could be increased, or the same watch is refreshed, or the watch might be cancelled
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::increaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
watchCounterMap[mapKey].insert(version);
|
||||
return watchCounterMap[mapKey].size();
|
||||
}
|
||||
|
||||
int32_t DatabaseContext::decreaseWatchRefCount(const int64_t tenantID, KeyRef key, const Version& version) {
|
||||
const WatchMapKey mapKey(tenantID, key);
|
||||
auto mapKeyIter = watchCounterMap.find(mapKey);
|
||||
if (mapKeyIter == std::end(watchCounterMap)) {
|
||||
// Key does not exist. The metadata might be removed by deleteWatchMetadata already.
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto& versionSet = mapKeyIter->second;
|
||||
auto versionIter = versionSet.find(version);
|
||||
|
||||
if (versionIter == std::end(versionSet)) {
|
||||
// Version not found, the watch might be cleared before.
|
||||
return versionSet.size();
|
||||
}
|
||||
versionSet.erase(versionIter);
|
||||
|
||||
const auto count = versionSet.size();
|
||||
// The metadata might be deleted somewhere else, before calling this decreaseWatchRefCount
|
||||
if (auto metadata = getWatchMetadata(tenantID, key); metadata.isValid() && versionSet.size() == 0) {
|
||||
// It is a *must* to cancel the watchFutureSS manually. watchFutureSS waits for watchStorageServerResp, which
|
||||
// holds a reference to the metadata. If the ACTOR is not cancelled, it indirectly holds a Future waiting for
|
||||
// itself.
|
||||
metadata->watchFutureSS.cancel();
|
||||
deleteWatchMetadata(tenantID, key);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
const WatchMapKey mapKey(tenantId, key);
|
||||
watchMap.erase(mapKey);
|
||||
watchCounterMap.erase(mapKey);
|
||||
}
|
||||
|
||||
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
watchCounterMap.clear();
|
||||
}
|
|
@ -2162,14 +2162,14 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::addWatch() {
|
||||
void DatabaseContext::addWatchCounter() {
|
||||
if (outstandingWatches >= maxOutstandingWatches)
|
||||
throw too_many_watches();
|
||||
|
||||
++outstandingWatches;
|
||||
}
|
||||
|
||||
void DatabaseContext::removeWatch() {
|
||||
void DatabaseContext::removeWatchCounter() {
|
||||
--outstandingWatches;
|
||||
ASSERT(outstandingWatches >= 0);
|
||||
}
|
||||
|
@ -2381,25 +2381,6 @@ Database Database::createSimulatedExtraDatabase(std::string connectionString, Op
|
|||
return db;
|
||||
}
|
||||
|
||||
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(int64_t tenantId, KeyRef key) const {
|
||||
const auto it = watchMap.find(std::make_pair(tenantId, key));
|
||||
if (it == watchMap.end())
|
||||
return Reference<WatchMetadata>();
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
|
||||
watchMap[std::make_pair(metadata->parameters->tenant.tenantId, metadata->parameters->key)] = metadata;
|
||||
}
|
||||
|
||||
void DatabaseContext::deleteWatchMetadata(int64_t tenantId, KeyRef key) {
|
||||
watchMap.erase(std::make_pair(tenantId, key));
|
||||
}
|
||||
|
||||
void DatabaseContext::clearWatchMetadata() {
|
||||
watchMap.clear();
|
||||
}
|
||||
|
||||
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
|
||||
ASSERT(db);
|
||||
return db->transactionDefaults;
|
||||
|
@ -3914,6 +3895,50 @@ Future<Void> getWatchFuture(Database cx, Reference<WatchParameters> parameters)
|
|||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: Since an ACTOR could receive multiple exceptions for a single catch clause, e.g. broken promise together with
|
||||
// operation cancelled, If the decreaseWatchRefCount is placed at the catch clause, it might be triggered for multiple
|
||||
// times. One could check if the SAV isSet, but seems a more intuitive way is to use RAII-style constructor/destructor
|
||||
// pair. Yet the object has to be constructed after a wait statement, so it must be trivially-constructible. This
|
||||
// requires move-assignment operator implemented.
|
||||
class WatchRefCountUpdater {
|
||||
Database cx;
|
||||
int64_t tenantID;
|
||||
KeyRef key;
|
||||
Version version;
|
||||
|
||||
public:
|
||||
WatchRefCountUpdater() = default;
|
||||
|
||||
WatchRefCountUpdater(const Database& cx_, const int64_t tenantID_, KeyRef key_, const Version& ver)
|
||||
: cx(cx_), tenantID(tenantID_), key(key_), version(ver) {}
|
||||
|
||||
WatchRefCountUpdater& operator=(WatchRefCountUpdater&& other) {
|
||||
// Since this class is only used by watchValueMap, and it is used *AFTER* a wait statement, this class is first
|
||||
// initialized by default constructor, then, after the wait, this function is called to assign the actual
|
||||
// database, key, etc., to re-initialize this object. At this stage, the reference count can be increased. And
|
||||
// since the database object is moved, the rvalue will have null reference to the DatabaseContext and will not
|
||||
// reduce the reference count.
|
||||
cx = std::move(other.cx);
|
||||
tenantID = std::move(other.tenantID);
|
||||
key = std::move(other.key);
|
||||
version = std::move(other.version);
|
||||
|
||||
cx->increaseWatchRefCount(tenantID, key, version);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~WatchRefCountUpdater() {
|
||||
if (cx.getReference()) {
|
||||
cx->decreaseWatchRefCount(tenantID, key, version);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> watchValueMap(Future<Version> version,
|
||||
TenantInfo tenant,
|
||||
Key key,
|
||||
|
@ -3925,6 +3950,7 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies) {
|
||||
state Version ver = wait(version);
|
||||
state WatchRefCountUpdater watchRefCountUpdater(cx, tenant.tenantId, key, ver);
|
||||
|
||||
wait(getWatchFuture(cx,
|
||||
makeReference<WatchParameters>(
|
||||
|
@ -5464,11 +5490,11 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
cx->removeWatch();
|
||||
cx->removeWatchCounter();
|
||||
throw;
|
||||
}
|
||||
|
||||
cx->removeWatch();
|
||||
cx->removeWatchCounter();
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -5479,7 +5505,7 @@ Future<Version> Transaction::getRawReadVersion() {
|
|||
Future<Void> Transaction::watch(Reference<Watch> watch) {
|
||||
++trState->cx->transactionWatchRequests;
|
||||
|
||||
trState->cx->addWatch();
|
||||
trState->cx->addWatchCounter();
|
||||
watches.push_back(watch);
|
||||
return ::watch(
|
||||
watch,
|
||||
|
|
|
@ -328,14 +328,32 @@ public:
|
|||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
|
||||
|
||||
// Update the watch counter for the database
|
||||
void addWatch();
|
||||
void removeWatch();
|
||||
// Increases the counter of the number of watches in this DatabaseContext by 1. If the number of watches is too
|
||||
// many, throws too_many_watches.
|
||||
void addWatchCounter();
|
||||
|
||||
// Decrease the counter of the number of watches in this DatabaseContext by 1
|
||||
void removeWatchCounter();
|
||||
|
||||
// watch map operations
|
||||
|
||||
// Gets the watch metadata per tenant id and key
|
||||
Reference<WatchMetadata> getWatchMetadata(int64_t tenantId, KeyRef key) const;
|
||||
|
||||
// Refreshes the watch metadata. If the same watch is used (this is determined by the tenant id and the key), the
|
||||
// metadata will be updated.
|
||||
void setWatchMetadata(Reference<WatchMetadata> metadata);
|
||||
|
||||
// Removes the watch metadata
|
||||
void deleteWatchMetadata(int64_t tenant, KeyRef key);
|
||||
|
||||
// Increases reference count to the given watch. Returns the number of references to the watch.
|
||||
int32_t increaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
// Decreases reference count to the given watch. If the reference count is dropped to 0, the watch metadata will be
|
||||
// removed. Returns the number of references to the watch.
|
||||
int32_t decreaseWatchRefCount(const int64_t tenant, KeyRef key, const Version& version);
|
||||
|
||||
void clearWatchMetadata();
|
||||
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
|
||||
|
@ -705,8 +723,26 @@ public:
|
|||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
std::unordered_map<std::pair<int64_t, Key>, Reference<WatchMetadata>, boost::hash<std::pair<int64_t, Key>>>
|
||||
watchMap;
|
||||
using WatchMapKey = std::pair<int64_t, Key>;
|
||||
using WatchMapKeyHasher = boost::hash<WatchMapKey>;
|
||||
using WatchMapValue = Reference<WatchMetadata>;
|
||||
using WatchMap_t = std::unordered_map<WatchMapKey, WatchMapValue, WatchMapKeyHasher>;
|
||||
|
||||
WatchMap_t watchMap;
|
||||
|
||||
// The reason of using a multiset of Versions as counter instead of a simpler integer counter is due to the
|
||||
// possible race condition:
|
||||
//
|
||||
// 1. A watch to key A is set, the watchValueMap ACTOR, noted as X, starts waiting.
|
||||
// 2. All watches are cleared due to connection string change.
|
||||
// 3. The watch to key A is restarted with watchValueMap ACTOR Y.
|
||||
// 4. X receives the cancel exception, and tries to dereference the counter. This causes Y gets cancelled.
|
||||
//
|
||||
// By introducing versions, this race condition is solved.
|
||||
using WatchCounterMapValue = std::multiset<Version>;
|
||||
using WatchCounterMap_t = std::unordered_map<WatchMapKey, WatchCounterMapValue, WatchMapKeyHasher>;
|
||||
// Maps the number of the WatchMapKey being used.
|
||||
WatchCounterMap_t watchCounterMap;
|
||||
};
|
||||
|
||||
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
||||
|
|
|
@ -788,7 +788,7 @@ public:
|
|||
T const& get() const { return sav->get(); }
|
||||
T getValue() const { return get(); }
|
||||
|
||||
bool isValid() const { return sav != 0; }
|
||||
bool isValid() const { return sav != nullptr; }
|
||||
bool isReady() const { return sav->isSet(); }
|
||||
bool isError() const { return sav->isError(); }
|
||||
// returns true if get can be called on this future (counterpart of canBeSet on Promises)
|
||||
|
@ -798,16 +798,12 @@ public:
|
|||
return sav->error_state;
|
||||
}
|
||||
|
||||
Future() : sav(0) {}
|
||||
Future() : sav(nullptr) {}
|
||||
Future(const Future<T>& rhs) : sav(rhs.sav) {
|
||||
if (sav)
|
||||
sav->addFutureRef();
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future copied for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) {
|
||||
rhs.sav = 0;
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future moved for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
Future(Future<T>&& rhs) noexcept : sav(rhs.sav) { rhs.sav = nullptr; }
|
||||
Future(const T& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(presentValue); }
|
||||
Future(T&& presentValue) : sav(new SAV<T>(1, 0)) { sav->send(std::move(presentValue)); }
|
||||
Future(Never) : sav(new SAV<T>(1, 0)) { sav->send(Never()); }
|
||||
|
@ -819,7 +815,6 @@ public:
|
|||
#endif
|
||||
|
||||
~Future() {
|
||||
// if (sav && sav->endpoint.isValid()) std::cout << "Future destroyed for " << sav->endpoint.key << std::endl;
|
||||
if (sav)
|
||||
sav->delFutureRef();
|
||||
}
|
||||
|
@ -835,7 +830,7 @@ public:
|
|||
if (sav)
|
||||
sav->delFutureRef();
|
||||
sav = rhs.sav;
|
||||
rhs.sav = 0;
|
||||
rhs.sav = nullptr;
|
||||
}
|
||||
}
|
||||
bool operator==(const Future& rhs) { return rhs.sav == sav; }
|
||||
|
@ -848,25 +843,23 @@ public:
|
|||
|
||||
void addCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
void addYieldedCallbackAndClear(Callback<T>* cb) {
|
||||
sav->addYieldedCallbackAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
void addCallbackChainAndClear(Callback<T>* cb) {
|
||||
sav->addCallbackChainAndDelFutureRef(cb);
|
||||
sav = 0;
|
||||
sav = nullptr;
|
||||
}
|
||||
|
||||
int getFutureReferenceCount() const { return sav->getFutureReferenceCount(); }
|
||||
int getPromiseReferenceCount() const { return sav->getPromiseReferenceCount(); }
|
||||
|
||||
explicit Future(SAV<T>* sav) : sav(sav) {
|
||||
// if (sav->endpoint.isValid()) std::cout << "Future created for " << sav->endpoint.key << std::endl;
|
||||
}
|
||||
explicit Future(SAV<T>* sav) : sav(sav) {}
|
||||
|
||||
private:
|
||||
SAV<T>* sav;
|
||||
|
|
Loading…
Reference in New Issue