Optimize ThreadPool performance on GPU, and rename InterThreadPool to ActorThreadPool

This commit is contained in:
yangjie159 2021-06-14 17:57:54 +08:00
parent af16989394
commit 7f1ded9cbf
15 changed files with 80 additions and 89 deletions

View File

@ -340,7 +340,7 @@ void GraphScheduler::Initialize() {
// Create the thread pool of actor runtime.
auto max_thread_num = GetMaxThreadNum();
MS_LOG(INFO) << "Max available thread number: " << max_thread_num;
thread_pool_ = InterThreadPool::CreateThreadPool(max_thread_num);
thread_pool_ = ActorThreadPool::CreateThreadPool(max_thread_num);
MS_EXCEPTION_IF_NULL(thread_pool_);
// Create and schedule memory manager actor.

View File

@ -36,7 +36,7 @@
#include "runtime/framework/actor/copy_actor.h"
#include "runtime/hardware/device_context.h"
#include "backend/session/kernel_graph.h"
#include "thread/inter_threadpool.h"
#include "thread/actor_threadpool.h"
namespace mindspore {
namespace runtime {
@ -305,7 +305,7 @@ class GraphScheduler {
const AID *recorder_aid_{nullptr};
const AID *debug_aid_{nullptr};
InterThreadPool *thread_pool_{nullptr};
ActorThreadPool *thread_pool_{nullptr};
bool init_{false};
};

View File

@ -31,7 +31,7 @@ namespace mindspore {
class ActorBase;
class ActorMgr;
class ActorPolicy;
class InterThreadPool;
class ActorThreadPool;
using ActorReference = std::shared_ptr<ActorBase>;
@ -79,7 +79,7 @@ class ActorBase {
// delete the send/receive message package size
void DelRuleUdp(const std::string &peer, bool outputLog);
void set_thread_pool(InterThreadPool *pool) { pool_ = pool; }
void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; }
protected:
using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;
@ -149,7 +149,7 @@ class ActorBase {
private:
friend class ActorMgr;
friend class InterThreadPool;
friend class ActorThreadPool;
// KMSG Msg Handler
virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);
@ -197,7 +197,7 @@ class ActorBase {
void SetRunningStatus(bool start);
std::unique_ptr<ActorPolicy> actorPolicy;
InterThreadPool *pool_{nullptr};
ActorThreadPool *pool_{nullptr};
AID id;
std::map<std::string, ActorFunction> actionFunctions;

View File

@ -26,7 +26,7 @@
#include <shared_mutex>
#endif
#include "actor/actor.h"
#include "thread/inter_threadpool.h"
#include "thread/actor_threadpool.h"
namespace mindspore {

View File

@ -14,11 +14,11 @@
* limitations under the License.
*/
#include "thread/inter_threadpool.h"
#include "thread/actor_threadpool.h"
#include "thread/core_affinity.h"
namespace mindspore {
InterThreadPool::~InterThreadPool() {
ActorThreadPool::~ActorThreadPool() {
// wait until actor queue is empty
bool terminate = false;
do {
@ -33,26 +33,20 @@ InterThreadPool::~InterThreadPool() {
DestructThreads();
}
void InterThreadPool::AsyncRunMultiTask(Worker *worker) {
void ActorThreadPool::AsyncRunMultiTask(Worker *worker) {
THREAD_RETURN_IF_NULL(worker);
while (alive_) {
if (RunLocalKernelTask(worker) || RunPoolQueueActorTask(worker)) {
worker->spin = 0;
if (RunLocalKernelTask(worker) || RunPoolQueueActorTask()) {
} else {
// deactivate this worker only on the first entry
if (worker->spin == 0) {
worker->running = false;
}
worker->spin++;
std::this_thread::yield();
}
if (worker->spin >= kDefaultSpinCount) {
WaitUntilActivate(worker);
// wait until Actor enqueue or distribute KernelTask
std::unique_lock<std::mutex> _l(worker->mutex);
worker->running = false;
worker->cond_var.wait(_l, [&] { return worker->running || !alive_; });
}
}
}
bool InterThreadPool::RunPoolQueueActorTask(Worker *worker) {
bool ActorThreadPool::RunPoolQueueActorTask() {
ActorBase *actor = nullptr;
if (!PopActorFromQueue(&actor)) {
return false;
@ -63,7 +57,7 @@ bool InterThreadPool::RunPoolQueueActorTask(Worker *worker) {
return true;
}
bool InterThreadPool::PopActorFromQueue(ActorBase **actor) {
bool ActorThreadPool::PopActorFromQueue(ActorBase **actor) {
std::lock_guard<std::mutex> _l(actor_mutex_);
if (actor_queue_.empty()) {
return false;
@ -73,7 +67,7 @@ bool InterThreadPool::PopActorFromQueue(ActorBase **actor) {
return true;
}
void InterThreadPool::EnqueReadyActor(const ActorReference &actor) {
void ActorThreadPool::EnqueReadyActor(const ActorReference &actor) {
{
std::lock_guard<std::mutex> _l(actor_mutex_);
actor_queue_.push(actor);
@ -89,12 +83,11 @@ void InterThreadPool::EnqueReadyActor(const ActorReference &actor) {
}
}
int InterThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num) {
int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num) {
size_t core_num = std::thread::hardware_concurrency();
THREAD_INFO("actor_thread_num: %zu, actor_thread_num: [%zu], core_num: [%zu]", actor_thread_num, all_thread_num,
core_num);
THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
if (actor_thread_num_ <= 0) {
if (actor_thread_num_ <= 0 || actor_thread_num > all_thread_num) {
THREAD_ERROR("thread num is invalid");
return THREAD_ERROR;
}
@ -102,7 +95,7 @@ int InterThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu
std::lock_guard<std::mutex> _l(pool_mutex_);
auto worker = new (std::nothrow) Worker();
THREAD_ERROR_IF_NULL(worker);
worker->thread = std::thread(&InterThreadPool::AsyncRunMultiTask, this, worker);
worker->thread = std::thread(&ActorThreadPool::AsyncRunMultiTask, this, worker);
workers_.push_back(worker);
THREAD_INFO("create actor thread[%zu]", i);
}
@ -113,8 +106,8 @@ int InterThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu
return THREAD_OK;
}
InterThreadPool *InterThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num) {
InterThreadPool *pool = new (std::nothrow) InterThreadPool();
ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num) {
ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
if (pool == nullptr) {
return nullptr;
}
@ -133,8 +126,8 @@ InterThreadPool *InterThreadPool::CreateThreadPool(size_t actor_thread_num, size
return pool;
}
InterThreadPool *InterThreadPool::CreateThreadPool(size_t thread_num) {
InterThreadPool *pool = new (std::nothrow) InterThreadPool();
ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) {
ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
if (pool == nullptr) {
return nullptr;
}

View File

@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_
#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_
#include <queue>
#include <mutex>
@ -25,13 +25,13 @@
#include "actor/actor.h"
namespace mindspore {
class InterThreadPool : public ThreadPool {
class ActorThreadPool : public ThreadPool {
public:
// create ThreadPool that contains actor thread and kernel thread
static InterThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num);
static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num);
// create ThreadPool that contains only actor thread
static InterThreadPool *CreateThreadPool(size_t thread_num);
~InterThreadPool() override;
static ActorThreadPool *CreateThreadPool(size_t thread_num);
~ActorThreadPool() override;
void EnqueReadyActor(const ActorReference &actor);
@ -39,10 +39,8 @@ class InterThreadPool : public ThreadPool {
int CreateThreads(size_t actor_thread_num, size_t all_thread_num);
void AsyncRunMultiTask(Worker *worker);
bool RunPoolQueueActorTask(Worker *worker);
bool PopActorFromQueue(ActorBase **actor);
bool RunPoolQueueActorTask();
size_t actor_thread_num_{0};
@ -50,4 +48,4 @@ class InterThreadPool : public ThreadPool {
std::queue<ActorReference> actor_queue_;
};
} // namespace mindspore
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_

View File

@ -41,58 +41,62 @@ void ThreadPool::DestructThreads() {
int ThreadPool::CreateThreads(size_t thread_num) {
size_t core_num = std::thread::hardware_concurrency();
thread_num_ = std::min(thread_num, core_num);
if (thread_num_ <= 0) {
thread_num = std::min(thread_num, core_num);
THREAD_INFO("ThreadInfo, ThreadNum: [%zu], CoreNum: [%zu]", thread_num, core_num);
if (thread_num <= 0) {
THREAD_ERROR("thread num is invalid");
return THREAD_ERROR;
}
std::lock_guard<std::mutex> _l(pool_mutex_);
for (size_t i = 0; i < thread_num_; ++i) {
for (size_t i = 0; i < thread_num; ++i) {
auto worker = new (std::nothrow) Worker();
THREAD_ERROR_IF_NULL(worker);
worker->thread = std::thread(&ThreadPool::AsyncRunTask, this, worker);
workers_.push_back(worker);
THREAD_INFO("create thread[%zu]", i);
THREAD_INFO("create kernel thread[%zu]", i);
}
return THREAD_OK;
}
void ThreadPool::AsyncRunTask(Worker *worker) {
void ThreadPool::AsyncRunTask(Worker *worker) const {
THREAD_RETURN_IF_NULL(worker);
while (alive_) {
if (RunLocalKernelTask(worker)) {
worker->spin = 0;
} else {
if (worker->spin == 0) {
worker->running = false;
}
worker->spin++;
std::this_thread::yield();
YieldAndDeactive(worker);
}
if (worker->spin >= kDefaultSpinCount) {
WaitUntilActivate(worker);
// wait until distribute KernelTask
std::unique_lock<std::mutex> _l(worker->mutex);
worker->spin = 0;
worker->cond_var.wait(_l, [&] { return worker->running || !alive_; });
}
}
}
void ThreadPool::WaitUntilActivate(Worker *worker) {
std::unique_lock<std::mutex> _l(worker->mutex);
worker->spin = 0;
worker->cond_var.wait(_l, [&] { return worker->running || !alive_; });
void ThreadPool::YieldAndDeactive(Worker *worker) const {
// deactivate this worker only on the first entry
if (worker->spin == 0) {
worker->running = false;
}
worker->spin++;
std::this_thread::yield();
}
bool ThreadPool::RunLocalKernelTask(Worker *worker) const {
if (!worker->running || worker->task == nullptr) {
return false;
}
Task *task = worker->task;
task->status |= task->func(task->content, worker->task_id, worker->lhs_scale, worker->rhs_scale);
worker->task = nullptr;
Task *task = worker->task.load(std::memory_order_consume);
int task_id = worker->task_id.load(std::memory_order_consume);
task->status |= task->func(task->content, task_id, worker->lhs_scale, worker->rhs_scale);
worker->task.store(nullptr, std::memory_order_relaxed);
++task->finished;
return true;
}
int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) {
int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) const {
// distribute task to the KernelThread and the free ActorThread,
// if the task num is greater than the KernelThread num
THREAD_INFO("launch: %d", task_num);
@ -116,6 +120,8 @@ int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num)
}
void ThreadPool::SyncRunTask(Task *task, int task_num) const {
// run task sequentially
// if the current thread is not the actor thread
float per_scale = kMaxScale / task_num;
for (int i = 0; i < task_num; ++i) {
float lhs_scale = i * per_scale;
@ -126,7 +132,7 @@ void ThreadPool::SyncRunTask(Task *task, int task_num) const {
}
}
void ThreadPool::DistributeTask(Task *task, int task_num) {
void ThreadPool::DistributeTask(Task *task, int task_num) const {
Worker *curr = CurrentWorker();
THREAD_RETURN_IF_NULL(curr);
@ -169,11 +175,9 @@ void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task,
for (int i = 0; i < task_num; ++i) {
Worker *worker = workers[i];
THREAD_RETURN_IF_NULL(worker);
{
worker->task = task;
worker->task_id = i;
worker->cond_var.notify_one();
}
worker->task_id.store(i, std::memory_order_relaxed);
worker->task.store(task, std::memory_order_relaxed);
worker->cond_var.notify_one();
if (worker == curr) {
RunLocalKernelTask(worker);
}

View File

@ -50,8 +50,8 @@ typedef struct Worker {
std::atomic_bool running{false};
std::mutex mutex;
std::condition_variable cond_var;
Task *task{nullptr};
int task_id{0};
std::atomic<Task *> task{nullptr};
std::atomic_int task_id{0};
float lhs_scale{0.};
float rhs_scale{kMaxScale};
int frequency{kDefaultFrequency};
@ -67,10 +67,9 @@ class ThreadPool {
int SetCpuAffinity(const std::vector<int> &core_list);
int SetCpuAffinity(BindMode bind_mode);
int SetProcessAffinity(BindMode bind_mode) const;
int ParallelLaunch(const Func &func, Content content, int task_num);
int ParallelLaunch(const Func &func, Content content, int task_num) const;
protected:
ThreadPool() = default;
@ -80,25 +79,22 @@ class ThreadPool {
int InitAffinityInfo();
void AsyncRunTask(Worker *worker);
void AsyncRunTask(Worker *worker) const;
void SyncRunTask(Task *task, int task_num) const;
void DistributeTask(Task *task, int task_num);
void DistributeTask(Task *task, int task_num) const;
void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const;
void ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num) const;
void YieldAndDeactive(Worker *worker) const;
bool RunLocalKernelTask(Worker *worker) const;
void WaitUntilActivate(Worker *worker);
Worker *CurrentWorker() const;
std::mutex pool_mutex_;
std::vector<Worker *> workers_;
std::atomic_bool alive_{true};
size_t thread_num_{0};
std::vector<Worker *> workers_;
CoreAffinity *affinity_{nullptr};
};

View File

@ -152,7 +152,7 @@ elseif(TARGET_HIMIX200)
set(LITE_SRC
${LITE_SRC}
${CORE_DIR}/mindrt/src/thread/core_affinity.cc
${CORE_DIR}/mindrt/src/thread/inter_threadpool.cc
${CORE_DIR}/mindrt/src/thread/actor_threadpool.cc
${CORE_DIR}/mindrt/src/thread/threadpool.cc
)
endif()

View File

@ -80,7 +80,7 @@ int InnerContext::Init() {
}
if (this->thread_pool_ == nullptr && this->IsCpuEnabled()) {
int actor_parallel_thread = this->enable_parallel_ ? 2 : 1;
thread_pool_ = InterThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_);
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_parallel_thread, this->thread_num_);
if (thread_pool_ == nullptr) {
MS_LOG(ERROR) << "Create ThreadPool failed";
return RET_NULL_PTR;

View File

@ -20,7 +20,7 @@
#include <string>
#include "include/context.h"
#include "src/runtime/inner_allocator.h"
#include "thread/inter_threadpool.h"
#include "thread/actor_threadpool.h"
#ifdef ENABLE_ARM
#include "src/cpu_info.h"
#endif
@ -31,7 +31,7 @@
namespace mindspore::lite {
struct InnerContext : public Context {
public:
InterThreadPool *thread_pool_{nullptr};
ActorThreadPool *thread_pool_{nullptr};
public:
InnerContext() = default;

View File

@ -679,7 +679,7 @@ int LiteSession::Init(const Context *context) {
return ret;
}
CpuBindMode cpu_bind_mode = this->context_->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_;
InterThreadPool *thread_pool = this->context_->thread_pool_;
ActorThreadPool *thread_pool = this->context_->thread_pool_;
if (thread_pool == nullptr) {
MS_LOG(ERROR) << "thread pool is nullptr";
is_running_.store(false);

View File

@ -23,7 +23,7 @@ ParallelExecutor::~ParallelExecutor() { delete thread_pool_; }
int ParallelExecutor::Prepare(const std::vector<mindspore::kernel::LiteKernel *> &kernels,
const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs,
const lite::InnerContext *ctx) {
thread_pool_ = InterThreadPool::CreateThreadPool(1, max_thread_num_);
thread_pool_ = ActorThreadPool::CreateThreadPool(1, max_thread_num_);
if (thread_pool_ == nullptr) {
MS_LOG(ERROR) << "Memory error: fail to new ThreadPool";
return RET_ERROR;

View File

@ -24,7 +24,7 @@
#include "src/lite_kernel.h"
#include "include/lite_session.h"
#include "src/executor.h"
#include "mindrt/src/thread/inter_threadpool.h"
#include "thread/actor_threadpool.h"
namespace mindspore::lite {
class ParallelExecutor : public Executor {
@ -45,7 +45,7 @@ class ParallelExecutor : public Executor {
std::unordered_map<kernel::LiteKernel *, size_t> refCount;
std::vector<kernel::LiteKernel *> readyKernels;
std::vector<int> results;
InterThreadPool *thread_pool_ = nullptr;
ActorThreadPool *thread_pool_ = nullptr;
int max_thread_num_ = std::thread::hardware_concurrency();
};

View File

@ -159,7 +159,7 @@ if(ENABLE_MINDRT)
${CORE_DIR}/mindrt/src/async/uuid_generator.cc
${CORE_DIR}/mindrt/src/thread/threadpool.cc
${CORE_DIR}/mindrt/src/thread/core_affinity.cc
${CORE_DIR}/mindrt/src/thread/inter_threadpool.cc
${CORE_DIR}/mindrt/src/thread/actor_threadpool.cc
)
endif()