diff --git a/mindspore/ccsrc/minddata/dataset/api/iterator.cc b/mindspore/ccsrc/minddata/dataset/api/iterator.cc index e20a3df6e51..7314c1f0615 100644 --- a/mindspore/ccsrc/minddata/dataset/api/iterator.cc +++ b/mindspore/ccsrc/minddata/dataset/api/iterator.cc @@ -96,7 +96,8 @@ PullIterator::~PullIterator() = default; // Get the next row from the data pipeline. Status PullIterator::GetRows(int32_t num_rows, std::vector *const row) { RETURN_UNEXPECTED_IF_NULL(row); - CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr. Please launch iterator fist."); + CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr. Please launch iterator first."); + row->clear(); for (int i = 0; i < num_rows; i++) { std::vector> md_row; Status rc = pull_consumer_->GetNextAsVector(&md_row); @@ -120,6 +121,7 @@ Status PullIterator::GetRows(int32_t num_rows, std::vector *const r Status PullIterator::GetNextRow(MSTensorVec *const row) { RETURN_UNEXPECTED_IF_NULL(row); CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr."); + row->clear(); std::vector> md_row; Status rc = pull_consumer_->GetNextAsVector(&md_row); if (rc.IsError()) { diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc index c875455f9d4..0df13d8dcbf 100755 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc @@ -77,6 +77,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) { .def("get_dynamic_shape", &ConfigManager::dynamic_shape) .def("set_fast_recovery", &ConfigManager::set_fast_recovery) .def("get_fast_recovery", &ConfigManager::fast_recovery) + .def("set_debug_mode", &ConfigManager::set_debug_mode) + .def("get_debug_mode", &ConfigManager::get_debug_mode) .def("load", [](ConfigManager &c, const std::string &s) { THROW_IF_ERROR(c.LoadFile(s)); }); })); diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/consumer/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/consumer/bindings.cc index a356b04be44..e764ccb7b59 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/consumer/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/consumer/bindings.cc @@ -49,6 +49,27 @@ PYBIND_REGISTER(PythonIteratorConsumer, 1, ([](const py::module *m) { }); })); +PYBIND_REGISTER( + PythonPullBasedIteratorConsumer, 1, ([](const py::module *m) { + (void)py::class_>( + *m, "PythonPullBasedIteratorConsumer") + .def(py::init()) + .def("Init", + [](PythonPullBasedIteratorConsumer &self, std::shared_ptr d) { THROW_IF_ERROR(self.Init(d)); }) + .def("GetNextAsMap", + [](PythonPullBasedIteratorConsumer &self) { + py::dict output; + THROW_IF_ERROR(self.GetNextAsDict(&output)); + return output; + }) + .def("GetOffload", [](PythonPullBasedIteratorConsumer &self) { return self.GetOffload(); }) + .def("GetNextAsList", [](PythonPullBasedIteratorConsumer &self) { + py::list output; + THROW_IF_ERROR(self.GetNextAsList(&output)); + return output; + }); + })); + PYBIND_REGISTER(TreeGetters, 1, ([](const py::module *m) { (void)py::class_>(*m, "TreeGetters") diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc index 257fe575c63..dcd7160c701 100644 --- 
a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc @@ -93,6 +93,7 @@ Status ConfigManager::FromJson(const nlohmann::json &j) { set_cache_port(j.value("cachePort", cache_port_)); set_num_connections(j.value("numConnections", num_connections_)); set_cache_prefetch_size(j.value("cachePrefetchSize", cache_prefetch_size_)); + set_debug_mode(j.value("debug_mode_flag", debug_mode_flag_)); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.h b/mindspore/ccsrc/minddata/dataset/core/config_manager.h index 8c9408e77bf..5a2a7a00a05 100644 --- a/mindspore/ccsrc/minddata/dataset/core/config_manager.h +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.h @@ -293,6 +293,14 @@ class ConfigManager { // @return - Flag to indicate whether md pipeline recovers fast in failover reset bool fast_recovery() const { return fast_recovery_; } + // setter function + // @param debug_mode_flag - Indicate whether the debug mode is on + void set_debug_mode(const bool debug_mode_flag) { debug_mode_flag_ = debug_mode_flag; } + + // getter function + // @return - Flag to indicate whether the debug mode is on + bool get_debug_mode() const { return debug_mode_flag_; } + private: // Private helper function that takes a nlohmann json format and populates the settings // @param j - The json nlohmann json info @@ -327,7 +335,8 @@ class ConfigManager { uint32_t multiprocessing_timeout_interval_; // Multiprocessing timeout interval in seconds std::string autotune_json_filepath_; // Filepath name of the final AutoTune Configuration JSON file bool dynamic_shape_{false}; - bool fast_recovery_{true}; // Used for failover scenario to recover quickly or produce same augmentations + bool fast_recovery_{true}; // Used for failover scenario to recover quickly or produce same augmentations + bool debug_mode_flag_{false}; // Indicator for debug mode }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc index 0f6140efbc5..792e691b64b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc @@ -19,11 +19,9 @@ #include namespace mindspore::dataset { -PullBasedIteratorConsumer::PullBasedIteratorConsumer() { tree_adapter_lite_ = std::make_unique(); } - Status PullBasedIteratorConsumer::Init(std::shared_ptr root) { RETURN_UNEXPECTED_IF_NULL(root); - return tree_adapter_lite_->BuildTree(std::move(root)); + return tree_adapter_lite_->Compile(std::move(root), num_epochs_); } std::vector PullBasedIteratorConsumer::GetRows(int64_t num_rows) { @@ -73,7 +71,6 @@ Status PullBasedIteratorConsumer::GetNextAsMap(std::unordered_map>> *const vec) { - std::unique_ptr tree_adapter = std::make_unique(); CHECK_FAIL_RETURN_UNEXPECTED(vec != nullptr && vec->empty(), "vec is null or non-empty."); TensorRow curr_row; @@ -85,7 +82,7 @@ Status PullBasedIteratorConsumer::GetNextAsOrderedPair( if (column_order_.empty()) { const int32_t invalid_col_id = -1; column_order_.resize(num_cols, {std::string(), invalid_col_id}); - for (const auto &itr : tree_adapter->GetColumnNameMap()) { + for (const auto &itr : tree_adapter_lite_->GetColumnNameMap()) { int32_t ind = itr.second; CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds."); column_order_[ind] = 
std::make_pair(itr.first, ind); diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h index 9a105244060..3b532ce02f8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h @@ -23,6 +23,7 @@ #include #include #include "minddata/dataset/engine/tree_adapter_lite.h" +#include "minddata/dataset/engine/consumers/tree_consumer.h" namespace mindspore::dataset { @@ -30,14 +31,17 @@ class TreeAdapterLite; class TensorRow; /// Consumer that iterates over the dataset and returns the rows one by one as a in a pull based fashion -class PullBasedIteratorConsumer { +class PullBasedIteratorConsumer : public TreeConsumer { public: /// Constructor - PullBasedIteratorConsumer(); + /// \param num_epochs number of epochs. Default: 1. + explicit PullBasedIteratorConsumer(int32_t num_epochs = 1) : TreeConsumer(num_epochs) { + tree_adapter_lite_ = std::make_unique(); + } ~PullBasedIteratorConsumer() = default; - Status Init(std::shared_ptr root); + Status Init(std::shared_ptr root) override; /// \brief Returns the next row in a vector format /// \note This is currently a placeholder function @@ -48,17 +52,22 @@ class PullBasedIteratorConsumer { /// Returns the next row in a vector format /// \param[out] out std::vector of Tensors /// \return Status error code - Status GetNextAsVector(std::vector *const out); + Status GetNextAsVector(std::vector *const out) override; /// Returns the next row in as a map /// \param[out] out std::map of string to Tensor /// \return Status error code - Status GetNextAsMap(std::unordered_map *out); + Status GetNextAsMap(std::unordered_map *const out) override; /// Returns the next row in as a vector /// \param[out] vec std::vector of pairs of string to Tensor /// \return Status error code - Status GetNextAsOrderedPair(std::vector>> *vec); + Status GetNextAsOrderedPair(std::vector>> *const vec) override; + + protected: + /// Method to return the name of the consumer + /// \return string + std::string Name() override { return "PullBasedIteratorConsumer"; } private: std::unique_ptr tree_adapter_lite_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc index 461585e76e2..813aeb89a40 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc @@ -48,6 +48,35 @@ Status PythonIteratorConsumer::GetNextAsDict(const py::dict *out) { return Status::OK(); } +Status PythonPullBasedIteratorConsumer::GetNextAsList(const py::list *out) { + RETURN_UNEXPECTED_IF_NULL(out); + std::vector row; + { + py::gil_scoped_release gil_release; + RETURN_IF_NOT_OK(GetNextAsVector(&row)); + } + for (auto el : row) { + (*out).append(el); + } + return Status::OK(); +} + +Status PythonPullBasedIteratorConsumer::GetNextAsDict(const py::dict *out) { + RETURN_UNEXPECTED_IF_NULL(out); + std::vector>> vec; + Status s; + { + py::gil_scoped_release gil_release; + s = GetNextAsOrderedPair(&vec); + } + RETURN_IF_NOT_OK(s); + // Generate Python dict, python dict maintains its insertion order + for (const auto &pair : vec) { + (*out)[common::SafeCStr(pair.first)] = pair.second; + } + return Status::OK(); +} + Status PythonBuildVocabConsumer::Start() { py::gil_scoped_release gil_release; 
return BuildVocabConsumer::Start(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.h b/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.h index 011490f826f..4af185fb50e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.h +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.h @@ -20,6 +20,7 @@ #include #include #include "minddata/dataset/engine/consumers/tree_consumer.h" +#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h" #include "pybind11/pybind11.h" namespace mindspore::dataset { @@ -44,6 +45,24 @@ class PythonIteratorConsumer : public IteratorConsumer { Status GetNextAsDict(const py::dict *out); }; +class PythonPullBasedIteratorConsumer : public PullBasedIteratorConsumer { + public: + /// Constructor which will call the base class default constructor. + /// \param num_epochs number of epochs. Default to -1 (infinite epochs). + explicit PythonPullBasedIteratorConsumer(int32_t num_epochs = -1) : PullBasedIteratorConsumer() {} + + ~PythonPullBasedIteratorConsumer() = default; + /// Returns the next row in a vector format + /// \param[out] out std::vector of Tensors + /// \return Status error code + Status GetNextAsList(const py::list *out); + + /// Returns the next row in as a map + /// \param[out] out std::map of string to Tensor + /// \return Status error code + Status GetNextAsDict(const py::dict *out); +}; + class PythonBuildVocabConsumer : public BuildVocabConsumer { public: Status Start() override; diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc index 4dac024abb3..5101ffa0204 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc @@ -174,7 +174,7 @@ Status IteratorConsumer::Init(std::shared_ptr d) { return Status::OK(); } -Status IteratorConsumer::GetNextAsVector(std::vector *out) { +Status IteratorConsumer::GetNextAsVector(std::vector *const out) { RETURN_UNEXPECTED_IF_NULL(out); out->clear(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h index 140f5f7d47d..45edeb00ece 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h @@ -80,6 +80,23 @@ class TreeConsumer { Status InitAutoTune(); #endif + /// Returns the next row in a vector format + /// \param[out] out std::vector of Tensors + /// \return Status error code + virtual Status GetNextAsVector(std::vector *const out) { return Status::OK(); } + + /// Returns the next row in as a map + /// \param[out] out std::map of string to Tensor + /// \return Status error code + virtual Status GetNextAsMap(std::unordered_map *const out) { return Status::OK(); } + + /// Returns the next row in as a vector + /// \param[out] out std::vector of pairs of string to Tensor + /// \return Status error code + virtual Status GetNextAsOrderedPair(std::vector>> *const vec) { + return Status::OK(); + } + protected: /// The class owns the tree_adapter that handles execution tree operations. 
std::unique_ptr tree_adapter_; @@ -111,17 +128,17 @@ class IteratorConsumer : public TreeConsumer { /// Returns the next row in a vector format /// \param[out] out std::vector of Tensors /// \return Status error code - Status GetNextAsVector(std::vector *out); + Status GetNextAsVector(std::vector *const out) override; /// Returns the next row in as a map /// \param[out] out std::map of string to Tensor /// \return Status error code - Status GetNextAsMap(std::unordered_map *const out); + Status GetNextAsMap(std::unordered_map *const out) override; /// Returns the next row in as a vector /// \param[out] out std::vector of pairs of string to Tensor /// \return Status error code - Status GetNextAsOrderedPair(std::vector>> *const vec); + Status GetNextAsOrderedPair(std::vector>> *const vec) override; Status RegisterProfilingManager() override; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index 96eadb1a63b..62b43395091 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -697,10 +697,10 @@ Status BatchOp::GetNextRowPullMode(TensorRow *const row) { } } RETURN_UNEXPECTED_IF_NULL(table); - if (pad_) { - RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_)); - } // do padding if needed if (!table->empty()) { + if (pad_) { + RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_)); + } // do padding if needed RETURN_IF_NOT_OK(BatchRows(&table, row, table->size())); batch_cnt_++; batch_num_++; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc index 43a7919de4b..e098ef4a377 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc @@ -207,5 +207,44 @@ Status ConcatOp::GetNextRow(TensorRow *row) { return Status::OK(); } + +Status ConcatOp::GetNextRowPullMode(TensorRow *const row) { + RETURN_UNEXPECTED_IF_NULL(row); + bool is_not_mappable_or_second_ne_zero = true; + + if (!children_flag_and_nums_.empty()) { + const bool is_not_mappable = children_flag_and_nums_[cur_child_].first != 0 ? true : false; + const bool second_ne_zero = children_flag_and_nums_[cur_child_].second == 0 ? 
true : false; + is_not_mappable_or_second_ne_zero = is_not_mappable || second_ne_zero; + } + RETURN_IF_NOT_OK(child_[static_cast(cur_child_)]->GetNextRowPullMode(row)); + + if (row->eoe()) { + // if last child, send out eoe and reset epoch + if (cur_child_ == child_.size() - 1) { + // reset + cur_child_ = 0; + verified_ = false; + UpdateRepeatAndEpochCounter(); + return Status::OK(); + } + if (!is_not_mappable_or_second_ne_zero) { + sample_number_ += children_flag_and_nums_[cur_child_].second; + } + cur_child_++; + verified_ = false; + RETURN_IF_NOT_OK(GetNextRowPullMode(row)); + return Status::OK(); + } else { + if (!verified_) { + RETURN_IF_NOT_OK(Verify(cur_child_, *row)); + } + if (IgnoreSample()) { + RETURN_IF_NOT_OK(GetNextRowPullMode(row)); + } + return Status::OK(); + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.h index d8c8a77f7e7..f844bda471d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.h @@ -74,6 +74,8 @@ class ConcatOp : public PipelineOp { Status GetNextRow(TensorRow *row) override; + Status GetNextRowPullMode(TensorRow *const row) override; + /// Check if the current sample will be taken or dropped /// \return bool bool IgnoreSample(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc index a6c5659e6f8..5e176c2fbfc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc @@ -255,6 +255,10 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const { Status DatasetOp::GetNextRowPullMode(TensorRow *const row) { RETURN_UNEXPECTED_IF_NULL(row); + if (child_.empty()) { + MS_LOG(DEBUG) << "No child for operator [" << Name() << "]."; + return Status::OK(); + } RETURN_UNEXPECTED_IF_NULL(child_[0]); return child_[0]->GetNextRowPullMode(row); } @@ -326,6 +330,14 @@ Status DatasetOp::PrepareOperator() { return Status::OK(); } +// During tree prepare phase, operators may have specific post-operations to perform depending on their role. +Status DatasetOp::PrepareOperatorPullBased() { + // Generate the column name map for the current op. + RETURN_IF_NOT_OK(this->ComputeColMap()); + + return Status::OK(); +} + // Derived classes may implement the reset function if the operator is stateful and needs // specific reset handling that is not contained in this common code version of the reset. Status DatasetOp::Reset() { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h index e4417254155..b1c8a60e796 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h @@ -201,6 +201,12 @@ class DatasetOp : public std::enable_shared_from_this { // before providing their own implementations. virtual Status PrepareOperator(); + // \brief During tree prepare phase, operators may have specific post-operations to perform depending on + // their role. + // \notes Derived versions of this function should always call its superclass version first + // before providing their own implementations. 
+ virtual Status PrepareOperatorPullBased(); + // \brief Getter function // \return The operator id int32_t id() const { return operator_id_; } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc index dad977d15c0..e8a73000822 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "minddata/dataset/callback/callback_param.h" @@ -451,5 +452,62 @@ std::vector MapOp::GetMPWorkerPIDs() const { } return DatasetOp::GetMPWorkerPIDs(); } + +Status MapOp::GetNextRowPullMode(TensorRow *const row) { + TensorRow new_row; + RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row)); + if (new_row.empty()) { + return Status::OK(); + } + auto column_name_id_map = child_[0]->column_name_id_map(); + // Apply transforms on tensor + for (auto &t : tfuncs_[0]) { + for (auto &col_name : in_columns_) { + TensorRow i_row, o_row; + auto index = column_name_id_map[col_name]; + i_row.push_back(new_row.at(index)); + Status rc = t->Compute(i_row, &o_row); + if (rc.IsError()) { + std::string op_name = t->Name(); + RETURN_IF_NOT_OK(RebuildMapErrorMsg(new_row, op_name, &rc)); + } + // For next transform + new_row[index] = std::move(o_row.at(0)); + } + } + (*row) = std::move(new_row); + return Status::OK(); +} + +Status MapOp::RebuildMapErrorMsg(const TensorRow &input_row, const std::string &op_name, Status *rc) { + std::string err_msg = ""; + // Need to remove the suffix "Op" whose length is 2 + std::string abbr_op_name = op_name.substr(0, op_name.length() - 2); + err_msg += "map operation: [" + abbr_op_name + "] failed. "; + if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) { + err_msg += "The corresponding data file is: " + input_row.getPath()[0]; + if (input_row.getPath().size() > 1) { + std::set path_set; + path_set.insert(input_row.getPath()[0]); + for (auto j = 1; j < input_row.getPath().size(); j++) { + if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) { + err_msg += ", " + input_row.getPath()[j]; + path_set.insert(input_row.getPath()[j]); + } + } + } + err_msg += ". "; + } + std::string tensor_err_msg = rc->GetErrDescription(); + if (rc->GetLineOfCode() < 0) { + err_msg += "Error description:\n"; + } + err_msg += tensor_err_msg; + if (abbr_op_name == "PyFunc") { + RETURN_STATUS_ERROR(StatusCode::kMDPyFuncException, err_msg); + } + rc->SetErrDescription(err_msg); + return *rc; +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h index bcb141d144b..8143bd8af51 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h @@ -149,6 +149,8 @@ class MapOp : public ParallelOp, TensorRow> { /// \return vector of int std::vector GetMPWorkerPIDs() const override; + Status GetNextRowPullMode(TensorRow *const row) override; + private: // A helper function to create jobs for workers. 
Status GenerateWorkerJob(const std::unique_ptr *worker_job, int32_t worker_id); @@ -214,6 +216,9 @@ class MapOp : public ParallelOp, TensorRow> { Status Launch() override; Status AddNewWorkers(int32_t num_new_workers) override; Status RemoveWorkers(int32_t num_workers) override; + + private: + Status RebuildMapErrorMsg(const TensorRow &input_row, const std::string &op_name, Status *rc); }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc index aa225f39b2f..3eebd273467 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc @@ -263,5 +263,16 @@ Status ShuffleOp::EoeReceived(int32_t worker_id) { state_ = OpState::kDeOpIdle; return Status::OK(); } + +Status ShuffleOp::GetNextRowPullMode(TensorRow *const row) { + RETURN_UNEXPECTED_IF_NULL(row); + RETURN_UNEXPECTED_IF_NULL(child_[0]); + if (GlobalContext::config_manager()->get_debug_mode()) { + MS_LOG(WARNING) << "In debug mode, shuffle operation is disabled for debugging purposes."; + } else { + MS_LOG(WARNING) << "Shuffle operation has not been implemented yet in pull mode."; + } + return child_[0]->GetNextRowPullMode(row); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h index b8526716bfd..b9bb0d02c75 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h @@ -94,6 +94,11 @@ class ShuffleOp : public PipelineOp { // @return Status The status code returned Status PrepareOperator() override; + /// \brief Gets the next row + /// \param row[out] - Fetched TensorRow + /// \return Status The status code returned + Status GetNextRowPullMode(TensorRow *const row) override; + private: // Private function to add a new row to the shuffle buffer. 
// @return Status The status code returned diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc index 1a8159f9d6a..18cfe7195da 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc @@ -72,5 +72,30 @@ Status SkipOp::GetNextRow(TensorRow *row) { } return Status::OK(); } + +Status SkipOp::GetNextRowPullMode(TensorRow *const row) { + RETURN_UNEXPECTED_IF_NULL(row); + bool eoe_received = false; + while (skip_count_ < max_skips_) { + RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row)); + if (row->eoe() && !once_only_) { + eoe_received = true; + break; + } + if (!row->eoe()) { + skip_count_++; + } + } + if (!eoe_received) { + RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row)); + } + if (row->eoe()) { + UpdateRepeatAndEpochCounter(); + if (!once_only_) { + skip_count_ = 0; + } + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.h index 366903a9b09..fc5e500cff6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.h @@ -52,6 +52,12 @@ class SkipOp : public PipelineOp { void SetOnceOnly(bool once_only) { once_only_ = once_only; } + protected: + /// \brief Gets the next row + /// \param row[out] - Fetched TensorRow + /// \return Status The status code returned + Status GetNextRowPullMode(TensorRow *const row) override; + private: int32_t max_skips_; // The number of skips that the user requested int32_t skip_count_; // A counter for the current number of executed skips diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc index aad8f15fa43..bc04d714661 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc @@ -1,5 +1,5 @@ /** - * Copyright 2020-2021 Huawei Technologies Co., Ltd + * Copyright 2020-2022 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -37,9 +37,7 @@ AlbumOp::AlbumOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, boo extensions_(exts), data_schema_(std::move(data_schema)), sampler_ind_(0), - dirname_offset_(0), - sample_ids_(nullptr), - curr_row_(0) { + dirname_offset_(0) { // Set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { column_name_id_map_[data_schema_->Column(i).Name()] = i; @@ -430,29 +428,9 @@ Status AlbumOp::ComputeColMap() { column_name_id_map_[data_schema_->Column(i).Name()] = i; } } else { - MS_LOG(WARNING) << "Column name map is already set!"; + MS_LOG(INFO) << "Column name map is already set!"; } return Status::OK(); } - -Status AlbumOp::GetNextRowPullMode(TensorRow *const row) { - if (image_rows_.empty()) { - RETURN_IF_NOT_OK(PrepareData()); - } - if (sample_ids_ == nullptr) { - RETURN_IF_NOT_OK(this->InitSampler()); - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); - sample_ids_ = sample_row[0]; - } - if (curr_row_ + 1 > sample_ids_->Size()) { - return Status::OK(); - } - int64_t key; - RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_})); - RETURN_IF_NOT_OK(LoadTensorRow(key, row)); - curr_row_++; - return Status::OK(); -} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h index 9f3803f4123..b20ba6fe4c8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h @@ -160,11 +160,6 @@ class AlbumOp : public MappableLeafOp { /// \return Status The status code returned Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row); - /// \brief Gets the next row - /// \param row[out] - Fetched TensorRow - /// \return Status The status code returned - Status GetNextRowPullMode(TensorRow *const row) override; - /// Private function for computing the assignment of the column name map. /// \return Status The status code returned Status ComputeColMap() override; @@ -177,9 +172,6 @@ class AlbumOp : public MappableLeafOp { int64_t sampler_ind_; int64_t dirname_offset_; std::vector image_rows_; - TensorPtr sample_ids_; - - uint32_t curr_row_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc index a2ea2b1cbd5..38a4d6ada5e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc @@ -1,5 +1,5 @@ /** - * Copyright 2019-2021 Huawei Technologies Co., Ltd + * Copyright 2019-2022 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -315,5 +315,15 @@ Status CelebAOp::ComputeColMap() { return Status::OK(); } +Status CelebAOp::InitPullMode() { + if (!image_labels_vec_.empty()) { + return Status::OK(); + } + if (attr_info_queue_->empty()) { + RETURN_IF_NOT_OK(ParseAttrFile()); + } + return PrepareData(); +} + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h index a34f5f402ba..4c0f9a52559 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h @@ -1,5 +1,5 @@ /** - * Copyright 2019-2021 Huawei Technologies Co., Ltd + * Copyright 2019-2022 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -119,6 +119,10 @@ class CelebAOp : public MappableLeafOp { // @return - Status Status ComputeColMap() override; + /// Initialize pull mode, calls PrepareData() within + /// @return Status The status code returned + Status InitPullMode() override; + std::string folder_path_; // directory of celeba folder bool decode_; std::set extensions_; /// extensions allowed diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc index 09972305e2a..79b36c7fdd3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc @@ -340,5 +340,17 @@ Status ImageFolderOp::GetNumClasses(int64_t *num_classes) { num_classes_ = *num_classes; return Status::OK(); } + +Status ImageFolderOp::InitPullMode() { + // to avoid the concurrent and multi end signal in StartAsyncWalk, explicitly set num_workers_ to 1 + num_workers_ = 1; + if (folder_name_queue_->empty()) { + RETURN_IF_NOT_OK(StartAsyncWalk()); + } + if (image_name_queue_->empty()) { + RETURN_IF_NOT_OK(PrescanWorkerEntry(id())); + } + return PrepareData(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h index 88c9afdbd3a..056db178415 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h @@ -151,6 +151,10 @@ class ImageFolderOp : public MappableLeafOp { /// @return - Status Status ComputeColMap() override; + /// Initialize pull mode, calls PrepareData() within + /// @return Status The status code returned + Status InitPullMode() override; + std::string folder_path_; // directory of image folder bool recursive_; bool decode_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc index dba56ed285a..f77e26a2100 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc @@ -22,7 +22,7 @@ namespace mindspore { namespace dataset { MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr sampler) - : ParallelOp(num_wkrs, queue_size, std::move(sampler)) {} + : ParallelOp(num_wkrs, queue_size, 
std::move(sampler)), sample_ids_(nullptr), curr_row_(0), prepared_data_{false} {} #ifdef ENABLE_PYTHON Status MappableLeafOp::ImageDecrypt(const std::string &path, std::shared_ptr *tensor, @@ -158,5 +158,30 @@ Status MappableLeafOp::SendQuitFlagToWorker(int32_t worker_id) { RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->Add(std::make_unique(IOBlock::kDeIoBlockNone))); return Status::OK(); } + +Status MappableLeafOp::GetNextRowPullMode(TensorRow *const row) { + RETURN_UNEXPECTED_IF_NULL(row); + row->clear(); + if (!prepared_data_) { + RETURN_IF_NOT_OK(InitPullMode()); + prepared_data_ = true; + } + if (sample_ids_ == nullptr) { + RETURN_IF_NOT_OK(this->InitSampler()); + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); + CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler."); + sample_ids_ = sample_row[0]; + } + if (curr_row_ + 1 > sample_ids_->Size()) { + *row = TensorRow(TensorRow::kFlagEOE); + return Status::OK(); + } + int64_t key; + RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_})); + RETURN_IF_NOT_OK(LoadTensorRow(key, row)); + curr_row_++; + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h index cebe551a7c7..c72709c2db7 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "minddata/dataset/core/tensor.h" #include "minddata/dataset/engine/data_schema.h" @@ -75,6 +77,10 @@ class MappableLeafOp : public ParallelOp, TensorRow>, p #endif protected: + TensorPtr sample_ids_; // sample id pointer for pull mode + uint32_t curr_row_; // current row number count for pull mode + bool prepared_data_; // flag to indicate whether the data is prepared before LoadTensorRow for pull mode + /// Initialize Sampler, calls sampler->Init() within /// @return Status The status code returned Status InitSampler(); @@ -104,6 +110,15 @@ class MappableLeafOp : public ParallelOp, TensorRow>, p Status Reset() override; Status SendWaitFlagToWorker(int32_t worker_id) override; Status SendQuitFlagToWorker(int32_t worker_id) override; + + /// \brief In pull mode, gets the next row + /// \param row[out] - Fetched TensorRow + /// \return Status The status code returned + Status GetNextRowPullMode(TensorRow *const row) override; + + /// Initialize pull mode, calls PrepareData() within + /// @return Status The status code returned + virtual Status InitPullMode() { return PrepareData(); } }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index 915571bc9e5..349a1e9d199 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -366,5 +366,36 @@ Status MindRecordOp::RemoveWorkers(int32_t num_workers) { return Status::OK(); } +Status MindRecordOp::InitPullMode() { + RETURN_IF_NOT_OK(Init()); + RETURN_IF_NOT_OK(shard_reader_->Launch(true)); + return this->PrepareData(); +} + +Status MindRecordOp::GetNextRowPullMode(TensorRow *const row) { + RETURN_UNEXPECTED_IF_NULL(row); + 
row->clear(); + if (!prepared_data_) { + RETURN_IF_NOT_OK(InitPullMode()); + prepared_data_ = true; + } + if (sample_ids_ == nullptr) { + RETURN_IF_NOT_OK(this->InitSampler()); + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); + CHECK_FAIL_RETURN_UNEXPECTED(sample_row.size() > 0, "GetNextRowPullMode: Expect at least one sample in sampler."); + sample_ids_ = sample_row[0]; + } + if (curr_row_ + 1 > sample_ids_->Size()) { + *row = TensorRow(TensorRow::kFlagEOE); + return Status::OK(); + } + int64_t key; + RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_})); + RETURN_IF_NOT_OK(GetRowFromReader(row, key, id())); + curr_row_++; + return Status::OK(); +} + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h index 072b4eebd93..248fbaa3cd8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h @@ -151,6 +151,15 @@ class MindRecordOp : public MappableLeafOp { /// \return Status The status code returned Status RemoveWorkers(int32_t num_workers = 1) override; + /// Initialize pull mode, calls PrepareData() within + /// @return Status The status code returned + Status InitPullMode() override; + + /// \brief Gets the next row + /// \param row[out] - Fetched TensorRow + /// \return Status The status code returned + Status GetNextRowPullMode(TensorRow *const row) override; + private: std::vector dataset_file_; // dataset files bool load_dataset_; // load dataset from single file or not @@ -164,8 +173,6 @@ class MindRecordOp : public MappableLeafOp { std::map sample_bytes_; std::unique_ptr data_schema_; // Data schema for column typing - std::vector columns_blob_; // Blob Columns to load from dataset - std::vector columns_blob_index_; // Blob Columns to load from dataset std::unique_ptr shard_reader_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc index 455745e55cb..702218495dc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc @@ -66,5 +66,19 @@ Status TakeOp::GetNextRow(TensorRow *row) { return Status::OK(); } + +Status TakeOp::GetNextRowPullMode(TensorRow *row) { + RETURN_UNEXPECTED_IF_NULL(row); + if (take_count_ < max_takes_) { + RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(row)); + } + if (take_count_ == max_takes_ || row->eoe()) { + UpdateRepeatAndEpochCounter(); + take_count_ = 0; + } else { + take_count_++; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.h index 5ebe45c27d6..0dd30a54557 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.h @@ -60,6 +60,12 @@ class TakeOp : public PipelineOp { Status GetNextRow(TensorRow *row) override; + protected: + /// \brief Gets the next row + /// \param row[out] - Fetched TensorRow + /// \return Status The status code returned + Status GetNextRowPullMode(TensorRow *const row) override; + private: int32_t max_takes_; // The number of takes that the user requested int32_t take_count_; // A counter for the current number of 
executed takes diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc index 5d3e0ca727c..15308a38306 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc @@ -260,7 +260,7 @@ Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::functionPrepareOperator()); + if (!is_pull_mode) { + RETURN_IF_NOT_OK((*rit)->PrepareOperator()); + } else { + RETURN_IF_NOT_OK((*rit)->PrepareOperatorPullBased()); + } } // The tree is prepared. diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h index afd2afc81a5..46d9748e9be 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h @@ -185,8 +185,9 @@ class ExecutionTree { std::shared_ptr root() const { return root_; } /// \brief The prepare phase walks the tree in post-order to perform modifications to get it ready for execution. + /// \param is_pull_mode - an indicator if it's in pull mode or not /// \return Status The status code returned - Status Prepare(); + Status Prepare(bool is_pull_mode = false); /// \brief Return the pointer to the TaskGroup /// \return raw pointer to the TaskGroup diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h index 0c7bdb3224e..4e5bfd14ec0 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h @@ -322,6 +322,10 @@ class DatasetNode : public std::enable_shared_from_this { /// \return Shared pointer to the original object std::shared_ptr SetDatasetCache(const std::shared_ptr &cache); + /// \brief Setter function for descendant_of_cache_ + /// \param[in] descendant_of_cache Indicator for whether this node is a descendant of cache. + void setDescendantOfCache(bool descendant_of_cache) { descendant_of_cache_ = descendant_of_cache; } + /// \brief A helper templated function for casting "this" pointer to shared_ptr /// Similar to shared_from_this, except this one will give you the derived class as shared_ptr /// \return A shared_ptr casted to the derived class diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/opt/CMakeLists.txt index 4c654b918ca..8ec8d7cf392 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/CMakeLists.txt @@ -16,6 +16,7 @@ set(DATASET_ENGINE_OPT_SRC_FILES pre/node_offload_pass.cc pre/node_removal_pass.cc pre/skip_pushdown_pass.cc + pre/debug_mode_pass.cc ) if(ENABLE_PYTHON) diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.cc new file mode 100644 index 00000000000..32f3926f416 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.cc @@ -0,0 +1,63 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "minddata/dataset/engine/opt/pre/debug_mode_pass.h" + +#include +#include "minddata/dataset/engine/ir/datasetops/map_node.h" +#include "minddata/dataset/engine/ir/datasetops/root_node.h" +#include "minddata/dataset/include/dataset/datasets.h" + +namespace mindspore { +namespace dataset { +bool DebugModePass::RemoveCacheAndOffload(std::shared_ptr node) { + // remove DatasetNode cache + bool ret = false; + if (node->IsCached()) { + MS_LOG(WARNING) << node->Name() << " with cache found in the debug mode. Dropping the cache." + << " If performance is a concern, then disable debug mode."; + node->SetDatasetCache(nullptr); + ret = true; + } + if (node->IsDescendantOfCache()) { + node->setDescendantOfCache(false); + ret = true; + } + if (GlobalContext::config_manager()->get_auto_offload()) { + MS_LOG(WARNING) << "Both debug mode and auto offload are enabled. Disabling auto offload." + "If performance is a concern, then disable debug mode and re-enable auto offload."; + GlobalContext::config_manager()->set_auto_offload(false); + } + return ret; +} + +Status DebugModePass::Visit(std::shared_ptr node, bool *const modified) { + *modified = RemoveCacheAndOffload(node); + if (node->GetOffload() == ManualOffloadMode::kEnabled) { + MS_LOG(WARNING) << "Map operation with offload found in the debug mode. Ignoring offload." + "If performance is a concern, then disable debug mode."; + node->SetOffload(ManualOffloadMode::kDisabled); + *modified = true; + } + return Status::OK(); +} + +Status DebugModePass::Visit(std::shared_ptr node, bool *const modified) { + *modified = RemoveCacheAndOffload(node); + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.h b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.h new file mode 100644 index 00000000000..ce7edeb929e --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/debug_mode_pass.h @@ -0,0 +1,57 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_ +#define DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_ + +#include +#include +#include "minddata/dataset/engine/opt/pass.h" + +namespace mindspore { +namespace dataset { +/// \class DebugModePass +/// \brief This is a pre parse pass that disable some nodes and prepares for the debug mode. 
+class DebugModePass : public IRNodePass { + public: + /// \brief Constructor + DebugModePass() {} + + /// \brief Destructor + ~DebugModePass() = default; + + /// \brief Runs a pass on MapNode + /// \param[in] node The node being visited + /// \param[in, out] *modified indicates if the node was changed at all + /// \return Status code + Status Visit(std::shared_ptr node, bool *const modified) override; + + /// \brief Runs a pass on DatasetNode + /// \param[in] node The node being visited + /// \param[in, out] *modified indicates if the node was changed at all + /// \return Status code + Status Visit(std::shared_ptr node, bool *const modified) override; + + protected: + /// \brief Check and remove cache and offload if node has any + /// \param[in] node The node being visited + /// \return true if the node was changed; otherwise, false + bool RemoveCacheAndOffload(std::shared_ptr node); +}; +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_ENGINE_OPT_PRE_DEBUG_MODE_PASS_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.cc b/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.cc index 6fd36a01b92..e8969e61a34 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.cc @@ -44,7 +44,11 @@ PythonRuntimeContext::~PythonRuntimeContext() { } } -PythonIteratorConsumer *PythonRuntimeContext::GetPythonConsumer() { - return dynamic_cast(tree_consumer_.get()); +TreeConsumer *PythonRuntimeContext::GetPythonConsumer() { + if (GlobalContext::config_manager()->get_debug_mode()) { + return dynamic_cast(tree_consumer_.get()); + } else { + return dynamic_cast(tree_consumer_.get()); + } } } // namespace mindspore::dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.h b/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.h index 761499f778a..bd0404cd992 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.h +++ b/mindspore/ccsrc/minddata/dataset/engine/python_runtime_context.h @@ -34,7 +34,7 @@ class PythonRuntimeContext : public RuntimeContext { /// Safe destructing the tree that includes python objects ~PythonRuntimeContext() override; - PythonIteratorConsumer *GetPythonConsumer(); + TreeConsumer *GetPythonConsumer(); private: /// Internal function to perform the termination diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc index a6471471492..48cb911e55f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc @@ -15,6 +15,13 @@ */ #include "minddata/dataset/engine/tree_adapter_lite.h" +#include "minddata/dataset/engine/ir/datasetops/root_node.h" +#include "minddata/dataset/engine/opt/pass.h" +#include "minddata/dataset/engine/opt/pre/debug_mode_pass.h" +#include "minddata/dataset/engine/opt/pre/deep_copy_pass.h" +#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h" +#include "minddata/dataset/engine/opt/pre/input_validation_pass.h" +#include "minddata/dataset/engine/opt/pre/node_removal_pass.h" namespace mindspore { namespace dataset { @@ -54,8 +61,12 @@ Status TreeAdapterLite::BuildExecutionTreeRecur(std::shared_ptr ir, Status TreeAdapterLite::BuildTree(std::shared_ptr root_ir) { RETURN_UNEXPECTED_IF_NULL(root_ir); - RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir, &root_)); + // Build the Execution tree from the 
child of the IR root node, which represent the root of the input IR tree as a Top + // Node is added to IR tree. + RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_)); RETURN_IF_NOT_OK(tree_->AssignRoot(root_)); + // Prepare the tree + RETURN_IF_NOT_OK(tree_->Prepare(true)); return Status::OK(); } @@ -66,5 +77,52 @@ Status TreeAdapterLite::GetNextRow(TensorRow *const row) { return Status::OK(); } +Status TreeAdapterLite::PrePass(std::shared_ptr ir) { + RETURN_UNEXPECTED_IF_NULL(ir); + // Vector of actions in pre-pass phase + std::vector> actions; + MS_LOG(INFO) << "Prepare PrePass loops."; + (void)actions.emplace_back(std::make_unique()); + (void)actions.emplace_back(std::make_unique()); + (void)actions.emplace_back(std::make_unique()); + if (GlobalContext::config_manager()->get_debug_mode()) { + (void)actions.emplace_back(std::make_unique()); + } + // Apply pre-pass actions + for (auto i = 0; i < actions.size(); i++) { + auto m = false; + RETURN_IF_NOT_OK(actions[i]->Run(ir, &m)); + } + MS_LOG(INFO) << "PrePass completed."; + return Status::OK(); +} + +Status TreeAdapterLite::Compile(const std::shared_ptr &input_ir, int32_t num_epochs) { + RETURN_UNEXPECTED_IF_NULL(input_ir); + input_ir_ = input_ir; + MS_LOG(INFO) << "Input plan:" << '\n' << *input_ir << '\n'; + + // Clone the input IR tree and insert under the root node + // Create a root node to host the new copy of the input IR tree + // This is done so that the PrePass will process and modify the tree + // without changing the tree associated with the user code. + // The tree from the user code is permitted to form a graph where any node + // is consumed by more than one parent. However, this cloning process here + // will break the graph into a tree by copying each consumption of a node into a new copy. + DeepCopyPass cloning_tree; + bool m = false; + RETURN_IF_NOT_OK(cloning_tree.Run(input_ir, &m)); + std::shared_ptr root_ir = cloning_tree.Root(); + root_ir->SetNumEpochs(num_epochs); + + MS_LOG(INFO) << "Plan before PrePass:" << '\n' << *root_ir << '\n'; + // Pre-pass of the IR tree + RETURN_IF_NOT_OK(PrePass(root_ir)); + MS_LOG(INFO) << "Plan after PrePass:" << '\n' << *root_ir << '\n'; + root_ir_ = root_ir; + + RETURN_IF_NOT_OK(BuildTree(root_ir)); + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h index 2fe54c488d4..a6bc043cadf 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h @@ -44,6 +44,16 @@ class TreeAdapterLite { std::unordered_map GetColumnNameMap() const { return tree_->root()->column_name_id_map(); } + // This function performs syntax checking, semantics checking, and then call BuildTree + Status Compile(const std::shared_ptr &input_ir, int32_t num_epochs = -1); + + protected: + // Run the mandatory pass checking the syntax and semantics of the IR tree + Status PrePass(std::shared_ptr ir); + + std::shared_ptr input_ir_; + std::shared_ptr root_ir_; + private: // This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree. 
Status BuildExecutionTreeRecur(std::shared_ptr ir, std::shared_ptr *op); diff --git a/mindspore/lite/minddata/CMakeLists.txt b/mindspore/lite/minddata/CMakeLists.txt index fdc4f8d4714..c14679a590f 100644 --- a/mindspore/lite/minddata/CMakeLists.txt +++ b/mindspore/lite/minddata/CMakeLists.txt @@ -190,6 +190,7 @@ if(MSLITE_MINDDATA_IMPLEMENT STREQUAL "full") ${MINDDATA_DIR}/engine/opt/pre/add_skip_pass.cc ${MINDDATA_DIR}/engine/opt/pre/getter_pass.cc ${MINDDATA_DIR}/engine/opt/pre/input_validation_pass.cc + ${MINDDATA_DIR}/engine/opt/pre/debug_mode_pass.cc ${MINDDATA_DIR}/engine/opt/pre/cache_validation_pass.cc ${MINDDATA_DIR}/engine/opt/pre/node_removal_pass.cc ${MINDDATA_DIR}/engine/opt/pre/epoch_ctrl_pass.cc diff --git a/mindspore/python/mindspore/dataset/core/config.py b/mindspore/python/mindspore/dataset/core/config.py index 80dd8b06cb9..f9f026cb9ff 100644 --- a/mindspore/python/mindspore/dataset/core/config.py +++ b/mindspore/python/mindspore/dataset/core/config.py @@ -46,7 +46,8 @@ __all__ = ['set_sending_batches', 'load', '_init_device_info', 'set_auto_offload', 'get_auto_offload', 'set_enable_watchdog', 'get_enable_watchdog', 'set_fast_recovery', 'get_fast_recovery', - 'set_multiprocessing_timeout_interval', 'get_multiprocessing_timeout_interval'] + 'set_multiprocessing_timeout_interval', 'get_multiprocessing_timeout_interval', + 'set_debug_mode', 'get_debug_mode'] INT32_MAX = 2147483647 UINT32_MAX = 4294967295 @@ -832,3 +833,43 @@ def get_fast_recovery(): >>> is_fast_recovery = ds.config.get_fast_recovery() """ return _config.get_fast_recovery() + + +def set_debug_mode(debug_mode_flag): + """ + Set the debug_mode flag of the dataset pipeline + Notes: + 1. If both debug_mode and auto_offload are enabled, then during runtime, auto_offload is forcibly disabled. + 2. If both debug_mode is enabled and a dataset pipeline has Map operation with offload set, then offload is + ignored. + 3. If both debug_mode is enabled and a dataset operation has cache set, then the cache is dropped. + + Args: + debug_mode_flag (bool): Whether dataset pipeline debugger is enabled, which forces the pipeline + to run synchronously and sequentially. + + Raises: + TypeError: If `debug_mode_flag` is not a boolean data type. + + Examples: + >>> ds.config.set_debug_mode(True) + """ + if not isinstance(debug_mode_flag, bool): + raise TypeError("debug_mode_flag isn't of type boolean.") + if debug_mode_flag: + logger.warning("MindData Debug mode is enabled. 
Performance will be impacted because the pipeline will be" + " running in a single thread.") + _config.set_debug_mode(debug_mode_flag) + + +def get_debug_mode(): + """ + Get the debug_mode flag of the dataset pipeline + + Returns: + bool, whether the dataset pipeline debugger is enabled + + Examples: + >>> debug_mode = ds.config.get_debug_mode() + """ + return _config.get_debug_mode() diff --git a/mindspore/python/mindspore/dataset/engine/iterators.py b/mindspore/python/mindspore/dataset/engine/iterators.py index e07dd08de0f..e30b7e833ac 100644 --- a/mindspore/python/mindspore/dataset/engine/iterators.py +++ b/mindspore/python/mindspore/dataset/engine/iterators.py @@ -24,6 +24,7 @@ import numpy as np import mindspore._c_dataengine as cde from mindspore.common.tensor import Tensor import mindspore.dataset.engine.offload as offload +from mindspore.dataset.core.config import get_debug_mode from mindspore import log as logger @@ -75,7 +76,10 @@ class Iterator: self._runtime_context = cde.PythonRuntimeContext() self._runtime_context.Init() - consumer = cde.PythonIteratorConsumer(num_epochs) + if get_debug_mode(): + consumer = cde.PythonPullBasedIteratorConsumer(num_epochs) + else: + consumer = cde.PythonIteratorConsumer(num_epochs) consumer.Init(self.ir_tree) self._runtime_context.AssignConsumer(consumer) self._iterator = self._runtime_context.GetConsumer() diff --git a/tests/ut/cpp/dataset/c_api_pull_based_get_next_mappable_op_test.cc b/tests/ut/cpp/dataset/c_api_pull_based_get_next_mappable_op_test.cc new file mode 100644 index 00000000000..883d6c66b3b --- /dev/null +++ b/tests/ut/cpp/dataset/c_api_pull_based_get_next_mappable_op_test.cc @@ -0,0 +1,1237 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
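As the iterators.py hunk above shows, the switch to PythonPullBasedIteratorConsumer is transparent to user code: the same create_tuple_iterator/create_dict_iterator calls are used, and only the consumer behind them changes when the flag is set. A short sketch of the setter's behaviour as defined in config.py above (boolean-only input, warning on enable); the error text in the comment is taken from that change:

import mindspore.dataset as ds

ds.config.set_debug_mode(True)          # logs a warning that the pipeline runs in a single thread
assert ds.config.get_debug_mode() is True

try:
    ds.config.set_debug_mode("True")    # non-boolean input is rejected
except TypeError as err:
    print(err)                          # debug_mode_flag isn't of type boolean.

ds.config.set_debug_mode(False)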
+ */ +#include "common/common.h" +#include "minddata/dataset/include/dataset/datasets.h" +#include "minddata/dataset/include/dataset/vision.h" + +namespace common = mindspore::common; + +using namespace mindspore::dataset; + +class MindDataTestPipeline : public UT::DatasetOpTesting { + protected: +}; + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Album +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableAlbum) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableAlbum."; + + std::string folder_path = datasets_root_path_ + "/testAlbum/images"; + std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json"; + std::vector column_names = {"label", "image"}; + std::shared_ptr ds = Album(folder_path, schema_file, column_names); + EXPECT_NE(ds, nullptr); + + std::vector row; + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + ASSERT_OK(iter->GetNextRow(&row)); + auto count = 0; + while (row.size() > 0) { + ++count; + auto label = row[0]; + auto image = row[1]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor level shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + EXPECT_EQ(count, 7); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on CelebA +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableCelebA) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableCelebA."; + + // Create a CelebA Dataset + std::string folder_path = datasets_root_path_ + "/testCelebAData/"; + std::shared_ptr ds = CelebA(folder_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + // order: image, level + // Check if CelebA() read correct images/attr + uint64_t i = 0; + while (row.size() != 0) { + auto image = row[0]; + auto level = row[1]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor level shape: " << level.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + + EXPECT_EQ(i, 4); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Cityscapes +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableCityscapes) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableCityscapes."; + + std::string dataset_path = datasets_root_path_ + "/testCityscapesData/cityscapes"; + std::string usage = "train"; // quality_mode=fine 'train', 'test', 'val' else 'train', 'train_extra', 'val' + std::string quality_mode = "fine"; // fine coarse + std::string task = "color"; // instance semantic polygon color + + // Create a Cityscapes Dataset + std::shared_ptr ds = Cityscapes(dataset_path, usage, quality_mode, task); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[1]; + auto task = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 5); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on CMUArctic +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableCMUArctic) { + MS_LOG(INFO) << "Doing CMUArcticDataTestPipeline-TestGetNextPullBasedMappableCMUArctic."; + + std::string folder_path = datasets_root_path_ + "/testCMUArcticData"; + // Create a CMUArctic Dataset. + std::shared_ptr ds = CMUArctic(folder_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + std::string_view transcript_idx, utterance_id_idx; + uint32_t rate = 0; + uint64_t i = 0; + + while (row.size() != 0) { + // order: waveform, sample_rate, transcript, utterance_id + auto waveform = row[0]; + auto sample_rate = row[1]; + auto transcript = row[2]; + auto utterance_id = row[3]; + + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + + std::shared_ptr trate; + ASSERT_OK(Tensor::CreateFromMSTensor(sample_rate, &trate)); + ASSERT_OK(trate->GetItemAt(&rate, {})); + MS_LOG(INFO) << "Audio sample rate: " << rate; + + std::shared_ptr de_transcript; + ASSERT_OK(Tensor::CreateFromMSTensor(transcript, &de_transcript)); + ASSERT_OK(de_transcript->GetItemAt(&transcript_idx, {})); + std::string s_transcript(transcript_idx); + MS_LOG(INFO) << "Tensor transcript value: " << transcript_idx; + + std::shared_ptr de_utterance_id; + ASSERT_OK(Tensor::CreateFromMSTensor(utterance_id, &de_utterance_id)); + ASSERT_OK(de_utterance_id->GetItemAt(&utterance_id_idx, {})); + std::string s_utterance_id(utterance_id_idx); + MS_LOG(INFO) << "Tensor utterance_id value: " << utterance_id_idx; + i++; + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 3); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Coco +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableCoco) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableCoco."; + // Create a Coco Dataset. + std::string folder_path = datasets_root_path_ + "/testCOCO/train"; + std::string annotation_file = datasets_root_path_ + "/testCOCO/annotations/train.json"; + + std::shared_ptr ds = Coco(folder_path, annotation_file); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, bbox, category_id, iscrowd + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 4); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + auto bbox = row[1]; + auto category_id = row[2]; + auto iscrowd = row[3]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor bbox shape: " << bbox.Shape(); + MS_LOG(INFO) << "Tensor category_id shape: " << category_id.Shape(); + MS_LOG(INFO) << "Tensor iscrowd shape: " << iscrowd.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 6); + + // Manually terminate the pipeline. + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on DIV2K +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableDIV2K) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableDIV2K."; + + std::string dataset_path = datasets_root_path_ + "/testDIV2KData/div2k"; + std::string usage = "train"; // train valid, all + std::string downgrade = "bicubic"; // bicubic, unknown, mild, difficult, wild + int32_t scale = 2; // 2, 3, 4, 8 + + // Create a DIV2K Dataset + std::shared_ptr ds = DIV2K(dataset_path, usage, downgrade, scale); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: hr_image, lr_image + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto hr_image = row[0]; + auto lr_image = row[1]; + MS_LOG(INFO) << "Tensor hr_image shape: " << hr_image.Shape(); + MS_LOG(INFO) << "Tensor lr_image shape: " << lr_image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 5); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on FakeImage +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableFakeImage) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableFakeImage."; + + // Create a FakeImage Dataset + std::shared_ptr ds = FakeImage(50, {28, 28, 3}, 3, 0, std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 10); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Flickr +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableFlickr) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableFlickr."; + + std::string dataset_path = datasets_root_path_ + "/testFlickrData/flickr30k/flickr30k-images"; + std::string file_path = datasets_root_path_ + "/testFlickrData/flickr30k/test1.token"; + + // Create a Flickr30k Dataset + std::shared_ptr ds = Flickr(dataset_path, file_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, annotation + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto annotation = row[1]; + MS_LOG(INFO) << "Tensor annotation shape: " << annotation.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on GTZAN +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableGTZAN) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableGTZAN."; + + std::string file_path = datasets_root_path_ + "/testGTZANData"; + // Create a GTZAN Dataset + std::shared_ptr ds = GTZAN(file_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. 
+ // order: waveform, sample_rate, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + std::string_view label_idx; + uint32_t rate = 0; + uint64_t i = 0; + + while (row.size() != 0) { + i++; + auto waveform = row[0]; + auto sample_rate = row[1]; + auto label = row[2]; + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + + std::shared_ptr trate; + ASSERT_OK(Tensor::CreateFromMSTensor(sample_rate, &trate)); + ASSERT_OK(trate->GetItemAt(&rate, {})); + EXPECT_EQ(rate, 22050); + MS_LOG(INFO) << "Tensor label rate: " << rate; + + std::shared_ptr de_label; + ASSERT_OK(Tensor::CreateFromMSTensor(label, &de_label)); + ASSERT_OK(de_label->GetItemAt(&label_idx, {})); + std::string s_label(label_idx); + std::string expected("blues"); + EXPECT_STREQ(s_label.c_str(), expected.c_str()); + MS_LOG(INFO) << "Tensor label value: " << label_idx; + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 3); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on ImageFolder +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableImageFolder) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableImageFolder."; + // Test get dataset size in distributed scenario when num_per_shard is more than num_samples + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(4, 0, false, 10)); + EXPECT_NE(ds, nullptr); + + // num_per_shard is equal to 44/4 = 11 which is more than num_samples = 10, so the output is 10 + EXPECT_EQ(ds->GetDatasetSize(), 10); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + MS_LOG(INFO) << "row.size() " << row.size(); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + // The value of i should be equal to the result of get dataset size + EXPECT_EQ(i, 10); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on IMDB +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableIMDB) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableIMDB."; + + std::string dataset_path = datasets_root_path_ + "/testIMDBDataset"; + std::string usage = "all"; // 'train', 'test', 'all' + + // Create a IMDB Dataset + std::shared_ptr ds = IMDB(dataset_path, usage); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: text, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto text = row[0]; + MS_LOG(INFO) << "Tensor text shape: " << text.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 8); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on KITTI +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableKITTI) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableKITTI."; + + // Create a KITTI Dataset. + std::string folder_path = datasets_root_path_ + "/testKITTI"; + std::shared_ptr ds = KITTI(folder_path, "train", false, std::make_shared(0, 2)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. + // order: image + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + // Check if KITTI() read correct images. + std::string expect_file[] = {"000000", "000001", "000002"}; + + uint64_t i = 0; + while (row.size() != 0) { + auto image = row[0]; + auto label = row[1]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + + mindspore::MSTensor expect_image = + ReadFileToTensor(folder_path + "/data_object_image_2/training/image_2/" + expect_file[i] + ".png"); + EXPECT_MSTENSOR_EQ(image, expect_image); + + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline. + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on LFW +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableLFW) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableLFW."; + + // Create a LFW Dataset. + std::string folder_path = datasets_root_path_ + "/testLFW"; + std::shared_ptr ds = LFW(folder_path, "people", "all", "original", false); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 4); + + // Manually terminate the pipeline. 
+ iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on LibriTTS +/// Expectation: Output is the same as the normal iterator +// Note: test files in /testLibriTTSData incompleted +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableLibriTTS) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableLibriTTS."; + + std::string folder_path = datasets_root_path_ + "/testLibriTTSData"; + std::shared_ptr ds = LibriTTS(folder_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. + // order: waveform, sample_rate, original_text, normalized_text, speaker_id, chapter_id, utterance_id + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 7); + uint64_t i = 0; + + while (row.size() != 0) { + auto waveform = row[0]; + auto sample_rate = row[1]; + auto original_text = row[2]; + auto normalized_text = row[3]; + auto speaker_id = row[4]; + auto chapter_id = row[5]; + auto utterance_id = row[6]; + i++; + ASSERT_OK(iter->GetNextRow(&row)); + } + EXPECT_EQ(i, 3); + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on LJSpeech +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableLJSpeech) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableLJSpeech."; + std::string folder_path = datasets_root_path_ + "/testLJSpeechData/"; + std::shared_ptr ds = LJSpeech(folder_path, std::make_shared(false, 3)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset. + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row. + // order: waveform, sample_rate, transcription, normalized_transcript + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 4); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto waveform = row[0]; + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + auto sample_rate = row[1]; + MS_LOG(INFO) << "Tensor sample_rate shape: " << sample_rate.Shape(); + auto transcription = row[2]; + MS_LOG(INFO) << "Tensor transcription shape: " << transcription.Shape(); + auto normalized_transcript = row[3]; + MS_LOG(INFO) << "Tensor normalized_transcript shape: " << normalized_transcript.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + EXPECT_EQ(i, 3); + + // Manually terminate the pipeline. + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Manifest +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableManifest) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableManifest."; + + std::string file_path = datasets_root_path_ + "/testManifestData/cpp.json"; + // Create a Manifest Dataset + std::shared_ptr ds = Manifest(file_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on MindRecord +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableMinddata) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableMinddata with string file pattern."; + // Create a MindData Dataset + // Pass one mindrecord shard file to parse dataset info, and search for other mindrecord files with same dataset info, + // thus all records in imagenet.mindrecord0 ~ imagenet.mindrecord3 will be read + std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0"; + std::vector column_names = {"file_name", "label", "data"}; + std::shared_ptr ds = MindData(file_path, column_names); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 3); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + TEST_MS_LOG_MSTENSOR(INFO, "Tensor image file name: ", image); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + auto data = row[2]; + MS_LOG(INFO) << "Tensor data shape: " << data.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + // Each *.mindrecord file has 5 rows, so there are 20 rows in total(imagenet.mindrecord0 ~ imagenet.mindrecord3) + EXPECT_EQ(i, 20); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Mnist +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableMnist) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableMnist."; + + // Create a Mnist Dataset + std::string folder_path = datasets_root_path_ + "/testMnistData/"; + std::shared_ptr ds = Mnist(folder_path, "all", std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 10); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on PhotoTour +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappablePhotoTour) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappablePhotoTour."; + + // Create a PhotoTour Test Dataset + std::string folder_path = datasets_root_path_ + "/testPhotoTourData"; + std::shared_ptr ds = PhotoTour(folder_path, "liberty", "test", std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + + std::vector row; + // order: image1, image2, label + ASSERT_OK(iter->GetNextRow(&row)); + + EXPECT_EQ(row.size(), 3); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image1 = row[0]; + MS_LOG(INFO) << "Tensor image1 shape: " << image1.Shape(); + auto image2 = row[1]; + MS_LOG(INFO) << "Tensor image2 shape: " << image2.Shape(); + auto label = row[2]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 10); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Places365 with train standard +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappablePlaces365TrainStandard) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappablePlaces365TrainStandard."; + + // Create a Places365 Train Dataset. + std::string folder_path = datasets_root_path_ + "/testPlaces365Data"; + std::shared_ptr ds = + Places365(folder_path, "train-standard", true, true, std::make_shared(false, 4)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 4); + + // Manually terminate the pipeline. 
+ iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on RandomData +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableRandomData) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableRandomData."; + + // Create a RandomDataset + std::shared_ptr schema = Schema(); + ASSERT_OK(schema->add_column("image", mindspore::DataType::kNumberTypeUInt8, {2})); + ASSERT_OK(schema->add_column("label", mindspore::DataType::kNumberTypeUInt8, {1})); + std::shared_ptr ds = RandomData(50, schema); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + EXPECT_EQ(i, 50); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on SBU +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableSBU) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableSBU."; + + // Create a SBU Dataset + std::string folder_path = datasets_root_path_ + "/testSBUDataset/"; + std::shared_ptr ds = SBU(folder_path, true, std::make_shared(false, 5)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, caption + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto caption = row[1]; + MS_LOG(INFO) << "Tensor caption shape: " << caption.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 5); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Semeion +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableSemeion) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableSemeion."; + + // Create a Semeion Dataset. + std::string folder_path = datasets_root_path_ + "/testSemeionData"; + std::shared_ptr ds = Semeion(folder_path, std::make_shared(false, 5), nullptr); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + EXPECT_EQ(i, 5); + + // Manually terminate the pipeline. + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on SpeechCommands +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableSpeechCommands) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableSpeechCommands."; + std::string folder_path = datasets_root_path_ + "/testSpeechCommandsData/"; + std::shared_ptr ds = SpeechCommands(folder_path, "all", std::make_shared(false, 2)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: waveform, sample_rate, label, speaker_id, utterance_number + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 5); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto waveform = row[0]; + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + auto sample_rate = row[1]; + MS_LOG(INFO) << "Tensor sample_rate shape: " << sample_rate.Shape(); + auto label = row[2]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + auto speaker_id = row[3]; + MS_LOG(INFO) << "Tensor speaker_id shape: " << speaker_id.Shape(); + auto utterance_number = row[4]; + MS_LOG(INFO) << "Tensor utterance_number shape: " << utterance_number.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline. + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on STL10Dataset with train+unlabeled +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableSTL10) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableSTL10."; + + // Create a STL10 Dataset + std::string folder_path = datasets_root_path_ + "/testSTL10Data/"; + std::shared_ptr ds = STL10(folder_path, "train+unlabeled", std::make_shared(false, 2)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + auto label = row[1]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on Tedlium +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableTedlium) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableTedlium."; + // Create a Tedlium Dataset. + std::string folder_path12 = datasets_root_path_ + "/testTedliumData/TEDLIUM_release1"; + std::shared_ptr ds = + Tedlium(folder_path12, "release1", "all", ".sph", std::make_shared(false, 4), nullptr); + EXPECT_NE(ds, nullptr); + + // Create a Batch operation on ds + int32_t batch_size = 1; + ds = ds->Batch(batch_size); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + std::vector columns = {"waveform", "sample_rate", "transcript", "talk_id", "speaker_id", "identifier"}; + std::shared_ptr project_ds = ds->Project(columns); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: waveform, sample_rate, transcript, talk_id, speaker_id, identifier + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 6); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto waveform = row[0]; + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + auto sample_rate = row[1]; + MS_LOG(INFO) << "Tensor sample_rate shape: " << sample_rate.Shape(); + auto transcript = row[2]; + MS_LOG(INFO) << "Tensor transcript shape: " << transcript.Shape(); + auto talk_id = row[3]; + MS_LOG(INFO) << "Tensor talk_id shape: " << talk_id.Shape(); + auto speaker_id = row[4]; + MS_LOG(INFO) << "Tensor speaker_id shape: " << speaker_id.Shape(); + auto identifier = row[5]; + MS_LOG(INFO) << "Tensor identifier shape: " << identifier.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 4); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on VOC +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableVOC) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableVOC."; + + // Create a VOC Dataset + std::string folder_path = datasets_root_path_ + "/testVOC2012_2"; + std::map class_index; + class_index["car"] = 0; + class_index["cat"] = 1; + class_index["train"] = 9; + + std::shared_ptr ds = + VOC(folder_path, "Detection", "train", class_index, false, std::make_shared(0, 6)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, bbox, label, difficult, truncate + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 5); + + // Check if VOC() read correct labels + // When we provide class_index, label of ["car","cat","train"] become [0,1,9] + std::shared_ptr de_expect_label; + ASSERT_OK(Tensor::CreateFromMemory(TensorShape({1, 1}), DataType(DataType::DE_UINT32), nullptr, &de_expect_label)); + uint32_t expect[] = {9, 9, 9, 1, 1, 0}; + + uint64_t i = 0; + while (row.size() != 0) { + auto image = row[0]; + auto bbox = row[1]; + auto label = row[2]; + auto difficult = row[3]; + auto truncate = row[4]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor bbox shape: " << bbox.Shape(); + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + MS_LOG(INFO) << "Tensor difficult shape: " << difficult.Shape(); + MS_LOG(INFO) << "Tensor truncate shape: " << truncate.Shape(); + + ASSERT_OK(de_expect_label->SetItemAt({0, 0}, expect[i])); + mindspore::MSTensor expect_label = + mindspore::MSTensor(std::make_shared(de_expect_label)); + EXPECT_MSTENSOR_EQ(label, expect_label); + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + + EXPECT_EQ(i, 6); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on WIDERFace +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableWIDERFace) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableWIDERFace."; + // Create a WIDERFace Dataset. + std::string folder_path = datasets_root_path_ + "/testWIDERFace/"; + std::shared_ptr ds = WIDERFace(folder_path); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: image, bbox, blur, expression, illumination, occlusion, pose, invalid + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 8); + + uint64_t i = 0; + while (row.size() != 0) { + auto image = row[0]; + auto bbox = row[1]; + auto blur = row[2]; + auto expression = row[3]; + auto illumination = row[4]; + auto occlusion = row[5]; + auto pose = row[6]; + auto invalid = row[7]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + MS_LOG(INFO) << "Tensor bbox shape: " << bbox.Shape(); + MS_LOG(INFO) << "Tensor blur shape: " << blur.Shape(); + MS_LOG(INFO) << "Tensor expression shape: " << expression.Shape(); + MS_LOG(INFO) << "Tensor illumination shape: " << illumination.Shape(); + MS_LOG(INFO) << "Tensor occlusion shape: " << occlusion.Shape(); + MS_LOG(INFO) << "Tensor pose shape: " << pose.Shape(); + MS_LOG(INFO) << "Tensor invalid shape: " << invalid.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + + EXPECT_EQ(i, 4); + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on YesNo +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedMappableYesNo) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedMappableYesNo."; + // Create a YesNoDataset + std::string folder_path = datasets_root_path_ + "/testYesNoData/"; + std::shared_ptr ds = YesNo(folder_path, std::make_shared(false, 2)); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + // order: waveform, sample_rate, label + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 3); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto waveform = row[0]; + MS_LOG(INFO) << "Tensor waveform shape: " << waveform.Shape(); + auto sample_rate = row[1]; + MS_LOG(INFO) << "Tensor sample_rate shape: " << sample_rate.Shape(); + auto label = row[2]; + MS_LOG(INFO) << "Tensor label shape: " << label.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} diff --git a/tests/ut/cpp/dataset/c_api_pull_based_get_next_pipline_op_test.cc b/tests/ut/cpp/dataset/c_api_pull_based_get_next_pipline_op_test.cc new file mode 100644 index 00000000000..3a29d385d35 --- /dev/null +++ b/tests/ut/cpp/dataset/c_api_pull_based_get_next_pipline_op_test.cc @@ -0,0 +1,244 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
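Each mappable-dataset test above follows the same recipe: build the dataset, call CreatePullBasedIterator(), loop over GetNextRow(), and check that the number of rows (and, where applicable, their values) matches what the normal iterator produces. The same cross-check can be sketched from Python, assuming the only difference between the two runs is the debug-mode flag; the CelebA test data path is reused from the tests above:

import mindspore.dataset as ds

def collect_shapes(debug_mode):
    ds.config.set_debug_mode(debug_mode)
    data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True)
    return [item["image"].shape for item in data.create_dict_iterator(num_epochs=1, output_numpy=True)]

normal = collect_shapes(False)
pull_based = collect_shapes(True)
assert normal == pull_based   # same rows, in the same order, from either consumer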
+ */ +#include "common/common.h" +#include "minddata/dataset/include/dataset/datasets.h" +#include "minddata/dataset/include/dataset/vision.h" + +namespace common = mindspore::common; + +using namespace mindspore::dataset; + +class MindDataTestPipeline : public UT::DatasetOpTesting { + protected: +}; + + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on SkipOp +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineSkipOp) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineSkipOp."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create a Skip operation on ds + int32_t count = 3; + ds = ds->Skip(count); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + MS_LOG(INFO) << "Number of rows: " << i; + + // Expect 10 - 3 = 7 rows + EXPECT_EQ(i, 7); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on SkipOp with count larger than number of rows +/// Expectation: Output is the same as the normal iterator +TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineSkipOpLargeCount) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineSkipOpLargeCount."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create a Skip operation on ds + int32_t count = 30; + ds = ds->Skip(count); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 0); + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on TakeOp with default count=-1 +/// Expectation: Output is equal to the expected output +TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpDefault) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpDefault."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 7)); + EXPECT_NE(ds, nullptr); + + // Create a Take operation on ds, default count = -1 + ds = ds->Take(); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + MS_LOG(INFO) << "Number of rows: " << i; + + // Expect 7 rows + EXPECT_EQ(i, 7); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on TakeOp with count = 5 +/// Expectation: Output is equal to the expected output +TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpCount5) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpCount5."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 7)); + EXPECT_NE(ds, nullptr); + + // Create a Take operation on ds, count = 5 + ds = ds->Take(5); + EXPECT_EQ(ds->GetDatasetSize(), 5); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + // iterate over the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + EXPECT_EQ(row.size(), 2); + + uint64_t i = 0; + while (row.size() != 0) { + i++; + auto image = row[0]; + MS_LOG(INFO) << "Tensor image shape: " << image.Shape(); + ASSERT_OK(iter->GetNextRow(&row)); + } + MS_LOG(INFO) << "Number of rows: " << i; + + // Expect 5 rows from take(5). 
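The pipeline-op tests in this file exercise Skip and Take under the pull-based iterator and check the resulting row counts (10 - 3 = 7 for skip(3), 5 for take(5), and 0 when skipping past the end). A hedged Python equivalent with debug mode on, reusing the testPK data directory and sample counts from the C++ tests:

import mindspore.dataset as ds

ds.config.set_debug_mode(True)
data = ds.ImageFolderDataset("../data/dataset/testPK/data/", num_samples=10, decode=True)

skipped = data.skip(3)   # expect 10 - 3 = 7 rows
count = sum(1 for _ in skipped.create_tuple_iterator(num_epochs=1, output_numpy=True))
assert count == 7

taken = data.take(5)     # expect exactly 5 rows
count = sum(1 for _ in taken.create_tuple_iterator(num_epochs=1, output_numpy=True))
assert count == 5

ds.config.set_debug_mode(False)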
+ EXPECT_EQ(i, 5); + + // Manually terminate the pipeline + iter->Stop(); +} + +/// Feature: Take op +/// Description: Test Take op with invalid count input +/// Expectation: Error message is logged, and CreatePullBasedIterator() for invalid pipeline returns nullptr +TEST_F(MindDataTestPipeline, TestTakeDatasetError1CreatePullBasedIterator) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTakeDatasetError1CreatePullBasedIterator."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create a Take operation on ds with invalid count input + int32_t count = -5; + auto ds1 = ds->Take(count); + EXPECT_NE(ds1, nullptr); + + // Create an iterator over the result of the above dataset + std::shared_ptr iter = ds1->CreatePullBasedIterator(); + // Expect failure: invalid Op input + EXPECT_EQ(iter, nullptr); + + // Create a Take operation on ds with invalid count input + count = 0; + auto ds2 = ds->Take(count); + EXPECT_NE(ds2, nullptr); + + // Create an iterator over the result of the above dataset + iter = ds2->CreatePullBasedIterator(); + // Expect failure: invalid Op input + EXPECT_EQ(iter, nullptr); +} + +/// Feature: PullBasedIterator GetNextRowPullMode +/// Description: Test PullBasedIterator on TakeOp with invalid count = -5 +/// Expectation: Error message is logged, and CreatePullBasedIterator() for invalid pipeline returns nullptr +TEST_F(MindDataTestPipeline, TestGetNextPullBasedPipelineTakeOpError) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestGetNextPullBasedPipelineTakeOpError."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(false, 10)); + EXPECT_NE(ds, nullptr); + + // Create a Take operation on ds with invalid count input + int32_t count = -5; + auto ds1 = ds->Take(count); + EXPECT_NE(ds1, nullptr); + + // Create an iterator over the result of the above dataset + std::shared_ptr iter = ds1->CreatePullBasedIterator(); + std::vector row; + // Expect failure: invalid Op input + EXPECT_EQ(iter, nullptr); + + // Create a Take operation on ds with invalid count input + count = 0; + auto ds2 = ds->Take(count); + EXPECT_NE(ds2, nullptr); + + // Create an iterator over the result of the above dataset + iter = ds2->CreatePullBasedIterator(); + // Expect failure: invalid Op input + EXPECT_EQ(iter, nullptr); +} diff --git a/tests/ut/data/dataset/declient.cfg b/tests/ut/data/dataset/declient.cfg index 36c8a455327..1df12279d8f 100644 --- a/tests/ut/data/dataset/declient.cfg +++ b/tests/ut/data/dataset/declient.cfg @@ -5,5 +5,6 @@ "workerConnectorSize": 16, "opConnectorSize": 16, "seed": 5489, - "monitorSamplingInterval": 15 + "monitorSamplingInterval": 15, + "debug_mode_flag": true } diff --git a/tests/ut/python/dataset/test_config.py b/tests/ut/python/dataset/test_config.py index 42a5c19f2cb..3ef6e3d5d20 100644 --- a/tests/ut/python/dataset/test_config.py +++ b/tests/ut/python/dataset/test_config.py @@ -54,6 +54,7 @@ def test_basic(): seed_original = ds.config.get_seed() monitor_sampling_interval_original = ds.config.get_monitor_sampling_interval() fast_recovery_original = ds.config.get_fast_recovery() + debug_mode = ds.config.get_debug_mode() ds.config.load('../data/dataset/declient.cfg') @@ -63,6 +64,7 @@ def test_basic(): assert ds.config.get_seed() == 5489 assert 
ds.config.get_monitor_sampling_interval() == 15 assert ds.config.get_fast_recovery() + assert ds.config.get_debug_mode() ds.config.set_num_parallel_workers(2) # ds.config.set_worker_connector_size(3) @@ -70,6 +72,7 @@ def test_basic(): ds.config.set_seed(5) ds.config.set_monitor_sampling_interval(45) ds.config.set_fast_recovery(False) + ds.config.set_debug_mode(False) assert ds.config.get_num_parallel_workers() == 2 # assert ds.config.get_worker_connector_size() == 3 @@ -77,6 +80,7 @@ def test_basic(): assert ds.config.get_seed() == 5 assert ds.config.get_monitor_sampling_interval() == 45 assert not ds.config.get_fast_recovery() + assert not ds.config.get_debug_mode() # Restore original configuration values ds.config.set_num_parallel_workers(num_parallel_workers_original) @@ -84,6 +88,7 @@ def test_basic(): ds.config.set_seed(seed_original) ds.config.set_monitor_sampling_interval(monitor_sampling_interval_original) ds.config.set_fast_recovery(fast_recovery_original) + ds.config.set_debug_mode(debug_mode) def test_get_seed(): @@ -516,6 +521,30 @@ def test_fast_recovery(): assert "set_fast_recovery() missing 1 required positional argument: 'fast_recovery'" in str(error_info.value) +def test_debug_mode(): + """ + Feature: Test the debug mode setter/getter function + Description: This function only accepts a boolean as input and outputs error otherwise + Expectation: TypeError will be raised when input argument is missing or is not a boolean + """ + # set_debug_mode() to True and then check if the value is indeed True with get_debug_mode(). + debug_mode_flag = True + ds.config.set_debug_mode(debug_mode_flag) + assert ds.config.get_debug_mode() == debug_mode_flag + # set_debug_mode will raise TypeError if input is an integer + config_error_func(ds.config.set_debug_mode, 0, TypeError, "debug_mode_flag isn't of type boolean.") + # set_debug_mode will raise TypeError if input is a string + config_error_func(ds.config.set_debug_mode, "True", TypeError, "debug_mode_flag isn't of type boolean.") + # set_debug_mode will raise TypeError if input is a tuple + config_error_func(ds.config.set_debug_mode, (True,), TypeError, "debug_mode_flag isn't of type boolean.") + # set_debug_mode will raise TypeError if input is None + config_error_func(ds.config.set_debug_mode, None, TypeError, "debug_mode_flag isn't of type boolean.") + # set_debug_mode will raise TypeError if no input is provided + with pytest.raises(TypeError) as error_info: + ds.config.set_debug_mode() + assert "set_debug_mode() missing 1 required positional argument: 'debug_mode_flag'" in str(error_info.value) + + if __name__ == '__main__': test_basic() test_get_seed() @@ -532,3 +561,4 @@ if __name__ == '__main__': test_multiprocessing_timeout_interval() test_config_bool_type_error() test_fast_recovery() + test_debug_mode() diff --git a/tests/ut/python/dataset/test_pipeline_debug_mode.py b/tests/ut/python/dataset/test_pipeline_debug_mode.py new file mode 100644 index 00000000000..cf23cb1bb06 --- /dev/null +++ b/tests/ut/python/dataset/test_pipeline_debug_mode.py @@ -0,0 +1,185 @@ +# Copyright 2022 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
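Besides the Python setter, the flag can also arrive through a JSON config file: ConfigManager::FromJson() above reads the "debug_mode_flag" key, and the updated declient.cfg together with test_basic() exercises exactly that path. Reduced to its essentials (file path as in the unit tests), the load-based route looks like this:

import mindspore.dataset as ds

saved = ds.config.get_debug_mode()
ds.config.load('../data/dataset/declient.cfg')   # the UT config file now carries "debug_mode_flag": true
assert ds.config.get_debug_mode()
ds.config.set_debug_mode(saved)                  # restore the previous value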
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+import pytest
+import mindspore.dataset as ds
+import mindspore.dataset.vision as vision
+from mindspore.dataset.vision import Inter
+from mindspore import log as logger
+
+# Need to run all these tests in separate processes since
+# the global configuration setting of debug_mode may impact other tests running in parallel.
+
+DEBUG_MODE = False
+
+
+def setup_function():
+    global DEBUG_MODE
+    DEBUG_MODE = ds.config.get_debug_mode()
+    ds.config.set_debug_mode(True)
+
+
+def teardown_function():
+    ds.config.set_debug_mode(DEBUG_MODE)
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_tuple():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating tuple iterator with debug mode enabled.
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_tuple")
+    data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
+    crop_size = (80, 80)
+    resize_size = (24, 24)
+    # define map operations
+    center_crop = vision.CenterCrop(crop_size)
+    resize_op = vision.Resize(resize_size, Inter.LINEAR)  # Bilinear mode
+    data = data.map(operations=[center_crop, resize_op], input_columns=["image"])
+    data = data.batch(2)
+    num_row = 0
+    for item in data.create_tuple_iterator(num_epochs=1, output_numpy=True):
+        assert len(item) == 2
+        assert item[0].shape == (2, 24, 24, 3)
+        assert item[1].shape == (2, 40)
+        num_row += 1
+    assert num_row == 2
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_dict():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating dict iterator with debug mode enabled.
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_dict")
+    data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
+    crop_size = (80, 80)
+    resize_size = (24, 24)
+    # define map operations
+    center_crop = vision.CenterCrop(crop_size)
+    resize_op = vision.Resize(resize_size, Inter.LINEAR)  # Bilinear mode
+    data = data.map(operations=[center_crop, resize_op], input_columns=["image"])
+    data = data.batch(2)
+    num_row = 0
+    for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
+        assert len(item) == 2
+        assert item["image"].shape == (2, 24, 24, 3)
+        assert item["attr"].shape == (2, 40)
+        num_row += 1
+    assert num_row == 2
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_minddata():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test iterator with MindDataset in debug mode.
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_minddata")
+    data = ds.MindDataset("../data/mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0")
+    data_lst = []
+    for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
+        assert len(item) == 3
+        data_lst.append(item["data"].copy())
+    assert len(data_lst) == 20
+
+
+def test_pipeline_debug_mode_not_support():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating tuple iterator with an op that is not yet supported in pull mode.
+    Expectation: Successful with no data generated.
+    """
+    logger.info("test_pipeline_debug_mode_not_support")
+    data = ds.NumpySlicesDataset(data=[[0, 1, 2]], column_names=["data"])
+    num_rows = 0
+    for _ in data.create_tuple_iterator(num_epochs=1, output_numpy=True):
+        num_rows += 1
+    assert num_rows == 0
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_map_pyfunc():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating dict iterator with map(PyFunc).
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_map_pyfunc")
+    data = ds.CelebADataset("../data/dataset/testCelebAData/", decode=True, num_shards=1, shard_id=0)
+    data = data.map(operations=[(lambda x: x - 1), (lambda x: x * 2)], input_columns=["image"])
+    num_rows = 0
+    for item in data.create_dict_iterator(num_epochs=1):
+        assert len(item) == 2
+        assert item["image"].shape == (2268, 4032, 3)
+        num_rows += 1
+    assert num_rows == 4
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_batch_pyfunc():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating dict iterator with Batch(PyFunc).
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_batch_pyfunc")
+    def add_one(batch_info):
+        return batch_info.get_batch_num() + 1
+
+    data = ds.MnistDataset("../data/dataset/testMnistData", num_samples=20)
+    data = data.batch(batch_size=add_one, drop_remainder=True)
+    num_rows = 0
+    for item in data.create_dict_iterator(num_epochs=1, output_numpy=True):
+        num_rows += 1
+        assert item["label"].shape == (num_rows,)
+    assert num_rows == 5
+
+
+@pytest.mark.forked
+def test_pipeline_debug_mode_concat():
+    """
+    Feature: Pipeline debug mode.
+    Description: Test creating tuple iterator with concat.
+    Expectation: Successful.
+    """
+    logger.info("test_pipeline_debug_mode_concat")
+    data_dir = "../data/dataset/testCelebAData/"
+    data1 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
+    data2 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
+    data3 = ds.CelebADataset(data_dir, decode=True, num_shards=1, shard_id=0)
+    data4 = data1.concat(data2)
+    data5 = data3 + data4
+    num_rows = 0
+    for item1 in data5.create_tuple_iterator(num_epochs=1):
+        assert len(item1) == 2
+        assert item1[0].shape == (2268, 4032, 3)
+        num_rows += 1
+    assert num_rows == 12
+
+
+if __name__ == '__main__':
+    test_pipeline_debug_mode_tuple()
+    test_pipeline_debug_mode_dict()
+    test_pipeline_debug_mode_minddata()
+    test_pipeline_debug_mode_not_support()
+    test_pipeline_debug_mode_map_pyfunc()
+    test_pipeline_debug_mode_batch_pyfunc()
+    test_pipeline_debug_mode_concat()
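
Editor's note: taken together, the hunks above add a debug-mode flag to ConfigManager, expose it through ds.config.set_debug_mode()/get_debug_mode(), and exercise the pull-based iterator path that the new tests run under. A minimal usage sketch follows; it assumes a MindSpore build that already contains this change set, and the image-folder path is a placeholder rather than anything from the patch.

    import mindspore.dataset as ds
    import mindspore.dataset.vision as vision

    # Save the current setting, then enable dataset pipeline debug mode, which this
    # change set appears to route through the new pull-based iterator consumer.
    previous = ds.config.get_debug_mode()
    ds.config.set_debug_mode(True)

    # Build a small pipeline as usual; only the execution mode changes.
    data = ds.ImageFolderDataset("/path/to/image_folder", decode=True)  # placeholder path
    data = data.map(operations=[vision.Resize((24, 24))], input_columns=["image"])
    for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        print(row["image"].shape)

    # Restore the previous setting so other pipelines are unaffected,
    # mirroring the setup/teardown pattern used in test_pipeline_debug_mode.py.
    ds.config.set_debug_mode(previous)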