!22542 Bugfix for scope-level flops data cannot display

Merge pull request !22542 from gzhcv/CommunicationOpNotOverlapped
This commit is contained in:
i-robot 2021-09-01 11:51:40 +00:00 committed by Gitee
commit 5f9e9d96ec
3 changed files with 52 additions and 33 deletions

View File

@ -238,10 +238,13 @@ class FlopsParser:
top_level_scope = scope_list[0]
suffix_name = ""
if op_name.startswith("Gradients/recompute_Default"):
suffix_name = "(recompute_Gradients)"
suffix_name = "recompute_Gradients"
else:
suffix_name = f"({top_level_scope})"
scope_list = list(map(lambda x: x + suffix_name, scope_list))
suffix_name = top_level_scope
# To distinguish the same scope name at different scope level and different top level scope,
# the scope level and top level scope is added.
for level, scope_name in enumerate(scope_list):
scope_list[level] = scope_name + f"({level}_{suffix_name})"
scope_list[0] = top_level_scope
# Add root node (refers to total flops).

View File

@ -21,7 +21,6 @@ from decimal import Decimal
from mindspore import log as logger
from mindspore.context import get_auto_parallel_context
from mindspore.communication.management import get_group_size
from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \
ProfilerFileNotFoundException, ProfilerRawFileException, ProfilerParamValueErrorException
from mindspore.profiler.common.util import query_latest_trace_time_file, to_int, to_millisecond
@ -991,7 +990,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
_timeline_summary_filename = 'ascend_timeline_summary_{}.json'
_cluster_analyse_filename = 'ascend_cluster_analyse_{}_{}_{}_{}.csv'
def __init__(self, profiling_dir, device_id):
def __init__(self, profiling_dir, device_id, rank_size):
self._profiling_dir = profiling_dir
self._device_id = device_id
self._tid_dict = {
@ -1000,6 +999,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
"communication": 8001,
"free_time": 8002
}
self._rank_size = rank_size
def _load_timeline_data(self):
"""Load timeline data from file."""
@ -1201,7 +1201,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
logger.debug('Finished adding framework info into timeline...')
def _produce_two_separated_timeline(self, timeline, op_name):
"""Produce two separated timeline based on op_name"""
"""Produce two separated timeline based on op_name."""
timeline_include_op_name = []
timeline_exclude_op_name = []
for time_item in timeline:
@ -1213,7 +1213,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
def _analyse_and_write_cluster_profiling_data(self, aicore_timeline, communication_timeline, step_time_list):
"""
Analyse the cluster communication and computation data, and write it to file.
Analyse the cluster communication and computation data, and write result to file.
To analyse the cluster performance bottleneck based on timeline, define the time of a training
step as "t_total", propose five metrics as follows:
@ -1222,11 +1222,12 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
3) The time that "communication" operators not overlapped by others(t2)
4) The time that consumed by computation(t_total - t2)
5) The time that "collective communication" operators not overlapped by others(t3)
In pipeline parallel mode, we can locate slow stage based on "t_total-t1". Inside each stage,
we can locate slow card based on "t_total-t2". The value of t1 indicates the degree that
In pipeline parallel mode, we can locate slow stage based on t_total - t1. Inside each stage,
we can locate slow card based on t_total - t2. The value of t1 indicates the degree that
communication time between stages slow down the training. The value of t3 indicates the degree
that communication inside each stage slow down the training.
"""
step_num = len(step_time_list)
is_pipeline_parallel = False
comm_merged_timeline, _, comm_display_timeline = self._get_merged_time_list(
communication_timeline,
@ -1279,22 +1280,28 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
display_name="free_time"
)[1]
# Compute these five metrics mentioned above per step.
recieve_alone_time = self._compute_time_inside_step(receive_op_not_overlaped_timeline, step_time_list)
stage_time, computation_time = [], []
comm_alone_time = self._compute_time_inside_step(comm_not_overlaped_timeline, step_time_list)
collective_comm_alone_time = self._compute_time_inside_step(
collective_comm_not_overlaped_timeline,
step_time_list
)
for step in range(len(step_time_list)):
if is_pipeline_parallel:
stage_time.append(step_time_list[step][self._duration_idx] - recieve_alone_time[step])
computation_time.append(step_time_list[step][self._duration_idx] - comm_alone_time[step])
try:
# Compute these five metrics mentioned above per step.
recieve_alone_time = self._compute_time_inside_step(receive_op_not_overlaped_timeline, step_time_list)
stage_time, computation_time = [], []
comm_alone_time = self._compute_time_inside_step(comm_not_overlaped_timeline, step_time_list)
collective_comm_alone_time = self._compute_time_inside_step(
collective_comm_not_overlaped_timeline,
step_time_list
)
for step in range(step_num):
if is_pipeline_parallel:
stage_time.append(step_time_list[step][self._duration_idx] - recieve_alone_time[step])
computation_time.append(step_time_list[step][self._duration_idx] - comm_alone_time[step])
metrices_per_step_list = [computation_time, comm_alone_time, stage_time,
recieve_alone_time, collective_comm_alone_time]
for metric in metrices_per_step_list:
metric.append(sum(metric) / step_num)
self._write_cluster_metrices(metrices_per_step_list, is_pipeline_parallel)
except IndexError as e:
logger.error(e)
metrices_per_step_list = [computation_time, comm_alone_time, stage_time,
recieve_alone_time, collective_comm_alone_time]
self._write_cluster_metrices(metrices_per_step_list, is_pipeline_parallel)
res_timeline = []
res_timeline.extend(comm_not_overlaped_timeline)
res_timeline.extend(aicore_display_timeline)
@ -1304,21 +1311,27 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
return res_timeline
def _write_cluster_metrices(self, metrices, is_pipeline_parallel):
"""Write cluster metric"""
# Note that cluster bottleneck analyse do not support offline parse,
"""Write cluster metric."""
# Note that the feature of cluster bottleneck analyse is not supported in offline parse mode,
# due to that parallel context is not set.
try:
parallel_mode = get_auto_parallel_context("parallel_mode")
stage_num = get_auto_parallel_context("pipeline_stages")
rank_size = get_group_size()
except RuntimeError:
logger.error("[profiler] cluster bottleneck analyse do not support offline parse.")
logger.warning("[profiler] the feature of cluster bottleneck analyse "
"is not supported in offline parse mode.")
parallel_mode = "data_parallel"
stage_num = 1
if stage_num > 1:
parallel_mode = "pipeline-parallel"
elif parallel_mode != "data_parallel":
parallel_mode = "model-parallel"
else:
parallel_mode = "data-parallel"
stage_num, rank_size = 1, 1
cluster_analyse_file_path = os.path.join(
self._profiling_dir,
self._cluster_analyse_filename.format(parallel_mode, stage_num, rank_size, self._device_id)
self._cluster_analyse_filename.format(parallel_mode, stage_num, self._rank_size, self._device_id)
)
cluster_analyse_file_path = validate_and_normalize_path(cluster_analyse_file_path)
@ -1342,7 +1355,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
raise ProfilerIOException
def _compute_time_inside_step(self, metric_timeline, step_time_list):
"""Compute per step time of metric_timeline"""
"""Compute per step time of metric_timeline."""
per_step_time_list = []
step = 0
cur_step_metric_time = 0

View File

@ -20,7 +20,7 @@ import json
from enum import Enum
from mindspore import log as logger, context
from mindspore.communication.management import GlobalComm, release, get_rank
from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size
import mindspore._c_expression as c_expression
from mindspore.dataset.core.config import _stop_dataset_profiler
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
@ -243,6 +243,9 @@ class Profiler:
def _ascend_analyse(self):
"""Collect and analyse ascend performance data"""
self._rank_size = 1
if GlobalComm.INITED:
self._rank_size = get_group_size()
release()
job_id = self._get_profiling_job_id()
@ -446,7 +449,7 @@ class Profiler:
optime_parser (OPComputeTimeParserParser): The parser instance for AI Core
operator execution time calculation.
"""
timeline_analyser = AscendTimelineGenerator(self._output_path, self._rank_id)
timeline_analyser = AscendTimelineGenerator(self._output_path, self._rank_id, self._rank_size)
# Get framework info
integrator = Integrator(self._output_path, self._rank_id)
aicore_detail_data = integrator.get_aicore_detail_data()