Api Tester: print workload progress statistics in regular time intervals
This commit is contained in:
parent
8e2a78bf3c
commit
9966e3f7f7
|
@ -48,6 +48,7 @@ public:
|
|||
int numClientThreads;
|
||||
int numDatabases;
|
||||
int numClients;
|
||||
int statsIntervalMs = 0;
|
||||
std::vector<std::pair<std::string, std::string>> knobs;
|
||||
TestSpec testSpec;
|
||||
std::string bgBasePath;
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "TesterScheduler.h"
|
||||
#include "TesterUtil.h"
|
||||
|
||||
#include <boost/asio/detail/chrono.hpp>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <boost/asio.hpp>
|
||||
|
@ -31,6 +32,15 @@ namespace FdbApiTester {
|
|||
|
||||
const TTaskFct NO_OP_TASK = []() {};
|
||||
|
||||
class AsioTimer : public ITimer {
|
||||
public:
|
||||
AsioTimer(io_context& io_ctx, chrono::steady_clock::duration time) : impl(io_ctx, time) {}
|
||||
|
||||
void cancel() override { impl.cancel(); }
|
||||
|
||||
boost::asio::steady_timer impl;
|
||||
};
|
||||
|
||||
class AsioScheduler : public IScheduler {
|
||||
public:
|
||||
AsioScheduler(int numThreads) : numThreads(numThreads) {}
|
||||
|
@ -44,6 +54,16 @@ public:
|
|||
|
||||
void schedule(TTaskFct task) override { post(io_ctx, task); }
|
||||
|
||||
std::unique_ptr<ITimer> scheduleWithDelay(int delayMs, TTaskFct task) override {
|
||||
auto timer = std::make_unique<AsioTimer>(io_ctx, boost::asio::chrono::milliseconds(delayMs));
|
||||
timer->impl.async_wait([task](const boost::system::error_code& e) {
|
||||
if (!e) {
|
||||
task();
|
||||
}
|
||||
});
|
||||
return timer;
|
||||
}
|
||||
|
||||
void stop() override { work = any_io_executor(); }
|
||||
|
||||
void join() override {
|
||||
|
|
|
@ -32,6 +32,16 @@ using TTaskFct = std::function<void(void)>;
|
|||
|
||||
extern const TTaskFct NO_OP_TASK;
|
||||
|
||||
/**
 * Handle to a scheduled timer.
 * Abstract interface: implementations provide the cancellation of the timer.
 */
class ITimer {
public:
	// Cancel the timer
	virtual void cancel() = 0;

	// Virtual destructor, so that timers can be destroyed through the interface
	virtual ~ITimer() = default;
};
|
||||
|
||||
/**
|
||||
* Scheduler for asynchronous execution of tasks on a pool of threads
|
||||
*/
|
||||
|
@ -45,6 +55,9 @@ public:
|
|||
// Schedule a task for asynchronous execution
|
||||
virtual void schedule(TTaskFct task) = 0;
|
||||
|
||||
// Schedule a task to be executed with a given delay
|
||||
virtual std::unique_ptr<ITimer> scheduleWithDelay(int delayMs, TTaskFct task) = 0;
|
||||
|
||||
// Gracefully stop the scheduler. Waits for already running tasks to finish
|
||||
virtual void stop() = 0;
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) con
|
|||
|
||||
// Construct a workload from its configuration.
// The manager pointer starts out as nullptr (init() assigns it later), all
// counters start at zero and the workload starts in the non-failed state.
// maxErrors is read from the "maxErrors" option (default 10) and the workload
// id is the configured name followed by the client id.
WorkloadBase::WorkloadBase(const WorkloadConfig& config)
  : manager(nullptr), tasksScheduled(0), numErrors(0), clientId(config.clientId), numClients(config.numClients),
    failed(false), numTxCompleted(0) {
	maxErrors = config.getIntOption("maxErrors", 10);
	workloadId = fmt::format("{}{}", config.name, clientId);
}
|
||||
|
@ -89,6 +89,10 @@ void WorkloadBase::init(WorkloadManager* manager) {
|
|||
this->manager = manager;
|
||||
}
|
||||
|
||||
// Print workload statistics: reports the current value of the completed transaction counter
void WorkloadBase::printStats() {
	const int completed = numTxCompleted.load();
	info(fmt::format("{} transactions completed", completed));
}
|
||||
|
||||
void WorkloadBase::schedule(TTaskFct task) {
|
||||
if (failed) {
|
||||
return;
|
||||
|
@ -106,6 +110,7 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF
|
|||
}
|
||||
tasksScheduled++;
|
||||
manager->txExecutor->execute(tx, [this, tx, cont, failOnError]() {
|
||||
numTxCompleted++;
|
||||
fdb_error_t err = tx->getErrorCode();
|
||||
if (tx->getErrorCode() == error_code_success) {
|
||||
cont();
|
||||
|
@ -198,6 +203,9 @@ void WorkloadManager::workloadDone(IWorkload* workload, bool failed) {
|
|||
bool done = workloads.empty();
|
||||
lock.unlock();
|
||||
if (done) {
|
||||
if (statsTimer) {
|
||||
statsTimer->cancel();
|
||||
}
|
||||
scheduler->stop();
|
||||
}
|
||||
}
|
||||
|
@ -241,6 +249,24 @@ void WorkloadManager::readControlInput(std::string pipeName) {
|
|||
}
|
||||
}
|
||||
|
||||
void WorkloadManager::schedulePrintStatistics(int timeIntervalMs) {
|
||||
statsTimer = scheduler->scheduleWithDelay(timeIntervalMs, [this, timeIntervalMs]() {
|
||||
for (auto workload : getActiveWorkloads()) {
|
||||
workload->printStats();
|
||||
}
|
||||
this->schedulePrintStatistics(timeIntervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<IWorkload>> WorkloadManager::getActiveWorkloads() {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::vector<std::shared_ptr<IWorkload>> res;
|
||||
for (auto iter : workloads) {
|
||||
res.push_back(iter.second.ref);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void WorkloadManager::handleStopCommand() {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
for (auto& iter : workloads) {
|
||||
|
|
|
@ -62,6 +62,9 @@ public:
|
|||
|
||||
// Get workload control interface if supported, nullptr otherwise
|
||||
virtual IWorkloadControlIfc* getControlIfc() = 0;
|
||||
|
||||
// Print workload statistics
|
||||
virtual void printStats() = 0;
|
||||
};
|
||||
|
||||
// Workload configuration
|
||||
|
@ -100,6 +103,8 @@ public:
|
|||
|
||||
std::string getWorkloadId() override { return workloadId; }
|
||||
|
||||
void printStats() override;
|
||||
|
||||
protected:
|
||||
// Schedule a task as a part of the workload
|
||||
void schedule(TTaskFct task);
|
||||
|
@ -150,6 +155,9 @@ protected:
|
|||
|
||||
// Workload is failed, no further transactions or continuations will be scheduled by the workload
|
||||
std::atomic<bool> failed;
|
||||
|
||||
// Number of completed transactions
|
||||
std::atomic<int> numTxCompleted;
|
||||
};
|
||||
|
||||
// Workload manager
|
||||
|
@ -175,6 +183,9 @@ public:
|
|||
return numWorkloadsFailed > 0;
|
||||
}
|
||||
|
||||
// Schedule statistics to be printed at regular time intervals
|
||||
void schedulePrintStatistics(int timeIntervalMs);
|
||||
|
||||
private:
|
||||
friend WorkloadBase;
|
||||
|
||||
|
@ -205,6 +216,9 @@ private:
|
|||
// Handle CHECK command received from the test controller
|
||||
void handleCheckCommand();
|
||||
|
||||
// A thread-safe operation to return a list of active workloads
|
||||
std::vector<std::shared_ptr<IWorkload>> getActiveWorkloads();
|
||||
|
||||
// Transaction executor to be used by the workloads
|
||||
ITransactionExecutor* txExecutor;
|
||||
|
||||
|
@ -225,6 +239,9 @@ private:
|
|||
|
||||
// Output pipe for emitting test control events
|
||||
std::ofstream outputPipe;
|
||||
|
||||
// Timer for printing statistics in regular intervals
|
||||
std::unique_ptr<ITimer> statsTimer;
|
||||
};
|
||||
|
||||
// A workload factory
|
||||
|
|
|
@ -53,7 +53,8 @@ enum TesterOptionId {
|
|||
OPT_OUTPUT_PIPE,
|
||||
OPT_FDB_API_VERSION,
|
||||
OPT_TRANSACTION_RETRY_LIMIT,
|
||||
OPT_BLOB_GRANULE_LOCAL_FILE_PATH
|
||||
OPT_BLOB_GRANULE_LOCAL_FILE_PATH,
|
||||
OPT_STATS_INTERVAL
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption TesterOptionDefs[] = //
|
||||
|
@ -77,6 +78,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
|
|||
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
|
||||
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
|
||||
{ OPT_BLOB_GRANULE_LOCAL_FILE_PATH, "--blob-granule-local-file-path", SO_REQ_SEP },
|
||||
{ OPT_STATS_INTERVAL, "--stats-interval", SO_REQ_SEP },
|
||||
SO_END_OF_OPTIONS };
|
||||
|
||||
void printProgramUsage(const char* execName) {
|
||||
|
@ -118,6 +120,8 @@ void printProgramUsage(const char* execName) {
|
|||
" Path to blob granule files on local filesystem\n"
|
||||
" -f, --test-file FILE\n"
|
||||
" Test file to run.\n"
|
||||
" --stats-interval MILLISECONDS\n"
|
||||
" Time interval in milliseconds for printing workload statistics (default: 0 - disabled).\n"
|
||||
" -h, --help Display this help and exit.\n",
|
||||
FDB_API_VERSION);
|
||||
}
|
||||
|
@ -214,6 +218,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
|
|||
case OPT_BLOB_GRANULE_LOCAL_FILE_PATH:
|
||||
options.bgBasePath = args.OptionArg();
|
||||
break;
|
||||
case OPT_STATS_INTERVAL:
|
||||
processIntOption(args.OptionText(), args.OptionArg(), 0, 60000, options.statsIntervalMs);
|
||||
break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -335,6 +342,9 @@ bool runWorkloads(TesterOptions& options) {
|
|||
}
|
||||
|
||||
scheduler->start();
|
||||
if (options.statsIntervalMs) {
|
||||
workloadMgr.schedulePrintStatistics(options.statsIntervalMs);
|
||||
}
|
||||
workloadMgr.run();
|
||||
return !workloadMgr.failed();
|
||||
} catch (const std::runtime_error& err) {
|
||||
|
|
|
@ -30,6 +30,8 @@ import glob
|
|||
import random
|
||||
import string
|
||||
|
||||
TESTER_STATS_INTERVAL_SEC = 5
|
||||
|
||||
|
||||
def random_string(len):
    """Return a random string of `len` characters drawn from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = [random.choice(alphabet) for _ in range(len)]
    return ''.join(picked)
|
||||
|
@ -66,7 +68,8 @@ def dump_client_logs(log_dir):
|
|||
def run_tester(args, test_file):
|
||||
cmd = [args.tester_binary,
|
||||
"--cluster-file", args.cluster_file,
|
||||
"--test-file", test_file]
|
||||
"--test-file", test_file,
|
||||
"--stats-interval", str(TESTER_STATS_INTERVAL_SEC*1000)]
|
||||
if args.external_client_library is not None:
|
||||
cmd += ["--external-client-library", args.external_client_library]
|
||||
if args.tmp_dir is not None:
|
||||
|
|
|
@ -73,6 +73,7 @@ LOCAL_OLD_BINARY_REPO = "/opt/foundationdb/old/"
|
|||
CURRENT_VERSION = "7.2.0"
|
||||
HEALTH_CHECK_TIMEOUT_SEC = 5
|
||||
PROGRESS_CHECK_TIMEOUT_SEC = 30
|
||||
TESTER_STATS_INTERVAL_SEC = 5
|
||||
TRANSACTION_RETRY_LIMIT = 100
|
||||
MAX_DOWNLOAD_ATTEMPTS = 5
|
||||
RUN_WITH_GDB = False
|
||||
|
@ -398,6 +399,8 @@ class UpgradeTest:
|
|||
self.tmp_dir,
|
||||
"--transaction-retry-limit",
|
||||
str(TRANSACTION_RETRY_LIMIT),
|
||||
"--stats-interval",
|
||||
str(TESTER_STATS_INTERVAL_SEC*1000)
|
||||
]
|
||||
if RUN_WITH_GDB:
|
||||
cmd_args = ["gdb", "-ex", "run", "--args"] + cmd_args
|
||||
|
|
Loading…
Reference in New Issue