ApiTester: Options to run each transaction in new database & to execute callbacks on external threads

This commit is contained in:
Vaidas Gasiunas 2022-03-04 18:34:36 +01:00
parent 20c1e893c7
commit 1e75ffd880
9 changed files with 111 additions and 30 deletions

View File

@ -61,6 +61,14 @@ std::unordered_map<std::string, std::function<void(const std::string& value, Tes
[](const std::string& value, TestSpec* spec) { //
spec->multiThreaded = (value == "true");
} },
{ "fdbCallbacksOnExternalThreads",
[](const std::string& value, TestSpec* spec) { //
spec->fdbCallbacksOnExternalThreads = (value == "true");
} },
{ "databasePerTransaction",
[](const std::string& value, TestSpec* spec) { //
spec->databasePerTransaction = (value == "true");
} },
{ "minFdbThreads",
[](const std::string& value, TestSpec* spec) { //
processIntOption(value, "minFdbThreads", spec->minFdbThreads, 1, 1000);

View File

@ -43,6 +43,8 @@ struct TestSpec {
bool blockOnFutures = false;
bool multiThreaded = false;
bool buggify = false;
bool fdbCallbacksOnExternalThreads = false;
bool databasePerTransaction = false;
int minFdbThreads = 1;
int maxFdbThreads = 1;
int minClientThreads = 1;

View File

@ -187,15 +187,43 @@ private:
IScheduler* scheduler;
};
class TransactionExecutor : public ITransactionExecutor {
class TransactionExecutorBase : public ITransactionExecutor {
public:
TransactionExecutor() : scheduler(nullptr) {}
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
~TransactionExecutor() { release(); }
void init(IScheduler* scheduler, const char* clusterFile, const TransactionExecutorOptions& options) override {
void init(IScheduler* scheduler, const char* clusterFile) override {
this->scheduler = scheduler;
this->options = options;
this->clusterFile = clusterFile;
}
protected:
void executeWithDatabase(FDBDatabase* db, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) {
FDBTransaction* tx;
fdb_error_t err = fdb_database_create_transaction(db, &tx);
if (err != error_code_success) {
txActor->setError(err);
cont();
} else {
TransactionContext* ctx = new TransactionContext(tx, txActor, cont, options, scheduler);
txActor->init(ctx);
txActor->start();
}
}
protected:
TransactionExecutorOptions options;
std::string clusterFile;
IScheduler* scheduler;
};
class DBPoolTransactionExecutor : public TransactionExecutorBase {
public:
DBPoolTransactionExecutor(const TransactionExecutorOptions& options) : TransactionExecutorBase(options) {}
~DBPoolTransactionExecutor() override { release(); }
void init(IScheduler* scheduler, const char* clusterFile) override {
TransactionExecutorBase::init(scheduler, clusterFile);
for (int i = 0; i < options.numDatabases; i++) {
FDBDatabase* db;
fdb_error_t err = fdb_create_database(clusterFile, &db);
@ -211,19 +239,10 @@ public:
void execute(std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) override {
int idx = random.randomInt(0, options.numDatabases - 1);
FDBTransaction* tx;
fdb_error_t err = fdb_database_create_transaction(databases[idx], &tx);
if (err != error_code_success) {
txActor->setError(err);
cont();
} else {
TransactionContext* ctx = new TransactionContext(tx, txActor, cont, options, scheduler);
txActor->init(ctx);
txActor->start();
}
executeWithDatabase(databases[idx], txActor, cont);
}
void release() override {
void release() {
for (FDBDatabase* db : databases) {
fdb_database_destroy(db);
}
@ -231,13 +250,33 @@ public:
private:
std::vector<FDBDatabase*> databases;
TransactionExecutorOptions options;
IScheduler* scheduler;
Random random;
};
std::unique_ptr<ITransactionExecutor> createTransactionExecutor() {
return std::make_unique<TransactionExecutor>();
class DBPerTransactionExecutor : public TransactionExecutorBase {
public:
DBPerTransactionExecutor(const TransactionExecutorOptions& options) : TransactionExecutorBase(options) {}
void execute(std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) override {
FDBDatabase* db = nullptr;
fdb_error_t err = fdb_create_database(clusterFile.c_str(), &db);
if (err != error_code_success) {
txActor->setError(err);
cont();
}
executeWithDatabase(db, txActor, [cont, db]() {
fdb_database_destroy(db);
cont();
});
}
};
std::unique_ptr<ITransactionExecutor> createTransactionExecutor(const TransactionExecutorOptions& options) {
if (options.databasePerTransaction) {
return std::make_unique<DBPerTransactionExecutor>(options);
} else {
return std::make_unique<DBPoolTransactionExecutor>(options);
}
}
} // namespace FdbApiTester

View File

@ -80,18 +80,18 @@ private:
struct TransactionExecutorOptions {
std::string prefix = "";
bool blockOnFutures = false;
bool databasePerTransaction = false;
int numDatabases = 1;
};
class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile, const TransactionExecutorOptions& options) = 0;
virtual void init(IScheduler* sched, const char* clusterFile) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
virtual void release() = 0;
};
std::unique_ptr<ITransactionExecutor> createTransactionExecutor();
std::unique_ptr<ITransactionExecutor> createTransactionExecutor(const TransactionExecutorOptions& options);
} // namespace FdbApiTester

View File

@ -191,6 +191,10 @@ void applyNetworkOptions(TesterOptions& options) {
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, options.numFdbThreads));
}
if (options.testSpec.fdbCallbacksOnExternalThreads) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CALLBACKS_ON_EXTERNAL_THREADS));
}
if (options.testSpec.buggify) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_BUGGIFY_ENABLE));
}
@ -219,11 +223,12 @@ bool runWorkloads(TesterOptions& options) {
TransactionExecutorOptions txExecOptions;
txExecOptions.blockOnFutures = options.testSpec.blockOnFutures;
txExecOptions.numDatabases = options.numDatabases;
txExecOptions.databasePerTransaction = options.testSpec.databasePerTransaction;
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor();
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
scheduler->start();
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), txExecOptions);
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {

View File

@ -1,6 +1,8 @@
[[test]]
title = 'API Correctness Blocking'
multiThreaded = true
buggify = true
blockOnFutures = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
@ -9,8 +11,7 @@ minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
buggify = true
blockOnFutures = true
[[test.workload]]
name = 'ApiCorrectness'

View File

@ -0,0 +1,24 @@
[[test]]
title = 'API Correctness Callbacks On External Threads'
multiThreaded = true
fdbCallbacksOnExternalThreads = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100
readExistingKeysRatio = 0.9

View File

@ -1,6 +1,8 @@
[[test]]
title = 'API Correctness Buggify'
title = 'API Correctness Database Per Transaction'
multiThreaded = true
buggify = true
databasePerTransaction = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
@ -9,7 +11,6 @@ minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
buggify = true
[[test.workload]]
name = 'ApiCorrectness'

View File

@ -1,6 +1,7 @@
[[test]]
title = 'API Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2