forked from mindspore-Ecosystem/mindspore
!17461 MD Profiling Analyze: Handle cpu util with missing ops. Enhance UTs.
From: @cathwong Reviewed-by: @robingrosman Signed-off-by: @robingrosman
commit c4757f0692
@@ -25,6 +25,7 @@ from mindspore.profiler.common.exceptions.exceptions import \
+from mindspore import log as logger
 from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path
 
 
 class MinddataProfilingAnalyzer:
     """
     The analyzer for MindData profiling files.
@@ -409,12 +410,13 @@ class MinddataProfilingAnalyzer:
 
         return return_dict
 
-    def _parse_cpu_util_info(self, cpu_util_info):
+    def _parse_cpu_util_info(self, cpu_util_info, num_pipeline_ops):
         """
         Parse and process the CPU profiling information.
 
         Args:
             cpu_util_info (dict): The CPU utilization profiling information.
+            num_pipeline_ops (int): Number of ops in the pipeline information.
 
         Returns:
             Dictionary with analyzed summary output information
@@ -438,20 +440,31 @@ class MinddataProfilingAnalyzer:
         # - average cpu utilization for each op
         dict_opid_cpuutil = {}
         for op in cpu_util_info["op_info"]:
-            op_sys, op_usr = op["metrics"]["sys_utilization"], op["metrics"]["user_utilization"]
-            dict_opid_cpuutil[op["op_id"]] = [op_sys[i] + op_usr[i] for i in range(len(op_sys))]
+            # Note: The CPU utilization data may have an extra entry with op_id=-1
+            # Omit info for op_id=-1
+            if op["op_id"] != -1:
+                op_sys, op_usr = op["metrics"]["sys_utilization"], op["metrics"]["user_utilization"]
+                dict_opid_cpuutil[op["op_id"]] = [op_sys[i] + op_usr[i] for i in range(len(op_sys))]
 
-        oplist_avg_cpu_pct = []
+        # Produce a warning if the CPU utilization data and pipeline data do not include information
+        # for the same number of ops
+        # Note: There are cases in which CPU utilization data does not have information for some ops
+        if len(dict_opid_cpuutil) != num_pipeline_ops:
+            warning_msg = 'Number of ops for CPU utilization data: ' + str(len(dict_opid_cpuutil)) + \
+                          ' does not match number of ops for pipeline data: ' + str(num_pipeline_ops)
+            logger.warning(warning_msg)
+
+        # Initialize oplist_avg_cpu_pct with -1 for each pipeline op, since
+        # CPU utilization data may not have information for each pipeline op
+        oplist_avg_cpu_pct = [-1] * num_pipeline_ops
         total_cpu = 0
-        # Note: The CPU utilization data has an extra entry with op_id=-1
         for op_id, cpu in dict_opid_cpuutil.items():
-            if op_id != -1:
-                op_avg_cpu_pct = sum(cpu) / len(cpu) if cpu else 0
-                oplist_avg_cpu_pct.append((op_id, round(op_avg_cpu_pct, 2)))
-                total_cpu += op_avg_cpu_pct
+            op_avg_cpu_pct = sum(cpu) / len(cpu) if cpu else 0
+            oplist_avg_cpu_pct[op_id] = round(op_avg_cpu_pct, 2)
+            total_cpu += op_avg_cpu_pct
 
         return_dict = {}
-        return_dict['avg_cpu_pct'] = [x[1] for x in oplist_avg_cpu_pct]
+        return_dict['avg_cpu_pct'] = oplist_avg_cpu_pct
         return return_dict
 
     def _parse_device_trace_info(self, device_trace_info):
@@ -527,6 +540,7 @@ class MinddataProfilingAnalyzer:
 
         # Check if pipeline does not contain a DeviceQueue op
+        op_names = summary_dict.get('op_names')
 
         if 'DeviceQueue' not in op_names:
             # No need for further bottleneck processing
             return return_dict
@@ -693,7 +707,8 @@ class MinddataProfilingAnalyzer:
         summary_dict.update(self._parse_pipeline_info(pipeline_info))
 
         # Parse and process CPU utilization information
-        summary_dict.update(self._parse_cpu_util_info(cpu_util_info))
+        # Supply the number of ops from the pipeline information
+        summary_dict.update(self._parse_cpu_util_info(cpu_util_info, len(summary_dict.get('pipeline_ops'))))
 
         if device_trace_info is not None:
             # Parse and process dataset iterator (CPU) or device queue (CPU, Ascend) trace profiling information
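
For reference, a minimal standalone sketch of the new averaging logic above, run on made-up utilization numbers rather than a real profiling dump. It is not the analyzer class itself, and it prints the mismatch warning instead of calling mindspore's logger. Op 2 is deliberately missing from the CPU utilization data, so its slot keeps the -1 placeholder:

cpu_util_info = {
    "op_info": [
        # Aggregate entry with op_id=-1 that the new code skips
        {"op_id": -1, "metrics": {"sys_utilization": [1, 1], "user_utilization": [2, 2]}},
        {"op_id": 0, "metrics": {"sys_utilization": [10, 20], "user_utilization": [5, 5]}},
        {"op_id": 1, "metrics": {"sys_utilization": [3, 7], "user_utilization": [2, 8]}},
    ]
}
num_pipeline_ops = 3  # pretend the pipeline file reported ops 0, 1 and 2

# Collect combined (sys + user) utilization samples per op, skipping op_id=-1
dict_opid_cpuutil = {}
for op in cpu_util_info["op_info"]:
    if op["op_id"] != -1:
        op_sys, op_usr = op["metrics"]["sys_utilization"], op["metrics"]["user_utilization"]
        dict_opid_cpuutil[op["op_id"]] = [op_sys[i] + op_usr[i] for i in range(len(op_sys))]

# Warn (here: print) when the CPU utilization data covers fewer ops than the pipeline
if len(dict_opid_cpuutil) != num_pipeline_ops:
    print("Warning: CPU utilization data has %d ops, pipeline data has %d ops"
          % (len(dict_opid_cpuutil), num_pipeline_ops))

# One slot per pipeline op; ops without CPU utilization data keep the -1 placeholder
oplist_avg_cpu_pct = [-1] * num_pipeline_ops
for op_id, cpu in dict_opid_cpuutil.items():
    oplist_avg_cpu_pct[op_id] = round(sum(cpu) / len(cpu) if cpu else 0, 2)

print(oplist_avg_cpu_pct)  # [20.0, 10.0, -1]

As in the patched method, indexing oplist_avg_cpu_pct by op_id assumes the op_id values fall within range(num_pipeline_ops).
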
@@ -85,10 +85,113 @@ def test_profiling_complex_pipeline():
         data = json.load(f)
         op_info = data["op_info"]
         assert len(op_info) == 5
         for i in range(5):
-            assert "size" in op_info[i]["metrics"]["output_queue"]
-            assert "length" in op_info[i]["metrics"]["output_queue"]
-            assert "throughput" in op_info[i]["metrics"]["output_queue"]
+            if op_info[i]["op_type"] != "ZipOp":
+                assert "size" in op_info[i]["metrics"]["output_queue"]
+                assert "length" in op_info[i]["metrics"]["output_queue"]
+                assert "throughput" in op_info[i]["metrics"]["output_queue"]
+            else:
+                # Note: Zip is an inline op and hence does not have metrics information
+                assert op_info[i]["metrics"] is None
+
+    assert os.path.exists(PIPELINE_FILE) is True
+    os.remove(PIPELINE_FILE)
+    assert os.path.exists(DATASET_ITERATOR_FILE) is True
+    os.remove(DATASET_ITERATOR_FILE)
+    del os.environ['PROFILING_MODE']
+    del os.environ['MINDDATA_PROFILING_DIR']
+
+
+def test_profiling_inline_ops_pipeline1():
+    """
+    Test pipeline with inline ops: Concat and EpochCtrl
+    Generator ->
+                 Concat -> EpochCtrl
+    Generator ->
+    """
+    os.environ['PROFILING_MODE'] = 'true'
+    os.environ['MINDDATA_PROFILING_DIR'] = '.'
+    os.environ['DEVICE_ID'] = '1'
+
+    # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
+    def source1():
+        for i in range(3):
+            yield (np.array([i]),)
+
+    # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
+    def source2():
+        for i in range(3, 10):
+            yield (np.array([i]),)
+
+    data1 = ds.GeneratorDataset(source1, ["col1"])
+    data2 = ds.GeneratorDataset(source2, ["col1"])
+    data3 = data1.concat(data2)
+
+    # Here i refers to index, d refers to data element
+    for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True)):
+        t = d
+        assert i == t[0][0]
+
+    assert sum([1 for _ in data3]) == 10
+
+    with open(PIPELINE_FILE) as f:
+        data = json.load(f)
+        op_info = data["op_info"]
+        assert len(op_info) == 4
+        for i in range(4):
+            # Note: The following ops are inline ops: Concat, EpochCtrl
+            if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
+                # Confirm these inline ops do not have metrics information
+                assert op_info[i]["metrics"] is None
+            else:
+                assert "size" in op_info[i]["metrics"]["output_queue"]
+                assert "length" in op_info[i]["metrics"]["output_queue"]
+                assert "throughput" in op_info[i]["metrics"]["output_queue"]
+
+    assert os.path.exists(PIPELINE_FILE) is True
+    os.remove(PIPELINE_FILE)
+    assert os.path.exists(DATASET_ITERATOR_FILE) is True
+    os.remove(DATASET_ITERATOR_FILE)
+    del os.environ['PROFILING_MODE']
+    del os.environ['MINDDATA_PROFILING_DIR']
+
+
+def test_profiling_inline_ops_pipeline2():
+    """
+    Test pipeline with many inline ops
+    Generator -> Rename -> Skip -> Repeat -> Take
+    """
+    os.environ['PROFILING_MODE'] = 'true'
+    os.environ['MINDDATA_PROFILING_DIR'] = '.'
+    os.environ['DEVICE_ID'] = '1'
+
+    # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
+    def source1():
+        for i in range(10):
+            yield (np.array([i]),)
+
+    data1 = ds.GeneratorDataset(source1, ["col1"])
+    data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
+    data1 = data1.skip(2)
+    data1 = data1.repeat(2)
+    data1 = data1.take(12)
+
+    for _ in data1:
+        pass
+
+    with open(PIPELINE_FILE) as f:
+        data = json.load(f)
+        op_info = data["op_info"]
+        assert len(op_info) == 5
+        for i in range(5):
+            # Check for these inline ops
+            if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
+                # Confirm these inline ops do not have metrics information
+                assert op_info[i]["metrics"] is None
+            else:
+                assert "size" in op_info[i]["metrics"]["output_queue"]
+                assert "length" in op_info[i]["metrics"]["output_queue"]
+                assert "throughput" in op_info[i]["metrics"]["output_queue"]
 
     assert os.path.exists(PIPELINE_FILE) is True
     os.remove(PIPELINE_FILE)
@@ -132,4 +235,6 @@ def test_profiling_sampling_interval():
 if __name__ == "__main__":
     test_profiling_simple_pipeline()
     test_profiling_complex_pipeline()
+    test_profiling_inline_ops_pipeline1()
+    test_profiling_inline_ops_pipeline2()
     test_profiling_sampling_interval()
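
For reference, a rough standalone sketch of the same inline-op check performed outside the UT harness. PIPELINE_FILE below is an assumed file name (the tests read module-level PIPELINE_FILE / DATASET_ITERATOR_FILE constants that are not shown in this diff), and the environment-variable profiling setup mirrors the tests above:

import json
import os

import numpy as np
import mindspore.dataset as ds

# Assumed output name; the real tests use a PIPELINE_FILE constant not shown in this diff
PIPELINE_FILE = "./pipeline_profiling_1.json"

os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['DEVICE_ID'] = '1'

def source():
    for i in range(10):
        yield (np.array([i]),)

# Generator -> Skip -> Repeat pipeline; Skip and Repeat are inline ops
data1 = ds.GeneratorDataset(source, ["col1"]).skip(2).repeat(2)
for _ in data1:
    pass

# Inline ops show up in the pipeline file with metrics set to None
with open(PIPELINE_FILE) as f:
    for op in json.load(f)["op_info"]:
        inline = op["metrics"] is None
        print(op["op_type"], "inline (no metrics)" if inline else "has output_queue metrics")
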