Merge pull request #6664 from sfc-gh-jfu/jfu-grv-cache-multi-threaded
Introduce multi-threaded/multi-version client support for GRV caching
This commit is contained in:
commit
2ff11982cc
|
@ -1,6 +1,8 @@
|
||||||
set(FDB_C_SRCS
|
set(FDB_C_SRCS
|
||||||
fdb_c.cpp
|
fdb_c.cpp
|
||||||
foundationdb/fdb_c.h)
|
foundationdb/fdb_c.h
|
||||||
|
foundationdb/fdb_c_internal.h
|
||||||
|
foundationdb/fdb_c_types.h)
|
||||||
|
|
||||||
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)
|
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "fdbclient/FDBTypes.h"
|
#include "fdbclient/FDBTypes.h"
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#define FDB_API_VERSION 710
|
#define FDB_API_VERSION 710
|
||||||
#define FDB_INCLUDE_LEGACY_TYPES
|
#define FDB_INCLUDE_LEGACY_TYPES
|
||||||
|
@ -26,6 +27,7 @@
|
||||||
#include "fdbclient/MultiVersionTransaction.h"
|
#include "fdbclient/MultiVersionTransaction.h"
|
||||||
#include "fdbclient/MultiVersionAssignmentVars.h"
|
#include "fdbclient/MultiVersionAssignmentVars.h"
|
||||||
#include "foundationdb/fdb_c.h"
|
#include "foundationdb/fdb_c.h"
|
||||||
|
#include "foundationdb/fdb_c_internal.h"
|
||||||
|
|
||||||
int g_api_version = 0;
|
int g_api_version = 0;
|
||||||
|
|
||||||
|
@ -293,6 +295,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_future_get_mappedkeyvalue_array(FDBFuture*
|
||||||
*out_more = rrr.more;);
|
*out_more = rrr.more;);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" DLLEXPORT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr) {
|
||||||
|
CATCH_AND_RETURN(*outPtr = (DatabaseSharedState*)((TSAV(DatabaseSharedState*, f)->get())););
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" DLLEXPORT fdb_error_t fdb_future_get_string_array(FDBFuture* f, const char*** out_strings, int* out_count) {
|
extern "C" DLLEXPORT fdb_error_t fdb_future_get_string_array(FDBFuture* f, const char*** out_strings, int* out_count) {
|
||||||
CATCH_AND_RETURN(Standalone<VectorRef<const char*>> na = TSAV(Standalone<VectorRef<const char*>>, f)->get();
|
CATCH_AND_RETURN(Standalone<VectorRef<const char*>> na = TSAV(Standalone<VectorRef<const char*>>, f)->get();
|
||||||
*out_strings = (const char**)na.begin();
|
*out_strings = (const char**)na.begin();
|
||||||
|
@ -426,6 +432,17 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
|
||||||
.extractPtr());
|
.extractPtr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db) {
|
||||||
|
return (FDBFuture*)(DB(db)->createSharedState().extractPtr());
|
||||||
|
}
|
||||||
|
|
||||||
|
extern "C" DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p) {
|
||||||
|
try {
|
||||||
|
DB(db)->setSharedState(p);
|
||||||
|
} catch (...) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get network thread busyness (updated every 1s)
|
// Get network thread busyness (updated every 1s)
|
||||||
// A value of 0 indicates that the client is more or less idle
|
// A value of 0 indicates that the client is more or less idle
|
||||||
// A value of 1 (or more) indicates that the client is saturated
|
// A value of 1 (or more) indicates that the client is saturated
|
||||||
|
|
|
@ -58,21 +58,12 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#include "fdb_c_options.g.h"
|
#include "fdb_c_options.g.h"
|
||||||
|
#include "fdb_c_types.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Pointers to these opaque types represent objects in the FDB API */
|
|
||||||
typedef struct FDB_future FDBFuture;
|
|
||||||
typedef struct FDB_result FDBResult;
|
|
||||||
typedef struct FDB_database FDBDatabase;
|
|
||||||
typedef struct FDB_tenant FDBTenant;
|
|
||||||
typedef struct FDB_transaction FDBTransaction;
|
|
||||||
|
|
||||||
typedef int fdb_error_t;
|
|
||||||
typedef int fdb_bool_t;
|
|
||||||
|
|
||||||
DLLEXPORT const char* fdb_get_error(fdb_error_t code);
|
DLLEXPORT const char* fdb_get_error(fdb_error_t code);
|
||||||
|
|
||||||
DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_error_t code);
|
DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_error_t code);
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* fdb_c_internal.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FDB_C_INTERNAL_H
|
||||||
|
#define FDB_C_INTERNAL_H
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#ifndef DLLEXPORT
|
||||||
|
#define DLLEXPORT
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef WARN_UNUSED_RESULT
|
||||||
|
#define WARN_UNUSED_RESULT
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "fdb_c_types.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// forward declaration and typedef
|
||||||
|
typedef struct DatabaseSharedState DatabaseSharedState;
|
||||||
|
|
||||||
|
DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db);
|
||||||
|
|
||||||
|
DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p);
|
||||||
|
|
||||||
|
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
#endif
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* fdb_c_types.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FDB_C_TYPES_H
|
||||||
|
#define FDB_C_TYPES_H
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#ifndef DLLEXPORT
|
||||||
|
#define DLLEXPORT
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Pointers to these opaque types represent objects in the FDB API */
|
||||||
|
typedef struct FDB_future FDBFuture;
|
||||||
|
typedef struct FDB_result FDBResult;
|
||||||
|
typedef struct FDB_cluster FDBCluster;
|
||||||
|
typedef struct FDB_database FDBDatabase;
|
||||||
|
typedef struct FDB_tenant FDBTenant;
|
||||||
|
typedef struct FDB_transaction FDBTransaction;
|
||||||
|
|
||||||
|
typedef int fdb_error_t;
|
||||||
|
typedef int fdb_bool_t;
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
#endif
|
|
@ -83,8 +83,7 @@ private:
|
||||||
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
|
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
|
||||||
execTransaction(
|
execTransaction(
|
||||||
[kvPairs, results](auto ctx) {
|
[kvPairs, results](auto ctx) {
|
||||||
// TODO: Enable after merging with GRV caching
|
ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
|
||||||
// ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
|
|
||||||
auto futures = std::make_shared<std::vector<Future>>();
|
auto futures = std::make_shared<std::vector<Future>>();
|
||||||
for (const auto& kv : *kvPairs) {
|
for (const auto& kv : *kvPairs) {
|
||||||
futures->push_back(ctx->tx()->get(kv.key, false));
|
futures->push_back(ctx->tx()->get(kv.key, false));
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "fdbclient/FDBTypes.h"
|
||||||
#include "fdbclient/NativeAPI.actor.h"
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
#include "fdbclient/KeyRangeMap.h"
|
#include "fdbclient/KeyRangeMap.h"
|
||||||
#include "fdbclient/CommitProxyInterface.h"
|
#include "fdbclient/CommitProxyInterface.h"
|
||||||
|
@ -519,6 +520,11 @@ public:
|
||||||
int outstandingWatches;
|
int outstandingWatches;
|
||||||
int maxOutstandingWatches;
|
int maxOutstandingWatches;
|
||||||
|
|
||||||
|
// Manage any shared state that may be used by MVC
|
||||||
|
DatabaseSharedState* sharedStatePtr;
|
||||||
|
Future<DatabaseSharedState*> initSharedState();
|
||||||
|
void setSharedState(DatabaseSharedState* p);
|
||||||
|
|
||||||
// GRV Cache
|
// GRV Cache
|
||||||
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
|
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
|
||||||
double lastGrvTime;
|
double lastGrvTime;
|
||||||
|
|
|
@ -28,6 +28,8 @@
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
|
|
||||||
#include "flow/Arena.h"
|
#include "flow/Arena.h"
|
||||||
|
#include "flow/FastRef.h"
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
|
|
||||||
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
|
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
|
||||||
|
@ -1350,6 +1352,29 @@ struct TenantMode {
|
||||||
|
|
||||||
uint32_t mode;
|
uint32_t mode;
|
||||||
};
|
};
|
||||||
|
struct GRVCacheSpace {
|
||||||
|
Version cachedReadVersion;
|
||||||
|
double lastGrvTime;
|
||||||
|
|
||||||
|
GRVCacheSpace() : cachedReadVersion(Version(0)), lastGrvTime(0.0) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
// This structure can be extended in the future to include additional features that required a shared state
|
||||||
|
struct DatabaseSharedState {
|
||||||
|
// These two members should always be listed first, in this order.
|
||||||
|
// This is to preserve compatibility with future updates of this shared state
|
||||||
|
// and ensures the MVC does not attempt to access methods incorrectly
|
||||||
|
// due to newly introduced offsets in the structure.
|
||||||
|
const ProtocolVersion protocolVersion;
|
||||||
|
void (*delRef)(DatabaseSharedState*);
|
||||||
|
|
||||||
|
Mutex mutexLock;
|
||||||
|
GRVCacheSpace grvCacheSpace;
|
||||||
|
std::atomic<int> refCount;
|
||||||
|
|
||||||
|
DatabaseSharedState()
|
||||||
|
: protocolVersion(currentProtocolVersion), mutexLock(Mutex()), grvCacheSpace(GRVCacheSpace()), refCount(0) {}
|
||||||
|
};
|
||||||
|
|
||||||
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
|
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
|
||||||
int pos = locality.find(':');
|
int pos = locality.find(':');
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#ifndef FDBCLIENT_ICLIENTAPI_H
|
#ifndef FDBCLIENT_ICLIENTAPI_H
|
||||||
#define FDBCLIENT_ICLIENTAPI_H
|
#define FDBCLIENT_ICLIENTAPI_H
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "fdbclient/FDBOptions.g.h"
|
#include "fdbclient/FDBOptions.g.h"
|
||||||
|
@ -151,6 +152,10 @@ public:
|
||||||
// Management API, create snapshot
|
// Management API, create snapshot
|
||||||
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
|
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
|
||||||
|
|
||||||
|
// Interface to manage shared state across multiple connections to the same Database
|
||||||
|
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
|
||||||
|
virtual void setSharedState(DatabaseSharedState* p) = 0;
|
||||||
|
|
||||||
// used in template functions as the Transaction type that can be created through createTransaction()
|
// used in template functions as the Transaction type that can be created through createTransaction()
|
||||||
using TransactionT = ITransaction;
|
using TransactionT = ITransaction;
|
||||||
};
|
};
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "fdbclient/ClientVersion.h"
|
#include "fdbclient/ClientVersion.h"
|
||||||
#include "fdbclient/LocalClientAPI.h"
|
#include "fdbclient/LocalClientAPI.h"
|
||||||
|
|
||||||
|
#include "flow/ThreadPrimitives.h"
|
||||||
#include "flow/network.h"
|
#include "flow/network.h"
|
||||||
#include "flow/Platform.h"
|
#include "flow/Platform.h"
|
||||||
#include "flow/ProtocolVersion.h"
|
#include "flow/ProtocolVersion.h"
|
||||||
|
@ -469,6 +470,26 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
|
||||||
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
|
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> DLDatabase::createSharedState() {
|
||||||
|
if (!api->databaseCreateSharedState) {
|
||||||
|
return unsupported_operation();
|
||||||
|
}
|
||||||
|
FdbCApi::FDBFuture* f = api->databaseCreateSharedState(db);
|
||||||
|
return toThreadFuture<DatabaseSharedState*>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||||
|
DatabaseSharedState* res;
|
||||||
|
FdbCApi::fdb_error_t error = api->futureGetSharedState(f, &res);
|
||||||
|
ASSERT(!error);
|
||||||
|
return res;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void DLDatabase::setSharedState(DatabaseSharedState* p) {
|
||||||
|
if (!api->databaseSetSharedState) {
|
||||||
|
throw unsupported_operation();
|
||||||
|
}
|
||||||
|
api->databaseSetSharedState(db, p);
|
||||||
|
}
|
||||||
|
|
||||||
// Get network thread busyness
|
// Get network thread busyness
|
||||||
double DLDatabase::getMainThreadBusyness() {
|
double DLDatabase::getMainThreadBusyness() {
|
||||||
if (api->databaseGetMainThreadBusyness != nullptr) {
|
if (api->databaseGetMainThreadBusyness != nullptr) {
|
||||||
|
@ -545,6 +566,10 @@ void DLApi::init() {
|
||||||
loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610);
|
loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610);
|
||||||
|
|
||||||
loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710);
|
loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710);
|
||||||
|
loadClientFunction(
|
||||||
|
&api->databaseCreateSharedState, lib, fdbCPath, "fdb_database_create_shared_state", headerVersion >= 710);
|
||||||
|
loadClientFunction(
|
||||||
|
&api->databaseSetSharedState, lib, fdbCPath, "fdb_database_set_shared_state", headerVersion >= 710);
|
||||||
loadClientFunction(
|
loadClientFunction(
|
||||||
&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction", headerVersion >= 0);
|
&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction", headerVersion >= 0);
|
||||||
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option", headerVersion >= 0);
|
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option", headerVersion >= 0);
|
||||||
|
@ -643,6 +668,7 @@ void DLApi::init() {
|
||||||
&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array", headerVersion >= 0);
|
&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array", headerVersion >= 0);
|
||||||
loadClientFunction(
|
loadClientFunction(
|
||||||
&api->futureGetMappedKeyValueArray, lib, fdbCPath, "fdb_future_get_mappedkeyvalue_array", headerVersion >= 700);
|
&api->futureGetMappedKeyValueArray, lib, fdbCPath, "fdb_future_get_mappedkeyvalue_array", headerVersion >= 700);
|
||||||
|
loadClientFunction(&api->futureGetSharedState, lib, fdbCPath, "fdb_future_get_shared_state", headerVersion >= 710);
|
||||||
loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback", headerVersion >= 0);
|
loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback", headerVersion >= 0);
|
||||||
loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel", headerVersion >= 0);
|
loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel", headerVersion >= 0);
|
||||||
loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy", headerVersion >= 0);
|
loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy", headerVersion >= 0);
|
||||||
|
@ -1276,7 +1302,6 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
|
||||||
: dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) {
|
: dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) {
|
||||||
dbState->db = db;
|
dbState->db = db;
|
||||||
dbState->dbVar->set(db);
|
dbState->dbVar->set(db);
|
||||||
|
|
||||||
if (openConnectors) {
|
if (openConnectors) {
|
||||||
if (!api->localClientDisabled) {
|
if (!api->localClientDisabled) {
|
||||||
dbState->addClient(api->getLocalClient());
|
dbState->addClient(api->getLocalClient());
|
||||||
|
@ -1391,6 +1416,18 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
|
||||||
return abortableFuture(f, dbState->dbVar->get().onChange);
|
return abortableFuture(f, dbState->dbVar->get().onChange);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> MultiVersionDatabase::createSharedState() {
|
||||||
|
auto dbVar = dbState->dbVar->get();
|
||||||
|
auto f = dbVar.value ? dbVar.value->createSharedState() : ThreadFuture<DatabaseSharedState*>(Never());
|
||||||
|
return abortableFuture(f, dbVar.onChange);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MultiVersionDatabase::setSharedState(DatabaseSharedState* p) {
|
||||||
|
if (dbState->db) {
|
||||||
|
dbState->db->setSharedState(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get network thread busyness
|
// Get network thread busyness
|
||||||
// Return the busyness for the main thread. When using external clients, take the larger of the local client
|
// Return the busyness for the main thread. When using external clients, take the larger of the local client
|
||||||
// and the external client's busyness.
|
// and the external client's busyness.
|
||||||
|
@ -1497,6 +1534,11 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
|
||||||
TraceEvent("ProtocolVersionChanged")
|
TraceEvent("ProtocolVersionChanged")
|
||||||
.detail("NewProtocolVersion", protocolVersion)
|
.detail("NewProtocolVersion", protocolVersion)
|
||||||
.detail("OldProtocolVersion", dbProtocolVersion);
|
.detail("OldProtocolVersion", dbProtocolVersion);
|
||||||
|
// When the protocol version changes, clear the corresponding entry in the shared state map
|
||||||
|
// so it can be re-initialized. Only do so if there was a valid previous protocol version.
|
||||||
|
if (dbProtocolVersion.present()) {
|
||||||
|
MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath);
|
||||||
|
}
|
||||||
|
|
||||||
dbProtocolVersion = protocolVersion;
|
dbProtocolVersion = protocolVersion;
|
||||||
|
|
||||||
|
@ -1607,8 +1649,15 @@ void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> ne
|
||||||
.detail("ClusterFilePath", clusterFilePath);
|
.detail("ClusterFilePath", clusterFilePath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (db.isValid() && dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) {
|
||||||
dbVar->set(db);
|
auto updateResult = MultiVersionApi::api->updateClusterSharedStateMap(clusterFilePath, db);
|
||||||
|
auto handler = mapThreadFuture<Void, Void>(updateResult, [this](ErrorOr<Void> result) {
|
||||||
|
dbVar->set(db);
|
||||||
|
return ErrorOr<Void>(Void());
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
dbVar->set(db);
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(protocolVersionMonitor.isValid());
|
ASSERT(protocolVersionMonitor.isValid());
|
||||||
protocolVersionMonitor.cancel();
|
protocolVersionMonitor.cancel();
|
||||||
|
@ -2264,6 +2313,31 @@ void MultiVersionApi::updateSupportedVersions() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<Void> MultiVersionApi::updateClusterSharedStateMap(std::string clusterFilePath, Reference<IDatabase> db) {
|
||||||
|
MutexHolder holder(lock);
|
||||||
|
if (clusterSharedStateMap.find(clusterFilePath) == clusterSharedStateMap.end()) {
|
||||||
|
clusterSharedStateMap[clusterFilePath] = db->createSharedState();
|
||||||
|
} else {
|
||||||
|
ThreadFuture<DatabaseSharedState*> entry = clusterSharedStateMap[clusterFilePath];
|
||||||
|
return mapThreadFuture<DatabaseSharedState*, Void>(entry, [db](ErrorOr<DatabaseSharedState*> result) {
|
||||||
|
if (result.isError()) {
|
||||||
|
return ErrorOr<Void>(result.getError());
|
||||||
|
}
|
||||||
|
auto ssPtr = result.get();
|
||||||
|
db->setSharedState(ssPtr);
|
||||||
|
return ErrorOr<Void>(Void());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath) {
|
||||||
|
MutexHolder holder(lock);
|
||||||
|
auto ssPtr = clusterSharedStateMap[clusterFilePath].get();
|
||||||
|
ssPtr->delRef(ssPtr);
|
||||||
|
clusterSharedStateMap.erase(clusterFilePath);
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<std::string> parseOptionValues(std::string valueStr) {
|
std::vector<std::string> parseOptionValues(std::string valueStr) {
|
||||||
std::string specialCharacters = "\\";
|
std::string specialCharacters = "\\";
|
||||||
specialCharacters += ENV_VAR_PATH_SEPARATOR;
|
specialCharacters += ENV_VAR_PATH_SEPARATOR;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H
|
#ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H
|
||||||
#define FDBCLIENT_MULTIVERSIONTRANSACTION_H
|
#define FDBCLIENT_MULTIVERSIONTRANSACTION_H
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "bindings/c/foundationdb/fdb_c_options.g.h"
|
#include "bindings/c/foundationdb/fdb_c_options.g.h"
|
||||||
|
@ -149,6 +150,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
||||||
int uidLength,
|
int uidLength,
|
||||||
uint8_t const* snapshotCommmand,
|
uint8_t const* snapshotCommmand,
|
||||||
int snapshotCommandLength);
|
int snapshotCommandLength);
|
||||||
|
FDBFuture* (*databaseCreateSharedState)(FDBDatabase* database);
|
||||||
|
void (*databaseSetSharedState)(FDBDatabase* database, DatabaseSharedState* p);
|
||||||
|
|
||||||
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
|
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
|
||||||
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
|
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
|
||||||
|
|
||||||
|
@ -285,6 +289,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
||||||
FDBMappedKeyValue const** outKVM,
|
FDBMappedKeyValue const** outKVM,
|
||||||
int* outCount,
|
int* outCount,
|
||||||
fdb_bool_t* outMore);
|
fdb_bool_t* outMore);
|
||||||
|
fdb_error_t (*futureGetSharedState)(FDBFuture* f, DatabaseSharedState** outPtr);
|
||||||
fdb_error_t (*futureSetCallback)(FDBFuture* f, FDBCallback callback, void* callback_parameter);
|
fdb_error_t (*futureSetCallback)(FDBFuture* f, FDBCallback callback, void* callback_parameter);
|
||||||
void (*futureCancel)(FDBFuture* f);
|
void (*futureCancel)(FDBFuture* f);
|
||||||
void (*futureDestroy)(FDBFuture* f);
|
void (*futureDestroy)(FDBFuture* f);
|
||||||
|
@ -433,6 +438,9 @@ public:
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||||
|
void setSharedState(DatabaseSharedState* p) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const Reference<FdbCApi> api;
|
const Reference<FdbCApi> api;
|
||||||
FdbCApi::FDBDatabase*
|
FdbCApi::FDBDatabase*
|
||||||
|
@ -708,6 +716,9 @@ public:
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||||
|
void setSharedState(DatabaseSharedState* p) override;
|
||||||
|
|
||||||
// private:
|
// private:
|
||||||
|
|
||||||
struct LegacyVersionMonitor;
|
struct LegacyVersionMonitor;
|
||||||
|
@ -830,6 +841,8 @@ public:
|
||||||
|
|
||||||
bool callbackOnMainThread;
|
bool callbackOnMainThread;
|
||||||
bool localClientDisabled;
|
bool localClientDisabled;
|
||||||
|
ThreadFuture<Void> updateClusterSharedStateMap(std::string clusterFilePath, Reference<IDatabase> db);
|
||||||
|
void clearClusterSharedStateMapEntry(std::string clusterFilePath);
|
||||||
|
|
||||||
static bool apiVersionAtLeast(int minVersion);
|
static bool apiVersionAtLeast(int minVersion);
|
||||||
|
|
||||||
|
@ -853,6 +866,9 @@ private:
|
||||||
Reference<ClientInfo> localClient;
|
Reference<ClientInfo> localClient;
|
||||||
std::map<std::string, ClientDesc> externalClientDescriptions;
|
std::map<std::string, ClientDesc> externalClientDescriptions;
|
||||||
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
|
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
|
||||||
|
// Map of clusterFilePath -> DatabaseSharedState pointer Future
|
||||||
|
// Upon cluster version upgrade, clear the map entry for that cluster
|
||||||
|
std::map<std::string, ThreadFuture<DatabaseSharedState*>> clusterSharedStateMap;
|
||||||
|
|
||||||
bool networkStartSetup;
|
bool networkStartSetup;
|
||||||
volatile bool networkSetup;
|
volatile bool networkSetup;
|
||||||
|
|
|
@ -71,6 +71,7 @@
|
||||||
#include "flow/Error.h"
|
#include "flow/Error.h"
|
||||||
#include "flow/FastRef.h"
|
#include "flow/FastRef.h"
|
||||||
#include "flow/IRandom.h"
|
#include "flow/IRandom.h"
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "flow/genericactors.actor.h"
|
#include "flow/genericactors.actor.h"
|
||||||
#include "flow/Knobs.h"
|
#include "flow/Knobs.h"
|
||||||
|
@ -214,7 +215,25 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateCachedReadVersionShared(double t, Version v, DatabaseSharedState* p) {
|
||||||
|
MutexHolder mutex(p->mutexLock);
|
||||||
|
if (v >= p->grvCacheSpace.cachedReadVersion) {
|
||||||
|
TraceEvent(SevDebug, "CacheReadVersionUpdate")
|
||||||
|
.detail("Version", v)
|
||||||
|
.detail("CurTime", t)
|
||||||
|
.detail("LastVersion", p->grvCacheSpace.cachedReadVersion)
|
||||||
|
.detail("LastTime", p->grvCacheSpace.lastGrvTime);
|
||||||
|
p->grvCacheSpace.cachedReadVersion = v;
|
||||||
|
if (t > p->grvCacheSpace.lastGrvTime) {
|
||||||
|
p->grvCacheSpace.lastGrvTime = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void DatabaseContext::updateCachedReadVersion(double t, Version v) {
|
void DatabaseContext::updateCachedReadVersion(double t, Version v) {
|
||||||
|
if (sharedStatePtr) {
|
||||||
|
return updateCachedReadVersionShared(t, v, sharedStatePtr);
|
||||||
|
}
|
||||||
if (v >= cachedReadVersion) {
|
if (v >= cachedReadVersion) {
|
||||||
TraceEvent(SevDebug, "CachedReadVersionUpdate")
|
TraceEvent(SevDebug, "CachedReadVersionUpdate")
|
||||||
.detail("Version", v)
|
.detail("Version", v)
|
||||||
|
@ -233,10 +252,18 @@ void DatabaseContext::updateCachedReadVersion(double t, Version v) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Version DatabaseContext::getCachedReadVersion() {
|
Version DatabaseContext::getCachedReadVersion() {
|
||||||
|
if (sharedStatePtr) {
|
||||||
|
MutexHolder mutex(sharedStatePtr->mutexLock);
|
||||||
|
return sharedStatePtr->grvCacheSpace.cachedReadVersion;
|
||||||
|
}
|
||||||
return cachedReadVersion;
|
return cachedReadVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
double DatabaseContext::getLastGrvTime() {
|
double DatabaseContext::getLastGrvTime() {
|
||||||
|
if (sharedStatePtr) {
|
||||||
|
MutexHolder mutex(sharedStatePtr->mutexLock);
|
||||||
|
return sharedStatePtr->grvCacheSpace.lastGrvTime;
|
||||||
|
}
|
||||||
return lastGrvTime;
|
return lastGrvTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1363,11 +1390,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
||||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||||
bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), lastGrvTime(0.0),
|
bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr),
|
||||||
cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
|
lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0),
|
||||||
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
|
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||||
coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
|
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0),
|
||||||
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||||
|
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
||||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
||||||
dbId = deterministicRandom()->randomUniqueID();
|
dbId = deterministicRandom()->randomUniqueID();
|
||||||
|
@ -1664,6 +1692,9 @@ DatabaseContext::~DatabaseContext() {
|
||||||
if (grvUpdateHandler.isValid()) {
|
if (grvUpdateHandler.isValid()) {
|
||||||
grvUpdateHandler.cancel();
|
grvUpdateHandler.cancel();
|
||||||
}
|
}
|
||||||
|
if (sharedStatePtr) {
|
||||||
|
sharedStatePtr->delRef(sharedStatePtr);
|
||||||
|
}
|
||||||
for (auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
|
for (auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
|
||||||
it->second->notifyContextDestroyed();
|
it->second->notifyContextDestroyed();
|
||||||
ASSERT_ABORT(server_interf.empty());
|
ASSERT_ABORT(server_interf.empty());
|
||||||
|
@ -8143,6 +8174,29 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
|
||||||
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
|
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sharedStateDelRef(DatabaseSharedState* ssPtr) {
|
||||||
|
if (--ssPtr->refCount == 0) {
|
||||||
|
delete ssPtr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<DatabaseSharedState*> DatabaseContext::initSharedState() {
|
||||||
|
ASSERT(!sharedStatePtr); // Don't re-initialize shared state if a pointer already exists
|
||||||
|
DatabaseSharedState* newState = new DatabaseSharedState();
|
||||||
|
// Increment refcount by 1 on creation to account for the one held in MultiVersionApi map
|
||||||
|
// Therefore, on initialization, refCount should be 2 (after also going to setSharedState)
|
||||||
|
newState->refCount++;
|
||||||
|
newState->delRef = &sharedStateDelRef;
|
||||||
|
setSharedState(newState);
|
||||||
|
return newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DatabaseContext::setSharedState(DatabaseSharedState* p) {
|
||||||
|
ASSERT(p->protocolVersion == currentProtocolVersion);
|
||||||
|
sharedStatePtr = p;
|
||||||
|
sharedStatePtr->refCount++;
|
||||||
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
|
ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
|
||||||
state Promise<Void> destroyed = self->destroyed;
|
state Promise<Void> destroyed = self->destroyed;
|
||||||
loop {
|
loop {
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "fdbclient/versions.h"
|
#include "fdbclient/versions.h"
|
||||||
#include "fdbclient/GenericManagementAPI.actor.h"
|
#include "fdbclient/GenericManagementAPI.actor.h"
|
||||||
#include "fdbclient/NativeAPI.actor.h"
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
|
|
||||||
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't
|
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't
|
||||||
// call addRef (e.g. C API follows this). Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any
|
// call addRef (e.g. C API follows this). Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any
|
||||||
|
@ -101,6 +102,16 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
|
||||||
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
|
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> ThreadSafeDatabase::createSharedState() {
|
||||||
|
DatabaseContext* db = this->db;
|
||||||
|
return onMainThread([db]() -> Future<DatabaseSharedState*> { return db->initSharedState(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) {
|
||||||
|
DatabaseContext* db = this->db;
|
||||||
|
onMainThreadVoid([db, p]() { db->setSharedState(p); }, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
// Return the main network thread busyness
|
// Return the main network thread busyness
|
||||||
double ThreadSafeDatabase::getMainThreadBusyness() {
|
double ThreadSafeDatabase::getMainThreadBusyness() {
|
||||||
ASSERT(g_network);
|
ASSERT(g_network);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#ifndef FDBCLIENT_THREADSAFETRANSACTION_H
|
#ifndef FDBCLIENT_THREADSAFETRANSACTION_H
|
||||||
#define FDBCLIENT_THREADSAFETRANSACTION_H
|
#define FDBCLIENT_THREADSAFETRANSACTION_H
|
||||||
|
#include "flow/ProtocolVersion.h"
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "fdbclient/ReadYourWrites.h"
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
|
@ -58,6 +59,9 @@ public:
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
|
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||||
|
void setSharedState(DatabaseSharedState* p) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class ThreadSafeTenant;
|
friend class ThreadSafeTenant;
|
||||||
friend class ThreadSafeTransaction;
|
friend class ThreadSafeTransaction;
|
||||||
|
|
Loading…
Reference in New Issue