!35821 [MS][LITE][parallel predict] task queue support numa
Merge pull request !35821 from yefeng/337-task_queue_support_different_numa_node
This commit is contained in: 26405d4f4b
@@ -29,6 +29,9 @@ constexpr int kNumDeviceInfo = 2;
 constexpr int kNumIndex = 2;
 constexpr int kNumCoreDataLen = 3;
 constexpr int kNumMaxTaskQueueSize = 1000;
+constexpr int kNumPhysicalCoreThreshold = 32;
+constexpr int kDefaultWorkerNumPerPhysicalCpu = 4;
+constexpr int kDefaultThreadsNum = 8;
 int GetCoreNum() {
   int core_num = 1;
 #if defined(_MSC_VER) || defined(_WIN32)
@@ -94,6 +97,22 @@ Status DistinguishPhysicalAndLogical(std::vector<int> *physical_list, std::vecto
   }
   return kSuccess;
 }
+
+int GetDefaultThreadNum() {
+  std::vector<int> physical_core_lite;
+  std::vector<int> logical_core_list;
+  auto status = DistinguishPhysicalAndLogical(&physical_core_lite, &logical_core_list);
+  if (status != kSuccess) {
+    MS_LOG(ERROR) << "DistinguishPhysicalAndLogical failed.";
+    return 0;
+  }
+  auto physical_core_size = physical_core_lite.size();
+  if (physical_core_lite.size() < kNumPhysicalCoreThreshold) {
+    return physical_core_size / kDefaultWorkerNumPerPhysicalCpu;
+  } else {
+    return kDefaultThreadsNum;
+  }
+}
 }  // namespace
 
 Status ModelPool::DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
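The GetDefaultThreadNum() heuristic added above gives each worker roughly one quarter of the physical cores on smaller machines and caps the count once the core count reaches the threshold. A minimal standalone sketch of that branch, with hypothetical core counts and a hypothetical helper name, not code from this commit:

#include <cstdio>

// Same constants as the ones introduced in the first hunk above.
constexpr int kNumPhysicalCoreThreshold = 32;
constexpr int kDefaultWorkerNumPerPhysicalCpu = 4;
constexpr int kDefaultThreadsNum = 8;

// Mirrors the branch in GetDefaultThreadNum(): below the threshold each
// worker gets physical_cores / 4 threads, otherwise the cap of 8 applies.
int DefaultThreadNumFor(int physical_core_count) {
  if (physical_core_count < kNumPhysicalCoreThreshold) {
    return physical_core_count / kDefaultWorkerNumPerPhysicalCpu;
  }
  return kDefaultThreadsNum;
}

int main() {
  std::printf("16 physical cores -> %d threads per worker\n", DefaultThreadNumFor(16));  // prints 4
  std::printf("64 physical cores -> %d threads per worker\n", DefaultThreadNumFor(64));  // prints 8
  return 0;
}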
@@ -248,6 +267,12 @@ Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Con
 std::shared_ptr<mindspore::Context> ModelPool::GetDefaultContext() {
   MS_LOG(DEBUG) << "use default config.";
   auto context = std::make_shared<Context>();
+  auto thread_num = GetDefaultThreadNum();
+  if (thread_num == 0) {
+    MS_LOG(ERROR) << "computer thread num failed.";
+    return nullptr;
+  }
+  context->SetThreadNum(thread_num);
   auto &device_list = context->MutableDeviceInfo();
   auto device_info = std::make_shared<CPUDeviceInfo>();
   if (device_info == nullptr) {
@@ -545,6 +570,7 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
     MS_LOG(ERROR) << "model pool config size is wrong.";
     return kLiteError;
   }
+  bool create_worker_success = true;
   for (size_t i = 0; i < workers_num_; i++) {
     int numa_node_id = model_pool_config[i]->numa_id;
     auto ret = lite::PackWeightManager::GetInstance()->InitPackWeight(graph_buf, size, numa_node_id);
@@ -559,15 +585,21 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
     int task_queue_id = numa_node_id != -1 ? numa_node_id : 0;
     predict_task_queue_->IncreaseWaitModelNum(1, task_queue_id);
     worker_thread_vec_.push_back(std::thread(&ModelWorker::CreateThreadWorker, model_worker, new_model_buf, size,
-                                             model_pool_config[i], predict_task_queue_, &create_worker_success_));
-    all_model_worker_.insert(std::make_pair(model_worker, task_queue_id));
+                                             model_pool_config[i], predict_task_queue_, &create_worker_success));
+    if (all_model_workers_.find(task_queue_id) != all_model_workers_.end()) {
+      all_model_workers_[task_queue_id].push_back(model_worker);
+    } else {
+      all_model_workers_[task_queue_id] = {model_worker};
+    }
   }
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    worker->WaitCreateWorkerDone();
-    if (!create_worker_success_) {
-      MS_LOG(ERROR) << "init failed.";
-      return kLiteError;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      worker->WaitCreateWorkerDone();
+      if (!create_worker_success) {
+        MS_LOG(ERROR) << "worker init failed.";
+        return kLiteError;
+      }
     }
   }
   // init model pool input and output
@@ -591,12 +623,14 @@ Status ModelPool::Init(const std::string &model_path, const std::shared_ptr<Runn
 }
 
 Status ModelPool::UpdateConfig(const std::string &section, const std::pair<std::string, std::string> &config) {
-  for (auto &item : all_model_worker_) {
-    auto &worker = item.first;
-    auto status = worker->UpdateConfig(section, config);
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "model pool update config failed, status=" << status;
-      return status;
+  for (auto &item : all_model_workers_) {
+    auto &workers = item.second;
+    for (auto &worker : workers) {
+      auto status = worker->UpdateConfig(section, config);
+      if (status != kSuccess) {
+        MS_LOG(ERROR) << "model pool update config failed, status=" << status;
+        return status;
+      }
     }
   }
   return kSuccess;
@@ -780,12 +814,12 @@ std::shared_ptr<ModelWorker> ModelPool::GetMaxWaitWorkerNum(int *max_wait_worker
     }
   }
   if (*max_wait_worker_num > 0 && !use_split_batch_) {
-    for (auto &item : all_model_worker_) {
-      auto &worker = item.first;
-      auto numa_id = item.second;
+    auto &workers = all_model_workers_[*max_wait_worker_node_id];
+    auto task_queue_id = *max_wait_worker_node_id;
+    for (auto &worker : workers) {
       if (worker->IsAvailable()) {
-        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(numa_id);
-        *max_wait_worker_node_id = numa_id;
+        *max_wait_worker_num = predict_task_queue_->GetWaitModelNum(task_queue_id);
+        *max_wait_worker_node_id = task_queue_id;
         return worker;
       }
     }
@@ -88,25 +88,37 @@ class ModelPool {
   Status DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
                                              const std::vector<int> &logical_core_list);
 
-  std::vector<std::vector<int>> numa_physical_cores_;
-  std::vector<std::vector<int>> numa_logical_cores_;
  private:
+  // different workers get tasks from different task queues.
+  // currently task queues are distinguished according to different numa node numbers.
+  // if you do not distinguish between numa nodes, the default task queue number is 0.
+  // task queue id <=> worker : sort workers by performance.
+  std::unordered_map<int, std::vector<std::shared_ptr<ModelWorker>>> all_model_workers_;
+
+  // save all worker thread
   std::vector<std::thread> worker_thread_vec_;
-  std::mutex predict_task_mutex_;
   std::vector<MSTensor> model_pool_inputs_;
   std::vector<MSTensor> model_pool_outputs_;
   size_t workers_num_ = 1;
+  std::mutex predict_task_mutex_;
-  bool is_user_data_ = false;
-  int numa_node_num_ = 1;
-  int used_numa_node_num_ = 0;
-  bool use_numa_bind_mode_ = false;
+
+  // create predict task
   std::shared_ptr<PredictTaskQueue> predict_task_queue_ = nullptr;
-  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
-  bool use_split_batch_ = false;
-  std::unordered_map<std::shared_ptr<ModelWorker>, int> all_model_worker_;
-  bool create_worker_success_ = true;
   PredictTask *tasks_ = nullptr;
   std::mutex task_id_mutex_;
   std::queue<size_t> free_tasks_id_;
+
+  // use numa
+  int numa_node_num_ = 1;
+  int used_numa_node_num_ = 0;
+  bool use_numa_bind_mode_ = false;
+  std::vector<std::vector<int>> numa_physical_cores_;
+  std::vector<std::vector<int>> numa_logical_cores_;
+  std::unordered_map<int, std::shared_ptr<Allocator>> numa_allocator_;
+
+  // split batch
+  bool use_split_batch_ = false;
+  bool is_user_data_ = false;  // use in split batch
 };
 }  // namespace mindspore
 #endif  // MINDSPORE_LITE_SRC_RUNTIME_CXX_API_MODEL_POOL_MODEL_POOL_H_
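The comments on all_model_workers_ above describe one task queue per NUMA node, with each worker registered under the task queue id of the node it was created on. A small self-contained sketch of that grouping pattern, using a placeholder Worker type instead of ModelWorker, illustrative only and not code from this commit:

#include <memory>
#include <unordered_map>
#include <vector>

struct Worker {};  // stand-in for mindspore::ModelWorker

int main() {
  // task queue id (NUMA node id) -> workers bound to that node.
  std::unordered_map<int, std::vector<std::shared_ptr<Worker>>> all_model_workers;
  // A numa_id of -1 means no NUMA binding, so such workers fall back to
  // task queue 0, matching the task_queue_id computation in ModelPool::Init.
  for (int numa_node_id : {0, 0, 1, -1}) {
    int task_queue_id = numa_node_id != -1 ? numa_node_id : 0;
    all_model_workers[task_queue_id].push_back(std::make_shared<Worker>());
  }
  // Queue 0 now holds three workers and queue 1 holds one; GetMaxWaitWorkerNum
  // above scans only the workers of the selected task queue.
  return 0;
}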
@@ -78,42 +78,6 @@ void ModelWorker::Run(int node_id, const std::shared_ptr<PredictTaskQueue> &pred
   }
 }
 
-Status ModelWorker::ResizeInit() {
-  auto inputs = model_->GetInputs();
-  std::vector<std::vector<int64_t>> new_input_shape;
-  for (size_t input_idx = 0; input_idx < inputs.size(); input_idx++) {
-    new_input_shape.push_back(inputs[input_idx].Shape());
-    for (size_t i = 1; i < new_input_shape.size(); i++) {
-      if (new_input_shape[input_idx][i] == -1) {
-        return kSuccess;
-      }
-    }
-    if (new_input_shape[input_idx][0] == -1) {
-      // only support resize for batch dim
-      new_input_shape[input_idx][0] = kNumInitBatch;
-    } else {
-      // If the batch dimension is not -1, no resize processing is performed
-      return kSuccess;
-    }
-  }
-  auto status = model_->Resize(inputs, new_input_shape);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "model resize failed in init. ret=" << status;
-    return kLiteError;
-  }
-  inputs = model_->GetInputs();
-  for (auto &input : inputs) {
-    input.MutableData();
-  }
-  std::vector<MSTensor> out;
-  status = model_->Predict(inputs, &out);
-  if (status != kSuccess) {
-    MS_LOG(ERROR) << "init resize failed. ret=" << status;
-    return kLiteError;
-  }
-  return kSuccess;
-}
-
 Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_ptr<WorkerConfig> &worker_config) {
   MS_CHECK_TRUE_MSG(model_buf != nullptr, kLiteError, "model_buf is nullptr in model worker.");
   MS_CHECK_TRUE_MSG(worker_config != nullptr, kLiteError, "worker_config is nullptr in model worker.");
@@ -143,13 +107,6 @@ Status ModelWorker::Init(const char *model_buf, size_t size, const std::shared_p
     MS_LOG(ERROR) << "model worker get empty input/output.";
     return kLiteError;
   }
-  if (need_init_resize_) {
-    status = ResizeInit();
-    if (status != kSuccess) {
-      MS_LOG(ERROR) << "init resize failed. ret=" << status;
-      return kLiteError;
-    }
-  }
   return kSuccess;
 }
 
@@ -64,12 +64,10 @@ class ModelWorker {
 
   std::pair<std::vector<std::vector<int64_t>>, bool> GetModelResize(const std::vector<MSTensor> &model_inputs,
                                                                     const std::vector<MSTensor> &inputs);
-  Status ResizeInit();
 
   Status CopyOutputTensor(std::vector<MSTensor> model_outputs, std::vector<MSTensor> *user_outputs);
 
  private:
-  bool need_init_resize_ = true;
   std::shared_ptr<mindspore::Model> model_ = nullptr;
   std::mutex mtx_worker_;
   std::atomic_bool available_ = true;
@@ -104,6 +104,10 @@ STATUS PackWeight::ReplaceOriginTensorData(const char *model_buf, std::vector<Te
   if (model_weight->tensors_data.find(tensor_index) == model_weight->tensors_data.end()) {
     auto allocator = model_weight->allocator;
     void *new_data = allocator->Malloc(tensor->Size());
+    if (new_data == nullptr) {
+      MS_LOG(ERROR) << "allocator malloc data failed.";
+      return RET_ERROR;
+    }
     memcpy(new_data, tensor->data(), tensor->Size());
     MS_CHECK_TRUE_MSG(tensor->own_data(), RET_ERROR, "tensor data is not own data.");
     tensor->FreeData();
@@ -996,6 +996,10 @@ int BenchmarkUnifiedApi::ParallelInference(std::shared_ptr<mindspore::Context> c
     all_outputs_.push_back(output);
   }
   if (!flags_->benchmark_data_file_.empty()) {
+    for (size_t i = 0; i < ms_inputs_for_api_.size(); i++) {
+      auto &tensor = ms_inputs_for_api_[i];
+      tensor.SetShape(resize_dims_[i]);
+    }
     status = PrintInputData();
     MS_CHECK_FALSE_MSG(status != RET_OK, status, "PrintInputData error ");
     status = ReadCalibData();