[Support] Simplify and optimize ThreadPool

* Merge QueueLock and CompletionLock.
* Avoid spurious CompletionCondition.notify_all() when ActiveThreads is greater than 0.
* Use default member initializers.

Reviewed By: mehdi_amini

Differential Revision: https://reviews.llvm.org/D78856
This commit is contained in:
Fangrui Song 2020-04-24 20:50:06 -07:00
parent 03ffe58605
commit 6f23049119
2 changed files with 17 additions and 21 deletions

View File

@ -72,6 +72,8 @@ public:
unsigned getThreadCount() const { return ThreadCount; }
private:
bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
std::shared_future<void> asyncImpl(TaskTy F);
@ -86,16 +88,15 @@ private:
std::mutex QueueLock;
std::condition_variable QueueCondition;
/// Locking and signaling for job completion
std::mutex CompletionLock;
/// Signaling for job completion
std::condition_variable CompletionCondition;
/// Keep track of the number of thread actually busy
std::atomic<unsigned> ActiveThreads;
unsigned ActiveThreads = 0;
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
/// Signal for the destruction of the pool, asking thread to exit.
bool EnableFlag;
bool EnableFlag = true;
#endif
unsigned ThreadCount;

View File

@ -21,8 +21,7 @@ using namespace llvm;
#if LLVM_ENABLE_THREADS
ThreadPool::ThreadPool(ThreadPoolStrategy S)
: ActiveThreads(0), EnableFlag(true),
ThreadCount(S.compute_thread_count()) {
: ThreadCount(S.compute_thread_count()) {
// Create ThreadCount threads that will loop forever, wait on QueueCondition
// for tasks to be queued or the Pool to be destroyed.
Threads.reserve(ThreadCount);
@ -44,24 +43,24 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
// We first need to signal that we are active before popping the queue
// in order for wait() to properly detect that even if the queue is
// empty, there is still a task in flight.
{
std::unique_lock<std::mutex> LockGuard(CompletionLock);
++ActiveThreads;
}
++ActiveThreads;
Task = std::move(Tasks.front());
Tasks.pop();
}
// Run the task we just grabbed
Task();
bool Notify;
{
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
std::unique_lock<std::mutex> LockGuard(CompletionLock);
std::lock_guard<std::mutex> LockGuard(QueueLock);
--ActiveThreads;
Notify = workCompletedUnlocked();
}
// Notify task completion, in case someone waits on ThreadPool::wait()
CompletionCondition.notify_all();
// Notify task completion if this is the last active thread, in case
// someone waits on ThreadPool::wait().
if (Notify)
CompletionCondition.notify_all();
}
});
}
@ -69,12 +68,8 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
void ThreadPool::wait() {
// Wait for all threads to complete and the queue to be empty
std::unique_lock<std::mutex> LockGuard(CompletionLock);
// The order of the checks for ActiveThreads and Tasks.empty() matters because
// any active threads might be modifying the Tasks queue, and this would be a
// race.
CompletionCondition.wait(LockGuard,
[&] { return !ActiveThreads && Tasks.empty(); });
std::unique_lock<std::mutex> LockGuard(QueueLock);
CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
}
std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
@ -109,7 +104,7 @@ ThreadPool::~ThreadPool() {
// No threads are launched, issue a warning if ThreadCount is not 0
ThreadPool::ThreadPool(ThreadPoolStrategy S)
: ActiveThreads(0), ThreadCount(S.compute_thread_count()) {
: ThreadCount(S.compute_thread_count()) {
if (ThreadCount != 1) {
errs() << "Warning: request a ThreadPool with " << ThreadCount
<< " threads, but LLVM_ENABLE_THREADS has been turned off\n";