!42773 [MS][LITE][parallel predict] delete split batch

Merge pull request !42773 from yefeng/429-delete_split_batch
commit ad7fcaa580
i-robot · 2022-10-09 13:31:54 +00:00 · committed by Gitee
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 5 additions and 245 deletions
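
This change removes the split-batch path from ModelPool. Previously, when the split-batch option was enabled, more than one worker was waiting, and the input batch was at least that large, Predict sliced the inputs along the batch dimension, fanned the sub-requests out across workers, and concatenated the per-worker outputs; the SplitInputTensorByBatch, SplitOutputTensorByBatch, ConcatPredictOutput, FreeSplitTensor, and PredictBySplitBatch helpers that implemented this are deleted below. After the change every request is handled whole: it is either dispatched directly to an available worker or pushed onto the predict task queue. The following is a minimal, self-contained sketch of the dispatch flow that remains; Worker and ToyPool are made-up stand-ins rather than the real ModelWorker, ModelPool, and PredictTaskQueue classes, and where the real code enqueues a PredictTask and blocks in WaitUntilPredictActive, the sketch simply waits for a worker to become free and retries.

// Illustrative only: an approximation of the request dispatch that remains in
// ModelPool::Predict after this commit. All names here are hypothetical stand-ins.
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <vector>

struct Worker {
  bool available = true;
  int Predict(int request) { return request * 2; }  // placeholder for real inference
};

class ToyPool {
 public:
  explicit ToyPool(size_t worker_num) : workers_(worker_num) {}

  // Take the lock and look for an idle worker; if one is found, run the request on it
  // directly. Otherwise wait until a worker frees up (the real code instead pushes a
  // PredictTask onto a queue and blocks until a worker picks it up).
  int Predict(int request) {
    std::unique_lock<std::mutex> lock(mutex_);
    for (auto &worker : workers_) {
      if (worker.available) {
        worker.available = false;
        lock.unlock();
        int result = worker.Predict(request);
        lock.lock();
        worker.available = true;
        cv_.notify_all();
        return result;
      }
    }
    cv_.wait(lock, [this] {
      for (const auto &w : workers_) {
        if (w.available) return true;
      }
      return false;
    });
    lock.unlock();
    return Predict(request);  // retry now that at least one worker is idle
  }

 private:
  std::vector<Worker> workers_;
  std::mutex mutex_;
  std::condition_variable cv_;
};

int main() {
  ToyPool pool(2);
  std::cout << pool.Predict(21) << std::endl;  // prints 42
  return 0;
}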

[Changed file 1 of 2: ModelPool implementation]

@@ -973,174 +973,8 @@ Status ModelPool::UpdateConfig(const std::string &section, const std::pair<std::
return kSuccess;
}
Status ModelPool::SplitInputTensorByBatch(const std::vector<MSTensor> &inputs,
std::vector<std::vector<MSTensor>> *new_inputs, size_t batch_split_num) {
if (batch_split_num == 0) {
MS_LOG(ERROR) << "batch_split_num is zero.";
return kLiteError;
}
auto batch = inputs[0].Shape()[0];
std::vector<size_t> split_batch;
size_t batch_sum = 0;
size_t per_batch = batch / batch_split_num;
for (size_t i = 0; i < batch_split_num - 1; i++) {
split_batch.push_back(per_batch);
batch_sum += per_batch;
}
split_batch.push_back(batch - batch_sum);
std::vector<std::vector<std::vector<int64_t>>> all_input_shape;
std::vector<size_t> input_data_split_size(inputs.size(), 0);
for (size_t k = 0; k < batch_split_num; k++) { // do for batch
std::vector<std::vector<int64_t>> inputs_shape;
std::vector<MSTensor> new_inputs_tensor;
for (size_t i = 0; i < inputs.size(); i++) { // do for input
std::vector<int64_t> shape;
size_t input_size = split_batch[k];
shape.push_back(split_batch[k]);
for (size_t j = 1; j < inputs[i].Shape().size(); j++) { // do for dims
shape.push_back(inputs[i].Shape()[j]);
input_size *= inputs[i].Shape()[j];
}
inputs_shape.push_back(shape);
if (inputs[i].DataType() == static_cast<enum DataType>(kNumberTypeFloat32)) {
auto data =
reinterpret_cast<float *>(const_cast<MSTensor &>(inputs[i]).MutableData()) + input_data_split_size[i];
auto new_tensor = MSTensor(inputs[i].Name(), static_cast<enum DataType>(kNumberTypeFloat32), shape, data,
input_size * sizeof(float));
if (new_tensor == nullptr) {
MS_LOG(ERROR) << "create tensor failed.";
return kLiteError;
}
new_inputs_tensor.push_back(new_tensor);
input_data_split_size[i] += input_size;
} else if (inputs[i].DataType() == static_cast<enum DataType>(kNumberTypeInt32)) {
auto data =
reinterpret_cast<int32_t *>(const_cast<MSTensor &>(inputs[i]).MutableData()) + input_data_split_size[i];
auto new_tensor = MSTensor(inputs[i].Name(), static_cast<enum DataType>(kNumberTypeInt32), shape, data,
input_size * sizeof(int32_t));
if (new_tensor == nullptr) {
MS_LOG(ERROR) << "create tensor failed.";
return kLiteError;
}
new_inputs_tensor.push_back(new_tensor);
input_data_split_size[i] += input_size;
} else {
MS_LOG(ERROR) << "not support data type in split batch.";
return kLiteError;
}
}
new_inputs->push_back(new_inputs_tensor);
all_input_shape.push_back(inputs_shape);
}
return kSuccess;
}
Status ModelPool::SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *new_outputs,
std::vector<MSTensor> *outputs, size_t batch_split_num) {
if (batch_split_num == 0) {
MS_LOG(ERROR) << "batch_split_num is zero.";
return kLiteError;
}
for (size_t i = 0; i < batch_split_num; i++) {
std::vector<MSTensor> new_output;
for (size_t tensor_num_idx = 0; tensor_num_idx < outputs->size(); tensor_num_idx++) {
if (outputs->at(tensor_num_idx).MutableData() != nullptr && outputs->at(tensor_num_idx).DataSize() != 0) {
is_user_data_ = true;
auto data = reinterpret_cast<float *>(outputs->at(tensor_num_idx).MutableData()) +
outputs->at(tensor_num_idx).Shape().at(0) / batch_split_num * i;
auto out_tensor =
MSTensor(outputs->at(tensor_num_idx).Name(), outputs->at(tensor_num_idx).DataType(), {}, data, 0);
new_output.push_back(out_tensor);
}
}
new_outputs->push_back(new_output);
}
return kSuccess;
}
Status ModelPool::ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
int numa_id) {
if (outputs->empty()) {
MS_LOG(ERROR) << "output is empty";
return kLiteError;
}
for (size_t i = 0; i < outputs->at(0).size(); i++) {
std::vector<int64_t> output_tensor_shape = outputs->at(0)[i].Shape();
if (output_tensor_shape.empty()) {
MS_LOG(ERROR) << "output_tensor_shape is empty";
return kLiteError;
}
size_t all_data_size = 0;
size_t all_batch_size = 0;
std::vector<size_t> per_batch_data_size;
for (size_t batch = 0; batch < outputs->size(); batch++) {
per_batch_data_size.push_back(all_data_size);
all_data_size += outputs->at(batch).at(i).DataSize();
all_batch_size += outputs->at(batch).at(i).Shape().front();
}
output_tensor_shape[0] = all_batch_size;
if (is_user_data_) {
new_outputs->at(i).SetShape(output_tensor_shape);
continue;
}
if (all_data_size > MAX_MALLOC_SIZE || all_data_size == 0) {
MS_LOG(ERROR) << "malloc size is wrong.";
return kLiteError;
}
int numa_allocator_id = used_numa_node_num_ ? numa_id : -1;
auto all_out_data = numa_allocator_[numa_allocator_id]->Malloc(all_data_size);
if (all_out_data == nullptr) {
MS_LOG(ERROR) << "all_out_data is nullptr.";
return kLiteError;
}
for (size_t j = 0; j < outputs->size(); j++) {
void *out_data = outputs->at(j).at(i).MutableData();
if (out_data == nullptr) {
MS_LOG(ERROR) << "alloc addr: " << numa_allocator_[numa_allocator_id] << " numa id: " << numa_id;
numa_allocator_[numa_allocator_id]->Free(all_out_data);
all_out_data = nullptr;
MS_LOG(ERROR) << "output data is nullptr.";
return kLiteError;
}
memcpy(reinterpret_cast<float *>(all_out_data) + per_batch_data_size[j] / sizeof(float),
reinterpret_cast<float *>(out_data), outputs->at(j)[i].DataSize());
}
auto new_tensor = mindspore::MSTensor::CreateTensor(outputs->at(0)[i].Name(), outputs->at(0)[i].DataType(),
output_tensor_shape, all_out_data, all_data_size);
if (new_tensor == nullptr) {
MS_LOG(ERROR) << "create tensor failed.";
return kLiteError;
}
if (all_out_data != nullptr) {
numa_allocator_[numa_allocator_id]->Free(all_out_data);
all_out_data = nullptr;
}
new_outputs->push_back(*new_tensor);
delete new_tensor;
}
return kSuccess;
}
Status ModelPool::FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs,
std::vector<std::vector<MSTensor>> *new_outputs) {
for (size_t i = 0; i < new_inputs->size(); i++) {
for (size_t j = 0; j < new_inputs->at(i).size(); j++) {
new_inputs->at(i).at(j).SetData(nullptr);
}
}
new_inputs->clear();
if (is_user_data_) {
for (size_t i = 0; i < new_outputs->size(); i++) {
for (size_t j = 0; j < new_outputs->at(i).size(); j++) {
new_outputs->at(i).at(j).SetData(nullptr);
}
}
new_outputs->clear();
}
return kSuccess;
}
std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num) {
std::unique_lock<std::mutex> l(predict_task_mutex_);
*max_wait_worker_node_id = 0;
*max_wait_worker_num = predict_task_queue_->GetWaitModelNum(0);
for (size_t i = 1; i < used_numa_node_num_; i++) {
@@ -1150,13 +984,14 @@ std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker
*max_wait_worker_node_id = i;
}
}
if (*max_wait_worker_num > 0 && !use_split_batch_) {
if (*max_wait_worker_num > 0) {
auto &workers = all_model_workers_[*max_wait_worker_node_id];
auto task_queue_id = *max_wait_worker_node_id;
for (auto &worker : workers) {
if (worker->IsAvailable()) {
*max_wait_worker_num = predict_task_queue_->GetWaitModelNum(task_queue_id);
*max_wait_worker_node_id = task_queue_id;
predict_task_queue_->DecreaseWaitModelNum(1, *max_wait_worker_node_id);
return worker;
}
}
@@ -1164,52 +999,6 @@ std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker
return nullptr;
}
Status ModelPool::PredictBySplitBatch(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after,
int max_wait_worker_node_id) {
size_t batch_split_num = predict_task_queue_->GetWaitModelNum(max_wait_worker_node_id);
std::vector<std::vector<MSTensor>> new_inputs;
std::vector<std::vector<MSTensor>> new_outputs;
auto status = SplitInputTensorByBatch(inputs, &new_inputs, batch_split_num);
if (status != kSuccess) {
MS_LOG(ERROR) << "model pool split input tensor by batch failed.";
return kLiteError;
}
status = SplitOutputTensorByBatch(&new_outputs, outputs, batch_split_num);
if (status != kSuccess) {
MS_LOG(ERROR) << "model pool split output tensor by batch failed.";
return kLiteError;
}
std::vector<PredictTask *> tasks;
tasks.reserve(batch_split_num);
std::vector<size_t> tasks_id(batch_split_num);
for (size_t i = 0; i < batch_split_num; i++) {
auto task = CreatePredictTask(new_inputs[i], &new_outputs[i], before, after, &tasks_id[i]);
if (task == nullptr) {
return kLiteNullptr;
}
predict_task_queue_->PushPredictTask(task, max_wait_worker_node_id);
tasks.push_back(task);
}
predict_task_mutex_.unlock();
for (size_t i = 0; i < batch_split_num; i++) {
predict_task_queue_->WaitUntilPredictActive(tasks[i], max_wait_worker_node_id);
UpdateFreeTaskId(tasks_id[i]);
}
status = ConcatPredictOutput(&new_outputs, outputs, max_wait_worker_node_id);
if (status != kSuccess) {
MS_LOG(ERROR) << "ConcatPredictOutput failed.";
return kLiteError;
}
status = FreeSplitTensor(&new_inputs, &new_outputs);
if (status != kSuccess) {
MS_LOG(ERROR) << "free split tensor failed.";
return kLiteError;
}
return kSuccess;
}
PredictTask *ModelPool::CreatePredictTask(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after,
size_t *task_id) {
@@ -1237,29 +1026,15 @@ void ModelPool::UpdateFreeTaskId(size_t id) {
Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after) {
std::shared_lock<std::shared_mutex> l(model_pool_mutex_);
predict_task_mutex_.lock();
int max_wait_worker_node_id = 0;
int max_wait_worker_num = 0;
auto available_worker = GetMaxWaitWorkerNum(&max_wait_worker_node_id, &max_wait_worker_num);
if (inputs.size() == 0 || inputs.front().Shape().size() == 0) {
predict_task_mutex_.unlock();
MS_LOG(ERROR) << "inputs is invalid. input size: " << inputs.size();
return kLiteError;
}
auto batch = inputs[0].Shape()[0];
if (use_split_batch_ && max_wait_worker_num > 1 && batch >= max_wait_worker_num) {
// split batch
auto status = PredictBySplitBatch(inputs, outputs, before, after, max_wait_worker_node_id);
if (status != kSuccess) {
predict_task_mutex_.unlock();
MS_LOG(ERROR) << "do split batch failed. ret=" << status;
return kLiteError;
}
return kSuccess;
} else if (available_worker != nullptr) {
predict_task_queue_->DecreaseWaitModelNum(1, max_wait_worker_node_id);
auto available_worker = GetMaxWaitWorkerNum(&max_wait_worker_node_id, &max_wait_worker_num);
if (available_worker != nullptr) {
// dispatch tasks directly to workers
predict_task_mutex_.unlock();
auto ret = available_worker->Predict(inputs, outputs, before, after);
if (ret != kSuccess) {
MS_LOG(ERROR) << "direct predict failed.";
@@ -1273,11 +1048,9 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
auto task = CreatePredictTask(inputs, outputs, before, after, &task_id);
if (task == nullptr) {
MS_LOG(ERROR) << "The number of waiting tasks in the queue exceeds the limit, ret=" << kLiteServiceDeny;
predict_task_mutex_.unlock();
return kLiteServiceDeny;
}
predict_task_queue_->PushPredictTask(task, max_wait_worker_node_id);
predict_task_mutex_.unlock();
predict_task_queue_->WaitUntilPredictActive(task, max_wait_worker_node_id);
UpdateFreeTaskId(task_id);
}
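
The deleted helpers above formed the split-batch path: SplitInputTensorByBatch carved each input tensor into per-worker views along dimension 0 by offsetting into the original data buffer (no copy), SplitOutputTensorByBatch prepared matching output slices when the caller supplied output buffers, the sub-requests then ran as separate predict tasks, and ConcatPredictOutput copied the per-slice results back into one contiguous output before FreeSplitTensor detached and released the temporary views. The slicing step on its own works roughly as in the standalone sketch below; it uses a plain float buffer instead of MSTensor, and SubBatchView and SplitByBatch are made-up names for illustration.

// Illustrative sketch of the zero-copy batch slicing that the deleted
// SplitInputTensorByBatch performed, on a flat [batch, features] float buffer.
#include <cstddef>
#include <iostream>
#include <vector>

struct SubBatchView {
  float *data;      // points into the original buffer; nothing is copied
  size_t batch;     // number of rows in this slice
  size_t features;  // elements per row
};

std::vector<SubBatchView> SplitByBatch(float *data, size_t batch, size_t features,
                                       size_t split_num) {
  std::vector<SubBatchView> views;
  if (split_num == 0 || batch == 0) {
    return views;  // mirrors the batch_split_num == 0 check in the deleted code
  }
  size_t per_batch = batch / split_num;
  size_t offset = 0;
  for (size_t i = 0; i < split_num; ++i) {
    // The last slice absorbs the remainder, matching the deleted split_batch computation.
    size_t rows = (i + 1 == split_num) ? batch - per_batch * (split_num - 1) : per_batch;
    views.push_back({data + offset * features, rows, features});
    offset += rows;
  }
  return views;
}

int main() {
  std::vector<float> input(7 * 3);  // a [7, 3] tensor, flattened row-major
  for (size_t i = 0; i < input.size(); ++i) input[i] = static_cast<float>(i);
  auto views = SplitByBatch(input.data(), 7, 3, 3);  // slices of 2, 2 and 3 rows
  for (const auto &v : views) {
    std::cout << "rows=" << v.batch << " first=" << v.data[0] << std::endl;
  }
  return 0;
}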

[Changed file 2 of 2: ModelPool header]

@@ -77,20 +77,8 @@ class ModelPool {
Status SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, std::vector<int> *numa_node_id,
int thread_num);
Status SplitInputTensorByBatch(const std::vector<MSTensor> &inputs, std::vector<std::vector<MSTensor>> *new_inputs,
size_t batch_split_num);
Status SplitOutputTensorByBatch(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
size_t batch_split_num);
Status ConcatPredictOutput(std::vector<std::vector<MSTensor>> *outputs, std::vector<MSTensor> *new_outputs,
int numa_id);
Status FreeSplitTensor(std::vector<std::vector<MSTensor>> *new_inputs,
std::vector<std::vector<MSTensor>> *new_outputs);
std::shared_ptr<ModelWorker> GetMaxWaitWorkerNum(int *max_wait_worker_node_id, int *max_wait_worker_num);
Status PredictBySplitBatch(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after,
int max_wait_worker_node_id);
PredictTask *CreatePredictTask(const std::vector<MSTensor> &inputs, std::vector<MSTensor> *outputs,
const MSKernelCallBack &before, const MSKernelCallBack &after, size_t *task_id);
@@ -158,7 +146,6 @@ class ModelPool {
std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
// split batch
bool use_split_batch_ = false;
bool is_user_data_ = false;
bool can_use_all_physical_core_ = true;