diff --git a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc index aa78b74213f..ac6f438f559 100644 --- a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc +++ b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc @@ -342,7 +342,7 @@ void GraphScheduler::Initialize() { size_t actor_thread_num = 0; size_t OMP_thread_num = 0; ComputeThreadNums(&actor_thread_num, &OMP_thread_num); - thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num); + thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, kThreadWait); MS_EXCEPTION_IF_NULL(thread_pool_); std::string OMP_env = std::to_string(OMP_thread_num); common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0); diff --git a/mindspore/core/mindrt/src/thread/actor_threadpool.cc b/mindspore/core/mindrt/src/thread/actor_threadpool.cc index 738c4132172..fd728ee37d8 100644 --- a/mindspore/core/mindrt/src/thread/actor_threadpool.cc +++ b/mindspore/core/mindrt/src/thread/actor_threadpool.cc @@ -18,19 +18,23 @@ #include "thread/core_affinity.h" namespace mindspore { -void ActorWorker::CreateThread(ActorThreadPool *pool) { +void ActorWorker::CreateThread(ActorThreadPool *pool, ThreadPolicy policy) { + THREAD_RETURN_IF_NULL(pool); pool_ = pool; - thread_ = std::thread(&ActorWorker::Run, this); + if (policy == KThreadSpin) { + thread_ = std::thread(&ActorWorker::RunWithSpin, this); + } else if (policy == kThreadWait) { + thread_ = std::thread(&ActorWorker::RunWithWait, this); + } } -void ActorWorker::Run() { +void ActorWorker::RunWithSpin() { #ifndef __APPLE__ static std::atomic_int index = 0; pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str()); #endif while (alive_) { // only run either local KernelTask or PoolQueue ActorTask -#ifdef ENABLE_MINDRT if (RunLocalKernelTask() || RunQueueActorTask()) { spin_count_ = 0; } else { @@ -39,7 +43,16 @@ void ActorWorker::Run() { if (spin_count_ >= kDefaultSpinCount) { WaitUntilActive(); } -#else + } +} + +void ActorWorker::RunWithWait() { +#ifndef __APPLE__ + static std::atomic_int index = 0; + pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str()); +#endif + while (alive_) { + // only run either local KernelTask or PoolQueue ActorTask bool busy = RunLocalKernelTask() || RunQueueActorTask(); if (!busy) { // wait until enqueue ActorTask or distribute KernelTask @@ -47,7 +60,6 @@ void ActorWorker::Run() { status_ = kThreadIdle; cond_var_.wait(_l, [&] { return status_ == kThreadBusy || !alive_; }); } -#endif // ENABLE_MINDRT } } @@ -116,7 +128,7 @@ void ActorThreadPool::PushActorToQueue(const ActorReference &actor) { } } -int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num) { +int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy) { size_t core_num = std::thread::hardware_concurrency(); THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num); actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num; @@ -128,7 +140,7 @@ int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu std::lock_guard _l(pool_mutex_); auto worker = new (std::nothrow) ActorWorker(); THREAD_ERROR_IF_NULL(worker); - worker->CreateThread(this); + worker->CreateThread(this, policy); workers_.push_back(worker); THREAD_INFO("create actor thread[%zu]", i); } @@ -139,12 +151,13 @@ int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu return THREAD_OK; } -ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num) { +ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, + ThreadPolicy policy) { ActorThreadPool *pool = new (std::nothrow) ActorThreadPool(); if (pool == nullptr) { return nullptr; } - int ret = pool->CreateThreads(actor_thread_num, all_thread_num); + int ret = pool->CreateThreads(actor_thread_num, all_thread_num, policy); if (ret != THREAD_OK) { delete pool; return nullptr; @@ -159,12 +172,12 @@ ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size return pool; } -ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) { +ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num, ThreadPolicy policy) { ActorThreadPool *pool = new (std::nothrow) ActorThreadPool(); if (pool == nullptr) { return nullptr; } - int ret = pool->CreateThreads(thread_num, thread_num); + int ret = pool->CreateThreads(thread_num, thread_num, policy); if (ret != THREAD_OK) { delete pool; return nullptr; diff --git a/mindspore/core/mindrt/src/thread/actor_threadpool.h b/mindspore/core/mindrt/src/thread/actor_threadpool.h index 543bc60a83e..83291c8e9c9 100644 --- a/mindspore/core/mindrt/src/thread/actor_threadpool.h +++ b/mindspore/core/mindrt/src/thread/actor_threadpool.h @@ -25,32 +25,39 @@ #include "actor/actor.h" namespace mindspore { +enum ThreadPolicy { + KThreadSpin = 0, // thread run in spin + kThreadWait = 1 // synchronous and wait +}; + class ActorThreadPool; class ActorWorker : public Worker { public: - void CreateThread(ActorThreadPool *pool); + void CreateThread(ActorThreadPool *pool, ThreadPolicy policy); bool Active(); private: - void Run() override; + void RunWithWait(); + void RunWithSpin(); bool RunQueueActorTask(); + ActorThreadPool *pool_{nullptr}; }; class ActorThreadPool : public ThreadPool { public: // create ThreadPool that contains actor thread and kernel thread - static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num); + static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy); // create ThreadPool that contains only actor thread - static ActorThreadPool *CreateThreadPool(size_t thread_num); + static ActorThreadPool *CreateThreadPool(size_t thread_num, ThreadPolicy policy); ~ActorThreadPool() override; void PushActorToQueue(const ActorReference &actor); ActorReference PopActorFromQueue(); private: - int CreateThreads(size_t actor_thread_num, size_t all_thread_num); + int CreateThreads(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy); size_t actor_thread_num_{0}; diff --git a/mindspore/core/mindrt/src/thread/threadpool.h b/mindspore/core/mindrt/src/thread/threadpool.h index e885c24000c..589a0d42e8d 100644 --- a/mindspore/core/mindrt/src/thread/threadpool.h +++ b/mindspore/core/mindrt/src/thread/threadpool.h @@ -75,7 +75,7 @@ class Worker { pthread_t handle() { return thread_.native_handle(); } protected: - virtual void Run(); + void Run(); void YieldAndDeactive(); void WaitUntilActive(); diff --git a/mindspore/lite/src/inner_context.cc b/mindspore/lite/src/inner_context.cc index cd3b9fb48ad..045d38de39c 100644 --- a/mindspore/lite/src/inner_context.cc +++ b/mindspore/lite/src/inner_context.cc @@ -67,7 +67,7 @@ int InnerContext::Init() { } if (this->thread_pool_ == nullptr && this->IsCpuEnabled()) { int actor_parallel_thread = this->enable_parallel_ ? 2 : 1; - thread_pool_ = ActorThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_); + thread_pool_ = ActorThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_, KThreadSpin); if (thread_pool_ == nullptr) { MS_LOG(ERROR) << "Create ThreadPool failed"; return RET_NULL_PTR; diff --git a/mindspore/lite/src/runtime/parallel_executor.cc b/mindspore/lite/src/runtime/parallel_executor.cc index 0f2ea41bc8f..6196f73fd03 100644 --- a/mindspore/lite/src/runtime/parallel_executor.cc +++ b/mindspore/lite/src/runtime/parallel_executor.cc @@ -23,7 +23,7 @@ ParallelExecutor::~ParallelExecutor() { delete thread_pool_; } int ParallelExecutor::Prepare(const std::vector &kernels, const std::vector &inputs, const std::vector &outputs, const lite::InnerContext *ctx) { - thread_pool_ = ActorThreadPool::CreateThreadPool(1, max_thread_num_); + thread_pool_ = ActorThreadPool::CreateThreadPool(1, max_thread_num_, KThreadSpin); if (thread_pool_ == nullptr) { MS_LOG(ERROR) << "Memory error: fail to new ThreadPool"; return RET_ERROR;