forked from mindspore-Ecosystem/mindspore
FL, codex and pclint fix
This commit is contained in:
parent
2b8e5f98f3
commit
058801126b
|
@ -53,7 +53,7 @@ bool HttpCommunicator::Stop() {
|
||||||
void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
|
void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
|
||||||
MS_LOG(INFO) << "msg_type is: " << msg_type;
|
MS_LOG(INFO) << "msg_type is: " << msg_type;
|
||||||
msg_callbacks_[msg_type] = cb;
|
msg_callbacks_[msg_type] = cb;
|
||||||
http_msg_callbacks_[msg_type] = [&](std::shared_ptr<HttpMessageHandler> http_msg) -> void {
|
http_msg_callbacks_[msg_type] = [this, msg_type](std::shared_ptr<HttpMessageHandler> http_msg) -> void {
|
||||||
MS_EXCEPTION_IF_NULL(http_msg);
|
MS_EXCEPTION_IF_NULL(http_msg);
|
||||||
try {
|
try {
|
||||||
size_t len = 0;
|
size_t len = 0;
|
||||||
|
|
|
@ -209,7 +209,7 @@ bool HttpMessageHandler::GetPostMsg(size_t *len, uint8_t **buffer) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
*len = evbuffer_get_length(event_request_->input_buffer);
|
*len = evbuffer_get_length(event_request_->input_buffer);
|
||||||
constexpr size_t max_http_bytes_len = UINT32_MAX; // 4GB
|
const size_t max_http_bytes_len = UINT32_MAX; // 4GB
|
||||||
if (*len == 0 || *len > max_http_bytes_len) {
|
if (*len == 0 || *len > max_http_bytes_len) {
|
||||||
MS_LOG(ERROR) << "The post message length " << max_http_bytes_len << " is invalid!";
|
MS_LOG(ERROR) << "The post message length " << max_http_bytes_len << " is invalid!";
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
namespace mindspore {
|
namespace mindspore {
|
||||||
namespace ps {
|
namespace ps {
|
||||||
namespace core {
|
namespace core {
|
||||||
HttpMsgHandler::HttpMsgHandler(const std::shared_ptr<HttpMessageHandler> &http_msg, uint8_t *data, size_t len)
|
HttpMsgHandler::HttpMsgHandler(const std::shared_ptr<HttpMessageHandler> &http_msg, uint8_t *const data, size_t len)
|
||||||
: http_msg_(http_msg), data_(data), len_(len) {}
|
: http_msg_(http_msg), data_(data), len_(len) {}
|
||||||
|
|
||||||
void *HttpMsgHandler::data() const {
|
void *HttpMsgHandler::data() const {
|
||||||
|
|
|
@ -36,7 +36,7 @@ class HttpMsgHandler : public MessageHandler {
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<HttpMessageHandler> http_msg_;
|
std::shared_ptr<HttpMessageHandler> http_msg_;
|
||||||
unsigned char *data_;
|
uint8_t *data_;
|
||||||
size_t len_;
|
size_t len_;
|
||||||
};
|
};
|
||||||
} // namespace core
|
} // namespace core
|
||||||
|
|
|
@ -165,7 +165,7 @@ void TcpClient::SetTcpNoDelay(const evutil_socket_t &fd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::TimeoutCallback(evutil_socket_t, std::int16_t, void *arg) {
|
void TcpClient::TimeoutCallback(evutil_socket_t, std::int16_t, void *const arg) {
|
||||||
try {
|
try {
|
||||||
TimeoutCallbackInner(arg);
|
TimeoutCallbackInner(arg);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -173,13 +173,13 @@ void TcpClient::TimeoutCallback(evutil_socket_t, std::int16_t, void *arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::TimeoutCallbackInner(void *arg) {
|
void TcpClient::TimeoutCallbackInner(void *const arg) {
|
||||||
MS_EXCEPTION_IF_NULL(arg);
|
MS_EXCEPTION_IF_NULL(arg);
|
||||||
auto tcp_client = reinterpret_cast<TcpClient *>(arg);
|
auto tcp_client = reinterpret_cast<TcpClient *>(arg);
|
||||||
tcp_client->Init();
|
tcp_client->Init();
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::ReadCallback(struct bufferevent *bev, void *ctx) {
|
void TcpClient::ReadCallback(struct bufferevent *bev, void *const ctx) {
|
||||||
try {
|
try {
|
||||||
ReadCallbackInner(bev, ctx);
|
ReadCallbackInner(bev, ctx);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -187,7 +187,7 @@ void TcpClient::ReadCallback(struct bufferevent *bev, void *ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::ReadCallbackInner(struct bufferevent *bev, void *ctx) {
|
void TcpClient::ReadCallbackInner(struct bufferevent *bev, void *const ctx) {
|
||||||
MS_EXCEPTION_IF_NULL(bev);
|
MS_EXCEPTION_IF_NULL(bev);
|
||||||
MS_EXCEPTION_IF_NULL(ctx);
|
MS_EXCEPTION_IF_NULL(ctx);
|
||||||
auto tcp_client = reinterpret_cast<TcpClient *>(ctx);
|
auto tcp_client = reinterpret_cast<TcpClient *>(ctx);
|
||||||
|
@ -234,7 +234,7 @@ bool TcpClient::EstablishSSL() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::EventCallback(struct bufferevent *bev, std::int16_t events, void *ptr) {
|
void TcpClient::EventCallback(struct bufferevent *bev, std::int16_t events, void *const ptr) {
|
||||||
try {
|
try {
|
||||||
EventCallbackInner(bev, events, ptr);
|
EventCallbackInner(bev, events, ptr);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -242,7 +242,7 @@ void TcpClient::EventCallback(struct bufferevent *bev, std::int16_t events, void
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpClient::EventCallbackInner(struct bufferevent *bev, std::int16_t events, void *ptr) {
|
void TcpClient::EventCallbackInner(struct bufferevent *bev, std::int16_t events, void *const ptr) {
|
||||||
MS_EXCEPTION_IF_NULL(bev);
|
MS_EXCEPTION_IF_NULL(bev);
|
||||||
MS_EXCEPTION_IF_NULL(ptr);
|
MS_EXCEPTION_IF_NULL(ptr);
|
||||||
auto tcp_client = reinterpret_cast<TcpClient *>(ptr);
|
auto tcp_client = reinterpret_cast<TcpClient *>(ptr);
|
||||||
|
|
|
@ -245,7 +245,7 @@ void TcpServer::RemoveConnection(const evutil_socket_t &fd) {
|
||||||
std::shared_ptr<TcpConnection> TcpServer::GetConnectionByFd(const evutil_socket_t &fd) { return connections_[fd]; }
|
std::shared_ptr<TcpConnection> TcpServer::GetConnectionByFd(const evutil_socket_t &fd) { return connections_[fd]; }
|
||||||
|
|
||||||
void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, struct sockaddr *sockaddr, int,
|
void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, struct sockaddr *sockaddr, int,
|
||||||
void *data) {
|
void *const data) {
|
||||||
try {
|
try {
|
||||||
ListenerCallbackInner(fd, sockaddr, data);
|
ListenerCallbackInner(fd, sockaddr, data);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -253,7 +253,7 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::ListenerCallbackInner(evutil_socket_t fd, struct sockaddr *sockaddr, void *data) {
|
void TcpServer::ListenerCallbackInner(evutil_socket_t fd, struct sockaddr *sockaddr, void *const data) {
|
||||||
auto server = reinterpret_cast<class TcpServer *>(data);
|
auto server = reinterpret_cast<class TcpServer *>(data);
|
||||||
MS_EXCEPTION_IF_NULL(server);
|
MS_EXCEPTION_IF_NULL(server);
|
||||||
auto base = reinterpret_cast<struct event_base *>(server->base_);
|
auto base = reinterpret_cast<struct event_base *>(server->base_);
|
||||||
|
@ -320,7 +320,7 @@ std::shared_ptr<TcpConnection> TcpServer::onCreateConnection(struct bufferevent
|
||||||
|
|
||||||
OnServerReceiveMessage TcpServer::GetServerReceive() const { return message_callback_; }
|
OnServerReceiveMessage TcpServer::GetServerReceive() const { return message_callback_; }
|
||||||
|
|
||||||
void TcpServer::SignalCallback(evutil_socket_t, std::int16_t, void *data) {
|
void TcpServer::SignalCallback(evutil_socket_t, std::int16_t, void *const data) {
|
||||||
try {
|
try {
|
||||||
SignalCallbackInner(data);
|
SignalCallbackInner(data);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -328,7 +328,7 @@ void TcpServer::SignalCallback(evutil_socket_t, std::int16_t, void *data) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::SignalCallbackInner(void *data) {
|
void TcpServer::SignalCallbackInner(void *const data) {
|
||||||
MS_EXCEPTION_IF_NULL(data);
|
MS_EXCEPTION_IF_NULL(data);
|
||||||
auto server = reinterpret_cast<class TcpServer *>(data);
|
auto server = reinterpret_cast<class TcpServer *>(data);
|
||||||
struct event_base *base = server->base_;
|
struct event_base *base = server->base_;
|
||||||
|
@ -340,7 +340,7 @@ void TcpServer::SignalCallbackInner(void *data) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::ReadCallback(struct bufferevent *bev, void *connection) {
|
void TcpServer::ReadCallback(struct bufferevent *bev, void *const connection) {
|
||||||
try {
|
try {
|
||||||
ReadCallbackInner(bev, connection);
|
ReadCallbackInner(bev, connection);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -348,7 +348,7 @@ void TcpServer::ReadCallback(struct bufferevent *bev, void *connection) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::ReadCallbackInner(struct bufferevent *bev, void *connection) {
|
void TcpServer::ReadCallbackInner(struct bufferevent *bev, void *const connection) {
|
||||||
MS_EXCEPTION_IF_NULL(bev);
|
MS_EXCEPTION_IF_NULL(bev);
|
||||||
MS_EXCEPTION_IF_NULL(connection);
|
MS_EXCEPTION_IF_NULL(connection);
|
||||||
|
|
||||||
|
@ -365,7 +365,7 @@ void TcpServer::ReadCallbackInner(struct bufferevent *bev, void *connection) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::EventCallback(struct bufferevent *bev, std::int16_t events, void *data) {
|
void TcpServer::EventCallback(struct bufferevent *bev, std::int16_t events, void *const data) {
|
||||||
try {
|
try {
|
||||||
EventCallbackInner(bev, events, data);
|
EventCallbackInner(bev, events, data);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
|
@ -373,7 +373,7 @@ void TcpServer::EventCallback(struct bufferevent *bev, std::int16_t events, void
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TcpServer::EventCallbackInner(struct bufferevent *bev, std::int16_t events, void *data) {
|
void TcpServer::EventCallbackInner(struct bufferevent *bev, std::int16_t events, void *const data) {
|
||||||
MS_EXCEPTION_IF_NULL(bev);
|
MS_EXCEPTION_IF_NULL(bev);
|
||||||
MS_EXCEPTION_IF_NULL(data);
|
MS_EXCEPTION_IF_NULL(data);
|
||||||
struct evbuffer *output = bufferevent_get_output(bev);
|
struct evbuffer *output = bufferevent_get_output(bev);
|
||||||
|
|
|
@ -92,8 +92,10 @@ void SchedulerNode::RunRecovery() {
|
||||||
node_manager_.set_next_server_rank_id(clusterConfig.initial_next_server_rank_id);
|
node_manager_.set_next_server_rank_id(clusterConfig.initial_next_server_rank_id);
|
||||||
node_manager_.set_total_node_num(clusterConfig.initial_total_node_num);
|
node_manager_.set_total_node_num(clusterConfig.initial_total_node_num);
|
||||||
|
|
||||||
for (const auto kvs : initial_node_infos) {
|
for (const auto &kvs : initial_node_infos) {
|
||||||
auto client = std::make_shared<TcpClient>(kvs.second.ip_, kvs.second.port_, config_.get());
|
auto &node_id = kvs.first;
|
||||||
|
auto &node_info = kvs.second;
|
||||||
|
auto client = std::make_shared<TcpClient>(node_info.ip_, node_info.port_, config_.get());
|
||||||
client->SetMessageCallback(
|
client->SetMessageCallback(
|
||||||
[&](const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size) {
|
[&](const std::shared_ptr<MessageMeta> &meta, const Protos &protos, const void *data, size_t size) {
|
||||||
MS_LOG(INFO) << "received the response. ";
|
MS_LOG(INFO) << "received the response. ";
|
||||||
|
@ -106,20 +108,20 @@ void SchedulerNode::RunRecovery() {
|
||||||
MS_EXCEPTION_IF_NULL(message_meta);
|
MS_EXCEPTION_IF_NULL(message_meta);
|
||||||
message_meta->set_cmd(NodeCommand::SCHEDULER_RECOVERY);
|
message_meta->set_cmd(NodeCommand::SCHEDULER_RECOVERY);
|
||||||
|
|
||||||
int rank_id = kvs.second.rank_id_;
|
auto rank_id = node_info.rank_id_;
|
||||||
SendMetadataMessage scheduler_recovery_message;
|
SendMetadataMessage scheduler_recovery_message;
|
||||||
scheduler_recovery_message.set_worker_num(worker_num);
|
scheduler_recovery_message.set_worker_num(worker_num);
|
||||||
scheduler_recovery_message.set_server_num(server_num);
|
scheduler_recovery_message.set_server_num(server_num);
|
||||||
scheduler_recovery_message.set_rank_id(rank_id);
|
scheduler_recovery_message.set_rank_id(rank_id);
|
||||||
if (!SendMessageSync(client, message_meta, Protos::PROTOBUF, scheduler_recovery_message.SerializeAsString().data(),
|
if (!SendMessageSync(client, message_meta, Protos::PROTOBUF, scheduler_recovery_message.SerializeAsString().data(),
|
||||||
scheduler_recovery_message.ByteSizeLong())) {
|
scheduler_recovery_message.ByteSizeLong())) {
|
||||||
if (kvs.second.node_role_ == NodeRole::WORKER) {
|
if (node_info.node_role_ == NodeRole::WORKER) {
|
||||||
is_worker_timeout_ = true;
|
is_worker_timeout_ = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
MS_LOG(WARNING) << "Scheduler send recovery msg to " << kvs.first << " timeout!";
|
MS_LOG(WARNING) << "Scheduler send recovery msg to " << node_id << " timeout!";
|
||||||
} else {
|
} else {
|
||||||
MS_LOG(INFO) << "Scheduler send recovery msg to " << kvs.first << " successful.";
|
MS_LOG(INFO) << "Scheduler send recovery msg to " << node_id << " successful.";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MS_LOG(INFO) << "Scheduler recovery finish.";
|
MS_LOG(INFO) << "Scheduler recovery finish.";
|
||||||
|
@ -1348,7 +1350,7 @@ void SchedulerNode::BroadcastTimeoutEvent() {
|
||||||
auto initial_node_infos = clusterConfig.initial_registered_nodes_infos;
|
auto initial_node_infos = clusterConfig.initial_registered_nodes_infos;
|
||||||
const uint32_t event = static_cast<uint32_t>(ps::UserDefineEvent::kNodeTimeout);
|
const uint32_t event = static_cast<uint32_t>(ps::UserDefineEvent::kNodeTimeout);
|
||||||
MS_LOG(INFO) << "Broad timeout event:" << event;
|
MS_LOG(INFO) << "Broad timeout event:" << event;
|
||||||
for (const auto kvs : initial_node_infos) {
|
for (const auto &kvs : initial_node_infos) {
|
||||||
auto client = GetOrCreateClient(kvs.second);
|
auto client = GetOrCreateClient(kvs.second);
|
||||||
SendEvent(client, event);
|
SendEvent(client, event);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue