!16744 add refactor ThreadPool

From: @yangjie159
Reviewed-by: @zhang_xue_tong,@wangchengyuan
Signed-off-by: @zhang_xue_tong
This commit is contained in:
mindspore-ci-bot 2021-05-24 09:14:21 +08:00 committed by Gitee
commit 64906321fc
6 changed files with 872 additions and 0 deletions

View File

@ -0,0 +1,355 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "thread/core_affinity.h"
#include <string.h>
#include <stdlib.h>
#include <string>
#include <algorithm>
#ifdef __ANDROID__
#include <sched.h>
#endif
#ifdef MS_COMPILE_IOS
#include <sys/types.h>
#include <sys/sysctl.h>
#include <mach/machine.h>
#endif // MS_COMPILE_IOS
#include "thread/threadpool.h"
#ifdef BIND_CORE
namespace mindspore {
#define MAX_PATH_SIZE (256)
enum Arch {
UnKnown_Arch = 0,
Cortex_A5,
Cortex_A7,
Cortex_A8,
Cortex_A9,
Cortex_A12,
Cortex_A15,
Cortex_A17,
Cortex_A32,
Cortex_A34,
Cortex_A35,
Cortex_A53,
Cortex_A55,
Cortex_A57,
Cortex_A65,
Cortex_A72,
Cortex_A73,
Cortex_A75,
Cortex_A76,
Cortex_A77,
Cortex_A78,
Cortex_X1
};
typedef struct {
int core_id;
int max_freq;
enum Arch arch;
} CpuInfo;
enum Arch GetArch(int cpu_part) {
typedef struct {
int part;
enum Arch arch;
} ArchSet;
// https://en.wikipedia.org/wiki/Comparison_of_ARMv7-A_cores
// https://en.wikipedia.org/wiki/Comparison_of_ARMv8-A_cores
std::vector<ArchSet> arch_set = {
{0x800, Cortex_A73}, // High-performance Kryo 260 (r10p2) / Kryo 280 (r10p1) "Gold" -> Cortex-A73
{0x801, Cortex_A53}, // Low-power Kryo 260 / 280 "Silver" -> Cortex-A53
{0x802, Cortex_A75}, // High-performance Kryo 385 "Gold" -> Cortex-A75
{0x803, Cortex_A55}, // Low-power Kryo 385 "Silver" -> Cortex-A55r0
{0x804, Cortex_A76}, // High-performance Kryo 485 "Gold" / "Gold Prime" -> Cortex-A76
{0x805, Cortex_A55}, // Low-performance Kryo 485 "Silver" -> Cortex-A55
{0xC05, Cortex_A5},
{0xC07, Cortex_A7},
{0xC08, Cortex_A8},
{0xC09, Cortex_A9},
{0xC0C, Cortex_A12},
{0xC0D, Cortex_A12},
{0xC0E, Cortex_A17},
{0xC0F, Cortex_A15},
{0xD01, Cortex_A32}, // also Huawei Kunpeng 920
// series taishan_v110 when not
// on android
{0xD02, Cortex_A34},
{0xD03, Cortex_A53},
{0xD04, Cortex_A35},
{0xD05, Cortex_A55},
{0xD06, Cortex_A65},
{0xD07, Cortex_A57},
{0xD08, Cortex_A72},
{0xD09, Cortex_A73},
{0xD0A, Cortex_A75},
{0xD0B, Cortex_A76},
{0xD0D, Cortex_A77},
{0xD0E, Cortex_A76}, // Cortex-A76AE
{0xD40, Cortex_A76}, // Kirin 980 Big/Medium cores -> Cortex-A76
{0xD41, Cortex_A78},
{0xD43, Cortex_A65}, // Cortex-A65AE
{0xD44, Cortex_X1}};
auto item =
std::find_if(arch_set.begin(), arch_set.end(), [&cpu_part](const ArchSet &a) { return a.part == cpu_part; });
return item != arch_set.end() ? item->arch : UnKnown_Arch;
}
int ParseCpuPart(const char *line, int start, int size) {
int cpu_part = 0;
for (int i = start; i < size && i < start + 3; i++) {
char c = line[i];
int d;
if (c >= '0' && c <= '9') {
d = c - '0';
} else if ((c - 'A') < 6) {
d = 10 + (c - 'A');
} else if ((c - 'a') < 6) {
d = 10 + (c - 'a');
} else {
THREAD_ERROR("CPU part in /proc/cpuinfo is ignored due to unexpected non-hex character");
break;
}
cpu_part = cpu_part * 16 + d;
}
return cpu_part;
}
int SetArch(std::vector<CpuInfo> *freq_set, int core_num) {
if (core_num <= 0) {
THREAD_ERROR("core_num must be greater than 0.");
return THREAD_ERROR;
}
FILE *fp = fopen("/proc/cpuinfo", "r");
if (fp == nullptr) {
THREAD_ERROR("read /proc/cpuinfo error.");
return THREAD_ERROR;
}
std::vector<Arch> archs;
archs.resize(core_num);
const int max_line_size = 1024;
char line[max_line_size] = {0};
int count = 0;
while (!feof(fp)) {
if (fgets(line, max_line_size, fp)) {
// line start with "CPU part"
if (0 == memcmp(line, "CPU part", 8)) {
// get number like 0xD03
for (int i = 0; i < max_line_size - 4; ++i) {
if (line[i] == '0' && line[i + 1] == 'x') {
int cpu_part = ParseCpuPart(line, i + 2, max_line_size);
enum Arch arch = GetArch(cpu_part);
if (arch == UnKnown_Arch) {
THREAD_ERROR("cpu's architecture is unknown.");
fclose(fp);
return THREAD_ERROR;
}
count++;
if (count > core_num) {
THREAD_ERROR("number of cpu_part in /proc/cpuinfo is more than core_num.");
fclose(fp);
return THREAD_ERROR;
}
archs[count - 1] = arch;
}
}
}
}
}
if (count < core_num) {
THREAD_ERROR("number of cpu_part in /proc/cpuinfo is less than core_num.");
fclose(fp);
return THREAD_ERROR;
}
for (int i = 0; i < core_num; ++i) {
(*freq_set)[i].arch = archs[i];
}
fclose(fp);
return THREAD_OK;
}
int GetMaxFrequency(int core_id) {
FILE *fp;
std::vector<std::string> paths = {"/sys/devices/system/cpu/cpufreq/stats/cpu",
"/sys/devices/system/cpu/cpufreq/stats/cpu", "/sys/devices/system/cpu/cpu"};
std::vector<std::string> files = {"/time_in_state", "/cpufreq/stats/time_in_state", "/cpufreq/cpuinfo_max_freq"};
for (size_t i = 0; i < paths.size(); ++i) {
std::string file = paths[i] + std::to_string(core_id) + files[i];
fp = fopen(file.c_str(), "rb");
if (fp != nullptr) {
break;
}
}
int max_freq = -1;
if (fp == nullptr) {
THREAD_ERROR("open system file failed");
return max_freq;
}
while (feof(fp) == 0) {
int freq = 0;
int tmp = fscanf(fp, "%d", &freq);
if (tmp != 1) {
break;
}
if (freq > max_freq) {
max_freq = freq;
}
}
fclose(fp);
return max_freq;
}
int CoreAffinity::SortCPUProcessors() {
core_num_ = std::thread::hardware_concurrency();
std::vector<CpuInfo> freq_set;
freq_set.resize(core_num_);
for (size_t i = 0; i < core_num_; ++i) {
int max_freq = GetMaxFrequency(i);
freq_set[i].core_id = i;
freq_set[i].max_freq = max_freq;
freq_set[i].arch = UnKnown_Arch;
}
int err_code = SetArch(&freq_set, core_num_);
if (err_code != THREAD_OK) {
THREAD_INFO("set arch failed, ignoring arch.");
}
// sort core id by frequency into descending order
for (size_t i = 0; i < core_num_; ++i) {
for (size_t j = i + 1; j < core_num_; ++j) {
if (freq_set[i].max_freq < freq_set[j].max_freq ||
(freq_set[i].max_freq == freq_set[j].max_freq && freq_set[i].arch <= freq_set[j].arch)) {
CpuInfo temp = freq_set[i];
freq_set[i] = freq_set[j];
freq_set[j] = temp;
}
}
}
int max_freq = freq_set.front().max_freq;
for (const auto &info : freq_set) {
THREAD_INFO("sorted core id: %d, max frequency: %d, arch: %d", info.core_id, info.max_freq, info.arch);
sorted_id_.push_back(info.core_id);
higher_num_ += info.max_freq == max_freq ? 1 : 0;
}
return THREAD_OK;
}
int CoreAffinity::InitBindCoreId(size_t thread_num, BindMode bind_mode) {
int ret = SortCPUProcessors();
if (ret != THREAD_OK) {
return THREAD_ERROR;
}
if (core_num_ != sorted_id_.size()) {
THREAD_ERROR("init sorted core id failed");
return THREAD_ERROR;
}
thread_num_ = thread_num;
bind_id_.clear();
if (bind_mode == Power_Higher || bind_mode == Power_NoBind) {
for (size_t i = 0; i < thread_num_; ++i) {
bind_id_.push_back(sorted_id_[i % core_num_]);
}
} else if (bind_mode == Power_Middle) {
for (size_t i = 0; i < thread_num_; ++i) {
bind_id_.push_back(sorted_id_[(i + higher_num_) % core_num_]);
}
} else {
return THREAD_ERROR;
}
return THREAD_OK;
}
int CoreAffinity::SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const {
#ifdef __ANDROID__
#if __ANDROID_API__ >= 21
THREAD_INFO("thread: %d, mask: %lu", pthread_gettid_np(thread_id), cpuSet->__bits[0]);
int ret = sched_setaffinity(pthread_gettid_np(thread_id), sizeof(cpu_set_t), cpuSet);
if (ret != THREAD_OK) {
THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", pthread_gettid_np(thread_id), ret);
return THREAD_OK;
}
#endif
#else
#if defined(__APPLE__)
THREAD_ERROR("not bind thread to apple's cpu.");
return THREAD_ERROR;
#else
int ret = pthread_setaffinity_np(thread_id, sizeof(cpu_set_t), cpuSet);
if (ret != THREAD_OK) {
THREAD_ERROR("set thread: %lu to cpu failed", thread_id);
return THREAD_ERROR;
}
#endif // __APPLE__
#endif
return THREAD_OK;
}
int CoreAffinity::FreeScheduleThreads(const std::vector<Worker *> &workers) const {
if (thread_num_ != workers.size()) {
return THREAD_ERROR;
}
cpu_set_t mask;
CPU_ZERO(&mask);
for (size_t i = 0; i < thread_num_; ++i) {
CPU_SET(sorted_id_[i], &mask);
}
for (size_t i = 0; i < thread_num_; ++i) {
int ret = SetAffinity(workers[i]->thread.native_handle(), &mask);
if (ret != THREAD_OK) {
THREAD_ERROR("set thread[%zu] affinity failed", i);
return THREAD_ERROR;
}
}
return THREAD_OK;
}
int CoreAffinity::BindThreadsToCoreList(const std::vector<Worker *> &workers) const {
if (bind_id_.size() != thread_num_) {
THREAD_ERROR("invalid core list");
return THREAD_ERROR;
}
for (size_t i = 0; i < thread_num_; ++i) {
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(bind_id_[i], &mask);
// affinity mask determines the CPU core which it is eligible to run
int ret = SetAffinity(workers[i]->thread.native_handle(), &mask);
if (ret != THREAD_OK) {
THREAD_ERROR("set thread[%zu] affinity failed", i);
return THREAD_ERROR;
}
THREAD_INFO("bind thread[%zu] success", i);
}
return THREAD_OK;
}
int CoreAffinity::BindThreads(const std::vector<Worker *> &workers, BindMode bind_mode) const {
if (bind_mode == Power_NoBind) {
return FreeScheduleThreads(workers);
} else {
return BindThreadsToCoreList(workers);
}
}
int CoreAffinity::BindThreads(const std::vector<Worker *> &workers, const std::vector<int> &core_list) {
bind_id_ = core_list;
return BindThreadsToCoreList(workers);
}
} // namespace mindspore
#endif // BIND_CORE

View File

@ -0,0 +1,58 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_CORE_AFFINITY_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_CORE_AFFINITY_H_
#include <vector>
#include "thread/threadpool.h"
#ifdef BIND_CORE
namespace mindspore {
class CoreAffinity {
public:
static CoreAffinity *GetInstance() {
static CoreAffinity affinity;
return &affinity;
}
int InitBindCoreId(size_t thread_num, BindMode bind_mode);
int BindThreads(const std::vector<Worker *> &workers, const std::vector<int> &core_list);
int BindThreads(const std::vector<Worker *> &workers, BindMode bind_mode) const;
private:
CoreAffinity() = default;
~CoreAffinity() = default;
int BindThreadsToCoreList(const std::vector<Worker *> &workers) const;
int FreeScheduleThreads(const std::vector<Worker *> &workers) const;
int SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const;
int SortCPUProcessors();
// bind_id contains the CPU cores to bind
// the size of bind_id is equal to the size of workers
std::vector<int> bind_id_;
// sorted_id contains the ordered CPU core id
// the size of sorted_id is equal to the size of hardware_concurrency
std::vector<int> sorted_id_;
size_t core_num_{0};
size_t higher_num_{0};
size_t thread_num_{0};
};
} // namespace mindspore
#endif // BIND_CORE
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_CORE_AFFINITY_H_

View File

@ -0,0 +1,102 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "thread/inter_threadpool.h"
#include "thread/core_affinity.h"
namespace mindspore {
InterThreadPool::~InterThreadPool() {
exit_ = true;
alive_ = false;
actor_cond_var_.notify_all();
DestructThreads();
}
void InterThreadPool::ActorThreadRun() {
ActorReference actor;
{
std::unique_lock<std::mutex> _l(actor_mutex_);
actor_cond_var_.wait(_l, [&]() { return !actor_queue_.empty() || exit_; });
if (exit_ && actor_queue_.empty()) {
return;
}
actor = actor_queue_.front();
actor_queue_.pop();
}
actor->Run();
}
void InterThreadPool::ThreadAsyncRun(size_t thread_id) {
{
std::unique_lock<std::mutex> _l(pool_mutex_);
start_cond_.wait(_l, [this]() { return workers_.size() == thread_num_; });
}
Worker *worker = workers_[thread_id];
THREAD_RETURN_IF_NULL(worker);
while (alive_) {
if (worker->type == kKernelThread) {
KernelThreadRun(worker);
} else if (worker->type == kActorThread) {
ActorThreadRun();
}
}
}
void InterThreadPool::EnqueReadyActor(const ActorReference &actor) {
{
std::lock_guard<std::mutex> _l(actor_mutex_);
actor_queue_.push(actor);
}
actor_cond_var_.notify_one();
THREAD_INFO("actor enqueue success");
}
InterThreadPool *InterThreadPool::CreateThreadPool(size_t inter_thread_num, size_t intra_thread_num,
BindMode bind_mode) {
InterThreadPool *pool = new (std::nothrow) InterThreadPool(inter_thread_num);
if (pool == nullptr) {
return nullptr;
}
size_t thread_num = inter_thread_num * intra_thread_num;
int ret = pool->CreateThreads(thread_num);
if (ret != THREAD_OK) {
delete pool;
return nullptr;
}
#ifdef BIND_CORE
ret = CoreAffinity::GetInstance()->InitBindCoreId(thread_num, bind_mode);
if (ret != THREAD_OK) {
delete pool;
return nullptr;
}
#endif // BIND_CORE
return pool;
}
InterThreadPool *InterThreadPool::CreateThreadPool(size_t thread_num) {
InterThreadPool *pool = new (std::nothrow) InterThreadPool(thread_num);
if (pool == nullptr) {
return nullptr;
}
int ret = pool->CreateThreads(thread_num);
if (ret != THREAD_OK) {
delete pool;
return nullptr;
}
return pool;
}
} // namespace mindspore

View File

@ -0,0 +1,53 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_
#include <queue>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include "thread/threadpool.h"
#include "actor/actor.h"
namespace mindspore {
class InterThreadPool : public ThreadPool {
public:
// create ThreadPool that contains inter thread and intra thread
static InterThreadPool *CreateThreadPool(size_t inter_thread_num, size_t intra_thread_num, BindMode bind_mode);
// create ThreadPool that contains only actor thread
static InterThreadPool *CreateThreadPool(size_t thread_num);
~InterThreadPool() override;
void EnqueReadyActor(const ActorReference &actor);
private:
explicit InterThreadPool(size_t inter_thread_num) { inter_thread_num_ = inter_thread_num; }
void ThreadAsyncRun(size_t thread_id) override;
void ActorThreadRun();
std::mutex actor_mutex_;
std::condition_variable actor_cond_var_;
std::queue<ActorReference> actor_queue_;
std::atomic_bool exit_{false};
};
} // namespace mindspore
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_INTER_THREADPOOL_H_

View File

@ -0,0 +1,180 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "thread/threadpool.h"
#include <unistd.h>
#include <algorithm>
#include "thread/core_affinity.h"
namespace mindspore {
constexpr int kDefaultSpinCount = 30000;
ThreadPool::~ThreadPool() {
alive_ = false;
DestructThreads();
}
void ThreadPool::DestructThreads() {
std::lock_guard<std::mutex> lock(pool_mutex_);
for (auto &worker : workers_) {
sem_post(&worker->sem);
if (worker->thread.joinable()) {
worker->thread.join();
}
sem_destroy(&worker->sem);
delete worker;
worker = nullptr;
}
THREAD_INFO("deconstruct threads success");
workers_.clear();
}
int ThreadPool::CreateThreads(size_t thread_num) {
std::lock_guard<std::mutex> lock(pool_mutex_);
size_t core_num = std::thread::hardware_concurrency();
thread_num_ = std::min(thread_num, core_num);
if (thread_num_ <= 0) {
THREAD_ERROR("thread num is invalid");
return THREAD_ERROR;
}
for (size_t i = 0; i < thread_num_; ++i) {
Worker *worker = new (std::nothrow) Worker();
THREAD_ERROR_IF_NULL(worker);
worker->type = i < inter_thread_num_ ? kActorThread : kKernelThread;
worker->thread = std::thread(&ThreadPool::ThreadAsyncRun, this, i);
sem_init(&worker->sem, 0, 0);
workers_.push_back(worker);
THREAD_INFO("create thread[%zu]", i);
}
freelist_.insert(freelist_.begin(), workers_.begin() + inter_thread_num_, workers_.end());
start_cond_.notify_all();
return THREAD_OK;
}
void ThreadPool::KernelThreadRun(Worker *worker) {
if (sem_trywait(&worker->sem) == THREAD_OK) {
Task *task = worker->task;
if (task == nullptr) {
return;
}
task->status |= task->func(task->content, ++task->task_id);
++task->finished;
worker->task = nullptr;
{
std::lock_guard<std::mutex> _l(pool_mutex_);
freelist_.push_back(worker);
}
} else {
std::this_thread::yield();
worker->spin++;
if (worker->spin >= kDefaultSpinCount) {
worker->spin = 0;
sem_wait(&worker->sem);
sem_post(&worker->sem);
}
}
}
void ThreadPool::ThreadAsyncRun(size_t thread_id) {
{
// wait for all threads to be created
std::unique_lock<std::mutex> _l(pool_mutex_);
start_cond_.wait(_l, [this]() { return workers_.size() == thread_num_; });
}
Worker *worker = workers_[thread_id];
THREAD_RETURN_IF_NULL(worker);
while (alive_) {
KernelThreadRun(worker);
}
}
int ThreadPool::ParallelLaunch(const Func &func, Contend contend, int task_num) {
// distribute task to the KernelThread and the free ActorThread,
// if the task num is greater than the KernelThread num
Task task = Task(func, contend);
DistributeTask(&task, task_num);
task.status |= task.func(task.content, 0);
++task.finished;
// synchronization
// wait until the finished is equal to task_num
while (task.finished != task_num) {
std::this_thread::yield();
}
// check the return value of task
if (task.status != THREAD_OK) {
return THREAD_ERROR;
}
return THREAD_OK;
}
void ThreadPool::DistributeTask(Task *task, int task_num) {
int count = 0;
while (count < task_num - 1) {
std::lock_guard<std::mutex> _l(pool_mutex_);
if (!freelist_.empty()) {
Worker *worker = freelist_.back();
freelist_.pop_back();
worker->task = task;
sem_post(&worker->sem);
count++;
}
}
}
int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
if (workers_.empty()) {
return THREAD_ERROR;
}
#ifdef BIND_CORE
return CoreAffinity::GetInstance()->BindThreads(workers_, bind_mode);
#else
return THREAD_OK;
#endif // BIND_CORE
}
int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
if (workers_.empty()) {
return THREAD_ERROR;
}
#ifdef BIND_CORE
return CoreAffinity::GetInstance()->BindThreads(workers_, core_list);
#else
return THREAD_OK;
#endif // BIND_CORE
}
ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, BindMode bind_mode) {
ThreadPool *pool = new (std::nothrow) ThreadPool();
if (pool == nullptr) {
return nullptr;
}
int ret = pool->CreateThreads(thread_num);
if (ret != THREAD_OK) {
delete pool;
return nullptr;
}
#ifdef BIND_CORE
ret = CoreAffinity::GetInstance()->InitBindCoreId(thread_num, bind_mode);
if (ret != THREAD_OK) {
delete pool;
return nullptr;
}
#endif // BIND_CORE
return pool;
}
} // namespace mindspore

View File

@ -0,0 +1,124 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
#include <semaphore.h>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <new>
#ifdef __ANDROID__
#define BIND_CORE
#endif
namespace mindspore {
#ifdef THREAD_POOL_DEBUG
#include <stdio.h>
#define THREAD_INFO(content, args...) \
{ printf("[INFO] %s|%d: " #content "\r\n", __func__, __LINE__, ##args); }
#define THREAD_ERROR(content, args...) \
{ printf("[ERROR] %s|%d: " #content "\r\n", __func__, __LINE__, ##args); }
#else
#define THREAD_INFO(content, args...)
#define THREAD_ERROR(content, args...)
#endif
#define THREAD_ERROR_IF_NULL(ptr) \
do { \
if ((ptr) == nullptr) { \
return THREAD_ERROR; \
} \
} while (0)
#define THREAD_RETURN_IF_NULL(ptr) \
do { \
if ((ptr) == nullptr) { \
return; \
} \
} while (0)
enum ThreadRet { THREAD_OK = 0, THREAD_ERROR = 1 };
enum ThreadType { kActorThread = 0, kKernelThread = 1 };
enum BindMode {
Power_NoBind = 0, // free schedule
Power_Higher = 1,
Power_Middle = 2,
};
using Func = int (*)(void *arg, int);
using Contend = void *;
typedef struct Task {
Task(Func f, Contend c) : func(f), content(c) {}
Func func;
Contend content;
std::atomic_int task_id{0};
std::atomic_int finished{0};
std::atomic_int status{THREAD_OK}; // return status, RET_OK
} Task;
typedef struct Worker {
std::thread thread;
std::atomic_int type{kActorThread};
Task *task{nullptr};
sem_t sem;
int spin{0};
} Worker;
class ThreadPool {
public:
static ThreadPool *CreateThreadPool(size_t thread_num, BindMode bind_mode);
virtual ~ThreadPool();
size_t thread_num() const { return thread_num_; }
int SetCpuAffinity(const std::vector<int> &core_list);
int SetCpuAffinity(BindMode bind_mode);
int ParallelLaunch(const Func &func, Contend contend, int task_num);
protected:
ThreadPool() = default;
int CreateThreads(size_t thread_num);
void DestructThreads();
virtual void ThreadAsyncRun(size_t thread_id);
void KernelThreadRun(Worker *worker);
void DistributeTask(Task *task, int task_num);
std::mutex pool_mutex_;
std::condition_variable start_cond_;
std::vector<Worker *> workers_;
std::vector<Worker *> freelist_;
std::atomic_bool alive_{true};
size_t inter_thread_num_{0};
size_t thread_num_{1};
};
} // namespace mindspore
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_