diff --git a/cmake/external_libs/libevent.cmake b/cmake/external_libs/libevent.cmake index cc4067ef644..b780b1f9664 100644 --- a/cmake/external_libs/libevent.cmake +++ b/cmake/external_libs/libevent.cmake @@ -11,7 +11,7 @@ endif () mindspore_add_pkg(libevent VER 2.1.12 - LIBS event event_pthreads + LIBS event event_pthreads event_core URL ${REQ_URL} MD5 ${MD5} CMAKE_OPTION -DCMAKE_BUILD_TYPE:STRING=Release -DBUILD_TESTING=OFF) @@ -20,3 +20,4 @@ include_directories(${libevent_INC}) add_library(mindspore::event ALIAS libevent::event) add_library(mindspore::event_pthreads ALIAS libevent::event_pthreads) +add_library(mindspore::event_core ALIAS libevent::event_core) diff --git a/cmake/package.cmake b/cmake/package.cmake index 5da32b81e17..d818f01a573 100644 --- a/cmake/package.cmake +++ b/cmake/package.cmake @@ -80,8 +80,8 @@ if (USE_GLOG) endif () file(GLOB_RECURSE LIBEVENT_LIB_LIST - ${libevent_LIBPATH}/libevent* - ${libevent_LIBPATH}/libevent_pthreads* + ${libevent_LIBPATH}/libevent*${CMAKE_SHARED_LIBRARY_SUFFIX}* + ${libevent_LIBPATH}/libevent_pthreads*${CMAKE_SHARED_LIBRARY_SUFFIX}* ) install( @@ -89,6 +89,7 @@ install( DESTINATION ${INSTALL_LIB_DIR} COMPONENT mindspore ) + if (ENABLE_MINDDATA) install( TARGETS _c_dataengine _c_mindrecord diff --git a/mindspore/ccsrc/CMakeLists.txt b/mindspore/ccsrc/CMakeLists.txt index 66996e97fd2..a370daa3ad1 100644 --- a/mindspore/ccsrc/CMakeLists.txt +++ b/mindspore/ccsrc/CMakeLists.txt @@ -304,6 +304,7 @@ if (CMAKE_SYSTEM_NAME MATCHES "Windows") else () if (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)) target_link_libraries(mindspore mindspore::pslite proto_input mindspore::protobuf mindspore::event mindspore::event_pthreads ${zeromq_DIRPATH}/zmq_install/lib/libzmq.a) + target_link_libraries(mindspore -Wl,--no-as-needed mindspore::event_core) if (${ENABLE_IBVERBS} STREQUAL "ON") target_link_libraries(mindspore ibverbs rdmacm) endif() diff --git a/mindspore/ccsrc/ps/core/cluster_config.h b/mindspore/ccsrc/ps/core/cluster_config.h index 8a8ace7fb40..ea7bd68b355 100644 --- a/mindspore/ccsrc/ps/core/cluster_config.h +++ b/mindspore/ccsrc/ps/core/cluster_config.h @@ -28,7 +28,6 @@ namespace mindspore { namespace ps { namespace core { - constexpr uint32_t kHeartbeatInterval = 3; class ClusterConfig { diff --git a/mindspore/ccsrc/ps/core/comm_util.cc b/mindspore/ccsrc/ps/core/comm_util.cc index 9d5be87ee0a..71bc9d0d599 100644 --- a/mindspore/ccsrc/ps/core/comm_util.cc +++ b/mindspore/ccsrc/ps/core/comm_util.cc @@ -26,7 +26,6 @@ namespace mindspore { namespace ps { namespace core { - bool CommUtil::CheckIpWithRegex(const std::string &ip) { std::regex pattern("((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"); std::smatch res; @@ -76,7 +75,6 @@ void CommUtil::GetAvailableInterfaceAndIP(std::string *interface, std::string *i MS_EXCEPTION_IF_NULL(if_address); freeifaddrs(if_address); } - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/comm_util.h b/mindspore/ccsrc/ps/core/comm_util.h index 3200a624159..651ba1c3e8f 100644 --- a/mindspore/ccsrc/ps/core/comm_util.h +++ b/mindspore/ccsrc/ps/core/comm_util.h @@ -49,7 +49,6 @@ namespace mindspore { namespace ps { namespace core { - class CommUtil { public: static bool CheckIpWithRegex(const std::string &ip); diff --git a/mindspore/ccsrc/ps/core/http_message_handler.cc b/mindspore/ccsrc/ps/core/http_message_handler.cc index c7f6c008adb..a3b3351c917 100644 --- a/mindspore/ccsrc/ps/core/http_message_handler.cc +++ b/mindspore/ccsrc/ps/core/http_message_handler.cc @@ -37,7 +37,6 @@ namespace mindspore { namespace ps { namespace core { - void HttpMessageHandler::InitHttpMessage() { MS_EXCEPTION_IF_NULL(event_request_); event_uri_ = evhttp_request_get_evhttp_uri(event_request_); @@ -176,10 +175,13 @@ void HttpMessageHandler::SendResponse() { evhttp_send_reply(event_request_, resp_code_, nullptr, resp_buf_); } -void HttpMessageHandler::QuickResponse(int code, const std::string &body) { +void HttpMessageHandler::QuickResponse(int code, const unsigned char *body, size_t len) { MS_EXCEPTION_IF_NULL(event_request_); + MS_EXCEPTION_IF_NULL(body); MS_EXCEPTION_IF_NULL(resp_buf_); - AddRespString(body); + if (evbuffer_add(resp_buf_, body, len) == -1) { + MS_LOG(EXCEPTION) << "Add body to response body failed."; + } evhttp_send_reply(event_request_, code, nullptr, resp_buf_); } diff --git a/mindspore/ccsrc/ps/core/http_message_handler.h b/mindspore/ccsrc/ps/core/http_message_handler.h index f5c7a0e619f..3b7e571e7d8 100644 --- a/mindspore/ccsrc/ps/core/http_message_handler.h +++ b/mindspore/ccsrc/ps/core/http_message_handler.h @@ -37,7 +37,6 @@ namespace mindspore { namespace ps { namespace core { - using HttpHeaders = std::map>; class HttpMessageHandler { @@ -79,7 +78,7 @@ class HttpMessageHandler { // Make sure code and all response body has finished set void SendResponse(); - void QuickResponse(int code, const std::string &body); + void QuickResponse(int code, const unsigned char *body, size_t len); void SimpleResponse(int code, const HttpHeaders &headers, const std::string &body); // If message is empty, libevent will use default error code message instead @@ -100,7 +99,6 @@ class HttpMessageHandler { // Body length should no more than MAX_POST_BODY_LEN, default 64kB void ParsePostParam(); }; - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/http_server.cc b/mindspore/ccsrc/ps/core/http_server.cc index 44af4bb81fa..5ed0376f8f4 100644 --- a/mindspore/ccsrc/ps/core/http_server.cc +++ b/mindspore/ccsrc/ps/core/http_server.cc @@ -41,7 +41,6 @@ namespace mindspore { namespace ps { namespace core { - HttpServer::~HttpServer() { Stop(); } bool HttpServer::InitServer() { @@ -50,6 +49,10 @@ bool HttpServer::InitServer() { } is_stop_ = false; + int result = evthread_use_pthreads(); + if (result != 0) { + MS_LOG(EXCEPTION) << "Use event pthread failed!"; + } event_base_ = event_base_new(); MS_EXCEPTION_IF_NULL(event_base_); event_http_ = evhttp_new(event_base_); @@ -164,7 +167,6 @@ void HttpServer::Stop() { is_stop_ = true; } } - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/http_server.h b/mindspore/ccsrc/ps/core/http_server.h index 69d2936800a..9d95f33faf8 100644 --- a/mindspore/ccsrc/ps/core/http_server.h +++ b/mindspore/ccsrc/ps/core/http_server.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -37,7 +38,6 @@ namespace mindspore { namespace ps { namespace core { - typedef enum eHttpMethod { HM_GET = 1 << 0, HM_POST = 1 << 1, @@ -92,7 +92,6 @@ class HttpServer { bool is_init_; std::atomic is_stop_; }; - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/tcp_client.cc b/mindspore/ccsrc/ps/core/tcp_client.cc index ad8db6aed46..ece0f3c1d23 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.cc +++ b/mindspore/ccsrc/ps/core/tcp_client.cc @@ -35,7 +35,6 @@ namespace mindspore { namespace ps { namespace core { - TcpClient::TcpClient(const std::string &address, std::uint16_t port) : event_base_(nullptr), event_timeout_(nullptr), @@ -254,7 +253,6 @@ void TcpClient::SendMessageWithTimer() { ev = evtimer_new(event_base_, SendHeartBeatCallback, this); evtimer_add(ev, &timeout); } - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/tcp_message_handler.h b/mindspore/ccsrc/ps/core/tcp_message_handler.h index c13db9d703b..b77041e65e2 100644 --- a/mindspore/ccsrc/ps/core/tcp_message_handler.h +++ b/mindspore/ccsrc/ps/core/tcp_message_handler.h @@ -30,7 +30,6 @@ namespace mindspore { namespace ps { namespace core { - using messageReceive = std::function; class TcpMessageHandler { diff --git a/mindspore/ccsrc/ps/core/tcp_server.cc b/mindspore/ccsrc/ps/core/tcp_server.cc index 9093eec6272..004142b6bf5 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.cc +++ b/mindspore/ccsrc/ps/core/tcp_server.cc @@ -32,7 +32,6 @@ namespace mindspore { namespace ps { namespace core { - void TcpConnection::InitConnection() { tcp_message_handler_.SetCallback([&](const CommMessage &message) { OnServerReceiveMessage on_server_receive = server_->GetServerReceive(); @@ -301,7 +300,6 @@ void TcpServer::SendMessage(const CommMessage &message) { uint16_t TcpServer::BoundPort() const { return server_port_; } void TcpServer::SetMessageCallback(const OnServerReceiveMessage &cb) { message_callback_ = cb; } - } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/tcp_server.h b/mindspore/ccsrc/ps/core/tcp_server.h index ba554e29a2c..e74af03796d 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.h +++ b/mindspore/ccsrc/ps/core/tcp_server.h @@ -36,7 +36,6 @@ namespace mindspore { namespace ps { namespace core { - class TcpServer; class TcpConnection { public: @@ -106,7 +105,6 @@ class TcpServer { std::recursive_mutex connection_mutex_; OnServerReceiveMessage message_callback_; }; - } // namespace core } // namespace ps } // namespace mindspore diff --git a/tests/ut/cpp/ps/core/http_server_test.cc b/tests/ut/cpp/ps/core/http_server_test.cc index bb0f9bede15..01445eabe48 100644 --- a/tests/ut/cpp/ps/core/http_server_test.cc +++ b/tests/ut/cpp/ps/core/http_server_test.cc @@ -75,7 +75,8 @@ class TestHttpServer : public UT::Common { EXPECT_STREQ(resp->GetUriQuery().c_str(), "key1=value1"); EXPECT_STREQ(resp->GetRequestUri().c_str(), "/httpget?key1=value1"); EXPECT_STREQ(resp->GetUriPath().c_str(), "/httpget"); - resp->QuickResponse(200, "get request success!\n"); + const unsigned char ret[] = "get request success!\n"; + resp->QuickResponse(200, ret, 22); }, std::placeholders::_1);