MD Profiler: support multiple start/stop calls

mohammad 2021-11-30 23:21:05 -05:00
parent 4a2c1c3a50
commit af6e587c28
7 changed files with 123 additions and 14 deletions
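
A minimal usage sketch of the start/stop/start/stop pattern this change enables, distilled from the new test added below. The import paths and the small GeneratorDataset pipeline are assumptions that mirror the existing profiling tests, not part of this commit:

import os
import numpy as np
import mindspore._c_dataengine as cde
import mindspore.dataset as ds

def source():
    # Assumed toy source: 8000 single-element samples, as in the new test
    for i in range(8000):
        yield (np.array([i]),)

md_profiler = cde.GlobalContext.profiling_manager()
md_profiler.init()

data1 = ds.GeneratorDataset(source, ["col1"]).batch(16)

num_iter = 0
for _ in data1.create_dict_iterator(num_epochs=1):
    if num_iter == 5:
        md_profiler.start()   # first profiling window
    elif num_iter == 40:
        md_profiler.stop()    # previously, a second start() after this was rejected
    elif num_iter == 200:
        md_profiler.start()   # restart: the manager resets and re-inits its nodes
    elif num_iter == 400:
        md_profiler.stop()
    num_iter += 1

# Only data collected in the last start/stop window is written out
md_profiler.save(os.getcwd())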

View File

@@ -113,6 +113,12 @@ Status ConnectorSize::Init() {
  return Status::OK();
}

void ConnectorSize::Clear() {
  ts_.clear();
  sample_table_.clear();
  initial_nodes_data.clear();
}

Status ConnectorSize::GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time,
                                         std::vector<int32_t> *result) {
  MS_LOG(DEBUG) << "Op_id: " << op_id << " start_ts: " << start_time << " end_ts: " << end_time;

View File

@@ -68,6 +68,9 @@ class ConnectorSize : public Sampling {
  // Get the vector of connector sizes of given op for samples taken between start and end time
  Status GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time, std::vector<int32_t> *result);

  // Clear all collected data
  void Clear() override;

 private:
  json initial_nodes_data;  // store data when execution tree is running. (all information for ops except sampled data)
  ExecutionTree *tree_ = nullptr;  // ExecutionTree pointer

View File

@@ -406,6 +406,15 @@ Status CpuSampler::Init() {
  return Status::OK();
}

void CpuSampler::Clear() {
  ts_.clear();
  tasks_.clear();
  main_thread_cpu_info_.reset();
  main_process_cpu_info_.reset();
  op_info_by_id_.clear();
  fetched_all_python_multiprocesses_ = false;
}

Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
  Path path = GetFileName(dir_path, rank_id);
  std::string file_path = path.ToString();

View File

@@ -151,6 +151,9 @@ class CpuSampler : public Sampling {
  Status GetOpUserCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result);
  Status GetOpSysCpuUtil(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result);

  // Clear all collected data
  void Clear() override;

 private:
  Status UpdateTaskList();
  bool fetched_all_python_multiprocesses_{};

View File

@@ -222,6 +222,12 @@ Status Tracing::Init() {
size_t Tracing::GetNumberSteps() { return ts_.size(); }

void Tracing::Clear() {
  value_.clear();
  records_.clear();
  ts_.clear();
}

// Constructor
ProfilingManager::ProfilingManager()
    : profiling_state_(ProfilingState::kProfilingStateUnBegun), tree_(nullptr), autotuning_(false), profiling_(false) {}
@@ -646,12 +652,14 @@ void ProfilingManager::RecordEndOfEpoch(uint32_t step_num) {
}

Status ProfilingManager::Reset() {
  tracing_nodes_.clear();
  sampling_nodes_.clear();
  for (auto node : tracing_nodes_) {
    node.second->Clear();
  }
  for (auto node : sampling_nodes_) {
    node.second->Clear();
  }
  epoch_end_ts_.clear();
  epoch_end_step_.clear();
  perf_monitor_.reset();
  tree_ = nullptr;
  profiling_state_ = ProfilingState::kProfilingStateUnBegun;
  autotuning_ = false;
  profiling_ = false;
@@ -666,6 +674,9 @@ Status ProfilingManager::Init(const bool for_autotune) {
  CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
                               "Stop MD Profiler before reinitializing it.");
  Reset();
  tracing_nodes_.clear();
  sampling_nodes_.clear();
  tree_ = nullptr;
  CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ == ProfilingState::kProfilingStateUnBegun,
                               "MD Profiler is in an unexpected state.");
  if (for_autotune) {
@@ -681,8 +692,19 @@ Status ProfilingManager::Init(const bool for_autotune) {
Status ProfilingManager::Start() {
  CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateRunning,
                               "MD ProfilingManager is already running.");
  CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ != ProfilingState::kProfilingStateFinished,
                               "MD ProfilingManager is already finished.");
  if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
    // This scenario (start, stop, and then start again) only happens in profiling, not autotune.
    MS_LOG(INFO) << "MD ProfilingManager had already stopped. Resetting...";
    Reset();
    for (const auto &node : sampling_nodes_) {
      RETURN_IF_NOT_OK(node.second->Init());
    }
    for (const auto &node : tracing_nodes_) {
      RETURN_IF_NOT_OK(node.second->Init());
    }
    profiling_ = true;
    MS_LOG(INFO) << "MD profiler is reset successfully for profiling.";
  }
  profiling_state_ = ProfilingState::kProfilingStateRunning;
  for (const auto &node : tracing_nodes_) {
View File

@@ -68,6 +68,9 @@ class Profiling : std::enable_shared_from_this<Profiling> {
  // Stop collecting data
  Status Stop();

  // Clear all collected data
  virtual void Clear() = 0;

 protected:
  bool active_;  // show current state of ProfilingManager (running, or paused)
  std::mutex lock_;
@@ -115,6 +118,9 @@ class Tracing : public Profiling {
  Status StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step);
  size_t GetNumberSteps();

  // Clear all collected data
  void Clear() override;

 protected:
  Tracing() = default;
  std::vector<std::string> value_;

View File

@@ -31,10 +31,11 @@ SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
# Add file name to rank id mapping so that each profiling file name is unique,
# to support parallel test execution
file_name_map_rank_id = {"test_profiling_early_stop": "0",
                         "test_profiling_delay_start": "1",
                         "test_profiling_delayed_start": "1",
                         "test_profiling_start_start": "2",
                         "test_profiling_stop_stop": "3",
                         "test_profiling_stop_nostart": "4"}
                         "test_profiling_multiple_start_stop": "3",
                         "test_profiling_stop_stop": "4",
                         "test_profiling_stop_nostart": "5"}


@pytest.mark.forked
@@ -109,11 +110,14 @@ class TestMindDataProfilingStartStop:
        op_info = data["op_info"]
        assert len(op_info) == num_pipeline_ops

    def confirm_dataset_iterator_file(self):
    def confirm_dataset_iterator_file(self, num_batches):
        """
        Confirm dataset iterator file exists
        Confirm dataset iterator file exists with the correct number of rows in the file
        """
        assert os.path.exists(self.dataset_iterator_file)
        actual_num_lines = sum(1 for _ in open(self.dataset_iterator_file))
        # Confirm there are 4 lines for each batch in the dataset iterator file
        assert actual_num_lines == 4 * num_batches

    def test_profiling_early_stop(self):
        """
@@ -156,9 +160,9 @@ class TestMindDataProfilingStartStop:
        # Confirm the content of the profiling files, including 4 ops in the pipeline JSON file
        self.confirm_pipeline_file(4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"])
        self.confirm_cpuutil_file(4)
        self.confirm_dataset_iterator_file()
        self.confirm_dataset_iterator_file(401)

    def test_profiling_delay_start(self):
    def test_profiling_delayed_start(self):
        """
        Test MindData Profiling with Delayed Start; profile for subset of iterations
        """
@@ -199,7 +203,58 @@ class TestMindDataProfilingStartStop:
        # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
        self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
        self.confirm_cpuutil_file(3)
        self.confirm_dataset_iterator_file()
        self.confirm_dataset_iterator_file(395)

    def test_profiling_multiple_start_stop(self):
        """
        Test MindData Profiling with Delayed Start and Multiple Start-Stop Sequences
        """

        def source1():
            for i in range(8000):
                yield (np.array([i]),)

        # Get instance pointer for MindData profiling manager
        md_profiler = cde.GlobalContext.profiling_manager()

        # Initialize MindData profiling manager
        md_profiler.init()

        # Create this basic and common pipeline
        # Leaf/Source-Op -> Map -> Batch
        data1 = ds.GeneratorDataset(source1, ["col1"])
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(16)

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is not added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            if num_iter == 5:
                # Start MindData Profiling
                md_profiler.start()
            elif num_iter == 40:
                # Stop MindData Profiling
                md_profiler.stop()
            if num_iter == 200:
                # Start MindData Profiling
                md_profiler.start()
            elif num_iter == 400:
                # Stop MindData Profiling
                md_profiler.stop()
            num_iter += 1

        # Save MindData Profiling Output
        md_profiler.save(os.getcwd())

        assert num_iter == 500

        # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
        self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
        self.confirm_cpuutil_file(3)
        # Note: The dataset iterator file should only contain data for batches 200 to 400
        self.confirm_dataset_iterator_file(200)

    def test_profiling_start_start(self):
        """
@@ -259,3 +314,8 @@ class TestMindDataProfilingStartStop:
            md_profiler.stop()
        assert "MD ProfilingManager has not started yet." in str(info)

        # Start MindData Profiling
        md_profiler.start()
        # Stop MindData Profiling - to return profiler to a healthy state
        md_profiler.stop()