forked from mindspore-Ecosystem/mindspore
!18652 [MS][LITE]optimize mindrt
Merge pull request !18652 from zhaizhiqiang/master
This commit is contained in:
commit
dce1631adc
|
@ -59,6 +59,7 @@ class ActorBase {
|
|||
}
|
||||
|
||||
explicit ActorBase(const std::string &name);
|
||||
explicit ActorBase(const std::string &name, ActorThreadPool *pool);
|
||||
virtual ~ActorBase();
|
||||
|
||||
// send MessageBase message to the actor.
|
||||
|
@ -198,7 +199,6 @@ class ActorBase {
|
|||
void SetRunningStatus(bool start);
|
||||
|
||||
std::unique_ptr<ActorPolicy> actorPolicy;
|
||||
ActorThreadPool *pool_{nullptr};
|
||||
|
||||
AID id;
|
||||
std::map<std::string, ActorFunction> actionFunctions;
|
||||
|
@ -206,6 +206,8 @@ class ActorBase {
|
|||
|
||||
std::string msgRecords[MAX_ACTOR_RECORD_SIZE];
|
||||
uint32_t recordNextPoint = 0;
|
||||
|
||||
ActorThreadPool *pool_{nullptr};
|
||||
};
|
||||
|
||||
}; // namespace mindspore
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
|
||||
#include "actor/actor.h"
|
||||
#include "actor/log.h"
|
||||
|
||||
#include "actor/actormgr.h"
|
||||
#include "async/apply.h"
|
||||
#include "async/future.h"
|
||||
|
||||
|
@ -31,7 +31,15 @@ namespace mindspore {
|
|||
|
||||
using MessageHandler = std::function<void(ActorBase *)>;
|
||||
|
||||
void Async(const AID &aid, std::unique_ptr<MessageHandler> &&handler);
|
||||
class MessageAsync : public MessageBase {
|
||||
public:
|
||||
explicit MessageAsync(MessageHandler &h) : MessageBase("Async", Type::KASYNC), handler(h) {}
|
||||
~MessageAsync() override {}
|
||||
void Run(ActorBase *actor) override { (handler)(actor); }
|
||||
|
||||
private:
|
||||
MessageHandler handler;
|
||||
};
|
||||
|
||||
namespace internal {
|
||||
|
||||
|
@ -43,10 +51,10 @@ template <>
|
|||
struct AsyncHelper<void> {
|
||||
template <typename F>
|
||||
void operator()(const AID &aid, F &&f) {
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { f(); }));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [=](ActorBase *) { f(); };
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -58,10 +66,11 @@ struct AsyncHelper<Future<R>> {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->Associate(f()); }));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
MessageHandler handler = [=](ActorBase *) { promise->Associate(f()); };
|
||||
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
};
|
||||
|
@ -74,10 +83,10 @@ struct AsyncHelper {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([=](ActorBase *) { promise->SetValue(f()); }));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [=](ActorBase *) { promise->SetValue(f()); };
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
};
|
||||
|
@ -87,41 +96,41 @@ struct AsyncHelper {
|
|||
// return void
|
||||
template <typename T>
|
||||
void Async(const AID &aid, void (T::*method)()) {
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
(t->*method)();
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
(t->*method)();
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
}
|
||||
|
||||
template <typename T, typename Arg0, typename Arg1>
|
||||
void Async(const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) {
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
(t->*method)(arg);
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
(t->*method)(arg);
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
}
|
||||
|
||||
template <typename T, typename... Args0, typename... Args1>
|
||||
void Async(const AID &aid, void (T::*method)(Args0...), std::tuple<Args1...> &&tuple) {
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
Apply(t, method, tuple);
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
Apply(t, method, tuple);
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
}
|
||||
|
||||
template <typename T, typename... Args0, typename... Args1>
|
||||
|
@ -137,16 +146,15 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)()) {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate((t->*method)());
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [promise, method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate((t->*method)());
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -156,16 +164,16 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate((t->*method)(arg));
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
std::function<void(ActorBase *)> handler = [promise, method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate((t->*method)(arg));
|
||||
};
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -175,16 +183,16 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), std::tuple<Arg
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate(Apply(t, method, tuple));
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
std::function<void(ActorBase *)> handler = [promise, method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->Associate(Apply(t, method, tuple));
|
||||
};
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -202,16 +210,16 @@ Future<R> Async(const AID &aid, R (T::*method)()) {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue((t->*method)());
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
std::function<void(ActorBase *)> handler = [promise, method](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue((t->*method)());
|
||||
};
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -223,16 +231,15 @@ Future<R> Async(const AID &aid, R (T::*method)(Arg0), Arg1 &&arg) {
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue((t->*method)(arg));
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [promise, method, arg](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue((t->*method)(arg));
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -244,16 +251,15 @@ Future<R> Async(const AID &aid, R (T::*method)(Args0...), std::tuple<Args1...> &
|
|||
MINDRT_OOM_EXIT(promise);
|
||||
Future<R> future = promise->GetFuture();
|
||||
|
||||
std::unique_ptr<std::function<void(ActorBase *)>> handler(
|
||||
new (std::nothrow) std::function<void(ActorBase *)>([promise, method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue(Apply(t, method, tuple));
|
||||
}));
|
||||
MINDRT_OOM_EXIT(handler);
|
||||
|
||||
Async(aid, std::move(handler));
|
||||
std::function<void(ActorBase *)> handler = [promise, method, tuple](ActorBase *actor) {
|
||||
MINDRT_ASSERT(actor != nullptr);
|
||||
T *t = static_cast<T *>(actor);
|
||||
MINDRT_ASSERT(t != nullptr);
|
||||
promise->SetValue(Apply(t, method, tuple));
|
||||
};
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(handler));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
|
|
@ -298,12 +298,12 @@ class Future : public FutureBase {
|
|||
return OnAbandoned(std::forward<F>(f), FutureBase());
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
|
||||
Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const {
|
||||
return Then<R>(std::move(f).operator std::function<Future<R>(const T &)>());
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if<
|
||||
!std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type>
|
||||
Future<R> Then(internal::DeferredHelper<F> &&f, LessFuture) const {
|
||||
|
|
|
@ -24,6 +24,9 @@ namespace mindspore {
|
|||
ActorBase::ActorBase(const std::string &name)
|
||||
: actorPolicy(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions() {}
|
||||
|
||||
ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
|
||||
: actorPolicy(nullptr), id(name, ActorMgr::GetActorMgrRef()->GetUrl()), actionFunctions(), pool_(pool) {}
|
||||
|
||||
ActorBase::~ActorBase() {}
|
||||
|
||||
void ActorBase::Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> thread) {
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "async/async.h"
|
||||
#include "actor/actormgr.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
||||
class MessageAsync : public MessageBase {
|
||||
public:
|
||||
explicit MessageAsync(std::unique_ptr<MessageHandler> &&h)
|
||||
: MessageBase("Async", Type::KASYNC), handler(std::move(h)) {}
|
||||
|
||||
~MessageAsync() override {}
|
||||
|
||||
void Run(ActorBase *actor) override { (*handler)(actor); }
|
||||
|
||||
private:
|
||||
std::unique_ptr<MessageHandler> handler;
|
||||
};
|
||||
|
||||
void Async(const AID &aid, std::unique_ptr<std::function<void(ActorBase *)>> &&handler) {
|
||||
std::unique_ptr<MessageAsync> msg(new (std::nothrow) MessageAsync(std::move(handler)));
|
||||
MINDRT_OOM_EXIT(msg);
|
||||
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
|
||||
}
|
||||
|
||||
} // namespace mindspore
|
|
@ -89,10 +89,11 @@ ActorThreadPool::~ActorThreadPool() {
|
|||
// wait until actor queue is empty
|
||||
bool terminate = false;
|
||||
do {
|
||||
std::lock_guard<std::mutex> _l(actor_mutex_);
|
||||
if (actor_queue_.empty()) {
|
||||
terminate = true;
|
||||
} else {
|
||||
{
|
||||
std::lock_guard<std::mutex> _l(actor_mutex_);
|
||||
terminate = actor_queue_.empty();
|
||||
}
|
||||
if (!terminate) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
} while (!terminate);
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <condition_variable>
|
||||
#include "thread/threadpool.h"
|
||||
#include "actor/actor.h"
|
||||
#include "thread/hqueue.h"
|
||||
|
||||
namespace mindspore {
|
||||
enum ThreadPolicy {
|
||||
|
@ -57,6 +58,7 @@ class ActorThreadPool : public ThreadPool {
|
|||
ActorReference PopActorFromQueue();
|
||||
|
||||
private:
|
||||
ActorThreadPool() {}
|
||||
int CreateThreads(size_t actor_thread_num, size_t all_thread_num, ThreadPolicy policy);
|
||||
|
||||
size_t actor_thread_num_{0};
|
||||
|
|
|
@ -21,57 +21,110 @@
|
|||
|
||||
namespace mindspore {
|
||||
// implement a lock-free queue
|
||||
template <class T>
|
||||
// refer to https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
|
||||
|
||||
template <typename T>
|
||||
struct HQNode {
|
||||
HQNode() {}
|
||||
HQNode(const T &t_, HQNode<T> *n) : t(t_), next(n) {}
|
||||
T t;
|
||||
std::atomic<HQNode<T> *> next = nullptr;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class HQueue {
|
||||
public:
|
||||
HQueue(const HQueue &) = delete;
|
||||
HQueue &operator=(const HQueue &) = delete;
|
||||
explicit HQueue(size_t queue_size) : freeHead(0), usedHead(0) { cache.resize(queue_size); }
|
||||
HQueue() {}
|
||||
virtual ~HQueue() {
|
||||
freeHead = 0;
|
||||
usedHead = 0;
|
||||
// delete dummy head
|
||||
HQNode<T> *node = this->qhead;
|
||||
delete node;
|
||||
}
|
||||
|
||||
bool Enqueue(T *t) {
|
||||
size_t curPos = freeHead.load(std::memory_order_relaxed);
|
||||
size_t nextPos = curPos + 1;
|
||||
if (nextPos == cache.size()) {
|
||||
nextPos = 0;
|
||||
bool Init() {
|
||||
HQNode<T> *dummyHead = new HQNode<T>();
|
||||
if (!dummyHead) {
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t usedIndex = usedHead.load(std::memory_order_acquire);
|
||||
if (nextPos != usedIndex) {
|
||||
cache[curPos] = t;
|
||||
// move free head to new position
|
||||
freeHead.store(nextPos, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
// cache is full
|
||||
return false;
|
||||
qhead = dummyHead;
|
||||
qtail = dummyHead;
|
||||
return true;
|
||||
}
|
||||
|
||||
T *Dequeue() {
|
||||
size_t usedIndex = usedHead.load(std::memory_order_relaxed);
|
||||
size_t freeIndex = freeHead.load(std::memory_order_acquire);
|
||||
|
||||
if (freeIndex == usedHead) { // empty
|
||||
return nullptr;
|
||||
bool Enqueue(const T &data) {
|
||||
HQNode<T> *node = new HQNode<T>(data, nullptr);
|
||||
if (!node) {
|
||||
return false;
|
||||
}
|
||||
|
||||
T *ret = cache[usedIndex];
|
||||
usedIndex++;
|
||||
if (usedIndex == cache.size()) {
|
||||
usedIndex = 0;
|
||||
HQNode<T> *tail = nullptr;
|
||||
HQNode<T> *next = nullptr;
|
||||
while (true) {
|
||||
tail = this->qtail;
|
||||
next = tail->next;
|
||||
|
||||
if (tail != this->qtail) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (next == nullptr) {
|
||||
if (tail->next.compare_exchange_strong(next, node)) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
this->qtail.compare_exchange_strong(tail, next);
|
||||
}
|
||||
}
|
||||
usedHead.store(usedIndex, std::memory_order_release);
|
||||
return ret;
|
||||
this->qtail.compare_exchange_weak(tail, node);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Dequeue(T *data) {
|
||||
HQNode<T> *head = nullptr;
|
||||
HQNode<T> *tail = nullptr;
|
||||
HQNode<T> *next = nullptr;
|
||||
while (true) {
|
||||
head = this->qhead;
|
||||
tail = this->qtail;
|
||||
next = head->next;
|
||||
if (head != this->qhead) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (head == tail) {
|
||||
if (next == nullptr) {
|
||||
return false;
|
||||
}
|
||||
this->qtail.compare_exchange_strong(tail, next);
|
||||
} else {
|
||||
*data = next->t;
|
||||
if (this->qhead.compare_exchange_strong(head, next)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
delete head;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Empty() {
|
||||
HQNode<T> *head = this->qhead;
|
||||
HQNode<T> *tail = this->qtail;
|
||||
HQNode<T> *next = head->next;
|
||||
|
||||
if (head == this->qhead && head == tail && next == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<T *> cache;
|
||||
std::atomic<size_t> freeHead;
|
||||
std::atomic<size_t> usedHead;
|
||||
std::atomic<HQNode<T> *> qhead;
|
||||
std::atomic<HQNode<T> *> qtail;
|
||||
};
|
||||
|
||||
} // namespace mindspore
|
||||
|
|
|
@ -154,7 +154,6 @@ if(ENABLE_MINDRT)
|
|||
${CORE_DIR}/mindrt/src/actor/actormgr.cc
|
||||
${CORE_DIR}/mindrt/src/actor/actorpolicy.cc
|
||||
${CORE_DIR}/mindrt/src/actor/aid.cc
|
||||
${CORE_DIR}/mindrt/src/async/async.cc
|
||||
${CORE_DIR}/mindrt/src/async/future.cc
|
||||
${CORE_DIR}/mindrt/src/async/uuid_base.cc
|
||||
${CORE_DIR}/mindrt/src/async/uuid_generator.cc
|
||||
|
@ -304,6 +303,7 @@ set(TEST_SRC
|
|||
${TEST_DIR}/common/common_test.cc
|
||||
${TEST_DIR}/ut/src/infer_test.cc
|
||||
${TEST_DIR}/ut/src/utils_test.cc
|
||||
${TEST_DIR}/ut/src/lite_mindrt_test.cc
|
||||
${TEST_DIR}/ut/src/dynamic_library_loader_test.cc
|
||||
${TEST_DIR}/ut/src/scheduler_test.cc
|
||||
${TEST_DIR}/ut/src/lite_mindrt_test.cc
|
||||
|
|
|
@ -28,7 +28,8 @@ class LiteMindRtTest : public mindspore::CommonTest {
|
|||
};
|
||||
|
||||
TEST_F(LiteMindRtTest, HQueueTest) {
|
||||
HQueue<int> hq(10000);
|
||||
HQueue<int *> hq;
|
||||
hq.Init();
|
||||
std::vector<int *> v1(2000);
|
||||
int d1 = 1;
|
||||
for (size_t s = 0; s < v1.size(); s++) {
|
||||
|
@ -42,12 +43,12 @@ TEST_F(LiteMindRtTest, HQueueTest) {
|
|||
|
||||
std::thread t1([&]() {
|
||||
for (size_t s = 0; s < v1.size(); s++) {
|
||||
ASSERT_EQ(hq.Enqueue(v1[s]), true);
|
||||
hq.Enqueue(v1[s]);
|
||||
}
|
||||
});
|
||||
std::thread t2([&]() {
|
||||
for (size_t s = 0; s < v2.size(); s++) {
|
||||
ASSERT_EQ(hq.Enqueue(v2[s]), true);
|
||||
hq.Enqueue(v2[s]);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -57,7 +58,8 @@ TEST_F(LiteMindRtTest, HQueueTest) {
|
|||
std::thread t3([&]() {
|
||||
size_t loop = v1.size() + v2.size();
|
||||
while (loop) {
|
||||
int *val = hq.Dequeue();
|
||||
int *val = nullptr;
|
||||
hq.Dequeue(&val);
|
||||
if (val == nullptr) {
|
||||
continue;
|
||||
}
|
||||
|
@ -79,7 +81,8 @@ TEST_F(LiteMindRtTest, HQueueTest) {
|
|||
|
||||
ASSERT_EQ(c1, v1.size());
|
||||
ASSERT_EQ(c2, v2.size());
|
||||
ASSERT_EQ(hq.Dequeue(), nullptr);
|
||||
int *tmp = nullptr;
|
||||
ASSERT_EQ(hq.Dequeue(&tmp), false);
|
||||
|
||||
for (size_t s = 0; s < v1.size(); s++) {
|
||||
delete v1[s];
|
||||
|
@ -89,4 +92,69 @@ TEST_F(LiteMindRtTest, HQueueTest) {
|
|||
delete v2[s];
|
||||
}
|
||||
}
|
||||
|
||||
class TestActor : public ActorBase {
|
||||
public:
|
||||
explicit TestActor(const std::string &nm, ActorThreadPool *pool, const int i) : ActorBase(nm, pool), data(i) {}
|
||||
int Fn1(int *val) {
|
||||
if (val) {
|
||||
(*val)++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int Fn2(int *val) { return data + (*val); }
|
||||
|
||||
public:
|
||||
int data = 0;
|
||||
};
|
||||
|
||||
TEST_F(LiteMindRtTest, ActorThreadPoolTest) {
|
||||
Initialize("", "", "", "", 4);
|
||||
auto pool = ActorThreadPool::CreateThreadPool(4, KThreadSpin);
|
||||
AID t1 = Spawn(ActorReference(new TestActor("t1", pool, 1)));
|
||||
AID t2 = Spawn(ActorReference(new TestActor("t2", pool, 2)));
|
||||
AID t3 = Spawn(ActorReference(new TestActor("t3", pool, 3)));
|
||||
AID t4 = Spawn(ActorReference(new TestActor("t4", pool, 4)));
|
||||
AID t5 = Spawn(ActorReference(new TestActor("t5", pool, 5)));
|
||||
AID t6 = Spawn(ActorReference(new TestActor("t6", pool, 6)));
|
||||
|
||||
std::vector<int *> vv;
|
||||
std::vector<Future<int>> fv;
|
||||
size_t sz = 2000;
|
||||
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
vv.emplace_back(new int(i));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
int *val = vv[i];
|
||||
Future<int> ret;
|
||||
ret = Async(t1, &TestActor::Fn1, val) // (*vv[i])++;
|
||||
.Then(Defer(t2, &TestActor::Fn2, val), ret) // t2.data += (*vv[i]);
|
||||
.Then(Defer(t3, &TestActor::Fn1, val), ret) // (*vv[i])++;
|
||||
.Then(Defer(t4, &TestActor::Fn2, val), ret) // t4.data += (*vv[i]);
|
||||
.Then(Defer(t5, &TestActor::Fn1, val), ret) // (*vv[i])++;
|
||||
.Then(Defer(t6, &TestActor::Fn2, val), ret); // t6.data += (*vv[i]);
|
||||
fv.emplace_back(ret);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < vv.size(); i++) {
|
||||
int val = static_cast<int>(i);
|
||||
int expected = 0;
|
||||
|
||||
val += 3; // t1.Fn1
|
||||
expected = 6; // t6.data
|
||||
expected += val;
|
||||
|
||||
ASSERT_EQ(fv[i].Get(), expected);
|
||||
ASSERT_EQ(*vv[i], val);
|
||||
}
|
||||
|
||||
Finalize();
|
||||
|
||||
for (size_t i = 0; i < vv.size(); i++) {
|
||||
delete vv[i];
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace mindspore
|
||||
|
|
Loading…
Reference in New Issue