!49500 Assign port range for cgn

Merge pull request !49500 from ZPaC/assign-port-range
This commit is contained in:
i-robot 2023-03-01 14:19:53 +00:00 committed by Gitee
commit a54b91e617
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
7 changed files with 81 additions and 1 deletions

View File

@ -228,6 +228,11 @@ if(ENABLE_MPI)
add_compile_definitions(ENABLE_MPI)
endif()
if(ENABLE_RDMA)
add_compile_definitions(ENABLE_RDMA)
include_directories(/usr/include/umdk)
endif()
## make protobuf files
file(GLOB ONNX_PROTO "" ${CMAKE_SOURCE_DIR}/third_party/proto/onnx/onnx.proto)
message("onnx proto path is :" ${ONNX_PROTO})

View File

@ -24,6 +24,7 @@
#include "distributed/cluster/topology/compute_graph_node.h"
#include "distributed/cluster/topology/meta_server_node.h"
#include "distributed/collective/collective_manager.h"
#include "proto/topology.pb.h"
#include "utils/ms_context.h"
#include "ps/ps_context.h"
#include "ps/core/comm_util.h"
@ -190,6 +191,19 @@ bool ClusterContext::BuildCluster() {
EXECUTE_WITH_RETRY(check_func, retry_num, topology::kExecuteInterval, "Topology build timed out.");
MS_LOG(INFO) << "Cluster is successfully initialized.";
if (node_role_ != kEnvRoleOfScheduler) {
auto cgn = std::dynamic_pointer_cast<topology::ComputeGraphNode>(node_base_);
MS_EXCEPTION_IF_NULL(cgn);
std::string port_range_pb = cgn->GetMetadata(kNodePortRange);
topology::NodePortRanges node_port_ranges;
(void)node_port_ranges.ParseFromArray(port_range_pb.c_str(), SizeToInt(port_range_pb.size()));
auto port_range = node_port_ranges.data().at(node_id);
port_range_.first = port_range.min_port();
port_range_.second = port_range.max_port();
MS_LOG(INFO) << "Port range assigned for this node " << node_id << " is " << port_range_.first << " to "
<< port_range_.second;
}
return true;
}

View File

@ -23,6 +23,7 @@
#include <string>
#include <memory>
#include <atomic>
#include <utility>
#include "distributed/constants.h"
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
@ -80,6 +81,9 @@ class BACKEND_EXPORT ClusterContext {
void set_cluster_exit_with_exception();
bool cluster_exit_with_exception() const;
// Return server range of this node.
const std::pair<uint32_t, uint32_t> &port_range() const { return port_range_; }
private:
ClusterContext();
@ -125,6 +129,8 @@ class BACKEND_EXPORT ClusterContext {
// The actor route table proxy. It only created in abstract nodes because scheduler does not use proxy.
ActorRouteTableProxyPtr actor_route_table_proxy_;
std::pair<uint32_t, uint32_t> port_range_;
};
} // namespace cluster
} // namespace distributed

View File

@ -18,6 +18,7 @@
#include <algorithm>
#include <string>
#include <vector>
#include <unordered_map>
#include "utils/ms_exception.h"
#include "proto/topology.pb.h"
#include "ps/ps_context.h"
@ -453,12 +454,47 @@ bool MetaServerNode::TransitionToInitialized() {
}
}
topo_state_ = TopoState::kInitialized;
MS_LOG(INFO) << "The cluster topology has been constructed successfully";
MS_LOG(INFO) << "The cluster topology has been constructed successfully.";
// Assign port range after cluster is initialized.
AssignPortRange();
return true;
}
return false;
}
void MetaServerNode::AssignPortRange() {
MS_LOG(DEBUG) << "Start assigning port range for nodes...";
std::unordered_map<std::string, uint32_t> each_host_node_num;
std::unordered_map<std::string, uint32_t> node_index_map;
// Assign computing graph nodes' port range according to their hosts.
for (const auto &n : nodes_) {
std::string node_id = n.first;
const auto &node_info = n.second;
uint32_t &host_node_num = each_host_node_num[node_info->host_name];
node_index_map[node_info->node_id] = host_node_num;
host_node_num++;
}
NodePortRanges node_ranges;
for (const auto &n : nodes_) {
std::string node_id = n.first;
const auto &node_info = n.second;
uint32_t node_index = node_index_map[node_id];
uint32_t each_node_range = kNodePortRangeNum / each_host_node_num[node_info->host_name];
uint32_t min_port = kStartPort + each_node_range * node_index;
uint32_t max_port = min_port + each_node_range - 1;
PortRange range;
range.set_min_port(min_port);
range.set_max_port(max_port);
node_ranges.mutable_data()->insert({node_id, range});
MS_LOG(INFO) << "The port range for node " << node_id << ", rank id: " << node_info->rank_id
<< ", min port: " << min_port << ", max port: " << max_port;
}
metadata_.insert({kNodePortRange, node_ranges.SerializeAsString()});
}
bool MetaServerNode::Recovery() {
std::shared_lock<std::shared_mutex> lock(nodes_mutex_);
std::string recovery_path = recovery::RecoveryPath();

View File

@ -22,6 +22,7 @@
#include <map>
#include <thread>
#include <shared_mutex>
#include <unordered_map>
#include "distributed/rpc/tcp/tcp_server.h"
#include "distributed/recovery/configuration.h"
#include "distributed/cluster/topology/node_base.h"
@ -125,6 +126,9 @@ class MetaServerNode : public NodeBase {
// Try to transition the state of cluster to be initialized.
bool TransitionToInitialized();
// For each computing graph node, port range should be assigned by meta server node for rpc servers to bind.
void AssignPortRange();
// Recover metadata from the configuration if recovery is enabled.
bool Recovery();

View File

@ -54,3 +54,12 @@ message ActorAddress {
string ip = 2;
uint32 port = 3;
}
message PortRange {
uint32 min_port = 1;
uint32 max_port = 2;
}
message NodePortRanges {
map<string, PortRange> data = 1;
}

View File

@ -87,6 +87,12 @@ const uint16_t kDefaultSchedPort = 6667;
const uint16_t kMaxPort = 65535;
constexpr uint32_t kDefaultFinishTimeout = 30;
// For each computing graph node, there is a range for rpc server's port number.
// Each node has range number 2048, and the port started from 8118.
constexpr uint32_t kStartPort = 8118;
constexpr uint32_t kNodePortRangeNum = 4096;
constexpr char kNodePortRange[] = "node_port_range";
constexpr char kDataSyncSrcOpName[] = "DataSyncSrc";
constexpr char kDataSyncDstOpName[] = "DataSyncDst";
constexpr char kControlSrcOpName[] = "ControlSrc";