From 5c8ab5f60cd49543733c4704b24fd76475579b37 Mon Sep 17 00:00:00 2001 From: mohammad Date: Mon, 15 Nov 2021 11:59:50 -0500 Subject: [PATCH] add MD Profiler Save() --- .../python/bindings/dataset/core/bindings.cc | 2 - .../bindings/dataset/engine/perf/bindings.cc | 15 +- .../minddata/dataset/core/config_manager.cc | 6 - .../minddata/dataset/core/config_manager.h | 18 --- .../dataset/engine/perf/connector_size.cc | 56 +++---- .../dataset/engine/perf/connector_size.h | 10 +- .../dataset/engine/perf/cpu_sampler.cc | 27 ++-- .../dataset/engine/perf/cpu_sampler.h | 7 +- .../engine/perf/dataset_iterator_tracing.cc | 6 +- .../engine/perf/dataset_iterator_tracing.h | 5 +- .../engine/perf/device_queue_tracing.cc | 7 +- .../engine/perf/device_queue_tracing.h | 6 +- .../minddata/dataset/engine/perf/monitor.cc | 24 +-- .../minddata/dataset/engine/perf/monitor.h | 4 + .../minddata/dataset/engine/perf/profiling.cc | 142 ++++++++---------- .../minddata/dataset/engine/perf/profiling.h | 43 +++--- mindspore/dataset/core/config.py | 15 -- mindspore/profiler/profiling.py | 5 +- tests/ut/cpp/dataset/profiler_test.cc | 22 +-- tests/ut/python/dataset/test_profiling.py | 73 ++++++++- .../profiler/parser/test_minddata_analyzer.py | 25 ++- 21 files changed, 280 insertions(+), 238 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc index 0e42d34ed9a..bacf1ef0b2c 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc @@ -49,8 +49,6 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) { .def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_) .def("set_callback_timeout", &ConfigManager::set_callback_timeout) .def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval) - .def("stop_dataset_profiler", &ConfigManager::stop_dataset_profiler) - .def("get_profiler_file_status", &ConfigManager::get_profiler_file_status) .def("set_num_parallel_workers", [](ConfigManager &c, int32_t num) { THROW_IF_ERROR(c.set_num_parallel_workers(num)); }) .def("set_op_connector_size", &ConfigManager::set_op_connector_size) diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/perf/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/perf/bindings.cc index 80dc17671bf..9b64076c811 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/perf/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/perf/bindings.cc @@ -21,14 +21,15 @@ namespace mindspore { namespace dataset { PYBIND_REGISTER(ProfilingManager, 0, ([](const py::module *m) { (void)py::class_>(*m, "ProfilingManager") - .def( - "init", - [](ProfilingManager &prof_mgr, const std::string &profile_data_path) { - THROW_IF_ERROR(prof_mgr.Init(profile_data_path)); - }, - py::arg("profile_data_path")) + .def("init", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Init()); }) .def("start", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Start()); }) - .def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); }); + .def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); }) + .def( + "save", + [](ProfilingManager &prof_mgr, const std::string &profile_data_path) { + THROW_IF_ERROR(prof_mgr.Save(profile_data_path)); + }, + 
py::arg("profile_data_path")); })); } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc index 71d285bdaca..be75512b67a 100644 --- a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc @@ -41,8 +41,6 @@ ConfigManager::ConfigManager() seed_(kCfgDefaultSeed), numa_enable_(false), monitor_sampling_interval_(kCfgMonitorSamplingInterval), - stop_profiler_(false), - file_ready_(true), callback_timout_(kCfgCallbackTimeout), cache_host_(kCfgDefaultCacheHost), cache_port_(kCfgDefaultCachePort), @@ -150,10 +148,6 @@ void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } -void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; } - -void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; } - void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; } void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); } diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.h b/mindspore/ccsrc/minddata/dataset/core/config_manager.h index db6e4bbba3a..1dde9997d15 100644 --- a/mindspore/ccsrc/minddata/dataset/core/config_manager.h +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.h @@ -180,22 +180,6 @@ class ConfigManager { // @return The interval of monitor sampling int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; } - // setter function - // @param stop_profiler - The setting to apply to the config - void stop_dataset_profiler(bool stop_profiler); - - // getter function - // @return The status of stop profiler - bool stop_profiler_status() const { return stop_profiler_; } - - // setter function - // @param file_ready - The setting to apply to the config - void set_profiler_file_status(bool file_ready); - - // getter function - // @return The status of profiler file, whether generated - bool get_profiler_file_status() const { return file_ready_; } - // setter function // @param auto_num_workers - whether assign threads to each op automatically void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; } @@ -249,8 +233,6 @@ class ConfigManager { int32_t rank_id_; uint32_t seed_; uint32_t monitor_sampling_interval_; - std::atomic_bool stop_profiler_; - std::atomic_bool file_ready_; uint32_t callback_timout_; std::string cache_host_; int32_t cache_port_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc index 428bd018a11..30e78732d71 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc @@ -40,7 +40,7 @@ Status ConnectorSize::Sample() { } // JSON serializer helper function -json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector &size) { +json ConnectorSize::ParseOpInfo(const DatasetOp &node) { json json_node; json_node["op_id"] = node.id(); json_node["op_type"] = node.Name(); @@ -49,7 +49,7 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vectormonitor_sampling_interval(); + + // Traverse the JSON initialized in Init() to access each op's information + 
CHECK_FAIL_RETURN_UNEXPECTED(output.contains("op_info"), "JSON data does not include op_info!"); + for (uint32_t idx = 0; idx < output["op_info"].size(); idx++) { std::vector cur_queue_size; (void)std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size), [&](const ConnectorSizeSample &sample) { return sample[idx]; }); - if (!path.Exists()) { - json json_node = ParseOpInfo(node, cur_queue_size); - output["op_info"].push_back(json_node); - } else { - if (!node.inlined() && node.Name() != "DeviceQueueOp") { - auto &ops_data = output["op_info"]; - ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size; - ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity(); - } - } - idx++; + auto &ops_data = output["op_info"]; + if (ops_data[idx]["metrics"].contains("output_queue") && ops_data[idx]["op_type"] != "DeviceQueueOp") { + ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size; + } } // Discard the content of the file when opening. - std::ofstream os(file_path_, std::ios::trunc); + std::ofstream os(file_path, std::ios::trunc); os << output; os.close(); return Status::OK(); } -Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) { - file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).ToString(); - Path path = Path(file_path_); - // Remove the file if it exists (from prior profiling usage) - RETURN_IF_NOT_OK(path.Remove()); +Status ConnectorSize::Init() { + // Traverse the ExecutionTree for JSON node generation + for (auto &node : *tree_) { + json json_node = ParseOpInfo(node); + initial_nodes_data["op_info"].push_back(json_node); + } + return Status::OK(); } @@ -139,5 +137,9 @@ Status ConnectorSize::GetOpConnectorSize(int32_t op_id, uint64_t start_time, uin return Status::OK(); } + +Path ConnectorSize::GetFileName(const std::string &dir_path, const std::string &rank_id) { + return Path(dir_path) / Path("pipeline_profiling_" + rank_id + ".json"); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h index ac46c433d1c..824b9115bb1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h @@ -55,15 +55,15 @@ class ConnectorSize : public Sampling { // Save sampling data to file // @return Status The status code returned - Status SaveToFile() override; + Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override; - Status Init(const std::string &dir_path, const std::string &device_id) override; + Status Init() override; // Parse op information and transform to json format - json ParseOpInfo(const DatasetOp &node, const std::vector &size); + json ParseOpInfo(const DatasetOp &node); // Change file mode after save throughput data - Status ChangeFileMode() override { return Status::OK(); } + Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override { return Status::OK(); } Status Analyze() override; @@ -71,9 +71,11 @@ class ConnectorSize : public Sampling { Status GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time, std::vector *result); private: + json initial_nodes_data; // store data when execution tree is running. 
(all information for ops except sampled data) ExecutionTree *tree_ = nullptr; // ExecutionTree pointer ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling Timestamps ts_; // time of sample + Path GetFileName(const std::string &dir_path, const std::string &rank_id) override; }; } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc index f79ba37cb59..7069981fbd1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc @@ -386,14 +386,10 @@ Status CpuSampler::UpdateTaskList() { return Status::OK(); } -Status CpuSampler::Init(const std::string &dir_path, const std::string &device_id) { +Status CpuSampler::Init() { #if defined(USING_LINUX) main_pid_ = syscall(SYS_getpid); #endif - auto path = Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json"); - // remove file if it already exists - RETURN_IF_NOT_OK(path.Remove()); - file_path_ = path.ToString(); for (auto iter = tree->begin(); iter != tree->end(); iter++) { auto op_id = iter->id(); (void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id))); @@ -406,15 +402,22 @@ Status CpuSampler::Init(const std::string &dir_path, const std::string &device_i return Status::OK(); } -Status CpuSampler::ChangeFileMode() { - if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) { - std::string err_str = "Change file mode failed," + file_path_; +Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) { + Path path = GetFileName(dir_path, rank_id); + std::string file_path = path.ToString(); + if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) { + std::string err_str = "Change file mode failed," + file_path; return Status(StatusCode::kMDUnexpectedError, err_str); } return Status::OK(); } -Status CpuSampler::SaveToFile() { +Status CpuSampler::SaveToFile(const std::string &dir_path, const std::string &rank_id) { + Path path = GetFileName(dir_path, rank_id); + // Remove the file if it exists (from prior profiling usage) + RETURN_IF_NOT_OK(path.Remove()); + std::string file_path = path.ToString(); + // construct json obj to write to file json output; output["cpu_processor_num"] = SystemCpuInfo::num_cpu_; @@ -448,7 +451,7 @@ Status CpuSampler::SaveToFile() { output["time_stamp"] = ts_; // Discard the content of the file when opening. 
- std::ofstream os(file_path_, std::ios::trunc); + std::ofstream os(file_path, std::ios::trunc); os << output; os.close(); @@ -508,5 +511,9 @@ Status CpuSampler::GetSystemSysCpuUtil(uint64_t start_ts, uint64_t end_ts, std:: auto end_index = std::distance(ts_.begin(), upper); return sys_cpu_info_.GetSysCpuUtil(start_index, end_index, result); } + +Path CpuSampler::GetFileName(const std::string &dir_path, const std::string &rank_id) { + return Path(dir_path) / Path("minddata_cpu_utilization_" + rank_id + ".json"); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.h index 6857c6ca70f..291822af10a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.h @@ -142,9 +142,9 @@ class CpuSampler : public Sampling { explicit CpuSampler(ExecutionTree *tree) : fetched_all_python_multiprocesses_(false), tree(tree) {} ~CpuSampler() = default; Status Sample() override; - Status Init(const std::string &dir_path, const std::string &device_id) override; - Status ChangeFileMode() override; - Status SaveToFile() override; + Status Init() override; + Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override; + Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override; std::string Name() const override { return kCpuSamplerName; } Status Analyze() override; Status GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector *result); @@ -163,6 +163,7 @@ class CpuSampler : public Sampling { std::shared_ptr main_thread_cpu_info_; std::shared_ptr main_process_cpu_info_; std::unordered_map op_info_by_id_; + Path GetFileName(const std::string &dir_path, const std::string &rank_id) override; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc index 2d534be31c9..a8521fc710e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc @@ -29,8 +29,7 @@ namespace dataset { constexpr int32_t CONNECTOR_DEPTH_OFFSET = 0; -Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) { - file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).ToString(); +Status DatasetIteratorTracing::Init() { (void)ts_.emplace_back(0); return Status::OK(); } @@ -80,5 +79,8 @@ Status DatasetIteratorTracing::GetConnectorCapacity(int32_t start_step, int32_t return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result); } +Path DatasetIteratorTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) { + return Path(dir_path) / Path("dataset_iterator_profiling_" + rank_id + ".txt"); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h index aa050e12a4b..a1c605e44ee 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h @@ -34,7 +34,7 @@ class DatasetIteratorTracing : public Tracing { std::string Name() const override { return 
kDatasetIteratorTracingName; }; - Status Init(const std::string &dir_path, const std::string &device_id) override; + Status Init() override; Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetPushTime(int32_t start_step, int32_t end_step, std::vector *result) override; @@ -42,6 +42,9 @@ class DatasetIteratorTracing : public Tracing { Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override; + + private: + Path GetFileName(const std::string &dir_path, const std::string &rank_id) override; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc index a7665f2feb0..a88c5d9602f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc @@ -33,8 +33,7 @@ constexpr int32_t BATCH_TIME_OFFSET = 1; constexpr int32_t PIPELINE_TIME_OFFSET = 2; constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3; -Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) { - file_path_ = (Path(dir_path) / Path("device_queue_profiling_" + device_id + ".txt")).ToString(); +Status DeviceQueueTracing::Init() { (void)ts_.emplace_back(0); return Status::OK(); } @@ -82,5 +81,9 @@ Status DeviceQueueTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t en Status DeviceQueueTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector *result) { return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result); } + +Path DeviceQueueTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) { + return Path(dir_path) / Path("device_queue_profiling_" + rank_id + ".txt"); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h index 7e64a1bad06..247ac3245f2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h @@ -20,6 +20,7 @@ #include #include #include "minddata/dataset/engine/perf/profiling.h" +#include "minddata/dataset/util/path.h" namespace mindspore { namespace dataset { @@ -34,7 +35,7 @@ class DeviceQueueTracing : public Tracing { std::string Name() const override { return kDeviceQueueTracingName; }; - Status Init(const std::string &dir_path, const std::string &device_id) override; + Status Init() override; Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetPushTime(int32_t start_step, int32_t end_step, std::vector *result) override; @@ -42,6 +43,9 @@ class DeviceQueueTracing : public Tracing { Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector *result) override; Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override; + + private: + Path GetFileName(const std::string &dir_path, const std::string &rank_id) override; }; } // namespace dataset } 
// namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc index f67f4d9ee9f..2cd36cb4410 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc @@ -29,39 +29,23 @@ Monitor::Monitor(ProfilingManager *profiling_manager) : profiling_manager_(profi Status Monitor::operator()() { // Register this thread with TaskManager to receive proper interrupt signal. TaskManager::FindMe()->Post(); - std::shared_ptr cfg = GlobalContext::config_manager(); - cfg->set_profiler_file_status(false); + std::unique_lock _lock(mux_); // Keep sampling if // 1) Monitor Task is not interrupted by TaskManager AND // 2) Iterator has not received EOF - // this will trigger a save on 2min, 4min, 8min, 16min ... mark on top of the save per_epoch - // The idea is whenever training is interrupted, you will get at least half of the sampling data during training - constexpr int64_t num_ms_in_two_minutes = 120000; - int64_t save_interval = 1 + (num_ms_in_two_minutes / sampling_interval_); - int64_t loop_cnt = 1; - constexpr int64_t geometric_series_ratio = 2; - while (!this_thread::is_interrupted() && !(tree_->isFinished()) && !(cfg->stop_profiler_status())) { + while (!this_thread::is_interrupted() && !(tree_->isFinished())) { if (tree_->IsEpochEnd()) { - RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData()); tree_->SetExecuting(); - } else if (loop_cnt % save_interval == 0) { - RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData()); } for (auto &node : profiling_manager_->GetSamplingNodes()) { RETURN_IF_NOT_OK(node.second->Sample()); } - if (loop_cnt % save_interval == 0) save_interval *= geometric_series_ratio; - loop_cnt += 1; - std::this_thread::sleep_for(std::chrono::milliseconds(sampling_interval_)); + RETURN_IF_NOT_OK(cv_.WaitFor(&_lock, sampling_interval_)); } + MS_LOG(INFO) << "Monitor Thread terminating..."; - // Output all profiling data upon request. 
- RETURN_IF_NOT_OK(profiling_manager_->Analyze(true)); - RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData(true)); - RETURN_IF_NOT_OK(profiling_manager_->ChangeFileMode(true)); - cfg->set_profiler_file_status(true); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h index ed023d8e0e9..5d4cfc39fd2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h @@ -18,9 +18,11 @@ #define MINDSPORE_MONITOR_H #include +#include #include #include #include "minddata/dataset/engine/perf/profiling.h" +#include "minddata/dataset/util/cond_var.h" #include "minddata/dataset/util/status.h" namespace mindspore { @@ -46,6 +48,8 @@ class Monitor { ProfilingManager *profiling_manager_; int64_t sampling_interval_; ExecutionTree *tree_; + std::mutex mux_; + CondVar cv_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc index b2b3c9cd038..55ce0d0efb8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc @@ -48,12 +48,18 @@ Status Profiling::Stop() { return Status::OK(); } -Status Tracing::SaveToFile() { +Status Tracing::SaveToFile(const std::string &dir_path, const std::string &rank_id) { if (value_.empty()) { return Status::OK(); } - std::ofstream handle(file_path_, std::ios::trunc); + Path path = GetFileName(dir_path, rank_id); + // Remove the file if it exists (from prior profiling usage) + RETURN_IF_NOT_OK(path.Remove()); + std::string file_path = path.ToString(); + + MS_LOG(INFO) << "Start to save profiling data for a tracing node."; + std::ofstream handle(file_path, std::ios::trunc); if (!handle.is_open()) { RETURN_STATUS_UNEXPECTED("Profiling file can not be opened."); } @@ -65,13 +71,15 @@ Status Tracing::SaveToFile() { return Status::OK(); } -Status Tracing::ChangeFileMode() { +Status Tracing::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) { if (value_.empty()) { return Status::OK(); } - if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) { - std::string err_str = "Change file mode failed," + file_path_; + Path path = GetFileName(dir_path, rank_id); + std::string file_path = path.ToString(); + if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) { + std::string err_str = "Change file mode failed," + file_path; return Status(StatusCode::kMDUnexpectedError, err_str); } return Status::OK(); @@ -172,24 +180,6 @@ Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, i Tracing::Tracing(int32_t records_per_step) : records_per_step_(records_per_step) {} -Status Sampling::ReadJson(nlohmann::json *output) { - RETURN_UNEXPECTED_IF_NULL(output); - Path path = Path(file_path_); - if (path.Exists()) { - MS_LOG(DEBUG) << file_path_ << " exists"; - try { - std::ifstream file(file_path_); - file >> (*output); - } catch (const std::exception &err) { - RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ + - ", please delete it and try again!"); - } - } else { - (*output)["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); - } - return Status::OK(); -} - // Constructor ProfilingManager::ProfilingManager() : profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false) {} @@ -202,24 +192,7 @@ Status 
ProfilingManager::RegisterTree(TreeAdapter *tree_adapter) { perf_monitor_ = std::make_unique(this); -#ifdef ENABLE_GPUQUE - std::shared_ptr cfg = GlobalContext::config_manager(); - int32_t rank_id = cfg->rank_id(); - // If DEVICE_ID is not set, default value is 0 - if (rank_id < 0) { - device_id_ = common::GetEnv("DEVICE_ID"); - } else { - device_id_ = std::to_string(rank_id); - } -#else - device_id_ = common::GetEnv("RANK_ID"); -#endif - // If RANK_ID is not set, default value is 0 - if (device_id_.empty()) { - device_id_ = "0"; - } // Register all profiling node. - std::shared_ptr connector_size_sampling = std::make_shared(tree_); RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling)); @@ -249,7 +222,7 @@ Status ProfilingManager::RegisterTracingNode(std::shared_ptr node) { return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name()); } // Register the node with its name as key. - RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_)); + RETURN_IF_NOT_OK(node->Init()); tracing_nodes_[node->Name()] = node; // the user may have already started profiling. @@ -279,7 +252,7 @@ Status ProfilingManager::RegisterSamplingNode(std::shared_ptr node) { return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name()); } // Register the node with its name as key. - RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_)); + RETURN_IF_NOT_OK(node->Init()); sampling_nodes_[node->Name()] = node; // the user may have already started profiling. @@ -301,25 +274,19 @@ Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_pt return Status::OK(); } -Status ProfilingManager::SaveProfilingData(const bool final_round) { - if ((!IsProfilingEnable()) && !final_round) { - return Status::OK(); - } +Status ProfilingManager::SaveProfilingData(const std::string &dir_path, const std::string &rank_id) { MS_LOG(INFO) << "Start to save profiling data."; for (auto node : tracing_nodes_) { - RETURN_IF_NOT_OK(node.second->SaveToFile()); + RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id)); } for (auto node : sampling_nodes_) { - RETURN_IF_NOT_OK(node.second->SaveToFile()); + RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id)); } MS_LOG(INFO) << "Save profiling data end."; return Status::OK(); } -Status ProfilingManager::Analyze(const bool final_round) { - if (!IsProfilingEnable() && !final_round) { - return Status::OK(); - } +Status ProfilingManager::Analyze() { MS_LOG(INFO) << "Start to analyze profiling data."; for (auto node : sampling_nodes_) { RETURN_IF_NOT_OK(node.second->Analyze()); @@ -327,16 +294,13 @@ Status ProfilingManager::Analyze(const bool final_round) { return Status::OK(); } -Status ProfilingManager::ChangeFileMode(const bool final_round) { - if (!IsProfilingEnable() && !final_round) { - return Status::OK(); - } +Status ProfilingManager::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) { MS_LOG(INFO) << "Start to change file mode."; for (auto node : tracing_nodes_) { - RETURN_IF_NOT_OK(node.second->ChangeFileMode()); + RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id)); } for (auto node : sampling_nodes_) { - RETURN_IF_NOT_OK(node.second->ChangeFileMode()); + RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id)); } MS_LOG(INFO) << "Change file mode end."; return Status::OK(); @@ -642,23 +606,7 @@ Status ProfilingManager::Reset() { return Status::OK(); } -Status ProfilingManager::Init(const std::string &profile_data_path = "") { - // Validate input profile data 
path - CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set."); - CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid."); - - char real_path[PATH_MAX] = {0}; -#if defined(_WIN32) || defined(_WIN64) - if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) { - RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); - } -#else - if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) { - RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory."); - } -#endif - dir_path_ = real_path; - +Status ProfilingManager::Init() { if (profiling_state_ == ProfilingState::kProfilingStateFinished) { profiling_state_ = ProfilingState::kProfilingStateUnBegun; } @@ -666,7 +614,7 @@ Status ProfilingManager::Init(const std::string &profile_data_path = "") { // Enable profiling enabled_ = true; - MS_LOG(INFO) << "MD profiler is initialized with directory path: " << dir_path_; + MS_LOG(INFO) << "MD profiler is initialized successfully."; return Status::OK(); } @@ -708,6 +656,48 @@ Status ProfilingManager::Stop() { return Status::OK(); } +Status ProfilingManager::Save(const std::string &profile_data_path) { + // Validate input profile data path + CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set."); + CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid."); + + // profiling file: /filename_rank_id.suffix + char real_path[PATH_MAX] = {0}; +#if defined(_WIN32) || defined(_WIN64) + if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#else + if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) { + RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory."); + } +#endif + + std::string rank_id; +#ifdef ENABLE_GPUQUE + std::shared_ptr cfg = GlobalContext::config_manager(); + int32_t rank_id_int = cfg->rank_id(); + // If DEVICE_ID is not set, default value is 0 + if (rank_id_int < 0) { + rank_id = common::GetEnv("DEVICE_ID"); + } else { + rank_id = std::to_string(rank_id_int); + } +#else + rank_id = common::GetEnv("RANK_ID"); +#endif + // If RANK_ID is not set, default value is 0 + if (rank_id.empty()) { + rank_id = "0"; + } + + // Output all profiling data upon request. 
+ RETURN_IF_NOT_OK(Analyze()); + RETURN_IF_NOT_OK(SaveProfilingData(std::string(profile_data_path), rank_id)); + RETURN_IF_NOT_OK(ChangeFileMode(std::string(profile_data_path), rank_id)); + return Status::OK(); +} + uint64_t ProfilingTime::GetCurMilliSecond() { // because cpplint does not allow using namespace using std::chrono::duration_cast; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h index 5fc44e807c6..d03a7cf318b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h @@ -16,13 +16,15 @@ #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_ #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_ -#include -#include -#include -#include +#include #include +#include #include +#include +#include +#include #include +#include "minddata/dataset/util/path.h" #include "minddata/dataset/util/status.h" #include "minddata/dataset/engine/perf/monitor.h" @@ -50,15 +52,15 @@ class Profiling : std::enable_shared_from_this { // Destructor virtual ~Profiling() = default; - virtual Status Init(const std::string &dir_path, const std::string &device_id) = 0; + virtual Status Init() = 0; // Default serialization file generator - virtual Status SaveToFile() = 0; + virtual Status SaveToFile(const std::string &dir_path, const std::string &rank_id) = 0; // Profiling name virtual std::string Name() const = 0; - virtual Status ChangeFileMode() = 0; + virtual Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) = 0; // Start collecting data Status Start(); @@ -68,8 +70,8 @@ class Profiling : std::enable_shared_from_this { protected: bool active_; // show current state of ProfilingManager (running, or paused) - std::string file_path_; std::mutex lock_; + virtual Path GetFileName(const std::string &dir_path, const std::string &rank_id) = 0; }; // Sampling is a class of profiling which generate samples periodically. @@ -80,7 +82,6 @@ class Sampling : public Profiling { // virtual Status TestPrint() = 0; virtual Status Analyze() = 0; virtual ~Sampling() = default; - Status ReadJson(nlohmann::json *output); }; typedef struct TracingRecord_s { @@ -101,8 +102,8 @@ class Tracing : public Profiling { public: // Tracing has minimal interface to provide flexible on data recording. // It only includes some common routines. - Status SaveToFile() override; - Status ChangeFileMode() override; + Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override; + Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override; virtual Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector *result) = 0; virtual Status GetPushTime(int32_t start_step, int32_t end_step, std::vector *result) = 0; virtual Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector *result) = 0; @@ -149,9 +150,9 @@ class ProfilingManager { Status Reset(); // Save profile data to file - // @param final_round The boolean to show if the monitor thread is terminating. + // @param dir_path_ The path to the directory where the profiling data will be saved. // @return Status The status code returned - Status SaveProfilingData(const bool final_round = false); + Status SaveProfilingData(const std::string &dir_path, const std::string &rank_id); // Sampling node getter // @param name - The name of the requested node @@ -177,14 +178,12 @@ class ProfilingManager { // Launch monitoring thread. 
Status LaunchMonitor(); - // @param final_round The boolean to show if the monitor thread is terminating. // @return Status The status code returned - Status ChangeFileMode(const bool final_round = false); + Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id); // Analyze profile data and print warning messages - // @param final_round The boolean to show if the monitor thread is terminating. // @return Status The status code returned - Status Analyze(const bool final_round = false); + Status Analyze(); #ifndef ENABLE_ANDROID /// \brief API to get User CPU utilization for the system @@ -424,7 +423,7 @@ class ProfilingManager { /// \brief API to initialize profiling manager /// \return Status object with the error code - Status Init(const std::string &profile_data_path); + Status Init(); /// \brief API to signal the profiling nodes to start collecting data /// \return Status object with the error code @@ -434,6 +433,10 @@ class ProfilingManager { /// \return Status object with the error code Status Stop(); + /// \brief API to save to file all the collected data between Start and Stop calls + /// \return Status object with the error code + Status Save(const std::string &profile_data_path); + private: std::unique_ptr perf_monitor_; @@ -444,12 +447,10 @@ class ProfilingManager { kProfilingStateFinished, }; ProfilingState profiling_state_; // show current state of ProfilingManager (running, or paused) - bool enabled_; + std::atomic enabled_; std::unordered_map> tracing_nodes_; std::unordered_map> sampling_nodes_; ExecutionTree *tree_; // ExecutionTree pointer - std::string dir_path_; // where to create profiling file - std::string device_id_; // used when create profiling file,filename_device_id.suffix std::vector epoch_end_ts_; // End of epoch timestamp std::vector epoch_end_step_; // End of epoch step number diff --git a/mindspore/dataset/core/config.py b/mindspore/dataset/core/config.py index 7c0c1fb2e9a..b280fcd215f 100644 --- a/mindspore/dataset/core/config.py +++ b/mindspore/dataset/core/config.py @@ -25,7 +25,6 @@ Common imported modules in corresponding API examples are as follows: import os import platform import random -import time import numpy import mindspore._c_dataengine as cde from mindspore import log as logger @@ -411,20 +410,6 @@ def load(file): _config.load(file) -def _stop_dataset_profiler(): - """ - Mainly for stop dataset profiler. - - Returns: - bool, whether the profiler file has generated. - """ - - while not _config.get_profiler_file_status(): - _config.stop_dataset_profiler(True) - logger.warning("Profiling: waiting for dataset part profiling stop.") - time.sleep(1) - - def get_enable_shared_mem(): """ Get the default state of shared mem enabled variable. 
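
For context on the API change in the next file (and exercised by the updated tests later in this patch), here is a minimal sketch of the new save-based MindData profiling workflow. It assumes only the bindings added in this patch (init/start/stop/save on ProfilingManager); the GeneratorDataset pipeline is an illustrative placeholder, and the RANK_ID value is an assumption that only controls the _<rank_id> suffix of the output files.

    import os
    import numpy as np
    import mindspore.dataset as ds
    import mindspore._c_dataengine as cde

    # RANK_ID (or DEVICE_ID on GPU builds) determines the file suffix; "0" is assumed here.
    os.environ['RANK_ID'] = '0'

    md_profiler = cde.GlobalContext.profiling_manager()
    md_profiler.init()      # no output directory at init time anymore
    md_profiler.start()

    # Illustrative placeholder pipeline; any dataset iteration works.
    def gen():
        for i in range(8):
            yield (np.array([i]),)

    data = ds.GeneratorDataset(gen, ["col1"])
    for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        pass

    md_profiler.stop()
    md_profiler.save('./')  # writes e.g. pipeline_profiling_0.json, minddata_cpu_utilization_0.json

Unlike the previous behaviour, nothing is written to disk until save() is called, so the caller chooses the output directory at the end of the run rather than at init time.
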
diff --git a/mindspore/profiler/profiling.py b/mindspore/profiler/profiling.py index 3ad0abf0cf6..5bc13494102 100644 --- a/mindspore/profiler/profiling.py +++ b/mindspore/profiler/profiling.py @@ -23,7 +23,6 @@ from mindspore import log as logger, context from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size import mindspore._c_expression as c_expression import mindspore._c_dataengine as cde -from mindspore.dataset.core.config import _stop_dataset_profiler from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \ ProfilerIOException, ProfilerException, ProfilerRawFileException from mindspore.profiler.common.util import get_file_path, fwrite_format @@ -144,7 +143,7 @@ class Profiler: # Setup and start MindData Profiling self._md_profiler = cde.GlobalContext.profiling_manager() - self._md_profiler.init(self._output_path) + self._md_profiler.init() self._md_profiler.start() if self._device_target: @@ -253,7 +252,7 @@ class Profiler: _environment_check() self._cpu_profiler.stop() self._md_profiler.stop() - _stop_dataset_profiler() + self._md_profiler.save(self._output_path) if self._device_target and self._device_target == "GPU": self._gpu_analyse() diff --git a/tests/ut/cpp/dataset/profiler_test.cc b/tests/ut/cpp/dataset/profiler_test.cc index 4bfb6c1a45a..2428677bd3d 100644 --- a/tests/ut/cpp/dataset/profiler_test.cc +++ b/tests/ut/cpp/dataset/profiler_test.cc @@ -50,7 +50,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) { // Enable profiler and check common::SetEnv("RANK_ID", "1"); std::shared_ptr profiler_manager = GlobalContext::profiling_manager(); - EXPECT_OK(profiler_manager->Init(".")); + EXPECT_OK(profiler_manager->Init()); EXPECT_OK(profiler_manager->Start()); EXPECT_TRUE(profiler_manager->IsProfilingEnable()); @@ -95,12 +95,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) { // Manually terminate the pipeline iter->Stop(); - // File_id is expected to equal RANK_ID - EXPECT_OK(DeleteFiles(1)); - - // Disable profiler + // Stop MindData Profiling and save output files to current working directory EXPECT_OK(profiler_manager->Stop()); EXPECT_FALSE(profiler_manager->IsProfilingEnable()); + EXPECT_OK(profiler_manager->Save(".")); + + // File_id is expected to equal RANK_ID + EXPECT_OK(DeleteFiles(1)); } /// Feature: MindData Profiling Support @@ -112,7 +113,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) { // Enable profiler and check common::SetEnv("RANK_ID", "2"); std::shared_ptr profiler_manager = GlobalContext::profiling_manager(); - EXPECT_OK(profiler_manager->Init(".")); + EXPECT_OK(profiler_manager->Init()); EXPECT_OK(profiler_manager->Start()); EXPECT_TRUE(profiler_manager->IsProfilingEnable()); @@ -147,12 +148,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) { // Manually terminate the pipeline iter->Stop(); - // File_id is expected to equal RANK_ID - EXPECT_OK(DeleteFiles(2)); - - // Disable profiler + // Stop MindData Profiling and save output files to current working directory EXPECT_OK(profiler_manager->Stop()); EXPECT_FALSE(profiler_manager->IsProfilingEnable()); + EXPECT_OK(profiler_manager->Save(".")); + + // File_id is expected to equal RANK_ID + EXPECT_OK(DeleteFiles(2)); } } // namespace test } // namespace dataset diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py index 9676511dfdb..4e26cd4a662 100644 --- a/tests/ut/python/dataset/test_profiling.py +++ b/tests/ut/python/dataset/test_profiling.py @@ -77,8 +77,8 @@ class 
TestMinddataProfilingManager: os.environ['RANK_ID'] = file_id os.environ['DEVICE_ID'] = file_id - # Initialize MindData profiling manager with current working directory - self.md_profiler.init("./") + # Initialize MindData profiling manager + self.md_profiler.init() # Start MindData Profiling self.md_profiler.start() @@ -87,8 +87,6 @@ class TestMinddataProfilingManager: """ Run after each test function. """ - # Stop MindData Profiling - self.md_profiler.stop() file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] file_id = file_name_map_rank_id[file_name] @@ -155,6 +153,10 @@ class TestMinddataProfilingManager: for _ in data1: pass + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm profiling files now exist assert os.path.exists(pipeline_file) is True assert os.path.exists(cpu_util_file) is True @@ -186,6 +188,10 @@ class TestMinddataProfilingManager: for _ in data3: pass + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + with open(pipeline_file) as f: data = json.load(f) op_info = data["op_info"] @@ -238,6 +244,10 @@ class TestMinddataProfilingManager: assert num_iter == 10 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline is created with EpochCtrl op with open(pipeline_file) as f: data = json.load(f) @@ -280,6 +290,10 @@ class TestMinddataProfilingManager: for _ in data1: pass + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + with open(pipeline_file) as f: data = json.load(f) op_info = data["op_info"] @@ -316,6 +330,10 @@ class TestMinddataProfilingManager: ds.config.set_monitor_sampling_interval(interval_origin) + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + def test_profiling_basic_pipeline(self): """ Test with this basic pipeline @@ -347,6 +365,10 @@ class TestMinddataProfilingManager: assert num_iter == 1000 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + with open(pipeline_file) as f: data = json.load(f) op_info = data["op_info"] @@ -394,6 +416,10 @@ class TestMinddataProfilingManager: assert num_iter == 750 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + with open(pipeline_file) as f: data = json.load(f) op_info = data["op_info"] @@ -434,11 +460,20 @@ class TestMinddataProfilingManager: num_iter += 1 assert num_iter == 2 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) self.confirm_cpuutil(3, cpu_util_file) # Test B - Call create_dict_iterator with num_epochs=1 + + # Initialize and Start MindData profiling manager + self.md_profiler.init() + self.md_profiler.start() + num_iter = 0 # Note: If create_dict_iterator() is called with num_epochs=1, # then EpochCtrlOp should not be NOT added to the pipeline @@ -446,6 +481,10 @@ class TestMinddataProfilingManager: num_iter += 1 assert num_iter == 2 + # 
Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) self.confirm_cpuutil(2, cpu_util_file) @@ -473,11 +512,20 @@ class TestMinddataProfilingManager: num_iter += 1 assert num_iter == 4 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) self.confirm_cpuutil(2, cpu_util_file) # Test B - Call create_dict_iterator with num_epochs>1 + + # Initialize and Start MindData profiling manager + self.md_profiler.init() + self.md_profiler.start() + num_iter = 0 # Note: If create_dict_iterator() is called with num_epochs>1, # then EpochCtrlOp should be added to the pipeline @@ -485,6 +533,10 @@ class TestMinddataProfilingManager: num_iter += 1 assert num_iter == 4 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) self.confirm_cpuutil(3, cpu_util_file) @@ -511,17 +563,30 @@ class TestMinddataProfilingManager: num_iter += 1 assert num_iter == 4 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 2 ops self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) self.confirm_cpuutil(2, cpu_util_file) # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline + + # Initialize and Start MindData profiling manager + self.md_profiler.init() + self.md_profiler.start() + data2 = data2.repeat(5) num_iter = 0 for _ in data2.create_dict_iterator(num_epochs=1): num_iter += 1 assert num_iter == 20 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save('./') + # Confirm pipeline file and CPU util file each have 3 ops self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file) self.confirm_cpuutil(3, cpu_util_file) diff --git a/tests/ut/python/profiler/parser/test_minddata_analyzer.py b/tests/ut/python/profiler/parser/test_minddata_analyzer.py index fb4ac19bdba..dea9e33141f 100644 --- a/tests/ut/python/profiler/parser/test_minddata_analyzer.py +++ b/tests/ut/python/profiler/parser/test_minddata_analyzer.py @@ -55,7 +55,6 @@ class TestMinddataProfilingAnalyzer: 'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops', 'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct'] - def setup_method(self): """ Run before each test function. @@ -81,19 +80,16 @@ class TestMinddataProfilingAnalyzer: os.environ['RANK_ID'] = file_id os.environ['DEVICE_ID'] = file_id - # Initialize MindData profiling manager with current working directory - self.md_profiler.init(os.getcwd()) + # Initialize MindData profiling manager + self.md_profiler.init() # Start MindData Profiling self.md_profiler.start() - def teardown_method(self): """ Run after each test function. 
""" - # Stop MindData Profiling - self.md_profiler.stop() file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0] file_id = file_name_map_rank_id[file_name] @@ -219,6 +215,10 @@ class TestMinddataProfilingAnalyzer: # Confirm number of rows returned assert num_iter == 1000 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save(os.getcwd()) + # Confirm MindData Profiling files are created assert os.path.exists(pipeline_file) is True assert os.path.exists(cpu_util_file) is True @@ -279,6 +279,10 @@ class TestMinddataProfilingAnalyzer: # Confirm number of rows returned assert num_iter == 125 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save(os.getcwd()) + # Confirm MindData Profiling files are created assert os.path.exists(pipeline_file) is True assert os.path.exists(cpu_util_file) is True @@ -286,6 +290,11 @@ class TestMinddataProfilingAnalyzer: # Phase 2 - For the pipeline, call create_tuple_iterator with num_epochs=1 # Note: This pipeline has 3 ops: Generator -> Map -> Batch + + # Initialize and Start MindData profiling manager + self.md_profiler.init() + self.md_profiler.start() + num_iter = 0 # Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline for _ in data1.create_dict_iterator(num_epochs=1): @@ -294,6 +303,10 @@ class TestMinddataProfilingAnalyzer: # Confirm number of rows returned assert num_iter == 125 + # Stop MindData Profiling and save output files to current working directory + self.md_profiler.stop() + self.md_profiler.save(os.getcwd()) + # Confirm MindData Profiling files are created # Note: There is an MD bug in which which the pipeline file is not recreated; # it still has 4 ops instead of 3 ops