add ConfigInfo for C++ API

yefeng 2022-05-10 09:36:22 +08:00
parent 605943ecd0
commit eb05387052
7 changed files with 89 additions and 19 deletions

include/api/model_parallel_runner.h View File

@@ -18,6 +18,7 @@
#include <vector>
#include <memory>
#include <utility>
#include <map>
#include <string>
#include "include/api/status.h"
#include "include/api/context.h"
@@ -40,6 +41,16 @@ class RunnerConfig {
/// \param[in] context A context that stores the environment variables used at runtime.
void SetContext(const std::shared_ptr<Context> &context);
/// \brief Set the configuration before runtime. Only valid for ModelParallelRunner.
///
/// \param[in] key The section name of the configuration.
/// \param[in] config A key-value map with the configuration items of that section.
void SetConfigInfo(const std::string &key, const std::map<std::string, std::string> &config);
/// \brief Get the current configuration. Only valid for ModelParallelRunner.
///
/// \return The current configuration as a section -> (key -> value) map.
std::map<std::string, std::map<std::string, std::string>> GetConfigInfo() const;
/// \brief Get the configured number of parallel workers. Only valid for ModelParallelRunner.
///
/// \return The configured number of parallel workers.
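
Together, the two new accessors give a set/get round trip on a RunnerConfig. A minimal sketch against the header above; the section and key names are illustrative only, not defined by this commit:

#include <map>
#include <memory>
#include <string>
#include "include/api/model_parallel_runner.h"

int main() {
  auto runner_config = std::make_shared<mindspore::RunnerConfig>();
  // "cpu_option"/"thread_num" are placeholder names for illustration.
  runner_config->SetConfigInfo("cpu_option", {{"thread_num", "4"}});
  // GetConfigInfo returns the whole section -> (key -> value) map.
  auto info = runner_config->GetConfigInfo();
  return info.count("cpu_option") == 1 ? 0 : 1;
}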

src/cxx_api/model_pool/model_parallel_runner.cc View File

@@ -19,6 +19,10 @@
#include "src/common/log_adapter.h"
#include "src/cpu_info.h"
namespace mindspore {
namespace {
constexpr size_t kMaxSectionNum = 100;
constexpr size_t kMaxConfigNumPerSection = 1000;
} // namespace
#ifdef USE_GLOG
extern "C" {
extern void mindspore_log_init();
@@ -35,7 +39,25 @@ int32_t RunnerConfig::GetWorkersNum() const { return data_->workers_num; }
std::shared_ptr<Context> RunnerConfig::GetContext() const { return data_->context; }
void RunnerConfig::SetConfigInfo(const std::string &section, const std::map<std::string, std::string> &config) {
if (data_->config_info.size() >= kMaxSectionNum) {
MS_LOG(WARNING) << "config section num exceeds the max section num: " << kMaxSectionNum;
return;
}
if (config.size() > kMaxConfigNumPerSection) {
MS_LOG(WARNING) << "config num in section " << section << " exceeds the max num per section: " << kMaxConfigNumPerSection;
return;
}
data_->config_info[section] = config;
}
std::map<std::string, std::map<std::string, std::string>> RunnerConfig::GetConfigInfo() const {
return data_->config_info;
}
Status ModelParallelRunner::Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config) {
#ifdef USE_GLOG
mindspore::mindspore_log_init();
#endif
if (!PlatformInstructionSetSupportCheck()) {
return kLiteNotSupport;
}
@@ -44,9 +66,6 @@ Status ModelParallelRunner::Init(const std::string &model_path, const std::share
MS_LOG(ERROR) << "model pool is nullptr.";
return kLiteNullptr;
}
#ifdef USE_GLOG
mindspore::mindspore_log_init();
#endif
auto status = model_pool_->Init(model_path, runner_config);
if (status != kSuccess) {
MS_LOG(ERROR) << "model runner init failed.";

src/cxx_api/model_pool/model_pool.cc View File

@@ -307,6 +307,9 @@ ModelPoolConfig ModelPool::CreateModelPoolConfig(const std::shared_ptr<RunnerCon
device_info->SetAllocator(allocator);
device_info->SetEnableFP16(false);
new_device_list.push_back(device_info);
if (runner_config != nullptr) {
worker_config->config_info = runner_config->GetConfigInfo();
}
worker_config->context = context;
model_pool_config.push_back(worker_config);
}
@@ -398,8 +401,7 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
int task_queue_id = numa_node_id != -1 ? numa_node_id : 0;
predict_task_queue_->IncreaseWaitModelNum(1, task_queue_id);
worker_thread_vec_.push_back(std::thread(&ModelWorker::CreateThreadWorker, model_worker, new_model_buf, size,
task_queue_id, model_pool_config[i]->context, predict_task_queue_,
&create_worker_success_));
model_pool_config[i], predict_task_queue_, &create_worker_success_));
all_model_worker_.push_back(model_worker);
}
for (size_t i = 0; i < workers_num_; i++) {
@@ -422,6 +424,17 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
return kSuccess;
}
Status ModelPool::UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config) {
for (auto &worker : all_model_worker_) {
auto status = worker->UpdateConfig(section, config);
if (status != kSuccess) {
MS_LOG(ERROR) << "model pool update config failed, status=" << status;
return status;
}
}
return kSuccess;
}
Status ModelPool::SplitInputTensorByBatch(const std::vector<MSTensor> &inputs,
std::vector<std::vector<MSTensor>> *new_inputs, size_t batch_split_num) {
if (batch_split_num == 0) {
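
ModelPool::UpdateConfig fans a single key-value change out to every live worker and stops at the first failure. A hedged usage sketch at the pool level; the section and key names are placeholders, not defined by this commit:

#include <string>
#include <utility>
#include "src/cxx_api/model_pool/model_pool.h"

// Sketch only: push one runtime config change to all workers of an initialized pool.
mindspore::Status UpdatePoolConfig(mindspore::ModelPool *pool) {
  auto item = std::make_pair(std::string("thread_affinity"), std::string("mid_core"));
  return pool->UpdateConfig("execution", item);
}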

src/cxx_api/model_pool/model_pool.h View File

@@ -29,10 +29,6 @@
#include "src/cxx_api/model_pool/model_worker.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
struct WorkerConfig {
std::shared_ptr<Context> context = nullptr;
int numa_id = 0;
};
using ModelPoolConfig = std::vector<std::shared_ptr<WorkerConfig>>;
class ModelPool {
@@ -43,6 +39,8 @@ class ModelPool {
Status Init(const std::string &model_path, const std::shared_ptr<RunnerConfig> &runner_config = nullptr);
Status UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config);
std::vector<MSTensor> GetInputs();
std::vector<MSTensor> GetOutputs();

src/cxx_api/model_pool/model_worker.cc View File

@@ -35,18 +35,20 @@ void ModelWorker::WaitCreateWorkerDone() {
return;
}
void ModelWorker::CreateThreadWorker(const char *model_buf, size_t size, int node_id,
const std::shared_ptr<Context> &model_context,
void ModelWorker::CreateThreadWorker(const char *model_buf, size_t size,
const std::shared_ptr<WorkerConfig> &worker_config,
const std::shared_ptr<PredictTaskQueue> &predict_task_queue,
bool *create_success) {
auto status = Init(model_buf, size, model_context);
auto status = Init(model_buf, size, worker_config);
if (status != kSuccess) {
MS_LOG(ERROR) << "init failed in model worker.";
*create_success = false;
create_work_done_ = true;
create_work_done_condition_.notify_one();
// Bail out instead of entering the run loop with a partially initialized worker.
return;
}
Run(node_id, predict_task_queue);
auto numa_node_id = worker_config->numa_id;
int task_queue_id = numa_node_id != -1 ? numa_node_id : 0;
Run(task_queue_id, predict_task_queue);
}
void ModelWorker::Run(int node_id, const std::shared_ptr<PredictTaskQueue> &predict_task_queue) {
@@ -106,16 +108,25 @@ Status ModelWorker::ResizeInit() {
return kSuccess;
}
Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context) {
Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config) {
MS_CHECK_TRUE_MSG(model_buf != nullptr, kLiteError, "model_buf is nullptr in model worker.");
MS_CHECK_TRUE_MSG(model_context != nullptr, kLiteError, "model_context is nullptr in model worker.");
MS_CHECK_TRUE_MSG(worker_config != nullptr, kLiteError, "worker_config is nullptr in model worker.");
model_ = std::make_shared<Model>();
if (model_ == nullptr) {
MS_LOG(ERROR) << "model is nullptr.";
return kLiteNullptr;
}
mindspore::ModelType model_type = kMindIR_Lite;
auto status = model_->Build(model_buf, size, model_type, model_context);
for (auto &section : worker_config->config_info) {
for (auto &config : section.second) {
auto status = model_->UpdateConfig(section.first, std::make_pair(config.first, config.second));
if (status != kSuccess) {
MS_LOG(ERROR) << "Update Config failed, status=" << status;
return status;
}
}
}
auto status = model_->Build(model_buf, size, model_type, worker_config->context);
if (status != kSuccess) {
MS_LOG(ERROR) << "model build failed in ModelPool Init";
return status;
@@ -136,6 +147,12 @@ Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_p
return kSuccess;
}
Status ModelWorker::UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config) {
std::lock_guard<std::mutex> worker_lock(mtx_worker_);
MS_LOG(DEBUG) << "UpdateConfig now.";
return model_->UpdateConfig(section, config);
}
std::vector<MSTensor> ModelWorker::GetInputs() { return origin_worker_inputs_; }
std::vector<MSTensor> ModelWorker::GetOutputs() { return origin_worker_outputs_; }
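
The nested loop in Init turns the section -> (key -> value) map into one UpdateConfig call per item before Build. The same traversal in isolation, as a generic standalone sketch:

#include <map>
#include <string>

using ConfigInfo = std::map<std::string, std::map<std::string, std::string>>;

// Invoke apply(section, key, value) once per item, in the order ModelWorker::Init applies them.
template <typename Fn>
void ForEachConfigItem(const ConfigInfo &config_info, Fn &&apply) {
  for (const auto &section : config_info) {
    for (const auto &item : section.second) {
      apply(section.first, item.first, item.second);
    }
  }
}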

src/cxx_api/model_pool/model_worker.h View File

@@ -23,17 +23,27 @@
#include <vector>
#include <utility>
#include <memory>
#include <map>
#include "include/api/model.h"
#include "src/cxx_api/model_pool/predict_task_queue.h"
namespace mindspore {
class PredictTaskQueue;
struct WorkerConfig {
std::map<std::string, std::map<std::string, std::string>> config_info;
std::shared_ptr<Context> context = nullptr;
int numa_id = 0;
};
class ModelWorker {
public:
ModelWorker() = default;
~ModelWorker() = default;
Status Init(const char *model_buf, size_t size, const std::shared_ptr<Context> &model_context);
Status Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config);
Status UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config);
std::vector<MSTensor> GetInputs();
@@ -46,8 +56,7 @@ class ModelWorker {
bool IsAvailable();
void CreateThreadWorker(const char *model_buf, size_t size, int node_id,
const std::shared_ptr<Context> &model_context,
void CreateThreadWorker(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config,
const std::shared_ptr<PredictTaskQueue> &predict_task_queue, bool *create_success);
private:
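
With WorkerConfig now carrying the per-worker config alongside the context and NUMA id, assembling one mirrors what ModelPool::CreateModelPoolConfig does. A sketch with placeholder values:

#include <memory>
#include "include/api/context.h"
#include "src/cxx_api/model_pool/model_worker.h"

// Sketch only: build a WorkerConfig the way the pool does.
std::shared_ptr<mindspore::WorkerConfig> MakeWorkerConfig(
    const std::shared_ptr<mindspore::Context> &context, int numa_id) {
  auto worker_config = std::make_shared<mindspore::WorkerConfig>();
  worker_config->context = context;
  worker_config->numa_id = numa_id;  // -1 maps to task queue 0 in the worker
  // Placeholder section/key, not defined by this commit.
  worker_config->config_info = {{"common", {{"enable_parallel", "true"}}}};
  return worker_config;
}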

src/cxx_api/model_pool/runner_config.h View File

@@ -16,11 +16,14 @@
#ifndef MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_RUNNER_CONFIG_H_
#define MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_RUNNER_CONFIG_H_
#include <memory>
#include <string>
#include <map>
#include "include/api/model_parallel_runner.h"
namespace mindspore {
struct RunnerConfig::Data {
int workers_num = 0;
std::shared_ptr<Context> context = nullptr;
std::map<std::string, std::map<std::string, std::string>> config_info;
};
} // namespace mindspore
#endif // MINDSPORE_LITE_SRC_CXX_API_MODEL_POOL_RUNNER_CONFIG_H_
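
config_info mirrors the section/key/value layout of an INI-style config file. A standalone sketch printing that correspondence; the contents are illustrative only:

#include <iostream>
#include <map>
#include <string>

int main() {
  // Same nested shape as RunnerConfig::Data::config_info.
  std::map<std::string, std::map<std::string, std::string>> config_info = {
      {"common", {{"enable_parallel", "true"}}},
      {"cpu_option", {{"thread_num", "4"}}},
  };
  for (const auto &section : config_info) {
    std::cout << "[" << section.first << "]\n";  // section header
    for (const auto &item : section.second) {
      std::cout << item.first << "=" << item.second << "\n";
    }
  }
  return 0;
}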