!18292 MD Profiling UT: Check CPU util file. Add sequential pipeline UTs

Merge pull request !18292 from cathwong/ckw_mon_py_analyze_ut4
This commit is contained in:
i-robot 2021-06-17 21:47:10 +08:00 committed by Gitee
commit ab3259c43a
3 changed files with 150 additions and 10 deletions

View File

@ -88,7 +88,7 @@ def test_apply_flow_case_0(id_=0):
data1 = data1.apply(dataset_fn)
num_iter = 0
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter = num_iter + 1
num_iter += 1
if id_ == 0:
assert num_iter == 16
@ -119,7 +119,7 @@ def test_apply_flow_case_1(id_=1):
data1 = data1.apply(dataset_fn)
num_iter = 0
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter = num_iter + 1
num_iter += 1
if id_ == 0:
assert num_iter == 16
@ -150,7 +150,7 @@ def test_apply_flow_case_2(id_=2):
data1 = data1.apply(dataset_fn)
num_iter = 0
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter = num_iter + 1
num_iter += 1
if id_ == 0:
assert num_iter == 16
@ -181,7 +181,7 @@ def test_apply_flow_case_3(id_=3):
data1 = data1.apply(dataset_fn)
num_iter = 0
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter = num_iter + 1
num_iter += 1
if id_ == 0:
assert num_iter == 16

View File

@ -157,7 +157,7 @@ def test_mix_up_multi():
assert image1[index].all() == img_golden.all()
logger.info("====test several batch mixup ok====")
break
num_iter = num_iter + 1
num_iter += 1
if __name__ == "__main__":

View File

@ -57,6 +57,17 @@ def delete_profiling_files():
del os.environ['DEVICE_ID']
def confirm_cpuutil(num_pipeline_ops):
"""
Confirm CPU utilization JSON file when <num_pipeline_ops> are in the pipeline
"""
with open(CPU_UTIL_FILE) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm <num_pipeline_ops>+1 ops in CPU util file (including op_id=-1 for monitor thread)
assert len(op_info) == num_pipeline_ops + 1
def test_profiling_simple_pipeline():
"""
Generator -> Shuffle -> Batch
@ -129,6 +140,9 @@ def test_profiling_complex_pipeline():
# Note: Zip is an inline op and hence does not have metrics information
assert op_info[i]["metrics"] is None
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
except Exception as error:
delete_profiling_files()
raise error
@ -161,16 +175,17 @@ def test_profiling_inline_ops_pipeline1():
data3 = data1.concat(data2)
try:
# Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
num_iter = 0
# Note: Do not explicitly set num_epochs argument in create_tuple_iterator() call
# Here i refers to index, d refers to data element
for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True, num_epochs=2)):
num_iter = num_iter + 1
for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True)):
num_iter += 1
t = d
assert i == t[0][0]
assert num_iter == 10
# Confirm pipeline is created with EpochCtrl op
with open(PIPELINE_FILE) as f:
data = json.load(f)
op_info = data["op_info"]
@ -185,6 +200,9 @@ def test_profiling_inline_ops_pipeline1():
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
confirm_cpuutil(4)
except Exception as error:
delete_profiling_files()
raise error
@ -229,6 +247,9 @@ def test_profiling_inline_ops_pipeline2():
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
except Exception as error:
delete_profiling_files()
raise error
@ -292,7 +313,7 @@ def test_profiling_basic_pipeline():
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
for _ in data1.create_dict_iterator(num_epochs=2):
num_iter = num_iter + 1
num_iter += 1
assert num_iter == 1000
@ -310,6 +331,9 @@ def test_profiling_basic_pipeline():
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
except Exception as error:
delete_profiling_files()
raise error
@ -342,7 +366,7 @@ def test_profiling_cifar10_pipeline():
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter = num_iter + 1
num_iter += 1
assert num_iter == 750
@ -360,6 +384,120 @@ def test_profiling_cifar10_pipeline():
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
confirm_cpuutil(5)
except Exception as error:
delete_profiling_files()
raise error
else:
delete_profiling_files()
def confirm_3ops_in_pipeline():
with open(PIPELINE_FILE) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm 3 ops in pipeline file
assert len(op_info) == 3
for i in range(3):
assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp", "EpochCtrlOp")
def confirm_2ops_in_pipeline():
with open(PIPELINE_FILE) as file1:
data = json.load(file1)
op_info = data["op_info"]
# Confirm 2 ops in pipeline file
assert len(op_info) == 2
for i in range(2):
assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp")
def test_profiling_epochctrl3():
"""
Test with these 2 sequential pipelines:
1) Generator -> Batch -> EpochCtrl
2) Generator -> Batch
Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
"""
set_profiling_env_var()
source = [(np.array([x]),) for x in range(64)]
data1 = ds.GeneratorDataset(source, ["data"])
data1 = data1.batch(32)
try:
# Test A - Call create_dict_iterator with num_epochs>1
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
for _ in data1.create_dict_iterator(num_epochs=2):
num_iter += 1
assert num_iter == 2
confirm_3ops_in_pipeline()
confirm_cpuutil(3)
# Test B - Call create_dict_iterator with num_epochs=1
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs=1,
# then EpochCtrlOp should not be NOT added to the pipeline
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter += 1
assert num_iter == 2
# confirm_2ops_in_pipeline()
# MD BUG: Confirm pipeline file is not changed and wrongly still has 3 ops
confirm_3ops_in_pipeline()
# Confirm CPU util file has correct number of ops
confirm_cpuutil(2)
except Exception as error:
delete_profiling_files()
raise error
else:
delete_profiling_files()
def test_profiling_epochctrl2():
"""
Test with these 2 sequential pipelines:
1) Generator -> Batch
2) Generator -> Batch -> EpochCtrl
"""
set_profiling_env_var()
source = [(np.array([x]),) for x in range(64)]
data2 = ds.GeneratorDataset(source, ["data"])
data2 = data2.batch(16)
try:
# Test A - Call create_dict_iterator with num_epochs=1
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
for _ in data2.create_dict_iterator(num_epochs=1):
num_iter += 1
assert num_iter == 4
confirm_2ops_in_pipeline()
confirm_cpuutil(2)
# Test B - Call create_dict_iterator with num_epochs>1
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs>1,
# then EpochCtrlOp should be added to the pipeline
for _ in data2.create_dict_iterator(num_epochs=2):
num_iter += 1
assert num_iter == 4
# confirm_3ops_in_pipeline()
# MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops
confirm_2ops_in_pipeline()
# Confirm CPU util file has correct number of ops
confirm_cpuutil(3)
except Exception as error:
delete_profiling_files()
raise error
@ -376,3 +514,5 @@ if __name__ == "__main__":
test_profiling_sampling_interval()
test_profiling_basic_pipeline()
test_profiling_cifar10_pipeline()
test_profiling_epochctrl3()
test_profiling_epochctrl2()