!31257 [MS][LITE][parallel predict] copy to 1.6

Merge pull request !31257 from yefeng/255-copy_to_1.6_1
i-robot 2022-03-15 07:49:48 +00:00 committed by Gitee
commit edfcb0aadf
GPG Key ID: 173E9B9CA92EEF8F
13 changed files with 362 additions and 214 deletions

View File

@ -26,6 +26,7 @@ struct RunnerConfig {
std::shared_ptr<Context> context = nullptr;
int workers_num = 0;
};
class ModelPool;
/// \brief The ModelParallelRunner class is used to define a MindSpore ModelParallelRunner, facilitating Model
/// management.
@ -38,12 +39,9 @@ class MS_API ModelParallelRunner {
///
/// \param[in] model_path Define the model path.
/// \param[in] runner_config Define the config used to store options during model pool init.
/// \param[in] dec_key Define the key used to decrypt the ciphertext model. The key length is 16, 24, or 32.
/// \param[in] dec_mode Define the decryption mode. Options: AES-GCM, AES-CBC.
///
/// \return Status.
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr,
const Key &dec_key = {}, const std::string &dec_mode = kDecModeAesGcm);
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr);
/// \brief Obtains all input tensors information of the model.
///
@ -65,6 +63,9 @@ class MS_API ModelParallelRunner {
/// \return Status.
Status Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr);
private:
std::shared_ptr<ModelPool> model_pool_ = nullptr;
};
} // namespace mindspore
#endif // MINDSPORE_INCLUDE_API_MODEL_PARALLEL_RUNNER_H
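
To make the API change above concrete, here is a minimal usage sketch of the new interface; the include paths, the "model.ms" path, the worker count of 2 and the error handling are illustrative assumptions, not values taken from this change:

// Minimal sketch of the new ModelParallelRunner API (illustrative only).
#include <memory>
#include <vector>
#include "include/api/context.h"
#include "include/api/model_parallel_runner.h"

int RunParallelPredict() {
  auto context = std::make_shared<mindspore::Context>();
  context->MutableDeviceInfo().push_back(std::make_shared<mindspore::CPUDeviceInfo>());
  auto runner_config = std::make_shared<mindspore::RunnerConfig>();
  runner_config->context = context;
  runner_config->workers_num = 2;  // number of parallel model workers

  mindspore::ModelParallelRunner runner;
  if (runner.Init("model.ms", runner_config) != mindspore::kSuccess) {
    return -1;  // init failed
  }
  auto inputs = runner.GetInputs();  // fill each input tensor with data before Predict
  std::vector<mindspore::MSTensor> outputs;
  if (runner.Predict(inputs, &outputs) != mindspore::kSuccess) {
    return -1;  // predict failed
  }
  return 0;
}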

View File

@ -36,6 +36,10 @@ set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY BOTH)
set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE BOTH)
set(CMAKE_FIND_ROOT_PATH_MODE_PACKAGE BOTH)
if(DEFINED ENV{MSLITE_ENABLE_SERVER_INFERENCE})
set(MSLITE_ENABLE_SERVER_INFERENCE $ENV{MSLITE_ENABLE_SERVER_INFERENCE})
endif()
if(ENABLE_VERBOSE)
set(CMAKE_VERBOSE_MAKEFILE on)
endif()
@ -79,10 +83,11 @@ set(JNI_SRC
${NEW_NATIVE_DIR}/ms_context.cpp
${NEW_NATIVE_DIR}/ms_tensor.cpp
${NEW_NATIVE_DIR}/version.cpp
)
)
if(MSLITE_ENABLE_SERVER_INFERENCE)
set(JNI_TRAIN_SRC
set(JNI_SRC
${JNI_SRC}
${NEW_NATIVE_DIR}/runner_config.cpp
${NEW_NATIVE_DIR}/model_parallel_runner.cpp
)
@ -102,9 +107,9 @@ endif()
if(SUPPORT_TRAIN)
set(LITE_TRAIN_SO_NAME mindspore-lite-train)
set(JNI_TRAIN_SRC
${CMAKE_CURRENT_SOURCE_DIR}/runtime/train_session.cpp
${NEW_NATIVE_DIR}/train_config.cpp
)
${CMAKE_CURRENT_SOURCE_DIR}/runtime/train_session.cpp
${NEW_NATIVE_DIR}/train_config.cpp
)
add_library(mindspore-lite-train-jni SHARED ${JNI_TRAIN_SRC})
if(PLATFORM_ARM64 OR PLATFORM_ARM32)
find_library(log-lib log)

View File

@ -55,6 +55,14 @@ public class RunnerConfig {
return this.runnerConfigPtr != 0L;
}
/**
* Set the number of workers (parallel models).
*
* @param workersNum The number of parallel models.
*/
public void setWorkersNum(int workersNum) {
setWorkersNum(runnerConfigPtr, workersNum);
}
/**
* Get RunnerConfig pointer.
@ -67,4 +75,6 @@ public class RunnerConfig {
private native long createRunnerConfig(long msContextPtr);
private native void setWorkersNum(long runnerConfigPtr, int workersNum);
}

View File

@ -38,6 +38,17 @@ extern "C" JNIEXPORT jlong JNICALL Java_com_mindspore_config_RunnerConfig_create
return (jlong) nullptr;
}
context.reset(c_context_ptr);
runner_config->model_ctx = context;
runner_config->context = context;
return (jlong)runner_config;
}
extern "C" JNIEXPORT void JNICALL Java_com_mindspore_config_RunnerConfig_setWorkersNum(JNIEnv *env, jobject thiz,
jlong runner_config_ptr,
jint workers_num) {
auto *pointer = reinterpret_cast<mindspore::RunnerConfig *>(runner_config_ptr);
if (pointer == nullptr) {
MS_LOGE("runner config pointer from java is nullptr");
return;
}
pointer->workers_num = workers_num;
}

View File

@ -18,9 +18,13 @@
#include "src/common/log.h"
namespace mindspore {
Status ModelParallelRunner::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config,
const Key &dec_key, const std::string &dec_mode) {
auto status = ModelPool::GetInstance()->Init(model_path, runner_config, dec_key, dec_mode);
Status ModelParallelRunner::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
model_pool_ = std::make_shared<ModelPool>();
if (model_pool_ == nullptr) {
MS_LOG(ERROR) << "model pool is nullptr.";
return kLiteNullptr;
}
auto status = model_pool_->Init(model_path, runner_config);
if (status != kSuccess) {
MS_LOG(ERROR) << "model runner init failed.";
return kLiteError;
@ -28,15 +32,9 @@ Status ModelParallelRunner::Init(const std::string &model_path, const std::share
return status;
}
std::vector<MSTensor> ModelParallelRunner::GetInputs() {
auto inputs = ModelPool::GetInstance()->GetInputs();
return inputs;
}
std::vector<MSTensor> ModelParallelRunner::GetInputs() { return model_pool_->GetInputs(); }
std::vector<MSTensor> ModelParallelRunner::GetOutputs() {
auto outputs = ModelPool::GetInstance()->GetOutputs();
return outputs;
}
std::vector<MSTensor> ModelParallelRunner::GetOutputs() { return model_pool_->GetOutputs(); }
Status ModelParallelRunner::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after) {
@ -44,7 +42,7 @@ Status ModelParallelRunner::Predict(const std::vector<MSTensor> &inputs, std::ve
MS_LOG(ERROR) << "predict output is nullptr.";
return kLiteNullptr;
}
auto status = ModelPool::GetInstance()->Predict(inputs, outputs, before, after);
auto status = model_pool_->Predict(inputs, outputs, before, after);
if (status != kSuccess) {
MS_LOG(ERROR) << "model runner predict failed.";
return kLiteError;

View File

@ -28,6 +28,7 @@
namespace mindspore {
namespace {
constexpr int32_t kNumThreads = 4;
constexpr int kNumDeviceInfo = 2;
int GetCoreNum() {
int core_num = 1;
#if defined(_MSC_VER) || defined(_WIN32)
@ -39,34 +40,34 @@ int GetCoreNum() {
#endif
return core_num;
}
void SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num, int node_id) {
if (MS_UNLIKELY(thread_num == 0)) {
MS_LOG(ERROR) << "thread num is zero.";
return;
}
std::vector<int> cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(node_id);
auto cpu_num = cpu_list.size();
if (cpu_num == 0) {
return;
}
std::vector<int> bind_id;
bind_id.reserve(thread_num);
all_model_bind_list->reserve(cpu_num / thread_num + 1);
bind_id.emplace_back(cpu_list[0]);
for (size_t i = 1; i < cpu_num; ++i) {
if (i % thread_num == 0) {
all_model_bind_list->emplace_back(bind_id);
bind_id.clear();
}
bind_id.emplace_back(cpu_list[i]);
}
if (!bind_id.empty()) {
all_model_bind_list->emplace_back(bind_id);
}
}
} // namespace
Status ModelPool::SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num) {
if (thread_num == 0) {
MS_LOG(ERROR) << "thread num is zero.";
return kLiteError;
}
if (thread_num * static_cast<int>(workers_num_) > GetCoreNum()) {
MS_LOG(ERROR) << "thread num or worker num is wrong ,not support param.";
return kLiteNotSupport;
}
for (size_t i = 0; i < workers_num_;) {
std::vector<int> cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(used_numa_node_num_);
if (static_cast<int>(cpu_list.size()) < thread_num) {
MS_LOG(ERROR) << "one numa node do not have enough cpu core for bind thread.";
return kLiteError;
}
for (size_t j = 0; j < cpu_list.size() / thread_num; j++) {
std::vector<int> bind_id;
bind_id.insert(bind_id.begin(), cpu_list.begin() + j * thread_num, cpu_list.begin() + (j + 1) * thread_num);
all_model_bind_list->push_back(bind_id);
i++;
}
used_numa_node_num_++;
}
return kSuccess;
}
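// Illustration of the strategy above (assumed hardware, not from this change): with two
// numa nodes of eight cores each, thread_num = 4 and workers_num_ = 4, node 0 yields the
// bind lists {0,1,2,3} and {4,5,6,7} for the first two workers, then used_numa_node_num_
// moves to node 1, which supplies the remaining two workers in the same way.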
void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num) {
if (thread_num == 0) {
MS_LOG(ERROR) << "thread num is zero.";
@ -74,7 +75,7 @@ void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_li
}
int core_num = GetCoreNum();
int core_id = 0;
for (size_t i = 0; i < num_models_; i++) {
for (size_t i = 0; i < workers_num_; i++) {
std::vector<int> bind_id;
for (int j = 0; j < thread_num; j++) {
if (core_id >= core_num) {
@ -87,9 +88,90 @@ void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_li
}
}
ModelPool *ModelPool::GetInstance() {
static ModelPool instance;
return &instance;
Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Context> &context) {
if (use_numa_bind_mode_) {
// currently assumes every numa node has the same number of cpu cores;
// cores left over after dividing by the thread num are not used
int one_numa_node_cpu_size = numa::NUMAAdapter::GetInstance()->GetCPUList(0).size();
if (context->GetThreadNum() > one_numa_node_cpu_size) {
MS_LOG(ERROR) << "thread num more than numa node cpu cores.";
return kLiteError;
} else {
workers_num_ = one_numa_node_cpu_size / static_cast<int>(context->GetThreadNum()) * numa_node_num_;
}
} else {
// without numa, workers share all cpu cores: each worker gets thread num cores in order
workers_num_ = GetCoreNum() / static_cast<int>(context->GetThreadNum());
}
return kSuccess;
}
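// Worked example for the defaults above (assumed hardware): two numa nodes with eight
// cores each and a thread num of 4 give workers_num_ = 8 / 4 * 2 = 4 on the numa path;
// without numa, a 16-core host gives workers_num_ = 16 / 4 = 4.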
Status ModelPool::InitDefaultContext(const std::shared_ptr<mindspore::Context> &context) {
MS_LOG(DEBUG) << "use default config.";
context->SetThreadNum(kNumThreads);
context->SetEnableParallel(true);
context->SetThreadAffinity(lite::HIGHER_CPU);
auto &device_list = context->MutableDeviceInfo();
auto device_info = std::make_shared<CPUDeviceInfo>();
if (device_info == nullptr) {
MS_LOG(ERROR) << "device_info is nullptr.";
return kLiteNullptr;
}
device_info->SetEnableFP16(false);
device_list.push_back(device_info);
// set model num
auto status = SetDefaultOptimalModelNum(context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed.";
return kLiteError;
}
return kSuccess;
}
std::shared_ptr<Context> ModelPool::InitUserDefineContext(const std::shared_ptr<RunnerConfig> &runner_config) {
auto context = runner_config->context;
if (context == nullptr) {
MS_LOG(ERROR) << "user set config context nullptr.";
return nullptr;
}
auto device_list = context->MutableDeviceInfo();
if (device_list.size() > kNumDeviceInfo) {
MS_LOG(ERROR) << "model pool only support device CPU or GPU.";
return nullptr;
}
for (size_t i = 0; i < device_list.size(); i++) {
auto device = device_list[i];
if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) {
MS_LOG(ERROR) << "model pool only support cpu or gpu type.";
return nullptr;
}
if (device->GetDeviceType() == kGPU) {
workers_num_ = 1;
return context;
} else if (device->GetDeviceType() == kCPU) {
auto cpu_context = device->Cast<CPUDeviceInfo>();
auto enable_fp16 = cpu_context->GetEnableFP16();
if (enable_fp16) {
MS_LOG(ERROR) << "model pool not support enable fp16.";
return nullptr;
}
if (runner_config->workers_num == 0) {
// the user did not set the number of workers, so the default optimal number is used
auto status = SetDefaultOptimalModelNum(context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetDefaultOptimalModelNum failed.";
return nullptr;
}
} else {
// User defined number of models
workers_num_ = runner_config->workers_num;
}
} else {
MS_LOG(ERROR) << "not support device: " << device->GetDeviceType();
return nullptr;
}
}
return context;
}
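// Behaviour sketch (illustrative): a user context with a single CPUDeviceInfo and fp16
// disabled keeps runner_config->workers_num when it is non-zero and otherwise falls back
// to SetDefaultOptimalModelNum; a GPU device in the list pins workers_num_ to 1.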
std::shared_ptr<Context> ModelPool::InitContext(const std::shared_ptr<RunnerConfig> &runner_config) {
@ -99,44 +181,36 @@ std::shared_ptr<Context> ModelPool::InitContext(const std::shared_ptr<RunnerConf
return nullptr;
}
if (runner_config != nullptr) {
model_context = runner_config->context;
auto device_list = model_context->MutableDeviceInfo();
if (device_list.size() != 1) {
MS_LOG(ERROR) << "model pool only support device num 1.";
return nullptr;
}
auto device = device_list.front();
if (device->GetDeviceType() != kCPU && device->GetDeviceType() != kGPU) {
MS_LOG(ERROR) << "model pool only support cpu or gpu type.";
return nullptr;
}
auto cpu_context = device->Cast<CPUDeviceInfo>();
auto enable_fp16 = cpu_context->GetEnableFP16();
if (enable_fp16) {
MS_LOG(ERROR) << "model pool not support enable fp16.";
return nullptr;
}
if (device->GetDeviceType() == kGPU) {
num_models_ = 1;
} else if (runner_config->workers_num == 0) {
num_models_ = GetCoreNum() / static_cast<int>(model_context->GetThreadNum());
} else {
num_models_ = runner_config->workers_num;
}
use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available() &&
runner_config->context->GetThreadAffinityMode() == lite::HIGHER_CPU;
numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum();
model_context = InitUserDefineContext(runner_config);
} else {
MS_LOG(DEBUG) << "use default config.";
model_context->SetThreadNum(kNumThreads);
model_context->SetEnableParallel(true);
model_context->SetThreadAffinity(lite::HIGHER_CPU);
auto &device_list = model_context->MutableDeviceInfo();
auto device_info = std::make_shared<CPUDeviceInfo>();
device_info->SetEnableFP16(false);
device_list.push_back(device_info);
use_numa_bind_mode_ = numa::NUMAAdapter::GetInstance()->Available();
numa_node_num_ = numa::NUMAAdapter::GetInstance()->NodesNum();
auto status = InitDefaultContext(model_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "use default context failed.";
return nullptr;
}
}
return model_context;
}
Status ModelPool::SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list,
std::shared_ptr<Context> model_context) {
if (numa::NUMAAdapter::GetInstance()->Available()) {
auto status = SetNumaBindStrategy(all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
if (status != kSuccess) {
MS_LOG(ERROR) << "SetNumaBindStrategy failed.";
return kLiteError;
}
} else {
SetBindStrategy(all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
}
return kSuccess;
}
ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr<RunnerConfig> &runner_config) {
auto model_context = InitContext(runner_config);
if (model_context == nullptr) {
@ -147,27 +221,24 @@ ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr<RunnerConfig
MS_LOG(ERROR) << "Invalid thread num " << model_context->GetThreadNum();
return {};
}
int node_id = -1;
if (numa::NUMAAdapter::GetInstance()->Available()) {
node_id = 0;
num_models_ =
numa::NUMAAdapter::GetInstance()->GetCPUList(node_id).size() / static_cast<int>(model_context->GetThreadNum());
} else {
num_models_ = GetCoreNum() / static_cast<int>(model_context->GetThreadNum());
auto device_num = model_context->MutableDeviceInfo().size();
if (device_num > 1) {
used_numa_node_num_ = 1;
return {model_context};
}
ModelPoolContex model_pool_context;
std::vector<std::vector<int>> all_model_bind_list;
if (model_context->GetThreadAffinityMode() == lite::HIGHER_CPU) {
if (numa::NUMAAdapter::GetInstance()->Available()) {
SetNumaBindStrategy(&all_model_bind_list, static_cast<int>(model_context->GetThreadNum()), node_id);
} else {
SetBindStrategy(&all_model_bind_list, static_cast<int>(model_context->GetThreadNum()));
auto status = SetModelBindMode(&all_model_bind_list, model_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetModelBindMode failed.";
return {};
}
} else if (model_context->GetThreadAffinityMode() == lite::MID_CPU) {
MS_LOG(ERROR) << "not support bind MID_CPU.";
return {};
}
for (size_t i = 0; i < num_models_; i++) {
for (size_t i = 0; i < workers_num_; i++) {
auto context = std::make_shared<Context>();
if (context == nullptr) {
MS_LOG(ERROR) << "New Context failed.";
@ -184,6 +255,10 @@ ModelPoolContex ModelPool::CreateModelContext(const std::shared_ptr<RunnerConfig
}
auto &new_device_list = context->MutableDeviceInfo();
std::shared_ptr<CPUDeviceInfo> device_info = std::make_shared<CPUDeviceInfo>();
if (device_info == nullptr) {
MS_LOG(ERROR) << "device_info is nullptr.";
return {};
}
device_info->SetEnableFP16(false);
new_device_list.push_back(device_info);
model_pool_context.push_back(context);
@ -207,13 +282,22 @@ std::vector<MSTensor> ModelPool::GetOutputs() {
return model_outputs_;
}
Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config,
const Key &dec_key, const std::string &dec_mode) {
Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
predict_task_queue_ = std::make_shared<PredictTaskQueue>();
if (predict_task_queue_ == nullptr) {
MS_LOG(ERROR) << "create PredictTaskQueue failed, predict task queue is nullptr.";
return kLiteNullptr;
}
auto model_pool_context = CreateModelContext(runner_config);
if (model_pool_context.empty()) {
MS_LOG(ERROR) << "CreateModelContext failed, context is empty.";
return kLiteError;
}
if (use_numa_bind_mode_) {
predict_task_queue_->SetTaskQueueNum(used_numa_node_num_);
} else {
predict_task_queue_->SetTaskQueueNum(1);
}
size_t size = 0;
if (graph_buf_ != nullptr) {
delete[] graph_buf_;
@ -229,23 +313,32 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
MS_LOG(ERROR) << "InitWeightManagerByBuf failed.";
return kLiteError;
}
std::shared_ptr<ModelThread> model_thread = nullptr;
int node_id = -1;
if (numa::NUMAAdapter::GetInstance()->Available()) {
node_id = 0;
}
for (size_t i = 0; i < num_models_; i++) {
model_thread = std::make_shared<ModelThread>();
auto status = model_thread->Init(graph_buf_, size, model_pool_context[i], dec_key, dec_mode, node_id);
std::shared_ptr<ModelWorker> model_worker = nullptr;
for (size_t i = 0; i < workers_num_; i++) {
int numa_node_id = 0;
if (use_numa_bind_mode_ && GetCoreNum() / model_pool_context[i]->GetThreadNum() < numa_node_num_) {
numa_node_id = i;
} else if (use_numa_bind_mode_ && numa_node_num_ != 0) {
numa_node_id = i / (GetCoreNum() / model_pool_context[i]->GetThreadNum() / numa_node_num_);
} else {
numa_node_id = 0;
}
model_worker = std::make_shared<ModelWorker>();
if (model_worker == nullptr) {
MS_LOG(ERROR) << "model worker is nullptr.";
return kLiteError;
}
auto status = model_worker->Init(graph_buf_, size, model_pool_context[i], numa_node_id);
if (status != kSuccess) {
MS_LOG(ERROR) << " model thread init failed.";
return kLiteError;
}
model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread));
predict_task_queue_->IncreaseWaitModelNum(1, numa_node_id);
model_worker_vec_.push_back(std::thread(&ModelWorker::Run, model_worker, numa_node_id, predict_task_queue_));
}
if (model_thread != nullptr) {
model_inputs_ = model_thread->GetInputs();
model_outputs_ = model_thread->GetOutputs();
if (model_worker != nullptr) {
model_inputs_ = model_worker->GetInputs();
model_outputs_ = model_worker->GetOutputs();
}
return kSuccess;
}
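// Numa assignment in the loop above, with assumed numbers: 16 cores, a thread num of 4
// and two numa nodes give GetCoreNum() / thread_num / numa_node_num_ = 2 workers per
// node, so workers 0 and 1 run on node 0 and workers 2 and 3 on node 1.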
@ -418,14 +511,30 @@ Status ModelPool::FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs
return kSuccess;
}
void ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num) {
*max_wait_worker_node_id = 0;
*max_wait_worker_num = predict_task_queue_->GetWaitModelNum(0);
for (int i = 1; i < used_numa_node_num_; i++) {
int worker_num = predict_task_queue_->GetWaitModelNum(i);
if (*max_wait_worker_num < worker_num) {
*max_wait_worker_num = worker_num;
*max_wait_worker_node_id = i;
}
}
}
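// Example with hypothetical counts: wait counts {1, 3} across two task queues select
// node 1 with max_wait_worker_num = 3.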
Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after) {
mtx_split_task_.lock();
auto wait_model_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
int max_wait_worker_node_id = 0;
int max_wait_worker_num = 0;
GetMaxWaitWorkerNum(&max_wait_worker_node_id, &max_wait_worker_num);
auto batch = inputs[0].Shape()[0];
if (PredictTaskQueue::GetInstance()->GetTaskNum() == 0 && wait_model_num > 1 && batch >= wait_model_num) {
size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num);
if (predict_task_queue_->GetTaskNum(max_wait_worker_node_id) == 0 && max_wait_worker_num > 1 &&
batch >= max_wait_worker_num) {
size_t batch_split_num = predict_task_queue_->GetWaitModelNum(max_wait_worker_node_id);
predict_task_queue_->DecreaseWaitModelNum(batch_split_num, max_wait_worker_node_id);
std::vector<std::vector<MSTensor>> new_inputs;
std::vector<std::vector<MSTensor>> new_outputs;
auto status = SplitInputTensorByBatch(inputs, &new_inputs, batch_split_num);
@ -442,12 +551,16 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
std::vector<std::shared_ptr<PredictTask>> tasks;
for (size_t i = 0; i < batch_split_num; i++) {
auto predict_task = std::make_shared<PredictTask>(&new_inputs[i], &new_outputs.at(i), before, after);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
if (predict_task == nullptr) {
MS_LOG(ERROR) << "predict task is nullptr.";
return kLiteNullptr;
}
predict_task_queue_->PushPredictTask(predict_task, max_wait_worker_node_id);
tasks.push_back(predict_task);
}
mtx_split_task_.unlock();
for (size_t i = 0; i < batch_split_num; i++) {
PredictTaskQueue::GetInstance()->WaitUntilPredictActive(tasks[i]);
predict_task_queue_->WaitUntilPredictActive(tasks[i]);
}
status = ConcatPredictOutput(&new_outputs, outputs);
if (status != kSuccess) {
@ -459,27 +572,32 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
MS_LOG(ERROR) << "free split tensor failed.";
return kLiteError;
}
predict_task_queue_->IncreaseWaitModelNum(batch_split_num, max_wait_worker_node_id);
} else {
if (wait_model_num == 1) {
PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1);
}
predict_task_queue_->DecreaseWaitModelNum(1, max_wait_worker_node_id);
auto predict_task = std::make_shared<PredictTask>(&inputs, outputs, before, after);
PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
if (predict_task == nullptr) {
MS_LOG(ERROR) << "predict_task is nullptr.";
return kLiteNullptr;
}
predict_task_queue_->PushPredictTask(predict_task, max_wait_worker_node_id);
mtx_split_task_.unlock();
PredictTaskQueue::GetInstance()->WaitUntilPredictActive(predict_task);
predict_task_queue_->WaitUntilPredictActive(predict_task);
predict_task_queue_->IncreaseWaitModelNum(1, max_wait_worker_node_id);
}
return kSuccess;
}
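// Flow sketch for the split branch above (assumed numbers): a batch of 8 with 4 idle
// workers on the chosen node is split into 4 sub-batches (2 samples each, assuming an
// even split), queued as 4 tasks on that node, waited on, concatenated back into the
// caller's outputs, and the 4 wait slots are then returned to the queue.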
ModelPool::~ModelPool() {
if (graph_buf_ != nullptr) {
delete[] graph_buf_;
graph_buf_ = nullptr;
}
for (auto &th : model_thread_vec_) {
predict_task_queue_->SetPredictTaskDone();
for (auto &th : model_worker_vec_) {
if (th.joinable()) {
th.join();
}
}
if (graph_buf_ != nullptr) {
delete[] graph_buf_;
graph_buf_ = nullptr;
}
}
} // namespace mindspore

View File

@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H
#define MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H
#ifndef MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_POOL_H_
#define MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_POOL_H_
#include <vector>
#include <memory>
#include <utility>
@ -27,13 +27,15 @@
#include "src/cxx_api/model_pool/model_worker.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
using ModelPoolContex = std::vector<std::shared_ptr<Context>>;
class ModelPool {
public:
static ModelPool *GetInstance();
ModelPool() = default;
~ModelPool();
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr,
const Key &dec_key = {}, const std::string &dec_mode = kDecModeAesGcm);
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr);
std::vector<MSTensor> GetInputs();
@ -43,10 +45,17 @@ class ModelPool {
const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr);
private:
ModelPool() = default;
void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
ModelPoolContex CreateModelContext(const std::shared_ptr<RunnerConfig> &runner_config);
std::shared_ptr<Context> InitContext(const std::shared_ptr<RunnerConfig> &runner_config);
Status InitDefaultContext(const std::shared_ptr<mindspore::Context> &context);
std::shared_ptr<Context> InitUserDefineContext(const std::shared_ptr<RunnerConfig> &runner_config);
Status SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Context> &context);
Status SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list, std::shared_ptr<Context> model_context);
Status SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
Status SplitInputTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<std::vector<MSTensor>> *new_inputs,
size_t batch_split_num);
Status SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
@ -54,14 +63,21 @@ class ModelPool {
Status ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs);
Status FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs,
std::vector<std::vector<MSTensor>> *new_outputs);
void GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num);
std::vector<std::thread> model_thread_vec_;
std::vector<std::thread> model_worker_vec_;
std::vector<std::shared_ptr<ModelWorker>> model_workers_;
std::vector<MSTensor> model_inputs_;
std::vector<MSTensor> model_outputs_;
char *graph_buf_ = nullptr;
size_t num_models_ = 10;
size_t workers_num_ = 1;
std::mutex mtx_split_task_;
bool is_user_data_ = false;
int numa_node_num_ = 1;
int used_numa_node_num_ = 0;
bool use_numa_bind_mode_ = false;
bool use_gpu_ = false;
std::shared_ptr<PredictTaskQueue> predict_task_queue_ = nullptr;
};
} // namespace mindspore
#endif // MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H
#endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_POOL_H_

View File

@ -17,11 +17,10 @@
#include "src/common/log.h"
#include "src/common/utils.h"
#include "src/common/common.h"
namespace mindspore {
void ModelThread::Run() {
while (!PredictTaskQueue::GetInstance()->IsPredictTaskDone()) {
auto task = PredictTaskQueue::GetInstance()->GetPredictTask();
void ModelWorker::Run(int node_id, const std::shared_ptr<PredictTaskQueue> &predict_task_queue) {
while (!predict_task_queue->IsPredictTaskDone()) {
auto task = predict_task_queue->GetPredictTask(node_id);
if (task == nullptr) {
break;
}
@ -33,10 +32,10 @@ void ModelThread::Run() {
if (status != kSuccess) {
MS_LOG(ERROR) << "model predict failed.";
task->ready = true;
PredictTaskQueue::GetInstance()->ActiveTask();
predict_task_queue->ActiveTask();
continue;
}
if (is_copy_output_) {
if (need_copy_output_) {
std::vector<MSTensor> new_outputs;
auto output_size = outputs->size();
for (size_t i = 0; i < output_size; i++) {
@ -46,7 +45,7 @@ void ModelThread::Run() {
if (copy_tensor == nullptr) {
MS_LOG(ERROR) << "model thread copy output tensor failed.";
task->ready = true;
PredictTaskQueue::GetInstance()->ActiveTask();
predict_task_queue->ActiveTask();
continue;
}
new_outputs.push_back(*copy_tensor);
@ -56,18 +55,18 @@ void ModelThread::Run() {
outputs->insert(outputs->end(), new_outputs.begin(), new_outputs.end());
}
task->ready = true;
PredictTaskQueue::GetInstance()->ActiveTask();
predict_task_queue->ActiveTask();
}
}
Status ModelThread::Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context,
const Key &dec_key, const std::string &dec_mode, int node_id) {
Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context,
int node_id) {
model_ = std::make_shared<Model>();
mindspore::ModelType model_type = kMindIR;
if (node_id != -1) {
model_->UpdateConfig(lite::kConfigServerInference, {lite::kConfigNUMANodeId, std::to_string(node_id)});
}
auto status = model_->Build(model_buf, size, model_type, model_context, dec_key, dec_mode);
auto status = model_->Build(model_buf, size, model_type, model_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "model build failed in ModelPool Init";
return status;
@ -75,25 +74,25 @@ Status ModelThread::Init(const char *model_buf, size_t size, const std::shared_p
return kSuccess;
}
std::vector<MSTensor> ModelThread::GetInputs() {
std::vector<MSTensor> ModelWorker::GetInputs() {
if (model_ == nullptr) {
MS_LOG(ERROR) << "model is nullptr in ModelThread.";
MS_LOG(ERROR) << "model is nullptr in model worker.";
return {};
}
auto inputs = model_->GetInputs();
return inputs;
}
std::vector<MSTensor> ModelThread::GetOutputs() {
std::vector<MSTensor> ModelWorker::GetOutputs() {
if (model_ == nullptr) {
MS_LOG(ERROR) << "model is nullptr in ModelThread.";
MS_LOG(ERROR) << "model is nullptr in model worker.";
return {};
}
auto outputs = model_->GetOutputs();
return outputs;
}
std::pair<std::vector<std::vector<int64_t>>, bool> ModelThread::GetModelResize(
std::pair<std::vector<std::vector<int64_t>>, bool> ModelWorker::GetModelResize(
const std::vector<MSTensor> &model_inputs, const std::vector<MSTensor> &inputs) {
std::unique_lock<std::mutex> model_lock(mtx_model_);
std::vector<std::vector<int64_t>> dims;
@ -109,9 +108,8 @@ std::pair<std::vector<std::vector<int64_t>>, bool> ModelThread::GetModelResize(
return std::make_pair(dims, need_resize);
}
Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
Status ModelWorker::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after) {
// model
auto model_input = model_->GetInputs();
if (model_input.size() != inputs.size()) {
MS_LOG(ERROR) << "model input size is: " << model_input.size() << ", but get input size is: " << inputs.size();
@ -132,7 +130,7 @@ Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MST
/* user set graph-output-tensor from outside */
model_output[i].SetData(outputs->at(i).MutableData());
model_output[i].SetAllocator(nullptr);
is_copy_output_ = false;
need_copy_output_ = false;
}
}
auto status = model_->Predict(inputs, &model_output, before, after);
@ -140,7 +138,7 @@ Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MST
MS_LOG(ERROR) << "model predict failed.";
return status;
}
if (is_copy_output_) {
if (need_copy_output_) {
outputs->clear();
outputs->insert(outputs->end(), model_output.begin(), model_output.end());
} else {

View File

@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_THREAD_H_
#define MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_THREAD_H_
#ifndef MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_WORKER_H_
#define MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_WORKER_H_
#include <queue>
#include <string>
#include <mutex>
@ -26,17 +26,13 @@
#include "include/api/model.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
using ModelPoolContex = std::vector<std::shared_ptr<Context>>;
class ModelThread {
class ModelWorker {
public:
ModelThread() = default;
ModelWorker() = default;
~ModelThread() = default;
~ModelWorker() = default;
// a model worker is initialized once and can then keep accepting predict requests
Status Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context,
const Key &dec_key = {}, const std::string &dec_mode = kDecModeAesGcm, int node_id = -1);
Status Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context, int node_id = -1);
std::vector<MSTensor> GetInputs();
@ -45,7 +41,7 @@ class ModelThread {
Status Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before = nullptr, const MSKernelCallBack &after = nullptr);
void Run();
void Run(int node_id, const std::shared_ptr<PredictTaskQueue> &predict_task_queue);
private:
std::pair<std::vector<std::vector<int64_t>>, bool> GetModelResize(const std::vector<MSTensor> &model_inputs,
@ -54,11 +50,7 @@ class ModelThread {
private:
std::shared_ptr<mindspore::Model> model_ = nullptr;
std::mutex mtx_model_;
std::condition_variable model_cond_;
// num thread is configured according to the hardware
int num_models_;
bool is_copy_output_ = true;
bool need_copy_output_ = true;
};
} // namespace mindspore
#endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_THREAD_H_
#endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_MODEL_WORKER_H_

View File

@ -16,12 +16,17 @@
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
PredictTaskQueue::~PredictTaskQueue() {
void PredictTaskQueue::SetPredictTaskDone() {
predict_task_done_ = true;
task_push_cond_.notify_all();
}
void PredictTaskQueue::WaitUntilPredictActive(std::shared_ptr<PredictTask> task) {
void PredictTaskQueue::SetTaskQueueNum(int num) {
predict_task_.resize(num);
waite_worker_num_.resize(num, 0);
}
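// With SetTaskQueueNum(n) the queue keeps one FIFO and one wait counter per numa node;
// a worker bound to node k only pops tasks from predict_task_.at(k).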
void PredictTaskQueue::WaitUntilPredictActive(const std::shared_ptr<PredictTask> &task) {
std::unique_lock<std::mutex> result_lock(mtx_predict_task_);
while (!task->ready) {
task_pop_cond_.wait(result_lock);
@ -31,33 +36,27 @@ void PredictTaskQueue::WaitUntilPredictActive(std::shared_ptr<PredictTask> task)
void PredictTaskQueue::ActiveTask() { task_pop_cond_.notify_all(); }
PredictTaskQueue *PredictTaskQueue::GetInstance() {
static PredictTaskQueue instance;
return &instance;
void PredictTaskQueue::PushPredictTask(std::shared_ptr<PredictTask> task, int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
predict_task_.at(node_id).push(task);
task_push_cond_.notify_all();
}
void PredictTaskQueue::PushPredictTask(std::shared_ptr<PredictTask> task) {
std::shared_ptr<PredictTask> PredictTaskQueue::GetPredictTask(int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
predict_task_.push(task);
task_push_cond_.notify_one();
}
std::shared_ptr<PredictTask> PredictTaskQueue::GetPredictTask() {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
while (predict_task_.empty() && !predict_task_done_) {
waite_model_num_++;
while (predict_task_.at(node_id).empty() && !predict_task_done_) {
task_push_cond_.wait(task_lock);
}
if (predict_task_done_) {
return nullptr;
}
auto predict_task = predict_task_.front();
predict_task_.pop();
auto predict_task = predict_task_.at(node_id).front();
predict_task_.at(node_id).pop();
return predict_task;
}
int PredictTaskQueue::GetTaskNum() {
int PredictTaskQueue::GetTaskNum(int node_id) {
std::unique_lock<std::mutex> task_lock(mtx_predict_task_);
return predict_task_.size();
return predict_task_.at(node_id).size();
}
} // namespace mindspore

View File

@ -37,23 +37,25 @@ struct PredictTask {
class PredictTaskQueue {
public:
static PredictTaskQueue *GetInstance();
~PredictTaskQueue();
PredictTaskQueue() = default;
~PredictTaskQueue() = default;
void PushPredictTask(std::shared_ptr<PredictTask> task);
void WaitUntilPredictActive(std::shared_ptr<PredictTask> task);
std::shared_ptr<PredictTask> GetPredictTask();
void PushPredictTask(std::shared_ptr<PredictTask> task, int node_id);
void WaitUntilPredictActive(const std::shared_ptr<PredictTask> &task);
std::shared_ptr<PredictTask> GetPredictTask(int node_id);
void ActiveTask();
bool IsPredictTaskDone() { return predict_task_done_; }
int GetTaskNum();
int GetWaitModelNum() { return waite_model_num_; }
void DecreaseWaitModelNum(int num) { waite_model_num_ -= num; }
int GetTaskNum(int node_id);
void SetTaskQueueNum(int num);
bool IsPredictTaskDone() const { return predict_task_done_; }
void SetPredictTaskDone();
int GetWaitModelNum(int node_id) const { return waite_worker_num_.at(node_id); }
void DecreaseWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) -= num; }
void IncreaseWaitModelNum(int num, int node_id) { waite_worker_num_.at(node_id) += num; }
private:
PredictTaskQueue() = default;
std::queue<std::shared_ptr<PredictTask>> predict_task_;
int waite_model_num_ = 0;
std::vector<std::queue<std::shared_ptr<PredictTask>>> predict_task_;
std::vector<int> waite_worker_num_;
std::mutex mtx_predict_task_;
std::condition_variable task_pop_cond_;
std::condition_variable task_push_cond_;

View File

@ -13,13 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef SERVER_INFERENCE
#include "src/pack_weight_manager.h"
namespace mindspore::lite {
namespace {
constexpr size_t kMemAliginSize = 64;
constexpr size_t kMemAlignSize = 64;
size_t RoundMemSize(size_t size) { return (size + kMemAliginSize - 1) & (~(kMemAliginSize - 1)); }
size_t RoundMemSize(size_t size) { return (size + kMemAlignSize - 1) & (~(kMemAlignSize - 1)); }
} // namespace
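// e.g. with the 64-byte alignment above, RoundMemSize(100) returns 128 and
// RoundMemSize(64) returns 64.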
PackWeightManager *PackWeightManager::GetInstance() {
static PackWeightManager instance;
@ -120,9 +119,9 @@ std::pair<PackStatus, void *> PackWeightManager::FindPackedTensor(ModelConstWeig
auto origin_index = weight->origin_data_index[tensor->data()];
void *data = nullptr;
#ifdef _WIN32
data = _aligned_malloc(allocate_size, kMemAlginSize);
data = _aligned_malloc(size, kMemAlignSize);
#else
auto ret = posix_memalign(&data, kMemAliginSize, size);
auto ret = posix_memalign(&data, kMemAlignSize, size);
if (ret != 0) {
MS_LOG(ERROR) << "posix_memalign failed.";
return std::make_pair(MALLOC, nullptr);
@ -198,12 +197,11 @@ void PackWeightManager::FreePackedWeight(ModelConstWeight *weight) {
PackWeightManager::~PackWeightManager() {
for (auto &item : path_model_weight_) {
FreePackedWeight(item.second);
path_model_weight_.erase(item.first);
}
path_model_weight_.clear();
for (auto &item : buf_model_weight_) {
FreePackedWeight(item.second);
buf_model_weight_.erase(item.first);
}
buf_model_weight_.clear();
}
} // namespace mindspore::lite
#endif

View File

@ -57,7 +57,7 @@ class PackWeightManager {
void FreePackedWeight(ModelConstWeight *weight);
std::map<const std::string, ModelConstWeight *> path_model_weight_;
std::map<const std::string, ModelConstWeight *> buf_model_weight_;
std::map<const void *, ModelConstWeight *> buf_model_weight_;
std::map<const std::string, std::vector<const void *>> path_model_buf_;
std::mutex mtx_weight_;
};