!15558 [MSLITE] litebus -> mindrt

From: @ling_qiao_min
Reviewed-by: @zhang_xue_tong,@zhanghaibo5
Signed-off-by: @zhang_xue_tong
This commit is contained in:
mindspore-ci-bot 2021-04-23 15:21:54 +08:00 committed by Gitee
commit a8bd5c3e83
17 changed files with 89 additions and 243 deletions

View File

@ -11,5 +11,3 @@ file(GLOB MINDRT_SRC
)
add_library(mindrt_mid OBJECT ${MINDRT_SRC})

View File

@ -50,7 +50,7 @@ class ActorBase {
inline void PrintMsgRecord() {
uint32_t startPoint = recordNextPoint % MAX_ACTOR_RECORD_SIZE;
for (uint32_t i = 0; i < MAX_ACTOR_RECORD_SIZE; i++) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Actor message dumps:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Actor message dumps:%s",
"actor:%s,msg:%s", id.Name().c_str(), msgRecords[startPoint].c_str());
startPoint = (startPoint + MAX_ACTOR_RECORD_SIZE - 1) % MAX_ACTOR_RECORD_SIZE;
}
@ -79,7 +79,7 @@ class ActorBase {
void DelRuleUdp(const std::string &peer, bool outputLog);
protected:
using ActorFunction = std::function<void(std::unique_ptr<MessageBase> &msg)>;
using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;
// install KMSG handler . This method will be called before the actor start to run.
virtual void Init() {}
@ -89,19 +89,19 @@ class ActorBase {
// KHTTPMsg handler
virtual void HandleHttp(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleHttp() is not implemented", "a=%s", id.Name().c_str());
}
// KLOCALMsg handler
virtual void HandleLocalMsg(std::unique_ptr<MessageBase> msg) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) HandleLocalMsg() is not implemented.", "a=%s", id.Name().c_str());
}
// The link is closed.
virtual void Exited(const AID &actor) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG,
"ACTOR (%s) Exited() is not implemented. ", "a=%s", id.Name().c_str());
}
@ -152,13 +152,13 @@ class ActorBase {
friend class ActorThread;
// KMSG Msg Handler
virtual void HandlekMsg(std::unique_ptr<MessageBase> &msg);
virtual void HandlekMsg(const std::unique_ptr<MessageBase> &msg);
template <typename T>
static void BehaviorBase(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-tcp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@ -169,9 +169,9 @@ class ActorBase {
// register the message handle. It will be discarded.
template <typename T>
static void BehaviorBase1(T *t, void (T::*method)(mindspore::AID, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KMSG) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-tcp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-tcp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@ -182,9 +182,9 @@ class ActorBase {
// register the udp message handle. Use this closure function to drop non-udp messages
template <typename T>
static void BehaviorBaseForUdp(T *t, void (T::*method)(const mindspore::AID &, std::string &&, std::string &&),
std::unique_ptr<MessageBase> &msg) {
const std::unique_ptr<MessageBase> &msg) {
if (msg->type != MessageBase::Type::KUDP) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Drop non-udp message: %s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Drop non-udp message: %s",
"from:%s,to:%s,name:%s", std::string(msg->from).c_str(), std::string(msg->to).c_str(),
msg->name.c_str());
return;
@ -196,7 +196,7 @@ class ActorBase {
void Quit();
int EnqueMessage(std::unique_ptr<MessageBase> msg);
void Spawn(std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> actorThread);
void Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> actorThread);
void SetRunningStatus(bool start);
std::unique_ptr<ActorPolicy> actorThread;

View File

@ -56,7 +56,7 @@ class AppActor : public ActorBase {
APPBehavior behavior = std::bind(&BehaviorBase<T, M>, static_cast<T *>(this), method, std::placeholders::_1);
if (appBehaviors.find(msgName) != appBehaviors.end()) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ACTOR msgName conflict:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "ACTOR msgName conflict:%s",
"a=%s,msg=%s", GetAID().Name().c_str(), msgName.c_str());
BUS_EXIT("msgName conflicts.");
return;
@ -80,7 +80,7 @@ class AppActor : public ActorBase {
if (it != appBehaviors.end()) {
it->second(std::move(msg));
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "ACTOR can not finds handler:%s",
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "ACTOR can not finds handler:%s",
"a=%s,msg=%s,hdlno=%zd", GetAID().Name().c_str(), msg->Name().c_str(), appBehaviors.size());
}
}

View File

@ -35,11 +35,11 @@ namespace mindspore {
#define BUS_DLOG(verboselevel) // VLOG(verboselevel)
#define HARES_LOG_PID int // GetLogPID();
#define PID_LITEBUS_LOG
#define PID_MINDRT_LOG
#define ICTSBASE_LOG_COMMON_CODE
#define HLOG_LEVEL_INFO
#define PID_LITEBUS_LOG
#define PID_MINDRT_LOG
#define HLOG_LEVEL_DEBUG 1
#define ICTSBASE_LOG0(logig, level, pid, format)
#define ICTSBASE_LOG1(logig, level, pid, format, para)
@ -50,8 +50,7 @@ namespace mindspore {
#define FlushHLogCache()
// Kill the process for safe exiting.
inline void KillProcess(const std::string &ret) {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "BUS Exit Tip: %s", "%s",
ret.c_str());
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "BUS Exit Tip: %s", "%s", ret.c_str());
// flush the log in cache to disk before exiting.
FlushHLogCache();
}
@ -80,12 +79,12 @@ constexpr int DLEVEL0 = 0;
mindspore::KillProcess(ss.str()); \
} while (0)
#define BUS_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "new failed, will exit"); \
BUS_EXIT("Exit for OOM."); \
} \
#define BUS_OOM_EXIT(ptr) \
{ \
if (ptr == nullptr) { \
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "new failed, will exit"); \
BUS_EXIT("Exit for OOM."); \
} \
}
constexpr int LOG_CHECK_EVERY_FIRSTNUM = 10;

View File

@ -83,4 +83,4 @@ class MessageBase {
} // namespace mindspore
#endif // __LITEBUS_MESSAGE_HPP__
#endif

View File

@ -26,10 +26,8 @@
#include "async/future.h"
#include "async/defer.h"
#include "async/spinlock.h"
#include "actor/actor.h"
#include "litebus.hpp"
#include "mindrt/include/mindrt.hpp"
namespace mindspore {

View File

@ -14,24 +14,21 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
#include <memory>
#include <utility>
#include <future>
#include <iostream>
#include <list>
#include "actor/actor.h"
#include "actor/buslog.h"
#include "async/spinlock.h"
#include "async/status.h"
#include "async/uuid_generator.h"
#include "litebus.hpp"
#include "async/future_base.h"
#include "mindrt/include/mindrt.hpp"
namespace mindspore {
@ -90,7 +87,7 @@ class Future : public FutureBase {
const T &Get() const {
if (data->status.IsError()) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_LITEBUS_LOG,
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"Future::Get() but status == Error: %d", GetErrorCode());
return data->t;
}
@ -103,15 +100,15 @@ class Future : public FutureBase {
data->t = data->future.get();
data->gotten = true;
// } catch (std::future_error const &e) {
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Future error: %s",
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Future error: %s",
// "%s",
// e.what());
// } catch (std::exception const &e) {
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Standard exception:
// ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Standard exception:
// %s",
// "%s", e.what());
// } catch (...) {
// ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Unknown exception.");
// ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Unknown exception.");
// }
return data->t;

View File

@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_H_
#ifdef __cplusplus
#include <string>
@ -23,20 +23,20 @@
extern "C" {
#endif
#define LITEBUS_URL_MAX_LEN 138
#define MINDRT_URL_MAX_LEN 138
struct LitebusConfig {
char tcpUrl[LITEBUS_URL_MAX_LEN];
char tcpUrlAdv[LITEBUS_URL_MAX_LEN];
char udpUrl[LITEBUS_URL_MAX_LEN];
char udpUrlAdv[LITEBUS_URL_MAX_LEN];
struct MindrtConfig {
char tcpUrl[MINDRT_URL_MAX_LEN];
char tcpUrlAdv[MINDRT_URL_MAX_LEN];
char udpUrl[MINDRT_URL_MAX_LEN];
char udpUrlAdv[MINDRT_URL_MAX_LEN];
int threadCount;
int httpKmsgFlag;
};
int LitebusInitializeC(const struct LitebusConfig *config);
int MindrtInitializeC(const struct MindrtConfig *config);
void LitebusFinalizeC();
void MindrtFinalizeC();
#ifdef __cplusplus
}

View File

@ -14,8 +14,8 @@
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_HPP_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_LITEBUS_HPP_H
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_HPP_H_
#define MINDSPORE_CORE_MINDRT_INCLUDE_MINDRT_HPP_H_
#include <string>
#include "mindrt/include/actor/actor.h"
@ -23,10 +23,10 @@
// brief provide an asynchronous programming framework as Actor model
namespace mindspore {
struct LitebusAddress {
std::string scheme;
std::string ip;
uint16_t port;
struct MindrtAddress {
std::string scheme;
std::string ip;
uint16_t port;
};
// brief initialize the library
@ -60,11 +60,11 @@ void Finalize();
// brief set the delegate of restful
void SetDelegate(const std::string &delegate);
// set log pid of the process use litebus
// set log pid of the process use mindrt
void SetLogPID(HARES_LOG_PID pid);
// get global litebus address
const LitebusAddress &GetLitebusAddress();
// get global mindrt address
const MindrtAddress &GetMindrtAddress();
// get flag of http message format
int GetHttpKmsgFlag();
@ -72,5 +72,5 @@ int GetHttpKmsgFlag();
// brief set flag of http message format
void SetHttpKmsgFlag(int flag);
} // namespace mindspore
} // namespace mindspore
#endif

View File

@ -1,131 +0,0 @@
# include DIRECTIVES FOR LITEBUS LIBRARY (generates, e.g., -I/path/to/thing on Linux).
######################################################################################
if(${HTTP_ENABLED} STREQUAL "on")
set(LITEBUS_INCLUDE_DIR ${LITEBUS_INCLUDE_DIR} ${HTTP_PARSER_INCLUDE_DIR})
endif()
if(${SSL_ENABLED} STREQUAL "on")
set(LITEBUS_INCLUDE_DIR ${LITEBUS_INCLUDE_DIR}
${LITEBUS_OSSCRYPTO_INCLUDE_DIR}
${LITEBUS_HARESCRYPTO_INCLUDE_DIR}
)
endif()
link_directories(${SECUREC_LIB_DIR})
link_directories(${GLOG_LIB_DIR})
link_directories(${HARES_LOG_LIB_DIR})
link_directories(${PROTOBUF_C_LIB_DIR})
if(${HTTP_ENABLED} STREQUAL "on")
link_directories(${HTTP_PARSER_LIB_DIR})
endif()
# LINK libgcov.a
#######################################################################
if(${CODE_COVERAGE} STREQUAL "on")
set(LINK_LIBS ${LINK_LIBS} gcov)
endif()
# add object lib to avoid duplicate compile for a single source file
#######################################################################
add_library(litebus_obj OBJECT litebus.cc)
target_include_directories(litebus_obj PUBLIC ${LITEBUS_INCLUDE_DIR} ${3RDPARTY_LITEBUS_INCLUDE_DIRS})
#add_library(decrypt_obj OBJECT)
# THE LITEBUS LIBRARY (generates, e.g., liblitebus.so, etc., on Linux).
#######################################################################
if(${STATIC_LIB} STREQUAL "on")
#######################################################################
add_library(${LITEBUS_TARGET}_static STATIC $<TARGET_OBJECTS:litebus_obj>)
target_link_libraries(${LITEBUS_TARGET}_static ${LINK_LIBS} ${LITEBUS_HARES_DECRYPT_SLIB}
${OPENSSL_SSL_LIB_A} ${OPENSSL_CRYPTO_LIB_A})
set_target_properties(${LITEBUS_TARGET}_static PROPERTIES OUTPUT_NAME ${LITEBUS_TARGET})
if(DEFINED DEPEND_PATH)
add_custom_command(TARGET ${LITEBUS_TARGET}_static POST_BUILD
COMMAND ${CMAKE_COMMAND} -E create_symlink lib${LITEBUS_TARGET}.a liblitebus.a
)
endif()
#######################################################################
endif()
set(LINK_LIBS ${LINK_LIBS})
if(${HTTP_ENABLED} STREQUAL "on")
set(LINK_LIBS ${LINK_LIBS} ${HTTP_PARSER_DFLAG})
endif()
add_library(litebus_shared SHARED $<TARGET_OBJECTS:litebus_obj>)
target_link_libraries(litebus_shared ${LINK_LIBS})
target_include_directories(litebus_shared PUBLIC ${LITEBUS_INCLUDE_DIR} ${3RDPARTY_LITEBUS_INCLUDE_DIRS})
set_target_properties(
litebus_shared PROPERTIES
OUTPUT_NAME litebus
VERSION ${LITEBUS_PACKAGE_VERSION}
SOVERSION ${LITEBUS_PACKAGE_VERSION}
LINK_FLAGS -s
)
#copy lib to depend path (internal use)
#set(DEPEND_PATH "${PROJECT_SOURCE_DIR}/output1")
if(DEFINED DEPEND_PATH)
set(DEPEND_LIB_PATH ${DEPEND_PATH}/LITEBUS/lib)
set(DEPEND_INCLUDE_PATH ${DEPEND_PATH}/LITEBUS/include)
add_custom_target(litebus_all ALL COMMENT "================= litebus_all =====================")
if(${STATIC_LIB} STREQUAL "on")
add_dependencies(litebus_all litebus_shared ${LITEBUS_TARGET}_static)
endif()
add_dependencies(litebus_all litebus_shared)
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -P ${PROJECT_SOURCE_DIR}/cmake/MakeDirectory.cmake
${DEPEND_LIB_PATH} ${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -DLITEBUS_COPYTO="${DEPEND_LIB_PATH}" -P
${PROJECT_SOURCE_DIR}/cmake/CopyLibToPath.cmake
COMMAND ${CMAKE_COMMAND} -E copy ${PROJECT_SOURCE_DIR}/include/litebus.hpp
${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${PROJECT_SOURCE_DIR}/include/litebus.h
${DEPEND_INCLUDE_PATH}
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/actor
${DEPEND_INCLUDE_PATH}/actor
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/async
${DEPEND_INCLUDE_PATH}/async
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/timer
${DEPEND_INCLUDE_PATH}/timer
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/exec
${DEPEND_INCLUDE_PATH}/exec
COMMAND ${CMAKE_COMMAND} -E copy_directory ${PROJECT_SOURCE_DIR}/include/utils
${DEPEND_INCLUDE_PATH}/utils)
if(${HTTP_ENABLED} STREQUAL "on")
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${PROJECT_SOURCE_DIR}/include/httpd ${DEPEND_INCLUDE_PATH}/httpd)
endif()
if(${SSL_ENABLED} STREQUAL "on")
add_custom_command(TARGET litebus_all POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${PROJECT_SOURCE_DIR}/include/ssl ${DEPEND_INCLUDE_PATH}/ssl)
endif()
endif()
#install lib to package path
if("${PROJECT_HARES}" STREQUAL "cloudcore")
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_LS_Master/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_LS_Slave/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_HASEN_Common/lib PERMISSIONS OWNER_READ)
elseif("${PROJECT_HARES}" STREQUAL "hasen_cloudcore_csp")
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Common/lib PERMISSIONS OWNER_READ)
else()
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Common/lib PERMISSIONS OWNER_READ)
install(TARGETS litebus_shared LIBRARY DESTINATION HARES_Slave/lib PERMISSIONS OWNER_READ)
endif()
# Build pbjson.so.
add_subdirectory(actor)
add_subdirectory(async)
add_subdirectory(evloop)
add_subdirectory(timer)

View File

@ -1,8 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/actor.cc
${CMAKE_CURRENT_SOURCE_DIR}/actormgr.cc
${CMAKE_CURRENT_SOURCE_DIR}/actorthread.cc
${CMAKE_CURRENT_SOURCE_DIR}/actorpolicy.cc
${CMAKE_CURRENT_SOURCE_DIR}/aid.cc
${CMAKE_CURRENT_SOURCE_DIR}/sysmgr_actor.cc
)

View File

@ -26,7 +26,7 @@ ActorBase::ActorBase(const std::string &name)
ActorBase::~ActorBase() {}
void ActorBase::Spawn(std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> thread) {
void ActorBase::Spawn(const std::shared_ptr<ActorBase> &actor, std::unique_ptr<ActorPolicy> thread) {
// lock here or await(). and unlock at Quit() or at aweit.
waiterLock.lock();
@ -49,13 +49,13 @@ void ActorBase::Terminate() {
(void)EnqueMessage(std::move(msg));
}
void ActorBase::HandlekMsg(std::unique_ptr<MessageBase> &msg) {
void ActorBase::HandlekMsg(const std::unique_ptr<MessageBase> &msg) {
auto it = actionFunctions.find(msg->Name());
if (it != actionFunctions.end()) {
ActorFunction &func = it->second;
func(msg);
} else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_LITEBUS_LOG,
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_WARNING, PID_MINDRT_LOG,
"ACTOR can not find function for message (%s)", "a=%s,m=%s", id.Name().c_str(),
msg->Name().c_str());
MS_LOG(WARNING) << "ACTOR can not find function for message, a=" << id.Name().c_str()

View File

@ -102,11 +102,11 @@ void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount);
void ActorMgr::Finalize() {
this->TerminateAll();
MS_LOG(INFO) << "litebus Actors finish exiting.";
MS_LOG(INFO) << "mindrt Actors finish exiting.";
// stop all actor threads;
threadPool.Finalize();
MS_LOG(INFO) << "litebus Threads finish exiting.";
MS_LOG(INFO) << "mindrt Threads finish exiting.";
// stop iomgr thread
for (auto mgrIt = ioMgrs.begin(); mgrIt != ioMgrs.end(); ++mgrIt) {
@ -114,7 +114,7 @@ void ActorMgr::Finalize() {
mgrIt->second->Finish();
}
MS_LOG(INFO) << "litebus IOMGRS finish exiting.";
MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
}
ActorReference ActorMgr::GetActor(const AID &id) {

View File

@ -1,7 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/async.cc
${CMAKE_CURRENT_SOURCE_DIR}/future.cc
${CMAKE_CURRENT_SOURCE_DIR}/uuid_base.cc
${CMAKE_CURRENT_SOURCE_DIR}/uuid_generator.cc
# ${CMAKE_CURRENT_SOURCE_DIR}/flag_parser_impl.cpp
)

View File

@ -16,13 +16,13 @@
#include <cstdlib>
#include <atomic>
#include "mindrt/src/actor/actormgr.h"
#include "mindrt/src/actor/iomgr.h"
#include "litebus.hpp"
#include "include/litebus.h"
#include "src/actor/actormgr.h"
#include "src/actor/iomgr.h"
#include "include/mindrt.hpp"
#include "include/mindrt.h"
extern "C" {
int LitebusInitializeC(const struct LitebusConfig *config) {
int MindrtInitializeC(const struct MindrtConfig *config) {
if (config == nullptr) {
return -1;
}
@ -40,44 +40,44 @@ int LitebusInitializeC(const struct LitebusConfig *config) {
std::string(config->udpUrlAdv), config->threadCount);
}
void LitebusFinalizeC() { mindspore::Finalize(); }
void MindrtFinalizeC() { mindspore::Finalize(); }
}
namespace mindspore {
namespace local {
static LitebusAddress *g_litebusAddress = new (std::nothrow) LitebusAddress();
static std::atomic_bool g_finalizeLitebusStatus(false);
static MindrtAddress *g_mindrtAddress = new (std::nothrow) MindrtAddress();
static std::atomic_bool g_finalizeMindrtStatus(false);
} // namespace local
const LitebusAddress &GetLitebusAddress() {
BUS_OOM_EXIT(local::g_litebusAddress);
return *local::g_litebusAddress;
const MindrtAddress &GetMindrtAddress() {
BUS_OOM_EXIT(local::g_mindrtAddress);
return *local::g_mindrtAddress;
}
void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); }
class LiteBusExit {
class MindrtExit {
public:
LiteBusExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "trace: enter LiteBusExit()---------");
MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter MindrtExit()---------");
}
~LiteBusExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "trace: enter ~LiteBusExit()---------");
~MindrtExit() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "trace: enter ~MindrtExit()---------");
mindspore::Finalize();
}
};
int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
const std::string &udpUrlAdv, int threadCount) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts ......");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts ......");
// start actor's thread
SetThreadCount(threadCount);
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has started.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has started.");
return BUS_OK;
}
@ -85,18 +85,18 @@ int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const st
const std::string &udpUrlAdv, int threadCount) {
/* support repeat initialize */
int result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount);
static LiteBusExit busExit;
static MindrtExit busExit;
return result;
}
AID Spawn(ActorReference actor, bool sharedThread, bool start) {
if (actor == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Actor is nullptr.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Actor is nullptr.");
BUS_EXIT("Actor is nullptr.");
}
if (local::g_finalizeLitebusStatus.load() == true) {
if (local::g_finalizeMindrtStatus.load() == true) {
return actor->GetAID();
} else {
return ActorMgr::GetActorMgrRef()->Spawn(actor, sharedThread, start);
@ -117,15 +117,15 @@ void TerminateAll() { mindspore::ActorMgr::GetActorMgrRef()->TerminateAll(); }
void Finalize() {
bool inite = false;
if (local::g_finalizeLitebusStatus.compare_exchange_strong(inite, true) == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been Finalized.");
if (local::g_finalizeMindrtStatus.compare_exchange_strong(inite, true) == false) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been Finalized.");
return;
}
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts to finalize.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt starts to finalize.");
mindspore::ActorMgr::GetActorMgrRef()->Finalize();
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been finalized.");
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "mindrt has been finalized.");
// flush the log in cache to disk before exiting.
FlushHLogCache();
}
@ -134,14 +134,14 @@ void SetDelegate(const std::string &delegate) { mindspore::ActorMgr::GetActorMgr
static HARES_LOG_PID g_busLogPid = 1;
void SetLogPID(HARES_LOG_PID pid) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, pid, "Set LiteBus log PID: %u", pid);
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, pid, "Set Mindrt log PID: %u", pid);
g_busLogPid = pid;
}
HARES_LOG_PID GetLogPID() { return g_busLogPid; }
static int g_httpKmsgEnable = -1;
void SetHttpKmsgFlag(int flag) {
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "Set LiteBus http message format:%d", flag);
ICTSBASE_LOG1(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_MINDRT_LOG, "Set Mindrt http message format:%d", flag);
g_httpKmsgEnable = flag;
}

View File

@ -16,7 +16,7 @@
#include <utility>
#include "src/lite_mindrt.h"
#include "mindrt/include/litebus.hpp"
#include "mindrt/include/mindrt.hpp"
namespace mindspore::lite {
int LiteOpActor::CompileArrow() {

View File

@ -194,7 +194,7 @@ if(ENABLE_MINDRT)
set(TEST_LITE_SRC ${TEST_LITE_SRC}
${LITE_DIR}/src/lite_mindrt.cc
${LITE_DIR}/src/mindrt_executor.cc
${CORE_DIR}/mindrt/src/litebus.cc
${CORE_DIR}/mindrt/src/mindrt.cc
${CORE_DIR}/mindrt/src/actor/actor.cc
${CORE_DIR}/mindrt/src/actor/actormgr.cc
${CORE_DIR}/mindrt/src/actor/actorpolicy.cc