It is not appropriate to enable profiler warning scenarios by environment variables

This commit is contained in:
liuchuting 2023-02-06 16:31:02 +08:00
parent 6b91d6a2a1
commit 69b14bb126
3 changed files with 106 additions and 70 deletions

View File

@ -16,23 +16,14 @@
import json
import os
import time
from enum import Enum
from mindspore.profiler import Profiler
from mindspore.profiler.profiling import AICORE_METRICS_DICT
from mindspore.profiler.profiling import AICORE_METRICS_DICT, DeviceSupportParam
from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path
from mindspore.profiler.parser.integrator import DeviceTarget
from mindspore import log as logger, context
class DeviceSupportParam(Enum):
    """Profiler option names accepted on each device target.

    Each member is named after an upper-case device target, and its value is
    the list of option keys that are treated as supported on that target.
    Callers test membership with ``param in DeviceSupportParam[...].value``.
    """

    CPU = ['start', 'output_path']
    GPU = ['start', 'output_path', 'data_process', 'timeline_limit', 'sync_enable']
    ASCEND = ['start', 'output_path', 'data_process', 'timeline_limit', 'profile_memory', 'parallel_strategy',
              'profile_communication', 'aicore_metrics', 'l2_cache']
def get_profiling_options():
"""Get profiling options."""
try:
@ -42,27 +33,41 @@ def get_profiling_options():
return options
# NOTE(review): two signatures and two loop bodies appear back to back below. This text
# looks like a diff rendering with the '+'/'-' markers and the indentation stripped, so
# the one-argument form and the three-argument form are probably the before/after
# versions of the same function -- confirm which lines are live before relying on either.
def parse_device_support_param(options):
def parse_device_support_param(origin_options, final_options, factor_s_to_us=1e7):
"""Parse platform support parameters."""
# The device target is read from the global context and upper-cased so that it matches
# the member names of DeviceSupportParam.
device_target = context.get_context("device_target").upper()
# Variant that takes ``options``: warn for every key that is not in the target's list.
for param in options.keys():
# NOTE(review): __getattr__ is invoked directly on the Enum class; EnumMeta.__getattr__
# was removed in Python 3.12, so DeviceSupportParam[device_target] would be the portable
# lookup -- confirm the supported Python versions.
if param not in DeviceSupportParam.__getattr__(f'{device_target}').value:
logger.warning(f"The parameter '{param}' is not supported on {device_target} currently.")
# Variant that takes ``origin_options`` / ``final_options``: same lookup, kept in a local.
support_list = DeviceSupportParam.__getattr__(f'{device_target}').value
# Shallow copy, so keys popped below are not removed from the caller's final_options.
support_dict = final_options.copy()
# Visit every key that is present in either of the two dicts.
for param in list(set(origin_options) | set(final_options)):
# Warn only when origin_options holds a truthy value for an unsupported key.
if param not in support_list and origin_options.get(param):
logger.warning(f"[Profiler]'{param}' is invalid params on this platform.")
# Drop unsupported keys whose value in final_options is truthy; unsupported keys with a
# falsy value are left in the returned dict.
if param not in support_list and final_options.get(param):
support_dict.pop(param)
# Fields added to every returned config regardless of the device target.
simple_options = {
# NOTE(review): the parameter name suggests seconds-to-microseconds (1e6) but the default
# factor is 1e7 -- confirm the unit the consumer of start_time expects.
"start_time": int(time.time() * factor_s_to_us),
"file_output_path": "",
"pid": os.getpid(),
}
support_dict.update(simple_options)
return support_dict
def construct_profiling_options():
"""Construct profiling options to determine which profiling data should be collected."""
profiling_options = get_profiling_options()
if profiling_options is None:
raise RuntimeError(
error_config = {"start": False}
if os.getenv("MS_PROFILER_RUN_CONFIG"):
return error_config
os.environ["MS_PROFILER_RUN_CONFIG"] = json.dumps(error_config)
logger.error(
"The format of MS_PROFILER_OPTIONS is incorrect. "
"The MS_PROFILER_OPTIONS parameter configuration may refer to "
"'https://www.mindspore.cn/mindinsight/docs/zh-CN/master/performance_profiling_ascend.html'."
)
options = combine_profile_options(profiling_options)
conbine_options = parse_profiling_args(options)
return error_config
conbine_options = combine_profile_options(profiling_options)
if conbine_options.get("start"):
parse_device_support_param(profiling_options)
output_path = conbine_options.get("output_path")
if not output_path:
output_path = os.path.join(os.getcwd(), "data")
@ -87,11 +92,20 @@ def parse_pubilc_args(options):
"The 'data_process' parameter of the environment variable MS_PROFILE_OPTIONS must be bool,"
f" but got type {type(options.get('data_process'))}, it will be set to true.")
options["data_process"] = True
if not isinstance(options.get("timeline_limit"), int):
if not isinstance(options.get("op_time"), bool):
logger.warning(
"The 'op_time' parameter of the environment variable MS_PROFILE_OPTIONS must be bool,"
f" but got type {type(options.get('op_time'))}, it will be set to true.")
options["op_time"] = True
if isinstance(options.get("timeline_limit"), bool) or not isinstance(options.get("timeline_limit"), int):
logger.warning(
"The 'timeline_limit' parameter of the environment variable MS_PROFILE_OPTIONS must be int,"
f" but got type {type(options.get('timeline_limit'))}, it will be set to 500.")
options["timeline_limit"] = 500
if options.get('timeline_limit') <= 0:
logger.warning(
"The 'timeline_limit' parameter of the environment variable MS_PROFILE_OPTIONS must be greater than 0.")
options["timeline_limit"] = 500
absolute_path = os.path.join(os.getcwd(), "data")
if not isinstance(options.get("output_path"), str):
logger.warning(
@ -160,28 +174,25 @@ def parse_profiling_args(options):
# NOTE(review): ``options = {`` is immediately followed by ``config_options = {`` and the
# dict contains two "sync_enable" entries. This looks like a diff rendering with the
# '+'/'-' markers and indentation stripped, i.e. old and new lines interleaved -- confirm
# which lines are live before relying on this block.
def combine_profile_options(profiling_options):
"""Combined profiling options."""
factor_s_to_us = 1e7
# Default output directory: "<current working directory>/data".
output_path = os.path.join(os.getcwd(), "data")
# Device-dependent default for sync_enable: True on GPU, False on any other target.
if context.get_context("device_target").upper() == "GPU":
sync_enable = profiling_options.get("sync_enable", True)
else:
sync_enable = profiling_options.get("sync_enable", False)
options = {
# Every entry below takes the user-supplied value when the key is present in
# profiling_options and falls back to the default given as the second argument.
config_options = {
"start": profiling_options.get('start', False),
"start_time": int(time.time() * factor_s_to_us),
"pid": os.getpid(),
"output_path": profiling_options.get('output_path', output_path),
"file_output_path": "",
"profile_memory": profiling_options.get("profile_memory", False),
"profile_communication": profiling_options.get("profile_communication", False),
"aicore_metrics": profiling_options.get("aicore_metrics", 0),
"l2_cache": profiling_options.get("l2_cache", False),
"sync_enable": sync_enable,
# This entry defaults sync_enable to True without consulting the device target.
"sync_enable": profiling_options.get("sync_enable", True),
"data_process": profiling_options.get("data_process", True),
"timeline_limit": profiling_options.get("timeline_limit", 500),
"parallel_strategy": profiling_options.get("parallel_strategy", True),
'op_time': profiling_options.get("op_time", True)
}
return options
# The assembled dict is passed through parse_profiling_args and its return value is used
# from here on (that helper is defined outside this view -- presumably it validates the
# values; verify against its definition).
combine_options = parse_profiling_args(config_options)
# Only when profiling is switched on are the options filtered by device target; the raw
# user options are passed alongside so that unsupported user-supplied keys can be reported.
if combine_options.get("start"):
final_options = parse_device_support_param(profiling_options, combine_options)
return final_options
return combine_options
class EnvProfiler:
@ -214,14 +225,15 @@ def profiler_check_env():
if not config.get("start"):
return
Profiler(output_path=config.get("output_path"),
profile_memory=config.get("profile_memory"),
profile_communication=config.get("profile_communication"),
data_process=config.get("data_process"),
parallel_strategy=config.get("parallel_strategy"),
aicore_metrics=config.get("aicore_metrics"),
l2_cache=config.get("l2_cache"),
sync_enable=config.get("sync_enable"),
timeline_limit=config.get("timeline_limit"))
profile_memory=config.get("profile_memory", False),
profile_communication=config.get("profile_communication", False),
data_process=config.get("data_process", False),
parallel_strategy=config.get("parallel_strategy", False),
aicore_metrics=config.get("aicore_metrics", 0),
l2_cache=config.get("l2_cache", False),
sync_enable=config.get("sync_enable", False),
op_time=config.get("op_time", False),
timeline_limit=config.get("timeline_limit", 500))
profiler_check_env()

View File

@ -78,7 +78,7 @@ class MinddataProfilingAnalyzer:
try:
validated_dir = validate_and_normalize_path(dir_name)
except RuntimeError as path_error:
logger.warning('<%s> <%s> is invalid.', dir_type, validated_dir)
logger.warning('<%s> is invalid.', dir_type)
raise ProfilerPathErrorException(dir_type + 'is invalid.') from path_error
if not os.path.isdir(validated_dir):

View File

@ -18,6 +18,7 @@ import stat
import time
import json
import glob
from enum import Enum
from mindspore import log as logger, context
from mindspore.communication.management import GlobalComm, get_rank, get_group_size
@ -62,6 +63,14 @@ AICORE_METRICS_DICT = {
}
class DeviceSupportParam(Enum):
    """Profiler option names accepted on each device target.

    Each member is named after an upper-case device target, and its value is
    the list of option keys that are treated as supported on that target.
    Callers test membership with ``param in DeviceSupportParam[...].value``.
    """

    CPU = ['start', 'output_path', 'timeline_limit']
    GPU = ['start', 'output_path', 'data_process', 'timeline_limit', 'sync_enable', 'op_time']
    ASCEND = ['start', 'output_path', 'data_process', 'timeline_limit', 'profile_memory', 'parallel_strategy',
              'profile_communication', 'aicore_metrics', 'l2_cache', 'op_time']
def _environment_check():
if c_expression.security.enable_security():
raise RuntimeError("Profiler is not supported when MindSpore is compiled with \'-s on\'.")
@ -206,9 +215,9 @@ class Profiler:
msg = "Do not init twice in the profiler."
raise RuntimeError(msg)
Profiler._has_initialized = True
self._parser_kwargs(kwargs)
# get device_id and device_target
self._get_devid_rankid_and_devtarget()
self._parser_kwargs(kwargs)
self._get_output_path(kwargs)
self._decide_device_target(kwargs)
if self.start_profile:
@ -328,7 +337,6 @@ class Profiler:
if self._device_target and self._device_target != DeviceTarget.CPU.value and cpu_op_file:
self._is_heterogeneous = True
ProfilerInfo.set_analyse_start_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
self._init_profiler_info()
if self._device_target and self._device_target == DeviceTarget.CPU.value:
self._cpu_analyse()
@ -338,6 +346,7 @@ class Profiler:
elif self._device_target and self._device_target == DeviceTarget.ASCEND.value:
self._ascend_analyse()
logger.info("Profiling: all the data have been analyzed.")
self._init_profiler_info()
ProfilerInfo.set_analyse_end_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
ProfilerInfo.set_rank_size(self._rank_size)
ProfilerInfo.set_heterogeneous(self._is_heterogeneous)
@ -474,9 +483,10 @@ class Profiler:
self._output_path = options.get('file_output_path')
self._profile_memory = options.get('profile_memory')
self._parallel_strategy = options.get('parallel_strategy')
self._timeline_size_limit_byte = options.get('timeline_limit')
self._timeline_size_limit_byte = options.get('timeline_limit') * 1024 * 1024
self._data_process = options.get('data_process')
self._profile_communication = options.get('profile_communication')
self._op_time = options.get('op_time')
self._device_target = context.get_context("device_target").lower()
self._profiler_manager = c_expression.ProfilerManager.get_instance()
self._cpu_profiler = c_expression.Profiler.get_instance("CPU")
@ -494,7 +504,8 @@ class Profiler:
mode = "graph"
if context.get_context("mode") == context.PYNATIVE_MODE:
mode = "pynative"
ProfilerInfo.init_info(mode, self._rank_id)
store_id = self._dev_id if self._device_target == DeviceTarget.GPU.value else self._rank_id
ProfilerInfo.init_info(mode, store_id)
def _decide_device_target(self, kwargs):
"""Complete Profiler initialization according to device_target"""
@ -624,6 +635,7 @@ class Profiler:
self._sync_enable = kwargs.pop("sync_enable", True)
if not isinstance(self._sync_enable, bool):
logger.warning("The parameter sync_enable is an invalid value, it will be set to True.")
self._sync_enable = True
def _parse_parameter_for_ascend(self, kwargs):
"""Parse parameter in Proflier when the device target is Ascend."""
@ -636,8 +648,9 @@ class Profiler:
self._profile_communication = kwargs.pop("profile_communication", False)
if not isinstance(self._profile_communication, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter profile_communication must be bool, "
f"but got type {type(self._profile_communication)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter profile_communication must be bool, "
f"but got type {type(self._profile_communication)}, it will be set to False.")
self._profile_communication = False
if self._profile_communication:
hccl_option = {"output": self._output_path, "task_trace": "on"}
@ -648,21 +661,26 @@ class Profiler:
self._profile_memory = kwargs.pop("profile_memory", False)
if not isinstance(self._profile_memory, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter profile_memory must be bool, "
f"but got type '{type(self._profile_memory)}'")
logger.warning(f"For '{self.__class__.__name__}', the parameter profile_memory must be bool, "
f"but got type {type(self._profile_memory)}, it will be set to False.")
self._profile_memory = False
self._aicore_metrics_id = kwargs.pop("aicore_metrics", 0)
if not isinstance(self._aicore_metrics_id, int):
raise TypeError(f"For '{self.__class__.__name__}', the parameter aicore_metrics must be int, "
f"but got type {type(self._aicore_metrics_id)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter aicore_metrics must be int, "
f"but got type {type(self._aicore_metrics_id)}, it will be set to 0.")
self._aicore_metrics_id = 0
if self._aicore_metrics_id not in AICORE_METRICS_DICT:
raise ValueError(f"For '{self.__class__.__name__}', the parameter aicore_metrics must be in "
f"[-1, 0, 1, 2, 3, 4, 5], but got {self._aicore_metrics_id}")
logger.warning(f"For '{self.__class__.__name__}', the parameter aicore_metrics must be in "
f"[-1, 0, 1, 2, 3, 4, 5], but got {self._aicore_metrics_id}, it will be set to 0.")
self._aicore_metrics_id = 0
l2_cache_enable = kwargs.pop("l2_cache", False)
if not isinstance(l2_cache_enable, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter l2_cache must be bool, "
f"but got type {type(l2_cache_enable)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter l2_cache must be bool, "
f"but got type {type(l2_cache_enable)}, it will be set to False.")
l2_cache_enable = False
if l2_cache_enable:
self._l2_cache = "on"
else:
@ -670,15 +688,9 @@ class Profiler:
self._parallel_strategy = kwargs.pop("parallel_strategy", True)
if not isinstance(self._parallel_strategy, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter parallel_strategy must be bool, "
f"but got type {type(self._parallel_strategy)}")
self._sync_enable = kwargs.pop("sync_enable", False)
if self._sync_enable:
logger.warning(f"The parameter sync_enable is not supported on Ascend currently.")
if kwargs:
logger.warning("%s are invalid params which don't work.", kwargs)
logger.warning(f"For '{self.__class__.__name__}', the parameter parallel_strategy must be bool, "
f"but got type {type(self._parallel_strategy)}, it will be set to True.")
self._parallel_strategy = True
task_sink = os.getenv("GRAPH_OP_RUN")
if task_sink and task_sink == "1":
@ -766,10 +778,10 @@ class Profiler:
def _ascend_dynamic_net_analyse(self):
"""Analyse dynamic shape network info."""
if self._profile_communication:
raise RuntimeError(
logger.warning(
"The profile_communication parameter cannot be set on the dynamic shape network.")
if self._profile_memory:
raise RuntimeError("The profile_memory parameter cannot be set on the dynamic shape network.")
logger.warning("The profile_memory parameter cannot be set on the dynamic shape network.")
logger.warning(
"[Profiler]Dynamic Shape network does not support collecting step trace performance data currently.")
dynamic_parser = DynamicFrameWorkParser(self._output_path, self._rank_id)
@ -1337,20 +1349,32 @@ class Profiler:
"""Parse kwargs vale."""
self._data_process = kwargs.pop("data_process", True)
if not isinstance(self._data_process, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter data_process must be bool, "
f"but got type {type(self._data_process)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter data_process must be bool, "
f"but got type {type(self._data_process)}, it will be set to True.")
self._data_process = True
self._op_time = kwargs.pop("op_time", True)
if not isinstance(self._op_time, bool):
raise TypeError(f"For '{self.__class__.__name__}', the parameter op_time must be bool, "
f"but got type {type(self._op_time)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter op_time must be bool, "
f"but got type {type(self._op_time)}, it will be set to True.")
self._op_time = True
timeline_limit = kwargs.pop("timeline_limit", 500)
if not isinstance(timeline_limit, int):
raise TypeError(f"For '{self.__class__.__name__}', the parameter timeline_limit must be int, "
f"but got type {type(timeline_limit)}")
logger.warning(f"For '{self.__class__.__name__}', the parameter timeline_limit must be int, "
f"but got type {type(timeline_limit)}, it will be set to 500.")
timeline_limit = 500
if timeline_limit <= 0:
logger.warning(
"[Profiler]The 'timeline_limit' parameter must be greater than 0, it will be set to 500.")
timeline_limit = 500
self._timeline_size_limit_byte = timeline_limit * 1024 * 1024
for param in kwargs.keys():
if param not in DeviceSupportParam.__getattr__(f'{self._device_target}'.upper()).value \
and kwargs.get(param):
logger.warning("%s are invalid param which don't work.", param)
def _analyse_hccl_info(self):
"""Analyse hccl info."""
hccl_path = os.path.join(self._output_path, "hccl_info_{}".format(self._rank_id))