Add TCP communication to the compute graph nodes and the meta server node

This commit is contained in:
Parallels 2022-03-08 16:45:18 +08:00
parent 9ef7750d3f
commit e2125108b8
10 changed files with 337 additions and 6 deletions

View File

@ -8,12 +8,14 @@ else()
endif()
if(NOT ENABLE_CPU OR WIN32 OR APPLE)
set(EXCLUDE_DIR "rpc/")
foreach(RPC_FILE ${_DISTRIBUTED_SRC_FILES})
string(FIND ${RPC_FILE} ${EXCLUDE_DIR} FOUND)
if(${FOUND} EQUAL 0)
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES ${RPC_FILE})
endif()
set(EXCLUDE_DIRS "rpc/" "cluster/topology/")
foreach(EXCLUDE_DIR ${EXCLUDE_DIRS})
foreach(EXCLUDE_FILE ${_DISTRIBUTED_SRC_FILES})
string(FIND ${EXCLUDE_FILE} ${EXCLUDE_DIR} FOUND)
if(${FOUND} EQUAL 0)
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES ${EXCLUDE_FILE})
endif()
endforeach()
endforeach()
endif()

View File

@ -0,0 +1,47 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMMON_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMMON_H_
#include <string>
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// The address of the meta server node. Compute graph nodes use it to register themselves and to get the addresses of
// other compute graph nodes dynamically.
struct MetaServerAddress {
  // Returns the address formatted as "<ip>:<port>". Declared const so it can be called on const addresses.
  std::string GetUrl() const { return ip + ":" + std::to_string(port); }

  // Host part of the address.
  std::string ip;
  // Port part of the address. Zero-initialized so a default-constructed address never holds an indeterminate value.
  int port{0};
};
// Names of the environment variables through which the meta server address (host and port) and the node id are set
// or obtained.
constexpr char kEnvMetaServerHost[] = "MS_SCHED_HOST";
constexpr char kEnvMetaServerPort[] = "MS_SCHED_PORT";
constexpr char kEnvNodeId[] = "MS_NODE_ID";

// Numeric base used when converting the port string into a number.
constexpr int kDecimal = 10;
} // namespace topology
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMMON_H_

View File

@ -0,0 +1,63 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/log_adapter.h"
#include "distributed/cluster/topology/utils.h"
#include "distributed/cluster/topology/compute_graph_node.h"
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// Prepares this compute graph node in three steps: fill in the meta server address, create and initialize the TCP
// client, and register to the meta server node. Returns true when the end of the function is reached.
// NOTE(review): RETURN_IF_FALSE_WITH_LOG is defined outside this file; it presumably logs the given message and
// returns false when its first argument is false -- confirm against the macro definition.
bool ComputeGraphNode::Initialize() {
// Init the address of meta server node.
RETURN_IF_FALSE_WITH_LOG(FillMetaServerAddress(&meta_server_addr_),
"Failed to init the address of meta server node.");
// Init the TCP client. The client is stored in the member before it is initialized, so it stays set even when one
// of the following steps does not succeed.
tcp_client_ = std::make_unique<rpc::TCPClient>();
MS_EXCEPTION_IF_NULL(tcp_client_);
RETURN_IF_FALSE_WITH_LOG(tcp_client_->Initialize(), "Failed to create the TCP client.");
// Register itself to meta server node.
RETURN_IF_FALSE_WITH_LOG(Register(), "Failed to register to the meta server node.");
return true;
}
bool ComputeGraphNode::Finalize() {
// Release the TCP client.
if (tcp_client_ != nullptr) {
const auto &server_url = meta_server_addr_.GetUrl();
tcp_client_->Disconnect(server_url);
tcp_client_->Finalize();
tcp_client_.reset();
}
return true;
}
bool ComputeGraphNode::Register() {
MS_EXCEPTION_IF_NULL(tcp_client_);
const auto &server_url = meta_server_addr_.GetUrl();
RETURN_IF_FALSE_WITH_LOG(tcp_client_->Connect(server_url),
"Failed to connect to the meta server node url: " << server_url);
return true;
}
// Placeholder for sending the heartbeat message: nothing is sent here and the call always returns true.
bool ComputeGraphNode::Heartbeat() { return true; }
} // namespace topology
} // namespace cluster
} // namespace distributed
} // namespace mindspore

View File

@ -18,6 +18,9 @@
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_COMPUTE_GRAPH_NODE_H_
#include <string>
#include <memory>
#include "distributed/cluster/topology/common.h"
#include "distributed/rpc/tcp/tcp_client.h"
#include "distributed/cluster/topology/node_base.h"
namespace mindspore {
@ -30,9 +33,21 @@ class ComputeGraphNode : public NodeBase {
explicit ComputeGraphNode(const std::string &node_id) : NodeBase(node_id) {}
~ComputeGraphNode() override = default;
bool Initialize() override;
bool Finalize() override;
private:
// Send the register message to the meta server node when this node process starts up.
bool Register();
// Send the heartbeat message to the meta server node.
bool Heartbeat();
// The meta server address used to synchronize metadata with other compute graph nodes.
MetaServerAddress meta_server_addr_;
// The TCP client is used to send messages to meta server node.
std::unique_ptr<rpc::TCPClient> tcp_client_;
};
} // namespace topology
} // namespace cluster

View File

@ -0,0 +1,60 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <functional>
#include "distributed/cluster/topology/utils.h"
#include "distributed/cluster/topology/meta_server_node.h"
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// Prepares this meta server node in two steps: fill in the meta server address, then create and initialize the TCP
// server on it. Returns true when the end of the function is reached.
// NOTE(review): RETURN_IF_FALSE_WITH_LOG is defined outside this file; it presumably logs the given message and
// returns false when its first argument is false -- confirm against the macro definition.
bool MetaServerNode::Initialize() {
// Init the address of meta server node.
RETURN_IF_FALSE_WITH_LOG(FillMetaServerAddress(&meta_server_addr_),
"Failed to init the address of meta server node.");
// Init the TCP server.
RETURN_IF_FALSE_WITH_LOG(InitTCPServer(), "Failed to create the TCP server.");
return true;
}
// Releases the TCP server held by this node, if there is one. Always returns true.
bool MetaServerNode::Finalize() {
  // Nothing to release when no TCP server is held.
  if (tcp_server_ == nullptr) {
    return true;
  }

  // Finalize the server before freeing it.
  tcp_server_->Finalize();
  tcp_server_.reset();
  return true;
}
bool MetaServerNode::InitTCPServer() {
tcp_server_ = std::make_unique<rpc::TCPServer>();
MS_EXCEPTION_IF_NULL(tcp_server_);
RETURN_IF_FALSE_WITH_LOG(tcp_server_->Initialize(meta_server_addr_.GetUrl()), "Failed to init the tcp server.");
tcp_server_->SetMessageHandler(std::bind(&MetaServerNode::HandleMessage, this, std::placeholders::_1));
return true;
}
// Handler installed on the TCP server in InitTCPServer(). Currently an empty placeholder: the message is ignored.
void MetaServerNode::HandleMessage(const std::shared_ptr<MessageBase> &message) {}
// Placeholder for processing a register message; currently does nothing.
void MetaServerNode::ProcessRegister() {}
// Placeholder for processing a heartbeat message; currently does nothing.
void MetaServerNode::ProcessHeartbeat() {}
} // namespace topology
} // namespace cluster
} // namespace distributed
} // namespace mindspore

View File

@ -18,6 +18,9 @@
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_META_SERVER_NODE_H_
#include <string>
#include <memory>
#include "distributed/cluster/topology/common.h"
#include "distributed/rpc/tcp/tcp_server.h"
#include "distributed/cluster/topology/node_base.h"
namespace mindspore {
@ -31,9 +34,27 @@ class MetaServerNode : public NodeBase {
explicit MetaServerNode(const std::string &node_id) : NodeBase(node_id) {}
~MetaServerNode() override = default;
bool Initialize() override;
bool Finalize() override;
private:
// Create and init the tcp server.
bool InitTCPServer();
// Handle the message received by the tcp server.
void HandleMessage(const std::shared_ptr<MessageBase> &message);
// Process the received register message sent from compute graph nodes.
void ProcessRegister();
// Process the received heartbeat message sent from compute graph nodes.
void ProcessHeartbeat();
// The meta server address used to manage the tcp server.
MetaServerAddress meta_server_addr_;
// The TCP server is used to process messages sent from compute graph nodes.
std::unique_ptr<rpc::TCPServer> tcp_server_;
};
} // namespace topology
} // namespace cluster

View File

@ -32,6 +32,12 @@ class NodeBase {
explicit NodeBase(const std::string &node_id) : node_id_(node_id) {}
virtual ~NodeBase() = default;
// Prepare the resources held by this node.
virtual bool Initialize() = 0;
// Release the resources held by this node.
virtual bool Finalize() = 0;
protected:
// Each node process has a unique node id which is immutable during the life cycle of this node.
// The node id is used for identity authentication during networking and process recovery.

View File

@ -0,0 +1,60 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_
#include <cstdlib>
#include <string>
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
#include "distributed/cluster/topology/common.h"
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
static bool FillMetaServerAddress(struct MetaServerAddress *address) {
MS_EXCEPTION_IF_NULL(address);
// Get the address of meta server from the environment.
auto ip = common::GetEnv(kEnvMetaServerHost);
auto ms_port = common::GetEnv(kEnvMetaServerPort);
if (ip.empty()) {
MS_LOG(ERROR) << "Failed to get ip of meta server from environment variables.";
return false;
}
if (ms_port.empty()) {
MS_LOG(ERROR) << "Failed to get port of meta server from environment variables.";
return false;
}
auto port = std::strtol(ms_port.c_str(), nullptr, kDecimal);
// Valid port number range.
static int min_port = 1;
static int max_port = 65535;
if (port < min_port || port > max_port) {
MS_LOG(ERROR) << "The port number of meta server node: " << port << " is invalid (1~65535).";
return false;
}
// Fill the meta server address.
address->ip = ip;
address->port = port;
return true;
}
} // namespace topology
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_TOPOLOGY_UTILS_H_

View File

@ -71,6 +71,7 @@ if(ENABLE_MINDDATA)
./fl/*.cc
./distributed/persistent/*.cc
./distributed/rpc/tcp/*.cc
./distributed/cluster/topology/*.cc
./cxx_api/*.cc
./tbe/*.cc
./mindapi/*.cc
@ -183,6 +184,7 @@ file(GLOB_RECURSE MINDSPORE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
"../../../mindspore/ccsrc/distributed/cluster/cluster_context.cc"
"../../../mindspore/ccsrc/distributed/persistent/*.cc"
"../../../mindspore/ccsrc/distributed/rpc/tcp/*.cc"
"../../../mindspore/ccsrc/distributed/cluster/topology/*.cc"
"../../../mindspore/ccsrc/profiler/device/ascend/*.cc"
"../../../mindspore/ccsrc/profiler/device/profiling.cc"
"../../../mindspore/ccsrc/plugin/device/cpu/kernel/nnacl/fp32/adam_fp32.c"

View File

@ -0,0 +1,55 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include "distributed/cluster/topology/compute_graph_node.h"
#include "distributed/cluster/topology/meta_server_node.h"
#include "utils/ms_utils.h"
#include "common/common_test.h"
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// Test the dynamic networking for distributed computation graph execution.
// The fixture adds no state of its own: both SetUp() and TearDown() are intentionally empty.
class TestDynamicNetworking : public UT::Common {
protected:
void SetUp() {}
void TearDown() {}
};
/// Feature: test the normal node registration from compute graph node to meta server node.
/// Description: start a meta server node and a compute graph node which registers to it on initialization.
/// Expectation: both nodes are initialized and finalized successfully.
TEST_F(TestDynamicNetworking, NodeRegister) {
  // Both nodes read the meta server address from these environment variables.
  const std::string server_host = "127.0.0.1";
  const std::string server_port = "8090";
  common::SetEnv(kEnvMetaServerHost, server_host.c_str());
  common::SetEnv(kEnvMetaServerPort, server_port.c_str());

  MetaServerNode msn("master");
  ASSERT_TRUE(msn.Initialize());

  // Use a non-fatal check here: a fatal ASSERT would leave the test body immediately and skip the Finalize() calls
  // below, so the meta server node initialized above would never be finalized.
  ComputeGraphNode cgn("worker");
  EXPECT_TRUE(cgn.Initialize());

  // The results of Finalize() are part of the expectation and must not be ignored.
  EXPECT_TRUE(cgn.Finalize());
  EXPECT_TRUE(msn.Finalize());
}
} // namespace topology
} // namespace cluster
} // namespace distributed
} // namespace mindspore