fixed node timeout
This commit is contained in:
parent
ef08a78112
commit
f097f72798
|
@ -148,6 +148,7 @@ void SchedulerNode::ProcessRegister(std::shared_ptr<TcpServer> server, std::shar
|
||||||
is_ready_ = true;
|
is_ready_ = true;
|
||||||
MS_LOG(INFO) << "There are " << node_manager_.worker_num() << " workers and " << node_manager_.server_num()
|
MS_LOG(INFO) << "There are " << node_manager_.worker_num() << " workers and " << node_manager_.server_num()
|
||||||
<< " servers registered to scheduer, so the scheduler send meta data to worker/server.";
|
<< " servers registered to scheduer, so the scheduler send meta data to worker/server.";
|
||||||
|
node_manager_.UpdateClusterState(ClusterState::CLUSTER_READY);
|
||||||
if (node_manager_.GetClusterState() == ClusterState::CLUSTER_SCALE_IN) {
|
if (node_manager_.GetClusterState() == ClusterState::CLUSTER_SCALE_IN) {
|
||||||
auto nodes = node_manager_.nodes_info();
|
auto nodes = node_manager_.nodes_info();
|
||||||
for (const auto &id : scale_in_node_ids_) {
|
for (const auto &id : scale_in_node_ids_) {
|
||||||
|
@ -162,7 +163,6 @@ void SchedulerNode::ProcessRegister(std::shared_ptr<TcpServer> server, std::shar
|
||||||
auto client = GetOrCreateClient(kvs.second);
|
auto client = GetOrCreateClient(kvs.second);
|
||||||
SendMetadata(client, kvs.second.rank_id_);
|
SendMetadata(client, kvs.second.rank_id_);
|
||||||
}
|
}
|
||||||
node_manager_.UpdateClusterState(ClusterState::CLUSTER_READY);
|
|
||||||
wait_start_cond_.notify_all();
|
wait_start_cond_.notify_all();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,7 @@ size_t Worker::GetParamKey(const std::string ¶m_name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Worker::SetParamInitInServer(const std::string ¶m_name, bool init_in_server) {
|
void Worker::SetParamInitInServer(const std::string ¶m_name, bool init_in_server) {
|
||||||
MS_LOG(INFO) << "Set parameter " << param_name << " init_in_server:" << init_in_server;
|
MS_LOG(DEBUG) << "Set parameter " << param_name << " init_in_server:" << init_in_server;
|
||||||
param_to_init_in_server_[param_name] = init_in_server;
|
param_to_init_in_server_[param_name] = init_in_server;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,7 +260,7 @@ void Worker::InitPSParamAndOptim(const AnfNodePtr &input_node, const tensor::Ten
|
||||||
SetParamInitInServer(param_name, init_in_server);
|
SetParamInitInServer(param_name, init_in_server);
|
||||||
bool init = IsKeyInit(param_key);
|
bool init = IsKeyInit(param_key);
|
||||||
if (!init) {
|
if (!init) {
|
||||||
MS_LOG(INFO) << "Init parameter key " << param_key << " and optimizer in parameter server side for " << param_name
|
MS_LOG(DEBUG) << "Init parameter key " << param_key << " and optimizer in parameter server side for " << param_name
|
||||||
<< ", whether init in server: " << init_in_server;
|
<< ", whether init in server: " << init_in_server;
|
||||||
AddKeyToServerId(param_key);
|
AddKeyToServerId(param_key);
|
||||||
if (!PsDataPrefetch::GetInstance().cache_enable()) {
|
if (!PsDataPrefetch::GetInstance().cache_enable()) {
|
||||||
|
@ -449,7 +449,7 @@ void Worker::AddKeyByHashMod(const Key &key) {
|
||||||
MS_LOG(EXCEPTION) << "Server number is invalid:0";
|
MS_LOG(EXCEPTION) << "Server number is invalid:0";
|
||||||
}
|
}
|
||||||
key_to_server_id_[key] = static_cast<int64_t>(key % server_num_);
|
key_to_server_id_[key] = static_cast<int64_t>(key % server_num_);
|
||||||
MS_LOG(INFO) << "The server id of key " << key << " is " << key_to_server_id_[key];
|
MS_LOG(DEBUG) << "The server id of key " << key << " is " << key_to_server_id_[key];
|
||||||
}
|
}
|
||||||
|
|
||||||
void Worker::InitPSOptimId(const size_t param_key) {
|
void Worker::InitPSOptimId(const size_t param_key) {
|
||||||
|
|
Loading…
Reference in New Issue