add MD Profiler Save()

mohammad 2021-11-15 11:59:50 -05:00
parent 04da5c2808
commit 5c8ab5f60c
21 changed files with 280 additions and 238 deletions
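In summary: ProfilingManager::Init() no longer takes an output directory, the Monitor thread's periodic save logic is removed, and a new Save(profile_data_path) call writes all collected profiling data after Stop(). A minimal sketch of the resulting Python call sequence, mirroring the tests updated in this commit (the output directory "./profiler_out" is illustrative):

import mindspore._c_dataengine as cde

md_profiler = cde.GlobalContext.profiling_manager()
md_profiler.init()                    # no longer takes a directory path
md_profiler.start()
# ... run the dataset pipeline ...
md_profiler.stop()
md_profiler.save("./profiler_out")    # output files are named with RANK_ID/DEVICE_ID (defaults to "0")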

View File

@ -49,8 +49,6 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_)
.def("set_callback_timeout", &ConfigManager::set_callback_timeout)
.def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval)
.def("stop_dataset_profiler", &ConfigManager::stop_dataset_profiler)
.def("get_profiler_file_status", &ConfigManager::get_profiler_file_status)
.def("set_num_parallel_workers",
[](ConfigManager &c, int32_t num) { THROW_IF_ERROR(c.set_num_parallel_workers(num)); })
.def("set_op_connector_size", &ConfigManager::set_op_connector_size)

View File

@ -21,14 +21,15 @@ namespace mindspore {
namespace dataset {
PYBIND_REGISTER(ProfilingManager, 0, ([](const py::module *m) {
(void)py::class_<ProfilingManager, std::shared_ptr<ProfilingManager>>(*m, "ProfilingManager")
.def(
"init",
[](ProfilingManager &prof_mgr, const std::string &profile_data_path) {
THROW_IF_ERROR(prof_mgr.Init(profile_data_path));
},
py::arg("profile_data_path"))
.def("init", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Init()); })
.def("start", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Start()); })
.def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); });
.def("stop", [](ProfilingManager &prof_mgr) { THROW_IF_ERROR(prof_mgr.Stop()); })
.def(
"save",
[](ProfilingManager &prof_mgr, const std::string &profile_data_path) {
THROW_IF_ERROR(prof_mgr.Save(profile_data_path));
},
py::arg("profile_data_path"));
}));
} // namespace dataset
} // namespace mindspore
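Because the new save binding declares py::arg("profile_data_path"), it can also be invoked with a keyword argument from Python; a minimal sketch (path illustrative, handle obtained via cde.GlobalContext.profiling_manager()):

md_profiler.save(profile_data_path="./profiler_out")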

View File

@ -41,8 +41,6 @@ ConfigManager::ConfigManager()
seed_(kCfgDefaultSeed),
numa_enable_(false),
monitor_sampling_interval_(kCfgMonitorSamplingInterval),
stop_profiler_(false),
file_ready_(true),
callback_timout_(kCfgCallbackTimeout),
cache_host_(kCfgDefaultCacheHost),
cache_port_(kCfgDefaultCachePort),
@ -150,10 +148,6 @@ void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }
void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }
void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; }
void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; }
void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; }
void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); }

View File

@ -180,22 +180,6 @@ class ConfigManager {
// @return The interval of monitor sampling
int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }
// setter function
// @param stop_profiler - The setting to apply to the config
void stop_dataset_profiler(bool stop_profiler);
// getter function
// @return The status of stop profiler
bool stop_profiler_status() const { return stop_profiler_; }
// setter function
// @param file_ready - The setting to apply to the config
void set_profiler_file_status(bool file_ready);
// getter function
// @return The status of profiler file, whether generated
bool get_profiler_file_status() const { return file_ready_; }
// setter function
// @param auto_num_workers - whether assign threads to each op automatically
void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; }
@ -249,8 +233,6 @@ class ConfigManager {
int32_t rank_id_;
uint32_t seed_;
uint32_t monitor_sampling_interval_;
std::atomic_bool stop_profiler_;
std::atomic_bool file_ready_;
uint32_t callback_timout_;
std::string cache_host_;
int32_t cache_port_;

View File

@ -40,7 +40,7 @@ Status ConnectorSize::Sample() {
}
// JSON serializer helper function
json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size) {
json ConnectorSize::ParseOpInfo(const DatasetOp &node) {
json json_node;
json_node["op_id"] = node.id();
json_node["op_type"] = node.Name();
@ -49,7 +49,7 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
// DeviceQueueOp is a special op: it is not inlined, but its output queue is invalid.
// So we should not output its queue size.
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
metrics["output_queue"] = {{"size", size}, {"length", node.ConnectorCapacity()}};
metrics["output_queue"] = {{"length", node.ConnectorCapacity()}};
}
json_node["metrics"] = metrics;
@ -65,44 +65,42 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
}
// Save profiling data to file
// If the file already exists (created by another sampling node), simply add the data to the metrics field.
Status ConnectorSize::SaveToFile() {
json output;
RETURN_IF_NOT_OK(ReadJson(&output));
Status ConnectorSize::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
Path path = GetFileName(dir_path, rank_id);
// Remove the file if it exists (from prior profiling usage)
RETURN_IF_NOT_OK(path.Remove());
std::string file_path = path.ToString();
Path path = Path(file_path_);
uint32_t idx = 0;
// Traverse the ExecutionTree for JSON node generation
for (auto &node : *tree_) {
json output = initial_nodes_data;
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
// Traverse the JSON initialized in Init() to access each op's information
CHECK_FAIL_RETURN_UNEXPECTED(output.contains("op_info"), "JSON data does not include op_info!");
for (uint32_t idx = 0; idx < output["op_info"].size(); idx++) {
std::vector<int32_t> cur_queue_size;
(void)std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
if (!path.Exists()) {
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
} else {
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
auto &ops_data = output["op_info"];
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity();
}
}
idx++;
auto &ops_data = output["op_info"];
if (ops_data[idx]["metrics"].contains("output_queue") && ops_data[idx]["op_type"] != "DeviceQueueOp") {
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
}
}
// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
std::ofstream os(file_path, std::ios::trunc);
os << output;
os.close();
return Status::OK();
}
Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).ToString();
Path path = Path(file_path_);
// Remove the file if it exists (from prior profiling usage)
RETURN_IF_NOT_OK(path.Remove());
Status ConnectorSize::Init() {
// Traverse the ExecutionTree for JSON node generation
for (auto &node : *tree_) {
json json_node = ParseOpInfo(node);
initial_nodes_data["op_info"].push_back(json_node);
}
return Status::OK();
}
@ -139,5 +137,9 @@ Status ConnectorSize::GetOpConnectorSize(int32_t op_id, uint64_t start_time, uin
return Status::OK();
}
Path ConnectorSize::GetFileName(const std::string &dir_path, const std::string &rank_id) {
return Path(dir_path) / Path("pipeline_profiling_" + rank_id + ".json");
}
} // namespace dataset
} // namespace mindspore

View File

@ -55,15 +55,15 @@ class ConnectorSize : public Sampling {
// Save sampling data to file
// @return Status The status code returned
Status SaveToFile() override;
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
Status Init(const std::string &dir_path, const std::string &device_id) override;
Status Init() override;
// Parse op information and transform to json format
json ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size);
json ParseOpInfo(const DatasetOp &node);
// Change file mode after saving throughput data
Status ChangeFileMode() override { return Status::OK(); }
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override { return Status::OK(); }
Status Analyze() override;
@ -71,9 +71,11 @@ class ConnectorSize : public Sampling {
Status GetOpConnectorSize(int32_t op_id, uint64_t start_time, uint64_t end_time, std::vector<int32_t> *result);
private:
json initial_nodes_data; // stores per-op information while the execution tree is running (all op info except the sampled data)
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling
Timestamps ts_; // time of sample
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
};
} // namespace dataset

View File

@ -386,14 +386,10 @@ Status CpuSampler::UpdateTaskList() {
return Status::OK();
}
Status CpuSampler::Init(const std::string &dir_path, const std::string &device_id) {
Status CpuSampler::Init() {
#if defined(USING_LINUX)
main_pid_ = syscall(SYS_getpid);
#endif
auto path = Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json");
// remove file if it already exists
RETURN_IF_NOT_OK(path.Remove());
file_path_ = path.ToString();
for (auto iter = tree->begin(); iter != tree->end(); iter++) {
auto op_id = iter->id();
(void)op_info_by_id_.emplace(std::make_pair(op_id, MDOperatorCpuInfo(op_id)));
@ -406,15 +402,22 @@ Status CpuSampler::Init(const std::string &dir_path, const std::string &device_i
return Status::OK();
}
Status CpuSampler::ChangeFileMode() {
if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
std::string err_str = "Change file mode failed," + file_path_;
Status CpuSampler::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
Path path = GetFileName(dir_path, rank_id);
std::string file_path = path.ToString();
if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
std::string err_str = "Change file mode failed," + file_path;
return Status(StatusCode::kMDUnexpectedError, err_str);
}
return Status::OK();
}
Status CpuSampler::SaveToFile() {
Status CpuSampler::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
Path path = GetFileName(dir_path, rank_id);
// Remove the file if it exists (from prior profiling usage)
RETURN_IF_NOT_OK(path.Remove());
std::string file_path = path.ToString();
// construct json obj to write to file
json output;
output["cpu_processor_num"] = SystemCpuInfo::num_cpu_;
@ -448,7 +451,7 @@ Status CpuSampler::SaveToFile() {
output["time_stamp"] = ts_;
// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
std::ofstream os(file_path, std::ios::trunc);
os << output;
os.close();
@ -508,5 +511,9 @@ Status CpuSampler::GetSystemSysCpuUtil(uint64_t start_ts, uint64_t end_ts, std::
auto end_index = std::distance(ts_.begin(), upper);
return sys_cpu_info_.GetSysCpuUtil(start_index, end_index, result);
}
Path CpuSampler::GetFileName(const std::string &dir_path, const std::string &rank_id) {
return Path(dir_path) / Path("minddata_cpu_utilization_" + rank_id + ".json");
}
} // namespace dataset
} // namespace mindspore

View File

@ -142,9 +142,9 @@ class CpuSampler : public Sampling {
explicit CpuSampler(ExecutionTree *tree) : fetched_all_python_multiprocesses_(false), tree(tree) {}
~CpuSampler() = default;
Status Sample() override;
Status Init(const std::string &dir_path, const std::string &device_id) override;
Status ChangeFileMode() override;
Status SaveToFile() override;
Status Init() override;
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override;
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
std::string Name() const override { return kCpuSamplerName; }
Status Analyze() override;
Status GetSystemUserCpuUtil(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result);
@ -163,6 +163,7 @@ class CpuSampler : public Sampling {
std::shared_ptr<ThreadCpuInfo> main_thread_cpu_info_;
std::shared_ptr<ProcessCpuInfo> main_process_cpu_info_;
std::unordered_map<int32_t, MDOperatorCpuInfo> op_info_by_id_;
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
};
} // namespace dataset
} // namespace mindspore

View File

@ -29,8 +29,7 @@ namespace dataset {
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 0;
Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).ToString();
Status DatasetIteratorTracing::Init() {
(void)ts_.emplace_back(0);
return Status::OK();
}
@ -80,5 +79,8 @@ Status DatasetIteratorTracing::GetConnectorCapacity(int32_t start_step, int32_t
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
}
Path DatasetIteratorTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) {
return Path(dir_path) / Path("dataset_iterator_profiling_" + rank_id + ".txt");
}
} // namespace dataset
} // namespace mindspore

View File

@ -34,7 +34,7 @@ class DatasetIteratorTracing : public Tracing {
std::string Name() const override { return kDatasetIteratorTracingName; };
Status Init(const std::string &dir_path, const std::string &device_id) override;
Status Init() override;
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
@ -42,6 +42,9 @@ class DatasetIteratorTracing : public Tracing {
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
private:
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
};
} // namespace dataset
} // namespace mindspore

View File

@ -33,8 +33,7 @@ constexpr int32_t BATCH_TIME_OFFSET = 1;
constexpr int32_t PIPELINE_TIME_OFFSET = 2;
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3;
Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("device_queue_profiling_" + device_id + ".txt")).ToString();
Status DeviceQueueTracing::Init() {
(void)ts_.emplace_back(0);
return Status::OK();
}
@ -82,5 +81,9 @@ Status DeviceQueueTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t en
Status DeviceQueueTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
}
Path DeviceQueueTracing::GetFileName(const std::string &dir_path, const std::string &rank_id) {
return Path(dir_path) / Path("device_queue_profiling_" + rank_id + ".txt");
}
} // namespace dataset
} // namespace mindspore

View File

@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/util/path.h"
namespace mindspore {
namespace dataset {
@ -34,7 +35,7 @@ class DeviceQueueTracing : public Tracing {
std::string Name() const override { return kDeviceQueueTracingName; };
Status Init(const std::string &dir_path, const std::string &device_id) override;
Status Init() override;
Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
@ -42,6 +43,9 @@ class DeviceQueueTracing : public Tracing {
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
private:
Path GetFileName(const std::string &dir_path, const std::string &rank_id) override;
};
} // namespace dataset
} // namespace mindspore

View File

@ -29,39 +29,23 @@ Monitor::Monitor(ProfilingManager *profiling_manager) : profiling_manager_(profi
Status Monitor::operator()() {
// Register this thread with TaskManager to receive proper interrupt signal.
TaskManager::FindMe()->Post();
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
cfg->set_profiler_file_status(false);
std::unique_lock<std::mutex> _lock(mux_);
// Keep sampling if
// 1) Monitor Task is not interrupted by TaskManager AND
// 2) Iterator has not received EOF
// this will trigger a save on 2min, 4min, 8min, 16min ... mark on top of the save per_epoch
// The idea is whenever training is interrupted, you will get at least half of the sampling data during training
constexpr int64_t num_ms_in_two_minutes = 120000;
int64_t save_interval = 1 + (num_ms_in_two_minutes / sampling_interval_);
int64_t loop_cnt = 1;
constexpr int64_t geometric_series_ratio = 2;
while (!this_thread::is_interrupted() && !(tree_->isFinished()) && !(cfg->stop_profiler_status())) {
while (!this_thread::is_interrupted() && !(tree_->isFinished())) {
if (tree_->IsEpochEnd()) {
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData());
tree_->SetExecuting();
} else if (loop_cnt % save_interval == 0) {
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData());
}
for (auto &node : profiling_manager_->GetSamplingNodes()) {
RETURN_IF_NOT_OK(node.second->Sample());
}
if (loop_cnt % save_interval == 0) save_interval *= geometric_series_ratio;
loop_cnt += 1;
std::this_thread::sleep_for(std::chrono::milliseconds(sampling_interval_));
RETURN_IF_NOT_OK(cv_.WaitFor(&_lock, sampling_interval_));
}
MS_LOG(INFO) << "Monitor Thread terminating...";
// Output all profiling data upon request.
RETURN_IF_NOT_OK(profiling_manager_->Analyze(true));
RETURN_IF_NOT_OK(profiling_manager_->SaveProfilingData(true));
RETURN_IF_NOT_OK(profiling_manager_->ChangeFileMode(true));
cfg->set_profiler_file_status(true);
return Status::OK();
}

View File

@ -18,9 +18,11 @@
#define MINDSPORE_MONITOR_H
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/util/cond_var.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
@ -46,6 +48,8 @@ class Monitor {
ProfilingManager *profiling_manager_;
int64_t sampling_interval_;
ExecutionTree *tree_;
std::mutex mux_;
CondVar cv_;
};
} // namespace dataset
} // namespace mindspore

View File

@ -48,12 +48,18 @@ Status Profiling::Stop() {
return Status::OK();
}
Status Tracing::SaveToFile() {
Status Tracing::SaveToFile(const std::string &dir_path, const std::string &rank_id) {
if (value_.empty()) {
return Status::OK();
}
std::ofstream handle(file_path_, std::ios::trunc);
Path path = GetFileName(dir_path, rank_id);
// Remove the file if it exists (from prior profiling usage)
RETURN_IF_NOT_OK(path.Remove());
std::string file_path = path.ToString();
MS_LOG(INFO) << "Start to save profiling data for a tracing node.";
std::ofstream handle(file_path, std::ios::trunc);
if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
}
@ -65,13 +71,15 @@ Status Tracing::SaveToFile() {
return Status::OK();
}
Status Tracing::ChangeFileMode() {
Status Tracing::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
if (value_.empty()) {
return Status::OK();
}
if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) {
std::string err_str = "Change file mode failed," + file_path_;
Path path = GetFileName(dir_path, rank_id);
std::string file_path = path.ToString();
if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
std::string err_str = "Change file mode failed," + file_path;
return Status(StatusCode::kMDUnexpectedError, err_str);
}
return Status::OK();
@ -172,24 +180,6 @@ Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, i
Tracing::Tracing(int32_t records_per_step) : records_per_step_(records_per_step) {}
Status Sampling::ReadJson(nlohmann::json *output) {
RETURN_UNEXPECTED_IF_NULL(output);
Path path = Path(file_path_);
if (path.Exists()) {
MS_LOG(DEBUG) << file_path_ << " exists";
try {
std::ifstream file(file_path_);
file >> (*output);
} catch (const std::exception &err) {
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + file_path_ +
", please delete it and try again!");
}
} else {
(*output)["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}
return Status::OK();
}
// Constructor
ProfilingManager::ProfilingManager() : profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false) {}
@ -202,24 +192,7 @@ Status ProfilingManager::RegisterTree(TreeAdapter *tree_adapter) {
perf_monitor_ = std::make_unique<Monitor>(this);
#ifdef ENABLE_GPUQUE
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
int32_t rank_id = cfg->rank_id();
// If DEVICE_ID is not set, default value is 0
if (rank_id < 0) {
device_id_ = common::GetEnv("DEVICE_ID");
} else {
device_id_ = std::to_string(rank_id);
}
#else
device_id_ = common::GetEnv("RANK_ID");
#endif
// If RANK_ID is not set, default value is 0
if (device_id_.empty()) {
device_id_ = "0";
}
// Register all profiling node.
std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
@ -249,7 +222,7 @@ Status ProfilingManager::RegisterTracingNode(std::shared_ptr<Tracing> node) {
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
}
// Register the node with its name as key.
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
RETURN_IF_NOT_OK(node->Init());
tracing_nodes_[node->Name()] = node;
// the user may have already started profiling.
@ -279,7 +252,7 @@ Status ProfilingManager::RegisterSamplingNode(std::shared_ptr<Sampling> node) {
return Status(StatusCode::kMDProfilingError, "Profiling node already exist: " + node->Name());
}
// Register the node with its name as key.
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
RETURN_IF_NOT_OK(node->Init());
sampling_nodes_[node->Name()] = node;
// the user may have already started profiling.
@ -301,25 +274,19 @@ Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_pt
return Status::OK();
}
Status ProfilingManager::SaveProfilingData(const bool final_round) {
if ((!IsProfilingEnable()) && !final_round) {
return Status::OK();
}
Status ProfilingManager::SaveProfilingData(const std::string &dir_path, const std::string &rank_id) {
MS_LOG(INFO) << "Start to save profiling data.";
for (auto node : tracing_nodes_) {
RETURN_IF_NOT_OK(node.second->SaveToFile());
RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
}
for (auto node : sampling_nodes_) {
RETURN_IF_NOT_OK(node.second->SaveToFile());
RETURN_IF_NOT_OK(node.second->SaveToFile(dir_path, rank_id));
}
MS_LOG(INFO) << "Save profiling data end.";
return Status::OK();
}
Status ProfilingManager::Analyze(const bool final_round) {
if (!IsProfilingEnable() && !final_round) {
return Status::OK();
}
Status ProfilingManager::Analyze() {
MS_LOG(INFO) << "Start to analyze profiling data.";
for (auto node : sampling_nodes_) {
RETURN_IF_NOT_OK(node.second->Analyze());
@ -327,16 +294,13 @@ Status ProfilingManager::Analyze(const bool final_round) {
return Status::OK();
}
Status ProfilingManager::ChangeFileMode(const bool final_round) {
if (!IsProfilingEnable() && !final_round) {
return Status::OK();
}
Status ProfilingManager::ChangeFileMode(const std::string &dir_path, const std::string &rank_id) {
MS_LOG(INFO) << "Start to change file mode.";
for (auto node : tracing_nodes_) {
RETURN_IF_NOT_OK(node.second->ChangeFileMode());
RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
}
for (auto node : sampling_nodes_) {
RETURN_IF_NOT_OK(node.second->ChangeFileMode());
RETURN_IF_NOT_OK(node.second->ChangeFileMode(dir_path, rank_id));
}
MS_LOG(INFO) << "Change file mode end.";
return Status::OK();
@ -642,23 +606,7 @@ Status ProfilingManager::Reset() {
return Status::OK();
}
Status ProfilingManager::Init(const std::string &profile_data_path = "") {
// Validate input profile data path
CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");
char real_path[PATH_MAX] = {0};
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#else
if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
}
#endif
dir_path_ = real_path;
Status ProfilingManager::Init() {
if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
profiling_state_ = ProfilingState::kProfilingStateUnBegun;
}
@ -666,7 +614,7 @@ Status ProfilingManager::Init(const std::string &profile_data_path = "") {
// Enable profiling
enabled_ = true;
MS_LOG(INFO) << "MD profiler is initialized with directory path: " << dir_path_;
MS_LOG(INFO) << "MD profiler is initialized successfully.";
return Status::OK();
}
@ -708,6 +656,48 @@ Status ProfilingManager::Stop() {
return Status::OK();
}
Status ProfilingManager::Save(const std::string &profile_data_path) {
// Validate input profile data path
CHECK_FAIL_RETURN_UNEXPECTED(!profile_data_path.empty(), "Invalid parameter, Profiling directory is not set.");
CHECK_FAIL_RETURN_UNEXPECTED(profile_data_path.size() < PATH_MAX, "Invalid file, Profiling directory is invalid.");
// profiling file: <profile_data_path>/<filename>_<rank_id>.<suffix>
char real_path[PATH_MAX] = {0};
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(real_path, common::SafeCStr(profile_data_path), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#else
if (realpath(common::SafeCStr(profile_data_path), real_path) == nullptr) {
RETURN_STATUS_UNEXPECTED("Invalid file, can not get realpath of Profiling directory.");
}
#endif
std::string rank_id;
#ifdef ENABLE_GPUQUE
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
int32_t rank_id_int = cfg->rank_id();
// If DEVICE_ID is not set, default value is 0
if (rank_id_int < 0) {
rank_id = common::GetEnv("DEVICE_ID");
} else {
rank_id = std::to_string(rank_id_int);
}
#else
rank_id = common::GetEnv("RANK_ID");
#endif
// If RANK_ID is not set, default value is 0
if (rank_id.empty()) {
rank_id = "0";
}
// Output all profiling data upon request.
RETURN_IF_NOT_OK(Analyze());
RETURN_IF_NOT_OK(SaveProfilingData(std::string(profile_data_path), rank_id));
RETURN_IF_NOT_OK(ChangeFileMode(std::string(profile_data_path), rank_id));
return Status::OK();
}
uint64_t ProfilingTime::GetCurMilliSecond() {
// because cpplint does not allow using namespace
using std::chrono::duration_cast;

View File

@ -16,13 +16,15 @@
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_PROFILE_H_
#include <string>
#include <vector>
#include <unordered_map>
#include <memory>
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/perf/monitor.h"
@ -50,15 +52,15 @@ class Profiling : std::enable_shared_from_this<Profiling> {
// Destructor
virtual ~Profiling() = default;
virtual Status Init(const std::string &dir_path, const std::string &device_id) = 0;
virtual Status Init() = 0;
// Default serialization file generator
virtual Status SaveToFile() = 0;
virtual Status SaveToFile(const std::string &dir_path, const std::string &rank_id) = 0;
// Profiling name
virtual std::string Name() const = 0;
virtual Status ChangeFileMode() = 0;
virtual Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) = 0;
// Start collecting data
Status Start();
@ -68,8 +70,8 @@ class Profiling : std::enable_shared_from_this<Profiling> {
protected:
bool active_; // shows the current state of ProfilingManager (running or paused)
std::string file_path_;
std::mutex lock_;
virtual Path GetFileName(const std::string &dir_path, const std::string &rank_id) = 0;
};
// Sampling is a class of profiling which generate samples periodically.
@ -80,7 +82,6 @@ class Sampling : public Profiling {
// virtual Status TestPrint() = 0;
virtual Status Analyze() = 0;
virtual ~Sampling() = default;
Status ReadJson(nlohmann::json *output);
};
typedef struct TracingRecord_s {
@ -101,8 +102,8 @@ class Tracing : public Profiling {
public:
// Tracing has a minimal interface to provide flexibility in data recording.
// It only includes some common routines.
Status SaveToFile() override;
Status ChangeFileMode() override;
Status SaveToFile(const std::string &dir_path, const std::string &rank_id) override;
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id) override;
virtual Status GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
@ -149,9 +150,9 @@ class ProfilingManager {
Status Reset();
// Save profile data to file
// @param final_round The boolean to show if the monitor thread is terminating.
// @param dir_path The path to the directory where the profiling data will be saved.
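// @param rank_id The rank id used to compose the profiling output file names.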
// @return Status The status code returned
Status SaveProfilingData(const bool final_round = false);
Status SaveProfilingData(const std::string &dir_path, const std::string &rank_id);
// Sampling node getter
// @param name - The name of the requested node
@ -177,14 +178,12 @@ class ProfilingManager {
// Launch monitoring thread.
Status LaunchMonitor();
// @param final_round The boolean to show if the monitor thread is terminating.
// @return Status The status code returned
Status ChangeFileMode(const bool final_round = false);
Status ChangeFileMode(const std::string &dir_path, const std::string &rank_id);
// Analyze profile data and print warning messages
// @param final_round The boolean to show if the monitor thread is terminating.
// @return Status The status code returned
Status Analyze(const bool final_round = false);
Status Analyze();
#ifndef ENABLE_ANDROID
/// \brief API to get User CPU utilization for the system
@ -424,7 +423,7 @@ class ProfilingManager {
/// \brief API to initialize profiling manager
/// \return Status object with the error code
Status Init(const std::string &profile_data_path);
Status Init();
/// \brief API to signal the profiling nodes to start collecting data
/// \return Status object with the error code
@ -434,6 +433,10 @@ class ProfilingManager {
/// \return Status object with the error code
Status Stop();
/// \brief API to save to file all the collected data between Start and Stop calls
/// \return Status object with the error code
Status Save(const std::string &profile_data_path);
private:
std::unique_ptr<Monitor> perf_monitor_;
@ -444,12 +447,10 @@ class ProfilingManager {
kProfilingStateFinished,
};
ProfilingState profiling_state_; // shows the current state of ProfilingManager (running or paused)
bool enabled_;
std::atomic<bool> enabled_;
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;
std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;
ExecutionTree *tree_; // ExecutionTree pointer
std::string dir_path_; // where to create profiling file
std::string device_id_; // used when create profiling file,filename_device_id.suffix
std::vector<uint64_t> epoch_end_ts_; // End of epoch timestamp
std::vector<uint32_t> epoch_end_step_; // End of epoch step number

View File

@ -25,7 +25,6 @@ Common imported modules in corresponding API examples are as follows:
import os
import platform
import random
import time
import numpy
import mindspore._c_dataengine as cde
from mindspore import log as logger
@ -411,20 +410,6 @@ def load(file):
_config.load(file)
def _stop_dataset_profiler():
"""
Mainly for stop dataset profiler.
Returns:
bool, whether the profiler file has generated.
"""
while not _config.get_profiler_file_status():
_config.stop_dataset_profiler(True)
logger.warning("Profiling: waiting for dataset part profiling stop.")
time.sleep(1)
def get_enable_shared_mem():
"""
Get the default state of shared mem enabled variable.

View File

@ -23,7 +23,6 @@ from mindspore import log as logger, context
from mindspore.communication.management import GlobalComm, release, get_rank, get_group_size
import mindspore._c_expression as c_expression
import mindspore._c_dataengine as cde
from mindspore.dataset.core.config import _stop_dataset_profiler
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException, ProfilerException, ProfilerRawFileException
from mindspore.profiler.common.util import get_file_path, fwrite_format
@ -144,7 +143,7 @@ class Profiler:
# Setup and start MindData Profiling
self._md_profiler = cde.GlobalContext.profiling_manager()
self._md_profiler.init(self._output_path)
self._md_profiler.init()
self._md_profiler.start()
if self._device_target:
@ -253,7 +252,7 @@ class Profiler:
_environment_check()
self._cpu_profiler.stop()
self._md_profiler.stop()
_stop_dataset_profiler()
self._md_profiler.save(self._output_path)
if self._device_target and self._device_target == "GPU":
self._gpu_analyse()

View File

@ -50,7 +50,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) {
// Enable profiler and check
common::SetEnv("RANK_ID", "1");
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
EXPECT_OK(profiler_manager->Init("."));
EXPECT_OK(profiler_manager->Init());
EXPECT_OK(profiler_manager->Start());
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
@ -95,12 +95,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager1) {
// Manually terminate the pipeline
iter->Stop();
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(1));
// Disable profiler
// Stop MindData Profiling and save output files to current working directory
EXPECT_OK(profiler_manager->Stop());
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
EXPECT_OK(profiler_manager->Save("."));
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(1));
}
/// Feature: MindData Profiling Support
@ -112,7 +113,7 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) {
// Enable profiler and check
common::SetEnv("RANK_ID", "2");
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
EXPECT_OK(profiler_manager->Init("."));
EXPECT_OK(profiler_manager->Init());
EXPECT_OK(profiler_manager->Start());
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
@ -147,12 +148,13 @@ TEST_F(MindDataTestProfiler, TestProfilerManager2) {
// Manually terminate the pipeline
iter->Stop();
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(2));
// Disable profiler
// Stop MindData Profiling and save output files to current working directory
EXPECT_OK(profiler_manager->Stop());
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
EXPECT_OK(profiler_manager->Save("."));
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(2));
}
} // namespace test
} // namespace dataset

View File

@ -77,8 +77,8 @@ class TestMinddataProfilingManager:
os.environ['RANK_ID'] = file_id
os.environ['DEVICE_ID'] = file_id
# Initialize MindData profiling manager with current working directory
self.md_profiler.init("./")
# Initialize MindData profiling manager
self.md_profiler.init()
# Start MindData Profiling
self.md_profiler.start()
@ -87,8 +87,6 @@ class TestMinddataProfilingManager:
"""
Run after each test function.
"""
# Stop MindData Profiling
self.md_profiler.stop()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
@ -155,6 +153,10 @@ class TestMinddataProfilingManager:
for _ in data1:
pass
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm profiling files now exist
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
@ -186,6 +188,10 @@ class TestMinddataProfilingManager:
for _ in data3:
pass
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
@ -238,6 +244,10 @@ class TestMinddataProfilingManager:
assert num_iter == 10
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline is created with EpochCtrl op
with open(pipeline_file) as f:
data = json.load(f)
@ -280,6 +290,10 @@ class TestMinddataProfilingManager:
for _ in data1:
pass
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
@ -316,6 +330,10 @@ class TestMinddataProfilingManager:
ds.config.set_monitor_sampling_interval(interval_origin)
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
def test_profiling_basic_pipeline(self):
"""
Test with this basic pipeline
@ -347,6 +365,10 @@ class TestMinddataProfilingManager:
assert num_iter == 1000
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
@ -394,6 +416,10 @@ class TestMinddataProfilingManager:
assert num_iter == 750
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
@ -434,11 +460,20 @@ class TestMinddataProfilingManager:
num_iter += 1
assert num_iter == 2
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
self.confirm_cpuutil(3, cpu_util_file)
# Test B - Call create_dict_iterator with num_epochs=1
# Initialize and Start MindData profiling manager
self.md_profiler.init()
self.md_profiler.start()
num_iter = 0
# Note: If create_dict_iterator() is called with num_epochs=1,
# then EpochCtrlOp should NOT be added to the pipeline
@ -446,6 +481,10 @@ class TestMinddataProfilingManager:
num_iter += 1
assert num_iter == 2
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
self.confirm_cpuutil(2, cpu_util_file)
@ -473,11 +512,20 @@ class TestMinddataProfilingManager:
num_iter += 1
assert num_iter == 4
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
self.confirm_cpuutil(2, cpu_util_file)
# Test B - Call create_dict_iterator with num_epochs>1
# Initialize and Start MindData profiling manager
self.md_profiler.init()
self.md_profiler.start()
num_iter = 0
# Note: If create_dict_iterator() is called with num_epochs>1,
# then EpochCtrlOp should be added to the pipeline
@ -485,6 +533,10 @@ class TestMinddataProfilingManager:
num_iter += 1
assert num_iter == 4
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
self.confirm_cpuutil(3, cpu_util_file)
@ -511,17 +563,30 @@ class TestMinddataProfilingManager:
num_iter += 1
assert num_iter == 4
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 2 ops
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
self.confirm_cpuutil(2, cpu_util_file)
# Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
# Initialize and Start MindData profiling manager
self.md_profiler.init()
self.md_profiler.start()
data2 = data2.repeat(5)
num_iter = 0
for _ in data2.create_dict_iterator(num_epochs=1):
num_iter += 1
assert num_iter == 20
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save('./')
# Confirm pipeline file and CPU util file each have 3 ops
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
self.confirm_cpuutil(3, cpu_util_file)

View File

@ -55,7 +55,6 @@ class TestMinddataProfilingAnalyzer:
'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
def setup_method(self):
"""
Run before each test function.
@ -81,19 +80,16 @@ class TestMinddataProfilingAnalyzer:
os.environ['RANK_ID'] = file_id
os.environ['DEVICE_ID'] = file_id
# Initialize MindData profiling manager with current working directory
self.md_profiler.init(os.getcwd())
# Initialize MindData profiling manager
self.md_profiler.init()
# Start MindData Profiling
self.md_profiler.start()
def teardown_method(self):
"""
Run after each test function.
"""
# Stop MindData Profiling
self.md_profiler.stop()
file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_id = file_name_map_rank_id[file_name]
@ -219,6 +215,10 @@ class TestMinddataProfilingAnalyzer:
# Confirm number of rows returned
assert num_iter == 1000
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save(os.getcwd())
# Confirm MindData Profiling files are created
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
@ -279,6 +279,10 @@ class TestMinddataProfilingAnalyzer:
# Confirm number of rows returned
assert num_iter == 125
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save(os.getcwd())
# Confirm MindData Profiling files are created
assert os.path.exists(pipeline_file) is True
assert os.path.exists(cpu_util_file) is True
@ -286,6 +290,11 @@ class TestMinddataProfilingAnalyzer:
# Phase 2 - For the pipeline, call create_tuple_iterator with num_epochs=1
# Note: This pipeline has 3 ops: Generator -> Map -> Batch
# Initialize and Start MindData profiling manager
self.md_profiler.init()
self.md_profiler.start()
num_iter = 0
# Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
for _ in data1.create_dict_iterator(num_epochs=1):
@ -294,6 +303,10 @@ class TestMinddataProfilingAnalyzer:
# Confirm number of rows returned
assert num_iter == 125
# Stop MindData Profiling and save output files to current working directory
self.md_profiler.stop()
self.md_profiler.save(os.getcwd())
# Confirm MindData Profiling files are created
# Note: There is an MD bug in which the pipeline file is not recreated;
# it still has 4 ops instead of 3 ops