task queue

yefeng 2022-06-13 13:56:27 +08:00
parent 6d6327882f
commit 98ba08d1be
6 changed files with 84 additions and 75 deletions

View File

@@ -29,6 +29,9 @@ constexpr int kNumDeviceInfo = 2;
 constexpr int kNumIndex = 2;
 constexpr int kNumCoreDataLen = 3;
 constexpr int kNumMaxTaskQueueSize = 1000;
+constexpr int kNumPhysicalCoreThreshold = 32;
+constexpr int kDefaultWorkerNumPerPhysicalCpu = 4;
+constexpr int kDefaultThreadsNum = 8;
 int GetCoreNum() {
   int core_num = 1;
 #if defined(_MSC_VER) || defined(_WIN32)
@@ -94,6 +97,22 @@ Status DistinguishPhysicalAndLogical(std::vector<int> *physical_list, std::vecto
   }
   return kSuccess;
 }
+
+int GetDefaultThreadNum() {
+  std::vector<int> physical_core_list;
+  std::vector<int> logical_core_list;
+  auto status = DistinguishPhysicalAndLogical(&physical_core_list, &logical_core_list);
+  if (status != kSuccess) {
+    MS_LOG(ERROR) << "DistinguishPhysicalAndLogical failed.";
+    return 0;
+  }
+  auto physical_core_size = physical_core_list.size();
+  if (physical_core_size < kNumPhysicalCoreThreshold) {
+    return physical_core_size / kDefaultWorkerNumPerPhysicalCpu;
+  } else {
+    return kDefaultThreadsNum;
+  }
+}
 }  // namespace
 Status ModelPool::DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
@@ -248,6 +267,12 @@ Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Con
 std::shared_ptr<mindspore::Context> ModelPool::GetDefaultContext() {
   MS_LOG(DEBUG) << "use default config.";
   auto context = std::make_shared<Context>();
+  auto thread_num = GetDefaultThreadNum();
+  if (thread_num == 0) {
+    MS_LOG(ERROR) << "compute thread num failed.";
+    return nullptr;
+  }
+  context->SetThreadNum(thread_num);
   auto &device_list = context->MutableDeviceInfo();
   auto device_info = std::make_shared<CPUDeviceInfo>();
   if (device_info == nullptr) {
@@ -545,6 +570,7 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
     MS_LOG(ERROR) << "model pool config size is wrong.";
     return kLiteError;
   }
+  bool create_worker_success = true;
   for (size_t i = 0; i < workers_num_; i++) {
     int numa_node_id = model_pool_config[i]->numa_id;
     auto ret = lite::PackWeightManager::GetInstance()->InitPackWeight(graph_buf, size, numa_node_id);
@@ -559,17 +585,23 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
     int task_queue_id = numa_node_id != -1 ? numa_node_id : 0;
     predict_task_queue_->IncreaseWaitModelNum(1, task_queue_id);
     worker_thread_vec_.push_back(std::thread(&ModelWorker::CreateThreadWorker, model_worker, new_model_buf, size,
-                                             model_pool_config[i], predict_task_queue_, &create_worker_success_));
-    all_model_worker_.insert(std::make_pair(model_worker, task_queue_id));
+                                             model_pool_config[i], predict_task_queue_, &create_worker_success));
+    if (all_model_workers_.find(task_queue_id) != all_model_workers_.end()) {
+      all_model_workers_[task_queue_id].push_back(model_worker);
+    } else {
+      all_model_workers_[task_queue_id] = {model_worker};
+    }
   }
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    worker->WaitCreateWorkerDone();
-    if (!create_worker_success_) {
-      MS_LOG(ERROR) << "init failed.";
-      return kLiteError;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      worker->WaitCreateWorkerDone();
+      if (!create_worker_success) {
+        MS_LOG(ERROR) << "worker init failed.";
+        return kLiteError;
+      }
     }
   }
   // init model pool input and output
   if (model_worker != nullptr) {
     model_pool_inputs_ = model_worker->GetInputs();
@@ -591,14 +623,16 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
 }
 Status ModelPool::UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config) {
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    auto status = worker->UpdateConfig(section, config);
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "model pool update config failed, status=" << status;
-      return status;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      auto status = worker->UpdateConfig(section, config);
+      if (status != kSuccess) {
+        MS_LOG(ERROR) << "model pool update config failed, status=" << status;
+        return status;
+      }
     }
   }
   return kSuccess;
 }
@@ -780,12 +814,12 @@ std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker
     }
   }
   if (*max_wait_worker_num > 0 && !use_split_batch_) {
-    for (auto &item : all_model_worker_) {
-      auto &worker = item.first;
-      auto numa_id = item.second;
+    auto &workers = all_model_workers_[*max_wait_worker_node_id];
+    auto task_queue_id = *max_wait_worker_node_id;
+    for (auto &worker : workers) {
       if (worker->IsAvailable()) {
-        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(numa_id);
-        *max_wait_worker_node_id = numa_id;
+        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(task_queue_id);
+        *max_wait_worker_node_id = task_queue_id;
         return worker;
       }
     }
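
Editor's note: the thread-number heuristic this file introduces is easy to check in isolation. Below is a minimal standalone sketch, not part of the commit; DefaultThreadNumFor is a hypothetical helper that takes the physical-core count directly instead of probing it via DistinguishPhysicalAndLogical.

#include <cstdio>

// Constants mirror those added in this commit.
constexpr int kNumPhysicalCoreThreshold = 32;
constexpr int kDefaultWorkerNumPerPhysicalCpu = 4;
constexpr int kDefaultThreadsNum = 8;

// Hypothetical helper: same arithmetic as GetDefaultThreadNum(), but the
// physical-core count is passed in instead of probed from the system.
int DefaultThreadNumFor(int physical_core_size) {
  if (physical_core_size < kNumPhysicalCoreThreshold) {
    return physical_core_size / kDefaultWorkerNumPerPhysicalCpu;
  }
  return kDefaultThreadsNum;
}

int main() {
  printf("%d\n", DefaultThreadNumFor(8));   // 8 cores  -> 2 threads
  printf("%d\n", DefaultThreadNumFor(64));  // 64 cores -> capped at 8
  printf("%d\n", DefaultThreadNumFor(2));   // 2 cores  -> 0
  return 0;
}

Note the integer division: with fewer physical cores than kDefaultWorkerNumPerPhysicalCpu the heuristic returns 0, which GetDefaultContext() treats as a failure and rejects with a nullptr context.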

View File

@@ -88,25 +88,37 @@ class ModelPool {
   Status DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
                                              const std::vector<int> &logical_core_list);
-  std::vector<std::vector<int>> numa_physical_cores_;
-  std::vector<std::vector<int>> numa_logical_cores_;
+ private:
+  // different workers get tasks from different task queues.
+  // currently, task queues are distinguished by NUMA node number;
+  // if NUMA nodes are not distinguished, the default task queue id is 0.
+  // task queue id <=> workers: sort workers by performance.
+  std::unordered_map<int, std::vector<std::shared_ptr<ModelWorker>>> all_model_workers_;
+  // save all worker threads
   std::vector<std::thread> worker_thread_vec_;
+  std::mutex predict_task_mutex_;
   std::vector<MSTensor> model_pool_inputs_;
   std::vector<MSTensor> model_pool_outputs_;
   size_t workers_num_ = 1;
-  std::mutex predict_task_mutex_;
-  bool is_user_data_ = false;
-  int numa_node_num_ = 1;
-  int used_numa_node_num_ = 0;
-  bool use_numa_bind_mode_ = false;
+  // create predict task
   std::shared_ptr<PredictTaskQueue> predict_task_queue_ = nullptr;
-  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
-  bool use_split_batch_ = false;
-  std::unordered_map<std::shared_ptr<ModelWorker>, int> all_model_worker_;
-  bool create_worker_success_ = true;
   PredictTask *tasks_ = nullptr;
   std::mutex task_id_mutex_;
   std::queue<size_t> free_tasks_id_;
+  // use numa
+  int numa_node_num_ = 1;
+  int used_numa_node_num_ = 0;
+  bool use_numa_bind_mode_ = false;
+  std::vector<std::vector<int>> numa_physical_cores_;
+  std::vector<std::vector<int>> numa_logical_cores_;
+  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
+  // split batch
+  bool use_split_batch_ = false;
+  bool is_user_data_ = false;  // used in split batch
 };
 }  // namespace mindspore
 #endif  // MINDSPORE_LITE_SRC_RUNTIME_CXX_API_MODEL_POOL_MODEL_POOL_H_
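
Editor's note: to make the reshaped bookkeeping concrete, the old worker -> queue-id map is replaced by a queue-id -> workers map, so one task queue (one NUMA node, or queue 0 without NUMA) can feed several workers. A minimal sketch of how the new container is filled and scanned, with ModelWorker stubbed out for illustration:

#include <initializer_list>
#include <memory>
#include <unordered_map>
#include <vector>

// Stub standing in for mindspore's ModelWorker.
struct ModelWorker {
  bool IsAvailable() const { return available; }
  bool available = true;
};

int main() {
  // task queue id -> workers bound to that queue (shape of all_model_workers_).
  std::unordered_map<int, std::vector<std::shared_ptr<ModelWorker>>> all_model_workers;
  // grouping as in ModelPool::Init (operator[] stands in for its explicit
  // find-or-insert): two workers land on queue 0, one on queue 1.
  for (int task_queue_id : {0, 0, 1}) {
    all_model_workers[task_queue_id].push_back(std::make_shared<ModelWorker>());
  }
  // lookup as in GetMaxWaitWorkerNum: scan only the chosen queue's workers
  // and hand the task to the first available one.
  for (auto &worker : all_model_workers[0]) {
    if (worker->IsAvailable()) {
      break;
    }
  }
  return 0;
}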

View File

@@ -78,42 +78,6 @@ void ModelWorker::Run(int node_id, const std::shared_ptr<PredictTaskQueue> &pred
   }
 }
-Status ModelWorker::ResizeInit() {
-  auto inputs = model_->GetInputs();
-  std::vector<std::vector<int64_t>> new_input_shape;
-  for (size_t input_idx = 0; input_idx < inputs.size(); input_idx++) {
-    new_input_shape.push_back(inputs[input_idx].Shape());
-    for (size_t i = 1; i < new_input_shape.size(); i++) {
-      if (new_input_shape[input_idx][i] == -1) {
-        return kSuccess;
-      }
-    }
-    if (new_input_shape[input_idx][0] == -1) {
-      // only support resize for batch dim
-      new_input_shape[input_idx][0] = kNumInitBatch;
-    } else {
-      // If the batch dimension is not -1, no resize processing is performed
-      return kSuccess;
-    }
-  }
-  auto status = model_->Resize(inputs, new_input_shape);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "model resize failed in init. ret=" << status;
-    return kLiteError;
-  }
-  inputs = model_->GetInputs();
-  for (auto &input : inputs) {
-    input.MutableData();
-  }
-  std::vector<MSTensor> out;
-  status = model_->Predict(inputs, &out);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "init resize failed. ret=" << status;
-    return kLiteError;
-  }
-  return kSuccess;
-}
 Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config) {
   MS_CHECK_TRUE_MSG(model_buf != nullptr, kLiteError, "model_buf is nullptr in model worker.");
   MS_CHECK_TRUE_MSG(worker_config != nullptr, kLiteError, "worker_config is nullptr in model worker.");
@@ -143,13 +107,6 @@ Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_p
     MS_LOG(ERROR) << "model worker get empty input/output.";
     return kLiteError;
   }
-  if (need_init_resize_) {
-    status = ResizeInit();
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "init resize failed. ret=" << status;
-      return kLiteError;
-    }
-  }
   return kSuccess;
 }

View File

@@ -64,12 +64,10 @@ class ModelWorker {
   std::pair<std::vector<std::vector<int64_t>>, bool> GetModelResize(const std::vector<MSTensor> &model_inputs,
                                                                     const std::vector<MSTensor> &inputs);
-  Status ResizeInit();
   Status CopyOutputTensor(std::vector<MSTensor> model_outputs, std::vector<MSTensor> *user_outputs);

 private:
-  bool need_init_resize_ = true;
   std::shared_ptr<mindspore::Model> model_ = nullptr;
   std::mutex mtx_worker_;
   std::atomic_bool available_ = true;

View File

@@ -104,6 +104,10 @@ STATUS PackWeight::ReplaceOriginTensorData(const char *model_buf, std::vector<Te
   if (model_weight->tensors_data.find(tensor_index) == model_weight->tensors_data.end()) {
     auto allocator = model_weight->allocator;
     void *new_data = allocator->Malloc(tensor->Size());
+    if (new_data == nullptr) {
+      MS_LOG(ERROR) << "allocator malloc data failed.";
+      return RET_ERROR;
+    }
     memcpy(new_data, tensor->data(), tensor->Size());
     MS_CHECK_TRUE_MSG(tensor->own_data(), RET_ERROR, "tensor data is not own data.");
     tensor->FreeData();

View File

@@ -996,6 +996,10 @@ int BenchmarkUnifiedApi::ParallelInference(std::shared_ptr<mindspore::Context> c
     all_outputs_.push_back(output);
   }
   if (!flags_->benchmark_data_file_.empty()) {
+    for (size_t i = 0; i < ms_inputs_for_api_.size(); i++) {
+      auto &tensor = ms_inputs_for_api_[i];
+      tensor.SetShape(resize_dims_[i]);
+    }
     status = PrintInputData();
     MS_CHECK_FALSE_MSG(status != RET_OK, status, "PrintInputData error ");
     status = ReadCalibData();