From 0086f4ee055c0985928c3b98959b6ba94e42ad67 Mon Sep 17 00:00:00 2001 From: harshvardhangupta Date: Tue, 22 Mar 2022 17:19:36 -0400 Subject: [PATCH] Modify the Dataset AutoTune config #31742 --- .../minddata/dataset/engine/perf/auto_tune.cc | 34 ++++++++++++------- .../minddata/dataset/engine/perf/auto_tune.h | 3 ++ .../ccsrc/minddata/dataset/engine/serdes.cc | 4 +-- .../minddata/dataset/engine/tree_modifier.h | 6 ++++ .../minddata/dataset/kernels/py_func_op.cc | 7 ++-- .../python/dataset/test_autotune_saveload.py | 4 +-- 6 files changed, 40 insertions(+), 18 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc index 717d74b7b29..5e150c6641f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc @@ -99,13 +99,7 @@ Status AutoTune::Main() { } } RETURN_IF_NOT_OK(profiling_manager_->Stop()); - MS_LOG(INFO) << "Dataset AutoTune thread is finished."; - MS_LOG(INFO) << "Printing final tree configuration"; - PrintTreeConfiguration(); - MS_LOG(INFO) << "Suggest to set proper num_parallel_workers for each Operation or use global setting API: " - << "mindspore.dataset.config.set_num_parallel_workers"; - MS_LOG(INFO) << "Suggest to choose maximum prefetch_size from tuned result and set by global setting API: " - << "mindspore.dataset.config.set_prefetch_size"; + PostMainLogging(); #ifndef ENABLE_ANDROID if (output_final_config && (SaveAutotuneConfig(autotune_json_filepath_).IsError())) { MS_LOG(WARNING) << "Failed to write final autotune configuration to disk"; @@ -124,8 +118,12 @@ Status AutoTune::SaveAutotuneConfig(const std::string &file_name) { RETURN_IF_NOT_OK(SummarizeTreeConfiguration(&summary)); nlohmann::json out_json; out_json["summary"] = summary; - out_json["pipeline"] = autotune_config_json_; - out_json["remark"] = "The following file has been auto-generated by the Dataset Autotune."; + out_json["tree"] = autotune_config_json_; + std::string remark_value = "The following file has been auto-generated by the Dataset AutoTune."; + if (tree_modifier_->GetRequestsCount() == 0) { + remark_value += " Dataset Pipeline is not the bottleneck. No configuration changes were made by Dataset AutoTune."; + } + out_json["remark"] = remark_value; RETURN_IF_NOT_OK(Serdes::SaveJSONToFile(out_json, file_name, true)); return Status::OK(); } @@ -150,13 +148,12 @@ Status AutoTune::SetAutotuneConfigJson() { Status AutoTune::SummarizeTreeConfiguration(std::vector *out) { constexpr int op_name_width = 20; constexpr int val_width = 2; - auto num_ops = ops_.size(); - for (int i = 0; i < num_ops; ++i) { + for (int i = ops_.size() - 1; i >= 0; --i) { const auto op = ops_[i]; if (!op->inlined() && op->Name() != "DeviceQueueOp") { std::stringstream s; s << std::left << std::setw(op_name_width) << op->NameWithID() << "(num_parallel_workers:" << std::right - << std::setw(val_width) << op->NumWorkers() << ", connector_queue_size:" << std::setw(val_width) + << std::setw(val_width) << op->NumWorkers() << ", prefetch_size:" << std::setw(val_width) << op->ConnectorCapacity() << ")"; (void)out->emplace_back(s.str()); } @@ -164,6 +161,19 @@ Status AutoTune::SummarizeTreeConfiguration(std::vector *out) { return Status::OK(); } +void AutoTune::PostMainLogging() const { + MS_LOG(INFO) << "Dataset AutoTune thread is finished."; + MS_LOG(INFO) << "Printing final tree configuration"; + PrintTreeConfiguration(); + // Print the suggestion in logs only if autotune requested some changes + if (tree_modifier_->GetRequestsCount() > 0) { + MS_LOG(INFO) << "Suggest to set proper num_parallel_workers for each Operation or use global setting API: " + << "mindspore.dataset.config.set_num_parallel_workers"; + MS_LOG(INFO) << "Suggest to choose maximum prefetch_size from tuned result and set by global setting API: " + << "mindspore.dataset.config.set_prefetch_size"; + } +} + void AutoTune::PrintTreeConfiguration() const { ExecutionTree const *tree = tree_adapter_->tree_.get(); for (auto itr = tree->begin(); itr != tree->end(); itr++) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h index b9e7e835e45..40c6cbdd8ee 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h @@ -53,6 +53,9 @@ class AutoTune { /// \brief Helper to print the tree configuration void PrintTreeConfiguration() const; + /// \brief Helper to print the logs after/post the main loop in AutoTune + void PostMainLogging() const; + /// \brief Helper to summarize the execution tree /// \param[out] out An output vector of string to store the summary /// \return Status object diff --git a/mindspore/ccsrc/minddata/dataset/engine/serdes.cc b/mindspore/ccsrc/minddata/dataset/engine/serdes.cc index 080ed1e4a38..20acaf8845f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/serdes.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/serdes.cc @@ -110,8 +110,8 @@ Status Serdes::Deserialize(const std::string &json_filepath, std::shared_ptrPushChangeRequest(change_request)); return Status::OK(); } + /// \brief Get the number of change requests received + /// \return Number of change requests received + uint64_t GetRequestsCount() const { return num_requests_; } + private: ExecutionTree *tree_; std::map> callbacks; + uint64_t num_requests_ = 0; // counter for number of requests received }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/kernels/py_func_op.cc b/mindspore/ccsrc/minddata/dataset/kernels/py_func_op.cc index 54da1454195..061d43c8713 100644 --- a/mindspore/ccsrc/minddata/dataset/kernels/py_func_op.cc +++ b/mindspore/ccsrc/minddata/dataset/kernels/py_func_op.cc @@ -127,8 +127,11 @@ Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) { Status PyFuncOp::to_json(nlohmann::json *out_json) { nlohmann::json args; - if (py_func_ptr_.attr("to_json")) { - args = nlohmann::json::parse(py_func_ptr_.attr("to_json")().cast()); + { + py::gil_scoped_acquire gil_acquire; + if (py_func_ptr_.attr("to_json")) { + args = nlohmann::json::parse(py_func_ptr_.attr("to_json")().cast()); + } } *out_json = args; return Status::OK(); diff --git a/tests/ut/python/dataset/test_autotune_saveload.py b/tests/ut/python/dataset/test_autotune_saveload.py index a9382055935..ecbe02eff85 100644 --- a/tests/ut/python/dataset/test_autotune_saveload.py +++ b/tests/ut/python/dataset/test_autotune_saveload.py @@ -32,9 +32,9 @@ def data_pipeline_same(file1, file2): assert file2.exists() with file1.open() as f1, file2.open() as f2: pipeline1 = json.load(f1) - pipeline1 = pipeline1["pipeline"] if "pipeline" in pipeline1 else pipeline1 + pipeline1 = pipeline1["tree"] if "tree" in pipeline1 else pipeline1 pipeline2 = json.load(f2) - pipeline2 = pipeline2["pipeline"] if "pipeline" in pipeline2 else pipeline2 + pipeline2 = pipeline2["tree"] if "tree" in pipeline2 else pipeline2 return pipeline1 == pipeline2