Reuse rank id

chendongsheng 2021-07-08 10:57:34 +08:00
parent 104782f257
commit a4b0c29a6f
5 changed files with 32 additions and 12 deletions
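In short: a registering node now sends its previous rank id along with the REGISTER command (AbstractNode::Register puts it on the MessageMeta), NodeInfo initializes rank_id_ to UINT32_MAX as a "not assigned yet" sentinel, and NodeManager::NextRankId hands the old id back instead of allocating a fresh one when the reported id is still in range. A condensed, self-contained sketch of that reuse rule follows; kInvalidRankId and NextRank are illustrative stand-ins, not the real MessageMeta/NodeManager API.

#include <cstdint>
#include <iostream>

constexpr uint32_t kInvalidRankId = UINT32_MAX;  // mirrors the new NodeInfo default for rank_id_

// Decide which rank id to hand out when a node (re)registers.
// previous_rank_id: the rank id the node reported in its REGISTER message meta.
// next_rank_id:     the scheduler's running counter for this node role (starts at -1).
uint32_t NextRank(uint32_t previous_rank_id, int32_t *next_rank_id) {
  if (previous_rank_id != kInvalidRankId &&
      static_cast<int32_t>(previous_rank_id) <= *next_rank_id) {
    // The node registered before (e.g. it restarted), so it keeps its old rank id.
    return previous_rank_id;
  }
  // First registration: allocate the next fresh rank id.
  return static_cast<uint32_t>(++(*next_rank_id));
}

int main() {
  int32_t next_server_rank_id = -1;
  std::cout << NextRank(kInvalidRankId, &next_server_rank_id) << "\n";  // 0: fresh node
  std::cout << NextRank(0, &next_server_rank_id) << "\n";              // 0: restarted node reuses its id
  std::cout << NextRank(kInvalidRankId, &next_server_rank_id) << "\n"; // 1: another fresh node
  return 0;
}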

View File

@@ -24,6 +24,7 @@ void AbstractNode::Register(const std::shared_ptr<TcpClient> &client) {
MS_EXCEPTION_IF_NULL(client);
auto message_meta = std::make_shared<MessageMeta>();
message_meta->set_cmd(NodeCommand::REGISTER);
+message_meta->set_rank_id(node_info_.rank_id_);
RegisterMessage register_message;
register_message.set_node_id(node_info_.node_id_);

View File

@@ -36,7 +36,7 @@ enum class ClusterEvent {
};
struct NodeInfo {
-NodeInfo() : ip_(""), port_(0), node_role_(NodeRole::SCHEDULER), rank_id_(0), is_alive(false) {}
+NodeInfo() : ip_(""), port_(0), node_role_(NodeRole::SCHEDULER), rank_id_(UINT32_MAX), is_alive(false) {}
// ip
std::string ip_;
// the port of this node

View File

@@ -27,7 +27,7 @@ void NodeManager::InitNode() {
total_node_num_ = initial_total_node_num_;
}
-uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
+uint32_t NodeManager::NextRankId(const RegisterMessage &register_message, const std::shared_ptr<MessageMeta> &meta) {
std::lock_guard<std::mutex> lock(assign_rank_id_mutex_);
uint32_t rank_id = UINT_MAX;
@@ -51,7 +51,12 @@ uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
return res;
});
if (rank_it == registered_nodes_info_.end()) {
-rank_id = ++next_server_rank_id_;
+if (meta->rank_id() != UINT32_MAX && UintToInt(meta->rank_id()) <= next_server_rank_id_) {
+rank_id = meta->rank_id();
+MS_LOG(INFO) << "Use the old rank id:" << rank_id;
+} else {
+rank_id = ++next_server_rank_id_;
+}
} else {
registered_nodes_info_.erase((*rank_it).first);
}
@@ -85,7 +90,12 @@ uint32_t NodeManager::NextRankId(const RegisterMessage &register_message) {
return res;
});
if (worker_rank_it == registered_nodes_info_.end()) {
-rank_id = ++next_worker_rank_id_;
+if (meta->rank_id() != UINT32_MAX && UintToInt(meta->rank_id()) <= next_worker_rank_id_) {
+rank_id = meta->rank_id();
+MS_LOG(INFO) << "Use the old rank id:" << rank_id;
+} else {
+rank_id = ++next_worker_rank_id_;
+}
} else {
registered_nodes_info_.erase((*worker_rank_it).first);
}
@@ -235,12 +245,21 @@ ClusterState NodeManager::GetClusterState() {
return cluster_state_;
}
-void NodeManager::ResetMetadata() {
+void NodeManager::ResetMetadata(const std::vector<std::string> &scale_in_nodes) {
MS_LOG(WARNING) << "Reset metadata.";
+std::vector<uint32_t> server_rank_ids;
+if (GetClusterState() == ClusterState::CLUSTER_SCALE_IN) {
+for (const auto &item : scale_in_nodes) {
+if (registered_nodes_info_.count(item)) {
+server_rank_ids.push_back(registered_nodes_info_[item].rank_id_);
+}
+}
+auto min_rank_id = std::min_element(server_rank_ids.begin(), server_rank_ids.end());
+next_server_rank_id_ = *min_rank_id - 1;
+MS_LOG(INFO) << "The next server rank id:" << next_server_rank_id_;
+}
registered_nodes_info_.clear();
heartbeats_.clear();
next_worker_rank_id_ = -1;
next_server_rank_id_ = -1;
}
bool NodeManager::IsWorkerOrServer0() {
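The std::min_element rewind above is the scale-in half of the reuse: next_server_rank_id_ is wound back to just below the smallest rank id held by the departing servers, so those freed ids are handed out again when the remaining nodes re-register. A standalone sketch of that rewind, assuming the departing servers hold the highest rank ids and using an illustrative registered map in place of registered_nodes_info_:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
  // node id -> rank id of the currently registered server nodes (illustrative data)
  std::map<std::string, uint32_t> registered = {{"server-a", 0}, {"server-b", 1}, {"server-c", 2}};
  std::vector<std::string> scale_in_nodes = {"server-b", "server-c"};

  // Collect the rank ids of the nodes that are leaving the cluster.
  std::vector<uint32_t> scale_in_rank_ids;
  for (const auto &node_id : scale_in_nodes) {
    auto it = registered.find(node_id);
    if (it != registered.end()) {
      scale_in_rank_ids.push_back(it->second);
    }
  }

  // Rewind the counter to just below the smallest freed rank id, so the next
  // registrations hand out the freed ids (1 and 2 here) instead of growing past them.
  int32_t next_server_rank_id = -1;
  if (!scale_in_rank_ids.empty()) {
    next_server_rank_id =
        static_cast<int32_t>(*std::min_element(scale_in_rank_ids.begin(), scale_in_rank_ids.end())) - 1;
  }
  std::cout << "next_server_rank_id = " << next_server_rank_id << "\n";  // prints 0
  return 0;
}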

View File

@@ -56,7 +56,7 @@ class NodeManager {
// When initializing nodes, the initial number of nodes will be assigned to the total number of nodes.
void InitNode();
-uint32_t NextRankId(const RegisterMessage &register_message);
+uint32_t NextRankId(const RegisterMessage &register_message, const std::shared_ptr<MessageMeta> &meta);
void UpdateHeartbeat(const std::string &node_id);
@@ -106,7 +106,7 @@ class NodeManager {
// When the scheduler receives the scale out or scale in message, the metadata needs to be reset, because all nodes
// will re-register.
-void ResetMetadata();
+void ResetMetadata(const std::vector<std::string> &scale_in_nodes = {});
// Recovery currently does not support worker or server0 node downtime.
bool IsWorkerOrServer0();

View File

@@ -131,7 +131,7 @@ void SchedulerNode::ProcessRegister(std::shared_ptr<TcpServer> server, std::shar
}
// assign worker node and server node rank id
-uint32_t rank_id = node_manager_.NextRankId(register_message);
+uint32_t rank_id = node_manager_.NextRankId(register_message, meta);
if (rank_id == UINT32_MAX) {
MS_LOG(WARNING) << "The rank id is wrong!";
}
@@ -559,7 +559,8 @@ void SchedulerNode::ProcessScaleIn(std::shared_ptr<HttpMessageHandler> resp) {
int32_t scale_worker_num = 0;
int32_t scale_server_num = 0;
auto node_infos = node_manager_.nodes_info();
-node_manager_.ResetMetadata();
+node_manager_.UpdateClusterState(ClusterState::CLUSTER_SCALE_IN);
+node_manager_.ResetMetadata(scale_in_node_ids_);
for (auto const &val : scale_in_node_ids_) {
if (node_infos.count(val)) {
scale_in_nodes[val] = true;
@@ -580,7 +581,6 @@ void SchedulerNode::ProcessScaleIn(std::shared_ptr<HttpMessageHandler> resp) {
node_manager_.set_worker_num(total_worker_num);
node_manager_.set_server_num(total_server_num);
node_manager_.set_total_node_num(total_worker_num + total_server_num);
-node_manager_.UpdateClusterState(ClusterState::CLUSTER_SCALE_IN);
for (const auto &kvs : node_infos) {
auto client = GetOrCreateClient(kvs.second);
bool is_node_scale_in = false;
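One ordering detail in ProcessScaleIn: UpdateClusterState(CLUSTER_SCALE_IN) now runs before ResetMetadata, because ResetMetadata only performs the rank-counter rewind when GetClusterState() already reports CLUSTER_SCALE_IN. A loose sketch of that dependency, with stand-in types rather than the real SchedulerNode/NodeManager (the else branch is a simplification, not the exact ResetMetadata body):

#include <cstdint>
#include <iostream>
#include <vector>

enum class ClusterState { CLUSTER_READY, CLUSTER_SCALE_IN };

struct ManagerSketch {
  ClusterState state = ClusterState::CLUSTER_READY;
  int32_t next_server_rank_id = 2;  // pretend server ranks 0..2 are currently assigned

  void UpdateClusterState(ClusterState s) { state = s; }

  // Loosely mirrors the new ResetMetadata: the rewind only runs once the
  // cluster state has already been switched to scale-in.
  void ResetMetadata(const std::vector<uint32_t> &scale_in_rank_ids) {
    if (state == ClusterState::CLUSTER_SCALE_IN && !scale_in_rank_ids.empty()) {
      next_server_rank_id = static_cast<int32_t>(scale_in_rank_ids.front()) - 1;
    } else {
      next_server_rank_id = -1;  // simplified: outside scale-in, just start the counter over
    }
  }
};

int main() {
  ManagerSketch m;
  m.UpdateClusterState(ClusterState::CLUSTER_SCALE_IN);  // must come first
  m.ResetMetadata({2});                                   // the server with rank 2 is leaving
  std::cout << m.next_server_rank_id << "\n";             // 1: rank 2 will be handed out again
  return 0;
}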