diff --git a/mindspore/ccsrc/backend/session/ascend_session.cc b/mindspore/ccsrc/backend/session/ascend_session.cc
index 6644c58441b..b74e050d189 100644
--- a/mindspore/ccsrc/backend/session/ascend_session.cc
+++ b/mindspore/ccsrc/backend/session/ascend_session.cc
@@ -80,14 +80,7 @@ using mindspore::profiler::ascend::MemoryProfiling;
 namespace mindspore {
 namespace session {
-const size_t kInvalidIndex = SIZE_MAX;
-const size_t kLoopSinkTensorNum = 3;
-const size_t kLoopSinkCurLoopIndex = 0;
-const size_t kLoopSinkNextLoopIndex = 1;
-const size_t kLoopSinkEpochIndex = 2;
 const size_t kLabelNumsThreshold = 1023;
-constexpr char SR_TAG[] = "sr_tag";
-constexpr char BACKWARD[] = "backward";
 constexpr auto kUnknowErrorString = "Unknown error occurred";
 namespace {
 #ifndef ENABLE_SECURITY
@@ -209,88 +202,6 @@ void GenOpOutputStubTensor(const KernelGraphPtr &single_op_graph, const CNodePtr
   }
 }
 
-size_t LoadCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vector<tensor::TensorPtr> *inputs) {
-  MS_EXCEPTION_IF_NULL(graph);
-  MS_LOG(DEBUG) << "Load kInputCtrlTensors";
-  auto inputs_params = graph->input_ctrl_tensors();
-  if (inputs_params == nullptr) {
-    return 0;
-  }
-  if (inputs_params->size() < kLoopSinkTensorNum) {
-    MS_LOG(EXCEPTION) << "Illegal inputs_params size";
-  }
-  // update current loop tensor to 0 per iterator
-  auto cur_loop_tensor = (*inputs_params)[kLoopSinkCurLoopIndex];
-  MS_EXCEPTION_IF_NULL(cur_loop_tensor);
-  auto *cur_val = static_cast<int32_t *>(cur_loop_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(cur_val);
-  *cur_val = 0;
-  cur_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
-  // set loop_count to zero
-  if (inputs) {
-    inputs->push_back(cur_loop_tensor);
-  } else {
-    auto device_address = cur_loop_tensor->device_address();
-    if (!device_address->SyncHostToDevice(cur_loop_tensor->shape(), LongToSize(cur_loop_tensor->data().nbytes()),
-                                          cur_loop_tensor->data_type(), cur_loop_tensor->data_c(),
-                                          cur_loop_tensor->device_info().host_format_)) {
-      MS_LOG(EXCEPTION) << "SyncHostToDevice failed for cur_loop_tensor needed for async dump.";
-    }
-  }
-
-  // update next loop tensor to 0 per iterator
-  auto next_loop_tensor = (*inputs_params)[kLoopSinkNextLoopIndex];
-  MS_EXCEPTION_IF_NULL(next_loop_tensor);
-  auto *next_val = static_cast<int32_t *>(next_loop_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(next_val);
-  *next_val = 0;
-  next_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
-  // set loop_count to zero
-  if (inputs) {
-    inputs->push_back(next_loop_tensor);
-  } else {
-    auto device_address = next_loop_tensor->device_address();
-    if (!device_address->SyncHostToDevice(next_loop_tensor->shape(), LongToSize(next_loop_tensor->data().nbytes()),
-                                          next_loop_tensor->data_type(), next_loop_tensor->data_c(),
-                                          next_loop_tensor->device_info().host_format_)) {
-      MS_LOG(EXCEPTION) << "SyncHostToDevice failed for next_loop_tensor needed for async dump.";
-    }
-  }
-
-  auto epoch_tensor = (*inputs_params)[kLoopSinkEpochIndex];
-  MS_EXCEPTION_IF_NULL(epoch_tensor);
-  auto *epoch_val = static_cast<int32_t *>(epoch_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(epoch_val);
-  *epoch_val = SizeToInt(graph->current_epoch());
-  epoch_tensor->set_sync_status(kNeedSyncHostToDevice);
-  if (inputs) {
-    inputs->push_back(epoch_tensor);
-  } else {
-    auto device_address = epoch_tensor->device_address();
-    if (!device_address->SyncHostToDevice(epoch_tensor->shape(), LongToSize(epoch_tensor->data().nbytes()),
-                                          epoch_tensor->data_type(), epoch_tensor->data_c(),
-                                          epoch_tensor->device_info().host_format_)) {
-      MS_LOG(EXCEPTION) << "SyncHostToDevice failed for epoch_tensor needed for async dump.";
-    }
-  }
-  MS_LOG(DEBUG) << "Load epoch_val:" << *epoch_val;
-  graph->set_current_epoch(graph->current_epoch() + 1);
-  return inputs_params->size();
-}
-
-void UpdateCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vector<tensor::TensorPtr> *inputs,
-                           size_t *input_ctrl_size) {
-  if (graph->input_ctrl_tensors()) {
-    auto &dump_json_parser = DumpJsonParser::GetInstance();
-    bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || graph->IsDatasetGraph());
-    if (sink_mode || !dump_json_parser.async_dump_enabled()) {
-      *input_ctrl_size = LoadCtrlInputTensor(graph, inputs);
-    } else {
-      LoadCtrlInputTensor(graph, nullptr);
-    }
-  }
-}
-
 bool NeedMemcpyInDevice(const device::DeviceAddressPtr &src_device_addr,
                         const device::DeviceAddressPtr &dst_device_addr) {
   MS_EXCEPTION_IF_NULL(dst_device_addr);
@@ -378,15 +289,10 @@ void AscendSession::UnifyMindIR(const KernelGraphPtr &graph) {
 void AscendSession::LoadInputData(const std::shared_ptr<KernelGraph> &kernel_graph,
                                   const std::vector<tensor::TensorPtr> &inputs_const) const {
   std::vector<tensor::TensorPtr> inputs(inputs_const);
-  size_t input_ctrl_size = kLoopSinkTensorNum;
   uint32_t device_memcpy_nums = 0;
   MS_EXCEPTION_IF_NULL(kernel_graph);
-  UpdateCtrlInputTensor(kernel_graph, &inputs, &input_ctrl_size);
+  device::KernelAdjust::GetInstance().LoadDeviceLoopCtrlParameters(kernel_graph);
   auto &input_nodes = kernel_graph->input_nodes();
-  if ((inputs.size() + input_ctrl_size) - kLoopSinkTensorNum != input_nodes.size()) {
-    MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size()
-                      << ", input_ctrl_size:" << input_ctrl_size;
-  }
   auto ms_context = MsContext::GetInstance();
   MS_EXCEPTION_IF_NULL(ms_context);
   auto enable_mem_scheduler = ms_context->get_param<bool>(MS_CTX_ENABLE_MEM_SCHEDULER);
@@ -1175,7 +1081,8 @@ void AscendSession::AdjustKernel(const std::shared_ptr<KernelGraph> &kernel_grap
   // prepare for next step from json get atomic info
   BuildKernel(kernel_graph);
   device::ascend::KernelBuildPreprocess(kernel_graph.get());
-  device::KernelAdjust::GetInstance().InsertSwitchLoop(kernel_graph);
+  device::KernelAdjust::GetInstance().InsertDeviceLoopCtrl(kernel_graph);
+  device::KernelAdjust::GetInstance().ProcessLoopSink(kernel_graph);
 #ifdef ENABLE_DUMP_IR
   auto context_ptr = MsContext::GetInstance();
   MS_EXCEPTION_IF_NULL(context_ptr);
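
Reviewer sketch: after this change the loop-control lifecycle is split into explicit phases instead of the old InsertSwitchLoop/StepLoadCtrlInputs pair. A minimal sketch of the intended call order, assuming only the KernelAdjust entry points shown in this diff; the driver function itself is hypothetical and not part of the patch:

  // Hypothetical driver illustrating the new lifecycle.
  void RunWithDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &graph,
                             device::KernelRuntime *runtime) {
    auto &adjust = device::KernelAdjust::GetInstance();
    adjust.InsertDeviceLoopCtrl(graph);   // graph compile: create the named ctrl tensors/params
    adjust.ProcessLoopSink(graph);        // graph compile: wire StreamSwitch/AssignAdd to them
    runtime->AssignMemory(*graph);        // memory alloc for the rest of the graph
    adjust.AssignLoopCtrlMemory(*graph);  // static device memory + first host-to-device sync
    adjust.LoadDeviceLoopCtrlParameters(graph);  // every step: reset counters, advance epoch
  }
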
@@ -1377,6 +1284,7 @@ void AscendSession::MemoryAlloc(KernelGraph *kernel_graph) const {
   auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id_);
   MS_EXCEPTION_IF_NULL(runtime_instance);
   runtime_instance->AssignMemory(*kernel_graph);
+  device::KernelAdjust::GetInstance().AssignLoopCtrlMemory(*kernel_graph);
   MS_LOG(INFO) << "Status record: end memory alloc. graph id: " << kernel_graph->graph_id();
 }
 
@@ -1415,7 +1323,6 @@ void AscendSession::Load(const std::shared_ptr<KernelGraph> &kernel_graph) const
   auto context_ptr = MsContext::GetInstance();
   MS_EXCEPTION_IF_NULL(context_ptr);
   bool is_task_sink = context_ptr->get_param<bool>(MS_CTX_ENABLE_TASK_SINK);
-  (void)device::KernelAdjust::GetInstance().StepLoadCtrlInputs(kernel_graph);
   auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id_);
   MS_EXCEPTION_IF_NULL(runtime_instance);
   bool ret_ok = runtime_instance->Load(*kernel_graph, is_task_sink);
diff --git a/mindspore/ccsrc/backend/session/kernel_graph.h b/mindspore/ccsrc/backend/session/kernel_graph.h
index 4a5ac4a8c0b..71c04ba57c8 100644
--- a/mindspore/ccsrc/backend/session/kernel_graph.h
+++ b/mindspore/ccsrc/backend/session/kernel_graph.h
@@ -83,7 +83,8 @@ class KernelGraph : public FuncGraph {
     summary_node_exist_ = graph.summary_node_exist_;
     valid_inputs_ = graph.valid_inputs_;
     child_graph_order_ = graph.child_graph_order_;
-    input_ctrl_tensors_ = graph.input_ctrl_tensors_;
+    device_loop_ctrl_tensors_ = graph.device_loop_ctrl_tensors_;
+    device_loop_ctrl_params_ = graph.device_loop_ctrl_params_;
     parent_graph_ = graph.parent_graph_;
     start_label_ = graph.start_label_;
     end_goto_ = graph.end_goto_;
@@ -202,12 +203,18 @@ class KernelGraph : public FuncGraph {
   // checkout whether current graph is leaf graph
   bool IsLeafGraph() const;
 
-  // set input_tensors pointer of control parameter
-  void set_input_ctrl_tensors(const std::shared_ptr<std::vector<tensor::TensorPtr>> &input_tensors_ptr) {
-    input_ctrl_tensors_ = input_tensors_ptr;
+  void set_device_loop_ctrl_tensors(const std::map<std::string, tensor::TensorPtr> &device_loop_ctrl_tensors) {
+    device_loop_ctrl_tensors_ = device_loop_ctrl_tensors;
   }
-  // get input_tensors pointer of control parameter
-  std::shared_ptr<std::vector<tensor::TensorPtr>> input_ctrl_tensors() const { return input_ctrl_tensors_; }
+  std::map<std::string, tensor::TensorPtr> device_loop_control_tensors() const { return device_loop_ctrl_tensors_; }
+
+  void set_device_loop_ctrl_params(const std::map<std::string, mindspore::ParameterPtr> &device_loop_ctrl_params) {
+    device_loop_ctrl_params_ = device_loop_ctrl_params;
+  }
+  const std::map<std::string, mindspore::ParameterPtr> device_loop_control_params() const {
+    return device_loop_ctrl_params_;
+  }
+
   // get parent kernel graph
   std::weak_ptr<KernelGraph> parent_graph() const { return parent_graph_; }
   // set parent kernel graph
@@ -442,8 +449,10 @@ class KernelGraph : public FuncGraph {
   // child graph execute order in parent graph
   std::vector<std::weak_ptr<KernelGraph>> child_graph_order_;
 
-  // input_tensors of control parameter
-  std::shared_ptr<std::vector<tensor::TensorPtr>> input_ctrl_tensors_;
+  // device loop control frontend tensors
+  std::map<std::string, tensor::TensorPtr> device_loop_ctrl_tensors_;
+  // device loop control backend nodes
+  std::map<std::string, mindspore::ParameterPtr> device_loop_ctrl_params_;
 
   // parameter graph
   std::weak_ptr<KernelGraph> parent_graph_;
diff --git a/mindspore/ccsrc/backend/session/session_context.h b/mindspore/ccsrc/backend/session/session_context.h
index 22397710478..add117e742e 100644
--- a/mindspore/ccsrc/backend/session/session_context.h
+++ b/mindspore/ccsrc/backend/session/session_context.h
@@ -27,8 +27,6 @@
 #include "utils/ms_context.h"
 namespace mindspore {
 namespace session {
-const char kInputCtrlTensors[] = "input_ctrl_tensors";
-
 class Context : public pipeline::ResourceBase {
  public:
  explicit Context(std::string target = kAscendDevice, uint32_t device_id = 0)
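
Reviewer sketch: with the shared index-based vector gone, consumers fetch control tensors from the graph by name. A hedged usage sketch against the new accessors above; the variable names are illustrative, the name constant comes from kernel_adjust.h:

  auto ctrl_tensors = kernel_graph->device_loop_control_tensors();
  auto iter = ctrl_tensors.find(kCurLoopCountName);
  if (iter != ctrl_tensors.end() && iter->second != nullptr) {
    auto *count = static_cast<int32_t *>(iter->second->data_c());
    MS_LOG(INFO) << "current_loop_count (host copy): " << *count;
  }
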
diff --git a/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc b/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc
index 971f40c6fa9..bfe964f6459 100644
--- a/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc
+++ b/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc
@@ -42,9 +42,6 @@ static constexpr uint32_t kAicpuUnloadFlag = 0;
 static constexpr uint32_t kTupleTaskId = 0;
 static constexpr uint32_t kTupleStreamId = 1;
 static constexpr uint32_t kTupleArgs = 2;
-static constexpr uint32_t kCurrentStepTensorIndex = 0;
-static constexpr uint32_t kCurrentEpochTensorIndex = 2;
-static constexpr uint32_t kStepsPerEpochTensorIndex = 3;
 static constexpr uint64_t kOpDebugShape = 2048;
 static constexpr uint64_t kOpDebugHostMemSize = 2048;
 static constexpr uint64_t kOpDebugDevMemSize = sizeof(void *);
@@ -59,6 +56,10 @@ static const std::map<uint32_t, std::string> kOverflowModeStr = {{kNoOverflow, "
 constexpr const char *kNodeNameOpDebug = "Node_OpDebug";
 constexpr const char *kOpTypeOpDebug = "Opdebug";
 
+static constexpr auto kCurLoopCountName = "current_loop_count";
+static constexpr auto kCurEpochCountName = "current_epoch_count";
+static constexpr auto kConstLoopNumInEpochName = "const_loop_num_in_epoch";
+
 namespace mindspore {
 namespace device {
 namespace ascend {
@@ -136,9 +137,9 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
   MS_EXCEPTION_IF_NULL(context_ptr);
   MS_EXCEPTION_IF_NULL(kernel_graph_);
   auto dump_path = DumpJsonParser::GetInstance().path();
-  const auto &input_ctrl_tensors = kernel_graph_->input_ctrl_tensors();
-  constexpr size_t kLoopSinkCtrlTensorNum = 3;  // cur step, cur epoch, steps per epoch
-  bool valid_ctrl_tensors = input_ctrl_tensors != nullptr && input_ctrl_tensors->size() >= kLoopSinkCtrlTensorNum;
+  auto input_ctrl_tensors = kernel_graph_->device_loop_control_tensors();
+  constexpr size_t kLoopSinkCtrlTensorNum = 5;  // cur step, next step, cur epoch, one, steps per epoch
+  bool valid_ctrl_tensors = input_ctrl_tensors.size() >= kLoopSinkCtrlTensorNum;
   std::string net_name = DumpJsonParser::GetInstance().net_name();
   std::string iteration = DumpJsonParser::GetInstance().iteration_string();
@@ -177,19 +178,19 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
     MS_LOG(INFO) << "[DataDump] input_ctrl_tensors not valid.";
     return;
   }
-  const auto &current_step_tensor = input_ctrl_tensors->at(kCurrentStepTensorIndex);
-  const auto &currnet_epoch_tensor = input_ctrl_tensors->at(kCurrentEpochTensorIndex);
-  const auto &steps_per_epoch_tensor = input_ctrl_tensors->at(kStepsPerEpochTensorIndex);
+  const auto &current_step_tensor = input_ctrl_tensors[kCurLoopCountName];
+  const auto &current_epoch_tensor = input_ctrl_tensors[kCurEpochCountName];
+  const auto &steps_per_epoch_tensor = input_ctrl_tensors[kConstLoopNumInEpochName];
   MS_EXCEPTION_IF_NULL(current_step_tensor);
-  MS_EXCEPTION_IF_NULL(currnet_epoch_tensor);
+  MS_EXCEPTION_IF_NULL(current_epoch_tensor);
   MS_EXCEPTION_IF_NULL(steps_per_epoch_tensor);
   MS_EXCEPTION_IF_NULL(current_step_tensor->device_address());
-  MS_EXCEPTION_IF_NULL(currnet_epoch_tensor->device_address());
+  MS_EXCEPTION_IF_NULL(current_epoch_tensor->device_address());
   MS_EXCEPTION_IF_NULL(steps_per_epoch_tensor->device_address());
   void *current_step = current_step_tensor->device_address()->GetMutablePtr();
-  void *current_epoch = currnet_epoch_tensor->device_address()->GetMutablePtr();
+  void *current_epoch = current_epoch_tensor->device_address()->GetMutablePtr();
   void *steps_per_epoch = steps_per_epoch_tensor->device_address()->GetMutablePtr();
 
   if (current_epoch != nullptr && current_step != nullptr && steps_per_epoch != nullptr) {
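
Reviewer note: one behavioural nuance in the hunk above. device_loop_control_tensors() returns the map by value, and operator[] default-inserts a null TensorPtr into that local copy when a key is missing, relying on the MS_EXCEPTION_IF_NULL checks right after to catch it. A stricter lookup could look like this; sketch only, reusing the same names:

  const auto &ctrl = kernel_graph_->device_loop_control_tensors();
  auto step_iter = ctrl.find(kCurLoopCountName);
  if (step_iter == ctrl.end() || step_iter->second == nullptr) {
    MS_LOG(INFO) << "[DataDump] " << kCurLoopCountName << " tensor not found, skip op mapping info.";
    return;
  }
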
diff --git a/mindspore/ccsrc/runtime/device/kernel_adjust.cc b/mindspore/ccsrc/runtime/device/kernel_adjust.cc
index 1d22613e27d..e0440eb0a02 100644
--- a/mindspore/ccsrc/runtime/device/kernel_adjust.cc
+++ b/mindspore/ccsrc/runtime/device/kernel_adjust.cc
@@ -267,27 +267,6 @@ void KernelAdjust::InsertFpBpAndEosLoopStreamActive(const std::shared_ptr<sessio
 ...->fullname_with_scope();
 }
 
-void KernelAdjust::InsertSwitchLoopInput(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
-                                         const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input) {
-  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
-  std::vector<AnfNodePtr> *mute_inputs = kernel_graph_ptr->MutableInputs();
-  MS_EXCEPTION_IF_NULL(mute_inputs);
-  mute_inputs->push_back(switch_loop_input.at(kCurLoopCountParamName));
-  mute_inputs->push_back(switch_loop_input.at(kNextLoopCountParamName));
-  mute_inputs->push_back(switch_loop_input.at(kEpochParamName));
-  mute_inputs->push_back(switch_loop_input.at(kIterLoopParamName));
-  mute_inputs->push_back(switch_loop_input.at(kOneParamName));
-  for (const auto &input : kernel_graph_ptr->inputs()) {
-    MS_EXCEPTION_IF_NULL(input);
-    if (input->isa<Parameter>()) {
-      ParameterPtr param_ptr = input->cast<ParameterPtr>();
-      if (param_ptr == nullptr) {
-        MS_EXCEPTION(NotSupportError) << "Cast to parameter point failed !";
-      }
-    }
-  }
-}
-
 void KernelAdjust::InsertGetNextLoopStreamSwitch(
   const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr, std::vector<CNodePtr> *exec_order,
   uint32_t *getnext_switch_stream_id, uint32_t *getnext_stream_id,
@@ -413,7 +392,7 @@ void KernelAdjust::InsertEosDoneSend(const std::shared_ptr<session::KernelGraph
   MS_LOG(INFO) << "EoS loop insert EoS done Send " << eos_done_send->fullname_with_scope();
 }
 
-void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
+void KernelAdjust::ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
   MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
   device::ascend::AscendStreamMng &resource_manager = device::ascend::AscendStreamMng::GetInstance();
   resource_manager.ResetResource();
@@ -421,7 +400,7 @@
     return;
   }
   if (kernel_graph_ptr->is_dynamic_shape()) {
-    MS_LOG(INFO) << "KernelGraph:" << kernel_graph_ptr->graph_id() << " is dynamic shape, skip InsertSwitchLoop";
+    MS_LOG(INFO) << "KernelGraph:" << kernel_graph_ptr->graph_id() << " is dynamic shape, skip ProcessLoopSink";
     return;
   }
   bool exist_getnext = ExistGetNext(kernel_graph_ptr);
@@ -431,9 +410,7 @@
   if (exist_getnext) {
     ReorderGetNext(kernel_graph_ptr);
   }
-  std::map<std::string, mindspore::ParameterPtr> switch_loop_input;
-  CreateSwitchOpParameters(kernel_graph_ptr, &switch_loop_input);
-  InsertSwitchLoopInput(kernel_graph_ptr, switch_loop_input);
+  auto switch_loop_input = kernel_graph_ptr->device_loop_control_params();
 
   const std::vector<CNodePtr> &orders = kernel_graph_ptr->execution_order();
   if (orders.empty()) {
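
Reviewer note: ProcessLoopSink now assumes InsertDeviceLoopCtrl has already populated the graph's device_loop_control_params(); every later switch_loop_input.at(...) call throws std::out_of_range if it has not. A defensive guard could look like the following; the guard itself is a suggestion, not part of the patch:

  auto switch_loop_input = kernel_graph_ptr->device_loop_control_params();
  if (switch_loop_input.empty()) {
    MS_LOG(EXCEPTION) << "Device loop control parameters are not initialized, "
                      << "call InsertDeviceLoopCtrl before ProcessLoopSink.";
  }
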
@@ -511,52 +488,6 @@
   kernel_graph_ptr->set_execution_order(exec_order);
 }
 
-void KernelAdjust::CreateSwitchOpParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
-                                            std::map<std::string, mindspore::ParameterPtr> *switch_loop_input) {
-  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
-  MS_EXCEPTION_IF_NULL(switch_loop_input);
-  ShapeVector shp = {1};
-  tensor::TensorPtr tensor_ptr = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(tensor_ptr);
-  mindspore::abstract::AbstractBasePtr paremeter_abstract_ptr = tensor_ptr->ToAbstract();
-  if (paremeter_abstract_ptr == nullptr) {
-    MS_LOG(EXCEPTION) << "create abstract before insert switch op failed!";
-  }
-
-  ParameterPtr cur_loop_count = std::make_shared<Parameter>(kernel_graph_ptr);
-  MS_EXCEPTION_IF_NULL(cur_loop_count);
-  cur_loop_count->set_name(kCurLoopCountParamName);
-  cur_loop_count->set_abstract(paremeter_abstract_ptr);
-  ParameterPtr loop_count_cur = kernel_graph_ptr->NewParameter(cur_loop_count);
-  (*switch_loop_input)[kCurLoopCountParamName] = loop_count_cur;
-
-  ParameterPtr next_loop_count = std::make_shared<Parameter>(kernel_graph_ptr);
-  MS_EXCEPTION_IF_NULL(next_loop_count);
-  next_loop_count->set_name(kNextLoopCountParamName);
-  next_loop_count->set_abstract(paremeter_abstract_ptr);
-  ParameterPtr loop_count_next = kernel_graph_ptr->NewParameter(next_loop_count);
-  (*switch_loop_input)[kNextLoopCountParamName] = loop_count_next;
-
-  ParameterPtr iter_loop = std::make_shared<Parameter>(kernel_graph_ptr);
-  iter_loop->set_name(kIterLoopParamName);
-  iter_loop->set_abstract(paremeter_abstract_ptr);
-  ParameterPtr iter_loop_new = kernel_graph_ptr->NewParameter(iter_loop);
-  (*switch_loop_input)[kIterLoopParamName] = iter_loop_new;
-
-  ParameterPtr one = std::make_shared<Parameter>(kernel_graph_ptr);
-  one->set_name(kOneParamName);
-  one->set_abstract(paremeter_abstract_ptr);
-  ParameterPtr one_new = kernel_graph_ptr->NewParameter(one);
-  (*switch_loop_input)[kOneParamName] = one_new;
-
-  ParameterPtr epoch = std::make_shared<Parameter>(kernel_graph_ptr);
-  MS_EXCEPTION_IF_NULL(epoch);
-  epoch->set_name(kEpochParamName);
-  epoch->set_abstract(paremeter_abstract_ptr);
-  ParameterPtr epoch_new = kernel_graph_ptr->NewParameter(epoch);
-  (*switch_loop_input)[kEpochParamName] = epoch_new;
-}
-
 kernel::KernelBuildInfo::KernelBuildInfoBuilder KernelAdjust::CreateMngKernelBuilder(
   const std::vector<std::string> &formats, const std::vector<TypeId> &type_ids) {
   kernel::KernelBuildInfo::KernelBuildInfoBuilder selected_kernel_builder;
@@ -579,14 +510,14 @@ CNodePtr KernelAdjust::CreateStreamSwitchOp(const std::shared_ptr<session::Kerne
   std::vector<AnfNodePtr> inputs;
   inputs.push_back(NewValueNode(stream_switch));
   if (kind == kFpBpStreamSwitch || kind == kEosStreamSwitch) {
-    inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
+    inputs.push_back(switch_loop_input.at(kNextLoopCountName));
   } else if (kind == kGetNextStreamSwitch || kind == kIndependentStreamSwitch) {
-    inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
+    inputs.push_back(switch_loop_input.at(kNextLoopCountName));
   } else {
     MS_LOG(ERROR) << "unknown stream switch kind: " << kind;
   }
-  inputs.push_back(switch_loop_input.at(kIterLoopParamName));
+  inputs.push_back(switch_loop_input.at(kConstLoopNumInEpochName));
   MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
   CNodePtr stream_switch_app = kernel_graph_ptr->NewCNode(inputs);
   MS_EXCEPTION_IF_NULL(stream_switch_app);
@@ -681,12 +612,12 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
   std::vector<AnfNodePtr> inputs;
   inputs.push_back(NewValueNode(assign_add));
   if (cur_loop) {
-    inputs.push_back(switch_loop_input.at(kCurLoopCountParamName));
+    inputs.push_back(switch_loop_input.at(kCurLoopCountName));
   } else {
-    inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
+    inputs.push_back(switch_loop_input.at(kNextLoopCountName));
   }
-  inputs.push_back(switch_loop_input.at(kOneParamName));
+  inputs.push_back(switch_loop_input.at(kConstOneName));
   CNodePtr assign_add_one = kernel_graph_ptr->NewCNode(inputs);
   MS_EXCEPTION_IF_NULL(assign_add_one);
   AnfAlgo::SetSelectKernelBuildInfo(selected_kernel_builder.Build(), assign_add_one.get());
@@ -697,8 +628,8 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
-  MS_EXCEPTION_IF_NULL(switch_loop_input.at(kCurLoopCountParamName));
-  assign_add_one->set_abstract(switch_loop_input.at(kCurLoopCountParamName)->abstract());
+  MS_EXCEPTION_IF_NULL(switch_loop_input.at(kCurLoopCountName));
+  assign_add_one->set_abstract(switch_loop_input.at(kCurLoopCountName)->abstract());
   // add AssignAdd op to kernel ref node map
   session::AnfWithOutIndex final_pair = std::make_pair(assign_add_one, 0);
   session::KernelWithIndex kernel_with_index = AnfAlgo::VisitKernel(AnfAlgo::GetInputNode(assign_add_one, 0), 0);
@@ -706,147 +637,6 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::
-bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
-  auto &dump_json_parser = DumpJsonParser::GetInstance();
-  bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || kernel_graph_ptr->IsDatasetGraph());
-  if (!sink_mode && dump_json_parser.async_dump_enabled()) {
-    InitCtrlInputs(kernel_graph_ptr);
-    return true;
-  }
-  if (!NeedInsertSwitch()) {
-    return true;
-  }
-  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
-  if (kernel_graph_ptr->is_dynamic_shape()) {
-    MS_LOG(INFO) << "Skip StepLoadCtrlInputs";
-    return true;
-  }
-  auto input_nodes = kernel_graph_ptr->inputs();
-  std::vector<tensor::TensorPtr> inputs;
-  LoadSwitchInputs(&inputs);
-  std::shared_ptr<std::vector<tensor::TensorPtr>> inputsPtr = std::make_shared<std::vector<tensor::TensorPtr>>(inputs);
-  kernel_graph_ptr->set_input_ctrl_tensors(inputsPtr);
-  size_t input_ctrl_size = inputs.size();
-  // inputs_node:include four ctrl nodes in the back. such as:conv,loop_cnt, ites_loop, zero, one.
-  // deal four ctrl nodes.
-  for (size_t i = 0; i < inputs.size(); ++i) {
-    auto tensor = inputs[i];
-    MS_EXCEPTION_IF_NULL(tensor);
-    size_t deal_index = input_nodes.size() - input_ctrl_size + i;
-    if (deal_index >= input_nodes.size()) {
-      MS_LOG(EXCEPTION) << "deal_index[" << deal_index << "] out of range";
-    }
-    auto input_node = input_nodes[deal_index];
-    bool need_sync = false;
-    MS_EXCEPTION_IF_NULL(input_node);
-    if (input_node->isa<Parameter>()) {
-      auto pk_node = input_node->cast<ParameterPtr>();
-      MS_EXCEPTION_IF_NULL(pk_node);
-      if (tensor->NeedSyncHostToDevice() || !pk_node->has_default()) {
-        need_sync = true;
-      }
-    }
-    if (need_sync) {
-      auto pk_node = input_node->cast<ParameterPtr>();
-      MS_EXCEPTION_IF_NULL(pk_node);
-      auto device_address = AnfAlgo::GetMutableOutputAddr(pk_node, 0);
-      MS_EXCEPTION_IF_NULL(device_address);
-      tensor->set_device_address(device_address);
-      if (!device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(pk_node, 0),
-                                            LongToSize(tensor->data().nbytes()), tensor->data_type(), tensor->data_c(),
-                                            tensor->device_info().host_format_)) {
-        MS_LOG(INFO) << "SyncHostToDevice failed.";
-        return false;
-      }
-    }
-    tensor->set_sync_status(kNoNeedSync);
-  }
-  return true;
-}
-
-void KernelAdjust::LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs) {
-  MS_LOG(INFO) << "---------------- LoadSwitchInputs---";
-  MS_EXCEPTION_IF_NULL(inputs);
-  // current loop count
-  ShapeVector shp = {1};
-  tensor::TensorPtr cur_loop_count = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(cur_loop_count);
-  int32_t *val = nullptr;
-  val = static_cast<int32_t *>(cur_loop_count->data_c());
-  MS_EXCEPTION_IF_NULL(val);
-  *val = 0;
-  inputs->push_back(cur_loop_count);
-
-  // next loop count
-  tensor::TensorPtr next_loop_count = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(next_loop_count);
-  val = static_cast<int32_t *>(next_loop_count->data_c());
-  MS_EXCEPTION_IF_NULL(val);
-  *val = 0;
-  inputs->push_back(next_loop_count);
-
-  // Epoch in device
-  tensor::TensorPtr epoch_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(epoch_tensor);
-  val = static_cast<int32_t *>(epoch_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(val);
-  *val = 0;
-  inputs->push_back(epoch_tensor);
-
-  // total loop count per iter
-  tensor::TensorPtr iter_loop_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(iter_loop_tensor);
-  val = static_cast<int32_t *>(iter_loop_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(val);
-  if (ConfigManager::GetInstance().dataset_mode() == DS_NORMAL_MODE) {
-    MS_LOG(INFO) << "iter_loop_tensor not used in dataset_mode DS_NORMAL_MODE";
-    *val = 0;
-  } else {
-    *val = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
-  }
-  MS_LOG(INFO) << "iter_loop_tensor = " << *val;
-  inputs->push_back(iter_loop_tensor);
-
-  tensor::TensorPtr one_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
-  MS_EXCEPTION_IF_NULL(one_tensor);
-  val = static_cast<int32_t *>(one_tensor->data_c());
-  MS_EXCEPTION_IF_NULL(val);
-  *val = 1;
-  inputs->push_back(one_tensor);
-
-  MS_LOG(INFO) << "---------------- LoadSwitchInputs End--";
-}
-
-void KernelAdjust::InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
-  MS_LOG(INFO) << " -------------------------- InitCtrlInputs Start-- ";
-  std::vector<tensor::TensorPtr> inputs;
-  // prepare default values for CtrlInputs
-  LoadSwitchInputs(&inputs);
-  std::shared_ptr<std::vector<tensor::TensorPtr>> inputsPtr = std::make_shared<std::vector<tensor::TensorPtr>>(inputs);
-  kernel_graph_ptr->set_input_ctrl_tensors(inputsPtr);
-  for (size_t i = 0; i < inputs.size(); ++i) {
-    auto tensor = inputs[i];
-    MS_EXCEPTION_IF_NULL(tensor);
-    device::DeviceAddressPtr device_address = std::make_shared<device::ascend::AscendDeviceAddress>(
-      nullptr, LongToSize(tensor->data().nbytes()), tensor->device_info().host_format_, tensor->data_type());
-    auto ms_context = MsContext::GetInstance();
-    MS_EXCEPTION_IF_NULL(ms_context);
-    auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
-    auto runtime_instance = KernelRuntimeManager::Instance().GetSingleKernelRuntime(kAscendDevice, device_id);
-    if (runtime_instance->MallocMem(kStaticMem, LongToSize(tensor->data().nbytes()), device_address) == nullptr) {
-      MS_LOG(EXCEPTION) << "Cannot alloc address when flag is : " << kStaticMem
-                        << " , tensor size is : " << tensor->data().nbytes();
-    }
-    MS_EXCEPTION_IF_NULL(device_address);
-    tensor->set_device_address(device_address);
-    if (!device_address->SyncHostToDevice(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
-                                          tensor->data_c(), tensor->device_info().host_format_)) {
-      MS_LOG(EXCEPTION) << "SyncHostToDevice failed for InitCtrlInputs.";
-    }
-  }
-  MS_LOG(INFO) << " ------------------------- InitCtrlInputs End--";
-}
-
 #ifndef ENABLE_SECURITY
 void KernelAdjust::Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr) {
   if (!ascend::ProfilingManager::GetInstance().IsProfiling()) {
@@ -1111,5 +901,157 @@ void KernelAdjust::InsertOverflowCheckOperations(const std::shared_ptr<session::
   kernel_graph_ptr->set_execution_order(new_execution_order);
 }
+
+// device loop control
+std::shared_ptr<tensor::Tensor> KernelAdjust::CreateTensor(int32_t initial_value) {
+  ShapeVector shp = {1};
+  tensor::TensorPtr tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
+  MS_EXCEPTION_IF_NULL(tensor);
+  auto val = static_cast<int32_t *>(tensor->data_c());
+  MS_EXCEPTION_IF_NULL(val);
+  *val = initial_value;
+  return tensor;
+}
+
+std::shared_ptr<Parameter> KernelAdjust::CreateParameter(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
+                                                         const string parameter_name) {
+  ShapeVector shp = {1};
+  tensor::TensorPtr tensor_ptr = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
+  MS_EXCEPTION_IF_NULL(tensor_ptr);
+  mindspore::abstract::AbstractBasePtr parameter_abstract_ptr = tensor_ptr->ToAbstract();
+  if (parameter_abstract_ptr == nullptr) {
+    MS_LOG(EXCEPTION) << "Create abstract for device loop control failed!";
+  }
+
+  ParameterPtr param = std::make_shared<Parameter>(kernel_graph_ptr);
+  MS_EXCEPTION_IF_NULL(param);
+  param->set_name(parameter_name);
+  param->set_abstract(parameter_abstract_ptr);
+  ParameterPtr graph_parameter = kernel_graph_ptr->NewParameter(param);
+  return graph_parameter;
+}
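
Reviewer note: the two helpers above fold the five near-identical blocks deleted from CreateSwitchOpParameters and LoadSwitchInputs into one tensor factory and one parameter factory, both producing shape-{1} int32 values. A quick illustration of what CreateTensor yields; hypothetical check, not part of the patch:

  auto counter = CreateTensor(5);                   // shape {1}, kInt32 host tensor
  auto *value = static_cast<int32_t *>(counter->data_c());
  MS_LOG(INFO) << "initial value: " << *value;      // prints 5
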
+
+void KernelAdjust::InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
+  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
+  std::map<std::string, tensor::TensorPtr> device_loop_ctrl_tensors;
+  std::map<std::string, mindspore::ParameterPtr> device_loop_ctrl_params;
+
+  // current loop count
+  device_loop_ctrl_tensors[kCurLoopCountName] = CreateTensor(0);
+  device_loop_ctrl_params[kCurLoopCountName] = CreateParameter(kernel_graph_ptr, kCurLoopCountName);
+
+  // next loop count tensor
+  device_loop_ctrl_tensors[kNextLoopCountName] = CreateTensor(0);
+  device_loop_ctrl_params[kNextLoopCountName] = CreateParameter(kernel_graph_ptr, kNextLoopCountName);
+
+  // current epoch count tensor
+  device_loop_ctrl_tensors[kCurEpochCountName] = CreateTensor(0);
+  device_loop_ctrl_params[kCurEpochCountName] = CreateParameter(kernel_graph_ptr, kCurEpochCountName);
+
+  // constant one tensor
+  device_loop_ctrl_tensors[kConstOneName] = CreateTensor(1);
+  device_loop_ctrl_params[kConstOneName] = CreateParameter(kernel_graph_ptr, kConstOneName);
+
+  // constant loop num in epoch tensor
+  int32_t initial_value = 0;
+  if (NeedInsertSwitch()) {
+    initial_value = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
+  } else {
+    MS_LOG(INFO) << "Tensor const_loop_num_in_epoch only used in loop sink mode.";
+    initial_value = 0;
+  }
+  MS_LOG(INFO) << "Loop num in epoch is " << initial_value;
+  device_loop_ctrl_tensors[kConstLoopNumInEpochName] = CreateTensor(initial_value);
+  device_loop_ctrl_params[kConstLoopNumInEpochName] = CreateParameter(kernel_graph_ptr, kConstLoopNumInEpochName);
+
+  kernel_graph_ptr->set_device_loop_ctrl_tensors(device_loop_ctrl_tensors);
+  kernel_graph_ptr->set_device_loop_ctrl_params(device_loop_ctrl_params);
+}
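
For reference, the five named control entries created above and their initial host values, as read from this function (const_loop_num_in_epoch depends on ConfigManager):

  // current_loop_count      = 0
  // next_loop_count         = 0
  // current_epoch_count     = 0
  // const_one               = 1
  // const_loop_num_in_epoch = iter_num() when loop sink is enabled, else 0
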
+
+void KernelAdjust::AssignLoopCtrlTensorMem(const session::KernelGraph &kernel_graph, KernelRuntime *runtime_instance,
+                                           const string name) {
+  MS_EXCEPTION_IF_NULL(runtime_instance);
+  auto device_loop_control_params = kernel_graph.device_loop_control_params();
+  if (!device_loop_control_params.count(name)) {
+    MS_LOG(WARNING) << "Can't find Device Loop Control Parameter " << name;
+    return;
+  }
+  auto param = device_loop_control_params.at(name);
+  MS_EXCEPTION_IF_NULL(param);
+
+  DeviceAddressPtr device_address = nullptr;
+  if (AnfAlgo::OutputAddrExist(param, 0)) {
+    device_address = AnfAlgo::GetMutableOutputAddr(param, 0);
+    MS_EXCEPTION_IF_NULL(device_address);
+  } else {
+    MS_LOG(INFO) << "Device Loop Control Parameter " << name << " has no address, allocating...";
+    auto size = AnfAlgo::GetOutputTensorMemSize(param, 0);
+    auto format = AnfAlgo::GetOutputFormat(param, 0);
+    auto type_id = AnfAlgo::GetOutputDeviceDataType(param, 0);
+
+    device_address = std::make_shared<device::ascend::AscendDeviceAddress>(nullptr, size, format, type_id);
+    if (runtime_instance->MallocMem(kStaticMem, size, device_address) == nullptr) {
+      MS_LOG(EXCEPTION) << "Cannot alloc static memory for device loop control parameter " << name
+                        << " , tensor size is : " << size;
+    }
+    MS_EXCEPTION_IF_NULL(device_address);
+    AnfAlgo::SetOutputAddr(device_address, 0, param.get());
+  }
+
+  auto device_loop_control_tensors = kernel_graph.device_loop_control_tensors();
+  auto tensor = device_loop_control_tensors.at(name);
+  MS_EXCEPTION_IF_NULL(tensor);
+  tensor->set_device_address(device_address);
+  if (!device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(param, 0), LongToSize(tensor->data().nbytes()),
+                                        tensor->data_type(), tensor->data_c(), tensor->device_info().host_format_)) {
+    MS_LOG(EXCEPTION) << "SyncHostToDevice failed for device loop control parameter " << name;
+  }
+}
+
+void KernelAdjust::AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr) {
+  MS_LOG(INFO) << "Assign device loop control memory";
+  auto ms_context = MsContext::GetInstance();
+  MS_EXCEPTION_IF_NULL(ms_context);
+  auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
+  auto runtime_instance = KernelRuntimeManager::Instance().GetSingleKernelRuntime(kAscendDevice, device_id);
+  MS_EXCEPTION_IF_NULL(runtime_instance);
+  AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kCurLoopCountName);
+  AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kNextLoopCountName);
+  AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kCurEpochCountName);
+  AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kConstOneName);
+  AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kConstLoopNumInEpochName);
+}
+
+void KernelAdjust::SetDeviceLoopCtrlTensor(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
+                                           const std::string name, int32_t value) {
+  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
+  auto device_loop_control_tensors = kernel_graph_ptr->device_loop_control_tensors();
+  if (!device_loop_control_tensors.count(name)) {
+    MS_LOG(WARNING) << "Can't find Device Loop Control Tensor " << name;
+    return;
+  }
+  auto tensor = device_loop_control_tensors.at(name);
+  MS_EXCEPTION_IF_NULL(tensor);
+  auto *cur_val = static_cast<int32_t *>(tensor->data_c());
+  MS_EXCEPTION_IF_NULL(cur_val);
+  *cur_val = value;
+  tensor->set_sync_status(kNeedSyncHostToDevice);
+  auto device_address = tensor->device_address();
+  MS_EXCEPTION_IF_NULL(device_address);
+  if (!device_address->SyncHostToDevice(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
+                                        tensor->data_c(), tensor->device_info().host_format_)) {
+    MS_LOG(EXCEPTION) << "SyncHostToDevice failed for device loop control parameter " << name;
+  }
+}
+
+void KernelAdjust::LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
+  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
+  MS_LOG(INFO) << "Load device loop control data";
+  SetDeviceLoopCtrlTensor(kernel_graph_ptr, kCurLoopCountName, 0);
+  SetDeviceLoopCtrlTensor(kernel_graph_ptr, kNextLoopCountName, 0);
+  SetDeviceLoopCtrlTensor(kernel_graph_ptr, kCurEpochCountName, SizeToInt(kernel_graph_ptr->current_epoch()));
+
+  kernel_graph_ptr->set_current_epoch(kernel_graph_ptr->current_epoch() + 1);
+}
 }  // namespace device
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/runtime/device/kernel_adjust.h b/mindspore/ccsrc/runtime/device/kernel_adjust.h
index 165f3f19181..9b9afedbf82 100644
--- a/mindspore/ccsrc/runtime/device/kernel_adjust.h
+++ b/mindspore/ccsrc/runtime/device/kernel_adjust.h
@@ -36,13 +36,14 @@ using mindspore::device::ascend::ProfilingTraceInfo;
 using mindspore::device::ascend::ProfilingUtils;
 #endif
 namespace mindspore {
-constexpr auto kCurLoopCountParamName = "cur_loop_count";
-constexpr auto kNextLoopCountParamName = "next_loop_count";
-constexpr auto kIterLoopParamName = "iter_loop";
-constexpr auto kOneParamName = "one";
-constexpr auto kEpochParamName = "loop_epoch";
+// device loop control
+constexpr auto kCurLoopCountName = "current_loop_count";
+constexpr auto kNextLoopCountName = "next_loop_count";
+constexpr auto kCurEpochCountName = "current_epoch_count";
+constexpr auto kConstOneName = "const_one";
+constexpr auto kConstLoopNumInEpochName = "const_loop_num_in_epoch";
+
 constexpr auto kStreamNeedActivedFirst = "stream_need_active_first";
-constexpr uint32_t kSecondStreamSwitchLabel = 2;
 enum StreamSwitchKind {
   kFpBpStreamSwitch = 0,
   kGetNextStreamSwitch = 1,
@@ -57,10 +58,13 @@ class KernelAdjust {
     static KernelAdjust instance;
     return instance;
   }
+  // device loop control
+  void InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
+  void AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr);
+  void LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
 
   void InsertOverflowCheckOperations(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
-  void InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
-  bool StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
+  void ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
 #ifndef ENABLE_SECURITY
   void Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr);
 #endif
@@ -82,8 +86,6 @@ class KernelAdjust {
   void ReorderGetNext(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
   CNodePtr CreateRecvApplyKernel(const std::shared_ptr<session::KernelGraph> &graph_ptr, uint32_t event_id);
   CNodePtr CreateSendApplyKernel(const std::shared_ptr<session::KernelGraph> &graph_ptr, uint32_t event_id);
-  void CreateSwitchOpParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
-                                std::map<std::string, mindspore::ParameterPtr> *switch_loop_input);
   CNodePtr CreateStreamSwitchOp(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
                                 const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input,
                                 StreamSwitchKind kind);
@@ -97,17 +99,12 @@ class KernelAdjust {
                                     bool cur_loop);
   kernel::KernelBuildInfo::KernelBuildInfoBuilder CreateMngKernelBuilder(const std::vector<std::string> &formats,
                                                                          const std::vector<TypeId> &type_ids);
-  void LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs);
-  void InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
 #ifndef ENABLE_SECURITY
   void InsertProfilingKernel(const ProfilingTraceInfo &profiling_trace_info,
                              NotNull<session::KernelGraph *> kernel_graph_ptr);
 #endif
   bool ExistIndependent(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
   bool ExistGetNext(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
-
-  void InsertSwitchLoopInput(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
-                             const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input);
   void InsertGetNextLoopStreamSwitch(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
                                      std::vector<CNodePtr> *exec_order, uint32_t *getnext_switch_stream_id,
                                      uint32_t *getnext_stream_id,
@@ -158,6 +155,13 @@ class KernelAdjust {
   void InsertFpBpAndEosLoopStreamActive(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
                                         std::vector<CNodePtr> *exec_order,
                                         const std::vector<uint32_t> &fpbp_active_streams);
+  void SetDeviceLoopCtrlTensor(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr, const string name,
+                               int32_t value);
+  void AssignLoopCtrlTensorMem(const session::KernelGraph &kernel_graph, KernelRuntime *runtime_instance,
+                               const string name);
+  std::shared_ptr<tensor::Tensor> CreateTensor(int32_t initial_value);
+  std::shared_ptr<Parameter> CreateParameter(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
+                                             const string parameter_name);
 };
 }  // namespace device
 }  // namespace mindspore
diff --git a/tests/ut/cpp/stub/tasksink/ascend_stream_assign_stub.cc b/tests/ut/cpp/stub/tasksink/ascend_stream_assign_stub.cc
old mode 100755
new mode 100644
index dc603a12e36..4a83c7f284d
--- a/tests/ut/cpp/stub/tasksink/ascend_stream_assign_stub.cc
+++ b/tests/ut/cpp/stub/tasksink/ascend_stream_assign_stub.cc
@@ -31,9 +31,17 @@ void AscendStreamAssign::GetWaitStreams(vector<uint32_t> *wait_active_stream_lis
 void AscendStreamAssign::GetHcomStreams(std::vector<uint32_t> *streams) { return; }
 }  // namespace ascend
 
-void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
-bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return true; }
+
+void KernelAdjust::InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
+void KernelAdjust::AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr) { return; }
+void KernelAdjust::LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
+  return;
+}
+
 bool KernelAdjust::NeedInsertSwitch() { return true; }
+
+void KernelAdjust::ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
+
 #ifndef ENABLE_SECURITY
 void KernelAdjust::Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr) { return; }
 #endif
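
Taken together, the per-step behaviour after this patch, summarizing LoadDeviceLoopCtrlParameters as defined in kernel_adjust.cc; the surrounding comments are an illustrative summary, not code from the patch:

  // every step, from AscendSession::LoadInputData:
  //   current_loop_count  <- 0                       (synced host to device)
  //   next_loop_count     <- 0                       (synced host to device)
  //   current_epoch_count <- graph->current_epoch()  (synced host to device)
  //   graph->set_current_epoch(current_epoch + 1)    (host-side bookkeeping)
  // const_one and const_loop_num_in_epoch are written once by AssignLoopCtrlMemory
  // and never touched again during stepping.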