diff --git a/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc b/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc index 461388b76af..249b32bfc80 100644 --- a/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc +++ b/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc @@ -21,69 +21,39 @@ namespace mindspore { namespace distributed { namespace cluster { -bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr) { - MS_EXCEPTION_IF_NULL(node_); - std::shared_ptr> output = nullptr; - if (!node_->SendToScheduler(actor_addr.SerializeAsString().data(), actor_addr.SerializeAsString().size(), - NodeCommand::REGISTER_ACTOR_ROUTE, &output)) { - MS_LOG(EXCEPTION) << "Failed to send register route request to scheduler."; - } - - GeneralResponseMsg register_route_rsp_msg; - MS_EXCEPTION_IF_NULL(output); - (void)register_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size())); - if (!register_route_rsp_msg.is_success()) { - MS_LOG(ERROR) << "Register route for actor " << actor_id << " failed. " << register_route_rsp_msg.error(); - return false; - } +bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const topology::ActorAddress &actor_addr) { + MS_EXCEPTION_IF_NULL(cgn_); + cgn_->PutMetadata(actor_id, actor_addr.SerializeAsString(), false); return true; } -bool ActorRouteTableProxy::DeleteRoute(const std::string &actor_id) { - MS_EXCEPTION_IF_NULL(node_); - std::shared_ptr> output = nullptr; - if (!node_->SendToScheduler(actor_id.data(), actor_id.size(), NodeCommand::DELETE_ACTOR_ROUTE, &output)) { - MS_LOG(EXCEPTION) << "Failed to send delete route request to scheduler."; - } - - GeneralResponseMsg delete_route_rsp_msg; - MS_EXCEPTION_IF_NULL(output); - (void)delete_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size())); - if (!delete_route_rsp_msg.is_success()) { - MS_LOG(ERROR) << "Delete route for actor " << actor_id << " failed. " << delete_route_rsp_msg.error(); - return false; - } - return true; -} - -ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const { - MS_EXCEPTION_IF_NULL(node_); +topology::ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const { + MS_EXCEPTION_IF_NULL(cgn_); // Whether this lookup operation is successful. bool lookup_success = false; // Lookup last timestamp before timeout. auto timeout_ts = CURRENT_TIMESTAMP_MILLI + lookup_timeout_; - std::shared_ptr> output = nullptr; - ActorAddress lookup_route_rsp_msg; + topology::ActorAddress lookup_route_rsp_msg; + do { - if (!node_->SendToScheduler(actor_id.data(), actor_id.size(), NodeCommand::LOOKUP_ACTOR_ROUTE, &output)) { - MS_LOG(EXCEPTION) << "Failed to send lookup route request to scheduler."; + auto route = cgn_->GetMetadata(actor_id); + if (route.length() == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(kLookupInterval)); + } else { + (void)lookup_route_rsp_msg.ParseFromArray(route.c_str(), route.length()); + lookup_success = true; } - MS_EXCEPTION_IF_NULL(output); - (void)lookup_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size())); // An actor route could not be registered yet because another process could be launched slow. // If the response actor id is empty, this means the adderess is not registered yet. if (lookup_route_rsp_msg.actor_id().empty()) { MS_LOG(DEBUG) << "Actor route for actor " << actor_id << " is not registered yet, please try later."; - std::this_thread::sleep_for(std::chrono::milliseconds(kLookupInterval)); - } else { - lookup_success = true; } } while (!lookup_success && CURRENT_TIMESTAMP_MILLI <= timeout_ts); + if (!lookup_success) { MS_LOG(EXCEPTION) << "Failed to lookup actor address for " << actor_id; } - return lookup_route_rsp_msg; } } // namespace cluster diff --git a/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h b/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h index 0afee0c4f5f..69ad7b8e083 100644 --- a/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h +++ b/mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h @@ -20,17 +20,13 @@ #include #include #include -#include "proto/comm.pb.h" -#include "ps/core/abstract_node.h" +#include "proto/topology.pb.h" #include "distributed/constants.h" +#include "distributed/cluster/topology/compute_graph_node.h" namespace mindspore { namespace distributed { namespace cluster { -using ps::core::ActorAddress; -using ps::core::GeneralResponseMsg; -using ps::core::NodeCommand; - // The timeout in milliseconds for one lookup. constexpr uint32_t kDefaultLookupTimeout = 60000; @@ -41,22 +37,20 @@ constexpr uint32_t kLookupInterval = 100; // across the network. class ActorRouteTableProxy { public: - explicit ActorRouteTableProxy(const ps::core::AbstractNodePtr &node, uint32_t lookup_timeout = kDefaultLookupTimeout) - : node_(node), lookup_timeout_(std::chrono::milliseconds(lookup_timeout)) {} + explicit ActorRouteTableProxy(std::shared_ptr cgn, + uint32_t lookup_timeout = kDefaultLookupTimeout) + : cgn_(cgn), lookup_timeout_(std::chrono::milliseconds(lookup_timeout)) {} ~ActorRouteTableProxy() = default; // Register actor address to the route table stored in scheduler. - bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr); - - // Delete the actor address of the specified actor_id from the route table stored in scheduler. - bool DeleteRoute(const std::string &actor_id); + bool RegisterRoute(const std::string &actor_id, const topology::ActorAddress &actor_addr); // Get the actor address for the specified actor_id from the route table stored in scheduler. - ActorAddress LookupRoute(const std::string &actor_id) const; + topology::ActorAddress LookupRoute(const std::string &actor_id) const; private: - // The node variable helps proxy to communicate with scheduler, e.g., SendMessage. - ps::core::AbstractNodePtr node_; + // The cgn variable helps proxy to communicate with meta server. + std::shared_ptr cgn_; // The timeout window for lookup route operation because time of route lookup_timeout of each process is different. std::chrono::milliseconds lookup_timeout_; diff --git a/mindspore/ccsrc/distributed/cluster/cluster_context.cc b/mindspore/ccsrc/distributed/cluster/cluster_context.cc index 7773669dee2..566e1b38578 100644 --- a/mindspore/ccsrc/distributed/cluster/cluster_context.cc +++ b/mindspore/ccsrc/distributed/cluster/cluster_context.cc @@ -92,8 +92,8 @@ bool ClusterContext::Initialize() { // Step 3: Initialize some modules for the node, e.g., actor route table proxy. if (!IsScheduler()) { // Only node which is not the scheduler needs route table proxy. - actor_route_table_proxy_ = - std::make_shared(std::dynamic_pointer_cast(node_)); + auto cgn = std::dynamic_pointer_cast(node_base_); + actor_route_table_proxy_ = std::make_shared(cgn); MS_EXCEPTION_IF_NULL(actor_route_table_proxy_); } diff --git a/mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h b/mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h index 41c03c3639d..a6bee01ecb1 100644 --- a/mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h +++ b/mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h @@ -18,12 +18,12 @@ #define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_ #include -#include "proto/comm.pb.h" +#include "proto/topology.pb.h" namespace mindspore { namespace distributed { namespace cluster { -using ps::core::ActorAddress; +using distributed::cluster::topology::ActorAddress; // The dummy ActorRouteTableProxy interface. This class is for ut test and windows compiling so the implementation is // empty. class ActorRouteTableProxy { diff --git a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc index 10519cb8f29..599d5aa0c57 100644 --- a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc +++ b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.cc @@ -222,17 +222,22 @@ bool ComputeGraphNode::Reconnect() { return tcp_client_->IsConnected(server_url) && hb_client_->IsConnected(server_url); } -bool ComputeGraphNode::SendMessageToMSN(const std::string msg_name, const std::string &msg_body) { +bool ComputeGraphNode::SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync) { MS_EXCEPTION_IF_NULL(tcp_client_); auto message = CreateMessage(meta_server_addr_.GetUrl(), msg_name, msg_body); MS_EXCEPTION_IF_NULL(message); - auto retval = tcp_client_->SendSync(std::move(message)); - if (retval > 0) { - return true; + if (sync) { + auto retval = tcp_client_->SendSync(std::move(message)); + if (retval > 0) { + return true; + } else { + return false; + } } else { - return false; + tcp_client_->SendSync(std::move(message)); + return true; } } @@ -249,11 +254,12 @@ std::shared_ptr ComputeGraphNode::RetrieveMessageFromMSN(const std: return nullptr; } -bool ComputeGraphNode::PutMetadata(const std::string &name, const std::string &value) { +bool ComputeGraphNode::PutMetadata(const std::string &name, const std::string &value, bool sync) { MetadataMessage metadata; metadata.set_name(name); metadata.set_value(value); - return SendMessageToMSN(std::to_string(static_cast(MessageName::kWriteMetadata)), metadata.SerializeAsString()); + return SendMessageToMSN(std::to_string(static_cast(MessageName::kWriteMetadata)), metadata.SerializeAsString(), + sync); } bool ComputeGraphNode::PutMetadata(const std::string &name, const void *value, const size_t &size) { diff --git a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.h b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.h index 8098025aa60..de78bc2b147 100644 --- a/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.h +++ b/mindspore/ccsrc/distributed/cluster/topology/compute_graph_node.h @@ -40,14 +40,14 @@ class ComputeGraphNode : public NodeBase { bool Finalize(bool force = false) override; // Send the specified message to the meta server node. - bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body); + bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync = true); // Query the specified message from the meta server node according to the given message name. // Returns nullptr if no message returned after timeout. std::shared_ptr RetrieveMessageFromMSN(const std::string &msg_name, uint32_t timeout = 5); // Write and read user defined metadata to the meta server node. - bool PutMetadata(const std::string &name, const std::string &value); + bool PutMetadata(const std::string &name, const std::string &value, bool sync = true); bool PutMetadata(const std::string &name, const void *value, const size_t &size); std::string GetMetadata(const std::string &name, uint32_t timeout = 5); diff --git a/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto b/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto index b40648f5ffc..093ce797ffe 100644 --- a/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto +++ b/mindspore/ccsrc/distributed/cluster/topology/protocol/topology.proto @@ -39,3 +39,9 @@ message MetadataMessage { string name = 1; bytes value = 2; } + +message ActorAddress { + string actor_id = 1; + string ip = 2; + uint32 port = 3; +} diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/embedding_cache/embedding_cache_prefetch_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/embedding_cache/embedding_cache_prefetch_actor.cc index 3119f04e8a2..29c4f62efd2 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/embedding_cache/embedding_cache_prefetch_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/embedding_cache/embedding_cache_prefetch_actor.cc @@ -18,6 +18,7 @@ #include "backend/common/optimizer/dynamic_shape/dynamic_shape_helper.h" #include "kernel/common_utils.h" #include "runtime/graph_scheduler/actor/rpc/rpc_actor.h" +#include "proto/topology.pb.h" namespace mindspore { namespace runtime { @@ -74,9 +75,9 @@ std::string GenerateInterProcessEdge(const std::string &src_role, uint32_t src_r } ActorRouteTableProxyPtr CreateRouteTableProxy() { - auto node = ClusterContext::instance()->node(); - ActorRouteTableProxyPtr actor_route_table_proxy = - std::make_shared(std::dynamic_pointer_cast(node)); + auto cgn = std::dynamic_pointer_cast( + ClusterContext::instance()->node_base()); + ActorRouteTableProxyPtr actor_route_table_proxy = std::make_shared(cgn); MS_EXCEPTION_IF_NULL(actor_route_table_proxy); return actor_route_table_proxy; } @@ -1404,7 +1405,7 @@ bool Receiver::StartServer() { // 3. Register the server address to route table. The server should not be connected before this step is done. MS_LOG(INFO) << "Start server for receiver. Server address: " << server_url << ", inter process edge name: " << inter_process_edge_; - ActorAddress recv_actor_addresss; + distributed::cluster::topology::ActorAddress recv_actor_addresss; recv_actor_addresss.set_actor_id(inter_process_edge_); recv_actor_addresss.set_ip(ip_); recv_actor_addresss.set_port(port_); diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc index 3f2725f95f8..18cb28040bb 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/recv_actor.cc @@ -20,6 +20,7 @@ #include #include #include +#include "proto/topology.pb.h" #include "distributed/rpc/tcp/constants.h" #include "plugin/device/cpu/kernel/rpc/rpc_recv_kernel.h" #include "backend/common/optimizer/helper.h" @@ -69,7 +70,7 @@ bool RecvActor::StartServer() { for (const auto &inter_process_edge_name : inter_process_edge_names_) { MS_LOG(INFO) << "Start server for recv actor. Server address: " << server_url << ", inter-process edge name: " << inter_process_edge_name; - ActorAddress recv_actor_addresss; + distributed::cluster::topology::ActorAddress recv_actor_addresss; recv_actor_addresss.set_actor_id(inter_process_edge_name); recv_actor_addresss.set_ip(ip_); recv_actor_addresss.set_port(port_); diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h index 4edef7675ac..575e40c1bf6 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h +++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/rpc/rpc_actor.h @@ -27,16 +27,17 @@ #include "distributed/rpc/tcp/tcp_client.h" #include "distributed/rpc/tcp/tcp_server.h" #include "proto/rpc.pb.h" +#include "proto/topology.pb.h" namespace mindspore { namespace runtime { using distributed::cluster::ActorRouteTableProxy; using distributed::cluster::ActorRouteTableProxyPtr; using distributed::cluster::ClusterContext; +using distributed::cluster::topology::ActorAddress; using distributed::rpc::TCPClient; using distributed::rpc::TCPServer; using mindspore::device::KernelInfo; -using ps::core::ActorAddress; // The inter-process edge mark between two nodes. constexpr char kInterProcessEdgeMark[] = "->"; diff --git a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc index fada6f7c3ed..969ccaa5cc2 100644 --- a/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc +++ b/mindspore/ccsrc/runtime/graph_scheduler/rpc_node_scheduler.cc @@ -15,6 +15,7 @@ */ #include "runtime/graph_scheduler/rpc_node_scheduler.h" +#include "distributed/cluster/topology/compute_graph_node.h" #include "include/common/utils/anfalgo.h" namespace mindspore { @@ -157,9 +158,9 @@ void RpcNodeScheduler::ResetOpcontext(const RpcActorSetPtr &rpc_actors) { ActorRouteTableProxyPtr RpcNodeScheduler::CreateRouteTableProxy() { ActorRouteTableProxyPtr actor_route_table_proxy; if (!ClusterContext::instance()->IsScheduler()) { - auto node = ClusterContext::instance()->node(); - actor_route_table_proxy = - std::make_shared(std::dynamic_pointer_cast(node)); + auto cgn = std::dynamic_pointer_cast( + ClusterContext::instance()->node_base()); + actor_route_table_proxy = std::make_shared(cgn); MS_EXCEPTION_IF_NULL(actor_route_table_proxy); } return actor_route_table_proxy;