!27033 close autotune in non-sink mode

Merge pull request !27033 from luoyang/close_autotune_nonsink
i-robot 2021-12-03 01:26:06 +00:00 committed by Gitee
commit 9cec03e3a8
2 changed files with 26 additions and 12 deletions
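In user-facing terms, this change disables Dataset AutoTune for non-sink pipelines: the opt-in stays the same, but the tuning thread now exits with an error unless the pipeline runs in sink mode. A minimal sketch of the opt-in call, which this commit leaves unchanged:

```python
import mindspore.dataset as ds

# Opt in to Dataset AutoTune. With this commit it only takes effect for
# sink-mode pipelines; in non-sink mode the tuning thread stops with an error.
ds.config.set_enable_autotune(True)
```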

View File

@@ -32,14 +32,14 @@ AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
 Status AutoTune::Main() {
   TaskManager::FindMe()->Post();
-  MS_LOG(INFO) << "AutoTune thread has started.";
+  MS_LOG(INFO) << "Dataset AutoTune thread has started.";
   std::unique_lock<std::mutex> _lock(mux_);
   cur_epoch_ = 1;
   Status rc;
   while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
     rc = RunIteration();
     if (rc.IsError()) {
-      MS_LOG(ERROR) << "AutoTune failed and will exit with the following error: " << rc;
+      MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
       break;
     }
     rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval());
@@ -49,9 +49,13 @@ Status AutoTune::Main() {
     }
   }
   RETURN_IF_NOT_OK(profiling_manager_->Stop());
-  MS_LOG(INFO) << "AutoTune thread is finished.";
+  MS_LOG(INFO) << "Dataset AutoTune thread is finished.";
   MS_LOG(INFO) << "Printing final tree configuration";
   PrintTreeConfiguration();
+  MS_LOG(INFO) << "Suggestion: set a proper num_parallel_workers for each Operation, or use the global setting API: "
+               << "mindspore.dataset.config.set_num_parallel_workers";
+  MS_LOG(INFO) << "Suggestion: choose the maximum prefetch_size from the tuned result and set it with the global "
+               << "setting API: mindspore.dataset.config.set_prefetch_size";
   return Status::OK();
 }
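The two new INFO lines point at the global dataset config APIs. A minimal sketch of applying tuned values by hand; the numbers are placeholders for whatever the printed tree configuration actually reports:

```python
import mindspore.dataset as ds

# Placeholder values: read the real ones off the printed tree configuration.
ds.config.set_num_parallel_workers(8)  # per-operation parallel worker count
ds.config.set_prefetch_size(16)        # connector (prefetch queue) capacity
```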
@@ -59,14 +63,14 @@ void AutoTune::PrintTreeConfiguration() {
   ExecutionTree *tree = tree_adapter_->tree_.get();
   for (auto itr = tree->begin(); itr != tree->end(); itr++) {
     if (!itr->inlined() && itr->Name() != "DeviceQueueOp") {
-      MS_LOG(INFO) << itr->NameWithID() << " num_workers: " << itr->NumWorkers()
-                   << " connector_capacity: " << itr->ConnectorCapacity();
+      MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers()
+                   << " prefetch_size: " << itr->ConnectorCapacity();
     }
   }
 }
 
 Status AutoTune::LaunchThread() {
-  MS_LOG(INFO) << "Launching AutoTune thread";
+  MS_LOG(INFO) << "Launching Dataset AutoTune thread";
   RETURN_IF_NOT_OK(CollectOpsInfo());
   RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
   RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
@@ -89,7 +93,7 @@ Status AutoTune::CollectOpsInfo() {
   if (parallel_ops_ids_.size() != 0) {
     CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[parallel_ops_ids_.size() - 1] != 0,
-                                 "Non-sink pipeline, root node is a ParallelOp. AutoTune is not supported.");
+                                 "Non-sink pipeline, root node is a ParallelOp. Dataset AutoTune is not supported.");
   }
   return Status::OK();
@@ -177,9 +181,14 @@ double AutoTune::Mean(const std::vector<T> &items) {
 }
 
 Status AutoTune::RunIteration() {
+  // Disable AutoTune in non-sink mode, since it has not been fully tested there yet.
+  if (!IsSink()) {
+    MS_LOG(ERROR) << "Dataset AutoTune doesn't support non-sink pipeline.";
+    return Status(StatusCode::kMDUnexpectedError, "Dataset AutoTune doesn't support non-sink pipeline.");
+  }
   // Run every epoch
   if ((profiling_manager_->GetNumOfProfiledEpochs()) >= cur_epoch_) {
-    MS_LOG(INFO) << "Run AutoTune at epoch #" << cur_epoch_;
+    MS_LOG(INFO) << "Run Dataset AutoTune at epoch #" << cur_epoch_;
     RETURN_IF_NOT_OK(RunIterationEpoch());
     ++cur_epoch_;
   }
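The new IsSink() guard means a pipeline consumed without data sinking now stops Dataset AutoTune with an error status. A minimal sketch of a sink-mode run that keeps AutoTune active; the toy network and in-memory dataset are placeholders, not part of this commit:

```python
import numpy as np
import mindspore.dataset as ds
from mindspore import Model, nn

ds.config.set_enable_autotune(True)

# Toy stand-ins for a real network, loss, optimizer, and dataset pipeline.
net = nn.Dense(10, 2)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
opt = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.9)
data = {"data": np.random.randn(32, 10).astype(np.float32),
        "label": np.random.randint(0, 2, size=(32,)).astype(np.int32)}
train_ds = ds.NumpySlicesDataset(data, shuffle=False).batch(8)

model = Model(net, loss_fn=loss_fn, optimizer=opt)
# dataset_sink_mode=True keeps the pipeline in sink mode; with False, the
# IsSink() check above would end the AutoTune thread with an error.
model.train(1, train_ds, dataset_sink_mode=True)
```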
@@ -241,8 +250,8 @@ Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t new_workers) {
   new_workers = std::min(new_workers, max_workers_);
   new_workers = std::max(new_workers, MIN_NUM_WORKERS);
   RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
-  MS_LOG(WARNING) << "Added request to change number of workers of Operator: " << ops_[op_id]->NameWithID()
-                  << " New value: " << new_workers << " Old value: " << old_workers;
+  MS_LOG(WARNING) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
+                  << " from old value: [" << old_workers << "] to new value: [" << new_workers << "].";
   return Status::OK();
 }
@@ -251,8 +260,8 @@ Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) {
   new_size = std::max(new_size, MIN_QUEUE_SIZE);
   RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ResizeConnectorRequest>(new_size)));
-  MS_LOG(WARNING) << "Added request to change Connector capacity of Operator: " << ops_[op_id]->NameWithID()
-                  << " New value: " << new_size << " Old value: " << old_size;
+  MS_LOG(WARNING) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
+                  << " from old value: [" << old_size << "] to new value: [" << new_size << "].";
   return Status::OK();
 }

View File

@@ -88,6 +88,7 @@ class TestAutotuneWithProfiler:
         md_profiler.stop()
         ds.config.set_enable_autotune(True)
 
+    @pytest.mark.skip(reason="close non-sink")
     def test_autotune_with_2_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -112,6 +113,7 @@ class TestAutotuneWithProfiler:
         ds.config.set_enable_autotune(False)
 
+    @pytest.mark.skip(reason="close non-sink")
     def test_delayed_autotune_with_2_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -134,6 +136,7 @@ class TestAutotuneWithProfiler:
         # sys.stdout.write(_)
         # sys.stderr.write(err)
 
+    @pytest.mark.skip(reason="close non-sink")
     def test_delayed_start_autotune_with_3_pipeline(self, capfd):
         """
         Feature: Autotuning
@@ -158,6 +161,7 @@ class TestAutotuneWithProfiler:
         # sys.stdout.write(_)
         # sys.stderr.write(err)
 
+    @pytest.mark.skip(reason="close non-sink")
     def test_autotune_before_profiler(self):
         """
         Feature: Autotuning with Profiler
@@ -228,3 +232,4 @@ class TestAutotuneWithProfiler:
         with pytest.raises(ValueError):
             ds.config.set_autotune_interval(-999)