From 43defcfaada1508d4a1367f73da94a1deac40850 Mon Sep 17 00:00:00 2001 From: chendongsheng Date: Wed, 31 Mar 2021 15:45:15 +0800 Subject: [PATCH] add multithread http server --- mindspore/ccsrc/ps/CMakeLists.txt | 13 +- mindspore/ccsrc/ps/core/abstract_node.h | 2 +- .../ps/core/{ => communicator}/http_client.cc | 2 +- .../ps/core/{ => communicator}/http_client.h | 8 +- .../http_message_handler.cc | 2 +- .../{ => communicator}/http_message_handler.h | 6 +- .../ccsrc/ps/core/communicator/http_server.cc | 149 +++++++++++++++ .../ps/core/{ => communicator}/http_server.h | 44 +++-- .../ps/core/{ => communicator}/message.h | 6 +- .../ps/core/{ => communicator}/tcp_client.cc | 2 +- .../ps/core/{ => communicator}/tcp_client.h | 8 +- .../{ => communicator}/tcp_message_handler.cc | 2 +- .../{ => communicator}/tcp_message_handler.h | 8 +- .../ps/core/{ => communicator}/tcp_server.cc | 2 +- .../ps/core/{ => communicator}/tcp_server.h | 8 +- .../ps/core/communicator/worker_queue.cc | 89 +++++++++ .../ccsrc/ps/core/communicator/worker_queue.h | 50 +++++ mindspore/ccsrc/ps/core/http_server.cc | 175 ------------------ mindspore/ccsrc/ps/core/node.h | 4 +- mindspore/ccsrc/ps/core/scheduler_node.h | 4 +- mindspore/ccsrc/ps/core/server_node.h | 4 +- mindspore/ccsrc/ps/core/worker_node.h | 4 +- tests/ut/cpp/ps/core/http_client_test.cc | 121 ------------ tests/ut/cpp/ps/core/http_server_test.cc | 162 ---------------- tests/ut/cpp/ps/core/tcp_client_tests.cc | 2 +- .../cpp/ps/core/tcp_message_handler_test.cc | 2 +- tests/ut/cpp/ps/core/tcp_pb_server_test.cc | 4 +- 27 files changed, 356 insertions(+), 527 deletions(-) rename mindspore/ccsrc/ps/core/{ => communicator}/http_client.cc (99%) rename mindspore/ccsrc/ps/core/{ => communicator}/http_client.h (92%) rename mindspore/ccsrc/ps/core/{ => communicator}/http_message_handler.cc (99%) rename mindspore/ccsrc/ps/core/{ => communicator}/http_message_handler.h (94%) create mode 100644 mindspore/ccsrc/ps/core/communicator/http_server.cc rename mindspore/ccsrc/ps/core/{ => communicator}/http_server.h (66%) rename mindspore/ccsrc/ps/core/{ => communicator}/message.h (90%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_client.cc (99%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_client.h (93%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_message_handler.cc (98%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_message_handler.h (86%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_server.cc (99%) rename mindspore/ccsrc/ps/core/{ => communicator}/tcp_server.h (95%) create mode 100644 mindspore/ccsrc/ps/core/communicator/worker_queue.cc create mode 100644 mindspore/ccsrc/ps/core/communicator/worker_queue.h delete mode 100644 mindspore/ccsrc/ps/core/http_server.cc delete mode 100644 tests/ut/cpp/ps/core/http_client_test.cc delete mode 100644 tests/ut/cpp/ps/core/http_server_test.cc diff --git a/mindspore/ccsrc/ps/CMakeLists.txt b/mindspore/ccsrc/ps/CMakeLists.txt index ab34d8ac73..6550ebe4e5 100644 --- a/mindspore/ccsrc/ps/CMakeLists.txt +++ b/mindspore/ccsrc/ps/CMakeLists.txt @@ -6,12 +6,12 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU))) list(REMOVE_ITEM _PS_SRC_FILES "scheduler.cc") list(REMOVE_ITEM _PS_SRC_FILES "util.cc") list(REMOVE_ITEM _PS_SRC_FILES "embedding_table_shard_metadata.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/http_message_handler.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/http_server.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_message_handler.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_server.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/comm_util.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_client.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_message_handler.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_server.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_client.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_message_handler.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_server.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc") list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc") @@ -19,9 +19,10 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU))) list(REMOVE_ITEM _PS_SRC_FILES "core/server_node.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/abstract_node.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/scheduler_node.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/http_client.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_client.cc") list(REMOVE_ITEM _PS_SRC_FILES "worker.cc") list(REMOVE_ITEM _PS_SRC_FILES "parameter_server.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/worker_queue.cc") endif() if(NOT ENABLE_D) diff --git a/mindspore/ccsrc/ps/core/abstract_node.h b/mindspore/ccsrc/ps/core/abstract_node.h index c803f8ecd6..eae22c3351 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.h +++ b/mindspore/ccsrc/ps/core/abstract_node.h @@ -25,7 +25,7 @@ #include #include "ps/core/node.h" -#include "ps/core/message.h" +#include "ps/core/communicator/message.h" #include "utils/ms_exception.h" namespace mindspore { diff --git a/mindspore/ccsrc/ps/core/http_client.cc b/mindspore/ccsrc/ps/core/communicator/http_client.cc similarity index 99% rename from mindspore/ccsrc/ps/core/http_client.cc rename to mindspore/ccsrc/ps/core/communicator/http_client.cc index a66a25b6c1..2642d78a1e 100644 --- a/mindspore/ccsrc/ps/core/http_client.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_client.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/http_client.h" +#include "ps/core/communicator/http_client.h" namespace mindspore { namespace ps { diff --git a/mindspore/ccsrc/ps/core/http_client.h b/mindspore/ccsrc/ps/core/communicator/http_client.h similarity index 92% rename from mindspore/ccsrc/ps/core/http_client.h rename to mindspore/ccsrc/ps/core/communicator/http_client.h index 3da0ea378f..954e5a3256 100644 --- a/mindspore/ccsrc/ps/core/http_client.h +++ b/mindspore/ccsrc/ps/core/communicator/http_client.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_ -#define MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_ #include #include @@ -39,7 +39,7 @@ #include #include -#include "ps/core/http_message_handler.h" +#include "ps/core/communicator/http_message_handler.h" #include "ps/core/comm_util.h" namespace mindspore { @@ -93,4 +93,4 @@ class HttpClient { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_ diff --git a/mindspore/ccsrc/ps/core/http_message_handler.cc b/mindspore/ccsrc/ps/core/communicator/http_message_handler.cc similarity index 99% rename from mindspore/ccsrc/ps/core/http_message_handler.cc rename to mindspore/ccsrc/ps/core/communicator/http_message_handler.cc index aa5a7fe2d8..f816c51f28 100644 --- a/mindspore/ccsrc/ps/core/http_message_handler.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_message_handler.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/http_message_handler.h" +#include "ps/core/communicator/http_message_handler.h" #include #include diff --git a/mindspore/ccsrc/ps/core/http_message_handler.h b/mindspore/ccsrc/ps/core/communicator/http_message_handler.h similarity index 94% rename from mindspore/ccsrc/ps/core/http_message_handler.h rename to mindspore/ccsrc/ps/core/communicator/http_message_handler.h index 1c2f2a7da5..96b5932a31 100644 --- a/mindspore/ccsrc/ps/core/http_message_handler.h +++ b/mindspore/ccsrc/ps/core/communicator/http_message_handler.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_ -#define MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_ #include #include @@ -127,4 +127,4 @@ class HttpMessageHandler { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/http_server.cc b/mindspore/ccsrc/ps/core/communicator/http_server.cc new file mode 100644 index 0000000000..c1b0261880 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/http_server.cc @@ -0,0 +1,149 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/http_server.h" +#include "ps/core/communicator/http_message_handler.h" +#include "ps/core/comm_util.h" + +#ifdef WIN32 +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mindspore { +namespace ps { +namespace core { +HttpServer::~HttpServer() { Stop(); } + +bool HttpServer::InitServer() { + if (!CommUtil::CheckIp(server_address_)) { + MS_LOG(ERROR) << "The http server ip:" << server_address_ << " is illegal!"; + return false; + } + + is_stop_ = false; + int result = evthread_use_pthreads(); + if (result != 0) { + MS_LOG(ERROR) << "Use event pthread failed!"; + return false; + } + + fd_ = ::socket(AF_INET, SOCK_STREAM, 0); + if (fd_ < 0) { + MS_LOG(ERROR) << "Socker error!"; + return false; + } + + int one = 1; + result = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&one), sizeof(int)); + if (result < 0) { + MS_LOG(ERROR) << "Set sock opt error!"; + return false; + } + + struct sockaddr_in addr; + errno_t ret = memset_s(&addr, sizeof(addr), 0, sizeof(addr)); + if (ret != EOK) { + MS_LOG(EXCEPTION) << "Memset failed."; + } + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr(server_address_.c_str()); + addr.sin_port = htons(server_port_); + + result = ::bind(fd_, (struct sockaddr *)&addr, sizeof(addr)); + if (result < 0) { + MS_LOG(ERROR) << "Bind ip:" << server_address_ << " port:" << server_port_ << "failed!"; + return false; + } + + result = ::listen(fd_, backlog_); + if (result < 0) { + MS_LOG(ERROR) << "Listen ip:" << server_address_ << " port:" << server_port_ << "failed!"; + return false; + } + + int flags = 0; + if ((flags = fcntl(fd_, F_GETFL, 0)) < 0 || fcntl(fd_, F_SETFL, flags | O_NONBLOCK) < 0) { + MS_LOG(ERROR) << "Set fcntl O_NONBLOCK failed!"; + return false; + } + + return true; +} + +void HttpServer::SetTimeOut(int seconds) { + if (seconds < 0) { + MS_LOG(EXCEPTION) << "The timeout seconds:" << seconds << "is less than 0!"; + } + request_timeout_ = seconds; +} + +bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *function) { + if (!function) { + return false; + } + request_handlers_[url] = function; + return true; +} + +bool HttpServer::Start() { + MS_LOG(INFO) << "Start http server!"; + for (size_t i = 0; i < thread_num_; i++) { + auto worker_queue = std::make_shared(); + worker_queue->Initialize(fd_, request_handlers_); + worker_queues_.push_back(worker_queue); + worker_threads_.emplace_back(std::make_shared(&WorkerQueue::Run, worker_queue)); + } + return true; +} + +bool HttpServer::Wait() { + for (size_t i = 0; i < thread_num_; i++) { + worker_threads_[i]->join(); + worker_threads_[i].reset(); + } + return true; +} + +void HttpServer::Stop() { + MS_LOG(INFO) << "Stop http server!"; + + if (!is_stop_.load()) { + for (size_t i = 0; i < thread_num_; i++) { + worker_queues_[i]->Stop(); + } + is_stop_ = true; + } +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/http_server.h b/mindspore/ccsrc/ps/core/communicator/http_server.h similarity index 66% rename from mindspore/ccsrc/ps/core/http_server.h rename to mindspore/ccsrc/ps/core/communicator/http_server.h index fd05d6a229..cbb03f85a1 100644 --- a/mindspore/ccsrc/ps/core/http_server.h +++ b/mindspore/ccsrc/ps/core/communicator/http_server.h @@ -14,10 +14,10 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_ -#define MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_ -#include "ps/core/http_message_handler.h" +#include "ps/core/communicator/http_message_handler.h" #include #include @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -34,55 +35,52 @@ #include #include #include +#include +#include + +#include "ps/core/communicator/worker_queue.h" namespace mindspore { namespace ps { namespace core { -using OnRequestReceive = std::function)>; class HttpServer { public: // Server address only support IPV4 now, and should be in format of "x.x.x.x" - explicit HttpServer(const std::string &address, std::uint16_t port) + explicit HttpServer(const std::string &address, std::uint16_t port, size_t thread_num = 10) : server_address_(address), server_port_(port), - event_base_(nullptr), - event_http_(nullptr), - is_init_(false), is_stop_(true), - request_timeout_(300) {} + request_timeout_(300), + thread_num_(thread_num), + backlog_(1024), + fd_(-1) {} ~HttpServer(); bool InitServer(); void SetTimeOut(int seconds); - // Default allowed methods: GET, POST, HEAD, PUT, DELETE - void SetAllowedMethod(u_int16_t methods); - - // Default to ((((unsigned long long)0xffffffffUL) << 32) | 0xffffffffUL) - void SetMaxHeaderSize(std::size_t num); - - // Default to ((((unsigned long long)0xffffffffUL) << 32) | 0xffffffffUL) - void SetMaxBodySize(std::size_t num); - // Return: true if success, false if failed, check log to find failure reason bool RegisterRoute(const std::string &url, OnRequestReceive *func); - bool UnRegisterRoute(const std::string &url); bool Start(); + bool Wait(); void Stop(); private: std::string server_address_; std::uint16_t server_port_; - struct event_base *event_base_; - struct evhttp *event_http_; - bool is_init_; std::atomic is_stop_; int request_timeout_; + size_t thread_num_; + std::vector> worker_threads_; + std::vector> worker_queues_; + int32_t backlog_; + std::unordered_map request_handlers_; + int fd_; }; } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_ diff --git a/mindspore/ccsrc/ps/core/message.h b/mindspore/ccsrc/ps/core/communicator/message.h similarity index 90% rename from mindspore/ccsrc/ps/core/message.h rename to mindspore/ccsrc/ps/core/communicator/message.h index a3a28ba3cf..f66c86783d 100644 --- a/mindspore/ccsrc/ps/core/message.h +++ b/mindspore/ccsrc/ps/core/communicator/message.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_ -#define MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_ #include #include @@ -56,4 +56,4 @@ struct CommandMeta { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_ diff --git a/mindspore/ccsrc/ps/core/tcp_client.cc b/mindspore/ccsrc/ps/core/communicator/tcp_client.cc similarity index 99% rename from mindspore/ccsrc/ps/core/tcp_client.cc rename to mindspore/ccsrc/ps/core/communicator/tcp_client.cc index 2d4e2afe0b..968ade6b5b 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.cc +++ b/mindspore/ccsrc/ps/core/communicator/tcp_client.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/tcp_client.h" +#include "ps/core/communicator/tcp_client.h" #include #include diff --git a/mindspore/ccsrc/ps/core/tcp_client.h b/mindspore/ccsrc/ps/core/communicator/tcp_client.h similarity index 93% rename from mindspore/ccsrc/ps/core/tcp_client.h rename to mindspore/ccsrc/ps/core/communicator/tcp_client.h index 57761b266a..71748bab4e 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.h +++ b/mindspore/ccsrc/ps/core/communicator/tcp_client.h @@ -14,10 +14,10 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_ -#define MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ -#include "ps/core/tcp_message_handler.h" +#include "ps/core/communicator/tcp_message_handler.h" #include #include @@ -103,4 +103,4 @@ class TcpClient { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_ diff --git a/mindspore/ccsrc/ps/core/tcp_message_handler.cc b/mindspore/ccsrc/ps/core/communicator/tcp_message_handler.cc similarity index 98% rename from mindspore/ccsrc/ps/core/tcp_message_handler.cc rename to mindspore/ccsrc/ps/core/communicator/tcp_message_handler.cc index e3c74ffc98..c24d3aa7d2 100644 --- a/mindspore/ccsrc/ps/core/tcp_message_handler.cc +++ b/mindspore/ccsrc/ps/core/communicator/tcp_message_handler.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/tcp_message_handler.h" +#include "ps/core/communicator/tcp_message_handler.h" #include #include diff --git a/mindspore/ccsrc/ps/core/tcp_message_handler.h b/mindspore/ccsrc/ps/core/communicator/tcp_message_handler.h similarity index 86% rename from mindspore/ccsrc/ps/core/tcp_message_handler.h rename to mindspore/ccsrc/ps/core/communicator/tcp_message_handler.h index 2912ae1c72..8bcc45c440 100644 --- a/mindspore/ccsrc/ps/core/tcp_message_handler.h +++ b/mindspore/ccsrc/ps/core/communicator/tcp_message_handler.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_ -#define MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ #include #include @@ -24,7 +24,7 @@ #include #include "utils/log_adapter.h" -#include "ps/core/message.h" +#include "ps/core/communicator/message.h" #include "proto/comm.pb.h" #include "proto/ps.pb.h" @@ -58,4 +58,4 @@ class TcpMessageHandler { } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ diff --git a/mindspore/ccsrc/ps/core/tcp_server.cc b/mindspore/ccsrc/ps/core/communicator/tcp_server.cc similarity index 99% rename from mindspore/ccsrc/ps/core/tcp_server.cc rename to mindspore/ccsrc/ps/core/communicator/tcp_server.cc index ed497a4d4a..726bd03253 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.cc +++ b/mindspore/ccsrc/ps/core/communicator/tcp_server.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_server.h" #include #include diff --git a/mindspore/ccsrc/ps/core/tcp_server.h b/mindspore/ccsrc/ps/core/communicator/tcp_server.h similarity index 95% rename from mindspore/ccsrc/ps/core/tcp_server.h rename to mindspore/ccsrc/ps/core/communicator/tcp_server.h index d0a3be7cb0..f75fe356f0 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.h +++ b/mindspore/ccsrc/ps/core/communicator/tcp_server.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_ -#define MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ #include #include @@ -34,7 +34,7 @@ #include #include -#include "ps/core/tcp_message_handler.h" +#include "ps/core/communicator/tcp_message_handler.h" #include "ps/core/cluster_metadata.h" #include "utils/convert_utils_base.h" #include "ps/core/comm_util.h" @@ -140,4 +140,4 @@ class TcpServer { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/worker_queue.cc b/mindspore/ccsrc/ps/core/communicator/worker_queue.cc new file mode 100644 index 0000000000..28c1119119 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/worker_queue.cc @@ -0,0 +1,89 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/worker_queue.h" + +namespace mindspore { +namespace ps { +namespace core { +bool WorkerQueue::Initialize(int fd, std::unordered_map handlers) { + evbase_ = event_base_new(); + MS_EXCEPTION_IF_NULL(evbase_); + struct evhttp *http = evhttp_new(evbase_); + MS_EXCEPTION_IF_NULL(http); + int result = evhttp_accept_socket(http, fd); + if (result < 0) { + MS_LOG(ERROR) << "Evhttp accept socket failed!"; + return false; + } + + for (const auto &handler : handlers) { + auto TransFunc = [](struct evhttp_request *req, void *arg) { + MS_EXCEPTION_IF_NULL(req); + MS_EXCEPTION_IF_NULL(arg); + auto httpReq = std::make_shared(); + httpReq->set_request(req); + httpReq->InitHttpMessage(); + OnRequestReceive *func = reinterpret_cast(arg); + (*func)(httpReq); + }; + + // O SUCCESS,-1 ALREADY_EXIST,-2 FAILURE + int ret = evhttp_set_cb(http, handler.first.c_str(), TransFunc, reinterpret_cast(handler.second)); + std::string log_prefix = "Ev http register handle of:"; + if (ret == 0) { + MS_LOG(INFO) << log_prefix << handler.first.c_str() << " success."; + } else if (ret == -1) { + MS_LOG(WARNING) << log_prefix << handler.first.c_str() << " exist."; + } else { + MS_LOG(ERROR) << log_prefix << handler.first.c_str() << " failed."; + return false; + } + } + return true; +} + +void WorkerQueue::Run() { + MS_LOG(INFO) << "Start http server!"; + MS_EXCEPTION_IF_NULL(evbase_); + int ret = event_base_dispatch(evbase_); + if (ret == 0) { + MS_LOG(INFO) << "Event base dispatch success!"; + } else if (ret == 1) { + MS_LOG(ERROR) << "Event base dispatch failed with no events pending or active!"; + } else if (ret == -1) { + MS_LOG(ERROR) << "Event base dispatch failed with error occurred!"; + } else { + MS_LOG(ERROR) << "Event base dispatch with unexpected error code!"; + } + + if (evbase_) { + event_base_free(evbase_); + evbase_ = nullptr; + } +} + +void WorkerQueue::Stop() { + MS_LOG(INFO) << "Stop http server!"; + + int ret = event_base_loopbreak(evbase_); + if (ret != 0) { + MS_LOG(EXCEPTION) << "event base loop break failed!"; + } +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/worker_queue.h b/mindspore/ccsrc/ps/core/communicator/worker_queue.h new file mode 100644 index 0000000000..eb25734ea1 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/worker_queue.h @@ -0,0 +1,50 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ + +#include +#include +#include + +#include +#include +#include + +#include "utils/log_adapter.h" +#include "ps/core/communicator/http_message_handler.h" + +namespace mindspore { +namespace ps { +namespace core { +using OnRequestReceive = std::function)>; +class WorkerQueue { + public: + WorkerQueue() : evbase_(nullptr) {} + virtual ~WorkerQueue() = default; + + bool Initialize(int fd, std::unordered_map handlers); + void Run(); + void Stop(); + + private: + struct event_base *evbase_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ diff --git a/mindspore/ccsrc/ps/core/http_server.cc b/mindspore/ccsrc/ps/core/http_server.cc deleted file mode 100644 index 36a3862175..0000000000 --- a/mindspore/ccsrc/ps/core/http_server.cc +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ps/core/http_server.h" -#include "ps/core/http_message_handler.h" -#include "ps/core/comm_util.h" - -#ifdef WIN32 -#include -#endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -namespace mindspore { -namespace ps { -namespace core { -HttpServer::~HttpServer() { Stop(); } - -bool HttpServer::InitServer() { - if (!CommUtil::CheckIp(server_address_)) { - MS_LOG(EXCEPTION) << "The http server ip:" << server_address_ << " is illegal!"; - } - - is_stop_ = false; - int result = evthread_use_pthreads(); - if (result != 0) { - MS_LOG(EXCEPTION) << "Use event pthread failed!"; - } - event_base_ = event_base_new(); - MS_EXCEPTION_IF_NULL(event_base_); - event_http_ = evhttp_new(event_base_); - MS_EXCEPTION_IF_NULL(event_http_); - evhttp_set_timeout(event_http_, request_timeout_); - int ret = evhttp_bind_socket(event_http_, server_address_.c_str(), server_port_); - if (ret != 0) { - MS_LOG(EXCEPTION) << "Http bind server addr:" << server_address_.c_str() << " port:" << server_port_ << "failed"; - } - is_init_ = true; - return true; -} - -void HttpServer::SetTimeOut(int seconds) { - MS_EXCEPTION_IF_NULL(event_http_); - if (seconds < 0) { - MS_LOG(EXCEPTION) << "The timeout seconds:" << seconds << "is less than 0!"; - } - request_timeout_ = seconds; -} - -void HttpServer::SetAllowedMethod(u_int16_t methods) { - MS_EXCEPTION_IF_NULL(event_http_); - evhttp_set_allowed_methods(event_http_, methods); -} - -void HttpServer::SetMaxHeaderSize(size_t num) { - MS_EXCEPTION_IF_NULL(event_http_); - if (num < 0) { - MS_LOG(EXCEPTION) << "The header num:" << num << "is less than 0!"; - } - evhttp_set_max_headers_size(event_http_, num); -} - -void HttpServer::SetMaxBodySize(size_t num) { - MS_EXCEPTION_IF_NULL(event_http_); - if (num < 0) { - MS_LOG(EXCEPTION) << "The max body num:" << num << "is less than 0!"; - } - evhttp_set_max_body_size(event_http_, num); -} - -bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *function) { - if ((!is_init_) && (!InitServer())) { - MS_LOG(EXCEPTION) << "Init http server failed!"; - } - if (!function) { - return false; - } - - auto TransFunc = [](struct evhttp_request *req, void *arg) { - MS_EXCEPTION_IF_NULL(req); - MS_EXCEPTION_IF_NULL(arg); - auto httpReq = std::make_shared(); - httpReq->set_request(req); - httpReq->InitHttpMessage(); - OnRequestReceive *func = reinterpret_cast(arg); - (*func)(httpReq); - }; - MS_EXCEPTION_IF_NULL(event_http_); - - // O SUCCESS,-1 ALREADY_EXIST,-2 FAILURE - int ret = evhttp_set_cb(event_http_, url.c_str(), TransFunc, reinterpret_cast(function)); - if (ret == 0) { - MS_LOG(INFO) << "Ev http register handle of:" << url.c_str() << " success."; - } else if (ret == -1) { - MS_LOG(WARNING) << "Ev http register handle of:" << url.c_str() << " exist."; - } else { - MS_LOG(ERROR) << "Ev http register handle of:" << url.c_str() << " failed."; - return false; - } - return true; -} - -bool HttpServer::UnRegisterRoute(const std::string &url) { - MS_EXCEPTION_IF_NULL(event_http_); - return (evhttp_del_cb(event_http_, url.c_str()) == 0); -} - -bool HttpServer::Start() { - MS_LOG(INFO) << "Start http server!"; - MS_EXCEPTION_IF_NULL(event_base_); - int ret = event_base_dispatch(event_base_); - if (ret == 0) { - MS_LOG(INFO) << "Event base dispatch success!"; - return true; - } else if (ret == 1) { - MS_LOG(ERROR) << "Event base dispatch failed with no events pending or active!"; - return false; - } else if (ret == -1) { - MS_LOG(ERROR) << "Event base dispatch failed with error occurred!"; - return false; - } else { - MS_LOG(EXCEPTION) << "Event base dispatch with unexpected error code!"; - } - return true; -} - -void HttpServer::Stop() { - MS_LOG(INFO) << "Stop http server!"; - - if (!is_stop_.load()) { - int ret = event_base_loopbreak(event_base_); - if (ret != 0) { - MS_LOG(EXCEPTION) << "event base loop break failed!"; - } - if (event_http_) { - evhttp_free(event_http_); - event_http_ = nullptr; - } - if (event_base_) { - event_base_free(event_base_); - event_base_ = nullptr; - } - is_stop_ = true; - } -} -} // namespace core -} // namespace ps -} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/node.h b/mindspore/ccsrc/ps/core/node.h index 8b558ad648..d2fe18e0ea 100644 --- a/mindspore/ccsrc/ps/core/node.h +++ b/mindspore/ccsrc/ps/core/node.h @@ -32,8 +32,8 @@ #include "ps/core/cluster_metadata.h" #include "ps/core/node_info.h" -#include "ps/core/tcp_client.h" -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_client.h" +#include "ps/core/communicator/tcp_server.h" namespace mindspore { namespace ps { diff --git a/mindspore/ccsrc/ps/core/scheduler_node.h b/mindspore/ccsrc/ps/core/scheduler_node.h index 8a682770da..ba18549fd5 100644 --- a/mindspore/ccsrc/ps/core/scheduler_node.h +++ b/mindspore/ccsrc/ps/core/scheduler_node.h @@ -28,8 +28,8 @@ #include #include "ps/core/cluster_metadata.h" -#include "ps/core/tcp_client.h" -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_client.h" +#include "ps/core/communicator/tcp_server.h" #include "ps/core/node_manager.h" #include "ps/core/node.h" diff --git a/mindspore/ccsrc/ps/core/server_node.h b/mindspore/ccsrc/ps/core/server_node.h index 4efca95794..eb12747765 100644 --- a/mindspore/ccsrc/ps/core/server_node.h +++ b/mindspore/ccsrc/ps/core/server_node.h @@ -26,8 +26,8 @@ #include #include "ps/core/cluster_metadata.h" -#include "ps/core/tcp_client.h" -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_client.h" +#include "ps/core/communicator/tcp_server.h" #include "ps/core/abstract_node.h" namespace mindspore { diff --git a/mindspore/ccsrc/ps/core/worker_node.h b/mindspore/ccsrc/ps/core/worker_node.h index baa12b18e2..c9dc027081 100644 --- a/mindspore/ccsrc/ps/core/worker_node.h +++ b/mindspore/ccsrc/ps/core/worker_node.h @@ -25,8 +25,8 @@ #include #include "ps/core/cluster_metadata.h" -#include "ps/core/tcp_client.h" -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_client.h" +#include "ps/core/communicator/tcp_server.h" #include "ps/core/abstract_node.h" namespace mindspore { diff --git a/tests/ut/cpp/ps/core/http_client_test.cc b/tests/ut/cpp/ps/core/http_client_test.cc deleted file mode 100644 index 9909b93609..0000000000 --- a/tests/ut/cpp/ps/core/http_client_test.cc +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright 2021 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common/common_test.h" -#include "ps/core/http_server.h" -#include "ps/core/http_client.h" - -using namespace std; - -namespace mindspore { -namespace ps { -namespace core { -class TestHttpClient : public UT::Common { - public: - TestHttpClient() : server_(nullptr), http_server_thread_(nullptr) {} - - virtual ~TestHttpClient() = default; - - OnRequestReceive http_get_func = std::bind( - [](std::shared_ptr resp) { - EXPECT_STREQ(resp->GetUriPath().c_str(), "/httpget"); - const unsigned char ret[] = "get request success!\n"; - resp->QuickResponse(200, ret, 22); - }, - std::placeholders::_1); - - OnRequestReceive http_handler_func = std::bind( - [](std::shared_ptr resp) { - std::string host = resp->GetRequestHost(); - EXPECT_STREQ(host.c_str(), "127.0.0.1"); - - std::string path_param = resp->GetPathParam("key1"); - std::string header_param = resp->GetHeadParam("headerKey"); - unsigned char *data = nullptr; - const uint64_t len = resp->GetPostMsg(&data); - char post_message[len + 1]; - if (memset_s(post_message, len + 1, 0, len + 1) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - if (memcpy_s(post_message, len, data, len) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - MS_LOG(WARNING) << "The path param:" << path_param; - MS_LOG(WARNING) << "The header param:" << header_param; - - const std::string rKey("headKey"); - const std::string rVal("headValue"); - const std::string rBody("post request success!\n"); - resp->AddRespHeadParam(rKey, rVal); - resp->AddRespString(rBody); - - resp->SetRespCode(200); - resp->SendResponse(); - }, - std::placeholders::_1); - - void SetUp() override { - server_ = std::make_unique("0.0.0.0", 9999); - - server_->RegisterRoute("/httpget", &http_get_func); - server_->RegisterRoute("/handler", &http_handler_func); - http_server_thread_ = std::make_unique([&]() { server_->Start(); }); - http_server_thread_->detach(); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - - void TearDown() override { - server_->Stop(); - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - } - - private: - std::unique_ptr server_; - std::unique_ptr http_server_thread_; -}; - -TEST_F(TestHttpClient, Get) { - HttpClient client; - std::map headers = {{"headerKey", "headerValue"}}; - auto output = std::make_shared>(); - auto ret = client.Get("http://127.0.0.1:9999/httpget", output, headers); - - MS_LOG(WARNING) << "The get output:" << output->data(); - EXPECT_EQ(Status::OK, ret); -} - -TEST_F(TestHttpClient, Post) { - HttpClient client; - std::map headers = {{"headerKey", "headerValue"}}; - auto output = std::make_shared>(); - std::string post_data = "postKey=postValue"; - auto ret = - client.Post("http://127.0.0.1:9999/handler?key1=value1", post_data.c_str(), post_data.length(), output, headers); - MS_LOG(WARNING) << "The post output:" << output->data(); - EXPECT_EQ(Status::OK, ret); -} -} // namespace core -} // namespace ps -} // namespace mindspore diff --git a/tests/ut/cpp/ps/core/http_server_test.cc b/tests/ut/cpp/ps/core/http_server_test.cc deleted file mode 100644 index dd3dc94c79..0000000000 --- a/tests/ut/cpp/ps/core/http_server_test.cc +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ps/core/http_server.h" -#include "common/common_test.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace mindspore { -namespace ps { -namespace core { - -class TestHttpServer : public UT::Common { - public: - TestHttpServer() : server_(nullptr) {} - - virtual ~TestHttpServer() = default; - - static void testGetHandler(std::shared_ptr resp) { - std::string host = resp->GetRequestHost(); - EXPECT_STREQ(host.c_str(), "127.0.0.1"); - - std::string path_param = resp->GetPathParam("key1"); - std::string header_param = resp->GetHeadParam("headerKey"); - unsigned char *data = nullptr; - const uint64_t len = resp->GetPostMsg(&data); - char post_message[len + 1]; - if (memset_s(post_message, len + 1, 0, len + 1) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - if (memcpy_s(post_message, len, data, len) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - EXPECT_STREQ(path_param.c_str(), "value1"); - EXPECT_STREQ(header_param.c_str(), "headerValue"); - EXPECT_STREQ(post_message, "postKey=postValue"); - - const std::string rKey("headKey"); - const std::string rVal("headValue"); - const std::string rBody("post request success!\n"); - resp->AddRespHeadParam(rKey, rVal); - resp->AddRespString(rBody); - - resp->SetRespCode(200); - resp->SendResponse(); - } - - void SetUp() override { - server_ = std::make_unique("0.0.0.0", 9999); - OnRequestReceive http_get_func = std::bind( - [](std::shared_ptr resp) { - EXPECT_STREQ(resp->GetPathParam("key1").c_str(), "value1"); - EXPECT_STREQ(resp->GetUriQuery().c_str(), "key1=value1"); - EXPECT_STREQ(resp->GetRequestUri().c_str(), "/httpget?key1=value1"); - EXPECT_STREQ(resp->GetUriPath().c_str(), "/httpget"); - const unsigned char ret[] = "get request success!\n"; - resp->QuickResponse(200, ret, 22); - }, - std::placeholders::_1); - - OnRequestReceive http_handler_func = std::bind( - [](std::shared_ptr resp) { - std::string host = resp->GetRequestHost(); - EXPECT_STREQ(host.c_str(), "127.0.0.1"); - - std::string path_param = resp->GetPathParam("key1"); - std::string header_param = resp->GetHeadParam("headerKey"); - std::string post_param = resp->GetPostParam("postKey"); - unsigned char *data = nullptr; - const uint64_t len = resp->GetPostMsg(&data); - char post_message[len + 1]; - if (memset_s(post_message, len + 1, 0, len + 1) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - if (memcpy_s(post_message, len, data, len) != 0) { - MS_LOG(EXCEPTION) << "The memset_s error"; - } - MS_LOG(WARNING) << "The Path param:" << path_param; - MS_LOG(WARNING) << "The header param:" << header_param; - - const std::string rKey("headKey"); - const std::string rVal("headValue"); - const std::string rBody("post request success!\n"); - resp->AddRespHeadParam(rKey, rVal); - resp->AddRespString(rBody); - - resp->SetRespCode(200); - resp->SendResponse(); - }, - std::placeholders::_1); - server_->RegisterRoute("/httpget", &http_get_func); - server_->RegisterRoute("/handler", &http_handler_func); - std::unique_ptr http_server_thread_(nullptr); - http_server_thread_ = std::make_unique([&]() { server_->Start(); }); - http_server_thread_->detach(); - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); - } - - void TearDown() override { - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); - server_->Stop(); - } - - private: - std::unique_ptr server_; -}; - -TEST_F(TestHttpServer, httpGetRequest) { - char buffer[100]; - FILE *file; - std::string cmd = "curl -X GET http://127.0.0.1:9999/httpget?key1=value1"; - std::string result; - const char *sysCommand = cmd.data(); - if ((file = popen(sysCommand, "r")) == nullptr) { - return; - } - while (fgets(buffer, sizeof(buffer) - 1, file) != nullptr) { - result += buffer; - } - MS_LOG(WARNING) << "The get output:" << result.c_str(); - pclose(file); -} - -TEST_F(TestHttpServer, messageHandler) { - char buffer[100]; - FILE *file; - std::string cmd = - R"(curl -X POST -d 'postKey=postValue' -i -H "Accept: application/json" -H "headerKey: headerValue" http://127.0.0.1:9999/handler?key1=value1)"; - std::string result; - const char *sysCommand = cmd.data(); - if ((file = popen(sysCommand, "r")) == nullptr) { - return; - } - while (fgets(buffer, sizeof(buffer) - 1, file) != nullptr) { - result += buffer; - } - MS_LOG(WARNING) << "The post output:" << result.substr(result.find("post")).c_str(); - pclose(file); -} -} // namespace core -} // namespace ps -} // namespace mindspore diff --git a/tests/ut/cpp/ps/core/tcp_client_tests.cc b/tests/ut/cpp/ps/core/tcp_client_tests.cc index 26e6703790..e8c9603275 100644 --- a/tests/ut/cpp/ps/core/tcp_client_tests.cc +++ b/tests/ut/cpp/ps/core/tcp_client_tests.cc @@ -17,7 +17,7 @@ #include #include "common/common_test.h" -#include "ps/core/tcp_client.h" +#include "ps/core/communicator/tcp_client.h" namespace mindspore { namespace ps { diff --git a/tests/ut/cpp/ps/core/tcp_message_handler_test.cc b/tests/ut/cpp/ps/core/tcp_message_handler_test.cc index 39c63d9bb0..d86b841b9a 100644 --- a/tests/ut/cpp/ps/core/tcp_message_handler_test.cc +++ b/tests/ut/cpp/ps/core/tcp_message_handler_test.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "ps/core/tcp_message_handler.h" +#include "ps/core/communicator/tcp_message_handler.h" #include "common/common_test.h" #include diff --git a/tests/ut/cpp/ps/core/tcp_pb_server_test.cc b/tests/ut/cpp/ps/core/tcp_pb_server_test.cc index 3afa7f9013..9daa85fd1d 100644 --- a/tests/ut/cpp/ps/core/tcp_pb_server_test.cc +++ b/tests/ut/cpp/ps/core/tcp_pb_server_test.cc @@ -14,8 +14,8 @@ * limitations under the License. */ -#include "ps/core/tcp_client.h" -#include "ps/core/tcp_server.h" +#include "ps/core/communicator/tcp_client.h" +#include "ps/core/communicator/tcp_server.h" #include "common/common_test.h" #include