replace python removal with pytest tmp_path

This commit is contained in:
zetongzhao 2021-12-13 15:28:48 -05:00
parent efa63b78c0
commit 7cb6989efd
3 changed files with 146 additions and 291 deletions

View File

@ -29,23 +29,13 @@ FILES = ["../data/dataset/testTFTestAllTypes/"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
# add file name to rank id mapping to avoid file writing crash
file_name_map_rank_id = {"test_profiling_simple_pipeline": "0",
"test_profiling_complex_pipeline": "1",
"test_profiling_inline_ops_pipeline1": "2",
"test_profiling_inline_ops_pipeline2": "3",
"test_profiling_sampling_interval": "4",
"test_profiling_basic_pipeline": "5",
"test_profiling_cifar10_pipeline": "6",
"test_profiling_seq_pipelines_epochctrl3": "7",
"test_profiling_seq_pipelines_epochctrl2": "8",
"test_profiling_seq_pipelines_repeat": "9"}
class TestMinddataProfilingManager:
Test MinddataProfilingManager
Note: Use pytest fixture tmp_path to create files within this temporary directory,
which is automatically created for each test and deleted at the end of the test.
def setup_class(self):
@ -55,29 +45,14 @@ class TestMinddataProfilingManager:
# Get instance pointer for MindData profiling manager
self.md_profiler = cde.GlobalContext.profiling_manager()
self._pipeline_file = "./pipeline_profiling"
self._cpu_util_file = "./minddata_cpu_utilization"
self._dataset_iterator_file = "./dataset_iterator_profiling"
def setup_method(self):
Run before each test function.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
# Confirm MindData Profiling files do not yet exist
assert os.path.exists(pipeline_file) is False
assert os.path.exists(cpu_util_file) is False
assert os.path.exists(dataset_iterator_file) is False
# Set the MindData Profiling related environment variables
os.environ['RANK_ID'] = file_id
os.environ['DEVICE_ID'] = file_id
os.environ['RANK_ID'] = "1"
os.environ['DEVICE_ID'] = "1"
# Initialize MindData profiling manager
@ -90,18 +65,6 @@ class TestMinddataProfilingManager:
Run after each test function.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
# Delete MindData profiling files generated from the test.
# Disable MindData Profiling related environment variables
del os.environ['RANK_ID']
del os.environ['DEVICE_ID']
@ -127,16 +90,10 @@ class TestMinddataProfilingManager:
for i in range(num_ops):
assert op_info[i]["op_type"] in op_list
def test_profiling_simple_pipeline(self):
def test_profiling_simple_pipeline(self, tmp_path):
Generator -> Shuffle -> Batch
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["data"])
@ -147,36 +104,29 @@ class TestMinddataProfilingManager:
assert [str(tp) for tp in data1.output_types()] == ["int64"]
assert data1.get_dataset_size() == 32
# Confirm profiling files do not (yet) exist
assert os.path.exists(pipeline_file) is False
assert os.path.exists(cpu_util_file) is False
assert os.path.exists(dataset_iterator_file) is False
for _ in data1:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
# Confirm profiling files now exist
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
assert os.path.exists(dataset_iterator_file) is True
def test_profiling_complex_pipeline(self):
def test_profiling_complex_pipeline(self, tmp_path):
Generator -> Map ->
-> Zip
TFReader -> Shuffle ->
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["gen"])
data1 =[(lambda x: x + 1)], input_columns=["gen"])
@ -192,7 +142,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
with open(pipeline_file) as f:
data = json.load(f)
@ -209,18 +162,13 @@ class TestMinddataProfilingManager:
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
self.confirm_cpuutil(5, cpu_util_file)
def test_profiling_inline_ops_pipeline1(self):
def test_profiling_inline_ops_pipeline1(self, tmp_path):
Test pipeline with inline ops: Concat and EpochCtrl
Generator ->
Concat -> EpochCtrl
Generator ->
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
# In source1 dataset: Number of rows is 3; its values are 0, 1, 2
def source1():
@ -248,7 +196,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
# Confirm pipeline is created with EpochCtrl op
with open(pipeline_file) as f:
@ -267,16 +218,11 @@ class TestMinddataProfilingManager:
# Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
self.confirm_cpuutil(4, cpu_util_file)
def test_profiling_inline_ops_pipeline2(self):
def test_profiling_inline_ops_pipeline2(self, tmp_path):
Test pipeline with many inline ops
Generator -> Rename -> Skip -> Repeat -> Take
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
# In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
def source1():
@ -294,7 +240,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
with open(pipeline_file) as f:
data = json.load(f)
@ -312,7 +261,7 @@ class TestMinddataProfilingManager:
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
self.confirm_cpuutil(5, cpu_util_file)
def test_profiling_sampling_interval(self):
def test_profiling_sampling_interval(self, tmp_path):
Test non-default monitor sampling interval
@ -334,18 +283,13 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
def test_profiling_basic_pipeline(self):
def test_profiling_basic_pipeline(self, tmp_path):
Test with this basic pipeline
Generator -> Map -> Batch -> Repeat -> EpochCtrl
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
def source1():
for i in range(8000):
@ -369,7 +313,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
with open(pipeline_file) as f:
data = json.load(f)
@ -387,16 +334,11 @@ class TestMinddataProfilingManager:
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
self.confirm_cpuutil(5, cpu_util_file)
def test_profiling_cifar10_pipeline(self):
def test_profiling_cifar10_pipeline(self, tmp_path):
Test with this common pipeline with Cifar10
Cifar10 -> Map -> Map -> Batch -> Repeat
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
# Create this common pipeline
# Cifar10 -> Map -> Map -> Batch -> Repeat
@ -420,7 +362,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
with open(pipeline_file) as f:
data = json.load(f)
@ -438,18 +383,13 @@ class TestMinddataProfilingManager:
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
self.confirm_cpuutil(5, cpu_util_file)
def test_profiling_seq_pipelines_epochctrl3(self):
def test_profiling_seq_pipelines_epochctrl3(self, tmp_path):
Test with these 2 sequential pipelines:
1) Generator -> Batch -> EpochCtrl
2) Generator -> Batch
Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data1 = ds.GeneratorDataset(source, ["data"])
@ -464,7 +404,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
@ -485,23 +428,18 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
self.confirm_cpuutil(2, cpu_util_file)
def test_profiling_seq_pipelines_epochctrl2(self):
def test_profiling_seq_pipelines_epochctrl2(self, tmp_path):
Test with these 2 sequential pipelines:
1) Generator -> Batch
2) Generator -> Batch -> EpochCtrl
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data2 = ds.GeneratorDataset(source, ["data"])
@ -516,7 +454,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
@ -537,23 +478,18 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
self.confirm_cpuutil(3, cpu_util_file)
def test_profiling_seq_pipelines_repeat(self):
def test_profiling_seq_pipelines_repeat(self, tmp_path):
Test with these 2 sequential pipelines:
1) Generator -> Batch
2) Generator -> Batch -> Repeat
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
source = [(np.array([x]),) for x in range(64)]
data2 = ds.GeneratorDataset(source, ["data"])
@ -567,7 +503,10 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
@ -587,7 +526,7 @@ class TestMinddataProfilingManager:
# Stop MindData Profiling and save output files to current working directory
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)

View File

@ -28,71 +28,45 @@ FILES = ["../data/dataset/testTFTestAllTypes/"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
# Add file name to rank id mapping so that each profiling file name is unique,
# to support parallel test execution
file_name_map_rank_id = {"test_profiling_early_stop": "0",
"test_profiling_delayed_start": "1",
"test_profiling_start_start": "2",
"test_profiling_multiple_start_stop": "3",
"test_profiling_stop_stop": "4",
"test_profiling_stop_nostart": "5"}
class TestMindDataProfilingStartStop:
Test MindData Profiling Manager Start-Stop Support
Note: Use pytest fixture tmp_path to create files within this temporary directory,
which is automatically created for each test and deleted at the end of the test.
def setup_class(self):
Run once for the class
self._pipeline_file = "./pipeline_profiling"
self._cpu_util_file = "./minddata_cpu_utilization"
self._dataset_iterator_file = "./dataset_iterator_profiling"
# Get instance pointer for MindData profiling manager
self.md_profiler = cde.GlobalContext.profiling_manager()
def setup_method(self):
Run before each test function.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
self.pipeline_file = self._pipeline_file + "_" + file_id + ".json"
self.cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
self.dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
# Confirm MindData Profiling files do not yet exist
assert os.path.exists(self.pipeline_file) is False
assert os.path.exists(self.cpu_util_file) is False
assert os.path.exists(self.dataset_iterator_file) is False
# Set the MindData Profiling related environment variables
os.environ['RANK_ID'] = file_id
os.environ['DEVICE_ID'] = file_id
os.environ['RANK_ID'] = "0"
os.environ['DEVICE_ID'] = "0"
def teardown_method(self):
Run after each test function.
# Delete MindData profiling files generated from the test.
if os.path.exists(self.pipeline_file):
if os.path.exists(self.cpu_util_file):
if os.path.exists(self.dataset_iterator_file):
# Disable MindData Profiling related environment variables
del os.environ['RANK_ID']
del os.environ['DEVICE_ID']
def confirm_pipeline_file(self, num_ops, op_list=None):
def confirm_pipeline_file(self, pipeline_file, num_ops, op_list=None):
Confirm pipeline JSON file with <num_ops> in the pipeline and the given optional list of ops
with open(self.pipeline_file) as file1:
with open(pipeline_file) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm ops in pipeline file
@ -101,41 +75,37 @@ class TestMindDataProfilingStartStop:
for i in range(num_ops):
assert op_info[i]["op_type"] in op_list
def confirm_cpuutil_file(self, num_pipeline_ops):
def confirm_cpuutil_file(self, cpu_util_file, num_pipeline_ops):
Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
with open(self.cpu_util_file) as file1:
with open(cpu_util_file) as file1:
data = json.load(file1)
op_info = data["op_info"]
assert len(op_info) == num_pipeline_ops
def confirm_dataset_iterator_file(self, num_batches):
def confirm_dataset_iterator_file(self, dataset_iterator_file, num_batches):
Confirm dataset iterator file exists with the correct number of rows in the file
assert os.path.exists(self.dataset_iterator_file)
actual_num_lines = sum(1 for _ in open(self.dataset_iterator_file))
assert os.path.exists(dataset_iterator_file)
actual_num_lines = sum(1 for _ in open(dataset_iterator_file))
# Confirm there are 4 lines for each batch in the dataset iterator file
assert actual_num_lines == 4 * num_batches
def test_profiling_early_stop(self):
def test_profiling_early_stop(self, tmp_path):
Test MindData Profiling with Early Stop; profile for some iterations and then stop profiling
def source1():
for i in range(8000):
yield (np.array([i]),)
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
# Start MindData Profiling
# Create this basic and common pipeline
# Leaf/Source-Op -> Map -> Batch
@ -150,19 +120,23 @@ class TestMindDataProfilingStartStop:
for _ in data1.create_dict_iterator(num_epochs=2):
if num_iter == 400:
# Stop MindData Profiling and Save MindData Profiling Output
num_iter += 1
assert num_iter == 500
# Confirm the content of the profiling files, including 4 ops in the pipeline JSON file
self.confirm_pipeline_file(4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"])
pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt"
def test_profiling_delayed_start(self):
# Confirm the content of the profiling files, including 4 ops in the pipeline JSON file
self.confirm_pipeline_file(pipeline_file, 4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"])
self.confirm_cpuutil_file(cpu_util_file, 4)
self.confirm_dataset_iterator_file(dataset_iterator_file, 401)
def test_profiling_delayed_start(self, tmp_path):
Test MindData Profiling with Delayed Start; profile for subset of iterations
@ -171,11 +145,8 @@ class TestMindDataProfilingStartStop:
for i in range(8000):
yield (np.array([i]),)
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
# Create this basic and common pipeline
# Leaf/Source-Op -> Map -> Batch
@ -190,22 +161,25 @@ class TestMindDataProfilingStartStop:
for _ in data1.create_dict_iterator(num_epochs=1):
if num_iter == 5:
# Start MindData Profiling
elif num_iter == 400:
# Stop MindData Profiling and Save MindData Profiling Output
num_iter += 1
assert num_iter == 500
pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt"
# Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
self.confirm_pipeline_file(pipeline_file, 3, ["GeneratorOp", "BatchOp", "MapOp"])
self.confirm_cpuutil_file(cpu_util_file, 3)
self.confirm_dataset_iterator_file(dataset_iterator_file, 395)
def test_profiling_multiple_start_stop(self):
def test_profiling_multiple_start_stop(self, tmp_path):
Test MindData Profiling with Delayed Start and Multiple Start-Stop Sequences
@ -214,11 +188,8 @@ class TestMindDataProfilingStartStop:
for i in range(8000):
yield (np.array([i]),)
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
# Create this basic and common pipeline
# Leaf/Source-Op -> Map -> Batch
@ -233,89 +204,84 @@ class TestMindDataProfilingStartStop:
for _ in data1.create_dict_iterator(num_epochs=1):
if num_iter == 5:
# Start MindData Profiling
elif num_iter == 40:
# Stop MindData Profiling
if num_iter == 200:
# Start MindData Profiling
elif num_iter == 400:
# Stop MindData Profiling
num_iter += 1
# Save MindData Profiling Output
assert num_iter == 500
pipeline_file = str(tmp_path) + "/pipeline_profiling_0.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_0.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_0.txt"
# Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
self.confirm_pipeline_file(pipeline_file, 3, ["GeneratorOp", "BatchOp", "MapOp"])
self.confirm_cpuutil_file(cpu_util_file, 3)
# Note: The dataset iterator file should only contain data for batches 200 to 400
self.confirm_dataset_iterator_file(dataset_iterator_file, 200)
def test_profiling_start_start(self):
Test MindData Profiling with Start followed by Start - user error scenario
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
# Start MindData Profiling
with pytest.raises(RuntimeError) as info:
# Reissue Start MindData Profiling
assert "MD ProfilingManager is already running." in str(info)
# Stop MindData Profiling
def test_profiling_stop_stop(self):
def test_profiling_stop_stop(self, tmp_path):
Test MindData Profiling with Stop followed by Stop - user warning scenario
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
# Start MindData Profiling
# Stop MindData Profiling and Save MindData Profiling Output
# Reissue Stop MindData Profiling
# A warning "MD ProfilingManager had already stopped" is produced.
def test_profiling_stop_nostart(self):
Test MindData Profiling with Stop not without prior Start - user error scenario
# Get instance pointer for MindData profiling manager
md_profiler = cde.GlobalContext.profiling_manager()
# Initialize MindData profiling manager
with pytest.raises(RuntimeError) as info:
# Stop MindData Profiling - without prior Start()
assert "MD ProfilingManager has not started yet." in str(info)
# Start MindData Profiling
# Stop MindData Profiling - to return profiler to a healthy state

View File

@ -26,15 +26,13 @@ import mindspore.dataset.transforms.c_transforms as C
import mindspore._c_dataengine as cde
from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer
# add file name to rank id mapping to avoid file writing crash
file_name_map_rank_id = {"test_analyze_basic": "0",
"test_analyze_sequential_pipelines_invalid": "1"}
class TestMinddataProfilingAnalyzer:
Test the MinddataProfilingAnalyzer class
Note: Use pytest fixture tmp_path to create files within this temporary directory,
which is automatically created for each test and deleted at the end of the test.
def setup_class(self):
@ -44,13 +42,6 @@ class TestMinddataProfilingAnalyzer:
# Get instance pointer for MindData profiling manager
self.md_profiler = cde.GlobalContext.profiling_manager()
self._pipeline_file = "./pipeline_profiling"
self._cpu_util_file = "./minddata_cpu_utilization"
self._dataset_iterator_file = "./dataset_iterator_profiling"
self._summary_json_file = "./minddata_pipeline_summary"
self._summary_csv_file = "./minddata_pipeline_summary"
self._analyze_file_path = "./"
# This is the set of keys for success case
self._expected_summary_keys_success = \
['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
@ -61,26 +52,10 @@ class TestMinddataProfilingAnalyzer:
Run before each test function.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
summary_json_file = self._summary_json_file + "_" + file_id + ".json"
summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv"
# Confirm MindData Profiling files do not yet exist
assert os.path.exists(pipeline_file) is False
assert os.path.exists(cpu_util_file) is False
assert os.path.exists(dataset_iterator_file) is False
# Confirm MindData Profiling analyze summary files do not yet exist
assert os.path.exists(summary_json_file) is False
assert os.path.exists(summary_csv_file) is False
# Set the MindData Profiling related environment variables
os.environ['RANK_ID'] = file_id
os.environ['DEVICE_ID'] = file_id
os.environ['RANK_ID'] = "7"
os.environ['DEVICE_ID'] = "7"
# Initialize MindData profiling manager
@ -93,24 +68,6 @@ class TestMinddataProfilingAnalyzer:
Run after each test function.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
summary_json_file = self._summary_json_file + "_" + file_id + ".json"
summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv"
# Delete MindData profiling files generated from the test.
# Delete MindData profiling analyze summary files generated from the test.
# Disable MindData Profiling related environment variables
del os.environ['RANK_ID']
del os.environ['DEVICE_ID']
@ -132,16 +89,12 @@ class TestMinddataProfilingAnalyzer:
return result
def verify_md_summary(self, md_summary_dict, expected_summary_keys):
def verify_md_summary(self, md_summary_dict, expected_summary_keys, output_dir):
Verify the content of the 3 variations of the MindData Profiling analyze summary output.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
summary_json_file = self._summary_json_file + "_" + file_id + ".json"
summary_csv_file = self._summary_csv_file + "_" + file_id + ".csv"
summary_json_file = output_dir + "/minddata_pipeline_summary_7.json"
summary_csv_file = output_dir + "/minddata_pipeline_summary_7.csv"
# Confirm MindData Profiling analyze summary files are created
assert os.path.exists(summary_json_file) is True
assert os.path.exists(summary_csv_file) is True
@ -188,19 +141,12 @@ class TestMinddataProfilingAnalyzer:
for i in range(8000):
yield (np.array([i]),)
def test_analyze_basic(self):
def test_analyze_basic(self, tmp_path):
Test MindData profiling analyze summary files exist with basic pipeline.
Also test basic content (subset of keys and values) from the returned summary result.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
# Create this basic and common linear pipeline
# Generator -> Map -> Batch -> Repeat -> EpochCtrl
data1 = ds.GeneratorDataset(self.mysource, ["col1"])
@ -219,7 +165,12 @@ class TestMinddataProfilingAnalyzer:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_7.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_7.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_7.txt"
analyze_file_path = str(tmp_path) + "/"
# Confirm MindData Profiling files are created
assert os.path.exists(pipeline_file) is True
@ -227,7 +178,7 @@ class TestMinddataProfilingAnalyzer:
assert os.path.exists(dataset_iterator_file) is True
# Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
md_analyzer = MinddataProfilingAnalyzer(self._analyze_file_path, file_id, self._analyze_file_path)
md_analyzer = MinddataProfilingAnalyzer(analyze_file_path, "7", analyze_file_path)
md_summary_dict = md_analyzer.analyze()
# Verify MindData Profiling Analyze Summary output
@ -235,7 +186,7 @@ class TestMinddataProfilingAnalyzer:
# 1. returned dictionary
# 2. JSON file
# 3. CSV file
self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success)
self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success, str(tmp_path))
# 4. Verify non-variant values or number of values in the tested pipeline for certain keys
# of the returned dictionary
@ -253,16 +204,10 @@ class TestMinddataProfilingAnalyzer:
assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3]
assert len(md_summary_dict["avg_cpu_pct"]) == 5
def test_analyze_sequential_pipelines_invalid(self):
def test_analyze_sequential_pipelines_invalid(self, tmp_path):
Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines.
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
pipeline_file = self._pipeline_file + "_" + file_id + ".json"
cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
# Create the pipeline
# Generator -> Map -> Batch -> EpochCtrl
@ -283,7 +228,12 @@ class TestMinddataProfilingAnalyzer:
# Stop MindData Profiling and save output files to current working directory
pipeline_file = str(tmp_path) + "/pipeline_profiling_7.json"
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_7.json"
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_7.txt"
analyze_file_path = str(tmp_path) + "/"
# Confirm MindData Profiling files are created
assert os.path.exists(pipeline_file) is True
@ -307,7 +257,7 @@ class TestMinddataProfilingAnalyzer:
# Stop MindData Profiling and save output files to current working directory
# Confirm MindData Profiling files are created
# Note: There is an MD bug in which which the pipeline file is not recreated;
@ -317,11 +267,11 @@ class TestMinddataProfilingAnalyzer:
assert os.path.exists(dataset_iterator_file) is True
# Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result
md_analyzer = MinddataProfilingAnalyzer(self._analyze_file_path, file_id, self._analyze_file_path)
md_analyzer = MinddataProfilingAnalyzer(analyze_file_path, "7", analyze_file_path)
md_summary_dict = md_analyzer.analyze()
# Verify MindData Profiling Analyze Summary output
self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success)
self.verify_md_summary(md_summary_dict, self._expected_summary_keys_success, str(tmp_path))
# Confirm pipeline data contains info for 3 ops
assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]