add http communicator

This commit is contained in:
chendongsheng 2021-04-27 10:14:44 +08:00
parent c04e8fce4d
commit 05d3ae2219
15 changed files with 478 additions and 9 deletions

View File

@ -12,6 +12,10 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_client.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_message_handler.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_server.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/communicator_base.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_communicator.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_msg_handler.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_msg_handler.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc")
list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc")

View File

@ -0,0 +1,41 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/communicator/communicator_base.h"
#include <memory>
namespace mindspore {
namespace ps {
namespace core {
bool CommunicatorBase::SendResponse(const void *rsp_data, size_t rsp_len, std::shared_ptr<MessageHandler> msg_handler) {
// The rsp_len could be 0 because of ProtoBuffer's feature.
if (rsp_data == nullptr || msg_handler == nullptr) {
MS_LOG(ERROR) << "SendResponse inputs are invalid.";
return false;
}
return msg_handler->SendResponse(rsp_data, rsp_len);
}
void CommunicatorBase::Join() {
if (!running_thread_.joinable()) {
MS_LOG(EXCEPTION) << "The running thread of communicator is not joinable.";
return;
}
running_thread_.join();
return;
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,68 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_
#include <string>
#include <memory>
#include <unordered_map>
#include <functional>
#include <thread>
#include "ps/core/communicator/message_handler.h"
#include "utils/log_adapter.h"
#include "ps/core/communicator/http_message_handler.h"
#include "ps/core/communicator/tcp_server.h"
#include "ps/core/node_info.h"
#include "ps/constants.h"
namespace mindspore {
namespace ps {
namespace core {
// CommunicatorBase is used to receive request and send response for server.
// It is the base class of HttpCommunicator and TcpCommunicator.
class CommunicatorBase {
public:
using MessageCallback = std::function<void(std::shared_ptr<MessageHandler>)>;
using HttpMsgCallback = std::function<void(std::shared_ptr<HttpMessageHandler>)>;
using OnNodeEventCallback = std::function<void(const NodeEvent &)>;
using TcpMsgCallBack = std::function<void(std::shared_ptr<core::TcpConnection> conn,
std::shared_ptr<core::MessageMeta> meta, DataPtr data, size_t size)>;
using CertainEventCallBack = std::function<void(void)>;
CommunicatorBase() = default;
virtual ~CommunicatorBase() = default;
virtual bool Start() = 0;
virtual bool Stop() = 0;
// You need to call the Start() function before calling the Join() function, it will block server's main thread.
// if you want to exit the Join() function, then you should call the Stop() function in another thread.
void Join();
virtual void RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) = 0;
bool SendResponse(const void *rsp_data, size_t rsp_len, std::shared_ptr<MessageHandler> msg_handler);
protected:
std::unordered_map<std::string, MessageCallback> msg_callbacks_;
std::thread running_thread_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_

View File

@ -0,0 +1,70 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/communicator/http_communicator.h"
#include <memory>
#include "common/thread_pool.h"
namespace mindspore {
namespace ps {
namespace core {
bool HttpCommunicator::Start() {
MS_LOG(INFO) << "Initialize http server IP:" << ip_ << ", PORT:" << port_;
http_server_ = std::make_shared<HttpServer>(ip_, port_, 32);
http_server_->InitServer();
MS_EXCEPTION_IF_NULL(http_server_);
if (!http_server_->Start()) {
MS_LOG(EXCEPTION) << "Http server starting failed.";
}
MS_LOG(INFO) << "Http communicator started.";
running_thread_ = std::thread([&]() {
try {
http_server_->Wait();
} catch (const std::exception &e) {
MsException::Instance().SetException();
}
});
return true;
}
bool HttpCommunicator::Stop() {
MS_EXCEPTION_IF_NULL(http_server_);
return http_server_->Stop();
}
void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
msg_callbacks_[msg_type] = cb;
http_msg_callbacks_[msg_type] = std::bind(
[&](std::shared_ptr<HttpMessageHandler> http_msg) -> void {
std::shared_ptr<MessageHandler> http_msg_handler = std::make_shared<HttpMsgHandler>(http_msg);
MS_EXCEPTION_IF_NULL(http_msg_handler);
msg_callbacks_[msg_type](http_msg_handler);
return;
},
std::placeholders::_1);
std::string url = "/";
url += msg_type;
bool is_succeed = http_server_->RegisterRoute(url, &http_msg_callbacks_[msg_type]);
if (!is_succeed) {
MS_LOG(EXCEPTION) << "Http server register handler for url " << url << " failed.";
}
return;
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,55 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_
#include <string>
#include <memory>
#include <unordered_map>
#include "ps/core/communicator/http_server.h"
#include "ps/core/communicator/http_message_handler.h"
#include "ps/core/communicator/task_executor.h"
#include "ps/core/communicator/communicator_base.h"
#include "ps/core/communicator/http_msg_handler.h"
#include "utils/ms_exception.h"
namespace mindspore {
namespace ps {
namespace core {
class HttpCommunicator : public CommunicatorBase {
public:
explicit HttpCommunicator(const std::string &ip, std::int16_t port,
const std::shared_ptr<TaskExecutor> &task_executor)
: task_executor_(task_executor), http_server_(nullptr), ip_(ip), port_(port) {}
~HttpCommunicator() = default;
bool Start() override;
bool Stop() override;
void RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) override;
private:
std::shared_ptr<TaskExecutor> task_executor_;
std::shared_ptr<HttpServer> http_server_;
std::unordered_map<std::string, HttpMsgCallback> http_msg_callbacks_;
std::string ip_;
std::int16_t port_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_

View File

@ -0,0 +1,43 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/communicator/http_msg_handler.h"
#include <memory>
namespace mindspore {
namespace ps {
namespace core {
HttpMsgHandler::HttpMsgHandler(std::shared_ptr<HttpMessageHandler> http_msg)
: http_msg_(http_msg), data_(nullptr), len_(0) {
len_ = http_msg_->GetPostMsg(&data_);
}
void *HttpMsgHandler::data() const {
if (data_ == nullptr) {
MS_LOG(ERROR) << "HttpMsgHandler data is nullptr.";
}
return data_;
}
size_t HttpMsgHandler::len() const { return len_; }
bool HttpMsgHandler::SendResponse(const void *data, const size_t &len) {
http_msg_->QuickResponse(200, reinterpret_cast<unsigned char *>(const_cast<void *>(data)), len);
return true;
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,44 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_
#include <memory>
#include "ps/core/communicator/http_message_handler.h"
#include "ps/core/communicator/message_handler.h"
namespace mindspore {
namespace ps {
namespace core {
class HttpMsgHandler : public MessageHandler {
public:
explicit HttpMsgHandler(std::shared_ptr<HttpMessageHandler> http_msg);
~HttpMsgHandler() override = default;
void *data() const override;
size_t len() const override;
bool SendResponse(const void *data, const size_t &len) override;
private:
std::shared_ptr<HttpMessageHandler> http_msg_;
unsigned char *data_;
size_t len_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_

View File

@ -99,13 +99,15 @@ void HttpRequestHandler::Run() {
}
}
void HttpRequestHandler::Stop() {
bool HttpRequestHandler::Stop() {
MS_LOG(INFO) << "Stop http server!";
int ret = event_base_loopbreak(evbase_);
if (ret != 0) {
MS_LOG(EXCEPTION) << "event base loop break failed!";
MS_LOG(ERROR) << "event base loop break failed!";
return false;
}
return true;
}
bufferevent *HttpRequestHandler::BuffereventCallback(event_base *base, void *arg) {

View File

@ -47,7 +47,7 @@ class HttpRequestHandler {
bool Initialize(int fd, const std::unordered_map<std::string, OnRequestReceive *> &handlers);
void Run();
void Stop();
bool Stop();
static bufferevent *BuffereventCallback(event_base *base, void *arg);
private:

View File

@ -134,15 +134,20 @@ bool HttpServer::Wait() {
return true;
}
void HttpServer::Stop() {
bool HttpServer::Stop() {
MS_LOG(INFO) << "Stop http server!";
bool result = true;
if (!is_stop_.load()) {
for (size_t i = 0; i < thread_num_; i++) {
http_request_handlers[i]->Stop();
bool res = http_request_handlers[i]->Stop();
if (res == false) {
result = false;
}
}
is_stop_ = true;
}
return result;
}
} // namespace core
} // namespace ps

View File

@ -66,7 +66,7 @@ class HttpServer {
bool Start();
bool Wait();
void Stop();
bool Stop();
private:
std::string server_address_;

View File

@ -0,0 +1,41 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_
namespace mindspore {
namespace ps {
namespace core {
// MessageHandler class is used to handle requests from clients and send response from server.
// It's the base class of HttpMsgHandler and TcpMsgHandler.
class MessageHandler {
public:
MessageHandler() = default;
virtual ~MessageHandler() = default;
// Raw data of this message in bytes.
virtual void *data() const = 0;
// Raw data size of this message.(Number of bytes)
virtual size_t len() const = 0;
virtual bool SendResponse(const void *data, const size_t &len) = 0;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_

View File

@ -0,0 +1,46 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ps/core/communicator/tcp_msg_handler.h"
#include <memory>
namespace mindspore {
namespace ps {
namespace core {
TcpMsgHandler::TcpMsgHandler(ServerNode *server_node, std::shared_ptr<core::TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, DataPtr data, size_t size)
: server_node_(server_node), tcp_conn_(conn), meta_(meta), data_ptr_(data), data_(nullptr), len_(size) {
if (data_ptr_ != nullptr) {
data_ = data_ptr_.get();
}
}
void *TcpMsgHandler::data() const {
if (data_ == nullptr) {
MS_LOG(ERROR) << "TcpMsgHandler data is nullptr.";
}
return data_;
}
size_t TcpMsgHandler::len() const { return len_; }
bool TcpMsgHandler::SendResponse(const void *data, const size_t &len) {
server_node_->Response(tcp_conn_, meta_, const_cast<void *>(data), len);
return true;
}
} // namespace core
} // namespace ps
} // namespace mindspore

View File

@ -0,0 +1,52 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_
#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_
#include <memory>
#include "proto/ps.pb.h"
#include "ps/core/server_node.h"
#include "ps/core/communicator/message_handler.h"
#include "ps/constants.h"
namespace mindspore {
namespace ps {
namespace core {
class TcpMsgHandler : public MessageHandler {
public:
TcpMsgHandler(ServerNode *server_node, std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta,
DataPtr data, size_t size);
~TcpMsgHandler() override = default;
void *data() const override;
size_t len() const override;
bool SendResponse(const void *data, const size_t &len) override;
private:
ServerNode *server_node_;
std::shared_ptr<TcpConnection> tcp_conn_;
// core::MessageMeta is used for server to get the user command and to find communication peer when responding.
std::shared_ptr<MessageMeta> meta_;
// We use data of shared_ptr array so that the raw pointer won't be released until the reference is 0.
DataPtr data_ptr_;
void *data_;
size_t len_;
};
} // namespace core
} // namespace ps
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_

View File

@ -161,8 +161,6 @@ list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/scheduler.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info_builder.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/worker.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_request_handler.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_server.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/parameter_server.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/gpu/gpu_ps_cache.cc")
list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/ascend/ascend_ps_cache.cc")
@ -191,7 +189,7 @@ endif()
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
target_link_libraries(ut_tests PRIVATE mindspore::gtest mindspore::event mindspore::event_pthreads
mindspore_gvar ${PYTHON_LIBRARIES} pthread util dl)
mindspore::event_openssl mindspore_gvar ${PYTHON_LIBRARIES} pthread util dl)
if(ENABLE_MINDDATA)
# AUX_SOURCE_DIRECTORY(LITE_CV_FILES)