!32327 Add message sending API from compute graph node to meta server node

Merge pull request !32327 from chengang/add_sendmsg_api
This commit is contained in:
i-robot 2022-03-31 06:18:34 +00:00 committed by Gitee
commit 8b13840742
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 120 additions and 14 deletions

View File

@ -102,6 +102,20 @@ bool ComputeGraphNode::Heartbeat() {
tcp_client_->SendSync(std::move(message));
return true;
}
bool ComputeGraphNode::SendMessageToMSN(const std::string msg_name, const std::string &msg_body) {
MS_EXCEPTION_IF_NULL(tcp_client_);
auto message = CreateMessage(meta_server_addr_.GetUrl(), msg_name, msg_body);
MS_EXCEPTION_IF_NULL(message);
auto retval = tcp_client_->SendSync(std::move(message));
if (retval > 0) {
return true;
} else {
return false;
}
}
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -36,6 +36,9 @@ class ComputeGraphNode : public NodeBase {
bool Initialize() override;
bool Finalize() override;
// Send the specified message to the meta server node.
bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body);
private:
// Send the register message to the meta server node when this node process startup.
bool Register();

View File

@ -15,6 +15,8 @@
*/
#include <functional>
#include <algorithm>
#include <string>
#include "proto/topology.pb.h"
#include "distributed/cluster/topology/utils.h"
#include "distributed/cluster/topology/meta_server_node.h"
@ -58,23 +60,38 @@ bool MetaServerNode::InitTCPServer() {
tcp_server_->SetMessageHandler(std::bind(&MetaServerNode::HandleMessage, this, std::placeholders::_1));
// Configure the message processors for the TCP server.
message_handlers_[MessageName::kRegistration] =
system_msg_handlers_[MessageName::kRegistration] =
std::bind(&MetaServerNode::ProcessRegister, this, std::placeholders::_1);
message_handlers_[MessageName::kUnregistration] =
system_msg_handlers_[MessageName::kUnregistration] =
std::bind(&MetaServerNode::ProcessUnregister, this, std::placeholders::_1);
message_handlers_[MessageName::kHeartbeat] =
system_msg_handlers_[MessageName::kHeartbeat] =
std::bind(&MetaServerNode::ProcessHeartbeat, this, std::placeholders::_1);
return true;
}
void MetaServerNode::HandleMessage(const std::shared_ptr<MessageBase> &message) {
MS_EXCEPTION_IF_NULL(message);
const auto &message_name = static_cast<MessageName>(std::stoi(message->Name()));
const auto &handler = message_handlers_.find(message_name);
if (handler == message_handlers_.end()) {
MS_LOG(ERROR) << "Unknown message name: " << message->Name();
const auto &name = message->Name();
// Handle system messages.
if (std::all_of(name.begin(), name.end(), ::isdigit)) {
const auto &message_name = static_cast<MessageName>(std::stoi(message->Name()));
const auto &handler = system_msg_handlers_.find(message_name);
if (handler == system_msg_handlers_.end()) {
MS_LOG(ERROR) << "Unknown system message name: " << message->Name();
return;
}
system_msg_handlers_[message_name](message);
// Handle user defined messages.
} else {
const auto &handler = message_handlers_.find(name);
if (handler == message_handlers_.end()) {
MS_LOG(ERROR) << "Unknown message name: " << name;
return;
}
(*message_handlers_[name])(message->Body());
}
message_handlers_[message_name](message);
}
void MetaServerNode::ProcessRegister(const std::shared_ptr<MessageBase> &message) {
@ -161,6 +178,16 @@ size_t MetaServerNode::GetAliveNodeNum() {
std::shared_lock<std::shared_mutex> lock(nodes_mutex_);
return nodes_.size();
}
bool MetaServerNode::RegisterMessageHandler(const std::string &name,
std::shared_ptr<std::function<void(const std::string &)>> handler) {
if (message_handlers_.find(name) != message_handlers_.end()) {
MS_LOG(ERROR) << "The message name: " << name << " have already been registered";
return false;
}
message_handlers_[name] = handler;
return true;
}
} // namespace topology
} // namespace cluster
} // namespace distributed

View File

@ -62,14 +62,18 @@ class MetaServerNode : public NodeBase {
: NodeBase(node_id), total_node_num_(node_num), topo_state_(TopoState::kInitializing), enable_monitor_(true) {}
~MetaServerNode() override = default;
bool Initialize() override;
bool Finalize() override;
// Get the current topology state.
TopoState TopologyState();
// Get the number of alive compute graph node.
size_t GetAliveNodeNum();
bool Initialize() override;
bool Finalize() override;
// Register the message handler for the user defined message which is specified by the `name` parameter.
bool RegisterMessageHandler(const std::string &name,
std::shared_ptr<std::function<void(const std::string &)>> handler);
private:
// Create and init the tcp server.
@ -96,8 +100,13 @@ class MetaServerNode : public NodeBase {
// The TCP server is used to process messages sent from compute graph nodes.
std::unique_ptr<rpc::TCPServer> tcp_server_;
// All the handlers for compute graph node's messages processing.
std::map<MessageName, rpc::MessageHandler> message_handlers_;
// All the handlers for compute graph node's system messages processing.
// The `system` means the built-in messages used for cluster topology construction.
std::map<MessageName, rpc::MessageHandler> system_msg_handlers_;
// All the handlers for compute graph node's user-defined messages processing.
// The `user-defined` means that this kind of message is user defined and has customized message handler.
std::map<std::string, std::shared_ptr<std::function<void(const std::string &)>>> message_handlers_;
// Stores the registered compute graph nodes.
std::map<std::string, std::shared_ptr<ComputeGraphNodeState>> nodes_;

View File

@ -59,16 +59,22 @@ static bool FillMetaServerAddress(struct MetaServerAddress *address) {
}
__attribute__((unused)) static std::unique_ptr<MessageBase> CreateMessage(const std::string &dest_url,
const MessageName &name,
const std::string &name,
const std::string &content) {
std::unique_ptr<MessageBase> message = std::make_unique<MessageBase>();
message->name = std::to_string(static_cast<int>(name));
message->name = name;
message->from = AID("", "");
message->to = AID("", dest_url);
message->body = content;
return message;
}
__attribute__((unused)) static std::unique_ptr<MessageBase> CreateMessage(const std::string &dest_url,
const MessageName &name,
const std::string &content) {
return CreateMessage(dest_url, std::to_string(static_cast<int>(name)), content);
}
__attribute__((unused)) static std::chrono::high_resolution_clock::time_point Now() {
return std::chrono::high_resolution_clock::now();
}

View File

@ -67,6 +67,53 @@ TEST_F(TestDynamicNetworking, NodeRegister) {
msn.Finalize();
}
/// Feature: test sending message through compute graph node to meta server node.
/// Description: send a special kind of message to msn and register the corresponding message handler.
/// Expectation: the registered handler received the sent message successfully.
TEST_F(TestDynamicNetworking, AddMessageHandler) {
std::string server_host = "127.0.0.1";
std::string server_port = "8090";
common::SetEnv(kEnvMetaServerHost, server_host.c_str());
common::SetEnv(kEnvMetaServerPort, server_port.c_str());
size_t total_node_num = 1;
MetaServerNode msn("meta_server_node", total_node_num);
ASSERT_TRUE(msn.Initialize());
std::string message_name = "route";
static std::string received_message;
auto func = std::make_shared<std::function<void(const std::string &)>>(
[](const std::string &message) { received_message = message; });
msn.RegisterMessageHandler(message_name, func);
ComputeGraphNode cgn("compute_graph_node");
ASSERT_TRUE(cgn.Initialize());
size_t interval = 1;
size_t retry = 30;
while (((msn.GetAliveNodeNum() != total_node_num) || (msn.TopologyState() != TopoState::kInitialized)) &&
(retry-- > 0)) {
sleep(interval);
}
ASSERT_EQ(total_node_num, msn.GetAliveNodeNum());
ASSERT_EQ(TopoState::kInitialized, msn.TopologyState());
std::string message_body = "127.0.0.1:8080";
ASSERT_TRUE(cgn.SendMessageToMSN(message_name, message_body));
cgn.Finalize();
retry = 30;
while (msn.GetAliveNodeNum() > 0 && retry-- > 0) {
sleep(interval);
}
ASSERT_EQ(0, msn.GetAliveNodeNum());
ASSERT_EQ(message_body, received_message);
msn.Finalize();
}
} // namespace topology
} // namespace cluster
} // namespace distributed