From 7cb6989efd306154a53ab1daa835a6f64f117065 Mon Sep 17 00:00:00 2001 From: zetongzhao Date: Mon, 13 Dec 2021 15:28:48 -0500 Subject: [PATCH] replace python removal with pytest tmp_path --- tests/ut/python/dataset/test_profiling.py | 171 ++++++------------ .../dataset/test_profiling_startstop.py | 164 +++++++---------- .../profiler/parser/test_minddata_analyzer.py | 102 +++-------- 3 files changed, 146 insertions(+), 291 deletions(-) diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py index 9c623efdd8c..7a678dfec6d 100644 --- a/tests/ut/python/dataset/test_profiling.py +++ b/tests/ut/python/dataset/test_profiling.py @@ -29,23 +29,13 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"] DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" -# add file name to rank id mapping to avoid file writing crash -file_name_map_rank_id = {"test_profiling_simple_pipeline": "0", - "test_profiling_complex_pipeline": "1", - "test_profiling_inline_ops_pipeline1": "2", - "test_profiling_inline_ops_pipeline2": "3", - "test_profiling_sampling_interval": "4", - "test_profiling_basic_pipeline": "5", - "test_profiling_cifar10_pipeline": "6", - "test_profiling_seq_pipelines_epochctrl3": "7", - "test_profiling_seq_pipelines_epochctrl2": "8", - "test_profiling_seq_pipelines_repeat": "9"} - @pytest.mark.forked class TestMinddataProfilingManager: """ Test MinddataProfilingManager + Note: Use pytest fixture tmp_path to create files within this temporary directory, + which is automatically created for each test and deleted at the end of the test. """ def setup_class(self): @@ -55,29 +45,14 @@ class TestMinddataProfilingManager: # Get instance pointer for MindData profiling manager self.md_profiler = cde.GlobalContext.profiling_manager() - self._pipeline_file = "./pipeline_profiling" - self._cpu_util_file = "./minddata_cpu_utilization" - self._dataset_iterator_file = "./dataset_iterator_profiling" - def setup_method(self): """ Run before each test function. """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - - # Confirm MindData Profiling files do not yet exist - assert os.path.exists(pipeline_file) is False - assert os.path.exists(cpu_util_file) is False - assert os.path.exists(dataset_iterator_file) is False # Set the MindData Profiling related environment variables - os.environ['RANK_ID'] = file_id - os.environ['DEVICE_ID'] = file_id + os.environ['RANK_ID'] = "1" + os.environ['DEVICE_ID'] = "1" # Initialize MindData profiling manager self.md_profiler.init() @@ -90,18 +65,6 @@ class TestMinddataProfilingManager: Run after each test function. """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - - # Delete MindData profiling files generated from the test. 
- os.remove(pipeline_file) - os.remove(cpu_util_file) - os.remove(dataset_iterator_file) - # Disable MindData Profiling related environment variables del os.environ['RANK_ID'] del os.environ['DEVICE_ID'] @@ -127,16 +90,10 @@ class TestMinddataProfilingManager: for i in range(num_ops): assert op_info[i]["op_type"] in op_list - def test_profiling_simple_pipeline(self): + def test_profiling_simple_pipeline(self, tmp_path): """ Generator -> Shuffle -> Batch """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" source = [(np.array([x]),) for x in range(1024)] data1 = ds.GeneratorDataset(source, ["data"]) @@ -147,36 +104,29 @@ class TestMinddataProfilingManager: assert [str(tp) for tp in data1.output_types()] == ["int64"] assert data1.get_dataset_size() == 32 - # Confirm profiling files do not (yet) exist - assert os.path.exists(pipeline_file) is False - assert os.path.exists(cpu_util_file) is False - assert os.path.exists(dataset_iterator_file) is False - for _ in data1: pass # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" + dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" # Confirm profiling files now exist assert os.path.exists(pipeline_file) is True assert os.path.exists(cpu_util_file) is True assert os.path.exists(dataset_iterator_file) is True - def test_profiling_complex_pipeline(self): + def test_profiling_complex_pipeline(self, tmp_path): """ Generator -> Map -> -> Zip TFReader -> Shuffle -> """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - source = [(np.array([x]),) for x in range(1024)] data1 = ds.GeneratorDataset(source, ["gen"]) data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"]) @@ -192,7 +142,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" with open(pipeline_file) as f: data = json.load(f) @@ -209,18 +162,13 @@ class TestMinddataProfilingManager: # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file self.confirm_cpuutil(5, cpu_util_file) - def test_profiling_inline_ops_pipeline1(self): + def test_profiling_inline_ops_pipeline1(self, tmp_path): """ Test pipeline with inline ops: Concat and EpochCtrl Generator -> Concat -> EpochCtrl Generator -> """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" # In source1 dataset: Number of rows is 3; its values are 0, 1, 2 def source1(): 
@@ -248,7 +196,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" # Confirm pipeline is created with EpochCtrl op with open(pipeline_file) as f: @@ -267,16 +218,11 @@ class TestMinddataProfilingManager: # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file self.confirm_cpuutil(4, cpu_util_file) - def test_profiling_inline_ops_pipeline2(self): + def test_profiling_inline_ops_pipeline2(self, tmp_path): """ Test pipeline with many inline ops Generator -> Rename -> Skip -> Repeat -> Take """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9 def source1(): @@ -294,7 +240,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" with open(pipeline_file) as f: data = json.load(f) @@ -312,7 +261,7 @@ class TestMinddataProfilingManager: # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file self.confirm_cpuutil(5, cpu_util_file) - def test_profiling_sampling_interval(self): + def test_profiling_sampling_interval(self, tmp_path): """ Test non-default monitor sampling interval """ @@ -334,18 +283,13 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) - def test_profiling_basic_pipeline(self): + def test_profiling_basic_pipeline(self, tmp_path): """ Test with this basic pipeline Generator -> Map -> Batch -> Repeat -> EpochCtrl """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" def source1(): for i in range(8000): @@ -369,7 +313,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" with open(pipeline_file) as f: data = json.load(f) @@ -387,16 +334,11 @@ class TestMinddataProfilingManager: # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file self.confirm_cpuutil(5, cpu_util_file) - def test_profiling_cifar10_pipeline(self): + def test_profiling_cifar10_pipeline(self, tmp_path): """ Test with this common pipeline with Cifar10 Cifar10 -> Map -> Map -> Batch -> Repeat """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = 
self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" # Create this common pipeline # Cifar10 -> Map -> Map -> Batch -> Repeat @@ -420,7 +362,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" with open(pipeline_file) as f: data = json.load(f) @@ -438,18 +383,13 @@ class TestMinddataProfilingManager: # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file self.confirm_cpuutil(5, cpu_util_file) - def test_profiling_seq_pipelines_epochctrl3(self): + def test_profiling_seq_pipelines_epochctrl3(self, tmp_path): """ Test with these 2 sequential pipelines: 1) Generator -> Batch -> EpochCtrl 2) Generator -> Batch Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation. """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" source = [(np.array([x]),) for x in range(64)] data1 = ds.GeneratorDataset(source, ["data"]) @@ -464,7 +404,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) @@ -485,23 +428,18 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) self.confirm_cpuutil(2, cpu_util_file) - def test_profiling_seq_pipelines_epochctrl2(self): + def test_profiling_seq_pipelines_epochctrl2(self, tmp_path): """ Test with these 2 sequential pipelines: 1) Generator -> Batch 2) Generator -> Batch -> EpochCtrl """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" source = [(np.array([x]),) for x in range(64)] data2 = ds.GeneratorDataset(source, ["data"]) @@ -516,7 +454,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) @@ -537,23 +478,18 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working 
directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) self.confirm_cpuutil(3, cpu_util_file) - def test_profiling_seq_pipelines_repeat(self): + def test_profiling_seq_pipelines_repeat(self, tmp_path): """ Test with these 2 sequential pipelines: 1) Generator -> Batch 2) Generator -> Batch -> Repeat """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" source = [(np.array([x]),) for x in range(64)] data2 = ds.GeneratorDataset(source, ["data"]) @@ -567,7 +503,10 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) @@ -587,7 +526,7 @@ class TestMinddataProfilingManager: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save('./') + self.md_profiler.save(str(tmp_path)) # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file) diff --git a/tests/ut/python/dataset/test_profiling_startstop.py b/tests/ut/python/dataset/test_profiling_startstop.py index a1c6d681ce7..156773a3ffd 100644 --- a/tests/ut/python/dataset/test_profiling_startstop.py +++ b/tests/ut/python/dataset/test_profiling_startstop.py @@ -28,71 +28,45 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"] DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" -# Add file name to rank id mapping so that each profiling file name is unique, -# to support parallel test execution -file_name_map_rank_id = {"test_profiling_early_stop": "0", - "test_profiling_delayed_start": "1", - "test_profiling_start_start": "2", - "test_profiling_multiple_start_stop": "3", - "test_profiling_stop_stop": "4", - "test_profiling_stop_nostart": "5"} - @pytest.mark.forked class TestMindDataProfilingStartStop: """ Test MindData Profiling Manager Start-Stop Support + Note: Use pytest fixture tmp_path to create files within this temporary directory, + which is automatically created for each test and deleted at the end of the test. """ def setup_class(self): """ Run once for the class """ - self._pipeline_file = "./pipeline_profiling" - self._cpu_util_file = "./minddata_cpu_utilization" - self._dataset_iterator_file = "./dataset_iterator_profiling" + # Get instance pointer for MindData profiling manager + self.md_profiler = cde.GlobalContext.profiling_manager() def setup_method(self): """ Run before each test function. 
""" - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - self.pipeline_file = self._pipeline_file + "_" + file_id + ".json" - self.cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - self.dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - - # Confirm MindData Profiling files do not yet exist - assert os.path.exists(self.pipeline_file) is False - assert os.path.exists(self.cpu_util_file) is False - assert os.path.exists(self.dataset_iterator_file) is False # Set the MindData Profiling related environment variables - os.environ['RANK_ID'] = file_id - os.environ['DEVICE_ID'] = file_id + os.environ['RANK_ID'] = "0" + os.environ['DEVICE_ID'] = "0" def teardown_method(self): """ Run after each test function. """ - # Delete MindData profiling files generated from the test. - if os.path.exists(self.pipeline_file): - os.remove(self.pipeline_file) - if os.path.exists(self.cpu_util_file): - os.remove(self.cpu_util_file) - if os.path.exists(self.dataset_iterator_file): - os.remove(self.dataset_iterator_file) # Disable MindData Profiling related environment variables del os.environ['RANK_ID'] del os.environ['DEVICE_ID'] - def confirm_pipeline_file(self, num_ops, op_list=None): + def confirm_pipeline_file(self, pipeline_file, num_ops, op_list=None): """ Confirm pipeline JSON file with in the pipeline and the given optional list of ops """ - with open(self.pipeline_file) as file1: + with open(pipeline_file) as file1: data = json.load(file1) op_info = data["op_info"] # Confirm ops in pipeline file @@ -101,41 +75,37 @@ class TestMindDataProfilingStartStop: for i in range(num_ops): assert op_info[i]["op_type"] in op_list - def confirm_cpuutil_file(self, num_pipeline_ops): + def confirm_cpuutil_file(self, cpu_util_file, num_pipeline_ops): """ Confirm CPU utilization JSON file with in the pipeline """ - with open(self.cpu_util_file) as file1: + with open(cpu_util_file) as file1: data = json.load(file1) op_info = data["op_info"] assert len(op_info) == num_pipeline_ops - def confirm_dataset_iterator_file(self, num_batches): + def confirm_dataset_iterator_file(self, dataset_iterator_file, num_batches): """ Confirm dataset iterator file exists with the correct number of rows in the file """ - assert os.path.exists(self.dataset_iterator_file) - actual_num_lines = sum(1 for _ in open(self.dataset_iterator_file)) + assert os.path.exists(dataset_iterator_file) + actual_num_lines = sum(1 for _ in open(dataset_iterator_file)) # Confirm there are 4 lines for each batch in the dataset iterator file assert actual_num_lines == 4 * num_batches - def test_profiling_early_stop(self): + def test_profiling_early_stop(self, tmp_path): """ Test MindData Profiling with Early Stop; profile for some iterations and then stop profiling """ - def source1(): for i in range(8000): yield (np.array([i]),) - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() # Create this basic and common pipeline # Leaf/Source-Op -> Map -> Batch @@ -150,19 +120,23 @@ class TestMindDataProfilingStartStop: for _ in data1.create_dict_iterator(num_epochs=2): if num_iter == 400: # Stop MindData Profiling and Save MindData Profiling Output - md_profiler.stop() - md_profiler.save(os.getcwd()) + self.md_profiler.stop() + 
self.md_profiler.save(str(tmp_path)) num_iter += 1 assert num_iter == 500 - # Confirm the content of the profiling files, including 4 ops in the pipeline JSON file - self.confirm_pipeline_file(4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"]) - self.confirm_cpuutil_file(4) - self.confirm_dataset_iterator_file(401) + pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json" + dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt" - def test_profiling_delayed_start(self): + # Confirm the content of the profiling files, including 4 ops in the pipeline JSON file + self.confirm_pipeline_file(pipeline_file, 4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"]) + self.confirm_cpuutil_file(cpu_util_file, 4) + self.confirm_dataset_iterator_file(dataset_iterator_file, 401) + + def test_profiling_delayed_start(self, tmp_path): """ Test MindData Profiling with Delayed Start; profile for subset of iterations """ @@ -171,11 +145,8 @@ class TestMindDataProfilingStartStop: for i in range(8000): yield (np.array([i]),) - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() # Create this basic and common pipeline # Leaf/Source-Op -> Map -> Batch @@ -190,22 +161,25 @@ class TestMindDataProfilingStartStop: for _ in data1.create_dict_iterator(num_epochs=1): if num_iter == 5: # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() elif num_iter == 400: # Stop MindData Profiling and Save MindData Profiling Output - md_profiler.stop() - md_profiler.save(os.getcwd()) + self.md_profiler.stop() + self.md_profiler.save(str(tmp_path)) num_iter += 1 assert num_iter == 500 + pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json" + dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt" # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file - self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"]) - self.confirm_cpuutil_file(3) - self.confirm_dataset_iterator_file(395) + self.confirm_pipeline_file(pipeline_file, 3, ["GeneratorOp", "BatchOp", "MapOp"]) + self.confirm_cpuutil_file(cpu_util_file, 3) + self.confirm_dataset_iterator_file(dataset_iterator_file, 395) - def test_profiling_multiple_start_stop(self): + def test_profiling_multiple_start_stop(self, tmp_path): """ Test MindData Profiling with Delayed Start and Multiple Start-Stop Sequences """ @@ -214,11 +188,8 @@ class TestMindDataProfilingStartStop: for i in range(8000): yield (np.array([i]),) - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() # Create this basic and common pipeline # Leaf/Source-Op -> Map -> Batch @@ -233,89 +204,84 @@ class TestMindDataProfilingStartStop: for _ in data1.create_dict_iterator(num_epochs=1): if num_iter == 5: # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() elif num_iter == 40: # Stop MindData Profiling - md_profiler.stop() + self.md_profiler.stop() if num_iter == 200: # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() elif num_iter == 400: # Stop MindData Profiling - md_profiler.stop() + self.md_profiler.stop() num_iter += 1 # Save MindData 
Profiling Output - md_profiler.save(os.getcwd()) + self.md_profiler.save(str(tmp_path)) + assert num_iter == 500 + pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json" cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json" dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt" # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file - self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"]) - self.confirm_cpuutil_file(3) + self.confirm_pipeline_file(pipeline_file, 3, ["GeneratorOp", "BatchOp", "MapOp"]) + self.confirm_cpuutil_file(cpu_util_file, 3) # Note: The dataset iterator file should only contain data for batches 200 to 400 - self.confirm_dataset_iterator_file(200) + self.confirm_dataset_iterator_file(dataset_iterator_file, 200) def test_profiling_start_start(self): """ Test MindData Profiling with Start followed by Start - user error scenario """ - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() with pytest.raises(RuntimeError) as info: # Reissue Start MindData Profiling - md_profiler.start() + self.md_profiler.start() assert "MD ProfilingManager is already running." in str(info) # Stop MindData Profiling - md_profiler.stop() + self.md_profiler.stop() - def test_profiling_stop_stop(self): + def test_profiling_stop_stop(self, tmp_path): """ Test MindData Profiling with Stop followed by Stop - user warning scenario """ - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() # Stop MindData Profiling and Save MindData Profiling Output - md_profiler.stop() - md_profiler.save(os.getcwd()) + self.md_profiler.stop() + self.md_profiler.save(str(tmp_path)) # Reissue Stop MindData Profiling # A warning "MD ProfilingManager had already stopped" is produced. - md_profiler.stop() + self.md_profiler.stop() def test_profiling_stop_nostart(self): """ Test MindData Profiling with Stop without prior Start - user error scenario """ - # Get instance pointer for MindData profiling manager - md_profiler = cde.GlobalContext.profiling_manager() - # Initialize MindData profiling manager - md_profiler.init() + self.md_profiler.init() with pytest.raises(RuntimeError) as info: # Stop MindData Profiling - without prior Start() - md_profiler.stop() + self.md_profiler.stop() assert "MD ProfilingManager has not started yet." 
in str(info) # Start MindData Profiling - md_profiler.start() + self.md_profiler.start() # Stop MindData Profiling - to return profiler to a healthy state - md_profiler.stop() + self.md_profiler.stop() diff --git a/tests/ut/python/profiler/parser/test_minddata_analyzer.py b/tests/ut/python/profiler/parser/test_minddata_analyzer.py index 22dff05a6ef..dc899e289cb 100644 --- a/tests/ut/python/profiler/parser/test_minddata_analyzer.py +++ b/tests/ut/python/profiler/parser/test_minddata_analyzer.py @@ -26,15 +26,13 @@ import mindspore.dataset.transforms.c_transforms as C import mindspore._c_dataengine as cde from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer -# add file name to rank id mapping to avoid file writing crash -file_name_map_rank_id = {"test_analyze_basic": "0", - "test_analyze_sequential_pipelines_invalid": "1"} - @pytest.mark.forked class TestMinddataProfilingAnalyzer: """ Test the MinddataProfilingAnalyzer class + Note: Use pytest fixture tmp_path to create files within this temporary directory, + which is automatically created for each test and deleted at the end of the test. """ def setup_class(self): @@ -44,13 +42,6 @@ class TestMinddataProfilingAnalyzer: # Get instance pointer for MindData profiling manager self.md_profiler = cde.GlobalContext.profiling_manager() - self._pipeline_file = "./pipeline_profiling" - self._cpu_util_file = "./minddata_cpu_utilization" - self._dataset_iterator_file = "./dataset_iterator_profiling" - self._summary_json_file = "./minddata_pipeline_summary" - self._summary_csv_file = "./minddata_pipeline_summary" - self._analyze_file_path = "./" - # This is the set of keys for success case self._expected_summary_keys_success = \ ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names', @@ -61,26 +52,10 @@ class TestMinddataProfilingAnalyzer: """ Run before each test function. """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - summary_json_file = self._summary_json_file + "_" + file_id + ".json" - summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv" - - # Confirm MindData Profiling files do not yet exist - assert os.path.exists(pipeline_file) is False - assert os.path.exists(cpu_util_file) is False - assert os.path.exists(dataset_iterator_file) is False - # Confirm MindData Profiling analyze summary files do not yet exist - assert os.path.exists(summary_json_file) is False - assert os.path.exists(summary_csv_file) is False # Set the MindData Profiling related environment variables - os.environ['RANK_ID'] = file_id - os.environ['DEVICE_ID'] = file_id + os.environ['RANK_ID'] = "7" + os.environ['DEVICE_ID'] = "7" # Initialize MindData profiling manager self.md_profiler.init() @@ -93,24 +68,6 @@ class TestMinddataProfilingAnalyzer: Run after each test function. 
""" - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - summary_json_file = self._summary_json_file + "_" + file_id + ".json" - summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv" - - # Delete MindData profiling files generated from the test. - os.remove(pipeline_file) - os.remove(cpu_util_file) - os.remove(dataset_iterator_file) - - # Delete MindData profiling analyze summary files generated from the test. - os.remove(summary_json_file) - os.remove(summary_csv_file) - # Disable MindData Profiling related environment variables del os.environ['RANK_ID'] del os.environ['DEVICE_ID'] @@ -132,16 +89,12 @@ class TestMinddataProfilingAnalyzer: result.append(row) return result - def verify_md_summary(self, md_summary_dict, expected_summary_keys): + def verify_md_summary(self, md_summary_dict, expected_summary_keys, output_dir): """ Verify the content of the 3 variations of the MindData Profiling analyze summary output. """ - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - summary_json_file = self._summary_json_file + "_" + file_id + ".json" - summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv" - + summary_json_file = output_dir + "/minddata_pipeline_summary_7.json" + summary_csv_file = output_dir + "/minddata_pipeline_summary_7.csv" # Confirm MindData Profiling analyze summary files are created assert os.path.exists(summary_json_file) is True assert os.path.exists(summary_csv_file) is True @@ -188,19 +141,12 @@ class TestMinddataProfilingAnalyzer: for i in range(8000): yield (np.array([i]),) - def test_analyze_basic(self): + def test_analyze_basic(self, tmp_path): """ Test MindData profiling analyze summary files exist with basic pipeline. Also test basic content (subset of keys and values) from the returned summary result. 
""" - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" - # Create this basic and common linear pipeline # Generator -> Map -> Batch -> Repeat -> EpochCtrl data1 = ds.GeneratorDataset(self.mysource, ["col1"]) @@ -219,7 +165,12 @@ class TestMinddataProfilingAnalyzer: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save(os.getcwd()) + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_7.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_7.json" + dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_7.txt" + analyze_file_path = str(tmp_path) + "/" # Confirm MindData Profiling files are created assert os.path.exists(pipeline_file) is True @@ -227,7 +178,7 @@ class TestMinddataProfilingAnalyzer: assert os.path.exists(dataset_iterator_file) is True # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result - md_analyzer = MinddataProfilingAnalyzer(self._analyze_file_path, file_id, self._analyze_file_path) + md_analyzer = MinddataProfilingAnalyzer(analyze_file_path, "7", analyze_file_path) md_summary_dict = md_analyzer.analyze() # Verify MindData Profiling Analyze Summary output @@ -235,7 +186,7 @@ class TestMinddataProfilingAnalyzer: # 1. returned dictionary # 2. JSON file # 3. CSV file - self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success) + self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success, str(tmp_path)) # 4. Verify non-variant values or number of values in the tested pipeline for certain keys # of the returned dictionary @@ -253,16 +204,10 @@ class TestMinddataProfilingAnalyzer: assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3] assert len(md_summary_dict["avg_cpu_pct"]) == 5 - def test_analyze_sequential_pipelines_invalid(self): + def test_analyze_sequential_pipelines_invalid(self, tmp_path): """ Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines. 
""" - file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] - file_id = file_name_map_rank_id[file_name] - - pipeline_file = self._pipeline_file + "_" + file_id + ".json" - cpu_util_file = self._cpu_util_file + "_" + file_id + ".json" - dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt" # Create the pipeline # Generator -> Map -> Batch -> EpochCtrl @@ -283,7 +228,12 @@ class TestMinddataProfilingAnalyzer: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save(os.getcwd()) + self.md_profiler.save(str(tmp_path)) + + pipeline_file = str(tmp_path) + "/pipeline_profiling_7.json" + cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_7.json" + dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_7.txt" + analyze_file_path = str(tmp_path) + "/" # Confirm MindData Profiling files are created assert os.path.exists(pipeline_file) is True @@ -307,7 +257,7 @@ class TestMinddataProfilingAnalyzer: # Stop MindData Profiling and save output files to current working directory self.md_profiler.stop() - self.md_profiler.save(os.getcwd()) + self.md_profiler.save(str(tmp_path)) # Confirm MindData Profiling files are created # Note: There is an MD bug in which which the pipeline file is not recreated; @@ -317,11 +267,11 @@ class TestMinddataProfilingAnalyzer: assert os.path.exists(dataset_iterator_file) is True # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result - md_analyzer = MinddataProfilingAnalyzer(self._analyze_file_path, file_id, self._analyze_file_path) + md_analyzer = MinddataProfilingAnalyzer(analyze_file_path, "7", analyze_file_path) md_summary_dict = md_analyzer.analyze() # Verify MindData Profiling Analyze Summary output - self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success) + self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success, str(tmp_path)) # Confirm pipeline data contains info for 3 ops assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]