[clangd] Add priorities to background index queue, extract to separate class

Reviewers: kadircet

Subscribers: mgorny, ilya-biryukov, MaskRay, jkorous, arphaman, jfb, llvm-commits

Tags: #llvm

Differential Revision: https://reviews.llvm.org/D64560

llvm-svn: 365773
This commit is contained in:
Sam McCall 2019-07-11 13:34:38 +00:00
parent 9cf1303560
commit 7e27d86afb
6 changed files with 233 additions and 138 deletions

View File

@ -73,8 +73,9 @@ add_clang_library(clangDaemon
XRefs.cpp
index/Background.cpp
index/BackgroundRebuild.cpp
index/BackgroundIndexStorage.cpp
index/BackgroundQueue.cpp
index/BackgroundRebuild.cpp
index/CanonicalIncludes.cpp
index/FileIndex.cpp
index/Index.cpp

View File

@ -9,6 +9,7 @@
#include "index/Background.h"
#include "ClangdUnit.h"
#include "Compiler.h"
#include "Context.h"
#include "Headers.h"
#include "Logger.h"
#include "Path.h"
@ -33,8 +34,10 @@
#include "llvm/ADT/StringRef.h"
#include "llvm/ADT/StringSet.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/Threading.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
@ -50,8 +53,6 @@ namespace clang {
namespace clangd {
namespace {
static std::atomic<bool> PreventStarvation = {false};
// Resolves URI to file paths with cache.
class URIToFileCache {
public:
@ -134,8 +135,10 @@ BackgroundIndex::BackgroundIndex(
assert(ThreadPoolSize > 0 && "Thread pool size can't be zero.");
assert(this->IndexStorageFactory && "Storage factory can not be null!");
for (unsigned I = 0; I < ThreadPoolSize; ++I) {
ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1),
[this] { run(); });
ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), [this] {
WithContext Ctx(this->BackgroundContext.clone());
Queue.work([&] { Rebuilder.idle(); });
});
}
}
@ -144,113 +147,42 @@ BackgroundIndex::~BackgroundIndex() {
ThreadPool.wait();
}
void BackgroundIndex::stop() {
Rebuilder.shutdown();
{
std::lock_guard<std::mutex> QueueLock(QueueMu);
std::lock_guard<std::mutex> IndexLock(IndexMu);
ShouldStop = true;
}
QueueCV.notify_all();
IndexCV.notify_all();
/// Builds the queue task that handles a change to \p ChangedFiles.
///
/// When run, the task loads shards for the changed files and then appends one
/// indexFileTask() to the queue for every command that loadShards() reports
/// as needing re-indexing.
///
/// \param ChangedFiles files whose shards should be (re)loaded; the returned
///        task holds its own copy of the list.
/// \returns a task with queue priority LoadShards and default thread priority.
BackgroundQueue::Task BackgroundIndex::changedFilesTask(
    const std::vector<std::string> &ChangedFiles) {
  BackgroundQueue::Task T([this, ChangedFiles] {
    trace::Span Tracer("BackgroundIndexEnqueue");
    // We're doing this asynchronously, because we'll read shards here too.
    log("Enqueueing {0} commands for indexing", ChangedFiles.size());
    SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));

    // The capture is const inside this non-mutable lambda, so loadShards()
    // always receives a copy. Passing it directly says so; wrapping it in
    // std::move would still copy while reading as if it did not.
    auto NeedsReIndexing = loadShards(ChangedFiles);
    // Run indexing for files that need to be updated.
    std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
                 std::mt19937(std::random_device{}()));
    std::vector<BackgroundQueue::Task> Tasks;
    Tasks.reserve(NeedsReIndexing.size());
    for (auto &Elem : NeedsReIndexing)
      Tasks.push_back(indexFileTask(std::move(Elem.first), Elem.second));
    // Publish all per-file tasks to the queue in a single append() call.
    Queue.append(std::move(Tasks));
  });
  T.QueuePri = LoadShards;
  T.ThreadPri = llvm::ThreadPriority::Default;
  return T;
}
// Worker loop of the background index.
// Repeatedly takes the entry at the front of Queue, runs it at the thread
// priority stored alongside it, and returns once ShouldStop has been set.
void BackgroundIndex::run() {
WithContext Background(BackgroundContext.clone());
while (true) {
llvm::Optional<Task> Task;
llvm::ThreadPriority Priority;
{
std::unique_lock<std::mutex> Lock(QueueMu);
// Sleep until there is something to run or a stop was requested.
QueueCV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
if (ShouldStop) {
// Remaining tasks are dropped; wake every other waiter on QueueCV.
Queue.clear();
QueueCV.notify_all();
return;
}
// Count the task as active before releasing the lock, so the idle check
// (empty queue and zero active tasks) stays false while it runs.
++NumActiveTasks;
std::tie(Task, Priority) = std::move(Queue.front());
Queue.pop_front();
}
// From here on the task runs without QueueMu held.
if (Priority != llvm::ThreadPriority::Default && !PreventStarvation.load())
llvm::set_thread_priority(Priority);
(*Task)();
// Restore the default priority before picking up the next task.
if (Priority != llvm::ThreadPriority::Default)
llvm::set_thread_priority(llvm::ThreadPriority::Default);
{
std::unique_lock<std::mutex> Lock(QueueMu);
if (NumActiveTasks == 1 && Queue.empty()) {
// We just finished the last item, the queue is going idle.
// Rebuilder.idle() is called with the lock released; NumActiveTasks is
// only decremented afterwards.
Lock.unlock();
Rebuilder.idle();
Lock.lock();
}
assert(NumActiveTasks > 0 && "before decrementing");
--NumActiveTasks;
}
// Wake waiters so they can re-evaluate their predicates.
QueueCV.notify_all();
}
}
/// Waits on QueueCV until no task is queued and none is running, or until
/// the deadline derived from \p TimeoutSeconds is reached.
/// \returns the result of the wait() helper.
bool BackgroundIndex::blockUntilIdleForTest(
    llvm::Optional<double> TimeoutSeconds) {
  std::unique_lock<std::mutex> Lock(QueueMu);
  // Idle means: nothing in flight and nothing left to pick up.
  auto IsIdle = [&] { return NumActiveTasks == 0 && Queue.empty(); };
  return wait(Lock, QueueCV, timeoutSeconds(TimeoutSeconds), IsIdle);
}
/// Schedules (at default thread priority) a task that loads shards for
/// \p ChangedFiles and then enqueues indexing for every command that
/// loadShards() reports as needing re-indexing.
///
/// \param ChangedFiles files whose shards should be (re)loaded; the task
///        keeps its own copy of the list.
void BackgroundIndex::enqueue(const std::vector<std::string> &ChangedFiles) {
  enqueueTask(
      [this, ChangedFiles] {
        trace::Span Tracer("BackgroundIndexEnqueue");
        // We're doing this asynchronously, because we'll read shards here too.
        log("Enqueueing {0} commands for indexing", ChangedFiles.size());
        SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));

        // The capture is const inside this non-mutable lambda, so
        // loadShards() always receives a copy. Passing it directly says so;
        // wrapping it in std::move would still copy.
        auto NeedsReIndexing = loadShards(ChangedFiles);
        // Run indexing for files that need to be updated.
        std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
                     std::mt19937(std::random_device{}()));
        for (auto &Elem : NeedsReIndexing)
          enqueue(std::move(Elem.first), Elem.second);
      },
      llvm::ThreadPriority::Default);
}
/// Schedules indexing of the translation unit described by \p Cmd as a task
/// with background thread priority. A failure reported by index() is logged.
void BackgroundIndex::enqueue(tooling::CompileCommand Cmd,
                              BackgroundIndexStorage *Storage) {
  auto IndexCommand = [this, Storage](tooling::CompileCommand Command) {
    // Keep an owned copy of the name: Command is moved from in the call
    // below, so an llvm::StringRef into it could not be used for logging.
    const std::string FileName = Command.Filename;
    if (auto Err = index(std::move(Command), Storage))
      elog("Indexing {0} failed: {1}", FileName, std::move(Err));
  };
  enqueueTask(Bind(std::move(IndexCommand), std::move(Cmd)),
              llvm::ThreadPriority::Background);
}
void BackgroundIndex::enqueueTask(Task T, llvm::ThreadPriority Priority) {
{
std::lock_guard<std::mutex> Lock(QueueMu);
auto I = Queue.end();
// We first store the tasks with Normal priority in the front of the queue.
// Then we store low priority tasks. Normal priority tasks are pretty rare,
// they should not grow beyond single-digit numbers, so it is OK to do
// linear search and insert after that.
if (Priority == llvm::ThreadPriority::Default) {
I = llvm::find_if(
Queue, [](const std::pair<Task, llvm::ThreadPriority> &Elem) {
return Elem.second == llvm::ThreadPriority::Background;
});
}
Queue.insert(I, {std::move(T), Priority});
}
QueueCV.notify_all();
/// Builds the queue task that indexes the translation unit described by
/// \p Cmd and logs an error if index() fails.
///
/// \param Cmd compile command to index; copied into the task.
/// \param Storage passed through to index() unchanged.
/// \returns a task with queue priority IndexFile; the thread priority is left
///          at the Task default.
BackgroundQueue::Task
BackgroundIndex::indexFileTask(tooling::CompileCommand Cmd,
                               BackgroundIndexStorage *Storage) {
  BackgroundQueue::Task T([this, Storage, Cmd] {
    // Cmd is a const capture of this non-mutable lambda, so index() receives
    // a copy and Cmd.Filename remains valid for the log message below.
    if (auto Error = index(Cmd, Storage))
      elog("Indexing {0} failed: {1}", Cmd.Filename, std::move(Error));
  });
  T.QueuePri = IndexFile;
  return T;
}
/// Given index results from a TU, only update symbols coming from files that
@ -649,9 +581,5 @@ BackgroundIndex::loadShards(std::vector<std::string> ChangedFiles) {
return NeedsReIndexing;
}
void BackgroundIndex::preventThreadStarvationInTests() {
PreventStarvation.store(true);
}
} // namespace clangd
} // namespace clang

View File

@ -25,6 +25,7 @@
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
@ -59,6 +60,47 @@ public:
static Factory createDiskBackedStorageFactory();
};
// A priority queue of tasks which can be run on (external) worker threads.
// Producers add work with push()/append(); each worker thread calls work(),
// which keeps running queued tasks until stop() is called.
class BackgroundQueue {
public:
  /// A work item on the thread pool's queue.
  struct Task {
    /// Wraps any callable convertible to std::function<void()>.
    /// The callable is taken as a std::function by value rather than through
    /// a forwarding reference: a forwarding-reference constructor would be
    /// preferred over the copy constructor when a Task is direct-initialized
    /// from a non-const Task lvalue, and would then fail to compile.
    explicit Task(std::function<void()> F) : Run(std::move(F)) {}

    /// The work to perform.
    std::function<void()> Run;
    /// Priority applied to the worker thread while Run executes.
    llvm::ThreadPriority ThreadPri = llvm::ThreadPriority::Background;
    // Higher-priority tasks will run first.
    unsigned QueuePri = 0;

    /// Orders tasks by QueuePri only, as needed for the max-heap in Queue.
    bool operator<(const Task &O) const { return QueuePri < O.QueuePri; }
  };

  // Add tasks to the queue.
  void push(Task);
  void append(std::vector<Task>);

  // Process items on the queue until the queue is stopped.
  // If the queue becomes empty, OnIdle will be called (on one worker).
  void work(std::function<void()> OnIdle = nullptr);

  // Stop processing new tasks, allowing all work() calls to return soon.
  void stop();

  // Disables thread priority lowering to ensure progress on loaded systems.
  // Only affects tasks that run after the call.
  static void preventThreadStarvationInTests();

  // Waits until the queue is empty and no task is running, or until the
  // timeout expires.
  LLVM_NODISCARD bool
  blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds);

private:
  std::mutex Mu;
  unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
  std::condition_variable CV;
  bool ShouldStop = false;
  std::vector<Task> Queue; // max-heap
};
// Builds an in-memory index by running the static indexer action over
// all commands in a compilation database. Indexing happens in the background.
// FIXME: it should also persist its state on disk for fast start.
@ -78,19 +120,22 @@ public:
// Enqueue translation units for indexing.
// The indexing happens in a background thread, so the symbols will be
// available sometime later.
void enqueue(const std::vector<std::string> &ChangedFiles);
void enqueue(const std::vector<std::string> &ChangedFiles) {
Queue.push(changedFilesTask(ChangedFiles));
}
// Cause background threads to stop after their current task; any remaining
// tasks will be discarded.
void stop();
// Shuts down the index rebuilder first, then stops the task queue.
void stop() {
Rebuilder.shutdown();
Queue.stop();
}
// Wait until the queue is empty, to allow deterministic testing.
LLVM_NODISCARD bool
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10);
// Disables thread priority lowering in background index to make sure it can
// progress on loaded systems. Only affects tasks that run after the call.
static void preventThreadStarvationInTests();
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10) {
return Queue.blockUntilIdleForTest(TimeoutSeconds);
}
private:
/// Represents the state of a single file when indexing was performed.
@ -111,11 +156,8 @@ private:
const GlobalCompilationDatabase &CDB;
Context BackgroundContext;
// index state
llvm::Error index(tooling::CompileCommand,
BackgroundIndexStorage *IndexStorage);
std::mutex IndexMu;
std::condition_variable IndexCV;
FileSymbols IndexedSymbols;
BackgroundIndexRebuilder Rebuilder;
@ -137,19 +179,18 @@ private:
// Tries to load shards for the ChangedFiles.
std::vector<std::pair<tooling::CompileCommand, BackgroundIndexStorage *>>
loadShards(std::vector<std::string> ChangedFiles);
void enqueue(tooling::CompileCommand Cmd, BackgroundIndexStorage *Storage);
// queue management
using Task = std::function<void()>;
void run(); // Main loop executed by Thread. Runs tasks from Queue.
void enqueueTask(Task T, llvm::ThreadPriority Prioirty);
void enqueueLocked(tooling::CompileCommand Cmd,
BackgroundIndexStorage *IndexStorage);
std::mutex QueueMu;
unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
std::condition_variable QueueCV;
bool ShouldStop = false;
std::deque<std::pair<Task, llvm::ThreadPriority>> Queue;
BackgroundQueue::Task
changedFilesTask(const std::vector<std::string> &ChangedFiles);
BackgroundQueue::Task indexFileTask(tooling::CompileCommand Cmd,
BackgroundIndexStorage *Storage);
// from lowest to highest priority
// The enumerators are assigned to BackgroundQueue::Task::QueuePri, where a
// larger value runs first.
enum QueuePriority {
// Indexing a single translation unit (set by indexFileTask()).
IndexFile,
// Loading shards for changed files (set by changedFilesTask()).
LoadShards,
};
BackgroundQueue Queue;
AsyncTaskRunner ThreadPool;
GlobalCompilationDatabase::CommandChanged::Subscription CommandsChanged;
};

View File

@ -0,0 +1,93 @@
//===-- BackgroundQueue.cpp - Task queue for background index -------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "index/Background.h"
namespace clang {
namespace clangd {
static std::atomic<bool> PreventStarvation = {false};
// Sets the file-level PreventStarvation flag; work() consults it before
// changing a worker thread's priority for a task.
void BackgroundQueue::preventThreadStarvationInTests() {
  PreventStarvation = true;
}
// Worker loop: repeatedly removes the top of the heap in Queue, runs it at
// the task's thread priority, and returns once ShouldStop has been set.
// OnIdle, if non-null, is invoked by the worker that finishes the last active
// task while the queue is empty.
void BackgroundQueue::work(std::function<void()> OnIdle) {
while (true) {
llvm::Optional<Task> Task;
{
std::unique_lock<std::mutex> Lock(Mu);
// Sleep until there is something to run or a stop was requested.
CV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
if (ShouldStop) {
// Remaining tasks are dropped; wake every other waiter on CV.
Queue.clear();
CV.notify_all();
return;
}
// Count the task as active before releasing the lock, so the idle check
// (empty queue and zero active tasks) stays false while it runs.
++NumActiveTasks;
// pop_heap moves the largest element (by Task::operator<) to the back.
std::pop_heap(Queue.begin(), Queue.end());
Task = std::move(Queue.back());
Queue.pop_back();
}
// From here on the task runs without Mu held.
if (Task->ThreadPri != llvm::ThreadPriority::Default &&
!PreventStarvation.load())
llvm::set_thread_priority(Task->ThreadPri);
Task->Run();
// Restore the default priority before picking up the next task.
if (Task->ThreadPri != llvm::ThreadPriority::Default)
llvm::set_thread_priority(llvm::ThreadPriority::Default);
{
std::unique_lock<std::mutex> Lock(Mu);
if (NumActiveTasks == 1 && Queue.empty() && OnIdle) {
// We just finished the last item, the queue is going idle.
// OnIdle is called with the lock released; NumActiveTasks is only
// decremented afterwards.
Lock.unlock();
OnIdle();
Lock.lock();
}
assert(NumActiveTasks > 0 && "before decrementing");
--NumActiveTasks;
}
// Wake waiters so they can re-evaluate their predicates.
CV.notify_all();
}
}
// Sets ShouldStop under the mutex, then wakes every thread waiting on CV so
// it can observe the flag.
void BackgroundQueue::stop() {
  std::unique_lock<std::mutex> Guard(Mu);
  ShouldStop = true;
  // Release the mutex before notifying, as the scoped-lock form did.
  Guard.unlock();
  CV.notify_all();
}
// Adds a single task to the heap and wakes the threads waiting on CV.
void BackgroundQueue::push(Task T) {
  std::unique_lock<std::mutex> Guard(Mu);
  Queue.push_back(std::move(T));
  // Sift the new element into place to restore the max-heap property.
  std::push_heap(Queue.begin(), Queue.end());
  Guard.unlock();
  CV.notify_all();
}
void BackgroundQueue::append(std::vector<Task> Tasks) {
{
std::lock_guard<std::mutex> Lock(Mu);
std::move(Tasks.begin(), Tasks.end(), std::back_inserter(Queue));
std::make_heap(Queue.begin(), Queue.end());
}
CV.notify_all();
}
// Waits on CV until no task is queued and none is running, or until the
// deadline derived from TimeoutSeconds is reached. Returns the result of the
// wait() helper.
bool BackgroundQueue::blockUntilIdleForTest(
    llvm::Optional<double> TimeoutSeconds) {
  std::unique_lock<std::mutex> Lock(Mu);
  // Idle means: nothing in flight and nothing left to pick up.
  auto IsIdle = [&] { return NumActiveTasks == 0 && Queue.empty(); };
  return wait(Lock, CV, timeoutSeconds(TimeoutSeconds), IsIdle);
}
} // namespace clangd
} // namespace clang

View File

@ -356,7 +356,7 @@ int main(int argc, char *argv[]) {
LogLevel = Logger::Verbose;
PrettyPrint = true;
// Ensure background index makes progress.
BackgroundIndex::preventThreadStarvationInTests();
BackgroundQueue::preventThreadStarvationInTests();
}
if (Test || EnableTestScheme) {
static URISchemeRegistry::Add<TestScheme> X(

View File

@ -82,7 +82,7 @@ public:
class BackgroundIndexTest : public ::testing::Test {
protected:
BackgroundIndexTest() { BackgroundIndex::preventThreadStarvationInTests(); }
BackgroundIndexTest() { BackgroundQueue::preventThreadStarvationInTests(); }
};
TEST_F(BackgroundIndexTest, NoCrashOnErrorFile) {
@ -646,5 +646,37 @@ TEST_F(BackgroundIndexRebuilderTest, LoadingShards) {
EXPECT_TRUE(checkRebuild([&] { Rebuilder.doneLoading(); }));
}
TEST(BackgroundQueueTest, Priority) {
  // Mix high- and low-priority tasks in one queue. The queue is stopped as
  // soon as ten high-priority tasks have run, so if priorities are honoured
  // no low-priority task ever gets a turn.
  BackgroundQueue Q;
  std::atomic<unsigned> HiRan(0);
  std::atomic<unsigned> LoRan(0);

  BackgroundQueue::Task Lo([&] { ++LoRan; });
  BackgroundQueue::Task Hi([&] {
    const unsigned RanSoFar = ++HiRan;
    if (RanSoFar >= 10)
      Q.stop();
  });
  Hi.QueuePri = 100;

  // Low-priority work goes in first; insertion order must not decide who
  // runs first.
  std::vector<BackgroundQueue::Task> LoBatch(30, Lo);
  Q.append(std::move(LoBatch));
  for (unsigned I = 0; I != 30; ++I)
    Q.push(Hi);

  AsyncTaskRunner ThreadPool;
  for (unsigned Worker = 0; Worker != 5; ++Worker)
    ThreadPool.runAsync("worker", [&] { Q.work(); });

  // Enqueueing while workers are active is racy to assert on, so this only
  // checks that it does not crash.
  Q.push(Lo);
  std::vector<BackgroundQueue::Task> HiBatch(2, Hi);
  Q.append(std::move(HiBatch));

  // Once every worker has returned, inspect what actually ran.
  ThreadPool.wait();
  EXPECT_GE(HiRan, 10u);
  EXPECT_EQ(LoRan, 0u);
}
} // namespace clangd
} // namespace clang