Fix the wrong value of average FLOPS.

gaojing 2021-10-08 09:37:25 -04:00
parent 94a7298690
commit e4b5d77b8e
2 changed files with 219 additions and 46 deletions

View File

@@ -21,6 +21,8 @@ import stat
from mindspore import log as logger
from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \
ProfilerFileNotFoundException, ProfilerRawFileException, ProfilerPathErrorException
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
class FlopsParser:
@@ -43,18 +45,24 @@ class FlopsParser:
# else the task id represents one operator.
_task_id_threshold = 25000
def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id):
def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id, is_training_mode_flag=True):
self._input_dir = input_dir
self._output_dir = output_dir
self._op_task_dict = op_task_dict
self._device_id = device_id
self._rank_id = rank_id
self.is_training_mode_flag = is_training_mode_flag
# output files.
self._flops_filename = f'flops_{self._rank_id}.txt'
self._flops_summary_filename = f'flops_summary_{self._rank_id}.json'
self._flops_scope_filename = f'flops_scope_{self._rank_id}.json'
self._aicore_filename = f'aicore.data.{self._device_id}.slice_0'
self._flops_utilization_step_filename = f'flops_utilization_step_{self._rank_id}.json'
# input files.
self._aicore_filename_pref = f'aicore.data.{self._device_id}.slice'
self._optime_filename = f'output_op_compute_time_{self._rank_id}.txt'
self._info_json = f'info.json.{self._device_id}'
self._step_trace_filename = f'step_trace_raw_{self._rank_id}_detail_time.csv'
self._timeline_data_filename = f'output_timeline_data_{self._rank_id}.txt'
self._flops_summary = {
'FLOPs': 0,
'FLOPS': 0,
@@ -67,54 +75,86 @@
def execute(self):
"""Get the flops of aicore operators and write to file."""
peak_flops = self._get_peak_flops()
read_count, all_log_struct = self._load_aicore_data()
op_avg_time_dict = self._get_op_avg_time_dict()
op_flops_list = []
op_name_set = set()
op_compute_dict = dict()
# get all step time.
op_all_step_time, op_all_step_comp = self._get_all_step_time()
op_start_time = self._get_op_start_time()
op_idx = 0
step_idx = 0
aicore_file_doc = os.path.join(self._input_dir, "data")
source_files = self._get_aicore_files(aicore_file_doc)
# parse all sliced aicore files.
for source_file in source_files:
source_file = validate_and_normalize_path(source_file)
read_count, all_log_struct = self._load_aicore_data(source_file)
for idx in range(read_count):
log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE:
(idx + 1) * self.AICORE_LOG_SIZE]
result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)]
op_name = self._get_op_name(result)
if op_name in op_name_set or op_name == "":
continue
if op_name not in op_avg_time_dict:
logger.warning("Op name {op_name} is not exist in op average time dict.")
continue
# Convert the unit of task_fops to MFLOPs(1e6).
task_fops = self._compute_task_flops(result) * 1e-6
op_avg_time = op_avg_time_dict[op_name]
# Time unit of op_avg_time is ms.
# The unit of gflop_per_second is GFLOPS(1e9).
if float(op_avg_time) == 0.0:
raise ValueError("All operators take 0 ms.")
if peak_flops == 0:
raise ValueError("The frequency of an operator is 0.")
gflop_per_second = task_fops / float(op_avg_time)
flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100
self._flops_summary['FLOPs'] += task_fops
self._flops_summary['FLOPS'] += gflop_per_second
self._flops_summary['FLOPS_Utilization'] += flops_utilization
op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)]
op_flops_list.append(op_flops)
op_name_set.add(op_name)
self._add_flops_to_each_scope(op_name, task_fops)
for idx in range(read_count):
log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE:
(idx + 1) * self.AICORE_LOG_SIZE]
result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)]
op_name = self._get_op_name(result)
# Convert the unit of task_fops to MFLOPs(1e6).
if op_name in op_compute_dict:
task_fops = op_compute_dict[op_name]
else:
task_fops = self._compute_task_flops(result) * 1e-6
op_compute_dict[op_name] = task_fops
# add the op FLOPs to the current step.
op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp = self._add_step_flops_time(
op_name, task_fops, op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp)
logger.debug("calculate FLOPS: step_idx=%d, op_idx=%d.", step_idx, op_idx)
# calculate the average op FLOPS.
if op_name in op_name_set or op_name == "":
continue
if op_name not in op_avg_time_dict:
logger.warning(f"Op name {op_name} does not exist in the op average time dict.")
continue
op_avg_time = op_avg_time_dict[op_name]
# Time unit of op_avg_time is ms.
# The unit of gflop_per_second is GFLOPS(1e9).
if float(op_avg_time) == 0.0:
raise ValueError(f"Op {op_name} has an average execution time of 0 ms.")
if peak_flops == 0:
raise ValueError("The peak FLOPS of the device is 0.")
gflop_per_second = task_fops / float(op_avg_time)
flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100
self._flops_summary['FLOPs'] += task_fops
self._flops_summary['FLOPS'] += gflop_per_second
op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)]
op_flops_list.append(op_flops)
op_name_set.add(op_name)
self._add_flops_to_each_scope(op_name, task_fops)
if not op_name_set:
raise ProfilerRawFileException("No aicore operator found.")
self._flops_summary['FLOPS'] /= len(op_name_set)
self._flops_summary['FLOPS_Utilization'] /= len(op_name_set)
sum_flops_utilization = 0.0
# calculate the FLOPS utilization of every step and the average value.
utilization_save_filename = os.path.join(self._output_dir, self._flops_utilization_step_filename)
with open(utilization_save_filename, 'w') as f:
f.write("steps, FLOPS_Utilization %\n")
for i, x in enumerate(op_all_step_comp):
current_utilization = x[0] / x[1] * 1e9 / peak_flops * 100
sum_flops_utilization += current_utilization
f.write(str(i + 1))
f.write(",")
f.write(str(current_utilization))
f.write("\n")
self._flops_summary['FLOPS_Utilization'] = sum_flops_utilization / len(op_all_step_comp)
self._format_scope_flops()
self._write_file(op_flops_list)
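This loop is the substance of the fix: utilization is now averaged over steps (FLOPs accumulated inside each step's fp-to-bp window, divided by the window length) rather than over operators. A minimal sketch of the arithmetic with hypothetical numbers, using the code's unit conventions (task FLOPs in MFLOPs, times in ms, so MFLOPs/ms equals GFLOPS):

```python
peak_flops = 8.192e12  # hypothetical device peak, in FLOPS

# one [accumulated MFLOPs, fp-to-bp duration in ms] pair per step (hypothetical)
op_all_step_comp = [[400.0, 2.0], [500.0, 2.0], [450.0, 2.0]]

# MFLOPs / ms == 1e6 FLOPs / 1e-3 s == 1e9 FLOP/s, hence the 1e9 factor.
per_step = [x[0] / x[1] * 1e9 / peak_flops * 100 for x in op_all_step_comp]
print(per_step)                       # ~[2.44, 3.05, 2.75] percent
print(sum(per_step) / len(per_step))  # ~2.75 percent, the reported average
```

The removed code instead divided the sum of per-operator utilizations by the operator count, which weights a microsecond-long op the same as a millisecond-long one; that skewed mean is presumably the wrong average the commit title refers to.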
def _load_aicore_data(self):
def _load_aicore_data(self, aicore_file_path):
"""Load the original binary aicore data."""
aicore_file_path = os.path.join(
self._input_dir,
"data",
self._aicore_filename
)
logger.info("the aicore file path is %s", aicore_file_path)
if not os.path.exists(aicore_file_path):
logger.error(f'The file {aicore_file_path} does not exist.')
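For orientation, `execute` walks each loaded slice as fixed-size binary records via `struct.unpack(self.RUNTIME_COMMON, ...)`. A self-contained sketch of that access pattern; the four-field little-endian layout below is hypothetical (the real `RUNTIME_COMMON` format string and `AICORE_LOG_SIZE` are class constants outside this diff):

```python
import struct

# Hypothetical record layout: four little-endian uint64 fields per record.
RECORD_FMT = '<4Q'
RECORD_SIZE = struct.calcsize(RECORD_FMT)  # 32 bytes

# Fabricated buffer holding two records, standing in for a slice file's bytes.
all_log_struct = struct.pack(RECORD_FMT, 1, 2, 3, 4) + struct.pack(RECORD_FMT, 5, 6, 7, 8)
read_count = len(all_log_struct) // RECORD_SIZE

for idx in range(read_count):
    log_struct = all_log_struct[idx * RECORD_SIZE:(idx + 1) * RECORD_SIZE]
    result = [hex(i) for i in struct.unpack(RECORD_FMT, log_struct)]
    print(result)  # first pass prints ['0x1', '0x2', '0x3', '0x4']
```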
@@ -328,3 +368,133 @@ class FlopsParser:
except (IOError, OSError) as err:
logger.error(f'Error occurred when write {output_flops_scope_file_path} file: {err}')
raise ProfilerIOException()
def _get_aicore_files(self, profiler_dir):
"""Get aicore files."""
aicore_files = self._search_file(profiler_dir)
if not aicore_files:
raise ProfilerPathErrorException('The aicore file does not exist.')
return aicore_files
def _search_file(self, input_dir):
"""Search aicore file under specific input directory."""
# validate input_dir
if not os.path.isdir(input_dir):
raise ProfilerPathErrorException(
'{} does not exist or is not a dir'.format(input_dir)
)
# get aicore files
files = os.listdir(input_dir)
aicore_files = list(
filter(
lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'),
files
)
)
# validate result
if len(aicore_files) > 1:
# the format of file name is like `aicore.data.$id.slice_$number`.
# use the $number as the sorted key
try:
aicore_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
except ValueError as err:
logger.warning("Unable to parse file names: %s. %s", aicore_files, err)
aicore_files = []
else:
aicore_files = list(
filter(
lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'),
files
)
)
if len(aicore_files) >= 1:
logger.warning("The aicore file structure is changed, please upgrade " \
"mindspore and regenerate profiling data")
file_paths = [os.path.join(input_dir, file) for file in aicore_files]
logger.info("Find %d aicore files.", len(file_paths))
return file_paths
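The numeric sort key above matters because lexicographic order breaks down once slice numbers reach two digits; a quick illustration with hypothetical file names:

```python
# Hypothetical sliced aicore files as found under the data/ directory.
files = ['aicore.data.0.slice_10', 'aicore.data.0.slice_2', 'aicore.data.0.slice_1']

print(sorted(files))
# ['aicore.data.0.slice_1', 'aicore.data.0.slice_10', 'aicore.data.0.slice_2']

print(sorted(files, key=lambda path: int(path.rsplit('_', 1)[-1])))
# ['aicore.data.0.slice_1', 'aicore.data.0.slice_2', 'aicore.data.0.slice_10']
```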
def _get_all_step_time(self):
"""Get the compute time window of every step."""
op_all_step_time = []
op_all_step_comp = []
_step_trace_file_path = os.path.join(self._output_dir, self._step_trace_filename)
if not os.path.exists(_step_trace_file_path):
logger.error(f'The {_step_trace_file_path} file does not exist.')
raise ProfilerFileNotFoundException(_step_trace_file_path)
try:
with open(_step_trace_file_path, 'r') as f:
lines = f.readlines()
# the last line is the average info.
op_avg_time_lines = lines[1:-1]
# train mode.
if self.is_training_mode_flag:
for op_avg_idx in op_avg_time_lines:
line = op_avg_idx.split(',')
fp = float(line[4]) / 100000.0
bp = float(line[5]) / 100000.0
op_all_step_time.append([fp, bp])
op_all_step_comp.append([0.0, bp - fp])
else:
# eval mode.
for op_avg_idx in op_avg_time_lines:
line = op_avg_idx.split(',')
fp = float(line[4]) / 100000.0
end_point = float(line[2]) / 100000.0
op_all_step_time.append([fp, end_point])
op_all_step_comp.append([0.0, end_point - fp])
except (IOError, OSError) as err:
logger.error(f'Error occurred when reading {_step_trace_file_path} file: {err}')
raise ProfilerIOException()
logger.info("the train step is %d .", len(op_all_step_time))
return op_all_step_time, op_all_step_comp
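To make the column indexing concrete, here is the same window computation applied to one fabricated `step_trace_raw` row (per the code above: column 2 is the step end point, column 4 the fp point, column 5 the bp point; the /100000.0 divisor scales the raw tick counts to ms, matching the per-op times used elsewhere):

```python
# Fabricated CSV row; only columns 2, 4 and 5 are read here.
line = "1,100000,1500000,1400000,200000,900000".split(',')

fp = float(line[4]) / 100000.0         # 2.0 ms
bp = float(line[5]) / 100000.0         # 9.0 ms
end_point = float(line[2]) / 100000.0  # 15.0 ms

print([fp, bp], [0.0, bp - fp])                # train mode: window and compute slot
print([fp, end_point], [0.0, end_point - fp])  # eval mode: window runs to the step end
```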
def _get_op_start_time(self):
"""Get each op's start time from the timeline data."""
op_start_time = []
_timeline_file_path = os.path.join(self._output_dir, self._timeline_data_filename)
if not os.path.exists(_timeline_file_path):
logger.error(f'The {_timeline_file_path} file does not exist.')
raise ProfilerFileNotFoundException(_timeline_file_path)
try:
with open(_timeline_file_path, 'r') as f:
lines = f.readlines()
op_avg_time_lines = lines[1:]
for op_avg_idx in op_avg_time_lines:
line = op_avg_idx.split(',')
op_name = line[0]
op_start = float(line[2])
op_start_time.append([op_name, op_start])
except (IOError, OSError) as err:
logger.error(f'Error occurred when reading {_timeline_file_path} file: {err}')
raise ProfilerIOException()
return op_start_time
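The timeline file is consumed the same way: skip the header, then take the op name from column 0 and the start time from column 2. A fabricated two-row example (the header field names are assumptions; only the two indexed columns matter here):

```python
# Fabricated timeline content: one header line, one data row.
lines = ["op_name,stream_id,start_time,duration\n",
         "Default/Conv2D-op1,0,2.5,0.4\n"]

op_start_time = []
for row in lines[1:]:
    line = row.split(',')
    op_start_time.append([line[0], float(line[2])])

print(op_start_time)  # [['Default/Conv2D-op1', 2.5]]
```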
def _add_step_flops_time(self, op_name, task_fops, op_idx, step_idx, op_start_time,
op_all_step_time, op_all_step_comp):
"""Get the start time from the current task."""
while((op_idx < len(op_start_time)) and (op_name != op_start_time[op_idx][0])):
op_idx += 1
if op_idx >= len(op_start_time):
logger.warning(f"Op name {op_name} is not exist in timeline dict.")
# do not add the op FLOPS that not in fp_and_bp time.
while((step_idx < len(op_all_step_time)) and
(op_start_time[op_idx][1] >= op_all_step_time[step_idx][1])):
step_idx += 1
if step_idx >= len(op_all_step_time):
logger.warning(f"Op name {op_name} is not exist in timeline dict.")
# add the op FLOPS that in fp_and_bp time.
if ((step_idx < len(op_all_step_time)) and
(op_start_time[op_idx][1] >= op_all_step_time[step_idx][0]) and
(op_start_time[op_idx][1] <= op_all_step_time[step_idx][1])):
op_all_step_comp[step_idx][0] += task_fops
# next op.
op_idx += 1
return op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp
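A standalone rendition of this sweep on toy data may help: both lists are ordered by time, so `op_idx` and `step_idx` only ever move forward, and each op's MFLOPs land in the step whose fp/bp window contains its start (all names and numbers below are hypothetical):

```python
op_start_time = [['Conv2D', 1.0], ['ReLU', 2.5], ['MatMul', 7.0]]  # [name, start ms]
op_all_step_time = [[0.5, 3.0], [6.0, 9.0]]                        # [fp, bp] per step
op_all_step_comp = [[0.0, 2.5], [0.0, 3.0]]                        # [MFLOPs, window ms]

op_idx = step_idx = 0
for op_name, task_fops in [('Conv2D', 12.5), ('ReLU', 0.5), ('MatMul', 40.0)]:
    # find this op's start time, then advance to the step that could contain it.
    while op_idx < len(op_start_time) and op_name != op_start_time[op_idx][0]:
        op_idx += 1
    if op_idx >= len(op_start_time):
        break
    start = op_start_time[op_idx][1]
    while step_idx < len(op_all_step_time) and start >= op_all_step_time[step_idx][1]:
        step_idx += 1
    if step_idx < len(op_all_step_time) and op_all_step_time[step_idx][0] <= start:
        op_all_step_comp[step_idx][0] += task_fops
    op_idx += 1

print(op_all_step_comp)  # [[13.0, 2.5], [40.0, 3.0]]
```

Conv2D and ReLU start inside the first fp/bp window and are pooled there; MatMul's start time pushes `step_idx` forward and its FLOPs land in the second step.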

View File

@@ -294,10 +294,6 @@ class Profiler:
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt
flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, self._dev_id, self._rank_id)
flops_parser.execute()
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._rank_id)
@@ -323,8 +319,10 @@ class Profiler:
# analyse step trace info
points = None
is_training_mode_flag = False
try:
points = self._analyse_step_trace(source_path, framework_parser)
points, is_training_mode_flag = self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
@@ -348,6 +346,11 @@ class Profiler:
except (ProfilerIOException, ProfilerFileNotFoundException, ProfilerRawFileException) as err:
logger.warning(err.message)
# get op FLOPs from the aicore.data.x.slice_y files, compute FLOPS, and write output_op_flops_x.txt
flops_parser = FlopsParser(source_path, self._output_path, op_task_dict,
self._dev_id, self._rank_id, is_training_mode_flag)
flops_parser.execute()
os.environ['PROFILING_MODE'] = str("false")
self._ascend_profiler.stop()
@@ -430,14 +433,14 @@ class Profiler:
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# recognize inference or training mode
is_traning_mode_flag = framework_parser.check_op_name("Gradients")
is_training_mode_flag = framework_parser.check_op_name("Gradients")
# parser the step trace files and save the result to disk
source_path = validate_and_normalize_path(source_path)
parser = AscendStepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag,
is_training_mode=is_traning_mode_flag)
is_training_mode=is_training_mode_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)
@@ -446,7 +449,7 @@ class Profiler:
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
logger.info("The point info is: %s", point_info)
return point_info
return point_info, is_training_mode_flag
def _analyse_timeline(self, aicpu_parser, optime_parser, source_path):
"""