!18626 support change spin count

Merge pull request !18626 from yangjie159/uniform_runtime
This commit is contained in:
i-robot 2021-06-23 01:22:02 +00:00 committed by Gitee
commit 2c2fbd2fc9
6 changed files with 41 additions and 21 deletions

View File

@ -342,7 +342,7 @@ void GraphScheduler::Initialize() {
size_t actor_thread_num = 0; size_t actor_thread_num = 0;
size_t OMP_thread_num = 0; size_t OMP_thread_num = 0;
ComputeThreadNums(&actor_thread_num, &OMP_thread_num); ComputeThreadNums(&actor_thread_num, &OMP_thread_num);
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num); thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, kThreadWait);
MS_EXCEPTION_IF_NULL(thread_pool_); MS_EXCEPTION_IF_NULL(thread_pool_);
std::string OMP_env = std::to_string(OMP_thread_num); std::string OMP_env = std::to_string(OMP_thread_num);
common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0); common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0);

View File

@ -18,19 +18,23 @@
#include "thread/core_affinity.h" #include "thread/core_affinity.h"
namespace mindspore { namespace mindspore {
void ActorWorker::CreateThread(ActorThreadPool *pool) { void ActorWorker::CreateThread(ActorThreadPool *pool, ThreadPolicy policy) {
THREAD_RETURN_IF_NULL(pool);
pool_ = pool; pool_ = pool;
thread_ = std::thread(&ActorWorker::Run, this); if (policy == KThreadSpin) {
thread_ = std::thread(&ActorWorker::RunWithSpin, this);
} else if (policy == kThreadWait) {
thread_ = std::thread(&ActorWorker::RunWithWait, this);
}
} }
void ActorWorker::Run() { void ActorWorker::RunWithSpin() {
#ifndef __APPLE__ #ifndef __APPLE__
static std::atomic_int index = 0; static std::atomic_int index = 0;
pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str()); pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str());
#endif #endif
while (alive_) { while (alive_) {
// only run either local KernelTask or PoolQueue ActorTask // only run either local KernelTask or PoolQueue ActorTask
#ifdef ENABLE_MINDRT
if (RunLocalKernelTask() || RunQueueActorTask()) { if (RunLocalKernelTask() || RunQueueActorTask()) {
spin_count_ = 0; spin_count_ = 0;
} else { } else {
@ -39,7 +43,16 @@ void ActorWorker::Run() {
if (spin_count_ >= kDefaultSpinCount) { if (spin_count_ >= kDefaultSpinCount) {
WaitUntilActive(); WaitUntilActive();
} }
#else }
}
void ActorWorker::RunWithWait() {
#ifndef __APPLE__
static std::atomic_int index = 0;
pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str());
#endif
while (alive_) {
// only run either local KernelTask or PoolQueue ActorTask
bool busy = RunLocalKernelTask() || RunQueueActorTask(); bool busy = RunLocalKernelTask() || RunQueueActorTask();
if (!busy) { if (!busy) {
// wait until enqueue ActorTask or distribute KernelTask // wait until enqueue ActorTask or distribute KernelTask
@ -47,7 +60,6 @@ void ActorWorker::Run() {
status_ = kThreadIdle; status_ = kThreadIdle;
cond_var_.wait(_l, [&] { return status_ == kThreadBusy || !alive_; }); cond_var_.wait(_l, [&] { return status_ == kThreadBusy || !alive_; });
} }
#endif // ENABLE_MINDRT
} }
} }
@ -116,7 +128,7 @@ void ActorThreadPool::PushActorToQueue(const ActorReference &actor) {
} }
} }
int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num) { int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy) {
size_t core_num = std::thread::hardware_concurrency(); size_t core_num = std::thread::hardware_concurrency();
THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num); THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num; actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
@ -128,7 +140,7 @@ int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu
std::lock_guard<std::mutex> _l(pool_mutex_); std::lock_guard<std::mutex> _l(pool_mutex_);
auto worker = new (std::nothrow) ActorWorker(); auto worker = new (std::nothrow) ActorWorker();
THREAD_ERROR_IF_NULL(worker); THREAD_ERROR_IF_NULL(worker);
worker->CreateThread(this); worker->CreateThread(this, policy);
workers_.push_back(worker); workers_.push_back(worker);
THREAD_INFO("create actor thread[%zu]", i); THREAD_INFO("create actor thread[%zu]", i);
} }
@ -139,12 +151,13 @@ int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu
return THREAD_OK; return THREAD_OK;
} }
ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num) { ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
ThreadPolicy policy) {
ActorThreadPool *pool = new (std::nothrow) ActorThreadPool(); ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
if (pool == nullptr) { if (pool == nullptr) {
return nullptr; return nullptr;
} }
int ret = pool->CreateThreads(actor_thread_num, all_thread_num); int ret = pool->CreateThreads(actor_thread_num, all_thread_num, policy);
if (ret != THREAD_OK) { if (ret != THREAD_OK) {
delete pool; delete pool;
return nullptr; return nullptr;
@ -159,12 +172,12 @@ ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size
return pool; return pool;
} }
ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) { ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num, ThreadPolicy policy) {
ActorThreadPool *pool = new (std::nothrow) ActorThreadPool(); ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
if (pool == nullptr) { if (pool == nullptr) {
return nullptr; return nullptr;
} }
int ret = pool->CreateThreads(thread_num, thread_num); int ret = pool->CreateThreads(thread_num, thread_num, policy);
if (ret != THREAD_OK) { if (ret != THREAD_OK) {
delete pool; delete pool;
return nullptr; return nullptr;

View File

@ -25,32 +25,39 @@
#include "actor/actor.h" #include "actor/actor.h"
namespace mindspore { namespace mindspore {
enum ThreadPolicy {
KThreadSpin = 0, // thread run in spin
kThreadWait = 1 // synchronous and wait
};
class ActorThreadPool; class ActorThreadPool;
class ActorWorker : public Worker { class ActorWorker : public Worker {
public: public:
void CreateThread(ActorThreadPool *pool); void CreateThread(ActorThreadPool *pool, ThreadPolicy policy);
bool Active(); bool Active();
private: private:
void Run() override; void RunWithWait();
void RunWithSpin();
bool RunQueueActorTask(); bool RunQueueActorTask();
ActorThreadPool *pool_{nullptr}; ActorThreadPool *pool_{nullptr};
}; };
class ActorThreadPool : public ThreadPool { class ActorThreadPool : public ThreadPool {
public: public:
// create ThreadPool that contains actor thread and kernel thread // create ThreadPool that contains actor thread and kernel thread
static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num); static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy);
// create ThreadPool that contains only actor thread // create ThreadPool that contains only actor thread
static ActorThreadPool *CreateThreadPool(size_t thread_num); static ActorThreadPool *CreateThreadPool(size_t thread_num, ThreadPolicy policy);
~ActorThreadPool() override; ~ActorThreadPool() override;
void PushActorToQueue(const ActorReference &actor); void PushActorToQueue(const ActorReference &actor);
ActorReference PopActorFromQueue(); ActorReference PopActorFromQueue();
private: private:
int CreateThreads(size_t actor_thread_num, size_t all_thread_num); int CreateThreads(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy);
size_t actor_thread_num_{0}; size_t actor_thread_num_{0};

View File

@ -75,7 +75,7 @@ class Worker {
pthread_t handle() { return thread_.native_handle(); } pthread_t handle() { return thread_.native_handle(); }
protected: protected:
virtual void Run(); void Run();
void YieldAndDeactive(); void YieldAndDeactive();
void WaitUntilActive(); void WaitUntilActive();

View File

@ -67,7 +67,7 @@ int InnerContext::Init() {
} }
if (this->thread_pool_ == nullptr && this->IsCpuEnabled()) { if (this->thread_pool_ == nullptr && this->IsCpuEnabled()) {
int actor_parallel_thread = this->enable_parallel_ ? 2 : 1; int actor_parallel_thread = this->enable_parallel_ ? 2 : 1;
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_); thread_pool_ = ActorThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_, KThreadSpin);
if (thread_pool_ == nullptr) { if (thread_pool_ == nullptr) {
MS_LOG(ERROR) << "Create ThreadPool failed"; MS_LOG(ERROR) << "Create ThreadPool failed";
return RET_NULL_PTR; return RET_NULL_PTR;

View File

@ -23,7 +23,7 @@ ParallelExecutor::~ParallelExecutor() { delete thread_pool_; }
int ParallelExecutor::Prepare(const std::vector<mindspore::kernel::LiteKernel *> &kernels, int ParallelExecutor::Prepare(const std::vector<mindspore::kernel::LiteKernel *> &kernels,
const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs, const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs,
const lite::InnerContext *ctx) { const lite::InnerContext *ctx) {
thread_pool_ = ActorThreadPool::CreateThreadPool(1, max_thread_num_); thread_pool_ = ActorThreadPool::CreateThreadPool(1, max_thread_num_, KThreadSpin);
if (thread_pool_ == nullptr) { if (thread_pool_ == nullptr) {
MS_LOG(ERROR) << "Memory error: fail to new ThreadPool"; MS_LOG(ERROR) << "Memory error: fail to new ThreadPool";
return RET_ERROR; return RET_ERROR;