forked from mindspore-Ecosystem/mindspore
!24572 fix the wrong value of average flops.
Merge pull request !24572 from casgj/master
commit 4634feabcd
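This change reworks how FlopsParser reports average FLOPS utilization. Instead of averaging per-operator utilization over all operators, it accumulates each operator's FLOPs into the step whose compute window contains the operator's start time (fp to bp in training mode, fp to step end in inference mode), writes every step's utilization to flops_utilization_step_{rank_id}.json, and reports the mean over steps. The parser now also reads every sliced aicore data file instead of only slice_0, and Profiler threads an is_training_mode_flag, detected from the presence of "Gradients" ops, from _analyse_step_trace into FlopsParser.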
@@ -21,6 +21,8 @@ import stat

 from mindspore import log as logger
+from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \
+    ProfilerFileNotFoundException, ProfilerRawFileException
 from mindspore.profiler.common.validator.validate_path import \
     validate_and_normalize_path


 class FlopsParser:
@@ -43,18 +45,24 @@ class FlopsParser:
     # else the task id represents one operator.
     _task_id_threshold = 25000

-    def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id):
+    def __init__(self, input_dir, output_dir, op_task_dict, device_id, rank_id, is_training_mode_flag=True):
         self._input_dir = input_dir
         self._output_dir = output_dir
         self._op_task_dict = op_task_dict
         self._device_id = device_id
         self._rank_id = rank_id
+        self.is_training_mode_flag = is_training_mode_flag
+        # output files.
         self._flops_filename = f'flops_{self._rank_id}.txt'
         self._flops_summary_filename = f'flops_summary_{self._rank_id}.json'
         self._flops_scope_filename = f'flops_scope_{self._rank_id}.json'
-        self._aicore_filename = f'aicore.data.{self._device_id}.slice_0'
+        self._flops_utilization_step_filename = f'flops_utilization_step_{self._rank_id}.json'
+        # input files.
+        self._aicore_filename_pref = f'aicore.data.{self._device_id}.slice'
         self._optime_filename = f'output_op_compute_time_{self._rank_id}.txt'
         self._info_json = f'info.json.{self._device_id}'
+        self._step_trace_filename = f'step_trace_raw_{self._rank_id}_detail_time.csv'
+        self._timeline_data_filename = f'output_timeline_data_{self._rank_id}.txt'
         self._flops_summary = {
             'FLOPs': 0,
             'FLOPS': 0,
@@ -67,54 +75,86 @@ class FlopsParser:
     def execute(self):
         """Get the flops of aicore operators and write to file."""
         peak_flops = self._get_peak_flops()
-        read_count, all_log_struct = self._load_aicore_data()

         op_avg_time_dict = self._get_op_avg_time_dict()
         op_flops_list = []
         op_name_set = set()
+        op_compute_dict = dict()
+        # get all step time.
+        op_all_step_time, op_all_step_comp = self._get_all_step_time()
+        op_start_time = self._get_op_start_time()
+        op_idx = 0
+        step_idx = 0
+        aicore_file_doc = os.path.join(self._input_dir, "data")
+        source_files = self._get_aicore_files(aicore_file_doc)
+        # parse all sliced aicore files.
+        for source_file in source_files:
+            source_file = validate_and_normalize_path(source_file)
+            read_count, all_log_struct = self._load_aicore_data(source_file)

-        for idx in range(read_count):
-            log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE:
-                                        (idx + 1) * self.AICORE_LOG_SIZE]
-            result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)]
-            op_name = self._get_op_name(result)
-            if op_name in op_name_set or op_name == "":
-                continue
-            if op_name not in op_avg_time_dict:
-                logger.warning("Op name {op_name} is not exist in op average time dict.")
-                continue
-            # Convert the unit of task_fops to MFLOPs(1e6).
-            task_fops = self._compute_task_flops(result) * 1e-6
-            op_avg_time = op_avg_time_dict[op_name]
-            # Time unit of op_avg_time is ms.
-            # The unit of gflop_per_second is GFLOPS(1e9).
-            if float(op_avg_time) == 0.0:
-                raise ValueError("All operators take 0 ms.")
-            if peak_flops == 0:
-                raise ValueError("The frequency of an operator is 0.")
-            gflop_per_second = task_fops / float(op_avg_time)
-            flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100
-            self._flops_summary['FLOPs'] += task_fops
-            self._flops_summary['FLOPS'] += gflop_per_second
-            self._flops_summary['FLOPS_Utilization'] += flops_utilization
-            op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)]
-            op_flops_list.append(op_flops)
-            op_name_set.add(op_name)
-            self._add_flops_to_each_scope(op_name, task_fops)
+            for idx in range(read_count):
+                log_struct = all_log_struct[idx * self.AICORE_LOG_SIZE:
+                                            (idx + 1) * self.AICORE_LOG_SIZE]
+                result = [hex(i) for i in struct.unpack(self.RUNTIME_COMMON, log_struct)]
+                op_name = self._get_op_name(result)
+
+                # Convert the unit of task_fops to MFLOPs(1e6).
+                if op_name in op_compute_dict:
+                    task_fops = op_compute_dict[op_name]
+                else:
+                    task_fops = self._compute_task_flops(result) * 1e-6
+                    op_compute_dict[op_name] = task_fops
+
+                # add the op FLOPs to the current step.
+                op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp = self._add_step_flops_time(
+                    op_name, task_fops, op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp)
+                logger.debug("calculate FLOPS: step_idx=%d, op_idx=%d.", step_idx, op_idx)
+
+                # calculate average op FLOPS.
+                if op_name in op_name_set or op_name == "":
+                    continue
+                if op_name not in op_avg_time_dict:
+                    logger.warning(f"Op name {op_name} does not exist in op average time dict.")
+                    continue
+                op_avg_time = op_avg_time_dict[op_name]
+                # Time unit of op_avg_time is ms.
+                # The unit of gflop_per_second is GFLOPS(1e9).
+                if float(op_avg_time) == 0.0:
+                    raise ValueError(f"The average time of operator {op_name} is 0 ms.")
+                if peak_flops == 0:
+                    raise ValueError("The peak FLOPS is 0.")
+                gflop_per_second = task_fops / float(op_avg_time)
+                flops_utilization = (gflop_per_second * 1e9 / peak_flops) * 100
+                self._flops_summary['FLOPs'] += task_fops
+                self._flops_summary['FLOPS'] += gflop_per_second
+                op_flops = [op_name, str(task_fops), str(gflop_per_second), str(flops_utilization)]
+                op_flops_list.append(op_flops)
+                op_name_set.add(op_name)
+                self._add_flops_to_each_scope(op_name, task_fops)

         if not op_name_set:
             raise ProfilerRawFileException("No aicore operator found.")
         self._flops_summary['FLOPS'] /= len(op_name_set)
-        self._flops_summary['FLOPS_Utilization'] /= len(op_name_set)

+        sum_flops_utilization = 0.0
+        # calculate the FLOPS utilization of every step and the average value.
+        utilization_save_filename = os.path.join(self._output_dir, self._flops_utilization_step_filename)
+        with open(utilization_save_filename, 'w') as f:
+            f.write("steps, FLOPS_Utilization %\n")
+            for i, x in enumerate(op_all_step_comp):
+                current_utilization = x[0] / x[1] * 1e9 / peak_flops * 100
+                sum_flops_utilization += current_utilization
+                f.write(str(i + 1))
+                f.write(",")
+                f.write(str(current_utilization))
+                f.write("\n")
+        self._flops_summary['FLOPS_Utilization'] = sum_flops_utilization / len(op_all_step_comp)
         self._format_scope_flops()
         self._write_file(op_flops_list)

-    def _load_aicore_data(self):
+    def _load_aicore_data(self, aicore_file_path):
         """Load the original binary aicore data."""
-        aicore_file_path = os.path.join(
-            self._input_dir,
-            "data",
-            self._aicore_filename
-        )
         logger.info("the aicore file path is %s", aicore_file_path)

         if not os.path.exists(aicore_file_path):
             logger.error(f'The file {aicore_file_path} does not exist.')
@@ -328,3 +368,133 @@ class FlopsParser:
         except (IOError, OSError) as err:
             logger.error(f'Error occurred when writing {output_flops_scope_file_path} file: {err}')
             raise ProfilerIOException()
+
+    def _get_aicore_files(self, profiler_dir):
+        """Get aicore files."""
+        aicore_files = self._search_file(profiler_dir)
+        if not aicore_files:
+            raise ProfilerPathErrorException('The aicore file does not exist.')
+
+        return aicore_files
+
+    def _search_file(self, input_dir):
+        """Search for aicore files under the specified input directory."""
+        # validate input_dir
+        if not os.path.isdir(input_dir):
+            raise ProfilerPathErrorException(
+                '{} does not exist or is not a dir'.format(input_dir)
+            )
+        # get aicore files
+        files = os.listdir(input_dir)
+        aicore_files = list(
+            filter(
+                lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'),
+                files
+            )
+        )
+        # validate result
+        if len(aicore_files) > 1:
+            # the format of the file name is like `aicore.data.$id.slice_$number`.
+            # use the $number as the sort key.
+            try:
+                aicore_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
+            except ValueError as err:
+                logger.warning("Unable to parse file names: %s. %s", aicore_files, err)
+                aicore_files = []
+        else:
+            aicore_files = list(
+                filter(
+                    lambda file: file.startswith(self._aicore_filename_pref) and not file.endswith('.done'),
+                    files
+                )
+            )
+            if len(aicore_files) >= 1:
+                logger.warning("The aicore file structure has changed, please upgrade "
+                               "MindSpore and regenerate the profiling data.")

+        file_paths = [os.path.join(input_dir, file) for file in aicore_files]
+        logger.info("Find %d aicore files.", len(file_paths))
+        return file_paths
+
+    def _get_all_step_time(self):
+        """Get the start and end time of every step."""
+        op_all_step_time = []
+        op_all_step_comp = []
+        _step_trace_file_path = os.path.join(self._output_dir, self._step_trace_filename)
+
+        if not os.path.exists(_step_trace_file_path):
+            logger.error(f'The {_step_trace_file_path} file does not exist.')
+            raise ProfilerFileNotFoundException(_step_trace_file_path)
+        try:
+            with open(_step_trace_file_path, 'r') as f:
+                lines = f.readlines()
+                # the first line is the header and the last line is the average info.
+                op_avg_time_lines = lines[1:-1]
+                # train mode.
+                if self.is_training_mode_flag:
+                    for op_avg_idx in op_avg_time_lines:
+                        line = op_avg_idx.split(',')
+                        # convert the raw timestamps to ms (the raw unit appears to be 10 ns).
+                        fp = float(line[4]) / 100000.0
+                        bp = float(line[5]) / 100000.0
+                        op_all_step_time.append([fp, bp])
+                        op_all_step_comp.append([0.0, bp - fp])
+                else:
+                    # eval mode.
+                    for op_avg_idx in op_avg_time_lines:
+                        line = op_avg_idx.split(',')
+                        fp = float(line[4]) / 100000.0
+                        end_point = float(line[2]) / 100000.0
+                        op_all_step_time.append([fp, end_point])
+                        op_all_step_comp.append([0.0, end_point - fp])

+        except (IOError, OSError) as err:
+            logger.error(f'Error occurred when reading {_step_trace_file_path}: {err}')
+            raise ProfilerIOException()
+        logger.info("the number of steps is %d.", len(op_all_step_time))
+        return op_all_step_time, op_all_step_comp
+
+    def _get_op_start_time(self):
+        """Get the start time of each op from the timeline data."""
+        op_start_time = []
+        _timeline_file_path = os.path.join(self._output_dir, self._timeline_data_filename)
+
+        if not os.path.exists(_timeline_file_path):
+            logger.error(f'The {_timeline_file_path} file does not exist.')
+            raise ProfilerFileNotFoundException(_timeline_file_path)
+        try:
+            with open(_timeline_file_path, 'r') as f:
+                lines = f.readlines()
+                op_avg_time_lines = lines[1:]
+                for op_avg_idx in op_avg_time_lines:
+                    line = op_avg_idx.split(',')
+                    op_name = line[0]
+                    op_start = float(line[2])
+                    op_start_time.append([op_name, op_start])
+        except (IOError, OSError) as err:
+            logger.error(f'Error occurred when reading {_timeline_file_path}: {err}')
+            raise ProfilerIOException()
+        return op_start_time
+
+    def _add_step_flops_time(self, op_name, task_fops, op_idx, step_idx, op_start_time,
+                             op_all_step_time, op_all_step_comp):
+        """Add the FLOPs of the current op to the step that contains its start time."""
+        while (op_idx < len(op_start_time)) and (op_name != op_start_time[op_idx][0]):
+            op_idx += 1
+        if op_idx >= len(op_start_time):
+            logger.warning(f"Op name {op_name} does not exist in the timeline data.")
+            # the op has no start time, so it cannot be attributed to any step.
+            return op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp

+        # do not add the op FLOPs that are not in the fp_and_bp time.
+        while ((step_idx < len(op_all_step_time)) and
+               (op_start_time[op_idx][1] >= op_all_step_time[step_idx][1])):
+            step_idx += 1
+        if step_idx >= len(op_all_step_time):
+            logger.warning(f"Op {op_name} starts after the last step ends.")

+        # add the op FLOPs that are in the fp_and_bp time.
+        if ((step_idx < len(op_all_step_time)) and
+                (op_start_time[op_idx][1] >= op_all_step_time[step_idx][0]) and
+                (op_start_time[op_idx][1] <= op_all_step_time[step_idx][1])):
+            op_all_step_comp[step_idx][0] += task_fops
+        # next op.
+        op_idx += 1
+        return op_idx, step_idx, op_start_time, op_all_step_time, op_all_step_comp
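Taken together, the methods above compute the utilization of each step as follows: _get_all_step_time converts each step trace row into a compute window in ms, _add_step_flops_time accumulates an operator's MFLOPs into the window that contains its start time, and execute turns each step's MFLOPs/ms ratio into GFLOPS and divides by the device peak. A minimal sketch with made-up numbers (the window, FLOPs, and peak values are assumptions for illustration, not measured data):

    # One step whose fp->bp compute window is 50 ms, with 120000 MFLOPs
    # of aicore work accumulated inside it (x = [120000.0, 50.0]).
    step_mflops = 120000.0      # x[0] in op_all_step_comp
    step_window_ms = 50.0       # x[1] = bp - fp
    peak_flops = 8e12           # device peak, in FLOPS

    gflops = step_mflops / step_window_ms          # MFLOPs/ms == GFLOPS -> 2400.0
    utilization = gflops * 1e9 / peak_flops * 100  # x[0] / x[1] * 1e9 / peak_flops * 100 -> 30.0
    print(f"FLOPS utilization: {utilization:.1f}%")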
@@ -294,10 +294,6 @@ class Profiler:
         aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
         aicpu_data_parser.execute()

-        # get op FLOPs from aicore.data.x.slice.0 file, and compute FLOPS, write output_op_flops_x.txt
-        flops_parser = FlopsParser(source_path, self._output_path, op_task_dict, self._dev_id, self._rank_id)
-        flops_parser.execute()
-
         # Parsing minddata AICPU profiling
         MinddataParser.execute(source_path, self._output_path, self._rank_id)
@@ -323,8 +319,10 @@ class Profiler:

         # analyse step trace info
         points = None
+        is_training_mode_flag = False
+
         try:
-            points = self._analyse_step_trace(source_path, framework_parser)
+            points, is_training_mode_flag = self._analyse_step_trace(source_path, framework_parser)
         except ProfilerException as err:
             logger.warning(err.message)
@@ -348,6 +346,11 @@ class Profiler:
         except (ProfilerIOException, ProfilerFileNotFoundException, ProfilerRawFileException) as err:
             logger.warning(err.message)

+        # get op FLOPs from the aicore.data.{device_id}.slice_* files, compute FLOPS, and write flops_{rank_id}.txt
+        flops_parser = FlopsParser(source_path, self._output_path, op_task_dict,
+                                   self._dev_id, self._rank_id, is_training_mode_flag)
+        flops_parser.execute()
+
         os.environ['PROFILING_MODE'] = str("false")
         self._ascend_profiler.stop()
@@ -430,14 +433,14 @@ class Profiler:
         skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
         point_info = framework_parser.point_info
         # recognize inference or training mode
-        is_traning_mode_flag = framework_parser.check_op_name("Gradients")
+        is_training_mode_flag = framework_parser.check_op_name("Gradients")
         # parse the step trace files and save the result to disk
         source_path = validate_and_normalize_path(source_path)
         parser = AscendStepTraceParser(input_dir=source_path,
                                        output_file_path=step_trace_intermediate_file_path,
                                        job_id=self._job_id_env,
                                        skip_first_step=skip_first_step_flag,
-                                       is_training_mode=is_traning_mode_flag)
+                                       is_training_mode=is_training_mode_flag)
         parser.update_tag_op_type_map(point_info)
         parser.parse_and_save()
         point_info = parser.record_point_info(point_info, point_info_file_path)

@@ -446,7 +449,7 @@ class Profiler:
         logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
         logger.info("The point info is: %s", point_info)

-        return point_info
+        return point_info, is_training_mode_flag

     def _analyse_timeline(self, aicpu_parser, optime_parser, source_path):
         """