From 9cbb45bcc4e3d01b2b01f5ad610710faf10eeed0 Mon Sep 17 00:00:00 2001
From: luoyang
Date: Wed, 1 Dec 2021 11:51:19 +0800
Subject: [PATCH] disable Dataset AutoTune in non-sink mode

---
 .../minddata/dataset/engine/perf/auto_tune.cc | 33 ++++++++++++-------
 tests/ut/python/dataset/test_autotune.py      |  5 +++
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc
index 865d672c768..98d2f1e2959 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc
@@ -32,14 +32,14 @@ AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
 
 Status AutoTune::Main() {
   TaskManager::FindMe()->Post();
-  MS_LOG(INFO) << "AutoTune thread has started.";
+  MS_LOG(INFO) << "Dataset AutoTune thread has started.";
   std::unique_lock<std::mutex> _lock(mux_);
   cur_epoch_ = 1;
   Status rc;
   while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
     rc = RunIteration();
     if (rc.IsError()) {
-      MS_LOG(ERROR) << "AutoTune failed and will exit with the following error: " << rc;
+      MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
       break;
     }
     rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval());
@@ -49,9 +49,13 @@ Status AutoTune::Main() {
     }
   }
   RETURN_IF_NOT_OK(profiling_manager_->Stop());
-  MS_LOG(INFO) << "AutoTune thread is finished.";
+  MS_LOG(INFO) << "Dataset AutoTune thread is finished.";
   MS_LOG(INFO) << "Printing final tree configuration";
   PrintTreeConfiguration();
+  MS_LOG(INFO) << "Suggestion: set an appropriate num_parallel_workers for each Operation, or use the global setting API: "
+               << "mindspore.dataset.config.set_num_parallel_workers";
+  MS_LOG(INFO) << "Suggestion: choose the maximum tuned prefetch_size and set it via the global setting API: "
+               << "mindspore.dataset.config.set_prefetch_size";
   return Status::OK();
 }
 
@@ -59,14 +63,14 @@ void AutoTune::PrintTreeConfiguration() {
   ExecutionTree *tree = tree_adapter_->tree_.get();
   for (auto itr = tree->begin(); itr != tree->end(); itr++) {
     if (!itr->inlined() && itr->Name() != "DeviceQueueOp") {
-      MS_LOG(INFO) << itr->NameWithID() << " num_workers: " << itr->NumWorkers()
-                   << " connector_capacity: " << itr->ConnectorCapacity();
+      MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers()
+                   << " prefetch_size: " << itr->ConnectorCapacity();
     }
   }
 }
 
 Status AutoTune::LaunchThread() {
-  MS_LOG(INFO) << "Launching AutoTune thread";
+  MS_LOG(INFO) << "Launching Dataset AutoTune thread";
   RETURN_IF_NOT_OK(CollectOpsInfo());
   RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
   RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
@@ -89,7 +93,7 @@ Status AutoTune::CollectOpsInfo() {
   if (parallel_ops_ids_.size() != 0) {
     CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[parallel_ops_ids_.size() - 1] != 0,
-                                 "Non-sink pipeline, root node is a ParallelOp. AutoTune is not supported.");
+                                 "Non-sink pipeline, root node is a ParallelOp. Dataset AutoTune is not supported.");
   }
 
   return Status::OK();
@@ -177,9 +181,14 @@ double AutoTune::Mean(const std::vector &items) {
 }
 
 Status AutoTune::RunIteration() {
+  // Disable Dataset AutoTune in non-sink mode, since it has not been fully tested yet.
+  if (!IsSink()) {
+    MS_LOG(ERROR) << "Dataset AutoTune does not support non-sink pipelines.";
+    return Status(StatusCode::kMDUnexpectedError, "Dataset AutoTune does not support non-sink pipelines.");
+  }
   // Run every epoch
   if ((profiling_manager_->GetNumOfProfiledEpochs()) >= cur_epoch_) {
-    MS_LOG(INFO) << "Run AutoTune at epoch #" << cur_epoch_;
+    MS_LOG(INFO) << "Run Dataset AutoTune at epoch #" << cur_epoch_;
     RETURN_IF_NOT_OK(RunIterationEpoch());
     ++cur_epoch_;
   }
@@ -241,8 +250,8 @@ Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t new_workers) {
   new_workers = std::min(new_workers, max_workers_);
   new_workers = std::max(new_workers, MIN_NUM_WORKERS);
   RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
-  MS_LOG(WARNING) << "Added request to change number of workers of Operator: " << ops_[op_id]->NameWithID()
-                  << " New value: " << new_workers << " Old value: " << old_workers;
+  MS_LOG(WARNING) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
+                  << " from old value: [" << old_workers << "] to new value: [" << new_workers << "].";
   return Status::OK();
 }
 
@@ -251,8 +260,8 @@ Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) {
   new_size = std::max(new_size, MIN_QUEUE_SIZE);
 
   RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ResizeConnectorRequest>(new_size)));
-  MS_LOG(WARNING) << "Added request to change Connector capacity of Operator: " << ops_[op_id]->NameWithID()
-                  << " New value: " << new_size << " Old value: " << old_size;
+  MS_LOG(WARNING) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
+                  << " from old value: [" << old_size << "] to new value: [" << new_size << "].";
   return Status::OK();
 }
 
diff --git a/tests/ut/python/dataset/test_autotune.py b/tests/ut/python/dataset/test_autotune.py
index d905033c92a..6c5cfd027d0 100644
--- a/tests/ut/python/dataset/test_autotune.py
+++ b/tests/ut/python/dataset/test_autotune.py
@@ -88,6 +88,7 @@ class TestAutotuneWithProfiler:
         md_profiler.stop()
         ds.config.set_enable_autotune(True)
 
+    @pytest.mark.skip(reason="Dataset AutoTune disabled for non-sink pipelines")
     def test_autotune_with_2_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -112,6 +113,7 @@
 
         ds.config.set_enable_autotune(False)
 
+    @pytest.mark.skip(reason="Dataset AutoTune disabled for non-sink pipelines")
     def test_delayed_autotune_with_2_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -134,6 +136,7 @@
         # sys.stdout.write(_)
         # sys.stderr.write(err)
 
+    @pytest.mark.skip(reason="Dataset AutoTune disabled for non-sink pipelines")
     def test_delayed_start_autotune_with_3_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -158,6 +161,7 @@
         # sys.stdout.write(_)
         # sys.stderr.write(err)
 
+    @pytest.mark.skip(reason="Dataset AutoTune disabled for non-sink pipelines")
     def test_autotune_before_profiler(self):
         """
         Feature: Autotuning with Profiler
@@ -228,3 +232,4 @@
 
         with pytest.raises(ValueError):
             ds.config.set_autotune_interval(-999)
+
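
Note (illustrative, not part of the patch): a minimal sketch of the user-facing behavior this change enforces, assuming the standard mindspore.dataset APIs (set_enable_autotune and set_autotune_interval are the same settings exercised by test_autotune.py above). Enabling Dataset AutoTune is unchanged; what changes is that a pipeline consumed without a device queue (non-sink, e.g. plain host-side iteration as below) now logs "Dataset AutoTune does not support non-sink pipelines" and tuning exits, while sink-mode training still gets tuned.

    import numpy as np
    import mindspore.dataset as ds

    # Request Dataset AutoTune before building the pipeline.
    ds.config.set_enable_autotune(True)
    ds.config.set_autotune_interval(100)

    def gen():
        for i in range(1000):
            yield (np.array([i], dtype=np.int64),)

    # num_parallel_workers and prefetch_size (connector capacity) are the two
    # knobs AutoTune adjusts per operation.
    data = ds.GeneratorDataset(gen, column_names=["col1"], num_parallel_workers=2)
    data = data.batch(32, num_parallel_workers=2)

    # Non-sink consumption: with this patch, AutoTune reports an error and
    # stops instead of tuning the tree.
    for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        pass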