!12494 [MS][LITE]support lite mindrt

From: @zhang_xue_tong
Reviewed-by: 
Signed-off-by:
This commit is contained in:
mindspore-ci-bot 2021-02-25 10:20:28 +08:00 committed by Gitee
commit a5523ffbb4
48 changed files with 480 additions and 1592 deletions

View File

@ -22,10 +22,13 @@
#include <sstream>
#include <string>
#include <string.h>
#include "actor/buserrcode.h"
#ifdef USE_GLOG
#include "utils/log_adapter.h"
#else
#include "common/log_adapter.h"
#endif
namespace mindspore {
#define BUS_LOG(severity) // LOG(severity)
@ -37,7 +40,6 @@ namespace mindspore {
#define ICTSBASE_LOG_COMMON_CODE
#define HLOG_LEVEL_INFO
#define PID_LITEBUS_LOG
//#define BUS_OOM_EXIT
#define HLOG_LEVEL_DEBUG 1
#define ICTSBASE_LOG0(logig, level, pid, format)
#define ICTSBASE_LOG1(logig, level, pid, format, para)

View File

@ -14,6 +14,7 @@
* limitations under the License.
*/
#include <list>
#include <vector>
#include <memory>
#include <string>
@ -21,35 +22,44 @@
#include "actor/actor.h"
#include "async/uuid_base.h"
#include "async/future.h"
#include "async/async.h"
#include "mindrt/include/async/collect.h"
namespace mindspore {
// OpActor data route.
struct OpArrow {
OpArrow(int from_output_index, AID *to_op_id, int to_input_index)
OpArrow(int from_output_index, AID to_op_id, int to_input_index)
: from_output_index_(from_output_index), to_op_id_(to_op_id), to_input_index_(to_input_index) {}
int from_output_index_;
AID *to_op_id_;
AID to_op_id_;
int to_input_index_;
};
// OpActor data.
template <typename T>
struct OpData {
OpData(T *data, int to_input_index) : data_(data), to_input_index_(to_input_index) {}
OpData(const AID &op_id, T *data, int index) : op_id_(op_id), data_(data), index_(index) {}
AID op_id_;
T *data_;
int to_input_index_;
};
// The context of opActor running.
template <typename T>
struct OpContext {
uuids::uuid *sequential_num_;
std::vector<Promise<T *>> *results_;
int index_;
};
using OpArrowPtr = std::shared_ptr<OpArrow>;
template <typename T>
using OpDataPtr = std::shared_ptr<OpData<T>>;
// The context of opActor running.
template <typename T>
struct OpContext {
uuids::uuid *sequential_num_;
std::vector<OpDataPtr<T>> *outputData_;
std::vector<Promise<int>> *results_;
void SetFailed(int32_t code) {
for (auto promise : *results_) {
promise.SetFailed(code);
}
}
void SetResult(size_t index, int value) { results_->at(index).SetValue(value); }
};
template <typename T>
class OpActor : public ActorBase {
@ -62,4 +72,38 @@ class OpActor : public ActorBase {
std::unordered_map<uuids::uuid *, std::vector<OpDataPtr<T>>> input_op_datas_;
std::vector<OpArrowPtr> output_op_arrow_;
};
template <typename T>
Future<std::list<int>> MindrtAsyncRun(const std::vector<OpDataPtr<T>> &inputData, OpContext<T> *context) {
std::list<Future<int>> futures;
for (auto promise : *(context->results_)) {
futures.push_back(promise.GetFuture());
}
Future<std::list<int>> collect = mindspore::Collect<int>(futures);
for (auto data : inputData) {
Async(data->op_id_, &mindspore::OpActor<T>::OpRun, data, context);
}
return collect;
}
template <typename T>
int MindrtRun(const std::vector<OpDataPtr<T>> &inputData, std::vector<OpDataPtr<T>> *outputData) {
OpContext<T> context;
std::vector<Promise<int>> promises(outputData->size());
uuids::uuid uid;
context.sequential_num_ = &uid;
context.results_ = &promises;
context.outputData_ = outputData;
auto collect = MindrtAsyncRun<T>(inputData, &context);
collect.Wait();
if (!collect.IsOK()) {
return -1;
}
return 0;
}
} // namespace mindspore

View File

@ -1,49 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_ASYNCAFTER_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_ASYNCAFTER_H
#include "async/async.h"
#include "timer/timertools.h"
constexpr mindspore::Duration MILLISECONDS = 1;
constexpr mindspore::Duration SECONDS = 1000;
namespace mindspore {
template <typename T>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)()) {
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, method); });
}
template <typename T, typename Arg0, typename Arg1>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) {
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, method, arg); });
}
template <typename T, typename... Args0, typename... Args1>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)(Args0...), Args1 &&... args) {
std::function<void(Args0...)> f([=](Args0... args0) { Async(aid, method, args0...); });
auto handler = std::bind(f, args...);
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, std::move(handler)); });
}
}; // namespace mindspore
#endif

View File

@ -20,9 +20,11 @@
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <tuple>
#include "async/common.h"
#include "async/future.h"
#include "async/defer.h"
#include "async/spinlock.h"
#include "actor/actor.h"

View File

@ -16,7 +16,8 @@
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#include <memory>
#include <utility>
#include <future>
#include <iostream>
#include <list>
@ -30,7 +31,7 @@
#include "litebus.hpp"
#include "future_base.h"
#include "async/future_base.h"
namespace mindspore {
@ -114,22 +115,6 @@ class Future : public FutureBase {
return data->t;
}
Option<T> Get(uint64_t timeMs) const {
if (data->gotten) {
return Option<T>(data->t);
}
if (WaitFor(timeMs).IsError()) {
return Option<T>();
}
if (data->status.IsError()) {
return Option<T>();
}
return Option<T>(Get());
}
bool Valid() const noexcept { return data->future.valid(); }
bool IsInit() const { return data->status.IsInit(); }
@ -157,32 +142,6 @@ class Future : public FutureBase {
data->future.wait();
}
WaitForStatus WaitFor(uint64_t timeMs) const {
if (!data->status.IsInit()) {
return Status::KOK;
}
AID aid = mindspore::Spawn(std::make_shared<internal::WaitActor>(
internal::WAIT_ACTOR_NAME + std::to_string(mindspore::localid_generator::GenLocalActorId())));
mindspore::Timer timer = TimerTools::AddTimer(timeMs, aid, std::bind(&internal::Waitf, aid));
OnComplete(std::bind(&internal::Wait, aid, timer));
// block
mindspore::Await(aid);
data->lock.Lock();
bool ret = data->status.IsInit();
data->lock.Unlock();
if (!ret) {
return Status::KOK;
}
return Status::KERROR;
}
template <typename F>
const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const {
return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>());
@ -341,19 +300,6 @@ class Future : public FutureBase {
return OnAbandoned(std::forward<F>(f), FutureBase());
}
Future<T> After(const Duration &timeMs, const std::function<Future<T>(const Future<T> &)> &f) const {
std::shared_ptr<Promise<T>> promise(new (std::nothrow) Promise<T>());
BUS_OOM_EXIT(promise);
Future<T> future = promise->GetFuture();
mindspore::Timer timer =
TimerTools::AddTimer(timeMs, "__After__", std::bind(&internal::Afterf<T>, f, promise, *this));
OnComplete(std::bind(&internal::After<T>, promise, timer, std::placeholders::_1));
return future;
}
private:
template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const {

View File

@ -19,6 +19,9 @@
#include <future>
#include <iostream>
#include <string>
#include <utility>
#include <memory>
#include <list>
#include "actor/actor.h"
@ -26,8 +29,6 @@
#include "async/spinlock.h"
#include "async/status.h"
#include "timer/timertools.h"
namespace mindspore {
template <typename T>
@ -104,15 +105,6 @@ struct FutureData {
namespace internal {
const std::string WAIT_ACTOR_NAME = "WACTOR_";
class WaitActor : public ActorBase {
public:
explicit WaitActor(const std::string &name) : mindspore::ActorBase(name) {}
~WaitActor() override {}
};
template <typename T>
class DeferredHelper;
@ -186,16 +178,8 @@ static void Afterf(const std::function<Future<T>(const Future<T> &)> &f, const s
promise->Associate(f(future));
}
template <typename T>
static void After(const std::shared_ptr<Promise<T>> &promise, const mindspore::Timer &timer, const Future<T> &future) {
(void)mindspore::TimerTools::Cancel(timer);
promise->Associate(future);
}
void Waitf(const AID &aid);
void Wait(const AID &aid, const mindspore::Timer &timer);
} // namespace internal
} // namespace mindspore

View File

@ -26,7 +26,7 @@ namespace mindspore {
template <typename T>
struct InnerSome {
InnerSome(const T &t) : _t(std::move(t)) {}
explicit InnerSome(const T &t) : _t(std::move(t)) {}
T _t;
};
@ -35,7 +35,7 @@ InnerSome<typename std::decay<T>::type> Some(T &&t) {
return InnerSome<typename std::decay<T>::type>(std::forward<T>(t));
}
struct None {};
struct MindrtNone {};
template <typename T>
class Option {
@ -48,7 +48,7 @@ class Option {
Option(const InnerSome<T> &some) : data(some._t), state(SOME) {}
Option(const None &none) : data(), state(NONE) {}
Option(const MindrtNone &none) : data(), state(NONE) {}
Option(const Option<T> &that) : data(), state(that.state) {
if (that.IsSome()) {

View File

@ -1,32 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_DURATION_HPP__
#define __LITEBUS_DURATION_HPP__
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <string>
#include <thread>
#include "async/spinlock.h"
namespace mindspore {
using Duration = uint64_t;
}
#endif

View File

@ -1,48 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMER_HPP__
#define __LITEBUS_TIMER_HPP__
#include "actor/aid.h"
#include "timer/timewatch.h"
namespace mindspore {
class Timer {
public:
Timer();
~Timer();
bool operator==(const Timer &that) const;
// run this timer's thunk.
void operator()() const;
TimeWatch GetTimeWatch() const;
AID GetTimerAID() const;
uint64_t GetTimerID() const;
private:
friend class TimerTools;
Timer(uint64_t timerId, const TimeWatch &timeWatch, const AID &timeAid, const std::function<void()> &handler);
uint64_t id;
TimeWatch t;
AID aid;
std::function<void()> thunk;
};
} // namespace mindspore
#endif

View File

@ -1,39 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMETOOLS_HPP__
#define __LITEBUS_TIMETOOLS_HPP__
#include <atomic>
#include <list>
#include <map>
#include <set>
#include "timer/duration.h"
#include "timer/timer.h"
namespace mindspore {
class TimerTools {
public:
static bool Initialize();
static void Finalize();
static Timer AddTimer(const Duration &duration, const AID &aid, const std::function<void()> &thunk);
static bool Cancel(const Timer &timer);
static std::atomic_bool g_initStatus;
};
} // namespace mindspore
#endif

View File

@ -1,65 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMEWATCH_HPP__
#define __LITEBUS_TIMEWATCH_HPP__
#include "timer/duration.h"
namespace mindspore {
constexpr Duration MICRTONANO = 1000;
constexpr Duration MILLITOMICR = 1000;
constexpr Duration SECTOMILLI = 1000;
class TimeWatch {
public:
TimeWatch();
TimeWatch(const Duration &duration);
TimeWatch(const TimeWatch &that);
~TimeWatch();
// Constructs a Time instance that is the 'duration' from now.
static TimeWatch In(const Duration &duration);
static Duration Now();
TimeWatch &operator=(const TimeWatch &that);
TimeWatch &operator=(const Duration &duration);
bool operator==(const TimeWatch &that) const;
bool operator<(const TimeWatch &that) const;
bool operator<=(const TimeWatch &that) const;
// Returns the value of the timewatch as a Duration object.
Duration Time() const;
// Returns the amount of time remaining.
Duration Remaining() const;
// return true if the time expired.
bool Expired() const;
private:
Duration duration;
};
} // namespace mindspore
#endif

View File

@ -18,7 +18,6 @@
#include "actor/actormgr.h"
#include "actor/actorpolicyinterface.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {

View File

@ -17,7 +17,6 @@
#include "actor/actormgr.h"
#include "actor/actorpolicy.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {

View File

@ -17,7 +17,6 @@
#include "actor/actor.h"
#include "actor/actormgr.h"
#include "actor/actorpolicy.h"
#include "utils/log_adapter.h"
namespace mindspore {

View File

@ -16,7 +16,6 @@
#include "actor/actorthread.h"
#include <atomic>
#include "utils/log_adapter.h"
namespace mindspore {
constexpr int MAXTHREADNAMELEN = 12;
@ -97,7 +96,6 @@ void ActorThread::Run() {
terminate = true;
MS_LOG(DEBUG) << "Actor this Threads have finished exiting.";
}
} while (!terminate);
}

View File

@ -15,7 +15,6 @@
*/
#include "actor/aid.h"
#include "utils/log_adapter.h"
namespace mindspore {
@ -121,6 +120,9 @@ std::string AID::GetIp() const {
uint16_t AID::GetPort() const {
size_t index = url.rfind(':');
if (index == std::string::npos) {
return 0;
}
return (uint16_t)std::stoul(url.substr(index + 1));
}

View File

@ -1,85 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <actor/sysmgr_actor.h>
#include "actor/actormgr.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {
Duration SysMgrActor::linkRecycleDuration = 10000;
void MetricsMessage::PrintMetrics() {
// print sendMetrics by default, in the future we can add more metrics format
std::ostringstream out;
while (!intTypeMetrics.empty()) {
out << intTypeMetrics.front() << "-";
intTypeMetrics.pop();
}
out << "|";
while (!stringTypeMetrics.empty()) {
std::string stringMetric = stringTypeMetrics.front();
if (stringMetric.empty()) {
out << "null"
<< "-";
} else {
out << stringMetric << "-";
}
stringTypeMetrics.pop();
}
MS_LOG(INFO) << "[format:fd-err-sum-size|to-okmsg-failmsg], value:" << out.str().c_str();
}
void SysMgrActor::SendMetricsDurationCallback() {
std::string protocol = "tcp";
std::shared_ptr<mindspore::IOMgr> ioMgrRef = ActorMgr::GetIOMgrRef(protocol);
if (ioMgrRef == nullptr) {
MS_LOG(INFO) << "tcp protocol is not exist.";
} else {
ioMgrRef->CollectMetrics();
}
(void)AsyncAfter(printSendMetricsDuration, GetAID(), &SysMgrActor::SendMetricsDurationCallback);
}
void SysMgrActor::HandleSendMetricsCallback(const AID &from, std::unique_ptr<MetricsMessage> message) {
if (message == nullptr) {
MS_LOG(WARNING) << "Can't transform to MetricsMessage.";
return;
}
message->PrintMetrics();
return;
}
void SysMgrActor::LinkRecycleDurationCallback() {
std::string protocol = "tcp";
std::shared_ptr<mindspore::IOMgr> ioMgrRef = ActorMgr::GetIOMgrRef(protocol);
if (ioMgrRef == nullptr) {
MS_LOG(INFO) << "tcp protocol is not exist.";
} else {
ioMgrRef->LinkRecycleCheck(linkRecyclePeriod);
}
(void)AsyncAfter(linkRecycleDuration, GetAID(), &SysMgrActor::LinkRecycleDurationCallback);
}
} // namespace mindspore

View File

@ -1,93 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_SYSMGR_ACTOR_H
#define MINDSPORE_CORE_MINDRT_SRC_ACTOR_SYSMGR_ACTOR_H
#include <queue>
#include "async/async.h"
#include "async/asyncafter.h"
#include "actor/actorapp.h"
#include "utils/log_adapter.h"
namespace mindspore {
const std::string SYSMGR_ACTOR_NAME = "SysMgrActor";
const std::string METRICS_SEND_MSGNAME = "SendMetrics";
const int LINK_RECYCLE_PERIOD_MIN = 20;
const int LINK_RECYCLE_PERIOD_MAX = 360;
using IntTypeMetrics = std::queue<int>;
using StringTypeMetrics = std::queue<std::string>;
class MetricsMessage : public MessageBase {
public:
explicit MetricsMessage(const std::string &tfrom, const std::string &tTo, const std::string &tName,
const IntTypeMetrics &tInts = IntTypeMetrics(),
const StringTypeMetrics &tStrings = StringTypeMetrics())
: MessageBase(tfrom, tTo, tName), intTypeMetrics(tInts), stringTypeMetrics(tStrings) {}
~MetricsMessage() override {}
void PrintMetrics();
private:
IntTypeMetrics intTypeMetrics;
StringTypeMetrics stringTypeMetrics;
};
class SysMgrActor : public mindspore::AppActor {
public:
explicit SysMgrActor(const std::string &name, const Duration &duration)
: mindspore::AppActor(name), printSendMetricsDuration(duration) {
linkRecyclePeriod = 0;
}
~SysMgrActor() override {}
protected:
virtual void Init() override {
MS_LOG(INFO) << "Initiaize SysMgrActor";
// register receive handle
Receive("SendMetrics", &SysMgrActor::HandleSendMetricsCallback);
// start sys manager timers
(void)AsyncAfter(printSendMetricsDuration, GetAID(), &SysMgrActor::SendMetricsDurationCallback);
char *linkRecycleEnv = getenv("LITEBUS_LINK_RECYCLE_PERIOD");
if (linkRecycleEnv != nullptr) {
int period = 0;
period = std::stoi(linkRecycleEnv);
if (period >= LINK_RECYCLE_PERIOD_MIN && period <= LINK_RECYCLE_PERIOD_MAX) {
MS_LOG(INFO) << "link recycle set:" << period;
linkRecyclePeriod = period;
(void)AsyncAfter(linkRecycleDuration, GetAID(), &SysMgrActor::LinkRecycleDurationCallback);
}
}
}
private:
void SendMetricsDurationCallback();
void HandleSendMetricsCallback(const AID &from, std::unique_ptr<MetricsMessage> message);
void LinkRecycleDurationCallback();
Duration printSendMetricsDuration;
static Duration linkRecycleDuration;
int linkRecyclePeriod;
};
} // namespace mindspore
#endif

View File

@ -15,7 +15,6 @@
*/
#include "async/future.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace internal {
@ -25,10 +24,5 @@ void Waitf(const AID &aid) {
MS_LOG(WARNING) << "WaitFor is timeout.";
}
void Wait(const AID &aid, const mindspore::Timer &timer) {
mindspore::TimerTools::Cancel(timer);
mindspore::Terminate(aid);
}
} // namespace internal
} // namespace mindspore

View File

@ -13,11 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <random>
#include "async/uuid_base.h"
#include <atomic>
#include "utils/log_adapter.h"
namespace mindspore {
namespace uuids {
@ -41,7 +39,7 @@ std::string uuid::ToBytes(const uuid &u) {
Option<uuid> uuid::FromBytes(const std::string &s) {
if (s.size() != UUID_SIZE) {
return None();
return MindrtNone();
}
uuid u;
memcpy(&u.uuidData, s.data(), s.size());
@ -57,7 +55,7 @@ Option<unsigned char> uuid::GetValue(char c) {
size_t pos = std::find(digitsBegin, digitsEnd, c) - digitsBegin;
if (pos >= digitsLen) {
MS_LOG(ERROR) << "invalid char";
return None();
return MindrtNone();
}
return values[pos];
}
@ -65,7 +63,7 @@ Option<unsigned char> uuid::GetValue(char c) {
Option<uuid> uuid::FromString(const std::string &s) {
auto sBegin = s.begin();
if (sBegin == s.end()) {
return None();
return MindrtNone();
}
auto c = *sBegin;
bool hasOpenBrace = (c == '{');
@ -84,12 +82,12 @@ Option<uuid> uuid::FromString(const std::string &s) {
c = *(sBegin++);
} else {
MS_LOG(ERROR) << "str invalid";
return None();
return MindrtNone();
}
}
Option<unsigned char> oc1 = GetValue(c);
if (oc1.IsNone()) {
return None();
return MindrtNone();
}
u.uuidData[i] = oc1.Get();
if (sBegin != s.end()) {
@ -98,13 +96,13 @@ Option<uuid> uuid::FromString(const std::string &s) {
u.uuidData[i] <<= SHIFT_BIT;
Option<unsigned char> oc2 = GetValue(c);
if (oc2.IsNone()) {
return None();
return MindrtNone();
}
u.uuidData[i] |= oc2.Get();
}
if ((hasOpenBrace && (c != '}')) || (sBegin != s.end())) {
MS_LOG(ERROR) << "No } end or leng invalid";
return None();
return MindrtNone();
}
return u;
}
@ -140,14 +138,14 @@ uuid RandomBasedGenerator::GenerateRandomUuid() {
std::mt19937 gen(rd());
// We use uniform distribution
std::uniform_int_distribution<unsigned long> distribution((std::numeric_limits<unsigned long>::min)(),
(std::numeric_limits<unsigned long>::max)());
std::uniform_int_distribution<uint64_t> distribution((std::numeric_limits<uint64_t>::min)(),
(std::numeric_limits<uint64_t>::max)());
unsigned long randomValue = distribution(gen);
uint64_t randomValue = distribution(gen);
unsigned int i = 0;
for (uint8_t *it = tmpUUID.BeginAddress(); it != tmpUUID.EndAddress(); ++it, ++i) {
if (i == sizeof(unsigned long)) {
if (i == sizeof(uint64_t)) {
randomValue = distribution(gen);
i = 0;
}
@ -156,9 +154,9 @@ uuid RandomBasedGenerator::GenerateRandomUuid() {
}
// use atomic ++ to replace random
static std::atomic<unsigned long> ul(1);
unsigned long lCount = ul.fetch_add(1);
unsigned long offSet = distribution(gen) % RIGHT_SHIFT_BITS;
static std::atomic<uint64_t> ul(1);
uint64_t lCount = ul.fetch_add(1);
uint64_t offSet = distribution(gen) % RIGHT_SHIFT_BITS;
auto ret = memcpy(tmpUUID.BeginAddress() + offSet, &lCount, sizeof(lCount));
if (ret != 0) {
MS_LOG(ERROR) << "memcpy_s error.";

View File

@ -1,3 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/evloop.cc
)

View File

@ -1,387 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <string>
#include <thread>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <csignal>
#include <unistd.h>
#include "actor/buslog.h"
#include "evloop/evloop.h"
#include "utils/log_adapter.h"
namespace mindspore {
int EventLoopRun(EvLoop *evloop, int timeout) {
int nevent = 0;
struct epoll_event *events = nullptr;
(void)sem_post(&evloop->semId);
size_t size = 1;
events = (struct epoll_event *)malloc(size);
if (events == nullptr) {
MS_LOG(ERROR) << "malloc events fail";
return BUS_ERROR;
}
// 1. dest is valid 2. destsz equals to count and both are valid.
// memset_s will always executes successfully.
(void)memset(events, 0, size);
while (!evloop->stopLoop) {
/* free deleted event handlers */
evloop->EventFreeDelEvents();
MS_LOG(DEBUG) << "timeout:" << timeout << ",epoll_fd:" << evloop->efd;
MS_LOG(DEBUG) << "nevent:" << nevent << ",epoll_fd:" << evloop->efd;
if (nevent < 0) {
if (errno != EINTR) {
MS_LOG(ERROR) << "epoll_wait failed]epoll_fd:" << evloop->efd << ",errno:" << errno;
free(events);
return BUS_ERROR;
} else {
continue;
}
} else if (nevent > 0) {
/* save the epoll modify in "stop" while dispatching handlers */
evloop->HandleEvent(events, nevent);
} else {
MS_LOG(ERROR) << "epoll_wait failed]epoll_fd:" << evloop->efd << ",ret:0,errno:" << errno;
evloop->stopLoop = 1;
}
if (evloop->stopLoop) {
/* free deleted event handlers */
evloop->EventFreeDelEvents();
}
}
evloop->stopLoop = 0;
MS_LOG(INFO) << "event epoll loop run end";
free(events);
return BUS_OK;
}
void *EvloopRun(void *arg) {
if (arg == nullptr) {
MS_LOG(ERROR) << "arg is null";
} else {
(void)EventLoopRun((EvLoop *)arg, -1);
}
return nullptr;
}
void QueueReadyCallback(int fd, uint32_t events, void *arg) {
EvLoop *evloop = (EvLoop *)arg;
if (evloop == nullptr) {
MS_LOG(ERROR) << "evloop is null]fd:" << fd << ",events:" << events;
return;
}
uint64_t count;
if (read(evloop->queueEventfd, &count, sizeof(count)) == sizeof(count)) {
// take out functions from the queue
std::queue<std::function<void()>> q;
evloop->queueMutex.lock();
evloop->queue.swap(q);
evloop->queueMutex.unlock();
// invoke functions in the queue
while (!q.empty()) {
q.front()();
q.pop();
}
}
}
void EvLoop::CleanUp() {
if (queueEventfd != -1) {
close(queueEventfd);
queueEventfd = -1;
}
if (efd != -1) {
close(efd);
efd = -1;
}
}
int EvLoop::AddFuncToEvLoop(std::function<void()> &&func) {
// put func to the queue
queueMutex.lock();
queue.emplace(std::move(func));
// return the queque size to send's caller.
int result = queue.size();
queueMutex.unlock();
if (result == 1) {
// wakeup event loop
uint64_t one = 1;
if (write(queueEventfd, &one, sizeof(one)) != sizeof(one)) {
MS_LOG(WARNING) << "fail to write queueEventfd]fd:" << queueEventfd << ",errno:" << errno;
}
}
return result;
}
bool EvLoop::Init(const std::string &threadName) {
int retval = EventLoopCreate();
if (retval != BUS_OK) {
return false;
}
(void)sem_init(&semId, 0, 0);
if (pthread_create(&loopThread, nullptr, EvloopRun, (void *)this) != 0) {
MS_LOG(ERROR) << "pthread_create fail";
Finish();
return false;
}
// wait EvloopRun
(void)sem_wait(&semId);
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
std::string name = threadName;
if (name.empty()) {
name = "EventLoopThread";
}
retval = pthread_setname_np(loopThread, name.c_str());
if (retval != 0) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "set pthread name fail]%s",
"name:%s,retval:%d", name.c_str(), retval);
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "set pthread name success]%s",
"name:%s,loopThread:%lu", name.c_str(), loopThread);
}
#endif
return true;
}
void EvLoop::Finish() {
if (loopThread) {
void *threadResult = nullptr;
StopEventLoop();
int ret = pthread_join(loopThread, &threadResult);
if (ret != 0) {
MS_LOG(INFO) << "pthread_join loopThread fail";
}
loopThread = 0;
}
EventLoopDestroy();
MS_LOG(INFO) << "stop loop succ";
}
EvLoop::~EvLoop() { Finish(); }
void EvLoop::DeleteEvent(int fd) {
auto iter = events.find(fd);
if (iter == events.end()) {
MS_LOG(DEBUG) << "not found event]fd:" << fd;
return;
}
MS_LOG(DEBUG) << "erase event]fd:%d" << fd;
EventData *eventData = iter->second;
if (eventData != nullptr) {
delete eventData;
}
events.erase(fd);
}
EventData *EvLoop::FindEvent(int fd) {
auto iter = events.find(fd);
if (iter == events.end()) {
return nullptr;
}
return iter->second;
}
void EvLoop::AddEvent(EventData *eventData) {
if (!eventData) {
return;
}
DeleteEvent(eventData->fd);
events.emplace(eventData->fd, eventData);
}
int EvLoop::EventLoopCreate(void) { return BUS_OK; }
int EvLoop::AddFdEvent(int fd, uint32_t tEvents, EventHandler handler, void *data) { return BUS_OK; }
int EvLoop::DelFdEvent(int fd) {
EventData *tev = nullptr;
eventsLock.lock();
tev = FindEvent(fd);
if (tev == nullptr) {
eventsLock.unlock();
MS_LOG(DEBUG) << "event search fail]fd:" << fd << ",epollfd:" << efd;
return BUS_ERROR;
}
events.erase(tev->fd);
// Don't delete tev immediately, let's push it into deletedEvents, before next epoll_wait,we will free
// all events in deletedEvents.
AddDeletedEvents(tev);
eventsLock.unlock();
return BUS_OK;
}
int EvLoop::ModifyFdEvent(int fd, uint32_t tEvents) {
struct epoll_event ev;
EventData *tev = nullptr;
tev = FindEvent(fd);
if (tev == nullptr) {
MS_LOG(ERROR) << "event lookup fail]fd:" << fd << ",events:" << tEvents;
return BUS_ERROR;
}
// 1. dest is valid 2. destsz equals to count and both are valid.
// memset_s will always executes successfully.
(void)memset(&ev, 0, sizeof(ev));
ev.events = tEvents;
ev.data.ptr = tev;
return BUS_OK;
}
void EvLoop::AddDeletedEvents(EventData *eventData) {
// caller need check eventData is not nullptr
std::list<EventData *> deleteEventList;
// if fd not found, push eventData into deletedEvents[fd]
std::map<int, std::list<EventData *>>::iterator fdIter = deletedEvents.find(eventData->fd);
if (fdIter == deletedEvents.end()) {
deletedEvents[eventData->fd].push_back(eventData);
return;
}
// if fd found, check if same eventData ptr exists
deleteEventList = fdIter->second;
std::list<EventData *>::iterator eventIter = deleteEventList.begin();
bool found = false;
while (eventIter != deleteEventList.end()) {
if (*eventIter == eventData) {
MS_LOG(WARNING) << "fd has been deleted before]fd:" << eventData->fd << ",efd:" << efd;
found = true;
break;
}
++eventIter;
}
// if found same eventData ptr, do nothing
if (found) {
return;
}
deletedEvents[eventData->fd].push_back(eventData);
return;
}
void EvLoop::EventFreeDelEvents() {
std::map<int, std::list<EventData *>>::iterator fdIter = deletedEvents.begin();
while (fdIter != deletedEvents.end()) {
std::list<EventData *> deleteEventList = fdIter->second;
std::list<EventData *>::iterator eventIter = deleteEventList.begin();
while (eventIter != deleteEventList.end()) {
EventData *deleteEv = *eventIter;
delete deleteEv;
deleteEv = nullptr;
++eventIter;
}
deletedEvents.erase(fdIter++);
}
deletedEvents.clear();
}
int EvLoop::FindDeletedEvent(const EventData *tev) {
std::map<int, std::list<EventData *>>::iterator fdIter = deletedEvents.find(tev->fd);
if (fdIter == deletedEvents.end()) {
return 0;
}
std::list<EventData *> deleteEventList = fdIter->second;
std::list<EventData *>::iterator eventIter = deleteEventList.begin();
while (eventIter != deleteEventList.end()) {
if (*eventIter == tev) {
return 1;
}
++eventIter;
}
return 0;
}
void EvLoop::HandleEvent(const struct epoll_event *tEvents, int nevent) {
int i;
int found;
EventData *tev = nullptr;
for (i = 0; i < nevent; i++) {
tev = reinterpret_cast<EventData *>(tEvents[i].data.ptr);
if (tev != nullptr) {
found = FindDeletedEvent(tev);
if (found) {
MS_LOG(WARNING) << "fd has been deleted from epoll]fd:" << tev->fd << ",efd:" << efd;
continue;
}
tev->handler(tev->fd, tEvents[i].events, tev->data);
}
}
}
void EvLoop::StopEventLoop() {
if (stopLoop == 1) {
return;
}
stopLoop = 1;
uint64_t one = 1;
if (write(queueEventfd, &one, sizeof(one)) != sizeof(one)) {
MS_LOG(WARNING) << "fail to write queueEventfd]fd:" << queueEventfd << ",errno:" << errno;
}
return;
}
void EvLoop::EventLoopDestroy() {
/* free deleted event handlers */
EventFreeDelEvents();
if (efd > 0) {
if (queueEventfd > 0) {
(void)DelFdEvent(queueEventfd);
close(queueEventfd);
queueEventfd = -1;
}
close(efd);
efd = -1;
}
}
} // namespace mindspore

View File

@ -1,109 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_EVLOOP_H__
#define __LITEBUS_EVLOOP_H__
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <sys/eventfd.h>
#include <semaphore.h>
#include "timer/duration.h"
namespace mindspore {
/*
* max epoll set size
*/
constexpr auto EPOLL_SIZE = 4096;
/*
* epoll event max size
*/
constexpr auto EPOLL_EVENTS_SIZE = 64;
typedef void (*EventHandler)(int fd, uint32_t events, void *data);
typedef struct EventData {
EventHandler handler;
void *data;
int fd;
} EventData;
class EvLoop {
public:
EvLoop() {
efd = -1;
stopLoop = 0;
queueEventfd = -1;
loopThread = 0;
};
EvLoop(const EvLoop &) = delete;
EvLoop &operator=(const EvLoop &) = delete;
bool Init(const std::string &threadName);
int AddFuncToEvLoop(std::function<void()> &&func);
int AddFdEvent(int fd, uint32_t events, EventHandler handler, void *data);
int ModifyFdEvent(int fd, uint32_t events);
int DelFdEvent(int fd);
void Finish();
~EvLoop();
int EventLoopCreate(void);
void StopEventLoop();
void EventLoopDestroy();
void EventFreeDelEvents();
void AddDeletedEvents(EventData *eventData);
int FindDeletedEvent(const EventData *tev);
void HandleEvent(const struct epoll_event *events, int nevent);
void DeleteEvent(int fd);
EventData *FindEvent(int fd);
void AddEvent(EventData *eventData);
void CleanUp();
int efd;
int stopLoop;
std::mutex loopMutex;
sem_t semId;
pthread_t loopThread;
int queueEventfd;
std::mutex queueMutex;
std::queue<std::function<void()>> queue;
std::mutex eventsLock;
// fd,EventData
std::map<int, EventData *> events;
// Just to be safe, let's use a list to preserve deleted events rather than a map. Because the caller may
// delete events on the same fd twice in once epoll_wait
std::map<int, std::list<EventData *>> deletedEvents;
};
} // namespace mindspore
#endif

View File

@ -15,16 +15,11 @@
*/
#include <cstdlib>
#include "mindrt/src/actor/sysmgr_actor.h"
#include <atomic>
#include "mindrt/src/actor/actormgr.h"
#include "mindrt/src/actor/iomgr.h"
//#include "utils/os_utils.hpp"
#include "litebus.hpp"
#include "timer/timertools.h"
#include "litebus.h"
#include "include/litebus.h"
extern "C" {
int LitebusInitializeC(const struct LitebusConfig *config) {
@ -48,12 +43,6 @@ int LitebusInitializeC(const struct LitebusConfig *config) {
void LitebusFinalizeC() { mindspore::Finalize(); }
}
constexpr auto LITEBUSTHREADMIN = 3;
constexpr auto LITEBUSTHREADMAX = 100;
constexpr auto LITEBUSTHREADS = 10;
constexpr auto SYSMGR_TIMER_DURATION = 600000;
namespace mindspore {
namespace local {
@ -68,62 +57,7 @@ const LitebusAddress &GetLitebusAddress() {
return *local::g_litebusAddress;
}
bool SetServerIo(std::shared_ptr<mindspore::IOMgr> &io, std::string &advertiseUrl, const std::string &protocol,
const std::string &url) {
#if 0
if (protocol == "tcp") {
size_t index = advertiseUrl.find("://");
if (index != std::string::npos) {
advertiseUrl = advertiseUrl.substr(index + URL_PROTOCOL_IP_SEPARATOR.size());
}
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "create tcp iomgr. (%s)",
"Url=%s,advertiseUrl=%s", url.c_str(), advertiseUrl.c_str());
if (local::g_litebusAddress == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for LitebusAddress");
return false;
}
local::g_litebusAddress->scheme = protocol;
local::g_litebusAddress->ip = AID("test@" + advertiseUrl).GetIp();
local::g_litebusAddress->port = AID("test@" + advertiseUrl).GetPort();
#ifdef HTTP_ENABLED
mindspore::HttpIOMgr::EnableHttp();
#endif
io.reset(new (std::nothrow) mindspore::TCPMgr());
if (io == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for TCPMgr");
return false;
}
}
#ifdef UDP_ENABLED
else if (protocol == "udp") {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "create udp iomgr. (%s)",
"Url=%s,advertiseUrl=%s", url.c_str(), advertiseUrl.c_str());
io.reset(new (std::nothrow) mindspore::UDPMgr());
if (io == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for UDPMgr");
return false;
}
}
#endif
else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "unsupported protocol. (%s)",
"%s", protocol.c_str());
return false;
}
#endif
return true;
}
void SetThreadCount(int threadCount) {
int tmpThreadCount = LITEBUSTHREADS;
ActorMgr::GetActorMgrRef()->Initialize(tmpThreadCount);
}
void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); }
class LiteBusExit {
public:
@ -141,16 +75,9 @@ int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts ......");
signal(SIGPIPE, SIG_IGN);
if (!TimerTools::Initialize()) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Failed to initialize timer tools");
return BUS_ERROR;
}
// start actor's thread
SetThreadCount(threadCount);
mindspore::Spawn(std::make_shared<SysMgrActor>(SYSMGR_ACTOR_NAME, SYSMGR_TIMER_DURATION));
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has started.");
return BUS_OK;
}
@ -205,7 +132,6 @@ void Finalize() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts to finalize.");
mindspore::ActorMgr::GetActorMgrRef()->Finalize();
TimerTools::Finalize();
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been finalized.");
// flush the log in cache to disk before exiting.

View File

@ -1,5 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/timer.cc
${CMAKE_CURRENT_SOURCE_DIR}/timertools.cc
${CMAKE_CURRENT_SOURCE_DIR}/timewatch.cc
)

View File

@ -1,38 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "timer/timer.h"
namespace mindspore {
Timer::Timer() : id(0), t(TimeWatch()), aid(AID()), thunk(&abort) {}
Timer::~Timer() {}
bool Timer::operator==(const Timer &that) const { return id == that.id; }
void Timer::operator()() const { thunk(); }
TimeWatch Timer::GetTimeWatch() const { return t; }
AID Timer::GetTimerAID() const { return aid; }
uint64_t Timer::GetTimerID() const { return id; }
Timer::Timer(uint64_t timerId, const TimeWatch &timeWatch, const AID &timeAid, const std::function<void()> &handler)
: id(timerId), t(timeWatch), aid(timeAid), thunk(handler) {}
} // namespace mindspore

View File

@ -1,347 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "timer/timertools.h"
#include <csignal>
#include <ctime>
#include <unistd.h>
#include <sys/timerfd.h>
#include "evloop/evloop.h"
namespace mindspore {
using TimerPoolType = std::map<Duration, std::list<Timer>>;
static std::unique_ptr<TimerPoolType> g_timerPool;
static std::unique_ptr<EvLoop> g_timerEvLoop;
static Duration g_ticks(0);
static int g_runTimerFD(-1);
static int g_watchTimerFD(-1);
static SpinLock g_timersLock;
std::atomic_bool TimerTools::g_initStatus(false);
constexpr Duration SCAN_TIMERPOOL_DELAY = 30;
constexpr Duration WATCH_INTERVAL = 20;
constexpr unsigned int TIMER_LOG_INTERVAL = 6;
const static std::string TIMER_EVLOOP_THREADNAME = "HARES_LB_TMer";
namespace timer {
void ScanTimerPool(int fd, uint32_t events, void *data);
Duration NextTick(const std::map<Duration, std::list<Timer>> &timerPool) {
if (!timerPool.empty()) {
Duration first = timerPool.begin()->first;
return first;
}
return 0;
}
void ExecTimers(const std::list<Timer> &timers) {
for (const auto &timer : timers) {
timer();
}
}
void CreateTimerToLoop(const Duration &delay, const Duration &nextTick) {
if (g_runTimerFD == -1) {
g_runTimerFD = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (g_runTimerFD >= 0) {
int retval = g_timerEvLoop->AddFdEvent(g_runTimerFD, EPOLLIN, ScanTimerPool, nullptr);
if (retval != BUS_OK) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "add run timer event fail]%s",
"ID:%d", g_runTimerFD);
close(g_runTimerFD);
g_runTimerFD = -1;
return;
}
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "create run timer fd fail]%s",
"ID:%d", g_runTimerFD);
g_runTimerFD = -1;
return;
}
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "create run timer fd success]%s",
"ID:%d", g_runTimerFD);
}
struct itimerspec it;
it.it_interval.tv_sec = 0;
it.it_interval.tv_nsec = 0;
it.it_value.tv_sec = delay / SECTOMILLI;
it.it_value.tv_nsec = (delay % SECTOMILLI) * MILLITOMICR * MICRTONANO;
if (timerfd_settime(g_runTimerFD, 0, &it, nullptr) == -1) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "start run timer fail]%s", "ID:%d",
g_runTimerFD);
close(g_runTimerFD);
g_runTimerFD = -1;
return;
}
}
void ScheduleTick(const std::map<Duration, std::list<Timer>> &timerPool) {
Duration nextTick = NextTick(timerPool);
if (nextTick != 0) {
// 'tick' scheduled for an earlier time, not schedule current 'tick'
if ((g_ticks == 0) || (nextTick < g_ticks)) {
Duration nowTime = TimeWatch::Now();
Duration delay = 0;
if (nextTick > nowTime) {
delay = nextTick - nowTime;
g_ticks = nextTick;
CreateTimerToLoop(delay, nextTick);
} else {
delay = SCAN_TIMERPOOL_DELAY;
g_ticks = delay + nowTime;
CreateTimerToLoop(delay, nextTick);
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_DEBUG, PID_LITEBUS_LOG,
"run timer immediately {nextTick, now time}= %s ", "{%" PRIu64 ", %" PRIu64 "}", nextTick,
nowTime);
}
}
}
}
// select timeout timers
void ScanTimerPool(int fd, uint32_t events, void *data) {
std::list<Timer> outTimer;
uint64_t count;
if ((g_runTimerFD != fd) || !(events & EPOLLIN)) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG,
"run timer fd or events err {g_runTimerFD, fd, events}= %s ", "{%d, %d, %u}", g_runTimerFD, fd,
events);
return;
}
if (read(fd, &count, sizeof(uint64_t)) < 0) {
return;
}
g_timersLock.Lock();
Duration now = TimeWatch::Now();
auto it = g_timerPool->begin();
while (it != g_timerPool->end()) {
if (it->first > now) {
break;
}
outTimer.splice(outTimer.end(), (*g_timerPool)[it->first]);
++it;
}
// delete timed out timer
(void)g_timerPool->erase(g_timerPool->begin(), g_timerPool->upper_bound(now));
g_ticks = 0;
ScheduleTick(*g_timerPool);
g_timersLock.Unlock();
ExecTimers(outTimer);
outTimer.clear();
}
void CheckPassedTimer(int fd, uint32_t events, void *data) {
std::list<Timer> passTimer;
static unsigned long watchTimes = 0;
uint64_t count;
if ((g_watchTimerFD != fd) || !(events & EPOLLIN)) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG,
"check timer fd or events err {g_watchTimerFD, fd, events}= %s ", "{%d, %d, %u}",
g_watchTimerFD, fd, events);
return;
}
if (read(fd, &count, sizeof(uint64_t)) < 0) {
return;
}
g_timersLock.Lock();
Duration now = TimeWatch::Now();
++watchTimes;
for (auto it = g_timerPool->begin(); it != g_timerPool->end(); ++it) {
if (it->first > now) {
break;
}
passTimer.splice(passTimer.end(), (*g_timerPool)[it->first]);
}
// delete passed timer
if (passTimer.size() > 0) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_DEBUG, PID_LITEBUS_LOG,
"fire pass timer {pass size, now, g_ticks}= %s ", "{%zd, %" PRIu64 ", %" PRIu64 "}",
passTimer.size(), now, g_ticks);
}
(void)g_timerPool->erase(g_timerPool->begin(), g_timerPool->upper_bound(now));
if (g_ticks <= now) {
g_ticks = 0;
}
if (g_timerPool->size() > 0) {
if ((watchTimes % TIMER_LOG_INTERVAL == 0) && (passTimer.size() > 0) &&
(now - g_timerPool->begin()->first > SECTOMILLI)) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG,
"timer info {pool size, pass size, now, g_ticks, poolTick, watchTimes}= %s ",
"{%zd, %zd, %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %lu}", g_timerPool->size(),
passTimer.size(), now, g_ticks, g_timerPool->begin()->first, watchTimes);
}
}
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_DEBUG, PID_LITEBUS_LOG,
"timer info {pool size, pass size, now, g_ticks, poolTick, watchTimes}= %s ",
"{%zd, %zd, %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %lu}", g_timerPool->size(), passTimer.size(),
now, g_ticks, g_timerPool->begin()->first, watchTimes);
ScheduleTick(*g_timerPool);
g_timersLock.Unlock();
ExecTimers(passTimer);
passTimer.clear();
}
bool StartWatchTimer() {
// create watch timer
g_watchTimerFD = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (g_watchTimerFD >= 0) {
int retval = g_timerEvLoop->AddFdEvent(g_watchTimerFD, EPOLLIN, CheckPassedTimer, nullptr);
if (retval != BUS_OK) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "add watch timer event fail]%s",
"ID:%d", g_watchTimerFD);
close(g_watchTimerFD);
g_watchTimerFD = -1;
return false;
}
} else {
g_watchTimerFD = -1;
return false;
}
// start watch timer
struct itimerspec it;
it.it_interval.tv_sec = WATCH_INTERVAL;
it.it_interval.tv_nsec = 0;
it.it_value.tv_sec = WATCH_INTERVAL;
it.it_value.tv_nsec = 0;
if (timerfd_settime(g_watchTimerFD, 0, &it, nullptr) == -1) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "start watch timer fail]%s",
"ID:%d", g_watchTimerFD);
close(g_watchTimerFD);
g_watchTimerFD = -1;
return false;
}
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG,
"start watch timer success, {id}= %s ", "{%d}", g_watchTimerFD);
return true;
}
} // namespace timer
bool TimerTools::Initialize() {
bool ret = true;
g_timersLock.Lock();
g_timerPool.reset(new (std::nothrow) TimerPoolType());
if (g_timerPool == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "timer pool new failed.");
g_timersLock.Unlock();
return false;
}
g_timerEvLoop.reset(new (std::nothrow) EvLoop());
if (g_timerEvLoop == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ev new failed.");
g_timersLock.Unlock();
return false;
}
bool ok = g_timerEvLoop->Init(TIMER_EVLOOP_THREADNAME);
if (!ok) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ev init failed.");
g_timerEvLoop = nullptr;
g_timersLock.Unlock();
return false;
}
ret = timer::StartWatchTimer();
g_timersLock.Unlock();
g_initStatus.store(true);
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Timer init succ.");
return ret;
}
void TimerTools::Finalize() {
if (g_initStatus.load() == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "no need for Timer Finalize.");
return;
}
g_initStatus.store(false);
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Timer Finalize.");
g_timersLock.Lock();
if (g_runTimerFD >= 0) {
close(g_runTimerFD);
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "run timer close {ID}=%s", "{%d}",
g_runTimerFD);
g_runTimerFD = -1;
}
if (g_watchTimerFD >= 0) {
close(g_watchTimerFD);
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "watch timer close {ID}=%s", "{%d}",
g_watchTimerFD);
g_watchTimerFD = -1;
}
g_timersLock.Unlock();
}
Timer TimerTools::AddTimer(const Duration &duration, const AID &aid, const std::function<void()> &thunk) {
if (g_initStatus.load() == false) {
return Timer();
}
if (duration == 0) {
thunk();
return Timer();
}
static std::atomic<uint64_t> id(1);
TimeWatch timeWatch = TimeWatch::In(duration);
Timer timer(id.fetch_add(1), timeWatch, aid, thunk);
// Add the timer to timerpoll and Schedule it
g_timersLock.Lock();
if (g_timerPool->size() == 0 || timer.GetTimeWatch().Time() < g_timerPool->begin()->first) {
(*g_timerPool)[timer.GetTimeWatch().Time()].push_back(timer);
timer::ScheduleTick(*g_timerPool);
} else {
if (!(g_timerPool->size() >= 1)) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"g_timerPool size invalid {size}=%s", "{%zd}", g_timerPool->size());
}
(*g_timerPool)[timer.GetTimeWatch().Time()].push_back(timer);
}
g_timersLock.Unlock();
return timer;
}
bool TimerTools::Cancel(const Timer &timer) {
if (g_initStatus.load() == false) {
return false;
}
bool canceled = false;
g_timersLock.Lock();
Duration duration = timer.GetTimeWatch().Time();
if (g_timerPool->count(duration) > 0) {
canceled = true;
(*g_timerPool)[duration].remove(timer);
if ((*g_timerPool)[duration].empty()) {
(void)(g_timerPool->erase(duration));
}
}
g_timersLock.Unlock();
return canceled;
}
} // namespace mindspore

View File

@ -1,66 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "timer/timewatch.h"
namespace mindspore {
TimeWatch::TimeWatch() : duration(Now()) {}
TimeWatch::TimeWatch(const Duration &d) : duration(d) {}
TimeWatch::TimeWatch(const TimeWatch &that) : duration(that.duration) {}
TimeWatch::~TimeWatch() {}
Duration TimeWatch::Now() {
struct timespec ts = {0, 0};
(void)clock_gettime(CLOCK_MONOTONIC, &ts);
Duration duration = ts.tv_sec * SECTOMILLI + (ts.tv_nsec / MICRTONANO) / MILLITOMICR;
return duration;
}
TimeWatch TimeWatch::In(const Duration &duration) { return TimeWatch(Now() + duration); }
TimeWatch &TimeWatch::operator=(const TimeWatch &that) {
if (&that != this) {
duration = that.duration;
}
return *this;
}
TimeWatch &TimeWatch::operator=(const Duration &d) {
duration = Now() + d;
return *this;
}
bool TimeWatch::operator==(const TimeWatch &that) const { return duration == that.duration; }
bool TimeWatch::operator<(const TimeWatch &that) const { return duration < that.duration; }
bool TimeWatch::operator<=(const TimeWatch &that) const { return duration <= that.duration; }
Duration TimeWatch::Time() const { return duration; }
Duration TimeWatch::Remaining() const {
Duration nowTime = Now();
return duration > nowTime ? (duration - nowTime) : 0;
}
bool TimeWatch::Expired() const { return duration <= Now(); }
} // namespace mindspore

View File

@ -26,6 +26,7 @@ option(BUILD_MINDDATA_EXAMPLE "" on)
option(ENABLE_VERBOSE "" off)
option(ENABLE_SSE "if x86_64 support SSE instruction set" off)
option(ENABLE_AVX "if x86_64 support SSE instruction set" off)
#option(ENABLE_MINDRT "if support mindrt" on)
set(DIR_PREFIX mindspore-lite)
set(MS_VERSION ${MS_VERSION_MAJOR}.${MS_VERSION_MINOR}.${MS_VERSION_REVISION})
@ -55,6 +56,7 @@ else()
set(PROCESS_UNIT cpu)
endif()
if(SUPPORT_NPU)
set(DDK_PATH "$ENV{HWHIAI_DDK}/ddk/ai_ddk_lib")
if(PLATFORM_ARM64)
@ -115,6 +117,7 @@ include_directories(${CORE_DIR}/ir)
include_directories(${CCSRC_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/src/runtime/kernel/arm)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/src/)
include_directories(${TOP_DIR}/third_party)
include_directories(${CMAKE_BINARY_DIR})
include_directories(${CCSRC_DIR}/minddata/dataset/liteapi)
@ -134,6 +137,11 @@ if(ENABLE_CONVERTER OR BUILD_MINDDATA STREQUAL "full" OR BUILD_MINDDATA STREQUAL
include(${TOP_DIR}/cmake/external_libs/json.cmake)
endif()
set(ENABLE_MINDRT "off")
if(PLATFORM_ARM AND NOT SUPPORT_TRAIN)
set(ENABLE_MINDRT "on")
endif()
file(GLOB FBS_FILES ${CMAKE_CURRENT_SOURCE_DIR}/schema/*.fbs)
ms_build_flatbuffers_lite(FBS_FILES ${CMAKE_CURRENT_SOURCE_DIR}/schema/ fbs_src ${CMAKE_BINARY_DIR}/schema "")
ms_build_flatbuffers_lite(FBS_FILES ${CMAKE_CURRENT_SOURCE_DIR}/schema/ fbs_inner_src ${CMAKE_BINARY_DIR}/schema/inner
@ -200,6 +208,12 @@ if(WIN32)
add_compile_definitions(BUILDING_DLL)
endif()
if(ENABLE_MINDRT)
include_directories(${CORE_DIR}/mindrt/include)
add_compile_definitions(ENABLE_MINDRT)
add_subdirectory(${CORE_DIR}/mindrt mindspore_mindrt)
endif()
if(ENABLE_CONVERTER)
if(PLATFORM_ARM)
MESSAGE(FATAL_ERROR "Cannot build converter in arm platform")

View File

@ -106,6 +106,14 @@ if(SUPPORT_TRAIN)
)
endif()
if(ENABLE_MINDRT)
set(LITE_SRC
${LITE_SRC}
${CMAKE_CURRENT_SOURCE_DIR}/lite_mindrt.cc
${CMAKE_CURRENT_SOURCE_DIR}/mindrt_executor.cc
)
endif()
add_subdirectory(ops)
add_subdirectory(runtime/kernel/arm)
@ -119,6 +127,12 @@ set_target_properties(mindspore-lite_static PROPERTIES OUTPUT_NAME "mindspore-li
set_target_properties(mindspore-lite_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-private-field")
if(ENABLE_MINDRT)
target_link_libraries(mindspore-lite mindrt_mid)
target_link_libraries(mindspore-lite_static mindrt_mid)
endif()
if(SUPPORT_GPU STREQUAL opencl)
add_subdirectory(runtime/kernel/opencl)
target_link_libraries(mindspore-lite cpu_kernel_mid opencl_kernel_mid nnacl cpu_ops_mid)

View File

@ -40,8 +40,8 @@ int Executor::CheckInputs(const std::vector<Tensor *> &in_tensors) {
return RET_OK;
}
int Executor::Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator, const KernelCallBack &before,
int Executor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator, const KernelCallBack &before,
const KernelCallBack &after) {
MS_ASSERT(nullptr != allocator);
auto ret = this->CheckInputs(in_tensors);
@ -89,9 +89,9 @@ int Executor::Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_
return RET_OK;
}
int CpuExecutor::Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator, const KernelCallBack &before,
const KernelCallBack &after) {
int CpuExecutor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
const KernelCallBack &before, const KernelCallBack &after) {
MS_ASSERT(nullptr != allocator);
// not check input for merge. too hard
if (kernels.front()->Type() != schema::PrimitiveType_Merge) {

View File

@ -30,8 +30,8 @@ class Executor {
virtual int Prepare(const std::vector<kernel::LiteKernel *> &kernels) { return RET_OK; }
virtual int Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
virtual int Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr);
protected:
@ -43,8 +43,8 @@ class CpuExecutor : public Executor {
CpuExecutor() = default;
virtual ~CpuExecutor() = default;
int Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
int Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr) override;
};

View File

@ -378,7 +378,7 @@ int LiteKernelUtil::TopologicalSortKernels(std::vector<kernel::LiteKernel *> *ke
return RET_OK;
}
void LiteKernelUtil::InitTensorInitRefCount(std::vector<kernel::LiteKernel *> &kernels) {
void LiteKernelUtil::InitTensorInitRefCount(const std::vector<kernel::LiteKernel *> &kernels) {
for (auto *kernel : kernels) {
kernel->InitOutTensorInitRefCount();
}

View File

@ -197,7 +197,7 @@ class LiteKernel {
std::vector<LiteKernel *> in_kernels_;
std::vector<LiteKernel *> out_kernels_;
bool train_mode_ = false;
bool trainable_ = false; // paramaters of this Kernel are trained in Train Session
bool trainable_ = false; // parameters of this Kernel are trained in Train Session
bool is_model_output_ = false;
SubGraphType subgraph_type_ = kNotSubGraph;
#ifdef SUPPORT_TRAIN
@ -223,7 +223,7 @@ class LiteKernelUtil {
static int TopologicalSortKernels(std::vector<kernel::LiteKernel *> *kernels);
static void InitTensorInitRefCount(std::vector<kernel::LiteKernel *> &kernels);
static void InitTensorInitRefCount(const std::vector<kernel::LiteKernel *> &kernels);
static int SetInput(LiteKernel &kernelMod, const std::vector<lite::Tensor *> &inputs);
};

View File

@ -0,0 +1,80 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <utility>
#include "src/lite_mindrt.h"
#include "mindrt/include/litebus.hpp"
namespace mindspore::lite {
int LiteOpActor::CompileArrow() {
int outTensorSize = static_cast<int>(kernel_->out_tensors().size());
for (int i = 0; i < outTensorSize; i++) {
for (auto out : kernel_->out_kernels()) {
int inTensorSize = static_cast<int>(out->in_tensors().size());
int to_input_index = -1;
for (int j = 0; j < inTensorSize; j++) {
if (kernel_->out_tensors()[i] == out->in_tensors()[j]) {
to_input_index = j;
break;
}
}
if (to_input_index == -1) {
break;
}
auto id = out->name() + this->GetAID().Url();
auto arrow = std::make_shared<OpArrow>(i, id, to_input_index);
if (arrow == nullptr) {
MS_LOG(ERROR) << "create OpArrow failed, out kernel: " << out->name();
return RET_ERROR;
}
output_op_arrow_.emplace_back(std::move(arrow));
break;
}
}
return RET_OK;
}
void LiteOpActor::SetOutputData(OpContext<Tensor> *context) {
auto size = context->outputData_->size();
MS_ASSERT(size == context->results_->size());
for (size_t i = 0; i < size; i++) {
auto outputData = context->outputData_->at(i);
if (GetAID() == outputData->op_id_) {
outputData->data_ = kernel_->out_tensors()[outputData->index_];
context->SetResult(i, RET_OK);
}
}
}
int MindrtInit() { return mindspore::Initialize("tcp://127.0.0.1:8080", "", "", "", 1); }
std::vector<std::shared_ptr<LiteOpActor>> CreateOpActor(const std::vector<kernel::LiteKernel *> &kernels) {
std::vector<std::shared_ptr<LiteOpActor>> actors;
for (auto kernel : kernels) {
auto actor = std::make_shared<LiteOpActor>(kernel);
if (actor == nullptr) {
MS_LOG(ERROR) << "create LiteOpActor failed: " << kernel->name();
actors.clear();
return actors;
}
auto aid = mindspore::Spawn(actor);
actors.push_back(actor);
}
return actors;
}
} // namespace mindspore::lite

View File

@ -0,0 +1,92 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_LITE_SRC_LITE_MINDRT_H_
#define MINDSPORE_LITE_SRC_LITE_MINDRT_H_
#include <vector>
#include <memory>
#include <string>
#include <unordered_map>
#include "actor/op_actor.h"
#include "src/lite_kernel.h"
#include "actor/actor.h"
#include "async/uuid_base.h"
#include "async/future.h"
namespace mindspore {
namespace lite {
typedef enum { GRAPH, OP_BY_OP } MindRTMode;
class LiteOpActor : public OpActor<lite::Tensor> {
public:
explicit LiteOpActor(kernel::LiteKernel *kernel) : OpActor<lite::Tensor>(kernel->name()), kernel_(kernel) {}
virtual ~LiteOpActor() = default;
virtual void OpRun(OpDataPtr<Tensor> inputs, OpContext<Tensor> *context = nullptr) {
input_op_datas_[context->sequential_num_].push_back(inputs);
if (input_op_datas_[context->sequential_num_].size() < kernel_->in_tensors().size()) {
return;
}
auto ret = RunKernel();
if (ret != RET_OK) {
context->SetFailed(ret);
input_op_datas_.erase(context->sequential_num_);
return;
}
SetOutputData(context);
input_op_datas_.erase(context->sequential_num_);
}
void Init() {
auto ret = CompileArrow();
if (ret != RET_OK) {
MS_LOG(ERROR) << "CompileArrow failed, name: " << kernel_->name();
// do not support return error
}
}
int CompileArrow();
int RunKernel() {
int ret;
ret = kernel_->PreProcess();
if (RET_OK != ret) {
MS_LOG(ERROR) << "PreProcess kernel failed, name: " << kernel_->name();
return ret;
}
ret = kernel_->Run();
if (RET_OK != ret) {
MS_LOG(ERROR) << "run kernel failed, name: " << kernel_->name();
return ret;
}
ret = kernel_->PostProcess();
if (RET_OK != ret) {
MS_LOG(ERROR) << "PostProcess kernel failed, name: " << kernel_->name();
return ret;
}
return ret;
}
private:
void SetOutputData(OpContext<Tensor> *context);
kernel::LiteKernel *kernel_;
};
int MindrtInit();
std::vector<std::shared_ptr<LiteOpActor>> CreateOpActor(const std::vector<kernel::LiteKernel *> &kernels);
} // namespace lite
} // namespace mindspore
#endif // MINDSPORE_LITE_SRC_LITE_MINDRT_H_

View File

@ -28,6 +28,9 @@
#include "src/kernel_registry.h"
#include "src/lite_model.h"
#include "src/dequant.h"
#ifdef ENABLE_MINDRT
#include "src/mindrt_executor.h"
#endif
#if SUPPORT_NPU
#include "src/runtime/agent/npu/npu_manager.h"
#include "src/runtime/agent/npu/optimizer/npu_pass_manager.h"
@ -419,6 +422,7 @@ int LiteSession::CompileGraph(Model *model) {
is_running_.store(false);
return ret;
}
is_running_.store(false);
return RET_OK;
}
@ -511,7 +515,15 @@ int LiteSession::Init(const Context *context) {
is_running_.store(false);
return ret;
}
#ifdef ENABLE_MINDRT
if (context_->IsCpuEnabled() && !context_->IsGpuEnabled() && !context_->IsNpuEnabled() && kernels_.size() == 1) {
executor_ = new (std::nothrow) MindrtExecutor();
} else {
executor_ = new (std::nothrow) Executor();
}
#else
executor_ = new (std::nothrow) Executor();
#endif
if (nullptr == executor_) {
MS_LOG(ERROR) << "New Executor failed";
is_running_.store(false);

View File

@ -0,0 +1,81 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <queue>
#include <memory>
#include "src/mindrt_executor.h"
#include "src/lite_mindrt.h"
#include "include/errorcode.h"
namespace mindspore::lite {
int MindrtExecutor::Prepare(const std::vector<kernel::LiteKernel *> &kernels) {
auto ret = MindrtInit();
if (ret != RET_OK) {
MS_LOG(ERROR) << "MindrtInit failed";
return ret;
}
auto kernelSize = kernels.size();
opActors_ = CreateOpActor(kernels);
if (opActors_.size() != kernelSize) {
MS_LOG(ERROR) << "CreateOpActor failed";
return RET_ERROR;
}
for (size_t i = 0; i < kernelSize; i++) {
if (kernels[i]->in_kernels().size() == 0) {
auto inTensorSize = kernels[i]->in_tensors().size();
for (size_t j = 0; j < inTensorSize; j++) {
auto data =
std::make_shared<OpData<Tensor>>(opActors_[i]->GetAID(), kernels[i]->in_tensors()[j], static_cast<int>(j));
inputData_.emplace_back(data);
}
}
if (kernels[i]->out_kernels().size() == 0) {
auto outTensorSize = kernels[i]->out_tensors().size();
for (size_t j = 0; j < outTensorSize; j++) {
auto data =
std::make_shared<OpData<Tensor>>(opActors_[i]->GetAID(), kernels[i]->in_tensors()[j], static_cast<int>(j));
outputData_.emplace_back(data);
}
}
}
return RET_OK;
}
int MindrtExecutor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
const KernelCallBack &before, const KernelCallBack &after) {
MS_ASSERT(nullptr != allocator);
if (kernels.front()->Type() != schema::PrimitiveType_Merge) {
auto ret = this->CheckInputs(in_tensors);
if (RET_OK != ret) {
MS_LOG(ERROR) << "CheckInputs failed";
return ret;
}
}
// clear ref_count
for (auto *kernel : kernels) {
for (auto *tensor : kernel->in_tensors()) {
tensor->set_ref_count(0);
}
}
return MindrtRun<Tensor>(inputData_, &outputData_);
}
} // namespace mindspore::lite

View File

@ -0,0 +1,47 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_LITE_SRC_MINDRT_EXECUTOR_H_
#define MINDSPORE_LITE_SRC_MINDRT_EXECUTOR_H_
#include <memory>
#include <vector>
#include "src/runtime/allocator.h"
#include "src/lite_kernel.h"
#include "src/lite_mindrt.h"
#include "src/executor.h"
#include "include/lite_session.h"
namespace mindspore::lite {
// class Executor {
class MindrtExecutor : public Executor {
public:
MindrtExecutor() = default;
virtual ~MindrtExecutor() = default;
virtual int Prepare(const std::vector<kernel::LiteKernel *> &kernels);
virtual int Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr);
protected:
std::vector<std::shared_ptr<LiteOpActor>> opActors_;
std::vector<OpDataPtr<Tensor>> inputData_;
std::vector<OpDataPtr<Tensor>> outputData_;
};
} // namespace mindspore::lite
#endif

View File

@ -21,9 +21,9 @@
namespace mindspore::lite::opencl {
int OpenCLExecutor::Run(std::vector<Tensor *> &inputs, std::vector<Tensor *> &outputs,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator, const KernelCallBack &before,
const KernelCallBack &after) {
int OpenCLExecutor::Run(const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
const KernelCallBack &before, const KernelCallBack &after) {
return RunOrTune(inputs, outputs, kernels, allocator, before, after, false);
}

View File

@ -31,9 +31,9 @@ class OpenCLExecutor : public Executor {
int Prepare(const std::vector<kernel::LiteKernel *> &kernels) override { return RET_OK; }
int Run(std::vector<Tensor *> &inputs, std::vector<Tensor *> &outputs, std::vector<kernel::LiteKernel *> &kernels,
Allocator *allocator = nullptr, const KernelCallBack &before = nullptr,
const KernelCallBack &after = nullptr) override;
int Run(const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr) override;
int RunOrTune(const std::vector<Tensor *> &inputs, const std::vector<Tensor *> &outputs,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr, bool is_tune = false);

View File

@ -48,8 +48,8 @@ static int RunKernel(void *data, int index) {
return 0;
}
int ParallelExecutor::Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
int ParallelExecutor::Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
const KernelCallBack &before, const KernelCallBack &after) {
MS_ASSERT(nullptr != allocator);
for (auto &inTensor : in_tensors) {

View File

@ -32,8 +32,8 @@ class ParallelExecutor : public Executor {
int Prepare(const std::vector<kernel::LiteKernel *> &kernels) override;
int Run(std::vector<Tensor *> &in_tensors, std::vector<Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
int Run(const std::vector<Tensor *> &in_tensors, const std::vector<Tensor *> &out_tensors,
const std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const KernelCallBack &before = nullptr, const KernelCallBack &after = nullptr) override;
inline kernel::LiteKernel *GetReadyKernel(const int index) const { return readyKernels.at(index); }
inline void SetResult(const int index, const int result) { results.at(index) = result; }

View File

@ -17,6 +17,7 @@
#ifndef MINDSPORE_LITE_SRC_SUB_GRAPH_H
#define MINDSPORE_LITE_SRC_SUB_GRAPH_H
#include <atomic>
#include <utility>
#include <string>
#include <vector>
@ -145,7 +146,8 @@ class CpuFp32SubGraph : public CpuSubGraph {
std::vector<LiteKernel *> nodes, const lite::InnerContext *ctx)
: CpuSubGraph(inputs, outputs, std::move(in_kernels), std::move(out_kernels), std::move(nodes), ctx) {
subgraph_type_ = kCpuFP32SubGraph;
this->name_ = "CpuFP32SubGraph";
static std::atomic_int index = 0;
this->name_ = "CpuFP32SubGraph" + std::to_string(index++);
}
~CpuFp32SubGraph() override = default;
@ -166,7 +168,8 @@ class CpuFp16SubGraph : public CpuSubGraph {
std::vector<LiteKernel *> nodes, const lite::InnerContext *ctx)
: CpuSubGraph(inputs, outputs, std::move(in_kernels), std::move(out_kernels), std::move(nodes), ctx) {
subgraph_type_ = kCpuFP16SubGraph;
this->name_ = "CpuFP16SubGraph";
static std::atomic_int index = 0;
this->name_ = "CpuFP16SubGraph" + std::to_string(index++);
}
~CpuFp16SubGraph() override = default;

View File

@ -180,6 +180,17 @@ if(SUPPORT_GPU STREQUAL vulkan)
${VULKAN_RUNTIME_SRC}
)
endif()
if(ENABLE_MINDRT)
set(TEST_LITE_SRC
${TEST_LITE_SRC}
${LITE_DIR}/src/lite_mindrt.cc
${LITE_DIR}/src/mindrt_executor.cc
)
include_directories(${TOP_DIR}/mindspore/core/mindrt/include)
endif()
### converter
if(ENABLE_CONVERTER)
add_definitions(-DPRIMITIVE_WRITEABLE)
@ -331,10 +342,15 @@ endif()
add_executable(lite-test ${TEST_SRC})
add_dependencies(lite-test fbs_src)
target_link_libraries(lite-test dl mindspore::gtest)
if(SUPPORT_TRAIN)
target_link_libraries(lite-test minddata-lite)
endif()
if(ENABLE_MINDRT)
target_link_libraries(lite-test mindrt_mid)
endif()
if(PLATFORM_ARM64 AND ENABLE_FP16)
target_link_libraries(lite-test nnacl_fp16_mid nnacl_optimize_mid)

View File

@ -4,7 +4,6 @@ set(COMMON_SRC
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/file_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/utils.cc
)
add_executable(benchmark
${CMAKE_CURRENT_SOURCE_DIR}/main.cc
${CMAKE_CURRENT_SOURCE_DIR}/benchmark.cc

View File

@ -3,6 +3,8 @@ set(COMMON_SRC
${CMAKE_CURRENT_SOURCE_DIR}/../common/flag_parser.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/file_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/log_adapter.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/errorcode.cc
)
add_executable(cropper
@ -14,7 +16,7 @@ add_executable(cropper
add_dependencies(cropper fbs_src)
target_link_libraries(cropper mindspore-lite_static)
target_link_libraries(cropper)
add_custom_command(TARGET cropper POST_BUILD COMMAND
bash build_cropper_config.sh WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -5,6 +5,7 @@ set(COMMON_SRC
${CMAKE_CURRENT_SOURCE_DIR}/../common/flag_parser.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/file_utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/utils.cc
${CMAKE_CURRENT_SOURCE_DIR}/../../src/common/log_adapter.cc
)
add_executable(schema_gen
${CMAKE_CURRENT_SOURCE_DIR}/main.cc