diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index c049e6f0fc..57a17a766a 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -1,6 +1,8 @@ set(FDB_C_SRCS fdb_c.cpp - foundationdb/fdb_c.h) + foundationdb/fdb_c.h + foundationdb/fdb_c_internal.h + foundationdb/fdb_c_types.h) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 6ab52cd670..24340a7ca2 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -19,6 +19,7 @@ */ #include "fdbclient/FDBTypes.h" +#include "flow/ProtocolVersion.h" #include #define FDB_API_VERSION 710 #define FDB_INCLUDE_LEGACY_TYPES @@ -26,6 +27,7 @@ #include "fdbclient/MultiVersionTransaction.h" #include "fdbclient/MultiVersionAssignmentVars.h" #include "foundationdb/fdb_c.h" +#include "foundationdb/fdb_c_internal.h" int g_api_version = 0; @@ -293,6 +295,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_future_get_mappedkeyvalue_array(FDBFuture* *out_more = rrr.more;); } +extern "C" DLLEXPORT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr) { + CATCH_AND_RETURN(*outPtr = (DatabaseSharedState*)((TSAV(DatabaseSharedState*, f)->get()));); +} + extern "C" DLLEXPORT fdb_error_t fdb_future_get_string_array(FDBFuture* f, const char*** out_strings, int* out_count) { CATCH_AND_RETURN(Standalone> na = TSAV(Standalone>, f)->get(); *out_strings = (const char**)na.begin(); @@ -426,6 +432,17 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db, .extractPtr()); } +extern "C" DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db) { + return (FDBFuture*)(DB(db)->createSharedState().extractPtr()); +} + +extern "C" DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p) { + try { + DB(db)->setSharedState(p); + } catch (...) { + } +} + // Get network thread busyness (updated every 1s) // A value of 0 indicates that the client is more or less idle // A value of 1 (or more) indicates that the client is saturated diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 8f5d6840fa..db13ab6e85 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -58,21 +58,12 @@ #include #include "fdb_c_options.g.h" +#include "fdb_c_types.h" #ifdef __cplusplus extern "C" { #endif -/* Pointers to these opaque types represent objects in the FDB API */ -typedef struct FDB_future FDBFuture; -typedef struct FDB_result FDBResult; -typedef struct FDB_database FDBDatabase; -typedef struct FDB_tenant FDBTenant; -typedef struct FDB_transaction FDBTransaction; - -typedef int fdb_error_t; -typedef int fdb_bool_t; - DLLEXPORT const char* fdb_get_error(fdb_error_t code); DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_error_t code); diff --git a/bindings/c/foundationdb/fdb_c_internal.h b/bindings/c/foundationdb/fdb_c_internal.h new file mode 100644 index 0000000000..f1897a598e --- /dev/null +++ b/bindings/c/foundationdb/fdb_c_internal.h @@ -0,0 +1,52 @@ +/* + * fdb_c_internal.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDB_C_INTERNAL_H +#define FDB_C_INTERNAL_H +#include "flow/ProtocolVersion.h" +#pragma once + +#ifndef DLLEXPORT +#define DLLEXPORT +#endif + +#ifndef WARN_UNUSED_RESULT +#define WARN_UNUSED_RESULT +#endif + +#include "fdb_c_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// forward declaration and typedef +typedef struct DatabaseSharedState DatabaseSharedState; + +DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db); + +DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p); + +DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/bindings/c/foundationdb/fdb_c_types.h b/bindings/c/foundationdb/fdb_c_types.h new file mode 100644 index 0000000000..779e227ad5 --- /dev/null +++ b/bindings/c/foundationdb/fdb_c_types.h @@ -0,0 +1,47 @@ +/* + * fdb_c_types.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDB_C_TYPES_H +#define FDB_C_TYPES_H +#pragma once + +#ifndef DLLEXPORT +#define DLLEXPORT +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/* Pointers to these opaque types represent objects in the FDB API */ +typedef struct FDB_future FDBFuture; +typedef struct FDB_result FDBResult; +typedef struct FDB_cluster FDBCluster; +typedef struct FDB_database FDBDatabase; +typedef struct FDB_tenant FDBTenant; +typedef struct FDB_transaction FDBTransaction; + +typedef int fdb_error_t; +typedef int fdb_bool_t; + +#ifdef __cplusplus +} +#endif +#endif diff --git a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp index 732f1778cb..bcddcd9f86 100644 --- a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp @@ -83,8 +83,7 @@ private: auto results = std::make_shared>>(); execTransaction( [kvPairs, results](auto ctx) { - // TODO: Enable after merging with GRV caching - // ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE); + ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE); auto futures = std::make_shared>(); for (const auto& kv : *kvPairs) { futures->push_back(ctx->tx()->get(kv.key, false)); diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 0581db2301..68af830319 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -29,6 +29,7 @@ #include #pragma once +#include "fdbclient/FDBTypes.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/CommitProxyInterface.h" @@ -519,6 +520,11 @@ public: int outstandingWatches; int maxOutstandingWatches; + // Manage any shared state that may be used by MVC + DatabaseSharedState* sharedStatePtr; + Future initSharedState(); + void setSharedState(DatabaseSharedState* p); + // GRV Cache // Database-level read version cache storing the most recent successful GRV as well as the time it was requested. double lastGrvTime; diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 826fd37e7c..9dc3b32a4d 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -28,6 +28,8 @@ #include #include "flow/Arena.h" +#include "flow/FastRef.h" +#include "flow/ProtocolVersion.h" #include "flow/flow.h" enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 }; @@ -1350,6 +1352,29 @@ struct TenantMode { uint32_t mode; }; +struct GRVCacheSpace { + Version cachedReadVersion; + double lastGrvTime; + + GRVCacheSpace() : cachedReadVersion(Version(0)), lastGrvTime(0.0) {} +}; + +// This structure can be extended in the future to include additional features that required a shared state +struct DatabaseSharedState { + // These two members should always be listed first, in this order. + // This is to preserve compatibility with future updates of this shared state + // and ensures the MVC does not attempt to access methods incorrectly + // due to newly introduced offsets in the structure. + const ProtocolVersion protocolVersion; + void (*delRef)(DatabaseSharedState*); + + Mutex mutexLock; + GRVCacheSpace grvCacheSpace; + std::atomic refCount; + + DatabaseSharedState() + : protocolVersion(currentProtocolVersion), mutexLock(Mutex()), grvCacheSpace(GRVCacheSpace()), refCount(0) {} +}; inline bool isValidPerpetualStorageWiggleLocality(std::string locality) { int pos = locality.find(':'); diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index f726e1b3ba..e6a8d3cafe 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -20,6 +20,7 @@ #ifndef FDBCLIENT_ICLIENTAPI_H #define FDBCLIENT_ICLIENTAPI_H +#include "flow/ProtocolVersion.h" #pragma once #include "fdbclient/FDBOptions.g.h" @@ -151,6 +152,10 @@ public: // Management API, create snapshot virtual ThreadFuture createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0; + // Interface to manage shared state across multiple connections to the same Database + virtual ThreadFuture createSharedState() = 0; + virtual void setSharedState(DatabaseSharedState* p) = 0; + // used in template functions as the Transaction type that can be created through createTransaction() using TransactionT = ITransaction; }; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 09a6875d65..560f585c05 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -26,6 +26,7 @@ #include "fdbclient/ClientVersion.h" #include "fdbclient/LocalClientAPI.h" +#include "flow/ThreadPrimitives.h" #include "flow/network.h" #include "flow/Platform.h" #include "flow/ProtocolVersion.h" @@ -469,6 +470,26 @@ ThreadFuture DLDatabase::createSnapshot(const StringRef& uid, const String return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); }); } +ThreadFuture DLDatabase::createSharedState() { + if (!api->databaseCreateSharedState) { + return unsupported_operation(); + } + FdbCApi::FDBFuture* f = api->databaseCreateSharedState(db); + return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + DatabaseSharedState* res; + FdbCApi::fdb_error_t error = api->futureGetSharedState(f, &res); + ASSERT(!error); + return res; + }); +} + +void DLDatabase::setSharedState(DatabaseSharedState* p) { + if (!api->databaseSetSharedState) { + throw unsupported_operation(); + } + api->databaseSetSharedState(db, p); +} + // Get network thread busyness double DLDatabase::getMainThreadBusyness() { if (api->databaseGetMainThreadBusyness != nullptr) { @@ -545,6 +566,10 @@ void DLApi::init() { loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610); loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710); + loadClientFunction( + &api->databaseCreateSharedState, lib, fdbCPath, "fdb_database_create_shared_state", headerVersion >= 710); + loadClientFunction( + &api->databaseSetSharedState, lib, fdbCPath, "fdb_database_set_shared_state", headerVersion >= 710); loadClientFunction( &api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction", headerVersion >= 0); loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option", headerVersion >= 0); @@ -643,6 +668,7 @@ void DLApi::init() { &api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array", headerVersion >= 0); loadClientFunction( &api->futureGetMappedKeyValueArray, lib, fdbCPath, "fdb_future_get_mappedkeyvalue_array", headerVersion >= 700); + loadClientFunction(&api->futureGetSharedState, lib, fdbCPath, "fdb_future_get_shared_state", headerVersion >= 710); loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback", headerVersion >= 0); loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel", headerVersion >= 0); loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy", headerVersion >= 0); @@ -1276,7 +1302,6 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, : dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) { dbState->db = db; dbState->dbVar->set(db); - if (openConnectors) { if (!api->localClientDisabled) { dbState->addClient(api->getLocalClient()); @@ -1391,6 +1416,18 @@ ThreadFuture MultiVersionDatabase::createSnapshot(const StringRef& uid, co return abortableFuture(f, dbState->dbVar->get().onChange); } +ThreadFuture MultiVersionDatabase::createSharedState() { + auto dbVar = dbState->dbVar->get(); + auto f = dbVar.value ? dbVar.value->createSharedState() : ThreadFuture(Never()); + return abortableFuture(f, dbVar.onChange); +} + +void MultiVersionDatabase::setSharedState(DatabaseSharedState* p) { + if (dbState->db) { + dbState->db->setSharedState(p); + } +} + // Get network thread busyness // Return the busyness for the main thread. When using external clients, take the larger of the local client // and the external client's busyness. @@ -1497,6 +1534,11 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion TraceEvent("ProtocolVersionChanged") .detail("NewProtocolVersion", protocolVersion) .detail("OldProtocolVersion", dbProtocolVersion); + // When the protocol version changes, clear the corresponding entry in the shared state map + // so it can be re-initialized. Only do so if there was a valid previous protocol version. + if (dbProtocolVersion.present()) { + MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath); + } dbProtocolVersion = protocolVersion; @@ -1607,8 +1649,15 @@ void MultiVersionDatabase::DatabaseState::updateDatabase(Reference ne .detail("ClusterFilePath", clusterFilePath); } } - - dbVar->set(db); + if (db.isValid() && dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) { + auto updateResult = MultiVersionApi::api->updateClusterSharedStateMap(clusterFilePath, db); + auto handler = mapThreadFuture(updateResult, [this](ErrorOr result) { + dbVar->set(db); + return ErrorOr(Void()); + }); + } else { + dbVar->set(db); + } ASSERT(protocolVersionMonitor.isValid()); protocolVersionMonitor.cancel(); @@ -2264,6 +2313,31 @@ void MultiVersionApi::updateSupportedVersions() { } } +ThreadFuture MultiVersionApi::updateClusterSharedStateMap(std::string clusterFilePath, Reference db) { + MutexHolder holder(lock); + if (clusterSharedStateMap.find(clusterFilePath) == clusterSharedStateMap.end()) { + clusterSharedStateMap[clusterFilePath] = db->createSharedState(); + } else { + ThreadFuture entry = clusterSharedStateMap[clusterFilePath]; + return mapThreadFuture(entry, [db](ErrorOr result) { + if (result.isError()) { + return ErrorOr(result.getError()); + } + auto ssPtr = result.get(); + db->setSharedState(ssPtr); + return ErrorOr(Void()); + }); + } + return Void(); +} + +void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath) { + MutexHolder holder(lock); + auto ssPtr = clusterSharedStateMap[clusterFilePath].get(); + ssPtr->delRef(ssPtr); + clusterSharedStateMap.erase(clusterFilePath); +} + std::vector parseOptionValues(std::string valueStr) { std::string specialCharacters = "\\"; specialCharacters += ENV_VAR_PATH_SEPARATOR; diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index c915329681..c7c3b991d8 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -20,6 +20,7 @@ #ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H #define FDBCLIENT_MULTIVERSIONTRANSACTION_H +#include "flow/ProtocolVersion.h" #pragma once #include "bindings/c/foundationdb/fdb_c_options.g.h" @@ -149,6 +150,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int uidLength, uint8_t const* snapshotCommmand, int snapshotCommandLength); + FDBFuture* (*databaseCreateSharedState)(FDBDatabase* database); + void (*databaseSetSharedState)(FDBDatabase* database, DatabaseSharedState* p); + double (*databaseGetMainThreadBusyness)(FDBDatabase* database); FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion); @@ -285,6 +289,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { FDBMappedKeyValue const** outKVM, int* outCount, fdb_bool_t* outMore); + fdb_error_t (*futureGetSharedState)(FDBFuture* f, DatabaseSharedState** outPtr); fdb_error_t (*futureSetCallback)(FDBFuture* f, FDBCallback callback, void* callback_parameter); void (*futureCancel)(FDBFuture* f); void (*futureDestroy)(FDBFuture* f); @@ -433,6 +438,9 @@ public: ThreadFuture forceRecoveryWithDataLoss(const StringRef& dcid) override; ThreadFuture createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override; + ThreadFuture createSharedState() override; + void setSharedState(DatabaseSharedState* p) override; + private: const Reference api; FdbCApi::FDBDatabase* @@ -708,6 +716,9 @@ public: ThreadFuture forceRecoveryWithDataLoss(const StringRef& dcid) override; ThreadFuture createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override; + ThreadFuture createSharedState() override; + void setSharedState(DatabaseSharedState* p) override; + // private: struct LegacyVersionMonitor; @@ -830,6 +841,8 @@ public: bool callbackOnMainThread; bool localClientDisabled; + ThreadFuture updateClusterSharedStateMap(std::string clusterFilePath, Reference db); + void clearClusterSharedStateMapEntry(std::string clusterFilePath); static bool apiVersionAtLeast(int minVersion); @@ -853,6 +866,9 @@ private: Reference localClient; std::map externalClientDescriptions; std::map>> externalClients; + // Map of clusterFilePath -> DatabaseSharedState pointer Future + // Upon cluster version upgrade, clear the map entry for that cluster + std::map> clusterSharedStateMap; bool networkStartSetup; volatile bool networkSetup; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 7a041e1cd9..ad9f936852 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -71,6 +71,7 @@ #include "flow/Error.h" #include "flow/FastRef.h" #include "flow/IRandom.h" +#include "flow/ProtocolVersion.h" #include "flow/flow.h" #include "flow/genericactors.actor.h" #include "flow/Knobs.h" @@ -214,7 +215,25 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) { } } +void updateCachedReadVersionShared(double t, Version v, DatabaseSharedState* p) { + MutexHolder mutex(p->mutexLock); + if (v >= p->grvCacheSpace.cachedReadVersion) { + TraceEvent(SevDebug, "CacheReadVersionUpdate") + .detail("Version", v) + .detail("CurTime", t) + .detail("LastVersion", p->grvCacheSpace.cachedReadVersion) + .detail("LastTime", p->grvCacheSpace.lastGrvTime); + p->grvCacheSpace.cachedReadVersion = v; + if (t > p->grvCacheSpace.lastGrvTime) { + p->grvCacheSpace.lastGrvTime = t; + } + } +} + void DatabaseContext::updateCachedReadVersion(double t, Version v) { + if (sharedStatePtr) { + return updateCachedReadVersionShared(t, v, sharedStatePtr); + } if (v >= cachedReadVersion) { TraceEvent(SevDebug, "CachedReadVersionUpdate") .detail("Version", v) @@ -233,10 +252,18 @@ void DatabaseContext::updateCachedReadVersion(double t, Version v) { } Version DatabaseContext::getCachedReadVersion() { + if (sharedStatePtr) { + MutexHolder mutex(sharedStatePtr->mutexLock); + return sharedStatePtr->grvCacheSpace.cachedReadVersion; + } return cachedReadVersion; } double DatabaseContext::getLastGrvTime() { + if (sharedStatePtr) { + MutexHolder mutex(sharedStatePtr->mutexLock); + return sharedStatePtr->grvCacheSpace.lastGrvTime; + } return lastGrvTime; } @@ -1363,11 +1390,12 @@ DatabaseContext::DatabaseContext(ReferenceSHARD_STAT_SMOOTH_AMOUNT), + bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), + lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), + lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), + clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), + healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), + smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT), specialKeySpace(std::make_unique(specialKeys.begin, specialKeys.end, /* test */ false)), connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) { dbId = deterministicRandom()->randomUniqueID(); @@ -1664,6 +1692,9 @@ DatabaseContext::~DatabaseContext() { if (grvUpdateHandler.isValid()) { grvUpdateHandler.cancel(); } + if (sharedStatePtr) { + sharedStatePtr->delRef(sharedStatePtr); + } for (auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it)) it->second->notifyContextDestroyed(); ASSERT_ABORT(server_interf.empty()); @@ -8143,6 +8174,29 @@ Future DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command); } +void sharedStateDelRef(DatabaseSharedState* ssPtr) { + if (--ssPtr->refCount == 0) { + delete ssPtr; + } +} + +Future DatabaseContext::initSharedState() { + ASSERT(!sharedStatePtr); // Don't re-initialize shared state if a pointer already exists + DatabaseSharedState* newState = new DatabaseSharedState(); + // Increment refcount by 1 on creation to account for the one held in MultiVersionApi map + // Therefore, on initialization, refCount should be 2 (after also going to setSharedState) + newState->refCount++; + newState->delRef = &sharedStateDelRef; + setSharedState(newState); + return newState; +} + +void DatabaseContext::setSharedState(DatabaseSharedState* p) { + ASSERT(p->protocolVersion == currentProtocolVersion); + sharedStatePtr = p; + sharedStatePtr->refCount++; +} + ACTOR Future storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) { state Promise destroyed = self->destroyed; loop { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 0840456459..1877972ada 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -25,6 +25,7 @@ #include "fdbclient/versions.h" #include "fdbclient/GenericManagementAPI.actor.h" #include "fdbclient/NativeAPI.actor.h" +#include "flow/ProtocolVersion.h" // Users of ThreadSafeTransaction might share Reference between different threads as long as they don't // call addRef (e.g. C API follows this). Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any @@ -101,6 +102,16 @@ ThreadFuture ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons return onMainThread([db, snapUID, cmd]() -> Future { return db->createSnapshot(snapUID, cmd); }); } +ThreadFuture ThreadSafeDatabase::createSharedState() { + DatabaseContext* db = this->db; + return onMainThread([db]() -> Future { return db->initSharedState(); }); +} + +void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) { + DatabaseContext* db = this->db; + onMainThreadVoid([db, p]() { db->setSharedState(p); }, nullptr); +} + // Return the main network thread busyness double ThreadSafeDatabase::getMainThreadBusyness() { ASSERT(g_network); diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index f37cf84bf1..3c67ed0e9f 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -20,6 +20,7 @@ #ifndef FDBCLIENT_THREADSAFETRANSACTION_H #define FDBCLIENT_THREADSAFETRANSACTION_H +#include "flow/ProtocolVersion.h" #pragma once #include "fdbclient/ReadYourWrites.h" @@ -58,6 +59,9 @@ public: ThreadFuture forceRecoveryWithDataLoss(const StringRef& dcid) override; ThreadFuture createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override; + ThreadFuture createSharedState() override; + void setSharedState(DatabaseSharedState* p) override; + private: friend class ThreadSafeTenant; friend class ThreadSafeTransaction;