forked from OSchip/llvm-project
Add a TaskQueue that can serialize work on a ThreadPool.
We have ThreadPool, which can execute work asynchronously on N background threads, but sometimes you need to make sure the work is executed asynchronously but also serially. That is, if task B is enqueued after task A, then task B should not begin until task A has completed. This patch adds such a class. Differential Revision: https://reviews.llvm.org/D48240 llvm-svn: 335440
This commit is contained in:
parent
04c4894911
commit
35169f6698
|
@ -0,0 +1,137 @@
|
|||
//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
|
||||
//
|
||||
// The LLVM Compiler Infrastructure
|
||||
//
|
||||
// This file is distributed under the University of Illinois Open Source
|
||||
// License. See LICENSE.TXT for details.
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This file defines a crude C++11 based task queue.
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#ifndef LLVM_SUPPORT_TASK_QUEUE_H
|
||||
#define LLVM_SUPPORT_TASK_QUEUE_H
|
||||
|
||||
#include "llvm/Config/llvm-config.h"
|
||||
#include "llvm/Support/ThreadPool.h"
|
||||
#include "llvm/Support/thread.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
namespace llvm {
|
||||
/// TaskQueue executes serialized work on a user-defined Thread Pool. It
|
||||
/// guarantees that if task B is enqueued after task A, task B begins after
|
||||
/// task A completes and there is no overlap between the two.
|
||||
class TaskQueue {
|
||||
// Because we don't have init capture to use move-only local variables that
|
||||
// are captured into a lambda, we create the promise inside an explicit
|
||||
// callable struct. We want to do as much of the wrapping in the
|
||||
// type-specialized domain (before type erasure) and then erase this into a
|
||||
// std::function.
|
||||
template <typename Callable> struct Task {
|
||||
using ResultTy = typename std::result_of<Callable()>::type;
|
||||
explicit Task(Callable C, TaskQueue &Parent)
|
||||
: C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
|
||||
Parent(&Parent) {}
|
||||
|
||||
template <typename T> void invokeCallbackAndSetPromise() {
|
||||
P->set_value(C());
|
||||
}
|
||||
|
||||
template <> void invokeCallbackAndSetPromise<void>() {
|
||||
C();
|
||||
P->set_value();
|
||||
}
|
||||
|
||||
void operator()() noexcept {
|
||||
invokeCallbackAndSetPromise<ResultTy>();
|
||||
Parent->completeTask();
|
||||
}
|
||||
|
||||
Callable C;
|
||||
std::shared_ptr<std::promise<ResultTy>> P;
|
||||
TaskQueue *Parent;
|
||||
};
|
||||
|
||||
public:
|
||||
/// Construct a task queue with no work.
|
||||
TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
|
||||
|
||||
/// Blocking destructor: the queue will wait for all work to complete.
|
||||
~TaskQueue() {
|
||||
Scheduler.wait();
|
||||
assert(Tasks.empty());
|
||||
}
|
||||
|
||||
/// Asynchronous submission of a task to the queue. The returned future can be
|
||||
/// used to wait for the task (and all previous tasks that have not yet
|
||||
/// completed) to finish.
|
||||
template <typename Callable>
|
||||
std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
|
||||
#if !LLVM_ENABLE_THREADS
|
||||
static_assert(false,
|
||||
"TaskQueue requires building with LLVM_ENABLE_THREADS!");
|
||||
#endif
|
||||
Task<Callable> T{std::move(C), *this};
|
||||
using ResultTy = typename std::result_of<Callable()>::type;
|
||||
std::future<ResultTy> F = T.P->get_future();
|
||||
{
|
||||
std::lock_guard<std::mutex> Lock(QueueLock);
|
||||
// If there's already a task in flight, just queue this one up. If
|
||||
// there is not a task in flight, bypass the queue and schedule this
|
||||
// task immediately.
|
||||
if (IsTaskInFlight)
|
||||
Tasks.push_back(std::move(T));
|
||||
else {
|
||||
Scheduler.async(std::move(T));
|
||||
IsTaskInFlight = true;
|
||||
}
|
||||
}
|
||||
return std::move(F);
|
||||
}
|
||||
|
||||
private:
|
||||
void completeTask() {
|
||||
// We just completed a task. If there are no more tasks in the queue,
|
||||
// update IsTaskInFlight to false and stop doing work. Otherwise
|
||||
// schedule the next task (while not holding the lock).
|
||||
std::function<void()> Continuation;
|
||||
{
|
||||
std::lock_guard<std::mutex> Lock(QueueLock);
|
||||
if (Tasks.empty()) {
|
||||
IsTaskInFlight = false;
|
||||
return;
|
||||
}
|
||||
|
||||
Continuation = std::move(Tasks.front());
|
||||
Tasks.pop_front();
|
||||
}
|
||||
Scheduler.async(std::move(Continuation));
|
||||
}
|
||||
|
||||
/// The thread pool on which to run the work.
|
||||
ThreadPool &Scheduler;
|
||||
|
||||
/// State which indicates whether the queue currently is currently processing
|
||||
/// any work.
|
||||
bool IsTaskInFlight = false;
|
||||
|
||||
/// Mutex for synchronizing access to the Tasks array.
|
||||
std::mutex QueueLock;
|
||||
|
||||
/// Tasks waiting for execution in the queue.
|
||||
std::deque<std::function<void()>> Tasks;
|
||||
};
|
||||
} // namespace llvm
|
||||
|
||||
#endif // LLVM_SUPPORT_TASK_QUEUE_H
|
|
@ -53,6 +53,7 @@ add_llvm_unittest(SupportTests
|
|||
SwapByteOrderTest.cpp
|
||||
TarWriterTest.cpp
|
||||
TargetParserTest.cpp
|
||||
TaskQueueTest.cpp
|
||||
ThreadLocalTest.cpp
|
||||
ThreadPool.cpp
|
||||
Threading.cpp
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========//
|
||||
//
|
||||
// The LLVM Compiler Infrastructure
|
||||
//
|
||||
// This file is distributed under the University of Illinois Open Source
|
||||
// License. See LICENSE.TXT for details.
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#include "llvm/Support/TaskQueue.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace llvm;
|
||||
|
||||
#if LLVM_ENABLE_THREADS
|
||||
class TaskQueueTest : public testing::Test {
|
||||
protected:
|
||||
TaskQueueTest() {}
|
||||
};
|
||||
|
||||
TEST_F(TaskQueueTest, OrderedFutures) {
|
||||
ThreadPool TP(1);
|
||||
TaskQueue TQ(TP);
|
||||
std::atomic<int> X = 0;
|
||||
std::atomic<int> Y = 0;
|
||||
std::atomic<int> Z = 0;
|
||||
|
||||
std::mutex M1, M2, M3;
|
||||
std::unique_lock<std::mutex> L1(M1);
|
||||
std::unique_lock<std::mutex> L2(M2);
|
||||
std::unique_lock<std::mutex> L3(M3);
|
||||
|
||||
std::future<void> F1 = TQ.async([&] {
|
||||
std::unique_lock<std::mutex> Lock(M1);
|
||||
++X;
|
||||
});
|
||||
std::future<void> F2 = TQ.async([&] {
|
||||
std::unique_lock<std::mutex> Lock(M2);
|
||||
++Y;
|
||||
});
|
||||
std::future<void> F3 = TQ.async([&] {
|
||||
std::unique_lock<std::mutex> Lock(M3);
|
||||
++Z;
|
||||
});
|
||||
|
||||
L1.unlock();
|
||||
F1.wait();
|
||||
ASSERT_EQ(1, X);
|
||||
ASSERT_EQ(0, Y);
|
||||
ASSERT_EQ(0, Z);
|
||||
|
||||
L2.unlock();
|
||||
F2.wait();
|
||||
ASSERT_EQ(1, X);
|
||||
ASSERT_EQ(1, Y);
|
||||
ASSERT_EQ(0, Z);
|
||||
|
||||
L3.unlock();
|
||||
F3.wait();
|
||||
ASSERT_EQ(1, X);
|
||||
ASSERT_EQ(1, Y);
|
||||
ASSERT_EQ(1, Z);
|
||||
}
|
||||
|
||||
TEST_F(TaskQueueTest, UnOrderedFutures) {
|
||||
ThreadPool TP(1);
|
||||
TaskQueue TQ(TP);
|
||||
std::atomic<int> X = 0;
|
||||
std::atomic<int> Y = 0;
|
||||
std::atomic<int> Z = 0;
|
||||
std::mutex M;
|
||||
|
||||
std::unique_lock<std::mutex> Lock(M);
|
||||
|
||||
std::future<void> F1 = TQ.async([&] { ++X; });
|
||||
std::future<void> F2 = TQ.async([&] { ++Y; });
|
||||
std::future<void> F3 = TQ.async([&M, &Z] {
|
||||
std::unique_lock<std::mutex> Lock(M);
|
||||
++Z;
|
||||
});
|
||||
|
||||
F2.wait();
|
||||
ASSERT_EQ(1, X);
|
||||
ASSERT_EQ(1, Y);
|
||||
ASSERT_EQ(0, Z);
|
||||
|
||||
Lock.unlock();
|
||||
|
||||
F3.wait();
|
||||
ASSERT_EQ(1, X);
|
||||
ASSERT_EQ(1, Y);
|
||||
ASSERT_EQ(1, Z);
|
||||
}
|
||||
|
||||
TEST_F(TaskQueueTest, FutureWithReturnValue) {
|
||||
ThreadPool TP(1);
|
||||
TaskQueue TQ(TP);
|
||||
std::future<std::string> F1 = TQ.async([&] { return std::string("Hello"); });
|
||||
std::future<int> F2 = TQ.async([&] { return 42; });
|
||||
|
||||
ASSERT_EQ(42, F2.get());
|
||||
ASSERT_EQ("Hello", F1.get());
|
||||
}
|
||||
#endif
|
Loading…
Reference in New Issue