From 58756a44f9a2ae22797145ad5e79cefadd44c070 Mon Sep 17 00:00:00 2001 From: yefeng Date: Tue, 1 Mar 2022 09:00:34 +0800 Subject: [PATCH] parallel predict support numa --- include/api/model_parallel_runner.h | 1 + mindspore/lite/src/common/graph_util.cc | 5 + mindspore/lite/src/common/graph_util.h | 2 + .../model_pool/model_parallel_runner.h | 70 ----- .../lite/src/cxx_api/model_pool/model_pool.cc | 263 ++++++++++++------ .../lite/src/cxx_api/model_pool/model_pool.h | 19 +- .../src/cxx_api/model_pool/model_worker.cc | 12 +- .../src/cxx_api/model_pool/model_worker.h | 8 +- .../cxx_api/model_pool/predict_task_queue.cc | 24 +- .../cxx_api/model_pool/predict_task_queue.h | 18 +- mindspore/lite/src/lite_session.cc | 2 +- mindspore/lite/src/pack_weight_manager.cc | 7 +- mindspore/lite/src/pack_weight_manager.h | 2 +- mindspore/lite/src/runtime/numa_adapter.h | 2 +- mindspore/lite/src/scheduler.cc | 5 + .../lite/tools/benchmark/benchmark_base.h | 11 +- .../tools/benchmark/benchmark_unified_api.cc | 13 +- mindspore/lite/tools/converter/CMakeLists.txt | 2 +- .../converter/micro/cmake/file_list.cmake | 5 - 19 files changed, 263 insertions(+), 208 deletions(-) delete mode 100644 mindspore/lite/src/cxx_api/model_pool/model_parallel_runner.h diff --git a/include/api/model_parallel_runner.h b/include/api/model_parallel_runner.h index f618a40a32e..386b342200c 100644 --- a/include/api/model_parallel_runner.h +++ b/include/api/model_parallel_runner.h @@ -24,6 +24,7 @@ namespace mindspore { struct RunnerConfig { std::shared_ptr context = nullptr; + int workers_num = 0; }; /// \brief The ModelParallelRunner class is used to define a MindSpore ModelParallelRunner, facilitating Model diff --git a/mindspore/lite/src/common/graph_util.cc b/mindspore/lite/src/common/graph_util.cc index e01d6fdf47f..fb6f752315c 100644 --- a/mindspore/lite/src/common/graph_util.cc +++ b/mindspore/lite/src/common/graph_util.cc @@ -93,5 +93,10 @@ bool IsPackedOp(int op_type) { schema::PrimitiveType_FullConnection, schema::PrimitiveType_MatMulFusion}; return IsContain(packed_ops, op_type); } + +bool IsShareConstOp(int op_type) { + static const std::vector packed_ops = {schema::PrimitiveType_Gather}; + return IsContain(packed_ops, op_type); +} } // namespace lite } // namespace mindspore diff --git a/mindspore/lite/src/common/graph_util.h b/mindspore/lite/src/common/graph_util.h index dac4409c1c4..a62f5abdae5 100644 --- a/mindspore/lite/src/common/graph_util.h +++ b/mindspore/lite/src/common/graph_util.h @@ -34,6 +34,8 @@ using NODE_ID = std::string; // only support op_type from current schema bool IsPackedOp(int op_type); +bool IsShareConstOp(int op_type); + std::vector GetGraphInputNodes(const lite::Model *model); std::vector GetGraphOutputNodes(const lite::Model *model); diff --git a/mindspore/lite/src/cxx_api/model_pool/model_parallel_runner.h b/mindspore/lite/src/cxx_api/model_pool/model_parallel_runner.h deleted file mode 100644 index cb38bd3eae5..00000000000 --- a/mindspore/lite/src/cxx_api/model_pool/model_parallel_runner.h +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright 2022 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef MINDSPORE_INCLUDE_API_MODEL_RUNNER_H -#define MINDSPORE_INCLUDE_API_MODEL_RUNNER_H -#include -#include -#include -#include -#include "include/api/status.h" -#include "include/api/context.h" - -namespace mindspore { -class ModelPool; -struct RunnerConfig; - -/// \brief The ModelRunner class is used to define a MindSpore ModelPoolManager, facilitating Model management. -class MS_API ModelParallelRunner { - public: - ModelParallelRunner() = default; - ~ModelParallelRunner() = default; - - /// \brief build a model runner from model path so that it can run on a device. Only valid for Lite. - /// - /// \param[in] model_path Define the model path. - /// \param[in] model_type Define The type of model file. Options: ModelType::kMindIR, ModelType::kOM. Only - /// ModelType::kMindIR is valid for Lite. - /// \param[in] model_context Define the context used to store options during execution. - /// \param[in] dec_key Define the key used to decrypt the ciphertext model. The key length is 16, 24, or 32. - /// \param[in] dec_mode Define the decryption mode. Options: AES-GCM, AES-CBC. - /// - /// \return Status. - Status Init(const std::string &model_path, const std::shared_ptr &runner_config = nullptr, - const Key &dec_key = {}, const std::string &dec_mode = kDecModeAesGcm); - - /// \brief Obtains all input tensors of the model. - /// - /// \return The vector that includes all input tensors. - std::vector GetInputs(); - - /// \brief Obtains all output tensors of the model. - /// - /// \return The vector that includes all output tensors. - std::vector GetOutputs(); - - /// \brief Inference ModelPoolManager. - /// - /// \param[in] inputs A vector where model inputs are arranged in sequence. - /// \param[out] outputs Which is a pointer to a vector. The model outputs are filled in the container in sequence. - /// \param[in] before CallBack before predict. - /// \param[in] after CallBack after predict. - /// - /// \return Status. 
- Status Predict(const std::vector &inputs, std::vector *outputs, - const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr); -}; -} // namespace mindspore -#endif // MINDSPORE_INCLUDE_API_MODEL_RUNNER_H diff --git a/mindspore/lite/src/cxx_api/model_pool/model_pool.cc b/mindspore/lite/src/cxx_api/model_pool/model_pool.cc index af15476d8fc..d5d7cd11b17 100644 --- a/mindspore/lite/src/cxx_api/model_pool/model_pool.cc +++ b/mindspore/lite/src/cxx_api/model_pool/model_pool.cc @@ -20,7 +20,7 @@ #include "include/lite_types.h" #include "src/common/config_file.h" #include "src/runtime/inner_allocator.h" -#include "src/common//file_utils.h" +#include "src/common/file_utils.h" #include "src/pack_weight_manager.h" #include "src/runtime/numa_adapter.h" #include "src/common/common.h" @@ -39,33 +39,33 @@ int GetCoreNum() { #endif return core_num; } +} // namespace -void SetNumaBindStrategy(std::vector> *all_model_bind_list, int thread_num, int node_id) { +Status ModelPool::SetNumaBindStrategy(std::vector> *all_model_bind_list, int thread_num) { if (UNLIKELY(thread_num == 0)) { MS_LOG(ERROR) << "thread num is zero."; - return; + return kLiteError; } - std::vector cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(node_id); - auto cpu_num = cpu_list.size(); - if (cpu_num == 0) { - return; + if (thread_num * static_cast(workers_num_) > GetCoreNum()) { + MS_LOG(ERROR) << "thread num or worker num is wrong ,not support param."; + return kLiteNotSupport; } - std::vector bind_id; - bind_id.reserve(thread_num); - all_model_bind_list->reserve(cpu_num / thread_num + 1); - bind_id.emplace_back(cpu_list[0]); - for (size_t i = 1; i < cpu_num; ++i) { - if (i % thread_num == 0) { - all_model_bind_list->emplace_back(bind_id); - bind_id.clear(); + for (size_t i = 0; i < workers_num_;) { + std::vector cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(used_numa_node_num_); + if (static_cast(cpu_list.size()) < thread_num) { + MS_LOG(ERROR) << "one numa node do not have enough cpu core for bind thread."; + return kLiteError; } - bind_id.emplace_back(cpu_list[i]); - } - if (!bind_id.empty()) { - all_model_bind_list->emplace_back(bind_id); + for (size_t j = 0; j < cpu_list.size() / thread_num; j++) { + std::vector bind_id; + bind_id.insert(bind_id.begin(), cpu_list.begin() + j * thread_num, cpu_list.begin() + (j + 1) * thread_num); + all_model_bind_list->push_back(bind_id); + i++; + } + used_numa_node_num_++; } + return kSuccess; } -} // namespace void ModelPool::SetBindStrategy(std::vector> *all_model_bind_list, int thread_num) { if (thread_num == 0) { @@ -74,7 +74,7 @@ void ModelPool::SetBindStrategy(std::vector> *all_model_bind_li } int core_num = GetCoreNum(); int core_id = 0; - for (size_t i = 0; i < num_models_; i++) { + for (size_t i = 0; i < workers_num_; i++) { std::vector bind_id; for (int j = 0; j < thread_num; j++) { if (core_id >= core_num) { @@ -92,6 +92,86 @@ ModelPool *ModelPool::GetInstance() { return &instance; } +Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr &context) { + if (use_numa_bind_mode_) { + // now only supports the same number of cores per numa node + // do not use if there are extra cores + int one_numa_node_cpu_size = numa::NUMAAdapter::GetInstance()->GetCPUList(0).size(); + if (context->GetThreadNum() > one_numa_node_cpu_size) { + MS_LOG(ERROR) << "thread num more than numa node cpu cores."; + return kLiteError; + } else { + workers_num_ = one_numa_node_cpu_size / static_cast(context->GetThreadNum()) * numa_node_num_; + } + } else { + // 
each model binds all kernels in order + workers_num_ = GetCoreNum() / static_cast(context->GetThreadNum()); + } + return kSuccess; +} + +Status ModelPool::InitDefaultContext(const std::shared_ptr &context) { + MS_LOG(DEBUG) << "use default config."; + context->SetThreadNum(kNumThreads); + context->SetEnableParallel(true); + context->SetThreadAffinity(lite::HIGHER_CPU); + auto &device_list = context->MutableDeviceInfo(); + auto device_info = std::make_shared(); + device_info->SetEnableFP16(false); + device_list.push_back(device_info); + // set model num + auto status = SetDefaultOptimalModelNum(context); + if (status != kSuccess) { + MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed."; + return kLiteError; + } + return kSuccess; +} + +std::shared_ptr ModelPool::InitUserDefineContext(const std::shared_ptr &runner_config) { + auto context = runner_config->context; + if (context == nullptr) { + MS_LOG(ERROR) << "user set config context nullptr."; + return nullptr; + } + auto device_list = context->MutableDeviceInfo(); + if (device_list.size() != 1) { + MS_LOG(ERROR) << "model pool only support device num 1."; + return nullptr; + } + auto device = device_list.front(); + if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) { + MS_LOG(ERROR) << "model pool only support cpu or gpu type."; + return nullptr; + } + auto cpu_context = device->Cast(); + auto enable_fp16 = cpu_context->GetEnableFP16(); + if (enable_fp16) { + MS_LOG(ERROR) << "model pool not support enable fp16."; + return nullptr; + } + + if (device->GetDeviceType() == kGPU) { + workers_num_ = 1; + } else if (device->GetDeviceType() == kCPU) { + if (runner_config->workers_num == 0) { + // the user does not define the number of models, the default optimal number of models is used + auto status = SetDefaultOptimalModelNum(context); + if (status != kSuccess) { + MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed."; + return nullptr; + } + } else { + // User defined number of models + workers_num_ = runner_config->workers_num; + } + } else { + MS_LOG(ERROR) << "not support device: " << device->GetDeviceType(); + return nullptr; + } + return context; +} + std::shared_ptr ModelPool::InitContext(const std::shared_ptr &runner_config) { auto model_context = std::make_shared(); if (model_context == nullptr) { @@ -99,42 +179,36 @@ std::shared_ptr ModelPool::InitContext(const std::shared_ptrcontext; - auto device_list = model_context->MutableDeviceInfo(); - if (device_list.size() != 1) { - MS_LOG(ERROR) << "model pool only support device num 1."; - return nullptr; - } - auto device = device_list.front(); - if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) { - MS_LOG(ERROR) << "model pool only support cpu or gpu type."; - return nullptr; - } - auto cpu_context = device->Cast(); - auto enable_fp16 = cpu_context->GetEnableFP16(); - if (enable_fp16) { - MS_LOG(ERROR) << "model pool not support enable fp16."; - return nullptr; - } - if (device->GetDeviceType() == kGPU) { - num_models_ = 1; - } else { - num_models_ = GetCoreNum() / static_cast(model_context->GetThreadNum()); - } + use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available() && + runner_config->context->GetThreadAffinityMode() == lite::HIGHER_CPU; + numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum(); + model_context = InitUserDefineContext(runner_config); } else { - MS_LOG(DEBUG) << "use default config."; - num_models_ = GetCoreNum() / static_cast(model_context->GetThreadNum()); - model_context->SetThreadNum(kNumThreads); - 
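// (Editor's sketch, not part of this patch.) The default worker count that SetDefaultOptimalModelNum()
// derives above, written out as a standalone helper. The helper name and parameters are hypothetical;
// the inputs mirror what the patch reads from NUMAAdapter::GetCPUList()/NodesNum() and GetCoreNum().
int DefaultWorkerNum(bool use_numa_bind, int thread_num, int core_num, int numa_node_num, int cores_per_node) {
  if (use_numa_bind) {
    // one worker per thread_num cores inside a node, replicated over every node,
    // e.g. 2 nodes x 32 cores each with thread_num = 8  ->  (32 / 8) * 2 = 8 workers
    return (cores_per_node / thread_num) * numa_node_num;
  }
  // no NUMA binding: spread the workers over all cores, e.g. 64 cores / 8 threads = 8 workers
  return core_num / thread_num;
}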
model_context->SetEnableParallel(true); - model_context->SetThreadAffinity(lite::HIGHER_CPU); - auto &device_list = model_context->MutableDeviceInfo(); - auto device_info = std::make_shared(); - device_info->SetEnableFP16(false); - device_list.push_back(device_info); + use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available(); + numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum(); + auto status = InitDefaultContext(model_context); + if (status != kSuccess) { + MS_LOG(ERROR) << "use default context failed."; + return nullptr; + } } return model_context; } +Status ModelPool::SetModelBindMode(std::vector> *all_model_bind_list, + std::shared_ptr model_context) { + if (numa::NUMAAdapter::GetInstance()->Available()) { + auto status = SetNumaBindStrategy(all_model_bind_list, static_cast(model_context->GetThreadNum())); + if (status != kSuccess) { + MS_LOG(ERROR) << "SetNumaBindStrategy failed."; + return kLiteError; + } + } else { + SetBindStrategy(all_model_bind_list, static_cast(model_context->GetThreadNum())); + } + return kSuccess; +} + ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr &runner_config) { auto model_context = InitContext(runner_config); if (model_context == nullptr) { @@ -145,27 +219,19 @@ ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptrGetThreadNum(); return {}; } - int node_id = -1; - if (numa::NUMAAdapter::GetInstance()->Available()) { - node_id = 0; - num_models_ = - numa::NUMAAdapter::GetInstance()->GetCPUList(node_id).size() / static_cast(model_context->GetThreadNum()); - } else { - num_models_ = GetCoreNum() / static_cast(model_context->GetThreadNum()); - } ModelPoolContex model_pool_context; std::vector> all_model_bind_list; if (model_context->GetThreadAffinityMode() == lite::HIGHER_CPU) { - if (numa::NUMAAdapter::GetInstance()->Available()) { - SetNumaBindStrategy(&all_model_bind_list, static_cast(model_context->GetThreadNum()), node_id); - } else { - SetBindStrategy(&all_model_bind_list, static_cast(model_context->GetThreadNum())); + auto status = SetModelBindMode(&all_model_bind_list, model_context); + if (status != kSuccess) { + MS_LOG(ERROR) << "SetModelBindMode failed."; + return {}; } } else if (model_context->GetThreadAffinityMode() == lite::MID_CPU) { MS_LOG(ERROR) << "not support bind MID_CPU."; return {}; } - for (size_t i = 0; i < num_models_; i++) { + for (size_t i = 0; i < workers_num_; i++) { auto context = std::make_shared(); if (context == nullptr) { MS_LOG(ERROR) << "New Context failed."; @@ -212,27 +278,44 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptrSetTaskQueueNum(used_numa_node_num_); + } else { + PredictTaskQueue::GetInstance()->SetTaskQueueNum(1); + } size_t size = 0; + if (graph_buf_ != nullptr) { + delete[] graph_buf_; + graph_buf_ = nullptr; + } graph_buf_ = lite::ReadFile(model_path.c_str(), &size); if (graph_buf_ == nullptr) { MS_LOG(ERROR) << "read file failed."; return kLiteError; } - lite::PackWeightManager::GetInstance()->InitWeightManagerByBuf(graph_buf_); - int node_id = -1; - if (numa::NUMAAdapter::GetInstance()->Available()) { - node_id = 0; + auto ret = lite::PackWeightManager::GetInstance()->InitWeightManagerByBuf(graph_buf_); + if (ret != kSuccess) { + MS_LOG(ERROR) << "InitWeightManagerByBuf failed."; + return kLiteError; } std::shared_ptr model_thread = nullptr; - for (size_t i = 0; i < num_models_; i++) { + for (size_t i = 0; i < workers_num_; i++) { + int numa_node_id = 0; + if (use_numa_bind_mode_ && GetCoreNum() / 
model_pool_context[i]->GetThreadNum() < numa_node_num_) { + numa_node_id = i; + } else if (use_numa_bind_mode_ && numa_node_num_ != 0) { + numa_node_id = i / (GetCoreNum() / model_pool_context[i]->GetThreadNum() / numa_node_num_); + } else { + numa_node_id = 0; + } model_thread = std::make_shared(); - auto status = model_thread->Init(graph_buf_, size, model_pool_context[i], dec_key, dec_mode, node_id); + auto status = model_thread->Init(graph_buf_, size, model_pool_context[i], dec_key, dec_mode, numa_node_id); if (status != kSuccess) { MS_LOG(ERROR) << " model thread init failed."; return kLiteError; } - model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread)); + PredictTaskQueue::GetInstance()->IncreaseeWaitModelNum(1, numa_node_id); + model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread, numa_node_id)); } if (model_thread != nullptr) { model_inputs_ = model_thread->GetInputs(); @@ -409,14 +492,30 @@ Status ModelPool::FreeSplitTensor(std::vector> *new_inputs return kSuccess; } +void ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num) { + *max_wait_worker_node_id = 0; + *max_wait_worker_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(0); + for (int i = 1; i < used_numa_node_num_; i++) { + int worker_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(i); + if (*max_wait_worker_num < worker_num) { + *max_wait_worker_num = worker_num; + *max_wait_worker_node_id = i; + } + } +} + Status ModelPool::Predict(const std::vector &inputs, std::vector *outputs, const MSKernelCallBack &before, const MSKernelCallBack &after) { mtx_split_task_.lock(); - auto wait_model_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(); + int max_wait_worker_node_id = 0; + int max_wait_worker_num = 0; + GetMaxWaitWorkerNum(&max_wait_worker_node_id, &max_wait_worker_num); + auto batch = inputs[0].Shape()[0]; - if (PredictTaskQueue::GetInstance()->GetTaskNum() == 0 && wait_model_num > 1 && batch >= wait_model_num) { - size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(); - PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num); + if (PredictTaskQueue::GetInstance()->GetTaskNum(max_wait_worker_node_id) == 0 && max_wait_worker_num > 1 && + batch >= max_wait_worker_num) { + size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(max_wait_worker_node_id); + PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num, max_wait_worker_node_id); std::vector> new_inputs; std::vector> new_outputs; auto status = SplitInputTensorByBatch(inputs, &new_inputs, batch_split_num); @@ -433,7 +532,7 @@ Status ModelPool::Predict(const std::vector &inputs, std::vector> tasks; for (size_t i = 0; i < batch_split_num; i++) { auto predict_task = std::make_shared(&new_inputs[i], &new_outputs.at(i), before, after); - PredictTaskQueue::GetInstance()->PushPredictTask(predict_task); + PredictTaskQueue::GetInstance()->PushPredictTask(predict_task, max_wait_worker_node_id); tasks.push_back(predict_task); } mtx_split_task_.unlock(); @@ -450,14 +549,14 @@ Status ModelPool::Predict(const std::vector &inputs, std::vectorIncreaseeWaitModelNum(batch_split_num, max_wait_worker_node_id); } else { - if (wait_model_num == 1) { - PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1); - } + PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1, max_wait_worker_node_id); auto predict_task = std::make_shared(&inputs, outputs, before, after); - PredictTaskQueue::GetInstance()->PushPredictTask(predict_task); + 
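    // (Editor's note, not part of this patch.) Single-request path: the task created above is pushed
    // onto the queue of max_wait_worker_node_id, i.e. the NUMA node that GetMaxWaitWorkerNum() found
    // to have the most idle workers, so concurrent requests spread across nodes instead of all
    // queueing on node 0.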
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task, max_wait_worker_node_id); mtx_split_task_.unlock(); PredictTaskQueue::GetInstance()->WaitUntilPredictActive(predict_task); + PredictTaskQueue::GetInstance()->IncreaseeWaitModelNum(1, max_wait_worker_node_id); } return kSuccess; } diff --git a/mindspore/lite/src/cxx_api/model_pool/model_pool.h b/mindspore/lite/src/cxx_api/model_pool/model_pool.h index cffeab846c6..83d540d9ca2 100644 --- a/mindspore/lite/src/cxx_api/model_pool/model_pool.h +++ b/mindspore/lite/src/cxx_api/model_pool/model_pool.h @@ -27,6 +27,8 @@ #include "src/cxx_api/model_pool/model_worker.h" #include "src/cxx_api/model_pool/predict_task_queue.h" namespace mindspore { +using ModelPoolContex = std::vector>; + class ModelPool { public: static ModelPool *GetInstance(); @@ -44,9 +46,18 @@ class ModelPool { private: ModelPool() = default; - void SetBindStrategy(std::vector> *all_model_bind_list, int thread_num); + ModelPoolContex CreateModelContext(const std::shared_ptr &runner_config); std::shared_ptr InitContext(const std::shared_ptr &runner_config); + + Status InitDefaultContext(const std::shared_ptr &context); + std::shared_ptr InitUserDefineContext(const std::shared_ptr &runner_config); + Status SetDefaultOptimalModelNum(const std::shared_ptr &context); + + Status SetModelBindMode(std::vector> *all_model_bind_list, std::shared_ptr model_context); + Status SetNumaBindStrategy(std::vector> *all_model_bind_list, int thread_num); + void SetBindStrategy(std::vector> *all_model_bind_list, int thread_num); + Status SplitInputTensorByBatch(const std::vector &inputs, std::vector> *new_inputs, size_t batch_split_num); Status SplitOutputTensorByBatch(std::vector> *outputs, std::vector *new_outputs, @@ -54,14 +65,18 @@ class ModelPool { Status ConcatPredictOutput(std::vector> *outputs, std::vector *new_outputs); Status FreeSplitTensor(std::vector> *new_inputs, std::vector> *new_outputs); + void GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num); std::vector model_thread_vec_; std::vector model_inputs_; std::vector model_outputs_; char *graph_buf_ = nullptr; - size_t num_models_ = 10; + size_t workers_num_ = 1; std::mutex mtx_split_task_; bool is_user_data_ = false; + int numa_node_num_ = 1; + int used_numa_node_num_ = 0; + bool use_numa_bind_mode_ = false; }; } // namespace mindspore #endif // MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H diff --git a/mindspore/lite/src/cxx_api/model_pool/model_worker.cc b/mindspore/lite/src/cxx_api/model_pool/model_worker.cc index 4e88d6c672f..77cf29f6cb7 100644 --- a/mindspore/lite/src/cxx_api/model_pool/model_worker.cc +++ b/mindspore/lite/src/cxx_api/model_pool/model_worker.cc @@ -18,9 +18,9 @@ #include "src/common/utils.h" #include "src/common/common.h" namespace mindspore { -void ModelThread::Run() { +void ModelThread::Run(int node_id) { while (!PredictTaskQueue::GetInstance()->IsPredictTaskDone()) { - auto task = PredictTaskQueue::GetInstance()->GetPredictTask(); + auto task = PredictTaskQueue::GetInstance()->GetPredictTask(node_id); if (task == nullptr) { break; } @@ -35,7 +35,7 @@ void ModelThread::Run() { PredictTaskQueue::GetInstance()->ActiveTask(); continue; } - if (is_copy_output_) { + if (need_copy_output_) { std::vector new_outputs; auto output_size = outputs->size(); for (size_t i = 0; i < output_size; i++) { @@ -63,7 +63,7 @@ Status ModelThread::Init(const char *model_buf, size_t size, const std::shared_p const Key &dec_key, const std::string &dec_mode, int node_id) { model_ = std::make_shared(); 
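  // (Editor's note, not part of this patch.) The numa node id handed to Init() is forwarded to the
  // session through UpdateConfig(kConfigServerInference, {kConfigNUMANodeId, ...}) just below, so the
  // runtime can keep this worker's memory allocations on its local NUMA node.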
mindspore::ModelType model_type = kMindIR; - if (node_id > -1) { + if (node_id != -1) { model_->UpdateConfig(lite::kConfigServerInference, {lite::kConfigNUMANodeId, std::to_string(node_id)}); } auto status = model_->Build(model_buf, size, model_type, model_context, dec_key, dec_mode); @@ -131,7 +131,7 @@ Status ModelThread::Predict(const std::vector &inputs, std::vectorat(i).MutableData()); model_output[i].SetAllocator(nullptr); - is_copy_output_ = false; + need_copy_output_ = false; } } auto status = model_->Predict(inputs, &model_output, before, after); @@ -139,7 +139,7 @@ Status ModelThread::Predict(const std::vector &inputs, std::vectorclear(); outputs->insert(outputs->end(), model_output.begin(), model_output.end()); } else { diff --git a/mindspore/lite/src/cxx_api/model_pool/model_worker.h b/mindspore/lite/src/cxx_api/model_pool/model_worker.h index b3c903b155f..302f6801ffe 100644 --- a/mindspore/lite/src/cxx_api/model_pool/model_worker.h +++ b/mindspore/lite/src/cxx_api/model_pool/model_worker.h @@ -26,8 +26,6 @@ #include "include/api/model.h" #include "src/cxx_api/model_pool/predict_task_queue.h" namespace mindspore { -using ModelPoolContex = std::vector>; - class ModelThread { public: ModelThread() = default; @@ -45,7 +43,7 @@ class ModelThread { Status Predict(const std::vector &inputs, std::vector *outputs, const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr); - void Run(); + void Run(int node_id); private: std::pair>, bool> GetModelResize(const std::vector &model_inputs, @@ -54,11 +52,9 @@ class ModelThread { private: std::shared_ptr model_ = nullptr; std::mutex mtx_model_; - std::condition_variable model_cond_; // num thread is configured according to the hardware - int num_models_; - bool is_copy_output_ = true; + bool need_copy_output_ = true; }; } // namespace mindspore #endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_THREAD_H_ diff --git a/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.cc b/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.cc index a6248a84256..673a7747f6f 100644 --- a/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.cc +++ b/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.cc @@ -21,6 +21,11 @@ PredictTaskQueue::~PredictTaskQueue() { task_push_cond_.notify_all(); } +void PredictTaskQueue::SetTaskQueueNum(int num) { + predict_task_.resize(num); + waite_worker_num_.resize(num, 0); +} + void PredictTaskQueue::WaitUntilPredictActive(std::shared_ptr task) { std::unique_lock result_lock(mtx_predict_task_); while (!task->ready) { @@ -36,28 +41,27 @@ PredictTaskQueue *PredictTaskQueue::GetInstance() { return &instance; } -void PredictTaskQueue::PushPredictTask(std::shared_ptr task) { +void PredictTaskQueue::PushPredictTask(std::shared_ptr task, int node_id) { std::unique_lock task_lock(mtx_predict_task_); - predict_task_.push(task); - task_push_cond_.notify_one(); + predict_task_.at(node_id).push(task); + task_push_cond_.notify_all(); } -std::shared_ptr PredictTaskQueue::GetPredictTask() { +std::shared_ptr PredictTaskQueue::GetPredictTask(int node_id) { std::unique_lock task_lock(mtx_predict_task_); - while (predict_task_.empty() && !predict_task_done_) { - waite_model_num_++; + while (predict_task_.at(node_id).empty() && !predict_task_done_) { task_push_cond_.wait(task_lock); } if (predict_task_done_) { return nullptr; } - auto predict_task = predict_task_.front(); - predict_task_.pop(); + auto predict_task = predict_task_.at(node_id).front(); + predict_task_.at(node_id).pop(); return 
predict_task; } -int PredictTaskQueue::GetTaskNum() { +int PredictTaskQueue::GetTaskNum(int node_id) { std::unique_lock task_lock(mtx_predict_task_); - return predict_task_.size(); + return predict_task_.at(node_id).size(); } } // namespace mindspore diff --git a/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.h b/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.h index 08e0923a1c8..e967bf2659d 100644 --- a/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.h +++ b/mindspore/lite/src/cxx_api/model_pool/predict_task_queue.h @@ -40,20 +40,22 @@ class PredictTaskQueue { static PredictTaskQueue *GetInstance(); ~PredictTaskQueue(); - void PushPredictTask(std::shared_ptr task); + void PushPredictTask(std::shared_ptr task, int node_id); void WaitUntilPredictActive(std::shared_ptr task); - std::shared_ptr GetPredictTask(); + std::shared_ptr GetPredictTask(int node_id); void ActiveTask(); + int GetTaskNum(int node_id); + void SetTaskQueueNum(int num); + bool IsPredictTaskDone() { return predict_task_done_; } - int GetTaskNum(); - int GetWaitModelNum() { return waite_model_num_; } - void DecreaseWaitModelNum(int num) { waite_model_num_ -= num; } + int GetWaitModelNum(int node_id) { return waite_worker_num_.at(node_id); } + void DecreaseWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) -= num; } + void IncreaseeWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) += num; } private: PredictTaskQueue() = default; - std::queue> predict_task_; - int waite_model_num_ = 0; - + std::vector>> predict_task_; + std::vector waite_worker_num_; std::mutex mtx_predict_task_; std::condition_variable task_pop_cond_; std::condition_variable task_push_cond_; diff --git a/mindspore/lite/src/lite_session.cc b/mindspore/lite/src/lite_session.cc index 2eb254f9b53..05b28e5cfda 100644 --- a/mindspore/lite/src/lite_session.cc +++ b/mindspore/lite/src/lite_session.cc @@ -667,7 +667,7 @@ int LiteSession::IniPackWeightData(Model *model) { for (size_t i = 0; i < kernel_num; i++) { auto node = model->all_nodes_[i]; auto node_type = node->node_type_; - if (IsPackedOp(node_type)) { + if (IsPackedOp(node_type) || IsShareConstOp(node_type)) { for (size_t j = 0; j < node->input_indices_.size(); j++) { auto tensor_index = node->input_indices_[j]; auto src_tensor = lite_model->GetSchemaTensor(tensor_index); diff --git a/mindspore/lite/src/pack_weight_manager.cc b/mindspore/lite/src/pack_weight_manager.cc index a2b5cceeae3..7aba4199a5e 100644 --- a/mindspore/lite/src/pack_weight_manager.cc +++ b/mindspore/lite/src/pack_weight_manager.cc @@ -26,16 +26,17 @@ PackWeightManager *PackWeightManager::GetInstance() { return &instance; } -void PackWeightManager::InitWeightManagerByBuf(const char *model_buf) { - MS_CHECK_TRUE_RET_VOID(model_buf != nullptr); +STATUS PackWeightManager::InitWeightManagerByBuf(const char *model_buf) { + MS_CHECK_TRUE_RET(model_buf != nullptr, RET_ERROR); if (buf_model_weight_.find(model_buf) == buf_model_weight_.end()) { auto *model_const_weight = new (std::nothrow) ModelConstWeight(); if (model_const_weight == nullptr) { MS_LOG(ERROR) << "model_const_weight is nullptr."; - return; + return RET_ERROR; } buf_model_weight_[model_buf] = model_const_weight; } + return RET_OK; } void PackWeightManager::InitWeightManagerByPath(const std::string &model_path, const char *model_buf) { diff --git a/mindspore/lite/src/pack_weight_manager.h b/mindspore/lite/src/pack_weight_manager.h index 48d024ac41b..3e03d36cf3c 100644 --- a/mindspore/lite/src/pack_weight_manager.h +++ 
b/mindspore/lite/src/pack_weight_manager.h @@ -45,7 +45,7 @@ class PackWeightManager { virtual ~PackWeightManager(); void InitWeightManagerByPath(const std::string &model_path, const char *model_buf); - void InitWeightManagerByBuf(const char *model_buf); + STATUS InitWeightManagerByBuf(const char *model_buf); void DeleteSavedModelPtr(LiteModel *delete_model); STATUS StoreLiteModel(const char *model_buf, const Model *model); void *GetTensorData(const LiteModel *model, const SchemaTensorWrapper *origin_tensor, size_t tensor_index); diff --git a/mindspore/lite/src/runtime/numa_adapter.h b/mindspore/lite/src/runtime/numa_adapter.h index 5304fde717c..4dbaa6c9c99 100644 --- a/mindspore/lite/src/runtime/numa_adapter.h +++ b/mindspore/lite/src/runtime/numa_adapter.h @@ -50,7 +50,7 @@ class NUMAAdapter { } virtual ~NUMAAdapter(); - inline bool Available() const { return false; } + inline bool Available() const { return available_; } void Bind(int node_id); void *Malloc(int node_id, size_t size); void Free(void *data, size_t size); diff --git a/mindspore/lite/src/scheduler.cc b/mindspore/lite/src/scheduler.cc index b6c1f3ce978..c2412d86716 100644 --- a/mindspore/lite/src/scheduler.cc +++ b/mindspore/lite/src/scheduler.cc @@ -146,6 +146,11 @@ int CopyConstTensorData(const std::vector &tensors, int op_type) { if (IsPackedOp(op_type)) { return RET_OK; } +#ifdef SERVER_INFERENCE + if (IsShareConstOp(op_type)) { + return RET_OK; + } +#endif for (auto *tensor : tensors) { // only copy non-copied const tensor if (!tensor->IsConst() && tensor->data() != nullptr) { diff --git a/mindspore/lite/tools/benchmark/benchmark_base.h b/mindspore/lite/tools/benchmark/benchmark_base.h index f5e5dc32a42..af01a017f96 100644 --- a/mindspore/lite/tools/benchmark/benchmark_base.h +++ b/mindspore/lite/tools/benchmark/benchmark_base.h @@ -139,11 +139,11 @@ class MS_API BenchmarkFlags : public virtual FlagParser { AddFlag(&BenchmarkFlags::cosine_distance_threshold_, "cosineDistanceThreshold", "cosine distance threshold", -1.1); AddFlag(&BenchmarkFlags::resize_dims_in_, "inputShapes", "Shape of input data, the format should be NHWC. e.g. 
1,32,32,32:1,1,32,32,1", ""); -#ifdef SERVER_INFERENCE AddFlag(&BenchmarkFlags::enable_parallel_predict_, "enableParallelPredict", "Enable model parallel : true | false", false); - AddFlag(&BenchmarkFlags::num_require_, "numRequire", "require num", 1); -#endif + AddFlag(&BenchmarkFlags::parallel_request_num_, "parallelRequestNum", "parallel request num of parallel predict", + 1); + AddFlag(&BenchmarkFlags::workers_num_, "workersNum", "works num of parallel predict", 2); #ifdef ENABLE_OPENGL_TEXTURE AddFlag(&BenchmarkFlags::enable_gl_texture_, "enableGLTexture", "Enable GlTexture2D", false); #endif @@ -157,10 +157,9 @@ class MS_API BenchmarkFlags : public virtual FlagParser { public: // common -#ifdef SERVER_INFERENCE bool enable_parallel_predict_ = false; - int num_require_ = 1; -#endif + int parallel_request_num_ = 1; + int workers_num_ = 2; std::string model_file_; std::string in_data_file_; std::string config_file_; diff --git a/mindspore/lite/tools/benchmark/benchmark_unified_api.cc b/mindspore/lite/tools/benchmark/benchmark_unified_api.cc index 146d9f7ee39..ab423420cf9 100644 --- a/mindspore/lite/tools/benchmark/benchmark_unified_api.cc +++ b/mindspore/lite/tools/benchmark/benchmark_unified_api.cc @@ -956,6 +956,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr contex ModelParallelRunner model_pool; auto runner_config = std::make_shared(); runner_config->context = context; + runner_config->workers_num = flags_->workers_num_; auto model_init_start = GetTimeUs(); auto ret = model_pool.Init(flags_->model_file_, runner_config); if (ret != kSuccess) { @@ -969,7 +970,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr contex MS_LOG(ERROR) << "model pool input is empty."; return RET_ERROR; } - for (int i = 0; i < flags_->num_require_ + flags_->warm_up_loop_count_; i++) { + for (int i = 0; i < flags_->parallel_request_num_ + flags_->warm_up_loop_count_; i++) { auto status = LoadInput(); if (status != RET_OK) { MS_LOG(ERROR) << "Generate input data error"; @@ -1000,7 +1001,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr contex MS_LOG(ERROR) << "model pool predict failed."; } auto predict_end = GetTimeUs(); - std::cout << "run predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms\n"; + std::cout << "per predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms\n"; if (!flags_->benchmark_data_file_.empty()) { auto status = CompareOutputForModelPool(&output); if (status != RET_OK) { @@ -1015,11 +1016,11 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr contex for (auto &warm_up_thread : model_thread_warm_up) { warm_up_thread.join(); } - std::cout << "================ end warm up ================"; + std::cout << "================ end warm up ================\n"; auto all_start = GetTimeUs(); for (int loop_count_num = 0; loop_count_num < flags_->loop_count_; loop_count_num++) { std::vector model_thread_run; - for (int i = 0; i < flags_->num_require_; i++) { + for (int i = 0; i < flags_->parallel_request_num_; i++) { model_thread_run.push_back(std::thread(model_pool_run, i + flags_->warm_up_loop_count_)); } for (auto &run_thread : model_thread_run) { @@ -1028,8 +1029,8 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr contex } auto all_end = GetTimeUs(); std::cout << "=================================" << std::endl; - std::cout << "model pool init time: " << (model_init_end - model_init_start) / kFloatMSEC << " ms\n"; - std::cout << "model pool all run time: " << (all_end - all_start) / kFloatMSEC / flags_->loop_count_ << " ms\n"; 
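  // (Editor's note, not part of this patch.) With the flags added in benchmark_base.h above, a
  // NUMA-aware parallel-predict benchmark run looks roughly like this (model path hypothetical):
  //   ./benchmark --modelFile=model.ms --enableParallelPredict=true --workersNum=4 --parallelRequestNum=8
  // workersNum feeds RunnerConfig::workers_num; parallelRequestNum is the number of concurrent
  // Predict() calls issued per measured loop.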
+  std::cout << "parallel predict init time: " << (model_init_end - model_init_start) / kFloatMSEC << " ms\n";
+  std::cout << "parallel predict all run time: " << (all_end - all_start) / kFloatMSEC / flags_->loop_count_ << " ms\n";
   std::cout << "=================================" << std::endl;
   return RET_OK;
 }
diff --git a/mindspore/lite/tools/converter/CMakeLists.txt b/mindspore/lite/tools/converter/CMakeLists.txt
index 16044303ec9..338fee8d6c4 100644
--- a/mindspore/lite/tools/converter/CMakeLists.txt
+++ b/mindspore/lite/tools/converter/CMakeLists.txt
@@ -123,12 +123,12 @@ set(LITE_SRC ${API_SRC}
     ${SRC_DIR}/errorcode.cc
     ${SRC_DIR}/weight_decoder.cc
     ${SRC_DIR}/huffman_decode.cc
-    ${SRC_DIR}/pack_weight_manager.cc
     ${SRC_DIR}/delegate/tensorrt/distribution/distribution_base.cc
     )
 if(MSLITE_ENABLE_SERVER_INFERENCE)
     set(LITE_SRC
         ${LITE_SRC}
+        ${SRC_DIR}/pack_weight_manager.cc
         ${SRC_DIR}/runtime/dynamic_mem_allocator.cc
         ${SRC_DIR}/runtime/dynamic_mem_manager.cc
         ${SRC_DIR}/runtime/numa_adapter.cc
diff --git a/mindspore/lite/tools/converter/micro/cmake/file_list.cmake b/mindspore/lite/tools/converter/micro/cmake/file_list.cmake
index 01534d4ae56..7776b2503b5 100644
--- a/mindspore/lite/tools/converter/micro/cmake/file_list.cmake
+++ b/mindspore/lite/tools/converter/micro/cmake/file_list.cmake
@@ -126,11 +126,6 @@ set(CODER_OPCODERS_SRC
         #### custom
         ${MICRO_DIR}/coder/opcoders/custom/custom_coder.cc
         )
-if(MSLITE_ENABLE_SERVER_INFERENCE)
-    set(LITE_SRC
-        ${LITE_DIR}/src/pack_weight_manager.cc
-    )
-endif()
 set(REGISTRY_SRC
         ${MICRO_DIR}/coder/opcoders/kernel_registry.cc
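// (Editor's usage sketch, not part of this patch.) How a caller exercises the new
// RunnerConfig::workers_num field through the public ModelParallelRunner API touched above. Class and
// method names come from include/api/model_parallel_runner.h as modified here; the model path, thread
// count, and worker count are made-up example values.
#include <memory>
#include <vector>
#include "include/api/context.h"
#include "include/api/model_parallel_runner.h"

int RunParallelPredict() {
  auto context = std::make_shared<mindspore::Context>();
  context->SetThreadNum(4);  // threads per worker
  context->MutableDeviceInfo().push_back(std::make_shared<mindspore::CPUDeviceInfo>());

  auto runner_config = std::make_shared<mindspore::RunnerConfig>();
  runner_config->context = context;
  runner_config->workers_num = 4;  // new field from this patch; 0 keeps the auto-derived default

  mindspore::ModelParallelRunner runner;
  if (runner.Init("model.ms", runner_config) != mindspore::kSuccess) {
    return -1;
  }
  auto inputs = runner.GetInputs();
  // ... fill input tensor data here; Predict() may be called concurrently from several threads and
  // each request is dispatched to an idle worker, preferring the NUMA node with the most idle ones.
  std::vector<mindspore::MSTensor> outputs;
  return runner.Predict(inputs, &outputs) == mindspore::kSuccess ? 0 : -1;
}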