!18705 added recovery check worker and server0

Merge pull request !18705 from anancds/server
This commit is contained in:
i-robot 2021-06-23 01:32:02 +00:00 committed by Gitee
commit ae2206db87
6 changed files with 48 additions and 3 deletions

View File

@ -456,7 +456,23 @@ void AbstractNode::ProcessHeartbeatResp(std::shared_ptr<MessageMeta> meta, const
heartbeat_resp_message.ParseFromArray(data, size);
current_cluster_state_ = heartbeat_resp_message.cluster_state();
MS_LOG(DEBUG) << "The current cluster state from heartbeat:" << current_cluster_state_;
MS_LOG(DEBUG) << "The current cluster state from heartbeat:"
<< CommUtil::ClusterStateToString(current_cluster_state_);
all_nodes_info_.clear();
for (const auto &it : heartbeat_resp_message.servers_meta()) {
NodeInfo info;
info.ip_ = it.ip();
info.node_id_ = it.node_id();
info.port_ = it.port();
info.node_role_ = it.role();
info.rank_id_ = it.rank_id();
info.is_alive = it.is_alive();
all_nodes_info_[info.node_id_] = info;
MS_LOG(DEBUG) << "The node id:" << info.node_id_ << ", the rank id:" << info.rank_id_
<< ", the node role:" << CommUtil::NodeRoleToString(info.node_role_) << " is alive:" << info.is_alive;
}
if (current_cluster_state_ == ClusterState::CLUSTER_READY) {
is_ready_ = true;

View File

@ -30,6 +30,7 @@
#include "utils/ms_exception.h"
#include "ps/constants.h"
#include "ps/core/recovery_base.h"
#include "ps/core/node_info.h"
namespace mindspore {
namespace ps {
@ -245,6 +246,9 @@ class AbstractNode : public Node {
std::string scheduler_ip_;
// The port of scheduler.
uint16_t scheduler_port_;
// Synchronize all node metadata from the scheduler.
std::unordered_map<std::string, NodeInfo> all_nodes_info_;
};
} // namespace core
} // namespace ps

View File

@ -130,6 +130,21 @@ std::vector<ServersMeta> NodeManager::FetchServersMeta() {
return servers_meta_list;
}
std::vector<ServersMeta> NodeManager::FetchAllNodesMeta() {
std::vector<ServersMeta> servers_meta_list;
for (auto it = registered_nodes_info_.begin(); it != registered_nodes_info_.end(); ++it) {
ServersMeta servers_meta;
servers_meta.set_rank_id(it->second.rank_id_);
servers_meta.set_ip(it->second.ip_);
servers_meta.set_port(it->second.port_);
servers_meta.set_is_alive(it->second.is_alive);
servers_meta.set_role(it->second.node_role_);
servers_meta.set_node_id(it->second.node_id_);
servers_meta_list.push_back(servers_meta);
}
return servers_meta_list;
}
void NodeManager::UpdateCluster() {
// 1. update cluster timeout state
struct timeval current_time {};

View File

@ -61,6 +61,9 @@ class NodeManager {
void UpdateHeartbeat(const std::string &node_id);
std::vector<ServersMeta> FetchServersMeta();
// Fetch metadata information of all nodes.
std::vector<ServersMeta> FetchAllNodesMeta();
void UpdateCluster();
void CheckClusterTimeout();
void AddFinishNode(const std::string &finish_message);

View File

@ -97,6 +97,7 @@ enum ClusterState {
message HeartbeatRespMessage {
ClusterState cluster_state = 1;
repeated ServersMeta servers_meta = 2;
}
message FetchServersMessage {
@ -111,7 +112,9 @@ message ServersMeta {
uint32 rank_id = 1;
string ip = 2;
int32 port = 3;
bool is_alive = 4;
NodeRole role = 5;
string node_id = 6;
}
message SendMetadataMessage {

View File

@ -56,9 +56,13 @@ void SchedulerNode::ProcessHeartbeat(std::shared_ptr<TcpServer> server, std::sha
HeartbeatRespMessage heartbeat_resp_message;
MS_LOG(DEBUG) << "The cluster state:" << node_manager_.GetClusterState();
MS_LOG(DEBUG) << "The cluster state:" << CommUtil::ClusterStateToString(node_manager_.GetClusterState());
heartbeat_resp_message.set_cluster_state(node_manager_.GetClusterState());
std::vector<ServersMeta> servers_meta_list = node_manager_.FetchAllNodesMeta();
*heartbeat_resp_message.mutable_servers_meta() = {servers_meta_list.begin(), servers_meta_list.end()};
server->SendMessage(conn, meta, Protos::PROTOBUF, heartbeat_resp_message.SerializeAsString().data(),
heartbeat_resp_message.ByteSizeLong());
}