From 5fe6d42fe108a4fa2972e74794a153970d5836e1 Mon Sep 17 00:00:00 2001 From: Parallels Date: Thu, 10 Mar 2022 00:20:59 +0800 Subject: [PATCH] Add registration and heartbeat message from compute graph node to meta server node --- mindspore/ccsrc/CMakeLists.txt | 8 +++ .../distributed/cluster/topology/common.h | 3 ++ .../cluster/topology/compute_graph_node.cc | 25 ++++++++- .../cluster/topology/meta_server_node.cc | 52 +++++++++++++++++-- .../cluster/topology/meta_server_node.h | 22 +++++++- .../cluster/topology/protocol/topology.proto | 26 ++++++++++ .../distributed/cluster/topology/utils.h | 13 +++++ .../topology/test_dynamic_networking.cc | 4 +- 8 files changed, 145 insertions(+), 8 deletions(-) create mode 100644 mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto diff --git a/mindspore/ccsrc/CMakeLists.txt b/mindspore/ccsrc/CMakeLists.txt index ccb35d507df..f83f0cbee21 100644 --- a/mindspore/ccsrc/CMakeLists.txt +++ b/mindspore/ccsrc/CMakeLists.txt @@ -161,6 +161,14 @@ include_directories("${CMAKE_BINARY_DIR}/ps/core") file(GLOB_RECURSE COMM_PROTO_IN RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ps/core/protos/*.proto") ms_protobuf_generate(COMM_PROTO_SRCS COMM_PROTO_HDRS ${COMM_PROTO_IN}) list(APPEND MINDSPORE_PROTO_LIST ${COMM_PROTO_SRCS}) + +include_directories("${CMAKE_BINARY_DIR}/distributed/cluster/topology") +file(GLOB_RECURSE DISTRIBUTED_CLUSTER_TOPOLOGY RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + "distributed/cluster/topology/protocol/*.proto") +ms_protobuf_generate(DISTRIBUTED_CLUSTER_TOPOLOGY_SRCS DISTRIBUTED_CLUSTER_TOPOLOGY_HDRS + ${DISTRIBUTED_CLUSTER_TOPOLOGY}) +list(APPEND MINDSPORE_PROTO_LIST ${DISTRIBUTED_CLUSTER_TOPOLOGY_SRCS}) + if(NOT ENABLE_SECURITY) include_directories("${CMAKE_BINARY_DIR}/profiler/device/ascend") file(GLOB_RECURSE PROFILER_PROTO_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} diff --git a/mindspore/ccsrc/distributed/cluster/topology/common.h b/mindspore/ccsrc/distributed/cluster/topology/common.h index 
2139a4f0750..e539199e47b 100644
--- a/mindspore/ccsrc/distributed/cluster/topology/common.h
+++ b/mindspore/ccsrc/distributed/cluster/topology/common.h
@@ -40,6 +40,10 @@ constexpr char kEnvNodeId[] = "MS_NODE_ID";
 
 // For port number conversion.
 static const int kDecimal = 10;
+
+// All kinds of messages sent between compute graph nodes and meta server node.
+// The numeric value of an enumerator is what travels as the message name, so keep the existing order stable.
+enum class MessageName { kRegistration, kHeartbeat };
 }  // namespace topology
 }  // namespace cluster
 }  // namespace distributed
diff --git a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc
index 4433313d3c2..9e23075ff54 100644
--- a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc
+++ b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc
@@ -16,6 +16,8 @@
 
 #include "utils/log_adapter.h"
 #include "distributed/cluster/topology/utils.h"
+#include "distributed/cluster/topology/common.h"
+#include "proto/topology.pb.h"
 #include "distributed/cluster/topology/compute_graph_node.h"
 
 namespace mindspore {
@@ -53,10 +55,36 @@ bool ComputeGraphNode::Register() {
   const auto &server_url = meta_server_addr_.GetUrl();
   RETURN_IF_FALSE_WITH_LOG(tcp_client_->Connect(server_url),
                            "Failed to connect to the meta server node url: " << server_url);
+  // Announce this node's id to the meta server node.
+  RegistrationMessage reg_msg;
+  reg_msg.set_node_id(node_id_);
+
+  std::string content = reg_msg.SerializeAsString();
+  auto message = CreateMessage(server_url, content);
+  MS_EXCEPTION_IF_NULL(message);
+
+  tcp_client_->Send(std::move(message));
   return true;
 }
 
-bool ComputeGraphNode::Heartbeat() { return true; }
+// Send a heartbeat message carrying this node's id to the meta server node. Returns true after calling Send.
+bool ComputeGraphNode::Heartbeat() {
+  MS_EXCEPTION_IF_NULL(tcp_client_);
+
+  HeartbeatMessage hb_msg;
+  hb_msg.set_node_id(node_id_);
+
+  const auto &server_url = meta_server_addr_.GetUrl();
+  std::string content = hb_msg.SerializeAsString();
+  auto message = CreateMessage(server_url, content);
+  MS_EXCEPTION_IF_NULL(message);
+  // The CreateMessage call above names the message kRegistration; rename it so the meta server node routes it
+  // to the heartbeat handler rather than the registration handler.
+  message->name = std::to_string(static_cast<int>(MessageName::kHeartbeat));
+
+  tcp_client_->Send(std::move(message));
+  return true;
+}
 }  // namespace topology
 }  // namespace cluster
 }  // namespace distributed
diff --git a/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.cc b/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.cc
index 73f267354e1..77c17fa9728 100644
--- a/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.cc
+++ b/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.cc
@@ -15,6 +15,7 @@
  */
 
 #include
+#include "proto/topology.pb.h"
 #include "distributed/cluster/topology/utils.h"
 #include "distributed/cluster/topology/meta_server_node.h"
 
@@ -46,14 +47,73 @@ bool MetaServerNode::InitTCPServer() {
   MS_EXCEPTION_IF_NULL(tcp_server_);
   RETURN_IF_FALSE_WITH_LOG(tcp_server_->Initialize(meta_server_addr_.GetUrl()), "Failed to init the tcp server.");
   tcp_server_->SetMessageHandler(std::bind(&MetaServerNode::HandleMessage, this, std::placeholders::_1));
+
+  // Configure the message processors for the TCP server.
+  message_handlers_[MessageName::kRegistration] =
+    std::bind(&MetaServerNode::ProcessRegister, this, std::placeholders::_1);
+  message_handlers_[MessageName::kHeartbeat] =
+    std::bind(&MetaServerNode::ProcessHeartbeat, this, std::placeholders::_1);
   return true;
 }
 
-void MetaServerNode::HandleMessage(const std::shared_ptr &message) {}
+// Dispatch a received message to the handler registered in message_handlers_ for its message name.
+void MetaServerNode::HandleMessage(const std::shared_ptr &message) {
+  MS_EXCEPTION_IF_NULL(message);
+  const auto message_name = static_cast<MessageName>(std::stoi(message->Name()));
+  const auto handler = message_handlers_.find(message_name);
+  if (handler == message_handlers_.end()) {
+    // Drop the message: indexing the map with an unknown name would default-insert an entry and then call it.
+    MS_LOG(ERROR) << "Unknown message name: " << message->Name();
+    return;
+  }
+  handler->second(message);
+}
 
-void MetaServerNode::ProcessRegister() {}
+// Handle a registration message: record the sending compute graph node in nodes_, keyed by its node id.
+void MetaServerNode::ProcessRegister(const std::shared_ptr &message) {
+  MS_EXCEPTION_IF_NULL(message);
 
-void MetaServerNode::ProcessHeartbeat() {}
+  RegistrationMessage registration;
+  const std::string &body = message->Body();
+  if (!registration.ParseFromArray(body.c_str(), body.length())) {
+    MS_LOG(ERROR) << "Failed to parse the registration message.";
+    return;
+  }
+
+  // Add the compute graph node into registered nodes.
+  const auto &node_id = registration.node_id();
+  if (nodes_.find(node_id) != nodes_.end()) {
+    MS_LOG(ERROR) << "The node: " << node_id << " have been registered before.";
+    return;
+  }
+  auto node_state = std::make_shared<ComputeGraphNodeState>(node_id);
+  // Stamp the registration time so that last_update is never read before it has been set.
+  time(&(node_state->last_update));
+  nodes_[node_id] = node_state;
+  MS_LOG(INFO) << "The new node: " << node_id << " is registered successfully.";
+}
+
+// Handle a heartbeat message: refresh last_update of the registered node that sent it.
+void MetaServerNode::ProcessHeartbeat(const std::shared_ptr &message) {
+  MS_EXCEPTION_IF_NULL(message);
+
+  HeartbeatMessage heartbeat;
+  const std::string &body = message->Body();
+  if (!heartbeat.ParseFromArray(body.c_str(), body.length())) {
+    MS_LOG(ERROR) << "Failed to parse the heartbeat message.";
+    return;
+  }
+
+  // Update the state(timestamp) of this node.
+  const auto &node_id = heartbeat.node_id();
+  const auto iter = nodes_.find(node_id);
+  if (iter == nodes_.end()) {
+    // Only registered nodes have a state to refresh; looking the id up with operator[] would insert a null entry.
+    MS_LOG(ERROR) << "Invalid node: " << node_id << ".";
+    return;
+  }
+  time(&(iter->second->last_update));
+}
 }  // namespace topology
 }  // namespace cluster
 }  // namespace distributed
diff --git a/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.h b/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.h
index 11d23bc257d..aee1008f66b 100644
--- a/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.h
+++ b/mindspore/ccsrc/distributed/cluster/topology/meta_server_node.h
@@ -17,8 +17,10 @@
 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
 
+#include
 #include
 #include
+#include
 #include "distributed/cluster/topology/common.h"
 #include "distributed/rpc/tcp/tcp_server.h"
 #include "distributed/cluster/topology/node_base.h"
@@ -27,6 +29,16 @@ namespace mindspore {
 namespace distributed {
 namespace cluster {
 namespace topology {
+// Record the state of the compute graph node.
+struct ComputeGraphNodeState {
+  explicit ComputeGraphNodeState(std::string id) : node_id(id) {}
+  std::string node_id;
+
+  // The timestamp of last heartbeat, considered the health state of the node.
+  // Zero-initialized so that it never holds an indeterminate value before the first update.
+  time_t last_update{0};
+};
+
 // The MetaServerNode is a separate process representing the meta server node which stores all the metadata and status
 // of computation graph nodes.
 class MetaServerNode : public NodeBase {
@@ -45,16 +57,22 @@ class MetaServerNode : public NodeBase {
   void HandleMessage(const std::shared_ptr &message);
 
   // Process the received register message sent from compute graph nodes.
-  void ProcessRegister();
+  void ProcessRegister(const std::shared_ptr &message);
 
   // Process the received heartbeat message sent from compute graph nodes.
-  void ProcessHeartbeat();
+  void ProcessHeartbeat(const std::shared_ptr &message);
 
   // The meta server address used to manage the tcp server.
   MetaServerAddress meta_server_addr_;
 
   // The TCP server is used to process messages sent from compute graph nodes.
   std::unique_ptr tcp_server_;
+
+  // Maps each message name to the member function that processes that kind of message.
+  std::map message_handlers_;
+
+  // The registered compute graph nodes, keyed by node id.
+  std::map<std::string, std::shared_ptr<ComputeGraphNodeState>> nodes_;
 };
 }  // namespace topology
 }  // namespace cluster
diff --git a/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto b/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto
new file mode 100644
index 00000000000..c4338d5941a
--- /dev/null
+++ b/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package mindspore.distributed.cluster.topology;
+
+message RegistrationMessage {
+  string node_id = 1;
+}
+
+message HeartbeatMessage {
+  string node_id = 1;
+}
diff --git a/mindspore/ccsrc/distributed/cluster/topology/utils.h b/mindspore/ccsrc/distributed/cluster/topology/utils.h
index 0ea35aba705..ee1e5b9c625 100644
--- a/mindspore/ccsrc/distributed/cluster/topology/utils.h
+++ b/mindspore/ccsrc/distributed/cluster/topology/utils.h
@@ -17,8 +17,11 @@
 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
 #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
 
+#include
+#include
 #include "utils/log_adapter.h"
 #include "utils/ms_utils.h"
+#include "actor/msg.h"
 #include "distributed/cluster/topology/common.h"
 
 namespace mindspore {
@@ -53,6 +56,20 @@ static bool FillMetaServerAddress(struct MetaServerAddress *address) {
   address->port = port;
   return true;
 }
+
+// Build a message addressed to `dest_url` whose body is `content`.
+// `name` is written into the message name as its numeric value. It defaults to kRegistration, the value this
+// helper used before the parameter existed, so two-argument calls keep their behavior.
+__attribute__((unused)) static std::unique_ptr CreateMessage(const std::string &dest_url,
+                                                             const std::string &content,
+                                                             MessageName name = MessageName::kRegistration) {
+  std::unique_ptr message = std::make_unique();
+  message->name = std::to_string(static_cast<int>(name));
+  message->from = AID("", "");
+  message->to = AID("", dest_url);
+  message->body = content;
+  return message;
+}
 }  // namespace topology
 }  // namespace cluster
 }  // namespace distributed
diff --git a/tests/ut/cpp/distributed/cluster/topology/test_dynamic_networking.cc b/tests/ut/cpp/distributed/cluster/topology/test_dynamic_networking.cc
index 390e17e870d..45491972a4d 100644
--- 
a/tests/ut/cpp/distributed/cluster/topology/test_dynamic_networking.cc +++ b/tests/ut/cpp/distributed/cluster/topology/test_dynamic_networking.cc @@ -40,10 +40,10 @@ TEST_F(TestDynamicNetworking, NodeRegister) { common::SetEnv(kEnvMetaServerHost, server_host.c_str()); common::SetEnv(kEnvMetaServerPort, server_port.c_str()); - MetaServerNode msn("master"); + MetaServerNode msn("meta_server_node"); ASSERT_TRUE(msn.Initialize()); - ComputeGraphNode cgn("worker"); + ComputeGraphNode cgn("compute_graph_node"); ASSERT_TRUE(cgn.Initialize()); cgn.Finalize();