Add clusterCacheMap to multiVersionApi and propagate shared object through createDatabase

This commit is contained in:
Jon Fu 2022-01-18 13:02:05 -05:00
parent 5dccca4081
commit bcc69ac1f5
6 changed files with 25 additions and 17 deletions

View File

@ -1171,6 +1171,8 @@ struct GRVCacheSpace {
Mutex cacheLock;
Version cachedRv;
double lastTimedGrv;
GRVCacheSpace() : cacheLock(Mutex()), cachedRv(Version(0)), lastTimedGrv(0.0) {}
};
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {

View File

@ -153,7 +153,8 @@ public:
virtual void runNetwork() = 0;
virtual void stopNetwork() = 0;
virtual Reference<IDatabase> createDatabase(const char* clusterFilePath) = 0;
virtual Reference<IDatabase> createDatabase(const char* clusterFilePath,
GRVCacheSpace* sharedCachePtr = nullptr) = 0;
virtual void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) = 0;
};

View File

@ -649,10 +649,11 @@ Reference<IDatabase> DLApi::createDatabase609(const char* clusterFilePath) {
return makeReference<DLDatabase>(api, dbFuture);
}
Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath) {
Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr) {
if (headerVersion >= 610) {
FdbCApi::FDBDatabase* db;
throwIfError(api->createDatabase(clusterFilePath, &db));
// can the FdbCApi wrapper signature be changed to add this ptr?
throwIfError(api->createDatabase(clusterFilePath, &db, sharedCachePtr));
return Reference<IDatabase>(new DLDatabase(api, db));
} else {
return DLApi::createDatabase609(clusterFilePath);
@ -1995,13 +1996,17 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void*
}
// Creates an IDatabase object that represents a connection to the cluster
Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath) {
Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr) {
lock.enter();
if (!networkSetup) {
lock.leave();
throw network_not_setup();
}
std::string clusterFile(clusterFilePath);
if (clusterCacheMap.find(clusterFile) == clusterCacheMap.end()) {
clusterCacheMap[clusterFile] = GRVCacheSpace();
}
sharedCachePtr = &clusterCacheMap[clusterFile];
if (localClientDisabled) {
ASSERT(!bypassMultiClientApi);
@ -2010,7 +2015,7 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
nextThread = (nextThread + 1) % threadCount;
lock.leave();
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath, sharedCachePtr);
return Reference<IDatabase>(
new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>(), localDb));
}
@ -2019,7 +2024,7 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
ASSERT_LE(threadCount, 1);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath, sharedCachePtr);
if (bypassMultiClientApi) {
return localDb;
} else {

View File

@ -62,7 +62,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*setupNetwork)();
fdb_error_t (*runNetwork)();
fdb_error_t (*stopNetwork)();
fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db);
fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db, GRVCacheSpace* sharedCachePtr);
// Database
fdb_error_t (*databaseCreateTransaction)(FDBDatabase* database, FDBTransaction** tr);
@ -331,7 +331,7 @@ public:
void runNetwork() override;
void stopNetwork() override;
Reference<IDatabase> createDatabase(const char* clusterFilePath) override;
Reference<IDatabase> createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr = nullptr) override;
Reference<IDatabase> createDatabase609(const char* clusterFilePath); // legacy database creation
void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override;
@ -532,8 +532,6 @@ public:
struct LegacyVersionMonitor;
std::map<UID, GRVCacheSpace> clusterCacheMap;
// A struct that manages the current connection state of the MultiVersionDatabase. This wraps the underlying
// IDatabase object that is currently interacting with the cluster.
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
@ -639,7 +637,7 @@ public:
void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override;
// Creates an IDatabase object that represents a connection to the cluster
Reference<IDatabase> createDatabase(const char* clusterFilePath) override;
Reference<IDatabase> createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr = nullptr) override;
static MultiVersionApi* api;
Reference<ClientInfo> getLocalClient();
@ -675,6 +673,7 @@ private:
Reference<ClientInfo> localClient;
std::map<std::string, ClientDesc> externalClientDescriptions;
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
std::map<std::string, GRVCacheSpace> clusterCacheMap;
bool networkStartSetup;
volatile bool networkSetup;

View File

@ -115,7 +115,7 @@ ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<Pro
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion, GRVCacheSpace* sharedCachePtr) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
@ -124,7 +124,7 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
DatabaseContext* db = this->db = DatabaseContext::allocateOnForeignThread();
onMainThreadVoid(
[db, connFile, apiVersion]() {
[db, connFile, apiVersion, sharedCachePtr]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db)
@ -134,6 +134,7 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
} catch (...) {
new (db) DatabaseContext(unknown_error());
}
db->setSharedCacheSpace(sharedCachePtr);
},
nullptr);
}
@ -499,8 +500,8 @@ void ThreadSafeApi::stopNetwork() {
::stopNetwork();
}
Reference<IDatabase> ThreadSafeApi::createDatabase(const char* clusterFilePath) {
return Reference<IDatabase>(new ThreadSafeDatabase(clusterFilePath, apiVersion));
Reference<IDatabase> ThreadSafeApi::createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr) {
return Reference<IDatabase>(new ThreadSafeDatabase(clusterFilePath, apiVersion, sharedCachePtr));
}
void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) {

View File

@ -64,7 +64,7 @@ private:
DatabaseContext* db;
public: // Internal use only
ThreadSafeDatabase(std::string connFilename, int apiVersion);
ThreadSafeDatabase(std::string connFilename, int apiVersion, GRVCacheSpace* sharedCachePtr = nullptr);
ThreadSafeDatabase(DatabaseContext* db) : db(db) {}
DatabaseContext* unsafeGetPtr() const { return db; }
};
@ -169,7 +169,7 @@ public:
void runNetwork() override;
void stopNetwork() override;
Reference<IDatabase> createDatabase(const char* clusterFilePath) override;
Reference<IDatabase> createDatabase(const char* clusterFilePath, GRVCacheSpace* sharedCachePtr = nullptr) override;
void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override;