This commit is contained in:
hesham 2021-11-12 14:27:13 -05:00
parent 70363899e7
commit 40797c40da
19 changed files with 756 additions and 49 deletions

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -59,6 +59,10 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_enable_shared_mem", &ConfigManager::enable_shared_mem)
.def("set_auto_offload", &ConfigManager::set_auto_offload)
.def("get_auto_offload", &ConfigManager::get_auto_offload)
.def("set_enable_autotune", &ConfigManager::set_enable_autotune)
.def("get_enable_autotune", &ConfigManager::enable_autotune)
.def("set_autotune_interval", &ConfigManager::set_autotune_interval)
.def("get_autotune_interval", &ConfigManager::autotune_interval)
.def("load", [](ConfigManager &c, std::string s) { THROW_IF_ERROR(c.LoadFile(s)); });
}));

View File

@ -51,7 +51,9 @@ ConfigManager::ConfigManager()
auto_num_workers_num_shards_(1),
auto_worker_config_(0),
enable_shared_mem_(true),
auto_offload_(false) {
auto_offload_(false),
enable_autotune_(false),
autotune_interval_(kCfgAutoTuneInterval) {
num_cpu_threads_ = num_cpu_threads_ > 0 ? num_cpu_threads_ : std::numeric_limits<uint16_t>::max();
num_parallel_workers_ = num_parallel_workers_ < num_cpu_threads_ ? num_parallel_workers_ : num_cpu_threads_;
std::string env_cache_host = common::GetEnv("MS_CACHE_HOST");

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2019-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -230,6 +230,22 @@ class ConfigManager {
// @return - Flag to indicate whether automatic offloading is enabled for the dataset
bool get_auto_offload() { return auto_offload_; }
// setter function
// @param enable - To enable autotune
void set_enable_autotune(bool enable) { enable_autotune_ = enable; }
// getter function
// @return - Flag to indicate whether autotune is enabled
bool enable_autotune() { return enable_autotune_; }
// getter function
// @return - autotune interval in milliseconds
int64_t autotune_interval() { return autotune_interval_; }
// setter function
// @param interval - autotune interval in milliseconds
void set_autotune_interval(int64_t interval) { autotune_interval_ = interval; }
private:
int32_t num_parallel_workers_;
int32_t worker_connector_size_;
@ -253,6 +269,8 @@ class ConfigManager {
uint8_t auto_worker_config_;
bool enable_shared_mem_;
bool auto_offload_;
bool enable_autotune_;
int64_t autotune_interval_;
// Private helper function that takes a nlohmann json format and populates the settings
// @param j - The json nlohmann json info
Status FromJson(const nlohmann::json &j);

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +24,7 @@
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/opt/pre/getter_pass.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/auto_tune.h"
#include "minddata/dataset/engine/perf/monitor.h"
#include "minddata/dataset/engine/perf/profiling.h"
#endif
@ -37,12 +38,16 @@
namespace mindspore {
namespace dataset {
#ifndef ENABLE_SECURITY
using ProfilingRegistrationState = ProfilingManager::ProfilingRegistrationState;
#endif
// TreeConsumer
TreeConsumer::TreeConsumer() { tree_adapter_ = std::make_unique<TreeAdapter>(); }
Status TreeConsumer::Init(std::shared_ptr<DatasetNode> d) {
RETURN_IF_NOT_OK(tree_adapter_->Compile(std::move(d)));
#ifndef ENABLE_SECURITY
profiling_manager_ = GlobalContext::profiling_manager();
RETURN_IF_NOT_OK(RegisterProfilingManager());
#endif
return Status::OK();
@ -55,9 +60,14 @@ Status TreeConsumer::Terminate() {
#ifndef ENABLE_SECURITY
Status IteratorConsumer::RegisterProfilingManager() {
profiling_manager_ = GlobalContext::profiling_manager();
// Profiling infrastructures need to be initialized before Op launching
if (profiling_manager_->IsProfilingEnable(tree_adapter_->tree_.get())) {
auto profiler_state = profiling_manager_->GetProfilerTreeState(tree_adapter_->tree_.get());
// This should never happen
CHECK_FAIL_RETURN_UNEXPECTED(profiler_state != ProfilingManager::kEnabledTreeRegistered,
"Something went wrong. Current tree is already registered with the MD Profiler");
if (profiler_state == ProfilingManager::kEnabledDifferentTreeRegistered) {
MS_LOG(WARNING) << "MD Profiler is already enabled for a different tree.";
} else if (profiler_state == ProfilingRegistrationState::kEnabledTreeNotRegistered) {
// Profiling infrastructures need to be initialized before Op launching
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->RegisterTree(this->tree_adapter_.get()));
// dataset_iterator node is used for graph mode
@ -72,9 +82,14 @@ Status IteratorConsumer::RegisterProfilingManager() {
}
Status ToDevice::RegisterProfilingManager() {
profiling_manager_ = GlobalContext::profiling_manager();
// Profiling infrastructures need to be initialized before Op launching
if (profiling_manager_->IsProfilingEnable(tree_adapter_->tree_.get())) {
auto profiler_state = profiling_manager_->GetProfilerTreeState(tree_adapter_->tree_.get());
// This should never happen
CHECK_FAIL_RETURN_UNEXPECTED(profiler_state != ProfilingManager::kEnabledTreeRegistered,
"Something went wrong. Current tree is already registered with the MD Profiler");
if (profiler_state == ProfilingManager::kEnabledDifferentTreeRegistered) {
MS_LOG(WARNING) << "MD Profiler is already enabled for a different tree.";
} else if (profiler_state == ProfilingRegistrationState::kEnabledTreeNotRegistered) {
// Profiling infrastructures need to be initialized before Op launching
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->RegisterTree(this->tree_adapter_.get()));
// device_queue node is used for graph mode
@ -90,9 +105,36 @@ Status ToDevice::RegisterProfilingManager() {
}
Status TreeConsumer::RegisterProfilingManager() {
profiling_manager_ = GlobalContext::profiling_manager();
if (profiling_manager_->IsProfilingEnable(tree_adapter_->tree_.get())) {
return Status(StatusCode::kMDUnexpectedError, "Profiling is not supported for this consumer.");
auto profiler_state = profiling_manager_->GetProfilerTreeState(tree_adapter_->tree_.get());
if (profiler_state == ProfilingRegistrationState::kEnabledTreeNotRegistered) {
return {StatusCode::kMDUnexpectedError, "Profiling is not supported for this consumer."};
}
return Status::OK();
}
Status TreeConsumer::InitAutoTune() {
if (profiling_manager_->IsAutotuning()) {
// future improvement to show tree UUID in log
MS_LOG(WARNING) << "MD Auto-tune is already running for another tree";
return Status::OK();
}
auto profiler_state = profiling_manager_->GetProfilerTreeState(tree_adapter_->tree_.get());
if (profiler_state == ProfilingRegistrationState::kNotEnabled) {
// Init ProfilingManager to `Enable` it.
RETURN_IF_NOT_OK(profiling_manager_->Init());
// Register this tree
RETURN_IF_NOT_OK(RegisterProfilingManager());
// Start Profiler
RETURN_IF_NOT_OK(profiling_manager_->Start());
// AutoTune object and thread init
autotune_ = std::make_unique<AutoTune>(this->tree_adapter_.get(), GetProfilingManager());
RETURN_IF_NOT_OK(autotune_->LaunchThread());
// Set flag to distinguish between MD Profiler and MD Autotuning
// to generate appropriate logs
profiling_manager_->EnableAutotuneFlag();
} else {
MS_LOG(WARNING) << "Unable to start MD Auto-tune as MD Profiler is already enabled. Disable MD Profiler to "
"continue with auto-tune.";
}
return Status::OK();
}
@ -104,7 +146,12 @@ std::string TreeConsumer::GetOffload() { return (tree_adapter_->GetOffloadJson()
Status IteratorConsumer::Init(std::shared_ptr<DatasetNode> d) {
RETURN_IF_NOT_OK(tree_adapter_->Compile(std::move(d), num_epochs_));
#ifndef ENABLE_SECURITY
RETURN_IF_NOT_OK(RegisterProfilingManager());
profiling_manager_ = GlobalContext::profiling_manager();
if (GlobalContext::config_manager()->enable_autotune()) {
RETURN_IF_NOT_OK(InitAutoTune());
} else {
RETURN_IF_NOT_OK(RegisterProfilingManager());
}
#endif
return Status::OK();
}
@ -213,7 +260,12 @@ Status IteratorConsumer::GetNextAsOrderedPair(std::vector<std::pair<std::string,
Status ToDevice::Init(std::shared_ptr<DatasetNode> d) {
RETURN_IF_NOT_OK(tree_adapter_->Compile(std::move(d), num_epochs_));
#ifndef ENABLE_SECURITY
RETURN_IF_NOT_OK(RegisterProfilingManager());
profiling_manager_ = GlobalContext::profiling_manager();
if (GlobalContext::config_manager()->enable_autotune()) {
RETURN_IF_NOT_OK(InitAutoTune());
} else {
RETURN_IF_NOT_OK(RegisterProfilingManager());
}
#endif
return Status::OK();
}

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,6 +31,7 @@ namespace mindspore::dataset {
class TreeAdapter;
class DatasetNode;
#ifndef ENABLE_SECURITY
class AutoTune;
class ProfilingManager;
#endif
/// A base class for tree consumers which would fetch rows from the tree pipeline
@ -62,6 +63,8 @@ class TreeConsumer {
/// \brief Return profiling manager pointer
std::shared_ptr<ProfilingManager> GetProfilingManagerPtr() { return profiling_manager_; }
/// \brief Initialize AutoTune and launch its thread for this consumer
/// \return Status error code
Status InitAutoTune();
#endif
protected:
@ -71,6 +74,7 @@ class TreeConsumer {
#ifndef ENABLE_SECURITY
/// Profiling Manager
std::shared_ptr<ProfilingManager> profiling_manager_;
std::shared_ptr<AutoTune> autotune_;
#endif
/// Method to return the name of the consumer

View File

@ -451,7 +451,7 @@ Status DeviceQueueOp::PushDataToGPU() {
RETURN_IF_NOT_OK(RetryPushData(handle, items));
#ifndef ENABLE_SECURITY
ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, push_cost, &batch_start_time, &end_time,
gpu_connector_->size(), gpu_connector_->capacity());
gpu_connector_->capacity(), gpu_connector_->size());
#endif
send_batch++;
#ifdef ENABLE_DUMP_IR

View File

@ -6,4 +6,5 @@ add_library(
connector_size.cc
dataset_iterator_tracing.cc
cpu_sampler.cc
auto_tune.cc
)

View File

@ -0,0 +1,273 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/perf/auto_tune.h"
#include <algorithm>
#include <functional>
#include <memory>
#include <vector>
#include "minddata/dataset/util/task_manager.h"
namespace mindspore {
namespace dataset {
AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
: tree_adapter_(tree_adap), profiling_manager_(profiling_mgr), leaf_op_id_(-1), cur_epoch_(1) {
tree_modifier_ = std::make_unique<TreeModifier>(tree_adapter_);
max_workers_ = GlobalContext::config_manager()->num_cpu_threads();
}
Status AutoTune::Main() {
TaskManager::FindMe()->Post();
MS_LOG(INFO) << "AutoTune thread has started.";
std::unique_lock<std::mutex> _lock(mux_);
cur_epoch_ = 1;
Status rc;
while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
rc = RunIteration();
if (rc.IsError()) {
MS_LOG(ERROR) << "AutoTune failed and will exit with the following error: " << rc;
break;
}
RETURN_IF_NOT_OK(cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval()));
}
RETURN_IF_NOT_OK(profiling_manager_->Stop());
MS_LOG(INFO) << "AutoTune thread is finished.";
return Status::OK();
}
Status AutoTune::LaunchThread() {
MS_LOG(INFO) << "Launching AutoTune thread";
RETURN_IF_NOT_OK(CollectOpsInfo());
RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
return Status::OK();
}
Status AutoTune::CollectOpsInfo() {
ExecutionTree *tree = tree_adapter_->tree_.get();
RETURN_UNEXPECTED_IF_NULL(tree);
for (auto itr = tree->begin(); itr != tree->end(); ++itr) {
ops_[itr->id()] = itr.get();
// get all parallel ops (num_workers>0) except leaf nodes (no children)
if (itr->NumWorkers() > 0 && itr->Children().size() > 0) {
parallel_ops_ids_.push_back(itr->id());
}
}
// sort parallel ops in reverse order of IDs (i.e., bottommost op is first)
std::sort(parallel_ops_ids_.begin(), parallel_ops_ids_.end(), std::greater<>());
leaf_op_id_ = ops_.size() - 1;
if (parallel_ops_ids_.size() != 0) {
CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[parallel_ops_ids_.size() - 1] != 0,
"Non-sink pipeline, root node is a ParallelOp. AutoTune is not supported.");
CHECK_FAIL_RETURN_UNEXPECTED(parallel_ops_ids_[0] != leaf_op_id_, "Leaf Operator is added to ParallelOps list.");
}
return Status::OK();
}
Status AutoTune::GetOpConnectorCapacity(int32_t op_id, int64_t *capacity) {
auto item = ops_.find(op_id);
CHECK_FAIL_RETURN_UNEXPECTED(item != ops_.end(), "Invalid Operator ID");
*capacity = item->second->ConnectorCapacity();
return Status::OK();
}
Status AutoTune::GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util) {
// loop over all ops and get the average CPU usage of each
for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
std::vector<uint16_t> sys_util;
std::vector<uint16_t> user_util;
#ifndef ENABLE_ANDROID
RETURN_IF_NOT_OK(profiling_manager_->GetSysCpuUtilByEpoch(itr->first, cur_epoch_, &sys_util));
RETURN_IF_NOT_OK(profiling_manager_->GetUserCpuUtilByEpoch(itr->first, cur_epoch_, &user_util));
#endif
double sys_cpu_util = Mean(sys_util);
double user_cpu_util = Mean(user_util);
(*ops_cpu_util)[itr->first] = sys_cpu_util + user_cpu_util;
}
return Status::OK();
}
Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *ops_queue_util) {
// loop over all ops and compute the output queue utilization of each
for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
std::vector<int32_t> sizes;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(itr->first, cur_epoch_, &sizes));
double avg_size = Mean(sizes);
int64_t capacity = itr->second->ConnectorCapacity();
CHECK_FAIL_RETURN_UNEXPECTED(capacity != 0, "Capacity of connector should not be 0");
(*ops_queue_util)[itr->first] = avg_size / capacity;
}
return Status::OK();
}
Status AutoTune::GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers) {
for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
(*ops_num_workers)[itr->first] = itr->second->NumWorkers();
}
return Status::OK();
}
bool AutoTune::IsSink() {
std::shared_ptr<Tracing> node;
return profiling_manager_->GetTracingNode(kDeviceQueueTracingName, &node).IsOk();
}
template <typename T>
double AutoTune::Mean(std::vector<T> items) {
if (items.size() == 0) {
return 0;
}
return std::accumulate(items.begin(), items.end(), 0.0) / static_cast<double>(items.size());
}
Status AutoTune::RunIteration() {
// Run every epoch
if ((profiling_manager_->GetNumOfProfiledEpochs()) >= cur_epoch_) {
MS_LOG(INFO) << "Run AutoTune at epoch #" << cur_epoch_;
if (IsSink()) {
RETURN_IF_NOT_OK(RunIterationSink());
} else {
RETURN_IF_NOT_OK(RunIterationNonSink());
}
++cur_epoch_;
}
return Status::OK();
}
Status AutoTune::RecordPipelineTime() {
std::vector<int32_t> times;
RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_, &times));
double avg_time = Mean(times);
avg_pipeline_times_.push_back(avg_time);
MS_LOG(INFO) << "Epoch #" << cur_epoch_ << ", Average Pipeline time is " << avg_time
<< " ms. The avg pipeline time for all epochs is " << Mean(avg_pipeline_times_) << "ms";
return Status::OK();
}
Status AutoTune::RunIterationSink() {
RETURN_IF_NOT_OK(RecordPipelineTime());
bool isBottleneck = false;
RETURN_IF_NOT_OK(IsDSaBottleneck(&isBottleneck));
if (isBottleneck) {
RETURN_IF_NOT_OK(Analyse());
}
return Status::OK();
}
Status AutoTune::RunIterationNonSink() {
return Status(StatusCode::kMDUnexpectedError, "AutoTune doesn't support non-sink pipeline.");
}
Status AutoTune::IsDSaBottleneck(bool *isBottleneck) {
std::vector<int32_t> sizes;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(cur_epoch_, &sizes));
double avg_size = Mean(sizes);
std::vector<int32_t> capacities;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByEpoch(cur_epoch_, &capacities));
double avg_capacity = Mean(capacities);
CHECK_FAIL_RETURN_UNEXPECTED(avg_capacity != 0, "Capacities of connectors should not be 0");
double usage_avg_last = (avg_size / avg_capacity);
float empty_freq = 0;
RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByEpoch(cur_epoch_, &empty_freq));
// Reporting values
MS_LOG(INFO) << "Epoch #" << cur_epoch_ << ", Device Connector Size: " << avg_size
<< ", Connector Capacity: " << avg_capacity << ", Utilization: " << (usage_avg_last * TO_PERCENT) << "%"
<< ", Empty Freq: " << (empty_freq * TO_PERCENT) << "% ";
// Decision
if (usage_avg_last < DEVICE_CONNECTOR_UTIL_THRESHOLD) {
MS_LOG(WARNING) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% < "
<< (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
<< "% threshold, dataset pipeline performance needs tuning.";
*isBottleneck = true;
} else {
MS_LOG(INFO) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% > "
<< (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
<< "% threshold, dataset pipeline performance is OK.";
*isBottleneck = false;
}
return Status::OK();
}
Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t new_workers) {
new_workers = std::min(new_workers, max_workers_);
new_workers = std::max(new_workers, MIN_NUM_WORKERS);
MS_LOG(INFO) << "Request to change number of workers of Operator: " << ops_[op_id]->NameWithID()
<< " New value: " << new_workers << " Old value: " << old_workers;
RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
return Status::OK();
}
Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) {
new_size = std::min(new_size, MAX_QUEUE_SIZE);
new_size = std::max(new_size, MIN_QUEUE_SIZE);
MS_LOG(INFO) << "Request to change Connector capacity of Operator: " << ops_[op_id]->NameWithID()
<< " New value: " << new_size << " Old value: " << old_size;
RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ResizeConnectorRequest>(new_size)));
return Status::OK();
}
Status AutoTune::Analyse() {
// collect stats
std::map<int32_t, int32_t> ops_num_workers;
RETURN_IF_NOT_OK(GetOpsNumWorker(&ops_num_workers));
std::map<int32_t, double> ops_queue_util;
RETURN_IF_NOT_OK(GetOpsQueueUtil(&ops_queue_util));
std::map<int32_t, double> ops_cpu_util;
RETURN_IF_NOT_OK(GetOpsCpuUtil(&ops_cpu_util));
// check leaf
if (ops_queue_util[leaf_op_id_] < LEAF_QUEUE_THRESHOLD) {
MS_LOG(WARNING) << "Leaf op (" << ops_[leaf_op_id_]->NameWithID()
<< ") queue utilization: " << ops_queue_util[leaf_op_id_] * TO_PERCENT << "% < "
<< LEAF_QUEUE_THRESHOLD * TO_PERCENT << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(leaf_op_id_, ops_num_workers[leaf_op_id_],
ops_num_workers[leaf_op_id_] + INCREMENT_WORKER));
}
// check parallel ops in loop
for (const auto &op_id : parallel_ops_ids_) {
// op specifics
double output_queue_util = ops_queue_util[op_id];
double input_queue_util = ops_queue_util[op_id + 1];
double cpu_util = ops_cpu_util[op_id];
int32_t num_workers = ops_num_workers[op_id];
CHECK_FAIL_RETURN_UNEXPECTED(num_workers != 0, "ParallelOp with num_workers=0");
// derived metrics
double queue_diff = input_queue_util - output_queue_util;
int64_t queue_capacity = 0;
RETURN_IF_NOT_OK(GetOpConnectorCapacity(op_id, &queue_capacity));
// map decisions - queue
if (queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID()
<< ") is slow, input connector utilization=" << input_queue_util
<< ", output connector utilization=" << output_queue_util << ", diff= " << queue_diff << " > "
<< INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << " threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
} else {
if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting high average worker cpu utilization "
<< (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
} else if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting low average worker cpu utilization "
<< (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, queue_capacity + INCREMENT_QUEUE_SIZE));
}
}
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore
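
For readers skimming the diff, the per-operator tuning rule in AutoTune::Analyse() can be restated as a short Python sketch. This is explanatory only, not part of the commit; the function name analyse_parallel_op is hypothetical, the thresholds and increments come from the constants declared in auto_tune.h, and the leaf op is handled separately (its workers are increased whenever its output queue utilization drops below LEAF_QUEUE_THRESHOLD).

def analyse_parallel_op(input_util, output_util, cpu_util, num_workers, queue_capacity):
    """Explanatory sketch of the per-op decision in AutoTune::Analyse()."""
    queue_diff = input_util - output_util
    if queue_diff > 0.35:                      # INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD
        return "add_workers", num_workers + 2  # INCREMENT_WORKER (clamped to [1, num_cpu_threads])
    per_worker_cpu = cpu_util / num_workers
    if per_worker_cpu > 75:                    # MAP_OP_WORKER_HIGH_THRESHOLD (percent)
        return "add_workers", num_workers + 2
    if per_worker_cpu < 35 and (input_util < 0.5 or -queue_diff > 0.35):
        return "grow_queue", queue_capacity + 4  # INCREMENT_QUEUE_SIZE (clamped to [1, 128])
    return "no_change", None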

View File

@ -0,0 +1,173 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_AUTO_TUNE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_AUTO_TUNE_H_
#include <map>
#include <memory>
#include <mutex>
#include <vector>
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/tree_adapter.h"
#include "minddata/dataset/engine/tree_modifier.h"
#include "minddata/dataset/engine/perf/profiling.h"
namespace mindspore {
namespace dataset {
class TreeModifier;
class AutoTune {
public:
/// AutoTune constructor
/// \param tree_adap pointer to the tree adapter
/// \param profiling_mgr pointer to the profiling manager
AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr);
~AutoTune() = default;
/// Function to create and launch the AutoTune thread.
/// \return Status object
Status LaunchThread();
private:
/// The main loop in AutoTune, it iterates every interval
/// \return Status object
Status Main();
/// Function to collect info from the tree
/// \return Status code
Status CollectOpsInfo();
/// The AutoTune logic that executes every iteration
/// \return status code
Status RunIteration();
/// The AutoTune logic for sink pipelines that executes every iteration
/// \return status code
Status RunIterationSink();
/// The AutoTune logic for non-sink pipelines that executes every iteration
/// \return status code
Status RunIterationNonSink();
/// Check if the dataset pipeline is the bottleneck
/// \param[out] isBottleneck bool
/// \return Status code
Status IsDSaBottleneck(bool *isBottleneck);
/// Check whether the pipeline is a sink pipeline (returns true) or non-sink (returns false)
/// \return bool
bool IsSink();
const int32_t TO_PERCENT = 100;
// system specifics
int32_t max_workers_;
const int32_t MIN_NUM_WORKERS = 1;
const int32_t MAX_QUEUE_SIZE = 128;
const int32_t MIN_QUEUE_SIZE = 1;
// Worker specifics
const int32_t INCREMENT_WORKER = 2;
const int32_t DECREMENT_WORKER = -1;
// Queue Specifics
const float_t INPUT_QUEUE_LOW = 0.5;
const float_t DEVICE_CONNECTOR_UTIL_THRESHOLD = 0.75;
const float_t LEAF_QUEUE_THRESHOLD = 0.9;
const float_t INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD = 0.35;
const int64_t INCREMENT_QUEUE_SIZE = 4;
// CPU Specifics
const float_t MAP_OP_WORKER_HIGH_THRESHOLD = 75;
const float_t MAP_OP_WORKER_LOW_THRESHOLD = 35;
/// Get the out connector capacity of the operator
/// \param[in] op_id operator id
/// \param[out] capacity the capacity of the connector
/// \return Status code
Status GetOpConnectorCapacity(int32_t op_id, int64_t *capacity);
/// Get the CPU usage of each operator in the pipeline
/// \param[out] ops_cpu_util map from op_id to cpu utilization
/// \return Status code
Status GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util);
/// Get the queue utilization of each operator in the pipeline
/// \param[out] ops_queue_util map from op_id to queue utilization
/// \return Status code
Status GetOpsQueueUtil(std::map<int32_t, double> *ops_queue_util);
/// Get the number of workers for each operator in the pipeline
/// \param[out] ops_num_workers map from op_id to num_workers
/// \return Status code
Status GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers);
/// Main AutoTune algorithm
/// \return Status code
Status Analyse();
/// Send a ChangeRequest to the operator to update the number of workers
/// \param op_id operator ID
/// \param old_workers Old number of workers for logging purposes
/// \param new_workers new number of worker
/// \return Status code
Status RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t new_workers);
/// Send a ChangeRequest to the operator to update the connector capacity
/// \param op_id operator ID
/// \param old_size old connector capacity, for logging purposes
/// \param new_size new connector capacity
/// \return Status code
Status RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size);
/// Record the pipeline time of the current epoch into avg_pipeline_times_
/// \return Status code
Status RecordPipelineTime();
/// Utility function to calculate the mean/average of a list of numbers
/// \tparam T type of the vector
/// \param items vector of T
/// \return double the calculated mean
template <typename T>
double Mean(std::vector<T> items);
/// Pointer to the tree adapter to get tree info
TreeAdapter *tree_adapter_;
/// Pointer to the profiler manager to get statistics
ProfilingManager *profiling_manager_;
/// Unique_ptr to the tree modifier to handle change requests
std::unique_ptr<TreeModifier> tree_modifier_;
/// Mutex used with the condition variable for the interruptible sleep between iterations
std::mutex mux_;
/// Condition variable used for the interruptible sleep between iterations
CondVar cv_;
/// a map from op_id to a pointer to the operator
std::map<int32_t, std::shared_ptr<DatasetOp>> ops_;
/// IDs of all parallel ops (excluding the leaf op), sorted in reverse order of ID
std::vector<int32_t> parallel_ops_ids_;
/// ID of the leaf op
int32_t leaf_op_id_;
/// vector of pipeline time per epoch
std::vector<double> avg_pipeline_times_;
/// the current epoch index (starts from 1)
int32_t cur_epoch_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_AUTO_TUNE_H_

View File

@ -182,35 +182,31 @@ Tracing::Tracing(int32_t records_per_step) : records_per_step_(records_per_step)
// Constructor
ProfilingManager::ProfilingManager()
: profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false), tree_(nullptr) {}
: profiling_state_(ProfilingState::kProfilingStateUnBegun), enabled_(false), tree_(nullptr), autotuning_(false) {}
bool ProfilingManager::IsProfilingEnable(const ExecutionTree *tree) const {
if (tree_ == nullptr) {
return enabled_;
} else {
return enabled_ && (tree_ == tree);
}
auto external_state = GetProfilerTreeState(tree);
return (external_state == kEnabledTreeNotRegistered || external_state == kEnabledTreeRegistered);
}
Status ProfilingManager::RegisterTree(TreeAdapter *tree_adapter) {
if (IsProfilingEnable()) {
tree_ = tree_adapter->tree_.get();
perf_monitor_ = std::make_unique<Monitor>(this);
// Register all profiling node.
std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
CHECK_FAIL_RETURN_UNEXPECTED(tree_ == nullptr, "Another tree is already registered.");
CHECK_FAIL_RETURN_UNEXPECTED(enabled_ == true, "MD Profiler is disabled. Cannot register a tree.");
tree_ = tree_adapter->tree_.get();
perf_monitor_ = std::make_unique<Monitor>(this);
// Register all sampling nodes here.
// Tracing node registration is the responsibility of the Consumer
std::shared_ptr<Sampling> connector_size_sampling = std::make_shared<ConnectorSize>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling));
#ifndef ENABLE_ANDROID
std::shared_ptr<Sampling> cpu_sampler = std::make_shared<CpuSampler>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampler));
std::shared_ptr<Sampling> cpu_sampler = std::make_shared<CpuSampler>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampler));
#endif
// can insert a correct timestamp so that we can ignore the samples that were taken
// during start up of the pipeline.
(void)epoch_end_ts_.emplace_back(0);
(void)epoch_end_step_.emplace_back(0);
}
// can insert a correct timestamp so that we can ignore the samples that were taken
// during start up of the pipeline.
(void)epoch_end_ts_.emplace_back(0);
(void)epoch_end_step_.emplace_back(0);
return Status::OK();
}
@ -400,7 +396,6 @@ Status ProfilingManager::EpochToTimeInterval(int32_t epoch_num, uint64_t *start_
Status ProfilingManager::EpochToStepInterval(int32_t epoch_num, uint32_t *start_step, uint32_t *end_step) {
if (epoch_num <= 0 || epoch_num >= epoch_end_step_.size()) {
std::string err = "Epoch: " + std::to_string(epoch_num) + " is invalid.";
MS_LOG(INFO) << err;
return {StatusCode::kMDUnexpectedError, err};
}
*start_step = epoch_end_step_[epoch_num - 1] + 1;
@ -603,6 +598,7 @@ Status ProfilingManager::Reset() {
perf_monitor_.reset();
tree_ = nullptr;
profiling_state_ = ProfilingState::kProfilingStateUnBegun;
autotuning_ = false;
return Status::OK();
}
@ -614,10 +610,6 @@ Status ProfilingManager::Init() {
Reset();
CHECK_FAIL_RETURN_UNEXPECTED(profiling_state_ == ProfilingState::kProfilingStateUnBegun,
"MD Profiler is in an unexpected state.");
if (profiling_state_ == ProfilingState::kProfilingStateFinished) {
profiling_state_ = ProfilingState::kProfilingStateUnBegun;
}
// Enable profiling
enabled_ = true;
@ -704,6 +696,15 @@ Status ProfilingManager::Save(const std::string &profile_data_path) {
return Status::OK();
}
ProfilingManager::ProfilingRegistrationState ProfilingManager::GetProfilerTreeState(const ExecutionTree *tree) const {
if (!enabled_) return kNotEnabled;
if (tree_ == nullptr) {
return enabled_ ? kEnabledTreeNotRegistered : kNotEnabled;
} else {
return tree_ == tree ? kEnabledTreeRegistered : kEnabledDifferentTreeRegistered;
}
}
uint64_t ProfilingTime::GetCurMilliSecond() {
// because cpplint does not allow using namespace
using std::chrono::duration_cast;

View File

@ -432,7 +432,31 @@ class ProfilingManager {
/// \return Status object with the error code
Status Save(const std::string &profile_data_path);
private:
/// Get number of epochs that have been already profiled
/// \return number of epochs
int32_t GetNumOfProfiledEpochs() { return epoch_end_step_.size() - 1; }
/// Determine if the Profiler is being used for autotuning
/// \return boolean
bool IsAutotuning() { return autotuning_; }
/// \brief Setter for autotuning_ bool flag
void EnableAutotuneFlag() { autotuning_ = true; }
// Registration state for the profiler
enum ProfilingRegistrationState {
kNotEnabled,
kEnabledTreeNotRegistered,
kEnabledTreeRegistered,
kEnabledDifferentTreeRegistered,
};
/// \brief Getter for the profiling and tree registration state
/// \param tree Execution Tree pointer
/// \return ProfilingRegistrationState
ProfilingRegistrationState GetProfilerTreeState(const ExecutionTree *tree) const;
protected:
std::unique_ptr<Monitor> perf_monitor_;
// State flags for profiling
@ -448,6 +472,7 @@ class ProfilingManager {
ExecutionTree *tree_; // ExecutionTree pointer
std::vector<uint64_t> epoch_end_ts_; // End of epoch timestamp
std::vector<uint32_t> epoch_end_step_; // End of epoch step number
bool autotuning_;  // flag to indicate if the Profiler is being used for autotuning
// Register profile node to tree
// @param node - Profiling node

View File

@ -25,6 +25,7 @@
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
#include "minddata/dataset/engine/perf/auto_tune.h"
#include "minddata/dataset/engine/perf/dataset_iterator_tracing.h"
namespace mindspore {
@ -40,6 +41,7 @@ class TreeAdapter {
friend TreeConsumer;
friend ToDevice;
friend IteratorConsumer;
friend AutoTune;
#endif
friend TreeModifier;

View File

@ -65,5 +65,7 @@ Status ChangeNumWorkersRequest::ApplyChange(DatasetOp *op) {
}
return Status::OK();
}
TreeModifier::TreeModifier(TreeAdapter *adapter) : TreeModifier(adapter->tree_.get()) {}
} // namespace dataset
} // namespace mindspore

View File

@ -116,13 +116,13 @@ class AutotuneCallback : public DSCallback {
/// Main class to handle modification of the ExecutionTree used by AutoTune
class TreeModifier {
// friend with TreeAdapter to access the ExeecutionTree
// friend with TreeAdapter to access the ExecutionTree
friend TreeAdapter;
public:
/// Constructor to create a TreeModifier given a TreeAdapter
/// \param adapter TreeAdapter
explicit TreeModifier(TreeAdapter *adapter) : TreeModifier(adapter->tree_.get()) {}
explicit TreeModifier(TreeAdapter *adapter);
/// Constructor to create a TreeModifier given an ExecutionTree
/// \param tree ExecutionTree

View File

@ -283,6 +283,8 @@ constexpr uint8_t kCVInvalidType = 255;
using connection_id_type = uint64_t;
using session_id_type = uint32_t;
using row_id_type = int64_t;
constexpr uint32_t kCfgAutoTuneInterval = 100; // ms
} // namespace dataset
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2019-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -146,7 +146,7 @@ class Queue {
std::unique_lock<std::mutex> _lock(mux_);
CHECK_FAIL_RETURN_UNEXPECTED(new_capacity > 0,
"New capacity: " + std::to_string(new_capacity) + ", should be larger than 0");
RETURN_OK_IF_TRUE(new_capacity == capacity());
RETURN_OK_IF_TRUE(new_capacity == static_cast<int32_t>(capacity()));
std::vector<T> queue;
// pop from the original queue until the new_capacity is full
for (int32_t i = 0; i < new_capacity; ++i) {
@ -176,7 +176,7 @@ class Queue {
this->ResetQue();
RETURN_IF_NOT_OK(arr_.allocate(new_capacity));
sz_ = new_capacity;
for (int32_t i = 0; i < queue.size(); ++i) {
for (int32_t i = 0; i < static_cast<int32_t>(queue.size()); ++i) {
RETURN_IF_NOT_OK(this->AddWhileHoldingLock(queue[i]));
}
queue.clear();

View File

@ -410,6 +410,76 @@ def load(file):
_config.load(file)
def set_enable_autotune(enable):
"""
Set the state of the AutoTune flag. When it is True, AutoTune helps users improve the
performance of a given workload by automatically searching for better data pipeline settings.
Args:
enable (bool): Whether to use AutoTune feature when running data pipeline.
Raises:
TypeError: If enable is not a boolean data type.
Examples:
>>> # Enable AutoTune
>>> ds.config.set_enable_autotune(True)
"""
if not isinstance(enable, bool):
raise TypeError("enable must be of type bool.")
_config.set_enable_autotune(enable)
def get_enable_autotune():
"""
Get the default state of AutoTune enabled variable.
Returns:
bool, the state of AutoTune enabled variable (default=False).
Examples:
>>> # Get the flag of AutoTune feature.
>>> autotune_flag = ds.config.get_enable_autotune()
"""
return _config.get_enable_autotune()
def set_autotune_interval(interval):
"""
Set the default interval (in milliseconds) for data pipeline auto-tuning.
Args:
interval (int): Interval (in milliseconds) to be used for data pipeline auto-tuning.
Raises:
TypeError: If interval is not of type int.
ValueError: If interval <= 0 or interval > INT32_MAX.
Examples:
>>> # Set a new global configuration value for the auto-tuning interval.
>>> ds.config.set_autotune_interval(100)
"""
if not isinstance(interval, int):
raise TypeError("interval must be of type int.")
if interval <= 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range.")
_config.set_autotune_interval(interval)
def get_autotune_interval():
"""
Get the global configuration of the data pipeline auto-tuning interval.
Returns:
int, interval (in milliseconds) for data pipeline auto-tuning.
Examples:
>>> # Get the global configuration of the auto-tuning interval.
>>> # If set_autotune_interval() is never called before, the default value (100) will be returned.
>>> autotune_interval = ds.config.get_autotune_interval()
"""
return _config.get_autotune_interval()
def get_enable_shared_mem():
"""
Get the default state of shared mem enabled variable.

View File

@ -133,6 +133,7 @@ if(BUILD_MINDDATA STREQUAL "full")
${MINDDATA_DIR}/core/global_context.cc
${MINDDATA_DIR}/core/client.cc
${MINDDATA_DIR}/engine/tree_adapter_lite.cc
${MINDDATA_DIR}/engine/tree_modifier.cc
${MINDDATA_DIR}/engine/consumers/pull_based_tree_consumer.cc
${MINDDATA_DIR}/engine/consumers/tree_consumer.cc
${MINDDATA_DIR}/engine/ir/datasetops/dataset_node.cc
@ -177,6 +178,7 @@ if(BUILD_MINDDATA STREQUAL "full")
${MINDDATA_DIR}/engine/opt/pre/deep_copy_pass.cc
${MINDDATA_DIR}/engine/opt/post/auto_worker_pass.cc
${MINDDATA_DIR}/engine/opt/pass.cc
${MINDDATA_DIR}/engine/perf/auto_tune.cc
${MINDDATA_DIR}/engine/perf/profiling.cc
${MINDDATA_DIR}/engine/perf/monitor.cc
${MINDDATA_DIR}/engine/perf/device_queue_tracing.cc

View File

@ -0,0 +1,76 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing Autotune support in DE
"""
import numpy as np
import pytest
import mindspore.dataset as ds
def test_autotune_simple_pipeline():
"""
Feature: Auto-tuning
Description: test simple pipeline of autotune - Generator -> Shuffle -> Batch
Expectation: pipeline runs successfully
"""
ds.config.set_enable_autotune(True)
source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["data"])
data1 = data1.shuffle(64)
data1 = data1.batch(32)
itr = data1.create_dict_iterator(num_epochs=5)
for _ in range(5):
for _ in itr:
pass
ds.config.set_enable_autotune(False)
def test_autotune_config():
"""
Feature: Auto-tuning
Description: test basic config of autotune
Expectation: config can be set successfully
"""
autotune_state = ds.config.get_enable_autotune()
assert autotune_state is False
ds.config.set_enable_autotune(False)
autotune_state = ds.config.get_enable_autotune()
assert autotune_state is False
with pytest.raises(TypeError):
ds.config.set_enable_autotune(1)
autotune_interval = ds.config.get_autotune_interval()
assert autotune_interval == 100
ds.config.set_autotune_interval(200)
autotune_interval = ds.config.get_autotune_interval()
assert autotune_interval == 200
with pytest.raises(TypeError):
ds.config.set_autotune_interval(20.012)
with pytest.raises(ValueError):
ds.config.set_autotune_interval(-999)
if __name__ == "__main__":
test_autotune_simple_pipeline()
test_autotune_config()
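
Putting the user-facing pieces of this commit together, a minimal usage sketch follows. It assumes nothing beyond the config APIs added here; the Generator -> Shuffle -> Batch workload is illustrative and mirrors test_autotune_simple_pipeline above.

import numpy as np
import mindspore.dataset as ds

# Turn AutoTune on and sample pipeline statistics every 200 ms (default interval is 100 ms).
ds.config.set_enable_autotune(True)
ds.config.set_autotune_interval(200)

# Illustrative workload; any pipeline built while the flag is on is a candidate for tuning.
source = [(np.array([x]),) for x in range(1024)]
data = ds.GeneratorDataset(source, ["data"]).shuffle(64).batch(32)
for _ in data.create_dict_iterator(num_epochs=1):
    pass

# Restore the default so later pipelines are not tuned.
ds.config.set_enable_autotune(False)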