diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc index c977016cf22..28043b28b4d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc @@ -156,7 +156,8 @@ Status MapOp::operator()() { } // The operator class just starts off threads by calling the tree_ function - rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID()); + rc = + tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID(), id()); // Synchronize with TaskManager TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(rc); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc index 63139b7e36a..122d6119b91 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc @@ -109,5 +109,7 @@ Status ConnectorSize::Init(const std::string &dir_path, const std::string &devic file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString(); return Status::OK(); } + +Status ConnectorSize::Analyze() { return Status::OK(); } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h index 09cb1e8a308..376bbc4c640 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.h @@ -31,7 +31,7 @@ class ExecutionTree; // Connector size sampling samples the output connector size of each op in the pipeline. // It support JSON serialization for external usage. class ConnectorSize : public Sampling { - // Connecto size sampling data is stored as a 2D vector + // Connector size sampling data is stored as a 2D vector // op_0 ... op_m // sample_0 size_0_0 ... size_m_0 // ... ... ... ... @@ -58,12 +58,14 @@ class ConnectorSize : public Sampling { Status Init(const std::string &dir_path, const std::string &device_id) override; - // Parse op infomation and transform to json format + // Parse op information and transform to json format json ParseOpInfo(const DatasetOp &node, const std::vector &size); // Change file mode after save throughput data Status ChangeFileMode() { return Status::OK(); } + Status Analyze() override; + private: ExecutionTree *tree_ = nullptr; // ExecutionTree pointer ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc index 5cc15750384..c18d2323ee7 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc @@ -150,5 +150,7 @@ Status ConnectorThroughput::ChangeFileMode() { } return Status::OK(); } + +Status ConnectorThroughput::Analyze() { return Status::OK(); } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h index 2a7c2be25ad..d8cde89b331 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h @@ -74,6 +74,8 @@ class ConnectorThroughput : public Sampling { Status ChangeFileMode() override; + Status Analyze() override; + private: ExecutionTree *tree_ = nullptr; // ExecutionTree pointer int64_t max_rows_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc index d2d788cd9e4..de73df5e455 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc @@ -135,6 +135,27 @@ Status DeviceCpu::Collect(ExecutionTree *tree) { first_collect_ = false; return Status::OK(); } +Status DeviceCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { + *name = std::string("device_info"); + int total_samples = cpu_util_.size(); + int sum = 0; + // Only analyze the middle half of the samples + // Starting and ending may be impacted by startup or ending pipeline activities + int start_analyze = total_samples / 4; + int end_analyze = total_samples - start_analyze; + + for (int i = start_analyze; i < end_analyze; i++) { + sum += cpu_util_[i].user_utilization_; + sum += cpu_util_[i].sys_utilization_; + } + + // Note device utilization is already in range of 0-1, so don't + // need to divide by number of CPUS + if ((end_analyze - start_analyze) > 0) { + *utilization = sum / (end_analyze - start_analyze); + } + return Status::OK(); +} Status DeviceCpu::SaveToFile(const std::string &file_path) { Path path = Path(file_path); @@ -236,6 +257,8 @@ Status OperatorCpu::Collect(ExecutionTree *tree) { if (first_collect_) { for (auto iter = tree->begin(); iter != tree->end(); ++iter) { id_count++; + op_name[iter->id()] = iter->NameWithID(); + op_parallel_workers[iter->id()] = iter->num_workers(); } #if defined(USING_LINUX) cpu_processor_num = get_nprocs_conf(); @@ -327,6 +350,37 @@ Status OperatorCpu::Collect(ExecutionTree *tree) { return Status::OK(); } +Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { + int total_samples = cpu_op_util_.size(); + + // Only analyze the middle half of the samples + // Starting and ending may be impacted by startup or ending pipeline activities + int start_analyze = total_samples / 4; + int end_analyze = total_samples - start_analyze; + double op_util; + *utilization = 0; + + // start loop from 0 was as don't want to analyze op -1 + for (auto op_id = 0; op_id < id_count; op_id++) { + int sum = 0; + int index = op_id + 1; + for (int i = start_analyze; i < end_analyze; i++) { + sum += cpu_op_util_[i][index].user_utilization_; + sum += cpu_op_util_[i][index].sys_utilization_; + } + if ((end_analyze - start_analyze) > 0) { + op_util = 1.0 * sum * cpu_processor_num / (op_parallel_workers[op_id] * (end_analyze - start_analyze)); + } + if (op_util > *utilization) { + *utilization = op_util; + *name = op_name[op_id]; + } + extra_message->append(op_name[op_id] + " utiliization per thread: " + std::to_string(op_util) + "% (" + + std::to_string(op_parallel_workers[op_id]) + " parallel_workers); "); + } + return Status::OK(); +} + Status OperatorCpu::SaveToFile(const std::string &file_path) { Path path = Path(file_path); json output; @@ -453,6 +507,26 @@ Status ProcessCpu::Collect(ExecutionTree *tree) { return Status::OK(); } +Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { + *name = std::string("process_info"); + int total_samples = process_util_.size(); + int sum = 0; + // Only analyze the middle half of the samples + // Starting and ending may be impacted by startup or ending pipeline activities + int start_analyze = total_samples / 4; + int end_analyze = total_samples - start_analyze; + + for (int i = start_analyze; i < end_analyze; i++) { + sum += process_util_[i].user_utilization_; + sum += process_util_[i].sys_utilization_; + } + + if ((end_analyze - start_analyze) > 0) { + *utilization = sum / (end_analyze - start_analyze); + } + return Status::OK(); +} + Status ProcessCpu::SaveToFile(const std::string &file_path) { Path path = Path(file_path); json output; @@ -529,6 +603,37 @@ Status CpuSampling::SaveSamplingItervalToFile() { return Status::OK(); } +// Analyze profiling data and output warning messages +Status CpuSampling::Analyze() { + std::string name; + double utilization = 0; + + // Keep track of specific information returned by differentn CPU sampling types + double total_utilization = 0; + double max_op_utilization = 0; + std::string max_op_name; + std::string detailed_op_cpu_message; + + // Save cpu information to json file + for (auto cpu : cpu_) { + std::string extra_message; + RETURN_IF_NOT_OK(cpu->Analyze(&name, &utilization, &extra_message)); + if (name == "device_info") { + total_utilization = utilization; + } else if (name != "process_info") { + max_op_utilization = utilization; + max_op_name = name; + detailed_op_cpu_message = extra_message; + } + } + if ((total_utilization < 90) && (max_op_utilization > 80)) { + MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread. " + << "This operator may benefit from increasing num_parallel_workers." + << "Full Operator CPU utiliization for all operators: " << detailed_op_cpu_message << std::endl; + } + return Status::OK(); +} + // Save profiling data to file Status CpuSampling::SaveToFile() { // Save time stamp to json file diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h index a987f0d898f..6cfef40d3a3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h @@ -71,6 +71,7 @@ class BaseCpu { // Collect CPU information virtual Status Collect(ExecutionTree *tree) = 0; virtual Status SaveToFile(const std::string &file_path) = 0; + virtual Status Analyze(std::string *name, double *utilization, std::string *extra_message) = 0; protected: std::vector cpu_util_; @@ -90,6 +91,7 @@ class DeviceCpu : public BaseCpu { ~DeviceCpu() = default; Status Collect(ExecutionTree *tree) override; Status SaveToFile(const std::string &file_path) override; + Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; private: // Get CPU information, include use/sys/idle/io utilization @@ -115,6 +117,11 @@ class OperatorCpu : public BaseCpu { ~OperatorCpu() = default; Status Collect(ExecutionTree *tree) override; Status SaveToFile(const std::string &file_path) override; + // Analyze will output the name of the metric, the avg utiliization of highest + // object within the class and any extra message that would be useful for the user. + // The Higher level CPUSampling class will combine information from different classes + // to decide if warning should be output. + Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; private: // Get cpu information, include use/sys/idle/io utilization @@ -131,6 +138,8 @@ class OperatorCpu : public BaseCpu { // Store the id and its corresponding threads. std::unordered_map> op_thread; + std::unordered_map op_name; + std::unordered_map op_parallel_workers; std::unordered_map> pre_op_stat_; uint64_t pre_total_stat_; int32_t id_count = 0; @@ -143,6 +152,7 @@ class ProcessCpu : public BaseCpu { ~ProcessCpu() = default; Status Collect(ExecutionTree *tree) override; Status SaveToFile(const std::string &file_path) override; + Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; private: // Get CPU information, include use/sys/idle/io utilization @@ -183,6 +193,9 @@ class CpuSampling : public Sampling { // Change file mode after save CPU data Status ChangeFileMode() override; + // Analyze sampling data and print message to log + Status Analyze() override; + private: Status CollectTimeStamp(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc index fac5f9a1f9e..16fdc6b593d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc @@ -45,6 +45,7 @@ Status Monitor::operator()() { } // Output all profiling data upon request. + RETURN_IF_NOT_OK(tree_->GetProfilingManager()->Analyze()); RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); RETURN_IF_NOT_OK(tree_->GetProfilingManager()->ChangeFileMode()); return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc index a859095e8cc..2f5ee8e7ebf 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc @@ -157,6 +157,16 @@ Status ProfilingManager::SaveProfilingData() { MS_LOG(INFO) << "Save profiling data end."; return Status::OK(); } +Status ProfilingManager::Analyze() { + if (!IsProfilingEnable()) { + return Status::OK(); + } + MS_LOG(INFO) << "Start to analyze profiling data."; + for (auto node : sampling_nodes_) { + RETURN_IF_NOT_OK(node.second->Analyze()); + } + return Status::OK(); +} Status ProfilingManager::ChangeFileMode() { if (!IsProfilingEnable()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h index 0e267c770ec..94ad85e2c95 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h @@ -65,6 +65,7 @@ class Sampling : public Profiling { // Sampling action function. This function will be invoked by performance monitor thread. virtual Status Sample() = 0; // virtual Status TestPrint() = 0; + virtual Status Analyze() = 0; virtual ~Sampling() = default; }; @@ -118,6 +119,9 @@ class ProfilingManager { Status ChangeFileMode(); + // Analyze profile data and print warning messages + Status Analyze(); + private: std::unique_ptr perf_monitor_; bool enabled_;