fix threadpool when not use mindrt

This commit is contained in:
yangjie159 2021-06-16 22:24:01 +08:00
parent 1c991331b9
commit 74e938cc77
3 changed files with 45 additions and 35 deletions

View File

@ -36,14 +36,13 @@ ActorThreadPool::~ActorThreadPool() {
void ActorThreadPool::AsyncRunMultiTask(Worker *worker) { void ActorThreadPool::AsyncRunMultiTask(Worker *worker) {
THREAD_RETURN_IF_NULL(worker); THREAD_RETURN_IF_NULL(worker);
while (alive_) { while (alive_) {
if (RunLocalKernelTask(worker) || RunPoolQueueActorTask()) { // only run either local KernelTask or PoolQueue ActorTask
// only run either local KernelTask or PoolQueue ActorTask bool busy = RunLocalKernelTask(worker) || RunPoolQueueActorTask();
} else { if (!busy) {
// wait until Actor enqueue or distribute KernelTask // wait until Actor enqueue or distribute KernelTask
worker->running = false;
std::unique_lock<std::mutex> _l(worker->mutex); std::unique_lock<std::mutex> _l(worker->mutex);
worker->cond_var.wait( worker->status = kThreadIdle;
_l, [&] { return worker->task != nullptr || (worker->running && !actor_queue_.empty()) || !alive_; }); worker->cond_var.wait(_l, [&] { return worker->status == kThreadBusy || !alive_; });
} }
} }
} }
@ -75,10 +74,11 @@ void ActorThreadPool::EnqueReadyActor(const ActorReference &actor) {
actor_queue_.push(actor); actor_queue_.push(actor);
} }
THREAD_INFO("actor[%s] enqueue success", actor->GetAID().Name().c_str()); THREAD_INFO("actor[%s] enqueue success", actor->GetAID().Name().c_str());
// active one free actor thread // active one idle actor thread if exist
for (size_t i = 0; i < actor_thread_num_; ++i) { for (size_t i = 0; i < actor_thread_num_; ++i) {
bool expected = false; std::lock_guard<std::mutex> _l(workers_[i]->mutex);
if (workers_[i]->running.compare_exchange_strong(expected, true)) { if (workers_[i]->status == kThreadIdle) {
workers_[i]->status = kThreadBusy;
workers_[i]->cond_var.notify_one(); workers_[i]->cond_var.notify_one();
break; break;
} }

View File

@ -68,9 +68,8 @@ void ThreadPool::AsyncRunTask(Worker *worker) const {
} }
if (worker->spin >= kDefaultSpinCount) { if (worker->spin >= kDefaultSpinCount) {
// wait until distribute KernelTask // wait until distribute KernelTask
worker->spin = 0;
std::unique_lock<std::mutex> _l(worker->mutex); std::unique_lock<std::mutex> _l(worker->mutex);
worker->cond_var.wait(_l, [&] { return worker->running || !alive_; }); worker->cond_var.wait(_l, [&] { return worker->status == kThreadBusy || !alive_; });
} }
} }
} }
@ -78,7 +77,7 @@ void ThreadPool::AsyncRunTask(Worker *worker) const {
void ThreadPool::YieldAndDeactive(Worker *worker) const { void ThreadPool::YieldAndDeactive(Worker *worker) const {
// deactivate this worker only on the first entry // deactivate this worker only on the first entry
if (worker->spin == 0) { if (worker->spin == 0) {
worker->running = false; worker->status.store(kThreadIdle);
} }
worker->spin++; worker->spin++;
std::this_thread::yield(); std::this_thread::yield();
@ -101,12 +100,8 @@ int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num)
// if the task num is greater than the KernelThread num // if the task num is greater than the KernelThread num
THREAD_INFO("launch: %d", task_num); THREAD_INFO("launch: %d", task_num);
Task task = Task(func, content); Task task = Task(func, content);
Worker *curr = CurrentWorker();
if (curr == nullptr) { DistributeTask(&task, task_num);
SyncRunTask(&task, task_num);
} else {
DistributeTask(&task, task_num);
}
// synchronization // synchronization
// wait until the finished is equal to task_num // wait until the finished is equal to task_num
while (task.finished != task_num) { while (task.finished != task_num) {
@ -134,28 +129,34 @@ void ThreadPool::SyncRunTask(Task *task, int task_num) const {
void ThreadPool::DistributeTask(Task *task, int task_num) const { void ThreadPool::DistributeTask(Task *task, int task_num) const {
Worker *curr = CurrentWorker(); Worker *curr = CurrentWorker();
THREAD_RETURN_IF_NULL(curr); // if the current thread isn't nullptr, that is the curr is a ActorThread,
// then the count is equal to 1. otherwise the count is equal to 0
int count = 1; int count = curr != nullptr ? 1 : 0;
int sum_frequency = 0; int sum_frequency = 0;
std::vector<Worker *> assigned; std::vector<Worker *> assigned;
int num = static_cast<int>(workers_.size()) - 1; int num = static_cast<int>(workers_.size()) - 1;
for (int i = num; i >= 0 && count < task_num; --i) { for (int i = num; i >= 0 && count < task_num; --i) {
bool expected = false; int expected = kThreadIdle;
if (workers_[i]->running.compare_exchange_strong(expected, true)) { if (workers_[i]->status.compare_exchange_strong(expected, kThreadHeld)) {
assigned.push_back(workers_[i]); assigned.push_back(workers_[i]);
sum_frequency += workers_[i]->frequency; sum_frequency += workers_[i]->frequency;
count++; count++;
} }
} }
assigned.push_back(curr); // when there are not enough free threads,
for (; count < task_num; ++count) { // distribute other tasks to the master thread
if (curr != nullptr) {
assigned.push_back(curr); assigned.push_back(curr);
sum_frequency += curr->frequency; for (; count < task_num; ++count) {
assigned.push_back(curr);
sum_frequency += curr->frequency;
}
} else if (assigned.size() != static_cast<size_t>(task_num)) {
SyncRunTask(task, task_num);
return;
} }
CalculateScales(assigned, sum_frequency); CalculateScales(assigned, sum_frequency);
ActiveWorkers(assigned, task, task_num); ActiveWorkers(assigned, task, task_num, curr);
} }
void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const { void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const {
@ -170,13 +171,17 @@ void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_
} }
} }
void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num) const { void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num,
Worker *curr = workers.back(); const Worker *curr) const {
for (int i = 0; i < task_num; ++i) { for (int i = 0; i < task_num; ++i) {
Worker *worker = workers[i]; Worker *worker = workers[i];
THREAD_RETURN_IF_NULL(worker); THREAD_RETURN_IF_NULL(worker);
worker->task_id.store(i, std::memory_order_relaxed); {
worker->task.store(task, std::memory_order_relaxed); std::lock_guard<std::mutex> _l(worker->mutex);
worker->task_id.store(i, std::memory_order_relaxed);
worker->task.store(task, std::memory_order_relaxed);
worker->status = kThreadBusy;
}
worker->cond_var.notify_one(); worker->cond_var.notify_one();
if (worker == curr) { if (worker == curr) {
RunLocalKernelTask(worker); RunLocalKernelTask(worker);

View File

@ -45,9 +45,14 @@ typedef struct Task {
std::atomic_int status{THREAD_OK}; // return status, RET_OK std::atomic_int status{THREAD_OK}; // return status, RET_OK
} Task; } Task;
// busy, the thread is running task
// held, the thread has been marked as occupied
// idle, the thread is waiting
enum ThreadStatus { kThreadBusy = 0, kThreadHeld = 1, kThreadIdle = 2 };
typedef struct Worker { typedef struct Worker {
std::thread thread; std::thread thread;
std::atomic_bool running{false}; std::atomic_int status{kThreadBusy};
std::mutex mutex; std::mutex mutex;
std::condition_variable cond_var; std::condition_variable cond_var;
std::atomic<Task *> task{nullptr}; std::atomic<Task *> task{nullptr};
@ -84,9 +89,9 @@ class ThreadPool {
void DistributeTask(Task *task, int task_num) const; void DistributeTask(Task *task, int task_num) const;
void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const; void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const;
void ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num) const; void ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num, const Worker *curr) const;
void YieldAndDeactive(Worker *worker) const; void YieldAndDeactive(Worker *worker) const;
bool RunLocalKernelTask(Worker *worker) const; bool RunLocalKernelTask(Worker *worker) const;
Worker *CurrentWorker() const; Worker *CurrentWorker() const;