!35666 [MS][LITE][parallel predict] bind core

Merge pull request !35666 from yefeng/336-model_pool_bind_core
i-robot 2022-06-13 07:27:46 +00:00 committed by Gitee
commit 598a0e8895
8 changed files with 255 additions and 43 deletions
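In short: parallel predict workers are now bound to explicit core lists. The model pool parses /proc/cpuinfo to separate physical cores from their SMT (logical) siblings, groups both sets per NUMA node, and hands each worker a slice of physical cores before falling back to logical ones. PackWeight additionally keeps one model-buffer copy per NUMA node and migrates weight tensor data into a shared allocator on first use.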

File: model_pool.cc

@@ -26,6 +26,8 @@
namespace mindspore {
namespace {
constexpr int kNumDeviceInfo = 2;
constexpr int kNumIndex = 2;
constexpr int kNumCoreDataLen = 3;
constexpr int kNumMaxTaskQueueSize = 1000;
int GetCoreNum() {
int core_num = 1;
@@ -38,33 +40,155 @@ int GetCoreNum() {
#endif
return core_num;
}
Status DistinguishPhysicalAndLogical(std::vector<int> *physical_list, std::vector<int> *logical_list) {
int processor_id = -1;
int physical_id = -1;
int core_id = -1;
// physical id -> core ids already seen on that physical package
std::unordered_map<int, std::vector<int>> ids;
std::ifstream infile("/proc/cpuinfo", std::ios::in);
std::string line;
int data_size = 0;
while (getline(infile, line)) {
auto line_size = line.size();
if (line.find("processor") != std::string::npos) {
auto it = line.find(": ") + kNumIndex;
processor_id = std::atoi(line.substr(it, line_size - 1).c_str());
data_size++;
}
if (line.find("physical id") != std::string::npos) {
auto it = line.find(": ") + kNumIndex;
physical_id = std::atoi(line.substr(it, line_size - 1).c_str());
data_size++;
}
if (line.find("core id") != std::string::npos) {
auto it = line.find(": ") + kNumIndex;
core_id = std::atoi(line.substr(it, line_size - 1).c_str());
data_size++;
}
if (data_size == kNumCoreDataLen) {
if (core_id == -1 && physical_id == -1) {
MS_LOG(DEBUG) << "All cores are physical cores.";
int core_size = GetCoreNum();
for (int core_num = 0; core_num < core_size; core_num++) {
physical_list->push_back(core_num);
}
return kSuccess;
}
data_size = 0;
if (ids.find(physical_id) == ids.end()) {
std::vector<int> core_id_list = {core_id};
ids.insert(std::make_pair(physical_id, core_id_list));
physical_list->push_back(processor_id);
continue;
}
if (find(ids[physical_id].begin(), ids[physical_id].end(), core_id) == ids[physical_id].end()) {
ids[physical_id].push_back(core_id);
physical_list->push_back(processor_id);
continue;
} else {
logical_list->push_back(processor_id);
}
}
}
return kSuccess;
}
} // namespace
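DistinguishPhysicalAndLogical scans /proc/cpuinfo and treats the first processor reported for each (physical id, core id) pair as a physical core; any later processor repeating the same pair is an SMT sibling and lands in the logical list, and when the id fields are absent every core is treated as physical. A standalone sketch of the same heuristic (editor's illustration, not part of this commit; Linux only, without the all-physical fallback):

// Classify processors: first sighting of a (physical id, core id) pair is a
// physical core, any repeat is an SMT/hyper-thread sibling.
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

int main() {
  std::ifstream infile("/proc/cpuinfo");
  std::string line;
  int processor = -1, physical_id = -1, core_id = -1;
  std::set<std::pair<int, int>> seen;
  std::vector<int> physical, logical;
  while (std::getline(infile, line)) {
    auto pos = line.find(": ");
    if (pos == std::string::npos) continue;
    int value = std::atoi(line.c_str() + pos + 2);
    if (line.rfind("processor", 0) == 0) processor = value;
    if (line.rfind("physical id", 0) == 0) physical_id = value;
    if (line.rfind("core id", 0) == 0) core_id = value;
    if (processor >= 0 && physical_id >= 0 && core_id >= 0) {
      if (seen.insert({physical_id, core_id}).second) {
        physical.push_back(processor);
      } else {
        logical.push_back(processor);
      }
      processor = physical_id = core_id = -1;
    }
  }
  std::cout << physical.size() << " physical / " << logical.size() << " logical cores\n";
  return 0;
}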
Status ModelPool::SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list,
Status ModelPool::DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
const std::vector<int> &logical_core_list) {
std::vector<std::vector<int>> all_numa_core_list;
auto numa_num = numa::NUMAAdapter::GetInstance()->NodesNum();
for (int i = 0; i < numa_num; i++) {
std::vector<int> numa_cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(i);
if (numa_cpu_list.empty()) {
MS_LOG(ERROR) << i << "-th numa node does not exist";
return kLiteError;
}
all_numa_core_list.push_back(numa_cpu_list);
}
MS_CHECK_TRUE_MSG(!all_numa_core_list.empty(), kLiteError, "numa core list is empty.");
for (auto one_numa_list : all_numa_core_list) {
MS_CHECK_TRUE_MSG(!one_numa_list.empty(), kLiteError, "one numa core list is empty.");
std::vector<int> physical_cores;
std::vector<int> logical_cores;
for (auto core_id : one_numa_list) {
if (find(physical_core_list.begin(), physical_core_list.end(), core_id) != physical_core_list.end()) {
physical_cores.push_back(core_id);
} else if (find(logical_core_list.begin(), logical_core_list.end(), core_id) != logical_core_list.end()) {
logical_cores.push_back(core_id);
} else {
MS_LOG(ERROR) << "core id not belong physical/logical core id.";
return kLiteError;
}
}
numa_physical_cores_.push_back(physical_cores);
numa_logical_cores_.push_back(logical_cores);
}
return kSuccess;
}
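With the 16-core layout used in the comment below, this partition produces numa_physical_cores_ = {{0,2,4,6},{8,10,12,14}} and numa_logical_cores_ = {{1,3,5,7},{9,11,13,15}}, which is exactly the shape SetNumaBindStrategy consumes.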
/*
* bind numa:
* worker 1 worker 2 worker 3 worker 4
* | | | |
* numa 0 -> numa 1 -> numa 0 -> numa 1
*
* core num: 16 worker num: 4 thread num: 4
* physical core id: 0,2,4,6,8,10,12,14 logical core id: 1,3,5,7,9,11,13,15
* numa 0: 0,1,2,3,4,5,6,7 numa 1: 8,9,10,11,12,13,14,15
*
* result of bind numa:
* physical logical
* numa 0: worker1: 0,2,4,6 worker3: 1,3,5,7
* numa 1: worker2: 8,10,12,14 worker4: 9,11,13,15
*
* */
Status ModelPool::SetNumaBindStrategy(std::vector<std::vector<int>> *all_worker_bind_list,
std::vector<int> *numa_node_id, int thread_num) {
if (MS_UNLIKELY(thread_num == 0)) {
MS_LOG(ERROR) << "thread num is zero.";
return kLiteError;
}
if (thread_num * static_cast<int>(workers_num_) > GetCoreNum()) {
MS_LOG(ERROR) << "thread num or worker num is wrong ,not support param.";
return kLiteNotSupport;
if (numa_physical_cores_.empty()) {
MS_LOG(ERROR) << "numa physical cores is empty.";
return kLiteError;
}
for (size_t i = 0; i < workers_num_;) {
std::vector<int> cpu_list = numa::NUMAAdapter::GetInstance()->GetCPUList(used_numa_node_num_);
if (static_cast<int>(cpu_list.size()) < thread_num) {
MS_LOG(ERROR) << "one numa node do not have enough cpu core for bind thread.";
if (numa_physical_cores_.front().size() < static_cast<size_t>(thread_num)) {
MS_LOG(ERROR) << "thread num more than physical core num. one numa physical core size: "
<< numa_physical_cores_.front().size();
return kLiteError;
}
std::vector<int> physical_index(numa_physical_cores_.size(), 0); // numa node size
std::vector<int> logical_index(numa_logical_cores_.size(), 0);
size_t bind_numa_id = 0;
for (size_t i = 0; i < workers_num_; i++) {
if (bind_numa_id >= numa_physical_cores_.size()) {
used_numa_node_num_ = bind_numa_id;
bind_numa_id = 0;
}
std::vector<int> worker_bind_list;
if (physical_index[bind_numa_id] + static_cast<size_t>(thread_num) <= numa_physical_cores_[bind_numa_id].size()) {
worker_bind_list.insert(worker_bind_list.begin(),
numa_physical_cores_[bind_numa_id].begin() + physical_index[bind_numa_id],
numa_physical_cores_[bind_numa_id].begin() + physical_index[bind_numa_id] + thread_num);
physical_index[bind_numa_id] += thread_num;
} else if (logical_index[bind_numa_id] + static_cast<size_t>(thread_num) <=
numa_logical_cores_[bind_numa_id].size()) {
worker_bind_list.insert(worker_bind_list.begin(),
numa_logical_cores_[bind_numa_id].begin() + logical_index[bind_numa_id],
numa_logical_cores_[bind_numa_id].begin() + logical_index[bind_numa_id] + thread_num);
logical_index[bind_numa_id] += thread_num;
} else {
MS_LOG(ERROR) << "not find core id in physical and logical.";
return kLiteError;
}
for (size_t j = 0; j < cpu_list.size() / thread_num; j++) {
std::vector<int> bind_id;
bind_id.insert(bind_id.begin(), cpu_list.begin() + j * thread_num, cpu_list.begin() + (j + 1) * thread_num);
all_model_bind_list->push_back(bind_id);
numa_node_id->push_back(used_numa_node_num_);
i++;
}
used_numa_node_num_++;
all_worker_bind_list->push_back(worker_bind_list);
numa_node_id->push_back(bind_numa_id);
bind_numa_id++;
}
used_numa_node_num_ = used_numa_node_num_ != 0 ? used_numa_node_num_ : bind_numa_id;
return kSuccess;
}
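A self-contained simulation of this assignment policy (editor's sketch, seeded with the hypothetical 16-core layout from the comment above rather than a real machine) reproduces the bind lists the comment predicts:

// Round-robin workers across numa nodes, handing out physical slices first
// and logical slices once a node's physical cores are exhausted.
#include <cstdio>
#include <vector>

int main() {
  std::vector<std::vector<int>> physical = {{0, 2, 4, 6}, {8, 10, 12, 14}};
  std::vector<std::vector<int>> logical = {{1, 3, 5, 7}, {9, 11, 13, 15}};
  const int thread_num = 4, workers = 4;
  std::vector<size_t> p_idx(physical.size(), 0), l_idx(logical.size(), 0);
  size_t node = 0;
  for (int w = 0; w < workers; w++) {
    if (node >= physical.size()) node = 0;  // wrap back to the first node
    std::vector<int> bind;
    auto &phys = physical[node];
    auto &logi = logical[node];
    if (p_idx[node] + thread_num <= phys.size()) {  // physical cores first
      bind.assign(phys.begin() + p_idx[node], phys.begin() + p_idx[node] + thread_num);
      p_idx[node] += thread_num;
    } else if (l_idx[node] + thread_num <= logi.size()) {  // then logical
      bind.assign(logi.begin() + l_idx[node], logi.begin() + l_idx[node] + thread_num);
      l_idx[node] += thread_num;
    }
    std::printf("worker %d -> numa %zu:", w + 1, node);
    for (int id : bind) std::printf(" %d", id);
    std::printf("\n");
    node++;
  }
  return 0;
}

Output: worker 1 -> numa 0: 0 2 4 6; worker 2 -> numa 1: 8 10 12 14; worker 3 -> numa 0: 1 3 5 7; worker 4 -> numa 1: 9 11 13 15 — matching the table in the comment.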
@@ -74,15 +198,23 @@ void ModelPool::SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_li
MS_LOG(ERROR) << "thread num is zero.";
return;
}
int core_num = GetCoreNum();
int core_id = 0;
std::vector<int> physical_core_lite;
std::vector<int> logical_core_list;
auto status = DistinguishPhysicalAndLogical(&physical_core_lite, &logical_core_list);
if (status != kSuccess) {
MS_LOG(ERROR) << "DistinguishPhysicalAndLogical failed.";
return;
}
std::vector<int> all_core_list = physical_core_lite;
all_core_list.insert(all_core_list.end(), logical_core_list.begin(), logical_core_list.end());
size_t core_id = 0;
for (size_t i = 0; i < workers_num_; i++) {
std::vector<int> bind_id;
for (int j = 0; j < thread_num; j++) {
if (core_id >= core_num) {
if (core_id >= all_core_list.size()) {
core_id = 0;
}
bind_id.push_back(core_id);
bind_id.push_back(all_core_list[core_id]);
core_id++;
}
all_model_bind_list->push_back(bind_id);
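The non-NUMA fallback above concatenates physical cores ahead of logical ones and deals out thread_num ids per worker, wrapping to the front of the list once workers * threads exceeds the core count. A small sketch with an assumed 4+4 core split (editor's illustration):

// Wrap-around assignment over physical-then-logical core ids.
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> physical = {0, 2, 4, 6}, logical = {1, 3, 5, 7};  // assumed layout
  std::vector<int> all_core_list = physical;
  all_core_list.insert(all_core_list.end(), logical.begin(), logical.end());
  const int thread_num = 3, workers = 3;
  size_t core_id = 0;
  for (int w = 0; w < workers; w++) {
    std::printf("worker %d:", w + 1);
    for (int t = 0; t < thread_num; t++) {
      if (core_id >= all_core_list.size()) core_id = 0;  // wrap like the code above
      std::printf(" %d", all_core_list[core_id++]);
    }
    std::printf("\n");  // worker 1: 0 2 4 / worker 2: 6 1 3 / worker 3: 5 7 0
  }
  return 0;
}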
@@ -98,13 +230,14 @@ Status ModelPool::SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Con
if (use_numa_bind_mode_) {
// now only supports the same number of cores per numa node
// do not use if there are extra cores
int one_numa_node_cpu_size = numa::NUMAAdapter::GetInstance()->GetCPUList(0).size();
if (context->GetThreadNum() > one_numa_node_cpu_size) {
MS_LOG(ERROR) << "thread num more than numa node cpu cores.";
return kLiteError;
} else {
workers_num_ = one_numa_node_cpu_size / static_cast<int>(context->GetThreadNum()) * numa_node_num_;
auto worker_num = 0;
for (auto one_numa_physical : numa_physical_cores_) {
worker_num += (one_numa_physical.size() / static_cast<int>(context->GetThreadNum()));
}
for (auto one_numa_logical : numa_logical_cores_) {
worker_num += (one_numa_logical.size() / static_cast<int>(context->GetThreadNum()));
}
workers_num_ = worker_num;
} else {
// each model binds all kernels in order
workers_num_ = GetCoreNum() / static_cast<int>(context->GetThreadNum());
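Under the new NUMA-aware default, the worker count is the number of thread_num-sized groups available across every node's physical list plus every node's logical list; with the example layout above (two nodes, four physical and four logical cores each) and GetThreadNum() == 4 this gives 1 + 1 + 1 + 1 = 4 workers, matching the binding comment. Without NUMA the default remains the total core count divided by the thread num.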
@@ -192,23 +325,37 @@ std::shared_ptr<Context> ModelPool::InitContext(const std::shared_ptr<RunnerConf
return context;
}
Status ModelPool::SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list, std::vector<int> *numa_node_id,
Status ModelPool::SetModelBindMode(std::vector<std::vector<int>> *all_worker_bind_list, std::vector<int> *numa_node_id,
std::shared_ptr<Context> model_context) {
if (numa::NUMAAdapter::GetInstance()->Available()) {
auto status =
SetNumaBindStrategy(all_model_bind_list, numa_node_id, static_cast<int>(model_context->GetThreadNum()));
SetNumaBindStrategy(all_worker_bind_list, numa_node_id, static_cast<int>(model_context->GetThreadNum()));
if (status != kSuccess) {
MS_LOG(ERROR) << "SetNumaBindStrategy failed.";
return kLiteError;
}
} else {
SetBindStrategy(all_model_bind_list, numa_node_id, static_cast<int>(model_context->GetThreadNum()));
SetBindStrategy(all_worker_bind_list, numa_node_id, static_cast<int>(model_context->GetThreadNum()));
}
return kSuccess;
}
Status ModelPool::SetWorkersNum(const std::shared_ptr<RunnerConfig> &runner_config,
const std::shared_ptr<Context> &context) {
if (use_numa_bind_mode_) {
std::vector<int> physical_core_lite;
std::vector<int> logical_core_list;
auto status = DistinguishPhysicalAndLogical(&physical_core_lite, &logical_core_list);
if (status != kSuccess) {
MS_LOG(ERROR) << "DistinguishPhysicalAndLogical failed.";
return kLiteError;
}
status = DistinguishPhysicalAndLogicalByNuma(physical_core_lite, logical_core_list);
if (status != kSuccess) {
MS_LOG(ERROR) << "DistinguishPhysicalAndLogicalByNuma failed.";
return kLiteError;
}
}
if ((runner_config != nullptr && runner_config->GetWorkersNum() == 0) || runner_config == nullptr) {
// if the user does not define the number of models, the default optimal number of models is used
auto status = SetDefaultOptimalModelNum(context);
@@ -258,10 +405,10 @@ ModelPoolConfig ModelPool::CreateModelPoolConfig(const std::shared_ptr<RunnerCon
return model_pool_config;
}
// init all bind list
std::vector<std::vector<int>> all_model_bind_list;
std::vector<std::vector<int>> all_worker_bind_list;
std::vector<int> numa_node_id;
if (init_context->GetThreadAffinityMode() == lite::HIGHER_CPU) {
status = SetModelBindMode(&all_model_bind_list, &numa_node_id, init_context);
status = SetModelBindMode(&all_worker_bind_list, &numa_node_id, init_context);
if (status != kSuccess) {
MS_LOG(ERROR) << "SetModelBindMode failed.";
return {};
@@ -289,7 +436,7 @@ ModelPoolConfig ModelPool::CreateModelPoolConfig(const std::shared_ptr<RunnerCon
if (init_context->GetThreadAffinityMode() != lite::NO_BIND) {
// bind by core id
worker_config->numa_id = numa_node_id[i];
context->SetThreadAffinity(all_model_bind_list[i]);
context->SetThreadAffinity(all_worker_bind_list[i]);
} else {
// not bind core, not use numa
worker_config->numa_id = -1;
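Per worker, the outcome is an explicit core list on that worker's Context; SetThreadNum and the core-list overload of SetThreadAffinity are the public APIs involved. A minimal sketch with an assumed core list (editor's illustration):

// Build a worker context bound to specific cores, e.g. the numa-0
// physical slice from the earlier example.
#include <memory>
#include <vector>
#include "include/api/context.h"

std::shared_ptr<mindspore::Context> MakeWorkerContext() {
  auto context = std::make_shared<mindspore::Context>();
  context->SetThreadNum(4);
  context->SetThreadAffinity({0, 2, 4, 6});
  return context;
}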

File: model_pool.h

@@ -59,9 +59,9 @@ class ModelPool {
std::shared_ptr<Context> GetUserDefineContext(const std::shared_ptr<RunnerConfig> &runner_config);
Status SetDefaultOptimalModelNum(const std::shared_ptr<mindspore::Context> &context);
Status SetModelBindMode(std::vector<std::vector<int>> *all_model_bind_list, std::vector<int> *numa_node_id,
Status SetModelBindMode(std::vector<std::vector<int>> *all_worker_bind_list, std::vector<int> *numa_node_id,
std::shared_ptr<Context> model_context);
Status SetNumaBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, std::vector<int> *numa_node_id,
Status SetNumaBindStrategy(std::vector<std::vector<int>> *all_worker_bind_list, std::vector<int> *numa_node_id,
int thread_num);
void SetBindStrategy(std::vector<std::vector<int>> *all_model_bind_list, std::vector<int> *numa_node_id,
int thread_num);
@@ -85,6 +85,11 @@ class ModelPool {
void UpdateFreeTaskId(size_t id);
Status DistinguishPhysicalAndLogicalByNuma(const std::vector<int> &physical_core_list,
const std::vector<int> &logical_core_list);
std::vector<std::vector<int>> numa_physical_cores_;
std::vector<std::vector<int>> numa_logical_cores_;
std::vector<std::thread> worker_thread_vec_;
std::vector<MSTensor> model_pool_inputs_;
std::vector<MSTensor> model_pool_outputs_;

File: predict_task_queue.cc

@@ -58,12 +58,12 @@ Status PredictTaskQueue::InitTaskQueue(size_t num, size_t max_queue_size) {
}
void PredictTaskQueue::WaitUntilPredictActive(PredictTask *task, int node_id) {
waite_worker_num_.at(node_id) += 1;
std::unique_lock<std::mutex> result_lock(task->task_done_mutex);
while (!task->ready) {
task->task_done_condition.wait(result_lock);
}
task->ready = false;
waite_worker_num_.at(node_id) += 1;
return;
}
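Read as a before/after diff, the two waite_worker_num_ increments above are the old and new positions of a single moved statement; only one exists in the final code. The surrounding logic is the standard condition-variable predicate wait. A generic, self-contained sketch of the wait/notify pair (illustrative names, not the MindSpore types):

// Predicate wait: the waiter sleeps until ready is set, then consumes it;
// the signaler flips the flag under the lock and notifies.
#include <condition_variable>
#include <mutex>

struct Task {
  std::mutex task_done_mutex;
  std::condition_variable task_done_condition;
  bool ready = false;
};

void WaitUntilDone(Task *task) {
  std::unique_lock<std::mutex> lock(task->task_done_mutex);
  task->task_done_condition.wait(lock, [task] { return task->ready; });
  task->ready = false;
}

void MarkDone(Task *task) {
  {
    std::lock_guard<std::mutex> lock(task->task_done_mutex);
    task->ready = true;
  }
  task->task_done_condition.notify_one();
}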

File: lite_session.cc

@@ -528,7 +528,7 @@ int LiteSession::CompileGraph(Model *model) {
is_running_.store(false);
return ret;
}
ret = lite::PackWeightManager::GetInstance()->StoreOriginTensorData(model);
ret = lite::PackWeightManager::GetInstance()->StoreOriginTensorData(model, &tensors_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "StoreOriginTensorData failed.";
return RET_ERROR;

File: pack_weight.cc

@@ -48,11 +48,16 @@ STATUS PackWeight::InitWeightManagerByBuf(const char *model_buf, size_t model_si
return RET_ERROR;
}
memcpy(new_model_buf, model_buf, model_size);
numa_model_buf_[model_buf] = {numa_id};
if (numa_model_buf_.find(model_buf) == numa_model_buf_.end()) {
numa_model_buf_[model_buf] = {numa_id};
model_buf_map_[model_buf] = {new_model_buf};
} else {
numa_model_buf_[model_buf].push_back(numa_id);
model_buf_map_[model_buf].push_back(new_model_buf);
}
buf_model_weight_[new_model_buf] = model_const_weight;
buf_model_weight_[new_model_buf]->allocator = allocator;
model_const_weight->numa_id = numa_id;
model_buf_map_.insert(std::make_pair(model_buf, new_model_buf));
} else {
buf_model_weight_[model_buf] = model_const_weight;
buf_model_weight_[model_buf]->allocator = allocator;
@@ -67,7 +72,9 @@ char *PackWeight::GetNumaModelBuf(const char *model_buf, int numa_id) {
MS_LOG(ERROR) << "can not find numa id in saved model buf.";
return nullptr;
}
return model_buf_map_[model_buf];
auto numa_id_list = numa_model_buf_[model_buf];
auto it = find(numa_id_list.begin(), numa_id_list.end(), numa_id) - numa_id_list.begin();
return model_buf_map_[model_buf][it];
}
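model_buf_map_ changes from one copy per original buffer to one copy per NUMA node, kept in lockstep with numa_model_buf_: the position of numa_id in one vector selects the buffer in the other. A minimal sketch of that parallel-vector lookup, with an explicit guard for an unknown id added for the illustration:

// Two vectors indexed in lockstep: numa ids and the per-node buffer copies.
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  std::vector<int> numa_ids = {0, 1};  // assumed registration order
  std::vector<const char *> copies = {"buf@numa0", "buf@numa1"};
  int wanted = 1;
  auto it = std::find(numa_ids.begin(), numa_ids.end(), wanted);
  if (it == numa_ids.end()) return 1;  // id never registered
  std::printf("%s\n", copies[it - numa_ids.begin()]);  // prints buf@numa1
  return 0;
}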
STATUS PackWeight::StoreOriginTensorData(const char *model_buf, const void *origin_tensor_data) {
@@ -86,6 +93,32 @@ STATUS PackWeight::StoreOriginTensorData(const char *model_buf, const void *orig
return RET_OK;
}
STATUS PackWeight::ReplaceOriginTensorData(const char *model_buf, std::vector<Tensor *> *tensors, int tensor_index) {
std::lock_guard<std::mutex> lock(mtx_weight_);
if (buf_model_weight_.find(model_buf) == buf_model_weight_.end()) {
MS_LOG(ERROR) << "can not find model buf in store origin Tensor";
return RET_ERROR;
}
auto &tensor = tensors->at(tensor_index);
auto &model_weight = buf_model_weight_[model_buf];
if (model_weight->tensors_data.find(tensor_index) == model_weight->tensors_data.end()) {
auto allocator = model_weight->allocator;
void *new_data = allocator->Malloc(tensor->Size());
memcpy(new_data, tensor->data(), tensor->Size());
MS_CHECK_TRUE_MSG(tensor->own_data(), RET_ERROR, "tensor data is not own data.");
tensor->FreeData();
tensor->set_data(new_data);
tensor->set_own_data(false);
model_weight->tensors_data.insert(std::make_pair(tensor_index, new_data));
} else {
auto new_data = model_weight->tensors_data[tensor_index];
tensor->FreeData();
tensor->set_data(new_data);
tensor->set_own_data(false);
}
return RET_OK;
}
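ReplaceOriginTensorData implements share-on-first-use: the first session to compile a weight tensor copies its payload into the pool's allocator and gives up ownership; later sessions compiling the same model buffer are simply repointed at the stored copy. A simplified sketch of the idea with illustrative types (the real code goes through Allocator, FreeData, set_data, and set_own_data):

// Share a tensor's payload across sessions: copy once, repoint afterwards.
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <unordered_map>

struct FakeTensor {
  void *data;
  std::size_t size;
  bool own_data;
};

std::unordered_map<int, void *> tensors_data;  // tensor index -> shared copy

void ShareTensor(int index, FakeTensor *t) {
  auto it = tensors_data.find(index);
  if (it == tensors_data.end()) {
    void *copy = std::malloc(t->size);  // stands in for allocator->Malloc
    std::memcpy(copy, t->data, t->size);
    it = tensors_data.emplace(index, copy).first;
  }
  if (t->own_data) std::free(t->data);  // stands in for tensor->FreeData()
  t->data = it->second;
  t->own_data = false;  // the shared pool owns the payload now
}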
void *PackWeight::GetPackData(const void *tensor_data, const size_t size, bool *is_packed) {
std::lock_guard<std::mutex> lock(mtx_weight_);
MS_CHECK_TRUE_RET(tensor_data != nullptr, nullptr);
@@ -126,10 +159,25 @@ void PackWeight::FreePackedWeight(ModelConstWeight *weight) {
weight->origin_and_packed_pair.clear();
}
void PackWeight::FreeTensorData(ModelConstWeight *weight) {
MS_CHECK_TRUE_RET_VOID(weight != nullptr);
for (auto &tensor_data : weight->tensors_data) {
auto &data = tensor_data.second;
auto allocator = weight->allocator;
MS_CHECK_TRUE_RET_VOID(allocator != nullptr);
if (data != nullptr) {
allocator->Free(data);
data = nullptr;
}
}
weight->tensors_data.clear();
}
PackWeight::~PackWeight() {
std::lock_guard<std::mutex> lock(mtx_weight_);
for (auto &item : buf_model_weight_) {
FreePackedWeight(item.second);
FreeTensorData(item.second);
}
// free model buf
if (copy_buf_) {

File: pack_weight.h

@@ -33,6 +33,7 @@ struct ModelConstWeight {
std::map<const void *, void *> origin_and_packed_pair;
std::shared_ptr<Allocator> allocator = nullptr;
int numa_id = -1;
std::unordered_map<int, void *> tensors_data;
};
class PackWeight {
@@ -43,15 +44,17 @@ class PackWeight {
char *GetNumaModelBuf(const char *model_buf, int numa_id);
STATUS StoreOriginTensorData(const char *model_buf, const void *origin_tensor_data);
void *GetPackData(const void *tensor_data, const size_t size, bool *is_packed);
STATUS ReplaceOriginTensorData(const char *model_buf, std::vector<Tensor *> *tensors, int tensor_index);
private:
void FreePackedWeight(ModelConstWeight *weight);
void FreeTensorData(ModelConstWeight *weight);
bool copy_buf_ = false;
std::mutex mtx_weight_;
std::unordered_map<const char *, ModelConstWeight *> buf_model_weight_;
std::unordered_map<const char *, std::vector<int>> numa_model_buf_;
std::unordered_map<const char *, char *> model_buf_map_;
std::unordered_map<const char *, std::vector<char *>> model_buf_map_;
};
} // namespace mindspore::lite
#endif // MINDSPORE_LITE_SRC_RUNTIME_PACK_WEIGHT_H_

File: pack_weight_manager.cc

@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "src/runtime/pack_weight_manager.h"
#include <vector>
#include "src/common/graph_util.h"
namespace mindspore::lite {
namespace {
@@ -78,7 +79,7 @@ char *PackWeightManager::GetNumaModelBuf(const char *model_buf, int numa_id) {
return nullptr;
}
STATUS PackWeightManager::StoreOriginTensorData(Model *model) {
STATUS PackWeightManager::StoreOriginTensorData(Model *model, std::vector<Tensor *> *all_tensors) {
#ifdef SHARING_MODEL_WEIGHT
MS_CHECK_TRUE_MSG(model != nullptr, RET_ERROR, "model is nullptr in pack weight manager.");
if (pack_weight_ == nullptr) {
@@ -96,7 +97,14 @@ STATUS PackWeightManager::StoreOriginTensorData(Model *model) {
src_tensor->length() == 0) {
continue;
}
auto status = pack_weight_->StoreOriginTensorData(lite_model->buf, src_tensor->data());
if (all_tensors->at(tensor_index)->own_data()) {
auto status = pack_weight_->ReplaceOriginTensorData(lite_model->buf, all_tensors, tensor_index);
if (status != RET_OK) {
MS_LOG(DEBUG) << "ReplaceOriginTensorData failed.";
return RET_ERROR;
}
}
auto status = pack_weight_->StoreOriginTensorData(lite_model->buf, all_tensors->at(tensor_index)->data());
if (status != RET_OK) {
MS_LOG(DEBUG) << "data not packed.";
return RET_ERROR;
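Together with the PackWeight change, StoreOriginTensorData now runs in two steps per weight tensor: if the tensor still owns its buffer, it is first migrated into the shared allocator via ReplaceOriginTensorData, and only then is its (now shared) data pointer registered for pack-weight reuse. This is also why the call site in lite_session.cc now passes the session's tensor list through.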

File: pack_weight_manager.h

@@ -17,6 +17,7 @@
#ifndef MINDSPORE_LITE_SRC_RUNTIME_PACK_WEIGHT_MANAGER_H_
#define MINDSPORE_LITE_SRC_RUNTIME_PACK_WEIGHT_MANAGER_H_
#include <memory>
#include <vector>
#include "include/model.h"
#include "include/errorcode.h"
#include "src/tensor.h"
@@ -31,7 +32,7 @@ class PackWeightManager {
STATUS InitPackWeight(const char *model_buf, size_t model_size, int numa_id = -1);
STATUS InitPackWeightByBuf(const char *model_buf, size_t model_size);
char *GetNumaModelBuf(const char *model_buf, int numa_id);
STATUS StoreOriginTensorData(Model *model);
STATUS StoreOriginTensorData(Model *model, std::vector<Tensor *> *all_tensors);
void *GetPackData(const void *tensor_data, const size_t size, bool *is_packed);
void Free(void *tensor_data);
bool IsCopyTensor(int op_type);