!3075 Add profiler module.

Merge pull request !3075 from yuximiao/yuximiao_mindspore_profiler
This commit is contained in:
mindspore-ci-bot 2020-07-20 11:00:32 +08:00 committed by Gitee
commit be2cfa9ed6
22 changed files with 4136 additions and 0 deletions

View File

@ -216,6 +216,7 @@ install(
${CMAKE_SOURCE_DIR}/mindspore/common ${CMAKE_SOURCE_DIR}/mindspore/common
${CMAKE_SOURCE_DIR}/mindspore/ops ${CMAKE_SOURCE_DIR}/mindspore/ops
${CMAKE_SOURCE_DIR}/mindspore/communication ${CMAKE_SOURCE_DIR}/mindspore/communication
${CMAKE_SOURCE_DIR}/mindspore/profiler
DESTINATION ${INSTALL_PY_DIR} DESTINATION ${INSTALL_PY_DIR}
COMPONENT mindspore COMPONENT mindspore
) )

View File

@ -0,0 +1,27 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Profiler Module Introduction.
This module provides Python APIs to enable the profiling of MindSpore neural networks.
Users can import the mindspore.profiler.Profiler, initialize the Profiler object to start profiling,
and use Profiler.analyse() to stop profiling and analyse the results.
To visualize the profiling results, users can open mindspore Web, find the corresponding run
and click the profile link.
Now, Profiler supports the AICore operator analysis.
"""
from mindspore.profiler.profiling import Profiler
__all__ = ["Profiler"]

View File

@ -0,0 +1,14 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

View File

@ -0,0 +1,14 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

View File

@ -0,0 +1,85 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler error code and messages."""
from enum import unique, Enum
_GENERAL_MASK = 0b00001 << 7
_PARSER_MASK = 0b00010 << 7
_ANALYSER_MASK = 0b00011 << 7
class ProfilerMgrErrors(Enum):
"""Enum definition for profiler errors"""
@unique
class ProfilerErrors(ProfilerMgrErrors):
"""Profiler error codes."""
# general error code
PARAM_VALUE_ERROR = 0 | _GENERAL_MASK
PATH_ERROR = 1 | _GENERAL_MASK
PARAM_TYPE_ERROR = 2 | _GENERAL_MASK
DIR_NOT_FOUND_ERROR = 3 | _GENERAL_MASK
FILE_NOT_FOUND_ERROR = 4 | _GENERAL_MASK
IO_ERROR = 5 | _GENERAL_MASK
# parser error code
DEVICE_ID_MISMATCH_ERROR = 0 | _PARSER_MASK
RAW_FILE_ERROR = 1 | _PARSER_MASK
STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK
JOB_ID_MISMATCH_ERROR = 3 | _PARSER_MASK
# analyser error code
COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK
ANALYSER_NOT_EXIST_ERROR = 1 | _ANALYSER_MASK
DEVICE_ID_ERROR = 2 | _ANALYSER_MASK
OP_TYPE_ERROR = 3 | _ANALYSER_MASK
GROUP_CONDITION_ERROR = 4 | _ANALYSER_MASK
SORT_CONDITION_ERROR = 5 | _ANALYSER_MASK
FILTER_CONDITION_ERROR = 6 | _ANALYSER_MASK
COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK
PIPELINE_OP_NOT_EXIST_ERROR = 8 | _ANALYSER_MASK
@unique
class ProfilerErrorMsg(Enum):
"""Profiler error messages."""
# general error msg
PARAM_VALUE_ERROR = 'Param value error. {}'
PATH_ERROR = 'Path error. {}'
PARAM_TYPE_ERROR = 'Param type error. {}'
DIR_NOT_FOUND_ERROR = 'The dir <{}> not found.'
FILE_NOT_FOUND_ERROR = 'The file <{}> not found.'
IO_ERROR = 'Read or write file fail.'
# parser error msg
DEVICE_ID_MISMATCH_ERROR = 'The device ID mismatch.'
RAW_FILE_ERROR = 'Raw file error. {}'
STEP_NUM_NOT_SUPPORTED_ERROR = 'The step num must be in {}'
JOB_ID_MISMATCH_ERROR = 'The job id in the parameter is not the same as ' \
'in the training trace file. '
# analyser error msg
COLUMN_NOT_EXIST_ERROR = 'The column {} does not exist.'
ANALYSER_NOT_EXIST_ERROR = 'The analyser {} does not exist.'
DEIVICE_ID_ERROR = 'The device_id in search_condition error, {}'
FILTER_CONDITION_ERROR = 'The filter_condition in search_condition error, {}'
OP_TYPE_ERROR = 'The op_type in search_condition error, {}'
GROUP_CONDITION_ERROR = 'The group_condition in search_condition error, {}'
SORT_CONDITION_ERROR = 'The sort_condition in search_condition error, {}'
COLUMN_NOT_SUPPORT_SORT_ERROR = 'The column {} does not support to sort.'
PIPELINE_OP_NOT_EXIST_ERROR = 'The minddata pipeline operator {} does not exist.'

View File

@ -0,0 +1,287 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Definition of error code and relative messages in profiler module."""
from mindspore.profiler.common.exceptions.error_code import ProfilerErrors, \
ProfilerErrorMsg
class ProfilerException(Exception):
"""
Base class for Profilier exception.
Examples:
>>> raise ProfilerException(GeneralErrors.PATH_NOT_EXISTS_ERROR, 'path not exists')
"""
RUNTIME = 1
TYPE = 1
LEVEL = 0
SYSID = 42
def __init__(self, error, message, http_code=500):
"""
Initialization of ProfilerException.
Args:
error (Enum): Error value for specified case.
message (str): Description for exception.
http_code (int): Http code for exception. Default is 500.
"""
if isinstance(message, str):
message = ' '.join(message.split())
super(ProfilerException, self).__init__(message)
self.error = error
self.message = message
self.http_code = http_code
@property
def error_code(self):
"""
Transform exception no to Profiler error code.
code compose(4bytes):
runtime 2bits, type 2bits, level 3bits, sysid 8bits, modid 5bits, value 12bits.
num = ((0xFF & runtime) << 30) \
| ((0xFF & type) << 28) \
| ((0xFF & level) << 25) \
| ((0xFF & sysid) << 17) \
| ((0xFF & modid) << 12) \
| (0x0FFF & value)
Returns:
str, Hex string representing the composed Profiler error code.
"""
num = (((0xFF & self.RUNTIME) << 30)
| ((0xFF & self.TYPE) << 28)
| ((0xFF & self.LEVEL) << 25)
| ((0xFF & self.SYSID) << 17)
| ((0xFF & 6) << 12)
| (0x0FFF & self.error.value))
return hex(num)[2:].zfill(8).upper()
def __str__(self):
return '[{}] code: {}, msg: {}'.format(self.__class__.__name__, self.error_code, self.message)
class ProfilerParamValueErrorException(ProfilerException):
"""The parameter value error in profiler module."""
def __init__(self, msg):
super(ProfilerParamValueErrorException, self).__init__(
error=ProfilerErrors.PARAM_VALUE_ERROR,
message=ProfilerErrorMsg.PARAM_VALUE_ERROR.value.format(msg),
http_code=400
)
class ProfilerPathErrorException(ProfilerException):
"""The path error in profiler module."""
def __init__(self, msg):
super(ProfilerPathErrorException, self).__init__(
error=ProfilerErrors.PATH_ERROR,
message=ProfilerErrorMsg.PATH_ERROR.value.format(msg),
http_code=400
)
class ProfilerParamTypeErrorException(ProfilerException):
"""The parameter type error in profiler module."""
def __init__(self, msg):
super(ProfilerParamTypeErrorException, self).__init__(
error=ProfilerErrors.PARAM_TYPE_ERROR,
message=ProfilerErrorMsg.PARAM_TYPE_ERROR.value.format(msg),
http_code=400
)
class ProfilerDirNotFoundException(ProfilerException):
"""The dir not found exception in profiler module."""
def __init__(self, msg):
super(ProfilerDirNotFoundException, self).__init__(
error=ProfilerErrors.DIR_NOT_FOUND_ERROR,
message=ProfilerErrorMsg.DIR_NOT_FOUND_ERROR.value.format(msg),
http_code=400
)
class ProfilerFileNotFoundException(ProfilerException):
"""The file not found exception in profiler module."""
def __init__(self, msg):
super(ProfilerFileNotFoundException, self).__init__(
error=ProfilerErrors.FILE_NOT_FOUND_ERROR,
message=ProfilerErrorMsg.FILE_NOT_FOUND_ERROR.value.format(msg),
http_code=400
)
class ProfilerIOException(ProfilerException):
"""The IO exception in profiler module."""
def __init__(self):
super(ProfilerIOException, self).__init__(
error=ProfilerErrors.IO_ERROR,
message=ProfilerErrorMsg.IO_ERROR.value,
http_code=400
)
class ProfilerDeviceIdMismatchException(ProfilerException):
"""The device id mismatch exception in profiler module."""
def __init__(self):
super(ProfilerDeviceIdMismatchException, self).__init__(
error=ProfilerErrors.DEVICE_ID_MISMATCH_ERROR,
message=ProfilerErrorMsg.DEVICE_ID_MISMATCH_ERROR.value,
http_code=400
)
class ProfilerRawFileException(ProfilerException):
"""The raw file exception in profiler module."""
def __init__(self, msg):
super(ProfilerRawFileException, self).__init__(
error=ProfilerErrors.RAW_FILE_ERROR,
message=ProfilerErrorMsg.RAW_FILE_ERROR.value.format(msg),
http_code=400
)
class ProfilerColumnNotExistException(ProfilerException):
"""The column does not exist exception in profiler module."""
def __init__(self, msg):
super(ProfilerColumnNotExistException, self).__init__(
error=ProfilerErrors.COLUMN_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.COLUMN_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
class ProfilerAnalyserNotExistException(ProfilerException):
"""The analyser in profiler module."""
def __init__(self, msg):
super(ProfilerAnalyserNotExistException, self).__init__(
error=ProfilerErrors.ANALYSER_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.ANALYSER_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
class ProfilerDeviceIdException(ProfilerException):
"""The parameter device_id error in profiler module."""
def __init__(self, msg):
super(ProfilerDeviceIdException, self).__init__(
error=ProfilerErrors.DEVICE_ID_ERROR,
message=ProfilerErrorMsg.DEIVICE_ID_ERROR.value.format(msg),
http_code=400
)
class ProfilerOpTypeException(ProfilerException):
"""The parameter op_type error in profiler module."""
def __init__(self, msg):
super(ProfilerOpTypeException, self).__init__(
error=ProfilerErrors.OP_TYPE_ERROR,
message=ProfilerErrorMsg.OP_TYPE_ERROR.value.format(msg),
http_code=400
)
class ProfilerSortConditionException(ProfilerException):
"""The parameter sort_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerSortConditionException, self).__init__(
error=ProfilerErrors.SORT_CONDITION_ERROR,
message=ProfilerErrorMsg.SORT_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerFilterConditionException(ProfilerException):
"""The parameter filer_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerFilterConditionException, self).__init__(
error=ProfilerErrors.FILTER_CONDITION_ERROR,
message=ProfilerErrorMsg.FILTER_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerGroupConditionException(ProfilerException):
"""The parameter group_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerGroupConditionException, self).__init__(
error=ProfilerErrors.GROUP_CONDITION_ERROR,
message=ProfilerErrorMsg.GROUP_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerColumnNotSupportSortException(ProfilerException):
"""The column does not support to sort error in profiler module."""
def __init__(self, msg):
super(ProfilerColumnNotSupportSortException, self).__init__(
error=ProfilerErrors.COLUMN_NOT_SUPPORT_SORT_ERROR,
message=ProfilerErrorMsg.COLUMN_NOT_SUPPORT_SORT_ERROR.value.format(msg),
http_code=400
)
class StepNumNotSupportedException(ProfilerException):
"""The step number error in profiler module."""
def __init__(self, msg):
super(StepNumNotSupportedException, self).__init__(
error=ProfilerErrors.STEP_NUM_NOT_SUPPORTED_ERROR,
message=ProfilerErrorMsg.STEP_NUM_NOT_SUPPORTED_ERROR.value.format(msg),
http_code=400
)
class JobIdMismatchException(ProfilerException):
"""The Job ID mismatch error in profiler module."""
def __init__(self):
super(JobIdMismatchException, self).__init__(
error=ProfilerErrors.JOB_ID_MISMATCH_ERROR,
message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value,
http_code=400
)
class ProfilerPipelineOpNotExistException(ProfilerException):
"""The minddata pipeline operator does not exist error in profiler module."""
def __init__(self, msg):
super(ProfilerPipelineOpNotExistException, self).__init__(
error=ProfilerErrors.PIPELINE_OP_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.PIPELINE_OP_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)

View File

@ -0,0 +1,295 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Profiler util.
This module provides the utils.
"""
import os
# one sys count takes 10 ns, 1 ms has 100000 system count
import re
PER_MS_SYSCNT = 100000
def to_int(param, param_name):
"""
Transfer param to int type.
Args:
param (Any): A param transformed.
param_name (str): Param name.
Returns:
int, value after transformed.
"""
try:
param = int(param)
except ValueError:
raise TypeError('Must be Integer: ' + param_name)
return param
def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
"""
Write data to the output file.
Args:
output_data_path (str): The output file path of the data.
data_source (str, list, tuple): The data to write.
is_print (bool): whether to print the data to stdout.
is_start (bool): Whether is the first line of the output file, will remove the old file if True."
"""
if is_start is True and os.path.exists(output_data_path):
os.remove(output_data_path)
if isinstance(data_source, str) and data_source.startswith("title:"):
title_label = '=' * 20
data_source = title_label + data_source[6:] + title_label
with open(output_data_path, 'a+') as f:
if isinstance(data_source, (list, tuple)):
for raw_data in data_source:
if isinstance(raw_data, (list, tuple)):
raw_data = map(str, raw_data)
raw_data = " ".join(raw_data)
f.write(raw_data)
f.write("\n")
else:
f.write(data_source)
f.write("\n")
if is_print:
if isinstance(data_source, (list, tuple)):
for raw_data in data_source:
if isinstance(raw_data, (list, tuple)):
raw_data = map(str, raw_data)
raw_data = " ".join(raw_data)
print(raw_data)
else:
print(data_source)
def get_log_slice_id(file_name):
pattern = re.compile(r'(?<=slice_)\d+')
slice_list = pattern.findall(file_name)
index = re.findall(r'\d+', slice_list[0])
return int(index[0])
def get_file_join_name(input_path, file_name):
"""
Search files under the special path, and will join all the files to one file.
Args:
input_path (str): The source path, will search files under it.
file_name (str): The target of the filename, such as 'hwts.log.data.45.dev'.
Returns:
str, the join file name.
"""
name_list = []
file_join_name = ''
input_path = os.path.realpath(input_path)
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') and not f.endswith('.join') \
and not f.endswith('.zip'):
name_list.append(f)
# resort name_list
name_list.sort(key=get_log_slice_id)
if len(name_list) == 1:
file_join_name = os.path.join(input_path, name_list[0])
elif len(name_list) > 1:
file_join_name = os.path.join(input_path, '%s.join' % file_name)
if os.path.exists(file_join_name):
os.remove(file_join_name)
with open(file_join_name, 'ab') as bin_data:
for i in name_list:
file = input_path + os.sep + i
with open(file, 'rb') as txt:
bin_data.write(txt.read())
return file_join_name
def get_file_names(input_path, file_name):
"""
Search files under the special path.
Args:
input_path (str): The source path, will search files under it.
file_name (str): The target of the filename, such as 'host_start_log'.
Returns:
list, file name list.
"""
input_path = os.path.realpath(input_path)
name_list = []
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') \
and not f.endswith('.zip'):
name_list.append(f)
break
return name_list
def analyse_device_list_from_profiler_dir(profiler_dir):
"""
Analyse device list from profiler dir.
Args:
profiler_dir (str): The profiler data dir.
Returns:
list, the device_id list.
"""
profiler_file_prefix = ["timeline_display", "output_op_compute_time"]
device_id_list = set()
for _, _, filenames in os.walk(profiler_dir):
for filename in filenames:
if filename.startswith("step_trace_raw"):
items = filename.split("_")
device_num = ""
if len(items) > 3:
device_num = items[3]
else:
items = filename.split("_")
device_num = items[-1].split(".")[0] if items[-1].split(".") else ""
if device_num.isdigit() and '_'.join(items[:-1]) in profiler_file_prefix:
device_id_list.add(device_num)
return sorted(list(device_id_list))
def query_latest_trace_time_file(profiler_dir, device_id=0):
"""
Query the latest trace time file.
Args:
profiler_dir (str): The profiler directory.
device_id (int): The id of device.
Returns:
str, the latest trace time file path.
"""
files = os.listdir(profiler_dir)
target_file = f'step_trace_raw_{device_id}_detail_time.csv'
try:
latest_file = max(
filter(
lambda file: file == target_file,
files
),
key=lambda file: os.stat(os.path.join(profiler_dir, file)).st_mtime
)
except ValueError:
return None
return os.path.join(profiler_dir, latest_file)
def query_step_trace_file(profiler_dir):
"""
Query for all step trace file.
Args:
profiler_dir (str): The directory that contains all step trace files.
Returns:
str, the file path of step trace time.
"""
files = os.listdir(profiler_dir)
training_trace_file = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
if training_trace_file:
return os.path.join(profiler_dir, training_trace_file[0])
return None
def get_summary_for_step_trace(average_info, header):
"""The property of summary info."""
if not average_info or not header:
return {}
total_time = get_field_value(average_info, 'total', header)
iteration_interval = get_field_value(average_info, 'iteration_interval',
header)
fp_and_bp = get_field_value(average_info, 'fp_and_bp', header)
tail = get_field_value(average_info, 'tail', header)
summary = {
'total_time': total_time,
'iteration_interval': iteration_interval,
'iteration_interval_percent': calculate_percent(iteration_interval, total_time),
'fp_and_bp': fp_and_bp,
'fp_and_bp_percent': calculate_percent(fp_and_bp, total_time),
'tail': tail,
'tail_percent': calculate_percent(tail, total_time)
}
return summary
def calculate_percent(partial, total):
"""Calculate percent value."""
if total:
percent = round(partial / total * 100, 2)
else:
percent = 0
return f'{percent}%'
def to_millisecond(sys_count, limit=4):
"""Translate system count to millisecond."""
return round(sys_count / PER_MS_SYSCNT, limit)
def get_field_value(row_info, field_name, header, time_type='realtime'):
"""
Extract basic info through row_info.
Args:
row_info (list): The list of data info in one row.
field_name (str): The name in header.
header (list[str]): The list of field names.
time_type (str): The type of value, `realtime` or `systime`. Default: `realtime`.
Returns:
dict, step trace info in dict format.
"""
field_index = header.index(field_name)
value = row_info[field_index]
value = to_int(value, field_name)
if time_type == 'realtime':
value = to_millisecond(value)
return value
def get_options(options):
if options is None:
options = {}
return options

View File

@ -0,0 +1,14 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

View File

@ -0,0 +1,26 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler check parameters."""
def check_bool(input_param, param_name):
"""Bool type judgment."""
if isinstance(input_param, bool):
return input_param
raise TypeError("Parameter {}: input type must be bool!".format(param_name))
def check_subgraph(subgraph):
"""Check subgraph."""
if subgraph in ("all", "Default", "Gradients"):
return subgraph
raise ValueError("subgraph must be all or Default or Gradients, but got {}.".format(subgraph))

View File

@ -0,0 +1,307 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Validate the profiler parameters."""
import os
import sys
from mindspore.profiler.common.exceptions.exceptions import ProfilerParamTypeErrorException, \
ProfilerDeviceIdException, ProfilerOpTypeException, \
ProfilerSortConditionException, ProfilerFilterConditionException, \
ProfilerGroupConditionException, ProfilerParamValueErrorException
from mindspore import log
from mindspore.profiler.common.util import to_int
AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent"]
AICORE_DETAIL_COL = ["op_name", "op_type", "avg_execution_time", "subgraph", "full_op_name"]
AICPU_COL = ["serial_number", "op_type", "total_time", "dispatch_time", "run_start",
"run_end"]
MINDDATA_PIPELINE_COL = [
'op_id', 'op_type', 'num_workers', 'output_queue_average_size',
'output_queue_length', 'output_queue_usage_rate', 'sample_interval',
'parent_id'
]
def validate_condition(search_condition):
"""
Verify the param in search_condition is valid or not.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerParamTypeErrorException: If the type of the param in search_condition is invalid.
ProfilerDeviceIdException: If the device_id param in search_condition is invalid.
ProfilerOpTypeException: If the op_type param in search_condition is invalid.
ProfilerGroupConditionException: If the group_condition param in search_condition is invalid.
ProfilerSortConditionException: If the sort_condition param in search_condition is invalid.
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
if not isinstance(search_condition, dict):
log.error("Invalid search_condition type, it should be dict.")
raise ProfilerParamTypeErrorException(
"Invalid search_condition type, it should be dict.")
if "device_id" in search_condition:
device_id = search_condition.get("device_id")
if not isinstance(device_id, str):
raise ProfilerDeviceIdException("Invalid device_id type, it should be str.")
if "op_type" in search_condition:
op_type = search_condition.get("op_type")
if op_type == "aicpu":
search_scope = AICPU_COL
elif op_type == "aicore_type":
search_scope = AICORE_TYPE_COL
elif op_type == "aicore_detail":
search_scope = AICORE_DETAIL_COL
else:
raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']")
else:
raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']")
if "group_condition" in search_condition:
validate_group_condition(search_condition)
if "sort_condition" in search_condition:
validate_sort_condition(search_condition, search_scope)
if "filter_condition" in search_condition:
validate_filter_condition(search_condition)
def validate_group_condition(search_condition):
"""
Verify the group_condition in search_condition is valid or not.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerGroupConditionException: If the group_condition param in search_condition is invalid.
"""
group_condition = search_condition.get("group_condition")
if not isinstance(group_condition, dict):
raise ProfilerGroupConditionException("The group condition must be dict.")
if "limit" in group_condition:
limit = group_condition.get("limit", 10)
if isinstance(limit, bool) \
or not isinstance(group_condition.get("limit"), int):
log.error("The limit must be int.")
raise ProfilerGroupConditionException("The limit must be int.")
if limit < 1 or limit > 100:
raise ProfilerGroupConditionException("The limit must in [1, 100].")
if "offset" in group_condition:
offset = group_condition.get("offset", 0)
if isinstance(offset, bool) \
or not isinstance(group_condition.get("offset"), int):
log.error("The offset must be int.")
raise ProfilerGroupConditionException("The offset must be int.")
if offset < 0:
raise ProfilerGroupConditionException("The offset must ge 0.")
if offset > 1000000:
raise ProfilerGroupConditionException("The offset must le 1000000.")
def validate_sort_condition(search_condition, search_scope):
"""
Verify the sort_condition in search_condition is valid or not.
Args:
search_condition (dict): The search condition.
search_scope (list): The search scope.
Raises:
ProfilerSortConditionException: If the sort_condition param in search_condition is invalid.
"""
sort_condition = search_condition.get("sort_condition")
if not isinstance(sort_condition, dict):
raise ProfilerSortConditionException("The sort condition must be dict.")
if "name" in sort_condition:
sorted_name = sort_condition.get("name", "")
err_msg = "The sorted_name must be in {}".format(search_scope)
if not isinstance(sorted_name, str):
log.error("Wrong sorted name type.")
raise ProfilerSortConditionException("Wrong sorted name type.")
if sorted_name not in search_scope:
log.error(err_msg)
raise ProfilerSortConditionException(err_msg)
if "type" in sort_condition:
sorted_type_param = ['ascending', 'descending']
sorted_type = sort_condition.get("type")
if sorted_type and sorted_type not in sorted_type_param:
err_msg = "The sorted type must be ascending or descending."
log.error(err_msg)
raise ProfilerSortConditionException(err_msg)
def validate_op_filter_condition(op_condition, value_type=str, value_type_msg='str'):
"""
Verify the op_condition in filter_condition is valid or not.
Args:
op_condition (dict): The op_condition in search_condition.
value_type (type): The value type. Default: str.
value_type_msg (str): The value type message. Default: 'str'.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
filter_key = ["in", "not_in", "partial_match_str_in"]
if not isinstance(op_condition, dict):
raise ProfilerFilterConditionException("The filter condition value must be dict.")
for key, value in op_condition.items():
if not isinstance(key, str):
raise ProfilerFilterConditionException("The filter key must be str")
if not isinstance(value, list):
raise ProfilerFilterConditionException("The filter value must be list")
if key not in filter_key:
raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key))
for item in value:
if not isinstance(item, value_type):
raise ProfilerFilterConditionException(
"The item in filter value must be {}.".format(value_type_msg)
)
def validate_filter_condition(search_condition):
"""
Verify the filter_condition in search_condition is valid or not.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
filter_condition = search_condition.get("filter_condition")
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException("The filter condition must be dict.")
if filter_condition:
if "op_type" in filter_condition:
op_type_condition = filter_condition.get("op_type")
validate_op_filter_condition(op_type_condition)
if "op_name" in filter_condition:
op_name_condition = filter_condition.get("op_name")
validate_op_filter_condition(op_name_condition)
if "op_type" not in filter_condition and "op_name" not in filter_condition:
raise ProfilerFilterConditionException("The key of filter_condition is not support")
def validate_and_set_job_id_env(job_id_env):
"""
Validate the job id and set it in environment.
Args:
job_id_env (str): The id that to be set in environment parameter `JOB_ID`.
Returns:
int, the valid job id env.
"""
if job_id_env is None:
return job_id_env
# get job_id_env in int type
valid_id = to_int(job_id_env, 'job_id_env')
# check the range of valid_id
if valid_id and 255 < valid_id < sys.maxsize:
os.environ['JOB_ID'] = job_id_env
else:
log.warning("Invalid job_id_env %s. The value should be int and between 255 and %s. Use"
"default job id env instead.",
job_id_env, sys.maxsize)
return valid_id
def validate_ui_proc(proc_name):
"""
Validate proc name in restful request.
Args:
proc_name (str): The proc name to query. Acceptable value is in
[`iteration_interval`, `fp_and_bp`, `tail`].
Raises:
ProfilerParamValueErrorException: If the proc_name is invalid.
"""
accept_names = ['iteration_interval', 'fp_and_bp', 'tail']
if proc_name not in accept_names:
log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names)
raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.')
def validate_minddata_pipeline_condition(condition):
"""
Verify the minddata pipeline search condition is valid or not.
Args:
condition (dict): The minddata pipeline search condition.
Raises:
ProfilerParamTypeErrorException: If the type of the search condition is
invalid.
ProfilerDeviceIdException: If the device_id param in the search
condition is invalid.
ProfilerGroupConditionException: If the group_condition param in the
search condition is invalid.
ProfilerSortConditionException: If the sort_condition param in the
search condition is invalid.
ProfilerFilterConditionException: If the filter_condition param in the
search condition is invalid.
"""
if not isinstance(condition, dict):
log.error("Invalid condition type, it should be dict.")
raise ProfilerParamTypeErrorException(
"Invalid condition type, it should be dict."
)
if "device_id" in condition:
device_id = condition.get("device_id")
if not isinstance(device_id, str):
raise ProfilerDeviceIdException(
"Invalid device_id type, it should be str."
)
if "group_condition" in condition:
validate_group_condition(condition)
if "sort_condition" in condition:
validate_sort_condition(condition, MINDDATA_PIPELINE_COL)
if "filter_condition" in condition:
filter_condition = condition.get('filter_condition')
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException(
"The filter condition must be dict."
)
for key, value in filter_condition.items():
if key == 'op_id':
validate_op_filter_condition(
value, value_type=int, value_type_msg='int'
)
elif key == 'op_type':
validate_op_filter_condition(value)
elif key == 'is_display_op_detail':
if not isinstance(value, bool):
raise ProfilerFilterConditionException(
"The condition must be bool."
)
else:
raise ProfilerFilterConditionException(
"The key {} of filter_condition is not support.".format(key)
)

View File

@ -0,0 +1,60 @@
# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Validate the input path."""
import os
def validate_and_normalize_path(
path,
check_absolute_path=False,
allow_parent_dir=False,
):
"""
Validates path and returns its normalized form.
If path has a valid scheme, treat path as url, otherwise consider path a
unix local path.
Note:
File scheme (rfc8089) is currently not supported.
Args:
path (str): Path to be normalized.
check_absolute_path (bool): Whether check path scheme is supported.
allow_parent_dir (bool): Whether allow parent dir in path.
Returns:
str, normalized path.
"""
if not path:
raise RuntimeError("The path is invalid!")
path_str = str(path)
if not allow_parent_dir:
path_components = path_str.split("/")
if ".." in path_components:
raise RuntimeError("The path is invalid!")
# path does not have valid schema, treat it as unix local path.
if check_absolute_path:
if not path_str.startswith("/"):
raise RuntimeError("The path is invalid!")
try:
# most unix systems allow
normalized_path = os.path.realpath(path)
except ValueError:
raise RuntimeError("The path is invalid!")
return normalized_path

View File

@ -0,0 +1,14 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

View File

@ -0,0 +1,175 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The parser for AI CPU preprocess data.
"""
import os
from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger
class DataPreProcessParser:
"""
The Parser for AI CPU preprocess data.
Args:
input_path(str): The profiling job path.
output_filename(str): The output data path and name.
"""
_source_file_target = 'DATA_PREPROCESS.dev.AICPU.'
_dst_file_title = 'title:DATA_PREPROCESS AICPU'
_dst_file_column_title = ['serial_number', 'node_type_name', 'total_time(ms)',
'dispatch_time(ms)', 'run_start', 'run_end']
_ms_unit = 1000
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
self._source_file_name = self._get_source_file()
self._ms_kernel_flag = 3
self._other_kernel_flag = 6
self._thread_flag = 7
self._ms_kernel_run_end_index = 2
self._other_kernel_run_end_index = 5
self._result_list = []
self._min_cycle_counter = float('inf')
def _get_source_file(self):
"""Get log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
return file_name
def _get_kernel_result(self, number, node_list, thread_list):
"""Get the profiling data form different aicpu kernel"""
try:
if len(node_list) == self._ms_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1]
run_end_index = self._ms_kernel_run_end_index
elif len(node_list) == self._other_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1].split('/')[-1].split('-')[0]
run_end_index = self._other_kernel_run_end_index
else:
logger.warning("the data format can't support 'node_list':%s", str(node_list))
return None
run_start = node_list[1].split(':')[-1].split(' ')[0]
run_end = node_list[run_end_index].split(':')[-1].split(' ')[0]
total_time = float(thread_list[-1].split('=')[-1].split()[0]) / self._ms_unit
dispatch_time = float(thread_list[-2].split('=')[-1].split()[0]) / self._ms_unit
return [number, node_type_name, total_time, dispatch_time,
run_start, run_end]
except IndexError as e:
logger.error(e)
return None
def execute(self):
"""Execute the parser, get result data, and write it to the output file."""
if not os.path.exists(self._source_file_name):
logger.info("Did not find the aicpu profiling source file")
return
with open(self._source_file_name, 'rb') as ai_cpu_data:
ai_cpu_str = str(ai_cpu_data.read().replace(b'\n\x00', b' ___ ')
.replace(b'\x00', b' ___ '))[2:-1]
ai_cpu_lines = ai_cpu_str.split(" ___ ")
result_list = list()
ai_cpu_total_time_summary = 0
# Node serial number.
serial_number = 1
for i in range(len(ai_cpu_lines) - 1):
node_line = ai_cpu_lines[i]
thread_line = ai_cpu_lines[i + 1]
if "Node" in node_line and "Thread" in thread_line:
# Get the node data from node_line
node_list = node_line.split(',')
thread_list = thread_line.split(',')
result = self._get_kernel_result(serial_number, node_list, thread_list)
if result is None:
continue
result_list.append(result)
# Calculate the total time.
total_time = result[2]
ai_cpu_total_time_summary += total_time
# Increase node serial number.
serial_number += 1
elif "Node" in node_line and "Thread" not in thread_line:
node_type_name = node_line.split(',')[0].split(':')[-1]
logger.warning("The node type:%s cannot find thread data", node_type_name)
if result_list:
ai_cpu_total_time = format(ai_cpu_total_time_summary, '.6f')
result_list.append(["AI CPU Total Time(ms):", ai_cpu_total_time])
fwrite_format(self._output_filename, " ".join(self._dst_file_column_title), is_start=True, is_print=True)
fwrite_format(self._output_filename, result_list, is_print=True)
# For timeline display.
self._result_list = result_list
def query_aicpu_data(self):
"""
Get execution time of AI CPU operator.
Returns:
a dict, the metadata of AI CPU operator execution time.
"""
stream_id = 0 # Default stream id for AI CPU.
pid = 9000 # Default pid for AI CPU.
factor = 1000 # Convert time unit from 1us to 1ms
total_time = 0
min_cycle_counter = float('inf')
aicpu_info = []
op_count_list = []
for aicpu_item in self._result_list:
if "AI CPU Total Time(ms):" in aicpu_item:
total_time = aicpu_item[-1]
continue
op_name = aicpu_item[1]
start_time = float(aicpu_item[4]) / factor
min_cycle_counter = min(min_cycle_counter, start_time)
end_time = float(aicpu_item[5]) / factor
duration = end_time - start_time
aicpu_info.append([op_name, stream_id, start_time, duration, pid])
# Record the number of operator types.
if op_name not in op_count_list:
op_count_list.append(op_name)
self._min_cycle_counter = min_cycle_counter
aicpu_dict = {
'info': aicpu_info,
'total_time': float(total_time),
'op_exe_times': len(aicpu_info),
'num_of_ops': len(op_count_list),
'num_of_streams': 1
}
return aicpu_dict
@property
def min_cycle_counter(self):
"""Get minimum cycle counter in AI CPU."""
return self._min_cycle_counter

View File

@ -0,0 +1,113 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The container of metadata used in profiler parser."""
class HWTSContainer:
"""
HWTS output container.
Args:
split_list (list): The split list of metadata in HWTS output file.
"""
def __init__(self, split_list):
self._op_name = ''
self._duration = None
self._status = split_list[0]
self._task_id = split_list[6]
self._cycle_counter = float(split_list[7])
self._stream_id = split_list[8]
@property
def status(self):
"""Get the status of the operator, i.e. Start or End."""
return self._status
@property
def task_id(self):
"""Get the task id of the operator."""
return self._task_id
@property
def cycle_counter(self):
"""Get the cycle counter."""
return self._cycle_counter
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@op_name.setter
def op_name(self, name):
"""Set the name of the operator."""
self._op_name = name
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@duration.setter
def duration(self, value):
"""Set the duration of the operator execution."""
self._duration = value
class TimelineContainer:
"""
A container of operator computation metadata.
Args:
split_list (list): The split list of metadata in op_compute output file.
"""
def __init__(self, split_list):
self._op_name = split_list[0]
self._stream_id = int(split_list[1])
self._start_time = float(split_list[2])
self._duration = float(split_list[3])
self._pid = None
if len(split_list) == 5:
self._pid = int(split_list[4])
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def start_time(self):
"""Get the execution start time of the operator."""
return self._start_time
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@property
def pid(self):
"""Get the pid of the operator execution."""
return self._pid

View File

@ -0,0 +1,595 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Thr parser for parsing framework files."""
import csv
import enum
import json
import os
import re
from mindspore.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerDirNotFoundException, \
ProfilerFileNotFoundException, ProfilerDeviceIdMismatchException, \
ProfilerRawFileException, ProfilerParamValueErrorException
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
class VmDataType(enum.IntEnum):
"""Definition of vm data type."""
NUMBER_TYPE_BEGIN = 26
NUMBER_TYPE_BOOL = 27
NUMBER_TYPE_INT = 28
NUMBER_TYPE_INT8 = 29
NUMBER_TYPE_INT16 = 30
NUMBER_TYPE_INT32 = 31
NUMBER_TYPE_INT64 = 32
NUMBER_TYPE_UINT = 33
NUMBER_TYPE_UINT8 = 34
NUMBER_TYPE_UINT16 = 35
NUMBER_TYPE_UINT32 = 36
NUMBER_TYPE_UINT64 = 37
NUMBER_TYPE_FLOAT = 38
NUMBER_TYPE_FLOAT16 = 39
NUMBER_TYPE_FLOAT32 = 40
NUMBER_TYPE_FLOAT64 = 41
NUMBER_TYPE_END = 42
@classmethod
def get_data_type_name(cls, num):
"""
Get the name of data type by enum number.
Args:
num (int): Enum number.
Returns:
str, the name of data type.
"""
data_type = cls._value2member_map_.get(num)
return 'UNKNOWN' if data_type is None else data_type.name
class GeDataType(enum.IntEnum):
"""Definition of ge data type."""
DT_FLOAT = 0
DT_FLOAT16 = 1
DT_INT8 = 2
DT_INT16 = 6
DT_UINT16 = 7
DT_UINT8 = 4
DT_INT32 = 3
DT_INT64 = 9
DT_UINT32 = 8
DT_UINT64 = 10
DT_BOOL = 12
DT_DOUBLE = 11
DT_STRING = 13
DT_DUAL_SUB_INT8 = 14
DT_DUAL_SUB_UINT8 = 15
DT_COMPLEX64 = 16
DT_COMPLEX128 = 17
DT_QINT8 = 18
DT_QINT16 = 19
DT_QINT32 = 20
DT_QUINT8 = 21
DT_QUINT16 = 22
DT_RESOURCE = 23
DT_STRING_REF = 24
DT_DUAL = 25
DT_UNDEFINED = 26
@classmethod
def get_data_type_name(cls, num):
"""
Get the name of data type by enum number.
Args:
num (int): Enum number.
Returns:
str, the name of data type.
"""
data_type = cls._value2member_map_.get(num)
return 'UNKNOWN' if data_type is None else data_type.name
class GeFormat(enum.IntEnum):
"""Definition of ge format type."""
FORMAT_NCHW = 0
FORMAT_NHWC = 1
FORMAT_ND = 2
FORMAT_NC1HWC0 = 3
FORMAT_FRACTAL_Z = 4
FORMAT_NC1C0HWPAD = 5
FORMAT_NHWC1C0 = 6
FORMAT_FSR_NCHW = 7
FORMAT_FRACTAL_DECONV = 8
FORMAT_C1HWNC0 = 9
FORMAT_FRACTAL_DECONV_TRANSPOSE = 10
FORMAT_FRACTAL_DECONV_SP_STRIDE_TRANS = 11
FORMAT_NC1HWC0_C04 = 12
FORMAT_FRACTAL_Z_C04 = 13
FORMAT_CHWN = 14
FORMAT_FRACTAL_DECONV_SP_STRIDE8_TRANS = 15
FORMAT_HWCN = 16
FORMAT_NC1KHKWHWC0 = 17
FORMAT_BN_WEIGHT = 18
FORMAT_FILTER_HWCK = 19
FORMAT_HASHTABLE_LOOKUP_LOOKUPS = 20
FORMAT_HASHTABLE_LOOKUP_KEYS = 21
FORMAT_HASHTABLE_LOOKUP_VALUE = 22
FORMAT_HASHTABLE_LOOKUP_OUTPUT = 23
FORMAT_HASHTABLE_LOOKUP_HITS = 24
FORMAT_C1HWNCOC0 = 25
FORMAT_MD = 26
FORMAT_NDHWC = 27
FORMAT_FRACTAL_ZZ = 28
FORMAT_FRACTAL_NZ = 29
FORMAT_NCDHW = 30
FORMAT_DHWCN = 31
FORMAT_NDC1HWC0 = 32
FORMAT_FRACTAL_Z_3D = 33
FORMAT_CN = 34
FORMAT_NC = 35
FORMAT_DHWNC = 36
FORMAT_FRACTAL_Z_3D_TRANSPOSE = 37
FORMAT_RESERVED = 38
FORMAT_ALL = 39
@classmethod
def get_format_name(cls, num):
"""
Get the name of format type by enum number.
Args:
num (int): Enum number.
Returns:
str, the name of format type.
"""
format_type = cls._value2member_map_.get(num)
return 'UNKNOWN' if format_type is None else format_type.name
class FrameworkParser:
"""
Thr parser for parsing framework files.
Args:
profiling_id (str): The profiling ID.
device_id (str): The device ID.
output_path (str): The directory of the parsed file. Default: `./`.
"""
_raw_data_dir = '/var/log/npu/profiling'
_regex_framework = r'Framework\.host\.(?P<data_type>.+)\.(?P<device_id>\d).+'
_regex_framework_in_data = r'Framework\.host\.(?P<data_type>.+)\.' \
r'(?P<device_id>\d)\.(?P<profiling_id>[a-zA-Z0-9]+).+'
_col_names = [
'task_id', 'stream_id', 'block_dim', 'full_op_name', 'op_name',
'op_type', 'subgraph', 'op_info'
]
_graph_attr_name = [
'input_format', 'input_data_type', 'input_shape', 'output_format',
'output_data_type', 'output_shape'
]
# if the task id is less than the task id threshold, The combination of
# task id and Stream id represents one operator, else the task id represents
# one operator
_task_id_threshold = 25000
def __init__(self, profiling_id, device_id, output_path='./'):
self._profiling_path = self._get_raw_profiling_path(profiling_id)
self._backend_type = None
self._framework_path = {'graph': [], 'task': [], 'point': []}
self._search_file(profiling_id, device_id)
self._device_id = device_id
self._save_path = self._get_save_path(device_id, output_path)
self._task_id_full_op_name_dict = {}
self._task_cache = {}
self._point_info = {}
self._parse_task_files()
self._parse_point_files()
@property
def save_path(self):
"""
The property of save path.
Returns:
str, the save path.
"""
return self._save_path
@property
def point_info(self):
"""
The property of the framework point information.
Returns:
dict, the framework point information.
"""
return self._point_info
def to_task_id_full_op_name_dict(self):
"""
Get the task id and full operator name dict.
Returns:
dict, the task id and full operator name dict.
"""
return self._task_id_full_op_name_dict
def parse(self):
"""Parse the framework files."""
self._parse_graph_files_and_save(self._task_cache)
del self._task_cache
def check_op_name(self, op_name, is_prefix=True):
"""
Check whether the operator name exists.
Args:
op_name (str): The operator name or operator name prefix.
is_prefix (bool): `True` if the op_name is prefix, else `False`.
Default: True.
Returns:
bool, `True` if the operator name does exist in framework file, else
`False`.
"""
if not op_name:
raise ProfilerParamValueErrorException('The op_name should exist.')
for full_op_name in self._task_id_full_op_name_dict.values():
if full_op_name:
if is_prefix and full_op_name.startswith(op_name):
return True
if not is_prefix and op_name == full_op_name:
return True
return False
def _get_raw_profiling_path(self, profiling_id):
"""
Get raw profiling path.
Args:
profiling_id (str): The profiling ID.
Returns:
str, the raw profiling path.
Raises:
ProfilerPathErrorException: If the profiling path is invalid.
ProfilerDirNotFoundException: If the profiling dir is not found.
"""
profiling_path = os.path.join(self._raw_data_dir, profiling_id)
try:
profiling_path = validate_and_normalize_path(profiling_path)
except RuntimeError:
raise ProfilerPathErrorException('Profiling path is invalid.')
if not os.path.isdir(profiling_path):
raise ProfilerDirNotFoundException(profiling_path)
return profiling_path
def _search_file(self, profiling_id, device_id):
"""
Search all framework files in raw profiling path.
Args:
profiling_id (str): The profiling ID.
device_id (str): The device ID.
Raises:
ProfilerFileNotFoundException: If the framework files are not found.
"""
# first search in the JOB dir, and if not, search in the sub directory
# in the JOB
self._search_file_from_job_path(device_id, search_in_sub_path=False)
if self._backend_type is None:
self._search_file_from_job_path(device_id, search_in_sub_path=True)
self._search_file_from_data_path(profiling_id, device_id)
if self._backend_type is None:
raise ProfilerFileNotFoundException('Framework')
self._framework_path['graph'].sort()
self._framework_path['task'].sort()
def _search_file_from_job_path(self, device_id, search_in_sub_path=False):
"""
Search framework files from job path.
Args:
device_id (str): The device ID.
search_in_sub_path (bool): `True` if search file in profiling dir,
else search in profiling sub dir. Default: False.
Raises:
ProfilerRawFileException: If the framework file type is inconsistent.
ProfilerDeviceIdMismatchException: If the device id is mismatch
with framework in the raw dir.
"""
profiling_dir = os.path.join(self._profiling_path, 'data') \
if search_in_sub_path else self._profiling_path
if not os.path.isdir(profiling_dir):
return
files = os.listdir(profiling_dir)
for file in files:
pattern = re.search(self._regex_framework, file)
if not pattern or file.endswith('.done'):
continue
attrs = pattern.groupdict()
device_id_in_path = attrs.get('device_id')
if device_id_in_path != device_id:
raise ProfilerDeviceIdMismatchException()
data_type = attrs.get('data_type')
if data_type.startswith('vm.'):
if self._backend_type and self._backend_type != 'vm':
raise ProfilerRawFileException('Backend type is inconsistent.')
self._backend_type = 'vm'
data_type = data_type.split('.')[1]
else:
if self._backend_type and self._backend_type != 'ge':
raise ProfilerRawFileException('Backend type is inconsistent.')
self._backend_type = 'ge'
if data_type.startswith('graph_desc_info'):
self._framework_path['graph'].append(
os.path.join(profiling_dir, file)
)
elif data_type.startswith('task_desc_info'):
self._framework_path['task'].append(
os.path.join(profiling_dir, file)
)
elif data_type.startswith('point'):
self._framework_path['point'].append(
os.path.join(profiling_dir, file)
)
def _search_file_from_data_path(self, profiling_id, device_id):
"""
Search framework files from data path.
Args:
profiling_id (str): The profiling ID.
device_id (str): The device ID.
Raises:
ProfilerRawFileException: If the framework file type is inconsistent.
ProfilerDeviceIdMismatchException: If the device id is mismatch
with framework in the raw dir.
"""
profiling_data_path = os.path.join(
self._raw_data_dir, 'container', device_id, 'data'
)
if not os.path.isdir(profiling_data_path):
return
files = os.listdir(profiling_data_path)
for file in files:
pattern = re.search(self._regex_framework_in_data, file)
if not pattern or file.endswith('.done') or file.endswith('.zip'):
continue
attrs = pattern.groupdict()
profiling_id_in_path = attrs.get('profiling_id')
if profiling_id_in_path != profiling_id:
continue
device_id_in_path = attrs.get('device_id')
if device_id_in_path != device_id:
raise ProfilerDeviceIdMismatchException()
data_type = attrs.get('data_type')
if data_type.startswith('vm.'):
if self._backend_type and self._backend_type != 'vm':
raise ProfilerRawFileException('Backend type is inconsistent.')
self._backend_type = 'vm'
data_type = data_type.split('.')[1]
else:
if self._backend_type and self._backend_type != 'ge':
raise ProfilerRawFileException('Backend type is inconsistent.')
self._backend_type = 'ge'
if data_type.startswith('graph_desc_info'):
self._framework_path['graph'].append(
os.path.join(profiling_data_path, file)
)
elif data_type.startswith('task_desc_info'):
self._framework_path['task'].append(
os.path.join(profiling_data_path, file)
)
elif data_type.startswith('point'):
self._framework_path['point'].append(
os.path.join(profiling_data_path, file)
)
def _get_save_path(self, device_id, output_path):
"""
Get the save path.
Args:
device_id (str): The device ID.
output_path (str): The output dir.
Returns:
str, the save path.
Raises:
ProfilerPathErrorException: If the output path is invalid.
ProfilerDirNotFoundException: If the output dir is not found.
"""
try:
output_dir = validate_and_normalize_path(output_path)
except RuntimeError:
raise ProfilerPathErrorException('Output path is invalid.')
if not os.path.isdir(output_dir):
raise ProfilerDirNotFoundException(output_dir)
return os.path.join(
output_dir, '_'.join(['framework', 'raw', device_id]) + '.csv'
)
def _parse_task_files(self):
"""Parse the framework task files."""
for path in self._framework_path['task']:
with open(path, 'r') as file:
for task_info in file:
infos = task_info.strip('\n').split(' ')
infos = infos[1:] if len(infos) == 5 else infos
# key is op name, values is task id, stream id, block_dim
self._task_cache[infos[0]] = [infos[2], infos[3], infos[1]]
# if the task id is less than the task id threshold, the
# stream id and task id correspond to an operator
task_id = infos[2]
if int(task_id) < self._task_id_threshold:
task_id = '_'.join([infos[3], task_id])
self._task_id_full_op_name_dict[task_id] = infos[0]
def _parse_graph_files_and_save(self, task_cache):
"""
Parse the framework graph files and save the framework information.
Args:
task_cache (dict): The task information cache.
"""
with open(self._save_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._col_names)
for path in self._framework_path['graph']:
with open(path, 'r') as graph_file:
for graph_info in graph_file:
result = self._parse_one_row_graph_info(graph_info)
task_info = task_cache.get(result[0])
if task_info:
task_info.extend(result)
csv_writer.writerow(task_info)
del task_cache[result[0]]
else:
save_info = [None, None, None]
save_info.extend(result)
csv_writer.writerow(save_info)
none_list = [None, None, None, None]
for key, value in task_cache.items():
value.append(key)
value.extend(none_list)
csv_writer.writerow(value)
def _parse_one_row_graph_info(self, row_info):
"""
Parse the graph information in one row.
Args:
row_info (str): One row graph information.
Returns:
list[str], the parsed graph information.
"""
full_op_name = None
op_name = None
subgraph_name = None
op_type = None
op_info = dict()
cur_op_info_key = None
infos = row_info.strip('\n').split(' ')
for info in infos:
attr_name, attr_value = info.split(':', 1)
if attr_name == 'op_name':
full_op_name = attr_value
subgraph_name = self._get_subgraph_name(full_op_name)
op_name = self._get_op_name(full_op_name, subgraph_name)
elif attr_name == 'op_type':
op_type = attr_value
elif attr_name in ['input_id', 'output_id']:
cur_op_info_key = '{}_{}'.format(
attr_name.split('_')[0], attr_value
)
op_info[cur_op_info_key] = dict()
elif attr_name in self._graph_attr_name:
op_attr = attr_name.split('_', 1)[1]
if op_attr == 'shape':
attr_value = attr_value.strip('"')
if self._backend_type == 'vm':
if op_attr == 'data_type':
attr_value = VmDataType.get_data_type_name(
int(attr_value)
)
else:
if op_attr == 'data_type':
attr_value = GeDataType.get_data_type_name(
int(attr_value)
)
elif op_attr == 'format':
attr_value = GeFormat.get_format_name(int(attr_value))
op_info[cur_op_info_key][op_attr] = attr_value
# the list info are full_op_name, op_name, op_type, subgraph, op_info
return [full_op_name, op_name, op_type, subgraph_name,
json.dumps(op_info)]
def _get_subgraph_name(self, full_op_name):
"""
Get subgraph name.
Args:
full_op_name (str): The full operator name.
Returns:
str, the subgraph name.
"""
subgraph_name = full_op_name.split('/', 1)[0]
if subgraph_name in ['Default', 'Gradients']:
return subgraph_name
return None
def _get_op_name(self, full_op_name, subgraph_name):
"""
Get operator name.
Args:
full_op_name (str): The full operator name.
subgraph_name (str): The subgraph name.
Returns:
str, the operator name.
"""
if subgraph_name is None:
return full_op_name
if self._backend_type == 'vm':
return full_op_name.split('/')[-1]
strs = full_op_name.split(subgraph_name + '/')
op_name = None
for name_str in strs:
if not name_str:
continue
if op_name is None:
op_name = name_str.split('/')[-1]
else:
op_name = '+'.join([op_name, name_str.split('/')[-1]])
return op_name
def _parse_point_files(self):
"""Parse the framework point files."""
for path in self._framework_path['point']:
with open(path, 'r') as file:
for point_info in file:
infos = point_info.strip('\n').split(' ')
self._point_info[int(infos[0])] = infos[1]

View File

@ -0,0 +1,109 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for hwts log file."""
import os
import struct
from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger
class HWTSLogParser:
"""
The Parser for hwts log files.
Args:
input_path (str): The profiling job path. Such as: '/var/log/npu/profiling/JOBAIFGJEJFEDCBAEADIFJAAAAAAAAAA".
output_filename (str): The output data path and name. Such as: './output_format_data_hwts_0.txt'.
"""
_source_file_target = 'hwts.log.data.45.dev.profiler_default_tag'
_dst_file_title = 'title:45 HWTS data'
_dst_file_column_title = 'Type cnt Core_ID Block_ID Task_ID Cycle_counter Stream_ID'
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
self._source_flie_name = self._get_source_file()
def _get_source_file(self):
"""Get hwts log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
if not file_name:
msg = "Fail to find hwts log file, under profiling directory"
raise RuntimeError(msg)
return file_name
def execute(self):
"""
Execute the parser, get result data, and write it to the output file.
Returns:
bool, whether succeed to analyse hwts log.
"""
content_format = ['QIIIIIIIIIIII', 'QIIQIIIIIIII', 'IIIIQIIIIIIII']
log_type = ['Start of task', 'End of task', 'Start of block', 'End of block', 'Block PMU']
result_data = ""
with open(self._source_flie_name, 'rb') as hwts_data:
while True:
line = hwts_data.read(64)
if line:
if not line.strip():
continue
else:
break
byte_first_four = struct.unpack('BBHHH', line[0:8])
byte_first = bin(byte_first_four[0]).replace('0b', '').zfill(8)
ms_type = byte_first[-3:]
is_warn_res0_ov = byte_first[4]
cnt = int(byte_first[0:4], 2)
core_id = byte_first_four[1]
blk_id, task_id = byte_first_four[3], byte_first_four[4]
if ms_type in ['000', '001', '010']: # log type 0,1,2
result = struct.unpack(content_format[0], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '011': # log type 3
result = struct.unpack(content_format[1], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '100': # log type 4
result = struct.unpack(content_format[2], line[8:])
stream_id = result[2]
if is_warn_res0_ov == '0':
syscnt = result[4]
else:
syscnt = None
else:
logger.info("Profiling: invalid hwts log record type %s", ms_type)
continue
if int(task_id) < 25000:
task_id = str(stream_id) + "_" + str(task_id)
result_data += ("%-14s %-4s %-8s %-9s %-8s %-15s %s\n" %(log_type[int(ms_type, 2)], cnt, core_id,
blk_id, task_id, syscnt, stream_id))
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
return True

View File

@ -0,0 +1,581 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The integrator for integrating parsed profiling files."""
import csv
import json
import os
from decimal import Decimal
from mindspore import log as logger
from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException, \
ProfilerFileNotFoundException, ProfilerRawFileException
from mindspore.profiler.common.util import query_latest_trace_time_file, to_int, to_millisecond
from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path
from mindspore.profiler.parser.container import TimelineContainer
SIZE_LIMIT = 20 * 1024 * 1024 # 20MB
class Integrator:
"""
The integrator for integrating parsed profiling files.
Args:
profiling_dir (str): The directory where the parsed profiling files are
located.
device_id (str): The device ID.
"""
_file_name_aicore_detail_time = 'output_op_compute_time_{}.txt'
_file_name_aicpu_time = 'output_data_preprocess_aicpu_{}.txt'
_file_name_framework = 'framework_raw_{}.csv'
_header_aicore_type = ['op_type', 'execution_time', 'execution_frequency',
'percent']
_header_aicore_detail = ['full_op_name', 'execution_time']
_header_aicpu = ['serial_number', 'op_type', 'total_time', 'dispatch_time',
'run_start', 'run_end']
_file_name_aicore_type_time = 'aicore_intermediate_{}_type.csv'
_file_name_aicore_detail_info = 'aicore_intermediate_{}_detail.csv'
_aicore_data = []
_aicore_detail_data = []
_aicore_trace_data = []
_col_names = []
def __init__(self, profiling_dir, device_id):
self._profiling_dir = profiling_dir
self._device_id = device_id
self._op_time_cache = {}
self._total_time = Decimal('0.0')
def integrate(self):
"""Integrate the parsed profiling files."""
self._parse_aicore_detail_time()
self._parse_aicore_type_time()
self._parse_aicpu_time()
def get_aicore_data(self):
self._aicore_data_load()
return self._aicore_data
def get_aicore_detail_data(self):
self._aicore_detail_data_load()
return self._aicore_detail_data
def get_aicore_trace_data(self):
self._aicore_trace_data_load()
return self._aicore_trace_data
def query_for_all_reduce(self):
return self._query_for_all_reduce()
def _parse_aicore_type_time(self):
"""Parse the parsed AICORE operator type file."""
framework_file = os.path.join(
self._profiling_dir,
self._file_name_framework.format(self._device_id)
)
if not os.path.isfile(framework_file):
return
op_name_type_cache = {}
with open(framework_file, 'r') as src_file:
csv_reader = csv.reader(src_file)
_ = next(csv_reader)
for row in csv_reader:
op_name_type_cache[row[3]] = row[5]
op_type_time_cache = {}
for full_op_name, op_time in self._op_time_cache.items():
op_type = op_name_type_cache.get(full_op_name)
if op_type_time_cache.get(op_type) is None:
op_type_time_cache[op_type] = [op_time, 1]
else:
op_type_time_cache[op_type][0] += op_time
op_type_time_cache[op_type][1] += 1
op_type_file_name = 'aicore_intermediate_' + self._device_id + '_type.csv'
op_type_file_path = os.path.join(self._profiling_dir, op_type_file_name)
with open(op_type_file_path, 'w') as type_file:
csv_writer = csv.writer(type_file)
csv_writer.writerow(self._header_aicore_type)
for op_type, op_type_time_info in op_type_time_cache.items():
type_info = [
op_type, op_type_time_info[0], op_type_time_info[1],
round((op_type_time_info[0] / self._total_time) * 100, 2)
]
csv_writer.writerow(type_info)
def _parse_aicore_detail_time(self):
"""Parse the parsed AICORE operator time file."""
aicore_detail_file = os.path.join(
self._profiling_dir,
self._file_name_aicore_detail_time.format(self._device_id)
)
if not os.path.isfile(aicore_detail_file):
return
op_detail_file_name = 'aicore_intermediate_' + self._device_id + '_detail.csv'
op_detail_file_path = os.path.join(
self._profiling_dir, op_detail_file_name
)
with open(aicore_detail_file, 'r') as src_file:
row = src_file.readline()
if row.startswith('op_name'):
_ = src_file.readline()
elif row.startswith('====='):
_ = src_file.readline()
_ = src_file.readline()
else:
return
with open(op_detail_file_path, 'w') as detail_file:
csv_writer = csv.writer(detail_file)
csv_writer.writerow(self._header_aicore_detail)
while True:
row = src_file.readline()
if not row:
break
op_infos = row.split()
if op_infos[0] == 'total':
self._total_time = Decimal(op_infos[2])
continue
self._op_time_cache[op_infos[0]] = Decimal(op_infos[1])
csv_writer.writerow([op_infos[0], op_infos[1]])
def _parse_aicpu_time(self):
"""Parse the parsed AICPU operator time file."""
aicpu_file = os.path.join(
self._profiling_dir,
self._file_name_aicpu_time.format(self._device_id)
)
if not os.path.isfile(aicpu_file):
return
save_file_name = 'aicpu_intermediate_' + self._device_id + '.csv'
save_file_path = os.path.join(self._profiling_dir, save_file_name)
with open(aicpu_file, 'r') as src_file:
row = src_file.readline()
if not row.startswith('serial_number'):
return
_ = src_file.readline()
with open(save_file_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._header_aicpu)
while True:
row = src_file.readline()
if not row:
break
infos = row.split()
if infos[0] == 'AI':
continue
csv_writer.writerow(infos)
def _aicore_data_load(self):
"""Load data according to the parsed AICORE operator types file."""
op_type_file_path = os.path.join(
self._profiling_dir,
self._file_name_aicore_type_time.format(self._device_id)
)
if not os.path.isfile(op_type_file_path):
logger.warning('The file <%s> does not exist.', op_type_file_path)
return
with open(op_type_file_path, 'r') as file:
csv_reader = csv.reader(file)
_ = next(csv_reader)
for info in csv_reader:
self._aicore_data.append([info[0], float(info[1]), int(info[2]), float(info[3])])
def _aicore_detail_data_load(self):
"""Load data according to the parsed AICORE operator file."""
op_detail_file_path = os.path.join(
self._profiling_dir,
self._file_name_aicore_detail_info.format(self._device_id)
)
framework_file_path = os.path.join(
self._profiling_dir,
self._file_name_framework.format(self._device_id)
)
if not os.path.isfile(op_detail_file_path):
logger.warning('The file <%s> does not exist.', op_detail_file_path)
return
if not os.path.isfile(framework_file_path):
logger.warning('The file <%s> does not exist.', framework_file_path)
return
framework_infos = dict()
with open(framework_file_path, 'r') as file:
csv_reader = csv.reader(file)
_ = next(csv_reader)
for info in csv_reader:
framework_infos[info[3]] = [
info[3], info[4], info[5], info[6], json.loads(info[7]) if info[7] else None]
with open(op_detail_file_path, 'r') as file:
csv_reader = csv.reader(file)
_ = next(csv_reader)
for info in csv_reader:
framework_info = framework_infos.get(info[0])
self._aicore_detail_data.append(
[
framework_info[1], framework_info[2], float(info[1]),
framework_info[3], framework_info[0], framework_info[4]
]
)
del framework_infos
def _aicore_trace_data_load(self):
"""Load data according to the parsed AICORE operator types file."""
file_path = query_latest_trace_time_file(self._profiling_dir, int(self._device_id))
if not file_path:
logger.error("Failed to find parsed trace time file.")
raise ProfilerFileNotFoundException('parsed step trace time file')
with open(file_path, 'r') as handle:
csv_reader = csv.reader(handle)
self.__column__ = next(csv_reader)
self._aicore_trace_data = list(csv_reader)
self._size = len(self._aicore_trace_data) - 1
self._display_col_names = self._col_names[:]
self._load_point_info()
def _load_point_info(self):
"""Load point info."""
file_path = os.path.join(self._profiling_dir, 'step_trace_point_info.json')
if os.path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
try:
self._point_info = json.load(file)
except (json.JSONDecodeError, TypeError) as err:
logger.warning(err)
raise ProfilerRawFileException('Fail to parse point info file.')
def _query_for_all_reduce(self):
"""
Query for all reduce info.
Returns:
list[dict], reduce information. Each item is the reduce info for one step.
The reduce info is format like:
{stream_id: List[Tuple(start_point, end_point, duration, field_name)]}.
"""
self._aicore_trace_data_load()
reduce_infos = []
for row_info in self._aicore_trace_data[:-1]:
row_info_dict = self._get_info_dict_from_row_data(row_info, 'systime')
reduce_info = self._sort_reduce_by_time(row_info_dict)
if reduce_info:
reduce_infos.extend(reduce_info)
return reduce_infos
def _get_info_dict_from_row_data(self, row_info, time_type):
"""
Get step info in dict format.
Args:
row_info (list[str]): Step info, the value is corresponding to `__column__`.
time_type (str): The value type. `systime` keeps the original value.
`realtime` transforms the value in millisecond. Default: `realtime`.
Returns:
dict, step trace information. The key is in `__column__`.
"""
row_info_dict = {}
for key, value in zip(self.__column__, row_info):
if key == 'step_num':
continue
value = to_int(value, key)
row_info_dict[key] = to_millisecond(value) if time_type == 'realtime' else value
return row_info_dict
def _sort_reduce_by_time(self, row_info_dict):
"""
Sort reduce info by time.
Args:
row_info_dict (dict): Step trace information.
Returns:
list, including the all reduce info sorted by start time only.
[
[reduce_field, stream_id, reduce_start, reduce_duration],
[...],
[...]
]
"""
factor = 1e5 # convert time unit from 10ns to 1ms
reduce_pid = 10000
reduce_info = []
reduce_fields = [field_name for field_name in self.__column__
if field_name.startswith('stream_') and not field_name.endswith('point')]
for reduce_field in reduce_fields:
reduce_start = row_info_dict.get(reduce_field + '_start_point')
reduce_start = reduce_start / factor \
if reduce_start else 0
reduce_duration = row_info_dict.get(reduce_field)
reduce_duration = reduce_duration / factor if reduce_duration else 0
if not (reduce_start and reduce_duration):
logger.info("Reduce event missing value.")
continue
cur_stream_id = reduce_field.split('_', 2)[1]
reduce_meta = [reduce_field, int(cur_stream_id), reduce_start,
reduce_duration, reduce_pid]
reduce_info.append(reduce_meta)
return reduce_info
class TimelineAnalyser:
"""
Analyse timeline data from file.
"""
__col_names__ = ['op_name', 'stream_id', 'start_time', 'duration']
_output_timeline_data_file_path = 'output_timeline_data_{}.txt'
_min_cycle_counter_file_path = 'min_cycle_counter_{}.txt'
_display_filename = 'timeline_display_{}.json'
_timeline_summary_filename = 'timeline_summary_{}.json'
_timeline_meta = []
_timeline_summary = {
'total_time': 0,
'num_of_streams': 0,
'num_of_ops': 0,
'op_exe_times': 0
}
def __init__(self, profiling_dir, device_id):
self._profiling_dir = profiling_dir
self._device_id = device_id
def write_timeline(self):
"""Load data according to the parsed profiling files."""
# Write timeline to file.
logger.info('Writing timeline file...')
self.write_timeline_to_json_by_limitation()
logger.info('Finished file writing!')
def write_timeline_to_json_by_limitation(self):
"""Write timeline to json by limitation."""
display_filename = self._display_filename.format(self._device_id)
display_file_path = os.path.join(
self._profiling_dir,
display_filename
)
display_file_path = validate_and_normalize_path(display_file_path)
length = len(self._timeline_meta)
try:
with open(display_file_path, 'w') as json_file:
json_file.write('[')
for index, item in enumerate(self._timeline_meta):
json.dump(item, json_file)
file_size = os.path.getsize(display_file_path)
if file_size > SIZE_LIMIT:
break
if index == length - 1:
break
json_file.write(',')
json_file.write(']')
except (IOError, OSError) as err:
logger.error('Error occurred when write timeline display file: %s', err)
raise ProfilerIOException
def write_timeline_summary(self):
"""Write timeline summary to json."""
timeline_summary_file_path = os.path.join(
self._profiling_dir,
self._timeline_summary_filename.format(self._device_id)
)
timeline_summary_file_path = validate_and_normalize_path(timeline_summary_file_path)
try:
with open(timeline_summary_file_path, 'w') as json_file:
json.dump(self._timeline_summary, json_file)
except (IOError, OSError) as err:
logger.error('Error occurred when write timeline summary file: %s', err)
raise ProfilerIOException
def _load_timeline_data(self):
"""Load timeline data from file."""
file_path = os.path.join(
self._profiling_dir,
self._output_timeline_data_file_path.format(self._device_id)
)
file_path = validate_and_normalize_path(file_path)
if not os.path.exists(file_path):
logger.error("Failed to find parsed timeline file.")
raise ProfilerFileNotFoundException('parsed timeline file')
timeline_list = []
try:
with open(file_path, 'r') as f_obj:
for line in f_obj:
if not line.startswith('op_name'):
line_list = line.strip('\n').split(',')
timeline_list.append(line_list)
except (IOError, OSError) as err:
logger.error('Error occurred when read timeline intermediate file: %s', err)
raise ProfilerIOException
return timeline_list
def _parse_timeline_data(self, timeline, min_cycle_counter):
"""Parse timeline data."""
# factor to convert the time unit from 1ms to 1us for timeline display
factor = 1000
op_meta = TimelineContainer(timeline)
timeline_dict = {}
timeline_dict['name'] = op_meta.op_name
timeline_dict['ph'] = 'X'
timeline_dict['tid'] = op_meta.stream_id
timeline_dict['ts'] = (op_meta.start_time - min_cycle_counter) * factor
dur = op_meta.duration * factor
timeline_dict['dur'] = dur
if op_meta.pid is None:
timeline_dict['pid'] = int(self._device_id)
# Update total time of operator execution.
self._timeline_summary['total_time'] += dur
else: # AllReduce and AI CPU pid
timeline_dict['pid'] = op_meta.pid
self._timeline_meta.append(timeline_dict)
@staticmethod
def _update_num_of_streams(timeline, stream_count_dict):
"""Update number of streams."""
stream_id = timeline[1]
if stream_id not in stream_count_dict.keys():
stream_count_dict[stream_id] = 1
else:
stream_count_dict[stream_id] += 1
def get_min_cycle_counter(self):
"""
Get minimum cycle counter.
Returns:
float, the minimum value of the cycle counter.
"""
file_path = os.path.join(
self._profiling_dir,
self._min_cycle_counter_file_path.format(self._device_id)
)
file_path = validate_and_normalize_path(file_path)
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f_obj:
min_cycle_counter = f_obj.read()
min_cycle_counter = float(min_cycle_counter) \
if not min_cycle_counter == 'inf' else 0
except (IOError, OSError) as err:
logger.error('Error occurred when read minimum cycle counter: %s', err)
raise ProfilerIOException
else:
min_cycle_counter = 0
logger.info("No min cycle counter recorded.")
return min_cycle_counter
def init_timeline(self, all_reduce_info, framework_info, aicpu_info, min_cycle_counter):
"""
Init timeline metadata, adding all collected info.
Args:
all_reduce_info (list[list]): The metadata of AllReduce operator.
framework_info (dict): The framework metadata.
aicpu_info (dict): The metadata of AI CPU operator.
min_cycle_counter (float): The minimum cycle counter of the timeline.
"""
if min_cycle_counter == float('inf'):
min_cycle_counter = 0
logger.info('Initiating timeline...')
timeline_list = self._load_timeline_data()
self._timeline_summary['op_exe_times'] = len(timeline_list)
# Add AllReduce info to timeline temp list and sort by start time.
if all_reduce_info:
logger.debug('AllReduce info found. Start adding info into timeline...')
timeline_list.extend(all_reduce_info)
timeline_list.sort(key=lambda x: float(x[2]))
# Add AI CPU data into timeline temp list and sort by start time.
aicpu_data = aicpu_info.get('info')
if aicpu_data:
timeline_list.extend(aicpu_data)
timeline_list.sort(key=lambda x: float(x[2]))
self._timeline_summary['op_exe_times'] += aicpu_info.get('op_exe_times', 0)
self._timeline_summary['num_of_streams'] += aicpu_info.get('num_of_streams', 0)
self._timeline_summary['num_of_ops'] += aicpu_info.get('num_of_ops', 0)
self._timeline_summary['total_time'] += aicpu_info.get('total_time', 0)
# Init a dict for counting the num of streams.
stream_count_dict = {}
for timeline in timeline_list:
self._parse_timeline_data(timeline, min_cycle_counter)
# Updating the collection of streams.
if len(timeline) == 4:
self._update_num_of_streams(timeline, stream_count_dict)
# Get framework metadata.
framework_obj_list = framework_info.get('object')
# The length of list is the number of operators.
self._timeline_summary['num_of_ops'] += len(framework_obj_list)
self._add_framework_info(framework_obj_list)
logger.info('Finished adding info into timeline...')
# Update timeline summary info
self._timeline_summary['num_of_streams'] += len(stream_count_dict.keys())
def _add_framework_info(self, framework_obj_list):
"""
Add framework info into timeline metadata.
Args:
framework_obj_list (list): The framework metadata.
"""
logger.debug('Start adding framework info into timeline...')
# Get the framework info that will be written into timeline.
framework_info_dict = {}
for framework_obj in framework_obj_list:
op_name = framework_obj[0]
op_type = framework_obj[1]
op_full_name = framework_obj[4]
op_info = framework_obj[5]
framework_info_dict[op_full_name] = {
'name': op_name,
'args': {
'type': op_type,
'fullname': op_full_name
}
}
framework_info_dict[op_full_name]['args'].update(op_info)
# Insert framework info into timeline.
for timeline_item in self._timeline_meta:
op_full_name = timeline_item.get('name')
framework_item = framework_info_dict.get(op_full_name)
if framework_item:
timeline_item['name'] = framework_item.get('name')
timeline_item['args'] = framework_item.get('args')
logger.debug('Finished adding framework info into timeline...')

View File

@ -0,0 +1,88 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Minddata aicpu parser."""
import os
from mindspore.profiler.common.util import get_file_join_name, fwrite_format
from mindspore import log as logger
class MinddataParser:
"""Minddata Aicpu Parser."""
@staticmethod
def parse_minddata_aicpu_data(minddata_aicpu_source_path):
"""
Parse minddata get_next info which contains queue size and execute time.
Args:
minddata_aicpu_source_path (str): the source file path.
Returns:
list[Union[str, float]], the converted data.
"""
result = list()
try:
with open(minddata_aicpu_source_path) as source_data_file:
source_data = source_data_file.read()
step_data = source_data.split("\x00")
for one_step in step_data:
if one_step:
node_info = one_step.split(", ")
node_name, node_start, node_end, queue_size = "", 0, 0, 0
if node_info:
node_name = node_info[0].replace("Node:", "")
if len(node_info) > 2:
node_start = node_info[1].replace("Run start:", "")
if node_start.isdigit():
node_start = int(node_start)
node_end = node_info[2].replace("Run end:", "")
if node_end.isdigit():
node_end = int(node_end)
if len(node_info) > 3:
queue_size = node_info[3].replace("queue size:", "")
if queue_size.isdigit():
queue_size = int(queue_size)
one_step_list = [node_name, node_start, node_end, queue_size]
result.append(one_step_list)
except OSError:
logger.error("Open get_next profiling file error.")
return result
@staticmethod
def execute(source_path, output_path, device_id):
"""
Execute the parser.
Args:
source_path (str): the source file path.
output_path (str): the output file path.
device_id (str): the device id.
"""
col_names = ["node_name", "start_time", "end_time", "queue_size"]
minddata_aicpu_source_path = get_file_join_name(
input_path=source_path, file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
minddata_aicpu_source_path = get_file_join_name(
input_path=os.path.join(source_path, "data"), file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
return
minddata_aicpu_output_path = os.path.join(output_path, "minddata_aicpu_" + device_id + ".txt")
minddata_aicpu_data = MinddataParser.parse_minddata_aicpu_data(minddata_aicpu_source_path)
if minddata_aicpu_data:
fwrite_format(minddata_aicpu_output_path, " ".join(col_names), is_start=True)
fwrite_format(minddata_aicpu_output_path, minddata_aicpu_data, is_start=True)

View File

@ -0,0 +1,287 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Thr parser for parsing minddata pipeline files."""
import csv
import json
import os
from queue import Queue
from mindspore.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerFileNotFoundException, \
ProfilerDirNotFoundException, ProfilerRawFileException
from mindspore import log as logger
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
class MinddataPipelineParser:
"""
Thr parser for parsing minddata pipeline files.
Args:
source_dir (str): The minddata pipeline source dir.
device_id (str): The device ID.
output_path (str): The directory of the parsed file. Default: `./`.
Raises:
ProfilerPathErrorException: If the minddata pipeline file path or
the output path is invalid.
ProfilerFileNotFoundException: If the minddata pipeline file or
the output dir does not exist.
"""
_raw_pipeline_file_name = 'pipeline_profiling_{}.json'
_parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv'
_col_names = [
'op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id'
]
def __init__(self, source_dir, device_id, output_path='./'):
self._device_id = device_id
self._pipeline_path = self._get_pipeline_path(source_dir)
self._save_path = self._get_save_path(output_path)
@property
def save_path(self):
"""
The property of save path.
Returns:
str, the save path.
"""
return self._save_path
def parse(self):
"""
Parse the minddata pipeline files.
Raises:
ProfilerRawFileException: If fails to parse the raw file of
minddata pipeline or the file is empty.
"""
with open(self._pipeline_path, 'r') as file:
try:
pipeline_info = json.load(file)
except (json.JSONDecodeError, TypeError) as err:
logger.warning(err)
raise ProfilerRawFileException(
'Fail to parse minddata pipeline file.'
)
if not pipeline_info:
logger.warning('The minddata pipeline file is empty.')
raise ProfilerRawFileException(
'The minddata pipeline file is empty.'
)
self._parse_and_save(pipeline_info)
def _get_pipeline_path(self, source_dir):
"""
Get the minddata pipeline file path.
Args:
source_dir (str): The minddata pipeline source dir.
Returns:
str, the minddata pipeline file path.
"""
pipeline_path = os.path.join(
source_dir,
self._raw_pipeline_file_name.format(self._device_id)
)
try:
pipeline_path = validate_and_normalize_path(pipeline_path)
except RuntimeError:
logger.warning('Minddata pipeline file is invalid.')
raise ProfilerPathErrorException('Minddata pipeline file is invalid.')
if not os.path.isfile(pipeline_path):
logger.warning(
'The minddata pipeline file <%s> not found.', pipeline_path
)
raise ProfilerFileNotFoundException(pipeline_path)
return pipeline_path
def _get_save_path(self, output_path):
"""
Get the save path.
Args:
output_path (str): The output dir.
Returns:
str, the save path.
"""
try:
output_dir = validate_and_normalize_path(output_path)
except ValidationError:
logger.warning('Output path is invalid.')
raise ProfilerPathErrorException('Output path is invalid.')
if not os.path.isdir(output_dir):
logger.warning('The output dir <%s> not found.', output_dir)
raise ProfilerDirNotFoundException(output_dir)
return os.path.join(
output_dir, self._parsed_pipeline_file_name.format(self._device_id)
)
def _parse_and_save(self, pipeline_info):
"""
Parse and save the parsed minddata pipeline file.
Args:
pipeline_info (dict): The pipeline info reads from the raw file of
the minddata pipeline.
Raises:
ProfilerRawFileException: If the format of minddata pipeline raw
file is wrong.
"""
sample_interval = pipeline_info.get('sampling_interval')
op_info = pipeline_info.get('op_info')
if sample_interval is None or not op_info:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong.'
)
op_id_info_cache = {}
for item in op_info:
op_id_info_cache[item.get('op_id')] = item
with open(self._save_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._col_names)
self._parse_and_save_op_info(
csv_writer, op_id_info_cache, sample_interval
)
def _parse_and_save_op_info(self, csv_writer, op_id_info_cache,
sample_interval):
"""
Parse and save the minddata pipeline operator information.
Args:
csv_writer (csv.writer): The csv writer.
op_id_info_cache (dict): The operator id and information cache.
sample_interval (int): The sample interval.
Raises:
ProfilerRawFileException: If the operator that id is 0 does not exist.
"""
queue = Queue()
root_node = op_id_info_cache.get(0)
if not root_node:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong, '
'the operator that id is 0 does not exist.'
)
root_node['parent_id'] = None
queue.put_nowait(root_node)
while not queue.empty():
node = queue.get_nowait()
self._update_child_node(node, op_id_info_cache)
csv_writer.writerow(self._get_op_info(node, sample_interval))
op_id = node.get('op_id')
children_ids = node.get('children')
if not children_ids:
continue
for child_op_id in children_ids:
sub_node = op_id_info_cache.get(child_op_id)
sub_node['parent_id'] = op_id
queue.put_nowait(sub_node)
def _update_child_node(self, node, op_id_info_cache):
"""
Updates the child node information of the operator.
Args:
node (dict): The node represents an operator.
op_id_info_cache (dict): The operator id and information cache.
"""
child_op_ids = node.get('children')
if not child_op_ids:
return
queue = Queue()
self._cp_list_item_to_queue(child_op_ids, queue)
new_child_op_ids = []
while not queue.empty():
child_op_id = queue.get_nowait()
child_node = op_id_info_cache.get(child_op_id)
if child_node is None:
continue
metrics = child_node.get('metrics')
if not metrics or not metrics.get('output_queue'):
op_ids = child_node.get('children')
if op_ids:
self._cp_list_item_to_queue(op_ids, queue)
else:
new_child_op_ids.append(child_op_id)
node['children'] = new_child_op_ids
def _get_op_info(self, op_node, sample_interval):
"""
Get the operator information.
Args:
op_node (dict): The node represents an operator.
sample_interval (int): The sample interval.
Returns:
list[str, int, float], the operator information.
"""
queue_size = None
queue_average_size = None
queue_length = None
queue_usage_rate = None
metrics = op_node.get('metrics')
if metrics:
output_queue = metrics.get('output_queue')
if output_queue:
queue_size = output_queue.get('size')
queue_average_size = sum(queue_size) / len(queue_size)
queue_length = output_queue.get('length')
queue_usage_rate = queue_average_size / queue_length
children_id = op_node.get('children')
op_info = [
op_node.get('op_id'),
op_node.get('op_type'),
op_node.get('num_workers'),
queue_size,
queue_average_size,
queue_length,
queue_usage_rate,
sample_interval,
op_node.get('parent_id'),
children_id if children_id else None
]
return op_info
def _cp_list_item_to_queue(self, inner_list, queue):
"""
Copy the contents of a list to a queue.
Args:
inner_list (list): The list.
queue (Queue): The target queue.
"""
for item in inner_list:
queue.put_nowait(item)

View File

@ -0,0 +1,245 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Op compute time files parser."""
import os
from mindspore.profiler.common.util import fwrite_format
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException
from mindspore import log as logger
from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path
from mindspore.profiler.parser.container import HWTSContainer
TIMELINE_FILE_COLUMN_TITLE = 'op_name, stream_id, start_time(ms), duration(ms)'
class OPComputeTimeParser:
"""
Join hwts info and framework info, get op time info, and output to the result file.
Args:
hwts_output_file (str): The file path of hwts_output_file. Such as: './output_format_data_hwts_0.txt".
output_filename (str): The output data file path and name. Such as: './output_op_compute_time_0.txt'.
op_task_info (dict): The task and op relation info. The format: {task_id, [opname, stream_id, block dim]}.
"""
_dst_file_title = 'title:op compute time'
_dst_file_column_title = 'op_name compute_time(ms) stream_id'
_dst_file_column_title += '\n------------ --------------- ---------'
def __init__(self, hwts_output_file, output_filename, op_task_info,
output_path, device_id):
hwts_output_file = validate_and_normalize_path(hwts_output_file)
self._hwts_output_file = hwts_output_file
self._output_filename = output_filename
self._op_task_info = op_task_info
self._output_path = output_path
self._device_id = device_id
self._min_cycle_counter = float("inf")
def _get_op_task_id_map(self):
"""
Read hwts data file, get the task time info.
Returns:
list: all hwts task time info.
"""
op_map_result = []
hwts_list = []
if not os.path.exists(self._hwts_output_file):
logger.error('The hwts output file does not exist.')
raise ProfilerFileNotFoundException('hwts output file')
with open(self._hwts_output_file, 'r') as data_file:
lines = data_file.readlines()
for line in lines:
if line.startswith("Start of task") or line.startswith("End of task"):
line_split = line.split()
container = HWTSContainer(line_split)
hwts_list.append(container)
# hwts op map by taskId
for hwts in hwts_list:
if hwts.task_id in self._op_task_info.keys():
hwts.op_name = self._op_task_info[hwts.task_id]
op_map_result.append(hwts)
return op_map_result
def execute(self):
"""Execute the parser, compute all op, get op time, and write it to the output file."""
# Calculate the execution time of operators,
# and update the minimum cycle counter.
tmp_result_data = self._calculate_op_execution_time()
# Convert time units from nanoseconds to milliseconds.
# The unit of the cycle counter is 10 nanoseconds.
op_name_time_dict = {}
op_name_stream_dict = {}
op_name_count_dict = {}
op_name_task_dict = {}
op_name_start_time = {}
self._convert_op_time_unit(
tmp_result_data, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time
)
result_data = ""
total_time = 0
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
avg_time = time / op_name_count_dict[op_name]
total_time += avg_time
result_data += ("%s %s %s\n" %(op_name, str(avg_time), stream_id))
result_data += ("total op %s 0" %(str(total_time)))
timeline_data = []
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
start_time_list = op_name_start_time.get(op_name)
for (start_time, duration) in start_time_list:
timeline_data.append([op_name, stream_id, start_time, duration])
# Write the metadata of operators into the file,
# including operator name, average time, and stream id.
self._write_op_time_into_file(result_data)
# Write the timeline data into file,
# including operator name, stream id, start time, and duration.
self._write_timeline_data_into_file(timeline_data)
def _write_op_time_into_file(self, result_data):
"""
Write the metadata of operators into the file, including
op name, average time, and stream id.
Args:
result_data (str): The metadata to be written into the file.
'op_name_1', 'avg_time_1', 'stream_id_1',
'op_name_2', 'avg_time_2', 'stream_id_2',
...
"""
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
def _write_timeline_data_into_file(self, timeline_data):
"""
Write the timeline information into the file, including
operator name, stream id, start time and duration.
Args:
timeline_data (list): The metadata to be written into the file.
[
['op_name_1', 'stream_id_1', 'start_time_1', 'durarion_1'],
['op_name_2', 'stream_id_2', 'start_time_2', 'durarion_2'],
[...]
]
"""
# sorted by start times
timeline_data.sort(key=lambda x: float(x[2]))
filename = 'output_timeline_data_{}.txt'.format(self._device_id)
file_path = os.path.join(self._output_path, filename)
file_path = validate_and_normalize_path(file_path)
# write to file
try:
with open(file_path, 'w') as f_obj:
f_obj.write(TIMELINE_FILE_COLUMN_TITLE + '\n')
for timeline in timeline_data:
timeline = [str(item) for item in timeline]
f_obj.write(','.join(timeline) + '\n')
except (IOError, OSError) as err:
logger.error('Error occurred when writing intermediate timeline file: %s', err)
raise ProfilerIOException
def _calculate_op_execution_time(self):
"""
Calculate the execution time of each operator.
Returns:
list, including the intermediate data of op execution time.
"""
tmp_result_data = []
op_map_list = self._get_op_task_id_map()
cur_index = 0
length = len(op_map_list)
min_cycle_counter = float("inf")
while cur_index < length:
if cur_index + 1 == length:
break
op_start = op_map_list[cur_index]
op_end = op_map_list[cur_index + 1]
if op_start.status == "Start" and op_end.status == "End" \
and op_start.op_name == op_end.op_name:
op_start.duration = op_end.cycle_counter - op_start.cycle_counter
tmp_result_data.append(op_start)
cur_index += 2
if not op_start.op_name.startswith("assign"):
min_cycle_counter = min(min_cycle_counter, op_start.cycle_counter)
else:
cur_index += 1
# Update the value of minimum cycle counter.
self._min_cycle_counter = min_cycle_counter / 1e5 # Convert the time unit from 10ns to 1ms
return tmp_result_data
def _convert_op_time_unit(self, op_data_list, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time):
"""
Calculate the execution time of operator and convert it into millisecond.
Args:
op_data_list (list): The list of operator metadata.
op_name_time_dict (dict): The mapping relation of operator name and its execution time.
op_name_stream_dict (dict): The mapping relation of operator name and its stream id.
op_name_count_dict (dict): The mapping relation of operator name and its count.
op_name_task_dict (dict): The mapping relation of operator name and its task id.
op_name_start_time (dict): The mapping relation of operator name and its start time.
"""
factor = 1e5
for item in op_data_list:
op_name = item.op_name
# Unit conversion: converting the cycle counter into ms.
op_start_time_str = str(item.cycle_counter / factor)
op_duration = item.duration / factor
op_duration_str = str(item.duration / factor)
if op_name in op_name_time_dict.keys():
op_name_time_dict[op_name] += op_duration
if item.task_id == op_name_task_dict[op_name]:
op_name_count_dict[op_name] += 1
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
else:
op_name_time_dict[op_name] = op_duration
op_name_stream_dict[op_name] = item.stream_id
op_name_task_dict[op_name] = item.task_id
op_name_count_dict[op_name] = 1
op_name_start_time[op_name] = []
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
@property
def min_cycle_counter(self):
"""Get minimum cycle counter."""
return self._min_cycle_counter

View File

@ -0,0 +1,382 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from mindspore.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
JobIdMismatchException, ProfilerIOException
from mindspore import log
from mindspore.profiler.common.util import get_summary_for_step_trace
StepTraceStruct = namedtuple(
'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
)
class StepTraceParser:
"""
The parser for step trace data.
Args:
input_dir (str): The directory that contains original step trace data.
output_file_path (str): The output file path.
job_id (int): The job id used to define the start of new step. Default: 0.
skip_first_step (bool): Whether skip the first step or not.
"""
_event_size = 20
_fp_tag = 1
_bp_tag = 2
_end_tag = 255
def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
self._input_dir = input_dir
self._output_path = output_file_path
self._job_id = job_id
self._skip_first_step = skip_first_step
self._result = []
self._header = []
self._step_num = 0
self._tag_map = {}
@property
def output_file(self):
"""The property of step trace header."""
file_name = self._output_path.rsplit('/', 2)
return file_name[-1] if len(file_name) == 3 else ''
def show(self):
"""The property of step trace info."""
summary_info = {}
if self._result:
summary_info = get_summary_for_step_trace(self._result[-1], self._header)
summary_info['total_steps'] = len(self._result) - 1
print('\nStep trace summary info (unit: syscnt):')
print(summary_info)
print('\nThe step trace parse result saves under ${summary_dir}/profiler/%s'
% self.output_file)
def parse_and_save(self):
"""Parse step trace files and save the result."""
try:
source_files = self._get_step_trace_files()
self._parse(source_files)
self._save()
except IOError as err:
log.warning(err)
raise ProfilerIOException()
else:
log.info("Finish to save intermediate result for step trace file.")
def record_point_info(self, point_info, output_path):
"""
Record point info into json.
Args:
point_info (dict): The point info about tag id and relative op name.
output_path (str): The output path for saving point info.
Returns:
dict, parsed point info.
"""
points = {
'fp_start': point_info.get(self._fp_tag, ''),
'bp_end': point_info.get(self._bp_tag, '')
}
try:
with open(output_path, 'w') as json_file:
json.dump(points, json_file)
os.chmod(output_path, stat.S_IREAD)
except (IOError, OSError) as err:
log.warning('Failed to save point info. %s', err)
raise ProfilerIOException
return points
def update_tag_op_type_map(self, point_info):
"""
update the map from tag id to op type.
Args:
point_info (dict): The point info about tag id and relative op name.
"""
tag_map = {}
for tag, op_name in point_info.items():
op_type = self._get_op_type(tag, op_name)
tag_map[tag] = op_type
log.info("Get tag types for step trace analysis: %s", tag_map)
self._tag_map = tag_map
def _get_op_type(self, tag, name):
"""
Get op type from tag and name.
Args:
tag (int): The tag id.
name (str): The op name.
Returns:
str, the op type.
"""
tag_map = {self._fp_tag: 'fp', self._bp_tag: 'bp', self._end_tag: 'end'}
# get solid tag type
op_type = tag_map.get(tag, '')
if op_type:
return op_type
# check if the tag is step tag.
if tag > self._end_tag or tag == 0:
return 'start'
# analyze the reduce tag
op_type = name.rsplit('/', 1)[-1].split('-')[0]
if not op_type:
log.warning("Unexpected op name:%s", name)
return op_type
def _get_step_trace_files(self):
"""Get step trace files."""
# step trace files may under $profiler_dir or $profiler_dir/data
profiler_dir = self._input_dir
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
# try to find step trace files under $profiler_dir/data
profiler_dir = os.path.join(profiler_dir, 'data')
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
raise ProfilerPathErrorException('Training trace file does not exist.')
return step_trace_files
@staticmethod
def _search_file(input_dir):
"""Search step trace file under specific input directory."""
# validate input_dir
if not os.path.isdir(input_dir):
raise ProfilerPathErrorException(
'{} does not exist or is not a dir'.format(input_dir)
)
# get step trace files
files = os.listdir(input_dir)
step_trace_files = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
# validate result
if len(step_trace_files) > 1:
# the format of file name is like
# `training_trace.46.dev.profiler_default_tag.$id.slice_$number`
# use the $number as the sorted key
try:
step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
except ValueError as err:
log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
step_trace_files = []
file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
log.info("Find %d step trace files.", len(file_paths))
return file_paths
def _parse(self, source_files):
"""Parse source step trace files."""
log.info("Start to parse step trace file.")
event_info = {}
for source_file in source_files:
with open(source_file, 'rb') as handler:
content = handler.read()
for step_trace in self._get_next_step_trace(content, event_info):
if self._skip_first_step:
self._skip_first_step = False
continue
self._record_trace_event(step_trace)
self._record_average_info()
log.info("Finish to parse step trace file.")
def _get_next_step_trace(self, content, event_info):
"""
Get next step trace info.
Args:
content (bytes): The input step trace info.
event_info (dict): The event info.
Returns:
Generator, return the step trace one by one.
"""
for pos in range(0, len(content), 20):
next_event = self._get_trace_struct(content[pos:pos + self._event_size])
self._construct_event_info(next_event, event_info)
if event_info.get('end'):
yield event_info
def _get_trace_struct(self, bin_info):
"""Translate event info to StepTraceStruct."""
if len(bin_info) == self._event_size:
parsed_info = struct.unpack('=QHHQ', bin_info)
return StepTraceStruct(*parsed_info)
return None
def _construct_event_info(self, next_event, event_info):
"""Construct event info according to next_event."""
min_job_id = 255
step_flag: bool = lambda tag: tag > min_job_id or tag == 0
end_flag: bool = lambda tag: tag == min_job_id
fp_flag: bool = lambda tag: tag == self._fp_tag
bp_flag: bool = lambda tag: tag == self._bp_tag
def _on_step_event():
"""Handle step event."""
self._validate_tag_id(tag_id)
start_time = event_info.get('end', '-')
event_info.clear()
event_info['start'] = start_time
event_info['reduce'] = {}
def _on_reduce_event(reduce_tag_id):
"""Handle reduce event."""
stream_id = next_event.stream_id
if event_info['reduce'].get(stream_id):
event_info['reduce'][stream_id].append((reduce_tag_id, sys_count))
else:
event_info['reduce'][stream_id] = [(reduce_tag_id, sys_count)]
tag_id = next_event.tag_id
sys_count = next_event.sys_count
if end_flag(tag_id):
event_info['end'] = sys_count
elif step_flag(tag_id):
_on_step_event()
elif fp_flag(tag_id):
event_info['fp'] = sys_count
elif bp_flag(tag_id):
event_info['bp'] = sys_count
else:
_on_reduce_event(tag_id)
def _validate_tag_id(self, job_id):
"""Check the job id in source step trace file is same as user set."""
if not self._job_id:
self._job_id = job_id
elif self._job_id != job_id:
raise JobIdMismatchException()
def _record_trace_event(self, step_trace):
"""Record trace event."""
self._step_num += 1
start_time = step_trace.get('start')
end_time = step_trace.get('end')
fp_time = step_trace.get('fp')
bp_time = step_trace.get('bp')
if not (start_time and end_time and fp_time and bp_time):
log.warning("The step %d lacks basic time.", self._step_num)
return
if start_time == '-':
start_time = fp_time
row_data = {
'step_num': self._step_num,
'start_point': start_time,
'end_point': end_time,
'total': end_time - start_time,
'fp_point': fp_time,
'bp_point': bp_time,
'iteration_interval': fp_time - start_time,
'fp_and_bp': bp_time - fp_time,
'tail': end_time - bp_time
}
# update reduce info
self._update_reduce_info(step_trace, row_data)
# save the row data
if not self._header:
self._header = list(row_data.keys())
row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
self._result.append(row_data_list)
def _update_reduce_info(self, step_trace, row_data):
"""Extract reduce info."""
reduce_time = step_trace.get('reduce', {})
for stream_id, time_points in reduce_time.items():
time_point_num = len(time_points)
if time_point_num % 2:
log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num)
continue
for index, point_id in enumerate(range(0, time_point_num, 2)):
field_name = f'stream_{stream_id}_{index}'
reduce_info = self._get_single_reduce_event_info(
field_name, time_points[point_id], time_points[point_id + 1])
row_data.update(reduce_info)
def _get_single_reduce_event_info(self, field_name, start_point, end_point):
"""
Get single reduce info.
Args:
field_name (str): The field name.
start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).
Returns:
dict, reduce info.
"""
reduce_info = {}
if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
return reduce_info
op_type = self._tag_map.get(start_point[0])
# append field name with op type.
if not op_type:
log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
field_name += '_parallel'
else:
field_name += '_' + op_type
reduce_info[field_name] = end_point[1] - start_point[1]
reduce_info[field_name + '_start_point'] = start_point[1]
reduce_info[field_name + '_end_point'] = end_point[1]
return reduce_info
def _record_average_info(self):
"""Calculate average info."""
result_size = len(self._result)
# calculate average data for each column in result data
average_data = [0] * len(self._header)
if result_size >= 2:
for row_info in self._result[1:]:
average_data = [
Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
]
average_data = [
round((item / (result_size - 1))) for item in average_data
]
# change step num info in average_data to None
step_num_index = self._header.index('step_num')
average_data[step_num_index] = '-'
self._result.append(average_data)
log.info("Finish add average info for step trace.")
def _save(self):
log.info("Start to save step trace file.")
if not self._header:
return
with open(self._output_path, 'w') as file_handle:
csv_writer = csv.writer(file_handle)
csv_writer.writerow(self._header)
for row_data in self._result:
csv_writer.writerow(row_data)
os.chmod(self._output_path, stat.S_IREAD)

View File

@ -0,0 +1,417 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiling api file."""
import os
import time
from mindspore import log as logger, context
from mindspore.communication.management import release
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException, ProfilerException
from mindspore.profiler.common.util import get_file_names, fwrite_format
from mindspore.profiler.common.validator.checkparam import \
check_bool, check_subgraph
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindspore.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindspore.profiler.parser.framework_parser import FrameworkParser
from mindspore.profiler.parser.hwts_log_parser import HWTSLogParser
from mindspore.profiler.parser.integrator import Integrator
from mindspore.profiler.parser.integrator import TimelineAnalyser
from mindspore.profiler.parser.minddata_parser import MinddataParser
from mindspore.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from mindspore.profiler.parser.optime_parser import OPComputeTimeParser
from mindspore.profiler.parser.step_trace_parser import StepTraceParser
PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
INIT_OP_NAME = 'Default/InitDataSetQueue'
class Profiler:
"""
Performance profiling API.
Enable MindSpore users to profile the performance of neural network.
Args:
subgraph (str): Define which subgraph to monitor and analyse, can be 'all', 'Default', 'Gradients'.
is_detail (bool): Whether to show profiling data for op_instance level, only show optype level if False.
is_show_op_path (bool): Whether to save the full path for each op instance.
output_path (str): Output data path.
optypes_to_deal (str): Op type names, the data of which optype should be collected and analysed,
will deal with all op if null; Different op types should be seperated by comma.
optypes_not_deal (str): Op type names, the data of which optype will not be collected and analysed;
Different op types should be seperated by comma.
Examples:
>>> from mindspore.profiler import Profiler
>>> import mindspore.context
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model()
>>> model.train()
>>> profiler.analyse()
"""
_base_profiling_container_path = "/var/log/npu/profiling/container"
_hwts_output_filename_target = "output_format_data_hwts_"
_opcompute_output_filename_target = "output_op_compute_time_"
_aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
# get device_id and device_target
self._get_devid_and_devtarget()
self._container_path = os.path.join(self._base_profiling_container_path, self._dev_id)
data_path = os.path.join(self._container_path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path, exist_ok=True)
self._output_path = validate_and_normalize_path(output_path)
self._output_path = os.path.join(self._output_path, "profiler")
if not os.path.exists(self._output_path):
os.makedirs(self._output_path, exist_ok=True)
os.environ['PROFILING_MODE'] = 'true'
os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
os.environ['MINDDATA_PROFILING_DIR'] = self._output_path
os.environ['DEVICE_ID'] = self._dev_id
os.environ['AICPU_PROFILING_MODE'] = 'true'
os.environ['PROFILING_DIR'] = str(self._container_path)
# use context interface to open profiling, for the new mindspore version(after 2020.5.21)
context.set_context(enable_profiling=True, profiling_options="training_trace:task_trace")
self._subgraph = check_subgraph(subgraph)
self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else []
self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else []
self._detail = check_bool(is_detail, 'is_detail')
self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
self._profiling_job_id = job_id
# add job id env through user input later
self._job_id_env = 0
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: profiling start time: %d", self._start_time)
def analyse(self):
"""
Collect and analyse performance data, called after training or during training.
Examples:
>>> from mindspore.profiler import Profiler
>>> import mindspore.context
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model()
>>> model.train()
>>> profiler.analyse()
"""
release()
job_id = self._get_profiling_job_id()
logger.info("Profiling: job id is %s ", job_id)
source_path = os.path.join(PROFILING_LOG_BASE_PATH, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
result = hwtslog_parser.execute()
if not result:
logger.error("Profiling: fail to parse hwts log file.")
return
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
optime_parser = OPComputeTimeParser(
hwts_output_filename, opcompute_output_filename,
op_task_dict, self._output_path, self._dev_id
)
optime_parser.execute()
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
try:
self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
# analyse timeline info
try:
self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
def _analyse_step_trace(self, source_path, framework_parser):
"""
Analyse step trace data and save the result.
Args:
source_path (str): The directory that contains the step trace original data.
framework_parser (FrameworkParser): The framework parse instance.
"""
logger.info("Begin to parse step trace.")
# construct output path
step_trace_intermediate_file_path = os.path.join(
self._output_path,
f'step_trace_raw_{self._dev_id}_detail_time.csv'
)
point_info_file_path = os.path.join(
self._output_path,
'step_trace_point_info.json'
)
# whether keep the first step
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# parser the step trace files and save the result to disk
parser = StepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)
# print parser result
parser.show()
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
logger.info("The point info is: %s", point_info)
def _analyse_timeline(self, aicpu_parser, optime_parser):
"""
Analyse and parse timeline info.
Args:
aicpu_parser (DataPreProcessParser): The parser instance for AI CPU operator
execution time calculation.
optime_parser (OPComputeTimeParserParser): The parser instance for AI Core
operator execution time calculation.
"""
timeline_analyser = TimelineAnalyser(self._output_path, self._dev_id)
# Get framework info
integrator = Integrator(self._output_path, self._dev_id)
aicore_detail_data = integrator.get_aicore_detail_data()
aicore_detail_data_size = len(aicore_detail_data)
col_names = ['op_name', 'op_type', 'avg_execution_time', 'subgraph',
'full_op_name', 'op_info']
framework_info = {
'col_name': col_names,
'object': aicore_detail_data,
'size': aicore_detail_data_size
}
all_reduce_info = integrator.query_for_all_reduce()
# Get timeline info
logger.info('Start writing timeline info...')
logger.info('Warm Prompt: It could take a few minutes if you are training '
'with a complex network or more than 10 steps.')
# Add info into timeline, such as AI CPU, AllReduce, framework info.
aicpu_info = aicpu_parser.query_aicpu_data()
min_cycle_counter = min(aicpu_parser.min_cycle_counter, optime_parser.min_cycle_counter)
timeline_analyser.init_timeline(all_reduce_info, framework_info, aicpu_info, min_cycle_counter)
timeline_analyser.write_timeline()
timeline_analyser.write_timeline_summary()
def __del__(self):
"""Disable the profiling collection service, called after training."""
os.environ['PROFILING_MODE'] = str("false")
context.set_context(enable_profiling=False)
def _get_profiling_job_id(self):
"""Get profiling job id, which was generated by ada service.
Returns:
str: profiling jon id.
"""
if self._profiling_job_id:
return self._profiling_job_id
job_id = ""
cmd = "ls -t " + PROFILING_LOG_BASE_PATH + "|grep JOB|awk '{print $1}'"
r = os.popen(cmd)
profiling_job_dirs = r.readlines()
r.close()
for item in profiling_job_dirs:
path = os.path.join(PROFILING_LOG_BASE_PATH, item.strip())
log_file = get_file_names(path, "host_start.log")
if not log_file:
logger.error("Profiling: job path %s, host_start.log not exist.", path)
continue
log_file = os.path.join(path, log_file[0])
item_dict = self._parse_host_start_log(log_file)
if not item_dict:
logger.error("Profiling: job path %s, fail to get job start info.", path)
continue
if self._start_time > int(item_dict["start_time"]):
logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
path, item_dict["start_time"], self._start_time)
break
if self._dev_id != item_dict["device_id"]:
logger.info("Profiling: job path %s, dev id %s, training device id %s.",
path, item_dict["device_id"], self._dev_id)
continue
job_id = item.strip()
break
if not job_id:
msg = "Fail to get profiling job, please check whether job dir was generated"
raise RuntimeError(msg)
return job_id
def _parse_host_start_log(self, input_file):
"""
Parse host start log file, get the device id and start time of the job.
Args:
input_file (str): The file path of the host start log file.
Returns:
dict, job start time and device id.
"""
item_dict = {}
for line in open(input_file):
if "Device" in line:
item_dict["device_id"] = line[7:len(line)-2]
elif "clock_realtime" in line:
item_dict["start_time"] = line[16:len(line)-3]
return item_dict
def _analyser_op_info(self):
"""Analyse the operator information."""
integrator = Integrator(self._output_path, self._dev_id)
integrator.integrate()
aicore_type_result = self._query_op_type_info()
detail_file_path = os.path.join(
self._output_path,
'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
)
fwrite_format(detail_file_path, data_source='title:op compute time')
display_names = [
'optype_name', 'compute_time(ms, per-step)',
'called_times(per-step)', 'percent'
]
fwrite_format(detail_file_path, data_source=" ".join(display_names), is_print=True)
fwrite_format(detail_file_path, data_source=aicore_type_result, is_print=True)
if self._detail:
op_type_order = [item[0] for item in aicore_type_result]
aicore_detail_result = self._query_op_detail_info(op_type_order)
fwrite_format(detail_file_path, data_source='', is_print=True)
fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
col_names = ['op_name', 'op_type', 'avg_execution_time', 'subgraph',
'full_op_name', 'op_info']
fwrite_format(detail_file_path, data_source=" ".join(col_names), is_print=True)
fwrite_format(detail_file_path, data_source=aicore_detail_result, is_print=True)
def _query_op_type_info(self):
"""
Query AICORE operator type information.
Returns:
list[list], the AICORE operator type and execution time information.
"""
integrator = Integrator(self._output_path, self._dev_id)
return integrator.get_aicore_data()
def _query_op_detail_info(self, op_type_order):
"""
Query AICORE operator detail information.
Args:
op_type_order(list): The name of the op type in order.
Returns:
dict, the AICORE operator detail information.
"""
op_type_condition = {}
if self._valid_optype_name:
op_type_condition['in'] = self._valid_optype_name
if self._filt_optype_names:
op_type_condition['not_in'] = self._filt_optype_names
subgraph_condition = {}
if self._subgraph != 'all':
subgraph_condition['in'] = [self._subgraph]
integrator = Integrator(self._output_path, self._dev_id)
return integrator.get_aicore_detail_data()
def _get_devid_and_devtarget(self):
"""Get device id and target of this training."""
device_target = ""
dev_id = ""
try:
dev_id = str(context.get_context("device_id"))
device_target = context.get_context("device_target")
except ValueError as err:
logger.error("Profiling: fail to get context, %s", err)
if not dev_id or not dev_id.isdigit():
dev_id = os.getenv('DEVICE_ID')
if not dev_id or not dev_id.isdigit():
dev_id = "0"
logger.error("Fail to get DEVICE_ID, use 0 instead.")
if device_target and device_target != "Davinci" \
and device_target != "Ascend":
msg = "Profiling: unsupport backend: %s" % device_target
raise RuntimeError(msg)
self._dev_id = dev_id