!41749 add interop parallel num

Merge pull request !41749 from fangzehua/add_interop_0909
i-robot 2022-09-14 01:19:33 +00:00 committed by Gitee
commit 1dc64892f8
9 changed files with 56 additions and 27 deletions

View File

@@ -63,6 +63,8 @@ mindspore.set_context
| +------------------------------+----------------------------+
| | enable_compile_cache | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | inter_op_parallel_num | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | runtime_num_threads | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | compile_cache_path | CPU/GPU/Ascend |
@@ -134,6 +136,7 @@ mindspore.set_context
- **grad_for_scalar** (bool) - Whether to get gradient for scalar. Default: False. When `grad_for_scalar` is set to True, the scalar input of a function can be derived. Because the backend does not currently support scaling operations, this interface only supports simple operations that can be deduced by the frontend.
- **enable_compile_cache** (bool) - Whether to load or save the graph compiled by the frontend. When `enable_compile_cache` is set to True, a hardware-independent compilation cache is generated during the first execution and exported to a MINDIR file. When the network is executed again, if `enable_compile_cache` is still True and the network script has not changed, the compilation cache is loaded. Note that only limited automatic detection of Python script changes is currently supported, which means there may be a correctness risk. Default: False. This is an experimental feature that may be changed or removed.
- **compile_cache_path** (str) - The path to save the compilation cache of the frontend graph. Default: ".". If the directory does not exist, the system creates it automatically. The cache is saved to the directory `compile_cache_path/rank_${rank_id}/`, where `rank_id` is the ID of the current device in the cluster.
- **inter_op_parallel_num** (int) - Controls the number of operators that can execute in parallel. Default: 0, which means the framework decides the number (a short usage sketch follows this list).
- **runtime_num_threads** (int) - The number of thread pool threads used by the runtime actors and CPU operator kernels, which must be greater than 0. Default: 30. If you run many processes at the same time, set this value smaller to avoid thread contention.
- **disable_format_transform** (bool) - Whether to disable the automatic NCHW-to-NHWC format transform. When the fp16 network performance is worse than that of fp32, `disable_format_transform` can be set to True to try to improve training performance. Default: False.
- **support_binary** (bool) - Whether to support running .pyc or .so files in graph mode. To support running .so or .pyc files in graph mode, set `support_binary` to True and run the .py file once so that the interface source code is saved into the interface-definition .py file (make sure the file is writable); then compile the .py file into a .pyc or .so file, which can then be run in graph mode.
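As a rough usage sketch of the two parameters described above (the concrete values 4 and 10 are arbitrary illustrations, not recommendations):

import mindspore as ms

# Allow up to 4 operators to be dispatched in parallel and cap the runtime
# kernel thread pool at 10 threads; both values are illustrative only.
ms.set_context(inter_op_parallel_num=4, runtime_num_threads=10)

# 0 keeps the framework default; a negative value raises ValueError
# in _Context.set_inter_op_parallel_num (see the Python diff below).
ms.set_context(inter_op_parallel_num=0)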

View File

@@ -66,24 +66,26 @@ bool PackFwdCpuKernelMod::LaunchKernel(const std::vector<kernel::AddressPtr> &in
}
CHECK_KERNEL_INPUTS_NUM(inputs.size(), input_num_, kernel_name_);
CHECK_KERNEL_OUTPUTS_NUM(outputs.size(), kPackOutputsNum, kernel_name_);
auto *output = reinterpret_cast<T *>(outputs[0]->addr);
std::unique_ptr<T *[]> inputs_host = std::make_unique<T *[]>(input_num_);
auto *output = reinterpret_cast<char *>(outputs[0]->addr);
std::vector<char *> inputs_host;
for (size_t i = 0; i < inputs.size(); i++) {
inputs_host[i] = static_cast<T *>(inputs[i]->addr);
(void)inputs_host.emplace_back(reinterpret_cast<char *>(inputs[i]->addr));
}
// multi-threading
size_t input_size = output_size_;
size_t dims_behind_axis = dims_behind_axis_;
auto task = [this, &output, &dims_behind_axis, &inputs_host](size_t start, size_t end) {
size_t copy_time = input_size / dims_behind_axis;
size_t single_copy_size = dims_behind_axis * sizeof(T);
auto task = [&](size_t start, size_t end) {
for (size_t pos = start; pos < end; ++pos) {
size_t cur_input_index = pos / dims_behind_axis % this->input_num_;
size_t cycle_len = this->input_num_ * dims_behind_axis;
size_t local_index = pos / cycle_len * dims_behind_axis + pos % cycle_len % dims_behind_axis;
output[pos] = inputs_host[cur_input_index][local_index];
size_t cur_input_index = pos % this->input_num_;
size_t local_idx = pos / this->input_num_;
(void)memcpy_s(output + single_copy_size * pos, single_copy_size,
inputs_host[cur_input_index] + single_copy_size * local_idx, single_copy_size);
}
};
ParallelLaunchAutoSearch(task, input_size, this, &parallel_search_info_);
ParallelLaunchAutoSearch(task, copy_time, this, &parallel_search_info_);
return true;
}
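To make the new indexing easier to follow, here is a minimal Python restatement of the slice-interleaving scheme introduced above (plain element lists instead of raw byte buffers; pack_slices, inputs_host and dims_behind_axis are illustrative stand-ins for the kernel's buffers and members). Destination slice pos is copied from input pos % input_num at slice offset pos // input_num, which is what the memcpy_s loop does byte-wise:

def pack_slices(inputs_host, dims_behind_axis):
    # All inputs have the same length, which is a multiple of dims_behind_axis.
    input_num = len(inputs_host)
    copy_time = input_num * (len(inputs_host[0]) // dims_behind_axis)
    output = []
    for pos in range(copy_time):
        cur_input_index = pos % input_num   # which input supplies this slice
        local_idx = pos // input_num        # slice offset inside that input
        start = local_idx * dims_behind_axis
        output.extend(inputs_host[cur_input_index][start:start + dims_behind_axis])
    return output

# Packing two inputs with dims_behind_axis == 2 interleaves their slices:
assert pack_slices([[1, 2, 3, 4], [5, 6, 7, 8]], 2) == [1, 2, 5, 6, 3, 4, 7, 8]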

View File

@@ -86,6 +86,7 @@ void RegMsContext(py::module *m) {
.value("mempool_block_size", MsCtxParam::MS_CTX_MEMPOOL_BLOCK_SIZE)
.value("mode", MsCtxParam::MS_CTX_EXECUTION_MODE)
.value("device_target", MsCtxParam::MS_CTX_DEVICE_TARGET)
.value("inter_op_parallel_num", MsCtxParam::MS_CTX_INTER_OP_PARALLEL_NUM)
.value("runtime_num_threads", MsCtxParam::MS_CTX_RUNTIME_NUM_THREADS)
.value("_graph_memory_max_size", MsCtxParam::MS_CTX_GRAPH_MEMORY_MAX_SIZE)
.value("print_file_path", MsCtxParam::MS_CTX_PRINT_FILE_PATH)

View File

@@ -37,24 +37,31 @@ void ComputeThreadNums(size_t *actor_thread_num, size_t *actor_and_kernel_thread
MS_EXCEPTION_IF_NULL(actor_and_kernel_thread_num);
auto context_ptr = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(context_ptr);
const size_t cpu_core_num = std::thread::hardware_concurrency() - 1;
const size_t cpu_core_num = std::thread::hardware_concurrency();
auto inter_op_parallel_num = static_cast<size_t>(context_ptr->get_param<uint32_t>(MS_CTX_INTER_OP_PARALLEL_NUM));
auto runtime_num_threads = static_cast<size_t>(context_ptr->get_param<uint32_t>(MS_CTX_RUNTIME_NUM_THREADS));
size_t runtime_num_threads_min = std::min(runtime_num_threads, cpu_core_num);
size_t inter_op_parallel_num_min = std::min(inter_op_parallel_num, cpu_core_num);
const float kActorUsage = 0.18;
const size_t kActorThreadMinNum = 2;
size_t actor_thread_max_num =
std::max(static_cast<size_t>(std::floor(runtime_num_threads_min * kActorUsage)), kActorThreadMinNum);
// Compute the actor thread num.
// The MemoryManagerActor binds single thread, and the other actors share one thread at least, so the min num is 2.
*actor_thread_num = runtime_num_threads_min < kActorThreadMinNum ? kActorThreadMinNum : runtime_num_threads_min;
*actor_thread_num = *actor_thread_num > actor_thread_max_num ? actor_thread_max_num : *actor_thread_num;
const size_t kActorThreadMinNum = 1;
// Compute the actor and kernel thread num.
*actor_and_kernel_thread_num =
runtime_num_threads_min > *actor_thread_num ? runtime_num_threads_min : (*actor_thread_num + 1);
if (runtime_num_threads != *actor_and_kernel_thread_num) {
MS_LOG(WARNING) << "The runtime_num_threads is " << runtime_num_threads
<< ", but actually the num of threads in threadpool is " << *actor_and_kernel_thread_num;
// The MemoryManagerActor binds single thread, so if runtime_num_threads is 30, actor num would be 5,
// kernel num would be 25.
if (inter_op_parallel_num_min == 0) {
size_t actor_thread_max_num =
std::max(static_cast<size_t>(std::floor(runtime_num_threads_min * kActorUsage)), kActorThreadMinNum);
*actor_thread_num = actor_thread_max_num;
*actor_and_kernel_thread_num =
runtime_num_threads_min > *actor_thread_num ? (runtime_num_threads_min) : (*actor_thread_num + 1);
} else {
*actor_thread_num = inter_op_parallel_num_min;
*actor_and_kernel_thread_num = runtime_num_threads_min + *actor_thread_num;
}
if (*actor_and_kernel_thread_num > cpu_core_num) {
MS_LOG(WARNING) << "The total num of thread pool is " << *actor_and_kernel_thread_num
<< ", but the num of cpu core is " << cpu_core_num
<< ", please set the threads within reasonable limits.";
}
}
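A minimal Python restatement of the thread split above, assuming the same constants (kActorUsage = 0.18 and a minimum of 1 actor thread): when inter_op_parallel_num is 0 the actor threads are carved out of runtime_num_threads, otherwise they are added on top of the kernel pool.

import math

def compute_thread_nums(runtime_num_threads, inter_op_parallel_num, cpu_core_num):
    # Mirror ComputeThreadNums: clamp both knobs to the CPU core count first.
    runtime_min = min(runtime_num_threads, cpu_core_num)
    inter_op_min = min(inter_op_parallel_num, cpu_core_num)
    k_actor_usage = 0.18
    k_actor_thread_min = 1
    if inter_op_min == 0:
        # Actor threads take a fraction of the runtime pool; kernels share the rest.
        actor_num = max(math.floor(runtime_min * k_actor_usage), k_actor_thread_min)
        total_num = runtime_min if runtime_min > actor_num else actor_num + 1
    else:
        # Explicit inter-op parallelism: actor threads come on top of the kernel pool.
        actor_num = inter_op_min
        total_num = runtime_min + actor_num
    return actor_num, total_num

# With the defaults (runtime_num_threads=30, inter_op_parallel_num=0) on a 32-core
# machine this gives 5 actor threads and a 30-thread pool, matching the comment above;
# inter_op_parallel_num=4 gives 4 actor threads and 34 threads in total.
print(compute_thread_nums(30, 0, 32))  # (5, 30)
print(compute_thread_nums(30, 4, 32))  # (4, 34)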

View File

@@ -429,7 +429,7 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
memory_manager_aid_ = memory_manager_actor->GetAID();
auto base_actor = static_cast<ActorReference>(memory_manager_actor);
// Bind single thread to response to memory alloc and free quickly.
(void)actor_manager->Spawn(base_actor, false);
(void)actor_manager->Spawn(base_actor, true);
// Create and schedule recorder actor.
bool recorder_actor_need = false;

View File

@@ -169,6 +169,7 @@ class MS_CORE_API ThreadPool {
void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; }
void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; }
size_t GetKernelThreadNum() const { return kernel_thread_num_; }
size_t GetActorThreadNum() const { return actor_thread_num_; }
void SetKernelThreadMaxSpinCount(int spin_count);
void SetSpinCountMaxValue();
void SetSpinCountMinValue();

View File

@@ -105,10 +105,13 @@ MsContext::MsContext(const std::string &policy, const std::string &target) {
set_param<int>(MS_CTX_MEMORY_OPTIMIZE_LEVEL, kOptimizeO0);
set_param<uint32_t>(MS_CTX_OP_TIMEOUT, kOpTimeout);
uint32_t kDefaultInterOpParallelThreads = 0;
uint32_t kDefaultRuntimeNumThreads = 30;
uint32_t cpu_core_num = std::thread::hardware_concurrency() - 1;
uint32_t cpu_core_num = std::thread::hardware_concurrency();
uint32_t runtime_num_threads_default = std::min(cpu_core_num, kDefaultRuntimeNumThreads);
uint32_t inter_op_parallel_num_default = std::min(cpu_core_num, kDefaultInterOpParallelThreads);
set_param<uint32_t>(MS_CTX_RUNTIME_NUM_THREADS, runtime_num_threads_default);
set_param<uint32_t>(MS_CTX_INTER_OP_PARALLEL_NUM, inter_op_parallel_num_default);
backend_policy_ = policy_map_[policy];
}

View File

@@ -99,6 +99,7 @@ enum MsCtxParam : unsigned {
MS_CTX_TYPE_UINT32_BEGIN = MS_CTX_TYPE_INT_END,
MS_CTX_DEVICE_ID = MS_CTX_TYPE_UINT32_BEGIN,
MS_CTX_RUNTIME_NUM_THREADS,
MS_CTX_INTER_OP_PARALLEL_NUM,
MS_CTX_GE_REF,
MS_CTX_MAX_CALL_DEPTH,
MS_CTX_TSD_REF,

View File

@@ -363,6 +363,11 @@ class _Context:
if op_timeout <= 0:
raise ValueError("The num of op exe timeout must bigger than 0.")
self.set_param(ms_ctx_param.op_timeout, op_timeout)
def set_inter_op_parallel_num(self, inter_op_parallel_num):
"""Check and set inter_op_parallel_num."""
if inter_op_parallel_num < 0:
raise ValueError("The num of parallel thread must bigger than or equal to 0.")
self.set_param(ms_ctx_param.inter_op_parallel_num, inter_op_parallel_num)
setters = {
'mode': set_mode,
@@ -377,6 +382,7 @@ class _Context:
'mempool_block_size': set_mempool_block_size,
'print_file_path': set_print_file_path,
'env_config_path': set_env_config_path,
'inter_op_parallel_num': set_inter_op_parallel_num,
'runtime_num_threads': set_runtime_num_threads,
'memory_optimize_level': set_memory_optimize_level,
'op_timeout': set_op_timeout
@@ -679,7 +685,7 @@ def _check_target_specific_cfgs(device, arg_key):
@args_type_check(mode=int, precompile_only=bool, device_target=str, device_id=int, save_graphs=bool,
save_graphs_path=str, enable_dump=bool, auto_tune_mode=str,
save_dump_path=str, enable_reduce_precision=bool, variable_memory_max_size=str,
enable_auto_mixed_precision=bool,
enable_auto_mixed_precision=bool, inter_op_parallel_num=int,
enable_graph_kernel=bool, reserve_class_name_in_scope=bool, check_bprop=bool,
max_device_memory=str, print_file_path=str, max_call_depth=int, env_config_path=str,
graph_kernel_flags=str, save_compile_cache=bool, runtime_num_threads=int, load_compile_cache=bool,
@@ -750,6 +756,8 @@ def set_context(**kwargs):
| +------------------------------+----------------------------+
| | enable_compile_cache | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | inter_op_parallel_num | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | runtime_num_threads | CPU/GPU/Ascend |
| +------------------------------+----------------------------+
| | compile_cache_path | CPU/GPU/Ascend |
@@ -888,7 +896,9 @@ def set_context(**kwargs):
If the specified directory does not exist, the system will automatically create the directory.
The cache will be saved to the directory of `compile_cache_path/rank_${rank_id}/`. The `rank_id` is
the ID of the current device in the cluster.
runtime_num_threads(int): The thread pool number of cpu kernel and actor used in runtime,
inter_op_parallel_num(int): The number of operators that are allowed to execute in parallel at the same time. Default value is 0,
which means the framework decides the number.
runtime_num_threads(int): The thread pool number of cpu kernels used in runtime,
which must be bigger than 0. Default value is 30; if you run many processes at
the same time, you should set the value smaller to avoid thread contention.
disable_format_transform (bool): Whether to disable the automatic format transform function from NCHW to NHWC.
@@ -925,6 +935,7 @@ def set_context(**kwargs):
>>> ms.set_context(enable_compile_cache=True, compile_cache_path="./cache.ms")
>>> ms.set_context(pynative_synchronize=True)
>>> ms.set_context(runtime_num_threads=10)
>>> ms.set_context(inter_op_parallel_num=4)
>>> ms.set_context(disable_format_transform=True)
"""
ctx = _context()