diff --git a/tests/ut/python/dataset/test_apply.py b/tests/ut/python/dataset/test_apply.py
index 9c08376e84c..3c201d8849a 100644
--- a/tests/ut/python/dataset/test_apply.py
+++ b/tests/ut/python/dataset/test_apply.py
@@ -88,7 +88,7 @@ def test_apply_flow_case_0(id_=0):
     data1 = data1.apply(dataset_fn)
     num_iter = 0
     for _ in data1.create_dict_iterator(num_epochs=1):
-        num_iter = num_iter + 1
+        num_iter += 1
 
     if id_ == 0:
         assert num_iter == 16
@@ -119,7 +119,7 @@ def test_apply_flow_case_1(id_=1):
     data1 = data1.apply(dataset_fn)
     num_iter = 0
     for _ in data1.create_dict_iterator(num_epochs=1):
-        num_iter = num_iter + 1
+        num_iter += 1
 
     if id_ == 0:
         assert num_iter == 16
@@ -150,7 +150,7 @@ def test_apply_flow_case_2(id_=2):
     data1 = data1.apply(dataset_fn)
     num_iter = 0
     for _ in data1.create_dict_iterator(num_epochs=1):
-        num_iter = num_iter + 1
+        num_iter += 1
 
     if id_ == 0:
         assert num_iter == 16
@@ -181,7 +181,7 @@ def test_apply_flow_case_3(id_=3):
    data1 = data1.apply(dataset_fn)
     num_iter = 0
     for _ in data1.create_dict_iterator(num_epochs=1):
-        num_iter = num_iter + 1
+        num_iter += 1
 
     if id_ == 0:
         assert num_iter == 16
diff --git a/tests/ut/python/dataset/test_mixup_label_smoothing.py b/tests/ut/python/dataset/test_mixup_label_smoothing.py
index 227b9cd63ac..cbab4293a48 100644
--- a/tests/ut/python/dataset/test_mixup_label_smoothing.py
+++ b/tests/ut/python/dataset/test_mixup_label_smoothing.py
@@ -157,7 +157,7 @@ def test_mix_up_multi():
             assert image1[index].all() == img_golden.all()
             logger.info("====test several batch mixup ok====")
             break
-        num_iter = num_iter + 1
+        num_iter += 1
 
 
 if __name__ == "__main__":
diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py
index d86d760198d..0fda594227c 100644
--- a/tests/ut/python/dataset/test_profiling.py
+++ b/tests/ut/python/dataset/test_profiling.py
@@ -57,6 +57,17 @@ def delete_profiling_files():
     del os.environ['DEVICE_ID']
 
 
+def confirm_cpuutil(num_pipeline_ops):
+    """
+    Confirm CPU utilization JSON file with <num_pipeline_ops> ops in the pipeline
+    """
+    with open(CPU_UTIL_FILE) as file1:
+        data = json.load(file1)
+        op_info = data["op_info"]
+        # Confirm <num_pipeline_ops> + 1 ops in CPU util file (including op_id=-1 for monitor thread)
+        assert len(op_info) == num_pipeline_ops + 1
+
+
 def test_profiling_simple_pipeline():
     """
     Generator -> Shuffle -> Batch
@@ -129,6 +140,9 @@ def test_profiling_complex_pipeline():
                 # Note: Zip is an inline op and hence does not have metrics information
                 assert op_info[i]["metrics"] is None
 
+        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
+        confirm_cpuutil(5)
+
     except Exception as error:
         delete_profiling_files()
         raise error
@@ -161,16 +175,17 @@ def test_profiling_inline_ops_pipeline1():
     data3 = data1.concat(data2)
 
     try:
-        # Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
         num_iter = 0
+        # Note: Do not explicitly set the num_epochs argument in the create_tuple_iterator() call
         # Here i refers to index, d refers to data element
-        for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True, num_epochs=2)):
-            num_iter = num_iter + 1
+        for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True)):
+            num_iter += 1
             t = d
             assert i == t[0][0]
 
         assert num_iter == 10
 
+        # Confirm pipeline is created with EpochCtrl op
         with open(PIPELINE_FILE) as f:
             data = json.load(f)
             op_info = data["op_info"]
@@ -185,6 +200,9 @@ def test_profiling_inline_ops_pipeline1():
                 assert "length" in op_info[i]["metrics"]["output_queue"]
                 assert "throughput" in op_info[i]["metrics"]["output_queue"]
 
+        # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
+        confirm_cpuutil(4)
+
     except Exception as error:
         delete_profiling_files()
         raise error
@@ -229,6 +247,9 @@ def test_profiling_inline_ops_pipeline2():
                 assert "length" in op_info[i]["metrics"]["output_queue"]
                 assert "throughput" in op_info[i]["metrics"]["output_queue"]
 
+        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
+        confirm_cpuutil(5)
+
     except Exception as error:
         delete_profiling_files()
         raise error
@@ -292,7 +313,7 @@ def test_profiling_basic_pipeline():
         num_iter = 0
         # Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
         for _ in data1.create_dict_iterator(num_epochs=2):
-            num_iter = num_iter + 1
+            num_iter += 1
 
         assert num_iter == 1000
 
@@ -310,6 +331,9 @@ def test_profiling_basic_pipeline():
                 assert "length" in op_info[i]["metrics"]["output_queue"]
                 assert "throughput" in op_info[i]["metrics"]["output_queue"]
 
+        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
+        confirm_cpuutil(5)
+
     except Exception as error:
         delete_profiling_files()
         raise error
@@ -342,7 +366,7 @@ def test_profiling_cifar10_pipeline():
         num_iter = 0
         # Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
         for _ in data1.create_dict_iterator(num_epochs=1):
-            num_iter = num_iter + 1
+            num_iter += 1
 
         assert num_iter == 750
 
@@ -360,6 +384,120 @@ def test_profiling_cifar10_pipeline():
                 assert "length" in op_info[i]["metrics"]["output_queue"]
                 assert "throughput" in op_info[i]["metrics"]["output_queue"]
 
+        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
+        confirm_cpuutil(5)
+
+    except Exception as error:
+        delete_profiling_files()
+        raise error
+
+    else:
+        delete_profiling_files()
+
+
+def confirm_3ops_in_pipeline():
+    with open(PIPELINE_FILE) as file1:
+        data = json.load(file1)
+        op_info = data["op_info"]
+        # Confirm 3 ops in pipeline file
+        assert len(op_info) == 3
+        for i in range(3):
+            assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp", "EpochCtrlOp")
+
+
+def confirm_2ops_in_pipeline():
+    with open(PIPELINE_FILE) as file1:
+        data = json.load(file1)
+        op_info = data["op_info"]
+        # Confirm 2 ops in pipeline file
+        assert len(op_info) == 2
+        for i in range(2):
+            assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp")
+
+
+def test_profiling_epochctrl3():
+    """
+    Test with these 2 sequential pipelines:
+    1) Generator -> Batch -> EpochCtrl
+    2) Generator -> Batch
+    Note: This is a simplification of the user scenario of using the same pipeline for training and then evaluation.
+    """
+    set_profiling_env_var()
+
+    source = [(np.array([x]),) for x in range(64)]
+    data1 = ds.GeneratorDataset(source, ["data"])
+    data1 = data1.batch(32)
+
+    try:
+        # Test A - Call create_dict_iterator with num_epochs>1
+        num_iter = 0
+        # Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
+        for _ in data1.create_dict_iterator(num_epochs=2):
+            num_iter += 1
+        assert num_iter == 2
+
+        confirm_3ops_in_pipeline()
+        confirm_cpuutil(3)
+
+        # Test B - Call create_dict_iterator with num_epochs=1
+        num_iter = 0
+        # Note: If create_tuple_iterator() is called with num_epochs=1,
+        # then EpochCtrlOp should NOT be added to the pipeline
+        for _ in data1.create_dict_iterator(num_epochs=1):
+            num_iter += 1
+        assert num_iter == 2
+
+        # confirm_2ops_in_pipeline()
+        # MD BUG: Confirm pipeline file is not changed and wrongly still has 3 ops
+        confirm_3ops_in_pipeline()
+        # Confirm CPU util file has correct number of ops
+        confirm_cpuutil(2)
+
+    except Exception as error:
+        delete_profiling_files()
+        raise error
+
+    else:
+        delete_profiling_files()
+
+
+def test_profiling_epochctrl2():
+    """
+    Test with these 2 sequential pipelines:
+    1) Generator -> Batch
+    2) Generator -> Batch -> EpochCtrl
+    """
+    set_profiling_env_var()
+
+    source = [(np.array([x]),) for x in range(64)]
+    data2 = ds.GeneratorDataset(source, ["data"])
+    data2 = data2.batch(16)
+
+    try:
+        # Test A - Call create_dict_iterator with num_epochs=1
+        num_iter = 0
+        # Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
+        for _ in data2.create_dict_iterator(num_epochs=1):
+            num_iter += 1
+        assert num_iter == 4
+
+        confirm_2ops_in_pipeline()
+        confirm_cpuutil(2)
+
+        # Test B - Call create_dict_iterator with num_epochs>1
+        num_iter = 0
+        # Note: If create_tuple_iterator() is called with num_epochs>1,
+        # then EpochCtrlOp should be added to the pipeline
+        for _ in data2.create_dict_iterator(num_epochs=2):
+            num_iter += 1
+        assert num_iter == 4
+
+        # confirm_3ops_in_pipeline()
+        # MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops
+        confirm_2ops_in_pipeline()
+        # Confirm CPU util file has correct number of ops
+        confirm_cpuutil(3)
+
     except Exception as error:
         delete_profiling_files()
         raise error
@@ -376,3 +514,5 @@ if __name__ == "__main__":
     test_profiling_sampling_interval()
     test_profiling_basic_pipeline()
     test_profiling_cifar10_pipeline()
+    test_profiling_epochctrl3()
+    test_profiling_epochctrl2()