!46901 [MS][LITE][parallel predict] check param: sharing parallel thread pool

Merge pull request !46901 from yefeng/479-check_shared_thread_pool_param
This commit is contained in:
i-robot 2022-12-16 09:47:36 +00:00 committed by Gitee
commit 02ee9b06c1
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 103 additions and 59 deletions

View File

@ -20,6 +20,11 @@
#include "thread/parallel_threadpool.h"
namespace mindspore {
namespace {
const char *kInnerIDs = "inner_ids";
const char *kInnerRunnerID = "inner_runner_id";
const char *kInnerModelID = "inner_model_id";
} // namespace
ParallelThreadPoolManager *ParallelThreadPoolManager::GetInstance() {
static ParallelThreadPoolManager instance;
return &instance;
@ -50,11 +55,11 @@ void ParallelThreadPoolManager::BindPoolToRunner(
return;
}
std::string runner_id;
auto it_id = config_info->find("inner_ids");
auto it_id = config_info->find(kInnerIDs);
if (it_id != config_info->end()) {
auto item_runner = it_id->second.find("inner_runner_id");
auto item_runner = it_id->second.find(kInnerRunnerID);
if (item_runner != it_id->second.end()) {
runner_id = it_id->second.at("inner_runner_id");
runner_id = it_id->second.at(kInnerRunnerID);
}
}
auto parallel_pool = static_cast<ParallelThreadPool *>(pool);
@ -62,9 +67,9 @@ void ParallelThreadPoolManager::BindPoolToRunner(
THREAD_ERROR("parallel pool is nullptr.");
}
int model_id = 0;
auto item_runner = it_id->second.find("inner_model_id");
auto item_runner = it_id->second.find(kInnerModelID);
if (item_runner != it_id->second.end()) {
model_id = std::atoi(it_id->second.at("inner_model_id").c_str());
model_id = std::atoi(it_id->second.at(kInnerModelID).c_str());
}
runner_id_pools_[runner_id].at(model_id) = parallel_pool;
parallel_pool->SetRunnerID(runner_id);

View File

@ -135,7 +135,7 @@ class ParallelThreadPool : public ActorThreadPool {
int GetPoolRef();
void SetRunnerID(std::string runner_id) { bind_runner_id_ = runner_id; }
inline void SetRunnerID(const std::string &runner_id) { bind_runner_id_ = runner_id; }
std::vector<ParallelWorker *> GetParallelPoolWorkers();

View File

@ -649,6 +649,28 @@ Status ModelPool::InitNumaParameter(const std::shared_ptr<RunnerConfig> &runner_
return kSuccess;
}
Status ModelPool::CheckSharingThreadPoolParam(const ModelPoolConfig &model_pool_config) {
  // Validates the shared-thread-pool parameters before workers are created and,
  // when they are valid, registers this runner with the ParallelThreadPoolManager.
  //
  // Returns kSuccess when sharing is disabled or all parameters are valid,
  // kLiteError / kLiteParamInvalid otherwise.
  if (!enable_shared_thread_pool_) {
    return kSuccess;
  }
  // Guard front(): calling front() on an empty config (or through a null
  // worker config/context) is undefined behavior.
  if (model_pool_config.empty() || model_pool_config.front() == nullptr ||
      model_pool_config.front()->context == nullptr) {
    MS_LOG(ERROR) << "model pool config is empty or invalid.";
    return kLiteParamInvalid;
  }
  auto &context = model_pool_config.front()->context;
  // Thread-pool sharing relies on the parallel thread pool, which is only
  // active when inter-op parallelism is enabled (> 1).
  if (context->GetInterOpParallelNum() <= 1) {
    MS_LOG(ERROR) << "If you want to enable thread pool sharing, please enable parallelThreadPool";
    return kLiteError;
  }
  if (remaining_thread_num_ < 0) {
    MS_LOG(ERROR) << "remaining thread num is invalid, remaining_thread_num_: " << remaining_thread_num_;
    return kLiteParamInvalid;
  }
  // The threads held in reserve per worker cannot exceed the worker's total thread budget.
  if (remaining_thread_num_ > context->GetThreadNum()) {
    MS_LOG(ERROR) << "remaining thread num must be less than thread num, remaining_thread_num is: "
                  << remaining_thread_num_ << ", thread num: " << context->GetThreadNum();
    return kLiteParamInvalid;
  }
  ParallelThreadPoolManager::GetInstance()->Init(enable_shared_thread_pool_, runner_id_, workers_num_,
                                                 remaining_thread_num_);
  return kSuccess;
}
Status ModelPool::CreateWorkers(char *graph_buf, size_t size, const ModelPoolConfig &model_pool_config,
bool copy_model) {
std::shared_ptr<ModelWorker> model_worker = nullptr;
@ -660,18 +682,10 @@ Status ModelPool::CreateWorkers(char *graph_buf, size_t size, const ModelPoolCon
runner_id_ = ResourceManager::GetInstance()->GenRunnerID();
std::map<std::string, std::string> ids;
ids[lite::kInnerRunnerID] = runner_id_;
if (enable_shared_thread_pool_) {
if (model_pool_config.front()->context->GetInterOpParallelNum() <= 1) {
MS_LOG(ERROR) << "If you want to enable thread pool sharing, please enable parallelThreadPool";
return kLiteError;
}
if (remaining_thread_num_ > model_pool_config.front()->context->GetThreadNum()) {
MS_LOG(ERROR) << "remaining thread num must less then thread num, remaining_thread_num is: "
<< remaining_thread_num_ << ", thread num: " << model_pool_config.front()->context->GetThreadNum();
return kLiteParamInvalid;
}
ParallelThreadPoolManager::GetInstance()->Init(enable_shared_thread_pool_, runner_id_, workers_num_,
remaining_thread_num_);
auto status = CheckSharingThreadPoolParam(model_pool_config);
if (status != kSuccess) {
MS_LOG(ERROR) << "CheckSharingThreadPoolParam failed.";
return kLiteError;
}
for (size_t i = 0; i < workers_num_; i++) {
ids[lite::kInnerModelID] = std::to_string(i);
@ -775,32 +789,34 @@ Status ModelPool::CanUseAllPhysicalResources() {
}
Status ModelPool::ParseSharedThreadPoolParam(const std::shared_ptr<RunnerConfig> &runner_config) {
if (runner_config != nullptr) {
auto config_info = runner_config->GetConfigInfo();
auto shared_thread_pool = config_info.find(lite::kSharedThreadPool);
if (shared_thread_pool != config_info.end()) {
int remaining_thread_num = 0;
auto shared_thread_pool_param = shared_thread_pool->second;
if (shared_thread_pool_param.find(lite::kEnable) != shared_thread_pool_param.end() &&
shared_thread_pool_param[lite::kEnable] == "true" &&
shared_thread_pool_param.find(lite::kThreadNumLimitPerWorker) != shared_thread_pool_param.end()) {
MS_LOG(INFO) << "use shared thread pool";
enable_shared_thread_pool_ = true;
if (shared_thread_pool_param.find(lite::kThreadNumRemainingPerWorker) != shared_thread_pool_param.end()) {
if (!shared_thread_pool_param[lite::kThreadNumRemainingPerWorker].empty()) {
remaining_thread_num_ = std::atoi(shared_thread_pool_param[lite::kThreadNumRemainingPerWorker].c_str());
} else {
MS_LOG(INFO) << "not set thread_num_remaining_per_worker param, default remaining thread num is 0.";
}
}
} else {
MS_LOG(ERROR) << "Set shared thread pool param failed.";
return kLiteParamInvalid;
}
MS_LOG(INFO) << "use thread pool shared, remaining thread num: " << remaining_thread_num
<< " | Limit thread num: " << shared_thread_pool_param[lite::kThreadNumLimitPerWorker];
}
if (runner_config == nullptr) {
return kSuccess;
}
auto config_info = runner_config->GetConfigInfo();
auto shared_thread_pool = config_info.find(lite::kSharedThreadPool);
if (shared_thread_pool == config_info.end()) {
return kSuccess;
}
int remaining_thread_num = 0;
auto shared_thread_pool_param = shared_thread_pool->second;
if (shared_thread_pool_param.find(lite::kEnable) != shared_thread_pool_param.end() &&
shared_thread_pool_param[lite::kEnable] == "true" &&
shared_thread_pool_param.find(lite::kThreadNumLimitPerWorker) != shared_thread_pool_param.end()) {
MS_LOG(INFO) << "use shared thread pool";
enable_shared_thread_pool_ = true;
if (shared_thread_pool_param.find(lite::kThreadNumRemainingPerWorker) != shared_thread_pool_param.end()) {
if (!shared_thread_pool_param[lite::kThreadNumRemainingPerWorker].empty()) {
remaining_thread_num_ = std::atoi(shared_thread_pool_param[lite::kThreadNumRemainingPerWorker].c_str());
} else {
MS_LOG(INFO) << "not set thread_num_remaining_per_worker param, default remaining thread num is 0.";
}
}
} else {
MS_LOG(ERROR) << "Set shared thread pool param failed.";
return kLiteParamInvalid;
}
MS_LOG(INFO) << "use thread pool shared, remaining thread num: " << remaining_thread_num
<< " | Limit thread num: " << shared_thread_pool_param[lite::kThreadNumLimitPerWorker];
return kSuccess;
}

View File

@ -111,6 +111,8 @@ class ModelPool {
Status ParseSharedThreadPoolParam(const std::shared_ptr<RunnerConfig> &runner_config);
Status CheckSharingThreadPoolParam(const ModelPoolConfig &model_pool_config);
private:
// different workers get tasks from different task queues.
// currently task queues are distinguished according to different numa node numbers.

View File

@ -60,6 +60,8 @@ using AbstractBaseModel = mindspore::infer::AbstractBaseModel;
namespace mindspore::lite {
namespace {
constexpr int kMainSubGraphIndex = 0;
constexpr int kMaxThreadNumTimes = 5;
constexpr int kDefaultThreadNumTimes = 2;
} // namespace
namespace {
@ -1038,22 +1040,36 @@ void Scheduler::ResetByExecutionPlan(std::string node_name, TypeId *data_type) {
return;
}
int Scheduler::GetThreadNumLimit() {
int thread_num_limit = -1;
if (config_info_ != nullptr) {
auto shared_thread_pool = config_info_->find(kSharedThreadPool);
if (shared_thread_pool != config_info_->end()) {
auto shared_thread_pool_param = shared_thread_pool->second;
if (shared_thread_pool_param.find(kEnable) != shared_thread_pool_param.end() &&
shared_thread_pool_param[kEnable] == "true" &&
shared_thread_pool_param.find(kThreadNumLimitPerWorker) != shared_thread_pool_param.end() &&
!shared_thread_pool_param[kThreadNumLimitPerWorker].empty()) {
thread_num_limit = std::atoi(shared_thread_pool_param[kThreadNumLimitPerWorker].c_str());
}
STATUS Scheduler::ParseThreadNumLimit(int *thread_num_limit) {
*thread_num_limit = -1;
if (config_info_ == nullptr) {
return RET_OK;
}
auto shared_thread_pool = config_info_->find(kSharedThreadPool);
if (shared_thread_pool == config_info_->end()) {
return RET_OK;
}
auto shared_thread_pool_param = shared_thread_pool->second;
if (shared_thread_pool_param.find(kEnable) != shared_thread_pool_param.end() &&
shared_thread_pool_param[kEnable] == "true") {
if (shared_thread_pool_param.find(kThreadNumLimitPerWorker) != shared_thread_pool_param.end() &&
!shared_thread_pool_param[kThreadNumLimitPerWorker].empty()) {
*thread_num_limit = std::atoi(shared_thread_pool_param[kThreadNumLimitPerWorker].c_str());
} else {
*thread_num_limit = context_->thread_num_ * kDefaultThreadNumTimes;
}
}
MS_LOG(INFO) << "thread num limit: " << thread_num_limit;
return thread_num_limit;
if (*thread_num_limit <= 0) {
MS_LOG(ERROR) << "thread_num_limit is invalid, thread_num_limit: " << *thread_num_limit;
return RET_ERROR;
}
if (*thread_num_limit > context_->thread_num_ * kMaxThreadNumTimes) {
MS_LOG(ERROR) << "thread num limit: " << *thread_num_limit
<< " is more than 5 times thread num: " << context_->thread_num_
<< ", change it to 5 times thread num. Please check whether Thread num is reasonable.";
return RET_ERROR;
}
return RET_OK;
}
int Scheduler::FindCpuKernel(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
@ -1089,7 +1105,12 @@ int Scheduler::FindCpuKernel(const std::vector<Tensor *> &in_tensors, const std:
}
}
// reset op task num, The number of operator segmentation tasks is not necessarily equal to the number of threads
auto thread_num_limit = GetThreadNumLimit();
int thread_num_limit = -1;
ret = ParseThreadNumLimit(&thread_num_limit);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ParseThreadNumLimit failed.";
return RET_ERROR;
}
if (thread_num_limit != -1 && IsSharedThreadPoolOp(op_type)) {
op_parameter->thread_num_ = thread_num_limit;
}

View File

@ -142,7 +142,7 @@ class Scheduler {
bool GetEnableGLTexture() { return context_->GetDeviceInfo(DT_GPU).gpu_device_info_.enable_gl_texture_; }
void *GetGLContext() { return context_->GetDeviceInfo(DT_GPU).gpu_device_info_.gl_context_; }
void *GetGLDisplay() { return context_->GetDeviceInfo(DT_GPU).gpu_device_info_.gl_display_; }
int GetThreadNumLimit();
STATUS ParseThreadNumLimit(int *thread_num_limit);
protected:
InnerContext *context_ = nullptr;