!46007 Fix aicpu parallel bug.

Merge pull request !46007 from linqingke/paraller_r2.0
i-robot 2022-11-26 02:05:45 +00:00 committed by Gitee
commit eb1c612060
6 changed files with 7 additions and 310 deletions


@@ -27,7 +27,6 @@ if(EXISTS ${CMAKE_C_COMPILER} AND EXISTS ${CMAKE_CXX_COMPILER})
${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_async_event.cc
${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_context.cc
${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_pulse.cc
${CMAKE_CURRENT_SOURCE_DIR}/aicpu_sharder/aicpu_sharder.cc
${CMAKE_CURRENT_SOURCE_DIR}/random_choice_with_mask_kernels.cc
${CMAKE_CURRENT_SOURCE_DIR}/gather_grad_kernels.cc
${CMAKE_CURRENT_SOURCE_DIR}/environ/aicpu_environ_manager.cc


@@ -1,218 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "aicpu_sharder/aicpu_sharder.h"
#include <semaphore.h>
#include <unistd.h>
#include <error.h>
#include <atomic>
#include <algorithm>
#include <cerrno>
#include <cstring>
#include "common/kernel_log.h"
namespace aicpu {
#define AICPU_SHARDER_IF_TRUE_RUN(expr, run) \
do { \
if (expr) { \
run; \
} \
} while (0)
void SharderNonBlock::Register(const RunnerBool &schedule, const ClosureBool &do_task, uint32_t cpu_core_num) {
schedule_ = schedule;
do_task_ = do_task;
cpu_core_num_ = cpu_core_num;
}
bool SharderNonBlock::Enqueue(const Closure &closure, bool submit_topic) {
if (schedule_ != nullptr) {
return schedule_(closure, submit_topic);
}
return false;
}
void SharderNonBlock::Schedule(const Closure &closure) {
if (!Enqueue(closure)) {
closure();
}
}
SharderNonBlock &SharderNonBlock::GetInstance() {
static SharderNonBlock sharder_non_block;
return sharder_non_block;
}
int64_t SharderNonBlock::CeilMultiple(int64_t x, int64_t base) const {
if (base == 0) {
return 0;
}
int64_t ret = x / base;
if ((x % base) != 0) {
ret++;
}
return ret;
}
void SharderNonBlock::ParallelFor(int64_t total, int64_t per_unit_size, const SharderWork &work) {
AICPU_LOGI("total: %lld, per_unit_size: %lld", total, per_unit_size);
if ((total <= 0) || (work == nullptr)) {
AICPU_LOGE("invalid param: total<=0 or work is nullptr");
return;
}
// work itself
if ((schedule_ == nullptr) || (cpu_core_num_ <= 1)) {
AICPU_LOGI("work itself all");
work(0, total);
return;
}
// To keep scheduling latency low, the maximum number of shards is twice the number of CPU cores
const int64_t max_shard_num = static_cast<int64_t>(cpu_core_num_) * 2;
// calculate shard number and block size
// e.g., if total is 118, per_unit_size is 2, and cpu_core_num_ is 13,
// then shard_num is 24 and block_size is 5
int64_t block_size = std::max(int64_t{1}, std::min(total, per_unit_size));
int64_t shard_num = CeilMultiple(total, block_size);
shard_num = std::min(max_shard_num, shard_num);
block_size = CeilMultiple(total, shard_num);
shard_num = CeilMultiple(total, block_size);
AICPU_LOGI("shard number: %lld, block size: %lld", shard_num, block_size);
// There is no need to submit an event if shard_num is 1
if (shard_num == 1) {
AICPU_LOGI("executes on the current thread");
work(0, total);
return;
}
std::atomic<int64_t> count(shard_num); // a counter
sem_t sem;
int32_t sem_init_ret = sem_init(&sem, 0, 0);
if (sem_init_ret == -1) {
AICPU_LOGE("sem_init error with message: %s", strerror(errno));
work(0, total);
return;
}
for (int64_t start = 0; start < total; start += block_size) {
auto limit = std::min(start + block_size, total);
Closure closure = [&sem, &work, &count, start, limit]() {
--count;
// To ensure that an exception thrown by the user's work function does not affect the
// multithreaded service, exceptions must be caught here. The exception type does not
// matter; an error log is printed.
try {
work(start, limit);
} catch (std::exception &) {
AICPU_LOGE("Exception occurred in work function with start: %lld, limit: %lld", start, limit);
} catch (...) {
AICPU_LOGE("Exception occurred in work function.");
}
int32_t sem_post_ret = sem_post(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_post_ret == -1, AICPU_LOGE("sem_post error with message: %s", strerror(errno)));
};
// if enqueue fail, work itself
if (!Enqueue(closure, true)) {
AICPU_LOGI("Enqueue fail, [%lld, %lld), work itself", start, limit);
closure();
}
}
if (do_task_ != nullptr) {
bool ret = true;
while ((count > 0) && ret) {
AICPU_LOGI("Main thread do task begin.");
ret = do_task_();
AICPU_LOGI("Main thread do task end.");
}
}
for (int64_t i = 0; i < shard_num; ++i) {
int sem_wait_ret = sem_wait(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_wait_ret == -1, AICPU_LOGE("sem_wait error with message: %s", strerror(errno)));
}
int32_t sem_des_ret = sem_destroy(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_des_ret == -1, AICPU_LOGE("sem_destroy error with message: %s", strerror(errno)));
}
void SharderNonBlock::ParallelForHash(int64_t total, int64_t cpu_nums, const SharderWork &work) {
AICPU_LOGI("total: %lld, cpu_nums: %d", total, cpu_nums);
if (total <= 0 || work == nullptr) {
AICPU_LOGE("invalid param: total<=0 or work is nullptr");
return;
}
if ((schedule_ == nullptr) || (cpu_core_num_ <= 1)) {
AICPU_LOGE("schedule is nullptr or cpu core num is not enough");
return;
}
std::atomic<int64_t> count(cpu_nums); // a counter
sem_t sem;
int32_t sem_init_ret = sem_init(&sem, 0, 0);
if (sem_init_ret == -1) {
AICPU_LOGE("sem_init error with message: %s", strerror(errno));
return;
}
for (int64_t cur = 0; cur < cpu_nums; cur++) {
Closure closure = [&sem, &work, &count, total, cur]() {
work(total, cur);
--count;
int32_t sem_post_ret = sem_post(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_post_ret == -1, AICPU_LOGE("sem_post error with message: %s", strerror(errno)));
};
// if enqueue fail, work itself
if (!Enqueue(closure, true)) {
closure();
}
}
if (do_task_ != nullptr) {
bool ret = true;
while ((count > 0) && ret) {
ret = do_task_();
}
}
for (int64_t i = 0; i < cpu_nums; i++) {
int sem_wait_ret = sem_wait(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_wait_ret == -1, AICPU_LOGE("sem_wait error with message: %s", strerror(errno)));
}
int32_t sem_des_ret = sem_destroy(&sem);
AICPU_SHARDER_IF_TRUE_RUN(sem_des_ret == -1, AICPU_LOGE("sem_destroy error with message: %s", strerror(errno)));
}
} // namespace aicpu
/**
* Shards the "total" units of work with reference to "per_unit_size"
*/
void ParallelFor(int64_t total, int64_t per_unit_size, const aicpu::SharderWork &work) {
aicpu::SharderNonBlock::GetInstance().ParallelFor(total, per_unit_size, work);
}
/**
* Get CPU number
*/
uint32_t GetCPUNum() { return aicpu::SharderNonBlock::GetInstance().GetCPUNum(); }
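
The deleted ParallelFor above sizes its shards with CeilMultiple: block_size starts at
min(total, per_unit_size), is capped so that the shard count never exceeds twice the core
count, and is then recomputed so the shards cover total exactly. A standalone sketch of
that calculation (hypothetical demo code, not part of this commit), reproducing the worked
example from the source comment:

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Ceiled division, equivalent to SharderNonBlock::CeilMultiple for positive inputs.
int64_t CeilMultiple(int64_t x, int64_t base) { return (base == 0) ? 0 : (x + base - 1) / base; }

int main() {
  const int64_t total = 118;         // total units of work
  const int64_t per_unit_size = 2;   // minimum shard unit
  const int64_t cpu_core_num = 13;   // core count from the source comment
  const int64_t max_shard_num = cpu_core_num * 2;  // cap shards at 2x cores
  int64_t block_size = std::max(int64_t{1}, std::min(total, per_unit_size));
  int64_t shard_num = std::min(max_shard_num, CeilMultiple(total, block_size));
  block_size = CeilMultiple(total, shard_num);  // grow blocks to respect the cap
  shard_num = CeilMultiple(total, block_size);  // shrink count so shards cover total exactly
  std::printf("shard_num=%lld, block_size=%lld\n", static_cast<long long>(shard_num),
              static_cast<long long>(block_size));  // prints shard_num=24, block_size=5
  return 0;
}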


@@ -1,5 +1,5 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,95 +18,10 @@
#define AICPU_OPS_AICPU_SHARDER_H_
#include <functional>
#include <vector>
#include "common/kernel_util.h"
namespace aicpu {
using Closure = std::function<void()>;
using ClosureBool = std::function<bool()>;
using RunnerBool = std::function<bool(Closure, bool)>;
using SharderWork = std::function<void(int64_t, int64_t)>;
class SharderNonBlock {
public:
/**
* Get the unique object of this class
*/
static SharderNonBlock &GetInstance();
/**
* Register the schedule callback, the do_task callback, and the CPU core number;
* called by the compute process
* @param schedule Schedule callback function
* @param do_task Callback that lets the caller process queued tasks itself
* @param cpu_core_num Number of AI CPU cores
*/
void Register(const RunnerBool &schedule, const ClosureBool &do_task, uint32_t cpu_core_num);
/**
* Shards the "total" units of work with reference to "per_unit_size"
* @param total Total unit of work
* @param per_unit_size Minimum shard unit
* @param work should be a callable taking (int64, int64) arguments.
work(start, limit) computes the work units from [start, limit),
i.e., [start, limit) is a shard.
*/
void ParallelFor(int64_t total, int64_t per_unit_size, const SharderWork &work);
/**
* Shards the units of work by hash
* @param total Total units of work
* @param cpu_nums Number of CPU cores
* @param work should be a callable taking (int64, int64) arguments.
work(total, cur) processes the work units whose input hash maps to bucket cur
among cpu_nums buckets; used specifically by the parallel Unique op
*/
void ParallelForHash(int64_t total, int64_t cpu_nums, const SharderWork &work);
/**
* Schedule a task using the schedule function registered by the compute process;
* note that the task is actually executed asynchronously
* @param closure Closure function that must not throw
*/
void Schedule(const Closure &closure);
/**
* Get CPU number
* @param None
* @return CPU number
*/
uint32_t GetCPUNum() const { return cpu_core_num_; }
private:
SharderNonBlock() : schedule_(nullptr), do_task_(nullptr), cpu_core_num_(0) {}
~SharderNonBlock() = default;
SharderNonBlock(const SharderNonBlock &) = delete;
SharderNonBlock &operator=(const SharderNonBlock &) = delete;
SharderNonBlock(SharderNonBlock &&) = delete;
SharderNonBlock &operator=(SharderNonBlock &&) = delete;
/**
* Enqueue a closure
* @param closure Closure to be executed
* @param submit_topic Whether to submit a topic; true means submit
* @return Whether the closure was enqueued successfully
*/
bool Enqueue(const Closure &closure, bool submit_topic = false);
/**
* Calculate ceil(x / base), i.e., how many base-sized blocks are needed to cover x.
* E.g., x = 1, base = 2 returns 1
* @param x An integer
* @param base The divisor
* @return The ceiled quotient
*/
int64_t CeilMultiple(int64_t x, int64_t base) const;
RunnerBool schedule_; // enqueue runner
ClosureBool do_task_; // a callback, do task from task queue
uint32_t cpu_core_num_; // aicpu core number
}; // SharderNonBlock
} // namespace aicpu
extern "C" {
@@ -118,14 +33,15 @@ extern "C" {
work(start, limit) computes the work units from [start, limit),
i.e., [start, limit) is a shard.
*/
AICPU_VISIBILITY_API void ParallelFor(int64_t total, int64_t per_unit_size, const aicpu::SharderWork &work);
__attribute__((weak)) AICPU_VISIBILITY_API void ParallelFor(int64_t total, int64_t per_unit_size,
const aicpu::SharderWork &work);
/**
* Get CPU number
* @param None
* @return CPU number
*/
AICPU_VISIBILITY_API uint32_t GetCPUNum();
__attribute__((weak)) AICPU_VISIBILITY_API uint32_t GetCPUNum();
}
#endif // AICPU_OPS_AICPU_SHARDER_H_
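
The header now marks ParallelFor and GetCPUNum as weak symbols, so the kernel library no
longer carries its own aicpu_sharder.cc definition: at load time the references bind to the
implementation the AICPU runtime exports. A minimal sketch of the weak-symbol pattern
(hypothetical standalone demo, not the MindSpore build), showing how a caller can test
whether the runtime actually supplied the symbol:

#include <cstdint>
#include <cstdio>

extern "C" {
// Weak declaration: the linker may leave this unresolved, in which case the
// function's address compares equal to nullptr at run time.
__attribute__((weak)) uint32_t GetCPUNum();
}

int main() {
  if (GetCPUNum != nullptr) {
    std::printf("runtime provides GetCPUNum: %u cores\n", GetCPUNum());
  } else {
    std::printf("no runtime implementation, fall back to 1 core\n");  // fallback path
  }
  return 0;
}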


@@ -354,7 +354,7 @@ uint32_t DropOutGenMaskKernel::DoCompute() {
};
const int64_t total_unit = static_cast<int64_t>(byte_count >> 4);
const int64_t perUnitSize = 1; // shard unit size
aicpu::SharderNonBlock::GetInstance().ParallelFor(total_unit, perUnitSize, shards);
ParallelFor(total_unit, perUnitSize, shards);
const int64_t margin = 1021; // the margin of offset
OffsetAdd(bit_count + margin, g_offset, g_offset);
auto offset0 = reinterpret_cast<uint64_t *>(io_addrs_[2]);


@@ -66,7 +66,7 @@ static uint32_t GatherGrad(const T *index, const S *grad, S *output, int64_t dim
};
const int64_t per_unit_size = number / std::thread::hardware_concurrency();
SharderNonBlock::GetInstance().ParallelFor(number, per_unit_size, shard_gather_grad);
ParallelFor(number, per_unit_size, shard_gather_grad);
if (status) {
return kAicpuKernelStateFailed;
}


@@ -191,7 +191,7 @@ uint32_t SliceGradKernel::SliceGradTask() {
block_num *= dy_shape_[i];
}
const int64_t per_unit_size = block_num / std::thread::hardware_concurrency();
SharderNonBlock::GetInstance().ParallelFor(block_num, per_unit_size, block_task);
ParallelFor(block_num, per_unit_size, block_task);
return kAicpuKernelStateSucess;
}
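
All three kernels now call the free ParallelFor instead of going through
SharderNonBlock::GetInstance(), so every kernel shares whatever scheduler the runtime
registered. A hedged sketch of the call-site pattern (the serial ParallelFor stub, the
lambda, and the sizes are illustrative, not from this commit; ShardFn mirrors the
SharderWork alias from the header):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

using ShardFn = std::function<void(int64_t, int64_t)>;

// Stand-in for the runtime-provided ParallelFor: runs the whole range serially.
void ParallelFor(int64_t total, int64_t per_unit_size, const ShardFn &work) {
  (void)per_unit_size;  // a real scheduler would use this to size the blocks
  work(0, total);
}

int main() {
  std::vector<int64_t> out(1000);
  const int64_t number = static_cast<int64_t>(out.size());
  // As in gather_grad/slice_grad: derive per_unit_size from the core count
  // (guarded here, since hardware_concurrency() may report 0).
  const unsigned int cores = std::max(1u, std::thread::hardware_concurrency());
  const int64_t per_unit_size = number / cores;
  ParallelFor(number, per_unit_size, [&out](int64_t start, int64_t limit) {
    for (int64_t i = start; i < limit; ++i) {  // [start, limit) is one shard
      out[i] = i * 2;
    }
  });
  return 0;
}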