forked from OSSInnovation/mindspore
!14230 add multithread http server
From: @anancds Reviewed-by: Signed-off-by:
This commit is contained in:
commit
aac165a8e5
|
@ -6,12 +6,12 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
|
|||
list(REMOVE_ITEM _PS_SRC_FILES "scheduler.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "util.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "embedding_table_shard_metadata.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/http_message_handler.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/http_server.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_message_handler.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_server.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/comm_util.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_client.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_message_handler.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_server.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_client.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_message_handler.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_server.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc")
|
||||
|
@ -19,9 +19,10 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
|
|||
list(REMOVE_ITEM _PS_SRC_FILES "core/server_node.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/abstract_node.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/scheduler_node.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/http_client.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_client.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "worker.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "parameter_server.cc")
|
||||
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/worker_queue.cc")
|
||||
endif()
|
||||
|
||||
if(NOT ENABLE_D)
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
#include <unordered_map>
|
||||
|
||||
#include "ps/core/node.h"
|
||||
#include "ps/core/message.h"
|
||||
#include "ps/core/communicator/message.h"
|
||||
#include "utils/ms_exception.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/http_client.h"
|
||||
#include "ps/core/communicator/http_client.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_
|
||||
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/event.h>
|
||||
|
@ -39,7 +39,7 @@
|
|||
#include <vector>
|
||||
#include <map>
|
||||
|
||||
#include "ps/core/http_message_handler.h"
|
||||
#include "ps/core/communicator/http_message_handler.h"
|
||||
#include "ps/core/comm_util.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
@ -93,4 +93,4 @@ class HttpClient {
|
|||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_CLIENT_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_CLIENT_H_
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/http_message_handler.h"
|
||||
#include "ps/core/communicator/http_message_handler.h"
|
||||
|
||||
#include <event2/event.h>
|
||||
#include <event2/buffer.h>
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_
|
||||
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/event.h>
|
||||
|
@ -127,4 +127,4 @@ class HttpMessageHandler {
|
|||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_MESSAGE_HANDLER_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MESSAGE_HANDLER_H_
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* Copyright 2020 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/communicator/http_server.h"
|
||||
#include "ps/core/communicator/http_message_handler.h"
|
||||
#include "ps/core/comm_util.h"
|
||||
|
||||
#ifdef WIN32
|
||||
#include <WinSock2.h>
|
||||
#endif
|
||||
#include <arpa/inet.h>
|
||||
#include <event.h>
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/bufferevent.h>
|
||||
#include <event2/bufferevent_compat.h>
|
||||
#include <event2/http.h>
|
||||
#include <event2/http_compat.h>
|
||||
#include <event2/http_struct.h>
|
||||
#include <event2/listener.h>
|
||||
#include <event2/util.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <regex>
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
HttpServer::~HttpServer() { Stop(); }
|
||||
|
||||
bool HttpServer::InitServer() {
|
||||
if (!CommUtil::CheckIp(server_address_)) {
|
||||
MS_LOG(ERROR) << "The http server ip:" << server_address_ << " is illegal!";
|
||||
return false;
|
||||
}
|
||||
|
||||
is_stop_ = false;
|
||||
int result = evthread_use_pthreads();
|
||||
if (result != 0) {
|
||||
MS_LOG(ERROR) << "Use event pthread failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
fd_ = ::socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (fd_ < 0) {
|
||||
MS_LOG(ERROR) << "Socker error!";
|
||||
return false;
|
||||
}
|
||||
|
||||
int one = 1;
|
||||
result = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int));
|
||||
if (result < 0) {
|
||||
MS_LOG(ERROR) << "Set sock opt error!";
|
||||
return false;
|
||||
}
|
||||
|
||||
struct sockaddr_in addr;
|
||||
errno_t ret = memset_s(&addr, sizeof(addr), 0, sizeof(addr));
|
||||
if (ret != EOK) {
|
||||
MS_LOG(EXCEPTION) << "Memset failed.";
|
||||
}
|
||||
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = inet_addr(server_address_.c_str());
|
||||
addr.sin_port = htons(server_port_);
|
||||
|
||||
result = ::bind(fd_, (struct sockaddr *)&addr, sizeof(addr));
|
||||
if (result < 0) {
|
||||
MS_LOG(ERROR) << "Bind ip:" << server_address_ << " port:" << server_port_ << "failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
result = ::listen(fd_, backlog_);
|
||||
if (result < 0) {
|
||||
MS_LOG(ERROR) << "Listen ip:" << server_address_ << " port:" << server_port_ << "failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
int flags = 0;
|
||||
if ((flags = fcntl(fd_, F_GETFL, 0)) < 0 || fcntl(fd_, F_SETFL, flags | O_NONBLOCK) < 0) {
|
||||
MS_LOG(ERROR) << "Set fcntl O_NONBLOCK failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void HttpServer::SetTimeOut(int seconds) {
|
||||
if (seconds < 0) {
|
||||
MS_LOG(EXCEPTION) << "The timeout seconds:" << seconds << "is less than 0!";
|
||||
}
|
||||
request_timeout_ = seconds;
|
||||
}
|
||||
|
||||
bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *function) {
|
||||
if (!function) {
|
||||
return false;
|
||||
}
|
||||
request_handlers_[url] = function;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool HttpServer::Start() {
|
||||
MS_LOG(INFO) << "Start http server!";
|
||||
for (size_t i = 0; i < thread_num_; i++) {
|
||||
auto worker_queue = std::make_shared<WorkerQueue>();
|
||||
worker_queue->Initialize(fd_, request_handlers_);
|
||||
worker_queues_.push_back(worker_queue);
|
||||
worker_threads_.emplace_back(std::make_shared<std::thread>(&WorkerQueue::Run, worker_queue));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool HttpServer::Wait() {
|
||||
for (size_t i = 0; i < thread_num_; i++) {
|
||||
worker_threads_[i]->join();
|
||||
worker_threads_[i].reset();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void HttpServer::Stop() {
|
||||
MS_LOG(INFO) << "Stop http server!";
|
||||
|
||||
if (!is_stop_.load()) {
|
||||
for (size_t i = 0; i < thread_num_; i++) {
|
||||
worker_queues_[i]->Stop();
|
||||
}
|
||||
is_stop_ = true;
|
||||
}
|
||||
}
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
|
@ -14,10 +14,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_
|
||||
|
||||
#include "ps/core/http_message_handler.h"
|
||||
#include "ps/core/communicator/http_message_handler.h"
|
||||
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/event.h>
|
||||
|
@ -26,6 +26,7 @@
|
|||
#include <event2/listener.h>
|
||||
#include <event2/util.h>
|
||||
#include <event2/thread.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
|
@ -34,55 +35,52 @@
|
|||
#include <memory>
|
||||
#include <string>
|
||||
#include <atomic>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "ps/core/communicator/worker_queue.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
using OnRequestReceive = std::function<void(std::shared_ptr<HttpMessageHandler>)>;
|
||||
|
||||
class HttpServer {
|
||||
public:
|
||||
// Server address only support IPV4 now, and should be in format of "x.x.x.x"
|
||||
explicit HttpServer(const std::string &address, std::uint16_t port)
|
||||
explicit HttpServer(const std::string &address, std::uint16_t port, size_t thread_num = 10)
|
||||
: server_address_(address),
|
||||
server_port_(port),
|
||||
event_base_(nullptr),
|
||||
event_http_(nullptr),
|
||||
is_init_(false),
|
||||
is_stop_(true),
|
||||
request_timeout_(300) {}
|
||||
request_timeout_(300),
|
||||
thread_num_(thread_num),
|
||||
backlog_(1024),
|
||||
fd_(-1) {}
|
||||
|
||||
~HttpServer();
|
||||
|
||||
bool InitServer();
|
||||
void SetTimeOut(int seconds);
|
||||
|
||||
// Default allowed methods: GET, POST, HEAD, PUT, DELETE
|
||||
void SetAllowedMethod(u_int16_t methods);
|
||||
|
||||
// Default to ((((unsigned long long)0xffffffffUL) << 32) | 0xffffffffUL)
|
||||
void SetMaxHeaderSize(std::size_t num);
|
||||
|
||||
// Default to ((((unsigned long long)0xffffffffUL) << 32) | 0xffffffffUL)
|
||||
void SetMaxBodySize(std::size_t num);
|
||||
|
||||
// Return: true if success, false if failed, check log to find failure reason
|
||||
bool RegisterRoute(const std::string &url, OnRequestReceive *func);
|
||||
bool UnRegisterRoute(const std::string &url);
|
||||
|
||||
bool Start();
|
||||
bool Wait();
|
||||
void Stop();
|
||||
|
||||
private:
|
||||
std::string server_address_;
|
||||
std::uint16_t server_port_;
|
||||
struct event_base *event_base_;
|
||||
struct evhttp *event_http_;
|
||||
bool is_init_;
|
||||
std::atomic<bool> is_stop_;
|
||||
int request_timeout_;
|
||||
size_t thread_num_;
|
||||
std::vector<std::shared_ptr<std::thread>> worker_threads_;
|
||||
std::vector<std::shared_ptr<WorkerQueue>> worker_queues_;
|
||||
int32_t backlog_;
|
||||
std::unordered_map<std::string, OnRequestReceive *> request_handlers_;
|
||||
int fd_;
|
||||
};
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_HTTP_SERVER_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_SERVER_H_
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_
|
||||
|
||||
#include <string>
|
||||
#include <memory>
|
||||
|
@ -56,4 +56,4 @@ struct CommandMeta {
|
|||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_MESSAGE_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_H_
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <event2/buffer.h>
|
|
@ -14,10 +14,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
|
||||
|
||||
#include "ps/core/tcp_message_handler.h"
|
||||
#include "ps/core/communicator/tcp_message_handler.h"
|
||||
|
||||
#include <event2/event.h>
|
||||
#include <event2/bufferevent.h>
|
||||
|
@ -103,4 +103,4 @@ class TcpClient {
|
|||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_TCP_CLIENT_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_CLIENT_H_
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/tcp_message_handler.h"
|
||||
#include "ps/core/communicator/tcp_message_handler.h"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <iostream>
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_
|
||||
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
|
@ -24,7 +24,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "utils/log_adapter.h"
|
||||
#include "ps/core/message.h"
|
||||
#include "ps/core/communicator/message.h"
|
||||
#include "proto/comm.pb.h"
|
||||
#include "proto/ps.pb.h"
|
||||
|
||||
|
@ -58,4 +58,4 @@ class TcpMessageHandler {
|
|||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_TCP_MESSAGE_HANDLER_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <event2/buffer.h>
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
|
||||
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/bufferevent.h>
|
||||
|
@ -34,7 +34,7 @@
|
|||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
#include "ps/core/tcp_message_handler.h"
|
||||
#include "ps/core/communicator/tcp_message_handler.h"
|
||||
#include "ps/core/cluster_metadata.h"
|
||||
#include "utils/convert_utils_base.h"
|
||||
#include "ps/core/comm_util.h"
|
||||
|
@ -140,4 +140,4 @@ class TcpServer {
|
|||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_TCP_SERVER_H_
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_SERVER_H_
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/communicator/worker_queue.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
bool WorkerQueue::Initialize(int fd, std::unordered_map<std::string, OnRequestReceive *> handlers) {
|
||||
evbase_ = event_base_new();
|
||||
MS_EXCEPTION_IF_NULL(evbase_);
|
||||
struct evhttp *http = evhttp_new(evbase_);
|
||||
MS_EXCEPTION_IF_NULL(http);
|
||||
int result = evhttp_accept_socket(http, fd);
|
||||
if (result < 0) {
|
||||
MS_LOG(ERROR) << "Evhttp accept socket failed!";
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const auto &handler : handlers) {
|
||||
auto TransFunc = [](struct evhttp_request *req, void *arg) {
|
||||
MS_EXCEPTION_IF_NULL(req);
|
||||
MS_EXCEPTION_IF_NULL(arg);
|
||||
auto httpReq = std::make_shared<HttpMessageHandler>();
|
||||
httpReq->set_request(req);
|
||||
httpReq->InitHttpMessage();
|
||||
OnRequestReceive *func = reinterpret_cast<OnRequestReceive *>(arg);
|
||||
(*func)(httpReq);
|
||||
};
|
||||
|
||||
// O SUCCESS,-1 ALREADY_EXIST,-2 FAILURE
|
||||
int ret = evhttp_set_cb(http, handler.first.c_str(), TransFunc, reinterpret_cast<void *>(handler.second));
|
||||
std::string log_prefix = "Ev http register handle of:";
|
||||
if (ret == 0) {
|
||||
MS_LOG(INFO) << log_prefix << handler.first.c_str() << " success.";
|
||||
} else if (ret == -1) {
|
||||
MS_LOG(WARNING) << log_prefix << handler.first.c_str() << " exist.";
|
||||
} else {
|
||||
MS_LOG(ERROR) << log_prefix << handler.first.c_str() << " failed.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void WorkerQueue::Run() {
|
||||
MS_LOG(INFO) << "Start http server!";
|
||||
MS_EXCEPTION_IF_NULL(evbase_);
|
||||
int ret = event_base_dispatch(evbase_);
|
||||
if (ret == 0) {
|
||||
MS_LOG(INFO) << "Event base dispatch success!";
|
||||
} else if (ret == 1) {
|
||||
MS_LOG(ERROR) << "Event base dispatch failed with no events pending or active!";
|
||||
} else if (ret == -1) {
|
||||
MS_LOG(ERROR) << "Event base dispatch failed with error occurred!";
|
||||
} else {
|
||||
MS_LOG(ERROR) << "Event base dispatch with unexpected error code!";
|
||||
}
|
||||
|
||||
if (evbase_) {
|
||||
event_base_free(evbase_);
|
||||
evbase_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void WorkerQueue::Stop() {
|
||||
MS_LOG(INFO) << "Stop http server!";
|
||||
|
||||
int ret = event_base_loopbreak(evbase_);
|
||||
if (ret != 0) {
|
||||
MS_LOG(EXCEPTION) << "event base loop break failed!";
|
||||
}
|
||||
}
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_
|
||||
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_
|
||||
|
||||
#include <event2/event.h>
|
||||
#include <event2/http.h>
|
||||
#include <event2/http_struct.h>
|
||||
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "utils/log_adapter.h"
|
||||
#include "ps/core/communicator/http_message_handler.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
using OnRequestReceive = std::function<void(std::shared_ptr<HttpMessageHandler>)>;
|
||||
class WorkerQueue {
|
||||
public:
|
||||
WorkerQueue() : evbase_(nullptr) {}
|
||||
virtual ~WorkerQueue() = default;
|
||||
|
||||
bool Initialize(int fd, std::unordered_map<std::string, OnRequestReceive *> handlers);
|
||||
void Run();
|
||||
void Stop();
|
||||
|
||||
private:
|
||||
struct event_base *evbase_;
|
||||
};
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
||||
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_
|
|
@ -1,175 +0,0 @@
|
|||
/**
|
||||
* Copyright 2020 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/http_server.h"
|
||||
#include "ps/core/http_message_handler.h"
|
||||
#include "ps/core/comm_util.h"
|
||||
|
||||
#ifdef WIN32
|
||||
#include <WinSock2.h>
|
||||
#endif
|
||||
#include <arpa/inet.h>
|
||||
#include <event.h>
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/bufferevent.h>
|
||||
#include <event2/bufferevent_compat.h>
|
||||
#include <event2/http.h>
|
||||
#include <event2/http_compat.h>
|
||||
#include <event2/http_struct.h>
|
||||
#include <event2/listener.h>
|
||||
#include <event2/util.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <regex>
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
HttpServer::~HttpServer() { Stop(); }
|
||||
|
||||
bool HttpServer::InitServer() {
|
||||
if (!CommUtil::CheckIp(server_address_)) {
|
||||
MS_LOG(EXCEPTION) << "The http server ip:" << server_address_ << " is illegal!";
|
||||
}
|
||||
|
||||
is_stop_ = false;
|
||||
int result = evthread_use_pthreads();
|
||||
if (result != 0) {
|
||||
MS_LOG(EXCEPTION) << "Use event pthread failed!";
|
||||
}
|
||||
event_base_ = event_base_new();
|
||||
MS_EXCEPTION_IF_NULL(event_base_);
|
||||
event_http_ = evhttp_new(event_base_);
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
evhttp_set_timeout(event_http_, request_timeout_);
|
||||
int ret = evhttp_bind_socket(event_http_, server_address_.c_str(), server_port_);
|
||||
if (ret != 0) {
|
||||
MS_LOG(EXCEPTION) << "Http bind server addr:" << server_address_.c_str() << " port:" << server_port_ << "failed";
|
||||
}
|
||||
is_init_ = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
void HttpServer::SetTimeOut(int seconds) {
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
if (seconds < 0) {
|
||||
MS_LOG(EXCEPTION) << "The timeout seconds:" << seconds << "is less than 0!";
|
||||
}
|
||||
request_timeout_ = seconds;
|
||||
}
|
||||
|
||||
void HttpServer::SetAllowedMethod(u_int16_t methods) {
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
evhttp_set_allowed_methods(event_http_, methods);
|
||||
}
|
||||
|
||||
void HttpServer::SetMaxHeaderSize(size_t num) {
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
if (num < 0) {
|
||||
MS_LOG(EXCEPTION) << "The header num:" << num << "is less than 0!";
|
||||
}
|
||||
evhttp_set_max_headers_size(event_http_, num);
|
||||
}
|
||||
|
||||
void HttpServer::SetMaxBodySize(size_t num) {
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
if (num < 0) {
|
||||
MS_LOG(EXCEPTION) << "The max body num:" << num << "is less than 0!";
|
||||
}
|
||||
evhttp_set_max_body_size(event_http_, num);
|
||||
}
|
||||
|
||||
bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *function) {
|
||||
if ((!is_init_) && (!InitServer())) {
|
||||
MS_LOG(EXCEPTION) << "Init http server failed!";
|
||||
}
|
||||
if (!function) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto TransFunc = [](struct evhttp_request *req, void *arg) {
|
||||
MS_EXCEPTION_IF_NULL(req);
|
||||
MS_EXCEPTION_IF_NULL(arg);
|
||||
auto httpReq = std::make_shared<HttpMessageHandler>();
|
||||
httpReq->set_request(req);
|
||||
httpReq->InitHttpMessage();
|
||||
OnRequestReceive *func = reinterpret_cast<OnRequestReceive *>(arg);
|
||||
(*func)(httpReq);
|
||||
};
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
|
||||
// O SUCCESS,-1 ALREADY_EXIST,-2 FAILURE
|
||||
int ret = evhttp_set_cb(event_http_, url.c_str(), TransFunc, reinterpret_cast<void *>(function));
|
||||
if (ret == 0) {
|
||||
MS_LOG(INFO) << "Ev http register handle of:" << url.c_str() << " success.";
|
||||
} else if (ret == -1) {
|
||||
MS_LOG(WARNING) << "Ev http register handle of:" << url.c_str() << " exist.";
|
||||
} else {
|
||||
MS_LOG(ERROR) << "Ev http register handle of:" << url.c_str() << " failed.";
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool HttpServer::UnRegisterRoute(const std::string &url) {
|
||||
MS_EXCEPTION_IF_NULL(event_http_);
|
||||
return (evhttp_del_cb(event_http_, url.c_str()) == 0);
|
||||
}
|
||||
|
||||
bool HttpServer::Start() {
|
||||
MS_LOG(INFO) << "Start http server!";
|
||||
MS_EXCEPTION_IF_NULL(event_base_);
|
||||
int ret = event_base_dispatch(event_base_);
|
||||
if (ret == 0) {
|
||||
MS_LOG(INFO) << "Event base dispatch success!";
|
||||
return true;
|
||||
} else if (ret == 1) {
|
||||
MS_LOG(ERROR) << "Event base dispatch failed with no events pending or active!";
|
||||
return false;
|
||||
} else if (ret == -1) {
|
||||
MS_LOG(ERROR) << "Event base dispatch failed with error occurred!";
|
||||
return false;
|
||||
} else {
|
||||
MS_LOG(EXCEPTION) << "Event base dispatch with unexpected error code!";
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void HttpServer::Stop() {
|
||||
MS_LOG(INFO) << "Stop http server!";
|
||||
|
||||
if (!is_stop_.load()) {
|
||||
int ret = event_base_loopbreak(event_base_);
|
||||
if (ret != 0) {
|
||||
MS_LOG(EXCEPTION) << "event base loop break failed!";
|
||||
}
|
||||
if (event_http_) {
|
||||
evhttp_free(event_http_);
|
||||
event_http_ = nullptr;
|
||||
}
|
||||
if (event_base_) {
|
||||
event_base_free(event_base_);
|
||||
event_base_ = nullptr;
|
||||
}
|
||||
is_stop_ = true;
|
||||
}
|
||||
}
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
|
@ -32,8 +32,8 @@
|
|||
|
||||
#include "ps/core/cluster_metadata.h"
|
||||
#include "ps/core/node_info.h"
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
|
|
|
@ -28,8 +28,8 @@
|
|||
#include <unordered_map>
|
||||
|
||||
#include "ps/core/cluster_metadata.h"
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
#include "ps/core/node_manager.h"
|
||||
#include "ps/core/node.h"
|
||||
|
||||
|
|
|
@ -26,8 +26,8 @@
|
|||
#include <vector>
|
||||
|
||||
#include "ps/core/cluster_metadata.h"
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
#include "ps/core/abstract_node.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
|
|
@ -25,8 +25,8 @@
|
|||
#include <algorithm>
|
||||
|
||||
#include "ps/core/cluster_metadata.h"
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
#include "ps/core/abstract_node.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
|
|
@ -1,121 +0,0 @@
|
|||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
|
||||
#include "common/common_test.h"
|
||||
#include "ps/core/http_server.h"
|
||||
#include "ps/core/http_client.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
class TestHttpClient : public UT::Common {
|
||||
public:
|
||||
TestHttpClient() : server_(nullptr), http_server_thread_(nullptr) {}
|
||||
|
||||
virtual ~TestHttpClient() = default;
|
||||
|
||||
OnRequestReceive http_get_func = std::bind(
|
||||
[](std::shared_ptr<HttpMessageHandler> resp) {
|
||||
EXPECT_STREQ(resp->GetUriPath().c_str(), "/httpget");
|
||||
const unsigned char ret[] = "get request success!\n";
|
||||
resp->QuickResponse(200, ret, 22);
|
||||
},
|
||||
std::placeholders::_1);
|
||||
|
||||
OnRequestReceive http_handler_func = std::bind(
|
||||
[](std::shared_ptr<HttpMessageHandler> resp) {
|
||||
std::string host = resp->GetRequestHost();
|
||||
EXPECT_STREQ(host.c_str(), "127.0.0.1");
|
||||
|
||||
std::string path_param = resp->GetPathParam("key1");
|
||||
std::string header_param = resp->GetHeadParam("headerKey");
|
||||
unsigned char *data = nullptr;
|
||||
const uint64_t len = resp->GetPostMsg(&data);
|
||||
char post_message[len + 1];
|
||||
if (memset_s(post_message, len + 1, 0, len + 1) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
if (memcpy_s(post_message, len, data, len) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
MS_LOG(WARNING) << "The path param:" << path_param;
|
||||
MS_LOG(WARNING) << "The header param:" << header_param;
|
||||
|
||||
const std::string rKey("headKey");
|
||||
const std::string rVal("headValue");
|
||||
const std::string rBody("post request success!\n");
|
||||
resp->AddRespHeadParam(rKey, rVal);
|
||||
resp->AddRespString(rBody);
|
||||
|
||||
resp->SetRespCode(200);
|
||||
resp->SendResponse();
|
||||
},
|
||||
std::placeholders::_1);
|
||||
|
||||
void SetUp() override {
|
||||
server_ = std::make_unique<HttpServer>("0.0.0.0", 9999);
|
||||
|
||||
server_->RegisterRoute("/httpget", &http_get_func);
|
||||
server_->RegisterRoute("/handler", &http_handler_func);
|
||||
http_server_thread_ = std::make_unique<std::thread>([&]() { server_->Start(); });
|
||||
http_server_thread_->detach();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
server_->Stop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<HttpServer> server_;
|
||||
std::unique_ptr<std::thread> http_server_thread_;
|
||||
};
|
||||
|
||||
TEST_F(TestHttpClient, Get) {
|
||||
HttpClient client;
|
||||
std::map<std::string, std::string> headers = {{"headerKey", "headerValue"}};
|
||||
auto output = std::make_shared<std::vector<char>>();
|
||||
auto ret = client.Get("http://127.0.0.1:9999/httpget", output, headers);
|
||||
|
||||
MS_LOG(WARNING) << "The get output:" << output->data();
|
||||
EXPECT_EQ(Status::OK, ret);
|
||||
}
|
||||
|
||||
TEST_F(TestHttpClient, Post) {
|
||||
HttpClient client;
|
||||
std::map<std::string, std::string> headers = {{"headerKey", "headerValue"}};
|
||||
auto output = std::make_shared<std::vector<char>>();
|
||||
std::string post_data = "postKey=postValue";
|
||||
auto ret =
|
||||
client.Post("http://127.0.0.1:9999/handler?key1=value1", post_data.c_str(), post_data.length(), output, headers);
|
||||
MS_LOG(WARNING) << "The post output:" << output->data();
|
||||
EXPECT_EQ(Status::OK, ret);
|
||||
}
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
|
@ -1,162 +0,0 @@
|
|||
/**
|
||||
* Copyright 2020 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/http_server.h"
|
||||
#include "common/common_test.h"
|
||||
#include <gtest/gtest.h>
|
||||
#include <algorithm>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
namespace core {
|
||||
|
||||
class TestHttpServer : public UT::Common {
|
||||
public:
|
||||
TestHttpServer() : server_(nullptr) {}
|
||||
|
||||
virtual ~TestHttpServer() = default;
|
||||
|
||||
static void testGetHandler(std::shared_ptr<HttpMessageHandler> resp) {
|
||||
std::string host = resp->GetRequestHost();
|
||||
EXPECT_STREQ(host.c_str(), "127.0.0.1");
|
||||
|
||||
std::string path_param = resp->GetPathParam("key1");
|
||||
std::string header_param = resp->GetHeadParam("headerKey");
|
||||
unsigned char *data = nullptr;
|
||||
const uint64_t len = resp->GetPostMsg(&data);
|
||||
char post_message[len + 1];
|
||||
if (memset_s(post_message, len + 1, 0, len + 1) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
if (memcpy_s(post_message, len, data, len) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
EXPECT_STREQ(path_param.c_str(), "value1");
|
||||
EXPECT_STREQ(header_param.c_str(), "headerValue");
|
||||
EXPECT_STREQ(post_message, "postKey=postValue");
|
||||
|
||||
const std::string rKey("headKey");
|
||||
const std::string rVal("headValue");
|
||||
const std::string rBody("post request success!\n");
|
||||
resp->AddRespHeadParam(rKey, rVal);
|
||||
resp->AddRespString(rBody);
|
||||
|
||||
resp->SetRespCode(200);
|
||||
resp->SendResponse();
|
||||
}
|
||||
|
||||
void SetUp() override {
|
||||
server_ = std::make_unique<HttpServer>("0.0.0.0", 9999);
|
||||
OnRequestReceive http_get_func = std::bind(
|
||||
[](std::shared_ptr<HttpMessageHandler> resp) {
|
||||
EXPECT_STREQ(resp->GetPathParam("key1").c_str(), "value1");
|
||||
EXPECT_STREQ(resp->GetUriQuery().c_str(), "key1=value1");
|
||||
EXPECT_STREQ(resp->GetRequestUri().c_str(), "/httpget?key1=value1");
|
||||
EXPECT_STREQ(resp->GetUriPath().c_str(), "/httpget");
|
||||
const unsigned char ret[] = "get request success!\n";
|
||||
resp->QuickResponse(200, ret, 22);
|
||||
},
|
||||
std::placeholders::_1);
|
||||
|
||||
OnRequestReceive http_handler_func = std::bind(
|
||||
[](std::shared_ptr<HttpMessageHandler> resp) {
|
||||
std::string host = resp->GetRequestHost();
|
||||
EXPECT_STREQ(host.c_str(), "127.0.0.1");
|
||||
|
||||
std::string path_param = resp->GetPathParam("key1");
|
||||
std::string header_param = resp->GetHeadParam("headerKey");
|
||||
std::string post_param = resp->GetPostParam("postKey");
|
||||
unsigned char *data = nullptr;
|
||||
const uint64_t len = resp->GetPostMsg(&data);
|
||||
char post_message[len + 1];
|
||||
if (memset_s(post_message, len + 1, 0, len + 1) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
if (memcpy_s(post_message, len, data, len) != 0) {
|
||||
MS_LOG(EXCEPTION) << "The memset_s error";
|
||||
}
|
||||
MS_LOG(WARNING) << "The Path param:" << path_param;
|
||||
MS_LOG(WARNING) << "The header param:" << header_param;
|
||||
|
||||
const std::string rKey("headKey");
|
||||
const std::string rVal("headValue");
|
||||
const std::string rBody("post request success!\n");
|
||||
resp->AddRespHeadParam(rKey, rVal);
|
||||
resp->AddRespString(rBody);
|
||||
|
||||
resp->SetRespCode(200);
|
||||
resp->SendResponse();
|
||||
},
|
||||
std::placeholders::_1);
|
||||
server_->RegisterRoute("/httpget", &http_get_func);
|
||||
server_->RegisterRoute("/handler", &http_handler_func);
|
||||
std::unique_ptr<std::thread> http_server_thread_(nullptr);
|
||||
http_server_thread_ = std::make_unique<std::thread>([&]() { server_->Start(); });
|
||||
http_server_thread_->detach();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
|
||||
server_->Stop();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<HttpServer> server_;
|
||||
};
|
||||
|
||||
TEST_F(TestHttpServer, httpGetRequest) {
|
||||
char buffer[100];
|
||||
FILE *file;
|
||||
std::string cmd = "curl -X GET http://127.0.0.1:9999/httpget?key1=value1";
|
||||
std::string result;
|
||||
const char *sysCommand = cmd.data();
|
||||
if ((file = popen(sysCommand, "r")) == nullptr) {
|
||||
return;
|
||||
}
|
||||
while (fgets(buffer, sizeof(buffer) - 1, file) != nullptr) {
|
||||
result += buffer;
|
||||
}
|
||||
MS_LOG(WARNING) << "The get output:" << result.c_str();
|
||||
pclose(file);
|
||||
}
|
||||
|
||||
TEST_F(TestHttpServer, messageHandler) {
|
||||
char buffer[100];
|
||||
FILE *file;
|
||||
std::string cmd =
|
||||
R"(curl -X POST -d 'postKey=postValue' -i -H "Accept: application/json" -H "headerKey: headerValue" http://127.0.0.1:9999/handler?key1=value1)";
|
||||
std::string result;
|
||||
const char *sysCommand = cmd.data();
|
||||
if ((file = popen(sysCommand, "r")) == nullptr) {
|
||||
return;
|
||||
}
|
||||
while (fgets(buffer, sizeof(buffer) - 1, file) != nullptr) {
|
||||
result += buffer;
|
||||
}
|
||||
MS_LOG(WARNING) << "The post output:" << result.substr(result.find("post")).c_str();
|
||||
pclose(file);
|
||||
}
|
||||
} // namespace core
|
||||
} // namespace ps
|
||||
} // namespace mindspore
|
|
@ -17,7 +17,7 @@
|
|||
#include <memory>
|
||||
|
||||
#include "common/common_test.h"
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace ps {
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/tcp_message_handler.h"
|
||||
#include "ps/core/communicator/tcp_message_handler.h"
|
||||
#include "common/common_test.h"
|
||||
|
||||
#include <memory>
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ps/core/tcp_client.h"
|
||||
#include "ps/core/tcp_server.h"
|
||||
#include "ps/core/communicator/tcp_client.h"
|
||||
#include "ps/core/communicator/tcp_server.h"
|
||||
#include "common/common_test.h"
|
||||
|
||||
#include <memory>
|
||||
|
|
Loading…
Reference in New Issue