parallel predict: add NUMA support

yefeng 2022-03-01 09:00:34 +08:00
parent ff49911889
commit 58756a44f9
19 changed files with 263 additions and 208 deletions

View File

@ -24,6 +24,7 @@
namespace mindspore {
struct RunnerConfig {
std::shared_ptr<Context> context = nullptr;
int workers_num = 0;
};
/// \brief The ModelParallelRunner class is used to define a MindSpore ModelParallelRunner, facilitating Model
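For orientation, a minimal caller-side sketch of how the new workers_num field is meant to be used, based on the ModelParallelRunner API referenced elsewhere in this commit (the model path and numbers are illustrative, error handling is trimmed):

    // Fragment: assumes the parallel-runner headers are included.
    auto ctx = std::make_shared<mindspore::Context>();
    ctx->SetThreadNum(4);                                  // threads per worker
    auto config = std::make_shared<mindspore::RunnerConfig>();
    config->context = ctx;
    config->workers_num = 4;                               // 0 keeps the pool's default worker count
    mindspore::ModelParallelRunner runner;
    if (runner.Init("model.mindir", config) != mindspore::kSuccess) {
      /* handle init failure */
    }
    auto inputs = runner.GetInputs();                      // fill with real data before Predict
    std::vector<mindspore::MSTensor> outputs;
    (void)runner.Predict(inputs, &outputs);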

View File

@ -93,5 +93,10 @@ bool IsPackedOp(int op_type) {
schema::PrimitiveType_FullConnection, schema::PrimitiveType_MatMulFusion};
return IsContain(packed_ops, op_type);
}
bool IsShareConstOp(int op_type) {
static const std::vector<int> packed_ops = {schema::PrimitiveType_Gather};
return IsContain(packed_ops, op_type);
}
} // namespace lite
} // namespace mindspore

View File

@ -34,6 +34,8 @@ using NODE_ID = std::string;
// only support op_type from current schema
bool IsPackedOp(int op_type);
bool IsShareConstOp(int op_type);
std::vector<size_t> GetGraphInputNodes(const lite::Model *model);
std::vector<size_t> GetGraphOutputNodes(const lite::Model *model);

View File

@ -1,70 +0,0 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_INCLUDE_API_MODEL_RUNNER_H
#define MINDSPORE_INCLUDE_API_MODEL_RUNNER_H
#include <vector>
#include <memory>
#include <utility>
#include <string>
#include "include/api/status.h"
#include "include/api/context.h"
namespace mindspore {
class ModelPool;
struct RunnerConfig;
/// \brief The ModelRunner class is used to define a MindSpore ModelPoolManager, facilitating Model management.
class MS_API ModelParallelRunner {
public:
ModelParallelRunner() = default;
~ModelParallelRunner() = default;
/// \brief build a model runner from model path so that it can run on a device. Only valid for Lite.
///
/// \param[in] model_path Define the model path.
/// \param[in] model_type Define The type of model file. Options: ModelType::kMindIR, ModelType::kOM. Only
/// ModelType::kMindIR is valid for Lite.
/// \param[in] model_context Define the context used to store options during execution.
/// \param[in] dec_key Define the key used to decrypt the ciphertext model. The key length is 16, 24, or 32.
/// \param[in] dec_mode Define the decryption mode. Options: AES-GCM, AES-CBC.
///
/// \return Status.
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr,
const Key &dec_key = {}, const std::string &dec_mode = kDecModeAesGcm);
/// \brief Obtains all input tensors of the model.
///
/// \return The vector that includes all input tensors.
std::vector<MSTensor> GetInputs();
/// \brief Obtains all output tensors of the model.
///
/// \return The vector that includes all output tensors.
std::vector<MSTensor> GetOutputs();
/// \brief Inference ModelPoolManager.
///
/// \param[in] inputs A vector where model inputs are arranged in sequence.
/// \param[out] outputs Which is a pointer to a vector. The model outputs are filled in the container in sequence.
/// \param[in] before CallBack before predict.
/// \param[in] after CallBack after predict.
///
/// \return Status.
Status Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr);
};
} // namespace mindspore
#endif // MINDSPORE_INCLUDE_API_MODEL_RUNNER_H

View File

@ -20,7 +20,7 @@
#include "include/lite_types.h"
#include "src/common/config_file.h"
#include "src/runtime/inner_allocator.h"
#include "src/common//file_utils.h"
#include "src/common/file_utils.h"
#include "src/pack_weight_manager.h"
#include "src/runtime/numa_adapter.h"
#include "src/common/common.h"
@ -39,33 +39,33 @@ int GetCoreNum() {
#endif
return core_num;
}
} // namespace
void SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num, int node_id) {
Status ModelPool::SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num) {
if (UNLIKELY(thread_num == 0)) {
MS_LOG(ERROR) << "thread num is zero.";
return;
return kLiteError;
}
std::vector<int> cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(node_id);
auto cpu_num = cpu_list.size();
if (cpu_num == 0) {
return;
if (thread_num * static_cast<int>(workers_num_) > GetCoreNum()) {
MS_LOG(ERROR) << "thread num or worker num is wrong ,not support param.";
return kLiteNotSupport;
}
std::vector<int> bind_id;
bind_id.reserve(thread_num);
all_model_bind_list->reserve(cpu_num / thread_num + 1);
bind_id.emplace_back(cpu_list[0]);
for (size_t i = 1; i < cpu_num; ++i) {
if (i % thread_num == 0) {
all_model_bind_list->emplace_back(bind_id);
bind_id.clear();
for (size_t i = 0; i < workers_num_;) {
std::vector<int> cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(used_numa_node_num_);
if (static_cast<int>(cpu_list.size()) < thread_num) {
MS_LOG(ERROR) << "one numa node do not have enough cpu core for bind thread.";
return kLiteError;
}
bind_id.emplace_back(cpu_list[i]);
}
if (!bind_id.empty()) {
all_model_bind_list->emplace_back(bind_id);
for (size_t j = 0; j < cpu_list.size() / thread_num; j++) {
std::vector<int> bind_id;
bind_id.insert(bind_id.begin(), cpu_list.begin() + j * thread_num, cpu_list.begin() + (j + 1) * thread_num);
all_model_bind_list->push_back(bind_id);
i++;
}
used_numa_node_num_++;
}
return kSuccess;
}
} // namespace
void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num) {
if (thread_num == 0) {
@ -74,7 +74,7 @@ void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_li
}
int core_num = GetCoreNum();
int core_id = 0;
for (size_t i = 0; i < num_models_; i++) {
for (size_t i = 0; i < workers_num_; i++) {
std::vector<int> bind_id;
for (int j = 0; j < thread_num; j++) {
if (core_id >= core_num) {
@ -92,6 +92,86 @@ ModelPool *ModelPool::GetInstance() {
return &instance;
}
Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Context> &context) {
if (use_numa_bind_mode_) {
// currently assumes every numa node has the same number of cpu cores
// cores that do not fill a whole worker are left unused
int one_numa_node_cpu_size = numa::NUMAAdapter::GetInstance()->GetCPUList(0).size();
if (context->GetThreadNum() > one_numa_node_cpu_size) {
MS_LOG(ERROR) << "thread num more than numa node cpu cores.";
return kLiteError;
} else {
workers_num_ = one_numa_node_cpu_size / static_cast<int>(context->GetThreadNum()) * numa_node_num_;
}
} else {
// each model binds all kernels in order
workers_num_ = GetCoreNum() / static_cast<int>(context->GetThreadNum());
}
return kSuccess;
}
Status ModelPool::InitDefaultContext(const std::shared_ptr<mindspore::Context> &context) {
MS_LOG(DEBUG) << "use default config.";
context->SetThreadNum(kNumThreads);
context->SetEnableParallel(true);
context->SetThreadAffinity(lite::HIGHER_CPU);
auto &device_list = context->MutableDeviceInfo();
auto device_info = std::make_shared<CPUDeviceInfo>();
device_info->SetEnableFP16(false);
device_list.push_back(device_info);
// set model num
auto status = SetDefaultOptimalModelNum(context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed.";
return kLiteError;
}
return kSuccess;
}
std::shared_ptr<Context> ModelPool::InitUserDefineContext(const std::shared_ptr<RunnerConfig> &runner_config) {
auto context = runner_config->context;
if (context == nullptr) {
MS_LOG(ERROR) << "user set config context nullptr.";
return nullptr;
}
auto device_list = context->MutableDeviceInfo();
if (device_list.size() != 1) {
MS_LOG(ERROR) << "model pool only support device num 1.";
return nullptr;
}
auto device = device_list.front();
if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) {
MS_LOG(ERROR) << "model pool only support cpu or gpu type.";
return nullptr;
}
auto cpu_context = device->Cast<CPUDeviceInfo>();
auto enable_fp16 = cpu_context->GetEnableFP16();
if (enable_fp16) {
MS_LOG(ERROR) << "model pool not support enable fp16.";
return nullptr;
}
if (device->GetDeviceType() == kGPU) {
workers_num_ = 1;
} else if (device->GetDeviceType() == kCPU) {
if (runner_config->workers_num == 0) {
// the user did not set the number of workers, so the default optimal number is used
auto status = SetDefaultOptimalModelNum(context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed.";
return nullptr;
}
} else {
// user-defined number of workers
workers_num_ = runner_config->workers_num;
}
} else {
MS_LOG(ERROR) << "not support device: " << device->GetDeviceType();
return nullptr;
}
return context;
}
std::shared_ptr<Context> ModelPool::InitContext(const std::shared_ptr<RunnerConfig> &runner_config) {
auto model_context = std::make_shared<mindspore::Context>();
if (model_context == nullptr) {
@ -99,42 +179,36 @@ std::shared_ptr<Context> ModelPool::InitContext(const std::shared_ptr<RunnerConf
return nullptr;
}
if (runner_config != nullptr) {
model_context = runner_config->context;
auto device_list = model_context->MutableDeviceInfo();
if (device_list.size() != 1) {
MS_LOG(ERROR) << "model pool only support device num 1.";
return nullptr;
}
auto device = device_list.front();
if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) {
MS_LOG(ERROR) << "model pool only support cpu or gpu type.";
return nullptr;
}
auto cpu_context = device->Cast<CPUDeviceInfo>();
auto enable_fp16 = cpu_context->GetEnableFP16();
if (enable_fp16) {
MS_LOG(ERROR) << "model pool not support enable fp16.";
return nullptr;
}
if (device->GetDeviceType() == kGPU) {
num_models_ = 1;
} else {
num_models_ = GetCoreNum() / static_cast<int>(model_context->GetThreadNum());
}
use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available() &&
runner_config->context->GetThreadAffinityMode() == lite::HIGHER_CPU;
numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum();
model_context = InitUserDefineContext(runner_config);
} else {
MS_LOG(DEBUG) << "use default config.";
num_models_ = GetCoreNum() / static_cast<int>(model_context->GetThreadNum());
model_context->SetThreadNum(kNumThreads);
model_context->SetEnableParallel(true);
model_context->SetThreadAffinity(lite::HIGHER_CPU);
auto &device_list = model_context->MutableDeviceInfo();
auto device_info = std::make_shared<CPUDeviceInfo>();
device_info->SetEnableFP16(false);
device_list.push_back(device_info);
use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available();
numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum();
auto status = InitDefaultContext(model_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "use default context failed.";
return nullptr;
}
}
return model_context;
}
Status ModelPool::SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list,
std::shared_ptr<Context> model_context) {
if (numa::NUMAAdapter::GetInstance()->Available()) {
auto status = SetNumaBindStrategy(all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
if (status != kSuccess) {
MS_LOG(ERROR) << "SetNumaBindStrategy failed.";
return kLiteError;
}
} else {
SetBindStrategy(all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
}
return kSuccess;
}
ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr<RunnerConfig> &runner_config) {
auto model_context = InitContext(runner_config);
if (model_context == nullptr) {
@ -145,27 +219,19 @@ ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr<RunnerConfig
MS_LOG(ERROR) << "Invalid thread num " << model_context->GetThreadNum();
return {};
}
int node_id = -1;
if (numa::NUMAAdapter::GetInstance()->Available()) {
node_id = 0;
num_models_ =
numa::NUMAAdapter::GetInstance()->GetCPUList(node_id).size() / static_cast<int>(model_context->GetThreadNum());
} else {
num_models_ = GetCoreNum() / static_cast<int>(model_context->GetThreadNum());
}
ModelPoolContex model_pool_context;
std::vector<std::vector<int>> all_model_bind_list;
if (model_context->GetThreadAffinityMode() == lite::HIGHER_CPU) {
if (numa::NUMAAdapter::GetInstance()->Available()) {
SetNumaBindStrategy(&all_model_bind_list, static_cast<int>(model_context->GetThreadNum()), node_id);
} else {
SetBindStrategy(&all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
auto status = SetModelBindMode(&all_model_bind_list, model_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetModelBindMode failed.";
return {};
}
} else if (model_context->GetThreadAffinityMode() == lite::MID_CPU) {
MS_LOG(ERROR) << "not support bind MID_CPU.";
return {};
}
for (size_t i = 0; i < num_models_; i++) {
for (size_t i = 0; i < workers_num_; i++) {
auto context = std::make_shared<Context>();
if (context == nullptr) {
MS_LOG(ERROR) << "New Context failed.";
@ -212,27 +278,44 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
MS_LOG(ERROR) << "CreateModelContext failed, context is empty.";
return kLiteError;
}
if (use_numa_bind_mode_) {
PredictTaskQueue::GetInstance()->SetTaskQueueNum(used_numa_node_num_);
} else {
PredictTaskQueue::GetInstance()->SetTaskQueueNum(1);
}
size_t size = 0;
if (graph_buf_ != nullptr) {
delete[] graph_buf_;
graph_buf_ = nullptr;
}
graph_buf_ = lite::ReadFile(model_path.c_str(), &size);
if (graph_buf_ == nullptr) {
MS_LOG(ERROR) << "read file failed.";
return kLiteError;
}
lite::PackWeightManager::GetInstance()->InitWeightManagerByBuf(graph_buf_);
int node_id = -1;
if (numa::NUMAAdapter::GetInstance()->Available()) {
node_id = 0;
auto ret = lite::PackWeightManager::GetInstance()->InitWeightManagerByBuf(graph_buf_);
if (ret != kSuccess) {
MS_LOG(ERROR) << "InitWeightManagerByBuf failed.";
return kLiteError;
}
std::shared_ptr<ModelThread> model_thread = nullptr;
for (size_t i = 0; i < num_models_; i++) {
for (size_t i = 0; i < workers_num_; i++) {
int numa_node_id = 0;
if (use_numa_bind_mode_ && GetCoreNum() / model_pool_context[i]->GetThreadNum() < numa_node_num_) {
numa_node_id = i;
} else if (use_numa_bind_mode_ && numa_node_num_ != 0) {
numa_node_id = i / (GetCoreNum() / model_pool_context[i]->GetThreadNum() / numa_node_num_);
} else {
numa_node_id = 0;
}
model_thread = std::make_shared<ModelThread>();
auto status = model_thread->Init(graph_buf_, size, model_pool_context[i], dec_key, dec_mode, node_id);
auto status = model_thread->Init(graph_buf_, size, model_pool_context[i], dec_key, dec_mode, numa_node_id);
if (status != kSuccess) {
MS_LOG(ERROR) << " model thread init failed.";
return kLiteError;
}
model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread));
PredictTaskQueue::GetInstance()->IncreaseeWaitModelNum(1, numa_node_id);
model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread, numa_node_id));
}
if (model_thread != nullptr) {
model_inputs_ = model_thread->GetInputs();
@ -409,14 +492,30 @@ Status ModelPool::FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs
return kSuccess;
}
void ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num) {
*max_wait_worker_node_id = 0;
*max_wait_worker_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(0);
for (int i = 1; i < used_numa_node_num_; i++) {
int worker_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(i);
if (*max_wait_worker_num < worker_num) {
*max_wait_worker_num = worker_num;
*max_wait_worker_node_id = i;
}
}
}
Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after) {
mtx_split_task_.lock();
auto wait_model_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
int max_wait_worker_node_id = 0;
int max_wait_worker_num = 0;
GetMaxWaitWorkerNum(&max_wait_worker_node_id, &max_wait_worker_num);
auto batch = inputs[0].Shape()[0];
if (PredictTaskQueue::GetInstance()->GetTaskNum() == 0 && wait_model_num > 1 && batch >= wait_model_num) {
size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num);
if (PredictTaskQueue::GetInstance()->GetTaskNum(max_wait_worker_node_id) == 0 && max_wait_worker_num > 1 &&
batch >= max_wait_worker_num) {
size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum(max_wait_worker_node_id);
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num, max_wait_worker_node_id);
std::vector<std::vector<MSTensor>> new_inputs;
std::vector<std::vector<MSTensor>> new_outputs;
auto status = SplitInputTensorByBatch(inputs, &new_inputs, batch_split_num);
@ -433,7 +532,7 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
std::vector<std::shared_ptr<PredictTask>> tasks;
for (size_t i = 0; i < batch_split_num; i++) {
auto predict_task = std::make_shared<PredictTask>(&new_inputs[i], &new_outputs.at(i), before, after);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task, max_wait_worker_node_id);
tasks.push_back(predict_task);
}
mtx_split_task_.unlock();
@ -450,14 +549,14 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
MS_LOG(ERROR) << "free split tensor failed.";
return kLiteError;
}
PredictTaskQueue::GetInstance()->IncreaseeWaitModelNum(batch_split_num, max_wait_worker_node_id);
} else {
if (wait_model_num == 1) {
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1);
}
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1, max_wait_worker_node_id);
auto predict_task = std::make_shared<PredictTask>(&inputs, outputs, before, after);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task, max_wait_worker_node_id);
mtx_split_task_.unlock();
PredictTaskQueue::GetInstance()->WaitUntilPredictActive(predict_task);
PredictTaskQueue::GetInstance()->IncreaseeWaitModelNum(1, max_wait_worker_node_id);
}
return kSuccess;
}
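The reworked SetNumaBindStrategy consumes one NUMA node at a time and cuts its CPU list into consecutive groups of thread_num cores, one group per worker. A standalone sketch of that loop, with the NUMA adapter replaced by a plain per-node CPU list so it can be read in isolation (hypothetical function and parameter names, not the pool's actual code):

    #include <vector>

    // Simplified stand-in for SetNumaBindStrategy: node_cpus[k] holds the CPU ids
    // of NUMA node k, thread_num is the per-worker thread count.
    std::vector<std::vector<int>> BuildBindLists(const std::vector<std::vector<int>> &node_cpus,
                                                 int thread_num, size_t workers_num) {
      std::vector<std::vector<int>> bind_lists;
      if (thread_num <= 0) return bind_lists;
      size_t worker = 0;
      for (const auto &cpu_list : node_cpus) {             // fill one node before moving to the next
        size_t groups = cpu_list.size() / static_cast<size_t>(thread_num);
        for (size_t j = 0; j < groups && worker < workers_num; ++j, ++worker) {
          bind_lists.emplace_back(cpu_list.begin() + j * thread_num,
                                  cpu_list.begin() + (j + 1) * thread_num);
        }
      }
      return bind_lists;                                   // one CPU set per worker
    }

With two hypothetical nodes {0..7} and {8..15} and thread_num = 4, this yields workers bound to {0-3}, {4-7}, {8-11}, {12-15}; in the real code used_numa_node_num_ advances each time a node is consumed, which later sizes the per-node task queues.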

View File

@ -27,6 +27,8 @@
#include "src/cxx_api/model_pool/model_worker.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
using ModelPoolContex = std::vector<std::shared_ptr<Context>>;
class ModelPool {
public:
static ModelPool *GetInstance();
@ -44,9 +46,18 @@ class ModelPool {
private:
ModelPool() = default;
void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
ModelPoolContex CreateModelContext(const std::shared_ptr<RunnerConfig> &runner_config);
std::shared_ptr<Context> InitContext(const std::shared_ptr<RunnerConfig> &runner_config);
Status InitDefaultContext(const std::shared_ptr<mindspore::Context> &context);
std::shared_ptr<Context> InitUserDefineContext(const std::shared_ptr<RunnerConfig> &runner_config);
Status SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Context> &context);
Status SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list, std::shared_ptr<Context> model_context);
Status SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
Status SplitInputTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<std::vector<MSTensor>> *new_inputs,
size_t batch_split_num);
Status SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
@ -54,14 +65,18 @@ class ModelPool {
Status ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs);
Status FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs,
std::vector<std::vector<MSTensor>> *new_outputs);
void GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num);
std::vector<std::thread> model_thread_vec_;
std::vector<MSTensor> model_inputs_;
std::vector<MSTensor> model_outputs_;
char *graph_buf_ = nullptr;
size_t num_models_ = 10;
size_t workers_num_ = 1;
std::mutex mtx_split_task_;
bool is_user_data_ = false;
int numa_node_num_ = 1;
int used_numa_node_num_ = 0;
bool use_numa_bind_mode_ = false;
};
} // namespace mindspore
#endif // MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H
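The default worker count chosen by SetDefaultOptimalModelNum is plain integer division; an illustration with made-up hardware figures (2 NUMA nodes with 32 cores each, thread_num = 8):

    // Illustration only, not library code.
    int cores_per_node = 32, numa_nodes = 2, total_cores = 64, thread_num = 8;
    int numa_workers = cores_per_node / thread_num * numa_nodes;  // NUMA path: 4 * 2 = 8 workers
    int flat_workers = total_cores / thread_num;                  // non-NUMA path: 64 / 8 = 8 workers

If thread_num does not divide a node's core count evenly, the remainder cores are simply dropped, matching the "extra cores" comment in SetDefaultOptimalModelNum.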

View File

@ -18,9 +18,9 @@
#include "src/common/utils.h"
#include "src/common/common.h"
namespace mindspore {
void ModelThread::Run() {
void ModelThread::Run(int node_id) {
while (!PredictTaskQueue::GetInstance()->IsPredictTaskDone()) {
auto task = PredictTaskQueue::GetInstance()->GetPredictTask();
auto task = PredictTaskQueue::GetInstance()->GetPredictTask(node_id);
if (task == nullptr) {
break;
}
@ -35,7 +35,7 @@ void ModelThread::Run() {
PredictTaskQueue::GetInstance()->ActiveTask();
continue;
}
if (is_copy_output_) {
if (need_copy_output_) {
std::vector<MSTensor> new_outputs;
auto output_size = outputs->size();
for (size_t i = 0; i < output_size; i++) {
@ -63,7 +63,7 @@ Status ModelThread::Init(const char *model_buf, size_t size, const std::shared_p
const Key &dec_key, const std::string &dec_mode, int node_id) {
model_ = std::make_shared<Model>();
mindspore::ModelType model_type = kMindIR;
if (node_id > -1) {
if (node_id != -1) {
model_->UpdateConfig(lite::kConfigServerInference, {lite::kConfigNUMANodeId, std::to_string(node_id)});
}
auto status = model_->Build(model_buf, size, model_type, model_context, dec_key, dec_mode);
@ -131,7 +131,7 @@ Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MST
/* user set graph-output-tensor from outside */
model_output[i].SetData(outputs->at(i).MutableData());
model_output[i].SetAllocator(nullptr);
is_copy_output_ = false;
need_copy_output_ = false;
}
}
auto status = model_->Predict(inputs, &model_output, before, after);
@ -139,7 +139,7 @@ Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MST
MS_LOG(ERROR) << "model predict failed.";
return status;
}
if (is_copy_output_) {
if (need_copy_output_) {
outputs->clear();
outputs->insert(outputs->end(), model_output.begin(), model_output.end());
} else {

View File

@ -26,8 +26,6 @@
#include "include/api/model.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
using ModelPoolContex = std::vector<std::shared_ptr<Context>>;
class ModelThread {
public:
ModelThread() = default;
@ -45,7 +43,7 @@ class ModelThread {
Status Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr);
void Run();
void Run(int node_id);
private:
std::pair<std::vector<std::vector<int64_t>>, bool> GetModelResize(const std::vector<MSTensor> &model_inputs,
@ -54,11 +52,9 @@ class ModelThread {
private:
std::shared_ptr<mindspore::Model> model_ = nullptr;
std::mutex mtx_model_;
std::condition_variable model_cond_;
// num thread is configured according to the hardware
int num_models_;
bool is_copy_output_ = true;
bool need_copy_output_ = true;
};
} // namespace mindspore
#endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_THREAD_H_

View File

@ -21,6 +21,11 @@ PredictTaskQueue::~PredictTaskQueue() {
task_push_cond_.notify_all();
}
void PredictTaskQueue::SetTaskQueueNum(int num) {
predict_task_.resize(num);
waite_worker_num_.resize(num, 0);
}
void PredictTaskQueue::WaitUntilPredictActive(std::shared_ptr<PredictTask> task) {
std::unique_lock<std::mutex> result_lock(mtx_predict_task_);
while (!task->ready) {
@ -36,28 +41,27 @@ PredictTaskQueue *PredictTaskQueue::GetInstance() {
return &instance;
}
void PredictTaskQueue::PushPredictTask(std::shared_ptr<PredictTask> task) {
void PredictTaskQueue::PushPredictTask(std::shared_ptr<PredictTask> task, int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
predict_task_.push(task);
task_push_cond_.notify_one();
predict_task_.at(node_id).push(task);
task_push_cond_.notify_all();
}
std::shared_ptr<PredictTask> PredictTaskQueue::GetPredictTask() {
std::shared_ptr<PredictTask> PredictTaskQueue::GetPredictTask(int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
while (predict_task_.empty() && !predict_task_done_) {
waite_model_num_++;
while (predict_task_.at(node_id).empty() && !predict_task_done_) {
task_push_cond_.wait(task_lock);
}
if (predict_task_done_) {
return nullptr;
}
auto predict_task = predict_task_.front();
predict_task_.pop();
auto predict_task = predict_task_.at(node_id).front();
predict_task_.at(node_id).pop();
return predict_task;
}
int PredictTaskQueue::GetTaskNum() {
int PredictTaskQueue::GetTaskNum(int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
return predict_task_.size();
return predict_task_.at(node_id).size();
}
} // namespace mindspore
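The queue now keeps one task list per NUMA node behind a single mutex/condition-variable pair. A trimmed, self-contained sketch of that shape (illustrative names, no shutdown handling):

    #include <condition_variable>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <utility>
    #include <vector>

    struct Task;  // stand-in for PredictTask

    class PerNodeTaskQueue {
     public:
      explicit PerNodeTaskQueue(int node_num) : queues_(node_num) {}
      void Push(std::shared_ptr<Task> task, int node_id) {
        std::unique_lock<std::mutex> lock(mtx_);
        queues_.at(node_id).push(std::move(task));
        cond_.notify_all();  // workers of every node share this condition variable
      }
      std::shared_ptr<Task> Pop(int node_id) {  // blocks until this node has work
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [&] { return !queues_.at(node_id).empty(); });
        auto task = queues_.at(node_id).front();
        queues_.at(node_id).pop();
        return task;
      }
     private:
      std::vector<std::queue<std::shared_ptr<Task>>> queues_;
      std::mutex mtx_;
      std::condition_variable cond_;
    };

This shape also likely explains why PushPredictTask switches from notify_one to notify_all: with one shared condition variable, notify_one could wake a worker whose own node queue is still empty while the intended worker stays asleep.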

View File

@ -40,20 +40,22 @@ class PredictTaskQueue {
static PredictTaskQueue *GetInstance();
~PredictTaskQueue();
void PushPredictTask(std::shared_ptr<PredictTask> task);
void PushPredictTask(std::shared_ptr<PredictTask> task, int node_id);
void WaitUntilPredictActive(std::shared_ptr<PredictTask> task);
std::shared_ptr<PredictTask> GetPredictTask();
std::shared_ptr<PredictTask> GetPredictTask(int node_id);
void ActiveTask();
int GetTaskNum(int node_id);
void SetTaskQueueNum(int num);
bool IsPredictTaskDone() { return predict_task_done_; }
int GetTaskNum();
int GetWaitModelNum() { return waite_model_num_; }
void DecreaseWaitModelNum(int num) { waite_model_num_ -= num; }
int GetWaitModelNum(int node_id) { return waite_worker_num_.at(node_id); }
void DecreaseWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) -= num; }
void IncreaseeWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) += num; }
private:
PredictTaskQueue() = default;
std::queue<std::shared_ptr<PredictTask>> predict_task_;
int waite_model_num_ = 0;
std::vector<std::queue<std::shared_ptr<PredictTask>>> predict_task_;
std::vector<int> waite_worker_num_;
std::mutex mtx_predict_task_;
std::condition_variable task_pop_cond_;
std::condition_variable task_push_cond_;

View File

@ -667,7 +667,7 @@ int LiteSession::IniPackWeightData(Model *model) {
for (size_t i = 0; i < kernel_num; i++) {
auto node = model->all_nodes_[i];
auto node_type = node->node_type_;
if (IsPackedOp(node_type)) {
if (IsPackedOp(node_type) || IsShareConstOp(node_type)) {
for (size_t j = 0; j < node->input_indices_.size(); j++) {
auto tensor_index = node->input_indices_[j];
auto src_tensor = lite_model->GetSchemaTensor(tensor_index);

View File

@ -26,16 +26,17 @@ PackWeightManager *PackWeightManager::GetInstance() {
return &instance;
}
void PackWeightManager::InitWeightManagerByBuf(const char *model_buf) {
MS_CHECK_TRUE_RET_VOID(model_buf != nullptr);
STATUS PackWeightManager::InitWeightManagerByBuf(const char *model_buf) {
MS_CHECK_TRUE_RET(model_buf != nullptr, RET_ERROR);
if (buf_model_weight_.find(model_buf) == buf_model_weight_.end()) {
auto *model_const_weight = new (std::nothrow) ModelConstWeight();
if (model_const_weight == nullptr) {
MS_LOG(ERROR) << "model_const_weight is nullptr.";
return;
return RET_ERROR;
}
buf_model_weight_[model_buf] = model_const_weight;
}
return RET_OK;
}
void PackWeightManager::InitWeightManagerByPath(const std::string &model_path, const char *model_buf) {
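Since InitWeightManagerByBuf now reports failure instead of returning void, callers have to check its result; a minimal sketch of the pattern, assuming model_buf is in scope (RET_OK/RET_ERROR are the lite error codes this function returns):

    auto status = lite::PackWeightManager::GetInstance()->InitWeightManagerByBuf(model_buf);
    if (status != lite::RET_OK) {
      MS_LOG(ERROR) << "InitWeightManagerByBuf failed.";
      return status;  // propagate the failure instead of continuing
    }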

View File

@ -45,7 +45,7 @@ class PackWeightManager {
virtual ~PackWeightManager();
void InitWeightManagerByPath(const std::string &model_path, const char *model_buf);
void InitWeightManagerByBuf(const char *model_buf);
STATUS InitWeightManagerByBuf(const char *model_buf);
void DeleteSavedModelPtr(LiteModel *delete_model);
STATUS StoreLiteModel(const char *model_buf, const Model *model);
void *GetTensorData(const LiteModel *model, const SchemaTensorWrapper *origin_tensor, size_t tensor_index);

View File

@ -50,7 +50,7 @@ class NUMAAdapter {
}
virtual ~NUMAAdapter();
inline bool Available() const { return false; }
inline bool Available() const { return available_; }
void Bind(int node_id);
void *Malloc(int node_id, size_t size);
void Free(void *data, size_t size);

View File

@ -146,6 +146,11 @@ int CopyConstTensorData(const std::vector<Tensor *> &tensors, int op_type) {
if (IsPackedOp(op_type)) {
return RET_OK;
}
#ifdef SERVER_INFERENCE
if (IsShareConstOp(op_type)) {
return RET_OK;
}
#endif
for (auto *tensor : tensors) {
// only copy non-copied const tensor
if (!tensor->IsConst() && tensor->data() != nullptr) {
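Taken together with the IsShareConstOp check added in lite_session.cc, the effect of this hunk is that, in a SERVER_INFERENCE build, packed ops and shared-const ops (currently only Gather) keep their original const buffers instead of receiving per-session copies, which lets the pool's workers share one weight copy via PackWeightManager. A hypothetical helper folding the two checks into one predicate:

    // Illustrative helper, not part of the source tree.
    bool KeepsOriginalConstData(int op_type) {
      bool keep = mindspore::lite::IsPackedOp(op_type);
    #ifdef SERVER_INFERENCE
      keep = keep || mindspore::lite::IsShareConstOp(op_type);
    #endif
      return keep;
    }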

View File

@ -139,11 +139,11 @@ class MS_API BenchmarkFlags : public virtual FlagParser {
AddFlag(&BenchmarkFlags::cosine_distance_threshold_, "cosineDistanceThreshold", "cosine distance threshold", -1.1);
AddFlag(&BenchmarkFlags::resize_dims_in_, "inputShapes",
"Shape of input data, the format should be NHWC. e.g. 1,32,32,32:1,1,32,32,1", "");
#ifdef SERVER_INFERENCE
AddFlag(&BenchmarkFlags::enable_parallel_predict_, "enableParallelPredict", "Enable model parallel : true | false",
false);
AddFlag(&BenchmarkFlags::num_require_, "numRequire", "require num", 1);
#endif
AddFlag(&BenchmarkFlags::parallel_request_num_, "parallelRequestNum", "parallel request num of parallel predict",
1);
AddFlag(&BenchmarkFlags::workers_num_, "workersNum", "workers num of parallel predict", 2);
#ifdef ENABLE_OPENGL_TEXTURE
AddFlag(&BenchmarkFlags::enable_gl_texture_, "enableGLTexture", "Enable GlTexture2D", false);
#endif
@ -157,10 +157,9 @@ class MS_API BenchmarkFlags : public virtual FlagParser {
public:
// common
#ifdef SERVER_INFERENCE
bool enable_parallel_predict_ = false;
int num_require_ = 1;
#endif
int parallel_request_num_ = 1;
int workers_num_ = 2;
std::string model_file_;
std::string in_data_file_;
std::string config_file_;

View File

@ -956,6 +956,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
ModelParallelRunner model_pool;
auto runner_config = std::make_shared<RunnerConfig>();
runner_config->context = context;
runner_config->workers_num = flags_->workers_num_;
auto model_init_start = GetTimeUs();
auto ret = model_pool.Init(flags_->model_file_, runner_config);
if (ret != kSuccess) {
@ -969,7 +970,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
MS_LOG(ERROR) << "model pool input is empty.";
return RET_ERROR;
}
for (int i = 0; i < flags_->num_require_ + flags_->warm_up_loop_count_; i++) {
for (int i = 0; i < flags_->parallel_request_num_ + flags_->warm_up_loop_count_; i++) {
auto status = LoadInput();
if (status != RET_OK) {
MS_LOG(ERROR) << "Generate input data error";
@ -1000,7 +1001,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
MS_LOG(ERROR) << "model pool predict failed.";
}
auto predict_end = GetTimeUs();
std::cout << "run predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms\n";
std::cout << "per predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms\n";
if (!flags_->benchmark_data_file_.empty()) {
auto status = CompareOutputForModelPool(&output);
if (status != RET_OK) {
@ -1015,11 +1016,11 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
for (auto &warm_up_thread : model_thread_warm_up) {
warm_up_thread.join();
}
std::cout << "================ end warm up ================";
std::cout << "================ end warm up ================\n";
auto all_start = GetTimeUs();
for (int loop_count_num = 0; loop_count_num < flags_->loop_count_; loop_count_num++) {
std::vector<std::thread> model_thread_run;
for (int i = 0; i < flags_->num_require_; i++) {
for (int i = 0; i < flags_->parallel_request_num_; i++) {
model_thread_run.push_back(std::thread(model_pool_run, i + flags_->warm_up_loop_count_));
}
for (auto &run_thread : model_thread_run) {
@ -1028,8 +1029,8 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
}
auto all_end = GetTimeUs();
std::cout << "=================================" << std::endl;
std::cout << "model pool init time: " << (model_init_end - model_init_start) / kFloatMSEC << " ms\n";
std::cout << "model pool all run time: " << (all_end - all_start) / kFloatMSEC / flags_->loop_count_ << " ms\n";
std::cout << "parallel predict init time: " << (model_init_end - model_init_start) / kFloatMSEC << " ms\n";
std::cout << "parallel predict all run time: " << (all_end - all_start) / kFloatMSEC / flags_->loop_count_ << " ms\n";
std::cout << "=================================" << std::endl;
return RET_OK;
}
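The benchmark's parallel-request loop above boils down to one std::thread per request, all sharing the single ModelParallelRunner; a trimmed fragment of that pattern (all_inputs and parallel_request_num are illustrative stand-ins for the benchmark's own bookkeeping):

    // Fragment: model_pool is the ModelParallelRunner initialized above.
    std::vector<std::thread> requests;
    for (int i = 0; i < parallel_request_num; ++i) {
      requests.emplace_back([&model_pool, &all_inputs, i]() {
        std::vector<mindspore::MSTensor> outputs;
        (void)model_pool.Predict(all_inputs[i], &outputs);  // safe to call from multiple threads; the pool dispatches to its workers
      });
    }
    for (auto &t : requests) {
      t.join();
    }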

View File

@ -123,12 +123,12 @@ set(LITE_SRC ${API_SRC}
${SRC_DIR}/errorcode.cc
${SRC_DIR}/weight_decoder.cc
${SRC_DIR}/huffman_decode.cc
${SRC_DIR}/pack_weight_manager.cc
${SRC_DIR}/delegate/tensorrt/distribution/distribution_base.cc
)
if(MSLITE_ENABLE_SERVER_INFERENCE)
set(LITE_SRC
${LITE_SRC}
${SRC_DIR}/pack_weight_manager.cc
${SRC_DIR}/runtime/dynamic_mem_allocator.cc
${SRC_DIR}/runtime/dynamic_mem_manager.cc
${SRC_DIR}/runtime/numa_adapter.cc

View File

@ -126,11 +126,6 @@ set(CODER_OPCODERS_SRC
#### custom
${MICRO_DIR}/coder/opcoders/custom/custom_coder.cc
)
if(MSLITE_ENABLE_SERVER_INFERENCE)
set(LITE_SRC
${LITE_DIR}/src/pack_weight_manager.cc
)
endif()
set(REGISTRY_SRC
${MICRO_DIR}/coder/opcoders/kernel_registry.cc