Fixed scale-in: send the finish message to the scheduler before triggering the scale-in done event

This commit is contained in:
chendongsheng 2021-06-09 11:31:49 +08:00
parent 4ecd0fca6e
commit e266baffca
8 changed files with 86 additions and 19 deletions

View File

@ -88,6 +88,13 @@ constexpr int64_t kRetryIntervalInMs = 10;
constexpr int64_t kThreadNum = 32;
// The timeout period for the scale in node to send the finish message to scheduler.
constexpr uint32_t kScaleInTimeoutInSenconds = 30;
// The number of retries to determine whether all nodes are successfully registered.
constexpr uint32_t kCheckRegisteredRetryCount = 30;
// The timeout interval for judging whether all nodes are successfully registered.
constexpr uint32_t kCheckRegisteredIntervalInMs = 1000;
// The barrier function which should be called before doing scaling out/in operations.
// It's easy for us to scale out/in nodes after one iteration is completed and keep consistent.
using BarrierBeforeScaleOut = std::function<void(void)>;

View File

@ -84,23 +84,27 @@ bool AbstractNode::Broadcast(const enum NodeRole &node_role, const DataPtr &mess
}
// Prepares a surviving node for a cluster scale-out: re-register with the
// scheduler so this node picks up the post-scale-out topology, then drop the
// cached peer connections so they are re-established against the new cluster.
// NOTE(review): Register() presumably refreshes rank/cluster metadata from the
// scheduler — confirm against the Register() implementation.
void AbstractNode::set_ready_for_scale_out() {
MS_LOG(INFO) << "[Scale out]: begin to set ready for scale out.";
Register(client_to_scheduler_);
// Connections cached before the scale-out refer to the old topology.
connected_nodes_.clear();
}
// Prepares a node for a cluster scale-in. Behavior depends on whether this
// node is itself being removed:
//   - Surviving node: re-register with the scheduler and drop cached peer
//     connections (same as the scale-out path).
//   - Node being scaled in: mark the cluster state, invalidate its rank id,
//     notify the scheduler it has finished, and fire CLUSTER_SCALE_IN_DONE.
void AbstractNode::set_ready_for_scale_in() {
MS_LOG(INFO) << "[Scale in]: begin to set ready for scale in.";
if (!is_current_node_scale_in_) {
Register(client_to_scheduler_);
connected_nodes_.clear();
} else {
current_cluster_state_ = ClusterState::CLUSTER_SCALE_IN;
// This node leaves the cluster, so its rank id is no longer meaningful.
node_info_.rank_id_ = UINT32_MAX;
MS_LOG(WARNING) << "Trigger cluster scale in done event.";
// Tell the scheduler this scale-in node is done before raising the local event;
// kScaleInTimeoutInSenconds bounds the synchronous send (see constant definition).
SendScaleInFinishMessage(client_to_scheduler_, kScaleInTimeoutInSenconds);
MS_LOG(WARNING) << "[Event]:Trigger cluster scale in done event.";
// NOTE(review): two near-duplicate "Trigger cluster scale in done event" logs
// above — confirm only one is intended.
OnEventCallback(ClusterEvent::CLUSTER_SCALE_IN_DONE);
}
}
void AbstractNode::set_scale_out_done() {
MS_LOG(INFO) << "[Scale out]: begin to set scale out done.";
auto message_meta = std::make_shared<MessageMeta>();
message_meta->set_cmd(NodeCommand::SCALE_OUT_DONE);
@ -114,10 +118,11 @@ void AbstractNode::set_scale_out_done() {
}
MS_LOG(INFO) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << "is send scale_out_done to scheduler!";
<< " the node id:" << node_info_.node_id_ << "is send scale_out_done to scheduler successful!";
}
void AbstractNode::set_scale_in_done() {
MS_LOG(INFO) << "[Scale in]: begin to set scale in done.";
auto message_meta = std::make_shared<MessageMeta>();
message_meta->set_cmd(NodeCommand::SCALE_IN_DONE);
@ -131,7 +136,7 @@ void AbstractNode::set_scale_in_done() {
}
MS_LOG(INFO) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << "is send scale_in_done to scheduler!";
<< " the node id:" << node_info_.node_id_ << "is send scale_in_done to scheduler successful!";
}
void AbstractNode::BroadcastEvent(const uint32_t &event) {
@ -564,6 +569,20 @@ void AbstractNode::ProcessEvent(std::shared_ptr<TcpConnection> conn, std::shared
OnCustomEventCallback(event);
}
void AbstractNode::SendScaleInFinishMessage(const std::shared_ptr<TcpClient> &client, const uint32_t &timeout) {
auto meta = std::make_shared<MessageMeta>();
meta->set_cmd(NodeCommand::SCALE_IN_FINISH);
std::string finish_message = node_info_.node_id_;
MS_LOG(INFO) << "The scale in node id:" << node_info_.node_id_ << " send a scale in finish message to scheduler.";
if (!SendMessageSync(client, meta, Protos::RAW, finish_message.data(), finish_message.length(), timeout)) {
MS_LOG(WARNING) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << " send scale in finish Message timeout!";
}
}
void AbstractNode::ProcessScaleOut(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta,
const Protos &protos, const void *data, size_t size) {
MS_EXCEPTION_IF_NULL(conn);
@ -824,6 +843,7 @@ void AbstractNode::InitCommandHandler() {
handlers_[NodeCommand::SCALE_OUT_DONE] = nullptr;
handlers_[NodeCommand::SCALE_IN_DONE] = nullptr;
handlers_[NodeCommand::SEND_EVENT] = nullptr;
handlers_[NodeCommand::SCALE_IN_FINISH] = nullptr;
}
void AbstractNode::InitServerHandler() {
@ -855,18 +875,18 @@ void AbstractNode::InitNodeNum() {
// Dispatches a cluster event to the callback registered in event_to_callback_.
// An unregistered event is logged as an error and otherwise ignored.
// NOTE(review): the two consecutive ERROR logs and two consecutive INFO logs
// below look like duplicates — confirm only the "[Event]:"-prefixed ones are intended.
void AbstractNode::OnEventCallback(const ClusterEvent &event) {
if (!event_to_callback_.count(event)) {
MS_LOG(ERROR) << "The event callback of " << event << " is not set.";
MS_LOG(ERROR) << "[Event]:The event callback of " << event << " is not set.";
} else {
MS_LOG(INFO) << "Trigger the event:" << event;
MS_LOG(INFO) << "[Event]:Trigger the event:" << event;
event_to_callback_[event]();
}
}
// Dispatches a user-defined (custom) event to the callback registered in
// custom_event_to_callback_. Missing callbacks are only a warning here, unlike
// the ERROR used for built-in cluster events above.
// NOTE(review): duplicate-looking WARNING/INFO log pairs below — confirm only
// the "[Custom event]:"-prefixed ones are intended.
void AbstractNode::OnCustomEventCallback(const uint32_t &event) {
if (!custom_event_to_callback_.count(event)) {
MS_LOG(WARNING) << "The event callback of " << event << " is not set.";
MS_LOG(WARNING) << "[Custom event]:The event callback of " << event << " is not set.";
} else {
MS_LOG(INFO) << "Trigger the event:" << event;
MS_LOG(INFO) << "[Custom event]:Trigger the event:" << event;
custom_event_to_callback_[event]();
}
}

View File

@ -138,6 +138,10 @@ class AbstractNode : public Node {
void ProcessEvent(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, const Protos &protos,
const void *data, size_t size);
// The scale in node needs to send a scale_in_finish message to the scheduler before triggering the event:
// CLUSTER_SCALE_IN_DONE.
void SendScaleInFinishMessage(const std::shared_ptr<TcpClient> &client, const uint32_t &timeout);
void StartHeartbeatTimer(const std::shared_ptr<TcpClient> &client);
void UpdateSchedulerTime();
bool CheckSchedulerTimeout() const;

View File

@ -32,8 +32,8 @@ uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
uint32_t rank_id = UINT_MAX;
const std::string &node_id = register_message.node_id();
if (nodes_info_.find(node_id) != nodes_info_.end()) {
rank_id = nodes_info_[node_id].rank_id_;
if (registered_nodes_info_.find(node_id) != registered_nodes_info_.end()) {
rank_id = registered_nodes_info_[node_id].rank_id_;
MS_LOG(INFO) << "The node id: " << node_id << " is already assigned!";
return rank_id;
}
@ -54,7 +54,7 @@ uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
node_info.rank_id_ = rank_id;
node_info.ip_ = ip;
node_info.port_ = port;
nodes_info_[node_id] = node_info;
registered_nodes_info_[node_id] = node_info;
MS_LOG(INFO) << "The server node id:" << node_id << ",node ip: " << node_info.ip_ << ",node port:" << port
<< " assign rank id:" << rank_id;
} else if (register_message.role() == NodeRole::WORKER) {
@ -72,7 +72,7 @@ uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
node_info.rank_id_ = rank_id;
node_info.ip_ = ip;
node_info.port_ = port;
nodes_info_[node_id] = node_info;
registered_nodes_info_[node_id] = node_info;
MS_LOG(INFO) << "The worker node id:" << node_id << " assign rank id:" << rank_id;
}
return rank_id;
@ -112,9 +112,9 @@ void NodeManager::UpdateCluster() {
timeout_nodes_info_.clear();
for (auto it = heartbeats_.begin(); it != heartbeats_.end(); ++it) {
if (it->second.tv_sec + PSContext::instance()->cluster_config().heartbeat_timeout < current_time.tv_sec) {
if (nodes_info_.count(it->first)) {
if (registered_nodes_info_.count(it->first)) {
MS_LOG(WARNING) << "The node id:" << it->first << " is timeout!";
timeout_nodes_info_[it->first] = nodes_info_[it->first];
timeout_nodes_info_[it->first] = registered_nodes_info_[it->first];
}
}
}
@ -133,12 +133,12 @@ void NodeManager::UpdateCluster() {
}
void NodeManager::CheckClusterTimeout() {
if (total_node_num_ != SizeToInt(nodes_info_.size())) {
if (total_node_num_ != SizeToInt(registered_nodes_info_.size())) {
MS_LOG(WARNING) << "The cluster is not ready after "
<< PSContext::instance()->cluster_config().cluster_available_timeout
<< " seconds,so finish the cluster, and change total node number from " << total_node_num_ << " to "
<< nodes_info_.size();
current_node_num_ = nodes_info_.size();
<< registered_nodes_info_.size();
current_node_num_ = registered_nodes_info_.size();
UpdateClusterState(ClusterState::NODE_TIMEOUT);
}
}
@ -149,7 +149,7 @@ void NodeManager::AddScaleOutDoneNode(const std::string &node_id) { scale_out_do
// Records that the node with the given id has completed its scale-in step.
void NodeManager::AddScaleInDoneNode(const std::string &node_id) { scale_in_done_nodes_id_.insert(node_id); }
bool NodeManager::IsAllNodesRegistered() { return SizeToInt(nodes_info_.size()) == total_node_num_; }
bool NodeManager::IsAllNodesRegistered() { return SizeToInt(registered_nodes_info_.size()) == total_node_num_; }
// True once the number of nodes that reported finish equals the expected total.
bool NodeManager::IsAllNodesFinished() { return SizeToInt(finish_nodes_id_.size()) == total_node_num_; }
@ -159,6 +159,12 @@ bool NodeManager::IsAllNodesScaleInDone() { return SizeToInt(scale_in_done_nodes
std::unordered_map<std::string, NodeInfo> &NodeManager::nodes_info() { return nodes_info_; }
// Publishes a snapshot of the registered nodes: once all nodes have registered
// successfully, nodes_info_ is replaced wholesale by registered_nodes_info_.
void NodeManager::UpdateNodesInfo() {
  MS_LOG(INFO) << "Update nodes info.";
  // Copy-assignment already discards the map's previous contents, so the
  // separate clear() that preceded it was redundant and has been removed.
  nodes_info_ = registered_nodes_info_;
}
void NodeManager::UpdateNodeState(const NodeState &state) {
std::lock_guard<std::mutex> lk(node_mutex_);
node_state_ = state;
@ -181,7 +187,7 @@ ClusterState NodeManager::GetClusterState() {
void NodeManager::ResetMetadata() {
MS_LOG(WARNING) << "Reset metadata.";
nodes_info_.clear();
registered_nodes_info_.clear();
heartbeats_.clear();
next_worker_rank_id_ = -1;
next_server_rank_id_ = -1;

View File

@ -86,6 +86,8 @@ class NodeManager {
bool IsAllNodesScaleInDone();
std::unordered_map<std::string, NodeInfo> &nodes_info();
// After all the nodes are registered successfully, the nodes info can be updated.
void UpdateNodesInfo();
void set_total_node_num(const int32_t &node_num);
const int32_t &total_node_num();
@ -114,7 +116,10 @@ class NodeManager {
std::atomic<int> next_worker_rank_id_;
std::atomic<int> next_server_rank_id_;
// worker nodes and server nodes
// Whenever a node is registered, it will be stored in this map.
std::unordered_map<std::string, NodeInfo> registered_nodes_info_;
// When all nodes are registered successfully, then all nodes info will be stored in this map. In other words, the
// nodes_info_ is a snapshot of the registered_nodes_info_.
std::unordered_map<std::string, NodeInfo> nodes_info_;
std::mutex assign_rank_id_mutex_;
std::mutex heartbeat_mutex_;

View File

@ -39,6 +39,8 @@ enum NodeCommand {
SCALE_IN_DONE = 11;
// This command is used to send user defined event.
SEND_EVENT = 12;
// This command is used to send finish message from the scale in node to the scheduler.
SCALE_IN_FINISH = 13;
}
enum NodeRole {

View File

@ -82,6 +82,7 @@ void SchedulerNode::InitCommandHandler() {
handlers_[NodeCommand::SCALE_OUT_DONE] = &SchedulerNode::ProcessScaleOutDone;
handlers_[NodeCommand::SCALE_IN_DONE] = &SchedulerNode::ProcessScaleInDone;
handlers_[NodeCommand::SEND_EVENT] = &SchedulerNode::ProcessSendEvent;
handlers_[NodeCommand::SCALE_IN_FINISH] = &SchedulerNode::ProcessScaleInFinish;
}
void SchedulerNode::CreateTcpServer() {
@ -132,6 +133,7 @@ void SchedulerNode::ProcessRegister(std::shared_ptr<TcpServer> server, std::shar
register_resp_message.ByteSizeLong());
if (node_manager_.IsAllNodesRegistered()) {
node_manager_.UpdateNodesInfo();
is_ready_ = true;
auto node_infos = node_manager_.nodes_info();
for (const auto &kvs : node_infos) {
@ -181,6 +183,23 @@ void SchedulerNode::ProcessFetchMetadata(std::shared_ptr<TcpServer> server, std:
fetch_servers_message.ByteSizeLong());
}
// Handles the SCALE_IN_FINISH command sent by a scale-in node. After receiving
// the message (payload: the sender's node id as raw bytes), retries until all
// remaining nodes are registered with the scheduler, then acknowledges the
// sender by echoing the payload back.
// NOTE(review): when the retry budget is exhausted this only logs an error and
// still sends the ack — confirm the sender tolerates an ack from a cluster
// that is not fully registered.
void SchedulerNode::ProcessScaleInFinish(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
                                         std::shared_ptr<MessageMeta> meta, const void *data, size_t size) {
  MS_EXCEPTION_IF_NULL(server);
  MS_EXCEPTION_IF_NULL(conn);
  MS_EXCEPTION_IF_NULL(meta);
  MS_EXCEPTION_IF_NULL(data);
  // Build the id string directly on the stack; the original heap-allocated a
  // std::unique_ptr<std::string> for a purely local value.
  std::string finish_message(reinterpret_cast<const char *>(data), size);
  MS_LOG(INFO) << "Process finish message from node id:" << finish_message;
  // Poll registration state up to kCheckRegisteredRetryCount times, waiting
  // kCheckRegisteredIntervalInMs between attempts.
  bool res = CommUtil::Retry([&] { return node_manager_.IsAllNodesRegistered(); }, kCheckRegisteredRetryCount,
                             kCheckRegisteredIntervalInMs);
  if (!res) {
    MS_LOG(ERROR) << "All nodes have not yet fully registered successfully.";
  }
  MS_LOG(INFO) << "The scheduler response the scale in finish message.";
  server->SendMessage(conn, meta, Protos::PROTOBUF, data, size);
}
void SchedulerNode::ProcessScaleOutDone(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, const void *data, size_t size) {
MS_EXCEPTION_IF_NULL(server);

View File

@ -78,6 +78,10 @@ class SchedulerNode : public Node {
std::shared_ptr<MessageMeta> meta, const void *data, size_t size);
void ProcessFetchMetadata(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, const void *data, size_t size);
// After receiving the scale in node's scale_in_finish message, wait until all other nodes are registered to the
// scheduler.
void ProcessScaleInFinish(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,
std::shared_ptr<MessageMeta> meta, const void *data, size_t size);
// Process scale_out_done messages from workers/servers
void ProcessScaleOutDone(std::shared_ptr<TcpServer> server, std::shared_ptr<TcpConnection> conn,