modify the master build alarm

This commit is contained in:
臧庆香 2021-09-14 19:42:29 +08:00
parent 55a23dde43
commit ec03a76c66
7 changed files with 191 additions and 129 deletions

View File

@ -19,7 +19,6 @@ This module provides the utils.
"""
import os
# one sys count takes 10 ns, 1 ms has 100000 system count
import re
@ -156,6 +155,20 @@ def get_file_names(input_path, file_name):
return name_list
def parse_device_id(filename, device_id_list, profiler_file_prefix):
    """
    Parse the device id from a profiling file name and record it.

    Args:
        filename (str): Name of a file found under the profiler directory.
        device_id_list (set): Device ids found so far, updated in place.
        profiler_file_prefix: Accepted prefixes. The file name with its last
            "_"-separated item removed is checked against it with ``in``.
    """
    items = filename.split("_")
    if filename.startswith("step_trace_raw"):
        # For step trace files the id is taken from the 4th "_"-separated item.
        device_num = items[3] if len(items) > 3 else ""
    else:
        # Otherwise the id is the text before the first "." of the last item.
        # str.split always returns at least one element, so no guard is needed.
        device_num = items[-1].split(".")[0]

    if device_num.isdigit() and '_'.join(items[:-1]) in profiler_file_prefix:
        device_id_list.add(device_num)
def analyse_device_list_from_profiler_dir(profiler_dir):
"""
Analyse device list from profiler dir.
@ -171,17 +184,7 @@ def analyse_device_list_from_profiler_dir(profiler_dir):
device_id_list = set()
for _, _, filenames in os.walk(profiler_dir):
for filename in filenames:
if filename.startswith("step_trace_raw"):
items = filename.split("_")
device_num = ""
if len(items) > 3:
device_num = items[3]
else:
items = filename.split("_")
device_num = items[-1].split(".")[0] if items[-1].split(".") else ""
if device_num.isdigit() and '_'.join(items[:-1]) in profiler_file_prefix:
device_id_list.add(device_num)
parse_device_id(filename, device_id_list, profiler_file_prefix)
return sorted(list(device_id_list))
@ -303,4 +306,5 @@ def get_field_value(row_info, field_name, header, time_type='realtime'):
def get_options(options):
    """Return ``options`` unchanged, or a new empty dict when it is None."""
    return {} if options is None else options

View File

@ -81,6 +81,14 @@ class HcclParser:
"""Parse communication info."""
self._parse_and_save(self._source_dir)
def _parse_communication_cost(self, operators_cost_info, info, operators_dict):
    """
    Pick, for every operator, the cost record that matches the step of ``info``.

    Args:
        operators_cost_info (dict): Maps an operator key to its cost records.
        info (list): Communication info of one step; element 0 is compared
            with element 0 of every cost record.
        operators_dict (dict): Output mapping, updated in place. When several
            records of one operator match, the last one is kept.
    """
    for op_key, cost_records in operators_cost_info.items():
        for record in cost_records:
            # Element 0 of both sequences is the step number.
            same_step = info[0] == record[0]
            if same_step:
                operators_dict[op_key] = record
def _parse_and_save(self, dir_path):
"""Parse and save communication info."""
communication_info_cache = list()
@ -91,11 +99,7 @@ class HcclParser:
communication_info_cache = self._merge_communication_info_by_step_num(communication_info_cache)
for info in communication_info_cache:
operators_dict = dict()
for key, value in operators_cost_info.items():
for item in value:
# index0:step_num
if info[0] == item[0]:
operators_dict[key] = item
self._parse_communication_cost(operators_cost_info, info, operators_dict)
info.append(operators_dict)
# Calculate device communication average.
device_communication_average_value = self._calculate_communication_average_value(communication_info_cache)
@ -380,6 +384,22 @@ class HcclParser:
total_wait_time += link_type_value[3]
return total_communication_time, total_wait_time
def _parse_link_cost(self, result_dict, key, link_type_dict):
    """
    Store the total cost of each RDMA/SDMA link type in ``result_dict[key]``.

    Args:
        result_dict (dict): Output mapping; ``result_dict[key]`` is updated in place.
        key: Entry of ``result_dict`` that receives the totals.
        link_type_dict (dict): Maps a link type to its communication records.
    """
    for link_type, link_records in link_type_dict.items():
        if link_type == CommunicationInfo.RDMA.value:
            # RDMA records are split by thread, then the per-thread results
            # are summed element-wise.
            records_by_thread = self._divide_communication_info_by_thread(link_records)
            per_thread_costs = [
                self._calculate_adma_link_info(thread_records)
                for thread_records in records_by_thread.values()
            ]
            result_dict[key][link_type] = np.sum(per_thread_costs, axis=0).tolist()
        if link_type == CommunicationInfo.SDMA.value:
            result_dict[key][link_type] = self._calculate_sdma_link_info(link_records)
def _calculate_src_dst_link_info(self, src_dst_dict: dict):
"""Calculate src dst link info."""
result_dict = dict()
@ -389,19 +409,7 @@ class HcclParser:
if not link_type_dict:
continue
result_dict[key] = dict()
for link_type_key, link_type_value in link_type_dict.items():
if link_type_key == CommunicationInfo.RDMA.value:
# Divide information by thread.
rdma_infos = []
threads_dict = self._divide_communication_info_by_thread(link_type_value)
for thread_value in threads_dict.values():
rdma_info = self._calculate_adma_link_info(thread_value)
rdma_infos.append(rdma_info)
rdma_total_cost = np.sum(rdma_infos, axis=0).tolist()
result_dict[key][link_type_key] = rdma_total_cost
if link_type_key == CommunicationInfo.SDMA.value:
sdma_total_cost = self._calculate_sdma_link_info(link_type_value)
result_dict[key][link_type_key] = sdma_total_cost
self._parse_link_cost(result_dict, key, link_type_dict)
return result_dict
@staticmethod
@ -498,20 +506,25 @@ class HcclParser:
link_average_info = HcclParser._calculate_link_value(link_info, calculate_type)
return [communication_cost_average, wait_cost_average, link_average_info]
@staticmethod
def _parser_link_dict(result_dict, src_dst_key, src_dst_value):
    """
    Append the link values of one src-dst entry to the accumulated result.

    Args:
        result_dict (dict): Accumulator shaped ``{src_dst_key: {link_key: [values]}}``,
            updated in place.
        src_dst_key: Key of the src-dst entry being merged.
        src_dst_value (dict): Maps a link key to the value to append.
    """
    links = result_dict.setdefault(src_dst_key, {})
    for link_name, link_cost in src_dst_value.items():
        links.setdefault(link_name, []).append(link_cost)
@staticmethod
def _calculate_link_value(link_info: list, calculate_type):
"""Calculate link average or total value."""
result_dict = dict()
for item in link_info:
for src_dst_key, src_dst_value in item.items():
if src_dst_key not in result_dict.keys():
result_dict[src_dst_key] = dict()
for link_key, link_value in src_dst_value.items():
if link_key not in result_dict[src_dst_key].keys():
result_dict[src_dst_key][link_key] = list()
result_dict[src_dst_key][link_key].append(link_value)
HcclParser._parser_link_dict(result_dict, src_dst_key, src_dst_value)
for src_dst_key, src_dst_value in result_dict.items():
for link_key, link_value in src_dst_value.items():
for link_key, _ in src_dst_value.items():
if calculate_type == 'average':
result_dict[src_dst_key][link_key] = np.mean(result_dict[src_dst_key][link_key], axis=0).tolist()
if calculate_type == 'total':

View File

@ -406,6 +406,7 @@ class Integrator:
Args:
filter_condition (dict): The filter condition.
"""
def _inner_filter(item: list):
return self._default_filter(item, filter_condition)
@ -523,7 +524,7 @@ class BaseTimelineGenerator:
_op_name_idx, _tid_idx, _start_time_idx, _duration_idx = 0, 1, 2, 3
_max_scope_name_num = 0
_host_cpu_process = 11000
_host_cpu_Op_label = 'HostCpuOps'
_host_cpu_op_label = 'HostCpuOps'
def write_timeline(self, size_limit=SIZE_LIMIT_DEFAULT):
"""Load data according to the parsed profiling files."""
@ -612,7 +613,7 @@ class BaseTimelineGenerator:
else:
return
if tid_name == self._host_cpu_Op_label:
if tid_name == self._host_cpu_op_label:
thread_name_meta_data['pid'] = self._host_cpu_process
thread_name_meta_data["tid"] = tid
@ -774,7 +775,7 @@ class GpuTimelineGenerator(BaseTimelineGenerator):
# remove the level of scope name which has a format like "0-conv2-Conv2d".
timeline_dict['name'] = "-".join(op_meta.op_name.split('-')[1:])
timeline_dict['scope_level'] = int(op_meta.op_name.split('-')[0])
elif op_meta.stream_id == self._host_cpu_Op_label:
elif op_meta.stream_id == self._host_cpu_op_label:
timeline_dict['pid'] = self._host_cpu_process
if len(timeline) > 4:
@ -1053,7 +1054,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
# remove the level of scope name which has a format like "0-conv2-Conv2d".
timeline_dict['name'] = "-".join(op_meta.op_name.split('-')[1:])
timeline_dict['scope_level'] = int(op_meta.op_name.split('-')[0])
elif op_meta.stream_id == self._host_cpu_Op_label:
elif op_meta.stream_id == self._host_cpu_op_label:
timeline_dict['pid'] = self._host_cpu_process
if op_meta.pid is None:
@ -1398,7 +1399,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
time_merged_segment_list = []
tid = self._tid_dict[display_name]
for time_item in time_list:
time_segment = list(map(float, time_item[self._start_time_idx:self._duration_idx+1]))
time_segment = list(map(float, time_item[self._start_time_idx:self._duration_idx + 1]))
time_segment[1] += time_segment[0]
if not time_merged_segment_list or \
time_segment[0] > time_merged_segment_list[-1]:

View File

@ -736,37 +736,36 @@ class BottleneckAnalyzer:
continue
if self.op_names[op_id] == "Batch":
pass
else:
in_op_id, out_q = self.__get_non_inline_child_recur(
op_id), self.queue_utilization_pct[op_id]
if in_op_id == self.op_id_not_exist and out_q != self.queue_usage_not_exist:
# This is a leaf node since input queue does not exist and output queue exists
if out_q < self._LEAF_OUTPUT_QUEUE_EMPTY_FREQ_PCT_MAXIMUM:
queue_usage_analysis.append(("Leaf op {} is using {}% of its output queue."
"Setting num_parallel_workers"
">{} might speed up I/O.").format(self.pipeline_ops[op_id],
out_q,
self.num_workers[op_id]))
elif self.op_names[op_id] == "DeviceQueue" and in_op_id != self.op_id_not_exist:
# if this is device_queue op,
if self.queue_empty_freq_pct[in_op_id] > self._DEVICEQUEUE_INPUT_QUEUE_EMPTY_FREQ_PCT_MAXIMUM:
queue_usage_analysis.append((
"{}'s input queue is empty {}% of the time. This might indicate dataset bottlenecks."
" Hence host cannot keep up with the device {}% of the time."
" Device waits whenever input queue is empty.").format(self.pipeline_ops[op_id],
self.queue_empty_freq_pct[in_op_id],
self.queue_empty_freq_pct[in_op_id]))
elif in_op_id != self.op_id_not_exist and out_q != self.queue_usage_not_exist:
in_q = self.queue_utilization_pct[in_op_id]
if in_q != self.queue_usage_not_exist and in_q - out_q > self._IN_OUT_QUEUE_UTIL_PCT_DIFF_MAXIMUM:
queue_usage_analysis.append((
"{}'s input queue usage={}% is greater output queue usage={}%."
" This indicates child op {} might be producing faster than its parent {} can consume."
" If this op has low CPU utilization, try increasing "
"prefetch_size or increasing num_workers.").format(self.pipeline_ops[op_id],
in_q, out_q, self.pipeline_ops[in_op_id],
self.pipeline_ops[op_id]))
continue
in_op_id, out_q = self.__get_non_inline_child_recur(
op_id), self.queue_utilization_pct[op_id]
if in_op_id == self.op_id_not_exist and out_q != self.queue_usage_not_exist:
# This is a leaf node since input queue does not exist and output queue exists
if out_q < self._LEAF_OUTPUT_QUEUE_EMPTY_FREQ_PCT_MAXIMUM:
queue_usage_analysis.append(("Leaf op {} is using {}% of its output queue."
"Setting num_parallel_workers"
">{} might speed up I/O.").format(self.pipeline_ops[op_id],
out_q,
self.num_workers[op_id]))
elif self.op_names[op_id] == "DeviceQueue" and in_op_id != self.op_id_not_exist:
# if this is device_queue op,
if self.queue_empty_freq_pct[in_op_id] > self._DEVICEQUEUE_INPUT_QUEUE_EMPTY_FREQ_PCT_MAXIMUM:
queue_usage_analysis.append((
"{}'s input queue is empty {}% of the time. This might indicate dataset bottlenecks."
" Hence host cannot keep up with the device {}% of the time."
" Device waits whenever input queue is empty.").format(self.pipeline_ops[op_id],
self.queue_empty_freq_pct[in_op_id],
self.queue_empty_freq_pct[in_op_id]))
elif in_op_id != self.op_id_not_exist and out_q != self.queue_usage_not_exist:
in_q = self.queue_utilization_pct[in_op_id]
if in_q != self.queue_usage_not_exist and in_q - out_q > self._IN_OUT_QUEUE_UTIL_PCT_DIFF_MAXIMUM:
queue_usage_analysis.append((
"{}'s input queue usage={}% is greater output queue usage={}%."
" This indicates child op {} might be producing faster than its parent {} can consume."
" If this op has low CPU utilization, try increasing "
"prefetch_size or increasing num_workers.").format(self.pipeline_ops[op_id],
in_q, out_q, self.pipeline_ops[in_op_id],
self.pipeline_ops[op_id]))
return queue_usage_analysis
def analyze_bottleneck(self):

View File

@ -24,6 +24,43 @@ from mindspore.profiler.common.validator.validate_path import \
class MinddataParser:
"""Minddata Aicpu Parser."""
@staticmethod
def parse_step_minddata_aicpu_data(one_step, result):
    """
    Parse one step of mind_data ai_cpu text and append it to ``result``.

    Args:
        one_step (str): The mind_data step info text, fields separated by ", ".
            It is one of two structures.
            Type queue: node_name,queue_size,run_start,run_end
            Type run: node_name,run_start,run_end,queue_size
        result (list): Step info list. One
            ``[node_name, node_start, node_end, queue_size]`` entry is appended
            for every non-empty step. A step matching neither structure keeps
            the defaults ``0, 0, 0``.
    """
    if not one_step:
        return

    def _to_int(text):
        # Purely numeric text becomes an int; anything else is kept as text.
        return int(text) if text.isdigit() else text

    node_info = one_step.split(", ")
    # str.split never returns an empty list, so item 0 always exists.
    node_name = node_info[0].replace("Node:", "")
    node_start, node_end, queue_size = 0, 0, 0

    # (run_start, run_end, queue_size) texts, set only for a recognised structure.
    fields = None
    if len(node_info) > 3:
        if "queue" in node_info[1]:
            fields = (node_info[2], node_info[3], node_info[1])
        elif "Run" in node_info[1]:
            fields = (node_info[1], node_info[2], node_info[3])

    # Convert only parsed text: the integer defaults have no isdigit(), so an
    # unrecognised structure must skip the conversion instead of raising.
    if fields is not None:
        node_start = _to_int(fields[0].replace("Run start:", ""))
        node_end = _to_int(fields[1].replace("Run end:", ""))
        queue_size = _to_int(fields[2].replace("queue size:", ""))

    result.append([node_name, node_start, node_end, queue_size])
@staticmethod
def parse_minddata_aicpu_data(minddata_aicpu_source_path):
"""
@ -42,29 +79,7 @@ class MinddataParser:
source_data = source_data_file.read()
step_data = source_data.split("\x00")
for one_step in step_data:
if one_step:
node_info = one_step.split(", ")
node_name, node_start, node_end, queue_size = "", 0, 0, 0
if node_info:
node_name = node_info[0].replace("Node:", "")
if len(node_info) > 3 and "queue" in node_info[1]:
queue_size = node_info[1].replace("queue size:", "")
queue_size = int(queue_size) if queue_size.isdigit() else queue_size
node_start = node_info[2].replace("Run start:", "")
node_start = int(node_start) if node_start.isdigit() else node_start
node_end = node_info[3].replace("Run end:", "")
node_end = int(node_end) if node_end.isdigit() else node_end
elif len(node_info) > 3 and "Run" in node_info[1]:
queue_size = node_info[3].replace("queue size:", "")
queue_size = int(queue_size) if queue_size.isdigit() else queue_size
node_start = node_info[1].replace("Run start:", "")
node_start = int(node_start) if node_start.isdigit() else node_start
node_end = node_info[2].replace("Run end:", "")
node_end = int(node_end) if node_end.isdigit() else node_end
one_step_list = [node_name, node_start, node_end, queue_size]
result.append(one_step_list)
MinddataParser.parse_step_minddata_aicpu_data(one_step, result)
except OSError:
logger.error("Open get_next profiling file error.")

View File

@ -189,6 +189,16 @@ class BaseStepTraceParser:
except ValueError as err:
log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
step_trace_files = []
else:
training_trace_files = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
if len(training_trace_files) >= 1:
log.warning("The training_trace file structure is changed, please upgrade "
"mindspore and regenerate profiling data")
file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
log.info("Find %d step trace files.", len(file_paths))
@ -366,6 +376,19 @@ class BaseStepTraceParser:
class GpuStepTraceParser(BaseStepTraceParser):
"""The parser for gpu step trace data."""
def get_fp_bp(self, f_obj, all_step_fp, all_step_bp):
    """
    Read the fp and bp points from an opened step trace source file.

    Args:
        f_obj (file): Opened text file object of the step trace source.
        all_step_fp (list): Receives the fp point of every parsed step.
        all_step_bp (list): Receives the bp point of every parsed step.
    """
    if not self._is_gpu_kernel_async_launch:
        # Only the first whitespace-separated token of the first line (fp)
        # and of the second line (bp) is read.
        content = f_obj.readlines()
        all_step_fp.append(content[0].split()[0])
        all_step_bp.append(content[1].split()[0])
        return

    # Async launch: every line is one step. The 2nd and 3rd whitespace-separated
    # parts hold the fp and bp records, and the text before the first comma is kept.
    for raw_line in f_obj:
        parts = raw_line.strip().split()
        all_step_fp.append(parts[1].split(',')[0])
        all_step_bp.append(parts[2].split(',')[0])
def record_point_info(self, source_file, output_path):
"""
Record point info into json.
@ -377,21 +400,12 @@ class GpuStepTraceParser(BaseStepTraceParser):
Returns:
dict, parsed point info.
"""
fp_start, bp_end = 0, 1
all_step_points = []
all_step_fp = []
all_step_bp = []
try:
with open(source_file, 'r') as f_obj:
if self._is_gpu_kernel_async_launch:
for line in f_obj:
line = line.strip().split()
all_step_fp.append(line[1].split(',')[0])
all_step_bp.append(line[2].split(',')[0])
else:
lines = f_obj.readlines()
all_step_fp.append(lines[fp_start].split()[0])
all_step_bp.append(lines[bp_end].split()[0])
self.get_fp_bp(f_obj, all_step_fp, all_step_bp)
except (IOError, OSError) as err:
log.warning(f'Failed to read {source_file}', err)
raise ProfilerIOException
@ -478,6 +492,42 @@ class GpuStepTraceParser(BaseStepTraceParser):
self._record_average_info()
log.info("Finish to parse step trace file.")
def _parse_one_step(self, line):
"""
Parse one step trace text line into a dict and record it as a trace event.
Args:
line (str): The step trace line text; its parts are separated by whitespace
and every part is a comma-separated record.
part 1: start_op_name,start_op_time
part 2: fp_op_name,fp_time
part 3: bp_op_name,bp_time
part 4: end_op_name,end_time
part 5 and later: one record per reduce op; the fields at index 1 and 2 of
each record are collected (presumably the reduce start and end time).
"""
line = line.strip().split()
# Field 1 of each record is the time text; [:-1] drops its last character
# before the integer conversion.
start_time = int(line[0].split(',')[1][:-1])
fp_time = int(line[1].split(',')[1][:-1])
bp_time = int(line[2].split(',')[1][:-1])
end_time = int(line[3].split(',')[1][:-1])
reduce_info = {}
reduce_time_info = []
for reduce_item in line[4:]:
# add communication op start and end time, time unit from ns to 10ns.
# Unlike the four times above, these values are kept as text.
reduce_time_info.append(reduce_item.split(',')[1][:-1])
reduce_time_info.append(reduce_item.split(',')[2][:-1])
step_trace = {
'start': start_time,
'fp': fp_time,
'bp': bp_time,
'end': end_time
}
# NOTE(review): indentation is lost in this view; confirm whether the
# 'reduce' assignment below is conditional on reduce_time_info.
if reduce_time_info:
reduce_info['ops'] = reduce_time_info
step_trace['reduce'] = reduce_info
self._record_trace_event(step_trace)
def _parse_async_launch(self, source_file):
"""Parse source step trace files generated from async launch kernel."""
log.info("Start to parse step trace file.")
@ -486,27 +536,7 @@ class GpuStepTraceParser(BaseStepTraceParser):
try:
with open(source_file, 'r') as f_obj:
for line in f_obj:
line = line.strip().split()
start_time = int(line[0].split(',')[1][:-1])
fp_time = int(line[1].split(',')[1][:-1])
bp_time = int(line[2].split(',')[1][:-1])
end_time = int(line[3].split(',')[1][:-1])
reduce_info = {}
reduce_time_info = []
for reduce_item in line[4:]:
# add communication op start and end time, time unit from ns to 10ns.
reduce_time_info.append(reduce_item.split(',')[1][:-1])
reduce_time_info.append(reduce_item.split(',')[2][:-1])
step_trace = {
'start': start_time,
'fp': fp_time,
'bp': bp_time,
'end': end_time
}
if reduce_time_info:
reduce_info['ops'] = reduce_time_info
step_trace['reduce'] = reduce_info
self._record_trace_event(step_trace)
self._parse_one_step(line)
except (IOError, OSError) as err:
log.warning(f'Failed to read {source_file}', err)

View File

@ -135,8 +135,8 @@ class Profiler:
self._cpu_profiler.init(self._output_path)
self._cpu_profiler.step_profiling_enable(True)
if self._device_target and self._device_target == "GPU":
GPUProfiler = c_expression.GPUProfiler
self._gpu_profiler = GPUProfiler.get_instance()
gpu_profiler = c_expression.GPUProfiler
self._gpu_profiler = gpu_profiler.get_instance()
self._gpu_profiler.init(self._output_path)
self._gpu_profiler.step_profiling_enable(True)
if GlobalComm.WORLD_COMM_GROUP == "nccl_world_group":