Support recovery for the meta server node

This commit is contained in:
Parallels 2022-05-11 09:51:33 +08:00
parent 74fce14134
commit 9002e03524
8 changed files with 123 additions and 16 deletions

View File

@ -20,12 +20,19 @@
#include "proto/topology.pb.h"
#include "distributed/rpc/tcp/constants.h"
#include "distributed/cluster/topology/utils.h"
#include "distributed/recovery/recovery_context.h"
#include "distributed/recovery/file_configuration.h"
#include "distributed/cluster/topology/meta_server_node.h"
namespace mindspore {
namespace distributed {
namespace cluster {
namespace topology {
// The keys for the persisted metadata of compute node states.
constexpr char kComputeNodeStates[] = "compute_node_states";
constexpr char kNodeId[] = "node_id";
constexpr char kRecoveryFileName[] = "recovery.dat";
bool MetaServerNode::Initialize() {
// Init the address of meta server node.
RETURN_IF_FALSE_WITH_LOG(FillMetaServerAddress(&meta_server_addr_),
@ -34,6 +41,11 @@ bool MetaServerNode::Initialize() {
// Init the TCP server.
RETURN_IF_FALSE_WITH_LOG(InitTCPServer(), "Failed to create the TCP server.");
// The meta server node is restarted and the metadata of cluster needs to be recovered.
if (recovery::IsEnableRecovery()) {
RETURN_IF_FALSE_WITH_LOG(Recovery(), "Failed to recover from configuration.");
}
start_time_ = Now();
// Init the thread for monitoring the state of the cluster topo.
@ -46,7 +58,6 @@ bool MetaServerNode::Initialized() {
}
bool MetaServerNode::Finalize() {
std::unique_lock<std::shared_mutex> lock(nodes_mutex_);
if (topo_state_ != TopoState::kFinished) {
MS_LOG(WARNING) << "The meta server node can not be finalized because there are still " << nodes_.size()
<< " alive nodes.";
@ -127,10 +138,8 @@ MessageBase *const MetaServerNode::ProcessRegister(MessageBase *const message) {
if (nodes_.find(node_id) == nodes_.end()) {
std::shared_ptr<ComputeGraphNodeState> node_state = std::make_shared<ComputeGraphNodeState>(node_id);
nodes_[node_id] = node_state;
if (nodes_.size() == total_node_num_) {
topo_state_ = TopoState::kInitialized;
}
MS_LOG(INFO) << "The new node: " << node_id << " is registered successfully.";
TransitionToInitialized();
RegistrationRespMessage reg_resp_msg;
reg_resp_msg.set_success(true);
@ -251,9 +260,7 @@ void MetaServerNode::UpdateTopoState() {
}
std::shared_lock<std::shared_mutex> lock(nodes_mutex_);
if (nodes_.size() == total_node_num_) {
MS_LOG(INFO) << "The cluster topology has been constructed successfully";
topo_state_ = TopoState::kInitialized;
if (TransitionToInitialized()) {
continue;
}
MS_LOG(INFO) << "The cluster topology is in the process of constructing, current alive node num: ("
@ -269,6 +276,83 @@ void MetaServerNode::UpdateTopoState() {
}
}
bool MetaServerNode::TransitionToInitialized() {
if (nodes_.size() == total_node_num_) {
// Persist the cluster metadata into storage through configuration.
if (recovery::IsEnableRecovery() && configuration_->Empty()) {
if (!Persist()) {
MS_LOG(EXCEPTION) << "Failed to persist the metadata of the cluster.";
}
}
topo_state_ = TopoState::kInitialized;
MS_LOG(INFO) << "The cluster topology has been constructed successfully";
return true;
}
return false;
}
bool MetaServerNode::Recovery() {
std::string recovery_path = recovery::RecoveryFullPath();
configuration_ = std::make_unique<recovery::FileConfiguration>(recovery_path + "/" + kRecoveryFileName);
RETURN_IF_FALSE_WITH_LOG(configuration_->Initialize(),
"Failed to initialize the recovery file configuration from file path: " << recovery_path);
if (configuration_->Empty()) {
MS_LOG(INFO) << "The meta server node is started for the first time.";
return true;
// The meta server node is restarted and the metadata of cluster needs to be recovered.
} else {
std::string states_key = kComputeNodeStates;
RETURN_IF_FALSE_WITH_LOG(configuration_->Exists(states_key),
"Can not find the key " + states_key + " in configuration.");
// Check the validation of the previous metadata.
const auto &states = configuration_->Get(states_key, "");
nlohmann::json node_states = nlohmann::json::parse(states);
RETURN_IF_FALSE_WITH_LOG(node_states.size() == total_node_num_,
"Invalid number of node in configuration: " + std::to_string(node_states.size()) +
", expected total number of node: " + std::to_string(total_node_num_));
// Restore the nodes state.
for (auto iter = node_states.begin(); iter != node_states.end(); ++iter) {
const auto &node_id = iter.key();
std::shared_ptr<ComputeGraphNodeState> node_state = std::make_shared<ComputeGraphNodeState>(node_id);
time(&(node_state->last_update));
nodes_[node_id] = node_state;
}
std::shared_lock<std::shared_mutex> lock(nodes_mutex_);
if (nodes_.size() == total_node_num_) {
topo_state_ = TopoState::kInitialized;
}
}
return true;
}
bool MetaServerNode::Persist() {
MS_EXCEPTION_IF_NULL(configuration_);
if (total_node_num_ != nodes_.size()) {
MS_LOG(ERROR) << "Invalid number of alive node: " << nodes_.size()
<< ", the expected total number of node is: " << total_node_num_;
return false;
}
nlohmann::json node_states;
for (auto iter = nodes_.begin(); iter != nodes_.end(); ++iter) {
const auto &node_id = iter->first;
nlohmann::json node_state;
node_state[kNodeId] = node_id;
node_states[node_id] = node_state.dump();
}
configuration_->Put(kComputeNodeStates, node_states.dump());
configuration_->Flush();
return true;
}
TopoState MetaServerNode::TopologyState() { return topo_state_; }
size_t MetaServerNode::GetAliveNodeNum() {

View File

@ -26,6 +26,7 @@
#include <shared_mutex>
#include "distributed/cluster/topology/common.h"
#include "distributed/rpc/tcp/tcp_server.h"
#include "distributed/recovery/configuration.h"
#include "distributed/cluster/topology/node_base.h"
namespace mindspore {
@ -107,6 +108,15 @@ class MetaServerNode : public NodeBase {
// Maintain the state which is type of `TopoState` of this cluster topology.
void UpdateTopoState();
// Try to transition the state of cluster to be initialized.
bool TransitionToInitialized();
// Recover metadata from the configuration if recovery is enabled.
bool Recovery();
// Persist the required metadata of cluster into storage through configuration.
bool Persist();
// The meta server address used to manage the tcp server.
MetaServerAddress meta_server_addr_;
@ -148,6 +158,9 @@ class MetaServerNode : public NodeBase {
std::map<std::string, std::string> metadata_;
mutable std::shared_mutex meta_mutex_;
// A key-value pairs metadata config used for failover recovery if enabled.
std::unique_ptr<recovery::Configuration> configuration_;
};
} // namespace topology
} // namespace cluster

View File

@ -44,6 +44,9 @@ class Configuration {
// Check whether the specified configuration key exists.
virtual bool Exists(const std::string &key) const = 0;
// Check whether the configuration contains any key-value pairs.
virtual bool Empty() const = 0;
// Flush all the key-value pairs in memory into the specific sub-class's storage.
virtual bool Flush() = 0;
};

View File

@ -55,6 +55,8 @@ void FileConfiguration::Put(const std::string &key, const std::string &value) {
bool FileConfiguration::Exists(const std::string &key) const { return values_.contains(key); }
bool FileConfiguration::Empty() const { return values_.size() == 0; }
bool FileConfiguration::Flush() {
if (!storage::FileIOUtils::IsFileOrDirExist(file_)) {
MS_LOG(EXCEPTION) << "The local configuration file : " << file_ << " does not exist.";

View File

@ -38,6 +38,8 @@ class FileConfiguration : public Configuration {
bool Exists(const std::string &key) const override;
bool Empty() const override;
bool Flush() override;
private:

View File

@ -33,10 +33,6 @@
namespace mindspore {
namespace distributed {
namespace recovery {
constexpr char kEnvEnableRecovery[] = "MS_ENABLE_RECOVERY";
constexpr char kEnvRecoveryPath[] = "MS_RECOVERY_PATH";
constexpr char kEnvRecoveryInterval[] = "MS_RECOVERY_INTERVAL";
constexpr char kCkptSuffix[] = ".ckpt";
constexpr char kCkptPath[] = "ckpt_path";
constexpr char kJsonSuffix[] = ".json";
@ -77,7 +73,7 @@ void RecoveryContext::Initialize() {
}
// 1. Read environment variable.
enable_recovery_ = (common::GetEnv(kEnvEnableRecovery) == std::string("1"));
enable_recovery_ = IsEnableRecovery();
if (!enable_recovery_) {
return;
}
@ -86,7 +82,7 @@ void RecoveryContext::Initialize() {
MS_EXCEPTION_IF_NULL(context_ptr);
context_ptr->set_param<bool>(MS_CTX_ENABLE_RECOVERY, true);
recovery_path_ = common::GetEnv(kEnvRecoveryPath);
recovery_path_ = RecoveryFullPath();
if (recovery_path_.empty()) {
MS_LOG(EXCEPTION) << "The recovery path is empty, please export MS_RECOVERY_PATH correctly.";
}

View File

@ -29,6 +29,16 @@
namespace mindspore {
namespace distributed {
namespace recovery {
constexpr char kEnvEnableRecovery[] = "MS_ENABLE_RECOVERY";
constexpr char kEnvRecoveryPath[] = "MS_RECOVERY_PATH";
constexpr char kEnvRecoveryInterval[] = "MS_RECOVERY_INTERVAL";
__attribute__((unused)) static bool IsEnableRecovery() {
return common::GetEnv(kEnvEnableRecovery) == std::string("1");
}
__attribute__((unused)) static std::string RecoveryFullPath() { return common::GetEnv(kEnvRecoveryPath); }
using distributed::storage::FileIOUtils;
using distributed::storage::JsonUtils;

View File

@ -23,9 +23,6 @@
namespace mindspore {
namespace distributed {
namespace recovery {
constexpr char kEnvEnableRecovery[] = "MS_ENABLE_RECOVERY";
constexpr char kEnvRecoveryPath[] = "MS_RECOVERY_PATH";
constexpr char kEnvRecoveryInterval[] = "MS_RECOVERY_INTERVAL";
constexpr char kEnvMSRole[] = "MS_ROLE";
class TestRecoveryContext : public UT::Common {