Add distributed meta store module

This commit is contained in:
ZPaC 2022-02-14 15:11:20 +08:00
parent 33c3a9e910
commit 191b65b236
10 changed files with 321 additions and 1 deletions

View File

@ -2,6 +2,7 @@ file(GLOB_RECURSE _DISTRIBUTED_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*
if(NOT ENABLE_CPU OR WIN32) if(NOT ENABLE_CPU OR WIN32)
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/cluster_context.cc") list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/cluster_context.cc")
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/actor_route_table_proxy.cc")
else() else()
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/dummy_cluster_context.cc") list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/dummy_cluster_context.cc")
endif() endif()

View File

@ -0,0 +1,30 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include "distributed/cluster/actor_route_table_proxy.h"
namespace mindspore {
namespace distributed {
namespace cluster {
// Stub implementation: neither argument is inspected and success is reported unconditionally.
bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr) {
  return true;
}
// Stub implementation: 'actor_id' is not inspected and success is reported unconditionally.
bool ActorRouteTableProxy::DeleteRoute(const std::string &actor_id) {
  return true;
}
// Stub implementation: 'actor_id' is not inspected and a default-constructed ActorAddress is returned.
ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const {
  return ActorAddress();
}
} // namespace cluster
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,53 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_
#include <string>
#include <memory>
#include "proto/comm.pb.h"
#include "ps/core/node.h"
#include "distributed/constants.h"
namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// Actor route table proxy for nodes like workers and servers. This class helps update the actor route table in the
// scheduler across the network. It declares register/delete/lookup operations keyed by actor id and keeps a shared
// reference to the node handed in at construction.
class ActorRouteTableProxy {
public:
// Keeps a shared reference to 'node'. The pointer is stored as given; it is not checked for null here.
explicit ActorRouteTableProxy(const std::shared_ptr<ps::core::Node> &node) : node_(node) {}
~ActorRouteTableProxy() = default;
// Register the actor address 'actor_addr' under 'actor_id' in the route table stored in the scheduler.
// Returns a bool status; presumably true means success - confirm against the implementation.
bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr);
// Delete the actor address of the specified actor_id from the route table stored in the scheduler.
// Returns a bool status; presumably true means success - confirm against the implementation.
bool DeleteRoute(const std::string &actor_id);
// Get the actor address for the specified actor_id from the route table stored in the scheduler.
// NOTE(review): the return type has no separate failure channel - confirm how a missing route is signalled.
ActorAddress LookupRoute(const std::string &actor_id) const;
private:
// The node variable helps the proxy communicate with the scheduler, e.g., SendMessage.
std::shared_ptr<ps::core::Node> node_;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_

View File

@ -0,0 +1,34 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed/cluster/actor_route_table_service.h"
namespace mindspore {
namespace distributed {
namespace cluster {
// Stub implementation: performs no setup work and reports success unconditionally.
bool ActorRouteTableService::Initialize() {
  return true;
}
// Stub implementation: the arguments are not inspected, 'error' is left untouched and success is reported
// unconditionally.
bool ActorRouteTableService::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr,
                                           std::string *error) {
  return true;
}
bool ActorRouteTableService::DeleteRoute(const std::string &actor_id, std::string *error) { return true; }
ActorAddress ActorRouteTableService::LookupRoute(const std::string &actor_id, std::string *error) { return {}; }
} // namespace cluster
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,62 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_
#include <map>
#include <string>
#include <memory>
#include <shared_mutex>
#include "proto/comm.pb.h"
#include "distributed/constants.h"
namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// Metadata of the actors' route table is physically stored in the scheduler node. This service receives requests from
// other nodes like workers and servers to update the actor route table.
class ActorRouteTableService {
public:
ActorRouteTableService() = default;
~ActorRouteTableService() = default;
// Prepare the service for use. Returns a bool status; presumably true means success.
bool Initialize();
// Register actor address to the route table. Parameter 'error' is an output string that represents the failure
// information if this operation failed.
bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr, std::string *error);
// Delete the actor address of the specified actor_id. Parameter 'error' is an output string that represents the
// failure information if this operation failed.
bool DeleteRoute(const std::string &actor_id, std::string *error);
// Get the actor address for the specified actor_id. Parameter 'error' is an output string that represents the
// failure information if this operation failed.
ActorAddress LookupRoute(const std::string &actor_id, std::string *error);
private:
// Metadata of actor addresses, keyed by actor id, which will be used in rpc actors' inter-process communication as
// the 'actor route table'.
std::map<std::string, ActorAddress> actor_addresses_;
// Read/write lock for the actor route table.
std::shared_mutex mtx_;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_

View File

@ -0,0 +1,41 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_
#include <string>
#include "proto/comm.pb.h"
namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// Dummy stand-in for the ActorRouteTableProxy interface. It exists for unit tests and Windows builds, so every
// operation is an empty no-op.
class ActorRouteTableProxy {
 public:
  ActorRouteTableProxy() = default;
  ~ActorRouteTableProxy() = default;

  // Ignores both arguments and always reports success.
  bool RegisterRoute(const std::string &, const ActorAddress &) {
    return true;
  }

  // Ignores its argument and always reports success.
  bool DeleteRoute(const std::string &) {
    return true;
  }

  // Ignores its argument and always returns a default-constructed ActorAddress.
  ActorAddress LookupRoute(const std::string &) const {
    return ActorAddress();
  }
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_

View File

@ -43,6 +43,12 @@ enum NodeCommand {
SCHEDULER_RECOVERY = 13; SCHEDULER_RECOVERY = 13;
// This command is used to send prepare building network msg. // This command is used to send prepare building network msg.
PREPARE_BUILDING_NETWORK = 14; PREPARE_BUILDING_NETWORK = 14;
// Register address for actor's route table.
REGISTER_ACTOR_ROUTE = 15;
// Delete address of actor.
DELETE_ACTOR_ROUTE = 16;
// Lookup address of the actor.
LOOKUP_ACTOR_ROUTE = 17;
} }
enum NodeRole { enum NodeRole {
@ -208,3 +214,14 @@ message EventRespMessage {
message ScaleInFinishMessage { message ScaleInFinishMessage {
bool is_all_nodes_registered = 1; bool is_all_nodes_registered = 1;
} }
// Generic reply that carries only the outcome of a request. The scheduler fills it in
// SchedulerNode::GeneralResponse.
message GeneralResponseMsg {
// Outcome flag of the handled request.
bool is_success = 1;
// Error text supplied by the responder; presumably empty when is_success is true.
string error = 2;
}
// Address record of a single actor. The scheduler parses it from the payload of REGISTER_ACTOR_ROUTE requests.
message ActorAddress {
// Identifier of the actor; the scheduler passes it as the key when registering the route.
string actor_id = 1;
// Presumably the IP address and port at which the actor can be reached - confirm with the rpc layer.
string ip = 2;
uint32 port = 3;
}

View File

@ -61,6 +61,8 @@ bool SchedulerNode::Start(const uint32_t &timeout) {
StartUpdatePersistentCommandTimer(); StartUpdatePersistentCommandTimer();
MS_LOG(INFO) << "[Scheduler start]: 4. Successfully start scheduler, there are " << node_manager_.worker_num() MS_LOG(INFO) << "[Scheduler start]: 4. Successfully start scheduler, there are " << node_manager_.worker_num()
<< " workers and " << node_manager_.server_num() << " servers registered."; << " workers and " << node_manager_.server_num() << " servers registered.";
InitializeActorRouteTableService();
return true; return true;
} }
@ -201,6 +203,13 @@ void SchedulerNode::InitCommandHandler() {
handlers_[NodeCommand::SCALE_OUT_DONE] = &SchedulerNode::ProcessScaleOutDone; handlers_[NodeCommand::SCALE_OUT_DONE] = &SchedulerNode::ProcessScaleOutDone;
handlers_[NodeCommand::SCALE_IN_DONE] = &SchedulerNode::ProcessScaleInDone; handlers_[NodeCommand::SCALE_IN_DONE] = &SchedulerNode::ProcessScaleInDone;
handlers_[NodeCommand::SEND_EVENT] = &SchedulerNode::ProcessSendEvent; handlers_[NodeCommand::SEND_EVENT] = &SchedulerNode::ProcessSendEvent;
RegisterActorRouteTableServiceHandler();
}
// Bind the three actor-route commands (register, delete, lookup) to their SchedulerNode member handlers in the
// command dispatch table 'handlers_'.
void SchedulerNode::RegisterActorRouteTableServiceHandler() {
  handlers_[NodeCommand::REGISTER_ACTOR_ROUTE] = &SchedulerNode::ProcessRegisterActorRoute;
  handlers_[NodeCommand::DELETE_ACTOR_ROUTE] = &SchedulerNode::ProcessDeleteActorRoute;
  handlers_[NodeCommand::LOOKUP_ACTOR_ROUTE] = &SchedulerNode::ProcessLookupActorRoute;
}
void SchedulerNode::CreateTcpServer() { void SchedulerNode::CreateTcpServer() {
@ -476,6 +485,28 @@ void SchedulerNode::ProcessSendEvent(const std::shared_ptr<TcpServer> &server,
} }
} }
void SchedulerNode::ProcessRegisterActorRoute(const std::shared_ptr<TcpServer> &server,
const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {
MS_ERROR_IF_NULL_WO_RET_VAL(data);
MS_ERROR_IF_NULL_WO_RET_VAL(actor_route_table_service_);
ActorAddress actor_address;
actor_address.ParseFromArray(data, SizeToInt(size));
std::string actor_id = actor_address.actor_id();
std::string error = "";
bool ret = actor_route_table_service_->RegisterRoute(actor_id, actor_address, &error);
GeneralResponse(server, conn, meta, ret, error);
}
// Handler bound to DELETE_ACTOR_ROUTE requests.
// NOTE(review): the body is intentionally left empty here, so this handler neither touches the route table nor
// sends any response to the peer.
void SchedulerNode::ProcessDeleteActorRoute(const std::shared_ptr<TcpServer> &server,
                                            const std::shared_ptr<TcpConnection> &conn,
                                            const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {
}
// Handler bound to LOOKUP_ACTOR_ROUTE requests.
// NOTE(review): the body is intentionally left empty here, so this handler neither queries the route table nor
// sends any response to the peer.
void SchedulerNode::ProcessLookupActorRoute(const std::shared_ptr<TcpServer> &server,
                                            const std::shared_ptr<TcpConnection> &conn,
                                            const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {
}
bool SchedulerNode::SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos) { bool SchedulerNode::SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos) {
uint64_t request_id = AddMessageTrack(node_infos.size()); uint64_t request_id = AddMessageTrack(node_infos.size());
for (const auto &kvs : node_infos) { for (const auto &kvs : node_infos) {
@ -636,6 +667,11 @@ void SchedulerNode::StartUpdatePersistentCommandTimer() {
MS_EXCEPTION_IF_NULL(update_persistent_cmd_thread_); MS_EXCEPTION_IF_NULL(update_persistent_cmd_thread_);
} }
// Create the scheduler-side actor route table service and store it in 'actor_route_table_service_'.
// NOTE(review): ActorRouteTableService::Initialize() is not invoked here - confirm whether that is intended.
void SchedulerNode::InitializeActorRouteTableService() {
actor_route_table_service_ = std::make_unique<ActorRouteTableService>();
// Project null-check macro; presumably raises an exception when the pointer is null.
MS_EXCEPTION_IF_NULL(actor_route_table_service_);
}
const std::shared_ptr<TcpClient> &SchedulerNode::GetOrCreateClient(const NodeInfo &node_info) { const std::shared_ptr<TcpClient> &SchedulerNode::GetOrCreateClient(const NodeInfo &node_info) {
std::lock_guard<std::mutex> lock(client_mutex_); std::lock_guard<std::mutex> lock(client_mutex_);
if (connected_nodes_.count(node_info.node_id_)) { if (connected_nodes_.count(node_info.node_id_)) {
@ -1383,6 +1419,26 @@ void SchedulerNode::SetRegisterConnectionFd(const std::shared_ptr<TcpConnection>
MS_LOG(INFO) << "register client fd:" << fd << ", register client id:" << node_id; MS_LOG(INFO) << "register client fd:" << fd << ", register client id:" << node_id;
register_connection_fd_[fd] = node_id; register_connection_fd_[fd] = node_id;
} }
// Reply to the peer on 'conn' with a GeneralResponseMsg that carries 'is_success' and 'error'.
// 'server', 'conn' and 'meta' are null-checked first through the project guard macros (presumably they log and
// return without sending). A failed send is only logged; nothing is reported back to the caller.
void SchedulerNode::GeneralResponse(const std::shared_ptr<TcpServer> &server,
                                    const std::shared_ptr<TcpConnection> &conn,
                                    const std::shared_ptr<MessageMeta> &meta, bool is_success,
                                    const std::string &error) {
  MS_ERROR_IF_NULL_WO_RET_VAL(server);
  MS_ERROR_IF_NULL_WO_RET_VAL(conn);
  MS_ERROR_IF_NULL_WO_RET_VAL(meta);

  GeneralResponseMsg general_response_message;
  general_response_message.set_is_success(is_success);
  general_response_message.set_error(error);

  // Keep the serialized bytes in a named local so the buffer handed to SendMessage has an obvious lifetime.
  const std::string serialized_response = general_response_message.SerializeAsString();
  if (!server->SendMessage(conn, meta, Protos::PROTOBUF, serialized_response.data(),
                           general_response_message.ByteSizeLong())) {
    MS_LOG(ERROR) << "Scheduler failed to respond message.";
  }
}
} // namespace core } // namespace core
} // namespace ps } // namespace ps
} // namespace mindspore } // namespace mindspore

View File

@ -42,10 +42,12 @@
#include "ps/core/leader_scaler.h" #include "ps/core/leader_scaler.h"
#include "ps/core/recovery_base.h" #include "ps/core/recovery_base.h"
#include "ps/core/instance_manager.h" #include "ps/core/instance_manager.h"
#include "distributed/cluster/actor_route_table_service.h"
namespace mindspore { namespace mindspore {
namespace ps { namespace ps {
namespace core { namespace core {
using distributed::cluster::ActorRouteTableService;
class SchedulerNode : public Node { class SchedulerNode : public Node {
public: public:
SchedulerNode() SchedulerNode()
@ -60,7 +62,8 @@ class SchedulerNode : public Node {
leader_scaler_(nullptr), leader_scaler_(nullptr),
scheduler_recovery_(nullptr), scheduler_recovery_(nullptr),
persistent_cmd_(PersistentCommand::DEFAULT), persistent_cmd_(PersistentCommand::DEFAULT),
is_worker_timeout_(false) {} is_worker_timeout_(false),
actor_route_table_service_(nullptr) {}
~SchedulerNode() override; ~SchedulerNode() override;
typedef void (SchedulerNode::*ResponseHandler)(const std::shared_ptr<TcpServer> &server, typedef void (SchedulerNode::*ResponseHandler)(const std::shared_ptr<TcpServer> &server,
@ -81,6 +84,10 @@ class SchedulerNode : public Node {
// Persistent timer, periodically trigger persistent behavior. // Persistent timer, periodically trigger persistent behavior.
void StartUpdatePersistentCommandTimer(); void StartUpdatePersistentCommandTimer();
// Register and initialize the actor route table service.
void RegisterActorRouteTableServiceHandler();
void InitializeActorRouteTableService();
const std::shared_ptr<TcpClient> &GetOrCreateClient(const NodeInfo &node_info); const std::shared_ptr<TcpClient> &GetOrCreateClient(const NodeInfo &node_info);
void ProcessHeartbeat(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, void ProcessHeartbeat(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
@ -103,6 +110,18 @@ class SchedulerNode : public Node {
void ProcessSendEvent(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn, void ProcessSendEvent(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size); const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
// Process register actor route messages from other nodes.
void ProcessRegisterActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
// Process delete actor route messages from other nodes.
void ProcessDeleteActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
// Process lookup actor route messages from other nodes.
void ProcessLookupActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);
// Determine whether the registration request of the node should be rejected, the registration of the // Determine whether the registration request of the node should be rejected, the registration of the
// alive node should be rejected. // alive node should be rejected.
virtual bool NeedRejectRegister(const NodeInfo &node_info) { return false; } virtual bool NeedRejectRegister(const NodeInfo &node_info) { return false; }
@ -176,6 +195,10 @@ class SchedulerNode : public Node {
bool SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos); bool SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos);
// Responding peer with the general response message.
void GeneralResponse(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, bool is_success, const std::string &error);
std::shared_ptr<TcpServer> server_; std::shared_ptr<TcpServer> server_;
std::unique_ptr<std::thread> scheduler_thread_; std::unique_ptr<std::thread> scheduler_thread_;
std::unique_ptr<std::thread> update_state_thread_; std::unique_ptr<std::thread> update_state_thread_;
@ -214,6 +237,8 @@ class SchedulerNode : public Node {
std::atomic<bool> is_worker_timeout_; std::atomic<bool> is_worker_timeout_;
// This is a map of register connection fd to client node id // This is a map of register connection fd to client node id
std::unordered_map<int, std::string> register_connection_fd_; std::unordered_map<int, std::string> register_connection_fd_;
std::unique_ptr<ActorRouteTableService> actor_route_table_service_;
}; };
} // namespace core } // namespace core
} // namespace ps } // namespace ps

View File

@ -178,6 +178,7 @@ file(GLOB_RECURSE MINDSPORE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
"../../../mindspore/ccsrc/transform/graph_ir/op_declare/*.cc" "../../../mindspore/ccsrc/transform/graph_ir/op_declare/*.cc"
"../../../mindspore/ccsrc/ps/*.cc" "../../../mindspore/ccsrc/ps/*.cc"
"../../../mindspore/ccsrc/fl/*.cc" "../../../mindspore/ccsrc/fl/*.cc"
"../../../mindspore/ccsrc/distributed/cluster/actor_route_table_service.cc"
"../../../mindspore/ccsrc/distributed/persistent/*.cc" "../../../mindspore/ccsrc/distributed/persistent/*.cc"
"../../../mindspore/ccsrc/distributed/rpc/tcp/*.cc" "../../../mindspore/ccsrc/distributed/rpc/tcp/*.cc"
"../../../mindspore/ccsrc/profiler/device/ascend/*.cc" "../../../mindspore/ccsrc/profiler/device/ascend/*.cc"