!30021 [MS][LITE] model pool fix bug
Merge pull request !30021 from yefeng/227-fix_split_batch_and_getoutput_api-01
Commit: 5d898f5789
@@ -173,6 +173,10 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
   for (size_t i = 0; i < num_models_; i++) {
     model_thread = std::make_shared<ModelThread>();
     auto status = model_thread->Init(model_path, model_pool_context[i], dec_key, dec_mode);
+    if (status != kSuccess) {
+      MS_LOG(ERROR) << " model thread init failed.";
+      return kLiteError;
+    }
     model_thread_vec_.push_back(std::thread(&ModelThread::Run, model_thread));
   }
   if (model_thread != nullptr) {
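The four added lines make ModelPool::Init fail as soon as any worker reports an initialization error, instead of pushing a run thread for a worker that never initialized. A minimal standalone sketch of that fail-fast pattern; Worker, Status, and InitPool here are illustrative stand-ins, not the MindSpore Lite types:

#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

enum class Status { kSuccess, kError };  // stand-in for mindspore::Status

struct Worker {  // stand-in for ModelThread
  Status Init(const std::string &model_path) {
    // Pretend initialization fails for an empty path.
    return model_path.empty() ? Status::kError : Status::kSuccess;
  }
  void Run() { /* worker loop would pull tasks from a queue here */ }
};

Status InitPool(const std::string &model_path, size_t num_workers,
                std::vector<std::thread> *threads) {
  for (size_t i = 0; i < num_workers; i++) {
    auto worker = std::make_shared<Worker>();
    if (worker->Init(model_path) != Status::kSuccess) {
      std::cerr << "worker " << i << " init failed\n";
      return Status::kError;  // stop early; the caller joins the threads already started
    }
    threads->emplace_back(&Worker::Run, worker);
  }
  return Status::kSuccess;
}

int main() {
  std::vector<std::thread> threads;
  Status s = InitPool("model.ms", 2, &threads);
  for (auto &t : threads) {
    t.join();
  }
  return s == Status::kSuccess ? 0 : 1;
}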
@@ -182,20 +186,24 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
   return kSuccess;
 }
 
-Status ModelPool::SplitTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
-                                     std::vector<std::vector<MSTensor>> *new_inputs) {
+Status ModelPool::SplitInputTensorByBatch(const std::vector<MSTensor> &inputs,
+                                          std::vector<std::vector<MSTensor>> *new_inputs, size_t batch_split_num) {
+  if (batch_split_num == 0) {
+    MS_LOG(ERROR) << "batch_split_num is zero.";
+    return kLiteError;
+  }
   auto batch = inputs[0].Shape()[0];
   std::vector<size_t> split_batch;
   size_t batch_sum = 0;
-  size_t per_batch = batch / batch_split_num_;
-  for (size_t i = 0; i < batch_split_num_ - 1; i++) {
+  size_t per_batch = batch / batch_split_num;
+  for (size_t i = 0; i < batch_split_num - 1; i++) {
     split_batch.push_back(per_batch);
     batch_sum += per_batch;
   }
   split_batch.push_back(batch - batch_sum);
   std::vector<std::vector<std::vector<int64_t>>> all_input_shape;
   std::vector<size_t> input_data_split_size(inputs.size(), 0);
-  for (size_t k = 0; k < batch_split_num_; k++) { // do for batch
+  for (size_t k = 0; k < batch_split_num; k++) { // do for batch
     std::vector<std::vector<int64_t>> inputs_shape;
     std::vector<MSTensor> new_inputs_tensor;
     for (size_t i = 0; i < inputs.size(); i++) { // do for input
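SplitInputTensorByBatch (renamed from SplitTensorByBatch) carves the leading batch dimension into batch_split_num chunks: the first n - 1 chunks get batch / n rows each and the last chunk absorbs the remainder, so no rows are dropped when the batch does not divide evenly. A small self-contained sketch of just that arithmetic; the batch >= n guard mirrors the new batch >= wait_model_num check added in Predict, and the function name is illustrative:

#include <cstddef>
#include <iostream>
#include <vector>

// Split `batch` rows into `n` chunks: n - 1 chunks of batch / n rows,
// plus a final chunk that takes whatever is left over.
std::vector<size_t> SplitBatch(size_t batch, size_t n) {
  std::vector<size_t> split;
  if (n == 0 || batch < n) {
    return split;  // caller should fall back to a single, un-split request
  }
  size_t per_batch = batch / n;
  size_t batch_sum = 0;
  for (size_t i = 0; i < n - 1; i++) {
    split.push_back(per_batch);
    batch_sum += per_batch;
  }
  split.push_back(batch - batch_sum);  // the last chunk gets the remainder
  return split;
}

int main() {
  for (size_t s : SplitBatch(10, 4)) {
    std::cout << s << " ";  // prints: 2 2 2 4
  }
  std::cout << "\n";
  return 0;
}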
@@ -262,6 +270,29 @@ Status ModelPool::SplitTensorByBatch(const std::vector<MSTensor> &inputs, std::v
   return kSuccess;
 }
 
+Status ModelPool::SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *new_outputs,
+                                           std::vector<MSTensor> *outputs, size_t batch_split_num) {
+  if (batch_split_num == 0) {
+    MS_LOG(ERROR) << "batch_split_num is zero.";
+    return kLiteError;
+  }
+  for (size_t i = 0; i < batch_split_num; i++) {
+    std::vector<MSTensor> new_output;
+    for (size_t tensor_num_idx = 0; tensor_num_idx < outputs->size(); tensor_num_idx++) {
+      if (outputs->at(tensor_num_idx).MutableData() != nullptr && outputs->at(tensor_num_idx).DataSize() != 0) {
+        is_user_data_ = true;
+        auto data = reinterpret_cast<float *>(outputs->at(tensor_num_idx).MutableData()) +
+                    outputs->at(tensor_num_idx).Shape().at(0) / batch_split_num * i;
+        auto out_tensor =
+          MSTensor(outputs->at(tensor_num_idx).Name(), outputs->at(tensor_num_idx).DataType(), {}, data, 0);
+        new_output.push_back(out_tensor);
+      }
+    }
+    new_outputs->push_back(new_output);
+  }
+  return kSuccess;
+}
+
 Status ModelPool::ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs) {
   if (outputs->empty()) {
     MS_LOG(ERROR) << "output is empty";
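The new SplitOutputTensorByBatch covers the case where the caller pre-allocated output buffers: it sets is_user_data_ and wraps each chunk in an MSTensor whose data pointer is offset into the caller's buffer by shape[0] / batch_split_num elements per chunk (the commit's pointer arithmetic assumes float data). A generic sketch of that zero-copy slicing, with an illustrative View struct instead of MSTensor and an extra row_size factor added here for rows with more than one element:

#include <cstddef>
#include <vector>

// A non-owning view into a slice of a caller-owned float buffer.
struct View {
  float *data;
  size_t rows;
};

// Hand each of the n chunks a pointer offset into the same buffer,
// mirroring the offset computed in SplitOutputTensorByBatch.
std::vector<View> SliceOutputBuffer(float *user_data, size_t total_rows,
                                    size_t row_size, size_t n) {
  std::vector<View> views;
  if (n == 0) {
    return views;
  }
  size_t rows_per_chunk = total_rows / n;
  for (size_t i = 0; i < n; i++) {
    size_t rows = (i + 1 == n) ? total_rows - rows_per_chunk * i : rows_per_chunk;
    views.push_back({user_data + rows_per_chunk * i * row_size, rows});
  }
  return views;
}

int main() {
  std::vector<float> out(4 * 3);  // 4 rows, 3 values per row, caller-owned
  auto views = SliceOutputBuffer(out.data(), 4, 3, 2);
  return views.size() == 2 ? 0 : 1;  // two views of 2 rows each
}

Because these views alias memory the caller owns, the Predict path later resets their data pointers with SetData(nullptr), so destroying the temporary per-chunk tensors never frees the caller's buffer.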
@@ -282,18 +313,19 @@ Status ModelPool::ConcatPredictOutput(std::vector<std::vector<MSTensor>> *output
       all_batch_size += outputs->at(batch).at(i).Shape().front();
     }
     output_tensor_shape[0] = all_batch_size;
-    if (all_out_data != nullptr) {
-      free(all_out_data);
-      all_out_data = nullptr;
+    if (is_user_data_) {
+      new_outputs->at(i).SetShape(output_tensor_shape);
+      continue;
     }
-    all_out_data = malloc(all_data_size);
+    auto all_out_data = malloc(all_data_size);
     if (all_out_data == nullptr) {
       MS_LOG(ERROR) << "all_out_data is nullptr.";
       return kLiteError;
     }
-    for (size_t j = 0; j < batch_split_num_; j++) {
+    for (size_t j = 0; j < outputs->size(); j++) {
      void *out_data = outputs->at(j)[i].MutableData();
      if (out_data == nullptr) {
+        free(all_out_data);
        MS_LOG(ERROR) << "output data is nullptr.";
        return kLiteError;
      }
@@ -306,6 +338,10 @@ Status ModelPool::ConcatPredictOutput(std::vector<std::vector<MSTensor>> *output
       MS_LOG(ERROR) << "create tensor failed.";
       return kLiteError;
     }
+    if (all_out_data != nullptr) {
+      free(all_out_data);
+      all_out_data = nullptr;
+    }
     new_outputs->push_back(*new_tensor);
   }
   return kSuccess;
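ConcatPredictOutput now skips the copy entirely when is_user_data_ is set (the chunks already landed in the caller's buffer, so only the merged shape needs updating), and otherwise allocates a local all_out_data buffer that is freed on the error path and again right after the merged tensor is created, replacing the old class member. The copy loop itself lies outside the visible hunks, but the merge has roughly the following shape; the helper is illustrative, float data is assumed, and std::vector stands in for the malloc/free pair:

#include <cstddef>
#include <cstring>
#include <vector>

// Merge per-chunk results for one logical output tensor into a single
// contiguous buffer. Each chunk is a flat float array of chunk_sizes[k] elements.
std::vector<float> ConcatChunks(const std::vector<const float *> &chunks,
                                const std::vector<size_t> &chunk_sizes) {
  size_t total = 0;
  for (size_t s : chunk_sizes) {
    total += s;
  }
  std::vector<float> merged(total);
  size_t offset = 0;
  for (size_t k = 0; k < chunks.size(); k++) {
    std::memcpy(merged.data() + offset, chunks[k], chunk_sizes[k] * sizeof(float));
    offset += chunk_sizes[k];
  }
  return merged;
}

int main() {
  float a[] = {1, 2};
  float b[] = {3, 4, 5};
  auto merged = ConcatChunks({a, b}, {2, 3});
  return merged.size() == 5 ? 0 : 1;  // {1, 2, 3, 4, 5}
}

An RAII buffer like std::vector would also remove the need to free the scratch buffer on every early return, which is exactly the bookkeeping the hunks above have to add by hand.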
@@ -314,28 +350,32 @@ Status ModelPool::ConcatPredictOutput(std::vector<std::vector<MSTensor>> *output
 Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
                           const MSKernelCallBack &before, const MSKernelCallBack &after) {
   mtx_split_task_.lock();
-  if (PredictTaskQueue::GetInstance()->GetTaskNum() == 0 && PredictTaskQueue::GetInstance()->GetWaitModelNum() > 1) {
-    batch_split_num_ = PredictTaskQueue::GetInstance()->GetWaitModelNum();
-    PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num_);
+  auto wait_model_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
+  auto batch = inputs[0].Shape()[0];
+  if (PredictTaskQueue::GetInstance()->GetTaskNum() == 0 && wait_model_num > 1 && batch >= wait_model_num) {
+    size_t batch_split_num = PredictTaskQueue::GetInstance()->GetWaitModelNum();
+    PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(batch_split_num);
     std::vector<std::vector<MSTensor>> new_inputs;
     std::vector<std::vector<MSTensor>> new_outputs;
-    auto status = SplitTensorByBatch(inputs, outputs, &new_inputs);
+    auto status = SplitInputTensorByBatch(inputs, &new_inputs, batch_split_num);
     if (status != kSuccess) {
-      MS_LOG(ERROR) << "model pool predict failed.";
+      MS_LOG(ERROR) << "model pool split input tensor by batch failed.";
       return kLiteError;
     }
-    for (size_t i = 0; i < batch_split_num_; i++) {
-      std::vector<MSTensor> new_output;
-      new_outputs.push_back(new_output);
+    status = SplitOutputTensorByBatch(&new_outputs, outputs, batch_split_num);
+    if (status != kSuccess) {
+      MS_LOG(ERROR) << "model pool split output tensor by batch failed.";
+      return kLiteError;
     }
+
     std::vector<std::shared_ptr<PredictTask>> tasks;
-    for (size_t i = 0; i < batch_split_num_; i++) {
-      auto predict_task = std::make_shared<PredictTask>(&new_inputs[i], &new_outputs[i], before, after);
+    for (size_t i = 0; i < batch_split_num; i++) {
+      auto predict_task = std::make_shared<PredictTask>(&new_inputs[i], &new_outputs.at(i), before, after);
       PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
       tasks.push_back(predict_task);
     }
     mtx_split_task_.unlock();
-    for (size_t i = 0; i < batch_split_num_; i++) {
+    for (size_t i = 0; i < batch_split_num; i++) {
       PredictTaskQueue::GetInstance()->WaitUntilPredictActive(tasks[i]);
     }
     status = ConcatPredictOutput(&new_outputs, outputs);
@@ -343,7 +383,17 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
       MS_LOG(ERROR) << "ConcatPredictOutput failed.";
       return kLiteError;
     }
+    if (is_user_data_) {
+      for (size_t i = 0; i < batch_split_num; i++) {
+        for (size_t j = 0; j < new_outputs.size(); j++) {
+          new_outputs.at(i).at(j).SetData(nullptr);
+        }
+      }
+    }
   } else {
+    if (wait_model_num == 1) {
+      PredictTaskQueue::GetInstance()->DecreaseWaitModelNum(1);
+    }
     auto predict_task = std::make_shared<PredictTask>(&inputs, outputs, before, after);
     PredictTaskQueue::GetInstance()->PushPredictTask(predict_task);
     mtx_split_task_.unlock();
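The rewritten Predict path only fans a request out across workers when three conditions hold at once, and this commit adds the third: the batch must be at least as large as the number of waiting workers, so every sub-request gets a non-empty slice. A one-function sketch of that decision; the function name is illustrative, not the real API:

#include <cstddef>

// Mirrors the condition in ModelPool::Predict after this commit:
// split only when the queue is idle, more than one worker is waiting,
// and every waiting worker can receive at least one row of the batch.
bool ShouldSplitBatch(size_t queued_tasks, size_t waiting_workers, size_t batch) {
  return queued_tasks == 0 && waiting_workers > 1 && batch >= waiting_workers;
}

int main() {
  // A batch of 3 rows is not split across 4 waiting workers.
  return ShouldSplitBatch(0, 4, 3) ? 1 : 0;
}

After ConcatPredictOutput returns, the is_user_data_ branch clears every borrowed data pointer with SetData(nullptr), so destroying the temporary per-chunk tensors cannot free memory the caller still owns; the single-request branch now also decrements the wait counter when exactly one worker is idle.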
@@ -358,9 +408,5 @@ ModelPool::~ModelPool() {
       th.join();
     }
   }
-  if (all_out_data != nullptr) {
-    free(all_out_data);
-    all_out_data = nullptr;
-  }
 }
 } // namespace mindspore
@@ -47,17 +47,18 @@ class ModelPool {
   void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, int thread_num);
   ModelPoolContex CreateModelContext(const std::shared_ptr<RunnerConfig> &runner_config);
   std::shared_ptr<Context> InitContext(const std::shared_ptr<RunnerConfig> &runner_config);
-  Status SplitTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
-                            std::vector<std::vector<MSTensor>> *new_inputs);
+  Status SplitInputTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<std::vector<MSTensor>> *new_inputs,
+                                 size_t batch_split_num);
+  Status SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
+                                  size_t batch_split_num);
   Status ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs);
 
-  void *all_out_data = nullptr;
   std::vector<std::thread> model_thread_vec_;
   std::vector<MSTensor> model_inputs_;
   std::vector<MSTensor> model_outputs_;
   size_t num_models_ = 10;
-  size_t batch_split_num_ = 4;
   std::mutex mtx_split_task_;
+  bool is_user_data_ = false;
 };
 } // namespace mindspore
 #endif // MINDSPORE_INCLUDE_API_MODEL_POOL_MODEL_POOL_H
@@ -127,11 +127,22 @@ Status ModelThread::Predict(const std::vector<MSTensor> &inputs, std::vector<MST
       is_copy_output_ = false;
     }
   }
-  auto status = model_->Predict(inputs, outputs, before, after);
+  auto status = model_->Predict(inputs, &model_output, before, after);
   if (status != kSuccess) {
     MS_LOG(ERROR) << "model predict failed.";
     return status;
   }
+  if (is_copy_output_) {
+    outputs->clear();
+    outputs->insert(outputs->end(), model_output.begin(), model_output.end());
+  } else {
+    model_output = model_->GetOutputs();
+    for (size_t i = 0; i < outputs->size(); i++) {
+      outputs->at(i).SetShape(model_output[i].Shape());
+      model_output[i].SetData(nullptr);
+      model_output[i].SetAllocator(nullptr);
+    }
+  }
   return kSuccess;
 }
 } // namespace mindspore
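ModelThread::Predict now runs the model into a local model_output vector. When the pool has to copy (is_copy_output_), the caller's vector is simply replaced with the model's output tensors; otherwise the results already sit in caller-provided buffers, so the worker only propagates the produced shapes back and detaches the data pointer and allocator from the model's own output tensors so they cannot free the caller's memory. A rough sketch of that hand-back step with an illustrative TensorRef type, where a shared_ptr stands in for the data pointer/allocator pair:

#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Illustrative handle: `data` plays the role of MSTensor's data pointer.
struct TensorRef {
  std::shared_ptr<std::vector<float>> data;
  std::vector<int64_t> shape;
};

// After the model wrote straight into caller-provided buffers, copy the
// produced shapes back to the caller's tensors and drop the worker-side
// reference (analogous to SetData(nullptr) / SetAllocator(nullptr)).
void HandBackOutputs(std::vector<TensorRef> *caller_outputs,
                     std::vector<TensorRef> *model_outputs) {
  for (size_t i = 0; i < caller_outputs->size() && i < model_outputs->size(); i++) {
    caller_outputs->at(i).shape = model_outputs->at(i).shape;
    model_outputs->at(i).data.reset();
  }
}

int main() {
  auto buffer = std::make_shared<std::vector<float>>(6);
  std::vector<TensorRef> caller = {{buffer, {0, 3}}};
  std::vector<TensorRef> model = {{buffer, {2, 3}}};
  HandBackOutputs(&caller, &model);
  // The caller keeps the buffer and the real shape; the worker side holds nothing.
  return (caller[0].shape[0] == 2 && model[0].data == nullptr) ? 0 : 1;
}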
@@ -949,6 +949,10 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
   auto model_init_end = GetTimeUs();
   // load data
   ms_inputs_for_api_ = model_pool.GetInputs();
+  if (ms_inputs_for_api_.empty()) {
+    MS_LOG(ERROR) << "model pool input is empty.";
+    return RET_ERROR;
+  }
   for (int i = 0; i < flags_->num_require_ + flags_->warm_up_loop_count_; i++) {
     auto status = LoadInput();
     if (status != RET_OK) {
@@ -980,7 +984,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
       MS_LOG(ERROR) << "model pool predict failed.";
     }
     auto predict_end = GetTimeUs();
-    MS_LOG(ERROR) << "run predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms";
+    std::cout << "run predict time: " << (predict_end - predict_start) / kFloatMSEC << " ms\n";
     if (!flags_->benchmark_data_file_.empty()) {
       auto status = CompareOutputForModelPool(&output);
       if (status != RET_OK) {
@@ -995,7 +999,7 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
   for (auto &warm_up_thread : model_thread_warm_up) {
     warm_up_thread.join();
   }
-  MS_LOG(DEBUG) << "================ end warm up ================";
+  std::cout << "================ end warm up ================";
   auto all_start = GetTimeUs();
   std::vector<std::thread> model_thread_run;
   for (int i = 0; i < flags_->num_require_; i++) {
@@ -1005,8 +1009,10 @@ int BenchmarkUnifiedApi::RunModelPool(std::shared_ptr<mindspore::Context> contex
     run_thread.join();
   }
   auto all_end = GetTimeUs();
+  std::cout << "=================================" << std::endl;
   std::cout << "model pool init time: " << (model_init_end - model_init_start) / kFloatMSEC << " ms\n";
   std::cout << "model pool all run time: " << (all_end - all_start) / kFloatMSEC << " ms\n";
+  std::cout << "=================================" << std::endl;
   return RET_OK;
 }
 #endif
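The benchmark reports its timings by dividing GetTimeUs() deltas by kFloatMSEC (both are the benchmark's own helpers) and now prints them with std::cout instead of the error log. For reference, the same wall-clock measurement using only the standard library, assuming nothing about those helpers:

#include <chrono>
#include <iostream>

int main() {
  auto start = std::chrono::steady_clock::now();
  double sink = 0.0;
  for (int i = 0; i < 1000000; i++) {
    sink = sink + i * 0.5;  // stand-in for the Predict call being timed
  }
  auto end = std::chrono::steady_clock::now();
  double ms = std::chrono::duration<double, std::milli>(end - start).count();
  std::cout << "run predict time: " << ms << " ms (sink=" << sink << ")\n";
  return 0;
}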