From 3967f9ed1482c72a88cf2fcbe847ca2f65b8df1f Mon Sep 17 00:00:00 2001 From: Vaidas Gasiunas Date: Sat, 26 Feb 2022 00:09:37 +0100 Subject: [PATCH] ApiTester: Introduce workload manager --- .../apitester/TesterCorrectnessWorkload.cpp | 10 ++-- bindings/c/test/apitester/TesterScheduler.cpp | 2 + bindings/c/test/apitester/TesterScheduler.h | 2 + bindings/c/test/apitester/TesterWorkload.cpp | 49 +++++++++++++++---- bindings/c/test/apitester/TesterWorkload.h | 41 +++++++++++++--- .../c/test/apitester/fdb_c_api_tester.cpp | 14 +++--- 6 files changed, 91 insertions(+), 27 deletions(-) diff --git a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp index 3ab3c5ab71..1b90e4b5a6 100644 --- a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp @@ -26,7 +26,7 @@ namespace FdbApiTester { class ApiCorrectnessWorkload : public WorkloadBase { public: - ApiCorrectnessWorkload() : numTxLeft(10) {} + ApiCorrectnessWorkload() : numTxLeft(1000) {} void start() override { schedule([this]() { nextTransaction(); }); @@ -34,7 +34,9 @@ public: private: void nextTransaction() { - std::cout << numTxLeft << " transactions left" << std::endl; + if (numTxLeft % 100 == 0) { + std::cout << numTxLeft << " transactions left" << std::endl; + } if (numTxLeft == 0) return; @@ -54,8 +56,8 @@ private: int numTxLeft; }; -std::unique_ptr createApiCorrectnessWorkload() { - return std::make_unique(); +std::shared_ptr createApiCorrectnessWorkload() { + return std::make_shared(); } } // namespace FdbApiTester \ No newline at end of file diff --git a/bindings/c/test/apitester/TesterScheduler.cpp b/bindings/c/test/apitester/TesterScheduler.cpp index 14661b04b2..7038cd85bd 100644 --- a/bindings/c/test/apitester/TesterScheduler.cpp +++ b/bindings/c/test/apitester/TesterScheduler.cpp @@ -29,6 +29,8 @@ using namespace boost::asio; namespace FdbApiTester { +const TTaskFct NO_OP_TASK = []() {}; + class AsioScheduler : public IScheduler { public: AsioScheduler(int numThreads) : numThreads(numThreads) {} diff --git a/bindings/c/test/apitester/TesterScheduler.h b/bindings/c/test/apitester/TesterScheduler.h index 1486bfba02..676384fc9c 100644 --- a/bindings/c/test/apitester/TesterScheduler.h +++ b/bindings/c/test/apitester/TesterScheduler.h @@ -30,6 +30,8 @@ namespace FdbApiTester { using TTaskFct = std::function; +extern const TTaskFct NO_OP_TASK; + class IScheduler { public: virtual ~IScheduler() {} diff --git a/bindings/c/test/apitester/TesterWorkload.cpp b/bindings/c/test/apitester/TesterWorkload.cpp index c4269dd37b..5406c65635 100644 --- a/bindings/c/test/apitester/TesterWorkload.cpp +++ b/bindings/c/test/apitester/TesterWorkload.cpp @@ -20,36 +20,65 @@ #include "TesterWorkload.h" #include +#include namespace FdbApiTester { -void WorkloadBase::init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) { - this->txExecutor = txExecutor; - this->scheduler = sched; - this->doneCont = cont; +void WorkloadBase::init(WorkloadManager* manager) { + this->manager = manager; } void WorkloadBase::schedule(TTaskFct task) { tasksScheduled++; - scheduler->schedule([this, task]() { + manager->scheduler->schedule([this, task]() { tasksScheduled--; task(); - contIfDone(); + checkIfDone(); }); } void WorkloadBase::execTransaction(std::shared_ptr tx, TTaskFct cont) { txRunning++; - txExecutor->execute(tx, [this, cont]() { + manager->txExecutor->execute(tx, [this, cont]() { txRunning--; cont(); - contIfDone(); + checkIfDone(); }); } -void WorkloadBase::contIfDone() { +void WorkloadBase::checkIfDone() { if (txRunning == 0 && tasksScheduled == 0) { - doneCont(); + manager->workloadDone(this); + } +} + +void WorkloadManager::add(std::shared_ptr workload, TTaskFct cont) { + std::unique_lock lock(mutex); + workloads[workload.get()] = WorkloadInfo{ workload, cont }; +} + +void WorkloadManager::run() { + for (auto iter : workloads) { + iter.first->init(this); + } + for (auto iter : workloads) { + iter.first->start(); + } + scheduler->join(); +} + +void WorkloadManager::workloadDone(IWorkload* workload) { + std::unique_lock lock(mutex); + auto iter = workloads.find(workload); + assert(iter != workloads.end()); + lock.unlock(); + iter->second.cont(); + lock.lock(); + workloads.erase(iter); + bool done = workloads.empty(); + lock.unlock(); + if (done) { + scheduler->stop(); } } diff --git a/bindings/c/test/apitester/TesterWorkload.h b/bindings/c/test/apitester/TesterWorkload.h index b6f57ced1b..53cc8637fb 100644 --- a/bindings/c/test/apitester/TesterWorkload.h +++ b/bindings/c/test/apitester/TesterWorkload.h @@ -25,20 +25,24 @@ #include "TesterTransactionExecutor.h" #include +#include +#include namespace FdbApiTester { +class WorkloadManager; + class IWorkload { public: virtual ~IWorkload() {} - virtual void init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) = 0; + virtual void init(WorkloadManager* manager) = 0; virtual void start() = 0; }; class WorkloadBase : public IWorkload { public: - WorkloadBase() : txExecutor(nullptr), scheduler(nullptr), tasksScheduled(0), txRunning(0) {} - void init(ITransactionExecutor* txExecutor, IScheduler* sched, TTaskFct cont) override; + WorkloadBase() : manager(nullptr), tasksScheduled(0), txRunning(0) {} + void init(WorkloadManager* manager) override; protected: void schedule(TTaskFct task); @@ -46,16 +50,39 @@ protected: void execTransaction(TTxStartFct start, TTaskFct cont) { execTransaction(std::make_shared(start), cont); } - void contIfDone(); + void checkIfDone(); private: - ITransactionExecutor* txExecutor; - IScheduler* scheduler; - TTaskFct doneCont; + WorkloadManager* manager; std::atomic tasksScheduled; std::atomic txRunning; }; +class WorkloadManager { +public: + WorkloadManager(ITransactionExecutor* txExecutor, IScheduler* scheduler) + : txExecutor(txExecutor), scheduler(scheduler) {} + + void add(std::shared_ptr workload, TTaskFct cont = NO_OP_TASK); + void run(); + +private: + friend WorkloadBase; + + struct WorkloadInfo { + std::shared_ptr ref; + TTaskFct cont; + }; + + void workloadDone(IWorkload* workload); + + ITransactionExecutor* txExecutor; + IScheduler* scheduler; + + std::mutex mutex; + std::unordered_map workloads; +}; + } // namespace FdbApiTester #endif \ No newline at end of file diff --git a/bindings/c/test/apitester/fdb_c_api_tester.cpp b/bindings/c/test/apitester/fdb_c_api_tester.cpp index 134ec85281..8334149351 100644 --- a/bindings/c/test/apitester/fdb_c_api_tester.cpp +++ b/bindings/c/test/apitester/fdb_c_api_tester.cpp @@ -220,7 +220,7 @@ void fdb_check(fdb_error_t e) { } } // namespace -std::unique_ptr createApiCorrectnessWorkload(); +std::shared_ptr createApiCorrectnessWorkload(); } // namespace FdbApiTester @@ -248,11 +248,13 @@ void runApiCorrectness(TesterOptions& options) { std::unique_ptr txExecutor = createTransactionExecutor(); scheduler->start(); txExecutor->init(scheduler.get(), options.clusterFile.c_str(), txExecOptions); - std::unique_ptr workload = createApiCorrectnessWorkload(); - IScheduler* schedPtr = scheduler.get(); - workload->init(txExecutor.get(), schedPtr, [schedPtr]() { schedPtr->stop(); }); - workload->start(); - scheduler->join(); + + WorkloadManager workloadMgr(txExecutor.get(), scheduler.get()); + for (int i = 0; i < 10; i++) { + std::shared_ptr workload = createApiCorrectnessWorkload(); + workloadMgr.add(workload); + } + workloadMgr.run(); } int main(int argc, char** argv) {