diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/CMakeLists.txt b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/CMakeLists.txt
index 9424340b2cf..01fbcaa3d94 100644
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/CMakeLists.txt
+++ b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/CMakeLists.txt
@@ -27,7 +27,6 @@ if(EXISTS ${CMAKE_C_COMPILER} AND EXISTS ${CMAKE_CXX_COMPILER})
         ${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_async_event.cc
         ${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_context.cc
         ${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_pulse.cc
-        ${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_sharder.cc
         ${CMAKE_CURRENT_SOURCE_DIR}/random_choice_with_mask_kernels.cc
         ${CMAKE_CURRENT_SOURCE_DIR}/gather_grad_kernels.cc
         ${CMAKE_CURRENT_SOURCE_DIR}/environ/aicpu_environ_manager.cc
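Note: with aicpu_sharder.cc removed from the source list above, the ops library no longer ships its own definitions of `ParallelFor` and `GetCPUNum`; the header change further below marks those declarations `__attribute__((weak))`, presumably so the library still links and loads while an external provider (the AICPU runtime framework) supplies the strong definitions. The sketch below is a minimal, hypothetical illustration of the weak-reference behavior this relies on; the simplified `work` signature is invented for the example and is not the real `aicpu::SharderWork` type.

```cpp
// Minimal sketch of an undefined weak reference (GCC/Clang on ELF targets).
#include <cstdio>

// Weak declaration: if no strong definition is linked in, the symbol's
// address resolves to nullptr instead of causing a link error.
extern "C" __attribute__((weak)) void ParallelFor(long total, long per_unit, void (*work)(long, long));

int main() {
  if (ParallelFor != nullptr) {
    std::puts("a ParallelFor provider is linked in");
  } else {
    std::puts("no provider linked; a caller could fall back to serial work");
  }
  return 0;
}
```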
diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.cc b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.cc
deleted file mode 100644
index 0ad8b327121..00000000000
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.cc
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Copyright 2021 Huawei Technologies Co., Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "aicpu_sharder/aicpu_sharder.h"
-
-#include <semaphore.h>
-#include <unistd.h>
-#include <algorithm>
-#include <atomic>
-#include <cerrno>
-#include <cstring>
-#include <functional>
-
-#include "common/kernel_log.h"
-
-namespace aicpu {
-#define AICPU_SHARDER_IF_TRUE_RUN(expr, run) \
-  do {                                       \
-    if (expr) {                              \
-      run;                                   \
-    }                                        \
-  } while (0)
-
-void SharderNonBlock::Register(const RunnerBool &schedule, const ClosureBool &do_task, uint32_t cpu_core_num) {
-  schedule_ = schedule;
-  do_task_ = do_task;
-  cpu_core_num_ = cpu_core_num;
-}
-
-bool SharderNonBlock::Enqueue(const Closure &closure, bool submit_topic) {
-  if (schedule_ != nullptr) {
-    return schedule_(closure, submit_topic);
-  }
-  return false;
-}
-
-void SharderNonBlock::Schedule(const Closure &closure) {
-  if (!Enqueue(closure)) {
-    closure();
-  }
-}
-
-SharderNonBlock &SharderNonBlock::GetInstance() {
-  static SharderNonBlock sharder_non_block;
-  return sharder_non_block;
-}
-
-int64_t SharderNonBlock::CeilMultiple(int64_t x, int64_t base) const {
-  if (base == 0) {
-    return 0;
-  }
-  int64_t ret = x / base;
-  if ((x % base) != 0) {
-    ret++;
-  }
-
-  return ret;
-}
-
-void SharderNonBlock::ParallelFor(int64_t total, int64_t per_unit_size, const SharderWork &work) {
-  AICPU_LOGI("total: %lld, per_unit_size: %lld", total, per_unit_size);
-  if ((total <= 0) || (work == nullptr)) {
-    AICPU_LOGE("invalid param: total<=0 or work is nullptr");
-    return;
-  }
-
-  // work itself
-  if ((schedule_ == nullptr) || (cpu_core_num_ <= 1)) {
-    AICPU_LOGI("work itself all");
-    work(0, total);
-    return;
-  }
-
-  // To keep the scheduling delay small, the maximum number of slices is twice the number of CPU cores
-  const int64_t max_shard_num = static_cast<int64_t>(cpu_core_num_) * 2;
-
-  // calculate shard number and block size,
-  // e.g., if total is 118, per_unit_size is 2, and cpu_core_num_ is 13,
-  // then shard_num is 24 and block_size is 5
-  int64_t block_size = std::max(int64_t{1}, std::min(total, per_unit_size));
-  int64_t shard_num = CeilMultiple(total, block_size);
-  shard_num = std::min(max_shard_num, shard_num);
-  block_size = CeilMultiple(total, shard_num);
-  shard_num = CeilMultiple(total, block_size);
-  AICPU_LOGI("shard number: %lld, block size: %lld", shard_num, block_size);
-
-  // There is no need to submit an event if shard_num is 1
-  if (shard_num == 1) {
-    AICPU_LOGI("executes on the current thread");
-    work(0, total);
-    return;
-  }
-
-  std::atomic<int64_t> count(shard_num);  // a counter
-  sem_t sem;
-  int32_t sem_init_ret = sem_init(&sem, 0, 0);
-  if (sem_init_ret == -1) {
-    AICPU_LOGE("sem_init error with message: %s", strerror(errno));
-    work(0, total);
-    return;
-  }
-
-  for (int64_t start = 0; start < total; start += block_size) {
-    auto limit = std::min(start + block_size, total);
-    Closure closure = [&sem, &work, &count, start, limit]() {
-      --count;
-      // To ensure that an exception in the user's work function does not affect multithreaded services,
-      // exceptions must be caught. The exception type does not matter here; an error log is printed.
-      try {
-        work(start, limit);
-      } catch (std::exception &) {
-        AICPU_LOGE("Exception occurred in work function with start: %lld, limit: %lld", start, limit);
-      } catch (...) {
-        AICPU_LOGE("Exception occurred in work function.");
-      }
-
-      int32_t sem_post_ret = sem_post(&sem);
-      AICPU_SHARDER_IF_TRUE_RUN(sem_post_ret == -1, AICPU_LOGE("sem_post error with message: %s", strerror(errno)));
-    };
-
-    // if enqueue fails, do the work in this thread
-    if (!Enqueue(closure, true)) {
-      AICPU_LOGI("Enqueue fail, [%lld, %lld), work itself", start, limit);
-      closure();
-    }
-  }
-
-  if (do_task_ != nullptr) {
-    bool ret = true;
-    while ((count > 0) && ret) {
-      AICPU_LOGI("Main thread do task begin.");
-      ret = do_task_();
-      AICPU_LOGI("Main thread do task end.");
-    }
-  }
-
-  for (int64_t i = 0; i < shard_num; ++i) {
-    int sem_wait_ret = sem_wait(&sem);
-    AICPU_SHARDER_IF_TRUE_RUN(sem_wait_ret == -1, AICPU_LOGE("sem_wait error with message: %s", strerror(errno)));
-  }
-  int32_t sem_des_ret = sem_destroy(&sem);
-  AICPU_SHARDER_IF_TRUE_RUN(sem_des_ret == -1, AICPU_LOGE("sem_destroy error with message: %s", strerror(errno)));
-}
-
-void SharderNonBlock::ParallelForHash(int64_t total, int64_t cpu_nums, const SharderWork &work) {
-  AICPU_LOGI("total: %lld, cpu_nums: %lld", total, cpu_nums);
-  if (total <= 0 || work == nullptr) {
-    AICPU_LOGE("invalid param: total<=0 or work is nullptr");
-    return;
-  }
-
-  if ((schedule_ == nullptr) || (cpu_core_num_ <= 1)) {
-    AICPU_LOGE("schedule is nullptr or cpu core num is not enough");
-    return;
-  }
-
-  std::atomic<int64_t> count(cpu_nums);  // a counter
-
-  sem_t sem;
-  int32_t sem_init_ret = sem_init(&sem, 0, 0);
-  if (sem_init_ret == -1) {
-    AICPU_LOGE("sem_init error with message: %s", strerror(errno));
-    return;
-  }
-
-  for (int64_t cur = 0; cur < cpu_nums; cur++) {
-    Closure closure = [&sem, &work, &count, total, cur]() {
-      work(total, cur);
-      --count;
-      int32_t sem_post_ret = sem_post(&sem);
-      AICPU_SHARDER_IF_TRUE_RUN(sem_post_ret == -1, AICPU_LOGE("sem_post error with message: %s", strerror(errno)));
-    };
-
-    // if enqueue fails, do the work in this thread
-    if (!Enqueue(closure, true)) {
-      closure();
-    }
-  }
-
-  if (do_task_ != nullptr) {
-    bool ret = true;
-    while ((count > 0) && ret) {
-      ret = do_task_();
-    }
-  }
-
-  for (int64_t i = 0; i < cpu_nums; i++) {
-    int sem_wait_ret = sem_wait(&sem);
-    AICPU_SHARDER_IF_TRUE_RUN(sem_wait_ret == -1, AICPU_LOGE("sem_wait error with message: %s", strerror(errno)));
-  }
-  int32_t sem_des_ret = sem_destroy(&sem);
-  AICPU_SHARDER_IF_TRUE_RUN(sem_des_ret == -1, AICPU_LOGE("sem_destroy error with message: %s", strerror(errno)));
-}
-}  // namespace aicpu
-
-/**
- * Shards the "total" units of work with reference to "per_unit_size"
- */
-void ParallelFor(int64_t total, int64_t per_unit_size, const aicpu::SharderWork &work) {
-  aicpu::SharderNonBlock::GetInstance().ParallelFor(total, per_unit_size, work);
-}
-
-/**
- * Get CPU number
- */
-uint32_t GetCPUNum() { return aicpu::SharderNonBlock::GetInstance().GetCPUNum(); }
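For reference, the deleted `SharderNonBlock::ParallelFor` sized its shards in three steps: start from `per_unit_size` as the block size, cap the shard count at twice the CPU core count to bound scheduling delay, then rebalance block size and shard count by ceiling division. The standalone sketch below (not part of the patch) reproduces the worked example from the deleted comment: total 118, per_unit_size 2, and 13 cores yield 24 shards with a block size of 5.

```cpp
// Standalone reproduction of the shard-sizing arithmetic from the deleted
// SharderNonBlock::ParallelFor.
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Ceiling division, as in the deleted CeilMultiple helper.
int64_t CeilMultiple(int64_t x, int64_t base) {
  if (base == 0) return 0;
  return x / base + ((x % base) != 0 ? 1 : 0);
}

int main() {
  const int64_t total = 118, per_unit_size = 2;
  const uint32_t cpu_core_num = 13;
  const int64_t max_shard_num = static_cast<int64_t>(cpu_core_num) * 2;       // 26
  int64_t block_size = std::max(int64_t{1}, std::min(total, per_unit_size));  // 2
  int64_t shard_num = CeilMultiple(total, block_size);                        // 59
  shard_num = std::min(max_shard_num, shard_num);                             // 26
  block_size = CeilMultiple(total, shard_num);                                // ceil(118/26) = 5
  shard_num = CeilMultiple(total, block_size);                                // ceil(118/5) = 24
  std::printf("shard_num=%lld, block_size=%lld\n", static_cast<long long>(shard_num),
              static_cast<long long>(block_size));                            // 24, 5
  return 0;
}
```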
diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.h b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.h
index 8e5fcc2b812..3e6b8b8711e 100644
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.h
+++ b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/aicpu_sharder/aicpu_sharder.h
@@ -1,5 +1,5 @@
 /**
- * Copyright 2021 Huawei Technologies Co., Ltd
+ * Copyright 2022 Huawei Technologies Co., Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,95 +18,10 @@
 #define AICPU_OPS_AICPU_SHARDER_H_
 
 #include <functional>
-#include
 
 #include "common/kernel_util.h"
 
 namespace aicpu {
-using Closure = std::function<void()>;
-using ClosureBool = std::function<bool()>;
-using RunnerBool = std::function<bool(Closure, bool)>;
 using SharderWork = std::function<void(int64_t, int64_t)>;
-
-class SharderNonBlock {
- public:
-  /**
-   * Get the unique object of this class
-   */
-  static SharderNonBlock &GetInstance();
-
-  /**
-   * Register the schedule callback function, the do_task function and the CPU core number;
-   * called by the compute process
-   * @param schedule Schedule callback function
-   * @param do_task Callback function for self-scheduling
-   * @param cpu_core_num aicpu core number
-   */
-  void Register(const RunnerBool &schedule, const ClosureBool &do_task, uint32_t cpu_core_num);
-
-  /**
-   * Shards the "total" units of work with reference to "per_unit_size"
-   * @param total Total units of work
-   * @param per_unit_size Minimum shard unit
-   * @param work should be a callable taking (int64, int64) arguments.
-                 work(start, limit) computes the work units from [start, limit),
-                 i.e., [start, limit) is a shard.
-   */
-  void ParallelFor(int64_t total, int64_t per_unit_size, const SharderWork &work);
-
-  /**
-   * Shards the units of work by hash
-   * @param total Total units of work
-   * @param cpu_nums Number of CPU cores
-   * @param work should be a callable taking (int64, int64) arguments.
-                 work(cur, cpu_nums) computes the work units whose input hash masked by (cpu_nums - 1)
-                 equals cur; used specifically by the parallel Unique op
-   */
-  void ParallelForHash(int64_t total, int64_t cpu_nums, const SharderWork &work);
-
-  /**
-   * Schedule a task using the schedule function registered by the compute process;
-   * note that the task is actually executed asynchronously
-   * @param closure Closure function with nothrow
-   */
-  void Schedule(const Closure &closure);
-
-  /**
-   * Get CPU number
-   * @param None
-   * @return CPU number
-   */
-  uint32_t GetCPUNum() const { return cpu_core_num_; }
-
- private:
-  SharderNonBlock() : schedule_(nullptr), do_task_(nullptr), cpu_core_num_(0) {}
-  ~SharderNonBlock() = default;
-
-  SharderNonBlock(const SharderNonBlock &) = delete;
-  SharderNonBlock &operator=(const SharderNonBlock &) = delete;
-  SharderNonBlock(SharderNonBlock &&) = delete;
-  SharderNonBlock &operator=(SharderNonBlock &&) = delete;
-
-  /**
-   * Enqueue a closure function
-   * @param closure Closure function to be called
-   * @param submit_topic whether to submit a topic; true means submit
-   * @return whether the closure was enqueued successfully
-   */
-  bool Enqueue(const Closure &closure, bool submit_topic = false);
-
-  /**
-   * Calculate how many times "x" contains "base", rounded up,
-   * e.g., if x is 1 and base is 2, this function returns 1
-   * @param x An integer
-   * @param base The base used when calculating the multiple
-   * @return ceiled multiple
-   */
-  int64_t CeilMultiple(int64_t x, int64_t base) const;
-
-  RunnerBool schedule_;    // enqueue runner
-  ClosureBool do_task_;    // a callback, does tasks from the task queue
-  uint32_t cpu_core_num_;  // aicpu core number
-};  // SharderNonBlock
 }  // namespace aicpu
 
 extern "C" {
@@ -118,14 +33,15 @@ extern "C" {
 /**
  * Shards the "total" units of work with reference to "per_unit_size"
  * @param total Total units of work
  * @param per_unit_size Minimum shard unit
  * @param work should be a callable taking (int64, int64) arguments.
               work(start, limit) computes the work units from [start, limit),
               i.e., [start, limit) is a shard.
  */
-AICPU_VISIBILITY_API void ParallelFor(int64_t total, int64_t per_unit_size, const aicpu::SharderWork &work);
+__attribute__((weak)) AICPU_VISIBILITY_API void ParallelFor(int64_t total, int64_t per_unit_size,
+                                                            const aicpu::SharderWork &work);
 
 /**
  * Get CPU number
  * @param None
  * @return CPU number
  */
-AICPU_VISIBILITY_API uint32_t GetCPUNum();
+__attribute__((weak)) AICPU_VISIBILITY_API uint32_t GetCPUNum();
 }
 
 #endif  // AICPU_OPS_AICPU_SHARDER_H_
diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/drop_out_gen_mask_kernels.cc b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/drop_out_gen_mask_kernels.cc
index 575133253cd..e1e2aac530a 100644
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/drop_out_gen_mask_kernels.cc
+++ b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/drop_out_gen_mask_kernels.cc
@@ -354,7 +354,7 @@ uint32_t DropOutGenMaskKernel::DoCompute() {
   };
   const int64_t total_unit = static_cast<int64_t>(byte_count >> 4);
   const int64_t perUnitSize = 1;  // shard unit size
-  aicpu::SharderNonBlock::GetInstance().ParallelFor(total_unit, perUnitSize, shards);
+  ParallelFor(total_unit, perUnitSize, shards);
   const int64_t margin = 1021;  // the margin of offset
   OffsetAdd(bit_count + margin, g_offset, g_offset);
   auto offset0 = reinterpret_cast(io_addrs_[2]);
diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/gather_grad_kernels.cc b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/gather_grad_kernels.cc
index 7c28385093d..7d1b6e0021f 100644
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/gather_grad_kernels.cc
+++ b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/gather_grad_kernels.cc
@@ -66,7 +66,7 @@ static uint32_t GatherGrad(const T *index, const S *grad, S *output, int64_t dim
   };
 
   const int64_t per_unit_size = number / std::thread::hardware_concurrency();
-  SharderNonBlock::GetInstance().ParallelFor(number, per_unit_size, shard_gather_grad);
+  ParallelFor(number, per_unit_size, shard_gather_grad);
   if (status) {
     return kAicpuKernelStateFailed;
   }
diff --git a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/slice_grad_kernel.cc b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/slice_grad_kernel.cc
index 36d04bcca35..c5c32ac5381 100644
--- a/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/slice_grad_kernel.cc
+++ b/mindspore/ccsrc/plugin/device/ascend/kernel/aicpu/aicpu_ops/slice_grad_kernel.cc
@@ -191,7 +191,7 @@ uint32_t SliceGradKernel::SliceGradTask() {
     block_num *= dy_shape_[i];
   }
   const int64_t per_unit_size = block_num / std::thread::hardware_concurrency();
-  SharderNonBlock::GetInstance().ParallelFor(block_num, per_unit_size, block_task);
+  ParallelFor(block_num, per_unit_size, block_task);
 
   return kAicpuKernelStateSucess;
 }
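After this change, the three kernels above call the free `ParallelFor` declared in aicpu_sharder.h directly instead of going through the `SharderNonBlock` singleton. The sketch below mirrors that call-site pattern; the serial `ParallelFor` definition is a stand-in invented so the example is self-contained, and is not the runtime's actual implementation. Unlike the kernels above, it also guards `std::thread::hardware_concurrency()` against returning 0.

```cpp
// Self-contained usage sketch of the free ParallelFor call-site pattern.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

namespace aicpu {
using SharderWork = std::function<void(int64_t, int64_t)>;
}  // namespace aicpu

// Stand-in provider: a real implementation would split [0, total) into
// shards of roughly per_unit_size and run them on worker threads.
extern "C" void ParallelFor(int64_t total, int64_t per_unit_size, const aicpu::SharderWork &work) {
  (void)per_unit_size;
  work(0, total);  // serial fallback: one shard covering the whole range
}

int main() {
  std::vector<float> out(1024, 0.0f);
  const int64_t number = static_cast<int64_t>(out.size());
  // Derive per_unit_size from the hardware thread count, as the kernels do,
  // but guarded against hardware_concurrency() returning 0.
  const auto hw = std::max(1u, std::thread::hardware_concurrency());
  const int64_t per_unit_size = number / static_cast<int64_t>(hw);
  ParallelFor(number, per_unit_size, [&out](int64_t start, int64_t limit) {
    for (int64_t i = start; i < limit; ++i) {
      out[static_cast<size_t>(i)] = 2.0f * static_cast<float>(i);
    }
  });
  std::printf("out[1] = %.1f, out[1023] = %.1f\n", out[1], out[1023]);
  return 0;
}
```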