From e4b5d77b8e79d78c336fef1cd541b12deadec98f Mon Sep 17 00:00:00 2001 From: gaojing Date: Fri, 8 Oct 2021 09:37:25 -0400 Subject: [PATCH] fix the wrong value of average flops. --- mindspore/profiler/parser/flops_parser.py | 246 ++++++++++++++++++---- mindspore/profiler/profiling.py | 19 +- 2 files changed, 219 insertions(+), 46 deletions(-) diff --git a/mindspore/profiler/parser/flops_parser.py b/mindspore/profiler/parser/flops_parser.py index 2d7c7b145a0..3d9fbd47b00 100644 --- a/mindspore/profiler/parser/flops_parser.py +++ b/mindspore/profiler/parser/flops_parser.py @@ -21,6 +21,8 @@ import stat from mindspore import log as logger from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \ ProfilerFileNotFoundException, ProfilerRawFileException +from mindspore.profiler.common.validator.validate_path import \ + validate_and_normalize_path class FlopsParser: @@ -43,18 +45,24 @@ class FlopsParser: # else the task id represents one operator. _task_id_threshold = 25000 - def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id): + def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id, is_training_mode_flag=True): self._input_dir = input_dir self._output_dir = output_dir self._op_task_dict = op_task_dict self._device_id = device_id self._rank_id = rank_id + self.is_training_mode_flag = is_training_mode_flag + # output files. self._flops_filename = f'flops_{self._rank_id}.txt' self._flops_summary_filename = f'flops_summary_{self._rank_id}.json' self._flops_scope_filename = f'flops_scope_{self._rank_id}.json' - self._aicore_filename = f'aicore.data.{self._device_id}.slice_0' + self._flops_utilization_step_filename = f'flops_utilization_step_{self._rank_id}.json' + # input files. 
+ self._aicore_filename_pref = f'aicore.data.{self._device_id}.slice' self._optime_filename = f'output_op_compute_time_{self._rank_id}.txt' self._info_json = f'info.json.{self._device_id}' + self._step_trace_filename = f'step_trace_raw_{self._rank_id}_detail_time.csv' + self._timeline_data_filename = f'output_timeline_data_{self._rank_id}.txt' self._flops_summary = { 'FLOPs': 0, 'FLOPS': 0, @@ -67,54 +75,86 @@ class FlopsParser: def execute(self): """Get the flops of aicore operators and write to file.""" peak_flops = self._get_peak_flops() - read_count, all_log_struct = self._load_aicore_data() + op_avg_time_dict = self._get_op_avg_time_dict() op_flops_list = [] op_name_set = set() + op_compute_dict = dict() + # get all step time. + op_all_step_time, op_all_step_comp = self._get_all_step_time() + op_start_time = self._get_op_start_time() + op_idx = 0 + step_idx = 0 + aicore_file_doc = os.path.join(self._input_dir, "data") + source_files = self._get_aicore_files(aicore_file_doc) + # parse all sliced aicore files. + for source_file in source_files: + source_file = validate_and_normalize_path(source_file) + read_count, all_log_struct = self._load_aicore_data(source_file) - for idx in range(read_count): - log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE: - (idx + 1) * self.AICORE_LOG_SIZE] - result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)] - op_name = self._get_op_name(result) - if op_name in op_name_set or op_name == "": - continue - if op_name not in op_avg_time_dict: - logger.warning("Op name {op_name} is not exist in op average time dict.") - continue - # Convert the unit of task_fops to MFLOPs(1e6). - task_fops = self._compute_task_flops(result) * 1e-6 - op_avg_time = op_avg_time_dict[op_name] - # Time unit of op_avg_time is ms. - # The unit of gflop_per_second is GFLOPS(1e9). 
- if float(op_avg_time) == 0.0: - raise ValueError("All operators take 0 ms.") - if peak_flops == 0: - raise ValueError("The frequency of an operator is 0.") - gflop_per_second = task_fops / float(op_avg_time) - flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100 - self._flops_summary['FLOPs'] += task_fops - self._flops_summary['FLOPS'] += gflop_per_second - self._flops_summary['FLOPS_Utilization'] += flops_utilization - op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)] - op_flops_list.append(op_flops) - op_name_set.add(op_name) - self._add_flops_to_each_scope(op_name, task_fops) + for idx in range(read_count): + log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE: + (idx + 1) * self.AICORE_LOG_SIZE] + result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)] + op_name = self._get_op_name(result) + + # Convert the unit of task_fops to MFLOPs(1e6). + if op_name in op_compute_dict: + task_fops = op_compute_dict[op_name] + else: + task_fops = self._compute_task_flops(result) * 1e-6 + op_compute_dict[op_name] = task_fops + + # add the op FLOPS in current step. + op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp = self._add_step_flops_time( + op_name, task_fops, op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp) + logger.debug("calculate FLOPS: step_idx= %d, op_idx=%d.", step_idx, op_idx) + + # calculate averge op FLOPS. + if op_name in op_name_set or op_name == "": + continue + if op_name not in op_avg_time_dict: + logger.warning("Op name {op_name} is not exist in op average time dict.") + continue + op_avg_time = op_avg_time_dict[op_name] + # Time unit of op_avg_time is ms. + # The unit of gflop_per_second is GFLOPS(1e9). 
+ if float(op_avg_time) == 0.0: + raise ValueError("All operators take 0 ms.") + if peak_flops == 0: + raise ValueError("The frequency of an operator is 0.") + gflop_per_second = task_fops / float(op_avg_time) + flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100 + self._flops_summary['FLOPs'] += task_fops + self._flops_summary['FLOPS'] += gflop_per_second + op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)] + op_flops_list.append(op_flops) + op_name_set.add(op_name) + self._add_flops_to_each_scope(op_name, task_fops) if not op_name_set: raise ProfilerRawFileException("No aicore operator found.") self._flops_summary['FLOPS'] /= len(op_name_set) - self._flops_summary['FLOPS_Utilization'] /= len(op_name_set) + + sum_flops_utilization = 0.0 + # calculate the every step FLOPS utilization and the average values. + utilization_save_filename = os.path.join(self._output_dir, self._flops_utilization_step_filename) + with open(utilization_save_filename, 'w') as f: + f.write("steps, FLOPS_Utilization %\n") + for i, x in enumerate(op_all_step_comp): + current_utilization = x[0] / x[1] * 1e9 / peak_flops * 100 + sum_flops_utilization += current_utilization + f.write(str(i + 1)) + f.write(",") + f.write(str(current_utilization)) + f.write("\n") + self._flops_summary['FLOPS_Utilization'] = sum_flops_utilization / len(op_all_step_comp) self._format_scope_flops() self._write_file(op_flops_list) - def _load_aicore_data(self): + def _load_aicore_data(self, aicore_file_path): """Load the original binary aicore data.""" - aicore_file_path = os.path.join( - self._input_dir, - "data", - self._aicore_filename - ) + logger.info("the aicore file path is %s", aicore_file_path) if not os.path.exists(aicore_file_path): logger.error(f'The file {aicore_file_path} does not exist.') @@ -328,3 +368,133 @@ class FlopsParser: except (IOError, OSError) as err: logger.error(f'Error occurred when write {output_flops_scope_file_path} file: {err}') raise 
ProfilerIOException() + + def _get_aicore_files(self, profiler_dir): + """Get aicore files.""" + aicore_files = self._search_file(profiler_dir) + if not aicore_files: + raise ProfilerPathErrorException('The aicore file does not exist.') + + return aicore_files + + def _search_file(self, input_dir): + """Search aicore file under specific input directory.""" + # validate input_dir + if not os.path.isdir(input_dir): + raise ProfilerPathErrorException( + '{} does not exist or is not a dir'.format(input_dir) + ) + # get aicore files + files = os.listdir(input_dir) + aicore_files = list( + filter( + lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'), + files + ) + ) + # validate result + if len(aicore_files) > 1: + # the format of file name is like `aicore.data.$id.slice_$number`. + # use the $number as the sorted key + try: + aicore_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1])) + except ValueError as err: + logger.warning("Unable to parse file names: %s. 
%s", aicore_files, err) + aicore_files = [] + else: + aicore_files = list( + filter( + lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'), + files + ) + ) + if len(aicore_files) >= 1: + logger.warning("The aicore file structure is changed, please upgrade " \ + "mindspore and regenerate profiling data") + + file_paths = [os.path.join(input_dir, file) for file in aicore_files] + logger.info("Find %d aicore files.", len(file_paths)) + return file_paths + + def _get_all_step_time(self): + """Get the op average execution time.""" + op_all_step_time = [] + op_all_step_comp = [] + _step_trace_file_path = os.path.join(self._output_dir, self._step_trace_filename) + + if not os.path.exists(_step_trace_file_path): + logger.error(f'The {_step_trace_file_path} file does not exist.') + raise ProfilerFileNotFoundException(_step_trace_file_path) + try: + with open(_step_trace_file_path, 'r') as f: + lines = f.readlines() + # the last line is the average info. + op_avg_time_lines = lines[1:-1] + # train mode. + if self.is_training_mode_flag: + for op_avg_idx in op_avg_time_lines: + line = op_avg_idx.split(',') + fp = float(line[4]) / 100000.0 + bp = float(line[5]) / 100000.0 + op_all_step_time.append([fp, bp]) + op_all_step_comp.append([0.0, bp - fp]) + else: + # eval mode. 
+            for op_avg_idx in op_avg_time_lines:
+                line = op_avg_idx.split(',')
+                fp = float(line[4]) / 100000.0
+                end_point = float(line[2]) / 100000.0
+                op_all_step_time.append([fp, end_point])
+                op_all_step_comp.append([0.0, end_point - fp])
+
+        except (IOError, OSError) as err:
+            logger.error(f'Error occurred when read {_step_trace_file_path} file: {err}')
+            raise ProfilerIOException()
+        logger.info("the train step is %d .", len(op_all_step_time))
+        return op_all_step_time, op_all_step_comp
+
+    def _get_op_start_time(self):
+        """Get the op name and start time from the timeline data file."""
+        op_start_time = []
+        _timeline_file_path = os.path.join(self._output_dir, self._timeline_data_filename)
+
+        if not os.path.exists(_timeline_file_path):
+            logger.error(f'The {_timeline_file_path} file does not exist.')
+            raise ProfilerFileNotFoundException(_timeline_file_path)
+        try:
+            with open(_timeline_file_path, 'r') as f:
+                lines = f.readlines()
+                op_avg_time_lines = lines[1:]
+                for op_avg_idx in op_avg_time_lines:
+                    line = op_avg_idx.split(',')
+                    op_name = line[0]
+                    op_start = float(line[2])
+                    op_start_time.append([op_name, op_start])
+        except (IOError, OSError) as err:
+            logger.error(f'Error occurred when read {_timeline_file_path} file: {err}')
+            raise ProfilerIOException()
+        return op_start_time
+
+    def _add_step_flops_time(self, op_name, task_fops, op_idx, step_idx, op_start_time,
+                             op_all_step_time, op_all_step_comp):
+        """Get the start time from the current task."""
+        while((op_idx < len(op_start_time)) and (op_name != op_start_time[op_idx][0])):
+            op_idx += 1
+        if op_idx >= len(op_start_time):
+            logger.warning(f"Op name {op_name} is not exist in timeline dict.")
+
+        # do not add the op FLOPS that not in fp_and_bp time.
+        while((step_idx < len(op_all_step_time)) and
+              (op_start_time[op_idx][1] >= op_all_step_time[step_idx][1])):
+            step_idx += 1
+        if step_idx >= len(op_all_step_time):
+            logger.warning(f"Op name {op_name} is not exist in timeline dict.")
+
+        # add the op FLOPS that in fp_and_bp time. 
+ if ((step_idx < len(op_all_step_time)) and + (op_start_time[op_idx][1] >= op_all_step_time[step_idx][0]) and + (op_start_time[op_idx][1] <= op_all_step_time[step_idx][1])): + op_all_step_comp[step_idx][0] += task_fops + # next op. + op_idx += 1 + return op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp diff --git a/mindspore/profiler/profiling.py b/mindspore/profiler/profiling.py index 4a05778d5ed..606f751101b 100644 --- a/mindspore/profiler/profiling.py +++ b/mindspore/profiler/profiling.py @@ -294,10 +294,6 @@ class Profiler: aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu) aicpu_data_parser.execute() - # get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt - flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, self._dev_id, self._rank_id) - flops_parser.execute() - # Parsing minddata AICPU profiling MinddataParser.execute(source_path, self._output_path, self._rank_id) @@ -323,8 +319,10 @@ class Profiler: # analyse step trace info points = None + is_training_mode_flag = False + try: - points = self._analyse_step_trace(source_path, framework_parser) + points, is_training_mode_flag = self._analyse_step_trace(source_path, framework_parser) except ProfilerException as err: logger.warning(err.message) @@ -348,6 +346,11 @@ class Profiler: except (ProfilerIOException, ProfilerFileNotFoundException, ProfilerRawFileException) as err: logger.warning(err.message) + # get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt + flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, + self._dev_id, self._rank_id, is_training_mode_flag) + flops_parser.execute() + os.environ['PROFILING_MODE'] = str("false") self._ascend_profiler.stop() @@ -430,14 +433,14 @@ class Profiler: skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME) point_info = framework_parser.point_info # recognize inference or training 
mode - is_traning_mode_flag = framework_parser.check_op_name("Gradients") + is_training_mode_flag = framework_parser.check_op_name("Gradients") # parser the step trace files and save the result to disk source_path = validate_and_normalize_path(source_path) parser = AscendStepTraceParser(input_dir=source_path, output_file_path=step_trace_intermediate_file_path, job_id=self._job_id_env, skip_first_step=skip_first_step_flag, - is_training_mode=is_traning_mode_flag) + is_training_mode=is_training_mode_flag) parser.update_tag_op_type_map(point_info) parser.parse_and_save() point_info = parser.record_point_info(point_info, point_info_file_path) @@ -446,7 +449,7 @@ class Profiler: logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path) logger.info("The point info is: %s", point_info) - return point_info + return point_info, is_training_mode_flag def _analyse_timeline(self, aicpu_parser, optime_parser, source_path): """