!25153 refactor device loop control

Merge pull request !25153 from laiyongqiang/adjust_kernel_refactory
Authored by i-robot on 2021-10-23 07:42:49 +00:00; committed by Gitee
commit cb307e24cf
7 changed files with 226 additions and 357 deletions

@ -80,14 +80,7 @@ using mindspore::profiler::ascend::MemoryProfiling;
namespace mindspore {
namespace session {
const size_t kInvalidIndex = SIZE_MAX;
const size_t kLoopSinkTensorNum = 3;
const size_t kLoopSinkCurLoopIndex = 0;
const size_t kLoopSinkNextLoopIndex = 1;
const size_t kLoopSinkEpochIndex = 2;
const size_t kLabelNumsThreshold = 1023;
constexpr char SR_TAG[] = "sr_tag";
constexpr char BACKWARD[] = "backward";
constexpr auto kUnknowErrorString = "Unknown error occurred";
namespace {
#ifndef ENABLE_SECURITY
@ -209,88 +202,6 @@ void GenOpOutputStubTensor(const KernelGraphPtr &single_op_graph, const CNodePtr
}
}
size_t LoadCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vector<tensor::TensorPtr> *inputs) {
MS_EXCEPTION_IF_NULL(graph);
MS_LOG(DEBUG) << "Load kInputCtrlTensors";
auto inputs_params = graph->input_ctrl_tensors();
if (inputs_params == nullptr) {
return 0;
}
if (inputs_params->size() < kLoopSinkTensorNum) {
MS_LOG(EXCEPTION) << "Illegal inputs_params size";
}
// update current loop tensor to 0 per iterator
auto cur_loop_tensor = (*inputs_params)[kLoopSinkCurLoopIndex];
MS_EXCEPTION_IF_NULL(cur_loop_tensor);
auto *cur_val = static_cast<int32_t *>(cur_loop_tensor->data_c());
MS_EXCEPTION_IF_NULL(cur_val);
*cur_val = 0;
cur_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
// set loop_count to zero
if (inputs) {
inputs->push_back(cur_loop_tensor);
} else {
auto device_address = cur_loop_tensor->device_address();
if (!device_address->SyncHostToDevice(cur_loop_tensor->shape(), LongToSize(cur_loop_tensor->data().nbytes()),
cur_loop_tensor->data_type(), cur_loop_tensor->data_c(),
cur_loop_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for cur_loop_tensor needed for async dump.";
}
}
// update next loop tensor to 0 per iterator
auto next_loop_tensor = (*inputs_params)[kLoopSinkNextLoopIndex];
MS_EXCEPTION_IF_NULL(next_loop_tensor);
auto *next_val = static_cast<int32_t *>(next_loop_tensor->data_c());
MS_EXCEPTION_IF_NULL(next_val);
*next_val = 0;
next_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
// set loop_count to zero
if (inputs) {
inputs->push_back(next_loop_tensor);
} else {
auto device_address = next_loop_tensor->device_address();
if (!device_address->SyncHostToDevice(next_loop_tensor->shape(), LongToSize(next_loop_tensor->data().nbytes()),
next_loop_tensor->data_type(), next_loop_tensor->data_c(),
next_loop_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for next_loop_tensor needed for async dump.";
}
}
auto epoch_tensor = (*inputs_params)[kLoopSinkEpochIndex];
MS_EXCEPTION_IF_NULL(epoch_tensor);
auto *epoch_val = static_cast<int32_t *>(epoch_tensor->data_c());
MS_EXCEPTION_IF_NULL(epoch_val);
*epoch_val = SizeToInt(graph->current_epoch());
epoch_tensor->set_sync_status(kNeedSyncHostToDevice);
if (inputs) {
inputs->push_back(epoch_tensor);
} else {
auto device_address = epoch_tensor->device_address();
if (!device_address->SyncHostToDevice(epoch_tensor->shape(), LongToSize(epoch_tensor->data().nbytes()),
epoch_tensor->data_type(), epoch_tensor->data_c(),
epoch_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for epoch_tensor needed for async dump.";
}
}
MS_LOG(DEBUG) << "Load epoch_val:" << *epoch_val;
graph->set_current_epoch(graph->current_epoch() + 1);
return inputs_params->size();
}
void UpdateCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vector<tensor::TensorPtr> *inputs,
size_t *input_ctrl_size) {
if (graph->input_ctrl_tensors()) {
auto &dump_json_parser = DumpJsonParser::GetInstance();
bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || graph->IsDatasetGraph());
if (sink_mode || !dump_json_parser.async_dump_enabled()) {
*input_ctrl_size = LoadCtrlInputTensor(graph, inputs);
} else {
LoadCtrlInputTensor(graph, nullptr);
}
}
}
bool NeedMemcpyInDevice(const device::DeviceAddressPtr &src_device_addr,
const device::DeviceAddressPtr &dst_device_addr) {
MS_EXCEPTION_IF_NULL(dst_device_addr);
@ -378,15 +289,10 @@ void AscendSession::UnifyMindIR(const KernelGraphPtr &graph) {
void AscendSession::LoadInputData(const std::shared_ptr<KernelGraph> &kernel_graph,
const std::vector<tensor::TensorPtr> &inputs_const) const {
std::vector<tensor::TensorPtr> inputs(inputs_const);
size_t input_ctrl_size = kLoopSinkTensorNum;
uint32_t device_memcpy_nums = 0;
MS_EXCEPTION_IF_NULL(kernel_graph);
UpdateCtrlInputTensor(kernel_graph, &inputs, &input_ctrl_size);
device::KernelAdjust::GetInstance().LoadDeviceLoopCtrlParameters(kernel_graph);
auto &input_nodes = kernel_graph->input_nodes();
if ((inputs.size() + input_ctrl_size) - kLoopSinkTensorNum != input_nodes.size()) {
MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size()
<< ", input_ctrl_size:" << input_ctrl_size;
}
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
auto enable_mem_scheduler = ms_context->get_param<bool>(MS_CTX_ENABLE_MEM_SCHEDULER);
@ -1175,7 +1081,8 @@ void AscendSession::AdjustKernel(const std::shared_ptr<KernelGraph> &kernel_grap
// prepare for next step from json get atomic info
BuildKernel(kernel_graph);
device::ascend::KernelBuildPreprocess(kernel_graph.get());
device::KernelAdjust::GetInstance().InsertSwitchLoop(kernel_graph);
device::KernelAdjust::GetInstance().InsertDeviceLoopCtrl(kernel_graph);
device::KernelAdjust::GetInstance().ProcessLoopSink(kernel_graph);
#ifdef ENABLE_DUMP_IR
auto context_ptr = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(context_ptr);
@ -1377,6 +1284,7 @@ void AscendSession::MemoryAlloc(KernelGraph *kernel_graph) const {
auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id_);
MS_EXCEPTION_IF_NULL(runtime_instance);
runtime_instance->AssignMemory(*kernel_graph);
device::KernelAdjust::GetInstance().AssignLoopCtrlMemory(*kernel_graph);
MS_LOG(INFO) << "Status record: end memory alloc. graph id: " << kernel_graph->graph_id();
}
@ -1415,7 +1323,6 @@ void AscendSession::Load(const std::shared_ptr<KernelGraph> &kernel_graph) const
auto context_ptr = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(context_ptr);
bool is_task_sink = context_ptr->get_param<bool>(MS_CTX_ENABLE_TASK_SINK);
(void)device::KernelAdjust::GetInstance().StepLoadCtrlInputs(kernel_graph);
auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id_);
MS_EXCEPTION_IF_NULL(runtime_instance);
bool ret_ok = runtime_instance->Load(*kernel_graph, is_task_sink);

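For orientation, a minimal sketch (illustrative only, not part of this diff) of how the refactored KernelAdjust entry points are now sequenced across the session phases touched above, assuming a compiled kernel graph held in a KernelGraphPtr named kernel_graph:

// Sketch: the old InsertSwitchLoop/StepLoadCtrlInputs pair is replaced by three phases.
auto &kernel_adjust = device::KernelAdjust::GetInstance();
// 1. Graph adjustment (AdjustKernel): create the named loop-control tensors and
//    parameters, then build the loop-sink stream switch structure around them.
kernel_adjust.InsertDeviceLoopCtrl(kernel_graph);
kernel_adjust.ProcessLoopSink(kernel_graph);
// 2. Memory allocation (MemoryAlloc): give each control parameter a static device
//    address and sync its initial host value to the device.
kernel_adjust.AssignLoopCtrlMemory(*kernel_graph);
// 3. Input loading (LoadInputData): reset the current/next loop counters to 0 and
//    push the current epoch count to the device before each run.
kernel_adjust.LoadDeviceLoopCtrlParameters(kernel_graph);
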
@ -83,7 +83,8 @@ class KernelGraph : public FuncGraph {
summary_node_exist_ = graph.summary_node_exist_;
valid_inputs_ = graph.valid_inputs_;
child_graph_order_ = graph.child_graph_order_;
input_ctrl_tensors_ = graph.input_ctrl_tensors_;
device_loop_ctrl_tensors_ = graph.device_loop_ctrl_tensors_;
device_loop_ctrl_params_ = graph.device_loop_ctrl_params_;
parent_graph_ = graph.parent_graph_;
start_label_ = graph.start_label_;
end_goto_ = graph.end_goto_;
@ -202,12 +203,18 @@ class KernelGraph : public FuncGraph {
// checkout whether current graph is leaf graph
bool IsLeafGraph() const;
// set input_tensors pointer of control parameter
void set_input_ctrl_tensors(const std::shared_ptr<std::vector<tensor::TensorPtr>> &input_tensors_ptr) {
input_ctrl_tensors_ = input_tensors_ptr;
void set_device_loop_ctrl_tensors(const std::map<std::string, tensor::TensorPtr> &device_loop_ctrl_tensors) {
device_loop_ctrl_tensors_ = device_loop_ctrl_tensors;
}
// get input_tensors pointer of control parameter
std::shared_ptr<std::vector<tensor::TensorPtr>> input_ctrl_tensors() const { return input_ctrl_tensors_; }
std::map<std::string, tensor::TensorPtr> device_loop_control_tensors() const { return device_loop_ctrl_tensors_; }
void set_device_loop_ctrl_params(const std::map<std::string, mindspore::ParameterPtr> &device_loop_ctrl_params) {
device_loop_ctrl_params_ = device_loop_ctrl_params;
}
const std::map<std::string, mindspore::ParameterPtr> device_loop_control_params() const {
return device_loop_ctrl_params_;
}
// get parent kernel graph
std::weak_ptr<KernelGraph> parent_graph() const { return parent_graph_; }
// set parent kernel graph
@ -442,8 +449,10 @@ class KernelGraph : public FuncGraph {
// child graph execute order in parent graph
std::vector<std::weak_ptr<KernelGraph>> child_graph_order_;
// input_tensors of control parameter
std::shared_ptr<std::vector<tensor::TensorPtr>> input_ctrl_tensors_;
// device loop control frontend tensors
std::map<std::string, tensor::TensorPtr> device_loop_ctrl_tensors_;
// device loop control backend nodes
std::map<std::string, mindspore::ParameterPtr> device_loop_ctrl_params_;
// parameter graph
std::weak_ptr<KernelGraph> parent_graph_;

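A small usage sketch (assumed caller code, not from this change set) of the new name-keyed accessors that replace the index-based input_ctrl_tensors vector; kernel_graph is assumed to be a KernelGraphPtr:

// Sketch: control tensors are looked up by name rather than by fixed position.
auto ctrl_tensors = kernel_graph->device_loop_control_tensors();
auto iter = ctrl_tensors.find("current_loop_count");  // value of kCurLoopCountName
if (iter != ctrl_tensors.end() && iter->second != nullptr) {
  auto *cur_loop = static_cast<int32_t *>(iter->second->data_c());
  MS_LOG(INFO) << "Host-side current_loop_count: " << *cur_loop;
}
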
@ -27,8 +27,6 @@
#include "utils/ms_context.h"
namespace mindspore {
namespace session {
const char kInputCtrlTensors[] = "input_ctrl_tensors";
class Context : public pipeline::ResourceBase {
public:
explicit Context(std::string target = kAscendDevice, uint32_t device_id = 0)

@ -42,9 +42,6 @@ static constexpr uint32_t kAicpuUnloadFlag = 0;
static constexpr uint32_t kTupleTaskId = 0;
static constexpr uint32_t kTupleStreamId = 1;
static constexpr uint32_t kTupleArgs = 2;
static constexpr uint32_t kCurrentStepTensorIndex = 0;
static constexpr uint32_t kCurrentEpochTensorIndex = 2;
static constexpr uint32_t kStepsPerEpochTensorIndex = 3;
static constexpr uint64_t kOpDebugShape = 2048;
static constexpr uint64_t kOpDebugHostMemSize = 2048;
static constexpr uint64_t kOpDebugDevMemSize = sizeof(void *);
@ -59,6 +56,10 @@ static const std::map<uint32_t, std::string> kOverflowModeStr = {{kNoOverflow, "
constexpr const char *kNodeNameOpDebug = "Node_OpDebug";
constexpr const char *kOpTypeOpDebug = "Opdebug";
static constexpr auto kCurLoopCountName = "current_loop_count";
static constexpr auto kCurEpochCountName = "current_epoch_count";
static constexpr auto kConstLoopNumInEpochName = "const_loop_num_in_epoch";
namespace mindspore {
namespace device {
namespace ascend {
@ -136,9 +137,9 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
MS_EXCEPTION_IF_NULL(context_ptr);
MS_EXCEPTION_IF_NULL(kernel_graph_);
auto dump_path = DumpJsonParser::GetInstance().path();
const auto &input_ctrl_tensors = kernel_graph_->input_ctrl_tensors();
constexpr size_t kLoopSinkCtrlTensorNum = 3; // cur step, cur epoch, steps per epoch
bool valid_ctrl_tensors = input_ctrl_tensors != nullptr && input_ctrl_tensors->size() >= kLoopSinkCtrlTensorNum;
auto input_ctrl_tensors = kernel_graph_->device_loop_control_tensors();
constexpr size_t kLoopSinkCtrlTensorNum = 5; // cur step, next step, cur epoch, one, steps per epoch
bool valid_ctrl_tensors = input_ctrl_tensors.size() >= kLoopSinkCtrlTensorNum;
std::string net_name = DumpJsonParser::GetInstance().net_name();
std::string iteration = DumpJsonParser::GetInstance().iteration_string();
@ -177,19 +178,19 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
MS_LOG(INFO) << "[DataDump] input_ctrl_tensors not valid.";
return;
}
const auto &current_step_tensor = input_ctrl_tensors->at(kCurrentStepTensorIndex);
const auto &currnet_epoch_tensor = input_ctrl_tensors->at(kCurrentEpochTensorIndex);
const auto &steps_per_epoch_tensor = input_ctrl_tensors->at(kStepsPerEpochTensorIndex);
const auto &current_step_tensor = input_ctrl_tensors[kCurLoopCountName];
const auto &current_epoch_tensor = input_ctrl_tensors[kCurEpochCountName];
const auto &steps_per_epoch_tensor = input_ctrl_tensors[kConstLoopNumInEpochName];
MS_EXCEPTION_IF_NULL(current_step_tensor);
MS_EXCEPTION_IF_NULL(currnet_epoch_tensor);
MS_EXCEPTION_IF_NULL(current_epoch_tensor);
MS_EXCEPTION_IF_NULL(steps_per_epoch_tensor);
MS_EXCEPTION_IF_NULL(current_step_tensor->device_address());
MS_EXCEPTION_IF_NULL(currnet_epoch_tensor->device_address());
MS_EXCEPTION_IF_NULL(current_epoch_tensor->device_address());
MS_EXCEPTION_IF_NULL(steps_per_epoch_tensor->device_address());
void *current_step = current_step_tensor->device_address()->GetMutablePtr();
void *current_epoch = currnet_epoch_tensor->device_address()->GetMutablePtr();
void *current_epoch = current_epoch_tensor->device_address()->GetMutablePtr();
void *steps_per_epoch = steps_per_epoch_tensor->device_address()->GetMutablePtr();
if (current_epoch != nullptr && current_step != nullptr && steps_per_epoch != nullptr) {

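One note on the map-based lookup above (a design observation, not a required change): operator[] on the copied map default-constructs a null TensorPtr when a key is missing, and the miss is only caught afterwards by MS_EXCEPTION_IF_NULL. A find-based variant, sketched here with the same constants, makes the miss explicit:

auto ctrl_tensors = kernel_graph_->device_loop_control_tensors();
auto step_iter = ctrl_tensors.find(kCurLoopCountName);
if (step_iter == ctrl_tensors.end() || step_iter->second == nullptr) {
  MS_LOG(INFO) << "[DataDump] " << kCurLoopCountName << " tensor not available.";
  return;
}
const auto &current_step_tensor = step_iter->second;
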
@ -267,27 +267,6 @@ void KernelAdjust::InsertFpBpAndEosLoopStreamActive(const std::shared_ptr<sessio
MS_LOG(INFO) << "FpBp loop insert FpBp loop and Eos loop Stream Active " << fpbp_active_app->fullname_with_scope();
}
void KernelAdjust::InsertSwitchLoopInput(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
std::vector<AnfNodePtr> *mute_inputs = kernel_graph_ptr->MutableInputs();
MS_EXCEPTION_IF_NULL(mute_inputs);
mute_inputs->push_back(switch_loop_input.at(kCurLoopCountParamName));
mute_inputs->push_back(switch_loop_input.at(kNextLoopCountParamName));
mute_inputs->push_back(switch_loop_input.at(kEpochParamName));
mute_inputs->push_back(switch_loop_input.at(kIterLoopParamName));
mute_inputs->push_back(switch_loop_input.at(kOneParamName));
for (const auto &input : kernel_graph_ptr->inputs()) {
MS_EXCEPTION_IF_NULL(input);
if (input->isa<Parameter>()) {
ParameterPtr param_ptr = input->cast<ParameterPtr>();
if (param_ptr == nullptr) {
MS_EXCEPTION(NotSupportError) << "Cast to parameter point failed !";
}
}
}
}
void KernelAdjust::InsertGetNextLoopStreamSwitch(
const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr, std::vector<CNodePtr> *exec_order,
uint32_t *getnext_switch_stream_id, uint32_t *getnext_stream_id,
@ -413,7 +392,7 @@ void KernelAdjust::InsertEosDoneSend(const std::shared_ptr<session::KernelGraph>
MS_LOG(INFO) << "EoS loop insert EoS done Send " << eos_done_send->fullname_with_scope();
}
void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
void KernelAdjust::ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
device::ascend::AscendStreamMng &resource_manager = device::ascend::AscendStreamMng::GetInstance();
resource_manager.ResetResource();
@ -421,7 +400,7 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph>
return;
}
if (kernel_graph_ptr->is_dynamic_shape()) {
MS_LOG(INFO) << "KernelGraph:" << kernel_graph_ptr->graph_id() << " is dynamic shape, skip InsertSwitchLoop";
MS_LOG(INFO) << "KernelGraph:" << kernel_graph_ptr->graph_id() << " is dynamic shape, skip ProcessLoopSink";
return;
}
bool exist_getnext = ExistGetNext(kernel_graph_ptr);
@ -431,9 +410,7 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph>
if (exist_getnext) {
ReorderGetNext(kernel_graph_ptr);
}
std::map<std::string, mindspore::ParameterPtr> switch_loop_input;
CreateSwitchOpParameters(kernel_graph_ptr, &switch_loop_input);
InsertSwitchLoopInput(kernel_graph_ptr, switch_loop_input);
auto switch_loop_input = kernel_graph_ptr->device_loop_control_params();
const std::vector<CNodePtr> &orders = kernel_graph_ptr->execution_order();
if (orders.empty()) {
@ -511,52 +488,6 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph>
kernel_graph_ptr->set_execution_order(exec_order);
}
void KernelAdjust::CreateSwitchOpParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
std::map<std::string, mindspore::ParameterPtr> *switch_loop_input) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
MS_EXCEPTION_IF_NULL(switch_loop_input);
ShapeVector shp = {1};
tensor::TensorPtr tensor_ptr = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(tensor_ptr);
mindspore::abstract::AbstractBasePtr paremeter_abstract_ptr = tensor_ptr->ToAbstract();
if (paremeter_abstract_ptr == nullptr) {
MS_LOG(EXCEPTION) << "create abstract before insert switch op failed!";
}
ParameterPtr cur_loop_count = std::make_shared<Parameter>(kernel_graph_ptr);
MS_EXCEPTION_IF_NULL(cur_loop_count);
cur_loop_count->set_name(kCurLoopCountParamName);
cur_loop_count->set_abstract(paremeter_abstract_ptr);
ParameterPtr loop_count_cur = kernel_graph_ptr->NewParameter(cur_loop_count);
(*switch_loop_input)[kCurLoopCountParamName] = loop_count_cur;
ParameterPtr next_loop_count = std::make_shared<Parameter>(kernel_graph_ptr);
MS_EXCEPTION_IF_NULL(next_loop_count);
next_loop_count->set_name(kNextLoopCountParamName);
next_loop_count->set_abstract(paremeter_abstract_ptr);
ParameterPtr loop_count_next = kernel_graph_ptr->NewParameter(next_loop_count);
(*switch_loop_input)[kNextLoopCountParamName] = loop_count_next;
ParameterPtr iter_loop = std::make_shared<Parameter>(kernel_graph_ptr);
iter_loop->set_name(kIterLoopParamName);
iter_loop->set_abstract(paremeter_abstract_ptr);
ParameterPtr iter_loop_new = kernel_graph_ptr->NewParameter(iter_loop);
(*switch_loop_input)[kIterLoopParamName] = iter_loop_new;
ParameterPtr one = std::make_shared<Parameter>(kernel_graph_ptr);
one->set_name(kOneParamName);
one->set_abstract(paremeter_abstract_ptr);
ParameterPtr one_new = kernel_graph_ptr->NewParameter(one);
(*switch_loop_input)[kOneParamName] = one_new;
ParameterPtr epoch = std::make_shared<Parameter>(kernel_graph_ptr);
MS_EXCEPTION_IF_NULL(epoch);
epoch->set_name(kEpochParamName);
epoch->set_abstract(paremeter_abstract_ptr);
ParameterPtr epoch_new = kernel_graph_ptr->NewParameter(epoch);
(*switch_loop_input)[kEpochParamName] = epoch_new;
}
kernel::KernelBuildInfo::KernelBuildInfoBuilder KernelAdjust::CreateMngKernelBuilder(
const std::vector<std::string> &formats, const std::vector<TypeId> &type_ids) {
kernel::KernelBuildInfo::KernelBuildInfoBuilder selected_kernel_builder;
@ -579,14 +510,14 @@ CNodePtr KernelAdjust::CreateStreamSwitchOp(const std::shared_ptr<session::Kerne
std::vector<AnfNodePtr> inputs;
inputs.push_back(NewValueNode(stream_switch));
if (kind == kFpBpStreamSwitch || kind == kEosStreamSwitch) {
inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
inputs.push_back(switch_loop_input.at(kNextLoopCountName));
} else if (kind == kGetNextStreamSwitch || kind == kIndependentStreamSwitch) {
inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
inputs.push_back(switch_loop_input.at(kNextLoopCountName));
} else {
MS_LOG(ERROR) << "unknown stream switch kind: " << kind;
}
inputs.push_back(switch_loop_input.at(kIterLoopParamName));
inputs.push_back(switch_loop_input.at(kConstLoopNumInEpochName));
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
CNodePtr stream_switch_app = kernel_graph_ptr->NewCNode(inputs);
MS_EXCEPTION_IF_NULL(stream_switch_app);
@ -681,12 +612,12 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
std::vector<AnfNodePtr> inputs;
inputs.push_back(NewValueNode(assign_add));
if (cur_loop) {
inputs.push_back(switch_loop_input.at(kCurLoopCountParamName));
inputs.push_back(switch_loop_input.at(kCurLoopCountName));
} else {
inputs.push_back(switch_loop_input.at(kNextLoopCountParamName));
inputs.push_back(switch_loop_input.at(kNextLoopCountName));
}
inputs.push_back(switch_loop_input.at(kOneParamName));
inputs.push_back(switch_loop_input.at(kConstOneName));
CNodePtr assign_add_one = kernel_graph_ptr->NewCNode(inputs);
MS_EXCEPTION_IF_NULL(assign_add_one);
AnfAlgo::SetSelectKernelBuildInfo(selected_kernel_builder.Build(), assign_add_one.get());
@ -697,8 +628,8 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
AnfAlgo::SetNodeAttr("input_names", input_names_v, assign_add_one);
AnfAlgo::SetNodeAttr("output_names", output_names_v, assign_add_one);
selected_kernel_builder.SetKernelType(KernelType::TBE_KERNEL);
MS_EXCEPTION_IF_NULL(switch_loop_input.at(kCurLoopCountParamName));
assign_add_one->set_abstract(switch_loop_input.at(kCurLoopCountParamName)->abstract());
MS_EXCEPTION_IF_NULL(switch_loop_input.at(kCurLoopCountName));
assign_add_one->set_abstract(switch_loop_input.at(kCurLoopCountName)->abstract());
// add AssignAdd op to kernel ref node map
session::AnfWithOutIndex final_pair = std::make_pair(assign_add_one, 0);
session::KernelWithIndex kernel_with_index = AnfAlgo::VisitKernel(AnfAlgo::GetInputNode(assign_add_one, 0), 0);
@ -706,147 +637,6 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
return assign_add_one;
}
bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
auto &dump_json_parser = DumpJsonParser::GetInstance();
bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || kernel_graph_ptr->IsDatasetGraph());
if (!sink_mode && dump_json_parser.async_dump_enabled()) {
InitCtrlInputs(kernel_graph_ptr);
return true;
}
if (!NeedInsertSwitch()) {
return true;
}
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
if (kernel_graph_ptr->is_dynamic_shape()) {
MS_LOG(INFO) << "Skip StepLoadCtrlInputs";
return true;
}
auto input_nodes = kernel_graph_ptr->inputs();
std::vector<tensor::TensorPtr> inputs;
LoadSwitchInputs(&inputs);
std::shared_ptr<std::vector<tensor::TensorPtr>> inputsPtr = std::make_shared<std::vector<tensor::TensorPtr>>(inputs);
kernel_graph_ptr->set_input_ctrl_tensors(inputsPtr);
size_t input_ctrl_size = inputs.size();
// inputs_node:include four ctrl nodes in the back. such as:conv,loop_cnt, ites_loop, zero, one.
// deal four ctrl nodes.
for (size_t i = 0; i < inputs.size(); ++i) {
auto tensor = inputs[i];
MS_EXCEPTION_IF_NULL(tensor);
size_t deal_index = input_nodes.size() - input_ctrl_size + i;
if (deal_index >= input_nodes.size()) {
MS_LOG(EXCEPTION) << "deal_index[" << deal_index << "] out of range";
}
auto input_node = input_nodes[deal_index];
bool need_sync = false;
MS_EXCEPTION_IF_NULL(input_node);
if (input_node->isa<Parameter>()) {
auto pk_node = input_node->cast<ParameterPtr>();
MS_EXCEPTION_IF_NULL(pk_node);
if (tensor->NeedSyncHostToDevice() || !pk_node->has_default()) {
need_sync = true;
}
}
if (need_sync) {
auto pk_node = input_node->cast<ParameterPtr>();
MS_EXCEPTION_IF_NULL(pk_node);
auto device_address = AnfAlgo::GetMutableOutputAddr(pk_node, 0);
MS_EXCEPTION_IF_NULL(device_address);
tensor->set_device_address(device_address);
if (!device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(pk_node, 0),
LongToSize(tensor->data().nbytes()), tensor->data_type(), tensor->data_c(),
tensor->device_info().host_format_)) {
MS_LOG(INFO) << "SyncHostToDevice failed.";
return false;
}
}
tensor->set_sync_status(kNoNeedSync);
}
return true;
}
void KernelAdjust::LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs) {
MS_LOG(INFO) << "---------------- LoadSwitchInputs---";
MS_EXCEPTION_IF_NULL(inputs);
// current loop count
ShapeVector shp = {1};
tensor::TensorPtr cur_loop_count = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(cur_loop_count);
int32_t *val = nullptr;
val = static_cast<int32_t *>(cur_loop_count->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = 0;
inputs->push_back(cur_loop_count);
// next loop count
tensor::TensorPtr next_loop_count = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(next_loop_count);
val = static_cast<int32_t *>(next_loop_count->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = 0;
inputs->push_back(next_loop_count);
// Epoch in device
tensor::TensorPtr epoch_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(epoch_tensor);
val = static_cast<int32_t *>(epoch_tensor->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = 0;
inputs->push_back(epoch_tensor);
// total loop count per iter
tensor::TensorPtr iter_loop_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(iter_loop_tensor);
val = static_cast<int32_t *>(iter_loop_tensor->data_c());
MS_EXCEPTION_IF_NULL(val);
if (ConfigManager::GetInstance().dataset_mode() == DS_NORMAL_MODE) {
MS_LOG(INFO) << "iter_loop_tensor not used in dataset_mode DS_NORMAL_MODE";
*val = 0;
} else {
*val = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
}
MS_LOG(INFO) << "iter_loop_tensor = " << *val;
inputs->push_back(iter_loop_tensor);
tensor::TensorPtr one_tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(one_tensor);
val = static_cast<int32_t *>(one_tensor->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = 1;
inputs->push_back(one_tensor);
MS_LOG(INFO) << "---------------- LoadSwitchInputs End--";
}
void KernelAdjust::InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
MS_LOG(INFO) << " -------------------------- InitCtrlInputs Start-- ";
std::vector<tensor::TensorPtr> inputs;
// prepare default values for CtrlInputs
LoadSwitchInputs(&inputs);
std::shared_ptr<std::vector<tensor::TensorPtr>> inputsPtr = std::make_shared<std::vector<tensor::TensorPtr>>(inputs);
kernel_graph_ptr->set_input_ctrl_tensors(inputsPtr);
for (size_t i = 0; i < inputs.size(); ++i) {
auto tensor = inputs[i];
MS_EXCEPTION_IF_NULL(tensor);
device::DeviceAddressPtr device_address = std::make_shared<device::ascend::AscendDeviceAddress>(
nullptr, LongToSize(tensor->data().nbytes()), tensor->device_info().host_format_, tensor->data_type());
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
auto runtime_instance = KernelRuntimeManager::Instance().GetSingleKernelRuntime(kAscendDevice, device_id);
if (runtime_instance->MallocMem(kStaticMem, LongToSize(tensor->data().nbytes()), device_address) == nullptr) {
MS_LOG(EXCEPTION) << "Cannot alloc address when flag is : " << kStaticMem
<< " , tensor size is : " << tensor->data().nbytes();
}
MS_EXCEPTION_IF_NULL(device_address);
tensor->set_device_address(device_address);
if (!device_address->SyncHostToDevice(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
tensor->data_c(), tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for InitCtrlInputs.";
}
}
MS_LOG(INFO) << " ------------------------- InitCtrlInputs End--";
}
#ifndef ENABLE_SECURITY
void KernelAdjust::Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr) {
if (!ascend::ProfilingManager::GetInstance().IsProfiling()) {
@ -1111,5 +901,157 @@ void KernelAdjust::InsertOverflowCheckOperations(const std::shared_ptr<session::
kernel_graph_ptr->set_execution_order(new_execution_order);
}
// device loop control
std::shared_ptr<Tensor> KernelAdjust::CreateTensor(int32_t initial_value) {
ShapeVector shp = {1};
tensor::TensorPtr tensor = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(tensor);
auto val = static_cast<int32_t *>(tensor->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = initial_value;
return tensor;
}
std::shared_ptr<Parameter> KernelAdjust::CreateParameter(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const string parameter_name) {
ShapeVector shp = {1};
tensor::TensorPtr tensor_ptr = std::make_shared<tensor::Tensor>(kInt32->type_id(), shp);
MS_EXCEPTION_IF_NULL(tensor_ptr);
mindspore::abstract::AbstractBasePtr parameter_abstract_ptr = tensor_ptr->ToAbstract();
if (parameter_abstract_ptr == nullptr) {
MS_LOG(EXCEPTION) << "Create abstract for device loop control failed!";
}
ParameterPtr param = std::make_shared<Parameter>(kernel_graph_ptr);
MS_EXCEPTION_IF_NULL(param);
param->set_name(parameter_name);
param->set_abstract(parameter_abstract_ptr);
ParameterPtr graph_parameter = kernel_graph_ptr->NewParameter(param);
return graph_parameter;
}
void KernelAdjust::InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
std::map<std::string, tensor::TensorPtr> device_loop_ctrl_tensors;
std::map<std::string, mindspore::ParameterPtr> device_loop_ctrl_params;
// current loop count
device_loop_ctrl_tensors[kCurLoopCountName] = CreateTensor(0);
device_loop_ctrl_params[kCurLoopCountName] = CreateParameter(kernel_graph_ptr, kCurLoopCountName);
// next loop count tensor
device_loop_ctrl_tensors[kNextLoopCountName] = CreateTensor(0);
device_loop_ctrl_params[kNextLoopCountName] = CreateParameter(kernel_graph_ptr, kNextLoopCountName);
// current epoch count tensor
device_loop_ctrl_tensors[kCurEpochCountName] = CreateTensor(0);
device_loop_ctrl_params[kCurEpochCountName] = CreateParameter(kernel_graph_ptr, kCurEpochCountName);
// constant one tensor
device_loop_ctrl_tensors[kConstOneName] = CreateTensor(1);
device_loop_ctrl_params[kConstOneName] = CreateParameter(kernel_graph_ptr, kConstOneName);
// constant loop num in epoch tensor
int32_t initial_value = 0;
if (NeedInsertSwitch()) {
initial_value = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
} else {
MS_LOG(INFO) << "Tensor const_loop_num_in_epoch only used in loop sink mode.";
initial_value = 0;
}
MS_LOG(INFO) << "Loop num in epoch is " << initial_value;
device_loop_ctrl_tensors[kConstLoopNumInEpochName] = CreateTensor(initial_value);
device_loop_ctrl_params[kConstLoopNumInEpochName] = CreateParameter(kernel_graph_ptr, kConstLoopNumInEpochName);
kernel_graph_ptr->set_device_loop_ctrl_tensors(device_loop_ctrl_tensors);
kernel_graph_ptr->set_device_loop_ctrl_params(device_loop_ctrl_params);
}
void KernelAdjust::AssignLoopCtrlTensorMem(const session::KernelGraph &kernel_graph, KernelRuntime *runtime_instance,
const string name) {
MS_EXCEPTION_IF_NULL(runtime_instance);
auto device_loop_control_params = kernel_graph.device_loop_control_params();
if (!device_loop_control_params.count(name)) {
MS_LOG(WARNING) << "Can't find Device Loop Control Parameter " << name;
return;
}
auto param = device_loop_control_params.at(name);
MS_EXCEPTION_IF_NULL(param);
DeviceAddressPtr device_address = nullptr;
if (AnfAlgo::OutputAddrExist(param, 0)) {
device_address = AnfAlgo::GetMutableOutputAddr(param, 0);
MS_EXCEPTION_IF_NULL(device_address);
} else {
MS_LOG(INFO) << "Device Loop Control Parameter " << name << " have no address, allocating...";
auto size = AnfAlgo::GetOutputTensorMemSize(param, 0);
auto format = AnfAlgo::GetOutputFormat(param, 0);
auto type_id = AnfAlgo::GetOutputDeviceDataType(param, 0);
device_address = std::make_shared<device::ascend::AscendDeviceAddress>(nullptr, size, format, type_id);
if (runtime_instance->MallocMem(kStaticMem, size, device_address) == nullptr) {
MS_LOG(EXCEPTION) << "Cannot alloc static memory for device loop control parameter " << name
<< " , tensor size is : " << size;
}
MS_EXCEPTION_IF_NULL(device_address);
AnfAlgo::SetOutputAddr(device_address, 0, param.get());
}
auto device_loop_control_tensors = kernel_graph.device_loop_control_tensors();
auto tensor = device_loop_control_tensors.at(name);
MS_EXCEPTION_IF_NULL(tensor);
tensor->set_device_address(device_address);
if (!device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(param, 0), LongToSize(tensor->data().nbytes()),
tensor->data_type(), tensor->data_c(), tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for device loop control parameter " << name;
}
}
void KernelAdjust::AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr) {
MS_LOG(INFO) << "Assign device loop control memory";
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
auto runtime_instance = KernelRuntimeManager::Instance().GetSingleKernelRuntime(kAscendDevice, device_id);
MS_EXCEPTION_IF_NULL(runtime_instance);
AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kCurLoopCountName);
AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kNextLoopCountName);
AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kCurEpochCountName);
AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kConstOneName);
AssignLoopCtrlTensorMem(kernel_graph_ptr, runtime_instance, kConstLoopNumInEpochName);
}
void KernelAdjust::SetDeviceLoopCtrlTensor(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const std::string name, int32_t value) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
auto device_loop_control_tensors = kernel_graph_ptr->device_loop_control_tensors();
if (!device_loop_control_tensors.count(name)) {
MS_LOG(WARNING) << "Can't find Device Loop Control Tensor " << name;
return;
}
auto tensor = device_loop_control_tensors.at(name);
MS_EXCEPTION_IF_NULL(tensor);
auto *cur_val = static_cast<int32_t *>(tensor->data_c());
MS_EXCEPTION_IF_NULL(cur_val);
*cur_val = value;
tensor->set_sync_status(kNeedSyncHostToDevice);
auto device_address = tensor->device_address();
MS_EXCEPTION_IF_NULL(device_address);
if (!device_address->SyncHostToDevice(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
tensor->data_c(), tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for device loop control parameter " << name;
}
}
void KernelAdjust::LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
MS_LOG(INFO) << "Load device loop control data";
SetDeviceLoopCtrlTensor(kernel_graph_ptr, kCurLoopCountName, 0);
SetDeviceLoopCtrlTensor(kernel_graph_ptr, kNextLoopCountName, 0);
SetDeviceLoopCtrlTensor(kernel_graph_ptr, kCurEpochCountName, SizeToInt(kernel_graph_ptr->current_epoch()));
kernel_graph_ptr->set_current_epoch(kernel_graph_ptr->current_epoch() + 1);
}
} // namespace device
} // namespace mindspore

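As a quick reference, a hypothetical spot check (illustrative only, assuming a KernelGraphPtr kernel_graph) of the host-side defaults that InsertDeviceLoopCtrl establishes before AssignLoopCtrlMemory syncs them to the device:

device::KernelAdjust::GetInstance().InsertDeviceLoopCtrl(kernel_graph);
auto ctrl_tensors = kernel_graph->device_loop_control_tensors();
// Five int32 scalars of shape {1}:
//   current_loop_count = 0, next_loop_count = 0, current_epoch_count = 0,
//   const_one = 1, const_loop_num_in_epoch = iter_num (loop sink mode) or 0.
auto *one_val = static_cast<int32_t *>(ctrl_tensors.at(kConstOneName)->data_c());
MS_LOG(INFO) << "const_one initialized to " << *one_val;  // expected: 1
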
@ -36,13 +36,14 @@ using mindspore::device::ascend::ProfilingTraceInfo;
using mindspore::device::ascend::ProfilingUtils;
#endif
namespace mindspore {
constexpr auto kCurLoopCountParamName = "cur_loop_count";
constexpr auto kNextLoopCountParamName = "next_loop_count";
constexpr auto kIterLoopParamName = "iter_loop";
constexpr auto kOneParamName = "one";
constexpr auto kEpochParamName = "loop_epoch";
// device loop control
constexpr auto kCurLoopCountName = "current_loop_count";
constexpr auto kNextLoopCountName = "next_loop_count";
constexpr auto kCurEpochCountName = "current_epoch_count";
constexpr auto kConstOneName = "const_one";
constexpr auto kConstLoopNumInEpochName = "const_loop_num_in_epoch";
constexpr auto kStreamNeedActivedFirst = "stream_need_active_first";
constexpr uint32_t kSecondStreamSwitchLabel = 2;
enum StreamSwitchKind {
kFpBpStreamSwitch = 0,
kGetNextStreamSwitch = 1,
@ -57,10 +58,13 @@ class KernelAdjust {
static KernelAdjust instance;
return instance;
}
// device loop control
void InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
void AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr);
void LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
void InsertOverflowCheckOperations(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
void InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
bool StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
void ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
#ifndef ENABLE_SECURITY
void Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr);
#endif
@ -82,8 +86,6 @@ class KernelAdjust {
void ReorderGetNext(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
CNodePtr CreateRecvApplyKernel(const std::shared_ptr<session::KernelGraph> &graph_ptr, uint32_t event_id);
CNodePtr CreateSendApplyKernel(const std::shared_ptr<session::KernelGraph> &graph_ptr, uint32_t event_id);
void CreateSwitchOpParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
std::map<std::string, mindspore::ParameterPtr> *switch_loop_input);
CNodePtr CreateStreamSwitchOp(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input,
StreamSwitchKind kind);
@ -97,17 +99,12 @@ class KernelAdjust {
bool cur_loop);
kernel::KernelBuildInfo::KernelBuildInfoBuilder CreateMngKernelBuilder(const std::vector<std::string> &formats,
const std::vector<TypeId> &type_ids);
void LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs);
void InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
#ifndef ENABLE_SECURITY
void InsertProfilingKernel(const ProfilingTraceInfo &profiling_trace_info,
NotNull<session::KernelGraph *> kernel_graph_ptr);
#endif
bool ExistIndependent(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
bool ExistGetNext(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
void InsertSwitchLoopInput(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const std::map<std::string, mindspore::ParameterPtr> &switch_loop_input);
void InsertGetNextLoopStreamSwitch(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
std::vector<CNodePtr> *exec_order, uint32_t *getnext_switch_stream_id,
uint32_t *getnext_stream_id,
@ -158,6 +155,13 @@ class KernelAdjust {
void InsertFpBpAndEosLoopStreamActive(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
std::vector<CNodePtr> *exec_order,
const std::vector<uint32_t> &fpbp_active_streams);
void SetDeviceLoopCtrlTensor(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr, const string name,
int32_t value);
void AssignLoopCtrlTensorMem(const session::KernelGraph &kernel_graph, KernelRuntime *runtime_instance,
const string name);
std::shared_ptr<Tensor> CreateTensor(int32_t initial_value);
std::shared_ptr<Parameter> CreateParameter(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr,
const string parameter_name);
};
} // namespace device
} // namespace mindspore

tests/ut/cpp/stub/tasksink/ascend_stream_assign_stub.cc (12 changes, Executable file → Normal file)

@ -31,9 +31,17 @@ void AscendStreamAssign::GetWaitStreams(vector<uint32_t> *wait_active_stream_lis
void AscendStreamAssign::GetHcomStreams(std::vector<uint32_t> *streams) { return; }
} // namespace ascend
void KernelAdjust::InsertSwitchLoop(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return true; }
void KernelAdjust::InsertDeviceLoopCtrl(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
void KernelAdjust::AssignLoopCtrlMemory(const session::KernelGraph &kernel_graph_ptr) { return; }
void KernelAdjust::LoadDeviceLoopCtrlParameters(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
return;
}
bool KernelAdjust::NeedInsertSwitch() { return true; }
void KernelAdjust::ProcessLoopSink(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) { return; }
#ifndef ENABLE_SECURITY
void KernelAdjust::Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr) { return; }
#endif