!25535 [MSLITE] mindrt bug

Merge pull request !25535 from ling/mindrt
This commit is contained in:
i-robot 2021-10-30 02:27:32 +00:00 committed by Gitee
commit d09c93b3c8
11 changed files with 33 additions and 39 deletions

View File

@ -28,14 +28,10 @@
#include "actor/mailbox.h"
namespace mindspore {
class ActorBase;
class ActorMgr;
class ActorWorker;
class ActorThreadPool;
using ActorReference = std::shared_ptr<ActorBase>;
// should be at least greater than 1
constexpr uint32_t MAX_ACTOR_RECORD_SIZE = 3;
@ -200,9 +196,9 @@ class ActorBase {
void Run();
void Quit();
int EnqueMessage(std::unique_ptr<MessageBase> msg);
int EnqueMessage(std::unique_ptr<MessageBase> msg) const;
void Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<MailBox> mailbox);
void Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailbox);
std::unique_ptr<MailBox> mailbox;
std::atomic_bool terminating_ = false;
@ -217,6 +213,6 @@ class ActorBase {
ActorThreadPool *pool_{nullptr};
};
using ActorReference = std::shared_ptr<ActorBase>;
}; // namespace mindspore
#endif

View File

@ -45,7 +45,8 @@ class AppActor : public ActorBase {
// send T message to the actor
template <typename M>
int Send(const std::string &to, const std::string &msgName, std::unique_ptr<M> msg) {
std::unique_ptr<MessageLocal> localMsg(new (std::nothrow) MessageLocal(GetAID(), to, msgName, msg.release()));
auto localMsg =
std::unique_ptr<MessageLocal>(new (std::nothrow) MessageLocal(GetAID(), to, msgName, msg.release()));
MINDRT_OOM_EXIT(localMsg);
return Send(to, std::move(localMsg));
}

View File

@ -52,7 +52,7 @@ struct AsyncHelper<void> {
template <typename F>
void operator()(const AID &aid, F &&f) {
std::function<void(ActorBase *)> handler = [=](ActorBase *) { f(); };
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
}
@ -62,13 +62,13 @@ template <typename R>
struct AsyncHelper<Future<R>> {
template <typename F>
Future<R> operator()(const AID &aid, F &&f) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
MessageHandler handler = [=](ActorBase *) { promise->Associate(f()); };
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;
@ -79,12 +79,12 @@ template <typename R>
struct AsyncHelper {
template <typename F>
Future<R> operator()(const AID &aid, F &&f) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
std::function<void(ActorBase *)> handler = [=](ActorBase *) { promise->SetValue(f()); };
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;
@ -102,7 +102,7 @@ void Async(const AID &aid, void (T::*method)()) {
MINDRT_ASSERT(t != nullptr);
(t->*method)();
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
}
@ -115,7 +115,7 @@ void Async(const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) {
MINDRT_ASSERT(t != nullptr);
(t->*method)(arg);
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
}
@ -128,7 +128,7 @@ void Async(const AID &aid, void (T::*method)(Args0...), std::tuple<Args1...> &&t
MINDRT_ASSERT(t != nullptr);
Apply(t, method, tuple);
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
}
@ -142,7 +142,7 @@ void Async(const AID &aid, void (T::*method)(Args0...), Args1 &&... args) {
// return future
template <typename R, typename T>
Future<R> Async(const AID &aid, Future<R> (T::*method)()) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
@ -160,7 +160,7 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)()) {
template <typename R, typename T, typename Arg0, typename Arg1>
Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
@ -171,7 +171,7 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
promise->Associate((t->*method)(arg));
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;
@ -179,7 +179,7 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Arg0), Arg1 &&arg) {
template <typename R, typename T, typename... Args0, typename... Args1>
Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), std::tuple<Args1...> &&tuple) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
@ -190,7 +190,7 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), std::tuple<Arg
promise->Associate(Apply(t, method, tuple));
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;
@ -206,7 +206,7 @@ Future<R> Async(const AID &aid, Future<R> (T::*method)(Args0...), Args1 &&... ar
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type = 0,
typename std::enable_if<!internal::IsFuture<R>::value, int>::type = 0, typename T>
Future<R> Async(const AID &aid, R (T::*method)()) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
@ -227,7 +227,7 @@ template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int
typename std::enable_if<!internal::IsFuture<R>::value, int>::type = 0, typename T, typename Arg0,
typename Arg1>
Future<R> Async(const AID &aid, R (T::*method)(Arg0), Arg1 &&arg) {
std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
auto promise = std::shared_ptr<Promise<R>>(new (std::nothrow) Promise<R>());
MINDRT_OOM_EXIT(promise);
Future<R> future = promise->GetFuture();
@ -237,7 +237,7 @@ Future<R> Async(const AID &aid, R (T::*method)(Arg0), Arg1 &&arg) {
MINDRT_ASSERT(t != nullptr);
promise->SetValue((t->*method)(arg));
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;
@ -257,7 +257,7 @@ Future<R> Async(const AID &aid, R (T::*method)(Args0...), std::tuple<Args1...> &
MINDRT_ASSERT(t != nullptr);
promise->SetValue(Apply(t, method, tuple));
};
std::unique_ptr<MessageBase> msg(new (std::nothrow) MessageAsync(std::move(handler)));
auto msg = std::unique_ptr<MessageBase>(new (std::nothrow) MessageAsync(std::move(handler)));
MINDRT_OOM_EXIT(msg);
(void)ActorMgr::GetActorMgrRef()->Send(aid, std::move(msg));
return future;

View File

@ -33,7 +33,7 @@ int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv = "", con
const std::string &udpUrlAdv = "", int threadCount = 0);
// brief spawn a process to run an actor
AID Spawn(ActorReference actor, bool sharedThread = true);
AID Spawn(const ActorReference actor, bool sharedThread = true);
// brief wait for the actor process to exit . It will be discarded
void Await(const ActorReference &actor);

View File

@ -29,7 +29,7 @@ ActorBase::ActorBase(const std::string &name, ActorThreadPool *pool)
ActorBase::~ActorBase() {}
void ActorBase::Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<MailBox> mailboxPtr) {
void ActorBase::Spawn(const std::shared_ptr<ActorBase>, std::unique_ptr<MailBox> mailboxPtr) {
// lock here or await(). and unlock at Quit() or at await.
waiterLock.lock();
this->mailbox = std::move(mailboxPtr);
@ -67,7 +67,7 @@ void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
<< ",m=" << msg->Name().c_str();
}
}
int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) {
int ActorBase::EnqueMessage(std::unique_ptr<MessageBase> msg) const {
int ret = mailbox->EnqueueMessage(std::move(msg));
return ret;
}
@ -121,14 +121,13 @@ void ActorBase::Run() {
if (msg == nullptr) {
continue;
}
MS_LOG_DEBUG << "dequeue message]actor=" << id.Name() << ",msg=" << msg->Name();
MS_LOG(DEBUG) << "dequeue message]actor=" << id.Name() << ",msg=" << msg->Name();
if (msgHandler(msg) == ACTOR_TERMINATED) {
return;
}
}
msgs->clear();
}
} else {
while (auto msg = mailbox->GetMsg()) {
if (msgHandler(msg) == ACTOR_TERMINATED) {

View File

@ -187,7 +187,7 @@ ActorReference ActorMgr::GetActor(const AID &id) {
}
}
int ActorMgr::EnqueueMessage(mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
int ActorMgr::EnqueueMessage(const mindspore::ActorReference actor, std::unique_ptr<mindspore::MessageBase> msg) {
return actor->EnqueMessage(std::move(msg));
}

View File

@ -80,7 +80,7 @@ class ActorMgr {
return false;
}
}
int EnqueueMessage(ActorReference actor, std::unique_ptr<MessageBase> msg);
int EnqueueMessage(const ActorReference actor, std::unique_ptr<MessageBase> msg);
// in order to avoid being initialized many times
std::atomic_bool initialized_{false};

View File

@ -16,11 +16,10 @@
#include "actor/mailbox.h"
namespace mindspore {
int BlockingMailBox::EnqueueMessage(std::unique_ptr<mindspore::MessageBase> msg) {
{
std::unique_lock<std::mutex> ulk(lock);
enqueMailBox->emplace_back(std::move(msg));
(void)enqueMailBox->emplace_back(std::move(msg));
}
cond.notify_all();
@ -46,7 +45,7 @@ int NonblockingMailBox::EnqueueMessage(std::unique_ptr<mindspore::MessageBase> m
{
std::unique_lock<std::mutex> ulk(lock);
empty = enqueMailBox->empty();
enqueMailBox->emplace_back(std::move(msg));
(void)enqueMailBox->emplace_back(std::move(msg));
released = this->released_;
}
if (empty && released && notifyHook) {
@ -86,5 +85,4 @@ std::unique_ptr<MessageBase> HQueMailBox::GetMsg() {
std::unique_ptr<MessageBase> msg(mailbox.Dequeue());
return msg;
}
} // namespace mindspore

View File

@ -79,7 +79,7 @@ int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const st
return result;
}
AID Spawn(ActorReference actor, bool sharedThread) {
AID Spawn(const ActorReference actor, bool sharedThread) {
if (actor == nullptr) {
MS_LOG(ERROR) << "Actor is nullptr.";
MINDRT_EXIT("Actor is nullptr.");

View File

@ -174,7 +174,7 @@ else()
set(MSLITE_ENABLE_NPU off)
endif()
if(MSLITE_ENABLE_SSE OR MSLITE_ENABLE_AVX OR WIN32)
if(MSLITE_ENABLE_SSE OR MSLITE_ENABLE_AVX OR WIN32 OR MSLITE_ENABLE_ACL)
set(MSLITE_ENABLE_RUNTIME_CONVERT off)
endif()

View File

@ -245,7 +245,7 @@ int AnfTransform::RunParallelPass(const FuncGraphPtr &old_graph, const converter
opt::ParserSplitStrategy(config->parallel_split_config_.parallel_compute_rates_,
config->parallel_split_config_.parallel_devices_, split_mode);
if (split_strategys.empty()) {
MS_LOG(ERROR) << "parse split_strategy error.";
MS_LOG(WARNING) << "No valid split_strategy. Run convert without split";
return RET_OK;
}
opt::Spliter::GetInstance()->RecordGraphInfo(old_graph);