Add ps execution mode implementation

This commit is contained in:
ZPaC 2022-03-20 21:07:19 +08:00
parent af1f1b346d
commit 506989abb8
4 changed files with 273 additions and 15 deletions

View File

@ -26,12 +26,13 @@
#include "distributed/constants.h" #include "distributed/constants.h"
#include "utils/log_adapter.h" #include "utils/log_adapter.h"
#include "utils/ms_utils.h" #include "utils/ms_utils.h"
#include "include/backend/visible.h"
namespace mindspore { namespace mindspore {
namespace distributed { namespace distributed {
namespace cluster { namespace cluster {
// The dummy cluster context interface. This class is for ut test and windows compiling. // The dummy cluster context interface. This class is for ut test and windows compiling.
class ClusterContext { class BACKEND_EXPORT ClusterContext {
public: public:
~ClusterContext() = default; ~ClusterContext() = default;
DISABLE_COPY_AND_ASSIGN(ClusterContext) DISABLE_COPY_AND_ASSIGN(ClusterContext)

View File

@ -467,5 +467,230 @@ bool GraphSplitter::IsNodesWithSameLabel(const AnfNodePtr &node1, const AnfNodeP
} }
return node_labels_[node1] == node_labels_[node2]; return node_labels_[node1] == node_labels_[node2];
} }
void ParameterServerMode::PreBuildDistributedGraph() {
MS_EXCEPTION_IF_NULL(node_labels_);
ProcessForSplittedOptimizer();
}
void ParameterServerMode::PostBuildDistributedGraph(const InterProcessOpEdgesInfo &comm_edges) {
MS_EXCEPTION_IF_NULL(node_labels_);
// Judge the node role number validation.
uint32_t worker_num = ClusterContext::instance()->node_num(distributed::kEnvRoleOfWorker);
if (worker_num == 0) {
MS_LOG(EXCEPTION) << "In PS mode, worker number should be greater than 0.";
}
uint32_t server_num = ClusterContext::instance()->node_num(distributed::kEnvRoleOfServer);
if (server_num == 0) {
MS_LOG(EXCEPTION) << "In PS mode, server number should be greater than 0.";
}
// Only multiple worker scenario needs this optimizer.
if (worker_num < kMinGradAccumWorkerNum) {
return;
}
MS_EXCEPTION_IF_NULL(func_graph_);
auto return_node = func_graph_->get_return();
MS_EXCEPTION_IF_NULL(return_node);
std::vector<AnfNodePtr> nodes = FuncGraph::TopoSort(return_node);
std::vector<CNodePtr> ps_optimizer_node_list = FilterServerAwareOptimizerList(nodes);
// Duplicate out degrees for ps optimizers because defaultly there's only one edge to the rank 0 worker.
for (const auto &ps_optimizer : ps_optimizer_node_list) {
for (const auto &edge_info : comm_edges) {
if (edge_info.first.src_node == ps_optimizer) {
// The optimizer's output should always connect to Send node which is the input of a MakeTuple node.
// We need to replace the MakeTuple node with a new one.
const auto &origin_send_node = std::get<0>(edge_info.second);
std::vector<AnfNodePtr> new_make_tuple_inputs = {NewValueNode(prim::kPrimMakeTuple), origin_send_node};
AnfNodePtr dst_node = edge_info.first.dst_node;
for (uint32_t i = 1; i < worker_num; i++) {
OperatorLabel worker_label = {i, distributed::kEnvRoleOfWorker};
InterProcessOpEdge edge = {ps_optimizer, node_labels_->at(ps_optimizer), dst_node, worker_label};
auto duplicated_send_node = CreateSendNode(func_graph_, edge);
node_labels_->at(duplicated_send_node) = edge.src_label;
new_make_tuple_inputs.emplace_back(duplicated_send_node);
}
auto new_make_tuple_node = func_graph_->NewCNode(new_make_tuple_inputs);
new_make_tuple_node->set_abstract(new_make_tuple_inputs.back()->abstract());
(void)func_graph_->manager()->Replace(origin_send_node, new_make_tuple_node);
}
}
}
}
void ParameterServerMode::ProcessForSplittedOptimizer() {
// Judge the node role number validation.
uint32_t worker_num = ClusterContext::instance()->node_num(distributed::kEnvRoleOfWorker);
if (worker_num == 0) {
MS_LOG(EXCEPTION) << "In PS mode, worker number should be greater than 0.";
}
uint32_t server_num = ClusterContext::instance()->node_num(distributed::kEnvRoleOfServer);
if (server_num == 0) {
MS_LOG(EXCEPTION) << "In PS mode, server number should be greater than 0.";
}
// Only multiple worker scenario needs this optimizer.
if (worker_num < kMinGradAccumWorkerNum) {
return;
}
MS_EXCEPTION_IF_NULL(func_graph_);
auto return_node = func_graph_->get_return();
MS_EXCEPTION_IF_NULL(return_node);
std::vector<AnfNodePtr> nodes = FuncGraph::TopoSort(return_node);
std::vector<CNodePtr> ps_optimizer_node_list = FilterServerAwareOptimizerList(nodes);
for (const auto &ps_optimizer : ps_optimizer_node_list) {
MS_EXCEPTION_IF_NULL(ps_optimizer);
// Load attributes for this optimizer.
size_t gradient_index = common::AnfAlgo::HasNodeAttr(kAttrGradientInputIndex, ps_optimizer)
? common::AnfAlgo::GetNodeAttr<int64_t>(ps_optimizer, kAttrGradientInputIndex)
: UINT64_MAX;
size_t indices_index = common::AnfAlgo::HasNodeAttr(kAttrIndicesInputIndex, ps_optimizer)
? common::AnfAlgo::GetNodeAttr<int64_t>(ps_optimizer, kAttrIndicesInputIndex)
: UINT64_MAX;
std::string gradient_type = (common::AnfAlgo::HasNodeAttr(kAttrGradientType, ps_optimizer))
? common::AnfAlgo::GetNodeAttr<std::string>(ps_optimizer, kAttrGradientType)
: kDenseGradient;
if (kGradTypeToAccumOpName.count(gradient_type) == 0) {
MS_LOG(EXCEPTION) << "The gradient type " << gradient_type << " is invalid.";
}
for (size_t i = 0; i < common::AnfAlgo::GetInputNum(ps_optimizer); i++) {
auto input = common::AnfAlgo::GetInputNode(ps_optimizer, i);
// If the input is not a cnode, no inter-process edge is added so no node with multiple inputs should be created.
if (!input->isa<CNode>()) {
continue;
}
if (i == gradient_index) {
// Create the node to replace origin gradient which could be a RealDiv node.
auto grad_accum_nodes = CreateNodesForGradAccumulation(
input, (role_ == distributed::kEnvRoleOfWorker) ? rank_id_ : 0, gradient_type, worker_num);
const auto &accum_node = grad_accum_nodes.first;
const auto &real_div_node = grad_accum_nodes.second;
func_graph_->manager()->SetEdge(ps_optimizer, i + 1, real_div_node);
node_labels_->insert(std::make_pair(accum_node, node_labels_->at(ps_optimizer)));
node_labels_->insert(std::make_pair(real_div_node, node_labels_->at(ps_optimizer)));
} else if (i == indices_index) {
// Create the node to replace origin indices.
AnfNodePtr new_indices_input = CreateNodeWithInterProcessEdgeOnPServer(
kConcatOpName, input, (role_ == distributed::kEnvRoleOfWorker) ? rank_id_ : 0, worker_num);
func_graph_->manager()->SetEdge(ps_optimizer, i + 1, new_indices_input);
node_labels_->insert(std::make_pair(new_indices_input, node_labels_->at(ps_optimizer)));
} else {
AnfNodePtr new_input = CreateNodeWithInterProcessEdgeOnPServer(
prim::kMakeTuple, input, (role_ == distributed::kEnvRoleOfWorker) ? rank_id_ : 0, worker_num);
func_graph_->manager()->SetEdge(ps_optimizer, i + 1, new_input);
node_labels_->insert(std::make_pair(new_input, node_labels_->at(ps_optimizer)));
}
}
}
}
std::vector<CNodePtr> ParameterServerMode::FilterServerAwareOptimizerList(const std::vector<AnfNodePtr> &nodes) {
std::vector<CNodePtr> ps_optim_list = {};
for (const auto &node : nodes) {
if (!node->isa<CNode>()) {
continue;
}
const auto &cnode = node->cast<CNodePtr>();
if (common::AnfAlgo::HasNodeAttr(kAttrUpdateParameter, cnode)) {
ps_optim_list.emplace_back(cnode);
}
}
return ps_optim_list;
}
std::pair<CNodePtr, CNodePtr> ParameterServerMode::CreateNodesForGradAccumulation(const AnfNodePtr &gradient_input,
size_t gradient_input_index,
const std::string &gradient_type,
size_t total_gradient_number) {
MS_EXCEPTION_IF_NULL(gradient_input);
if (kGradTypeToAccumOpName.count(gradient_type) == 0) {
MS_LOG(EXCEPTION) << "The gradient type " << gradient_type << " is invalid.";
}
const std::string &accum_node_name = kGradTypeToAccumOpName.at(gradient_type);
auto grad_accum_node = CreateNodeWithInterProcessEdgeOnPServer(accum_node_name, gradient_input, gradient_input_index,
total_gradient_number);
MS_EXCEPTION_IF_NULL(grad_accum_node);
CNodePtr real_div_node = CreateGradMeanNode(grad_accum_node, total_gradient_number);
MS_EXCEPTION_IF_NULL(real_div_node);
return std::make_pair(grad_accum_node, real_div_node);
}
CNodePtr ParameterServerMode::CreateGradMeanNode(const AnfNodePtr &gradient, size_t divisor) {
MS_EXCEPTION_IF_NULL(gradient);
// Step 1: Create the value node of divisor. The divisor's value is worker number.
auto addn_abstract = gradient->abstract()->cast<abstract::AbstractTensorPtr>();
MS_EXCEPTION_IF_NULL(addn_abstract);
auto divisor_tensor =
std::make_shared<tensor::Tensor>(static_cast<uint64_t>(divisor), addn_abstract->element()->BuildType());
MS_EXCEPTION_IF_NULL(divisor_tensor);
auto divisor_value_node = NewValueNode(divisor_tensor);
MS_EXCEPTION_IF_NULL(divisor_value_node);
divisor_value_node->set_abstract(divisor_tensor->ToAbstract());
// Step 2: Create RealDiv node.
std::vector<AnfNodePtr> real_div_inputs = {NewValueNode(std::make_shared<Primitive>(kRealDivOpName)), gradient,
divisor_value_node};
CNodePtr real_div_node = func_graph_->NewCNode(real_div_inputs);
MS_EXCEPTION_IF_NULL(real_div_node);
real_div_node->set_abstract(gradient->abstract());
return real_div_node;
}
CNodePtr ParameterServerMode::CreateNodeWithInterProcessEdgeOnPServer(const std::string &many_to_one_node_name,
const AnfNodePtr &real_input,
size_t index_of_real_input,
uint32_t total_inputs_number) {
if (index_of_real_input >= total_inputs_number) {
MS_LOG(EXCEPTION) << "The index of real input for " << many_to_one_node_name << " " << index_of_real_input
<< " is greater or equal to worker number " << total_inputs_number;
}
// Step 1: Create multiple inputs of new node including extra nodes.
std::vector<AnfNodePtr> new_node_inputs;
new_node_inputs.resize(total_inputs_number);
std::vector<AnfNodePtr> mock_node_inputs = {NewValueNode(
std::make_shared<Primitive>(IsPrimitiveCNode(real_input, prim::kPrimUpdateState) ? "UpdateState" : kVirtualNode))};
for (size_t i = 0; i < new_node_inputs.size(); i++) {
new_node_inputs[i] = func_graph_->NewCNode(mock_node_inputs);
MS_EXCEPTION_IF_NULL(new_node_inputs[i]);
new_node_inputs[i]->set_abstract(real_input->abstract());
new_node_inputs[i]->cast<CNodePtr>()->set_fullname_with_scope(real_input->fullname_with_scope());
// Set operator label for new node's inputs.
OperatorLabel input_label = {SizeToUint(i), distributed::kEnvRoleOfWorker};
node_labels_->insert(std::make_pair(new_node_inputs[i], input_label));
}
new_node_inputs[index_of_real_input] = real_input;
// Step 2: Create the new node.
auto new_node_prim = NewValueNode(std::make_shared<Primitive>(many_to_one_node_name));
new_node_inputs.insert(new_node_inputs.begin(), new_node_prim);
auto new_node = func_graph_->NewCNode(new_node_inputs);
MS_EXCEPTION_IF_NULL(new_node);
// Step 3: Set the new node's abstract.
if (many_to_one_node_name == kConcatOpName) {
auto origin_abs = real_input->abstract()->cast<abstract::AbstractTensorPtr>();
MS_EXCEPTION_IF_NULL(origin_abs);
ShapeVector origin_shape = origin_abs->shape()->shape();
origin_shape[0] = origin_shape[0] * total_inputs_number;
origin_abs->shape()->set_shape(origin_shape);
new_node->set_abstract(origin_abs);
} else {
new_node->set_abstract(real_input->abstract());
}
return new_node;
}
} // namespace parallel } // namespace parallel
} // namespace mindspore } // namespace mindspore

View File

@ -26,6 +26,7 @@
#include "ir/value.h" #include "ir/value.h"
#include "ir/graph_utils.h" #include "ir/graph_utils.h"
#include "base/base.h" #include "base/base.h"
#include "include/common/utils/utils.h"
#include "ir/func_graph.h" #include "ir/func_graph.h"
#include "distributed/constants.h" #include "distributed/constants.h"
#if ((defined ENABLE_CPU) && (!defined _WIN32)) #if ((defined ENABLE_CPU) && (!defined _WIN32))
@ -101,6 +102,16 @@ using InterProcessOpEdgesInfo = std::map<InterProcessOpEdge, InterProcessOpPair>
constexpr char kAttrUpdateParameter[] = "update_parameter"; constexpr char kAttrUpdateParameter[] = "update_parameter";
constexpr char kAttrParameterInputIndex[] = "parameter_input_index"; constexpr char kAttrParameterInputIndex[] = "parameter_input_index";
constexpr char kAttrGradientInputIndex[] = "gradient_input_index"; constexpr char kAttrGradientInputIndex[] = "gradient_input_index";
constexpr char kAttrIndicesInputIndex[] = "indices_input_index";
constexpr char kAttrGradientType[] = "gradient_type";
constexpr char kDenseGradient[] = "dense_gradient";
constexpr char kSparseGradient[] = "sparse_gradient";
// The accumulator operator names for different gradient types.
const std::map<std::string, std::string> kGradTypeToAccumOpName = {
{kDenseGradient, kAddNOpName},
{kSparseGradient, kConcatOpName},
};
// Node which is not physically on this process should be created for splitting graph implementation. This could be // Node which is not physically on this process should be created for splitting graph implementation. This could be
// considered as a virtual node which will be elimimated after splitting graph. For example, for server in PS mode, some // considered as a virtual node which will be elimimated after splitting graph. For example, for server in PS mode, some
@ -204,23 +215,27 @@ using GraphSplitterPtr = std::shared_ptr<GraphSplitter>;
class DistributedExecutionMode { class DistributedExecutionMode {
public: public:
// Pass the dyed graph, node labels, process's role and rank id to construct execution mode. // Pass the dyed graph, node labels, process's role and rank id to construct execution mode.
explicit DistributedExecutionMode(const FuncGraphPtr &func_graph, uint32_t rank_id, const std::string &role) explicit DistributedExecutionMode(const FuncGraphPtr &func_graph, NodeLabels *node_labels, uint32_t rank_id,
: func_graph_(func_graph), rank_id_(rank_id), role_(role) {} const std::string &role)
: func_graph_(func_graph), node_labels_(node_labels), rank_id_(rank_id), role_(role) {}
virtual ~DistributedExecutionMode() = default; virtual ~DistributedExecutionMode() = default;
// Prebuild the distributed graph to prepare for splitting graph. For example,adding extra accumulation nodes, replace // Prebuild the distributed graph to prepare for splitting graph. For example,adding extra accumulation nodes, replace
// gradient input of optimizer nodes, dying new created nodes so that common split implementation could applied. // gradient input of optimizer nodes, dying new created nodes so that common split implementation could applied.
// Input 'node_labels' represents node labels of the origin graph. This method could modify this map. // Input 'node_labels' represents node labels of the origin graph. This method could modify this map.
virtual void PreBuildDistributedGraph(NodeLabels *node_labels) {} virtual void PreBuildDistributedGraph() {}
// Postbuild the distributed graph after splitting graph. For example, adding extra edges to the split graph. // Postbuild the distributed graph after splitting graph. For example, adding extra edges to the split graph.
// Input 'node_labels' represents node labels of the split graph. // Input 'node_labels' represents node labels of the split graph.
// Input 'comm_edges' represents the inter-process edges generated after splitting the graph. // Input 'comm_edges' represents the inter-process edges generated after splitting the graph.
virtual void PostBuildDistributedGraph(const NodeLabels &node_labels, const InterProcessOpEdgesInfo &comm_edges) {} virtual void PostBuildDistributedGraph(const InterProcessOpEdgesInfo &comm_edges) {}
protected: protected:
FuncGraphPtr func_graph_; FuncGraphPtr func_graph_;
// The node label set by graph splitter. It could be modified by DistributedExecutionMode.
NodeLabels *node_labels_;
// Rank id and node role of this process. They are used to dye graph with different labels, help build split graph, // Rank id and node role of this process. They are used to dye graph with different labels, help build split graph,
// etc. // etc.
uint32_t rank_id_; uint32_t rank_id_;
@ -233,24 +248,40 @@ constexpr uint32_t kMinGradAccumWorkerNum = 2;
// The execution of Parameter Server mode. // The execution of Parameter Server mode.
class ParameterServerMode : public DistributedExecutionMode { class ParameterServerMode : public DistributedExecutionMode {
public: public:
explicit ParameterServerMode(const FuncGraphPtr &func_graph, uint32_t rank_id, const std::string &role) explicit ParameterServerMode(const FuncGraphPtr &func_graph, NodeLabels *node_labels, uint32_t rank_id,
: DistributedExecutionMode(func_graph, rank_id, role) {} const std::string &role)
: DistributedExecutionMode(func_graph, node_labels, rank_id, role) {}
~ParameterServerMode() = default; ~ParameterServerMode() = default;
void PreBuildDistributedGraph(NodeLabels *node_labels) override; void PreBuildDistributedGraph() override;
void PostBuildDistributedGraph(const NodeLabels &node_labels, const InterProcessOpEdgesInfo &comm_edges) override; void PostBuildDistributedGraph(const InterProcessOpEdgesInfo &comm_edges) override;
private: private:
// Process optimizers split to the parameter server.
void ProcessForSplittedOptimizer();
// Filter out all optimizer nodes which are set on parameter server from the graph. // Filter out all optimizer nodes which are set on parameter server from the graph.
std::vector<CNodePtr> FilterServerAwareOptimizerList(const std::vector<AnfNodePtr> &nodes); std::vector<CNodePtr> FilterServerAwareOptimizerList(const std::vector<AnfNodePtr> &nodes);
// Create node with multiple inputs whose number is the worker number in PS mode. // Create gradients accumulator with mean operator for the given optimizer. It could be sparse or dense gradients.
// 'node_name' represents the name of the node to be created. // 'total_gradient_number' represents how many workers' gradients will be accumulated for this optimizer.
// The return value is a pair of accumulation node to RealDiv node.
std::pair<CNodePtr, CNodePtr> CreateNodesForGradAccumulation(const AnfNodePtr &gradient_input,
size_t gradient_input_index,
const std::string &gradient_type,
size_t total_gradient_number);
// Normally after gradients accumulation, the mean value should be calculated.
CNodePtr CreateGradMeanNode(const AnfNodePtr &gradient, size_t divisor);
// Create node with multiple inputs. Some of the inputs could be fake nodes.
// 'many_to_one_node_name' represents the name of the node to be created.
// 'real_input' represents the input which is already in the func_graph_. Other inputs will be created as this input. // 'real_input' represents the input which is already in the func_graph_. Other inputs will be created as this input.
// 'index_of_real_input': the input index of 'real_input' of this new created node: 'node_name'. // 'index_of_real_input': the input index of 'real_input' of this new created node: 'many_to_one_node_name'.
// 'worker_num': the worker number in this PS cluster. // 'total_inputs_number': the total inputs number of the created node.
CNodePtr CreateNodeForMultipleWorker(NodeLabels *node_labels, const std::string &node_name, CNodePtr CreateNodeWithInterProcessEdgeOnPServer(const std::string &many_to_one_node_name,
const AnfNodePtr &real_input, size_t index_of_real_input, uint32_t worker_num); const AnfNodePtr &real_input, size_t index_of_real_input,
uint32_t total_inputs_number);
}; };
} // namespace parallel } // namespace parallel
} // namespace mindspore } // namespace mindspore

View File

@ -71,6 +71,7 @@ if(ENABLE_MINDDATA)
./fl/*.cc ./fl/*.cc
./distributed/persistent/*.cc ./distributed/persistent/*.cc
./distributed/rpc/tcp/*.cc ./distributed/rpc/tcp/*.cc
./distributed/cluster/*.cc
./distributed/cluster/topology/*.cc ./distributed/cluster/topology/*.cc
./cxx_api/*.cc ./cxx_api/*.cc
./tbe/*.cc ./tbe/*.cc