fix bugs for device_id_to_rank_id
This commit is contained in:
parent
3d10610bc9
commit
3e5cb3b506
|
@ -59,6 +59,7 @@ class HcclParser:
|
|||
Args:
|
||||
source_dir (str): The hccl source dir.
|
||||
device_id (str): The device ID.
|
||||
rank_id (str): The rank ID.
|
||||
output_path (str): The directory of the parsed file. Default: `./`.
|
||||
|
||||
Raises:
|
||||
|
@ -68,8 +69,9 @@ class HcclParser:
|
|||
_parsed_hccl_file_name = 'hccl_raw_{}.csv'
|
||||
_col_names = ['step_num', 'communication_cost', 'wait_cost', 'link_info', 'communication_operator_cost']
|
||||
|
||||
def __init__(self, source_dir, device_id, output_path):
|
||||
def __init__(self, source_dir, device_id, rank_id, output_path):
|
||||
self._dev_id = device_id
|
||||
self._rank_id = rank_id
|
||||
self._source_dir = source_dir
|
||||
self._save_path = self._get_save_path(output_path)
|
||||
self._step_trace_info = self._get_step_trace_info(output_path)
|
||||
|
@ -136,14 +138,14 @@ class HcclParser:
|
|||
"""
|
||||
output_path = self._validate_dir_path(output_path)
|
||||
return os.path.join(
|
||||
output_path, self._parsed_hccl_file_name.format(self._dev_id)
|
||||
output_path, self._parsed_hccl_file_name.format(self._rank_id)
|
||||
)
|
||||
|
||||
def _get_step_trace_info(self, source_dir):
|
||||
"""Get the start and end timestamps in a step and communication operators names."""
|
||||
file_path = os.path.join(
|
||||
source_dir,
|
||||
f'step_trace_raw_{self._dev_id}_detail_time.csv'
|
||||
f'step_trace_raw_{self._rank_id}_detail_time.csv'
|
||||
)
|
||||
try:
|
||||
file_path = validate_and_normalize_path(file_path)
|
||||
|
@ -171,14 +173,15 @@ class HcclParser:
|
|||
"""Get the name of communication operators mapping between hccl and step trace."""
|
||||
dir_path = self._validate_dir_path(self._source_dir)
|
||||
# The name of the operator in hccl is like:operatorName_{Ordered_number}_xx_xx.
|
||||
operators_names_in_hccl = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()]
|
||||
operators_names_in_hccl = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()
|
||||
and entry.name.endswith(self._dev_id)]
|
||||
operators_names_in_hccl_set = set({i.split('_')[0] for i in operators_names_in_hccl})
|
||||
op_names_in_hccl_dic = dict()
|
||||
for item in operators_names_in_hccl_set:
|
||||
op_names_in_hccl_dic[item] = sorted([i for i in operators_names_in_hccl if i.split('_')[0] == item],
|
||||
key=lambda x: int(x.split('_')[1]))
|
||||
|
||||
# The op_info in step trace is like:[op_name,op_name_start_point,op_name_end_point]
|
||||
# The op_info in step trace is like: [op_name,op_name_start_point,op_name_end_point]
|
||||
# The name of the operator in step trace can be obtained every three.
|
||||
# The name of the operator in step trace is like: stream_xx_xx_operatorName-opxx.
|
||||
operators_names_in_step_trace = [self._step_trace_info[0][i]
|
||||
|
@ -219,7 +222,8 @@ class HcclParser:
|
|||
"""Obtain time-consuming information of all communication operators."""
|
||||
operators_cost_info = dict()
|
||||
dir_path = self._validate_dir_path(dir_path)
|
||||
operators_dir = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()]
|
||||
operators_dir = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()
|
||||
and entry.name.endswith(self._dev_id)]
|
||||
operator_dir_path = [os.path.join(dir_path, operator_dir) for operator_dir in operators_dir]
|
||||
for operator_dir in operator_dir_path:
|
||||
operator_cost = self._calculate_communication_operator_cost(operator_dir)
|
||||
|
|
|
@ -649,6 +649,10 @@ class Profiler:
|
|||
|
||||
def _get_output_path(self, kwargs):
|
||||
"""Get output path of profiling data."""
|
||||
if os.getenv("MS_DIAGNOSTIC_DATA_PATH") and kwargs.get("output_path") is not None:
|
||||
logger.warning("Both parameter output_path and environment variable MS_DIAGNOSTIC_DATA_PATH"
|
||||
" have values set, and the profiling data saving path is the value set "
|
||||
"in parameter output_path")
|
||||
if kwargs.get("output_path") is None:
|
||||
if "output_path" in kwargs:
|
||||
kwargs.pop("output_path")
|
||||
|
@ -690,7 +694,7 @@ class Profiler:
|
|||
raise ImportError(err)
|
||||
logger.info("Parse hccl info successfully.")
|
||||
logger.info("Start analyse hccl info.")
|
||||
hccl_parse = HcclParser(hccl_path, self._dev_id, self._output_path)
|
||||
hccl_parse = HcclParser(hccl_path, self._dev_id, self._rank_id, self._output_path)
|
||||
hccl_parse.parse()
|
||||
logger.info("Analyse hccl info successfully.")
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ class TestHcclParser:
|
|||
)
|
||||
shutil.copyfile(os.path.join(PROFILER_DIR, 'step_trace_raw_6_detail_time.csv'),
|
||||
os.path.join(self._output_path, 'step_trace_raw_6_detail_time.csv'))
|
||||
self._parser = HcclParser(os.path.join(PROFILER_DIR, 'hccl_info'), '6', self._output_path)
|
||||
self._parser = HcclParser(os.path.join(PROFILER_DIR, 'hccl_info'), '6', '6', self._output_path)
|
||||
|
||||
def teardown_method(self) -> None:
|
||||
"""Clear up after test case execution."""
|
||||
|
|
Loading…
Reference in New Issue