!45602 Add debug mode support to minddata pipeline

Merge pull request !45602 from TinaMengtingZhang/dev_md_pipeline_debug
i-robot 2022-11-23 21:09:23 +00:00 committed by Gitee
commit 711661d964
52 changed files with 2386 additions and 68 deletions

View File

@ -96,7 +96,8 @@ PullIterator::~PullIterator() = default;
// Get the next row from the data pipeline.
Status PullIterator::GetRows(int32_t num_rows, std::vector<MSTensorVec> *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr. Please launch iterator fist.");
CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr. Please launch iterator first.");
row->clear();
for (int i = 0; i < num_rows; i++) {
std::vector<std::shared_ptr<dataset::Tensor>> md_row;
Status rc = pull_consumer_->GetNextAsVector(&md_row);
@ -120,6 +121,7 @@ Status PullIterator::GetRows(int32_t num_rows, std::vector<MSTensorVec> *const r
Status PullIterator::GetNextRow(MSTensorVec *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr.");
row->clear();
std::vector<std::shared_ptr<dataset::Tensor>> md_row;
Status rc = pull_consumer_->GetNextAsVector(&md_row);
if (rc.IsError()) {

View File

@ -77,6 +77,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_dynamic_shape", &ConfigManager::dynamic_shape)
.def("set_fast_recovery", &ConfigManager::set_fast_recovery)
.def("get_fast_recovery", &ConfigManager::fast_recovery)
.def("set_debug_mode", &ConfigManager::set_debug_mode)
.def("get_debug_mode", &ConfigManager::get_debug_mode)
.def("load", [](ConfigManager &c, const std::string &s) { THROW_IF_ERROR(c.LoadFile(s)); });
}));

View File

@ -49,6 +49,27 @@ PYBIND_REGISTER(PythonIteratorConsumer, 1, ([](const py::module *m) {
});
}));
PYBIND_REGISTER(
PythonPullBasedIteratorConsumer, 1, ([](const py::module *m) {
(void)py::class_<PythonPullBasedIteratorConsumer, TreeConsumer, std::shared_ptr<PythonPullBasedIteratorConsumer>>(
*m, "PythonPullBasedIteratorConsumer")
.def(py::init<int32_t>())
.def("Init",
[](PythonPullBasedIteratorConsumer &self, std::shared_ptr<DatasetNode> d) { THROW_IF_ERROR(self.Init(d)); })
.def("GetNextAsMap",
[](PythonPullBasedIteratorConsumer &self) {
py::dict output;
THROW_IF_ERROR(self.GetNextAsDict(&output));
return output;
})
.def("GetOffload", [](PythonPullBasedIteratorConsumer &self) { return self.GetOffload(); })
.def("GetNextAsList", [](PythonPullBasedIteratorConsumer &self) {
py::list output;
THROW_IF_ERROR(self.GetNextAsList(&output));
return output;
});
}));
PYBIND_REGISTER(TreeGetters, 1, ([](const py::module *m) {
(void)py::class_<PythonTreeGetters, TreeConsumer, std::shared_ptr<PythonTreeGetters>>(*m,
"TreeGetters")

View File

@ -93,6 +93,7 @@ Status ConfigManager::FromJson(const nlohmann::json &j) {
set_cache_port(j.value("cachePort", cache_port_));
set_num_connections(j.value("numConnections", num_connections_));
set_cache_prefetch_size(j.value("cachePrefetchSize", cache_prefetch_size_));
set_debug_mode(j.value("debug_mode_flag", debug_mode_flag_));
return Status::OK();
}
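
On the Python side, the new debug_mode_flag key takes effect when a configuration file is loaded through ds.config.load(), as the updated declient.cfg test config later in this diff exercises. A minimal sketch, assuming a config file that lists only a subset of keys is accepted (absent keys keep their current values, since FromJson reads them with j.value); the file written here is illustrative only:

import json
import tempfile

import mindspore.dataset as ds

# Write an illustrative config file containing the key read by ConfigManager::FromJson.
cfg = {"seed": 5489, "monitorSamplingInterval": 15, "debug_mode_flag": True}
with tempfile.NamedTemporaryFile("w", suffix=".cfg", delete=False) as f:
    json.dump(cfg, f)
    cfg_path = f.name

ds.config.load(cfg_path)           # set_debug_mode(j.value("debug_mode_flag", ...)) runs during the load
assert ds.config.get_debug_mode()  # True, taken from the file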

View File

@ -293,6 +293,14 @@ class ConfigManager {
// @return - Flag to indicate whether md pipeline recovers fast in failover reset
bool fast_recovery() const { return fast_recovery_; }
// setter function
// @param debug_mode_flag - Indicate whether the debug mode is on
void set_debug_mode(const bool debug_mode_flag) { debug_mode_flag_ = debug_mode_flag; }
// getter function
// @return - Flag to indicate whether the debug mode is on
bool get_debug_mode() const { return debug_mode_flag_; }
private:
// Private helper function that takes a nlohmann json format and populates the settings
// @param j - The json nlohmann json info
@ -327,7 +335,8 @@ class ConfigManager {
uint32_t multiprocessing_timeout_interval_; // Multiprocessing timeout interval in seconds
std::string autotune_json_filepath_; // Filepath name of the final AutoTune Configuration JSON file
bool dynamic_shape_{false};
bool fast_recovery_{true}; // Used for failover scenario to recover quickly or produce same augmentations
bool fast_recovery_{true}; // Used for failover scenario to recover quickly or produce same augmentations
bool debug_mode_flag_{false}; // Indicator for debug mode
};
} // namespace dataset
} // namespace mindspore

View File

@ -19,11 +19,9 @@
#include <algorithm>
namespace mindspore::dataset {
PullBasedIteratorConsumer::PullBasedIteratorConsumer() { tree_adapter_lite_ = std::make_unique<TreeAdapterLite>(); }
Status PullBasedIteratorConsumer::Init(std::shared_ptr<DatasetNode> root) {
RETURN_UNEXPECTED_IF_NULL(root);
return tree_adapter_lite_->BuildTree(std::move(root));
return tree_adapter_lite_->Compile(std::move(root), num_epochs_);
}
std::vector<TensorRow> PullBasedIteratorConsumer::GetRows(int64_t num_rows) {
@ -73,7 +71,6 @@ Status PullBasedIteratorConsumer::GetNextAsMap(std::unordered_map<std::string, T
Status PullBasedIteratorConsumer::GetNextAsOrderedPair(
std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) {
std::unique_ptr<TreeAdapter> tree_adapter = std::make_unique<TreeAdapter>();
CHECK_FAIL_RETURN_UNEXPECTED(vec != nullptr && vec->empty(), "vec is null or non-empty.");
TensorRow curr_row;
@ -85,7 +82,7 @@ Status PullBasedIteratorConsumer::GetNextAsOrderedPair(
if (column_order_.empty()) {
const int32_t invalid_col_id = -1;
column_order_.resize(num_cols, {std::string(), invalid_col_id});
for (const auto &itr : tree_adapter->GetColumnNameMap()) {
for (const auto &itr : tree_adapter_lite_->GetColumnNameMap()) {
int32_t ind = itr.second;
CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds.");
column_order_[ind] = std::make_pair(itr.first, ind);

View File

@ -23,6 +23,7 @@
#include <vector>
#include <cstddef>
#include "minddata/dataset/engine/tree_adapter_lite.h"
#include "minddata/dataset/engine/consumers/tree_consumer.h"
namespace mindspore::dataset {
@ -30,14 +31,17 @@ class TreeAdapterLite;
class TensorRow;
/// Consumer that iterates over the dataset and returns the rows one by one in a pull-based fashion
class PullBasedIteratorConsumer {
class PullBasedIteratorConsumer : public TreeConsumer {
public:
/// Constructor
PullBasedIteratorConsumer();
/// \param num_epochs number of epochs. Default: 1.
explicit PullBasedIteratorConsumer(int32_t num_epochs = 1) : TreeConsumer(num_epochs) {
tree_adapter_lite_ = std::make_unique<TreeAdapterLite>();
}
~PullBasedIteratorConsumer() = default;
Status Init(std::shared_ptr<DatasetNode> root);
Status Init(std::shared_ptr<DatasetNode> root) override;
/// \brief Returns the next row in a vector format
/// \note This is currently a placeholder function
@ -48,17 +52,22 @@ class PullBasedIteratorConsumer {
/// Returns the next row in a vector format
/// \param[out] out std::vector of Tensors
/// \return Status error code
Status GetNextAsVector(std::vector<TensorPtr> *const out);
Status GetNextAsVector(std::vector<TensorPtr> *const out) override;
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// \return Status error code
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *out);
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out) override;
/// Returns the next row in as a vector
/// \param[out] vec std::vector of pairs of string to Tensor
/// \return Status error code
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) override;
protected:
/// Method to return the name of the consumer
/// \return string
std::string Name() override { return "PullBasedIteratorConsumer"; }
private:
std::unique_ptr<TreeAdapterLite> tree_adapter_lite_;

View File

@ -48,6 +48,35 @@ Status PythonIteratorConsumer::GetNextAsDict(const py::dict *out) {
return Status::OK();
}
Status PythonPullBasedIteratorConsumer::GetNextAsList(const py::list *out) {
RETURN_UNEXPECTED_IF_NULL(out);
std::vector<TensorPtr> row;
{
py::gil_scoped_release gil_release;
RETURN_IF_NOT_OK(GetNextAsVector(&row));
}
for (auto el : row) {
(*out).append(el);
}
return Status::OK();
}
Status PythonPullBasedIteratorConsumer::GetNextAsDict(const py::dict *out) {
RETURN_UNEXPECTED_IF_NULL(out);
std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> vec;
Status s;
{
py::gil_scoped_release gil_release;
s = GetNextAsOrderedPair(&vec);
}
RETURN_IF_NOT_OK(s);
// Generate a Python dict; Python dicts maintain their insertion order
for (const auto &pair : vec) {
(*out)[common::SafeCStr(pair.first)] = pair.second;
}
return Status::OK();
}
Status PythonBuildVocabConsumer::Start() {
py::gil_scoped_release gil_release;
return BuildVocabConsumer::Start();

View File

@ -20,6 +20,7 @@
#include <string>
#include <unordered_map>
#include "minddata/dataset/engine/consumers/tree_consumer.h"
#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h"
#include "pybind11/pybind11.h"
namespace mindspore::dataset {
@ -44,6 +45,24 @@ class PythonIteratorConsumer : public IteratorConsumer {
Status GetNextAsDict(const py::dict *out);
};
class PythonPullBasedIteratorConsumer : public PullBasedIteratorConsumer {
public:
/// Constructor which will call the base class default constructor.
/// \param num_epochs number of epochs. Defaults to -1 (infinite epochs).
explicit PythonPullBasedIteratorConsumer(int32_t num_epochs = -1) : PullBasedIteratorConsumer() {}
~PythonPullBasedIteratorConsumer() = default;
/// Returns the next row in a vector format
/// \param[out] out std::vector of Tensors
/// \return Status error code
Status GetNextAsList(const py::list *out);
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// \return Status error code
Status GetNextAsDict(const py::dict *out);
};
class PythonBuildVocabConsumer : public BuildVocabConsumer {
public:
Status Start() override;

View File

@ -174,7 +174,7 @@ Status IteratorConsumer::Init(std::shared_ptr<DatasetNode> d) {
return Status::OK();
}
Status IteratorConsumer::GetNextAsVector(std::vector<TensorPtr> *out) {
Status IteratorConsumer::GetNextAsVector(std::vector<TensorPtr> *const out) {
RETURN_UNEXPECTED_IF_NULL(out);
out->clear();

View File

@ -80,6 +80,23 @@ class TreeConsumer {
Status InitAutoTune();
#endif
/// Returns the next row in a vector format
/// \param[out] out std::vector of Tensors
/// \return Status error code
virtual Status GetNextAsVector(std::vector<TensorPtr> *const out) { return Status::OK(); }
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// \return Status error code
virtual Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out) { return Status::OK(); }
/// Returns the next row in as a vector
/// \param[out] out std::vector of pairs of string to Tensor
/// \return Status error code
virtual Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) {
return Status::OK();
}
protected:
/// The class owns the tree_adapter that handles execution tree operations.
std::unique_ptr<TreeAdapter> tree_adapter_;
@ -111,17 +128,17 @@ class IteratorConsumer : public TreeConsumer {
/// Returns the next row in a vector format
/// \param[out] out std::vector of Tensors
/// \return Status error code
Status GetNextAsVector(std::vector<TensorPtr> *out);
Status GetNextAsVector(std::vector<TensorPtr> *const out) override;
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// \return Status error code
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out);
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out) override;
/// Returns the next row in as a vector
/// \param[out] out std::vector of pairs of string to Tensor
/// \return Status error code
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec);
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) override;
Status RegisterProfilingManager() override;

View File

@ -697,10 +697,10 @@ Status BatchOp::GetNextRowPullMode(TensorRow *const row) {
}
}
RETURN_UNEXPECTED_IF_NULL(table);
if (pad_) {
RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_));
} // do padding if needed
if (!table->empty()) {
if (pad_) {
RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_));
} // do padding if needed
RETURN_IF_NOT_OK(BatchRows(&table, row, table->size()));
batch_cnt_++;
batch_num_++;

View File

@ -207,5 +207,44 @@ Status ConcatOp::GetNextRow(TensorRow *row) {
return Status::OK();
}
Status ConcatOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
bool is_not_mappable_or_second_ne_zero = true;
if (!children_flag_and_nums_.empty()) {
const bool is_not_mappable = children_flag_and_nums_[cur_child_].first != 0 ? true : false;
const bool second_ne_zero = children_flag_and_nums_[cur_child_].second == 0 ? true : false;
is_not_mappable_or_second_ne_zero = is_not_mappable || second_ne_zero;
}
RETURN_IF_NOT_OK(child_[static_cast<size_t>(cur_child_)]->GetNextRowPullMode(row));
if (row->eoe()) {
// if last child, send out eoe and reset epoch
if (cur_child_ == child_.size() - 1) {
// reset
cur_child_ = 0;
verified_ = false;
UpdateRepeatAndEpochCounter();
return Status::OK();
}
if (!is_not_mappable_or_second_ne_zero) {
sample_number_ += children_flag_and_nums_[cur_child_].second;
}
cur_child_++;
verified_ = false;
RETURN_IF_NOT_OK(GetNextRowPullMode(row));
return Status::OK();
} else {
if (!verified_) {
RETURN_IF_NOT_OK(Verify(cur_child_, *row));
}
if (IgnoreSample()) {
RETURN_IF_NOT_OK(GetNextRowPullMode(row));
}
return Status::OK();
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -74,6 +74,8 @@ class ConcatOp : public PipelineOp {
Status GetNextRow(TensorRow *row) override;
Status GetNextRowPullMode(TensorRow *const row) override;
/// Check if the current sample will be taken or dropped
/// \return bool
bool IgnoreSample();

View File

@ -255,6 +255,10 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const {
Status DatasetOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
if (child_.empty()) {
MS_LOG(DEBUG) << "No child for operator [" << Name() << "].";
return Status::OK();
}
RETURN_UNEXPECTED_IF_NULL(child_[0]);
return child_[0]->GetNextRowPullMode(row);
}
@ -326,6 +330,14 @@ Status DatasetOp::PrepareOperator() {
return Status::OK();
}
// During tree prepare phase, operators may have specific post-operations to perform depending on their role.
Status DatasetOp::PrepareOperatorPullBased() {
// Generate the column name map for the current op.
RETURN_IF_NOT_OK(this->ComputeColMap());
return Status::OK();
}
// Derived classes may implement the reset function if the operator is stateful and needs
// specific reset handling that is not contained in this common code version of the reset.
Status DatasetOp::Reset() {

View File

@ -201,6 +201,12 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// before providing their own implementations.
virtual Status PrepareOperator();
// \brief During tree prepare phase, operators may have specific post-operations to perform depending on
// their role.
// \notes Derived versions of this function should always call its superclass version first
// before providing their own implementations.
virtual Status PrepareOperatorPullBased();
// \brief Getter function
// \return The operator id
int32_t id() const { return operator_id_; }

View File

@ -17,6 +17,7 @@
#include <algorithm>
#include <cstring>
#include <memory>
#include <set>
#include <vector>
#include "minddata/dataset/callback/callback_param.h"
@ -451,5 +452,62 @@ std::vector<int32_t> MapOp::GetMPWorkerPIDs() const {
}
return DatasetOp::GetMPWorkerPIDs();
}
Status MapOp::GetNextRowPullMode(TensorRow *const row) {
TensorRow new_row;
RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
if (new_row.empty()) {
return Status::OK();
}
auto column_name_id_map = child_[0]->column_name_id_map();
// Apply transforms on tensor
for (auto &t : tfuncs_[0]) {
for (auto &col_name : in_columns_) {
TensorRow i_row, o_row;
auto index = column_name_id_map[col_name];
i_row.push_back(new_row.at(index));
Status rc = t->Compute(i_row, &o_row);
if (rc.IsError()) {
std::string op_name = t->Name();
RETURN_IF_NOT_OK(RebuildMapErrorMsg(new_row, op_name, &rc));
}
// For next transform
new_row[index] = std::move(o_row.at(0));
}
}
(*row) = std::move(new_row);
return Status::OK();
}
Status MapOp::RebuildMapErrorMsg(const TensorRow &input_row, const std::string &op_name, Status *rc) {
std::string err_msg = "";
// Need to remove the suffix "Op" whose length is 2
std::string abbr_op_name = op_name.substr(0, op_name.length() - 2);
err_msg += "map operation: [" + abbr_op_name + "] failed. ";
if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) {
err_msg += "The corresponding data file is: " + input_row.getPath()[0];
if (input_row.getPath().size() > 1) {
std::set<std::string> path_set;
path_set.insert(input_row.getPath()[0]);
for (auto j = 1; j < input_row.getPath().size(); j++) {
if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) {
err_msg += ", " + input_row.getPath()[j];
path_set.insert(input_row.getPath()[j]);
}
}
}
err_msg += ". ";
}
std::string tensor_err_msg = rc->GetErrDescription();
if (rc->GetLineOfCode() < 0) {
err_msg += "Error description:\n";
}
err_msg += tensor_err_msg;
if (abbr_op_name == "PyFunc") {
RETURN_STATUS_ERROR(StatusCode::kMDPyFuncException, err_msg);
}
rc->SetErrDescription(err_msg);
return *rc;
}
} // namespace dataset
} // namespace mindspore

View File

@ -149,6 +149,8 @@ class MapOp : public ParallelOp<std::unique_ptr<MapWorkerJob>, TensorRow> {
/// \return vector of int
std::vector<int32_t> GetMPWorkerPIDs() const override;
Status GetNextRowPullMode(TensorRow *const row) override;
private:
// A helper function to create jobs for workers.
Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job, int32_t worker_id);
@ -214,6 +216,9 @@ class MapOp : public ParallelOp<std::unique_ptr<MapWorkerJob>, TensorRow> {
Status Launch() override;
Status AddNewWorkers(int32_t num_new_workers) override;
Status RemoveWorkers(int32_t num_workers) override;
private:
Status RebuildMapErrorMsg(const TensorRow &input_row, const std::string &op_name, Status *rc);
};
} // namespace dataset
} // namespace mindspore

View File

@ -263,5 +263,16 @@ Status ShuffleOp::EoeReceived(int32_t worker_id) {
state_ = OpState::kDeOpIdle;
return Status::OK();
}
Status ShuffleOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
RETURN_UNEXPECTED_IF_NULL(child_[0]);
if (GlobalContext::config_manager()->get_debug_mode()) {
MS_LOG(WARNING) << "In debug mode, shuffle operation is disabled for debugging purposes.";
} else {
MS_LOG(WARNING) << "Shuffle operation has not been implemented yet in pull mode.";
}
return child_[0]->GetNextRowPullMode(row);
}
} // namespace dataset
} // namespace mindspore

View File

@ -94,6 +94,11 @@ class ShuffleOp : public PipelineOp {
// @return Status The status code returned
Status PrepareOperator() override;
/// \brief Gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
private:
// Private function to add a new row to the shuffle buffer.
// @return Status The status code returned

View File

@ -72,5 +72,30 @@ Status SkipOp::GetNextRow(TensorRow *row) {
}
return Status::OK();
}
Status SkipOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
bool eoe_received = false;
while (skip_count_ < max_skips_) {
RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row));
if (row->eoe() && !once_only_) {
eoe_received = true;
break;
}
if (!row->eoe()) {
skip_count_++;
}
}
if (!eoe_received) {
RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row));
}
if (row->eoe()) {
UpdateRepeatAndEpochCounter();
if (!once_only_) {
skip_count_ = 0;
}
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -52,6 +52,12 @@ class SkipOp : public PipelineOp {
void SetOnceOnly(bool once_only) { once_only_ = once_only; }
protected:
/// \brief Gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
private:
int32_t max_skips_; // The number of skips that the user requested
int32_t skip_count_; // A counter for the current number of executed skips

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2021 Huawei Technologies Co., Ltd
* Copyright 2020-2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,9 +37,7 @@ AlbumOp::AlbumOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, boo
extensions_(exts),
data_schema_(std::move(data_schema)),
sampler_ind_(0),
dirname_offset_(0),
sample_ids_(nullptr),
curr_row_(0) {
dirname_offset_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->Column(i).Name()] = i;
@ -430,29 +428,9 @@ Status AlbumOp::ComputeColMap() {
column_name_id_map_[data_schema_->Column(i).Name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
MS_LOG(INFO) << "Column name map is already set!";
}
return Status::OK();
}
Status AlbumOp::GetNextRowPullMode(TensorRow *const row) {
if (image_rows_.empty()) {
RETURN_IF_NOT_OK(PrepareData());
}
if (sample_ids_ == nullptr) {
RETURN_IF_NOT_OK(this->InitSampler());
TensorRow sample_row;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
sample_ids_ = sample_row[0];
}
if (curr_row_ + 1 > sample_ids_->Size()) {
return Status::OK();
}
int64_t key;
RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
RETURN_IF_NOT_OK(LoadTensorRow(key, row));
curr_row_++;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -160,11 +160,6 @@ class AlbumOp : public MappableLeafOp {
/// \return Status The status code returned
Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row);
/// \brief Gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
/// Private function for computing the assignment of the column name map.
/// \return Status The status code returned
Status ComputeColMap() override;
@ -177,9 +172,6 @@ class AlbumOp : public MappableLeafOp {
int64_t sampler_ind_;
int64_t dirname_offset_;
std::vector<std::string> image_rows_;
TensorPtr sample_ids_;
uint32_t curr_row_;
};
} // namespace dataset
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2021 Huawei Technologies Co., Ltd
* Copyright 2019-2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -315,5 +315,15 @@ Status CelebAOp::ComputeColMap() {
return Status::OK();
}
Status CelebAOp::InitPullMode() {
if (!image_labels_vec_.empty()) {
return Status::OK();
}
if (attr_info_queue_->empty()) {
RETURN_IF_NOT_OK(ParseAttrFile());
}
return PrepareData();
}
} // namespace dataset
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2021 Huawei Technologies Co., Ltd
* Copyright 2019-2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -119,6 +119,10 @@ class CelebAOp : public MappableLeafOp {
// @return - Status
Status ComputeColMap() override;
/// Initialize pull mode, calls PrepareData() within
/// @return Status The status code returned
Status InitPullMode() override;
std::string folder_path_; // directory of celeba folder
bool decode_;
std::set<std::string> extensions_; /// extensions allowed

View File

@ -340,5 +340,17 @@ Status ImageFolderOp::GetNumClasses(int64_t *num_classes) {
num_classes_ = *num_classes;
return Status::OK();
}
Status ImageFolderOp::InitPullMode() {
// To avoid concurrent access and multiple end signals in StartAsyncWalk, explicitly set num_workers_ to 1
num_workers_ = 1;
if (folder_name_queue_->empty()) {
RETURN_IF_NOT_OK(StartAsyncWalk());
}
if (image_name_queue_->empty()) {
RETURN_IF_NOT_OK(PrescanWorkerEntry(id()));
}
return PrepareData();
}
} // namespace dataset
} // namespace mindspore

View File

@ -151,6 +151,10 @@ class ImageFolderOp : public MappableLeafOp {
/// @return - Status
Status ComputeColMap() override;
/// Initialize pull mode, calls PrepareData() within
/// @return Status The status code returned
Status InitPullMode() override;
std::string folder_path_; // directory of image folder
bool recursive_;
bool decode_;

View File

@ -22,7 +22,7 @@
namespace mindspore {
namespace dataset {
MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr<SamplerRT> sampler)
: ParallelOp(num_wkrs, queue_size, std::move(sampler)) {}
: ParallelOp(num_wkrs, queue_size, std::move(sampler)), sample_ids_(nullptr), curr_row_(0), prepared_data_{false} {}
#ifdef ENABLE_PYTHON
Status MappableLeafOp::ImageDecrypt(const std::string &path, std::shared_ptr<Tensor> *tensor,
@ -158,5 +158,30 @@ Status MappableLeafOp::SendQuitFlagToWorker(int32_t worker_id) {
RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockNone)));
return Status::OK();
}
Status MappableLeafOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
row->clear();
if (!prepared_data_) {
RETURN_IF_NOT_OK(InitPullMode());
prepared_data_ = true;
}
if (sample_ids_ == nullptr) {
RETURN_IF_NOT_OK(this->InitSampler());
TensorRow sample_row;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
sample_ids_ = sample_row[0];
}
if (curr_row_ + 1 > sample_ids_->Size()) {
*row = TensorRow(TensorRow::kFlagEOE);
return Status::OK();
}
int64_t key;
RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
RETURN_IF_NOT_OK(LoadTensorRow(key, row));
curr_row_++;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -21,6 +21,8 @@
#include <algorithm>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_schema.h"
@ -75,6 +77,10 @@ class MappableLeafOp : public ParallelOp<std::unique_ptr<IOBlock>, TensorRow>, p
#endif
protected:
TensorPtr sample_ids_; // sample id pointer for pull mode
uint32_t curr_row_; // current row number count for pull mode
bool prepared_data_; // flag to indicate whether the data is prepared before LoadTensorRow for pull mode
/// Initialize Sampler, calls sampler->Init() within
/// @return Status The status code returned
Status InitSampler();
@ -104,6 +110,15 @@ class MappableLeafOp : public ParallelOp<std::unique_ptr<IOBlock>, TensorRow>, p
Status Reset() override;
Status SendWaitFlagToWorker(int32_t worker_id) override;
Status SendQuitFlagToWorker(int32_t worker_id) override;
/// \brief In pull mode, gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
/// Initialize pull mode, calls PrepareData() within
/// @return Status The status code returned
virtual Status InitPullMode() { return PrepareData(); }
};
} // namespace dataset
} // namespace mindspore

View File

@ -366,5 +366,36 @@ Status MindRecordOp::RemoveWorkers(int32_t num_workers) {
return Status::OK();
}
Status MindRecordOp::InitPullMode() {
RETURN_IF_NOT_OK(Init());
RETURN_IF_NOT_OK(shard_reader_->Launch(true));
return this->PrepareData();
}
Status MindRecordOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
row->clear();
if (!prepared_data_) {
RETURN_IF_NOT_OK(InitPullMode());
prepared_data_ = true;
}
if (sample_ids_ == nullptr) {
RETURN_IF_NOT_OK(this->InitSampler());
TensorRow sample_row;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row));
CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler.");
sample_ids_ = sample_row[0];
}
if (curr_row_ + 1 > sample_ids_->Size()) {
*row = TensorRow(TensorRow::kFlagEOE);
return Status::OK();
}
int64_t key;
RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_}));
RETURN_IF_NOT_OK(GetRowFromReader(row, key, id()));
curr_row_++;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -151,6 +151,15 @@ class MindRecordOp : public MappableLeafOp {
/// \return Status The status code returned
Status RemoveWorkers(int32_t num_workers = 1) override;
/// Initialize pull mode, calls PrepareData() within
/// @return Status The status code returned
Status InitPullMode() override;
/// \brief Gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
private:
std::vector<std::string> dataset_file_; // dataset files
bool load_dataset_; // load dataset from single file or not
@ -164,8 +173,6 @@ class MindRecordOp : public MappableLeafOp {
std::map<std::string, std::string> sample_bytes_;
std::unique_ptr<DataSchema> data_schema_; // Data schema for column typing
std::vector<std::string> columns_blob_; // Blob Columns to load from dataset
std::vector<int32_t> columns_blob_index_; // Blob Columns to load from dataset
std::unique_ptr<ShardReader> shard_reader_;

View File

@ -66,5 +66,19 @@ Status TakeOp::GetNextRow(TensorRow *row) {
return Status::OK();
}
Status TakeOp::GetNextRowPullMode(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
if (take_count_ < max_takes_) {
RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row));
}
if (take_count_ == max_takes_ || row->eoe()) {
UpdateRepeatAndEpochCounter();
take_count_ = 0;
} else {
take_count_++;
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -60,6 +60,12 @@ class TakeOp : public PipelineOp {
Status GetNextRow(TensorRow *row) override;
protected:
/// \brief Gets the next row
/// \param row[out] - Fetched TensorRow
/// \return Status The status code returned
Status GetNextRowPullMode(TensorRow *const row) override;
private:
int32_t max_takes_; // The number of takes that the user requested
int32_t take_count_; // A counter for the current number of executed takes

View File

@ -260,7 +260,7 @@ Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function<Status(ui
}
// Walks the tree to perform modifications to the tree in post-order to get it ready for execution.
Status ExecutionTree::Prepare() {
Status ExecutionTree::Prepare(bool is_pull_mode) {
if (root_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Please assign one operator as the root of this tree.");
}
@ -279,7 +279,11 @@ Status ExecutionTree::Prepare() {
// By iterating from the end of the FIFO queue, we simulate the post-order walk.
for (auto rit = fifo.crbegin(); rit != fifo.crend(); ++rit) {
RETURN_IF_NOT_OK((*rit)->PrepareOperator());
if (!is_pull_mode) {
RETURN_IF_NOT_OK((*rit)->PrepareOperator());
} else {
RETURN_IF_NOT_OK((*rit)->PrepareOperatorPullBased());
}
}
// The tree is prepared.

View File

@ -185,8 +185,9 @@ class ExecutionTree {
std::shared_ptr<DatasetOp> root() const { return root_; }
/// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution.
/// \param is_pull_mode - an indicator if it's in pull mode or not
/// \return Status The status code returned
Status Prepare();
Status Prepare(bool is_pull_mode = false);
/// \brief Return the pointer to the TaskGroup
/// \return raw pointer to the TaskGroup

View File

@ -322,6 +322,10 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
/// \return Shared pointer to the original object
std::shared_ptr<DatasetNode> SetDatasetCache(const std::shared_ptr<DatasetCache> &cache);
/// \brief Setter function for descendant_of_cache_
/// \param[in] descendant_of_cache Indicator for whether this node is a descendant of cache.
void setDescendantOfCache(bool descendant_of_cache) { descendant_of_cache_ = descendant_of_cache; }
/// \brief A helper templated function for casting "this" pointer to shared_ptr<derived>
/// Similar to shared_from_this, except this one will give you the derived class as shared_ptr
/// \return A shared_ptr casted to the derived class

View File

@ -16,6 +16,7 @@ set(DATASET_ENGINE_OPT_SRC_FILES
pre/node_offload_pass.cc
pre/node_removal_pass.cc
pre/skip_pushdown_pass.cc
pre/debug_mode_pass.cc
)
if(ENABLE_PYTHON)

View File

@ -0,0 +1,63 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/opt/pre/debug_mode_pass.h"
#include <string>
#include "minddata/dataset/engine/ir/datasetops/map_node.h"
#include "minddata/dataset/engine/ir/datasetops/root_node.h"
#include "minddata/dataset/include/dataset/datasets.h"
namespace mindspore {
namespace dataset {
bool DebugModePass::RemoveCacheAndOffload(std::shared_ptr<DatasetNode> node) {
// remove DatasetNode cache
bool ret = false;
if (node->IsCached()) {
MS_LOG(WARNING) << node->Name() << " with cache found in the debug mode. Dropping the cache."
<< " If performance is a concern, then disable debug mode.";
node->SetDatasetCache(nullptr);
ret = true;
}
if (node->IsDescendantOfCache()) {
node->setDescendantOfCache(false);
ret = true;
}
if (GlobalContext::config_manager()->get_auto_offload()) {
MS_LOG(WARNING) << "Both debug mode and auto offload are enabled. Disabling auto offload."
"If performance is a concern, then disable debug mode and re-enable auto offload.";
GlobalContext::config_manager()->set_auto_offload(false);
}
return ret;
}
Status DebugModePass::Visit(std::shared_ptr<MapNode> node, bool *const modified) {
*modified = RemoveCacheAndOffload(node);
if (node->GetOffload() == ManualOffloadMode::kEnabled) {
MS_LOG(WARNING) << "Map operation with offload found in the debug mode. Ignoring offload."
"If performance is a concern, then disable debug mode.";
node->SetOffload(ManualOffloadMode::kDisabled);
*modified = true;
}
return Status::OK();
}
Status DebugModePass::Visit(std::shared_ptr<DatasetNode> node, bool *const modified) {
*modified = RemoveCacheAndOffload(node);
return Status::OK();
}
} // namespace dataset
} // namespace mindspore
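
From user code, this pass shows up as warnings whenever debug mode is combined with caching or offload, and the conflicting settings are overridden. A minimal Python-side sketch of that interaction, assuming the dataset path is hypothetical and that map's offload argument is available in this build:

import mindspore.dataset as ds
import mindspore.dataset.vision as vision

ds.config.set_auto_offload(True)  # DebugModePass will warn and force this back to False
ds.config.set_debug_mode(True)

# Hypothetical image folder; any leaf op that supports pull mode works here.
data = ds.ImageFolderDataset("/path/to/images", decode=True, num_samples=4)
# offload=True on this map is ignored (downgraded to ManualOffloadMode::kDisabled) in debug mode.
data = data.map(operations=[vision.Resize((32, 32))], input_columns=["image"], offload=True)

for _ in data.create_tuple_iterator(num_epochs=1, output_numpy=True):
    pass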

View File

@ -0,0 +1,57 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_
#define DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_
#include <memory>
#include <vector>
#include "minddata/dataset/engine/opt/pass.h"
namespace mindspore {
namespace dataset {
/// \class DebugModePass
/// \brief This is a pre-parse pass that disables some nodes and prepares the tree for debug mode.
class DebugModePass : public IRNodePass {
public:
/// \brief Constructor
DebugModePass() {}
/// \brief Destructor
~DebugModePass() = default;
/// \brief Runs a pass on MapNode
/// \param[in] node The node being visited
/// \param[in, out] *modified indicates if the node was changed at all
/// \return Status code
Status Visit(std::shared_ptr<MapNode> node, bool *const modified) override;
/// \brief Runs a pass on DatasetNode
/// \param[in] node The node being visited
/// \param[in, out] *modified indicates if the node was changed at all
/// \return Status code
Status Visit(std::shared_ptr<DatasetNode> node, bool *const modified) override;
protected:
/// \brief Check and remove cache and offload if node has any
/// \param[in] node The node being visited
/// \return true if the node was changed; otherwise, false
bool RemoveCacheAndOffload(std::shared_ptr<DatasetNode> node);
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_

View File

@ -44,7 +44,11 @@ PythonRuntimeContext::~PythonRuntimeContext() {
}
}
PythonIteratorConsumer *PythonRuntimeContext::GetPythonConsumer() {
return dynamic_cast<PythonIteratorConsumer *>(tree_consumer_.get());
TreeConsumer *PythonRuntimeContext::GetPythonConsumer() {
if (GlobalContext::config_manager()->get_debug_mode()) {
return dynamic_cast<PythonPullBasedIteratorConsumer *>(tree_consumer_.get());
} else {
return dynamic_cast<PythonIteratorConsumer *>(tree_consumer_.get());
}
}
} // namespace mindspore::dataset

View File

@ -34,7 +34,7 @@ class PythonRuntimeContext : public RuntimeContext {
/// Safe destructing the tree that includes python objects
~PythonRuntimeContext() override;
PythonIteratorConsumer *GetPythonConsumer();
TreeConsumer *GetPythonConsumer();
private:
/// Internal function to perform the termination

View File

@ -15,6 +15,13 @@
*/
#include "minddata/dataset/engine/tree_adapter_lite.h"
#include "minddata/dataset/engine/ir/datasetops/root_node.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/engine/opt/pre/debug_mode_pass.h"
#include "minddata/dataset/engine/opt/pre/deep_copy_pass.h"
#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
#include "minddata/dataset/engine/opt/pre/input_validation_pass.h"
#include "minddata/dataset/engine/opt/pre/node_removal_pass.h"
namespace mindspore {
namespace dataset {
@ -54,8 +61,12 @@ Status TreeAdapterLite::BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir,
Status TreeAdapterLite::BuildTree(std::shared_ptr<DatasetNode> root_ir) {
RETURN_UNEXPECTED_IF_NULL(root_ir);
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir, &root_));
// Build the Execution tree from the child of the IR root node, which represents the root of the input IR tree,
// since a Top node has been added on top of the IR tree.
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_));
RETURN_IF_NOT_OK(tree_->AssignRoot(root_));
// Prepare the tree
RETURN_IF_NOT_OK(tree_->Prepare(true));
return Status::OK();
}
@ -66,5 +77,52 @@ Status TreeAdapterLite::GetNextRow(TensorRow *const row) {
return Status::OK();
}
Status TreeAdapterLite::PrePass(std::shared_ptr<DatasetNode> ir) {
RETURN_UNEXPECTED_IF_NULL(ir);
// Vector of actions in pre-pass phase
std::vector<std::unique_ptr<IRPass>> actions;
MS_LOG(INFO) << "Prepare PrePass loops.";
(void)actions.emplace_back(std::make_unique<InputValidationPass>());
(void)actions.emplace_back(std::make_unique<NodeRemovalPass>());
(void)actions.emplace_back(std::make_unique<EpochCtrlPass>());
if (GlobalContext::config_manager()->get_debug_mode()) {
(void)actions.emplace_back(std::make_unique<DebugModePass>());
}
// Apply pre-pass actions
for (auto i = 0; i < actions.size(); i++) {
auto m = false;
RETURN_IF_NOT_OK(actions[i]->Run(ir, &m));
}
MS_LOG(INFO) << "PrePass completed.";
return Status::OK();
}
Status TreeAdapterLite::Compile(const std::shared_ptr<DatasetNode> &input_ir, int32_t num_epochs) {
RETURN_UNEXPECTED_IF_NULL(input_ir);
input_ir_ = input_ir;
MS_LOG(INFO) << "Input plan:" << '\n' << *input_ir << '\n';
// Clone the input IR tree and insert under the root node
// Create a root node to host the new copy of the input IR tree
// This is done so that the PrePass will process and modify the tree
// without changing the tree associated with the user code.
// The tree from the user code is permitted to form a graph where any node
// is consumed by more than one parent. However, this cloning process here
// will break the graph into a tree by copying each consumption of a node into a new copy.
DeepCopyPass cloning_tree;
bool m = false;
RETURN_IF_NOT_OK(cloning_tree.Run(input_ir, &m));
std::shared_ptr<RootNode> root_ir = cloning_tree.Root();
root_ir->SetNumEpochs(num_epochs);
MS_LOG(INFO) << "Plan before PrePass:" << '\n' << *root_ir << '\n';
// Pre-pass of the IR tree
RETURN_IF_NOT_OK(PrePass(root_ir));
MS_LOG(INFO) << "Plan after PrePass:" << '\n' << *root_ir << '\n';
root_ir_ = root_ir;
RETURN_IF_NOT_OK(BuildTree(root_ir));
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -44,6 +44,16 @@ class TreeAdapterLite {
std::unordered_map<std::string, int32_t> GetColumnNameMap() const { return tree_->root()->column_name_id_map(); }
// This function performs syntax checking, semantics checking, and then calls BuildTree
Status Compile(const std::shared_ptr<DatasetNode> &input_ir, int32_t num_epochs = -1);
protected:
// Run the mandatory pass checking the syntax and semantics of the IR tree
Status PrePass(std::shared_ptr<DatasetNode> ir);
std::shared_ptr<DatasetNode> input_ir_;
std::shared_ptr<DatasetNode> root_ir_;
private:
// This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree.
Status BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op);

View File

@ -190,6 +190,7 @@ if(MSLITE_MINDDATA_IMPLEMENT STREQUAL "full")
${MINDDATA_DIR}/engine/opt/pre/add_skip_pass.cc
${MINDDATA_DIR}/engine/opt/pre/getter_pass.cc
${MINDDATA_DIR}/engine/opt/pre/input_validation_pass.cc
${MINDDATA_DIR}/engine/opt/pre/debug_mode_pass.cc
${MINDDATA_DIR}/engine/opt/pre/cache_validation_pass.cc
${MINDDATA_DIR}/engine/opt/pre/node_removal_pass.cc
${MINDDATA_DIR}/engine/opt/pre/epoch_ctrl_pass.cc

View File

@ -46,7 +46,8 @@ __all__ = ['set_sending_batches', 'load', '_init_device_info',
'set_auto_offload', 'get_auto_offload',
'set_enable_watchdog', 'get_enable_watchdog',
'set_fast_recovery', 'get_fast_recovery',
'set_multiprocessing_timeout_interval', 'get_multiprocessing_timeout_interval']
'set_multiprocessing_timeout_interval', 'get_multiprocessing_timeout_interval',
'set_debug_mode', 'get_debug_mode']
INT32_MAX = 2147483647
UINT32_MAX = 4294967295
@ -832,3 +833,43 @@ def get_fast_recovery():
>>> is_fast_recovery = ds.config.get_fast_recovery()
"""
return _config.get_fast_recovery()
def set_debug_mode(debug_mode_flag):
"""
Set the debug_mode flag of the dataset pipeline
Notes:
1. If both debug_mode and auto_offload are enabled, then during runtime, auto_offload is forcibly disabled.
2. If debug_mode is enabled and a dataset pipeline has a Map operation with offload set, then offload is
ignored.
3. If debug_mode is enabled and a dataset operation has a cache set, then the cache is dropped.
Args:
debug_mode_flag (bool): Whether dataset pipeline debugger is enabled, which forces the pipeline
to run synchronously and sequentially.
Raises:
TypeError: If `debug_mode_flag` is not a boolean data type.
Examples:
>>> ds.config.set_debug_mode(True)
"""
if not isinstance(debug_mode_flag, bool):
raise TypeError("debug_mode_flag isn't of type boolean.")
if debug_mode_flag:
logger.warning("MindData Debug mode is enabled. Performance will be impacted because the pipeline will be"
" running in a single thread.")
_config.set_debug_mode(debug_mode_flag)
def get_debug_mode():
"""
Get the debug_mode flag of the dataset pipeline
Returns:
bool, whether the dataset pipeline debugger is enabled
Examples:
>>> debug_mode = ds.config.get_debug_mode()
"""
return _config.get_debug_mode()

View File

@ -24,6 +24,7 @@ import numpy as np
import mindspore._c_dataengine as cde
from mindspore.common.tensor import Tensor
import mindspore.dataset.engine.offload as offload
from mindspore.dataset.core.config import get_debug_mode
from mindspore import log as logger
@ -75,7 +76,10 @@ class Iterator:
self._runtime_context = cde.PythonRuntimeContext()
self._runtime_context.Init()
consumer = cde.PythonIteratorConsumer(num_epochs)
if get_debug_mode():
consumer = cde.PythonPullBasedIteratorConsumer(num_epochs)
else:
consumer = cde.PythonIteratorConsumer(num_epochs)
consumer.Init(self.ir_tree)
self._runtime_context.AssignConsumer(consumer)
self._iterator = self._runtime_context.GetConsumer()
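
With this change, iteration code in user scripts stays the same; which consumer is constructed is decided purely by the config flag. A minimal sketch, with a hypothetical MNIST data directory:

import mindspore.dataset as ds

ds.config.set_debug_mode(True)  # Iterator now builds a PythonPullBasedIteratorConsumer

# Hypothetical data directory; the iteration API is identical to normal (push-based) mode.
data = ds.MnistDataset("/path/to/mnist", num_samples=20)
data = data.batch(batch_size=4, drop_remainder=True)

for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
    print(item["image"].shape, item["label"].shape)  # e.g. (4, 28, 28, 1) (4,)

ds.config.set_debug_mode(False)  # restore the previous setting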

File diff suppressed because it is too large

View File

@ -0,0 +1,244 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/common.h"
#include "minddata/dataset/include/dataset/datasets.h"
#include "minddata/dataset/include/dataset/vision.h"
namespace common = mindspore::common;
using namespace mindspore::dataset;
class MindDataTestPipeline : public UT::DatasetOpTesting {
protected:
};
/// Feature: PullBasedIterator GetNextRowPullMode
/// Description: Test PullBasedIterator on SkipOp
/// Expectation: Output is the same as the normal iterator
TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineSkipOp) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineSkipOp.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 10));
EXPECT_NE(ds, nullptr);
// Create a Skip operation on ds
int32_t count = 3;
ds = ds->Skip(count);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<PullIterator> iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
EXPECT_EQ(row.size(), 2);
uint64_t i = 0;
while (row.size() != 0) {
i++;
auto image = row[0];
MS_LOG(INFO) << "Tensor image shape: " << image.Shape();
ASSERT_OK(iter->GetNextRow(&row));
}
MS_LOG(INFO) << "Number of rows: " << i;
// Expect 10 - 3 = 7 rows
EXPECT_EQ(i, 7);
// Manually terminate the pipeline
iter->Stop();
}
/// Feature: PullBasedIterator GetNextRowPullMode
/// Description: Test PullBasedIterator on SkipOp with count larger than number of rows
/// Expectation: Output is the same as the normal iterator
TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineSkipOpLargeCount) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineSkipOpLargeCount.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 10));
EXPECT_NE(ds, nullptr);
// Create a Skip operation on ds
int32_t count = 30;
ds = ds->Skip(count);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<PullIterator> iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
EXPECT_EQ(row.size(), 0);
iter->Stop();
}
/// Feature: PullBasedIterator GetNextRowPullMode
/// Description: Test PullBasedIterator on TakeOp with default count=-1
/// Expectation: Output is equal to the expected output
TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpDefault) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpDefault.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 7));
EXPECT_NE(ds, nullptr);
// Create a Take operation on ds, default count = -1
ds = ds->Take();
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<PullIterator> iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
uint64_t i = 0;
while (row.size() != 0) {
i++;
auto image = row[0];
MS_LOG(INFO) << "Tensor image shape: " << image.Shape();
ASSERT_OK(iter->GetNextRow(&row));
}
MS_LOG(INFO) << "Number of rows: " << i;
// Expect 7 rows
EXPECT_EQ(i, 7);
// Manually terminate the pipeline
iter->Stop();
}
/// Feature: PullBasedIterator GetNextRowPullMode
/// Description: Test PullBasedIterator on TakeOp with count = 5
/// Expectation: Output is equal to the expected output
TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpCount5) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpCount5.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 7));
EXPECT_NE(ds, nullptr);
// Create a Take operation on ds, count = 5
ds = ds->Take(5);
EXPECT_EQ(ds->GetDatasetSize(), 5);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<PullIterator> iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
EXPECT_EQ(row.size(), 2);
uint64_t i = 0;
while (row.size() != 0) {
i++;
auto image = row[0];
MS_LOG(INFO) << "Tensor image shape: " << image.Shape();
ASSERT_OK(iter->GetNextRow(&row));
}
MS_LOG(INFO) << "Number of rows: " << i;
// Expect 5 rows from take(5).
EXPECT_EQ(i, 5);
// Manually terminate the pipeline
iter->Stop();
}
/// Feature: Take op
/// Description: Test Take op with invalid count input
/// Expectation: Error message is logged, and CreatePullBasedIterator() for invalid pipeline returns nullptr
TEST_F(MindDataTestPipeline, TestTakeDatasetError1CreatePullBasedIterator) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTakeDatasetError1CreatePullBasedIterator.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 10));
EXPECT_NE(ds, nullptr);
// Create a Take operation on ds with invalid count input
int32_t count = -5;
auto ds1 = ds->Take(count);
EXPECT_NE(ds1, nullptr);
// Create an iterator over the result of the above dataset
std::shared_ptr<Iterator> iter = ds1->CreatePullBasedIterator();
// Expect failure: invalid Op input
EXPECT_EQ(iter, nullptr);
// Create a Take operation on ds with invalid count input
count = 0;
auto ds2 = ds->Take(count);
EXPECT_NE(ds2, nullptr);
// Create an iterator over the result of the above dataset
iter = ds2->CreatePullBasedIterator();
// Expect failure: invalid Op input
EXPECT_EQ(iter, nullptr);
}
/// Feature: PullBasedIterator GetNextRowPullMode
/// Description: Test PullBasedIterator on TakeOp with invalid count = -5
/// Expectation: Error message is logged, and CreatePullBasedIterator() for invalid pipeline returns nullptr
TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpError) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpError.";
// Create an ImageFolder Dataset
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<RandomSampler>(false, 10));
EXPECT_NE(ds, nullptr);
// Create a Take operation on ds with invalid count input
int32_t count = -5;
auto ds1 = ds->Take(count);
EXPECT_NE(ds1, nullptr);
// Create an iterator over the result of the above dataset
std::shared_ptr<PullIterator> iter = ds1->CreatePullBasedIterator();
std::vector<mindspore::MSTensor> row;
// Expect failure: invalid Op input
EXPECT_EQ(iter, nullptr);
// Create a Take operation on ds with invalid count input
count = 0;
auto ds2 = ds->Take(count);
EXPECT_NE(ds2, nullptr);
// Create an iterator over the result of the above dataset
iter = ds2->CreatePullBasedIterator();
// Expect failure: invalid Op input
EXPECT_EQ(iter, nullptr);
}

View File

@ -5,5 +5,6 @@
"workerConnectorSize": 16,
"opConnectorSize": 16,
"seed": 5489,
"monitorSamplingInterval": 15
"monitorSamplingInterval": 15,
"debug_mode_flag": true
}

View File

@ -54,6 +54,7 @@ def test_basic():
seed_original = ds.config.get_seed()
monitor_sampling_interval_original = ds.config.get_monitor_sampling_interval()
fast_recovery_original = ds.config.get_fast_recovery()
debug_mode = ds.config.get_debug_mode()
ds.config.load('../data/dataset/declient.cfg')
@ -63,6 +64,7 @@ def test_basic():
assert ds.config.get_seed() == 5489
assert ds.config.get_monitor_sampling_interval() == 15
assert ds.config.get_fast_recovery()
assert ds.config.get_debug_mode()
ds.config.set_num_parallel_workers(2)
# ds.config.set_worker_connector_size(3)
@ -70,6 +72,7 @@ def test_basic():
ds.config.set_seed(5)
ds.config.set_monitor_sampling_interval(45)
ds.config.set_fast_recovery(False)
ds.config.set_debug_mode(False)
assert ds.config.get_num_parallel_workers() == 2
# assert ds.config.get_worker_connector_size() == 3
@ -77,6 +80,7 @@ def test_basic():
assert ds.config.get_seed() == 5
assert ds.config.get_monitor_sampling_interval() == 45
assert not ds.config.get_fast_recovery()
assert not ds.config.get_debug_mode()
# Restore original configuration values
ds.config.set_num_parallel_workers(num_parallel_workers_original)
@ -84,6 +88,7 @@ def test_basic():
ds.config.set_seed(seed_original)
ds.config.set_monitor_sampling_interval(monitor_sampling_interval_original)
ds.config.set_fast_recovery(fast_recovery_original)
ds.config.set_debug_mode(debug_mode)
def test_get_seed():
@ -516,6 +521,30 @@ def test_fast_recovery():
assert "set_fast_recovery() missing 1 required positional argument: 'fast_recovery'" in str(error_info.value)
def test_debug_mode():
"""
Feature: Test the debug mode setter/getter function
Description: This function only accepts a boolean as input and outputs error otherwise
Expectation: TypeError will be raised when input argument is missing or is not a boolean
"""
# set_debug_mode() to True and then check if the value is indeed True with get_debug_mode().
debug_mode_flag = True
ds.config.set_debug_mode(debug_mode_flag)
assert ds.config.get_debug_mode() == debug_mode_flag
# set_debug_mode will raise TypeError if input is an integer
config_error_func(ds.config.set_debug_mode, 0, TypeError, "debug_mode_flag isn't of type boolean.")
# set_debug_mode will raise TypeError if input is a string
config_error_func(ds.config.set_debug_mode, "True", TypeError, "debug_mode_flag isn't of type boolean.")
# set_debug_mode will raise TypeError if input is a tuple
config_error_func(ds.config.set_debug_mode, (True,), TypeError, "debug_mode_flag isn't of type boolean.")
# set_debug_mode will raise TypeError if input is None
config_error_func(ds.config.set_debug_mode, None, TypeError, "debug_mode_flag isn't of type boolean.")
# set_debug_mode will raise TypeError if no input is provided
with pytest.raises(TypeError) as error_info:
ds.config.set_debug_mode()
assert "set_debug_mode() missing 1 required positional argument: 'debug_mode_flag'" in str(error_info.value)
if __name__ == '__main__':
test_basic()
test_get_seed()
@ -532,3 +561,4 @@ if __name__ == '__main__':
test_multiprocessing_timeout_interval()
test_config_bool_type_error()
test_fast_recovery()
test_debug_mode()

View File

@ -0,0 +1,185 @@
# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import pytest
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
from mindspore.dataset.vision import Inter
from mindspore import log as logger
# Need to run all these tests in separate processes since
# the global configuration setting of debug_mode may impact other tests running in parallel.
DEBUG_MODE = False
def setup_function():
global DEBUG_MODE
DEBUG_MODE = ds.config.get_debug_mode()
ds.config.set_debug_mode(True)
def teardown_function():
ds.config.set_debug_mode(DEBUG_MODE)
@pytest.mark.forked
def test_pipeline_debug_mode_tuple():
"""
Feature: Pipeline debug mode.
Description: Test creating tuple iterator with debug mode enabled.
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_tuple")
data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
crop_size = (80, 80)
resize_size = (24, 24)
# define map operations
center_crop = vision.CenterCrop(crop_size)
resize_op = vision.Resize(resize_size, Inter.LINEAR) # Bilinear mode
data = data.map(operations=[center_crop, resize_op], input_columns=["image"])
data = data.batch(2)
num_row = 0
for item in data.create_tuple_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 2
assert item[0].shape == (2, 24, 24, 3)
assert item[1].shape == (2, 40)
num_row += 1
assert num_row == 2
@pytest.mark.forked
def test_pipeline_debug_mode_dict():
"""
Feature: Pipeline debug mode.
Description: Test creating dict iterator with debug mode enabled.
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_dict")
data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
crop_size = (80, 80)
resize_size = (24, 24)
# define map operations
center_crop = vision.CenterCrop(crop_size)
resize_op = vision.Resize(resize_size, Inter.LINEAR) # Bilinear mode
data = data.map(operations=[center_crop, resize_op], input_columns=["image"])
data = data.batch(2)
num_row = 0
for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 2
assert item["image"].shape == (2, 24, 24, 3)
assert item["attr"].shape == (2, 40)
num_row += 1
assert num_row == 2
@pytest.mark.forked
def test_pipeline_debug_mode_minddata():
"""
Feature: Pipeline debug mode.
Description: Test iterator with MindDataset in debug mode.
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_minddata")
data = ds.MindDataset("../data/mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0")
data_lst = []
for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 3
data_lst.append(item["data"].copy())
assert len(data_lst) == 20
def test_pipeline_debug_mode_not_support():
"""
Feature: Pipeline debug mode.
Description: Test creating tuple iterator with an op that is not supported in pull mode.
Expectation: Successful with no data generated.
"""
logger.info("test_pipeline_debug_mode_not_support")
data = ds.NumpySlicesDataset(data=[[0, 1, 2]], column_names=["data"])
num_rows = 0
for _ in data.create_tuple_iterator(num_epochs=1, output_numpy=True):
num_rows += 1
assert num_rows == 0
@pytest.mark.forked
def test_pipeline_debug_mode_map_pyfunc():
"""
Feature: Pipeline debug mode.
Description: Test creating dict iterator with map(PyFunc).
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_map_pyfunc")
data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
data = data.map(operations=[(lambda x: x - 1), (lambda x: x * 2)], input_columns=["image"])
num_rows = 0
for item in data.create_dict_iterator(num_epochs=1):
assert len(item) == 2
assert item["image"].shape == (2268, 4032, 3)
num_rows += 1
assert num_rows == 4
@pytest.mark.forked
def test_pipeline_debug_mode_batch_pyfunc():
"""
Feature: Pipeline debug mode.
Description: Test creating dict iterator with Batch(PyFunc).
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_batch_pyfunc")
def add_one(batch_info):
return batch_info.get_batch_num() + 1
data = ds.MnistDataset("../data/dataset/testMnistData", num_samples=20)
data = data.batch(batch_size=add_one, drop_remainder=True)
num_rows = 0
for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
num_rows += 1
assert item["label"].shape == (num_rows,)
assert num_rows == 5
@pytest.mark.forked
def test_pipeline_debug_mode_concat():
"""
Feature: Pipeline debug mode.
Description: Test creating tuple iterator with concat.
Expectation: Successful.
"""
logger.info("test_pipeline_debug_mode_concat")
data_dir = "../data/dataset/testCelebAData/"
data1 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
data2 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
data3 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
data4 = data1.concat(data2)
data5 = data3 + data4
num_rows = 0
for item1 in data5.create_tuple_iterator(num_epochs=1):
assert len(item1) == 2
assert item1[0].shape == (2268, 4032, 3)
num_rows += 1
assert num_rows == 12
if __name__ == '__main__':
test_pipeline_debug_mode_tuple()
test_pipeline_debug_mode_dict()
test_pipeline_debug_mode_minddata()
test_pipeline_debug_mode_not_support()
test_pipeline_debug_mode_map_pyfunc()
test_pipeline_debug_mode_batch_pyfunc()
test_pipeline_debug_mode_concat()