From 5c349fafac359679b3e2c5f01b024c3a2a1cc508 Mon Sep 17 00:00:00 2001 From: f00566451 Date: Wed, 24 Aug 2022 11:45:25 +0800 Subject: [PATCH] add inter_op_parallel_threads --- .../mindspore/mindspore.set_context.rst | 3 ++ .../device/cpu/kernel/pack_cpu_kernel.cc | 20 ++++++----- .../ccsrc/pybind_api/utils/ms_context_py.cc | 1 + .../graph_scheduler/actor/actor_common.cc | 35 +++++++++++-------- .../graph_scheduler/graph_scheduler.cc | 2 +- mindspore/core/mindrt/src/thread/threadpool.h | 1 + mindspore/core/utils/ms_context.cc | 5 ++- mindspore/core/utils/ms_context.h | 1 + mindspore/python/mindspore/context.py | 15 ++++++-- 9 files changed, 56 insertions(+), 27 deletions(-) diff --git a/docs/api/api_python/mindspore/mindspore.set_context.rst b/docs/api/api_python/mindspore/mindspore.set_context.rst index ac5959ab87f..5a61ee133f7 100644 --- a/docs/api/api_python/mindspore/mindspore.set_context.rst +++ b/docs/api/api_python/mindspore/mindspore.set_context.rst @@ -63,6 +63,8 @@ mindspore.set_context | +------------------------------+----------------------------+ | | enable_compile_cache | CPU/GPU/Ascend | | +------------------------------+----------------------------+ + | | inter_op_parallel_num | CPU/GPU/Ascend | + | +------------------------------+----------------------------+ | | runtime_num_threads | CPU/GPU/Ascend | | +------------------------------+----------------------------+ | | compile_cache_path | CPU/GPU/Ascend | @@ -134,6 +136,7 @@ mindspore.set_context - **grad_for_scalar** (bool) - 表示是否获取标量梯度。默认值:False。当 `grad_for_scalar` 设置为True时,则可以导出函数的标量输入。由于后端目前不支持伸缩操作,所以该接口只支持在前端可推演的简单操作。 - **enable_compile_cache** (bool) - 表示是否加载或者保存前端编译的图。当 `enable_compile_cache` 被设置为True时,在第一次执行的过程中,一个硬件无关的编译缓存会被生成并且导出为一个MINDIR文件。当该网络被再次执行时,如果 `enable_compile_cache` 仍然为True并且网络脚本没有被更改,那么这个编译缓存会被加载。注意目前只支持有限的Python脚本更改的自动检测,这意味着可能有正确性风险。默认值:False。这是一个实验特性,可能会被更改或者删除。 - **compile_cache_path** (str) - 保存前端图编译缓存的路径。默认值:"."。如果目录不存在,系统会自动创建这个目录。缓存会被保存到如下目录: `compile_cache_path/rank_${rank_id}/` 。 `rank_id` 是集群上当前设备的ID。 + - **inter_op_parallel_num** (int) - 算子间并行数控制。 默认值为0,表示由框架默认指定。 - **runtime_num_threads** (int) - 运行时actor和CPU算子核使用的线程池线程数,必须大于0。默认值为30,如果同时运行多个进程,应将该值设置得小一些,以避免线程争用。 - **disable_format_transform** (bool) - 表示是否取消NCHW到NHWC的自动格式转换功能。当fp16的网络性能不如fp32的时,可以设置 `disable_format_transform` 为True,以尝试提高训练性能。默认值:False。 - **support_binary** (bool) - 是否支持在图形模式下运行.pyc或.so。如果要支持在图形模式下运行.so或.pyc,可将 `support_binary` 置为True,并运行一次.py文件,从而将接口源码保存到接口定义.py文件中,因此要保证该文件可写。然后将.py文件编译成.pyc或.so文件,即可在图模式下运行。 diff --git a/mindspore/ccsrc/plugin/device/cpu/kernel/pack_cpu_kernel.cc b/mindspore/ccsrc/plugin/device/cpu/kernel/pack_cpu_kernel.cc index ad2f1f93507..f3bd6100d4e 100644 --- a/mindspore/ccsrc/plugin/device/cpu/kernel/pack_cpu_kernel.cc +++ b/mindspore/ccsrc/plugin/device/cpu/kernel/pack_cpu_kernel.cc @@ -66,24 +66,26 @@ bool PackFwdCpuKernelMod::LaunchKernel(const std::vector &in } CHECK_KERNEL_INPUTS_NUM(inputs.size(), input_num_, kernel_name_); CHECK_KERNEL_OUTPUTS_NUM(outputs.size(), kPackOutputsNum, kernel_name_); - auto *output = reinterpret_cast(outputs[0]->addr); - std::unique_ptr inputs_host = std::make_unique(input_num_); + auto *output = reinterpret_cast(outputs[0]->addr); + std::vector inputs_host; for (size_t i = 0; i < inputs.size(); i++) { - inputs_host[i] = static_cast(inputs[i]->addr); + (void)inputs_host.emplace_back(reinterpret_cast(inputs[i]->addr)); } // multi-threading size_t input_size = output_size_; size_t dims_behind_axis = dims_behind_axis_; - auto task = [this, &output, &dims_behind_axis, &inputs_host](size_t start, size_t end) { + size_t copy_time = input_size / dims_behind_axis; + size_t single_copy_size = dims_behind_axis * sizeof(T); + auto task = [&](size_t start, size_t end) { for (size_t pos = start; pos < end; ++pos) { - size_t cur_input_index = pos / dims_behind_axis % this->input_num_; - size_t cycle_len = this->input_num_ * dims_behind_axis; - size_t local_index = pos / cycle_len * dims_behind_axis + pos % cycle_len % dims_behind_axis; - output[pos] = inputs_host[cur_input_index][local_index]; + size_t cur_input_index = pos % this->input_num_; + size_t local_idx = pos / this->input_num_; + (void)memcpy_s(output + single_copy_size * pos, single_copy_size, + inputs_host[cur_input_index] + single_copy_size * local_idx, single_copy_size); } }; - ParallelLaunchAutoSearch(task, input_size, this, ¶llel_search_info_); + ParallelLaunchAutoSearch(task, copy_time, this, ¶llel_search_info_); return true; } diff --git a/mindspore/ccsrc/pybind_api/utils/ms_context_py.cc b/mindspore/ccsrc/pybind_api/utils/ms_context_py.cc index c18c48f57d6..64fc4c75b38 100644 --- a/mindspore/ccsrc/pybind_api/utils/ms_context_py.cc +++ b/mindspore/ccsrc/pybind_api/utils/ms_context_py.cc @@ -86,6 +86,7 @@ void RegMsContext(py::module *m) { .value("mempool_block_size", MsCtxParam::MS_CTX_MEMPOOL_BLOCK_SIZE) .value("mode", MsCtxParam::MS_CTX_EXECUTION_MODE) .value("device_target", MsCtxParam::MS_CTX_DEVICE_TARGET) + .value("inter_op_parallel_num", MsCtxParam::MS_CTX_INTER_OP_PARALLEL_NUM) .value("runtime_num_threads", MsCtxParam::MS_CTX_RUNTIME_NUM_THREADS) .value("_graph_memory_max_size", MsCtxParam::MS_CTX_GRAPH_MEMORY_MAX_SIZE) .value("print_file_path", MsCtxParam::MS_CTX_PRINT_FILE_PATH) diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_common.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_common.cc index 61bf4e7a604..6eb6ab39a8a 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_common.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/actor_common.cc @@ -37,24 +37,31 @@ void ComputeThreadNums(size_t *actor_thread_num, size_t *actor_and_kernel_thread MS_EXCEPTION_IF_NULL(actor_and_kernel_thread_num); auto context_ptr = MsContext::GetInstance(); MS_EXCEPTION_IF_NULL(context_ptr); - const size_t cpu_core_num = std::thread::hardware_concurrency() - 1; + const size_t cpu_core_num = std::thread::hardware_concurrency(); + auto inter_op_parallel_num = static_cast(context_ptr->get_param(MS_CTX_INTER_OP_PARALLEL_NUM)); auto runtime_num_threads = static_cast(context_ptr->get_param(MS_CTX_RUNTIME_NUM_THREADS)); size_t runtime_num_threads_min = std::min(runtime_num_threads, cpu_core_num); + size_t inter_op_parallel_num_min = std::min(inter_op_parallel_num, cpu_core_num); const float kActorUsage = 0.18; - const size_t kActorThreadMinNum = 2; - size_t actor_thread_max_num = - std::max(static_cast(std::floor(runtime_num_threads_min * kActorUsage)), kActorThreadMinNum); - // Compute the actor thread num. - // The MemoryManagerActor binds single thread, and the other actors share one thread at least, so the min num is 2. - *actor_thread_num = runtime_num_threads_min < kActorThreadMinNum ? kActorThreadMinNum : runtime_num_threads_min; - *actor_thread_num = *actor_thread_num > actor_thread_max_num ? actor_thread_max_num : *actor_thread_num; - + const size_t kActorThreadMinNum = 1; // Compute the actor and kernel thread num. - *actor_and_kernel_thread_num = - runtime_num_threads_min > *actor_thread_num ? runtime_num_threads_min : (*actor_thread_num + 1); - if (runtime_num_threads != *actor_and_kernel_thread_num) { - MS_LOG(WARNING) << "The runtime_num_threads is " << runtime_num_threads - << ", but actually the num of threads in threadpool is " << *actor_and_kernel_thread_num; + // The MemoryManagerActor binds single thread, so if runtime_num_threads is 30, actor num would be 5, + // kernel num would be 25. + if (inter_op_parallel_num_min == 0) { + size_t actor_thread_max_num = + std::max(static_cast(std::floor(runtime_num_threads_min * kActorUsage)), kActorThreadMinNum); + *actor_thread_num = actor_thread_max_num; + *actor_and_kernel_thread_num = + runtime_num_threads_min > *actor_thread_num ? (runtime_num_threads_min) : (*actor_thread_num + 1); + } else { + *actor_thread_num = inter_op_parallel_num_min; + *actor_and_kernel_thread_num = runtime_num_threads_min + *actor_thread_num; + } + + if (*actor_and_kernel_thread_num > cpu_core_num) { + MS_LOG(WARNING) << "The total num of thread pool is " << *actor_and_kernel_thread_num + << ", but the num of cpu core is " << cpu_core_num + << ", please set the threads within reasonable limits."; } } diff --git a/mindspore/ccsrc/runtime/graph_scheduler/graph_scheduler.cc b/mindspore/ccsrc/runtime/graph_scheduler/graph_scheduler.cc index 09aa15da4d5..6e63f6814cf 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/graph_scheduler.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/graph_scheduler.cc @@ -429,7 +429,7 @@ void GraphScheduler::BuildAndScheduleGlobalActor() { memory_manager_aid_ = memory_manager_actor->GetAID(); auto base_actor = static_cast(memory_manager_actor); // Bind single thread to response to memory alloc and free quickly. - (void)actor_manager->Spawn(base_actor, false); + (void)actor_manager->Spawn(base_actor, true); // Create and schedule recorder actor. bool recorder_actor_need = false; diff --git a/mindspore/core/mindrt/src/thread/threadpool.h b/mindspore/core/mindrt/src/thread/threadpool.h index 282de551ee3..486f3b25f79 100644 --- a/mindspore/core/mindrt/src/thread/threadpool.h +++ b/mindspore/core/mindrt/src/thread/threadpool.h @@ -169,6 +169,7 @@ class MS_CORE_API ThreadPool { void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; } void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; } size_t GetKernelThreadNum() const { return kernel_thread_num_; } + size_t GetActorThreadNum() const { return actor_thread_num_; } void SetKernelThreadMaxSpinCount(int spin_count); void SetSpinCountMaxValue(); void SetSpinCountMinValue(); diff --git a/mindspore/core/utils/ms_context.cc b/mindspore/core/utils/ms_context.cc index 5af2fea47e7..319a348fe61 100644 --- a/mindspore/core/utils/ms_context.cc +++ b/mindspore/core/utils/ms_context.cc @@ -105,10 +105,13 @@ MsContext::MsContext(const std::string &policy, const std::string &target) { set_param(MS_CTX_MEMORY_OPTIMIZE_LEVEL, kOptimizeO0); set_param(MS_CTX_OP_TIMEOUT, kOpTimeout); + uint32_t kDefaultInterOpParallelThreads = 0; uint32_t kDefaultRuntimeNumThreads = 30; - uint32_t cpu_core_num = std::thread::hardware_concurrency() - 1; + uint32_t cpu_core_num = std::thread::hardware_concurrency(); uint32_t runtime_num_threads_default = std::min(cpu_core_num, kDefaultRuntimeNumThreads); + uint32_t inter_op_parallel_num_default = std::min(cpu_core_num, kDefaultInterOpParallelThreads); set_param(MS_CTX_RUNTIME_NUM_THREADS, runtime_num_threads_default); + set_param(MS_CTX_INTER_OP_PARALLEL_NUM, inter_op_parallel_num_default); backend_policy_ = policy_map_[policy]; } diff --git a/mindspore/core/utils/ms_context.h b/mindspore/core/utils/ms_context.h index f4a45e65325..7b41430f646 100644 --- a/mindspore/core/utils/ms_context.h +++ b/mindspore/core/utils/ms_context.h @@ -99,6 +99,7 @@ enum MsCtxParam : unsigned { MS_CTX_TYPE_UINT32_BEGIN = MS_CTX_TYPE_INT_END, MS_CTX_DEVICE_ID = MS_CTX_TYPE_UINT32_BEGIN, MS_CTX_RUNTIME_NUM_THREADS, + MS_CTX_INTER_OP_PARALLEL_NUM, MS_CTX_GE_REF, MS_CTX_MAX_CALL_DEPTH, MS_CTX_TSD_REF, diff --git a/mindspore/python/mindspore/context.py b/mindspore/python/mindspore/context.py index 21746699d64..d0a42ebdb6c 100644 --- a/mindspore/python/mindspore/context.py +++ b/mindspore/python/mindspore/context.py @@ -363,6 +363,11 @@ class _Context: if op_timeout <= 0: raise ValueError("The num of op exe timeout must bigger than 0.") self.set_param(ms_ctx_param.op_timeout, op_timeout) + def set_inter_op_parallel_num(self, inter_op_parallel_num): + """Check and set inter_op_parallel_num.""" + if inter_op_parallel_num < 0: + raise ValueError("The num of parallel thread must bigger than or equal to 0.") + self.set_param(ms_ctx_param.inter_op_parallel_num, inter_op_parallel_num) setters = { 'mode': set_mode, @@ -377,6 +382,7 @@ class _Context: 'mempool_block_size': set_mempool_block_size, 'print_file_path': set_print_file_path, 'env_config_path': set_env_config_path, + 'inter_op_parallel_num': set_inter_op_parallel_num, 'runtime_num_threads': set_runtime_num_threads, 'memory_optimize_level': set_memory_optimize_level, 'op_timeout': set_op_timeout @@ -679,7 +685,7 @@ def _check_target_specific_cfgs(device, arg_key): @args_type_check(mode=int, precompile_only=bool, device_target=str, device_id=int, save_graphs=bool, save_graphs_path=str, enable_dump=bool, auto_tune_mode=str, save_dump_path=str, enable_reduce_precision=bool, variable_memory_max_size=str, - enable_auto_mixed_precision=bool, + enable_auto_mixed_precision=bool, inter_op_parallel_num=int, enable_graph_kernel=bool, reserve_class_name_in_scope=bool, check_bprop=bool, max_device_memory=str, print_file_path=str, max_call_depth=int, env_config_path=str, graph_kernel_flags=str, save_compile_cache=bool, runtime_num_threads=int, load_compile_cache=bool, @@ -750,6 +756,8 @@ def set_context(**kwargs): | +------------------------------+----------------------------+ | | enable_compile_cache | CPU/GPU/Ascend | | +------------------------------+----------------------------+ + | | inter_op_parallel_num | CPU/GPU/Ascend | + | +------------------------------+----------------------------+ | | runtime_num_threads | CPU/GPU/Ascend | | +------------------------------+----------------------------+ | | compile_cache_path | CPU/GPU/Ascend | @@ -888,7 +896,9 @@ def set_context(**kwargs): If the specified directory does not exist, the system will automatically create the directory. The cache will be saved to the directory of `compile_cache_path/rank_${rank_id}/`. The `rank_id` is the ID of the current device in the cluster. - runtime_num_threads(int): The thread pool number of cpu kernel and actor used in runtime, + inter_op_parallel_num(int): The thread number of op parallel at the same time. Default value is 0, + which means use the default num. + runtime_num_threads(int): The thread pool number of cpu kernel used in runtime, which must bigger than 0. Default value is 30, if you run many processes at the same time, you should set the value smaller to avoid thread contention. disable_format_transform (bool): Whether to disable the automatic format transform function from NCHW to NHWC. @@ -925,6 +935,7 @@ def set_context(**kwargs): >>> ms.set_context(enable_compile_cache=True, compile_cache_path="./cache.ms") >>> ms.set_context(pynative_synchronize=True) >>> ms.set_context(runtime_num_threads=10) + >>> ms.set_context(inter_op_parallel_num=4) >>> ms.set_context(disable_format_transform=True) """ ctx = _context()