parsing the profiling data on ai cpu in binary

This commit is contained in:
gaocongli 2021-12-22 11:59:47 +08:00
parent 393b5ec11a
commit b9187f96da
3 changed files with 83 additions and 108 deletions
mindspore/python/mindspore/profiler
tests/ut/python/profiler/parser

View File

@ -1,4 +1,4 @@
# Copyright 2020 Huawei Technologies Co., Ltd
# Copyright 2020-2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,9 +17,48 @@ The parser for AI CPU preprocess data.
"""
import os
import stat
from collections import namedtuple
import struct
from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger
from mindspore.profiler.common.struct_type import StructType
AiCpuStruct = namedtuple(
'AiCpuStruct', ['magic_number', 'data_tag', 'stream_id', 'task_id', 'run_start', 'run_start_counter',
'compute_start', 'memcpy_start', 'memcpy_end', 'run_end', 'run_end_counter', 'thread',
'device', 'submit_tick', 'schedule_tick', 'tick_before_run', 'tick_after_fun', 'kernel_type',
'dispatch_time', 'total_time', 'FFTS_thread_id', 'version']
)
AI_CPU_STRUCT = dict(
magic_number=StructType.UINT16,
data_tag=StructType.UINT16,
stream_id=StructType.UINT16,
task_id=StructType.UINT16,
run_start=StructType.UINT64,
run_start_counter=StructType.UINT64,
compute_start=StructType.UINT64,
memcpy_start=StructType.UINT64,
memcpy_end=StructType.UINT64,
run_end=StructType.UINT64,
run_end_counter=StructType.UINT64,
thread=StructType.UINT32,
device=StructType.UINT32,
submit_tick=StructType.UINT64,
schedule_tick=StructType.UINT64,
tick_before_run=StructType.UINT64,
tick_after_fun=StructType.UINT64,
kernel_type=StructType.UINT32,
dispatch_time=StructType.UINT32,
total_time=StructType.UINT32,
FFTS_thread_id=StructType.UINT16,
version=StructType.UCHAR
)
class DataPreProcessParser:
@ -38,8 +77,10 @@ class DataPreProcessParser:
'dispatch_time(ms)', 'execution_time(ms)', 'run_start',
'run_end']
_ms_unit = 1000
_us_unit = 100 # Convert 10ns to 1us.
_task_id_threshold = 25000
def __init__(self, input_path, output_filename):
def __init__(self, input_path, output_filename, op_task_dict):
self._input_path = input_path
self._output_filename = output_filename
self._source_file_name = self._get_source_file()
@ -51,6 +92,8 @@ class DataPreProcessParser:
self._total_time_index = 6
self._result_list = []
self._min_cycle_counter = float('inf')
self._ai_cpu_len = 128
self._op_task_dict = op_task_dict
def _get_source_file(self):
"""Get log file name, which was created by ada service."""
@ -100,37 +143,13 @@ class DataPreProcessParser:
return
with open(self._source_file_name, 'rb') as ai_cpu_data:
ai_cpu_str = str(ai_cpu_data.read().replace(b'\n\x00', b' ___ ')
.replace(b'\x00', b' ___ '))[2:-1]
ai_cpu_lines = ai_cpu_str.split(" ___ ")
os.chmod(self._source_file_name, stat.S_IREAD | stat.S_IWRITE)
result_list = list()
ai_cpu_total_time_summary = 0
# Node serial number.
serial_number = 1
for i in range(len(ai_cpu_lines) - 1):
node_line = ai_cpu_lines[i]
thread_line = ai_cpu_lines[i + 1]
if "Node" in node_line and "Thread" in thread_line:
# Get the node data from node_line
result = self._get_kernel_result(
serial_number,
node_line.split(','),
thread_line.split(',')
)
content = ai_cpu_data.read()
if content[0:2].hex().upper() == "5A5A":
ai_cpu_total_time_summary, result_list = self.parser_binary_file(content)
else:
raise Exception("The data of profiler needs to be parsed using MindSpore r1.5 version")
if result is None:
continue
result_list.append(result)
# Calculate the total time.
total_time = result[2]
ai_cpu_total_time_summary += total_time
# Increase node serial number.
serial_number += 1
elif "Node" in node_line and "Thread" not in thread_line:
node_type_name = node_line.split(',')[0].split(':')[-1]
logger.warning("The node type:%s cannot find thread data", node_type_name)
os.chmod(self._source_file_name, stat.S_IREAD)
if result_list:
ai_cpu_total_time = format(ai_cpu_total_time_summary, '.6f')
@ -141,6 +160,37 @@ class DataPreProcessParser:
# For timeline display.
self._result_list = result_list
def parser_binary_file(self, content):
"""Parse binary format file."""
result_list = list()
ai_cpu_total_time_summary = 0
# Node serial number.
serial_number = 1
i = 0
ai_cpu_format = StructType.format(AI_CPU_STRUCT.values())
ai_cpu_size = StructType.sizeof(AI_CPU_STRUCT.values())
while i < len(content):
ai_cpu_data = struct.unpack(ai_cpu_format, content[i:i + ai_cpu_size])
ai_cpu = AiCpuStruct(*ai_cpu_data)
if ai_cpu.task_id < self._task_id_threshold:
node_type_name = f'{ai_cpu.stream_id}_{ai_cpu.task_id}'
if self._op_task_dict:
node_type_name = self._op_task_dict[node_type_name].split('/')[-1]
exe_time = (float(ai_cpu.run_end) - float(ai_cpu.run_start)) / self._ms_unit
total_time = ai_cpu.total_time / self._ms_unit
result_list.append([serial_number, node_type_name, total_time, ai_cpu.dispatch_time / self._ms_unit,
exe_time, ai_cpu.run_start_counter / self._us_unit,
ai_cpu.run_end_counter / self._us_unit])
ai_cpu_total_time_summary += total_time
# Increase node serial number.
serial_number += 1
i = i + self._ai_cpu_len
return ai_cpu_total_time_summary, result_list
def query_aicpu_data(self):
"""
Get execution time of AI CPU operator.

View File

@ -348,7 +348,7 @@ class Profiler:
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._rank_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
output_data_preprocess_aicpu = validate_and_normalize_path(output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu, op_task_dict)
logger.info("Profiling: analyzing the data preprocess data.")
aicpu_data_parser.execute()

View File

@ -1,75 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the aicpu parser."""
import os
import tempfile
import shutil
from unittest import TestCase
from mindspore.profiler.parser.aicpu_data_parser import DataPreProcessParser
def get_result(file_path):
"""
Get result from the aicpu file.
Args:
file_path (str): The aicpu file path.
Returns:
list[list], the parsed aicpu information.
"""
result = []
file = None
try:
file = open(file_path, 'r')
result.append(file.read())
return result
finally:
if file:
file.close()
class TestAicpuParser(TestCase):
"""Test the class of Aicpu Parser."""
def setUp(self) -> None:
"""Initialization before test case execution."""
self.profiling_dir = os.path.realpath(os.path.join(os.path.dirname(__file__),
'../../../data/profiler_data/'
'JOB_AICPU/data'))
self.expect_dir = os.path.realpath(os.path.join(os.path.dirname(__file__),
'../../../data/profiler_data/'
'JOB_AICPU/expect'))
self.output_path = tempfile.mkdtemp(prefix='output_data_preprocess_aicpu_')
self.output_file = os.path.join(self.output_path, 'output_data_preprocess_aicpu_0.txt')
self.expect_file = os.path.join(self.expect_dir, 'output_data_preprocess_aicpu_0.txt')
def test_aicpu_parser(self):
"""Test the class of Aicpu Parser."""
data = DataPreProcessParser(self.profiling_dir, self.output_file)
data.execute()
expect_result = get_result(self.expect_file)
result = get_result(self.output_file)
shutil.rmtree(self.output_path)
assert expect_result == result
def test_aicpu_parser_file_not_exist(self):
"""Test the class of Aicpu Parser."""
profiling_dir = os.path.realpath(os.path.join(self.profiling_dir, 'data'))
data = DataPreProcessParser(profiling_dir, self.output_file)
data.execute()
shutil.rmtree(self.output_path)