forked from mindspore-Ecosystem/mindspore
!22274 MD Profiling UT: Use pytest setup and teardown
Merge pull request !22274 from cathwong/ckw_mon_py_analyze_ut5
commit 661c83e17f
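This change converts the module-level MindData profiling UT helpers into a pytest test class: the per-device filenames and expected-key lists become attributes set in setup_class, the precondition checks and profiling environment variables move into setup_method, and file/environment cleanup moves into teardown_method. Because pytest runs teardown_method even when a test body raises, the try/except/else cleanup scaffolding inside each test can be deleted. A minimal sketch of the pytest xUnit-style hook ordering this relies on (the class and test names below are illustrative, not part of this commit):

class TestLifecycleSketch:
    """Illustrative only: shows when pytest runs each xUnit-style hook."""

    @classmethod
    def setup_class(cls):
        # Runs once, before the first test method in the class.
        cls.events = []

    def setup_method(self):
        # Runs before every test method; a natural home for precondition
        # asserts and environment variable setup.
        self.events.append("setup")

    def teardown_method(self):
        # Runs after every test method, even if the test failed or raised,
        # which is what makes per-test try/except/else cleanup unnecessary.
        self.events.append("teardown")

    def test_example(self):
        # By the time any test body runs, setup_class and setup_method have fired.
        assert self.events[-1] == "setup"

pytest discovers these hooks by name, so no imports or decorators are required in the test file itself.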
@@ -24,150 +24,157 @@ import mindspore.dataset as ds
 import mindspore.dataset.transforms.c_transforms as C
 from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer
 
-PIPELINE_FILE = "./pipeline_profiling_7.json"
-CPU_UTIL_FILE = "./minddata_cpu_utilization_7.json"
-DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_7.txt"
-SUMMARY_JSON_FILE = "./minddata_pipeline_summary_7.json"
-SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
-ANALYZE_FILE_PATH = "./"
-
-# These are the minimum subset of expected keys (in alphabetical order) in the MindData Analyzer summary output
-# This is the set of keys for success case
-EXPECTED_SUMMARY_KEYS_SUCCESS = \
-    ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
-     'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
-     'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
-
-# This is the set of keys for the case which omits the keys for composite computation of more than one raw file.
-# This is used for the invalid user case in which the number of ops in the pipeline file does not match
-# the number of ops in the CPU utilization file.
-EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE = \
-    ['avg_cpu_pct', 'children_ids', 'num_workers', 'op_ids', 'op_names',
-     'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
-     'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
-
-
-def get_csv_result(file_pathname):
-    """
-    Get result from the CSV file.
-
-    Args:
-        file_pathname (str): The CSV file pathname.
-
-    Returns:
-        list[list], the parsed CSV information.
-    """
-    result = []
-    with open(file_pathname, 'r') as csvfile:
-        csv_reader = csv.reader(csvfile)
-        for row in csv_reader:
-            result.append(row)
-    return result
-
-
-def set_profiling_env_var():
-    """
-    Set the MindData Profiling environment variables
-    """
-    os.environ['PROFILING_MODE'] = 'true'
-    os.environ['MINDDATA_PROFILING_DIR'] = '.'
-    os.environ['DEVICE_ID'] = '7'
-
-
-def delete_profiling_files():
-    """
-    Delete the MindData profiling files generated from the test.
-    Also disable the MindData Profiling environment variables.
-    """
-    # Delete MindData profiling files
-    os.remove(PIPELINE_FILE)
-    os.remove(CPU_UTIL_FILE)
-    os.remove(DATASET_ITERATOR_FILE)
-
-    # Delete MindData profiling analyze summary files
-    os.remove(SUMMARY_JSON_FILE)
-    os.remove(SUMMARY_CSV_FILE)
-
-    # Disable MindData Profiling environment variables
-    del os.environ['PROFILING_MODE']
-    del os.environ['MINDDATA_PROFILING_DIR']
-    del os.environ['DEVICE_ID']
-
-
-def verify_md_summary(md_summary_dict, EXPECTED_SUMMARY_KEYS):
-    """
-    Verify the content of the 3 variations of the MindData Profiling analyze summary output.
-    """
-
-    # Confirm MindData Profiling analyze summary files are created
-    assert os.path.exists(SUMMARY_JSON_FILE) is True
-    assert os.path.exists(SUMMARY_CSV_FILE) is True
-
-    # Build a list of the sorted returned keys
-    summary_returned_keys = list(md_summary_dict.keys())
-    summary_returned_keys.sort()
-
-    # 1. Confirm expected keys are in returned keys
-    for k in EXPECTED_SUMMARY_KEYS:
-        assert k in summary_returned_keys
-
-    # Read summary JSON file
-    with open(SUMMARY_JSON_FILE) as f:
-        summary_json_data = json.load(f)
-    # Build a list of the sorted JSON keys
-    summary_json_keys = list(summary_json_data.keys())
-    summary_json_keys.sort()
-
-    # 2a. Confirm expected keys are in JSON file keys
-    for k in EXPECTED_SUMMARY_KEYS:
-        assert k in summary_json_keys
-
-    # 2b. Confirm returned dictionary keys are identical to JSON file keys
-    np.testing.assert_array_equal(summary_returned_keys, summary_json_keys)
-
-    # Read summary CSV file
-    summary_csv_data = get_csv_result(SUMMARY_CSV_FILE)
-    # Build a list of the sorted CSV keys from the first column in the CSV file
-    summary_csv_keys = []
-    for x in summary_csv_data:
-        summary_csv_keys.append(x[0])
-    summary_csv_keys.sort()
-
-    # 3a. Confirm expected keys are in the first column of the CSV file
-    for k in EXPECTED_SUMMARY_KEYS:
-        assert k in summary_csv_keys
-
-    # 3b. Confirm returned dictionary keys are identical to CSV file first column keys
-    np.testing.assert_array_equal(summary_returned_keys, summary_csv_keys)
-
-
-def test_analyze_basic():
-    """
-    Test MindData profiling analyze summary files exist with basic pipeline.
-    Also test basic content (subset of keys and values) from the returned summary result.
-    """
-    # Confirm MindData Profiling files do not yet exist
-    assert os.path.exists(PIPELINE_FILE) is False
-    assert os.path.exists(CPU_UTIL_FILE) is False
-    assert os.path.exists(DATASET_ITERATOR_FILE) is False
-    # Confirm MindData Profiling analyze summary files do not yet exist
-    assert os.path.exists(SUMMARY_JSON_FILE) is False
-    assert os.path.exists(SUMMARY_CSV_FILE) is False
-
-    # Enable MindData Profiling environment variables
-    set_profiling_env_var()
-
-    def source1():
-        for i in range(8000):
-            yield (np.array([i]),)
-
-    try:
+
+class TestMinddataProfilingAnalyzer():
+    """
+    Test the MinddataProfilingAnalyzer class
+    """
+
+    def setup_class(self):
+        """
+        Run once for the class
+        """
+        # Define filenames and path used for the MinddataProfilingAnalyzer tests. Use device_id=7.
+        self._PIPELINE_FILE = "./pipeline_profiling_7.json"
+        self._CPU_UTIL_FILE = "./minddata_cpu_utilization_7.json"
+        self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_7.txt"
+        self._SUMMARY_JSON_FILE = "./minddata_pipeline_summary_7.json"
+        self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
+        self._ANALYZE_FILE_PATH = "./"
+
+        # These are the minimum subset of expected keys (in alphabetical order) in the MindData Analyzer summary output
+        # This is the set of keys for success case
+        self._EXPECTED_SUMMARY_KEYS_SUCCESS = \
+            ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
+             'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
+             'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
+
+        # This is the set of keys for the case which omits the keys for composite computation of more than one raw file.
+        # This is used for the invalid user case in which the number of ops in the pipeline file does not match
+        # the number of ops in the CPU utilization file.
+        self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE = \
+            ['avg_cpu_pct', 'children_ids', 'num_workers', 'op_ids', 'op_names',
+             'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
+             'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
+
+    def setup_method(self):
+        """
+        Run before each test function.
+        """
+        # Confirm MindData Profiling files do not yet exist
+        assert os.path.exists(self._PIPELINE_FILE) is False
+        assert os.path.exists(self._CPU_UTIL_FILE) is False
+        assert os.path.exists(self._DATASET_ITERATOR_FILE) is False
+        # Confirm MindData Profiling analyze summary files do not yet exist
+        assert os.path.exists(self._SUMMARY_JSON_FILE) is False
+        assert os.path.exists(self._SUMMARY_CSV_FILE) is False
+
+        # Set the MindData Profiling environment variables
+        os.environ['PROFILING_MODE'] = 'true'
+        os.environ['MINDDATA_PROFILING_DIR'] = '.'
+        os.environ['DEVICE_ID'] = '7'
+
+    def teardown_method(self):
+        """
+        Run after each test function.
+        """
+        # Delete MindData profiling files generated from the test.
+        os.remove(self._PIPELINE_FILE)
+        os.remove(self._CPU_UTIL_FILE)
+        os.remove(self._DATASET_ITERATOR_FILE)
+
+        # Delete MindData profiling analyze summary files generated from the test.
+        os.remove(self._SUMMARY_JSON_FILE)
+        os.remove(self._SUMMARY_CSV_FILE)
+
+        # Disable MindData Profiling environment variables
+        del os.environ['PROFILING_MODE']
+        del os.environ['MINDDATA_PROFILING_DIR']
+        del os.environ['DEVICE_ID']
+
+    def get_csv_result(self, file_pathname):
+        """
+        Get result from the CSV file.
+
+        Args:
+            file_pathname (str): The CSV file pathname.
+
+        Returns:
+            list[list], the parsed CSV information.
+        """
+        result = []
+        with open(file_pathname, 'r') as csvfile:
+            csv_reader = csv.reader(csvfile)
+            for row in csv_reader:
+                result.append(row)
+        return result
+
+    def verify_md_summary(self, md_summary_dict, EXPECTED_SUMMARY_KEYS):
+        """
+        Verify the content of the 3 variations of the MindData Profiling analyze summary output.
+        """
+
+        # Confirm MindData Profiling analyze summary files are created
+        assert os.path.exists(self._SUMMARY_JSON_FILE) is True
+        assert os.path.exists(self._SUMMARY_CSV_FILE) is True
+
+        # Build a list of the sorted returned keys
+        summary_returned_keys = list(md_summary_dict.keys())
+        summary_returned_keys.sort()
+
+        # 1. Confirm expected keys are in returned keys
+        for k in EXPECTED_SUMMARY_KEYS:
+            assert k in summary_returned_keys
+
+        # Read summary JSON file
+        with open(self._SUMMARY_JSON_FILE) as f:
+            summary_json_data = json.load(f)
+        # Build a list of the sorted JSON keys
+        summary_json_keys = list(summary_json_data.keys())
+        summary_json_keys.sort()
+
+        # 2a. Confirm expected keys are in JSON file keys
+        for k in EXPECTED_SUMMARY_KEYS:
+            assert k in summary_json_keys
+
+        # 2b. Confirm returned dictionary keys are identical to JSON file keys
+        np.testing.assert_array_equal(summary_returned_keys, summary_json_keys)
+
+        # Read summary CSV file
+        summary_csv_data = self.get_csv_result(self._SUMMARY_CSV_FILE)
+        # Build a list of the sorted CSV keys from the first column in the CSV file
+        summary_csv_keys = []
+        for x in summary_csv_data:
+            summary_csv_keys.append(x[0])
+        summary_csv_keys.sort()
+
+        # 3a. Confirm expected keys are in the first column of the CSV file
+        for k in EXPECTED_SUMMARY_KEYS:
+            assert k in summary_csv_keys
+
+        # 3b. Confirm returned dictionary keys are identical to CSV file first column keys
+        np.testing.assert_array_equal(summary_returned_keys, summary_csv_keys)
+
+    def mysource(self):
+        """Source for data values"""
+        for i in range(8000):
+            yield (np.array([i]),)
+
+    def test_analyze_basic(self):
+        """
+        Test MindData profiling analyze summary files exist with basic pipeline.
+        Also test basic content (subset of keys and values) from the returned summary result.
+        """
         # Create this basic and common linear pipeline
         # Generator -> Map -> Batch -> Repeat -> EpochCtrl
-        data1 = ds.GeneratorDataset(source1, ["col1"])
+        data1 = ds.GeneratorDataset(self.mysource, ["col1"])
         type_cast_op = C.TypeCast(mstype.int32)
         data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(16)
@@ -182,12 +189,12 @@ def test_analyze_basic():
         assert num_iter == 1000
 
         # Confirm MindData Profiling files are created
-        assert os.path.exists(PIPELINE_FILE) is True
-        assert os.path.exists(CPU_UTIL_FILE) is True
-        assert os.path.exists(DATASET_ITERATOR_FILE) is True
+        assert os.path.exists(self._PIPELINE_FILE) is True
+        assert os.path.exists(self._CPU_UTIL_FILE) is True
+        assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
 
         # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
-        md_analyzer = MinddataProfilingAnalyzer(ANALYZE_FILE_PATH, 7, ANALYZE_FILE_PATH)
+        md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
         md_summary_dict = md_analyzer.analyze()
 
         # Verify MindData Profiling Analyze Summary output
@@ -195,7 +202,7 @@ def test_analyze_basic():
         # 1. returned dictionary
         # 2. JSON file
         # 3. CSV file
-        verify_md_summary(md_summary_dict, EXPECTED_SUMMARY_KEYS_SUCCESS)
+        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS)
 
         # 4. Verify non-variant values or number of values in the tested pipeline for certain keys
         # of the returned dictionary
@@ -213,38 +220,14 @@ def test_analyze_basic():
         assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3]
         assert len(md_summary_dict["avg_cpu_pct"]) == 5
 
-    except Exception as error:
-        delete_profiling_files()
-        raise error
-
-    else:
-        delete_profiling_files()
-
-
-def test_analyze_sequential_pipelines_invalid():
-    """
-    Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines.
-    """
-    # Confirm MindData Profiling files do not yet exist
-    assert os.path.exists(PIPELINE_FILE) is False
-    assert os.path.exists(CPU_UTIL_FILE) is False
-    assert os.path.exists(DATASET_ITERATOR_FILE) is False
-    # Confirm MindData Profiling analyze summary files do not yet exist
-    assert os.path.exists(SUMMARY_JSON_FILE) is False
-    assert os.path.exists(SUMMARY_CSV_FILE) is False
-
-    # Enable MindData Profiling environment variables
-    set_profiling_env_var()
-
-    def source1():
-        for i in range(8000):
-            yield (np.array([i]),)
-
-    try:
+    def test_analyze_sequential_pipelines_invalid(self):
+        """
+        Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines.
+        """
         # Create the pipeline
         # Generator -> Map -> Batch -> EpochCtrl
-        data1 = ds.GeneratorDataset(source1, ["col1"])
+        data1 = ds.GeneratorDataset(self.mysource, ["col1"])
         type_cast_op = C.TypeCast(mstype.int32)
         data1 = data1.map(operations=type_cast_op, input_columns="col1")
         data1 = data1.batch(64)
 
@@ -260,9 +243,9 @@ def test_analyze_sequential_pipelines_invalid():
         assert num_iter == 125
 
         # Confirm MindData Profiling files are created
-        assert os.path.exists(PIPELINE_FILE) is True
-        assert os.path.exists(CPU_UTIL_FILE) is True
-        assert os.path.exists(DATASET_ITERATOR_FILE) is True
+        assert os.path.exists(self._PIPELINE_FILE) is True
+        assert os.path.exists(self._CPU_UTIL_FILE) is True
+        assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
 
         # Phase 2 - For the pipeline, call create_tuple_iterator with num_epochs=1
         # Note: This pipeline has 3 ops: Generator -> Map -> Batch
@@ -277,18 +260,18 @@ def test_analyze_sequential_pipelines_invalid():
         # Confirm MindData Profiling files are created
         # Note: There is an MD bug in which the pipeline file is not recreated;
         # it still has 4 ops instead of 3 ops
-        assert os.path.exists(PIPELINE_FILE) is True
-        assert os.path.exists(CPU_UTIL_FILE) is True
-        assert os.path.exists(DATASET_ITERATOR_FILE) is True
+        assert os.path.exists(self._PIPELINE_FILE) is True
+        assert os.path.exists(self._CPU_UTIL_FILE) is True
+        assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
 
         # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
-        md_analyzer = MinddataProfilingAnalyzer(ANALYZE_FILE_PATH, 7, ANALYZE_FILE_PATH)
+        md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
         md_summary_dict = md_analyzer.analyze()
 
         # Verify MindData Profiling Analyze Summary output
-        # Use EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE, since composite keys are not produced, as there is a mismatch
+        # Use self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE, since composite keys are not produced, as there is a mismatch
         # between the 4 ops in the stale pipeline file versus the 3 ops in the recreated cpu util file
-        verify_md_summary(md_summary_dict, EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE)
+        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE)
 
         # Confirm pipeline data wrongly contains info for 4 ops
         assert md_summary_dict["pipeline_ops"] == ["EpochCtrl(id=0)", "Batch(id=1)", "Map(id=2)",
@@ -296,16 +279,3 @@ def test_analyze_sequential_pipelines_invalid():
 
         # Verify CPU util data contains info for only 3 ops
         assert len(md_summary_dict["avg_cpu_pct"]) == 3
-
-
-    except Exception as error:
-        delete_profiling_files()
-        raise error
-
-    else:
-        delete_profiling_files()
-
-
-if __name__ == "__main__":
-    test_analyze_basic()
-    test_analyze_sequential_pipelines_invalid()
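With the if __name__ == "__main__" driver removed, these tests are intended to be collected and run by pytest. A sketch of an equivalent invocation; the file path below is an assumption, since this commit view does not show the test file's name:

import pytest

# Hypothetical filename -- substitute the actual path of this UT file.
pytest.main(["-v", "test_minddata_profiling_analyzer.py::TestMinddataProfilingAnalyzer"])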