ApiTester: Introduce workload manager

This commit is contained in:
Vaidas Gasiunas 2022-02-26 00:09:37 +01:00
parent c3e6eea41a
commit 3967f9ed14
6 changed files with 91 additions and 27 deletions

View File

@ -26,7 +26,7 @@ namespace FdbApiTester {
class ApiCorrectnessWorkload : public WorkloadBase { class ApiCorrectnessWorkload : public WorkloadBase {
public: public:
ApiCorrectnessWorkload() : numTxLeft(10) {} ApiCorrectnessWorkload() : numTxLeft(1000) {}
void start() override { void start() override {
schedule([this]() { nextTransaction(); }); schedule([this]() { nextTransaction(); });
@ -34,7 +34,9 @@ public:
private: private:
void nextTransaction() { void nextTransaction() {
std::cout << numTxLeft << " transactions left" << std::endl; if (numTxLeft % 100 == 0) {
std::cout << numTxLeft << " transactions left" << std::endl;
}
if (numTxLeft == 0) if (numTxLeft == 0)
return; return;
@ -54,8 +56,8 @@ private:
int numTxLeft; int numTxLeft;
}; };
std::unique_ptr<IWorkload> createApiCorrectnessWorkload() { std::shared_ptr<IWorkload> createApiCorrectnessWorkload() {
return std::make_unique<ApiCorrectnessWorkload>(); return std::make_shared<ApiCorrectnessWorkload>();
} }
} // namespace FdbApiTester } // namespace FdbApiTester

View File

@ -29,6 +29,8 @@ using namespace boost::asio;
namespace FdbApiTester { namespace FdbApiTester {
const TTaskFct NO_OP_TASK = []() {};
class AsioScheduler : public IScheduler { class AsioScheduler : public IScheduler {
public: public:
AsioScheduler(int numThreads) : numThreads(numThreads) {} AsioScheduler(int numThreads) : numThreads(numThreads) {}

View File

@ -30,6 +30,8 @@ namespace FdbApiTester {
using TTaskFct = std::function<void(void)>; using TTaskFct = std::function<void(void)>;
extern const TTaskFct NO_OP_TASK;
class IScheduler { class IScheduler {
public: public:
virtual ~IScheduler() {} virtual ~IScheduler() {}

View File

@ -20,36 +20,65 @@
#include "TesterWorkload.h" #include "TesterWorkload.h"
#include <memory> #include <memory>
#include <cassert>
namespace FdbApiTester { namespace FdbApiTester {
void WorkloadBase::init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) { void WorkloadBase::init(WorkloadManager* manager) {
this->txExecutor = txExecutor; this->manager = manager;
this->scheduler = sched;
this->doneCont = cont;
} }
void WorkloadBase::schedule(TTaskFct task) { void WorkloadBase::schedule(TTaskFct task) {
tasksScheduled++; tasksScheduled++;
scheduler->schedule([this, task]() { manager->scheduler->schedule([this, task]() {
tasksScheduled--; tasksScheduled--;
task(); task();
contIfDone(); checkIfDone();
}); });
} }
void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) { void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) {
txRunning++; txRunning++;
txExecutor->execute(tx, [this, cont]() { manager->txExecutor->execute(tx, [this, cont]() {
txRunning--; txRunning--;
cont(); cont();
contIfDone(); checkIfDone();
}); });
} }
void WorkloadBase::contIfDone() { void WorkloadBase::checkIfDone() {
if (txRunning == 0 && tasksScheduled == 0) { if (txRunning == 0 && tasksScheduled == 0) {
doneCont(); manager->workloadDone(this);
}
}
void WorkloadManager::add(std::shared_ptr<IWorkload> workload, TTaskFct cont) {
std::unique_lock<std::mutex> lock(mutex);
workloads[workload.get()] = WorkloadInfo{ workload, cont };
}
void WorkloadManager::run() {
for (auto iter : workloads) {
iter.first->init(this);
}
for (auto iter : workloads) {
iter.first->start();
}
scheduler->join();
}
void WorkloadManager::workloadDone(IWorkload* workload) {
std::unique_lock<std::mutex> lock(mutex);
auto iter = workloads.find(workload);
assert(iter != workloads.end());
lock.unlock();
iter->second.cont();
lock.lock();
workloads.erase(iter);
bool done = workloads.empty();
lock.unlock();
if (done) {
scheduler->stop();
} }
} }

View File

@ -25,20 +25,24 @@
#include "TesterTransactionExecutor.h" #include "TesterTransactionExecutor.h"
#include <atomic> #include <atomic>
#include <unordered_map>
#include <mutex>
namespace FdbApiTester { namespace FdbApiTester {
class WorkloadManager;
class IWorkload { class IWorkload {
public: public:
virtual ~IWorkload() {} virtual ~IWorkload() {}
virtual void init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) = 0; virtual void init(WorkloadManager* manager) = 0;
virtual void start() = 0; virtual void start() = 0;
}; };
class WorkloadBase : public IWorkload { class WorkloadBase : public IWorkload {
public: public:
WorkloadBase() : txExecutor(nullptr), scheduler(nullptr), tasksScheduled(0), txRunning(0) {} WorkloadBase() : manager(nullptr), tasksScheduled(0), txRunning(0) {}
void init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) override; void init(WorkloadManager* manager) override;
protected: protected:
void schedule(TTaskFct task); void schedule(TTaskFct task);
@ -46,16 +50,39 @@ protected:
void execTransaction(TTxStartFct start, TTaskFct cont) { void execTransaction(TTxStartFct start, TTaskFct cont) {
execTransaction(std::make_shared<TransactionFct>(start), cont); execTransaction(std::make_shared<TransactionFct>(start), cont);
} }
void contIfDone(); void checkIfDone();
private: private:
ITransactionExecutor* txExecutor; WorkloadManager* manager;
IScheduler* scheduler;
TTaskFct doneCont;
std::atomic<int> tasksScheduled; std::atomic<int> tasksScheduled;
std::atomic<int> txRunning; std::atomic<int> txRunning;
}; };
class WorkloadManager {
public:
WorkloadManager(ITransactionExecutor* txExecutor, IScheduler* scheduler)
: txExecutor(txExecutor), scheduler(scheduler) {}
void add(std::shared_ptr<IWorkload> workload, TTaskFct cont = NO_OP_TASK);
void run();
private:
friend WorkloadBase;
struct WorkloadInfo {
std::shared_ptr<IWorkload> ref;
TTaskFct cont;
};
void workloadDone(IWorkload* workload);
ITransactionExecutor* txExecutor;
IScheduler* scheduler;
std::mutex mutex;
std::unordered_map<IWorkload*, WorkloadInfo> workloads;
};
} // namespace FdbApiTester } // namespace FdbApiTester
#endif #endif

View File

@ -220,7 +220,7 @@ void fdb_check(fdb_error_t e) {
} }
} // namespace } // namespace
std::unique_ptr<IWorkload> createApiCorrectnessWorkload(); std::shared_ptr<IWorkload> createApiCorrectnessWorkload();
} // namespace FdbApiTester } // namespace FdbApiTester
@ -248,11 +248,13 @@ void runApiCorrectness(TesterOptions& options) {
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(); std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor();
scheduler->start(); scheduler->start();
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), txExecOptions); txExecutor->init(scheduler.get(), options.clusterFile.c_str(), txExecOptions);
std::unique_ptr<IWorkload> workload = createApiCorrectnessWorkload();
IScheduler* schedPtr = scheduler.get(); WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
workload->init(txExecutor.get(), schedPtr, [schedPtr]() { schedPtr->stop(); }); for (int i = 0; i < 10; i++) {
workload->start(); std::shared_ptr<IWorkload> workload = createApiCorrectnessWorkload();
scheduler->join(); workloadMgr.add(workload);
}
workloadMgr.run();
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {