Add analysis of profiling data to warn if a dataset op's num_parallel_workers should be increased

This message is currently only printed when profiling of minddata is enabled.
Profiling of minddata is not enabled by default.
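For context, the warning rule added in CpuSampling::Analyze (see the hunk further below) can be summarized by this standalone sketch. Only the 90% and 80% thresholds come from the diff; the function name and the sample numbers are illustrative.

#include <iostream>

// Illustrative restatement of the rule in CpuSampling::Analyze (not commit code):
// warn when the device as a whole still has CPU headroom (< 90% busy) but the busiest
// operator is close to saturating its worker threads (> 80% CPU per thread).
bool ShouldSuggestMoreWorkers(double device_utilization, double max_op_utilization_per_thread) {
  return device_utilization < 90.0 && max_op_utilization_per_thread > 80.0;
}

int main() {
  // Hypothetical numbers: device at 55% overall, busiest op at 92% per worker thread.
  if (ShouldSuggestMoreWorkers(55.0, 92.0)) {
    std::cout << "Consider increasing num_parallel_workers for the busiest op." << std::endl;
  }
  return 0;
}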
RobinGrosman 2021-03-15 13:26:35 -07:00
parent 7838f8570b
commit c616150fd6
10 changed files with 145 additions and 3 deletions

View File

@ -156,7 +156,8 @@ Status MapOp::operator()() {
}
// The operator class just starts off threads by calling the tree_ function
rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID());
rc =
tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID(), id());
// Synchronize with TaskManager
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(rc);
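The extra id() argument presumably lets the profiler attribute the launched worker threads to this operator; compare the op_thread and op_parallel_workers maps in OperatorCpu further below. A minimal, hypothetical illustration of that kind of per-op grouping (not code from this commit):

#include <cstdint>
#include <cstdio>
#include <sys/types.h>
#include <unordered_map>
#include <vector>

int main() {
  // op id -> ids of the worker threads launched for that op (values are made up)
  std::unordered_map<int32_t, std::vector<pid_t>> op_thread;
  int32_t map_op_id = 3;
  op_thread[map_op_id] = {12345, 12346};  // e.g. a MapOp launched with 2 parallel workers
  std::printf("op %d has %zu worker threads\n", map_op_id, op_thread[map_op_id].size());
  return 0;
}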

View File

@ -109,5 +109,7 @@ Status ConnectorSize::Init(const std::string &dir_path, const std::string &devic
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();
}
Status ConnectorSize::Analyze() { return Status::OK(); }
} // namespace dataset
} // namespace mindspore

View File

@ -31,7 +31,7 @@ class ExecutionTree;
// Connector size sampling samples the output connector size of each op in the pipeline.
// It support JSON serialization for external usage.
class ConnectorSize : public Sampling {
// Connecto size sampling data is stored as a 2D vector
// Connector size sampling data is stored as a 2D vector
// op_0 ... op_m
// sample_0 size_0_0 ... size_m_0
// ... ... ... ...
@ -58,12 +58,14 @@ class ConnectorSize : public Sampling {
Status Init(const std::string &dir_path, const std::string &device_id) override;
// Parse op infomation and transform to json format
// Parse op information and transform to json format
json ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size);
// Change file mode after save throughput data
Status ChangeFileMode() { return Status::OK(); }
Status Analyze() override;
private:
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling

View File

@ -150,5 +150,7 @@ Status ConnectorThroughput::ChangeFileMode() {
}
return Status::OK();
}
Status ConnectorThroughput::Analyze() { return Status::OK(); }
} // namespace dataset
} // namespace mindspore

View File

@ -74,6 +74,8 @@ class ConnectorThroughput : public Sampling {
Status ChangeFileMode() override;
Status Analyze() override;
private:
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
int64_t max_rows_;

View File

@ -135,6 +135,27 @@ Status DeviceCpu::Collect(ExecutionTree *tree) {
first_collect_ = false;
return Status::OK();
}
Status DeviceCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
*name = std::string("device_info");
int total_samples = cpu_util_.size();
int sum = 0;
// Only analyze the middle half of the samples, since the first and last quarters
// may be skewed by pipeline startup and shutdown activity
int start_analyze = total_samples / 4;
int end_analyze = total_samples - start_analyze;
for (int i = start_analyze; i < end_analyze; i++) {
sum += cpu_util_[i].user_utilization_;
sum += cpu_util_[i].sys_utilization_;
}
// Note: the device utilization is already aggregated over all CPUs, so there is
// no need to divide by the number of CPUs
if ((end_analyze - start_analyze) > 0) {
*utilization = 1.0 * sum / (end_analyze - start_analyze);
}
return Status::OK();
}
Status DeviceCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
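As an aside (not part of the diff), the "middle half" windowing used by the Analyze() methods can be shown in isolation. TrimmedAverage and the sample values below are hypothetical:

#include <cstdio>
#include <vector>

// Standalone sketch of the "analyze only the middle half" windowing used above.
double TrimmedAverage(const std::vector<int> &samples) {
  int total = static_cast<int>(samples.size());
  int start = total / 4;     // skip the first quarter (pipeline startup)
  int end = total - start;   // skip the last quarter (pipeline shutdown)
  if (end - start <= 0) {
    return 0.0;
  }
  double sum = 0;
  for (int i = start; i < end; i++) {
    sum += samples[i];
  }
  return sum / (end - start);
}

int main() {
  // With 8 samples, only indices 2..5 are averaged: (60 + 62 + 58 + 64) / 4 = 61.
  std::vector<int> utilization = {5, 10, 60, 62, 58, 64, 15, 5};
  std::printf("trimmed average = %.1f\n", TrimmedAverage(utilization));  // prints 61.0
  return 0;
}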
@ -236,6 +257,8 @@ Status OperatorCpu::Collect(ExecutionTree *tree) {
if (first_collect_) {
for (auto iter = tree->begin(); iter != tree->end(); ++iter) {
id_count++;
op_name[iter->id()] = iter->NameWithID();
op_parallel_workers[iter->id()] = iter->num_workers();
}
#if defined(USING_LINUX)
cpu_processor_num = get_nprocs_conf();
@ -327,6 +350,37 @@ Status OperatorCpu::Collect(ExecutionTree *tree) {
return Status::OK();
}
Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
int total_samples = cpu_op_util_.size();
// Only analyze the middle half of the samples, since the first and last quarters
// may be skewed by pipeline startup and shutdown activity
int start_analyze = total_samples / 4;
int end_analyze = total_samples - start_analyze;
double op_util = 0;  // initialize so the comparison below is well-defined when there are no samples
*utilization = 0;
// Start the loop from 0, since op id -1 should not be analyzed
for (auto op_id = 0; op_id < id_count; op_id++) {
int sum = 0;
int index = op_id + 1;
for (int i = start_analyze; i < end_analyze; i++) {
sum += cpu_op_util_[i][index].user_utilization_;
sum += cpu_op_util_[i][index].sys_utilization_;
}
if ((end_analyze - start_analyze) > 0) {
op_util = 1.0 * sum * cpu_processor_num / (op_parallel_workers[op_id] * (end_analyze - start_analyze));
}
if (op_util > *utilization) {
*utilization = op_util;
*name = op_name[op_id];
}
extra_message->append(op_name[op_id] + " utilization per thread: " + std::to_string(op_util) + "% (" +
std::to_string(op_parallel_workers[op_id]) + " parallel_workers); ");
}
return Status::OK();
}
Status OperatorCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
json output;
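To make the per-thread formula above concrete, here is a worked example (not commit code; it assumes the sampled values are the operator's share of total machine CPU, in percent):

#include <cstdio>

int main() {
  // Hypothetical numbers for one operator, averaged over the analyzed samples.
  double op_share_of_machine = 20.0;  // the op used 20% of the whole machine's CPU
  int cpu_processor_num = 8;          // logical CPUs on the host
  int op_parallel_workers = 2;        // num_parallel_workers configured for the op
  // 20% of an 8-CPU machine equals 160% of one CPU; split across 2 workers -> 80% per thread,
  // mirroring op_util = 1.0 * sum * cpu_processor_num / (op_parallel_workers * num_samples).
  double utilization_per_thread = op_share_of_machine * cpu_processor_num / op_parallel_workers;
  std::printf("utilization per thread: %.0f%%\n", utilization_per_thread);  // prints 80%
  return 0;
}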
@ -453,6 +507,26 @@ Status ProcessCpu::Collect(ExecutionTree *tree) {
return Status::OK();
}
Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) {
*name = std::string("process_info");
int total_samples = process_util_.size();
int sum = 0;
// Only analyze the middle half of the samples, since the first and last quarters
// may be skewed by pipeline startup and shutdown activity
int start_analyze = total_samples / 4;
int end_analyze = total_samples - start_analyze;
for (int i = start_analyze; i < end_analyze; i++) {
sum += process_util_[i].user_utilization_;
sum += process_util_[i].sys_utilization_;
}
if ((end_analyze - start_analyze) > 0) {
*utilization = 1.0 * sum / (end_analyze - start_analyze);
}
return Status::OK();
}
Status ProcessCpu::SaveToFile(const std::string &file_path) {
Path path = Path(file_path);
json output;
@ -529,6 +603,37 @@ Status CpuSampling::SaveSamplingItervalToFile() {
return Status::OK();
}
// Analyze profiling data and output warning messages
Status CpuSampling::Analyze() {
std::string name;
double utilization = 0;
// Keep track of specific information returned by the different CPU sampling types
double total_utilization = 0;
double max_op_utilization = 0;
std::string max_op_name;
std::string detailed_op_cpu_message;
// Gather the analysis results from each CPU sampling type (device, operator, process)
for (auto cpu : cpu_) {
std::string extra_message;
RETURN_IF_NOT_OK(cpu->Analyze(&name, &utilization, &extra_message));
if (name == "device_info") {
total_utilization = utilization;
} else if (name != "process_info") {
max_op_utilization = utilization;
max_op_name = name;
detailed_op_cpu_message = extra_message;
}
}
if ((total_utilization < 90) && (max_op_utilization > 80)) {
MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread. "
<< "This operator may benefit from increasing num_parallel_workers. "
<< "CPU utilization per thread for all operators: " << detailed_op_cpu_message << std::endl;
}
return Status::OK();
}
// Save profiling data to file
Status CpuSampling::SaveToFile() {
// Save time stamp to json file

View File

@ -71,6 +71,7 @@ class BaseCpu {
// Collect CPU information
virtual Status Collect(ExecutionTree *tree) = 0;
virtual Status SaveToFile(const std::string &file_path) = 0;
virtual Status Analyze(std::string *name, double *utilization, std::string *extra_message) = 0;
protected:
std::vector<CpuUtil> cpu_util_;
@ -90,6 +91,7 @@ class DeviceCpu : public BaseCpu {
~DeviceCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
Status Analyze(std::string *name, double *utilization, std::string *extra_message) override;
private:
// Get CPU information, include use/sys/idle/io utilization
@ -115,6 +117,11 @@ class OperatorCpu : public BaseCpu {
~OperatorCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
// Analyze outputs the name of the metric, the average utilization of the busiest
// object within the class, and any extra message that would be useful to the user.
// The higher-level CpuSampling class combines the information from the different
// classes to decide whether a warning should be output.
Status Analyze(std::string *name, double *utilization, std::string *extra_message) override;
private:
// Get cpu information, include use/sys/idle/io utilization
@ -131,6 +138,8 @@ class OperatorCpu : public BaseCpu {
// Store the id and its corresponding threads.
std::unordered_map<int32_t, std::vector<pid_t>> op_thread;
std::unordered_map<int32_t, std::string> op_name;
std::unordered_map<int32_t, int32_t> op_parallel_workers;
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> pre_op_stat_;
uint64_t pre_total_stat_;
int32_t id_count = 0;
@ -143,6 +152,7 @@ class ProcessCpu : public BaseCpu {
~ProcessCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
Status Analyze(std::string *name, double *utilization, std::string *extra_message) override;
private:
// Get CPU information, include use/sys/idle/io utilization
@ -183,6 +193,9 @@ class CpuSampling : public Sampling {
// Change file mode after save CPU data
Status ChangeFileMode() override;
// Analyze sampling data and print warning messages to the log
Status Analyze() override;
private:
Status CollectTimeStamp();

View File

@ -45,6 +45,7 @@ Status Monitor::operator()() {
}
// Output all profiling data upon request.
RETURN_IF_NOT_OK(tree_->GetProfilingManager()->Analyze());
RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData());
RETURN_IF_NOT_OK(tree_->GetProfilingManager()->ChangeFileMode());
return Status::OK();

View File

@ -157,6 +157,16 @@ Status ProfilingManager::SaveProfilingData() {
MS_LOG(INFO) << "Save profiling data end.";
return Status::OK();
}
Status ProfilingManager::Analyze() {
if (!IsProfilingEnable()) {
return Status::OK();
}
MS_LOG(INFO) << "Start to analyze profiling data.";
for (auto node : sampling_nodes_) {
RETURN_IF_NOT_OK(node.second->Analyze());
}
return Status::OK();
}
Status ProfilingManager::ChangeFileMode() {
if (!IsProfilingEnable()) {

View File

@ -65,6 +65,7 @@ class Sampling : public Profiling {
// Sampling action function. This function will be invoked by performance monitor thread.
virtual Status Sample() = 0;
// virtual Status TestPrint() = 0;
virtual Status Analyze() = 0;
virtual ~Sampling() = default;
};
@ -118,6 +119,9 @@ class ProfilingManager {
Status ChangeFileMode();
// Analyze profile data and print warning messages
Status Analyze();
private:
std::unique_ptr<Monitor> perf_monitor_;
bool enabled_;