forked from mindspore-Ecosystem/mindspore
!26236 Add MD Profiler save()
Merge pull request !26236 from Mohammad Motallebi/add_profiler_save
This commit is contained in:
commit
0113b3eee1
|
@ -49,8 +49,6 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
|
||||||
.def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_)
|
.def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_)
|
||||||
.def("set_callback_timeout", &ConfigManager::set_callback_timeout)
|
.def("set_callback_timeout", &ConfigManager::set_callback_timeout)
|
||||||
.def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval)
|
.def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval)
|
||||||
.def("stop_dataset_profiler", &ConfigManager::stop_dataset_profiler)
|
|
||||||
.def("get_profiler_file_status", &ConfigManager::get_profiler_file_status)
|
|
||||||
.def("set_num_parallel_workers",
|
.def("set_num_parallel_workers",
|
||||||
[](ConfigManager &c, int32_t num) { THROW_IF_ERROR(c.set_num_parallel_workers(num)); })
|
[](ConfigManager &c, int32_t num) { THROW_IF_ERROR(c.set_num_parallel_workers(num)); })
|
||||||
.def("set_op_connector_size", &ConfigManager::set_op_connector_size)
|
.def("set_op_connector_size", &ConfigManager::set_op_connector_size)
|
||||||
|
|
|
@ -21,14 +21,15 @@ namespace mindspore {
|
||||||
namespace dataset {
|
namespace dataset {
|
||||||
PYBIND_REGISTER(ProfilingManager, 0, ([](const py::module *m) {
|
PYBIND_REGISTER(ProfilingManager, 0, ([](const py::module *m) {
|
||||||
(void)py::class_<ProfilingManager, std::shared_ptr<ProfilingManager>>(*m, "ProfilingManager")
|
(void)py::class_<ProfilingManager, std::shared_ptr<ProfilingManager>>(*m, "ProfilingManager")
|
||||||
.def(
|
.def("init", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Init()); })
|
||||||
"init",
|
|
||||||
[](ProfilingManager &prof_mgr, const std::string &profile_data_path) {
|
|
||||||
THROW_IF_ERROR(prof_mgr.Init(profile_data_path));
|
|
||||||
},
|
|
||||||
py::arg("profile_data_path"))
|
|
||||||
.def("start", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Start()); })
|
.def("start", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Start()); })
|
||||||
.def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); });
|
.def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); })
|
||||||
|
.def(
|
||||||
|
"save",
|
||||||
|
[](ProfilingManager &prof_mgr, const std::string &profile_data_path) {
|
||||||
|
THROW_IF_ERROR(prof_mgr.Save(profile_data_path));
|
||||||
|
},
|
||||||
|
py::arg("profile_data_path"));
|
||||||
}));
|
}));
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -41,8 +41,6 @@ ConfigManager::ConfigManager()
|
||||||
seed_(kCfgDefaultSeed),
|
seed_(kCfgDefaultSeed),
|
||||||
numa_enable_(false),
|
numa_enable_(false),
|
||||||
monitor_sampling_interval_(kCfgMonitorSamplingInterval),
|
monitor_sampling_interval_(kCfgMonitorSamplingInterval),
|
||||||
stop_profiler_(false),
|
|
||||||
file_ready_(true),
|
|
||||||
callback_timout_(kCfgCallbackTimeout),
|
callback_timout_(kCfgCallbackTimeout),
|
||||||
cache_host_(kCfgDefaultCacheHost),
|
cache_host_(kCfgDefaultCacheHost),
|
||||||
cache_port_(kCfgDefaultCachePort),
|
cache_port_(kCfgDefaultCachePort),
|
||||||
|
@ -150,10 +148,6 @@ void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }
|
||||||
|
|
||||||
void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }
|
void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }
|
||||||
|
|
||||||
void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; }
|
|
||||||
|
|
||||||
void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; }
|
|
||||||
|
|
||||||
void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; }
|
void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; }
|
||||||
|
|
||||||
void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); }
|
void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); }
|
||||||
|
|
|
@ -180,22 +180,6 @@ class ConfigManager {
|
||||||
// @return The interval of monitor sampling
|
// @return The interval of monitor sampling
|
||||||
int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }
|
int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }
|
||||||
|
|
||||||
// setter function
|
|
||||||
// @param stop_profiler - The setting to apply to the config
|
|
||||||
void stop_dataset_profiler(bool stop_profiler);
|
|
||||||
|
|
||||||
// getter function
|
|
||||||
// @return The status of stop profiler
|
|
||||||
bool stop_profiler_status() const { return stop_profiler_; }
|
|
||||||
|
|
||||||
// setter function
|
|
||||||
// @param file_ready - The setting to apply to the config
|
|
||||||
void set_profiler_file_status(bool file_ready);
|
|
||||||
|
|
||||||
// getter function
|
|
||||||
// @return The status of the profiler file, i.e. whether it has been generated
|
|
||||||
bool get_profiler_file_status() const { return file_ready_; }
|
|
||||||
|
|
||||||
// setter function
|
// setter function
|
||||||
// @param auto_num_workers - whether to assign threads to each op automatically
|
// @param auto_num_workers - whether to assign threads to each op automatically
|
||||||
void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; }
|
void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; }
|
||||||
|
@ -249,8 +233,6 @@ class ConfigManager {
|
||||||
int32_t rank_id_;
|
int32_t rank_id_;
|
||||||
uint32_t seed_;
|
uint32_t seed_;
|
||||||
uint32_t monitor_sampling_interval_;
|
uint32_t monitor_sampling_interval_;
|
||||||
std::atomic_bool stop_profiler_;
|
|
||||||
std::atomic_bool file_ready_;
|
|
||||||
uint32_t callback_timout_;
|
uint32_t callback_timout_;
|
||||||
std::string cache_host_;
|
std::string cache_host_;
|
||||||
int32_t cache_port_;
|
int32_t cache_port_;
|
||||||
|
|
|
@ -40,7 +40,7 @@ Status ConnectorSize::Sample() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// JSON serializer helper function
|
// JSON serializer helper function
|
||||||
json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size) {
|
json ConnectorSize::ParseOpInfo(const DatasetOp &node) {
|
||||||
json json_node;
|
json json_node;
|
||||||
json_node["op_id"] = node.id();
|
json_node["op_id"] = node.id();
|
||||||
json_node["op_type"] = node.Name();
|
json_node["op_type"] = node.Name();
|
||||||
|
@ -49,7 +49,7 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
|
||||||
// DeviceQueueOp is a special op: it is not inlined, but its output queue is invalid.
|
// DeviceQueueOp is a special op: it is not inlined, but its output queue is invalid.
|
||||||
// So we should not output its queue size.
|
// So we should not output its queue size.
|
||||||
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
|
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
|
||||||
metrics["output_queue"] = {{"size", size}, {"length", node.ConnectorCapacity()}};
|
metrics["output_queue"] = {{"length", node.ConnectorCapacity()}};
|
||||||
}
|
}
|
||||||
json_node["metrics"] = metrics;
|
json_node["metrics"] = metrics;
|
||||||
|
|
||||||
|
@ -65,44 +65,42 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save profiling data to file
|
// Save profiling data to file
|
||||||
// If the file already exists (created by another sampling node), simply add the data to the metrics field.
|
Status ConnectorSize::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
|
||||||
Status ConnectorSize::SaveToFile() {
|
Path path = GetFileName(dir_path, rank_id);
|
||||||
json output;
|
// Remove the file if it exists (from prior profiling usage)
|
||||||
RETURN_IF_NOT_OK(ReadJson(&output));
|
RETURN_IF_NOT_OK(path.Remove());
|
||||||
|
std::string file_path = path.ToString();
|
||||||
|
|
||||||
Path path = Path(file_path_);
|
json output = initial_nodes_data;
|
||||||
uint32_t idx = 0;
|
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
|
||||||
// Traverse the ExecutionTree for JSON node generation
|
|
||||||
for (auto &node : *tree_) {
|
// Traverse the JSON initialized in Init() to access each op's information
|
||||||
|
CHECK_FAIL_RETURN_UNEXPECTED(output.contains("op_info"), "JSON data does not include op_info!");
|
||||||
|
for (uint32_t idx = 0; idx < output["op_info"].size(); idx++) {
|
||||||
std::vector<int32_t> cur_queue_size;
|
std::vector<int32_t> cur_queue_size;
|
||||||
(void)std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
|
(void)std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
|
||||||
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
|
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
|
||||||
if (!path.Exists()) {
|
|
||||||
json json_node = ParseOpInfo(node, cur_queue_size);
|
|
||||||
output["op_info"].push_back(json_node);
|
|
||||||
} else {
|
|
||||||
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
|
|
||||||
auto &ops_data = output["op_info"];
|
|
||||||
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
|
|
||||||
ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idx++;
|
auto &ops_data = output["op_info"];
|
||||||
|
if (ops_data[idx]["metrics"].contains("output_queue") && ops_data[idx]["op_type"] != "DeviceQueueOp") {
|
||||||
|
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Discard the content of the file when opening.
|
// Discard the content of the file when opening.
|
||||||
std::ofstream os(file_path_, std::ios::trunc);
|
std::ofstream os(file_path, std::ios::trunc);
|
||||||
os << output;
|
os << output;
|
||||||
os.close();
|
os.close();
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
|
Status ConnectorSize::Init() {
|
||||||
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).ToString();
|
// Traverse the ExecutionTree for JSON node generation
|
||||||
Path path = Path(file_path_);
|
for (auto &node : *tree_) {
|
||||||
// Remove the file if it exists (from prior profiling usage)
|
json json_node = ParseOpInfo(node);
|
||||||
RETURN_IF_NOT_OK(path.Remove());
|
initial_nodes_data["op_info"].push_back(json_node);
|
||||||
|
}
|
||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,5 +137,9 @@ Status ConnectorSize::GetOpConnectorSize(int32_t op_id, uint64_t start_time, uin
|
||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Path ConnectorSize::GetFileName(const std::string &dir_path, const std::string &rank_id) {
|
||||||
|
return Path(dir_path) / Path("pipeline_profiling_" + rank_id + ".json");
|
||||||
|
}
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -55,15 +55,15 @@ class ConnectorSize : public Sampling {
|
||||||
|
|
||||||
// Save sampling data to file
|
// Save sampling data to file
|
||||||
// @return Status The status code returned
|
// @return Status The status code returned
|
||||||
Status SaveToFile() override;
|
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
|
|
||||||
Status Init(const std::string &dir_path, const std::string &device_id) override;
|
Status Init() override;
|
||||||
|
|
||||||
// Parse op information and transform to json format
|
// Parse op information and transform to json format
|
||||||
json ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size);
|
json ParseOpInfo(const DatasetOp &node);
|
||||||
|
|
||||||
// Change file mode after saving throughput data
|
// Change file mode after saving throughput data
|
||||||
Status ChangeFileMode() override { return Status::OK(); }
|
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override { return Status::OK(); }
|
||||||
|
|
||||||
Status Analyze() override;
|
Status Analyze() override;
|
||||||
|
|
||||||
|
@ -71,9 +71,11 @@ class ConnectorSize : public Sampling {
|
||||||
Status GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time, std::vector<int32_t> *result);
|
Status GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time, std::vector<int32_t> *result);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
json initial_nodes_data; // store data when execution tree is running. (all information for ops except sampled data)
|
||||||
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
|
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
|
||||||
ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling
|
ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling
|
||||||
Timestamps ts_; // time of sample
|
Timestamps ts_; // time of sample
|
||||||
|
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
|
|
|
@ -386,14 +386,10 @@ Status CpuSampler::UpdateTaskList() {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CpuSampler::Init(const std::string &dir_path, const std::string &device_id) {
|
Status CpuSampler::Init() {
|
||||||
#if defined(USING_LINUX)
|
#if defined(USING_LINUX)
|
||||||
main_pid_ = syscall(SYS_getpid);
|
main_pid_ = syscall(SYS_getpid);
|
||||||
#endif
|
#endif
|
||||||
auto path = Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json");
|
|
||||||
// remove file if it already exists
|
|
||||||
RETURN_IF_NOT_OK(path.Remove());
|
|
||||||
file_path_ = path.ToString();
|
|
||||||
for (auto iter = tree->begin(); iter != tree->end(); iter++) {
|
for (auto iter = tree->begin(); iter != tree->end(); iter++) {
|
||||||
auto op_id = iter->id();
|
auto op_id = iter->id();
|
||||||
(void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id)));
|
(void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id)));
|
||||||
|
@ -406,15 +402,22 @@ Status CpuSampler::Init(const std::string &dir_path, const std::string &device_i
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CpuSampler::ChangeFileMode() {
|
Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
|
||||||
if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
|
Path path = GetFileName(dir_path, rank_id);
|
||||||
std::string err_str = "Change file mode failed," + file_path_;
|
std::string file_path = path.ToString();
|
||||||
|
if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
|
||||||
|
std::string err_str = "Change file mode failed," + file_path;
|
||||||
return Status(StatusCode::kMDUnexpectedError, err_str);
|
return Status(StatusCode::kMDUnexpectedError, err_str);
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CpuSampler::SaveToFile() {
|
Status CpuSampler::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
|
||||||
|
Path path = GetFileName(dir_path, rank_id);
|
||||||
|
// Remove the file if it exists (from prior profiling usage)
|
||||||
|
RETURN_IF_NOT_OK(path.Remove());
|
||||||
|
std::string file_path = path.ToString();
|
||||||
|
|
||||||
// construct json obj to write to file
|
// construct json obj to write to file
|
||||||
json output;
|
json output;
|
||||||
output["cpu_processor_num"] = SystemCpuInfo::num_cpu_;
|
output["cpu_processor_num"] = SystemCpuInfo::num_cpu_;
|
||||||
|
@ -448,7 +451,7 @@ Status CpuSampler::SaveToFile() {
|
||||||
output["time_stamp"] = ts_;
|
output["time_stamp"] = ts_;
|
||||||
|
|
||||||
// Discard the content of the file when opening.
|
// Discard the content of the file when opening.
|
||||||
std::ofstream os(file_path_, std::ios::trunc);
|
std::ofstream os(file_path, std::ios::trunc);
|
||||||
os << output;
|
os << output;
|
||||||
os.close();
|
os.close();
|
||||||
|
|
||||||
|
@ -508,5 +511,9 @@ Status CpuSampler::GetSystemSysCpuUtil(uint64_t start_ts, uint64_t end_ts, std::
|
||||||
auto end_index = std::distance(ts_.begin(), upper);
|
auto end_index = std::distance(ts_.begin(), upper);
|
||||||
return sys_cpu_info_.GetSysCpuUtil(start_index, end_index, result);
|
return sys_cpu_info_.GetSysCpuUtil(start_index, end_index, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Path CpuSampler::GetFileName(const std::string &dir_path, const std::string &rank_id) {
|
||||||
|
return Path(dir_path) / Path("minddata_cpu_utilization_" + rank_id + ".json");
|
||||||
|
}
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -142,9 +142,9 @@ class CpuSampler : public Sampling {
|
||||||
explicit CpuSampler(ExecutionTree *tree) : fetched_all_python_multiprocesses_(false), tree(tree) {}
|
explicit CpuSampler(ExecutionTree *tree) : fetched_all_python_multiprocesses_(false), tree(tree) {}
|
||||||
~CpuSampler() = default;
|
~CpuSampler() = default;
|
||||||
Status Sample() override;
|
Status Sample() override;
|
||||||
Status Init(const std::string &dir_path, const std::string &device_id) override;
|
Status Init() override;
|
||||||
Status ChangeFileMode() override;
|
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
Status SaveToFile() override;
|
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
std::string Name() const override { return kCpuSamplerName; }
|
std::string Name() const override { return kCpuSamplerName; }
|
||||||
Status Analyze() override;
|
Status Analyze() override;
|
||||||
Status GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result);
|
Status GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result);
|
||||||
|
@ -163,6 +163,7 @@ class CpuSampler : public Sampling {
|
||||||
std::shared_ptr<ThreadCpuInfo> main_thread_cpu_info_;
|
std::shared_ptr<ThreadCpuInfo> main_thread_cpu_info_;
|
||||||
std::shared_ptr<ProcessCpuInfo> main_process_cpu_info_;
|
std::shared_ptr<ProcessCpuInfo> main_process_cpu_info_;
|
||||||
std::unordered_map<int32_t, MDOperatorCpuInfo> op_info_by_id_;
|
std::unordered_map<int32_t, MDOperatorCpuInfo> op_info_by_id_;
|
||||||
|
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
};
|
};
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -29,8 +29,7 @@ namespace dataset {
|
||||||
|
|
||||||
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 0;
|
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 0;
|
||||||
|
|
||||||
Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) {
|
Status DatasetIteratorTracing::Init() {
|
||||||
file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).ToString();
|
|
||||||
(void)ts_.emplace_back(0);
|
(void)ts_.emplace_back(0);
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -80,5 +79,8 @@ Status DatasetIteratorTracing::GetConnectorCapacity(int32_t start_step, int32_t
|
||||||
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
|
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Path DatasetIteratorTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) {
|
||||||
|
return Path(dir_path) / Path("dataset_iterator_profiling_" + rank_id + ".txt");
|
||||||
|
}
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -34,7 +34,7 @@ class DatasetIteratorTracing : public Tracing {
|
||||||
|
|
||||||
std::string Name() const override { return kDatasetIteratorTracingName; };
|
std::string Name() const override { return kDatasetIteratorTracingName; };
|
||||||
|
|
||||||
Status Init(const std::string &dir_path, const std::string &device_id) override;
|
Status Init() override;
|
||||||
|
|
||||||
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
|
@ -42,6 +42,9 @@ class DatasetIteratorTracing : public Tracing {
|
||||||
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
|
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
};
|
};
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -33,8 +33,7 @@ constexpr int32_t BATCH_TIME_OFFSET = 1;
|
||||||
constexpr int32_t PIPELINE_TIME_OFFSET = 2;
|
constexpr int32_t PIPELINE_TIME_OFFSET = 2;
|
||||||
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3;
|
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3;
|
||||||
|
|
||||||
Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) {
|
Status DeviceQueueTracing::Init() {
|
||||||
file_path_ = (Path(dir_path) / Path("device_queue_profiling_" + device_id + ".txt")).ToString();
|
|
||||||
(void)ts_.emplace_back(0);
|
(void)ts_.emplace_back(0);
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -82,5 +81,9 @@ Status DeviceQueueTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t en
|
||||||
Status DeviceQueueTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
|
Status DeviceQueueTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
|
||||||
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
|
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Path DeviceQueueTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) {
|
||||||
|
return Path(dir_path) / Path("device_queue_profiling_" + rank_id + ".txt");
|
||||||
|
}
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include "minddata/dataset/engine/perf/profiling.h"
|
#include "minddata/dataset/engine/perf/profiling.h"
|
||||||
|
#include "minddata/dataset/util/path.h"
|
||||||
|
|
||||||
namespace mindspore {
|
namespace mindspore {
|
||||||
namespace dataset {
|
namespace dataset {
|
||||||
|
@ -34,7 +35,7 @@ class DeviceQueueTracing : public Tracing {
|
||||||
|
|
||||||
std::string Name() const override { return kDeviceQueueTracingName; };
|
std::string Name() const override { return kDeviceQueueTracingName; };
|
||||||
|
|
||||||
Status Init(const std::string &dir_path, const std::string &device_id) override;
|
Status Init() override;
|
||||||
|
|
||||||
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
|
@ -42,6 +43,9 @@ class DeviceQueueTracing : public Tracing {
|
||||||
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
|
||||||
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
|
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
};
|
};
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -29,39 +29,23 @@ Monitor::Monitor(ProfilingManager *profiling_manager) : profiling_manager_(profi
|
||||||
Status Monitor::operator()() {
|
Status Monitor::operator()() {
|
||||||
// Register this thread with TaskManager to receive proper interrupt signal.
|
// Register this thread with TaskManager to receive proper interrupt signal.
|
||||||
TaskManager::FindMe()->Post();
|
TaskManager::FindMe()->Post();
|
||||||
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
|
std::unique_lock<std::mutex> _lock(mux_);
|
||||||
cfg->set_profiler_file_status(false);
|
|
||||||
|
|
||||||
// Keep sampling if
|
// Keep sampling if
|
||||||
// 1) Monitor Task is not interrupted by TaskManager AND
|
// 1) Monitor Task is not interrupted by TaskManager AND
|
||||||
// 2) Iterator has not received EOF
|
// 2) Iterator has not received EOF
|
||||||
|
|
||||||
// This triggers a save at the 2 min, 4 min, 8 min, 16 min, ... marks, in addition to the save at the end of each epoch.
|
while (!this_thread::is_interrupted() && !(tree_->isFinished())) {
|
||||||
// The idea is that whenever training is interrupted, you will still get at least half of the sampling data collected during training.
|
|
||||||
constexpr int64_t num_ms_in_two_minutes = 120000;
|
|
||||||
int64_t save_interval = 1 + (num_ms_in_two_minutes / sampling_interval_);
|
|
||||||
int64_t loop_cnt = 1;
|
|
||||||
constexpr int64_t geometric_series_ratio = 2;
|
|
||||||
while (!this_thread::is_interrupted() && !(tree_->isFinished()) && !(cfg->stop_profiler_status())) {
|
|
||||||
if (tree_->IsEpochEnd()) {
|
if (tree_->IsEpochEnd()) {
|
||||||
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData());
|
|
||||||
tree_->SetExecuting();
|
tree_->SetExecuting();
|
||||||
} else if (loop_cnt % save_interval == 0) {
|
|
||||||
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData());
|
|
||||||
}
|
}
|
||||||
for (auto &node : profiling_manager_->GetSamplingNodes()) {
|
for (auto &node : profiling_manager_->GetSamplingNodes()) {
|
||||||
RETURN_IF_NOT_OK(node.second->Sample());
|
RETURN_IF_NOT_OK(node.second->Sample());
|
||||||
}
|
}
|
||||||
if (loop_cnt % save_interval == 0) save_interval *= geometric_series_ratio;
|
RETURN_IF_NOT_OK(cv_.WaitFor(&_lock, sampling_interval_));
|
||||||
loop_cnt += 1;
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(sampling_interval_));
|
|
||||||
}
|
}
|
||||||
|
MS_LOG(INFO) << "Monitor Thread terminating...";
|
||||||
|
|
||||||
// Output all profiling data upon request.
|
|
||||||
RETURN_IF_NOT_OK(profiling_manager_->Analyze(true));
|
|
||||||
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData(true));
|
|
||||||
RETURN_IF_NOT_OK(profiling_manager_->ChangeFileMode(true));
|
|
||||||
cfg->set_profiler_file_status(true);
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
#define MINDSPORE_MONITOR_H
|
#define MINDSPORE_MONITOR_H
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include "minddata/dataset/engine/perf/profiling.h"
|
#include "minddata/dataset/engine/perf/profiling.h"
|
||||||
|
#include "minddata/dataset/util/cond_var.h"
|
||||||
#include "minddata/dataset/util/status.h"
|
#include "minddata/dataset/util/status.h"
|
||||||
|
|
||||||
namespace mindspore {
|
namespace mindspore {
|
||||||
|
@ -46,6 +48,8 @@ class Monitor {
|
||||||
ProfilingManager *profiling_manager_;
|
ProfilingManager *profiling_manager_;
|
||||||
int64_t sampling_interval_;
|
int64_t sampling_interval_;
|
||||||
ExecutionTree *tree_;
|
ExecutionTree *tree_;
|
||||||
|
std::mutex mux_;
|
||||||
|
CondVar cv_;
|
||||||
};
|
};
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
} // namespace mindspore
|
} // namespace mindspore
|
||||||
|
|
|
@ -48,12 +48,18 @@ Status Profiling::Stop() {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Tracing::SaveToFile() {
|
Status Tracing::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
|
||||||
if (value_.empty()) {
|
if (value_.empty()) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::ofstream handle(file_path_, std::ios::trunc);
|
Path path = GetFileName(dir_path, rank_id);
|
||||||
|
// Remove the file if it exists (from prior profiling usage)
|
||||||
|
RETURN_IF_NOT_OK(path.Remove());
|
||||||
|
std::string file_path = path.ToString();
|
||||||
|
|
||||||
|
MS_LOG(INFO) << "Start to save profiling data for a tracing node.";
|
||||||
|
std::ofstream handle(file_path, std::ios::trunc);
|
||||||
if (!handle.is_open()) {
|
if (!handle.is_open()) {
|
||||||
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
|
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
|
||||||
}
|
}
|
||||||
|
@ -65,13 +71,15 @@ Status Tracing::SaveToFile() {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Tracing::ChangeFileMode() {
|
Status Tracing::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
|
||||||
if (value_.empty()) {
|
if (value_.empty()) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
|
Path path = GetFileName(dir_path, rank_id);
|
||||||
std::string err_str = "Change file mode failed," + file_path_;
|
std::string file_path = path.ToString();
|
||||||
|
if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
|
||||||
|
std::string err_str = "Change file mode failed," + file_path;
|
||||||
return Status(StatusCode::kMDUnexpectedError, err_str);
|
return Status(StatusCode::kMDUnexpectedError, err_str);
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
@ -172,24 +180,6 @@ Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, i
|
||||||
|
|
||||||
Tracing::Tracing(int32_t records_per_step) : records_per_step_(records_per_step) {}
|
Tracing::Tracing(int32_t records_per_step) : records_per_step_(records_per_step) {}
|
||||||
|
|
||||||
Status Sampling::ReadJson(nlohmann::json *output) {
|
|
||||||
RETURN_UNEXPECTED_IF_NULL(output);
|
|
||||||
Path path = Path(file_path_);
|
|
||||||
if (path.Exists()) {
|
|
||||||
MS_LOG(DEBUG) << file_path_ << " exists";
|
|
||||||
try {
|
|
||||||
std::ifstream file(file_path_);
|
|
||||||
file >> (*output);
|
|
||||||
} catch (const std::exception &err) {
|
|
||||||
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
|
|
||||||
", please delete it and try again!");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
(*output)["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
|
|
||||||
}
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Constructor
|
// Constructor
|
||||||
ProfilingManager::ProfilingManager() : profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false) {}
|
ProfilingManager::ProfilingManager() : profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false) {}
|
||||||
|
|
||||||
|
@ -202,24 +192,7 @@ Status ProfilingManager::RegisterTree(TreeAdapter *tree_adapter) {
|
||||||
|
|
||||||
perf_monitor_ = std::make_unique<Monitor>(this);
|
perf_monitor_ = std::make_unique<Monitor>(this);
|
||||||
|
|
||||||
#ifdef ENABLE_GPUQUE
|
|
||||||
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
|
|
||||||
int32_t rank_id = cfg->rank_id();
|
|
||||||
// If DEVICE_ID is not set, default value is 0
|
|
||||||
if (rank_id < 0) {
|
|
||||||
device_id_ = common::GetEnv("DEVICE_ID");
|
|
||||||
} else {
|
|
||||||
device_id_ = std::to_string(rank_id);
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
device_id_ = common::GetEnv("RANK_ID");
|
|
||||||
#endif
|
|
||||||
// If RANK_ID is not set, default value is 0
|
|
||||||
if (device_id_.empty()) {
|
|
||||||
device_id_ = "0";
|
|
||||||
}
|
|
||||||
// Register all profiling nodes.
|
// Register all profiling nodes.
|
||||||
|
|
||||||
std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
|
std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
|
||||||
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
|
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
|
||||||
|
|
||||||
|
@ -249,7 +222,7 @@ Status ProfilingManager::RegisterTracingNode(std::shared_ptr<Tracing> node) {
|
||||||
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
|
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
|
||||||
}
|
}
|
||||||
// Register the node with its name as key.
|
// Register the node with its name as key.
|
||||||
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
|
RETURN_IF_NOT_OK(node->Init());
|
||||||
tracing_nodes_[node->Name()] = node;
|
tracing_nodes_[node->Name()] = node;
|
||||||
|
|
||||||
// the user may have already started profiling.
|
// the user may have already started profiling.
|
||||||
|
@ -279,7 +252,7 @@ Status ProfilingManager::RegisterSamplingNode(std::shared_ptr<Sampling> node) {
|
||||||
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
|
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
|
||||||
}
|
}
|
||||||
// Register the node with its name as key.
|
// Register the node with its name as key.
|
||||||
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
|
RETURN_IF_NOT_OK(node->Init());
|
||||||
sampling_nodes_[node->Name()] = node;
|
sampling_nodes_[node->Name()] = node;
|
||||||
|
|
||||||
// the user may have already started profiling.
|
// the user may have already started profiling.
|
||||||
|
@ -301,25 +274,19 @@ Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_pt
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ProfilingManager::SaveProfilingData(const bool final_round) {
|
Status ProfilingManager::SaveProfilingData(const std::string &dir_path, const std::string &rank_id) {
|
||||||
if ((!IsProfilingEnable()) && !final_round) {
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
MS_LOG(INFO) << "Start to save profiling data.";
|
MS_LOG(INFO) << "Start to save profiling data.";
|
||||||
for (auto node : tracing_nodes_) {
|
for (auto node : tracing_nodes_) {
|
||||||
RETURN_IF_NOT_OK(node.second->SaveToFile());
|
RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
|
||||||
}
|
}
|
||||||
for (auto node : sampling_nodes_) {
|
for (auto node : sampling_nodes_) {
|
||||||
RETURN_IF_NOT_OK(node.second->SaveToFile());
|
RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
|
||||||
}
|
}
|
||||||
MS_LOG(INFO) << "Save profiling data end.";
|
MS_LOG(INFO) << "Save profiling data end.";
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ProfilingManager::Analyze(const bool final_round) {
|
Status ProfilingManager::Analyze() {
|
||||||
if (!IsProfilingEnable() && !final_round) {
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
MS_LOG(INFO) << "Start to analyze profiling data.";
|
MS_LOG(INFO) << "Start to analyze profiling data.";
|
||||||
for (auto node : sampling_nodes_) {
|
for (auto node : sampling_nodes_) {
|
||||||
RETURN_IF_NOT_OK(node.second->Analyze());
|
RETURN_IF_NOT_OK(node.second->Analyze());
|
||||||
|
@ -327,16 +294,13 @@ Status ProfilingManager::Analyze(const bool final_round) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ProfilingManager::ChangeFileMode(const bool final_round) {
|
Status ProfilingManager::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
|
||||||
if (!IsProfilingEnable() && !final_round) {
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
MS_LOG(INFO) << "Start to change file mode.";
|
MS_LOG(INFO) << "Start to change file mode.";
|
||||||
for (auto node : tracing_nodes_) {
|
for (auto node : tracing_nodes_) {
|
||||||
RETURN_IF_NOT_OK(node.second->ChangeFileMode());
|
RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
|
||||||
}
|
}
|
||||||
for (auto node : sampling_nodes_) {
|
for (auto node : sampling_nodes_) {
|
||||||
RETURN_IF_NOT_OK(node.second->ChangeFileMode());
|
RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
|
||||||
}
|
}
|
||||||
MS_LOG(INFO) << "Change file mode end.";
|
MS_LOG(INFO) << "Change file mode end.";
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
@ -642,23 +606,7 @@ Status ProfilingManager::Reset() {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ProfilingManager::Init(const std::string &profile_data_path = "") {
|
Status ProfilingManager::Init() {
|
||||||
// Validate input profile data path
|
|
||||||
CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
|
|
||||||
CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");
|
|
||||||
|
|
||||||
char real_path[PATH_MAX] = {0};
|
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
|
||||||
if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
|
|
||||||
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
|
|
||||||
RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
dir_path_ = real_path;
|
|
||||||
|
|
||||||
if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
|
if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
|
||||||
profiling_state_ = ProfilingState::kProfilingStateUnBegun;
|
profiling_state_ = ProfilingState::kProfilingStateUnBegun;
|
||||||
}
|
}
|
||||||
|
@ -666,7 +614,7 @@ Status ProfilingManager::Init(const std::string &profile_data_path = "") {
|
||||||
// Enable profiling
|
// Enable profiling
|
||||||
enabled_ = true;
|
enabled_ = true;
|
||||||
|
|
||||||
MS_LOG(INFO) << "MD profiler is initialized with directory path: " << dir_path_;
|
MS_LOG(INFO) << "MD profiler is initialized successfully.";
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -708,6 +656,48 @@ Status ProfilingManager::Stop() {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write all collected profiling data into the given directory.
//
// Steps: validate and canonicalize the directory path, work out the rank id that is
// embedded in the output file names, then run Analyze(), SaveProfilingData() and
// ChangeFileMode() in that order. The first failing step aborts the save and its
// Status is returned.
//
// @param profile_data_path Directory that receives the profiling files. Must be non-empty,
//     shorter than PATH_MAX and resolvable by realpath() (or _fullpath() on Windows).
// @return Status The status code returned
Status ProfilingManager::Save(const std::string &profile_data_path) {
  // Validate input profile data path
  CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
  CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");

  // Canonicalize the directory.
  // profiling file: <canonical profile_data_path>/filename_rank_id.suffix
  char real_path[PATH_MAX] = {0};
#if defined(_WIN32) || defined(_WIN64)
  if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
    RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
  }
#else
  if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
    RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
  }
#endif
  // Use the path that was just validated for every later step, instead of discarding it
  // and handing the unresolved input to the nodes.
  const std::string dir_path(real_path);

  std::string rank_id;
#ifdef ENABLE_GPUQUE
  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  int32_t rank_id_int = cfg->rank_id();
  // A negative configured rank id means it is not usable; fall back to the DEVICE_ID environment variable
  if (rank_id_int < 0) {
    rank_id = common::GetEnv("DEVICE_ID");
  } else {
    rank_id = std::to_string(rank_id_int);
  }
#else
  rank_id = common::GetEnv("RANK_ID");
#endif
  // If no rank id could be obtained (environment variable not set), default value is 0
  if (rank_id.empty()) {
    rank_id = "0";
  }

  // Output all profiling data upon request.
  RETURN_IF_NOT_OK(Analyze());
  RETURN_IF_NOT_OK(SaveProfilingData(dir_path, rank_id));
  RETURN_IF_NOT_OK(ChangeFileMode(dir_path, rank_id));

  return Status::OK();
}
|
||||||
|
|
||||||
uint64_t ProfilingTime::GetCurMilliSecond() {
|
uint64_t ProfilingTime::GetCurMilliSecond() {
|
||||||
// because cpplint does not allow using namespace
|
// because cpplint does not allow using namespace
|
||||||
using std::chrono::duration_cast;
|
using std::chrono::duration_cast;
|
||||||
|
|
|
@ -16,13 +16,15 @@
|
||||||
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
|
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
|
||||||
#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
|
#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
|
||||||
|
|
||||||
#include <string>
|
#include <atomic>
|
||||||
#include <vector>
|
|
||||||
#include <unordered_map>
|
|
||||||
#include <memory>
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <string>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
|
#include "minddata/dataset/util/path.h"
|
||||||
#include "minddata/dataset/util/status.h"
|
#include "minddata/dataset/util/status.h"
|
||||||
#include "minddata/dataset/engine/perf/monitor.h"
|
#include "minddata/dataset/engine/perf/monitor.h"
|
||||||
|
|
||||||
|
@ -50,15 +52,15 @@ class Profiling : std::enable_shared_from_this<Profiling> {
|
||||||
// Destructor
|
// Destructor
|
||||||
virtual ~Profiling() = default;
|
virtual ~Profiling() = default;
|
||||||
|
|
||||||
virtual Status Init(const std::string &dir_path, const std::string &device_id) = 0;
|
virtual Status Init() = 0;
|
||||||
|
|
||||||
// Default serialization file generator
|
// Default serialization file generator
|
||||||
virtual Status SaveToFile() = 0;
|
virtual Status SaveToFile(const std::string &dir_path, const std::string &rank_id) = 0;
|
||||||
|
|
||||||
// Profiling name
|
// Profiling name
|
||||||
virtual std::string Name() const = 0;
|
virtual std::string Name() const = 0;
|
||||||
|
|
||||||
virtual Status ChangeFileMode() = 0;
|
virtual Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) = 0;
|
||||||
|
|
||||||
// Start collecting data
|
// Start collecting data
|
||||||
Status Start();
|
Status Start();
|
||||||
|
@ -68,8 +70,8 @@ class Profiling : std::enable_shared_from_this<Profiling> {
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool active_; // show current state of ProfilingManager (running, or paused)
|
bool active_; // show current state of ProfilingManager (running, or paused)
|
||||||
std::string file_path_;
|
|
||||||
std::mutex lock_;
|
std::mutex lock_;
|
||||||
|
virtual Path GetFileName(const std::string &dir_path, const std::string &rank_id) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Sampling is a class of profiling which generate samples periodically.
|
// Sampling is a class of profiling which generate samples periodically.
|
||||||
|
@ -80,7 +82,6 @@ class Sampling : public Profiling {
|
||||||
// virtual Status TestPrint() = 0;
|
// virtual Status TestPrint() = 0;
|
||||||
virtual Status Analyze() = 0;
|
virtual Status Analyze() = 0;
|
||||||
virtual ~Sampling() = default;
|
virtual ~Sampling() = default;
|
||||||
Status ReadJson(nlohmann::json *output);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct TracingRecord_s {
|
typedef struct TracingRecord_s {
|
||||||
|
@ -101,8 +102,8 @@ class Tracing : public Profiling {
|
||||||
public:
|
public:
|
||||||
// Tracing has a minimal interface to provide flexibility in data recording.
|
// Tracing has a minimal interface to provide flexibility in data recording.
|
||||||
// It only includes some common routines.
|
// It only includes some common routines.
|
||||||
Status SaveToFile() override;
|
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
Status ChangeFileMode() override;
|
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override;
|
||||||
virtual Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
virtual Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
||||||
virtual Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
virtual Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
||||||
virtual Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
virtual Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
|
||||||
|
@ -149,9 +150,9 @@ class ProfilingManager {
|
||||||
Status Reset();
|
Status Reset();
|
||||||
|
|
||||||
// Save profile data to file
|
// Save profile data to file
|
||||||
// @param final_round The boolean to show if the monitor thread is terminating.
|
// @param dir_path The path to the directory where the profiling data will be saved.
|
||||||
// @return Status The status code returned
|
// @return Status The status code returned
|
||||||
Status SaveProfilingData(const bool final_round = false);
|
Status SaveProfilingData(const std::string &dir_path, const std::string &rank_id);
|
||||||
|
|
||||||
// Sampling node getter
|
// Sampling node getter
|
||||||
// @param name - The name of the requested node
|
// @param name - The name of the requested node
|
||||||
|
@ -177,14 +178,12 @@ class ProfilingManager {
|
||||||
// Launch monitoring thread.
|
// Launch monitoring thread.
|
||||||
Status LaunchMonitor();
|
Status LaunchMonitor();
|
||||||
|
|
||||||
// @param final_round The boolean to show if the monitor thread is terminating.
|
|
||||||
// @return Status The status code returned
|
// @return Status The status code returned
|
||||||
Status ChangeFileMode(const bool final_round = false);
|
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id);
|
||||||
|
|
||||||
// Analyze profile data and print warning messages
|
// Analyze profile data and print warning messages
|
||||||
// @param final_round The boolean to show if the monitor thread is terminating.
|
|
||||||
// @return Status The status code returned
|
// @return Status The status code returned
|
||||||
Status Analyze(const bool final_round = false);
|
Status Analyze();
|
||||||
|
|
||||||
#ifndef ENABLE_ANDROID
|
#ifndef ENABLE_ANDROID
|
||||||
/// \brief API to get User CPU utilization for the system
|
/// \brief API to get User CPU utilization for the system
|
||||||
|
@ -424,7 +423,7 @@ class ProfilingManager {
|
||||||
|
|
||||||
/// \brief API to initialize profiling manager
|
/// \brief API to initialize profiling manager
|
||||||
/// \return Status object with the error code
|
/// \return Status object with the error code
|
||||||
Status Init(const std::string &profile_data_path);
|
Status Init();
|
||||||
|
|
||||||
/// \brief API to signal the profiling nodes to start collecting data
|
/// \brief API to signal the profiling nodes to start collecting data
|
||||||
/// \return Status object with the error code
|
/// \return Status object with the error code
|
||||||
|
@ -434,6 +433,10 @@ class ProfilingManager {
|
||||||
/// \return Status object with the error code
|
/// \return Status object with the error code
|
||||||
Status Stop();
|
Status Stop();
|
||||||
|
|
||||||
|
/// \brief API to save to file all the collected data between Start and Stop calls
|
||||||
|
/// \return Status object with the error code
|
||||||
|
Status Save(const std::string &profile_data_path);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<Monitor> perf_monitor_;
|
std::unique_ptr<Monitor> perf_monitor_;
|
||||||
|
|
||||||
|
@ -444,12 +447,10 @@ class ProfilingManager {
|
||||||
kProfilingStateFinished,
|
kProfilingStateFinished,
|
||||||
};
|
};
|
||||||
ProfilingState profiling_state_; // show current state of ProfilingManager (running, or paused)
|
ProfilingState profiling_state_; // show current state of ProfilingManager (running, or paused)
|
||||||
bool enabled_;
|
std::atomic<bool> enabled_;
|
||||||
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;
|
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;
|
||||||
std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;
|
std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;
|
||||||
ExecutionTree *tree_; // ExecutionTree pointer
|
ExecutionTree *tree_; // ExecutionTree pointer
|
||||||
std::string dir_path_; // where to create profiling file
|
|
||||||
std::string device_id_; // used when create profiling file,filename_device_id.suffix
|
|
||||||
std::vector<uint64_t> epoch_end_ts_; // End of epoch timestamp
|
std::vector<uint64_t> epoch_end_ts_; // End of epoch timestamp
|
||||||
std::vector<uint32_t> epoch_end_step_; // End of epoch step number
|
std::vector<uint32_t> epoch_end_step_; // End of epoch step number
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ Common imported modules in corresponding API examples are as follows:
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import random
|
import random
|
||||||
import time
|
|
||||||
import numpy
|
import numpy
|
||||||
import mindspore._c_dataengine as cde
|
import mindspore._c_dataengine as cde
|
||||||
from mindspore import log as logger
|
from mindspore import log as logger
|
||||||
|
@ -411,20 +410,6 @@ def load(file):
|
||||||
_config.load(file)
|
_config.load(file)
|
||||||
|
|
||||||
|
|
||||||
def _stop_dataset_profiler():
|
|
||||||
"""
|
|
||||||
Mainly for stop dataset profiler.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool, whether the profiler file has generated.
|
|
||||||
"""
|
|
||||||
|
|
||||||
while not _config.get_profiler_file_status():
|
|
||||||
_config.stop_dataset_profiler(True)
|
|
||||||
logger.warning("Profiling: waiting for dataset part profiling stop.")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
def get_enable_shared_mem():
|
def get_enable_shared_mem():
|
||||||
"""
|
"""
|
||||||
Get the default state of shared mem enabled variable.
|
Get the default state of shared mem enabled variable.
|
||||||
|
|
|
@ -23,7 +23,6 @@ from mindspore import log as logger, context
|
||||||
from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size
|
from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size
|
||||||
import mindspore._c_expression as c_expression
|
import mindspore._c_expression as c_expression
|
||||||
import mindspore._c_dataengine as cde
|
import mindspore._c_dataengine as cde
|
||||||
from mindspore.dataset.core.config import _stop_dataset_profiler
|
|
||||||
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
|
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
|
||||||
ProfilerIOException, ProfilerException, ProfilerRawFileException
|
ProfilerIOException, ProfilerException, ProfilerRawFileException
|
||||||
from mindspore.profiler.common.util import get_file_path, fwrite_format
|
from mindspore.profiler.common.util import get_file_path, fwrite_format
|
||||||
|
@ -144,7 +143,7 @@ class Profiler:
|
||||||
|
|
||||||
# Setup and start MindData Profiling
|
# Setup and start MindData Profiling
|
||||||
self._md_profiler = cde.GlobalContext.profiling_manager()
|
self._md_profiler = cde.GlobalContext.profiling_manager()
|
||||||
self._md_profiler.init(self._output_path)
|
self._md_profiler.init()
|
||||||
self._md_profiler.start()
|
self._md_profiler.start()
|
||||||
|
|
||||||
if self._device_target:
|
if self._device_target:
|
||||||
|
@ -253,7 +252,7 @@ class Profiler:
|
||||||
_environment_check()
|
_environment_check()
|
||||||
self._cpu_profiler.stop()
|
self._cpu_profiler.stop()
|
||||||
self._md_profiler.stop()
|
self._md_profiler.stop()
|
||||||
_stop_dataset_profiler()
|
self._md_profiler.save(self._output_path)
|
||||||
if self._device_target and self._device_target == "GPU":
|
if self._device_target and self._device_target == "GPU":
|
||||||
self._gpu_analyse()
|
self._gpu_analyse()
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) {
|
||||||
// Enable profiler and check
|
// Enable profiler and check
|
||||||
common::SetEnv("RANK_ID", "1");
|
common::SetEnv("RANK_ID", "1");
|
||||||
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
|
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
|
||||||
EXPECT_OK(profiler_manager->Init("."));
|
EXPECT_OK(profiler_manager->Init());
|
||||||
EXPECT_OK(profiler_manager->Start());
|
EXPECT_OK(profiler_manager->Start());
|
||||||
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
|
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
|
||||||
|
|
||||||
|
@ -95,12 +95,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) {
|
||||||
// Manually terminate the pipeline
|
// Manually terminate the pipeline
|
||||||
iter->Stop();
|
iter->Stop();
|
||||||
|
|
||||||
// File_id is expected to equal RANK_ID
|
// Stop MindData Profiling and save output files to current working directory
|
||||||
EXPECT_OK(DeleteFiles(1));
|
|
||||||
|
|
||||||
// Disable profiler
|
|
||||||
EXPECT_OK(profiler_manager->Stop());
|
EXPECT_OK(profiler_manager->Stop());
|
||||||
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
|
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
|
||||||
|
EXPECT_OK(profiler_manager->Save("."));
|
||||||
|
|
||||||
|
// File_id is expected to equal RANK_ID
|
||||||
|
EXPECT_OK(DeleteFiles(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Feature: MindData Profiling Support
|
/// Feature: MindData Profiling Support
|
||||||
|
@ -112,7 +113,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) {
|
||||||
// Enable profiler and check
|
// Enable profiler and check
|
||||||
common::SetEnv("RANK_ID", "2");
|
common::SetEnv("RANK_ID", "2");
|
||||||
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
|
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
|
||||||
EXPECT_OK(profiler_manager->Init("."));
|
EXPECT_OK(profiler_manager->Init());
|
||||||
EXPECT_OK(profiler_manager->Start());
|
EXPECT_OK(profiler_manager->Start());
|
||||||
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
|
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
|
||||||
|
|
||||||
|
@ -147,12 +148,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) {
|
||||||
// Manually terminate the pipeline
|
// Manually terminate the pipeline
|
||||||
iter->Stop();
|
iter->Stop();
|
||||||
|
|
||||||
// File_id is expected to equal RANK_ID
|
// Stop MindData Profiling and save output files to current working directory
|
||||||
EXPECT_OK(DeleteFiles(2));
|
|
||||||
|
|
||||||
// Disable profiler
|
|
||||||
EXPECT_OK(profiler_manager->Stop());
|
EXPECT_OK(profiler_manager->Stop());
|
||||||
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
|
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
|
||||||
|
EXPECT_OK(profiler_manager->Save("."));
|
||||||
|
|
||||||
|
// File_id is expected to equal RANK_ID
|
||||||
|
EXPECT_OK(DeleteFiles(2));
|
||||||
}
|
}
|
||||||
} // namespace test
|
} // namespace test
|
||||||
} // namespace dataset
|
} // namespace dataset
|
||||||
|
|
|
@ -77,8 +77,8 @@ class TestMinddataProfilingManager:
|
||||||
os.environ['RANK_ID'] = file_id
|
os.environ['RANK_ID'] = file_id
|
||||||
os.environ['DEVICE_ID'] = file_id
|
os.environ['DEVICE_ID'] = file_id
|
||||||
|
|
||||||
# Initialize MindData profiling manager with current working directory
|
# Initialize MindData profiling manager
|
||||||
self.md_profiler.init("./")
|
self.md_profiler.init()
|
||||||
|
|
||||||
# Start MindData Profiling
|
# Start MindData Profiling
|
||||||
self.md_profiler.start()
|
self.md_profiler.start()
|
||||||
|
@ -87,8 +87,6 @@ class TestMinddataProfilingManager:
|
||||||
"""
|
"""
|
||||||
Run after each test function.
|
Run after each test function.
|
||||||
"""
|
"""
|
||||||
# Stop MindData Profiling
|
|
||||||
self.md_profiler.stop()
|
|
||||||
|
|
||||||
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
|
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
|
||||||
file_id = file_name_map_rank_id[file_name]
|
file_id = file_name_map_rank_id[file_name]
|
||||||
|
@ -155,6 +153,10 @@ class TestMinddataProfilingManager:
|
||||||
for _ in data1:
|
for _ in data1:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm profiling files now exist
|
# Confirm profiling files now exist
|
||||||
assert os.path.exists(pipeline_file) is True
|
assert os.path.exists(pipeline_file) is True
|
||||||
assert os.path.exists(cpu_util_file) is True
|
assert os.path.exists(cpu_util_file) is True
|
||||||
|
@ -186,6 +188,10 @@ class TestMinddataProfilingManager:
|
||||||
for _ in data3:
|
for _ in data3:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
with open(pipeline_file) as f:
|
with open(pipeline_file) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
op_info = data["op_info"]
|
op_info = data["op_info"]
|
||||||
|
@ -238,6 +244,10 @@ class TestMinddataProfilingManager:
|
||||||
|
|
||||||
assert num_iter == 10
|
assert num_iter == 10
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline is created with EpochCtrl op
|
# Confirm pipeline is created with EpochCtrl op
|
||||||
with open(pipeline_file) as f:
|
with open(pipeline_file) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
|
@ -280,6 +290,10 @@ class TestMinddataProfilingManager:
|
||||||
for _ in data1:
|
for _ in data1:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
with open(pipeline_file) as f:
|
with open(pipeline_file) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
op_info = data["op_info"]
|
op_info = data["op_info"]
|
||||||
|
@ -316,6 +330,10 @@ class TestMinddataProfilingManager:
|
||||||
|
|
||||||
ds.config.set_monitor_sampling_interval(interval_origin)
|
ds.config.set_monitor_sampling_interval(interval_origin)
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
def test_profiling_basic_pipeline(self):
|
def test_profiling_basic_pipeline(self):
|
||||||
"""
|
"""
|
||||||
Test with this basic pipeline
|
Test with this basic pipeline
|
||||||
|
@ -347,6 +365,10 @@ class TestMinddataProfilingManager:
|
||||||
|
|
||||||
assert num_iter == 1000
|
assert num_iter == 1000
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
with open(pipeline_file) as f:
|
with open(pipeline_file) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
op_info = data["op_info"]
|
op_info = data["op_info"]
|
||||||
|
@ -394,6 +416,10 @@ class TestMinddataProfilingManager:
|
||||||
|
|
||||||
assert num_iter == 750
|
assert num_iter == 750
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
with open(pipeline_file) as f:
|
with open(pipeline_file) as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
op_info = data["op_info"]
|
op_info = data["op_info"]
|
||||||
|
@ -434,11 +460,20 @@ class TestMinddataProfilingManager:
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 2
|
assert num_iter == 2
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 3 ops
|
# Confirm pipeline file and CPU util file each have 3 ops
|
||||||
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(3, cpu_util_file)
|
self.confirm_cpuutil(3, cpu_util_file)
|
||||||
|
|
||||||
# Test B - Call create_dict_iterator with num_epochs=1
|
# Test B - Call create_dict_iterator with num_epochs=1
|
||||||
|
|
||||||
|
# Initialize and Start MindData profiling manager
|
||||||
|
self.md_profiler.init()
|
||||||
|
self.md_profiler.start()
|
||||||
|
|
||||||
num_iter = 0
|
num_iter = 0
|
||||||
# Note: If create_dict_iterator() is called with num_epochs=1,
|
# Note: If create_dict_iterator() is called with num_epochs=1,
|
||||||
# then EpochCtrlOp should NOT be added to the pipeline
|
# then EpochCtrlOp should NOT be added to the pipeline
|
||||||
|
@ -446,6 +481,10 @@ class TestMinddataProfilingManager:
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 2
|
assert num_iter == 2
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 2 ops
|
# Confirm pipeline file and CPU util file each have 2 ops
|
||||||
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(2, cpu_util_file)
|
self.confirm_cpuutil(2, cpu_util_file)
|
||||||
|
@ -473,11 +512,20 @@ class TestMinddataProfilingManager:
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 4
|
assert num_iter == 4
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 2 ops
|
# Confirm pipeline file and CPU util file each have 2 ops
|
||||||
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(2, cpu_util_file)
|
self.confirm_cpuutil(2, cpu_util_file)
|
||||||
|
|
||||||
# Test B - Call create_dict_iterator with num_epochs>1
|
# Test B - Call create_dict_iterator with num_epochs>1
|
||||||
|
|
||||||
|
# Initialize and Start MindData profiling manager
|
||||||
|
self.md_profiler.init()
|
||||||
|
self.md_profiler.start()
|
||||||
|
|
||||||
num_iter = 0
|
num_iter = 0
|
||||||
# Note: If create_dict_iterator() is called with num_epochs>1,
|
# Note: If create_dict_iterator() is called with num_epochs>1,
|
||||||
# then EpochCtrlOp should be added to the pipeline
|
# then EpochCtrlOp should be added to the pipeline
|
||||||
|
@ -485,6 +533,10 @@ class TestMinddataProfilingManager:
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 4
|
assert num_iter == 4
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 3 ops
|
# Confirm pipeline file and CPU util file each have 3 ops
|
||||||
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(3, cpu_util_file)
|
self.confirm_cpuutil(3, cpu_util_file)
|
||||||
|
@ -511,17 +563,30 @@ class TestMinddataProfilingManager:
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 4
|
assert num_iter == 4
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 2 ops
|
# Confirm pipeline file and CPU util file each have 2 ops
|
||||||
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(2, cpu_util_file)
|
self.confirm_cpuutil(2, cpu_util_file)
|
||||||
|
|
||||||
# Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
|
# Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
|
||||||
|
|
||||||
|
# Initialize and Start MindData profiling manager
|
||||||
|
self.md_profiler.init()
|
||||||
|
self.md_profiler.start()
|
||||||
|
|
||||||
data2 = data2.repeat(5)
|
data2 = data2.repeat(5)
|
||||||
num_iter = 0
|
num_iter = 0
|
||||||
for _ in data2.create_dict_iterator(num_epochs=1):
|
for _ in data2.create_dict_iterator(num_epochs=1):
|
||||||
num_iter += 1
|
num_iter += 1
|
||||||
assert num_iter == 20
|
assert num_iter == 20
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save('./')
|
||||||
|
|
||||||
# Confirm pipeline file and CPU util file each have 3 ops
|
# Confirm pipeline file and CPU util file each have 3 ops
|
||||||
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
|
||||||
self.confirm_cpuutil(3, cpu_util_file)
|
self.confirm_cpuutil(3, cpu_util_file)
|
||||||
|
|
|
@ -55,7 +55,6 @@ class TestMinddataProfilingAnalyzer:
|
||||||
'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
|
'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
|
||||||
'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
|
'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
|
||||||
|
|
||||||
|
|
||||||
def setup_method(self):
|
def setup_method(self):
|
||||||
"""
|
"""
|
||||||
Run before each test function.
|
Run before each test function.
|
||||||
|
@ -81,19 +80,16 @@ class TestMinddataProfilingAnalyzer:
|
||||||
os.environ['RANK_ID'] = file_id
|
os.environ['RANK_ID'] = file_id
|
||||||
os.environ['DEVICE_ID'] = file_id
|
os.environ['DEVICE_ID'] = file_id
|
||||||
|
|
||||||
# Initialize MindData profiling manager with current working directory
|
# Initialize MindData profiling manager
|
||||||
self.md_profiler.init(os.getcwd())
|
self.md_profiler.init()
|
||||||
|
|
||||||
# Start MindData Profiling
|
# Start MindData Profiling
|
||||||
self.md_profiler.start()
|
self.md_profiler.start()
|
||||||
|
|
||||||
|
|
||||||
def teardown_method(self):
|
def teardown_method(self):
|
||||||
"""
|
"""
|
||||||
Run after each test function.
|
Run after each test function.
|
||||||
"""
|
"""
|
||||||
# Stop MindData Profiling
|
|
||||||
self.md_profiler.stop()
|
|
||||||
|
|
||||||
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
|
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
|
||||||
file_id = file_name_map_rank_id[file_name]
|
file_id = file_name_map_rank_id[file_name]
|
||||||
|
@ -219,6 +215,10 @@ class TestMinddataProfilingAnalyzer:
|
||||||
# Confirm number of rows returned
|
# Confirm number of rows returned
|
||||||
assert num_iter == 1000
|
assert num_iter == 1000
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save(os.getcwd())
|
||||||
|
|
||||||
# Confirm MindData Profiling files are created
|
# Confirm MindData Profiling files are created
|
||||||
assert os.path.exists(pipeline_file) is True
|
assert os.path.exists(pipeline_file) is True
|
||||||
assert os.path.exists(cpu_util_file) is True
|
assert os.path.exists(cpu_util_file) is True
|
||||||
|
@ -279,6 +279,10 @@ class TestMinddataProfilingAnalyzer:
|
||||||
# Confirm number of rows returned
|
# Confirm number of rows returned
|
||||||
assert num_iter == 125
|
assert num_iter == 125
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save(os.getcwd())
|
||||||
|
|
||||||
# Confirm MindData Profiling files are created
|
# Confirm MindData Profiling files are created
|
||||||
assert os.path.exists(pipeline_file) is True
|
assert os.path.exists(pipeline_file) is True
|
||||||
assert os.path.exists(cpu_util_file) is True
|
assert os.path.exists(cpu_util_file) is True
|
||||||
|
@ -286,6 +290,11 @@ class TestMinddataProfilingAnalyzer:
|
||||||
|
|
||||||
# Phase 2 - For the pipeline, call create_dict_iterator with num_epochs=1
|
# Phase 2 - For the pipeline, call create_dict_iterator with num_epochs=1
|
||||||
# Note: This pipeline has 3 ops: Generator -> Map -> Batch
|
# Note: This pipeline has 3 ops: Generator -> Map -> Batch
|
||||||
|
|
||||||
|
# Initialize and Start MindData profiling manager
|
||||||
|
self.md_profiler.init()
|
||||||
|
self.md_profiler.start()
|
||||||
|
|
||||||
num_iter = 0
|
num_iter = 0
|
||||||
# Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
|
# Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
|
||||||
for _ in data1.create_dict_iterator(num_epochs=1):
|
for _ in data1.create_dict_iterator(num_epochs=1):
|
||||||
|
@ -294,6 +303,10 @@ class TestMinddataProfilingAnalyzer:
|
||||||
# Confirm number of rows returned
|
# Confirm number of rows returned
|
||||||
assert num_iter == 125
|
assert num_iter == 125
|
||||||
|
|
||||||
|
# Stop MindData Profiling and save output files to current working directory
|
||||||
|
self.md_profiler.stop()
|
||||||
|
self.md_profiler.save(os.getcwd())
|
||||||
|
|
||||||
# Confirm MindData Profiling files are created
|
# Confirm MindData Profiling files are created
|
||||||
# Note: There is an MD bug in which the pipeline file is not recreated;
|
# Note: There is an MD bug in which the pipeline file is not recreated;
|
||||||
# it still has 4 ops instead of 3 ops
|
# it still has 4 ops instead of 3 ops
|
||||||
|
|
Loading…
Reference in New Issue