!33482 Replace the API of node_role
Merge pull request !33482 from chengang/replace_node_role_api
This commit is contained in:
commit
25f2f7a4e8
|
@ -34,14 +34,16 @@ bool Initialize() {
|
|||
// Server and Scheduler don't use collective communication library.
|
||||
auto node = cluster::ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
if (node->role() != ps::core::NodeRole::SCHEDULER && node->role() != ps::core::NodeRole::SERVER) {
|
||||
const auto &cluster_ctx = cluster::ClusterContext::instance();
|
||||
MS_EXCEPTION_IF_NULL(cluster_ctx);
|
||||
if (cluster_ctx->node_role() != kEnvRoleOfScheduler && cluster_ctx->node_role() != kEnvRoleOfServer) {
|
||||
// Global rank id and size should be manually set if cluster is initialized by MindSpore communication framework.
|
||||
auto abstract_node =
|
||||
std::dynamic_pointer_cast<ps::core::AbstractNode>(cluster::ClusterContext::instance()->node());
|
||||
MS_EXCEPTION_IF_NULL(abstract_node);
|
||||
collective::CollectiveManager::instance()->set_global_rank_id(abstract_node->rank_id());
|
||||
auto global_rank_size =
|
||||
(node->role() == ps::core::NodeRole::WORKER) ? abstract_node->worker_num() : abstract_node->server_num();
|
||||
(cluster_ctx->node_role() == kEnvRoleOfWorker) ? abstract_node->worker_num() : abstract_node->server_num();
|
||||
collective::CollectiveManager::instance()->set_global_rank_size(global_rank_size);
|
||||
|
||||
if (RecoveryContext::GetInstance()->enable_recovery()) {
|
||||
|
|
|
@ -84,15 +84,10 @@ std::string GetRole() {
|
|||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
auto node = distributed::cluster::ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
MS_LOG(INFO) << "Cluster is initialized. This node role is " << node->role();
|
||||
switch (node->role()) {
|
||||
case ps::core::NodeRole::SERVER:
|
||||
return kRolePServer;
|
||||
case ps::core::NodeRole::SCHEDULER:
|
||||
return kRolePScheduler;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
const auto &cluster_ctx = distributed::cluster::ClusterContext::instance();
|
||||
MS_EXCEPTION_IF_NULL(cluster_ctx);
|
||||
MS_LOG(INFO) << "Cluster is initialized. This node role is " << cluster_ctx->node_role();
|
||||
return cluster_ctx->node_role();
|
||||
}
|
||||
#endif
|
||||
return "";
|
||||
|
|
|
@ -750,14 +750,13 @@ std::vector<ActionItem> GetPipeline(const ResourcePtr &resource, const std::stri
|
|||
if (distributed::cluster::ClusterContext::instance()->initialized()) {
|
||||
auto node = distributed::cluster::ClusterContext::instance()->node();
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
MS_LOG(INFO) << "Cluster is initialized. This node role is " << node->role();
|
||||
switch (node->role()) {
|
||||
case ps::core::NodeRole::SERVER:
|
||||
return PServerPipeline(resource);
|
||||
case ps::core::NodeRole::SCHEDULER:
|
||||
return PSchedulerPipeline(resource);
|
||||
default:
|
||||
break;
|
||||
const auto &cluster_ctx = distributed::cluster::ClusterContext::instance();
|
||||
MS_EXCEPTION_IF_NULL(cluster_ctx);
|
||||
MS_LOG(INFO) << "Cluster is initialized. This node role is " << cluster_ctx->node_role();
|
||||
if (cluster_ctx->node_role() == distributed::kEnvRoleOfServer) {
|
||||
return PServerPipeline(resource);
|
||||
} else if (cluster_ctx->node_role() == distributed::kEnvRoleOfScheduler) {
|
||||
return PSchedulerPipeline(resource);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -34,15 +34,18 @@ AllReduceLauncher::AllReduceLauncher() {
|
|||
MS_LOG(EXCEPTION) << "The abstract node is nullptr when init AllReduceLauncher.";
|
||||
}
|
||||
rank_id_ = abs_node_->rank_id();
|
||||
node_role_ = abs_node_->role();
|
||||
rank_size_ = IntToSize(abs_node_->worker_num());
|
||||
|
||||
const auto &cluster_ctx = distributed::cluster::ClusterContext::instance();
|
||||
MS_EXCEPTION_IF_NULL(cluster_ctx);
|
||||
node_role_ = cluster_ctx->node_role();
|
||||
}
|
||||
|
||||
bool AllReduceLauncher::Execute(const void *input_data, void *const output_data, size_t data_size) const {
|
||||
MS_EXCEPTION_IF_NULL(input_data);
|
||||
MS_EXCEPTION_IF_NULL(output_data);
|
||||
// If node is scheduler, don't need to participate in the reduction.
|
||||
if (node_role_ == ps::core::SCHEDULER) {
|
||||
if (node_role_ == distributed::kEnvRoleOfScheduler) {
|
||||
return true;
|
||||
}
|
||||
size_t data_num = data_size / sizeof(float);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#ifndef MINDSPORE_CCSRC_RUNTIME_HARDWARE_CPU_ALLREDUCE_IMPL_H_
|
||||
#define MINDSPORE_CCSRC_RUNTIME_HARDWARE_CPU_ALLREDUCE_IMPL_H_
|
||||
|
||||
#include <string>
|
||||
#include "distributed/cluster/cluster_context.h"
|
||||
|
||||
namespace mindspore {
|
||||
|
@ -37,7 +38,7 @@ class AllReduceLauncher {
|
|||
private:
|
||||
size_t rank_id_{0};
|
||||
size_t rank_size_{0};
|
||||
ps::core::NodeRole node_role_{ps::core::WORKER};
|
||||
std::string node_role_{distributed::kEnvRoleOfWorker};
|
||||
ps::core::AbstractNodePtr abs_node_{nullptr};
|
||||
|
||||
AllReduceLauncher();
|
||||
|
|
Loading…
Reference in New Issue