diff --git a/mindspore/profiler/parser/flops_parser.py b/mindspore/profiler/parser/flops_parser.py index a44525b82aa..2d7c7b145a0 100644 --- a/mindspore/profiler/parser/flops_parser.py +++ b/mindspore/profiler/parser/flops_parser.py @@ -238,10 +238,13 @@ class FlopsParser: top_level_scope = scope_list[0] suffix_name = "" if op_name.startswith("Gradients/recompute_Default"): - suffix_name = "(recompute_Gradients)" + suffix_name = "recompute_Gradients" else: - suffix_name = f"({top_level_scope})" - scope_list = list(map(lambda x: x + suffix_name, scope_list)) + suffix_name = top_level_scope + # To distinguish the same scope name at different scope level and different top level scope, + # the scope level and top level scope is added. + for level, scope_name in enumerate(scope_list): + scope_list[level] = scope_name + f"({level}_{suffix_name})" scope_list[0] = top_level_scope # Add root node (refers to total flops). diff --git a/mindspore/profiler/parser/integrator.py b/mindspore/profiler/parser/integrator.py index 1fc64906bc2..9a49e370622 100644 --- a/mindspore/profiler/parser/integrator.py +++ b/mindspore/profiler/parser/integrator.py @@ -21,7 +21,6 @@ from decimal import Decimal from mindspore import log as logger from mindspore.context import get_auto_parallel_context -from mindspore.communication.management import get_group_size from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \ ProfilerFileNotFoundException, ProfilerRawFileException, ProfilerParamValueErrorException from mindspore.profiler.common.util import query_latest_trace_time_file, to_int, to_millisecond @@ -991,7 +990,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator): _timeline_summary_filename = 'ascend_timeline_summary_{}.json' _cluster_analyse_filename = 'ascend_cluster_analyse_{}_{}_{}_{}.csv' - def __init__(self, profiling_dir, device_id): + def __init__(self, profiling_dir, device_id, rank_size): self._profiling_dir = profiling_dir self._device_id = device_id self._tid_dict = { @@ -1000,6 +999,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator): "communication": 8001, "free_time": 8002 } + self._rank_size = rank_size def _load_timeline_data(self): """Load timeline data from file.""" @@ -1201,7 +1201,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator): logger.debug('Finished adding framework info into timeline...') def _produce_two_separated_timeline(self, timeline, op_name): - """Produce two separated timeline based on op_name""" + """Produce two separated timeline based on op_name.""" timeline_include_op_name = [] timeline_exclude_op_name = [] for time_item in timeline: @@ -1213,7 +1213,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator): def _analyse_and_write_cluster_profiling_data(self, aicore_timeline, communication_timeline, step_time_list): """ - Analyse the cluster communication and computation data, and write it to file. + Analyse the cluster communication and computation data, and write result to file. To analyse the cluster performance bottleneck based on timeline, define the time of a training step as "t_total", propose five metrics as follows: @@ -1222,11 +1222,12 @@ class AscendTimelineGenerator(BaseTimelineGenerator): 3) The time that "communication" operators not overlapped by others(t2) 4) The time that consumed by computation(t_total - t2) 5) The time that "collective communication" operators not overlapped by others(t3) - In pipeline parallel mode, we can locate slow stage based on "t_total-t1". Inside each stage, - we can locate slow card based on "t_total-t2". The value of t1 indicates the degree that + In pipeline parallel mode, we can locate slow stage based on t_total - t1. Inside each stage, + we can locate slow card based on t_total - t2. The value of t1 indicates the degree that communication time between stages slow down the training. The value of t3 indicates the degree that communication inside each stage slow down the training. """ + step_num = len(step_time_list) is_pipeline_parallel = False comm_merged_timeline, _, comm_display_timeline = self._get_merged_time_list( communication_timeline, @@ -1279,22 +1280,28 @@ class AscendTimelineGenerator(BaseTimelineGenerator): display_name="free_time" )[1] - # Compute these five metrics mentioned above per step. - recieve_alone_time = self._compute_time_inside_step(receive_op_not_overlaped_timeline, step_time_list) - stage_time, computation_time = [], [] - comm_alone_time = self._compute_time_inside_step(comm_not_overlaped_timeline, step_time_list) - collective_comm_alone_time = self._compute_time_inside_step( - collective_comm_not_overlaped_timeline, - step_time_list - ) - for step in range(len(step_time_list)): - if is_pipeline_parallel: - stage_time.append(step_time_list[step][self._duration_idx] - recieve_alone_time[step]) - computation_time.append(step_time_list[step][self._duration_idx] - comm_alone_time[step]) + try: + # Compute these five metrics mentioned above per step. + recieve_alone_time = self._compute_time_inside_step(receive_op_not_overlaped_timeline, step_time_list) + stage_time, computation_time = [], [] + comm_alone_time = self._compute_time_inside_step(comm_not_overlaped_timeline, step_time_list) + collective_comm_alone_time = self._compute_time_inside_step( + collective_comm_not_overlaped_timeline, + step_time_list + ) + for step in range(step_num): + if is_pipeline_parallel: + stage_time.append(step_time_list[step][self._duration_idx] - recieve_alone_time[step]) + computation_time.append(step_time_list[step][self._duration_idx] - comm_alone_time[step]) + + metrices_per_step_list = [computation_time, comm_alone_time, stage_time, + recieve_alone_time, collective_comm_alone_time] + for metric in metrices_per_step_list: + metric.append(sum(metric) / step_num) + self._write_cluster_metrices(metrices_per_step_list, is_pipeline_parallel) + except IndexError as e: + logger.error(e) - metrices_per_step_list = [computation_time, comm_alone_time, stage_time, - recieve_alone_time, collective_comm_alone_time] - self._write_cluster_metrices(metrices_per_step_list, is_pipeline_parallel) res_timeline = [] res_timeline.extend(comm_not_overlaped_timeline) res_timeline.extend(aicore_display_timeline) @@ -1304,21 +1311,27 @@ class AscendTimelineGenerator(BaseTimelineGenerator): return res_timeline def _write_cluster_metrices(self, metrices, is_pipeline_parallel): - """Write cluster metric""" - # Note that cluster bottleneck analyse do not support offline parse, + """Write cluster metric.""" + # Note that the feature of cluster bottleneck analyse is not supported in offline parse mode, # due to that parallel context is not set. try: parallel_mode = get_auto_parallel_context("parallel_mode") stage_num = get_auto_parallel_context("pipeline_stages") - rank_size = get_group_size() except RuntimeError: - logger.error("[profiler] cluster bottleneck analyse do not support offline parse.") + logger.warning("[profiler] the feature of cluster bottleneck analyse " + "is not supported in offline parse mode.") + parallel_mode = "data_parallel" + stage_num = 1 + if stage_num > 1: + parallel_mode = "pipeline-parallel" + elif parallel_mode != "data_parallel": + parallel_mode = "model-parallel" + else: parallel_mode = "data-parallel" - stage_num, rank_size = 1, 1 cluster_analyse_file_path = os.path.join( self._profiling_dir, - self._cluster_analyse_filename.format(parallel_mode, stage_num, rank_size, self._device_id) + self._cluster_analyse_filename.format(parallel_mode, stage_num, self._rank_size, self._device_id) ) cluster_analyse_file_path = validate_and_normalize_path(cluster_analyse_file_path) @@ -1342,7 +1355,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator): raise ProfilerIOException def _compute_time_inside_step(self, metric_timeline, step_time_list): - """Compute per step time of metric_timeline""" + """Compute per step time of metric_timeline.""" per_step_time_list = [] step = 0 cur_step_metric_time = 0 diff --git a/mindspore/profiler/profiling.py b/mindspore/profiler/profiling.py index e766a0a7743..80ea4632c4f 100644 --- a/mindspore/profiler/profiling.py +++ b/mindspore/profiler/profiling.py @@ -20,7 +20,7 @@ import json from enum import Enum from mindspore import log as logger, context -from mindspore.communication.management import GlobalComm, release, get_rank +from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size import mindspore._c_expression as c_expression from mindspore.dataset.core.config import _stop_dataset_profiler from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \ @@ -243,6 +243,9 @@ class Profiler: def _ascend_analyse(self): """Collect and analyse ascend performance data""" + self._rank_size = 1 + if GlobalComm.INITED: + self._rank_size = get_group_size() release() job_id = self._get_profiling_job_id() @@ -446,7 +449,7 @@ class Profiler: optime_parser (OPComputeTimeParserParser): The parser instance for AI Core operator execution time calculation. """ - timeline_analyser = AscendTimelineGenerator(self._output_path, self._rank_id) + timeline_analyser = AscendTimelineGenerator(self._output_path, self._rank_id, self._rank_size) # Get framework info integrator = Integrator(self._output_path, self._rank_id) aicore_detail_data = integrator.get_aicore_detail_data()