make ut parallel

ms_yan 2021-10-21 21:25:39 +08:00
parent a7bff87bbb
commit bb314ad07f
2 changed files with 203 additions and 112 deletions

View File

@@ -27,30 +27,46 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
PIPELINE_FILE = "./pipeline_profiling_1.json"
CPU_UTIL_FILE = "./minddata_cpu_utilization_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"
PIPELINE_FILE = "./pipeline_profiling"
CPU_UTIL_FILE = "./minddata_cpu_utilization"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling"
# Map file name (test name) to rank id so parallel tests write to distinct profiling files and avoid file-writing crashes
file_name_map_rank_id = {"test_profiling_simple_pipeline": "0",
"test_profiling_complex_pipeline": "1",
"test_profiling_inline_ops_pipeline1": "2",
"test_profiling_inline_ops_pipeline2": "3",
"test_profiling_sampling_interval": "4",
"test_profiling_basic_pipeline": "5",
"test_profiling_cifar10_pipeline": "6",
"test_profiling_seq_pipelines_epochctrl3": "7",
"test_profiling_seq_pipelines_epochctrl2": "8",
"test_profiling_seq_pipelines_repeat": "9"}
def set_profiling_env_var():
def set_profiling_env_var(index='0'):
"""
Set the MindData Profiling environment variables
"""
os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['DEVICE_ID'] = '1'
os.environ['RANK_ID'] = '1'
os.environ['DEVICE_ID'] = index
os.environ['RANK_ID'] = index
def delete_profiling_files():
def delete_profiling_files(index='0'):
"""
Delete the MindData profiling files generated from the test.
Also disable the MindData Profiling environment variables.
"""
# Delete MindData profiling files
os.remove(PIPELINE_FILE)
os.remove(CPU_UTIL_FILE)
os.remove(DATASET_ITERATOR_FILE)
pipeline_file = PIPELINE_FILE + "_" + index + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + index + ".json"
dataset_iterator_file = DATASET_ITERATOR_FILE + "_" + index + ".txt"
os.remove(pipeline_file)
os.remove(cpu_util_file)
os.remove(dataset_iterator_file)
# Disable MindData Profiling environment variables
del os.environ['PROFILING_MODE']
@@ -59,22 +75,22 @@ def delete_profiling_files():
del os.environ['RANK_ID']
def confirm_cpuutil(num_pipeline_ops):
def confirm_cpuutil(num_pipeline_ops, cpu_util_file):
"""
Confirm that the CPU utilization JSON file contains <num_pipeline_ops> ops for the pipeline
"""
with open(CPU_UTIL_FILE) as file1:
with open(cpu_util_file) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm <num_pipeline_ops>+1 ops in CPU util file (including op_id=-1 for monitor thread)
assert len(op_info) == num_pipeline_ops + 1
def confirm_ops_in_pipeline(num_ops, op_list):
def confirm_ops_in_pipeline(num_ops, op_list, pipeline_file):
"""
Confirm that the pipeline JSON file contains <num_ops> ops and that they match the given list of ops
"""
with open(PIPELINE_FILE) as file1:
with open(pipeline_file) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm ops in pipeline file
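
For orientation, a rough sketch of the only parts of these JSON files that the helpers and assertions in this file touch (field names taken from the assertions below; the full profiler schema is assumed to contain more):

with open(cpu_util_file) as f:  # e.g. "./minddata_cpu_utilization_0.json"
    cpu_ops = json.load(f)["op_info"]  # one entry per pipeline op plus one for the monitor thread (op_id = -1), hence the "+ 1" above
with open(pipeline_file) as f:  # e.g. "./pipeline_profiling_0.json"
    for op in json.load(f)["op_info"]:
        if op["metrics"] is not None:  # some ops (e.g. inline ops) report no metrics
            queue = op["metrics"]["output_queue"]  # the tests check its "size" and "throughput" keys
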
@@ -87,7 +103,12 @@ def test_profiling_simple_pipeline():
"""
Generator -> Shuffle -> Batch
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
dataset_iterator_file = DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["data"])
@@ -99,25 +120,25 @@ def test_profiling_simple_pipeline():
assert data1.get_dataset_size() == 32
# Confirm profiling files do not (yet) exist
assert os.path.exists(PIPELINE_FILE) is False
assert os.path.exists(CPU_UTIL_FILE) is False
assert os.path.exists(DATASET_ITERATOR_FILE) is False
assert os.path.exists(pipeline_file) is False
assert os.path.exists(cpu_util_file) is False
assert os.path.exists(dataset_iterator_file) is False
try:
for _ in data1:
pass
# Confirm profiling files now exist
assert os.path.exists(PIPELINE_FILE) is True
assert os.path.exists(CPU_UTIL_FILE) is True
assert os.path.exists(DATASET_ITERATOR_FILE) is True
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
assert os.path.exists(dataset_iterator_file) is True
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_complex_pipeline():
@@ -126,7 +147,12 @@ def test_profiling_complex_pipeline():
-> Zip
TFReader -> Shuffle ->
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["gen"])
@@ -142,7 +168,7 @@ def test_profiling_complex_pipeline():
for _ in data3:
pass
with open(PIPELINE_FILE) as f:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
@@ -156,14 +182,14 @@ def test_profiling_complex_pipeline():
assert op_info[i]["metrics"] is None
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
confirm_cpuutil(5, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_inline_ops_pipeline1():
@@ -173,7 +199,11 @@ def test_profiling_inline_ops_pipeline1():
Concat -> EpochCtrl
Generator ->
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
# In source1 dataset: Number of rows is 3; its values are 0, 1, 2
def source1():
@@ -201,7 +231,7 @@ def test_profiling_inline_ops_pipeline1():
assert num_iter == 10
# Confirm pipeline is created with EpochCtrl op
with open(PIPELINE_FILE) as f:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 4
@@ -216,14 +246,14 @@ def test_profiling_inline_ops_pipeline1():
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
confirm_cpuutil(4)
confirm_cpuutil(4, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_inline_ops_pipeline2():
@@ -231,7 +261,11 @@ def test_profiling_inline_ops_pipeline2():
Test pipeline with many inline ops
Generator -> Rename -> Skip -> Repeat -> Take
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
# In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
def source1():
@@ -248,7 +282,7 @@ def test_profiling_inline_ops_pipeline2():
for _ in data1:
pass
with open(PIPELINE_FILE) as f:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
@@ -263,21 +297,23 @@ def test_profiling_inline_ops_pipeline2():
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
confirm_cpuutil(5, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_sampling_interval():
"""
Test non-default monitor sampling interval
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
interval_origin = ds.config.get_monitor_sampling_interval()
@@ -296,12 +332,12 @@ def test_profiling_sampling_interval():
except Exception as error:
ds.config.set_monitor_sampling_interval(interval_origin)
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
ds.config.set_monitor_sampling_interval(interval_origin)
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_basic_pipeline():
@@ -309,7 +345,11 @@ def test_profiling_basic_pipeline():
Test with this basic pipeline
Generator -> Map -> Batch -> Repeat -> EpochCtrl
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
def source1():
for i in range(8000):
@@ -332,7 +372,7 @@ def test_profiling_basic_pipeline():
assert num_iter == 1000
with open(PIPELINE_FILE) as f:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
@@ -347,14 +387,14 @@ def test_profiling_basic_pipeline():
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
confirm_cpuutil(5, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_cifar10_pipeline():
@@ -362,7 +402,11 @@ def test_profiling_cifar10_pipeline():
Test with this common pipeline with Cifar10
Cifar10 -> Map -> Map -> Batch -> Repeat
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
# Create this common pipeline
# Cifar10 -> Map -> Map -> Batch -> Repeat
@@ -385,7 +429,7 @@ def test_profiling_cifar10_pipeline():
assert num_iter == 750
with open(PIPELINE_FILE) as f:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
@@ -400,14 +444,14 @@ def test_profiling_cifar10_pipeline():
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
confirm_cpuutil(5, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_seq_pipelines_epochctrl3():
@@ -417,7 +461,11 @@ def test_profiling_seq_pipelines_epochctrl3():
2) Generator -> Batch
Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data1 = ds.GeneratorDataset(source, ["data"])
@@ -432,8 +480,8 @@ def test_profiling_seq_pipelines_epochctrl3():
assert num_iter == 2
# Confirm pipeline file and CPU util file each have 3 ops
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
confirm_cpuutil(3)
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
confirm_cpuutil(3, cpu_util_file)
# Test B - Call create_dict_iterator with num_epochs=1
num_iter = 0
@@ -444,15 +492,15 @@ def test_profiling_seq_pipelines_epochctrl3():
assert num_iter == 2
# Confirm pipeline file and CPU util file each have 2 ops
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
confirm_cpuutil(2)
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
confirm_cpuutil(2, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_seq_pipelines_epochctrl2():
@@ -461,7 +509,11 @@ def test_profiling_seq_pipelines_epochctrl2():
1) Generator -> Batch
2) Generator -> Batch -> EpochCtrl
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data2 = ds.GeneratorDataset(source, ["data"])
@@ -476,8 +528,8 @@ def test_profiling_seq_pipelines_epochctrl2():
assert num_iter == 4
# Confirm pipeline file and CPU util file each have 2 ops
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
confirm_cpuutil(2)
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
confirm_cpuutil(2, cpu_util_file)
# Test B - Call create_dict_iterator with num_epochs>1
num_iter = 0
@@ -488,15 +540,15 @@ def test_profiling_seq_pipelines_epochctrl2():
assert num_iter == 4
# Confirm pipeline file and CPU util file each have 3 ops
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
confirm_cpuutil(3)
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
confirm_cpuutil(3, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
def test_profiling_seq_pipelines_repeat():
@@ -505,7 +557,11 @@ def test_profiling_seq_pipelines_repeat():
1) Generator -> Batch
2) Generator -> Batch -> Repeat
"""
set_profiling_env_var()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
set_profiling_env_var(file_id)
pipeline_file = PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = CPU_UTIL_FILE + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data2 = ds.GeneratorDataset(source, ["data"])
@@ -519,8 +575,8 @@ def test_profiling_seq_pipelines_repeat():
assert num_iter == 4
# Confirm pipeline file and CPU util file each have 2 ops
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
confirm_cpuutil(2)
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
confirm_cpuutil(2, cpu_util_file)
# Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
data2 = data2.repeat(5)
@@ -530,15 +586,15 @@ def test_profiling_seq_pipelines_repeat():
assert num_iter == 20
# Confirm pipeline file and CPU util file each have 3 ops
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
confirm_cpuutil(3)
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
confirm_cpuutil(3, cpu_util_file)
except Exception as error:
delete_profiling_files()
delete_profiling_files(file_id)
raise error
else:
delete_profiling_files()
delete_profiling_files(file_id)
if __name__ == "__main__":

View File

@@ -24,8 +24,12 @@ import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer
# Map file name (test name) to rank id so parallel tests write to distinct profiling files and avoid file-writing crashes
file_name_map_rank_id = {"test_analyze_basic": "0",
"test_analyze_sequential_pipelines_invalid": "1"}
class TestMinddataProfilingAnalyzer():
class TestMinddataProfilingAnalyzer:
"""
Test the MinddataProfilingAnalyzer class
"""
@@ -34,12 +38,11 @@ class TestMinddataProfilingAnalyzer():
"""
Run once for the class
"""
# Define filenames and path used for the MinddataProfilingAnalyzer tests. Use device_id=7.
self._PIPELINE_FILE = "./pipeline_profiling_7.json"
self._CPU_UTIL_FILE = "./minddata_cpu_utilization_7.json"
self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_7.txt"
self._SUMMARY_JSON_FILE = "./minddata_pipeline_summary_7.json"
self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
self._PIPELINE_FILE = "./pipeline_profiling"
self._CPU_UTIL_FILE = "./minddata_cpu_utilization"
self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling"
self._SUMMARY_JSON_FILE = "./minddata_pipeline_summary"
self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary"
self._ANALYZE_FILE_PATH = "./"
# This is the set of keys for success case
@@ -48,44 +51,60 @@ class TestMinddataProfilingAnalyzer():
'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
def setup_method(self):
"""
Run before each test function.
"""
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
summary_json_file = self._SUMMARY_JSON_FILE + "_" + file_id + ".json"
summary_csv_file = self._SUMMARY_CSV_FILE + "_" + file_id + ".csv"
# Confirm MindData Profiling files do not yet exist
assert os.path.exists(self._PIPELINE_FILE) is False
assert os.path.exists(self._CPU_UTIL_FILE) is False
assert os.path.exists(self._DATASET_ITERATOR_FILE) is False
assert os.path.exists(pipeline_file) is False
assert os.path.exists(cpu_util_file) is False
assert os.path.exists(dataset_iterator_file) is False
# Confirm MindData Profiling analyze summary files do not yet exist
assert os.path.exists(self._SUMMARY_JSON_FILE) is False
assert os.path.exists(self._SUMMARY_CSV_FILE) is False
assert os.path.exists(summary_json_file) is False
assert os.path.exists(summary_csv_file) is False
# Set the MindData Profiling environment variables
os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['RANK_ID'] = '7'
os.environ['RANK_ID'] = file_id
def teardown_method(self):
"""
Run after each test function.
"""
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
summary_json_file = self._SUMMARY_JSON_FILE + "_" + file_id + ".json"
summary_csv_file = self._SUMMARY_CSV_FILE + "_" + file_id + ".csv"
# Delete MindData profiling files generated from the test.
os.remove(self._PIPELINE_FILE)
os.remove(self._CPU_UTIL_FILE)
os.remove(self._DATASET_ITERATOR_FILE)
os.remove(pipeline_file)
os.remove(cpu_util_file)
os.remove(dataset_iterator_file)
# Delete MindData profiling analyze summary files generated from the test.
os.remove(self._SUMMARY_JSON_FILE)
os.remove(self._SUMMARY_CSV_FILE)
os.remove(summary_json_file)
os.remove(summary_csv_file)
# Disable MindData Profiling environment variables
del os.environ['PROFILING_MODE']
del os.environ['MINDDATA_PROFILING_DIR']
del os.environ['RANK_ID']
def get_csv_result(self, file_pathname):
"""
Get result from the CSV file.
@@ -103,15 +122,19 @@ class TestMinddataProfilingAnalyzer():
result.append(row)
return result
def verify_md_summary(self, md_summary_dict, EXPECTED_SUMMARY_KEYS):
"""
Verify the content of the 3 variations of the MindData Profiling analyze summary output.
"""
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
summary_json_file = self._SUMMARY_JSON_FILE + "_" + file_id + ".json"
summary_csv_file = self._SUMMARY_CSV_FILE + "_" + file_id + ".csv"
# Confirm MindData Profiling analyze summary files are created
assert os.path.exists(self._SUMMARY_JSON_FILE) is True
assert os.path.exists(self._SUMMARY_CSV_FILE) is True
assert os.path.exists(summary_json_file) is True
assert os.path.exists(summary_csv_file) is True
# Build a list of the sorted returned keys
summary_returned_keys = list(md_summary_dict.keys())
@@ -122,7 +145,7 @@ class TestMinddataProfilingAnalyzer():
assert k in summary_returned_keys
# Read summary JSON file
with open(self._SUMMARY_JSON_FILE) as f:
with open(summary_json_file) as f:
summary_json_data = json.load(f)
# Build a list of the sorted JSON keys
summary_json_keys = list(summary_json_data.keys())
@@ -136,7 +159,7 @@ class TestMinddataProfilingAnalyzer():
np.testing.assert_array_equal(summary_returned_keys, summary_json_keys)
# Read summary CSV file
summary_csv_data = self.get_csv_result(self._SUMMARY_CSV_FILE)
summary_csv_data = self.get_csv_result(summary_csv_file)
# Build a list of the sorted CSV keys from the first column in the CSV file
summary_csv_keys = []
for x in summary_csv_data:
@@ -150,18 +173,24 @@ class TestMinddataProfilingAnalyzer():
# 3b. Confirm returned dictionary keys are identical to CSV file first column keys
np.testing.assert_array_equal(summary_returned_keys, summary_csv_keys)
def mysource(self):
"""Source for data values"""
for i in range(8000):
yield (np.array([i]),)
def test_analyze_basic(self):
"""
Test MindData profiling analyze summary files exist with basic pipeline.
Also test basic content (subset of keys and values) from the returned summary result.
"""
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
# Create this basic and common linear pipeline
# Generator -> Map -> Batch -> Repeat -> EpochCtrl
data1 = ds.GeneratorDataset(self.mysource, ["col1"])
@@ -179,12 +208,12 @@ class TestMinddataProfilingAnalyzer():
assert num_iter == 1000
# Confirm MindData Profiling files are created
assert os.path.exists(self._PIPELINE_FILE) is True
assert os.path.exists(self._CPU_UTIL_FILE) is True
assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
assert os.path.exists(dataset_iterator_file) is True
# Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, file_id, self._ANALYZE_FILE_PATH)
md_summary_dict = md_analyzer.analyze()
# Verify MindData Profiling Analyze Summary output
@@ -210,11 +239,17 @@ class TestMinddataProfilingAnalyzer():
assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3]
assert len(md_summary_dict["avg_cpu_pct"]) == 5
def test_analyze_sequential_pipelines_invalid(self):
"""
Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines.
"""
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
# Create the pipeline
# Generator -> Map -> Batch -> EpochCtrl
data1 = ds.GeneratorDataset(self.mysource, ["col1"])
@@ -233,9 +268,9 @@ class TestMinddataProfilingAnalyzer():
assert num_iter == 125
# Confirm MindData Profiling files are created
assert os.path.exists(self._PIPELINE_FILE) is True
assert os.path.exists(self._CPU_UTIL_FILE) is True
assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
assert os.path.exists(dataset_iterator_file) is True
# Phase 2 - For the pipeline, call create_tuple_iterator with num_epochs=1
# Note: This pipeline has 3 ops: Generator -> Map -> Batch
@@ -250,12 +285,12 @@ class TestMinddataProfilingAnalyzer():
# Confirm MindData Profiling files are created
# Note: There is an MD bug in which the pipeline file is not recreated;
# it still has 4 ops instead of 3 ops
assert os.path.exists(self._PIPELINE_FILE) is True
assert os.path.exists(self._CPU_UTIL_FILE) is True
assert os.path.exists(self._DATASET_ITERATOR_FILE) is True
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
assert os.path.exists(dataset_iterator_file) is True
# Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, file_id, self._ANALYZE_FILE_PATH)
md_summary_dict = md_analyzer.analyze()
# Verify MindData Profiling Analyze Summary output