MD Profiling: Update Connector Init to Remove any existing file

This fixes the sequential pipeline scenario, where a second pipeline run could pick up a stale profiling file left behind by an earlier run.
Cathy Wong 2021-09-02 16:00:13 -04:00
parent da69703896
commit bc85c606b8
5 changed files with 39 additions and 56 deletions


@@ -92,11 +92,15 @@ Status ConnectorSize::SaveToFile() {
   // Discard the content of the file when opening.
   std::ofstream os(file_path_, std::ios::trunc);
   os << output;
   os.close();
   return Status::OK();
 }
 
 Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
   file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
+  Path path = Path(file_path_);
+  // Remove the file if it exists (from prior profiling usage)
+  RETURN_IF_NOT_OK(path.Remove());
   return Status::OK();
 }


@@ -125,11 +125,15 @@ Status ConnectorThroughput::SaveToFile() {
   // Discard the content of the file when opening.
   std::ofstream os(file_path_, std::ios::trunc);
   os << output;
   os.close();
   return Status::OK();
 }
 
 Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
   file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
+  Path path = Path(file_path_);
+  // Remove the file if it exists (from prior profiling usage)
+  RETURN_IF_NOT_OK(path.Remove());
   return Status::OK();
 }
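Both Init() methods above apply the same pattern: build the per-device output path, then delete any file left behind by a prior profiling run so a sequential pipeline starts clean. A minimal sketch of that remove-on-init pattern in Python (the function name and arguments are hypothetical; only the idea comes from this diff; Path.unlink(missing_ok=True) needs Python 3.8+):

    from pathlib import Path

    def init_profiling_file(dir_path: str, device_id: str) -> Path:
        # Build the per-device profiling output path, mirroring Init() above.
        file_path = Path(dir_path) / f"pipeline_profiling_{device_id}.json"
        # Remove any file left over from a prior profiling run, so a second
        # (sequential) pipeline cannot be polluted by stale contents.
        file_path.unlink(missing_ok=True)  # no error if the file does not exist
        return file_path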


@@ -287,10 +287,8 @@ class MinddataProfilingAnalyzer:
         if metrics and metrics['output_queue']:
             queue_size = metrics['output_queue']['size']
             queue_length = metrics['output_queue']['length']
-            if queue_length == 0:
-                raise ValueError("The input queue can not be None.")
             queue_average_size = round(sum(queue_size) / len(queue_size), 2) if queue_size else -1
-            queue_utilization_pct = round(100 * queue_average_size / queue_length, 2)
+            queue_utilization_pct = round(100 * queue_average_size / queue_length, 2) if queue_length else -1
             # Compute percentage of time queue is empty
             empty_count = 0
             for q_size in queue_size:
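The removed guard raised on a zero queue_length; the replacement reuses the -1 sentinel already returned when queue_size is empty. A standalone sketch of the guarded computation (the function name and sample values are illustrative):

    def queue_stats(queue_size, queue_length):
        # -1 is the sentinel for "metric cannot be computed"
        queue_average_size = round(sum(queue_size) / len(queue_size), 2) if queue_size else -1
        queue_utilization_pct = round(100 * queue_average_size / queue_length, 2) if queue_length else -1
        return queue_average_size, queue_utilization_pct

    print(queue_stats([2, 4, 6], 8))  # (4.0, 50.0)
    print(queue_stats([], 0))         # (-1, -1) rather than raising ValueError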


@@ -61,7 +61,7 @@ def delete_profiling_files():
 def confirm_cpuutil(num_pipeline_ops):
     """
-    Confirm CPU utilization JSON file when <num_pipeline_ops> are in the pipeline
+    Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
     """
     with open(CPU_UTIL_FILE) as file1:
         data = json.load(file1)
@@ -70,6 +70,19 @@ def confirm_cpuutil(num_pipeline_ops):
     assert len(op_info) == num_pipeline_ops + 1
 
 
+def confirm_ops_in_pipeline(num_ops, op_list):
+    """
+    Confirm pipeline JSON file has <num_ops> ops in the pipeline, matching the given list of ops
+    """
+    with open(PIPELINE_FILE) as file1:
+        data = json.load(file1)
+    op_info = data["op_info"]
+    # Confirm ops in pipeline file
+    assert len(op_info) == num_ops
+    for i in range(num_ops):
+        assert op_info[i]["op_type"] in op_list
+
+
 def test_profiling_simple_pipeline():
     """
     Generator -> Shuffle -> Batch
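The new helper parameterizes both the expected op count and the allowed op types, so the fixed-arity helpers removed in the next hunk reduce to one-liners at each call site:

    confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])  # was confirm_3ops_in_pipeline()
    confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])                 # was confirm_2ops_in_pipeline()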
@@ -397,26 +410,6 @@ def test_profiling_cifar10_pipeline():
     delete_profiling_files()
 
-def confirm_3ops_in_pipeline():
-    with open(PIPELINE_FILE) as file1:
-        data = json.load(file1)
-    op_info = data["op_info"]
-    # Confirm 3 ops in pipeline file
-    assert len(op_info) == 3
-    for i in range(3):
-        assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp", "EpochCtrlOp")
-
-
-def confirm_2ops_in_pipeline():
-    with open(PIPELINE_FILE) as file1:
-        data = json.load(file1)
-    op_info = data["op_info"]
-    # Confirm 2 ops in pipeline file
-    assert len(op_info) == 2
-    for i in range(2):
-        assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp")
-
-
 def test_profiling_seq_pipelines_epochctrl3():
     """
     Test with these 2 sequential pipelines:
@@ -438,7 +431,8 @@ def test_profiling_seq_pipelines_epochctrl3():
             num_iter += 1
         assert num_iter == 2
 
-        confirm_3ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 3 ops
+        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
         confirm_cpuutil(3)
 
         # Test B - Call create_dict_iterator with num_epochs=1
@@ -449,10 +443,8 @@ def test_profiling_seq_pipelines_epochctrl3():
             num_iter += 1
         assert num_iter == 2
 
-        # confirm_2ops_in_pipeline()
-        # MD BUG: Confirm pipeline file is not changed and wrongly still has 3 ops
-        confirm_3ops_in_pipeline()
-        # Confirm CPU util file has correct number of ops
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)
 
     except Exception as error:
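Each sequential-pipeline test runs one pipeline, verifies the profiling files, then runs a second pipeline against the same output directory and re-verifies. A sketch of the re-check these assertions perform (ops_in_pipeline_file is hypothetical, and the PIPELINE_FILE value shown is an assumed example of the pipeline_profiling_<device_id>.json naming above):

    import json

    PIPELINE_FILE = "./pipeline_profiling_1.json"  # assumed example path

    def ops_in_pipeline_file():
        # Read back the op types the profiler recorded for the latest run.
        with open(PIPELINE_FILE) as f:
            return [op["op_type"] for op in json.load(f)["op_info"]]

    # Before this commit, the second (2-op) run inherited the first run's 3-op
    # file; with remove-on-init, the file reflects only the second pipeline.
    assert len(ops_in_pipeline_file()) == 2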
@@ -483,7 +475,8 @@ def test_profiling_seq_pipelines_epochctrl2():
             num_iter += 1
         assert num_iter == 4
 
-        confirm_2ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)
 
         # Test B - Call create_dict_iterator with num_epochs>1
@@ -494,10 +487,8 @@ def test_profiling_seq_pipelines_epochctrl2():
             num_iter += 1
         assert num_iter == 4
 
-        # confirm_3ops_in_pipeline()
-        # MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops
-        confirm_2ops_in_pipeline()
-        # Confirm CPU util file has correct number of ops
+        # Confirm pipeline file and CPU util file each have 3 ops
+        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
         confirm_cpuutil(3)
 
     except Exception as error:
@@ -527,7 +518,8 @@ def test_profiling_seq_pipelines_repeat():
             num_iter += 1
         assert num_iter == 4
 
-        confirm_2ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)
 
         # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
@@ -537,10 +529,8 @@ def test_profiling_seq_pipelines_repeat():
             num_iter += 1
         assert num_iter == 20
 
-        # confirm_3ops_in_pipeline()
-        # MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops
-        confirm_2ops_in_pipeline()
-        # Confirm CPU util file has correct number of ops
+        # Confirm pipeline file and CPU util file each have 3 ops
+        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
         confirm_cpuutil(3)
 
     except Exception as error:


@@ -42,22 +42,12 @@ class TestMinddataProfilingAnalyzer():
         self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
         self._ANALYZE_FILE_PATH = "./"
 
         # These are the minimum subset of expected keys (in alphabetical order) in the MindData Analyzer summary output
-        # This is the set of keys for the success case
         self._EXPECTED_SUMMARY_KEYS_SUCCESS = \
             ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
              'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
              'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
-
-        # This is the set of keys for the case which omits the keys for composite computation of more than one raw file.
-        # This is used for the invalid user case in which the number of ops in the pipeline file does not match
-        # the number of ops in the CPU utilization file.
-        self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE = \
-            ['avg_cpu_pct', 'children_ids', 'num_workers', 'op_ids', 'op_names',
-             'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
-             'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
 
     def setup_method(self):
         """
@@ -269,13 +259,10 @@ class TestMinddataProfilingAnalyzer():
         md_summary_dict = md_analyzer.analyze()
 
         # Verify MindData Profiling Analyze Summary output
-        # Use self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE, since composite keys are not produced, since there is a mismatch
-        # between the 4 ops in the stale pipeline file versus the 3 ops in the recreated cpu util file
-        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE)
+        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS)
 
-        # Confirm pipeline data wrongly contains info for 4 ops
-        assert md_summary_dict["pipeline_ops"] == ["EpochCtrl(id=0)", "Batch(id=1)", "Map(id=2)",
-                                                   "Generator(id=3)"]
+        # Confirm pipeline data contains info for 3 ops
+        assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]
 
-        # Verify CPU util data contains info for only 3 ops
+        # Verify CPU util data contains info for 3 ops
         assert len(md_summary_dict["avg_cpu_pct"]) == 3
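For reference, an illustrative shape of the summary these final assertions inspect; the values are invented, and only the structure follows the assertions above:

    md_summary_dict = {
        "pipeline_ops": ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"],
        "avg_cpu_pct": [1.2, 3.4, 5.6],  # one entry per pipeline op
        "num_workers": [1, 1, 4],
        # ... remaining keys from _EXPECTED_SUMMARY_KEYS_SUCCESS ...
    }
    assert len(md_summary_dict["avg_cpu_pct"]) == 3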