diff --git a/mindspore/ccsrc/ps/constants.h b/mindspore/ccsrc/ps/constants.h index 6e31f0e7211..7f2f304f3dd 100644 --- a/mindspore/ccsrc/ps/constants.h +++ b/mindspore/ccsrc/ps/constants.h @@ -88,6 +88,13 @@ constexpr int64_t kRetryIntervalInMs = 10; constexpr int64_t kThreadNum = 32; +// The timeout period for the scale in node to send the finish message to scheduler. +constexpr uint32_t kScaleInTimeoutInSeconds = 30; +// The number of retries to determine whether all nodes are successfully registered. +constexpr uint32_t kCheckRegisteredRetryCount = 30; +// The timeout interval for judging whether all nodes are successfully registered. +constexpr uint32_t kCheckRegisteredIntervalInMs = 1000; + // The barrier function which should be called before doing scaling out/in operations. // It's easy for us to scale out/in nodes after one iteration is completed and keep consistent. using BarrierBeforeScaleOut = std::function; diff --git a/mindspore/ccsrc/ps/core/abstract_node.cc b/mindspore/ccsrc/ps/core/abstract_node.cc index 7c2be33137f..6d3a3189579 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.cc +++ b/mindspore/ccsrc/ps/core/abstract_node.cc @@ -84,23 +84,27 @@ bool AbstractNode::Broadcast(const enum NodeRole &node_role, const DataPtr &mess } void AbstractNode::set_ready_for_scale_out() { + MS_LOG(INFO) << "[Scale out]: begin to set ready for scale out."; Register(client_to_scheduler_); connected_nodes_.clear(); } void AbstractNode::set_ready_for_scale_in() { + MS_LOG(INFO) << "[Scale in]: begin to set ready for scale in."; if (!is_current_node_scale_in_) { Register(client_to_scheduler_); connected_nodes_.clear(); } else { current_cluster_state_ = ClusterState::CLUSTER_SCALE_IN; node_info_.rank_id_ = UINT32_MAX; - MS_LOG(WARNING) << "Trigger cluster scale in done event."; + SendScaleInFinishMessage(client_to_scheduler_, kScaleInTimeoutInSeconds); + MS_LOG(WARNING) << "[Event]:Trigger cluster scale in done event."; 
OnEventCallback(ClusterEvent::CLUSTER_SCALE_IN_DONE); } } void AbstractNode::set_scale_out_done() { + MS_LOG(INFO) << "[Scale out]: begin to set scale out done."; auto message_meta = std::make_shared(); message_meta->set_cmd(NodeCommand::SCALE_OUT_DONE); @@ -114,10 +118,11 @@ void AbstractNode::set_scale_out_done() { } MS_LOG(INFO) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) - << " the node id:" << node_info_.node_id_ << "is send scale_out_done to scheduler!"; + << " the node id:" << node_info_.node_id_ << "is send scale_out_done to scheduler successful!"; } void AbstractNode::set_scale_in_done() { + MS_LOG(INFO) << "[Scale in]: begin to set scale in done."; auto message_meta = std::make_shared(); message_meta->set_cmd(NodeCommand::SCALE_IN_DONE); @@ -131,7 +136,7 @@ void AbstractNode::set_scale_in_done() { } MS_LOG(INFO) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) - << " the node id:" << node_info_.node_id_ << "is send scale_in_done to scheduler!"; + << " the node id:" << node_info_.node_id_ << "is send scale_in_done to scheduler successful!"; } void AbstractNode::BroadcastEvent(const uint32_t &event) { @@ -564,6 +569,20 @@ void AbstractNode::ProcessEvent(std::shared_ptr conn, std::shared OnCustomEventCallback(event); } +void AbstractNode::SendScaleInFinishMessage(const std::shared_ptr &client, const uint32_t &timeout) { + auto meta = std::make_shared(); + meta->set_cmd(NodeCommand::SCALE_IN_FINISH); + + std::string finish_message = node_info_.node_id_; + + MS_LOG(INFO) << "The scale in node id:" << node_info_.node_id_ << " send a scale in finish message to scheduler."; + + if (!SendMessageSync(client, meta, Protos::RAW, finish_message.data(), finish_message.length(), timeout)) { + MS_LOG(WARNING) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) + << " the node id:" << node_info_.node_id_ << " send scale in finish Message timeout!"; + } +} + void 
AbstractNode::ProcessScaleOut(std::shared_ptr conn, std::shared_ptr meta, const Protos &protos, const void *data, size_t size) { MS_EXCEPTION_IF_NULL(conn); @@ -824,6 +843,7 @@ void AbstractNode::InitCommandHandler() { handlers_[NodeCommand::SCALE_OUT_DONE] = nullptr; handlers_[NodeCommand::SCALE_IN_DONE] = nullptr; handlers_[NodeCommand::SEND_EVENT] = nullptr; + handlers_[NodeCommand::SCALE_IN_FINISH] = nullptr; } void AbstractNode::InitServerHandler() { @@ -855,18 +875,18 @@ void AbstractNode::InitNodeNum() { void AbstractNode::OnEventCallback(const ClusterEvent &event) { if (!event_to_callback_.count(event)) { - MS_LOG(ERROR) << "The event callback of " << event << " is not set."; + MS_LOG(ERROR) << "[Event]:The event callback of " << event << " is not set."; } else { - MS_LOG(INFO) << "Trigger the event:" << event; + MS_LOG(INFO) << "[Event]:Trigger the event:" << event; event_to_callback_[event](); } } void AbstractNode::OnCustomEventCallback(const uint32_t &event) { if (!custom_event_to_callback_.count(event)) { - MS_LOG(WARNING) << "The event callback of " << event << " is not set."; + MS_LOG(WARNING) << "[Custom event]:The event callback of " << event << " is not set."; } else { - MS_LOG(INFO) << "Trigger the event:" << event; + MS_LOG(INFO) << "[Custom event]:Trigger the event:" << event; custom_event_to_callback_[event](); } } diff --git a/mindspore/ccsrc/ps/core/abstract_node.h b/mindspore/ccsrc/ps/core/abstract_node.h index cd96f2b8e5f..20880afc577 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.h +++ b/mindspore/ccsrc/ps/core/abstract_node.h @@ -138,6 +138,10 @@ class AbstractNode : public Node { void ProcessEvent(std::shared_ptr conn, std::shared_ptr meta, const Protos &protos, const void *data, size_t size); + // The scale in node needs to send a scale_in_finish message to the scheduler before trigger the event: + // CLUSTER_SCALE_IN_DONE. 
+ void SendScaleInFinishMessage(const std::shared_ptr &client, const uint32_t &timeout); + void StartHeartbeatTimer(const std::shared_ptr &client); void UpdateSchedulerTime(); bool CheckSchedulerTimeout() const; diff --git a/mindspore/ccsrc/ps/core/node_manager.cc b/mindspore/ccsrc/ps/core/node_manager.cc index 7a8cf7a1273..477cfc84fcc 100644 --- a/mindspore/ccsrc/ps/core/node_manager.cc +++ b/mindspore/ccsrc/ps/core/node_manager.cc @@ -32,8 +32,8 @@ uint32_t NodeManager::NextRankId(const RegisterMessage ®ister_message) { uint32_t rank_id = UINT_MAX; const std::string &node_id = register_message.node_id(); - if (nodes_info_.find(node_id) != nodes_info_.end()) { - rank_id = nodes_info_[node_id].rank_id_; + if (registered_nodes_info_.find(node_id) != registered_nodes_info_.end()) { + rank_id = registered_nodes_info_[node_id].rank_id_; MS_LOG(INFO) << "The node id: " << node_id << " is already assigned!"; return rank_id; } @@ -54,7 +54,7 @@ uint32_t NodeManager::NextRankId(const RegisterMessage ®ister_message) { node_info.rank_id_ = rank_id; node_info.ip_ = ip; node_info.port_ = port; - nodes_info_[node_id] = node_info; + registered_nodes_info_[node_id] = node_info; MS_LOG(INFO) << "The server node id:" << node_id << ",node ip: " << node_info.ip_ << ",node port:" << port << " assign rank id:" << rank_id; } else if (register_message.role() == NodeRole::WORKER) { @@ -72,7 +72,7 @@ uint32_t NodeManager::NextRankId(const RegisterMessage ®ister_message) { node_info.rank_id_ = rank_id; node_info.ip_ = ip; node_info.port_ = port; - nodes_info_[node_id] = node_info; + registered_nodes_info_[node_id] = node_info; MS_LOG(INFO) << "The worker node id:" << node_id << " assign rank id:" << rank_id; } return rank_id; @@ -112,9 +112,9 @@ void NodeManager::UpdateCluster() { timeout_nodes_info_.clear(); for (auto it = heartbeats_.begin(); it != heartbeats_.end(); ++it) { if (it->second.tv_sec + PSContext::instance()->cluster_config().heartbeat_timeout < current_time.tv_sec) { - if 
(nodes_info_.count(it->first)) { + if (registered_nodes_info_.count(it->first)) { MS_LOG(WARNING) << "The node id:" << it->first << " is timeout!"; - timeout_nodes_info_[it->first] = nodes_info_[it->first]; + timeout_nodes_info_[it->first] = registered_nodes_info_[it->first]; } } } @@ -133,12 +133,12 @@ void NodeManager::UpdateCluster() { } void NodeManager::CheckClusterTimeout() { - if (total_node_num_ != SizeToInt(nodes_info_.size())) { + if (total_node_num_ != SizeToInt(registered_nodes_info_.size())) { MS_LOG(WARNING) << "The cluster is not ready after " << PSContext::instance()->cluster_config().cluster_available_timeout << " seconds,so finish the cluster, and change total node number from " << total_node_num_ << " to " - << nodes_info_.size(); - current_node_num_ = nodes_info_.size(); + << registered_nodes_info_.size(); + current_node_num_ = registered_nodes_info_.size(); UpdateClusterState(ClusterState::NODE_TIMEOUT); } } @@ -149,7 +149,7 @@ void NodeManager::AddScaleOutDoneNode(const std::string &node_id) { scale_out_do void NodeManager::AddScaleInDoneNode(const std::string &node_id) { scale_in_done_nodes_id_.insert(node_id); } -bool NodeManager::IsAllNodesRegistered() { return SizeToInt(nodes_info_.size()) == total_node_num_; } +bool NodeManager::IsAllNodesRegistered() { return SizeToInt(registered_nodes_info_.size()) == total_node_num_; } bool NodeManager::IsAllNodesFinished() { return SizeToInt(finish_nodes_id_.size()) == total_node_num_; } @@ -159,6 +159,12 @@ bool NodeManager::IsAllNodesScaleInDone() { return SizeToInt(scale_in_done_nodes std::unordered_map &NodeManager::nodes_info() { return nodes_info_; } +void NodeManager::UpdateNodesInfo() { + MS_LOG(INFO) << "Update nodes info."; + nodes_info_.clear(); + nodes_info_ = registered_nodes_info_; +} + void NodeManager::UpdateNodeState(const NodeState &state) { std::lock_guard lk(node_mutex_); node_state_ = state; @@ -181,7 +187,7 @@ ClusterState NodeManager::GetClusterState() { void 
NodeManager::ResetMetadata() { MS_LOG(WARNING) << "Reset metadata."; - nodes_info_.clear(); + registered_nodes_info_.clear(); heartbeats_.clear(); next_worker_rank_id_ = -1; next_server_rank_id_ = -1; diff --git a/mindspore/ccsrc/ps/core/node_manager.h b/mindspore/ccsrc/ps/core/node_manager.h index e933e60b6f4..4f5f8629825 100644 --- a/mindspore/ccsrc/ps/core/node_manager.h +++ b/mindspore/ccsrc/ps/core/node_manager.h @@ -86,6 +86,8 @@ class NodeManager { bool IsAllNodesScaleInDone(); std::unordered_map &nodes_info(); + // After all the nodes are registered successfully, the nodes info can be updated. + void UpdateNodesInfo(); void set_total_node_num(const int32_t &node_num); const int32_t &total_node_num(); @@ -114,7 +116,10 @@ class NodeManager { std::atomic next_worker_rank_id_; std::atomic next_server_rank_id_; - // worker nodes and server nodes + // Whenever a node is registered, it will be stored in this map. + std::unordered_map registered_nodes_info_; + // When all nodes are registered successfully, then all nodes info will be stored in this map. In other words, the + // nodes_info_ is a snapshot of the registered_nodes_info_. std::unordered_map nodes_info_; std::mutex assign_rank_id_mutex_; std::mutex heartbeat_mutex_; diff --git a/mindspore/ccsrc/ps/core/protos/comm.proto b/mindspore/ccsrc/ps/core/protos/comm.proto index 0156290a22c..e46a6d3cef4 100644 --- a/mindspore/ccsrc/ps/core/protos/comm.proto +++ b/mindspore/ccsrc/ps/core/protos/comm.proto @@ -39,6 +39,8 @@ enum NodeCommand { SCALE_IN_DONE = 11; // This command is used to send user defined event. SEND_EVENT = 12; + // This command is used to send finish message from the scale in node to the scheduler. 
+ SCALE_IN_FINISH = 13; } enum NodeRole { diff --git a/mindspore/ccsrc/ps/core/scheduler_node.cc b/mindspore/ccsrc/ps/core/scheduler_node.cc index 4a909514487..2bbb0ce525a 100644 --- a/mindspore/ccsrc/ps/core/scheduler_node.cc +++ b/mindspore/ccsrc/ps/core/scheduler_node.cc @@ -82,6 +82,7 @@ void SchedulerNode::InitCommandHandler() { handlers_[NodeCommand::SCALE_OUT_DONE] = &SchedulerNode::ProcessScaleOutDone; handlers_[NodeCommand::SCALE_IN_DONE] = &SchedulerNode::ProcessScaleInDone; handlers_[NodeCommand::SEND_EVENT] = &SchedulerNode::ProcessSendEvent; + handlers_[NodeCommand::SCALE_IN_FINISH] = &SchedulerNode::ProcessScaleInFinish; } void SchedulerNode::CreateTcpServer() { @@ -132,6 +133,7 @@ void SchedulerNode::ProcessRegister(std::shared_ptr server, std::shar register_resp_message.ByteSizeLong()); if (node_manager_.IsAllNodesRegistered()) { + node_manager_.UpdateNodesInfo(); is_ready_ = true; auto node_infos = node_manager_.nodes_info(); for (const auto &kvs : node_infos) { @@ -181,6 +183,23 @@ void SchedulerNode::ProcessFetchMetadata(std::shared_ptr server, std: fetch_servers_message.ByteSizeLong()); } +void SchedulerNode::ProcessScaleInFinish(std::shared_ptr server, std::shared_ptr conn, + std::shared_ptr meta, const void *data, size_t size) { + MS_EXCEPTION_IF_NULL(server); + MS_EXCEPTION_IF_NULL(conn); + MS_EXCEPTION_IF_NULL(meta); + MS_EXCEPTION_IF_NULL(data); + auto finish_message = std::make_unique(reinterpret_cast(data), size); + MS_LOG(INFO) << "Process finish message from node id:" << *finish_message; + bool res = CommUtil::Retry([&] { return node_manager_.IsAllNodesRegistered(); }, kCheckRegisteredRetryCount, + kCheckRegisteredIntervalInMs); + if (res == false) { + MS_LOG(ERROR) << "All nodes have not yet fully registered successfully."; + } + MS_LOG(INFO) << "The scheduler response the scale in finish message."; + server->SendMessage(conn, meta, Protos::PROTOBUF, data, size); +} + void SchedulerNode::ProcessScaleOutDone(std::shared_ptr server, 
std::shared_ptr conn, std::shared_ptr meta, const void *data, size_t size) { MS_EXCEPTION_IF_NULL(server); diff --git a/mindspore/ccsrc/ps/core/scheduler_node.h b/mindspore/ccsrc/ps/core/scheduler_node.h index c0510632ae9..f13749c51e9 100644 --- a/mindspore/ccsrc/ps/core/scheduler_node.h +++ b/mindspore/ccsrc/ps/core/scheduler_node.h @@ -78,6 +78,10 @@ class SchedulerNode : public Node { std::shared_ptr meta, const void *data, size_t size); void ProcessFetchMetadata(std::shared_ptr server, std::shared_ptr conn, std::shared_ptr meta, const void *data, size_t size); + // After receiving the scale in node's scale_in_finish message, then wait until all other nodes are registered to the + // scheduler. + void ProcessScaleInFinish(std::shared_ptr server, std::shared_ptr conn, + std::shared_ptr meta, const void *data, size_t size); // Process scale_out_done messages from workers/servers void ProcessScaleOutDone(std::shared_ptr server, std::shared_ptr conn,