!31158 Add registration and heartbeat message from compute graph node to meta server node

Merge pull request !31158 from chengang/add_pb_to_node
This commit is contained in:
i-robot 2022-03-11 06:55:03 +00:00 committed by Gitee
commit 3375003da9
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
8 changed files with 145 additions and 8 deletions

View File

@ -161,6 +161,14 @@ include_directories("${CMAKE_BINARY_DIR}/ps/core")
file(GLOB_RECURSE COMM_PROTO_IN RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ps/core/protos/*.proto")
ms_protobuf_generate(COMM_PROTO_SRCS COMM_PROTO_HDRS ${COMM_PROTO_IN})
list(APPEND MINDSPORE_PROTO_LIST ${COMM_PROTO_SRCS})
# Cluster topology protocol: collect every *.proto file under distributed/cluster/topology/protocol,
# pass them to ms_protobuf_generate (presumably emitting the matching C++ sources/headers -- confirm against
# the macro definition), and append the resulting sources to MINDSPORE_PROTO_LIST.
include_directories("${CMAKE_BINARY_DIR}/distributed/cluster/topology")
file(GLOB_RECURSE DISTRIBUTED_CLUSTER_TOPOLOGY RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
"distributed/cluster/topology/protocol/*.proto")
ms_protobuf_generate(DISTRIBUTED_CLUSTER_TOPOLOGY_SRCS DISTRIBUTED_CLUSTER_TOPOLOGY_HDRS
${DISTRIBUTED_CLUSTER_TOPOLOGY})
list(APPEND MINDSPORE_PROTO_LIST ${DISTRIBUTED_CLUSTER_TOPOLOGY_SRCS})
if(NOT ENABLE_SECURITY)
include_directories("${CMAKE_BINARY_DIR}/profiler/device/ascend")
file(GLOB_RECURSE PROFILER_PROTO_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}

View File

@ -40,6 +40,9 @@ constexpr char kEnvNodeId[] = "MS_NODE_ID";
// For port number conversion.
static const int kDecimal = 10;
// The names of all messages exchanged between compute graph nodes and the meta server node.
// The numeric value of an enumerator is what gets written into the name field of a message, so the values are
// spelled out explicitly.
enum class MessageName {
  kRegistration = 0,
  kHeartbeat = 1,
};
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -16,6 +16,8 @@
#include "utils/log_adapter.h"
#include "distributed/cluster/topology/utils.h"
#include "distributed/cluster/topology/common.h"
#include "proto/topology.pb.h"
#include "distributed/cluster/topology/compute_graph_node.h"
namespace mindspore {
@ -53,10 +55,31 @@ bool ComputeGraphNode::Register() {
const auto &server_url = meta_server_addr_.GetUrl();
RETURN_IF_FALSE_WITH_LOG(tcp_client_->Connect(server_url),
"Failed to connect to the meta server node url: " << server_url);
RegistrationMessage reg_msg;
reg_msg.set_node_id(node_id_);
std::string content = reg_msg.SerializeAsString();
auto message = CreateMessage(server_url, content);
MS_EXCEPTION_IF_NULL(message);
tcp_client_->Send(std::move(message));
return true;
}
bool ComputeGraphNode::Heartbeat() { return true; }
bool ComputeGraphNode::Heartbeat() {
MS_EXCEPTION_IF_NULL(tcp_client_);
HeartbeatMessage hb_msg;
hb_msg.set_node_id(node_id_);
const auto &server_url = meta_server_addr_.GetUrl();
std::string content = hb_msg.SerializeAsString();
auto message = CreateMessage(server_url, content);
MS_EXCEPTION_IF_NULL(message);
tcp_client_->Send(std::move(message));
return true;
}
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -15,6 +15,7 @@
*/
#include <functional>
#include "proto/topology.pb.h"
#include "distributed/cluster/topology/utils.h"
#include "distributed/cluster/topology/meta_server_node.h"
@ -46,14 +47,59 @@ bool MetaServerNode::InitTCPServer() {
MS_EXCEPTION_IF_NULL(tcp_server_);
RETURN_IF_FALSE_WITH_LOG(tcp_server_->Initialize(meta_server_addr_.GetUrl()), "Failed to init the tcp server.");
tcp_server_->SetMessageHandler(std::bind(&MetaServerNode::HandleMessage, this, std::placeholders::_1));
// Configure the message processors for the TCP server.
message_handlers_[MessageName::kRegistration] =
std::bind(&MetaServerNode::ProcessRegister, this, std::placeholders::_1);
message_handlers_[MessageName::kHeartbeat] =
std::bind(&MetaServerNode::ProcessHeartbeat, this, std::placeholders::_1);
return true;
}
void MetaServerNode::HandleMessage(const std::shared_ptr<MessageBase> &message) {}
// Dispatch a message received by the TCP server to the processor registered for its message name.
// The name of the message is decoded as the decimal value of a MessageName enumerator.
// NOTE(review): std::stoi throws if the name is not a number; senders are assumed to always encode the name as
// an integer string -- confirm.
void MetaServerNode::HandleMessage(const std::shared_ptr<MessageBase> &message) {
  MS_EXCEPTION_IF_NULL(message);

  const auto message_name = static_cast<MessageName>(std::stoi(message->Name()));
  const auto handler = message_handlers_.find(message_name);
  if (handler == message_handlers_.end()) {
    // Stop here: looking the name up with operator[] would insert an empty handler and then invoke it.
    MS_LOG(ERROR) << "Unknown message name: " << message->Name();
    return;
  }
  handler->second(message);
}
void MetaServerNode::ProcessRegister() {}
// Process a registration message sent from a compute graph node: decode the RegistrationMessage carried in the
// message body and record the node in nodes_ when its id has not been registered yet; otherwise log an error.
void MetaServerNode::ProcessRegister(const std::shared_ptr<MessageBase> &message) {
MS_EXCEPTION_IF_NULL(message);
void MetaServerNode::ProcessHeartbeat() {}
RegistrationMessage registration;
const std::string &body = message->Body();
// NOTE(review): the result of ParseFromArray is not checked; presumably a malformed body leaves node_id empty and
// the empty id is then registered -- confirm and consider rejecting such messages.
registration.ParseFromArray(body.c_str(), body.length());
// Add the compute graph node into registered nodes.
const auto &node_id = registration.node_id();
if (nodes_.find(node_id) == nodes_.end()) {
// First registration of this id: create a fresh state record for the node.
std::shared_ptr<ComputeGraphNodeState> node_state = std::make_shared<ComputeGraphNodeState>(node_id);
nodes_[node_id] = node_state;
MS_LOG(INFO) << "The new node: " << node_id << " is registered successfully.";
} else {
// A record with the same id already exists; it is left untouched.
MS_LOG(ERROR) << "The node: " << node_id << " have been registered before.";
}
}
void MetaServerNode::ProcessHeartbeat(const std::shared_ptr<MessageBase> &message) {
MS_EXCEPTION_IF_NULL(message);
HeartbeatMessage heartbeat;
const std::string &body = message->Body();
heartbeat.ParseFromArray(body.c_str(), body.length());
// Update the state(timestamp) of this node.
const auto &node_id = heartbeat.node_id();
if (nodes_.find(node_id) == nodes_.end()) {
auto &node = nodes_[node_id];
time(&(node->last_update));
} else {
MS_LOG(ERROR) << "Invalid node: " << node_id << ".";
}
}
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -17,8 +17,10 @@
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
#include <time.h>
#include <string>
#include <memory>
#include <map>
#include "distributed/cluster/topology/common.h"
#include "distributed/rpc/tcp/tcp_server.h"
#include "distributed/cluster/topology/node_base.h"
@ -27,6 +29,16 @@ namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// Record the state of the compute graph node.
struct ComputeGraphNodeState {
  // Create the state record for the node identified by `id`.
  explicit ComputeGraphNodeState(std::string id) : node_id(id) {}

  // The identifier of the compute graph node this record belongs to.
  std::string node_id;
  // The timestamp of last heartbeat.
  // This timestamp is considered the health state of the node.
  // It starts at zero so that it never holds an indeterminate value before the first heartbeat is recorded.
  time_t last_update{0};
};
// The MetaServerNode is a separate process representing the meta server node which stores all the metadata and status
// of computation graph nodes.
class MetaServerNode : public NodeBase {
@ -45,16 +57,22 @@ class MetaServerNode : public NodeBase {
void HandleMessage(const std::shared_ptr<MessageBase> &message);
// Process the received register message sent from compute graph nodes.
void ProcessRegister();
void ProcessRegister(const std::shared_ptr<MessageBase> &message);
// Process the received heartbeat message sent from compute graph nodes.
void ProcessHeartbeat();
void ProcessHeartbeat(const std::shared_ptr<MessageBase> &message);
// The meta server address used to manage the tcp server.
MetaServerAddress meta_server_addr_;
// The TCP server is used to process messages sent from compute graph nodes.
std::unique_ptr<rpc::TCPServer> tcp_server_;
// All the handlers for compute graph node's messages processing.
std::map<MessageName, rpc::MessageHandler> message_handlers_;
// Stores the registered compute graph nodes.
std::map<std::string, std::shared_ptr<ComputeGraphNodeState>> nodes_;
};
} // namespace topology
} // namespace cluster

View File

@ -0,0 +1,26 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package mindspore.distributed.cluster.topology;
// Payload of a registration message.
message RegistrationMessage {
// The identifier of the node that is registering.
string node_id = 1;
}
// Payload of a heartbeat message.
message HeartbeatMessage {
// The identifier of the node that is reporting the heartbeat.
string node_id = 1;
}

View File

@ -17,8 +17,11 @@
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
#include <string>
#include <memory>
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
#include "actor/msg.h"
#include "distributed/cluster/topology/common.h"
namespace mindspore {
@ -53,6 +56,16 @@ static bool FillMetaServerAddress(struct MetaServerAddress *address) {
address->port = port;
return true;
}
__attribute__((unused)) static std::unique_ptr<MessageBase> CreateMessage(const std::string &dest_url,
const std::string &content) {
std::unique_ptr<MessageBase> message = std::make_unique<MessageBase>();
message->name = std::to_string(static_cast<int>(MessageName::kRegistration));
message->from = AID("", "");
message->to = AID("", dest_url);
message->body = content;
return message;
}
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -40,10 +40,10 @@ TEST_F(TestDynamicNetworking, NodeRegister) {
common::SetEnv(kEnvMetaServerHost, server_host.c_str());
common::SetEnv(kEnvMetaServerPort, server_port.c_str());
MetaServerNode msn("master");
MetaServerNode msn("meta_server_node");
ASSERT_TRUE(msn.Initialize());
ComputeGraphNode cgn("worker");
ComputeGraphNode cgn("compute_graph_node");
ASSERT_TRUE(cgn.Initialize());
cgn.Finalize();