added fdb_c_internal.h and set up shared space initialization in multiversion api db creation

This commit is contained in:
Jon Fu 2022-01-21 16:52:11 -05:00
parent dd1d48754d
commit 133ee2c113
11 changed files with 140 additions and 8 deletions

View File

@ -1,6 +1,7 @@
set(FDB_C_SRCS
fdb_c.cpp
foundationdb/fdb_c.h
foundationdb/fdb_c_internal.h
ThreadCleanup.cpp)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)

View File

@ -26,6 +26,7 @@
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "foundationdb/fdb_c.h"
#include "foundationdb/fdb_c_internal.h"
int g_api_version = 0;
@ -405,6 +406,14 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
.extractPtr());
}
extern "C" DLLEXPORT DatabaseSharedState* fdb_database_create_shared_state(FDBDatabase* db) {
return (DatabaseSharedState*)(DB(db)->createSharedState());
}
extern "C" DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p) {
(DB(db)->setSharedState(p));
}
// Get network thread busyness (updated every 1s)
// A value of 0 indicates that the client is more or less idle
// A value of 1 (or more) indicates that the client is saturated

View File

@ -63,6 +63,8 @@
extern "C" {
#endif
#ifndef FDB_API_OBJECTS
#define FDB_API_OBJECTS
/* Pointers to these opaque types represent objects in the FDB API */
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
@ -71,6 +73,7 @@ typedef struct FDB_transaction FDBTransaction;
typedef int fdb_error_t;
typedef int fdb_bool_t;
#endif
DLLEXPORT const char* fdb_get_error(fdb_error_t code);

View File

@ -0,0 +1,55 @@
/*
* fdb_c_internal.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDB_C_INTERNAL_H
#define FDB_C_INTERNAL_H
#pragma once
#ifndef DLLEXPORT
#define DLLEXPORT
#endif
#ifdef __cplusplus
extern "C" {
#endif
#ifndef FDB_API_OBJECTS
#define FDB_API_OBJECTS
/* Pointers to these opaque types represent objects in the FDB API */
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_transaction FDBTransaction;
typedef int fdb_error_t;
typedef int fdb_bool_t;
#endif
// forward declaration and typedef
typedef struct DatabaseSharedState DatabaseSharedState;
DLLEXPORT DatabaseSharedState* fdb_database_create_shared_state(FDBDatabase* db);
DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -470,9 +470,9 @@ public:
int maxOutstandingWatches;
// Manage any shared state that may be used by MVC
std::shared_ptr<DatabaseSharedState> sharedStatePtr;
std::shared_ptr<DatabaseSharedState> initSharedState();
void setSharedState(std::shared_ptr<DatabaseSharedState> p);
DatabaseSharedState* sharedStatePtr;
DatabaseSharedState* initSharedState();
void setSharedState(DatabaseSharedState* p);
// GRV Cache
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.

View File

@ -137,6 +137,10 @@ public:
// Management API, create snapshot
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
// Interface to manage shared state across multiple connections to the same Database
virtual DatabaseSharedState* createSharedState() = 0;
virtual void setSharedState(DatabaseSharedState* p) = 0;
// used in template functions as the Transaction type that can be created through createTransaction()
using TransactionT = ITransaction;
};

View File

@ -446,6 +446,19 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
DatabaseSharedState* DLDatabase::createSharedState() {
if (!api->databaseCreateSharedState) {
return nullptr;
}
return api->databaseCreateSharedState(db);
}
void DLDatabase::setSharedState(DatabaseSharedState* p) {
if (api->databaseSetSharedState) {
api->databaseSetSharedState(db, p);
}
}
// Get network thread busyness
double DLDatabase::getMainThreadBusyness() {
if (api->databaseGetMainThreadBusyness != nullptr) {
@ -1288,6 +1301,19 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
return abortableFuture(f, dbState->dbVar->get().onChange);
}
DatabaseSharedState* MultiVersionDatabase::createSharedState() {
if (dbState->db) {
return dbState->db->createSharedState();
}
return nullptr;
}
void MultiVersionDatabase::setSharedState(DatabaseSharedState* p) {
if (dbState->db) {
dbState->db->setSharedState(p);
}
}
// Get network thread busyness
// Return the busyness for the main thread. When using external clients, take the larger of the local client
// and the external client's busyness.
@ -2121,6 +2147,13 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
lock.leave();
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
if (clusterSharedStateMap.find(clusterFile) == clusterSharedStateMap.end() ||
clusterSharedStateMap[clusterFile] == nullptr) {
DatabaseSharedState* p = localDb->createSharedState();
clusterSharedStateMap[clusterFile] = p;
} else {
localDb->setSharedState(clusterSharedStateMap[clusterFile]);
}
return Reference<IDatabase>(
new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>(), localDb));
}
@ -2130,6 +2163,13 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
ASSERT_LE(threadCount, 1);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
if (clusterSharedStateMap.find(clusterFile) == clusterSharedStateMap.end() ||
clusterSharedStateMap[clusterFile] == nullptr) {
DatabaseSharedState* p = localDb->createSharedState();
clusterSharedStateMap[clusterFile] = p;
} else {
localDb->setSharedState(clusterSharedStateMap[clusterFile]);
}
if (bypassMultiClientApi) {
return localDb;
} else {

View File

@ -108,6 +108,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int uidLength,
uint8_t const* snapshotCommmand,
int snapshotCommandLength);
DatabaseSharedState* (*databaseCreateSharedState)(FDBDatabase* database);
void (*databaseSetSharedState)(FDBDatabase* database, DatabaseSharedState* p);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
@ -359,6 +362,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
DatabaseSharedState* createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
private:
const Reference<FdbCApi> api;
FdbCApi::FDBDatabase*
@ -587,6 +593,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
DatabaseSharedState* createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
// private:
struct LegacyVersionMonitor;
@ -732,7 +741,7 @@ private:
Reference<ClientInfo> localClient;
std::map<std::string, ClientDesc> externalClientDescriptions;
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
std::map<std::string, GRVCacheSpace> clusterCacheMap;
std::map<std::string, DatabaseSharedState*> clusterSharedStateMap;
bool networkStartSetup;
volatile bool networkSetup;

View File

@ -209,7 +209,7 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
}
}
void updateCachedRVShared(double t, Version v, std::shared_ptr<DatabaseSharedState> p) {
void updateCachedRVShared(double t, Version v, DatabaseSharedState* p) {
MutexHolder mutex(p->mutexLock);
TraceEvent("CheckpointCacheUpdateShared")
.detail("Version", v)
@ -7163,13 +7163,13 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
}
std::shared_ptr<DatabaseSharedState> DatabaseContext::initSharedState() {
std::shared_ptr<DatabaseSharedState> newState = std::make_shared<DatabaseSharedState>();
DatabaseSharedState* DatabaseContext::initSharedState() {
DatabaseSharedState* newState = new DatabaseSharedState();
setSharedState(newState);
return newState;
}
void DatabaseContext::setSharedState(std::shared_ptr<DatabaseSharedState> p) {
void DatabaseContext::setSharedState(DatabaseSharedState* p) {
sharedStatePtr = p;
}

View File

@ -96,6 +96,14 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
}
DatabaseSharedState* ThreadSafeDatabase::createSharedState() {
return db->initSharedState();
}
void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) {
db->setSharedState(p);
}
// Return the main network thread busyness
double ThreadSafeDatabase::getMainThreadBusyness() {
ASSERT(g_network);

View File

@ -57,6 +57,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
DatabaseSharedState* createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
private:
friend class ThreadSafeTransaction;
bool isConfigDB{ false };