expose cluster ID through NativeAPI and C API
This commit is contained in:
parent
b7ca2a656c
commit
da76abbdf6
|
@ -358,6 +358,10 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT FDBFuture* fdb_database_get_cluster_id(FDBDatabase* db) {
|
||||
return (FDBFuture*)(DB(db)->getClusterId().extractPtr());
|
||||
}
|
||||
|
||||
// Get network thread busyness (updated every 1s)
|
||||
// A value of 0 indicates that the client is more or less idle
|
||||
// A value of 1 (or more) indicates that the client is saturated
|
||||
|
|
|
@ -187,6 +187,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
|
|||
uint8_t const* snap_command,
|
||||
int snap_command_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_cluster_id(FDBDatabase* db);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
|
||||
|
|
|
@ -274,6 +274,8 @@ public:
|
|||
// Management API, create snapshot
|
||||
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
|
||||
|
||||
Future<UID> getClusterId();
|
||||
|
||||
Future<Void> getChangeFeedStream(Reference<ChangeFeedData> results,
|
||||
Key rangeID,
|
||||
Version begin = 0,
|
||||
|
|
|
@ -130,6 +130,8 @@ public:
|
|||
// Management API, create snapshot
|
||||
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
|
||||
|
||||
virtual ThreadFuture<UID> getClusterId() = 0;
|
||||
|
||||
// used in template functions as the Transaction type that can be created through createTransaction()
|
||||
using TransactionT = ITransaction;
|
||||
};
|
||||
|
|
|
@ -398,6 +398,25 @@ double DLDatabase::getMainThreadBusyness() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
ThreadFuture<UID> DLDatabase::getClusterId() {
|
||||
if (api->getClusterId != nullptr) {
|
||||
FdbCApi::FDBFuture* f = api->getClusterId();
|
||||
return toThreadFuture<UID>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
const uint8_t* res;
|
||||
int resLen;
|
||||
FdbCApi::fdb_error_t error = api->futureGetKey(f, &res, &resLen);
|
||||
ASSERT(!error);
|
||||
// UID loading code taken from
|
||||
// IRandom.h:load(const uint8_t* i, UID& out, Context& context)
|
||||
const uint64_t* in = reinterpret_cast<const uint64_t*>(res);
|
||||
UID out = UID(in[0], in[1]);
|
||||
return out;
|
||||
});
|
||||
}
|
||||
|
||||
return UID();
|
||||
}
|
||||
|
||||
// Returns the protocol version reported by the coordinator this client is connected to
|
||||
// If an expected version is given, the future won't return until the protocol version is different than expected
|
||||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
|
@ -1183,6 +1202,13 @@ double MultiVersionDatabase::getMainThreadBusyness() {
|
|||
return localClientBusyness;
|
||||
}
|
||||
|
||||
ThreadFuture<UID> MultiVersionDatabase::getClusterId() {
|
||||
if (dbState->db) {
|
||||
return dbState->db->getClusterId();
|
||||
}
|
||||
return UID();
|
||||
}
|
||||
|
||||
// Returns the protocol version reported by the coordinator this client is connected to
|
||||
// If an expected version is given, the future won't return until the protocol version is different than expected
|
||||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
|
|
|
@ -84,6 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int snapshotCommandLength);
|
||||
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
|
||||
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
|
||||
FDBFuture* (*getClusterId)();
|
||||
|
||||
// Transaction
|
||||
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
|
||||
|
@ -307,6 +308,7 @@ public:
|
|||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
ThreadFuture<UID> getClusterId() override;
|
||||
|
||||
private:
|
||||
const Reference<FdbCApi> api;
|
||||
|
@ -524,6 +526,7 @@ public:
|
|||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
ThreadFuture<UID> getClusterId() override;
|
||||
|
||||
// private:
|
||||
|
||||
|
|
|
@ -6910,6 +6910,26 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
|
|||
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
|
||||
}
|
||||
|
||||
ACTOR static Future<UID> getClusterIdActor(DatabaseContext* cx) {
|
||||
Database db(cx);
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
|
||||
ASSERT(clusterId.present());
|
||||
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<UID> DatabaseContext::getClusterId() {
|
||||
return getClusterIdActor(this);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
|
||||
loop {
|
||||
if (self->version.get() < self->desired.get()) {
|
||||
|
|
|
@ -95,6 +95,11 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
|
|||
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
|
||||
}
|
||||
|
||||
ThreadFuture<UID> ThreadSafeDatabase::getClusterId() {
|
||||
DatabaseContext* db = this->db;
|
||||
return onMainThread([db]() -> Future<UID> { return db->getClusterId(); });
|
||||
}
|
||||
|
||||
// Return the main network thread busyness
|
||||
double ThreadSafeDatabase::getMainThreadBusyness() {
|
||||
ASSERT(g_network);
|
||||
|
|
|
@ -56,6 +56,7 @@ public:
|
|||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
|
||||
ThreadFuture<UID> getClusterId() override;
|
||||
|
||||
private:
|
||||
friend class ThreadSafeTransaction;
|
||||
|
|
Loading…
Reference in New Issue