!23568 Make async dump use input_ctrl_tensor when sink_mode=False, just as when sink_mode=True

Merge pull request !23568 from john_tzanakakis/jt_bug_fixes
This commit is contained in:
i-robot 2021-09-17 01:20:16 +00:00 committed by Gitee
commit dc84031179
7 changed files with 110 additions and 90 deletions

View File

@ -237,8 +237,16 @@ size_t LoadCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vecto
*cur_val = 0;
cur_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
// set loop_count to zero
MS_EXCEPTION_IF_NULL(inputs);
inputs->push_back(cur_loop_tensor);
if (inputs) {
inputs->push_back(cur_loop_tensor);
} else {
auto device_address = cur_loop_tensor->device_address();
if (!device_address->SyncHostToDevice(cur_loop_tensor->shape(), LongToSize(cur_loop_tensor->data().nbytes()),
cur_loop_tensor->data_type(), cur_loop_tensor->data_c(),
cur_loop_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for cur_loop_tensor needed for async dump.";
}
}
// update next loop tensor to 0 per iterator
auto next_loop_tensor = (*inputs_params)[kLoopSinkNextLoopIndex];
@ -248,7 +256,16 @@ size_t LoadCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vecto
*next_val = 0;
next_loop_tensor->set_sync_status(kNeedSyncHostToDevice);
// set loop_count to zero
inputs->push_back(next_loop_tensor);
if (inputs) {
inputs->push_back(next_loop_tensor);
} else {
auto device_address = next_loop_tensor->device_address();
if (!device_address->SyncHostToDevice(next_loop_tensor->shape(), LongToSize(next_loop_tensor->data().nbytes()),
next_loop_tensor->data_type(), next_loop_tensor->data_c(),
next_loop_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for next_loop_tensor needed for async dump.";
}
}
auto epoch_tensor = (*inputs_params)[kLoopSinkEpochIndex];
MS_EXCEPTION_IF_NULL(epoch_tensor);
@ -256,12 +273,33 @@ size_t LoadCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vecto
MS_EXCEPTION_IF_NULL(epoch_val);
*epoch_val = SizeToInt(graph->current_epoch());
epoch_tensor->set_sync_status(kNeedSyncHostToDevice);
inputs->push_back(epoch_tensor);
if (inputs) {
inputs->push_back(epoch_tensor);
} else {
auto device_address = epoch_tensor->device_address();
if (!device_address->SyncHostToDevice(epoch_tensor->shape(), LongToSize(epoch_tensor->data().nbytes()),
epoch_tensor->data_type(), epoch_tensor->data_c(),
epoch_tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed for epoch_tensor needed for async dump.";
}
}
MS_LOG(DEBUG) << "Load epoch_val:" << *epoch_val;
graph->set_current_epoch(graph->current_epoch() + 1);
return inputs_params->size();
}
// Refresh the loop-control tensors (cur loop, next loop, epoch) attached to
// the graph, if any.
// In sink mode the tensors are appended to *inputs and *input_ctrl_size is
// updated from LoadCtrlInputTensor's return value; otherwise
// LoadCtrlInputTensor is called with a null inputs vector, which makes it sync
// the ctrl tensors host->device directly (needed for async dump), and
// *input_ctrl_size keeps the caller-provided default.
void UpdateCtrlInputTensor(const std::shared_ptr<KernelGraph> &graph, std::vector<tensor::TensorPtr> *inputs,
                           size_t *input_ctrl_size) {
  MS_EXCEPTION_IF_NULL(graph);
  MS_EXCEPTION_IF_NULL(input_ctrl_size);
  if (graph->input_ctrl_tensors()) {
    bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || graph->isDatasetGraph());
    if (sink_mode) {
      *input_ctrl_size = LoadCtrlInputTensor(graph, inputs);
    } else {
      // Non-sink mode: ctrl tensors are synced to device inside the callee;
      // the returned size is intentionally ignored.
      LoadCtrlInputTensor(graph, nullptr);
    }
  }
}
bool NeedMemcpyInDevice(const device::DeviceAddressPtr &src_device_addr,
const device::DeviceAddressPtr &dst_device_addr) {
MS_EXCEPTION_IF_NULL(dst_device_addr);
@ -405,9 +443,7 @@ void AscendSession::LoadInputData(const std::shared_ptr<KernelGraph> &kernel_gra
size_t input_ctrl_size = kLoopSinkTensorNum;
uint32_t device_memcpy_nums = 0;
MS_EXCEPTION_IF_NULL(kernel_graph);
if (kernel_graph->input_ctrl_tensors()) {
input_ctrl_size = LoadCtrlInputTensor(kernel_graph, &inputs);
}
UpdateCtrlInputTensor(kernel_graph, &inputs, &input_ctrl_size);
auto &input_nodes = kernel_graph->input_nodes();
if ((inputs.size() + input_ctrl_size) - kLoopSinkTensorNum != input_nodes.size()) {
MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size()

View File

@ -1371,6 +1371,18 @@ void KernelGraph::SetOptimizerFlag() {
}
}
bool KernelGraph::isDatasetGraph() const {
  // Treat the graph as a dataset graph when its execution order contains an
  // InitDataSetQueue node.
  // NOTE(review): only InitDataSetQueue is matched here, although callers may
  // also expect GetNext-driven graphs to qualify — confirm intended scope.
  for (const auto &cnode : execution_order_) {
    if (AnfAlgo::GetCNodeName(cnode) == prim::kPrimInitDataSetQueue->name()) {
      return true;
    }
  }
  return false;
}
// Human-readable identifier of this graph, e.g. "kernel_graph_3".
std::string KernelGraph::ToString() const { return "kernel_graph_" + std::to_string(graph_id_); }
KernelGraph::~KernelGraph() {

View File

@ -373,6 +373,8 @@ class KernelGraph : public FuncGraph {
void set_is_need_gil(bool flag) { is_need_gil_ = flag; }
bool is_need_gil() { return is_need_gil_; }
bool isDatasetGraph() const;
private:
// remove value node form graph
bool RemoveValueNodeFromGraph(const ValueNodePtr &value_node);

View File

@ -310,32 +310,10 @@ void E2eDump::UpdateIterDumpSetup(const session::KernelGraph *graph, bool sink_m
void E2eDump::DumpSetup(const session::KernelGraph *graph, uint32_t rank_id) {
auto &dump_json_parser = DumpJsonParser::GetInstance();
uint32_t cur_iter = dump_json_parser.cur_dump_iter();
bool sink_mode = (ConfigManager::GetInstance().dataset_mode() || E2eDump::isDatasetGraph(graph));
if (dump_json_parser.async_dump_enabled() || dump_json_parser.e2e_dump_enabled()) {
UpdateIterDumpSetup(graph, sink_mode);
MS_LOG(DEBUG) << "sink_mode = " << sink_mode;
}
if (dump_json_parser.async_dump_enabled() && dump_json_parser.IsDumpIter(cur_iter) && !sink_mode) {
auto zero_dir_dump_path =
dump_json_parser.path() + "/rank_" + std::to_string(rank_id) + "/_/" + std::to_string(graph->graph_id()) + "/0";
auto root_cur_iter_dump_path = dump_json_parser.path() + "/rank_" + std::to_string(rank_id) + "/" +
dump_json_parser.net_name() + "/" + std::to_string(graph->graph_id());
auto cur_iter_dump_path = root_cur_iter_dump_path + "/" + std::to_string(cur_iter);
MS_LOG(INFO) << "zero_dir_dump_path: " << zero_dir_dump_path;
MS_LOG(INFO) << "root_cur_iter_dump_path: " << root_cur_iter_dump_path;
MS_LOG(INFO) << "cur_iter_dump_path: " << cur_iter_dump_path;
// create cur_iter_dump_path dirs
auto dir_path = FileUtils::CreateNotExistDirs(root_cur_iter_dump_path);
if (!dir_path.has_value()) {
MS_LOG(EXCEPTION) << "Failed at CreateNotExistDirs for " << root_cur_iter_dump_path;
}
}
}
@ -344,7 +322,6 @@ void E2eDump::DumpData(const session::KernelGraph *graph, uint32_t rank_id, cons
bool success = false;
auto &dump_json_parser = DumpJsonParser::GetInstance();
uint32_t graph_id = graph->graph_id();
bool sink_mode = (ConfigManager::GetInstance().dataset_mode() || E2eDump::isDatasetGraph(graph));
if (dump_json_parser.GetIterDumpFlag()) {
MS_LOG(INFO) << "Start e2e dump. Current iteration is " << dump_json_parser.cur_dump_iter();
@ -354,65 +331,13 @@ void E2eDump::DumpData(const session::KernelGraph *graph, uint32_t rank_id, cons
DumpInput(graph, dump_path, debugger);
DumpOutput(graph, dump_path, debugger);
DumpParametersAndConst(graph, dump_path, debugger);
success = true;
} else if (dump_json_parser.async_dump_enabled() && !sink_mode) {
uint32_t current_iter = dump_json_parser.cur_dump_iter();
auto zero_dir_dump_path =
dump_json_parser.path() + "/rank_" + std::to_string(rank_id) + "/_/" + std::to_string(graph->graph_id()) + "/0";
auto cur_iter_dump_path = dump_json_parser.path() + "/rank_" + std::to_string(rank_id) + "/" +
dump_json_parser.net_name() + "/" + std::to_string(graph->graph_id()) + "/" +
std::to_string(current_iter);
MS_LOG(INFO) << "zero_dir_dump_path: " << zero_dir_dump_path;
MS_LOG(INFO) << "cur_iter_dump_path: " << cur_iter_dump_path;
if (dump_json_parser.IsDumpIter(current_iter)) {
// create actual dir for iteration in final dump dir
auto dir_path = FileUtils::CreateNotExistDirs(cur_iter_dump_path);
if (!dir_path.has_value()) {
MS_LOG(EXCEPTION) << "failed at CreateNotExistDirs for " << cur_iter_dump_path;
}
// test if zero_dir_dump_path exists (may not if there was
// no data dumped, for example for an overflow dump)
MS_LOG(INFO) << "Check " << zero_dir_dump_path << " exists.";
bool dir_exists = DumpDirExists(zero_dir_dump_path);
if (dir_exists) {
// move contents from active dump dir to final dump dir
MS_LOG(INFO) << "Move contents from " << zero_dir_dump_path << " to " << cur_iter_dump_path;
bool move_files = MoveDumpFiles(zero_dir_dump_path, cur_iter_dump_path);
if (!move_files) {
MS_LOG(INFO) << "Issue with moving contents.";
}
} else {
MS_LOG(INFO) << "active dump dir, not created yet";
}
} else {
// test if zero_dir_dump_path exists (may not if there was
// no data dumped, for example for an overflow dump)
MS_LOG(INFO) << "Check " << zero_dir_dump_path << " exists.";
bool dir_exists = DumpDirExists(zero_dir_dump_path);
if (dir_exists) {
// delete contents from active dump dir
MS_LOG(INFO) << "Delete contents from active dump dir " << zero_dir_dump_path;
bool delete_contents = DeleteDirContents(zero_dir_dump_path);
if (!delete_contents) {
MS_LOG(EXCEPTION) << "Ascend runtime has changed the dump dir structure!!!";
}
} else {
MS_LOG(INFO) << "active dump dir, not created yet";
}
}
success = true;
}
if (success) {
MS_LOG(DEBUG) << "Dump Data completed!";
MS_LOG(DEBUG) << "E2eDump Dump Data completed!";
} else {
MS_LOG(DEBUG) << "Dump has not occurred!";
MS_LOG(DEBUG) << "E2eDump Dump has not occurred!";
}
}

View File

@ -121,15 +121,16 @@ void DataDumper::LoadDumpInfo() {
}
void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_info) const {
MS_LOG(INFO) << "SetOpMappinglnfo Start.";
auto context_ptr = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(context_ptr);
MS_EXCEPTION_IF_NULL(kernel_graph_);
auto dump_path = DumpJsonParser::GetInstance().path();
const auto &input_ctrl_tensors = kernel_graph_->input_ctrl_tensors();
constexpr size_t kLoopSinkCtrlTensorNum = 3; // cur step, cur epoch, steps per epoch
bool data_sink_mode = input_ctrl_tensors != nullptr && input_ctrl_tensors->size() >= kLoopSinkCtrlTensorNum;
std::string net_name = (data_sink_mode ? DumpJsonParser::GetInstance().net_name() : "_");
std::string iteration = (data_sink_mode ? DumpJsonParser::GetInstance().iteration_string() : "0");
bool valid_ctrl_tensors = input_ctrl_tensors != nullptr && input_ctrl_tensors->size() >= kLoopSinkCtrlTensorNum;
std::string net_name = DumpJsonParser::GetInstance().net_name();
std::string iteration = DumpJsonParser::GetInstance().iteration_string();
if (dump_path.empty()) {
MS_LOG(EXCEPTION) << "Dump path invalid";
@ -162,8 +163,8 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
dump_info->set_model_id(graph_id);
dump_info->set_flag(kAicpuLoadFlag);
if (!data_sink_mode) {
MS_LOG(INFO) << "[DataDump] Not data sink mode, input_ctrl_tensor";
if (!valid_ctrl_tensors) {
MS_LOG(INFO) << "[DataDump] input_ctrl_tensors not valid.";
return;
}
const auto &current_step_tensor = input_ctrl_tensors->at(kCurrentStepTensorIndex);
@ -188,6 +189,7 @@ void DataDumper::SetOpMappingInfo(NotNull<aicpu::dump::OpMappingInfo *> dump_inf
} else {
MS_LOG(INFO) << "Invalid ctrl tensor device address";
}
MS_LOG(INFO) << "SetOpMappinglnfo End.";
}
bool DataDumper::KernelNeedDump(const CNodePtr &kernel) const {

View File

@ -707,6 +707,12 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr<session::K
}
bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
auto &dump_json_parser = DumpJsonParser::GetInstance();
bool sink_mode = (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE || kernel_graph_ptr->isDatasetGraph());
if (!sink_mode && dump_json_parser.async_dump_enabled()) {
InitCtrlInputs(kernel_graph_ptr);
return true;
}
if (!NeedInsertSwitch()) {
return true;
}
@ -792,7 +798,12 @@ void KernelAdjust::LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs) {
MS_EXCEPTION_IF_NULL(iter_loop_tensor);
val = static_cast<int32_t *>(iter_loop_tensor->data_c());
MS_EXCEPTION_IF_NULL(val);
*val = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
if (ConfigManager::GetInstance().dataset_mode() == DS_NORMAL_MODE) {
MS_LOG(INFO) << "iter_loop_tensor not used in dataset_mode DS_NORMAL_MODE";
*val = 0;
} else {
*val = SizeToInt(LongToSize(ConfigManager::GetInstance().iter_num()));
}
MS_LOG(INFO) << "iter_loop_tensor = " << *val;
inputs->push_back(iter_loop_tensor);
@ -806,6 +817,36 @@ void KernelAdjust::LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs) {
MS_LOG(INFO) << "---------------- LoadSwitchInputs End--";
}
// Build the loop-control tensors with their default host values (via
// LoadSwitchInputs), attach them to the graph as input_ctrl_tensors, then
// allocate static device memory for each tensor and copy its host value to
// device. Used when async dump is enabled without data sinking, so the ctrl
// tensors exist on device even though no loop-sink switch kernels were
// inserted. Throws (MS_LOG(EXCEPTION)) on allocation or host->device copy
// failure.
void KernelAdjust::InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr) {
  MS_LOG(INFO) << " -------------------------- InitCtrlInputs Start-- ";
  MS_EXCEPTION_IF_NULL(kernel_graph_ptr);
  std::vector<tensor::TensorPtr> inputs;
  // prepare default values for CtrlInputs
  LoadSwitchInputs(&inputs);
  auto inputsPtr = std::make_shared<std::vector<tensor::TensorPtr>>(inputs);
  kernel_graph_ptr->set_input_ctrl_tensors(inputsPtr);
  // Loop-invariant lookups hoisted out of the per-tensor loop.
  auto ms_context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(ms_context);
  auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
  auto runtime_instance = KernelRuntimeManager::Instance().GetSingleKernelRuntime(kAscendDevice, device_id);
  MS_EXCEPTION_IF_NULL(runtime_instance);
  for (const auto &tensor : inputs) {
    MS_EXCEPTION_IF_NULL(tensor);
    auto tensor_size = LongToSize(tensor->data().nbytes());
    // Device address starts with a null pointer; MallocMem assigns the memory.
    device::DeviceAddressPtr device_address = std::make_shared<device::ascend::AscendDeviceAddress>(
      nullptr, tensor_size, tensor->device_info().host_format_, tensor->data_type());
    MS_EXCEPTION_IF_NULL(device_address);
    if (runtime_instance->MallocMem(kStaticMem, tensor_size, device_address) == nullptr) {
      MS_LOG(EXCEPTION) << "Cannot alloc address when flag is : " << kStaticMem
                        << " , tensor size is : " << tensor->data().nbytes();
    }
    tensor->set_device_address(device_address);
    if (!device_address->SyncHostToDevice(tensor->shape(), tensor_size, tensor->data_type(), tensor->data_c(),
                                          tensor->device_info().host_format_)) {
      MS_LOG(EXCEPTION) << "SyncHostToDevice failed for InitCtrlInputs.";
    }
  }
  MS_LOG(INFO) << " ------------------------- InitCtrlInputs End--";
}
#ifndef ENABLE_SECURITY
void KernelAdjust::Profiling(NotNull<session::KernelGraph *> kernel_graph_ptr) {
if (!ascend::ProfilingManager::GetInstance().IsProfiling()) {

View File

@ -28,6 +28,7 @@
#include "backend/session/session_context.h"
#include "ir/tensor.h"
#include "runtime/device/kernel_info.h"
#include "runtime/device/kernel_runtime_manager.h"
#ifndef ENABLE_SECURITY
#include "runtime/device/ascend/profiling/profiling_utils.h"
@ -97,6 +98,7 @@ class KernelAdjust {
kernel::KernelBuildInfo::KernelBuildInfoBuilder CreateMngKernelBuilder(const std::vector<std::string> &formats,
const std::vector<TypeId> &type_ids);
void LoadSwitchInputs(std::vector<tensor::TensorPtr> *inputs);
void InitCtrlInputs(const std::shared_ptr<session::KernelGraph> &kernel_graph_ptr);
#ifndef ENABLE_SECURITY
void InsertProfilingKernel(const ProfilingTraceInfo &profiling_trace_info,
NotNull<session::KernelGraph *> kernel_graph_ptr);