!35207 Replace the synchronization of distributed actor route table with new cluster arch

Merge pull request !35207 from chengang/replace_actor_route_lookup_2
This commit is contained in:
i-robot 2022-06-01 07:19:59 +00:00 committed by Gitee
commit d52608b12b
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
11 changed files with 61 additions and 81 deletions

View File

@ -21,69 +21,39 @@
namespace mindspore {
namespace distributed {
namespace cluster {
bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr) {
MS_EXCEPTION_IF_NULL(node_);
std::shared_ptr<std::vector<unsigned char>> output = nullptr;
if (!node_->SendToScheduler(actor_addr.SerializeAsString().data(), actor_addr.SerializeAsString().size(),
NodeCommand::REGISTER_ACTOR_ROUTE, &output)) {
MS_LOG(EXCEPTION) << "Failed to send register route request to scheduler.";
}
GeneralResponseMsg register_route_rsp_msg;
MS_EXCEPTION_IF_NULL(output);
(void)register_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size()));
if (!register_route_rsp_msg.is_success()) {
MS_LOG(ERROR) << "Register route for actor " << actor_id << " failed. " << register_route_rsp_msg.error();
return false;
}
bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const topology::ActorAddress &actor_addr) {
MS_EXCEPTION_IF_NULL(cgn_);
cgn_->PutMetadata(actor_id, actor_addr.SerializeAsString(), false);
return true;
}
bool ActorRouteTableProxy::DeleteRoute(const std::string &actor_id) {
MS_EXCEPTION_IF_NULL(node_);
std::shared_ptr<std::vector<unsigned char>> output = nullptr;
if (!node_->SendToScheduler(actor_id.data(), actor_id.size(), NodeCommand::DELETE_ACTOR_ROUTE, &output)) {
MS_LOG(EXCEPTION) << "Failed to send delete route request to scheduler.";
}
GeneralResponseMsg delete_route_rsp_msg;
MS_EXCEPTION_IF_NULL(output);
(void)delete_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size()));
if (!delete_route_rsp_msg.is_success()) {
MS_LOG(ERROR) << "Delete route for actor " << actor_id << " failed. " << delete_route_rsp_msg.error();
return false;
}
return true;
}
ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const {
MS_EXCEPTION_IF_NULL(node_);
topology::ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const {
MS_EXCEPTION_IF_NULL(cgn_);
// Whether this lookup operation is successful.
bool lookup_success = false;
// Lookup last timestamp before timeout.
auto timeout_ts = CURRENT_TIMESTAMP_MILLI + lookup_timeout_;
std::shared_ptr<std::vector<unsigned char>> output = nullptr;
ActorAddress lookup_route_rsp_msg;
topology::ActorAddress lookup_route_rsp_msg;
do {
if (!node_->SendToScheduler(actor_id.data(), actor_id.size(), NodeCommand::LOOKUP_ACTOR_ROUTE, &output)) {
MS_LOG(EXCEPTION) << "Failed to send lookup route request to scheduler.";
auto route = cgn_->GetMetadata(actor_id);
if (route.length() == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(kLookupInterval));
} else {
(void)lookup_route_rsp_msg.ParseFromArray(route.c_str(), route.length());
lookup_success = true;
}
MS_EXCEPTION_IF_NULL(output);
(void)lookup_route_rsp_msg.ParseFromArray(output->data(), SizeToInt(output->size()));
// An actor route could not be registered yet because another process could be launched slowly.
// If the response actor id is empty, this means the address is not registered yet.
if (lookup_route_rsp_msg.actor_id().empty()) {
MS_LOG(DEBUG) << "Actor route for actor " << actor_id << " is not registered yet, please try later.";
std::this_thread::sleep_for(std::chrono::milliseconds(kLookupInterval));
} else {
lookup_success = true;
}
} while (!lookup_success && CURRENT_TIMESTAMP_MILLI <= timeout_ts);
if (!lookup_success) {
MS_LOG(EXCEPTION) << "Failed to lookup actor address for " << actor_id;
}
return lookup_route_rsp_msg;
}
} // namespace cluster

View File

@ -20,17 +20,13 @@
#include <string>
#include <memory>
#include <chrono>
#include "proto/comm.pb.h"
#include "ps/core/abstract_node.h"
#include "proto/topology.pb.h"
#include "distributed/constants.h"
#include "distributed/cluster/topology/compute_graph_node.h"
namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
using ps::core::GeneralResponseMsg;
using ps::core::NodeCommand;
// The timeout in milliseconds for one lookup.
constexpr uint32_t kDefaultLookupTimeout = 60000;
@ -41,22 +37,20 @@ constexpr uint32_t kLookupInterval = 100;
// across the network.
class ActorRouteTableProxy {
public:
explicit ActorRouteTableProxy(const ps::core::AbstractNodePtr &node, uint32_t lookup_timeout = kDefaultLookupTimeout)
: node_(node), lookup_timeout_(std::chrono::milliseconds(lookup_timeout)) {}
explicit ActorRouteTableProxy(std::shared_ptr<topology::ComputeGraphNode> cgn,
uint32_t lookup_timeout = kDefaultLookupTimeout)
: cgn_(cgn), lookup_timeout_(std::chrono::milliseconds(lookup_timeout)) {}
~ActorRouteTableProxy() = default;
// Register actor address to the route table stored in scheduler.
bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr);
// Delete the actor address of the specified actor_id from the route table stored in scheduler.
bool DeleteRoute(const std::string &actor_id);
bool RegisterRoute(const std::string &actor_id, const topology::ActorAddress &actor_addr);
// Get the actor address for the specified actor_id from the route table stored in scheduler.
ActorAddress LookupRoute(const std::string &actor_id) const;
topology::ActorAddress LookupRoute(const std::string &actor_id) const;
private:
// The node variable helps proxy to communicate with scheduler, e.g., SendMessage.
ps::core::AbstractNodePtr node_;
// The cgn variable helps proxy to communicate with meta server.
std::shared_ptr<topology::ComputeGraphNode> cgn_;
// The timeout window for the route lookup operation, since the lookup timeout of each process may differ.
std::chrono::milliseconds lookup_timeout_;

View File

@ -92,8 +92,8 @@ bool ClusterContext::Initialize() {
// Step 3: Initialize some modules for the node, e.g., actor route table proxy.
if (!IsScheduler()) {
// Only node which is not the scheduler needs route table proxy.
actor_route_table_proxy_ =
std::make_shared<ActorRouteTableProxy>(std::dynamic_pointer_cast<ps::core::AbstractNode>(node_));
auto cgn = std::dynamic_pointer_cast<distributed::cluster::topology::ComputeGraphNode>(node_base_);
actor_route_table_proxy_ = std::make_shared<ActorRouteTableProxy>(cgn);
MS_EXCEPTION_IF_NULL(actor_route_table_proxy_);
}

View File

@ -18,12 +18,12 @@
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_
#include <string>
#include "proto/comm.pb.h"
#include "proto/topology.pb.h"
namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
using distributed::cluster::topology::ActorAddress;
// The dummy ActorRouteTableProxy interface. This class is for ut test and windows compiling so the implementation is
// empty.
class ActorRouteTableProxy {

View File

@ -222,17 +222,22 @@ bool ComputeGraphNode::Reconnect() {
return tcp_client_->IsConnected(server_url) && hb_client_->IsConnected(server_url);
}
bool ComputeGraphNode::SendMessageToMSN(const std::string msg_name, const std::string &msg_body) {
bool ComputeGraphNode::SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync) {
MS_EXCEPTION_IF_NULL(tcp_client_);
auto message = CreateMessage(meta_server_addr_.GetUrl(), msg_name, msg_body);
MS_EXCEPTION_IF_NULL(message);
auto retval = tcp_client_->SendSync(std::move(message));
if (retval > 0) {
return true;
if (sync) {
auto retval = tcp_client_->SendSync(std::move(message));
if (retval > 0) {
return true;
} else {
return false;
}
} else {
return false;
tcp_client_->SendSync(std::move(message));
return true;
}
}
@ -249,11 +254,12 @@ std::shared_ptr<std::string> ComputeGraphNode::RetrieveMessageFromMSN(const std:
return nullptr;
}
bool ComputeGraphNode::PutMetadata(const std::string &name, const std::string &value) {
bool ComputeGraphNode::PutMetadata(const std::string &name, const std::string &value, bool sync) {
MetadataMessage metadata;
metadata.set_name(name);
metadata.set_value(value);
return SendMessageToMSN(std::to_string(static_cast<int>(MessageName::kWriteMetadata)), metadata.SerializeAsString());
return SendMessageToMSN(std::to_string(static_cast<int>(MessageName::kWriteMetadata)), metadata.SerializeAsString(),
sync);
}
bool ComputeGraphNode::PutMetadata(const std::string &name, const void *value, const size_t &size) {

View File

@ -40,14 +40,14 @@ class ComputeGraphNode : public NodeBase {
bool Finalize(bool force = false) override;
// Send the specified message to the meta server node.
bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body);
bool SendMessageToMSN(const std::string msg_name, const std::string &msg_body, bool sync = true);
// Query the specified message from the meta server node according to the given message name.
// Returns nullptr if no message returned after timeout.
std::shared_ptr<std::string> RetrieveMessageFromMSN(const std::string &msg_name, uint32_t timeout = 5);
// Write and read user defined metadata to the meta server node.
bool PutMetadata(const std::string &name, const std::string &value);
bool PutMetadata(const std::string &name, const std::string &value, bool sync = true);
bool PutMetadata(const std::string &name, const void *value, const size_t &size);
std::string GetMetadata(const std::string &name, uint32_t timeout = 5);

View File

@ -39,3 +39,9 @@ message MetadataMessage {
string name = 1;
bytes value = 2;
}
// Network address record for a distributed actor. It is serialized and stored as
// metadata in the meta server by ActorRouteTableProxy::RegisterRoute, and fetched
// back by LookupRoute.
message ActorAddress {
// Unique id of the actor; callers use the inter-process edge name as the id.
string actor_id = 1;
// IP on which the actor's server listens.
string ip = 2;
// Port on which the actor's server listens.
uint32 port = 3;
}

View File

@ -18,6 +18,7 @@
#include "backend/common/optimizer/dynamic_shape/dynamic_shape_helper.h"
#include "kernel/common_utils.h"
#include "runtime/graph_scheduler/actor/rpc/rpc_actor.h"
#include "proto/topology.pb.h"
namespace mindspore {
namespace runtime {
@ -74,9 +75,9 @@ std::string GenerateInterProcessEdge(const std::string &src_role, uint32_t src_r
}
ActorRouteTableProxyPtr CreateRouteTableProxy() {
auto node = ClusterContext::instance()->node();
ActorRouteTableProxyPtr actor_route_table_proxy =
std::make_shared<ActorRouteTableProxy>(std::dynamic_pointer_cast<ps::core::AbstractNode>(node));
auto cgn = std::dynamic_pointer_cast<distributed::cluster::topology::ComputeGraphNode>(
ClusterContext::instance()->node_base());
ActorRouteTableProxyPtr actor_route_table_proxy = std::make_shared<ActorRouteTableProxy>(cgn);
MS_EXCEPTION_IF_NULL(actor_route_table_proxy);
return actor_route_table_proxy;
}
@ -1404,7 +1405,7 @@ bool Receiver::StartServer() {
// 3. Register the server address to route table. The server should not be connected before this step is done.
MS_LOG(INFO) << "Start server for receiver. Server address: " << server_url
<< ", inter process edge name: " << inter_process_edge_;
ActorAddress recv_actor_addresss;
distributed::cluster::topology::ActorAddress recv_actor_addresss;
recv_actor_addresss.set_actor_id(inter_process_edge_);
recv_actor_addresss.set_ip(ip_);
recv_actor_addresss.set_port(port_);

View File

@ -20,6 +20,7 @@
#include <utility>
#include <functional>
#include <condition_variable>
#include "proto/topology.pb.h"
#include "distributed/rpc/tcp/constants.h"
#include "plugin/device/cpu/kernel/rpc/rpc_recv_kernel.h"
#include "backend/common/optimizer/helper.h"
@ -69,7 +70,7 @@ bool RecvActor::StartServer() {
for (const auto &inter_process_edge_name : inter_process_edge_names_) {
MS_LOG(INFO) << "Start server for recv actor. Server address: " << server_url
<< ", inter-process edge name: " << inter_process_edge_name;
ActorAddress recv_actor_addresss;
distributed::cluster::topology::ActorAddress recv_actor_addresss;
recv_actor_addresss.set_actor_id(inter_process_edge_name);
recv_actor_addresss.set_ip(ip_);
recv_actor_addresss.set_port(port_);

View File

@ -27,16 +27,17 @@
#include "distributed/rpc/tcp/tcp_client.h"
#include "distributed/rpc/tcp/tcp_server.h"
#include "proto/rpc.pb.h"
#include "proto/topology.pb.h"
namespace mindspore {
namespace runtime {
using distributed::cluster::ActorRouteTableProxy;
using distributed::cluster::ActorRouteTableProxyPtr;
using distributed::cluster::ClusterContext;
using distributed::cluster::topology::ActorAddress;
using distributed::rpc::TCPClient;
using distributed::rpc::TCPServer;
using mindspore::device::KernelInfo;
using ps::core::ActorAddress;
// The inter-process edge mark between two nodes.
constexpr char kInterProcessEdgeMark[] = "->";

View File

@ -15,6 +15,7 @@
*/
#include "runtime/graph_scheduler/rpc_node_scheduler.h"
#include "distributed/cluster/topology/compute_graph_node.h"
#include "include/common/utils/anfalgo.h"
namespace mindspore {
@ -157,9 +158,9 @@ void RpcNodeScheduler::ResetOpcontext(const RpcActorSetPtr &rpc_actors) {
ActorRouteTableProxyPtr RpcNodeScheduler::CreateRouteTableProxy() {
ActorRouteTableProxyPtr actor_route_table_proxy;
if (!ClusterContext::instance()->IsScheduler()) {
auto node = ClusterContext::instance()->node();
actor_route_table_proxy =
std::make_shared<ActorRouteTableProxy>(std::dynamic_pointer_cast<ps::core::AbstractNode>(node));
auto cgn = std::dynamic_pointer_cast<distributed::cluster::topology::ComputeGraphNode>(
ClusterContext::instance()->node_base());
actor_route_table_proxy = std::make_shared<ActorRouteTableProxy>(cgn);
MS_EXCEPTION_IF_NULL(actor_route_table_proxy);
}
return actor_route_table_proxy;