Only collect data processing profiling data.

liuchuting 2023-01-28 11:30:22 +08:00
parent 6d49821c11
commit 6fea61ba01
9 changed files with 135 additions and 114 deletions
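In effect, this change lets the profiler collect only dataset (minddata) profiling data when operator timing is disabled. A minimal usage sketch of the new switch; the output path and the profiled workload are placeholders, while op_time and data_process are the keyword arguments parsed in profiling.py below:

# Sketch only. With op_time=False and data_process=True, the GPU backend enables
# data_process_enable() but not step_profiling_enable(), and the Ascend options set
# training_trace/task_trace/op_time to "off" while keeping aicpu "on".
from mindspore import Profiler

profiler = Profiler(output_path="./profiler_data", op_time=False, data_process=True)
# ... run the training step or dataset iteration being profiled ...
profiler.analyse()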

View File

@ -82,6 +82,7 @@ void RegProfiler(py::module *m) {
.def("stop", &Profiler::Stop, "stop")
.def("finalize", &Profiler::Finalize, "finalize")
.def("sync_enable", &Profiler::SyncEnable, py::arg("enable_flag"))
.def("data_process_enable", &Profiler::DataProcessEnable, py::arg("enable_flag"))
.def("step_profiling_enable", &Profiler::StepProfilingEnable, py::arg("enable_flag"),
"enable or disable step profiling");
}

View File

@ -84,7 +84,9 @@ void AscendProfiler::Init(const std::string &profiling_path, uint32_t device_id,
// Init ErrorManager instance in order to get error msg reported by Ascend.
(void)ErrorManagerAdapter::Init();
(void)ProfilingManager::GetInstance().InitProfiling(profiling_path, device_id);
if (options["op_time"] == "on") {
(void)ProfilingManager::GetInstance().InitProfiling(profiling_path, device_id);
}
MemoryProfiling::GetInstance().SetMemoryProfilingInitialize(profiling_options_);
@ -111,6 +113,10 @@ uint64_t AscendProfiler::GetOptionsMask() const {
mask |= ACL_PROF_TASK_TIME;
}
if (options_json["training_trace"] == "on") {
mask |= ACL_PROF_TRAINING_TRACE;
}
if (options_json["aicpu"] == "on") {
mask |= ACL_PROF_AICPU;
}

View File

@ -615,6 +615,7 @@ void GPUProfiler::ClearInst() {
is_init_ = false;
enable_flag_ = false;
sync_enable_flag_ = true;
data_process_enable_ = false;
init_flag_ = false;
enable_flag_ = false;
has_find_ = false;

View File

@ -125,6 +125,7 @@ class GPU_EXPORT GPUProfiler : public Profiler {
void StopCUPTI();
void StepProfilingEnable(const bool enable_flag) override;
bool GetSyncEnableFlag() const { return sync_enable_flag_; }
bool GetDataProcessEnableFlag() const { return data_process_enable_; }
void EventHandleProcess(CUpti_CallbackId cbid, const CUpti_CallbackData *cbdata, const std::string &typestring,
uint64_t startTimestamp, uint64_t endTimestamp);
void CUPTIAPI AllocBuffer(uint8_t **buffer, size_t *size, size_t *maxNumRecords);

View File

@ -104,7 +104,7 @@ bool DatasetIteratorKernelMod::ReadDevice(std::vector<DataQueueItem> *data) {
int repeat = 0;
while (true) {
#ifndef ENABLE_SECURITY
profiling_enable_ = profiler_inst->GetEnableFlag();
profiling_enable_ = profiler_inst->GetDataProcessEnableFlag();
if (profiling_enable_) {
start_time_stamp = profiling_op_->GetTimeStamp();
queue_size = DataQueueMgr::GetInstance().Size(queue_name_);

View File

@ -41,6 +41,11 @@ void Profiler::SyncEnable(const bool enable_flag) {
sync_enable_flag_ = enable_flag;
}
void Profiler::DataProcessEnable(const bool enable_flag) {
MS_LOG(INFO) << "Profiler data process enable flag:" << enable_flag;
data_process_enable_ = enable_flag;
}
bool Profiler::Register(const std::string &name, const std::shared_ptr<Profiler> &instance) {
if (GetInstanceMap().find(name) != GetInstanceMap().end()) {
MS_LOG(WARNING) << name << " has been registered.";

View File

@ -102,6 +102,7 @@ class BACKEND_EXPORT Profiler {
}
bool GetParallelStrategyEnableFlag() const { return is_parallel_strategy; }
void SyncEnable(const bool enable_flag);
void DataProcessEnable(const bool enable_flag);
protected:
void SetRunTimeData(const std::string &op_name, const float time_elapsed);
@ -129,6 +130,7 @@ class BACKEND_EXPORT Profiler {
uint32_t iter_end_op_index_ = 0;
uint32_t fp_start_op_index_ = 1;
bool sync_enable_flag_ = true;
bool data_process_enable_ = false;
std::string op_type_ = "GetNext";
private:

View File

@ -165,16 +165,6 @@ class Profiler:
_ascend_job_id = ""
def __init__(self, **kwargs):
self._msprof_enable = os.getenv("PROFILER_SAMPLECONFIG")
if self._msprof_enable:
return
if kwargs.get("env_enable"):
self._profiler_init(kwargs)
return
if Profiler._has_initialized:
msg = "Do not init twice in the profiler."
raise RuntimeError(msg)
Profiler._has_initialized = True
self._dev_id = None
self._cpu_profiler = None
self._gpu_profiler = None
@ -187,7 +177,7 @@ class Profiler:
self._job_id_env = None
self._filt_optype_names = ''
self._output_path = ''
self._rank_size = 0
self._rank_size = 1
self._rank_id = 0
self._ascend_profiler = None
self._timeline_size_limit_byte = 500 * 1024 * 1024 # 500MB
@ -197,10 +187,7 @@ class Profiler:
self._aicore_metrics_id = 0
self._l2_cache = "off"
self._data_process = True
self._parser_kwargs(kwargs)
# get device_id and device_target
self._get_devid_rankid_and_devtarget()
self._get_output_path(kwargs)
self._op_time = True
self._profile_communication = False
self._has_started = False
self._has_started_twice = False
@ -209,6 +196,20 @@ class Profiler:
self._sync_enable = True
self._stop_time = 0
self._dynamic_status = False
self._msprof_enable = os.getenv("PROFILER_SAMPLECONFIG")
if self._msprof_enable:
return
if kwargs.get("env_enable"):
self._profiler_init(kwargs)
return
if Profiler._has_initialized:
msg = "Do not init twice in the profiler."
raise RuntimeError(msg)
Profiler._has_initialized = True
self._parser_kwargs(kwargs)
# get device_id and device_target
self._get_devid_rankid_and_devtarget()
self._get_output_path(kwargs)
self._decide_device_target(kwargs)
if self.start_profile:
self.start()
@ -402,7 +403,9 @@ class Profiler:
if self._device_target and self._device_target == DeviceTarget.GPU.value:
if self._data_process:
self._md_profiler.start()
self._gpu_profiler.step_profiling_enable(True)
self._gpu_profiler.data_process_enable(True)
if self._op_time:
self._gpu_profiler.step_profiling_enable(True)
elif self._device_target and self._device_target == DeviceTarget.ASCEND.value:
if self._data_process:
self._md_profiler.start()
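Since the rendered hunk above loses indentation, here is a sketch of the GPU branch of start() with the nesting inferred from the change; data processing and operator timing are now enabled independently:

# Nesting inferred from the diff above, not a verbatim copy.
if self._device_target == DeviceTarget.GPU.value:
    if self._data_process:
        self._md_profiler.start()
        self._gpu_profiler.data_process_enable(True)   # new binding added in this commit
    if self._op_time:
        self._gpu_profiler.step_profiling_enable(True)
elif self._device_target == DeviceTarget.ASCEND.value:
    if self._data_process:
        self._md_profiler.start()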
@ -466,11 +469,7 @@ class Profiler:
def _profiler_init(self, kwargs):
"""Initialize variables when profiler is enabled by environment variables."""
options = kwargs.get("env_enable")
self._filt_optype_names = ''
self._has_started = True
self._stop_time = 0
self._is_heterogeneous = False
self._rank_size = 1
self._start_time = options.get("start_time")
self._output_path = options.get('file_output_path')
self._profile_memory = options.get('profile_memory')
@ -593,14 +592,15 @@ class Profiler:
"output": self._output_path,
"fp_point": fp_point,
"bp_point": bp_point,
"training_trace": "on",
"task_trace": "on",
"training_trace": "on" if self._op_time else "off",
"task_trace": "on" if self._op_time else "off",
"aic_metrics": AICORE_METRICS_DICT.get(self._aicore_metrics_id, "ArithmeticUtilization"),
"aicpu": "on",
"aicpu": "on" if self._data_process or self._op_time else "off",
"profile_memory": profile_memory,
"hccl": profiler_communication,
"l2_cache": self._l2_cache,
"parallel_strategy": "on" if self._parallel_strategy else "off",
"op_time": "on" if self._op_time else "off"
}
return profiling_options
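For reference, with op_time disabled and data_process enabled, the construction above yields Ascend options along these lines; only the fields touched by this commit are shown, and the values follow the conditionals in the hunk:

# Illustrative subset of profiling_options for op_time=False, data_process=True.
profiling_options = {
    "training_trace": "off",  # "on" only when self._op_time is True
    "task_trace": "off",      # likewise gated on self._op_time
    "aicpu": "on",            # "on" when either data_process or op_time is enabled
    "op_time": "off",         # new field consumed by the C++ AscendProfiler::Init
}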
@ -742,19 +742,46 @@ class Profiler:
logger.info("No need to stop profiler because profiler has been stopped.")
self._ascend_graph_analyse()
# Call MSAdvisor function
def _ascend_timeline_analyse(self, aicpu_data_parser, optime_parser, source_path):
"""Analyse timeline info."""
try:
msadvisor = Msadvisor(self._get_profiling_job_id(), self._rank_id, self._output_path)
logger.info("MSAdvisor starts running.")
msadvisor.analyse()
except (ProfilerFileNotFoundException, ValueError, FileNotFoundError, OSError) as err:
if context.get_context("mode") == context.PYNATIVE_MODE:
logger.warning("Pynative mode does not support MSAdvisor analyzer currently.")
else:
logger.warning("MSAdvisor running failed. %s", err)
self._analyse_timeline(aicpu_data_parser, optime_parser, source_path)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
finally:
pass
def _ascend_step_trace_analyse(self, source_path, framework_parser):
"""Analyse step trace info."""
points, is_training_mode_flag = None, False
try:
if self._is_support_step_info_collect() and not self._dynamic_status:
points, is_training_mode_flag = self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
finally:
pass
return points, is_training_mode_flag
def _ascend_dynamic_net_analyse(self):
"""Analyse dynamic shape network info."""
if self._profile_communication:
raise RuntimeError(
"The profile_communication parameter cannot be set on the dynamic shape network.")
if self._profile_memory:
raise RuntimeError("The profile_memory parameter cannot be set on the dynamic shape network.")
logger.warning(
"[Profiler]Dynamic Shape network does not support collecting step trace performance data currently.")
dynamic_parser = DynamicFrameWorkParser(self._output_path, self._rank_id)
dynamic_parser.write_dynamic_shape_data()
def _ascend_flops_analyse(self, source_path, op_task_dict, is_training_mode_flag):
"""Get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt."""
flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, self._dev_id, self._rank_id,
is_training_mode_flag)
logger.info("Profiling: analyzing the operation FLOPs.")
flops_parser.execute()
def _ascend_graph_memory_analyse(self, points):
"""Analyse memory usage info."""
if not self._profile_memory:
@ -785,6 +812,20 @@ class Profiler:
finally:
pass
def _ascend_graph_msadvisor_analyse(self, job_id):
"""Call MSAdvisor function."""
try:
msadvisor = Msadvisor(job_id, self._rank_id, self._output_path)
logger.info("MSAdvisor starts running.")
msadvisor.analyse()
except (ProfilerFileNotFoundException, ValueError, FileNotFoundError, OSError) as err:
if context.get_context("mode") == context.PYNATIVE_MODE:
logger.warning("Pynative mode does not support MSAdvisor analyzer currently.")
else:
logger.warning("MSAdvisor running failed. %s", err)
finally:
pass
def _ascend_graph_op_analyse(self, source_path):
"""
Ascend graph model hwts analyse.
@ -828,17 +869,28 @@ class Profiler:
logger.info("Profiling: analyzing the data preprocess data.")
aicpu_data_parser.execute()
# analyse op compute time info
try:
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
finally:
pass
return [framework_parser, aicpu_data_parser, optime_parser, op_task_dict]
def _ascend_graph_minddata_analyse(self, source_path):
def _minddata_analyse(self, source_path):
"""Analyse mindadata for ascend graph model."""
if not self._data_process:
return
store_id = self._rank_id if self._device_target == DeviceTarget.ASCEND.value else self._dev_id
# Parsing minddata AICPU profiling
logger.info("Profiling: analyzing the minddata AICPU data.")
MinddataParser.execute(source_path, self._output_path, self._rank_id)
if self._device_target == DeviceTarget.ASCEND.value:
logger.info("Profiling: analyzing the minddata AICPU data.")
MinddataParser.execute(source_path, self._output_path, store_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._rank_id, self._output_path)
pipeline_parser = MinddataPipelineParser(self._output_path, store_id, self._output_path)
logger.info("Profiling: analyzing the minddata pipeline operator and queue.")
pipeline_parser.parse()
except ProfilerException as err:
@ -848,7 +900,7 @@ class Profiler:
# Analyze minddata information
try:
md_analyzer = MinddataProfilingAnalyzer(self._output_path, self._rank_id, self._output_path)
md_analyzer = MinddataProfilingAnalyzer(self._output_path, store_id, self._output_path)
logger.info("Profiling: analyzing the minddata information.")
md_analyzer.analyze()
except ProfilerException as err:
@ -865,59 +917,18 @@ class Profiler:
self._check_output_path(output_path=self._output_path)
source_path = os.path.join(self._output_path, job_id)
framework_parser, aicpu_data_parser, optime_parser, op_task_dict = self._ascend_graph_op_analyse(source_path)
self._ascend_graph_minddata_analyse(source_path)
# analyse op compute time info
try:
logger.info("Profiling: analyzing the operation compute time.")
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
finally:
pass
if self._dynamic_status and self._profile_communication:
raise RuntimeError("The profile_communication parameter cannot be set on the dynamic shape network.")
if self._dynamic_status and self._profile_memory:
raise RuntimeError("The profile_memory parameter cannot be set on the dynamic shape network.")
if self._dynamic_status:
logger.warning(
"[Profiler]Dynamic Shape network does not support collecting step trace performance data currently.")
# analyse step trace info
points = None
is_training_mode_flag = False
try:
if self._is_support_step_info_collect() and not self._dynamic_status:
points, is_training_mode_flag = self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
finally:
pass
# analyse timeline info
try:
logger.info("Profiling: analyzing the timeline data.")
self._analyse_timeline(aicpu_data_parser, optime_parser, source_path)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
finally:
pass
if self._dynamic_status:
dynamic_parser = DynamicFrameWorkParser(self._output_path, self._rank_id)
dynamic_parser.write_dynamic_shape_data()
# Get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt
flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, self._dev_id, self._rank_id,
is_training_mode_flag)
logger.info("Profiling: analyzing the operation FLOPs.")
flops_parser.execute()
self._ascend_graph_memory_analyse(points)
self._ascend_graph_hccl_analyse()
self._minddata_analyse(source_path)
if self._op_time:
framework_parser, aicpu_data_parser, optime_parser, op_task_dict = self._ascend_graph_op_analyse(
source_path)
points, is_training_mode_flag = self._ascend_step_trace_analyse(source_path, framework_parser)
self._ascend_timeline_analyse(aicpu_data_parser, optime_parser, source_path)
if self._dynamic_status:
self._ascend_dynamic_net_analyse()
self._ascend_flops_analyse(source_path, op_task_dict, is_training_mode_flag)
self._ascend_graph_memory_analyse(points)
self._ascend_graph_hccl_analyse()
self._ascend_graph_msadvisor_analyse(job_id)
def _ascend_graph_start(self):
"""Ascend graph mode start profiling."""
@ -940,27 +951,10 @@ class Profiler:
else:
logger.info("No need to stop profiler because profiler has been stopped.")
reduce_op_type = self._get_step_reduce_op_type()
timeline_generator = self._generate_timeline(reduce_op_type)
# parse minddata pipeline operator and queue for GPU
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
logger.info("Profiling: analyzing the minddata pipeline operator and queue for GPU.")
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# Analyze minddata information
try:
md_analyzer = MinddataProfilingAnalyzer(self._output_path, self._dev_id, self._output_path)
logger.info("Profiling: analyzing the minddata information.")
md_analyzer.analyze()
except ProfilerException as err:
logger.warning(err.message)
self._minddata_analyse(self._output_path)
try:
self._analyse_step_relation_info(timeline_generator)
self._analyse_step_relation_info()
except ProfilerException as err:
logger.warning(err.message)
finally:
@ -985,8 +979,12 @@ class Profiler:
"scenarios currently.")
return analyse_step_trace
def _analyse_step_relation_info(self, timeline_generator):
def _analyse_step_relation_info(self):
"""Parse iteration related information."""
if not self._op_time:
return
reduce_op_type = self._get_step_reduce_op_type()
timeline_generator = self._generate_timeline(reduce_op_type)
parser = GpuFrameWorkParser(self._output_path, self._dev_id)
graph_ids = parser.get_graph_ids()
ProfilerInfo.set_graph_ids(graph_ids)
@ -1091,6 +1089,7 @@ class Profiler:
optime_parser (OPComputeTimeParserParser): The parser instance for AI Core
operator execution time calculation.
"""
logger.info("Profiling: analyzing the timeline data.")
timeline_analyser = AscendTimelineGenerator(self._output_path, self._dev_id, self._rank_id,
self._rank_size, context.get_context("mode"))
# Get framework info
@ -1216,6 +1215,7 @@ class Profiler:
def _analyser_op_info(self):
"""Analyse the operator information."""
logger.info("Profiling: analyzing the operation compute time.")
integrator = Integrator(self._output_path, self._rank_id)
integrator.integrate()
@ -1340,6 +1340,11 @@ class Profiler:
raise TypeError(f"For '{self.__class__.__name__}', the parameter data_process must be bool, "
f"but got type {type(self._data_process)}")
self._op_time = kwargs.pop("op_time", True)
if not isinstance(self._op_time, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter op_time must be bool, "
f"but got type {type(self._op_time)}")
timeline_limit = kwargs.pop("timeline_limit", 500)
if not isinstance(timeline_limit, int):
raise TypeError(f"For '{self.__class__.__name__}', the parameter timeline_limit must be int, "

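The new op_time keyword is validated the same way as the existing flags. A hedged example of what the added type check rejects, assuming a working MindSpore environment and no previously initialized profiler:

# Passing a non-bool op_time now fails fast in _parser_kwargs with a TypeError
# (example only; the message text follows the f-string added above).
from mindspore import Profiler

try:
    Profiler(op_time="on")  # wrong type on purpose; should be a bool
except TypeError as err:
    print(err)  # For 'Profiler', the parameter op_time must be bool, but got type <class 'str'>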
View File

@ -122,7 +122,7 @@ class TestEnvEnableProfiler:
@security_off_wrap
def test_gpu_profiler(self):
root_status = os.system("whoami | grep root")
cuda_status = os.system("nvcc -V | grep cuda_10")
cuda_status = os.system("nvcc -V | grep 'release 10'")
if root_status and not cuda_status:
return
status = os.system(
@ -144,7 +144,7 @@ class TestEnvEnableProfiler:
Expectation: No exception.
"""
root_status = os.system("whoami | grep root")
cuda_status = os.system("nvcc -V | grep cuda_10")
cuda_status = os.system("nvcc -V | grep 'release 10'")
if root_status and not cuda_status:
return
status = os.system(