remove all multi-version client code related to grv cache
This commit is contained in:
parent
d8e7fea421
commit
6d05ecffc2
|
@ -1,8 +1,6 @@
|
|||
set(FDB_C_SRCS
|
||||
fdb_c.cpp
|
||||
foundationdb/fdb_c.h
|
||||
foundationdb/fdb_c_internal.h
|
||||
foundationdb/fdb_c_types.h
|
||||
ThreadCleanup.cpp)
|
||||
|
||||
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "fdbclient/MultiVersionTransaction.h"
|
||||
#include "fdbclient/MultiVersionAssignmentVars.h"
|
||||
#include "foundationdb/fdb_c.h"
|
||||
#include "foundationdb/fdb_c_internal.h"
|
||||
|
||||
int g_api_version = 0;
|
||||
|
||||
|
@ -406,14 +405,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT DatabaseSharedState* fdb_database_create_shared_state(FDBDatabase* db) {
|
||||
return (DatabaseSharedState*)(DB(db)->createSharedState());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p) {
|
||||
(DB(db)->setSharedState(p));
|
||||
}
|
||||
|
||||
// Get network thread busyness (updated every 1s)
|
||||
// A value of 0 indicates that the client is more or less idle
|
||||
// A value of 1 (or more) indicates that the client is saturated
|
||||
|
|
|
@ -58,12 +58,20 @@
|
|||
#include <stdint.h>
|
||||
|
||||
#include "fdb_c_options.g.h"
|
||||
#include "fdb_c_types.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/* Pointers to these opaque types represent objects in the FDB API */
|
||||
typedef struct FDB_future FDBFuture;
|
||||
typedef struct FDB_result FDBResult;
|
||||
typedef struct FDB_database FDBDatabase;
|
||||
typedef struct FDB_transaction FDBTransaction;
|
||||
|
||||
typedef int fdb_error_t;
|
||||
typedef int fdb_bool_t;
|
||||
|
||||
DLLEXPORT const char* fdb_get_error(fdb_error_t code);
|
||||
|
||||
DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_error_t code);
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* fdb_c_internal.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDB_C_INTERNAL_H
|
||||
#define FDB_C_INTERNAL_H
|
||||
#pragma once
|
||||
|
||||
#ifndef DLLEXPORT
|
||||
#define DLLEXPORT
|
||||
#endif
|
||||
|
||||
#include "fdb_c_types.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// forward declaration and typedef
|
||||
typedef struct DatabaseSharedState DatabaseSharedState;
|
||||
|
||||
DLLEXPORT DatabaseSharedState* fdb_database_create_shared_state(FDBDatabase* db);
|
||||
|
||||
DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
#endif
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* fdb_c_types.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDB_C_TYPES_H
|
||||
#define FDB_C_TYPES_H
|
||||
#pragma once
|
||||
|
||||
#ifndef DLLEXPORT
|
||||
#define DLLEXPORT
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/* Pointers to these opaque types represent objects in the FDB API */
|
||||
typedef struct FDB_future FDBFuture;
|
||||
typedef struct FDB_result FDBResult;
|
||||
typedef struct FDB_database FDBDatabase;
|
||||
typedef struct FDB_transaction FDBTransaction;
|
||||
|
||||
typedef int fdb_error_t;
|
||||
typedef int fdb_bool_t;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
#endif
|
|
@ -468,11 +468,6 @@ public:
|
|||
int outstandingWatches;
|
||||
int maxOutstandingWatches;
|
||||
|
||||
// Manage any shared state that may be used by MVC
|
||||
DatabaseSharedState* sharedStatePtr;
|
||||
DatabaseSharedState* initSharedState();
|
||||
void setSharedState(DatabaseSharedState* p);
|
||||
|
||||
// GRV Cache
|
||||
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
|
||||
double lastTimedGrv;
|
||||
|
|
|
@ -1171,22 +1171,6 @@ struct StorageMigrationType {
|
|||
uint32_t type;
|
||||
};
|
||||
|
||||
struct GRVCacheSpace {
|
||||
Version cachedRv;
|
||||
double lastTimedGrv;
|
||||
|
||||
GRVCacheSpace() : cachedRv(Version(0)), lastTimedGrv(0.0) {}
|
||||
};
|
||||
|
||||
// This structure can be extended in the future to include additional features that required a shared state
|
||||
struct DatabaseSharedState {
|
||||
Mutex mutexLock;
|
||||
GRVCacheSpace grvCacheSpace;
|
||||
int refCount;
|
||||
|
||||
DatabaseSharedState() : mutexLock(Mutex()), grvCacheSpace(GRVCacheSpace()) {}
|
||||
};
|
||||
|
||||
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
|
||||
int pos = locality.find(':');
|
||||
// locality should be either 0 or in the format '<non_empty_string>:<non_empty_string>'
|
||||
|
|
|
@ -137,10 +137,6 @@ public:
|
|||
// Management API, create snapshot
|
||||
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
|
||||
|
||||
// Interface to manage shared state across multiple connections to the same Database
|
||||
virtual DatabaseSharedState* createSharedState() = 0;
|
||||
virtual void setSharedState(DatabaseSharedState* p) = 0;
|
||||
|
||||
// used in template functions as the Transaction type that can be created through createTransaction()
|
||||
using TransactionT = ITransaction;
|
||||
};
|
||||
|
|
|
@ -446,19 +446,6 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
|
|||
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
|
||||
}
|
||||
|
||||
DatabaseSharedState* DLDatabase::createSharedState() {
|
||||
if (!api->databaseCreateSharedState) {
|
||||
return nullptr;
|
||||
}
|
||||
return api->databaseCreateSharedState(db);
|
||||
}
|
||||
|
||||
void DLDatabase::setSharedState(DatabaseSharedState* p) {
|
||||
if (api->databaseSetSharedState) {
|
||||
api->databaseSetSharedState(db, p);
|
||||
}
|
||||
}
|
||||
|
||||
// Get network thread busyness
|
||||
double DLDatabase::getMainThreadBusyness() {
|
||||
if (api->databaseGetMainThreadBusyness != nullptr) {
|
||||
|
@ -736,7 +723,6 @@ Reference<IDatabase> DLApi::createDatabase609(const char* clusterFilePath) {
|
|||
Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath) {
|
||||
if (headerVersion >= 610) {
|
||||
FdbCApi::FDBDatabase* db;
|
||||
// can the FdbCApi wrapper signature be changed to add this ptr?
|
||||
throwIfError(api->createDatabase(clusterFilePath, &db));
|
||||
return Reference<IDatabase>(new DLDatabase(api, db));
|
||||
} else {
|
||||
|
@ -1191,14 +1177,6 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
|
|||
: dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) {
|
||||
dbState->db = db;
|
||||
dbState->dbVar->set(db);
|
||||
auto stateMapKey = std::make_pair(clusterFilePath, dbState->dbProtocolVersion.get());
|
||||
if (api->clusterSharedStateMap.find(stateMapKey) == api->clusterSharedStateMap.end() ||
|
||||
api->clusterSharedStateMap[stateMapKey] == nullptr) {
|
||||
DatabaseSharedState* p = db->createSharedState();
|
||||
api->clusterSharedStateMap[stateMapKey] = p;
|
||||
} else {
|
||||
db->setSharedState(api->clusterSharedStateMap[stateMapKey]);
|
||||
}
|
||||
if (openConnectors) {
|
||||
if (!api->localClientDisabled) {
|
||||
dbState->addClient(api->getLocalClient());
|
||||
|
@ -1308,19 +1286,6 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
|
|||
return abortableFuture(f, dbState->dbVar->get().onChange);
|
||||
}
|
||||
|
||||
DatabaseSharedState* MultiVersionDatabase::createSharedState() {
|
||||
if (dbState->db) {
|
||||
return dbState->db->createSharedState();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void MultiVersionDatabase::setSharedState(DatabaseSharedState* p) {
|
||||
if (dbState->db) {
|
||||
dbState->db->setSharedState(p);
|
||||
}
|
||||
}
|
||||
|
||||
// Get network thread busyness
|
||||
// Return the busyness for the main thread. When using external clients, take the larger of the local client
|
||||
// and the external client's busyness.
|
||||
|
@ -1538,15 +1503,6 @@ void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> ne
|
|||
}
|
||||
}
|
||||
|
||||
auto stateMapKey = std::make_pair(clusterFilePath, dbProtocolVersion.get());
|
||||
if (MultiVersionApi::api->clusterSharedStateMap.find(stateMapKey) ==
|
||||
MultiVersionApi::api->clusterSharedStateMap.end() ||
|
||||
MultiVersionApi::api->clusterSharedStateMap[stateMapKey] == nullptr) {
|
||||
DatabaseSharedState* p = db->createSharedState();
|
||||
MultiVersionApi::api->clusterSharedStateMap[stateMapKey] = p;
|
||||
} else {
|
||||
db->setSharedState(MultiVersionApi::api->clusterSharedStateMap[stateMapKey]);
|
||||
}
|
||||
dbVar->set(db);
|
||||
|
||||
ASSERT(protocolVersionMonitor.isValid());
|
||||
|
|
|
@ -108,8 +108,6 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int uidLength,
|
||||
uint8_t const* snapshotCommmand,
|
||||
int snapshotCommandLength);
|
||||
DatabaseSharedState* (*databaseCreateSharedState)(FDBDatabase* database);
|
||||
void (*databaseSetSharedState)(FDBDatabase* database, DatabaseSharedState* p);
|
||||
|
||||
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
|
||||
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
|
||||
|
@ -362,9 +360,6 @@ public:
|
|||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
|
||||
DatabaseSharedState* createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
||||
private:
|
||||
const Reference<FdbCApi> api;
|
||||
FdbCApi::FDBDatabase*
|
||||
|
@ -593,9 +588,6 @@ public:
|
|||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
|
||||
DatabaseSharedState* createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
||||
// private:
|
||||
|
||||
struct LegacyVersionMonitor;
|
||||
|
@ -718,8 +710,6 @@ public:
|
|||
|
||||
bool callbackOnMainThread;
|
||||
bool localClientDisabled;
|
||||
// Map of (clusterFilePath + protocolVersion) -> DatabaseSharedState pointer
|
||||
std::map<std::pair<std::string, ProtocolVersion>, DatabaseSharedState*> clusterSharedStateMap;
|
||||
|
||||
static bool apiVersionAtLeast(int minVersion);
|
||||
|
||||
|
|
|
@ -209,23 +209,8 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
|
|||
}
|
||||
}
|
||||
|
||||
void updateCachedRVShared(double t, Version v, DatabaseSharedState* p) {
|
||||
MutexHolder mutex(p->mutexLock);
|
||||
TraceEvent("CheckpointCacheUpdateShared")
|
||||
.detail("Version", v)
|
||||
.detail("CurTime", t)
|
||||
.detail("LastVersion", p->grvCacheSpace.cachedRv)
|
||||
.detail("LastTime", p->grvCacheSpace.lastTimedGrv);
|
||||
p->grvCacheSpace.cachedRv = v;
|
||||
if (t > p->grvCacheSpace.lastTimedGrv) {
|
||||
p->grvCacheSpace.lastTimedGrv = t;
|
||||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::updateCachedRV(double t, Version v) {
|
||||
if (sharedStatePtr) {
|
||||
return updateCachedRVShared(t, v, sharedStatePtr);
|
||||
}
|
||||
if (v >= cachedRv) {
|
||||
TraceEvent("CheckpointCacheUpdate")
|
||||
.detail("Version", v)
|
||||
|
@ -244,18 +229,10 @@ void DatabaseContext::updateCachedRV(double t, Version v) {
|
|||
}
|
||||
|
||||
Version DatabaseContext::getCachedRV() {
|
||||
if (sharedStatePtr) {
|
||||
MutexHolder mutex(sharedStatePtr->mutexLock);
|
||||
return sharedStatePtr->grvCacheSpace.cachedRv;
|
||||
}
|
||||
return cachedRv;
|
||||
}
|
||||
|
||||
double DatabaseContext::getLastTimedGRV() {
|
||||
if (sharedStatePtr) {
|
||||
MutexHolder mutex(sharedStatePtr->mutexLock);
|
||||
return sharedStatePtr->grvCacheSpace.lastTimedGrv;
|
||||
}
|
||||
return lastTimedGrv;
|
||||
}
|
||||
|
||||
|
@ -1335,10 +1312,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||
bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastTimedGrv(0.0), cachedRv(0),
|
||||
lastTimedRkThrottle(0.0), lastProxyRequest(0.0), transactionTracingSample(false), taskID(taskID),
|
||||
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
|
||||
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
bytesPerCommit(1000), outstandingWatches(0), lastTimedGrv(0.0), cachedRv(0), lastTimedRkThrottle(0.0),
|
||||
lastProxyRequest(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
||||
|
@ -1627,13 +1604,6 @@ DatabaseContext::~DatabaseContext() {
|
|||
if (grvUpdateHandler.isValid()) {
|
||||
grvUpdateHandler.cancel();
|
||||
}
|
||||
if (sharedStatePtr) {
|
||||
sharedStatePtr->refCount--;
|
||||
if (sharedStatePtr->refCount <= 0) {
|
||||
delete sharedStatePtr;
|
||||
sharedStatePtr = nullptr;
|
||||
}
|
||||
}
|
||||
for (auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
|
||||
it->second->notifyContextDestroyed();
|
||||
ASSERT_ABORT(server_interf.empty());
|
||||
|
@ -6043,16 +6013,6 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
|
|||
return rep.version;
|
||||
}
|
||||
|
||||
// ACTOR Future<Version> getDBCachedReadVersion(DatabaseContext* cx, double requestTime) {
|
||||
// if (requestTime - cx->lastTimedGrv.get() > CLIENT_KNOBS->MAX_VERSION_CACHE_LAG) {
|
||||
// wait(cx->lastTimedGrv.whenAtLeast(requestTime - CLIENT_KNOBS->MAX_VERSION_CACHE_LAG));
|
||||
// }
|
||||
// // Want to check that the cached version is at most
|
||||
// // MAX_VERSION_CACHE_LAG (100ms) old compared to requestTime.
|
||||
// ASSERT(!debug_checkVersionTime(cx->cachedRv, requestTime, "CheckStaleness"));
|
||||
// return cx->cachedRv;
|
||||
// }
|
||||
|
||||
bool rkThrottlingCooledDown(DatabaseContext* cx) {
|
||||
if (cx->lastTimedRkThrottle == 0.0) {
|
||||
return true;
|
||||
|
@ -7169,17 +7129,6 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
|
|||
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
|
||||
}
|
||||
|
||||
DatabaseSharedState* DatabaseContext::initSharedState() {
|
||||
DatabaseSharedState* newState = new DatabaseSharedState();
|
||||
setSharedState(newState);
|
||||
return newState;
|
||||
}
|
||||
|
||||
void DatabaseContext::setSharedState(DatabaseSharedState* p) {
|
||||
sharedStatePtr = p;
|
||||
sharedStatePtr->refCount++;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
|
||||
state Promise<Void> destroyed = self->destroyed;
|
||||
loop {
|
||||
|
|
|
@ -96,14 +96,6 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
|
|||
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
|
||||
}
|
||||
|
||||
DatabaseSharedState* ThreadSafeDatabase::createSharedState() {
|
||||
return db->initSharedState();
|
||||
}
|
||||
|
||||
void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) {
|
||||
db->setSharedState(p);
|
||||
}
|
||||
|
||||
// Return the main network thread busyness
|
||||
double ThreadSafeDatabase::getMainThreadBusyness() {
|
||||
ASSERT(g_network);
|
||||
|
|
|
@ -57,9 +57,6 @@ public:
|
|||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
|
||||
DatabaseSharedState* createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
||||
private:
|
||||
friend class ThreadSafeTransaction;
|
||||
bool isConfigDB{ false };
|
||||
|
|
Loading…
Reference in New Issue