enable refactor thread pool

This commit is contained in:
yangjie159 2021-05-25 23:37:25 +08:00
parent abb6192daa
commit a6622a70ba
213 changed files with 587 additions and 1718 deletions

View File

@ -3,6 +3,7 @@ include_directories(${CMAKE_SOURCE_DIR}/mindspore/core)
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${CMAKE_BINARY_DIR})
include_directories(${CMAKE_SOURCE_DIR}/mindspore/core/mindrt/include)
include_directories(${CMAKE_SOURCE_DIR}/mindspore/core/mindrt/src)
if(ENABLE_CPU)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/backend/kernel_compiler/cpu)

View File

@ -312,6 +312,8 @@ GraphScheduler::~GraphScheduler() {
// Local maps clear.
actor_name_to_actor_.clear();
graph_output_to_actor_.clear();
delete thread_pool_;
thread_pool_ = nullptr;
}
void GraphScheduler::Initialize() {
@ -326,11 +328,13 @@ void GraphScheduler::Initialize() {
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
actorMgr->Initialize();
// Create the thread pool of actor runtime.
auto max_thread_num = GetMaxThreadNum();
MS_LOG(INFO) << "Max available thread number: " << max_thread_num;
actorMgr->Initialize(max_thread_num);
thread_pool_ = InterThreadPool::CreateThreadPool(max_thread_num);
MS_EXCEPTION_IF_NULL(thread_pool_);
// Create memory manager actor.
auto memory_manager_actor = std::make_shared<MemoryManagerActor>();
@ -338,6 +342,7 @@ void GraphScheduler::Initialize() {
memory_manager_aid_ = memory_manager_actor->GetAID();
// Schedule memory manager actor, bind a single thread so it can respond to memory alloc and free quickly.
auto base_actor = static_cast<ActorReference>(memory_manager_actor);
base_actor->set_thread_pool(thread_pool_);
(void)actorMgr->Spawn(base_actor, false);
}
@ -400,6 +405,7 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
for (auto actor : actors) {
actor->set_thread_pool(thread_pool_);
(void)actorMgr->Spawn(actor);
}
}

View File

@ -33,6 +33,7 @@
#include "runtime/framework/actor/copy_actor.h"
#include "runtime/hardware/device_context.h"
#include "backend/session/kernel_graph.h"
#include "thread/inter_threadpool.h"
namespace mindspore {
namespace runtime {
@ -242,6 +243,8 @@ class GraphScheduler {
// The id of memory manager actor.
AID memory_manager_aid_;
InterThreadPool *thread_pool_{nullptr};
bool init_{false};
};
} // namespace runtime

View File

@ -1,4 +1,5 @@
include_directories(${CMAKE_SOURCE_DIR}/mindspore/core/mindrt/include)
include_directories(${CMAKE_SOURCE_DIR}/mindspore/core/mindrt/src)
file(GLOB_RECURSE _VM_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_VM_SRC_LIST} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_VM)

View File

@ -8,6 +8,7 @@ file(GLOB MINDRT_SRC
${CMAKE_CURRENT_SOURCE_DIR}/src/async/*.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/evloop/*.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/timer/*.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/thread/*.cc
)
add_library(mindrt_mid OBJECT ${MINDRT_SRC})

View File

@ -31,6 +31,7 @@ namespace mindspore {
class ActorBase;
class ActorMgr;
class ActorPolicy;
class InterThreadPool;
using ActorReference = std::shared_ptr<ActorBase>;
@ -78,6 +79,8 @@ class ActorBase {
// delete the send/receive message package size
void DelRuleUdp(const std::string &peer, bool outputLog);
// Records the thread pool that ActorMgr::SetActorReady() will enqueue this actor on.
// NOTE(review): the pointer is stored as-is; presumably the caller keeps the pool alive - confirm.
void set_thread_pool(InterThreadPool *pool) { this->pool_ = pool; }
protected:
using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;
@ -146,7 +149,7 @@ class ActorBase {
private:
friend class ActorMgr;
friend class ActorThread;
friend class InterThreadPool;
// KMSG Msg Handler
virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);
@ -194,6 +197,7 @@ class ActorBase {
void SetRunningStatus(bool start);
std::unique_ptr<ActorPolicy> actorThread;
InterThreadPool *pool_{nullptr};
AID id;
std::map<std::string, ActorFunction> actionFunctions;

View File

@ -33,9 +33,6 @@ struct MindrtAddress {
int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv = "", const std::string &udpUrl = "",
const std::string &udpUrlAdv = "", int threadCount = 0);
// brief terminate the threads for current session
void TerminateCurThreads(int threadCount = 0);
// brief spawn a process to run an actor
AID Spawn(ActorReference actor, bool sharedThread = true, bool start = true);

View File

@ -98,16 +98,11 @@ void ActorMgr::TerminateAll() {
}
}
void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount); }
void ActorMgr::TerminateCurThreads(int threadCount) { threadPool.TerminateThread(threadCount); }
void ActorMgr::Finalize() {
this->TerminateAll();
MS_LOG(INFO) << "mindrt Actors finish exiting.";
// stop all actor threads;
threadPool.Finalize();
MS_LOG(INFO) << "mindrt Threads finish exiting.";
// stop iomgr thread
@ -115,7 +110,6 @@ void ActorMgr::Finalize() {
MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
mgrIt->second->Finish();
}
MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
}

View File

@ -23,7 +23,8 @@
#include <memory>
#include <string>
#include "actor/actorthread.h"
#include "actor/actor.h"
#include "thread/inter_threadpool.h"
namespace mindspore {
@ -47,8 +48,7 @@ class ActorMgr {
~ActorMgr();
void Finalize();
void Initialize(int threadCount);
void TerminateCurThreads(int threadCount);
void Initialize() {}
void RemoveActor(const std::string &name);
ActorReference GetActor(const AID &id);
const std::string GetUrl(const std::string &protocol = "tcp");
@ -62,7 +62,14 @@ class ActorMgr {
inline const std::string &GetDelegate() const { return delegate; }
inline void SetDelegate(const std::string &d) { delegate = d; }
inline void SetActorReady(const std::shared_ptr<ActorBase> &actor) { threadPool.EnqueReadyActor(actor); }
inline void SetActorReady(const std::shared_ptr<ActorBase> &actor) const {
auto pool = actor->pool_;
if (pool == nullptr) {
MS_LOG(ERROR) << "ThreadPOol is nullptr, actor: " << actor->GetAID().Name();
return;
}
pool->EnqueReadyActor(actor);
}
void SetActorStatus(const AID &pid, bool start);
private:
@ -77,8 +84,6 @@ class ActorMgr {
std::map<std::string, ActorReference> actors;
std::mutex actorsMutex;
ActorThread threadPool;
std::map<std::string, std::string> procotols;
std::set<std::string> urls;
std::string delegate;

View File

@ -1,138 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "actor/actorthread.h"
#ifdef __WIN32__
#include <windows.h>
#else
#include <unistd.h>
#endif
#include <atomic>
#include <utility>
#include <memory>
namespace mindspore {
constexpr int MAXTHREADNAMELEN = 12;
// Returns the number of processors reported by the OS, used as the upper bound
// for the number of worker threads.
// The result is never smaller than 1: sysconf() reports errors as -1, which
// would otherwise turn into a huge unsigned value, and a count of 0 would make
// the pool unable to create any thread.
size_t GetMaxThreadCount() {
  size_t max_num = 0;
#ifdef __WIN32__
  SYSTEM_INFO sys_info;
  GetSystemInfo(&sys_info);
  max_num = sys_info.dwNumberOfProcessors;
#else
  const auto online = sysconf(_SC_NPROCESSORS_ONLN);
  if (online > 0) {
    max_num = static_cast<size_t>(online);
  }
#endif
  return max_num > 0 ? max_num : 1;
}
// Builds an empty pool: no queued actors, no workers.
// The worker thread-name prefix is read from the MINDRT_THREAD_NAME environment
// variable (truncated to MAXTHREADNAMELEN characters) and defaults to "MINDRT_ACT".
// The thread limit is taken from GetMaxThreadCount().
ActorThread::ActorThread() : readyActors(), workers() {
  readyActors.clear();
  workers.clear();

  const char *prefixFromEnv = getenv("MINDRT_THREAD_NAME");
  if (prefixFromEnv == nullptr) {
    threadName = "MINDRT_ACT";
  } else {
    threadName = prefixFromEnv;
    if (threadName.size() > MAXTHREADNAMELEN) {
      threadName.resize(MAXTHREADNAMELEN);
    }
  }

  maxThreads_ = GetMaxThreadCount();
}
// Empty destructor: it performs no cleanup itself. Stopping and joining the
// worker threads is done by Finalize().
ActorThread::~ActorThread() {}
// Makes sure `threadCount` worker threads are available for a new user of the pool.
// Workers that exist but are not counted in threadsInUse_ are treated as reusable,
// so only the shortfall is created. Creation stops once workers.size() reaches maxThreads_.
// The whole operation runs under initLock_.
void ActorThread::AddThread(int threadCount) {
std::unique_lock<std::mutex> lock(initLock_);
// Shortfall = requested - idle workers.
// NOTE(review): the subtraction is evaluated in size_t and then narrowed to int; a negative
// result (more idle workers than requested) relies on that conversion to skip the loop.
int threadsNeed = threadCount - (workers.size() - threadsInUse_);
for (int i = 0; i < threadsNeed; ++i) {
if (workers.size() >= maxThreads_) {
MS_LOG(DEBUG) << "threads number in mindrt reach upper limit. maxThreads:" << maxThreads_;
break;
}
// Each worker executes ActorThread::Run() on this pool object.
std::unique_ptr<std::thread> worker(new (std::nothrow) std::thread(&ActorThread::Run, this));
MINDRT_OOM_EXIT(worker)
workers.push_back(std::move(worker));
// NOTE(review): threadsInUse_ only grows for newly created workers, not for reused idle ones,
// while TerminateThread() always subtracts the full count - confirm the accounting is intended.
threadsInUse_ += 1;
}
}
// Marks `threadCount` workers as no longer in use by the caller.
// Temporary scheme: the threads are not actually terminated when the current
// session destructs; only the in-use counter is lowered so that AddThread() can
// reuse them.
// The counter is unsigned, so the subtraction is clamped at zero and
// non-positive requests are ignored to avoid wrapping around.
void ActorThread::TerminateThread(int threadCount) {
  if (threadCount <= 0) {
    return;
  }
  const size_t released = static_cast<size_t>(threadCount);
  threadsInUse_ = released < threadsInUse_ ? threadsInUse_ - released : 0;
}
void ActorThread::Finalize() {
MS_LOG(INFO) << "Actor's threads are exiting.";
// terminate all thread; enqueue nullptr actor to terminate;
std::shared_ptr<ActorBase> exitActor(nullptr);
for (auto it = workers.begin(); it != workers.end(); ++it) {
EnqueReadyActor(exitActor);
}
// wait all thread to exit
for (auto it = workers.begin(); it != workers.end(); ++it) {
std::unique_ptr<std::thread> &worker = *it;
if (worker->joinable()) {
worker->join();
}
}
workers.clear();
MS_LOG(INFO) << "Actor's threads finish exiting.";
}
// Blocks until the ready queue is non-empty, then removes the oldest entry and
// stores it in `actor`. The stored value may be null (the exit marker queued by Finalize()).
void ActorThread::DequeReadyActor(std::shared_ptr<ActorBase> &actor) {
  std::unique_lock<std::mutex> guard(readyActorMutex);
  // Loop to tolerate spurious wake-ups.
  while (readyActors.empty()) {
    conditionVar.wait(guard);
  }
  actor = readyActors.front();
  readyActors.pop_front();
}
// Appends `actor` to the ready queue and wakes one waiting worker.
// The mutex is released before notifying so the woken thread can take it immediately.
void ActorThread::EnqueReadyActor(const std::shared_ptr<ActorBase> &actor) {
  std::unique_lock<std::mutex> guard(readyActorMutex);
  readyActors.push_back(actor);
  guard.unlock();
  conditionVar.notify_one();
}
void ActorThread::Run() {
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
static std::atomic<int> actorCount(1);
int ret = pthread_setname_np(pthread_self(), (threadName + std::to_string(actorCount.fetch_add(1))).c_str());
if (0 != ret) {
MS_LOG(INFO) << "set pthread name fail]ret:" << ret;
} else {
MS_LOG(INFO) << "set pthread name success]threadID:" << pthread_self();
}
#endif
bool terminate = false;
do {
std::shared_ptr<ActorBase> actor;
DequeReadyActor(actor);
if (actor != nullptr) {
actor->Run();
} else {
terminate = true;
MS_LOG(DEBUG) << "Actor this Threads have finished exiting.";
}
} while (!terminate);
}
}; // end of namespace mindspore

View File

@ -1,56 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORTHREAD_H
#define MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORTHREAD_H
#include <condition_variable>
#include <list>
#include <thread>
#include <memory>
#include <string>
#include "actor/actor.h"
namespace mindspore {
// Pool of worker threads that run ready actors taken from a shared queue.
class ActorThread {
public:
ActorThread();
~ActorThread();
// Signals all workers to exit and joins them.
void Finalize();
// Ensures `threadCount` workers are available, creating threads as needed.
void AddThread(int threadCount);
// Lowers the in-use counter by `threadCount`.
void TerminateThread(int threadCount);
// Queues an actor for execution and wakes one worker.
void EnqueReadyActor(const std::shared_ptr<ActorBase> &actor);
private:
// Worker thread entry point.
void Run();
// Blocks until an actor is queued, then pops it into `actor`.
void DequeReadyActor(std::shared_ptr<ActorBase> &actor);
// Actors waiting to be run.
std::list<std::shared_ptr<ActorBase>> readyActors;
// Guards readyActors.
std::mutex readyActorMutex;
// Signalled whenever an actor is queued.
std::condition_variable conditionVar;
// Worker threads owned by this pool.
std::list<std::unique_ptr<std::thread>> workers;
// Prefix used when naming worker threads.
std::string threadName;
// Number of workers currently accounted as in use.
size_t threadsInUse_ = 0;
// Upper bound on workers.size().
size_t maxThreads_;
// Held by AddThread() while it creates workers.
std::mutex initLock_;
};
}; // end of namespace mindspore
#endif

View File

@ -57,10 +57,6 @@ const MindrtAddress &GetMindrtAddress() {
return *local::g_mindrtAddress;
}
void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); }
void TerminateCurThreads(int threadCount) { ActorMgr::GetActorMgrRef()->TerminateCurThreads(threadCount); }
class MindrtExit {
public:
MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()---------"; }
@ -73,10 +69,7 @@ class MindrtExit {
int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
const std::string &udpUrlAdv, int threadCount) {
MS_LOG(DEBUG) << "mindrt starts ......";
// start actor's thread
SetThreadCount(threadCount);
ActorMgr::GetActorMgrRef()->Initialize();
MS_LOG(DEBUG) << "mindrt has started.";
return MINDRT_OK;
}

View File

@ -19,9 +19,6 @@
#include <stdlib.h>
#include <string>
#include <algorithm>
#ifdef __ANDROID__
#include <sched.h>
#endif
#ifdef MS_COMPILE_IOS
#include <sys/types.h>
#include <sys/sysctl.h>
@ -29,7 +26,6 @@
#endif // MS_COMPILE_IOS
#include "thread/threadpool.h"
#ifdef BIND_CORE
namespace mindspore {
#define MAX_PATH_SIZE (256)
@ -240,6 +236,8 @@ int CoreAffinity::SortCPUProcessors() {
}
}
}
higher_num_ = 0;
sorted_id_.clear();
int max_freq = freq_set.front().max_freq;
for (const auto &info : freq_set) {
THREAD_INFO("sorted core id: %d, max frequency: %d, arch: %d", info.core_id, info.max_freq, info.arch);
@ -274,14 +272,15 @@ int CoreAffinity::InitBindCoreId(size_t thread_num, BindMode bind_mode) {
return THREAD_OK;
}
int CoreAffinity::SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const {
#ifdef BIND_CORE
int CoreAffinity::SetAffinity(const pthread_t &thread_id, cpu_set_t *cpu_set) const {
#ifdef __ANDROID__
#if __ANDROID_API__ >= 21
THREAD_INFO("thread: %d, mask: %lu", pthread_gettid_np(thread_id), cpuSet->__bits[0]);
int ret = sched_setaffinity(pthread_gettid_np(thread_id), sizeof(cpu_set_t), cpuSet);
THREAD_INFO("thread: %d, mask: %lu", pthread_gettid_np(thread_id), cpu_set->__bits[0]);
int ret = sched_setaffinity(pthread_gettid_np(thread_id), sizeof(cpu_set_t), cpu_set);
if (ret != THREAD_OK) {
THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", pthread_gettid_np(thread_id), ret);
return THREAD_OK;
return THREAD_ERROR;
}
#endif
#else
@ -289,7 +288,7 @@ int CoreAffinity::SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const {
THREAD_ERROR("not bind thread to apple's cpu.");
return THREAD_ERROR;
#else
int ret = pthread_setaffinity_np(thread_id, sizeof(cpu_set_t), cpuSet);
int ret = pthread_setaffinity_np(thread_id, sizeof(cpu_set_t), cpu_set);
if (ret != THREAD_OK) {
THREAD_ERROR("set thread: %lu to cpu failed", thread_id);
return THREAD_ERROR;
@ -298,8 +297,10 @@ int CoreAffinity::SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const {
#endif
return THREAD_OK;
}
#endif // BIND_CORE
int CoreAffinity::FreeScheduleThreads(const std::vector<Worker *> &workers) const {
#ifdef BIND_CORE
if (thread_num_ != workers.size()) {
return THREAD_ERROR;
}
@ -315,11 +316,13 @@ int CoreAffinity::FreeScheduleThreads(const std::vector<Worker *> &workers) cons
return THREAD_ERROR;
}
}
#endif // BIND_CORE
return THREAD_OK;
}
int CoreAffinity::BindThreadsToCoreList(const std::vector<Worker *> &workers) const {
if (bind_id_.size() != thread_num_) {
#ifdef BIND_CORE
if (thread_num_ != workers.size()) {
THREAD_ERROR("invalid core list");
return THREAD_ERROR;
}
@ -330,14 +333,32 @@ int CoreAffinity::BindThreadsToCoreList(const std::vector<Worker *> &workers) co
// affinity mask determines the CPU core which it is eligible to run
int ret = SetAffinity(workers[i]->thread.native_handle(), &mask);
if (ret != THREAD_OK) {
THREAD_ERROR("set thread[%zu] affinity failed", i);
THREAD_ERROR("set thread[%zu] affinity to core[%d] failed", i, bind_id_[i]);
return THREAD_ERROR;
}
THREAD_INFO("bind thread[%zu] success", i);
THREAD_ERROR("set thread[%zu] affinity to core[%d] success", i, bind_id_[i]);
}
#endif // BIND_CORE
return THREAD_OK;
}
// Sets the CPU affinity of the calling thread (pthread_self()).
// With a binding mode other than Power_NoBind the mask contains only the first
// id in bind_id_; with Power_NoBind it contains every id in bind_id_.
// Returns THREAD_ERROR when bind_id_ is empty (nothing to bind to) or when
// SetAffinity() fails. Without BIND_CORE this is a no-op that returns THREAD_OK.
int CoreAffinity::BindProcess(BindMode bind_mode) const {
#ifdef BIND_CORE
  // front() on an empty container is undefined, and an empty mask is not a valid affinity.
  if (bind_id_.empty()) {
    THREAD_ERROR("no core id is available to bind");
    return THREAD_ERROR;
  }
  cpu_set_t mask;
  CPU_ZERO(&mask);
  if (bind_mode != Power_NoBind) {
    CPU_SET(bind_id_.front(), &mask);
  } else {
    for (int id : bind_id_) {
      CPU_SET(id, &mask);
    }
  }
  return SetAffinity(pthread_self(), &mask);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}
int CoreAffinity::BindThreads(const std::vector<Worker *> &workers, BindMode bind_mode) const {
if (bind_mode == Power_NoBind) {
return FreeScheduleThreads(workers);
@ -351,5 +372,3 @@ int CoreAffinity::BindThreads(const std::vector<Worker *> &workers, const std::v
return BindThreadsToCoreList(workers);
}
} // namespace mindspore
#endif // BIND_CORE

View File

@ -18,28 +18,41 @@
#define MINDSPORE_CORE_MINDRT_RUNTIME_CORE_AFFINITY_H_
#include <vector>
#include "thread/threadpool.h"
#ifdef BIND_CORE
#include <thread>
#ifdef __ANDROID__
#define BIND_CORE
#include <sched.h>
#endif
namespace mindspore {
enum BindMode {
Power_NoBind = 0, // free schedule
Power_Higher = 1,
Power_Middle = 2,
};
struct Worker;
class CoreAffinity {
public:
static CoreAffinity *GetInstance() {
static CoreAffinity affinity;
return &affinity;
}
CoreAffinity() = default;
~CoreAffinity() = default;
int InitBindCoreId(size_t thread_num, BindMode bind_mode);
int BindThreads(const std::vector<Worker *> &workers, const std::vector<int> &core_list);
int BindThreads(const std::vector<Worker *> &workers, BindMode bind_mode) const;
int BindProcess(BindMode bind_mode) const;
private:
CoreAffinity() = default;
~CoreAffinity() = default;
#ifdef BIND_CORE
int SetAffinity(const pthread_t &thread_id, cpu_set_t *cpu_set) const;
#endif // BIND_CORE
int BindThreadsToCoreList(const std::vector<Worker *> &workers) const;
int FreeScheduleThreads(const std::vector<Worker *> &workers) const;
int SetAffinity(pthread_t thread_id, cpu_set_t *cpuSet) const;
int SortCPUProcessors();
// bind_id contains the CPU cores to bind
@ -52,7 +65,7 @@ class CoreAffinity {
size_t higher_num_{0};
size_t thread_num_{0};
};
} // namespace mindspore
#endif // BIND_CORE
#endif // MINDSPORE_CORE_MINDRT_RUNTIME_CORE_AFFINITY_H_

View File

@ -20,6 +20,11 @@
namespace mindspore {
InterThreadPool::~InterThreadPool() {
{
THREAD_INFO("wait util actor queue is empty");
std::unique_lock<std::mutex> _l(actor_mutex_);
finish_cond_var_.wait(_l, [this]() { return actor_queue_.empty(); });
}
exit_ = true;
alive_ = false;
actor_cond_var_.notify_all();
@ -38,15 +43,12 @@ void InterThreadPool::ActorThreadRun() {
actor_queue_.pop();
}
actor->Run();
finish_cond_var_.notify_one();
}
void InterThreadPool::ThreadAsyncRun(size_t thread_id) {
{
std::unique_lock<std::mutex> _l(pool_mutex_);
start_cond_.wait(_l, [this]() { return workers_.size() == thread_num_; });
}
Worker *worker = workers_[thread_id];
void InterThreadPool::ThreadAsyncRun(Worker *worker) {
THREAD_RETURN_IF_NULL(worker);
sem_post(&worker->init);
while (alive_) {
if (worker->type == kKernelThread) {
KernelThreadRun(worker);
@ -78,7 +80,7 @@ InterThreadPool *InterThreadPool::CreateThreadPool(size_t inter_thread_num, size
return nullptr;
}
#ifdef BIND_CORE
ret = CoreAffinity::GetInstance()->InitBindCoreId(thread_num, bind_mode);
ret = pool->InitAffinityInfo(bind_mode);
if (ret != THREAD_OK) {
delete pool;
return nullptr;

View File

@ -39,12 +39,13 @@ class InterThreadPool : public ThreadPool {
private:
explicit InterThreadPool(size_t inter_thread_num) { inter_thread_num_ = inter_thread_num; }
void ThreadAsyncRun(size_t thread_id) override;
void ThreadAsyncRun(Worker *worker) override;
void ActorThreadRun();
std::mutex actor_mutex_;
std::condition_variable actor_cond_var_;
std::condition_variable finish_cond_var_;
std::queue<ActorReference> actor_queue_;
std::atomic_bool exit_{false};

View File

@ -24,12 +24,11 @@ namespace mindspore {
constexpr int kDefaultSpinCount = 30000;
ThreadPool::~ThreadPool() {
alive_ = false;
alive_.store(false);
DestructThreads();
}
void ThreadPool::DestructThreads() {
std::lock_guard<std::mutex> lock(pool_mutex_);
for (auto &worker : workers_) {
sem_post(&worker->sem);
if (worker->thread.joinable()) {
@ -39,12 +38,15 @@ void ThreadPool::DestructThreads() {
delete worker;
worker = nullptr;
}
THREAD_INFO("deconstruct threads success");
workers_.clear();
if (affinity_ != nullptr) {
delete affinity_;
affinity_ = nullptr;
}
THREAD_INFO("deconstruct threads success");
}
int ThreadPool::CreateThreads(size_t thread_num) {
std::lock_guard<std::mutex> lock(pool_mutex_);
size_t core_num = std::thread::hardware_concurrency();
thread_num_ = std::min(thread_num, core_num);
if (thread_num_ <= 0) {
@ -54,14 +56,17 @@ int ThreadPool::CreateThreads(size_t thread_num) {
for (size_t i = 0; i < thread_num_; ++i) {
Worker *worker = new (std::nothrow) Worker();
THREAD_ERROR_IF_NULL(worker);
worker->type = i < inter_thread_num_ ? kActorThread : kKernelThread;
worker->thread = std::thread(&ThreadPool::ThreadAsyncRun, this, i);
sem_init(&worker->sem, 0, 0);
sem_init(&worker->init, 0, 0);
worker->type = i < inter_thread_num_ ? kActorThread : kKernelThread;
if (worker->type == kKernelThread) {
freelist_.push_back(worker);
}
worker->thread = std::thread(&ThreadPool::ThreadAsyncRun, this, worker);
sem_wait(&worker->init);
workers_.push_back(worker);
THREAD_INFO("create thread[%zu]", i);
}
freelist_.insert(freelist_.begin(), workers_.begin() + inter_thread_num_, workers_.end());
start_cond_.notify_all();
return THREAD_OK;
}
@ -89,20 +94,16 @@ void ThreadPool::KernelThreadRun(Worker *worker) {
}
}
void ThreadPool::ThreadAsyncRun(size_t thread_id) {
{
// wait for all threads to be created
std::unique_lock<std::mutex> _l(pool_mutex_);
start_cond_.wait(_l, [this]() { return workers_.size() == thread_num_; });
}
Worker *worker = workers_[thread_id];
// Entry point of a pool thread.
// Posts worker->init once, which releases the sem_wait(&worker->init) in
// CreateThreads(), then keeps calling KernelThreadRun() for as long as alive_ is true.
void ThreadPool::ThreadAsyncRun(Worker *worker) {
  THREAD_RETURN_IF_NULL(worker);
  sem_post(&worker->init);
  for (;;) {
    if (!alive_) {
      break;
    }
    KernelThreadRun(worker);
  }
}
int ThreadPool::ParallelLaunch(const Func &func, Contend contend, int task_num) {
THREAD_INFO("parallel launch, task num: %d", task_num);
// distribute task to the KernelThread and the free ActorThread,
// if the task num is greater than the KernelThread num
Task task = Task(func, contend);
@ -136,12 +137,25 @@ void ThreadPool::DistributeTask(Task *task, int task_num) {
}
}
// Creates the CoreAffinity helper owned by this pool and lets it compute the
// core ids for thread_num_ threads under `bind_mode`.
// Returns THREAD_OK on success. On allocation or initialisation failure
// affinity_ is left null and THREAD_ERROR is returned.
int ThreadPool::InitAffinityInfo(BindMode bind_mode) {
  // Release any helper from an earlier call so re-initialisation does not leak it.
  delete affinity_;
  affinity_ = new (std::nothrow) CoreAffinity();
  THREAD_ERROR_IF_NULL(affinity_);
  if (affinity_->InitBindCoreId(thread_num_, bind_mode) != THREAD_OK) {
    delete affinity_;
    affinity_ = nullptr;
    return THREAD_ERROR;
  }
  return THREAD_OK;
}
int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
if (workers_.empty()) {
return THREAD_ERROR;
}
#ifdef BIND_CORE
return CoreAffinity::GetInstance()->BindThreads(workers_, bind_mode);
THREAD_ERROR_IF_NULL(affinity_);
return affinity_->BindThreads(workers_, bind_mode);
#else
return THREAD_OK;
#endif // BIND_CORE
@ -152,7 +166,17 @@ int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
return THREAD_ERROR;
}
#ifdef BIND_CORE
return CoreAffinity::GetInstance()->BindThreads(workers_, core_list);
THREAD_ERROR_IF_NULL(affinity_);
return affinity_->BindThreads(workers_, core_list);
#else
return THREAD_OK;
#endif // BIND_CORE
}
int ThreadPool::SetProcessAffinity(BindMode bind_mode) const {
#ifdef BIND_CORE
THREAD_ERROR_IF_NULL(affinity_);
return affinity_->BindProcess(bind_mode);
#else
return THREAD_OK;
#endif // BIND_CORE
@ -169,7 +193,7 @@ ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, BindMode bind_mode)
return nullptr;
}
#ifdef BIND_CORE
ret = CoreAffinity::GetInstance()->InitBindCoreId(thread_num, bind_mode);
ret = pool->InitAffinityInfo(bind_mode);
if (ret != THREAD_OK) {
delete pool;
return nullptr;

View File

@ -25,10 +25,7 @@
#include <condition_variable>
#include <mutex>
#include <new>
#ifdef __ANDROID__
#define BIND_CORE
#endif
#include "thread/core_affinity.h"
namespace mindspore {
@ -60,12 +57,6 @@ namespace mindspore {
enum ThreadRet { THREAD_OK = 0, THREAD_ERROR = 1 };
enum ThreadType { kActorThread = 0, kKernelThread = 1 };
enum BindMode {
Power_NoBind = 0, // free schedule
Power_Higher = 1,
Power_Middle = 2,
};
using Func = int (*)(void *arg, int);
using Contend = void *;
@ -83,6 +74,7 @@ typedef struct Worker {
std::atomic_int type{kActorThread};
Task *task{nullptr};
sem_t sem;
sem_t init;
int spin{0};
} Worker;
@ -96,6 +88,8 @@ class ThreadPool {
int SetCpuAffinity(const std::vector<int> &core_list);
int SetCpuAffinity(BindMode bind_mode);
int SetProcessAffinity(BindMode bind_mode) const;
int ParallelLaunch(const Func &func, Contend contend, int task_num);
protected:
@ -104,13 +98,14 @@ class ThreadPool {
int CreateThreads(size_t thread_num);
void DestructThreads();
virtual void ThreadAsyncRun(size_t thread_id);
int InitAffinityInfo(BindMode bind_mode);
virtual void ThreadAsyncRun(Worker *worker);
void KernelThreadRun(Worker *worker);
void DistributeTask(Task *task, int task_num);
std::mutex pool_mutex_;
std::condition_variable start_cond_;
std::vector<Worker *> workers_;
std::vector<Worker *> freelist_;
@ -118,6 +113,8 @@ class ThreadPool {
size_t inter_thread_num_{0};
size_t thread_num_{1};
CoreAffinity *affinity_{nullptr};
};
} // namespace mindspore

View File

@ -130,10 +130,6 @@ if(ENABLE_CONVERTER OR BUILD_MINDDATA STREQUAL "full" OR BUILD_MINDDATA STREQUAL
include(${TOP_DIR}/cmake/external_libs/json.cmake)
endif()
if(SUPPORT_TRAIN OR WIN32)
set(ENABLE_MINDRT "off")
endif()
if(DEFINED ARCHS)
add_definitions(-DMS_COMPILE_IOS)
endif()
@ -210,6 +206,7 @@ endif()
if(ENABLE_MINDRT)
include_directories(${CORE_DIR}/mindrt/include)
include_directories(${CORE_DIR}/mindrt/src)
endif()
if(NOT WIN32 AND NOT APPLE)

View File

@ -54,7 +54,6 @@ set(LITE_SRC
${CMAKE_CURRENT_SOURCE_DIR}/common/quant_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/runtime/allocator.cc
${CMAKE_CURRENT_SOURCE_DIR}/runtime/runtime_api.cc
${CMAKE_CURRENT_SOURCE_DIR}/runtime/thread_pool.c
${CMAKE_CURRENT_SOURCE_DIR}/runtime/infer_manager.cc
${CMAKE_CURRENT_SOURCE_DIR}/tensor.cc
${CMAKE_CURRENT_SOURCE_DIR}/ms_tensor.cc

View File

@ -23,8 +23,12 @@ namespace mindspore::lite {
int Executor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, mindspore::Allocator *allocator,
const KernelCallBack &before, const KernelCallBack &after) {
InterThreadPool *thread_pool = ctx_->thread_pool_;
if (thread_pool == nullptr) {
return RET_ERROR;
}
CpuBindMode cpu_bind_mode = ctx_->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_;
BindThreads(ctx_->thread_pool_, true, cpu_bind_mode);
thread_pool->SetCpuAffinity(static_cast<BindMode>(cpu_bind_mode));
MS_ASSERT(nullptr != allocator);
auto ret = CheckTensorsInvalid(in_tensors);
@ -59,8 +63,7 @@ int Executor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Ten
}
}
}
BindThreads(ctx_->thread_pool_, false, cpu_bind_mode);
thread_pool->SetCpuAffinity(static_cast<BindMode>(NO_BIND));
return RET_OK;
}
} // namespace mindspore::lite

View File

@ -72,9 +72,9 @@ int InnerContext::Init() {
return RET_NOT_SUPPORT;
}
if (this->thread_pool_ == nullptr && this->IsCpuEnabled()) {
this->thread_pool_ =
CreateLiteThreadPool(this->thread_num_, this->device_list_[0].device_info_.cpu_device_info_.cpu_bind_mode_);
if (this->thread_pool_ == nullptr) {
thread_pool_ = InterThreadPool::CreateThreadPool(
1, this->thread_num_, static_cast<BindMode>(this->device_list_[0].device_info_.cpu_device_info_.cpu_bind_mode_));
if (thread_pool_ == nullptr) {
MS_LOG(ERROR) << "Create ThreadPool failed";
return RET_NULL_PTR;
}
@ -110,8 +110,7 @@ int InnerContext::Init() {
InnerContext::~InnerContext() {
if (this->thread_pool_ != nullptr) {
DestroyThreadPool(this->thread_pool_);
free(this->thread_pool_);
delete thread_pool_;
this->thread_pool_ = nullptr;
}
#ifdef ENABLE_ARM

View File

@ -21,6 +21,7 @@
#include "include/context.h"
#include "src/runtime/runtime_api.h"
#include "src/runtime/allocator.h"
#include "thread/inter_threadpool.h"
#ifdef ENABLE_ARM
#include "src/cpu_info.h"
#endif
@ -31,7 +32,7 @@
namespace mindspore::lite {
struct InnerContext : public Context {
public:
struct ThreadPool *thread_pool_ = nullptr;
InterThreadPool *thread_pool_{nullptr};
public:
InnerContext() = default;

View File

@ -199,6 +199,11 @@ int LiteOpActor::PrepareOutputData() {
std::vector<std::shared_ptr<LiteOpActor>> CreateOpActor(const std::vector<kernel::LiteKernel *> &kernels) {
std::vector<std::shared_ptr<LiteOpActor>> actors;
std::unordered_map<size_t, AID> partial_map{};
auto thread_pool = kernels[0]->Context()->thread_pool_;
if (thread_pool == nullptr) {
MS_LOG(ERROR) << "thread pool is nullptr";
return actors;
}
for (size_t i = 0; i < kernels.size(); ++i) {
if ((kernel::LiteKernelUtil::IsSwitchCall(kernels[i]))) {
auto switch_actor = std::make_shared<LiteSwitchOpActor>(kernels[i]);
@ -207,6 +212,7 @@ std::vector<std::shared_ptr<LiteOpActor>> CreateOpActor(const std::vector<kernel
actors.clear();
return actors;
}
switch_actor->set_thread_pool(thread_pool);
partial_map[i] = switch_actor->GetAID();
actors.push_back(switch_actor);
} else {
@ -216,6 +222,7 @@ std::vector<std::shared_ptr<LiteOpActor>> CreateOpActor(const std::vector<kernel
actors.clear();
return actors;
}
actor->set_thread_pool(thread_pool);
partial_map[i] = actor->GetAID();
actors.push_back(actor);
}
@ -423,7 +430,6 @@ void MindrtTerminate(const std::vector<std::shared_ptr<LiteOpActor>> &actor_list
for (const auto &actor : actor_list) {
mindspore::Terminate(actor->GetAID());
}
mindspore::TerminateCurThreads(1);
}
} // namespace mindspore::lite

View File

@ -47,8 +47,13 @@ class LiteOpActor : public OpActor<lite::Tensor> {
return;
}
InterThreadPool *thread_pool = kernel_->Context()->thread_pool_;
if (thread_pool == nullptr) {
MS_LOG(ERROR) << "ThreadPool is nullptr, kernel: " << kernel_->name();
return;
}
CpuBindMode cpu_bind_mode = kernel_->Context()->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_;
BindThreads(static_cast<const lite::InnerContext *>(kernel_->Context())->thread_pool_, true, cpu_bind_mode);
thread_pool->SetCpuAffinity(static_cast<BindMode>(cpu_bind_mode));
int ret = CheckInputData();
if (ret != RET_OK) {
@ -78,7 +83,7 @@ class LiteOpActor : public OpActor<lite::Tensor> {
inputs_data_.clear();
AsyncOutput(context);
BindThreads(static_cast<const lite::InnerContext *>(kernel_->Context())->thread_pool_, false, cpu_bind_mode);
thread_pool->SetCpuAffinity(static_cast<BindMode>(NO_BIND));
SetOutputData(context);
for (auto &input_data : inputs_data_) {

View File

@ -570,11 +570,16 @@ int LiteSession::Init(const Context *context) {
is_running_.store(false);
return ret;
}
BindThreads(context_->thread_pool_, true,
context_->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_);
CpuBindMode cpu_bind_mode = this->context_->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_;
InterThreadPool *thread_pool = this->context_->thread_pool_;
if (thread_pool == nullptr) {
MS_LOG(ERROR) << "thread pool is nullptr";
is_running_.store(false);
return RET_NULL_PTR;
}
thread_pool->SetProcessAffinity(static_cast<BindMode>(cpu_bind_mode));
ret = InitGPURuntime();
BindThreads(context_->thread_pool_, false,
context_->device_list_.front().device_info_.cpu_device_info_.cpu_bind_mode_);
thread_pool->SetProcessAffinity(static_cast<BindMode>(NO_BIND));
if (ret != RET_OK) {
MS_LOG(ERROR) << "Init GPU runtime failed.";
is_running_.store(false);
@ -614,7 +619,6 @@ LiteSession::~LiteSession() {
output_node_map_.clear();
output_tensor_map_.clear();
input_vec_.clear();
delete this->context_;
delete this->executor_;
this->executor_ = nullptr;
#if SUPPORT_NPU
@ -628,6 +632,8 @@ LiteSession::~LiteSession() {
#if GPU_OPENCL
delete opencl_runtime_wrapper_;
#endif
delete this->context_;
this->context_ = nullptr;
delete (model_);
is_running_.store(false);
}

View File

@ -73,8 +73,8 @@ int ConstantOfShapeCPUKernel::Run() {
int thread_count = MSMIN(op_parameter_->thread_num_, param_->element_size_);
thread_stride_ = UP_DIV(param_->element_size_, thread_count);
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConstantOfShapeRun,
this, thread_count);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConstantOfShapeRun, this, thread_count);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConstantOfShapeRun error error_code[" << ret << "]";
return ret;

View File

@ -236,8 +236,8 @@ int DetectionPostProcessBaseCPUKernel::Run() {
return status;
}
} else {
status = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
NmsMultiClassesFastCoreRun, this, op_parameter_->thread_num_);
status = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(NmsMultiClassesFastCoreRun, this, op_parameter_->thread_num_);
if (status != RET_OK) {
MS_LOG(ERROR) << "NmsMultiClassesFastCoreRun error error_code[" << status << "]";
FreeAllocatedBuffer();

View File

@ -166,8 +166,8 @@ int RunPriorBox(void *cdata, int task_id) {
}
int PriorBoxCPUKernel::Run() {
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, RunPriorBox,
this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(RunPriorBox, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "PriorBox run error, error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -172,8 +172,8 @@ int QuantDTypeCastCPUKernel::Run() {
uint8_ptr_ = reinterpret_cast<uint8_t *>(out_tensors_[0]->data_c());
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, QuantDTypeCastRun,
this, thread_n_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(QuantDTypeCastRun, this, thread_n_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Scale error error_code[" << ret << "]";
if (in_tensors_[0]->data_type() == TypeId::kNumberTypeInt8 &&

View File

@ -66,8 +66,8 @@ int ReshapeRun(void *cdata, int task_id) {
int ReshapeBaseCPUKernel::Run() {
input_ptr_ = reinterpret_cast<uint8_t *>(in_tensors_.at(kInputIndex)->data_c());
output_ptr_ = reinterpret_cast<uint8_t *>(out_tensors_.at(kOutputIndex)->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ReshapeRun, this,
context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ReshapeRun, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Reshape run error error_code[" << ret << "]";
return ret;

View File

@ -82,8 +82,8 @@ int SliceCPUKernel::Run() {
lite::DataTypeSize(in_tensors_.at(0)->data_type()));
return RET_OK;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SliceLaunch, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SliceLaunch, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "slice launch fail!ret: " << ret;
return RET_ERROR;

View File

@ -126,8 +126,8 @@ int SplitBaseCPUKernel::Run() {
output_ptr_.at(i) = output_tensor->data_c();
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SplitRun, this,
thread_n_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SplitRun, this, thread_n_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "split error error_code[" << ret << "]";
}

View File

@ -118,8 +118,8 @@ int SplitWithOverlapBaseCPUKernel::Run() {
inner_stride_ *= input_shape[i];
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SplitWithOverlapRun,
this, param_->num_split_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SplitWithOverlapRun, this, param_->num_split_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ParallelLaunch for SplitWIthOverlapRun run fail. errorcode:[" << ret << "]";
return RET_ERROR;

View File

@ -101,7 +101,7 @@ int StackBaseCPUKernel::Run() {
// run stack
num_threads_ = MSMIN(UP_DIV(outer_size_, 64), this->context_->thread_num_);
auto ret =
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, StackRun, this, num_threads_);
static_cast<const lite::InnerContext *>(this->context_)->thread_pool_->ParallelLaunch(StackRun, this, num_threads_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "StackBaseCPUKernel Run error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -163,8 +163,8 @@ int StridedSliceCPUKernel::FastRun() {
}
input_ptr_ = reinterpret_cast<uint8_t *>(in_tensors_.front()->data_c());
output_ptr_ = reinterpret_cast<uint8_t *>(out_tensors_.front()->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, StrideRun, this,
context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(StrideRun, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Stride run error error_code[" << ret << "]";
return ret;

View File

@ -128,8 +128,8 @@ int TileCPUKernel::SimpleTileImpl(int task_id) {
}
int TileCPUKernel::RunSimpleTile() {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SimpleTile, this,
context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SimpleTile, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "RunSimpleTile error code[" << ret << "]";
return ret;

View File

@ -100,8 +100,8 @@ int ActivationFp16CPUKernel::Run() {
fp16_input_ = reinterpret_cast<float16_t *>(input_tensor->data_c());
fp16_output_ = reinterpret_cast<float16_t *>(output_tensor->data_c());
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ActivationFp16Run, this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ActivationFp16Run, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "Activation function error error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -168,8 +168,8 @@ int ArithmeticCompareFP16CPUKernel::Run() {
FreeTmpBuffer();
return RET_ERROR;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ArithmeticsRunFp16,
this, context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticsRunFp16, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ArithmeticsRunFp16 run error error_code[" << ret << "]";
}

View File

@ -178,8 +178,8 @@ int ArithmeticFP16CPUKernel::Run() {
FreeFp16Buffer();
return RET_ERROR;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ArithmeticsRun, this,
context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticsRun, this, context_->thread_num_);
if (out_tensors_.at(0)->data_type() == kNumberTypeFloat32) {
Float16ToFloat32(static_cast<float16_t *>(output_ptr_), reinterpret_cast<float *>(output_tensor->MutableData()),
output_tensor->ElementsNum());

View File

@ -83,8 +83,8 @@ int ArithmeticSelfFp16CPUKernel::Run() {
}
output_fp16_ptr_ = reinterpret_cast<float16_t *>(output_tensor->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ArithmeticSelfRun,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticSelfRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ArithmeticSelfRun error error_code[" << ret << "]";
}

View File

@ -59,8 +59,8 @@ int BatchnormFp16CPUKernel::Run() {
return RET_ERROR;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, BatchNormRun, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(BatchNormRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "BatchnormRun error error_code[" << ret << "]";
}

View File

@ -132,8 +132,8 @@ int CastFp16CPUKernel::Run() {
if (data_num_ == 0) {
return RET_OK;
}
return ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, CastFp16Run, this,
op_parameter_->thread_num_);
return static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CastFp16Run, this, op_parameter_->thread_num_);
}
REG_KERNEL(kCPU, kNumberTypeFloat16, PrimitiveType_Cast, LiteKernelCreator<CastFp16CPUKernel>)

View File

@ -236,16 +236,16 @@ int Convolution1x1FP16CPUKernel::Run() {
int ret = RET_ERROR;
if (multi_thread_by_hw_) {
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
Convolution1x1Fp16RunHw, this, thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(Convolution1x1Fp16RunHw, this, thread_count_);
} else {
#ifdef ENABLE_ARM64
RowMajor2Col16MajorFp16Opt(input_ptr_, pack_input_, matmul_param_->row_, matmul_param_->deep_);
#else
RowMajor2Col12MajorFp16Opt(input_ptr_, pack_input_, matmul_param_->row_, matmul_param_->deep_);
#endif
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
Convolution1x1Fp16RunOc, this, thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(Convolution1x1Fp16RunOc, this, thread_count_);
}
if (ret != RET_OK) {
MS_LOG(ERROR) << "ParallelLaunch failed.";

View File

@ -104,8 +104,8 @@ static int ConvDwFp16Run(void *cdata, int task_id) {
}
int ConvolutionDepthwiseFp16CPUKernel::Run() {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDwFp16Run, this,
conv_param_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDwFp16Run, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDwFp16Run error: error_code[" << ret << "]";
}

View File

@ -155,8 +155,8 @@ int ConvolutionDepthwiseSWFp16CPUKernel::Run() {
packed_output_ = output_ptr;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDwSWFp16Run, this,
conv_param_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDwSWFp16Run, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDwSWFp16Run error: error_code[" << ret << "]";
}

View File

@ -144,8 +144,8 @@ int ConvolutionFP16CPUKernel::Run() {
return RET_ERROR;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvolutionFp16Impl, this,
thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvolutionFp16Impl, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "conv fp16 error ret[" << ret << "]";
}

View File

@ -213,8 +213,8 @@ int ConvolutionWinogradFP16CPUKernel::Run() {
return RET_ERROR;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ConvolutionWinogradFp16Impl, this, thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvolutionWinogradFp16Impl, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "conv winograd error error_code[" << ret << "]";
}

View File

@ -53,8 +53,8 @@ int CropFp16CPUKernel::Run() {
input_ptr_ = reinterpret_cast<float16_t *>(input_tensor->data_c());
output_ptr_ = reinterpret_cast<float16_t *>(output_tensor->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, CropFp16Run, this,
crop_para_->thread_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CropFp16Run, this, crop_para_->thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ParallelLaunch failed: " << ret;
}

View File

@ -173,8 +173,8 @@ int DeconvolutionDepthwiseFp16CPUKernel::Run() {
memset(output_ptr, 0, out_tensors_.at(kOutputIndex)->ElementsNum() * sizeof(float16_t));
packed_output_ = output_ptr;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeconvDwFp16Run, this,
conv_param_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeconvDwFp16Run, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "DeconvDwFp16Run error: error_code[" << ret << "]";
}

View File

@ -218,8 +218,8 @@ int DeConvolutionFp16CPUKernel::Run() {
RowMajor2Col16MajorFp16Opt(batch_input_, pack_input_, input_plane_, conv_param_->input_channel_);
error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvFp16Run,
this, thread_count_);
error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvFp16Run, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "deconv fp16 run error! error_code[" << error_code << "]";
}

View File

@ -392,12 +392,12 @@ int DeConvWinogradFp16CPUKernel::Run() {
nhwc_output_ = output_ptr + batch_index * deconv_param_->output_plane_ * conv_param_->output_channel_;
::memset(nc4hw4_output_, 0, deconv_param_->output_plane_ * deconv_param_->oc_div4_ * C4NUM * sizeof(float16_t));
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvWgFp16Run, this,
deconv_param_->thread_num_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvWgFp16Run, this, deconv_param_->thread_num_);
/*post bias activate and nhwc */
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvWgPostFp16Run, this,
thread_num_hw_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvWgPostFp16Run, this, thread_num_hw_);
}
return RET_OK;

View File

@ -148,8 +148,8 @@ int GatherFp16CPUKernel::Run() {
Float32ToFloat16(reinterpret_cast<float *>(input_tensor->data_c()), input_data_, input_tensor->ElementsNum());
}
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, GatherRunFp16, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(GatherRunFp16, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Gather function error error_code[" << ret << "]";
}

View File

@ -109,8 +109,8 @@ int InstanceNormFp16Run(void *cdata, int task_id) {
int InstanceNormFp16CPUKernel::Run() {
src_data_ = reinterpret_cast<float16_t *>(in_tensors_[0]->data_c());
dst_data_ = reinterpret_cast<float16_t *>(out_tensors_[0]->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, InstanceNormFp16Run,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(InstanceNormFp16Run, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "InstanceNormFp16Run error error_code[" << ret << "]";
return ret;

View File

@ -95,8 +95,8 @@ int LayerNormFp16CPUKernel::Run() {
var_data_ =
reinterpret_cast<float16_t *>(context_->allocator->Malloc(param_->norm_outer_size_ * sizeof(float16_t)));
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, LayerNormFp16Run, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(LayerNormFp16Run, this, op_parameter_->thread_num_);
if (out_tensors_.size() != 3) {
context_->allocator->Free(mean_data_);
context_->allocator->Free(var_data_);

View File

@ -95,8 +95,8 @@ int LogSoftmaxLastAxisFp16Run(void *cdata, int task_id) {
int LogSoftmaxFp16CPUKernel::Run() {
if (in_plane_size_ == 1) {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
LogSoftmaxLastAxisFp16Run, this, context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(LogSoftmaxLastAxisFp16Run, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "LogSoftmaxFp16CPUKernel ParallelLaunch failed, ret: " << ret;
}

View File

@ -294,8 +294,8 @@ int MatmulBaseFP16CPUKernel::Run() {
batch_b_ptr_ = b_pack_ptr_ + i * params_->deep_ * params_->col_align_;
batch_c_ptr_ = c_ptr + i * params_->row_ * params_->col_;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, MatmulBaseFP16Run,
this, thread_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(MatmulBaseFP16Run, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "MatmulBaseFloatRun failed";
return ret;

View File

@ -89,8 +89,8 @@ int PadFp16CPUKernel::Run() {
output_[i] = pad_param_->constant_value_;
}
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, PadImpl, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(PadImpl, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "BatchnormRun error error_code[" << ret << "]";
}
@ -102,8 +102,8 @@ int PadFp16CPUKernel::Run() {
return ret;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, MirrorPadImpl, this,
context_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(MirrorPadImpl, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Pad Reflect or Symmetric mode run error, error_code[" << ret << "]";
}

View File

@ -90,8 +90,8 @@ int PoolingFp16CPUKernel::Run() {
fp16_input_ = reinterpret_cast<float16_t *>(input_tensor->data_c());
fp16_output_ = reinterpret_cast<float16_t *>(output_tensor->data_c());
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
PoolingFp16Impl, this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(PoolingFp16Impl, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "pooling error error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -87,8 +87,8 @@ int PowerFp16CPUKernel::Run() {
return ret;
}
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, PowerImplFp16, this,
thread_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(PowerImplFp16, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "PowerFp16CPUKernel error: " << ret;
return RET_ERROR;

View File

@ -164,8 +164,8 @@ int QuantDTypeCastFp16CPUKernel::Run() {
return RET_ERROR;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
QuantDTypeCastFP16Run, this, thread_n_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(QuantDTypeCastFP16Run, this, thread_n_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Scale error error_code[" << ret << "]";
return RET_ERROR;

View File

@ -93,8 +93,8 @@ int ReduceFp16CPUKernel::Run() {
outer_size_ = outer_sizes_.at(i);
inner_size_ = inner_sizes_.at(i);
axis_size_ = axis_sizes_.at(i);
auto error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ReduceFp16Impl, this, context_->thread_num_);
auto error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ReduceFp16Impl, this, context_->thread_num_);
if (error_code != RET_OK) {
FreeTmpBuffer();
MS_LOG(ERROR) << "Reduce run error, error_code[" << error_code << "]";
@ -109,8 +109,8 @@ int ReduceFp16CPUKernel::Run() {
outer_size_ = outer_sizes_.back();
inner_size_ = inner_sizes_.back();
axis_size_ = axis_sizes_.back();
auto error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ReduceFp16Impl, this, context_->thread_num_);
auto error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ReduceFp16Impl, this, context_->thread_num_);
if (error_code != RET_OK) {
FreeTmpBuffer();
MS_LOG(ERROR) << "Reduce run error, error_code[" << error_code << "]";

View File

@ -115,8 +115,8 @@ int ScaleFp16CPUKernel::Run() {
return ret;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ScaleFp16Run, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ScaleFp16Run, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Scale error error_code[" << ret << "]";
FreeTmpBuffer();

View File

@ -63,8 +63,8 @@ int SliceFp16CPUKernel::Run() {
DoSliceNoParallel(input_data, out_tensors_.at(0)->data_c(), param_, lite::DataTypeSize(kNumberTypeFloat16));
return RET_OK;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SliceFp16Launch,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SliceFp16Launch, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "fp16 slice launch fail!ret: " << ret;
return RET_ERROR;

View File

@ -95,8 +95,8 @@ int SoftmaxLastAxisFp16Run(void *cdata, int task_id) {
int SoftmaxFp16CPUKernel::Run() {
if (in_plane_size_ == 1) {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
SoftmaxLastAxisFp16Run, this, context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SoftmaxLastAxisFp16Run, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "SoftmaxFp16CPUKernel ParallelLaunch failed, ret: " << ret;
}

View File

@ -102,7 +102,7 @@ int StackFp16CPUKernel::Run() {
// run stack
num_threads_ = MSMIN(UP_DIV(outer_size_, 64), this->context_->thread_num_);
ret =
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, StackRun, this, num_threads_);
static_cast<const lite::InnerContext *>(this->context_)->thread_pool_->ParallelLaunch(StackRun, this, num_threads_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "StackBaseCPUKernel Run error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -79,8 +79,8 @@ int ActivationGradRunFp16(void *cdata, int task_id) {
}
int ActivationGradCPUKernelFp16::Run() {
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ActivationGradRunFp16, this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ActivationGradRunFp16, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "Activation Grad function error error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -73,8 +73,8 @@ int ArithmeticSelfGradFp16Run(void *cdata, int task_id) {
}
int ArithmeticSelfGradFp16CPUKernel::Run() {
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ArithmeticSelfGradFp16Run, this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticSelfGradFp16Run, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "Activation Grad function error error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -105,8 +105,8 @@ int ActivationRun(void *cdata, int task_id) {
}
int ActivationCPUKernel::Run() {
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ActivationRun,
this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ActivationRun, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "Activation function error error_code[" << error_code << "]";
return RET_ERROR;

View File

@ -122,8 +122,8 @@ int AdderCPUKernel::Run() {
return RET_ERROR;
}
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, AdderImpl,
this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(AdderImpl, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "adder error error_code[" << error_code << "]";
FreeTmpBuffer();

View File

@ -89,8 +89,8 @@ int AddNCPUKernel::Run() {
in1_addr_ = input0_data;
in2_addr_ = input1_data;
out_addr_ = output_data;
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, AddNLaunch, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(AddNLaunch, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "addn launch fail!ret: " << ret;
return RET_ERROR;
@ -98,8 +98,8 @@ int AddNCPUKernel::Run() {
for (size_t i = 2; i < in_tensors_.size(); ++i) {
in1_addr_ = reinterpret_cast<float *>(in_tensors_[i]->MutableData());
in2_addr_ = output_data;
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, AddNLaunch, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(AddNLaunch, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "addn launch fail!ret: " << ret << ", input index: " << i;
return RET_ERROR;

View File

@ -419,8 +419,8 @@ int ArithmeticCPUKernel::Run() {
input1_ptr_ = in_tensors_[1]->data_c();
}
output_ptr_ = out_tensors_[0]->data_c();
return ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ArithmeticsRun, this,
context_->thread_num_);
return static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticsRun, this, context_->thread_num_);
}
REG_KERNEL(kCPU, kNumberTypeFloat32, PrimitiveType_MulFusion, LiteKernelCreator<ArithmeticCPUKernel>)

View File

@ -113,8 +113,8 @@ int ArithmeticSelfRun(void *cdata, int task_id) {
}
int ArithmeticSelfCPUKernel::Run() {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ArithmeticSelfRun,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ArithmeticSelfRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ArithmeticSelfRun error error_code[" << ret << "]";
}

View File

@ -75,8 +75,8 @@ int BatchnormCPUKernel::InitConstTensor() {
}
int BatchnormCPUKernel::Run() {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, BatchNormRun, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(BatchNormRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "BatchnormRun error error_code[" << ret << "]";
}

View File

@ -141,8 +141,8 @@ int CastCPUKernel::Run() {
if (data_num_ == 0) {
return RET_OK;
}
return ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, CastRun, this,
op_parameter_->thread_num_);
return static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CastRun, this, op_parameter_->thread_num_);
}
REG_KERNEL(kCPU, kNumberTypeFloat32, PrimitiveType_Cast, LiteKernelCreator<CastCPUKernel>)

View File

@ -69,8 +69,8 @@ int ConcatRun(void *cdata, int task_id) {
}
int ConcatCPUKernel::Run() {
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConcatRun,
this, op_parameter_->thread_num_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConcatRun, this, op_parameter_->thread_num_);
return error_code;
}

View File

@ -22,7 +22,6 @@
#include "nnacl/concat_parameter.h"
#include "include/errorcode.h"
#include "src/runtime/runtime_api.h"
#include "src/runtime/thread_pool.h"
#include "include/context.h"
using mindspore::lite::InnerContext;

View File

@ -247,12 +247,12 @@ int Convolution1x1CPUKernel::Run() {
}
if (multi_thread_by_hw_) {
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, Convolution1x1RunHw, this,
thread_count_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(Convolution1x1RunHw, this, thread_count_);
} else {
PackMatmulInput(input_ptr_, pack_input_, matmul_param_->row_, matmul_param_->deep_);
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, Convolution1x1Run, this,
thread_count_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(Convolution1x1Run, this, thread_count_);
}
}

View File

@ -126,8 +126,8 @@ int ConvolutionDepthwise3x3CPUKernel::Run() {
auto output_tensor = out_tensors_.at(kOutputIndex);
output_ptr_ = reinterpret_cast<float *>(output_tensor->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDw3x3Run, this,
conv_param_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDw3x3Run, this, conv_param_->thread_num_);
ctx_->allocator->Free(buffer_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDw3x3Run error: error_code[" << ret << "]";

View File

@ -107,8 +107,8 @@ int ConvolutionDepthwiseCPUKernel::Run() {
auto output_tensor = out_tensors_.at(kOutputIndex);
output_ptr_ = reinterpret_cast<float *>(output_tensor->MutableData());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDwRun, this,
conv_param_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDwRun, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDwRun error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -194,8 +194,8 @@ int ConvolutionDepthwiseIndirectCPUKernel::Run() {
ConvDwInitIndirection(indirect_buffer_, packed_input_, zero_ptr_, conv_param_, step_h, step_w);
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDwIndirectRun,
this, conv_param_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDwIndirectRun, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDwIndirectRun error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -163,8 +163,8 @@ int ConvolutionDepthwiseSWCPUKernel::Run() {
packed_output_ = output_ptr;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvDwSWRun, this,
conv_param_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvDwSWRun, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "ConvDwSWRun error: error_code[" << ret << "]";
}

View File

@ -152,8 +152,8 @@ int ConvolutionCPUKernel::Run() {
PackWeight();
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvolutionImpl, this,
thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvolutionImpl, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "conv error error_code[" << ret << "]";
}

View File

@ -179,8 +179,8 @@ int ConvolutionSWCPUKernel::Run() {
auto input_data = in_tensors_.at(kInputIndex)->MutableData();
MS_ASSERT(input_data != nullptr);
ori_input_data_ = reinterpret_cast<float *>(input_data);
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
ConvolutionSWImpl, this, thread_count_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvolutionSWImpl, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "conv error error_code[" << error_code << "]";
FreeTmpBuffer();

View File

@ -219,8 +219,8 @@ int ConvolutionWinogradCPUKernel::Run() {
InitWeightBias();
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ConvolutionWinogradImpl,
this, thread_count_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ConvolutionWinogradImpl, this, thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "conv winograd error error_code[" << ret << "]";
}

View File

@ -159,8 +159,8 @@ int CropAndResizeCPUKernel::Run() {
return ret;
}
int error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
CropAndResizeImpl, this, context_->thread_num_);
int error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CropAndResizeImpl, this, context_->thread_num_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "CropAndResize run error, error_code[" << error_code << "]";
FreeTmpBuffer();

View File

@ -62,8 +62,8 @@ int CropCPUKernel::Run() {
return RET_OK;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, CropLaunch, this,
crop_para_->thread_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CropLaunch, this, crop_para_->thread_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Crop launch fail!ret: " << ret;
return RET_ERROR;

View File

@ -137,8 +137,8 @@ int CumSumCPUKernel::DoCumsumInt(int task_id) {
}
int CumSumCPUKernel::Run() {
int ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, CumsumLaunch, this,
op_parameter_->thread_num_);
int ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(CumsumLaunch, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Crop launch fail!ret: " << ret;

View File

@ -168,8 +168,8 @@ int DeconvolutionDepthwiseCPUKernel::Run() {
packed_output_ = output_addr;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeconvDwRun, this,
conv_param_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeconvDwRun, this, conv_param_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "DeconvDwRun error: error_code[" << ret << "]";
}

View File

@ -227,8 +227,8 @@ int DeConvolutionCPUKernel::Run() {
RowMajor2Col12Major(input_ptr_, pack_input_, matmul_param_->row_, matmul_param_->deep_);
#endif
error_code = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvFp32Run,
this, thread_count_);
error_code = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvFp32Run, this, thread_count_);
if (error_code != RET_OK) {
MS_LOG(ERROR) << "deconv fp32 run error! error_code[" << error_code << "]";
FreeRunBuf();

View File

@ -411,12 +411,12 @@ int DeConvolutionWinogradCPUKernel::Run() {
nhwc_output_ = src_out + batch_index * deconv_param_->output_plane_ * conv_param_->output_channel_;
::memset(nc4hw4_output_, 0, deconv_param_->output_plane_ * deconv_param_->oc_div4_ * C4NUM * sizeof(float));
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvWgFp32Run, this,
deconv_param_->thread_num_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvWgFp32Run, this, deconv_param_->thread_num_);
/*post bias activate and nhwc */
ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, DeConvWgPostFp32Run, this,
thread_num_hw_);
static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(DeConvWgPostFp32Run, this, thread_num_hw_);
}
FreeRunBuf();

View File

@ -55,8 +55,8 @@ int EluRun(void *cdata, int task_id) {
}
int EluCPUKernel::Run() {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, EluRun, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(EluRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Elu error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -87,8 +87,8 @@ int EmbeddingLookupCPUKernel::Run() {
memcpy(input_addr_ + dest_loc, input_t, sizeof(float) * in_tensors_.at(i)->ElementsNum());
dest_loc += in_tensors_.at(i)->ElementsNum();
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, EmbeddingLookupRun,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(EmbeddingLookupRun, this, op_parameter_->thread_num_);
FreeRunBuff();
if (ret != RET_OK) {
MS_LOG(ERROR) << "EmbeddingLookup error: error_code[" << ret << "]";

View File

@ -73,8 +73,8 @@ int ExpCPUKernel::Run() {
output_addr_ = reinterpret_cast<float *>(out_tensors_.front()->MutableData());
exp_parameter_->element_num_ = in_tensors_.front()->ElementsNum();
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, ExpRun, this,
exp_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(ExpRun, this, exp_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Exp error: error_code[" << ret << "]";
return RET_ERROR;

View File

@ -91,8 +91,8 @@ int FillCPUKernel::Run() {
MS_LOG(ERROR) << "unsupported fill data type " << fill_input->data_type();
return RET_ERROR;
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, FillRun, this,
thread_sz_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(FillRun, this, thread_sz_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "FillRun error error_code[" << ret << "]";
return ret;

View File

@ -93,8 +93,8 @@ int FusedBatchnormCPUKernel::Run() {
trained_ = true; // trained at least once
}
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, BatchNormRun, this,
op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(BatchNormRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "BatchnormRun error error_code[" << ret << "]";
}

View File

@ -128,8 +128,8 @@ int GatherNdCPUKernel::Run() {
in_ptr_ = reinterpret_cast<float *>(in_tensors_.front()->MutableData());
out_ptr_ = reinterpret_cast<float *>(out_tensors_.front()->MutableData());
InitOffset();
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, GatherNdRun, this,
thread_sz_count_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(GatherNdRun, this, thread_sz_count_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "gatherNd error error_code[" << ret << "]";
return ret;

View File

@ -92,8 +92,8 @@ int GatherCPUKernel::Run() {
return ret;
}
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, GatherRun, this,
op_parameter_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(GatherRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "Gather function error error_code[" << ret << "]";
}

View File

@ -66,8 +66,8 @@ int InstanceNormCPUKernel::Run() {
gamma_data_ = reinterpret_cast<float *>(in_tensors_.at(1)->data_c());
beta_data_ = reinterpret_cast<float *>(in_tensors_.at(2)->data_c());
dst_data_ = reinterpret_cast<float *>(out_tensors_.at(0)->data_c());
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, InstanceNormRun,
this, op_parameter_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(InstanceNormRun, this, op_parameter_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "InstanceNormRun error error_code[" << ret << "]";
return ret;

View File

@ -146,8 +146,8 @@ int L2NormCPUKernel::Run() {
output_ptr_ = reinterpret_cast<float *>(out_tensors_.at(kOutputIndex)->MutableData());
if (l2_norm_param_->axis_num_ == 0 || l2_norm_param_->axis_num_ == input_shape.size()) {
// all axis
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, SquareSumRun, this,
context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(SquareSumRun, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "L2Norm error: error_code[" << ret << "]";
return RET_ERROR;
@ -157,15 +157,15 @@ int L2NormCPUKernel::Run() {
sum += tmp_sum_[i];
}
sqrt_sum_ = sqrt(sum > l2_norm_param_->epsilon_ ? sum : l2_norm_param_->epsilon_);
ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_, L2NormRun, this,
context_->thread_num_);
ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(L2NormRun, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "L2Norm error: error_code[" << ret << "]";
return RET_ERROR;
}
} else if (l2_norm_param_->axis_num_ == 1 && l2_norm_param_->axis_[0] == static_cast<int>(input_shape.size()) - 1) {
auto ret = ParallelLaunch(static_cast<const lite::InnerContext *>(this->context_)->thread_pool_,
L2NormTrailingAxisRun, this, context_->thread_num_);
auto ret = static_cast<const lite::InnerContext *>(this->context_)
->thread_pool_->ParallelLaunch(L2NormTrailingAxisRun, this, context_->thread_num_);
if (ret != RET_OK) {
MS_LOG(ERROR) << "L2Norm error: error_code[" << ret << "]";
return RET_ERROR;

Some files were not shown because too many files have changed in this diff Show More