diff --git a/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.cc b/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.cc
index 241a54fe50e..06e89ad08ed 100644
--- a/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.cc
+++ b/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.cc
@@ -51,6 +51,9 @@ void ParallelThreadPoolManager::Init(bool enable_shared_thread_pool, const std::
   runner_id_pools_[runner_id] = runner_pools;
   remaining_thread_num_[runner_id] = remaining_thread_num;
   thread_num_limit_[runner_id] = thread_num_limit;
+  idle_pool_num_[runner_id] = worker_num;
+  runner_worker_num_[runner_id] = worker_num;
+  worker_init_num_[runner_id] = 0;
 #endif
 }
 
@@ -136,6 +139,7 @@ void ParallelThreadPoolManager::BindPoolToRunner(
     auto worker = static_cast<ParallelWorker *>(all_workers[i]);
     pool_workers_[parallel_pool].push_back(worker);
   }
+  worker_init_num_[runner_id]++;
 #endif
 }
 
@@ -150,7 +154,11 @@ bool ParallelThreadPoolManager::GetEnableSharedThreadPool(std::string runner_id)
 void ParallelThreadPoolManager::ActivatePool(const std::string &runner_id, int model_id) {
 #ifdef THREAD_POOL_MANAGER
   std::shared_lock<std::shared_mutex> l(pool_manager_mutex_);
+  if (!enable_shared_thread_pool_[runner_id]) {
+    return;
+  }
   auto &pool = runner_id_pools_[runner_id][model_id];
+  idle_pool_num_[runner_id]--;
   pool->UseThreadPool(1);
   auto &workers = pool_workers_[pool];
   for (auto &worker : workers) {
@@ -162,15 +170,19 @@ void ParallelThreadPoolManager::ActivatePool(const std::string &runner_id, int m
 void ParallelThreadPoolManager::SetFreePool(const std::string &runner_id, int model_id) {
 #ifdef THREAD_POOL_MANAGER
   std::shared_lock<std::shared_mutex> l(pool_manager_mutex_);
+  if (!enable_shared_thread_pool_[runner_id]) {
+    return;
+  }
   auto &pool = runner_id_pools_[runner_id][model_id];
   pool->UseThreadPool(-1);
+  idle_pool_num_[runner_id]++;
 #endif
 }
 
 #ifdef ENABLE_MINDRT
 ParallelThreadPool *ParallelThreadPoolManager::GetIdleThreadPool(const std::string &runner_id, ParallelTask *task) {
 #ifdef THREAD_POOL_MANAGER
-  if (!has_idle_pool_[runner_id]) {
+  if (runner_worker_num_[runner_id] != worker_init_num_[runner_id] || idle_pool_num_[runner_id] <= 0) {
     return nullptr;
   }
   std::shared_lock<std::shared_mutex> l(pool_manager_mutex_);
@@ -205,6 +217,9 @@ void ParallelThreadPoolManager::ResetParallelThreadPoolManager(const std::string
   enable_shared_thread_pool_.erase(runner_id);
   remaining_thread_num_.erase(runner_id);
   thread_num_limit_.erase(runner_id);
+  runner_worker_num_.erase(runner_id);
+  worker_init_num_.erase(runner_id);
+  idle_pool_num_.erase(runner_id);
 #endif
 }
 
@@ -218,6 +233,9 @@ ParallelThreadPoolManager::~ParallelThreadPoolManager() {
   enable_shared_thread_pool_.clear();
   remaining_thread_num_.clear();
   thread_num_limit_.clear();
+  runner_worker_num_.clear();
+  worker_init_num_.clear();
+  idle_pool_num_.clear();
   THREAD_INFO("~ParallelThreadPoolManager end.");
 #endif
 }
diff --git a/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.h b/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.h
index b488ad97694..958bced92a1 100644
--- a/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.h
+++ b/mindspore/core/mindrt/src/thread/parallel_thread_pool_manager.h
@@ -81,6 +81,9 @@ class ParallelThreadPoolManager {
   std::shared_mutex pool_manager_mutex_;
   std::map<std::string, bool> has_idle_pool_;
   std::map<std::string, bool> enable_shared_thread_pool_;
+  std::map<std::string, int> runner_worker_num_;
+  std::map<std::string, int> worker_init_num_;
+  std::map<std::string, int> idle_pool_num_;
   std::map<std::string, int> remaining_thread_num_;
   std::map<std::string, int> thread_num_limit_;
 #endif
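The three new maps replace the externally toggled has_idle_pool_ flag with counters the manager maintains itself: runner_worker_num_ records how many workers a runner declared at Init, worker_init_num_ counts workers actually bound via BindPoolToRunner, and idle_pool_num_ tracks pools that are not currently running. GetIdleThreadPool now bails out until every worker is bound and at least one pool is idle. A minimal standalone sketch of that gating, with simplified types (the real manager stores ParallelThreadPool pointers and holds pool_manager_mutex_ while touching these maps):

    #include <map>
    #include <string>

    class PoolGateSketch {
     public:
      // Mirrors Init(): all pools start idle, no workers bound yet.
      void Init(const std::string &runner_id, int worker_num) {
        runner_worker_num_[runner_id] = worker_num;
        worker_init_num_[runner_id] = 0;
        idle_pool_num_[runner_id] = worker_num;
      }
      void BindPoolToRunner(const std::string &runner_id) { worker_init_num_[runner_id]++; }
      void ActivatePool(const std::string &runner_id) { idle_pool_num_[runner_id]--; }
      void SetFreePool(const std::string &runner_id) { idle_pool_num_[runner_id]++; }
      // Mirrors GetIdleThreadPool()'s early-out: only search for an idle pool
      // once every worker is bound and at least one pool is idle.
      bool CanSearchIdlePool(const std::string &runner_id) {
        return runner_worker_num_[runner_id] == worker_init_num_[runner_id] &&
               idle_pool_num_[runner_id] > 0;
      }

     private:
      std::map<std::string, int> runner_worker_num_;
      std::map<std::string, int> worker_init_num_;
      std::map<std::string, int> idle_pool_num_;
    };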
diff --git a/mindspore/lite/src/common/common.h b/mindspore/lite/src/common/common.h
index 74684008252..a1e1ffc16bd 100644
--- a/mindspore/lite/src/common/common.h
+++ b/mindspore/lite/src/common/common.h
@@ -76,6 +76,7 @@ static const char *const kInnerSharingWeightCopyBufKey = "sharing_weight_copy_bu
 static const char *const kInnerModelIDKey = "inner_model_id";
 static const char *const kInnerRunnerIDKey = "inner_runner_id";
 static const char *const kInnerNumaIDKey = "inner_numa_id";
+static const char *const kInnerWorkerNumKey = "inner_worker_num";
 // gpu context
 static const char *const kGPUContextSection = "gpu_context";
 static const char *const kInputShapeKey = "input_shape";
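The new kInnerWorkerNumKey travels alongside the existing inner IDs in the kInnerModelParallelRunnerSection of each worker's config, assembled by ModelPool::CreateWorkers in the next diff. An illustrative dump of one worker's section with sharing enabled; only the literals visible in common.h above are certain, and the shared-pool key strings are placeholders for the real kEnableSharedThreadPoolKey / kThreadNum*PerWorkerKey constants:

    #include <map>
    #include <string>

    std::map<std::string, std::string> ExampleWorkerSection() {
      return {
          {"inner_model_id", "0"},           // kInnerModelIDKey: worker index
          {"inner_runner_id", "runner-0"},   // kInnerRunnerIDKey: hypothetical id
          {"inner_numa_id", "0"},            // kInnerNumaIDKey
          {"inner_worker_num", "4"},         // kInnerWorkerNumKey (added here)
          {"enable_shared_thread_pool", "true"},     // placeholder literal
          {"thread_num_remaining_per_worker", "1"},  // placeholder literal
          {"thread_num_limit_per_worker", "3"},      // placeholder literal
      };
    }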
diff --git a/mindspore/lite/src/extendrt/cxx_api/model_pool/model_pool.cc b/mindspore/lite/src/extendrt/cxx_api/model_pool/model_pool.cc
index 6d87272eb2d..4e4313eb3ba 100644
--- a/mindspore/lite/src/extendrt/cxx_api/model_pool/model_pool.cc
+++ b/mindspore/lite/src/extendrt/cxx_api/model_pool/model_pool.cc
@@ -698,19 +698,22 @@ Status ModelPool::CreateWorkers(const char *graph_buf, size_t size, const ModelP
   MS_LOG(INFO) << "runner_id_: " << runner_id_ << " | enable_shared_thread_pool_: " << enable_shared_thread_pool_
                << " | workers_num_: " << workers_num_ << " | remaining_thread_num_: " << remaining_thread_num_
                << " | thread_num_limit_: " << thread_num_limit_;
-  ParallelThreadPoolManager::GetInstance()->Init(enable_shared_thread_pool_, runner_id_, workers_num_,
-                                                 remaining_thread_num_, thread_num_limit_);
   for (size_t i = 0; i < workers_num_; i++) {
     int numa_node_id = model_pool_config[i]->numa_id;
     std::map<std::string, std::string> ids;
     ids[lite::kInnerModelIDKey] = std::to_string(i);
     ids[lite::kInnerRunnerIDKey] = runner_id_;
     ids[lite::kInnerNumaIDKey] = std::to_string(model_pool_config[i]->numa_id);
-    model_pool_config[i]->config_info[lite::kInnerModelParallelRunnerSection] = ids;
+    if (enable_shared_thread_pool_) {
+      ids[lite::kInnerWorkerNumKey] = std::to_string(workers_num_);
+      ids[lite::kEnableSharedThreadPoolKey] = "true";
+      ids[lite::kThreadNumRemainingPerWorkerKey] = std::to_string(remaining_thread_num_);
+      ids[lite::kThreadNumLimitPerWorkerKey] = std::to_string(thread_num_limit_);
+    }
     if (!copy_model || model_pool_config[i]->numa_id == 0) {
       ids[lite::kInnerSharingWeightCopyBufKey] = "false";
     }
-
+    model_pool_config[i]->config_info[lite::kInnerModelParallelRunnerSection] = ids;
     model_worker = std::make_shared<ModelWorker>();
     if (model_worker == nullptr) {
       MS_LOG(ERROR) << "model worker is nullptr.";
@@ -911,8 +914,13 @@ Status ModelPool::ParseSharedThreadPoolParam(const std::shared_ptr<RunnerConfig>
 }
 
 ModelPoolConfig ModelPool::Init(const std::shared_ptr<RunnerConfig> &runner_config) {
+  auto status = ParseSharedThreadPoolParam(runner_config);
+  if (status != kSuccess) {
+    MS_LOG(WARNING) << "ParseSharedThreadPoolParam failed, not using shared thread pool.";
+    enable_shared_thread_pool_ = false;
+  }
   ModelPoolConfig model_pool_config = {};
-  auto status = CanUseAllPhysicalResources();
+  status = CanUseAllPhysicalResources();
   if (status != kSuccess) {
     MS_LOG(ERROR) << "parser sys file failed.";
     return model_pool_config;
@@ -1080,25 +1088,15 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
   if (available_worker != nullptr) {
     predict_task_queue_->DecreaseWaitModelNum(1, max_wait_worker_node_id);
-    if (enable_shared_thread_pool_) {
-      ParallelThreadPoolManager::GetInstance()->SetHasIdlePool(runner_id_, true);
-      ParallelThreadPoolManager::GetInstance()->ActivatePool(runner_id_, available_worker->GetWorkerID());
-    }
     auto ret = available_worker->Predict(inputs, outputs, before, after);
     if (ret != kSuccess) {
       MS_LOG(ERROR) << "direct predict failed.";
       return kLiteError;
     }
     predict_task_queue_->IncreaseWaitModelNum(1, max_wait_worker_node_id);
-    if (enable_shared_thread_pool_) {
-      ParallelThreadPoolManager::GetInstance()->SetFreePool(runner_id_, available_worker->GetWorkerID());
-    }
     return kSuccess;
   } else {
     // do predict
-    if (enable_shared_thread_pool_) {
-      ParallelThreadPoolManager::GetInstance()->SetHasIdlePool(runner_id_, false);
-    }
     size_t task_id;
     auto task = CreatePredictTask(inputs, outputs, before, after, &task_id);
     if (task == nullptr) {
@@ -1134,9 +1132,6 @@ ModelPool::~ModelPool() {
   if (thread_.joinable()) {
     thread_.join();
   }
-  if (enable_shared_thread_pool_) {
-    ParallelThreadPoolManager::GetInstance()->ResetParallelThreadPoolManager(runner_id_);
-  }
   MS_LOG(INFO) << "delete model pool task.";
   if (tasks_ != nullptr) {
     delete[] tasks_;
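With this change ModelPool::Predict no longer toggles the manager at all; activation moves into LiteSession::RunGraph in the next diff, which calls ActivatePool on entry and SetFreePool on the normal exit path. Note that the early returns between those two calls skip SetFreePool, so idle_pool_num_ can drift low across failed runs. A hypothetical RAII guard (not part of this patch; include path and namespace qualification assumed) would keep the counter balanced on every path:

    #include <string>
    #include "thread/parallel_thread_pool_manager.h"  // assumed include path

    // Pairs every ActivatePool with a SetFreePool, even on early returns.
    class ScopedPoolUse {
     public:
      ScopedPoolUse(const std::string &runner_id, int worker_id)
          : runner_id_(runner_id), worker_id_(worker_id) {
        ParallelThreadPoolManager::GetInstance()->ActivatePool(runner_id_, worker_id_);
      }
      ~ScopedPoolUse() {
        ParallelThreadPoolManager::GetInstance()->SetFreePool(runner_id_, worker_id_);
      }
      ScopedPoolUse(const ScopedPoolUse &) = delete;
      ScopedPoolUse &operator=(const ScopedPoolUse &) = delete;

     private:
      std::string runner_id_;
      int worker_id_;
    };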
diff --git a/mindspore/lite/src/litert/lite_session.cc b/mindspore/lite/src/litert/lite_session.cc
index 272b7907e43..f53cb9d6cc5 100644
--- a/mindspore/lite/src/litert/lite_session.cc
+++ b/mindspore/lite/src/litert/lite_session.cc
@@ -758,6 +758,7 @@ int LiteSession::RunGraph(const KernelCallBack &before, const KernelCallBack &af
     MS_LOG(ERROR) << "Not support multi-threading";
     return RET_ERROR;
   }
+  ParallelThreadPoolManager::GetInstance()->ActivatePool(runner_id_, worker_id_);
   STATUS ret = CheckTensorsInvalid(inputs_);
   if (MS_UNLIKELY(ret != RET_OK)) {
     is_running_.store(false);
@@ -781,27 +782,49 @@ int LiteSession::RunGraph(const KernelCallBack &before, const KernelCallBack &af
       input->set_shape_changed(false);
     }
   }
+  ParallelThreadPoolManager::GetInstance()->SetFreePool(runner_id_, worker_id_);
   is_running_.store(false);
   return ret;
 }
 
+int LiteSession::InitSharedThreadPool() {
+  int workers_num = -1;
+  int remaining_thread_num = -1;
+  int thread_num_limit = -1;
+  bool enable_shared_pool = false;
+  if (config_info_ != nullptr) {
+    auto runner_info_item = config_info_->find(kInnerModelParallelRunnerSection);
+    if (runner_info_item != config_info_->end()) {
+      auto item_runner = runner_info_item->second.find(kInnerRunnerIDKey);
+      if (item_runner != runner_info_item->second.end()) {
+        runner_id_ = runner_info_item->second.at(kInnerRunnerIDKey);
+      }
+      auto shared_pool_item = runner_info_item->second.find(kEnableSharedThreadPoolKey);
+      if (shared_pool_item != runner_info_item->second.end() &&
+          runner_info_item->second.at(kEnableSharedThreadPoolKey) == "true") {
+        workers_num = std::atoi(runner_info_item->second.at(kInnerWorkerNumKey).c_str());
+        remaining_thread_num = std::atoi(runner_info_item->second.at(kThreadNumRemainingPerWorkerKey).c_str());
+        thread_num_limit = std::atoi(runner_info_item->second.at(kThreadNumLimitPerWorkerKey).c_str());
+        worker_id_ = std::atoi(runner_info_item->second.at(kInnerModelIDKey).c_str());
+        enable_shared_pool = true;
+      }
+    }
+  }
+  MS_LOG(INFO) << "runner id: " << runner_id_ << " enable_shared_pool: " << enable_shared_pool
+               << " workers_num: " << workers_num << " thread_num_limit: " << thread_num_limit
+               << " remaining_thread_num: " << remaining_thread_num;
+  ParallelThreadPoolManager::GetInstance()->Init(enable_shared_pool, runner_id_, workers_num, remaining_thread_num,
+                                                 thread_num_limit);
+  return RET_OK;
+}
+
 int LiteSession::ContextInit(const std::shared_ptr<InnerContext> &context) {
   if (context == nullptr) {
     MS_LOG(ERROR) << "context is nullptr";
     return RET_NULL_PTR;
   }
   this->context_ = context;
-  std::string runner_id;
-  if (config_info_ != nullptr) {
-    auto it_id = config_info_->find(kInnerModelParallelRunnerSection);
-    if (it_id != config_info_->end()) {
-      auto item_runner = it_id->second.find(kInnerRunnerIDKey);
-      if (item_runner != it_id->second.end()) {
-        runner_id = it_id->second.at(kInnerRunnerIDKey);
-      }
-    }
-  }
-  context_->SetBindRunnerId(runner_id);
+  context_->SetBindRunnerId(runner_id_);
   auto ret = this->context_->Init();
   if (ret != RET_OK) {
     MS_LOG(ERROR) << "Init Context failed";
@@ -819,8 +842,8 @@ int LiteSession::ContextInit(const std::shared_ptr<InnerContext> &context) {
   context_->thread_pool_->SetMinSpinCount(kDefaulLiteIosSpinCount);
 #endif
 
-  if (context_->inter_op_parallel_num_ > 1 && !runner_id.empty() &&
-      ParallelThreadPoolManager::GetInstance()->GetEnableSharedThreadPool(runner_id)) {
+  if (context_->inter_op_parallel_num_ > 1 && !runner_id_.empty() &&
+      ParallelThreadPoolManager::GetInstance()->GetEnableSharedThreadPool(runner_id_)) {
     MS_LOG(INFO) << "Enable subgraph parallelism and enable thread pool sharing";
     ParallelThreadPoolManager::GetInstance()->BindPoolToRunner(context_->thread_pool_, config_info_);
   }
@@ -982,6 +1005,12 @@ int LiteSession::Init(const std::shared_ptr<InnerContext> &context) {
     return RET_NOT_SUPPORT;
   }
 
+  auto status = InitSharedThreadPool();
+  if (status != RET_OK) {
+    MS_LOG(ERROR) << "init shared thread pool failed";
+    is_running_.store(false);
+    return status;
+  }
   auto ret = ContextInit(context);
   if (ret != RET_OK) {
     MS_LOG(ERROR) << "Init Context failed";
@@ -1077,6 +1106,7 @@ LiteSession::~LiteSession() {
 #endif
   delete ms_context_;
   ms_context_ = nullptr;
+  ParallelThreadPoolManager::GetInstance()->ResetParallelThreadPoolManager(runner_id_);
   lite::PackWeightManager::GetInstance()->FreePackWeight(runner_id_, model_id_);
   if (model_ != nullptr && is_shared_weight_) {
     model_->buf = nullptr;
diff --git a/mindspore/lite/src/litert/lite_session.h b/mindspore/lite/src/litert/lite_session.h
index 99b4d65b966..02220f632cc 100644
--- a/mindspore/lite/src/litert/lite_session.h
+++ b/mindspore/lite/src/litert/lite_session.h
@@ -170,6 +170,7 @@ class LiteSession {
   int CreateCoreMLDelegate();
   int DelegateInit();
   int InitGPURuntime();
+  int InitSharedThreadPool();
 
  private:
   int IsolateOutputTensor();
@@ -242,6 +243,7 @@ class LiteSession {
   std::vector<kernel::KernelExec *> non_tail_call_kernels_;
   std::string model_id_;
   std::string runner_id_;
+  int worker_id_ = -1;
   bool is_shared_weight_ = false;
 };
 }  // namespace lite
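InitSharedThreadPool above reads the shared-pool entries with map::at() after checking only kEnableSharedThreadPoolKey, so a section that is missing a key such as kInnerWorkerNumKey would throw std::out_of_range. A hedged sketch of a stricter lookup (hypothetical helper, not in this patch) that falls back to the function's -1 defaults instead:

    #include <cstdlib>
    #include <map>
    #include <string>

    int GetIntOr(const std::map<std::string, std::string> &section,
                 const std::string &key, int fallback) {
      auto it = section.find(key);
      if (it == section.end() || it->second.empty()) {
        return fallback;  // key absent or empty: keep the caller's default
      }
      return std::atoi(it->second.c_str());  // atoi yields 0 on non-numeric text
    }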