Added testing and fixed some bugs in AsyncTaskThread

This commit is contained in:
sfc-gh-tclinkenbeard 2020-10-22 16:27:37 -07:00
parent 456a7c77a2
commit 4d127a29cc
3 changed files with 62 additions and 14 deletions

View File

@ -1,5 +1,5 @@
/*
* AsyncTaskThread.cpp
* AsyncTaskThread.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -19,5 +19,35 @@
*/
#include "fdbclient/AsyncTaskThread.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const double AsyncTaskThread::meanDelay = 0.01;
namespace {
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, int* sum, int count) {
state int i = 0;
for (; i < count; ++i) {
wait(asyncTaskThread->execAsync([sum = sum] {
++(*sum);
return Void();
}));
}
return Void();
}
} // namespace
TEST_CASE("/asynctaskthread/add") {
state int sum = 0;
state AsyncTaskThread asyncTaskThread;
std::vector<Future<Void>> clients;
clients.reserve(10);
for (int i = 0; i < 10; ++i) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
}
wait(waitForAll(clients));
ASSERT(sum == 1000);
return Void();
}

View File

@ -33,16 +33,24 @@ class IAsyncTask {
public:
virtual void operator()() = 0;
virtual ~IAsyncTask() = default;
virtual bool isTerminate() const = 0;
};
class TerminateTask final : public IAsyncTask {
public:
void operator()() override { ASSERT(false); }
bool isTerminate() const { return true; }
};
template <class F>
class AsyncTask : public IAsyncTask {
class AsyncTask final : public IAsyncTask {
F func;
public:
AsyncTask(const F& func) : func(func) {}
void operator()() override { func(); }
bool isTerminate() const override { return false; }
};
class AsyncTaskThread {
@ -50,16 +58,17 @@ class AsyncTaskThread {
std::promise<void> wakeUp;
std::thread thread;
static void run(AsyncTaskThread* conn) {
static void run(AsyncTaskThread* self) {
while (true) {
std::shared_ptr<IAsyncTask> task;
{
if (conn->queue.canSleep()) {
conn->wakeUp.get_future().get();
conn->wakeUp = {};
}
task = conn->queue.pop().get();
if (self->queue.canSleep()) {
self->wakeUp.get_future().get();
self->wakeUp = {};
}
std::shared_ptr<IAsyncTask> task = self->queue.pop().get();
if (task->isTerminate()) {
return;
}
(*task)();
}
}
@ -75,6 +84,15 @@ class AsyncTaskThread {
public:
AsyncTaskThread() : thread([this] { run(this); }) {}
~AsyncTaskThread() {
if (queue.push(std::make_shared<TerminateTask>())) {
wakeUp.set_value();
}
// Warning: This destructor can hang if a task hangs, so it is
// up to the caller to prevent tasks from hanging indefinitely
thread.join();
}
template <class F>
auto execAsync(const F& func, TaskPriority priority = TaskPriority::DefaultOnMainThread)
-> Future<decltype(func())> {
@ -82,12 +100,12 @@ public:
return map(delayJittered(meanDelay), [func](Void _) { return func(); });
}
Promise<decltype(func())> promise;
addTask([&promise, &func, priority] {
addTask([promise, func, priority] {
try {
auto funcResult = func();
onMainThreadVoid([&promise, &funcResult] { promise.send(funcResult); }, nullptr, priority);
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
} catch (Error& e) {
onMainThreadVoid([&promise, &e] { promise.sendError(e); }, nullptr, priority);
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
}
});
return promise.getFuture();

View File

@ -1,7 +1,7 @@
set(FDBCLIENT_SRCS
AsyncFileBlobStore.actor.cpp
AsyncFileBlobStore.actor.h
AsyncTaskThread.cpp
AsyncTaskThread.actor.cpp
AsyncTaskThread.h
Atomic.h
AutoPublicAddress.cpp