Add snapshot c function
This commit is contained in:
parent
7e38cf21f5
commit
5f912f0deb
|
@ -398,6 +398,10 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_force_recovery_with_data_loss(FDBDa
|
||||||
return (FDBFuture*)(DB(db)->forceRecoveryWithDataLoss(StringRef(dcid, dcid_length)).extractPtr());
|
return (FDBFuture*)(DB(db)->forceRecoveryWithDataLoss(StringRef(dcid, dcid_length)).extractPtr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db, uint8_t const* snap_command, int snap_command_length) {
|
||||||
|
return (FDBFuture*)(DB(db)->createSnapshot(StringRef(snap_command, snap_command_length)).extractPtr());
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" DLLEXPORT
|
extern "C" DLLEXPORT
|
||||||
void fdb_transaction_destroy( FDBTransaction* tr ) {
|
void fdb_transaction_destroy( FDBTransaction* tr ) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -185,6 +185,9 @@ extern "C" {
|
||||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
||||||
fdb_database_force_recovery_with_data_loss( FDBDatabase* db, uint8_t const* dcid, int dcid_length);
|
fdb_database_force_recovery_with_data_loss( FDBDatabase* db, uint8_t const* dcid, int dcid_length);
|
||||||
|
|
||||||
|
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
||||||
|
fdb_database_create_snapshot( FDBDatabase* db, uint8_t const* snap_command, int snap_command_length);
|
||||||
|
|
||||||
DLLEXPORT void fdb_transaction_destroy( FDBTransaction* tr);
|
DLLEXPORT void fdb_transaction_destroy( FDBTransaction* tr);
|
||||||
|
|
||||||
DLLEXPORT void fdb_transaction_cancel( FDBTransaction* tr);
|
DLLEXPORT void fdb_transaction_cancel( FDBTransaction* tr);
|
||||||
|
|
|
@ -102,6 +102,10 @@ EmptyFuture Database::force_recovery_with_data_loss(FDBDatabase *db, const uint8
|
||||||
return EmptyFuture(fdb_database_force_recovery_with_data_loss(db, dcid, dcid_length));
|
return EmptyFuture(fdb_database_force_recovery_with_data_loss(db, dcid, dcid_length));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EmptyFuture Database::create_snapshot(FDBDatabase *db, const uint8_t *snap_command, int snap_command_length) {
|
||||||
|
return EmptyFuture(fdb_database_create_snapshot(db, snap_command, snap_command_length));
|
||||||
|
}
|
||||||
|
|
||||||
// Transaction
|
// Transaction
|
||||||
|
|
||||||
Transaction::Transaction(FDBDatabase* db) {
|
Transaction::Transaction(FDBDatabase* db) {
|
||||||
|
|
|
@ -154,6 +154,7 @@ public:
|
||||||
static Int64Future reboot_worker(FDBDatabase* db, const uint8_t* address, int address_length, fdb_bool_t check,
|
static Int64Future reboot_worker(FDBDatabase* db, const uint8_t* address, int address_length, fdb_bool_t check,
|
||||||
int duration);
|
int duration);
|
||||||
static EmptyFuture force_recovery_with_data_loss(FDBDatabase* db, const uint8_t* dcid, int dcid_length);
|
static EmptyFuture force_recovery_with_data_loss(FDBDatabase* db, const uint8_t* dcid, int dcid_length);
|
||||||
|
static EmptyFuture create_snapshot(FDBDatabase* db, const uint8_t* snap_command, int snap_command_length);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Wrapper around FDBTransaction, providing the same set of calls as the C API.
|
// Wrapper around FDBTransaction, providing the same set of calls as the C API.
|
||||||
|
|
|
@ -2046,6 +2046,20 @@ TEST_CASE("fdb_database_force_recovery_with_data_loss") {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("fdb_database_create_snapshot") {
|
||||||
|
std::string snapshot_command = "test";
|
||||||
|
while (1) {
|
||||||
|
fdb::EmptyFuture f = fdb::Database::create_snapshot(db, (const uint8_t*)snapshot_command.c_str(), snapshot_command.length());
|
||||||
|
fdb_error_t err = wait_future(f);
|
||||||
|
if (err == 2505) { // expected error code
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Otherwise, something went wrong.
|
||||||
|
CHECK(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("fdb_error_predicate") {
|
TEST_CASE("fdb_error_predicate") {
|
||||||
CHECK(fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, 1007)); // transaction_too_old
|
CHECK(fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, 1007)); // transaction_too_old
|
||||||
CHECK(fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, 1020)); // not_committed
|
CHECK(fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, 1020)); // not_committed
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include <cinttypes>
|
#include <cinttypes>
|
||||||
|
|
||||||
#include "flow/DeterministicRandom.h"
|
#include "flow/DeterministicRandom.h"
|
||||||
|
#include "flow/FastRef.h"
|
||||||
#include "flow/SystemMonitor.h"
|
#include "flow/SystemMonitor.h"
|
||||||
#include "flow/TLSConfig.actor.h"
|
#include "flow/TLSConfig.actor.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
@ -104,6 +105,7 @@ namespace FDB {
|
||||||
void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) override;
|
void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||||
Future<int64_t> rebootWorker(const StringRef& address, bool check = false, int duration = 0) override;
|
Future<int64_t> rebootWorker(const StringRef& address, bool check = false, int duration = 0) override;
|
||||||
Future<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
Future<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
|
Future<Void> createSnapshot(const StringRef& snap_command) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
FDBDatabase* db;
|
FDBDatabase* db;
|
||||||
|
@ -304,6 +306,13 @@ namespace FDB {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<Void> DatabaseImpl::createSnapshot(const StringRef &snap_command) {
|
||||||
|
return backToFuture<Void>( fdb_database_create_snapshot(db, snap_command.begin(), snap_command.size()), [](Reference<CFuture> f){
|
||||||
|
throw_on_error( fdb_future_get_error( f->f));
|
||||||
|
return Void();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
TransactionImpl::TransactionImpl(FDBDatabase* db) {
|
TransactionImpl::TransactionImpl(FDBDatabase* db) {
|
||||||
throw_on_error(fdb_database_create_transaction(db, &tr));
|
throw_on_error(fdb_database_create_transaction(db, &tr));
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#ifndef FDB_FLOW_FDB_FLOW_H
|
#ifndef FDB_FLOW_FDB_FLOW_H
|
||||||
#define FDB_FLOW_FDB_FLOW_H
|
#define FDB_FLOW_FDB_FLOW_H
|
||||||
|
|
||||||
|
#include "flow/Arena.h"
|
||||||
#include <flow/flow.h>
|
#include <flow/flow.h>
|
||||||
|
|
||||||
#define FDB_API_VERSION 700
|
#define FDB_API_VERSION 700
|
||||||
|
@ -126,6 +127,7 @@ namespace FDB {
|
||||||
virtual void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
virtual void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||||
virtual Future<int64_t> rebootWorker(const StringRef& address, bool check = false, int duration = 0) = 0;
|
virtual Future<int64_t> rebootWorker(const StringRef& address, bool check = false, int duration = 0) = 0;
|
||||||
virtual Future<Void> forceRecoveryWithDataLoss(const StringRef& dcid) = 0;
|
virtual Future<Void> forceRecoveryWithDataLoss(const StringRef& dcid) = 0;
|
||||||
|
virtual Future<Void> createSnapshot(const StringRef& snap_command) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class API {
|
class API {
|
||||||
|
|
|
@ -211,6 +211,8 @@ public:
|
||||||
Future<int64_t> rebootWorker(StringRef address, bool check = false, int duration = 0);
|
Future<int64_t> rebootWorker(StringRef address, bool check = false, int duration = 0);
|
||||||
// Management API, force the database to recover into DCID, causing the database to lose the most recently committed mutations
|
// Management API, force the database to recover into DCID, causing the database to lose the most recently committed mutations
|
||||||
Future<Void> forceRecoveryWithDataLoss(StringRef dcId);
|
Future<Void> forceRecoveryWithDataLoss(StringRef dcId);
|
||||||
|
// Management API, create snapshot
|
||||||
|
Future<Void> createSnapshot(StringRef snapshot_command);
|
||||||
|
|
||||||
//private:
|
//private:
|
||||||
explicit DatabaseContext( Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
|
explicit DatabaseContext( Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
|
||||||
|
|
|
@ -90,6 +90,8 @@ public:
|
||||||
virtual ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) = 0;
|
virtual ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) = 0;
|
||||||
// Management API, force the database to recover into DCID, causing the database to lose the most recently committed mutations
|
// Management API, force the database to recover into DCID, causing the database to lose the most recently committed mutations
|
||||||
virtual ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) = 0;
|
virtual ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) = 0;
|
||||||
|
// Management API, create snapshot
|
||||||
|
virtual ThreadFuture<Void> createSnapshot(const StringRef& snapshot_command) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class IClientApi {
|
class IClientApi {
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
#include "fdbclient/MultiVersionAssignmentVars.h"
|
#include "fdbclient/MultiVersionAssignmentVars.h"
|
||||||
#include "fdbclient/ThreadSafeTransaction.h"
|
#include "fdbclient/ThreadSafeTransaction.h"
|
||||||
|
|
||||||
|
#include "flow/FastRef.h"
|
||||||
#include "flow/network.h"
|
#include "flow/network.h"
|
||||||
#include "flow/Platform.h"
|
#include "flow/Platform.h"
|
||||||
#include "flow/ProtocolVersion.h"
|
#include "flow/ProtocolVersion.h"
|
||||||
|
@ -309,6 +310,17 @@ ThreadFuture<Void> DLDatabase::forceRecoveryWithDataLoss(const StringRef &dcid)
|
||||||
return Void();
|
return Void();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef &snapshot_command){
|
||||||
|
if(!api->databaseCreateSnapshot) {
|
||||||
|
return unsupported_operation();
|
||||||
|
}
|
||||||
|
|
||||||
|
FdbCApi::FDBFuture *f = api->databaseCreateSnapshot(db, snapshot_command.begin(), snapshot_command.size());
|
||||||
|
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
|
||||||
|
return Void();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// DLApi
|
// DLApi
|
||||||
template<class T>
|
template<class T>
|
||||||
|
@ -346,6 +358,7 @@ void DLApi::init() {
|
||||||
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
|
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
|
||||||
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
|
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
|
||||||
loadClientFunction(&api->databaseForceRecoveryWithDataLoss, lib, fdbCPath, "fdb_database_force_recovery_with_data_loss", headerVersion >= 700);
|
loadClientFunction(&api->databaseForceRecoveryWithDataLoss, lib, fdbCPath, "fdb_database_force_recovery_with_data_loss", headerVersion >= 700);
|
||||||
|
loadClientFunction(&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
|
||||||
|
|
||||||
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option");
|
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option");
|
||||||
loadClientFunction(&api->transactionDestroy, lib, fdbCPath, "fdb_transaction_destroy");
|
loadClientFunction(&api->transactionDestroy, lib, fdbCPath, "fdb_transaction_destroy");
|
||||||
|
@ -820,6 +833,11 @@ ThreadFuture<Void> MultiVersionDatabase::forceRecoveryWithDataLoss(const StringR
|
||||||
return abortableFuture(f, dbState->dbVar->get().onChange);
|
return abortableFuture(f, dbState->dbVar->get().onChange);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef &snapshot_command) {
|
||||||
|
auto f = dbState->db ? dbState->db->createSnapshot(snapshot_command) : ThreadFuture<Void>(Never());
|
||||||
|
return abortableFuture(f, dbState->dbVar->get().onChange);
|
||||||
|
}
|
||||||
|
|
||||||
void MultiVersionDatabase::Connector::connect() {
|
void MultiVersionDatabase::Connector::connect() {
|
||||||
addref();
|
addref();
|
||||||
onMainThreadVoid([this]() {
|
onMainThreadVoid([this]() {
|
||||||
|
|
|
@ -68,6 +68,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
||||||
void (*databaseDestroy)(FDBDatabase *database);
|
void (*databaseDestroy)(FDBDatabase *database);
|
||||||
FDBFuture* (*databaseRebootWorker)(FDBDatabase *database, uint8_t const *address, int addressLength, fdb_bool_t check, int duration);
|
FDBFuture* (*databaseRebootWorker)(FDBDatabase *database, uint8_t const *address, int addressLength, fdb_bool_t check, int duration);
|
||||||
FDBFuture* (*databaseForceRecoveryWithDataLoss)(FDBDatabase *database, uint8_t const *dcid, int dcidLength);
|
FDBFuture* (*databaseForceRecoveryWithDataLoss)(FDBDatabase *database, uint8_t const *dcid, int dcidLength);
|
||||||
|
FDBFuture* (*databaseCreateSnapshot)(FDBDatabase *database, uint8_t const *snapshot_commmand, int snapshotCommandLength);
|
||||||
|
|
||||||
//Transaction
|
//Transaction
|
||||||
fdb_error_t (*transactionSetOption)(FDBTransaction *tr, FDBTransactionOptions::Option option, uint8_t const *value, int valueLength);
|
fdb_error_t (*transactionSetOption)(FDBTransaction *tr, FDBTransactionOptions::Option option, uint8_t const *value, int valueLength);
|
||||||
|
@ -199,6 +200,7 @@ public:
|
||||||
|
|
||||||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
|
ThreadFuture<Void> createSnapshot(const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const Reference<FdbCApi> api;
|
const Reference<FdbCApi> api;
|
||||||
|
@ -320,6 +322,8 @@ class MultiVersionApi;
|
||||||
|
|
||||||
class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
|
class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
|
||||||
public:
|
public:
|
||||||
|
struct DatabaseState;
|
||||||
|
|
||||||
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
|
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
|
||||||
~MultiVersionDatabase() override;
|
~MultiVersionDatabase() override;
|
||||||
|
|
||||||
|
@ -331,11 +335,14 @@ public:
|
||||||
|
|
||||||
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
|
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
|
||||||
|
|
||||||
|
const Reference<DatabaseState>& getDbState() {return dbState;}
|
||||||
|
ThreadFuture<Void> dbAvaliable();
|
||||||
|
|
||||||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
|
ThreadFuture<Void> createSnapshot(const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct DatabaseState;
|
|
||||||
|
|
||||||
struct Connector : ThreadCallback, ThreadSafeReferenceCounted<Connector> {
|
struct Connector : ThreadCallback, ThreadSafeReferenceCounted<Connector> {
|
||||||
Connector(Reference<DatabaseState> dbState, Reference<ClientInfo> client, std::string clusterFilePath) : dbState(dbState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
|
Connector(Reference<DatabaseState> dbState, Reference<ClientInfo> client, std::string clusterFilePath) : dbState(dbState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
|
||||||
|
@ -361,6 +368,7 @@ private:
|
||||||
bool cancelled;
|
bool cancelled;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
|
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
|
||||||
DatabaseState();
|
DatabaseState();
|
||||||
|
|
||||||
|
|
|
@ -4835,3 +4835,17 @@ Future<int64_t> DatabaseContext::rebootWorker(StringRef addr, bool check, int du
|
||||||
Future<Void> DatabaseContext::forceRecoveryWithDataLoss(StringRef dcId) {
|
Future<Void> DatabaseContext::forceRecoveryWithDataLoss(StringRef dcId) {
|
||||||
return forceRecovery(getConnectionFile(), dcId);
|
return forceRecovery(getConnectionFile(), dcId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ACTOR static Future<Void> createSnapshotActor(DatabaseContext* cx, StringRef snapCmd) {
|
||||||
|
UID snapUID = deterministicRandom()->randomUniqueID();
|
||||||
|
try {
|
||||||
|
wait(mgmtSnapCreate(cx->clone(), snapCmd, snapUID));
|
||||||
|
} catch (Error& e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> DatabaseContext::createSnapshot(StringRef snapshot_command) {
|
||||||
|
return createSnapshotActor(this, snapshot_command);
|
||||||
|
}
|
||||||
|
|
|
@ -84,6 +84,14 @@ ThreadFuture<Void> ThreadSafeDatabase::forceRecoveryWithDataLoss(const StringRef
|
||||||
} );
|
} );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& snapshot_command) {
|
||||||
|
DatabaseContext *db = this->db;
|
||||||
|
Key cmd = snapshot_command;
|
||||||
|
return onMainThread( [db, cmd]() -> Future<Void> {
|
||||||
|
return db->createSnapshot(cmd);
|
||||||
|
} );
|
||||||
|
}
|
||||||
|
|
||||||
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
|
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
|
||||||
ClusterConnectionFile *connFile = new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
|
ClusterConnectionFile *connFile = new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ public:
|
||||||
|
|
||||||
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
|
||||||
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
|
||||||
|
ThreadFuture<Void> createSnapshot(const StringRef& snapshot_command) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class ThreadSafeTransaction;
|
friend class ThreadSafeTransaction;
|
||||||
|
|
Loading…
Reference in New Issue