forked from mindspore-Ecosystem/mindspore
!35225 fix_independent_stream_error_r1.6
Merge pull request !35225 from yao_yf/fix_independent_stream_error_r1.6
This commit is contained in:
commit
34761f434f
|
@ -159,6 +159,7 @@
|
|||
#include "backend/optimizer/ascend/dynamic_shape/convert_inherited_dynamic_op.h"
|
||||
#include "backend/optimizer/ascend/dynamic_shape/link_custom_op.h"
|
||||
#include "backend/optimizer/pass/adjust_depend_for_parallel_optimizer_recompute_all_gather.h"
|
||||
#include "backend/optimizer/pass/gradients_allreduce_depend_last_send.h"
|
||||
#include "backend/kernel_compiler/tbe/tbe_kernel_compile.h"
|
||||
#include "utils/ms_context.h"
|
||||
#include "utils/config_manager.h"
|
||||
|
@ -461,6 +462,7 @@ void AscendBackendOptimization(const std::shared_ptr<session::KernelGraph> &kern
|
|||
other_pm->AddPass(std::make_shared<SplitInputsForReduceScatter>());
|
||||
other_pm->AddPass(std::make_shared<BroadcastFusion>());
|
||||
other_pm->AddPass(std::make_shared<InsertTensorMoveForCascade>());
|
||||
other_pm->AddPass(std::make_shared<GradientsAllReduceDependLastSend>());
|
||||
other_pm->AddPass(std::make_shared<ParameterTransOpFusion>());
|
||||
other_pm->AddPass(std::make_shared<RefreshParameterFormat>());
|
||||
other_pm->AddPass(std::make_shared<SplitOpOptimizer>());
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Copyright 2022 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "backend/optimizer/pass/gradients_allreduce_depend_last_send.h"
|
||||
#include <algorithm>
|
||||
#include "frontend/parallel/context.h"
|
||||
#include "backend/session/anf_runtime_algorithm.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace opt {
|
||||
bool GradientsAllReduceDependLastSend::Run(const FuncGraphPtr &graph) {
|
||||
MS_EXCEPTION_IF_NULL(graph);
|
||||
auto parallel_mode = parallel::ParallelContext::GetInstance()->parallel_mode();
|
||||
if (parallel_mode != parallel::SEMI_AUTO_PARALLEL && parallel_mode != parallel::AUTO_PARALLEL) {
|
||||
return false;
|
||||
}
|
||||
int32_t split_stage_num = parallel::ParallelContext::GetInstance()->pipeline_stage_split_num();
|
||||
if (split_stage_num <= 1) {
|
||||
return false;
|
||||
}
|
||||
std::vector<AnfNodePtr> node_list = TopoSort(graph->get_return());
|
||||
std::vector<CNodePtr> addn_list;
|
||||
CNodePtr last_send;
|
||||
for (auto &node : node_list) {
|
||||
MS_EXCEPTION_IF_NULL(node);
|
||||
if (!node->cast<CNodePtr>() || !AnfUtils::IsRealKernel(node)) {
|
||||
continue;
|
||||
}
|
||||
auto cnode = node->cast<CNodePtr>();
|
||||
if (IsPrimitiveCNode(cnode, prim::kPrimAllReduce) && AnfAlgo::IsFusion(cnode)) {
|
||||
auto last_input = cnode->inputs().back();
|
||||
if (IsPrimitiveCNode(last_input, prim::kPrimTensorMove)) {
|
||||
auto last_input_cnode = last_input->cast<CNodePtr>();
|
||||
auto real_input_node = last_input_cnode->input(1);
|
||||
if (IsPrimitiveCNode(real_input_node, prim::kPrimDepend)) {
|
||||
auto addn_node = real_input_node->cast<CNodePtr>()->input(2);
|
||||
if (IsPrimitiveCNode(addn_node, prim::kPrimAddN)) {
|
||||
MS_LOG(INFO) << "Find the pipeline addn " << addn_node->fullname_with_scope();
|
||||
addn_list.push_back(addn_node->cast<CNodePtr>());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (IsPrimitiveCNode(cnode, prim::kPrimSend)) {
|
||||
last_send = cnode;
|
||||
}
|
||||
}
|
||||
return InsertDependBetweenAllReduceAndSend(graph, addn_list, last_send);
|
||||
}
|
||||
|
||||
bool GradientsAllReduceDependLastSend::InsertDependBetweenAllReduceAndSend(const FuncGraphPtr &graph,
|
||||
const std::vector<CNodePtr> &addn_list,
|
||||
const CNodePtr &last_send) {
|
||||
bool changed = false;
|
||||
FuncGraphManagerPtr manager = graph->manager();
|
||||
for (auto &addn : addn_list) {
|
||||
std::vector<AnfNodePtr> inputs = {NewValueNode(std::make_shared<Primitive>(prim::kPrimDepend->name())), addn,
|
||||
last_send};
|
||||
auto new_depend = graph->NewCNode(inputs);
|
||||
new_depend->set_abstract(addn->abstract());
|
||||
(void)manager->Replace(addn, new_depend);
|
||||
changed = true;
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
} // namespace opt
|
||||
} // namespace mindspore
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Copyright 2022 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
#define MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
#include <vector>
#include <string>
#include <utility>
#include <memory>

#include "backend/optimizer/common/pass.h"
#include "ir/func_graph.h"
#include "ir/anf.h"
#include "backend/optimizer/common/helper.h"
#include "backend/optimizer/common/optimizer.h"

namespace mindspore {
namespace opt {
// Backend pass for pipeline-parallel training: inserts Depend edges so the
// fused gradient AllReduce waits for the last pipeline Send to be issued.
class GradientsAllReduceDependLastSend : public Pass {
 public:
  // Fix: the pass was registered under a name copy-pasted from another pass
  // ("adjust_depend_for_parallel_optimizer_recompute_all_gather"), which
  // mislabels it in logs/IR dumps and collides with that pass's name.
  GradientsAllReduceDependLastSend() : Pass("gradients_allreduce_depend_last_send") {}
  ~GradientsAllReduceDependLastSend() override = default;
  bool Run(const FuncGraphPtr &graph) override;

 private:
  // Builds Depend(addn, last_send) for each collected AddN and replaces the
  // AddN's users via the graph manager; returns true if anything changed.
  bool InsertDependBetweenAllReduceAndSend(const FuncGraphPtr &graph, const std::vector<CNodePtr> &addn_list,
                                           const CNodePtr &last_send);
};
}  // namespace opt
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_BACKEND_OPTIMIZER_PASS_GRADIENTS_ALLREDUCE_DEPEND_LAST_SEND_H_
|
|
@ -264,8 +264,10 @@ void AscendStreamAssign::AssignStream(const NotNull<KernelGraphPtr> &graph_ptr)
|
|||
Reset();
|
||||
SetLoopSink();
|
||||
ReorderIndependentOrders(graph_ptr);
|
||||
TrailingTimeOptimizationByReorder(graph_ptr);
|
||||
|
||||
auto parallel_mode = parallel::ParallelContext::GetInstance()->parallel_mode();
|
||||
if (parallel_mode != parallel::SEMI_AUTO_PARALLEL && parallel_mode != parallel::AUTO_PARALLEL) {
|
||||
TrailingTimeOptimizationByReorder(graph_ptr);
|
||||
}
|
||||
AssignAllNodesStream(graph_ptr);
|
||||
UpdateAtomicAddrCleanStreamId(graph_ptr);
|
||||
InsertStreamActive(graph_ptr);
|
||||
|
@ -1121,6 +1123,33 @@ void AscendStreamAssign::InsertStreamActiveForCommon(const NotNull<KernelGraphPt
|
|||
graph_ptr->set_execution_order(update_cnode_list);
|
||||
}
|
||||
|
||||
std::vector<CNodePtr> AscendStreamAssign::GetIndependentNodesNeedsInsertActive(const std::vector<CNodePtr> exe_orders,
|
||||
const uint32_t graph_id) {
|
||||
uint32_t pre_stream_id = 0;
|
||||
std::vector<CNodePtr> independent_nodes_need_insert_active;
|
||||
CNodePtr pre_cnode;
|
||||
for (size_t i = 0; i < exe_orders.size(); i++) {
|
||||
auto cur_cnode_ptr = exe_orders[i];
|
||||
if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr)) {
|
||||
continue;
|
||||
}
|
||||
if (AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != graph_id) {
|
||||
continue;
|
||||
}
|
||||
auto cur_stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr);
|
||||
if (pre_stream_id == 0) {
|
||||
pre_stream_id = cur_stream_id;
|
||||
pre_cnode = cur_cnode_ptr;
|
||||
}
|
||||
if (cur_stream_id != pre_stream_id) {
|
||||
independent_nodes_need_insert_active.push_back(pre_cnode);
|
||||
}
|
||||
pre_stream_id = cur_stream_id;
|
||||
pre_cnode = cur_cnode_ptr;
|
||||
}
|
||||
return independent_nodes_need_insert_active;
|
||||
}
|
||||
|
||||
void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGraphPtr> &graph_ptr) {
|
||||
auto root_graph_id = graph_ptr->graph_id();
|
||||
if (root_graph_id == kInvalidGraphId) {
|
||||
|
@ -1132,31 +1161,23 @@ void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGr
|
|||
independent_streams = item.second;
|
||||
}
|
||||
}
|
||||
|
||||
// Root graph independent stream size is not more than one, no need insert active
|
||||
if (independent_streams.size() <= 1) {
|
||||
return;
|
||||
}
|
||||
std::vector<CNodePtr> update_cnode_list;
|
||||
auto exe_orders = graph_ptr->execution_order();
|
||||
|
||||
auto independent_nodes_need_insert_active = GetIndependentNodesNeedsInsertActive(exe_orders, root_graph_id);
|
||||
// first independent is been activated, active other independent stream
|
||||
std::vector<uint32_t> streams;
|
||||
std::copy(independent_streams.begin(), independent_streams.end(), std::back_inserter(streams));
|
||||
std::sort(streams.begin(), streams.end());
|
||||
uint32_t node_num = 0;
|
||||
for (size_t i = 0; i < exe_orders.size(); i++) {
|
||||
auto cur_cnode_ptr = exe_orders[i];
|
||||
update_cnode_list.emplace_back(cur_cnode_ptr);
|
||||
if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr)) {
|
||||
if (!AnfAlgo::IsIndependentNode(cur_cnode_ptr) || AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != root_graph_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (AnfAlgo::GetGraphId(cur_cnode_ptr.get()) != root_graph_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
node_num++;
|
||||
auto cur_stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr);
|
||||
auto it = std::find(streams.begin(), streams.end(), cur_stream_id);
|
||||
if (it == streams.end()) {
|
||||
|
@ -1165,7 +1186,8 @@ void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGr
|
|||
std::copy(exe_orders.begin() + i + 1, exe_orders.end(), std::back_inserter(update_cnode_list));
|
||||
break;
|
||||
} else {
|
||||
if (node_num == kMaxCommonNodeNumPerStream) {
|
||||
if (std::find(independent_nodes_need_insert_active.begin(), independent_nodes_need_insert_active.end(),
|
||||
cur_cnode_ptr) != independent_nodes_need_insert_active.end()) {
|
||||
CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr);
|
||||
// 1.set stream id
|
||||
AnfAlgo::SetStreamId(cur_stream_id, active_ptr.get());
|
||||
|
@ -1173,7 +1195,6 @@ void AscendStreamAssign::InsertStreamActiveForIndependent(const NotNull<KernelGr
|
|||
std::vector<uint32_t> active_index_list{*(it + 1)};
|
||||
AnfAlgo::SetNodeAttr(kAttrActiveStreamList, MakeValue<std::vector<uint32_t>>(active_index_list), active_ptr);
|
||||
update_cnode_list.emplace_back(active_ptr);
|
||||
node_num = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,8 @@ class AscendStreamAssign {
|
|||
std::pair<std::string, std::map<uint32_t, std::set<uint32_t>>> group_item);
|
||||
std::vector<std::pair<uint32_t, vector<size_t>>> GetStreamIDHcomMap(std::vector<CNodePtr> cnode_ptr_list,
|
||||
std::string group, size_t graph_id);
|
||||
std::vector<CNodePtr> GetIndependentNodesNeedsInsertActive(const std::vector<CNodePtr> exe_orders,
|
||||
const uint32_t graph_id);
|
||||
|
||||
void AdjustAtomicAddrCleanOrder(const NotNull<KernelGraphPtr> &graph_ptr);
|
||||
vector<CNodePtr> GetLastInputCnode(const NotNull<KernelGraphPtr> &graph_ptr, const CNodePtr &cur_cnode_ptr);
|
||||
|
|
Loading…
Reference in New Issue