diff --git a/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.cc b/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.cc
index e89bbd0f9b3..2699c5d2557 100644
--- a/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.cc
+++ b/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.cc
@@ -29,6 +29,9 @@ constexpr int kNumDeviceInfo = 2;
 constexpr int kNumIndex = 2;
 constexpr int kNumCoreDataLen = 3;
 constexpr int kNumMaxTaskQueueSize = 1000;
+constexpr int kNumPhysicalCoreThreshold = 32;
+constexpr int kDefaultWorkerNumPerPhysicalCpu = 4;
+constexpr int kDefaultThreadsNum = 8;
 int GetCoreNum() {
   int core_num = 1;
 #if defined(_MSC_VER) || defined(_WIN32)
@@ -94,6 +97,22 @@ Status DistinguishPhysicalAndLogical(std::vector<int> *physical_list, std::vector<int> *logical_list) {
   }
   return kSuccess;
 }
+
+int GetDefaultThreadNum() {
+  std::vector<int> physical_core_list;
+  std::vector<int> logical_core_list;
+  auto status = DistinguishPhysicalAndLogical(&physical_core_list, &logical_core_list);
+  if (status != kSuccess) {
+    MS_LOG(ERROR) << "DistinguishPhysicalAndLogical failed.";
+    return 0;
+  }
+  auto physical_core_size = physical_core_list.size();
+  if (physical_core_size < kNumPhysicalCoreThreshold) {
+    return physical_core_size / kDefaultWorkerNumPerPhysicalCpu;
+  } else {
+    return kDefaultThreadsNum;
+  }
+}
 }  // namespace
 
 Status ModelPool::DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
@@ -248,6 +267,12 @@ Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<Context> &context)
 std::shared_ptr<Context> ModelPool::GetDefaultContext() {
   MS_LOG(DEBUG) << "use default config.";
   auto context = std::make_shared<Context>();
+  auto thread_num = GetDefaultThreadNum();
+  if (thread_num == 0) {
+    MS_LOG(ERROR) << "compute thread num failed.";
+    return nullptr;
+  }
+  context->SetThreadNum(thread_num);
   auto &device_list = context->MutableDeviceInfo();
   auto device_info = std::make_shared<CPUDeviceInfo>();
   if (device_info == nullptr) {
@@ -545,6 +570,7 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
+  bool create_worker_success = true;
   auto numa_node_id = model_pool_config[0]->numa_id;
   auto ret = lite::PackWeightManager::GetInstance()->InitPackWeight(graph_buf, size, numa_node_id);
@@ -559,15 +585,21 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
     predict_task_queue_->IncreaseWaitModelNum(1, task_queue_id);
     worker_thread_vec_.push_back(std::thread(&ModelWorker::CreateThreadWorker, model_worker, new_model_buf, size,
-                                             model_pool_config[i], predict_task_queue_, &create_worker_success_));
-    all_model_worker_.insert(std::make_pair(model_worker, task_queue_id));
+                                             model_pool_config[i], predict_task_queue_, &create_worker_success));
+    if (all_model_workers_.find(task_queue_id) != all_model_workers_.end()) {
+      all_model_workers_[task_queue_id].push_back(model_worker);
+    } else {
+      all_model_workers_[task_queue_id] = {model_worker};
+    }
   }
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    worker->WaitCreateWorkerDone();
-    if (!create_worker_success_) {
-      MS_LOG(ERROR) << "init failed.";
-      return kLiteError;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      worker->WaitCreateWorkerDone();
+      if (!create_worker_success) {
+        MS_LOG(ERROR) << "worker init failed.";
+        return kLiteError;
+      }
     }
   }
   // init model pool input and output
@@ -591,12 +623,14 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
 Status ModelPool::UpdateConfig(const std::string &section,
                                const std::map<std::string, std::string> &config) {
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    auto status = worker->UpdateConfig(section, config);
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "model pool update config failed, status=" << status;
-      return status;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      auto status = worker->UpdateConfig(section, config);
+      if (status != kSuccess) {
+        MS_LOG(ERROR) << "model pool update config failed, status=" << status;
+        return status;
+      }
     }
   }
   return kSuccess;
 }
@@ -780,12 +814,12 @@ std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker_num, int *max_wait_worker_node_id) {
     }
   }
   if (*max_wait_worker_num > 0 && !use_split_batch_) {
-    for (auto &item : all_model_worker_) {
-      auto &worker = item.first;
-      auto numa_id = item.second;
+    auto &workers = all_model_workers_[*max_wait_worker_node_id];
+    auto task_queue_id = *max_wait_worker_node_id;
+    for (auto &worker : workers) {
       if (worker->IsAvailable()) {
-        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(numa_id);
-        *max_wait_worker_node_id = numa_id;
+        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(task_queue_id);
+        *max_wait_worker_node_id = task_queue_id;
         return worker;
       }
     }
diff --git a/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.h b/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.h
index d38519a2053..a7169a6d430 100644
--- a/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.h
+++ b/mindspore/lite/src/runtime/cxx_api/model_pool/model_pool.h
@@ -88,25 +88,37 @@ class ModelPool {
   Status DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
                                              const std::vector<int> &logical_core_list);
 
-  std::vector<std::vector<int>> numa_physical_cores_;
-  std::vector<std::vector<int>> numa_logical_cores_;
+ private:
+  // different workers get tasks from different task queues.
+  // currently, task queues are distinguished by numa node number;
+  // if numa nodes are not distinguished, the default task queue id is 0.
+  // task queue id <=> workers: sorted by performance.
+  std::unordered_map<int, std::vector<std::shared_ptr<ModelWorker>>> all_model_workers_;
+
+  // save all worker threads
   std::vector<std::thread> worker_thread_vec_;
+  std::mutex predict_task_mutex_;
   std::vector<MSTensor> model_pool_inputs_;
   std::vector<MSTensor> model_pool_outputs_;
   size_t workers_num_ = 1;
-  std::mutex predict_task_mutex_;
-  bool is_user_data_ = false;
-  int numa_node_num_ = 1;
-  int used_numa_node_num_ = 0;
-  bool use_numa_bind_mode_ = false;
+
+  // create predict task
   std::shared_ptr<PredictTaskQueue> predict_task_queue_ = nullptr;
-  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
-  bool use_split_batch_ = false;
-  std::unordered_map<std::shared_ptr<ModelWorker>, int> all_model_worker_;
-  bool create_worker_success_ = true;
   PredictTask *tasks_ = nullptr;
   std::mutex task_id_mutex_;
   std::queue<size_t> free_tasks_id_;
+
+  // use numa
+  int numa_node_num_ = 1;
+  int used_numa_node_num_ = 0;
+  bool use_numa_bind_mode_ = false;
+  std::vector<std::vector<int>> numa_physical_cores_;
+  std::vector<std::vector<int>> numa_logical_cores_;
+  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
+
+  // split batch
+  bool use_split_batch_ = false;
+  bool is_user_data_ = false;  // use in split batch
 };
 }  // namespace mindspore
 #endif  // MINDSPORE_LITE_SRC_RUNTIME_CXX_API_MODEL_POOL_MODEL_POOL_H_
diff --git a/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.cc b/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.cc
index cd0c6b5f82e..19fdfd144c5 100644
--- a/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.cc
+++ b/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.cc
@@ -78,42 +78,6 @@ void ModelWorker::Run(int node_id, const std::shared_ptr<PredictTaskQueue> &predict_task_queue) {
   }
 }
 
-Status ModelWorker::ResizeInit() {
-  auto inputs = model_->GetInputs();
-  std::vector<std::vector<int64_t>> new_input_shape;
-  for (size_t input_idx = 0; input_idx < inputs.size(); input_idx++) {
-    new_input_shape.push_back(inputs[input_idx].Shape());
-    for (size_t i = 1; i < new_input_shape.size(); i++) {
-      if (new_input_shape[input_idx][i] == -1) {
-        return kSuccess;
-      }
-    }
-    if (new_input_shape[input_idx][0] == -1) {
-      // only support resize for batch dim
-      new_input_shape[input_idx][0] = kNumInitBatch;
-    } else {
-      // If the batch dimension is not -1, no resize processing is performed
-      return kSuccess;
-    }
-  }
-  auto status = model_->Resize(inputs, new_input_shape);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "model resize failed in init. ret=" << status;
-    return kLiteError;
-  }
-  inputs = model_->GetInputs();
-  for (auto &input : inputs) {
-    input.MutableData();
-  }
-  std::vector<MSTensor> out;
-  status = model_->Predict(inputs, &out);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "init resize failed. ret=" << status;
-    return kLiteError;
-  }
-  return kSuccess;
-}
-
 Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config) {
   MS_CHECK_TRUE_MSG(model_buf != nullptr, kLiteError, "model_buf is nullptr in model worker.");
   MS_CHECK_TRUE_MSG(worker_config != nullptr, kLiteError, "worker_config is nullptr in model worker.");
@@ -143,13 +107,6 @@ Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config) {
     MS_LOG(ERROR) << "model worker get empty input/output.";
     return kLiteError;
   }
-  if (need_init_resize_) {
-    status = ResizeInit();
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "init resize failed. ret=" << status;
ret=" << status; - return kLiteError; - } - } return kSuccess; } diff --git a/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.h b/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.h index 1b1d79aa8c5..bdc1d932a18 100644 --- a/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.h +++ b/mindspore/lite/src/runtime/cxx_api/model_pool/model_worker.h @@ -64,12 +64,10 @@ class ModelWorker { std::pair>, bool> GetModelResize(const std::vector &model_inputs, const std::vector &inputs); - Status ResizeInit(); Status CopyOutputTensor(std::vector model_outputs, std::vector *user_outputs); private: - bool need_init_resize_ = true; std::shared_ptr model_ = nullptr; std::mutex mtx_worker_; std::atomic_bool available_ = true; diff --git a/mindspore/lite/src/runtime/pack_weight.cc b/mindspore/lite/src/runtime/pack_weight.cc index deaf340bed7..635694b1c11 100644 --- a/mindspore/lite/src/runtime/pack_weight.cc +++ b/mindspore/lite/src/runtime/pack_weight.cc @@ -104,6 +104,10 @@ STATUS PackWeight::ReplaceOriginTensorData(const char *model_buf, std::vectortensors_data.find(tensor_index) == model_weight->tensors_data.end()) { auto allocator = model_weight->allocator; void *new_data = allocator->Malloc(tensor->Size()); + if (new_data == nullptr) { + MS_LOG(ERROR) << "allocator malloc data failed."; + return RET_ERROR; + } memcpy(new_data, tensor->data(), tensor->Size()); MS_CHECK_TRUE_MSG(tensor->own_data(), RET_ERROR, "tensor data is not own data."); tensor->FreeData(); diff --git a/mindspore/lite/tools/benchmark/benchmark_unified_api.cc b/mindspore/lite/tools/benchmark/benchmark_unified_api.cc index 7134ce79613..56e2e7e21d7 100644 --- a/mindspore/lite/tools/benchmark/benchmark_unified_api.cc +++ b/mindspore/lite/tools/benchmark/benchmark_unified_api.cc @@ -996,6 +996,10 @@ int BenchmarkUnifiedApi::ParallelInference(std::shared_ptr c all_outputs_.push_back(output); } if (!flags_->benchmark_data_file_.empty()) { + for (size_t i = 0; i < ms_inputs_for_api_.size(); i++) { + auto &tensor = ms_inputs_for_api_[i]; + tensor.SetShape(resize_dims_[i]); + } status = PrintInputData(); MS_CHECK_FALSE_MSG(status != RET_OK, status, "PrintInputData error "); status = ReadCalibData();