!20980 fixed node timeout

Merge pull request !20980 from anancds/node
This commit is contained in:
i-robot 2021-07-28 09:30:03 +00:00 committed by Gitee
commit 7be2078a97
12 changed files with 29 additions and 13 deletions

View File

@ -114,7 +114,7 @@ constexpr uint32_t kCheckRegisteredRetryCount = 30;
constexpr uint32_t kCheckRegisteredIntervalInMs = 1000; constexpr uint32_t kCheckRegisteredIntervalInMs = 1000;
// The type of persistent storage, currently only supports file storage. // The type of persistent storage, currently only supports file storage.
constexpr char kStoreType[] = "storge_type"; constexpr char kStoreType[] = "storage_type";
// The file used to storage metadata. // The file used to storage metadata.
constexpr char kStoreFilePath[] = "storage_file_path"; constexpr char kStoreFilePath[] = "storage_file_path";
// 1 indicates that the persistent storage type is file. // 1 indicates that the persistent storage type is file.

View File

@ -187,7 +187,7 @@ std::string CommUtil::ParseConfig(const Configuration &config, const std::string
return ""; return "";
} }
std::string path = config.Get(key, ""); std::string path = config.GetString(key, "");
return path; return path;
} }
} // namespace core } // namespace core

View File

@ -226,7 +226,7 @@ bool TcpClient::EstablishSSL() {
// 2. Parse the client password. // 2. Parse the client password.
std::string client_password = CommUtil::ParseConfig(*config_, kClientPassword); std::string client_password = CommUtil::ParseConfig(*config_, kClientPassword);
if (!client_password.empty()) { if (client_password.empty()) {
MS_LOG(WARNING) << "The key:" << kClientPassword << "'s value is empty."; MS_LOG(WARNING) << "The key:" << kClientPassword << "'s value is empty.";
return false; return false;
} }

View File

@ -308,7 +308,7 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st
// 2. Parse the server password. // 2. Parse the server password.
std::string server_password = CommUtil::ParseConfig(*(server->config_), kServerPassword); std::string server_password = CommUtil::ParseConfig(*(server->config_), kServerPassword);
if (!server_password.empty()) { if (server_password.empty()) {
MS_LOG(EXCEPTION) << "The key:" << kServerPassword << "'s value is empty."; MS_LOG(EXCEPTION) << "The key:" << kServerPassword << "'s value is empty.";
} }

View File

@ -45,9 +45,12 @@ class Configuration {
// Determine whether the initialization has been completed. // Determine whether the initialization has been completed.
virtual bool IsInitialized() const = 0; virtual bool IsInitialized() const = 0;
// Get configuration data from database or config file. // Get configuration data from database or config file.The returned string is quoted
virtual std::string Get(const std::string &key, const std::string &defaultvalue) const = 0; virtual std::string Get(const std::string &key, const std::string &defaultvalue) const = 0;
// Get configuration data from database or config file.The returned string is not quoted
virtual std::string GetString(const std::string &key, const std::string &defaultvalue) const = 0;
// Put configuration data to database or config file. // Put configuration data to database or config file.
virtual void Put(const std::string &key, const std::string &defaultvalue) = 0; virtual void Put(const std::string &key, const std::string &defaultvalue) = 0;

View File

@ -49,6 +49,15 @@ std::string FileConfiguration::Get(const std::string &key, const std::string &de
return res; return res;
} }
std::string FileConfiguration::GetString(const std::string &key, const std::string &defaultvalue) const {
if (!js.contains(key)) {
MS_LOG(WARNING) << "The key:" << key << " is not exist.";
return defaultvalue;
}
std::string res = js.at(key);
return res;
}
void FileConfiguration::Put(const std::string &key, const std::string &value) { void FileConfiguration::Put(const std::string &key, const std::string &value) {
std::ofstream output_file(file_path_); std::ofstream output_file(file_path_);
js[key] = value; js[key] = value;

View File

@ -56,6 +56,8 @@ class FileConfiguration : public Configuration {
std::string Get(const std::string &key, const std::string &defaultvalue) const override; std::string Get(const std::string &key, const std::string &defaultvalue) const override;
std::string GetString(const std::string &key, const std::string &defaultvalue) const override;
void Put(const std::string &key, const std::string &value) override; void Put(const std::string &key, const std::string &value) override;
bool Exists(const std::string &key) override; bool Exists(const std::string &key) override;

View File

@ -172,6 +172,7 @@ void SchedulerNode::ProcessRegister(const std::shared_ptr<TcpServer> &server,
MS_LOG(INFO) << "The scheduler send metadata to scale in node:" << id; MS_LOG(INFO) << "The scheduler send metadata to scale in node:" << id;
auto scale_in_client = GetOrCreateClient(nodes[id]); auto scale_in_client = GetOrCreateClient(nodes[id]);
SendMetadata(scale_in_client, nodes[id].rank_id_); SendMetadata(scale_in_client, nodes[id].rank_id_);
node_manager_.UpdateHeartbeat(id);
} }
} }
node_manager_.UpdateNodesInfo(); node_manager_.UpdateNodesInfo();
@ -179,6 +180,7 @@ void SchedulerNode::ProcessRegister(const std::shared_ptr<TcpServer> &server,
for (const auto &kvs : node_infos) { for (const auto &kvs : node_infos) {
auto client = GetOrCreateClient(kvs.second); auto client = GetOrCreateClient(kvs.second);
SendMetadata(client, kvs.second.rank_id_); SendMetadata(client, kvs.second.rank_id_);
node_manager_.UpdateHeartbeat(kvs.first);
} }
node_manager_.UpdateClusterState(ClusterState::CLUSTER_READY); node_manager_.UpdateClusterState(ClusterState::CLUSTER_READY);
wait_start_cond_.notify_all(); wait_start_cond_.notify_all();

View File

@ -27,14 +27,14 @@ bool ServerNode::Start(const uint32_t &timeout) {
MS_LOG(INFO) << "[Server start]: 4. The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) MS_LOG(INFO) << "[Server start]: 4. The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << " successfully registered to the scheduler!"; << " the node id:" << node_info_.node_id_ << " successfully registered to the scheduler!";
StartHeartbeatTimer(client_to_scheduler_);
MS_LOG(INFO) << "[Server start]: 5. Server start heartbeat timer!";
if (!WaitForStart(timeout)) { if (!WaitForStart(timeout)) {
MS_LOG(ERROR) << "Start server node timeout!"; MS_LOG(ERROR) << "Start server node timeout!";
return false; return false;
} }
StartHeartbeatTimer(client_to_scheduler_);
MS_LOG(INFO) << "[Server start]: 5. Server start heartbeat timer!";
MsException::Instance().CheckException(); MsException::Instance().CheckException();
MS_LOG(INFO) << "[Server start]: 6. Successfully start server node!"; MS_LOG(INFO) << "[Server start]: 6. Successfully start server node!";
return true; return true;

View File

@ -27,14 +27,14 @@ bool WorkerNode::Start(const uint32_t &timeout) {
MS_LOG(INFO) << "[Worker start]: 4. The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) MS_LOG(INFO) << "[Worker start]: 4. The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_)
<< " the node id:" << node_info_.node_id_ << " successfully registered to the scheduler!"; << " the node id:" << node_info_.node_id_ << " successfully registered to the scheduler!";
StartHeartbeatTimer(client_to_scheduler_);
MS_LOG(INFO) << "[Worker start]: 5. Worker start heartbeat timer!";
if (!WaitForStart(timeout)) { if (!WaitForStart(timeout)) {
MS_LOG(ERROR) << "Start Worker node timeout!"; MS_LOG(ERROR) << "Start Worker node timeout!";
return false; return false;
} }
StartHeartbeatTimer(client_to_scheduler_);
MS_LOG(INFO) << "[Worker start]: 5. Worker start heartbeat timer!";
MsException::Instance().CheckException(); MsException::Instance().CheckException();
MS_LOG(INFO) << "[Worker start]: 6. Successfully start worker node!"; MS_LOG(INFO) << "[Worker start]: 6. Successfully start worker node!";
return true; return true;

View File

@ -1,6 +1,6 @@
{ {
"recovery": { "recovery": {
"storge_type": 1, "storage_type": 1,
"storage_file_path": "recovery.json" "storage_file_path": "recovery.json"
}, },
"server_cert_path": "server.crt", "server_cert_path": "server.crt",

View File

@ -1,6 +1,6 @@
{ {
"recovery": { "recovery": {
"storge_type": 1, "storage_type": 1,
"storage_file_path": "recovery.json" "storage_file_path": "recovery.json"
}, },
"server_cert_path": "server.crt", "server_cert_path": "server.crt",