From f9c54b5a81cbc9b30dfa3676515d5ec47fb0b7f0 Mon Sep 17 00:00:00 2001
From: yao_yf
Date: Tue, 31 May 2022 11:29:54 +0800
Subject: [PATCH] fix independent stream active

---
 .../ascend/ascend_backend_optimization.cc     |  2 +
 .../gradients_allreduce_depend_last_send.cc   | 80 +++++++++++++++++++
 .../gradients_allreduce_depend_last_send.h    | 44 ++++++++++
 .../device/ascend/ascend_stream_assign.cc     | 49 ++++++++----
 .../device/ascend/ascend_stream_assign.h      |  2 +
 5 files changed, 163 insertions(+), 14 deletions(-)
 create mode 100644 mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.cc
 create mode 100644 mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.h

diff --git a/mindspore/ccsrc/backend/optimizer/ascend/ascend_backend_optimization.cc b/mindspore/ccsrc/backend/optimizer/ascend/ascend_backend_optimization.cc
index 5cf04b60022..c8de6456b56 100644
--- a/mindspore/ccsrc/backend/optimizer/ascend/ascend_backend_optimization.cc
+++ b/mindspore/ccsrc/backend/optimizer/ascend/ascend_backend_optimization.cc
@@ -159,6 +159,7 @@
 #include "backend/optimizer/ascend/dynamic_shape/convert_inherited_dynamic_op.h"
 #include "backend/optimizer/ascend/dynamic_shape/link_custom_op.h"
 #include "backend/optimizer/pass/adjust_depend_for_parallel_optimizer_recompute_all_gather.h"
+#include "backend/optimizer/pass/gradients_allreduce_depend_last_send.h"
 #include "backend/kernel_compiler/tbe/tbe_kernel_compile.h"
 #include "utils/ms_context.h"
 #include "utils/config_manager.h"
@@ -461,6 +462,7 @@ void AscendBackendOptimization(const std::shared_ptr<session::KernelGraph> &kernel_graph) {
   other_pm->AddPass(std::make_shared());
   other_pm->AddPass(std::make_shared());
   other_pm->AddPass(std::make_shared());
+  other_pm->AddPass(std::make_shared<GradientsAllReduceDependLastSend>());
   other_pm->AddPass(std::make_shared());
   other_pm->AddPass(std::make_shared());
   other_pm->AddPass(std::make_shared());
diff --git a/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.cc b/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.cc
new file mode 100644
index 00000000000..aa91f78c921
--- /dev/null
+++ b/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.cc
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "backend/optimizer/pass/gradients_allreduce_depend_last_send.h"
+#include <vector>
+#include "frontend/parallel/context.h"
+#include "backend/session/anf_runtime_algorithm.h"
+
+namespace mindspore {
+namespace opt {
+bool GradientsAllReduceDependLastSend::Run(const FuncGraphPtr &graph) {
+  MS_EXCEPTION_IF_NULL(graph);
+  auto parallel_mode = parallel::ParallelContext::GetInstance()->parallel_mode();
+  if (parallel_mode != parallel::SEMI_AUTO_PARALLEL && parallel_mode != parallel::AUTO_PARALLEL) {
+    return false;
+  }
+  int32_t split_stage_num = parallel::ParallelContext::GetInstance()->pipeline_stage_split_num();
+  if (split_stage_num <= 1) {
+    return false;
+  }
+  std::vector<AnfNodePtr> node_list = TopoSort(graph->get_return());
+  std::vector<CNodePtr> addn_list;
+  CNodePtr last_send;
+  for (auto &node : node_list) {
+    MS_EXCEPTION_IF_NULL(node);
+    if (!node->cast<CNodePtr>() || !AnfUtils::IsRealKernel(node)) {
+      continue;
+    }
+    auto cnode = node->cast<CNodePtr>();
+    if (IsPrimitiveCNode(cnode, prim::kPrimAllReduce) && AnfAlgo::IsFusion(cnode)) {
+      auto last_input = cnode->inputs().back();
+      if (IsPrimitiveCNode(last_input, prim::kPrimTensorMove)) {
+        auto last_input_cnode = last_input->cast<CNodePtr>();
+        auto real_input_node = last_input_cnode->input(1);
+        if (IsPrimitiveCNode(real_input_node, prim::kPrimDepend)) {
+          auto addn_node = real_input_node->cast<CNodePtr>()->input(2);
+          if (IsPrimitiveCNode(addn_node, prim::kPrimAddN)) {
+            MS_LOG(INFO) << "Find the pipeline addn " << addn_node->fullname_with_scope();
+            addn_list.push_back(addn_node->cast<CNodePtr>());
+          }
+        }
+      }
+    }
+    if (IsPrimitiveCNode(cnode, prim::kPrimSend)) {
+      last_send = cnode;
+    }
+  }
+  return InsertDependBetweenAllReduceAndSend(graph, addn_list, last_send);
+}
+
+bool GradientsAllReduceDependLastSend::InsertDependBetweenAllReduceAndSend(const FuncGraphPtr &graph,
+                                                                           const std::vector<CNodePtr> &addn_list,
+                                                                           const CNodePtr &last_send) {
+  bool changed = false;
+  FuncGraphManagerPtr manager = graph->manager();
+  for (auto &addn : addn_list) {
+    std::vector<AnfNodePtr> inputs = {NewValueNode(std::make_shared<Primitive>(prim::kPrimDepend->name())), addn,
+                                      last_send};
+    auto new_depend = graph->NewCNode(inputs);
+    new_depend->set_abstract(addn->abstract());
+    (void)manager->Replace(addn, new_depend);
+    changed = true;
+  }
+  return changed;
+}
+}  // namespace opt
+}  // namespace mindspore
diff --git a/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.h b/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.h
new file mode 100644
index 00000000000..91510023c18
--- /dev/null
+++ b/mindspore/ccsrc/backend/optimizer/pass/gradients_allreduce_depend_last_send.h
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
+#define MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "backend/optimizer/common/pass.h"
+#include "ir/func_graph.h"
+#include "ir/anf.h"
+#include "backend/optimizer/common/helper.h"
+#include "backend/optimizer/common/optimizer.h"
+
+namespace mindspore {
+namespace opt {
+class GradientsAllReduceDependLastSend : public Pass {
+ public:
+  GradientsAllReduceDependLastSend() : Pass("gradients_allreduce_depend_last_send") {}
+  ~GradientsAllReduceDependLastSend() override = default;
+  bool Run(const FuncGraphPtr &graph) override;
+
+ private:
+  bool InsertDependBetweenAllReduceAndSend(const FuncGraphPtr &graph, const std::vector<CNodePtr> &addn_list,
+                                           const CNodePtr &last_send);
+};
+}  // namespace opt
+}  // namespace mindspore
+#endif  // MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc
index 54cc53679a9..63b29aca8cf 100644
--- a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc
+++ b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc
@@ -264,8 +264,10 @@ void AscendStreamAssign::AssignStream(const NotNull<KernelGraphPtr> &graph_ptr) {
   Reset();
   SetLoopSink();
   ReorderIndependentOrders(graph_ptr);
-  TrailingTimeOptimizationByReorder(graph_ptr);
-
+  auto parallel_mode = parallel::ParallelContext::GetInstance()->parallel_mode();
+  if (parallel_mode != parallel::SEMI_AUTO_PARALLEL && parallel_mode != parallel::AUTO_PARALLEL) {
+    TrailingTimeOptimizationByReorder(graph_ptr);
+  }
   AssignAllNodesStream(graph_ptr);
   UpdateAtomicAddrCleanStreamId(graph_ptr);
   InsertStreamActive(graph_ptr);
@@ -1121,6 +1123,33 @@ void AscendStreamAssign::InsertStreamActiveForCommon(const NotNull<KernelGraphPtr> &graph_ptr) {
   graph_ptr->set_execution_order(update_cnode_list);
 }
 
+std::vector<CNodePtr> AscendStreamAssign::GetIndependentNodesNeedsInsertActive(const std::vector<CNodePtr> exe_orders,
+                                                                               const uint32_t graph_id) {
+  uint32_t pre_stream_id = 0;
+  std::vector<CNodePtr> independent_nodes_need_insert_active;
+  CNodePtr pre_cnode;
+  for (size_t i = 0; i < exe_orders.size(); i++) {
+    auto cur_cnode_ptr = exe_orders[i];
+    if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr)) {
+      continue;
+    }
+    if (AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != graph_id) {
+      continue;
+    }
+    auto cur_stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr);
+    if (pre_stream_id == 0) {
+      pre_stream_id = cur_stream_id;
+      pre_cnode = cur_cnode_ptr;
+    }
+    if (cur_stream_id != pre_stream_id) {
+      independent_nodes_need_insert_active.push_back(pre_cnode);
+    }
+    pre_stream_id = cur_stream_id;
+    pre_cnode = cur_cnode_ptr;
+  }
+  return independent_nodes_need_insert_active;
+}
+
 void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGraphPtr> &graph_ptr) {
   auto root_graph_id = graph_ptr->graph_id();
   if (root_graph_id == kInvalidGraphId) {
@@ -1132,31 +1161,23 @@ void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGraphPtr> &graph_ptr) {
   std::vector<CNodePtr> update_cnode_list;
   auto exe_orders = graph_ptr->execution_order();
-
+  auto independent_nodes_need_insert_active = GetIndependentNodesNeedsInsertActive(exe_orders, root_graph_id);
   // first independent is been activated, active other independent stream
   std::vector<uint32_t> streams;
   std::copy(independent_streams.begin(), independent_streams.end(), std::back_inserter(streams));
   std::sort(streams.begin(), streams.end());
-  uint32_t node_num = 0;
   for (size_t i = 0; i < exe_orders.size(); i++) {
     auto cur_cnode_ptr = exe_orders[i];
     update_cnode_list.emplace_back(cur_cnode_ptr);
-    if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr)) {
+    if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr) || AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != root_graph_id) {
       continue;
     }
-
-    if (AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != root_graph_id) {
-      continue;
-    }
-
-    node_num++;
     auto cur_stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr);
     auto it = std::find(streams.begin(), streams.end(), cur_stream_id);
     if (it == streams.end()) {
@@ -1165,7 +1186,8 @@ void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGraphPtr> &graph_ptr) {
       std::vector<uint32_t> active_index_list{*(it + 1)};
       AnfAlgo::SetNodeAttr(kAttrActiveStreamList, MakeValue<std::vector<uint32_t>>(active_index_list), active_ptr);
       update_cnode_list.emplace_back(active_ptr);
-      node_num = 0;
     }
   }
 }
diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h
index 4a989321f72..0d6759cb8d7 100644
--- a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h
+++ b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h
@@ -104,6 +104,8 @@ class AscendStreamAssign {
                                             std::pair>> group_item);
   std::vector>> GetStreamIDHcomMap(std::vector<CNodePtr> cnode_ptr_list, std::string group, size_t graph_id);
+  std::vector<CNodePtr> GetIndependentNodesNeedsInsertActive(const std::vector<CNodePtr> exe_orders,
+                                                             const uint32_t graph_id);
   void AdjustAtomicAddrCleanOrder(const NotNull<KernelGraphPtr> &graph_ptr);
   vector<CNodePtr> GetLastInputCnode(const NotNull<KernelGraphPtr> &graph_ptr, const CNodePtr &cur_cnode_ptr);