From 5967cb2cd72ba7012e88fc66caa4383613f06341 Mon Sep 17 00:00:00 2001 From: Lixia Chen Date: Fri, 30 Apr 2021 14:51:47 -0400 Subject: [PATCH] Unblock mindrecord under cache --- .../ccsrc/minddata/dataset/CMakeLists.txt | 1 + .../ccsrc/minddata/dataset/api/datasets.cc | 26 ++-- .../engine/ir/datasetops/source/bindings.cc | 12 +- .../engine/datasetops/source/mindrecord_op.cc | 20 +-- .../engine/datasetops/source/mindrecord_op.h | 13 +- .../source/sampler/mind_record_sampler.cc | 2 + .../engine/ir/datasetops/cache_lookup_node.h | 4 + .../engine/ir/datasetops/dataset_node.h | 7 +- .../dataset/engine/ir/datasetops/map_node.cc | 7 +- .../dataset/engine/ir/datasetops/map_node.h | 4 - .../engine/ir/datasetops/source/clue_node.cc | 6 +- .../engine/ir/datasetops/source/csv_node.cc | 6 +- .../ir/datasetops/source/minddata_node.cc | 73 +++++++---- .../ir/datasetops/source/minddata_node.h | 10 +- .../datasetops/source/samplers/CMakeLists.txt | 1 + .../source/samplers/mindrecord_sampler_ir.cc | 56 +++++++++ .../source/samplers/mindrecord_sampler_ir.h | 67 ++++++++++ .../ir/datasetops/source/text_file_node.cc | 6 +- .../ir/datasetops/source/tf_record_node.cc | 6 +- .../engine/opt/pre/cache_transform_pass.cc | 25 +++- .../engine/opt/pre/cache_validation_pass.cc | 2 +- .../dataset/include/dataset/datasets.h | 49 +++++--- .../mindrecord/meta/shard_task_list.cc | 8 +- mindspore/dataset/engine/datasets.py | 6 +- tests/ut/cpp/dataset/c_api_cache_test.cc | 80 ++++++++---- tests/ut/python/cachetests/cachetest_cpp.sh | 5 + tests/ut/python/cachetests/cachetest_py.sh | 3 + tests/ut/python/dataset/test_cache_map.py | 117 +++++++++++------- 28 files changed, 465 insertions(+), 157 deletions(-) create mode 100644 mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.cc create mode 100644 mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt index 125bee998fb..696087f9a02 100644 --- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt @@ -123,6 +123,7 @@ if(ENABLE_CACHE) add_dependencies(cpp-API engine-cache-client) add_dependencies(engine-ir-cache engine-cache-client) add_dependencies(engine-ir-datasetops engine-cache-client) + add_dependencies(engine-ir-datasetops-source engine-cache-client) add_dependencies(engine-opt engine-cache-client) add_dependencies(engine-datasetops engine-cache-client) add_dependencies(engine-perf engine-cache-client) diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index 914b0160b42..f4d8969f10b 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -1001,32 +1001,33 @@ ManifestDataset::ManifestDataset(const std::vector &dataset_file, const st MindDataDataset::MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded) { + int64_t num_padded, const std::shared_ptr &cache) { auto sampler_obj = sampler ? 
sampler->Parse() : nullptr; nlohmann::json sample = nullptr; if (padded_sample) { sample = *padded_sample; } auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded); + sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } MindDataDataset::MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const Sampler *sampler, - const nlohmann::json *padded_sample, int64_t num_padded) { + const nlohmann::json *padded_sample, int64_t num_padded, + const std::shared_ptr &cache) { auto sampler_obj = sampler ? sampler->Parse() : nullptr; nlohmann::json sample = nullptr; if (padded_sample) { sample = *padded_sample; } auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded); + sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } MindDataDataset::MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded) { + int64_t num_padded, const std::shared_ptr &cache) { auto sampler_obj = sampler.get().Parse(); nlohmann::json sample = nullptr; if (padded_sample) { @@ -1034,13 +1035,13 @@ MindDataDataset::MindDataDataset(const std::vector &dataset_file, } auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded); + sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } MindDataDataset::MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded) { + int64_t num_padded, const std::shared_ptr &cache) { auto sampler_obj = sampler ? sampler->Parse() : nullptr; nlohmann::json sample = nullptr; if (padded_sample) { @@ -1048,12 +1049,13 @@ MindDataDataset::MindDataDataset(const std::vector> &dataset_f } auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded); + sampler_obj, sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } MindDataDataset::MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const Sampler *sampler, - const nlohmann::json *padded_sample, int64_t num_padded) { + const nlohmann::json *padded_sample, int64_t num_padded, + const std::shared_ptr &cache) { auto sampler_obj = sampler ? 
sampler->Parse() : nullptr; nlohmann::json sample = nullptr; if (padded_sample) { @@ -1061,20 +1063,20 @@ MindDataDataset::MindDataDataset(const std::vector> &dataset_f } auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded); + sampler_obj, sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } MindDataDataset::MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded) { + int64_t num_padded, const std::shared_ptr &cache) { auto sampler_obj = sampler.get().Parse(); nlohmann::json sample = nullptr; if (padded_sample) { sample = *padded_sample; } auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded); + sampler_obj, sample, num_padded, cache); ir_node_ = std::static_pointer_cast(ds); } #endif diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc index 04013b80d9c..7d1b4137e2d 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc @@ -177,9 +177,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) { nlohmann::json padded_sample_json; std::map sample_bytes; THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); - auto minddata = - std::make_shared(dataset_file, toStringVector(columns_list), - toSamplerObj(sampler, true), padded_sample_json, num_padded); + auto minddata = std::make_shared(dataset_file, toStringVector(columns_list), + toSamplerObj(sampler, true), padded_sample_json, + num_padded, nullptr); minddata->SetSampleBytes(&sample_bytes); THROW_IF_ERROR(minddata->ValidateParams()); return minddata; @@ -189,9 +189,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) { nlohmann::json padded_sample_json; std::map sample_bytes; THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); - auto minddata = - std::make_shared(toStringVector(dataset_file), toStringVector(columns_list), - toSamplerObj(sampler, true), padded_sample_json, num_padded); + auto minddata = std::make_shared( + toStringVector(dataset_file), toStringVector(columns_list), toSamplerObj(sampler, true), + padded_sample_json, num_padded, nullptr); minddata->SetSampleBytes(&sample_bytes); THROW_IF_ERROR(minddata->ValidateParams()); return minddata; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index 1250d83506b..2a6abf2ba4b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -40,7 +40,7 @@ using mindrecord::ShardOperator; using mindrecord::ShardReader; // Builder constructor. Creates the builder object. -MindRecordOp::Builder::Builder() : build_dataset_file_({}) { +MindRecordOp::Builder::Builder() : build_dataset_file_({}), builder_sampler_(nullptr) { // Some arguments to the MindRecordOp constructor have a default argument that is taken // from the client config. 
// The user may choose to change these values for the construction of the MindRecordOp by @@ -69,11 +69,14 @@ Status MindRecordOp::Builder::Build(std::shared_ptr *ptr) { } std::unique_ptr shard_reader = std::make_unique(); + if (builder_sampler_ == nullptr) { + builder_sampler_ = std::make_shared(shard_reader.get()); + } - new_mind_record_op = - std::make_shared(build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, - build_op_connector_queue_size_, build_columns_to_load_, build_operators_, - build_num_padded_, sample_json, build_sample_bytes_, std::move(shard_reader)); + new_mind_record_op = std::make_shared( + build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, build_op_connector_queue_size_, + build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_, + std::move(shard_reader), builder_sampler_); RETURN_IF_NOT_OK(new_mind_record_op->Init()); *ptr = std::move(new_mind_record_op); @@ -115,9 +118,8 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector &columns_to_load, const std::vector> &operators, int64_t num_padded, const mindrecord::json &sample_json, const std::map &sample_bytes, - std::unique_ptr shard_reader) - : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, - std::make_shared(shard_reader.get())), + std::unique_ptr shard_reader, std::shared_ptr sampler) + : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)), dataset_file_(dataset_file), load_dataset_(load_dataset), columns_to_load_(columns_to_load), @@ -275,6 +277,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type)); std::vector file_path(fetched_row->size(), dataset_file_[0]); fetched_row->setPath(file_path); + fetched_row->setId(row_id); } if (tupled_buffer.empty()) return Status::OK(); if (task_type == mindrecord::TaskType::kCommonTask) { @@ -284,6 +287,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type)); std::vector file_path(fetched_row->size(), dataset_file_[0]); fetched_row->setPath(file_path); + fetched_row->setId(row_id); } } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h index 2efc37e0fde..0e179dcb2b6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "minddata/dataset/engine/data_schema.h" @@ -107,6 +108,14 @@ class MindRecordOp : public MappableLeafOp { return *this; } + // Setter method + // @param std::shared_ptr sampler + // @return Builder setter method returns reference to the builder. + Builder &SetSampler(std::shared_ptr sampler) { + builder_sampler_ = std::move(sampler); + return *this; + } + Status SanityCheck() const; static int32_t num_mind_record_workers() { return kDefaultMindRecordWorkers; } @@ -128,6 +137,7 @@ class MindRecordOp : public MappableLeafOp { int64_t build_num_padded_; py::handle build_sample_; std::map build_sample_bytes_; + std::shared_ptr builder_sampler_; }; // Constructor of the MindRecordOp. 
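The new Builder::SetSampler() above is optional. A minimal usage sketch, assuming the mandatory dataset-file and column setters have already been applied and that my_sampler is a pre-built std::shared_ptr<SamplerRT> (both are placeholders, not part of this patch):

std::shared_ptr<MindRecordOp> mindrecord_op;
MindRecordOp::Builder builder;
builder.SetSampler(std::move(my_sampler));  // new optional setter added by this patch
Status rc = builder.Build(&mindrecord_op);  // if SetSampler() is never called, Build() falls back to a
                                            // MindRecordSamplerRT wrapping the ShardReader it creates

The fallback reproduces the previous behaviour, where the sampler was hard-wired to the ShardReader inside the constructor, so existing Builder callers are unaffected.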
@@ -137,11 +147,12 @@ class MindRecordOp : public MappableLeafOp { // @param op_connector_queue_size - The output connector queue size // @param columns_to_load - The list of columns to use (column name) // @param operators - ShardOperators for Shuffle, Category, Sample + // @param sampler - sampler tells MindRecordOp what to read MindRecordOp(int32_t num_mind_record_workers, std::vector dataset_file, bool load_dataset, int32_t op_connector_queue_size, const std::vector &columns_to_load, const std::vector> &operators, int64_t num_padded_, const mindrecord::json &sample_json, const std::map &sample_bytes_, - std::unique_ptr shard_reader); + std::unique_ptr shard_reader, std::shared_ptr sampler); // Destructor ~MindRecordOp() override; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc index 3e305bbfed0..fa3d65c119d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc @@ -62,6 +62,8 @@ Status MindRecordSamplerRT::InitSampler() { Status MindRecordSamplerRT::ResetSampler() { // drive the shard reader reshuffle tasks to redo the sampling for another epoch + // Note that when cache is attached, this function is driven by cache lookup op rather than mindrecord op. + // Therefore, the reshuffle of tasks might happen in the middle of mindrecord's epoch next_id_ = 0; shard_reader_->ShuffleTask(); return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/cache_lookup_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/cache_lookup_node.h index 443e4f2003f..c87cb18a2ae 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/cache_lookup_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/cache_lookup_node.h @@ -76,6 +76,10 @@ class CacheLookupNode : public DatasetNode, public SamplerObj { /// \return Status of the node visit Status AcceptAfter(IRNodePass *const p, bool *const modified) override; + /// \brief Sampler getter + /// \return SamplerObj of the current node + std::shared_ptr Sampler() { return sampler_; } + private: std::shared_ptr sampler_; std::shared_ptr lookup_op_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h index f93bd821e36..a3eeea59090 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h @@ -237,7 +237,7 @@ class DatasetNode : public std::enable_shared_from_this { /// \return True if this node is not a data source node const bool IsNotADataSource() const { return (mappable_ == kNotADataSource); } - /// \brief Check if this node is a descendant of an operator with cache. Currently used in leaf nodes + /// \brief Check if this node is a descendant of an operator with cache. /// \return True if a cache-enabled operator is an ancestor of this node const bool IsDescendantOfCache() const { return descendant_of_cache_; } @@ -247,7 +247,7 @@ class DatasetNode : public std::enable_shared_from_this { return node != nullptr && node->parent_ == nullptr && node->Children().empty(); } - /// \brief Mark to indicate this node is a descendant of an operator with cache. 
Currently used in leaf nodes + /// \brief Mark to indicate this node is a descendant of an operator with cache. void HasCacheAbove() { descendant_of_cache_ = true; } /// \brief Getter of the number of workers @@ -335,7 +335,8 @@ class DatasetNode : public std::enable_shared_from_this { enum DataSource { kNotADataSource = 0, kNonMappableSource = 1, kMappableSource = 2 }; enum DataSource mappable_; bool nary_op_; // an indicator of whether the current node supports multiple children, true for concat/zip node - bool descendant_of_cache_; + bool descendant_of_cache_; // an indicator of whether the current node is a descendant of cache. + // Initially set to false, will be set to true by the optimizer when conditions are met. }; // MappableSourceNode represents the leaf nodes that can be randomly accessed with indexes. diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc index 942359b3b84..1aad171ca0f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc @@ -38,8 +38,7 @@ MapNode::MapNode(std::shared_ptr child, std::vectorAddChild(child); } @@ -68,9 +67,9 @@ Status MapNode::Build(std::vector> *const node_ops) { // This is temporary code. // Because the randomness of its tensor operations is not known in TensorOperation form until we convert them // to TensorOp, we need to check the randomness here. - // When TensorOperation captures the randomness behaviour, remove this code and the member "under_a_cache_" + // When TensorOperation captures the randomness behaviour, remove this code // and the temporary code in CacheValidation pre pass in IR optimizer. - if (under_a_cache_) { + if (IsDescendantOfCache()) { auto itr = std::find_if(tensor_ops.begin(), tensor_ops.end(), [](const auto &it) { return !it->Deterministic(); }); if (itr != tensor_ops.end()) { RETURN_STATUS_UNEXPECTED("MapNode containing random operation is not supported as a descendant of cache."); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h index 4589ee40876..7fb4c419030 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h @@ -79,9 +79,6 @@ class MapNode : public DatasetNode { /// \brief setter to set all tensor operations void setOperations(const std::vector> &operations); - /// \brief indicate this Map will be cached - void Cached() { under_a_cache_ = true; } - /// \brief Getter functions /// \brief Getter of tensor operations /// \return Vector of operations the Map node will process @@ -102,7 +99,6 @@ class MapNode : public DatasetNode { std::vector output_columns_; std::vector project_columns_; std::vector> callbacks_; - bool under_a_cache_; }; } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc index 4e58ade020d..c3c614574c5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc @@ -181,7 +181,11 @@ Status CLUENode::Build(std::vector> *const node_ops) RETURN_IF_NOT_OK(clue_op->Init()); - if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { + // If a global shuffle is used for
Clue, it will inject a shuffle op over the Clue. + // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. + // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset Clue's shuffle + // option to false. + if (shuffle_ == ShuffleMode::kGlobal) { // Inject ShuffleOp std::shared_ptr shuffle_op = nullptr; int64_t num_rows = 0; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc index 99e999c04c9..453204cd16b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc @@ -119,7 +119,11 @@ Status CSVNode::Build(std::vector> *const node_ops) { RETURN_IF_NOT_OK(csv_op->Init()); - if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { + // If a global shuffle is used for CSV, it will inject a shuffle op over the CSV. + // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. + // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset CSV's shuffle + // option to false. + if (shuffle_ == ShuffleMode::kGlobal) { // Inject ShuffleOp std::shared_ptr shuffle_op = nullptr; int64_t num_rows = 0; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc index 5858859b6c2..0303c4140c9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc @@ -20,10 +20,13 @@ #include #include #include +#include #include #include "minddata/dataset/engine/datasetops/source/mindrecord_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h" +#include "minddata/dataset/engine/ir/datasetops/cache_lookup_node.h" +#include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h" #include "minddata/dataset/engine/opt/pass.h" #include "minddata/dataset/util/status.h" @@ -31,36 +34,40 @@ namespace mindspore { namespace dataset { MindDataNode::MindDataNode(const std::vector &dataset_files, const std::vector &columns_list, - const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded) - : MappableSourceNode(), + const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, + std::shared_ptr cache = nullptr) + : MappableSourceNode(std::move(cache)), dataset_file_(std::string()), dataset_files_(dataset_files), search_for_pattern_(false), columns_list_(columns_list), - sampler_(sampler), + input_sampler_(sampler), + sampler_(std::make_shared()), padded_sample_(padded_sample), sample_bytes_({}), num_padded_(num_padded) {} MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector &columns_list, - const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded) - : MappableSourceNode(), + const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, + std::shared_ptr cache = nullptr) + : MappableSourceNode(std::move(cache)), dataset_file_(dataset_file), dataset_files_({}), search_for_pattern_(true), columns_list_(columns_list), - sampler_(sampler), + input_sampler_(sampler), + sampler_(std::make_shared()), padded_sample_(padded_sample), 
sample_bytes_({}), num_padded_(num_padded) {} std::shared_ptr MindDataNode::Copy() { std::shared_ptr node; - std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); + std::shared_ptr sampler = (input_sampler_ == nullptr) ? nullptr : input_sampler_->SamplerCopy(); if (dataset_files_.empty()) { - node = std::make_shared(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_); + node = std::make_shared(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, cache_); } else { - node = std::make_shared(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_); + node = std::make_shared(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, cache_); } node->SetSampleBytes(&sample_bytes_); return node; @@ -82,7 +89,7 @@ Status MindDataNode::ValidateParams() { search_for_pattern_ ? std::vector{dataset_file_} : dataset_files_; RETURN_IF_NOT_OK(ValidateDatasetFilesParam("MindDataNode", dataset_file_vec)); - RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", sampler_)); + RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", input_sampler_)); if (!columns_list_.empty()) { RETURN_IF_NOT_OK(ValidateDatasetColumnParam("MindDataNode", "columns_list", columns_list_)); @@ -153,22 +160,46 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr *sample_bytes) { sample_bytes_ = *sample_bytes; } Status MindDataNode::Build(std::vector> *const node_ops) { - RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators_, num_padded_)); + RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_)); + + std::shared_ptr sampler_rt = nullptr; + // Build the sampler IR into a runtime sampler. + // This will also create a shard reader object, saved in this node's sampler_. + RETURN_IF_NOT_OK(sampler_->SamplerBuild(&sampler_rt)); + + // Now we need to acquire the newly created shard reader from this node's sampler_. + // There are two cases: + // 1. If this node is cached, now after cache transform pass, its sampler_ has already been replaced by cache lookup + // node, and we should find the shard reader from cache lookup node's sampler_. + // 2. If this node is not cached, just acquire the shard reader from this node's sampler_. + std::unique_ptr shard_reader; + if (IsDescendantOfCache()) { + auto cache_lookup_sampler = std::dynamic_pointer_cast(sampler_); + CHECK_FAIL_RETURN_UNEXPECTED(cache_lookup_sampler != nullptr, + "Internal error. MindDataNode is cached, its sampler should be cache lookup node"); + auto mr_sampler = std::dynamic_pointer_cast(cache_lookup_sampler->Sampler()); + CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr, + "Internal error. CacheLookupNode's sampler should be a MindRecordSamplerObj object"); + RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader)); + } else { + auto mr_sampler = std::dynamic_pointer_cast(sampler_); + CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr, + "Internal error. 
MindDataNode's sampler should be a MindRecordSamplerObj object"); + RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader)); + } std::shared_ptr mindrecord_op; - std::unique_ptr shard_reader = std::make_unique(); - // If pass a string to MindData(), it will be treated as a pattern to search for matched files, // else if pass a vector to MindData(), it will be treated as specified files to be read if (search_for_pattern_) { std::vector dataset_file_vec_ = {dataset_file_}; - mindrecord_op = std::make_shared(num_workers_, dataset_file_vec_, search_for_pattern_, - connector_que_size_, columns_list_, operators_, num_padded_, - padded_sample_, sample_bytes_, std::move(shard_reader)); + mindrecord_op = std::make_shared( + num_workers_, dataset_file_vec_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, + padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); } else { - mindrecord_op = std::make_shared(num_workers_, dataset_files_, search_for_pattern_, - connector_que_size_, columns_list_, operators_, num_padded_, - padded_sample_, sample_bytes_, std::move(shard_reader)); + mindrecord_op = std::make_shared( + num_workers_, dataset_files_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, + padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); } RETURN_IF_NOT_OK(mindrecord_op->Init()); @@ -181,7 +212,7 @@ Status MindDataNode::Build(std::vector> *const node_o // Get the shard id of node Status MindDataNode::GetShardId(int32_t *shard_id) { - *shard_id = sampler_->ShardId(); + *shard_id = input_sampler_->ShardId(); return Status::OK(); } @@ -195,7 +226,7 @@ Status MindDataNode::GetDatasetSize(const std::shared_ptr &si } int64_t num_rows = -1; std::vector> operators; - RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators, num_padded_)); + RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_)); if (search_for_pattern_) { dataset_files_ = {dataset_file_}; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h index c91ee6ebf2c..977f1a81f6b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h @@ -32,11 +32,13 @@ class MindDataNode : public MappableSourceNode { public: /// \brief Constructor MindDataNode(const std::vector &dataset_files, const std::vector &columns_list, - const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded); + const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, + std::shared_ptr cache); /// \brief Constructor MindDataNode(const std::string &dataset_file, const std::vector &columns_list, - const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded); + const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, + std::shared_ptr cache); /// \brief Destructor ~MindDataNode() = default; @@ -109,7 +111,9 @@ class MindDataNode : public MappableSourceNode { std::vector dataset_files_; // search_for_pattern_ will be false in this mode bool search_for_pattern_; std::vector columns_list_; - std::shared_ptr sampler_; + std::shared_ptr input_sampler_; // The sampler from users input, will be used to create a set of shard + // operators. 
+ std::shared_ptr sampler_; // An auto-created sampler, IR of runtime MindRecordSamplerRT sampler nlohmann::json padded_sample_; std::map sample_bytes_; // enable in python int64_t num_padded_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/CMakeLists.txt index df8e489ba77..b5d10fce188 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/CMakeLists.txt @@ -11,6 +11,7 @@ set(DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES subset_random_sampler_ir.cc subset_sampler_ir.cc weighted_random_sampler_ir.cc + mindrecord_sampler_ir.cc ) add_library(engine-ir-datasetops-source-samplers OBJECT ${DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES}) \ No newline at end of file diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.cc new file mode 100644 index 00000000000..fd07a2f3d10 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.cc @@ -0,0 +1,56 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h" + +#include +#include + +#ifndef ENABLE_ANDROID +#include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h" +#include "minddata/mindrecord/include/shard_reader.h" +#endif + +namespace mindspore { +namespace dataset { + +#ifndef ENABLE_ANDROID +// This function not only creates a runtime sampler object, but also creates a ShardReader, +// which will also be needed to build a runtime MindRecordOp +// (cannot add another output parameter because it has to override base class's function) +Status MindRecordSamplerObj::SamplerBuild(std::shared_ptr *sampler) { + shard_reader_ = std::make_unique(); + *sampler = std::make_shared(shard_reader_.get()); + return Status::OK(); +} + +std::shared_ptr MindRecordSamplerObj::SamplerCopy() { + auto sampler = std::make_shared(); + return sampler; +} + +// Function to acquire the unique pointer of the newly created ShardReader object +// Note this function can only be called after SamplerBuild is finished, and can only be called once. Otherwise this +// function will return error status. +Status MindRecordSamplerObj::GetShardReader(std::unique_ptr *shard_reader) { + CHECK_FAIL_RETURN_UNEXPECTED(shard_reader_ != nullptr, "Internal error. 
Attempt to get an empty shard reader."); + *shard_reader = std::move(shard_reader_); + return Status::OK(); +} +#endif + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h new file mode 100644 index 00000000000..e95fdb0ef55 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h @@ -0,0 +1,67 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_MINDRECORD_SAMPLER_IR_H +#define MINDSPORE_MINDRECORD_SAMPLER_IR_H + +#include + +#include "minddata/dataset/engine/ir/datasetops/source/samplers/samplers_ir.h" +#include "include/api/status.h" +#ifndef ENABLE_ANDROID +#include "minddata/mindrecord/include/shard_reader.h" +#endif + +namespace mindspore { +namespace dataset { + +#ifndef ENABLE_ANDROID +class MindRecordSamplerObj : public SamplerObj { + public: + /// \brief Constructor + MindRecordSamplerObj() : shard_reader_(nullptr) {} + + /// \brief Destructor + ~MindRecordSamplerObj() = default; + + /// \brief Convert a MindRecordSamplerObj into a runtime MindRecordSamplerRT object + /// Note that this function not only creates a runtime sampler object, but also creates a ShardReader, + /// which will also be needed to build a runtime MindRecordOp + /// \param[out] sampler Shared pointer to the newly created runtime sampler + /// \return The Status code of the function. It returns OK status if sampler is created successfully. + Status SamplerBuild(std::shared_ptr *sampler) override; + + /// \brief Function to copy a MindRecordSamplerObj + /// \return Shared pointer to the newly created SamplerObj + std::shared_ptr SamplerCopy() override; + + /// \brief Function for parameter check. This class requires no input parameter. + /// \return The Status code of the function. This function always return OK status. + Status ValidateParams() override { return Status::OK(); } + + /// \brief Function to acquire the unique pointer of the newly created ShardReader object + /// Note that this function can only be called after SamplerBuild is called, and can only be called once + /// \param shard_reader Unique pointer to the newly created ShardReader object + /// \return The Status code of the function. It returns OK status if acquired a non-empty ShardReader object. 
+ Status GetShardReader(std::unique_ptr *shard_reader); + + private: + std::unique_ptr shard_reader_; +}; +#endif + +} // namespace dataset +} // namespace mindspore +#endif // MINDSPORE_MINDRECORD_SAMPLER_IR_H diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc index 58e80b2a4df..c96ebcaf9d5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc @@ -87,7 +87,11 @@ Status TextFileNode::Build(std::vector> *const node_o sorted_dataset_files, connector_que_size_, shuffle_files, num_shards_, shard_id_); RETURN_IF_NOT_OK(text_file_op->Init()); - if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { + // If a global shuffle is used for TextFile, it will inject a shuffle op over the TextFile. + // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. + // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TextFile's shuffle + // option to false. + if (shuffle_ == ShuffleMode::kGlobal) { // Inject ShuffleOp std::shared_ptr shuffle_op = nullptr; int64_t num_rows = 0; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc index db336343e6e..ad99f8599e6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc @@ -129,7 +129,11 @@ Status TFRecordNode::Build(std::vector> *const node_o RETURN_IF_NOT_OK(tf_reader_op->Init()); - if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { + // If a global shuffle is used for TFRecord, it will inject a shuffle op over the TFRecord. + // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. + // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TFRecord's shuffle + // option to false. + if (shuffle_ == ShuffleMode::kGlobal) { // Inject ShuffleOp std::shared_ptr shuffle_op = nullptr; diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc index 7e4510ef96f..75245fe99fe 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc @@ -94,6 +94,9 @@ Status CacheTransformPass::CachePass::Visit(std::shared_ptr node, bool *const modified) { if (node->IsCached()) { MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree."; @@ -137,11 +140,25 @@ } #ifndef ENABLE_ANDROID -// Perform leaf node cache transform identification +// Almost the same as MappableSourceNode's Visit, except that this one also marks this node's descendant_of_cache_ +// field to true. Later, when building, MindDataNode will take different actions based on this information.
Status CacheTransformPass::CachePass::Visit(std::shared_ptr node, bool *const modified) { - if (node->IsCached() || is_caching_) { - return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__, - "There is currently no support for MindRecordOp under cache."); + if (node->IsCached()) { + MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree."; + is_caching_ = true; + } + // Cache might also be injected to the non-leaf node upper in the tree, so is_caching_ might also be set to true + // by the other Visit() with DatasetNode argument + if (is_caching_) { + node->HasCacheAbove(); + MS_LOG(DEBUG) << "Cache transform pass: Mappable leaf in a cache descendant tree detected"; + // If a leaf has already been assigned, then we have more than one leaf inside this cache descendant tree. + if (leaf_node_) { + return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__, + "There is currently no support for multiple leaf nodes under cache."); + } + // If we are a leaf in the caching path, then save this leaf + leaf_node_ = node; } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_validation_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_validation_pass.cc index bd562ac674f..ee75b9bfcc3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_validation_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_validation_pass.cc @@ -121,7 +121,7 @@ Status CacheValidationPass::Visit(std::shared_ptr node, bool *const mod // to TensorOp, we need to check the randomness in MapNode::Build(). // By setting this MapNode is under a cache, we will check the randomness of its tensor operations without the need // to walk the IR tree again. - node->Cached(); + node->HasCacheAbove(); auto tfuncs = node->TensorOperations(); for (size_t i = 0; i < tfuncs.size(); i++) { diff --git a/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h b/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h index e0cf036a9e6..c999893d9d0 100644 --- a/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h @@ -1247,22 +1247,25 @@ class MindDataDataset : public Dataset { public: explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded); + int64_t num_padded, const std::shared_ptr &cache); explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, - const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded); + const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded, + const std::shared_ptr &cache); explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded); + int64_t num_padded, const std::shared_ptr &cache); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::shared_ptr &sampler, - const nlohmann::json *padded_sample, int64_t num_padded); + const nlohmann::json *padded_sample, int64_t num_padded, + const std::shared_ptr &cache); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const Sampler *sampler, - const nlohmann::json *padded_sample, int64_t num_padded); + const nlohmann::json *padded_sample, int64_t num_padded, 
+ const std::shared_ptr &cache); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded); + int64_t num_padded, const std::shared_ptr &cache); ~MindDataDataset() = default; }; @@ -1276,13 +1279,14 @@ class MindDataDataset : public Dataset { /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). /// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData( const std::string &dataset_file, const std::vector &columns_list = {}, const std::shared_ptr &sampler = std::make_shared(), nlohmann::json *padded_sample = nullptr, - int64_t num_padded = 0) { + int64_t num_padded = 0, const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } /// \brief Function to create a MindDataDataset @@ -1293,12 +1297,14 @@ inline std::shared_ptr MindData( /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). /// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData(const std::string &dataset_file, const std::vector &columns_list, const Sampler *sampler, - nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { + nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } /// \brief Function to create a MindDataDataset @@ -1309,13 +1315,15 @@ inline std::shared_ptr MindData(const std::string &dataset_file /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). 
/// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData(const std::string &dataset_file, const std::vector &columns_list, const std::reference_wrapper sampler, - nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { + nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } /// \brief Function to create a MindDataDataset @@ -1327,13 +1335,14 @@ inline std::shared_ptr MindData(const std::string &dataset_file /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). /// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData( const std::vector &dataset_files, const std::vector &columns_list = {}, const std::shared_ptr &sampler = std::make_shared(), nlohmann::json *padded_sample = nullptr, - int64_t num_padded = 0) { + int64_t num_padded = 0, const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } /// \brief Function to create a MindDataDataset @@ -1343,12 +1352,14 @@ inline std::shared_ptr MindData( /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). /// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData(const std::vector &dataset_files, const std::vector &columns_list, const Sampler *sampler, - nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { + nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } /// \brief Function to create a MindDataDataset @@ -1358,13 +1369,15 @@ inline std::shared_ptr MindData(const std::vector /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. +/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). 
/// \return Shared pointer to the current MindDataDataset inline std::shared_ptr MindData(const std::vector &dataset_files, const std::vector &columns_list, const std::reference_wrapper sampler, - nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { + nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded); + padded_sample, num_padded, cache); } class MnistDataset : public Dataset { diff --git a/mindspore/ccsrc/minddata/mindrecord/meta/shard_task_list.cc b/mindspore/ccsrc/minddata/mindrecord/meta/shard_task_list.cc index c55d783aef5..a4d74ad0fd5 100644 --- a/mindspore/ccsrc/minddata/mindrecord/meta/shard_task_list.cc +++ b/mindspore/ccsrc/minddata/mindrecord/meta/shard_task_list.cc @@ -63,10 +63,12 @@ void ShardTaskList::TaskListSwap(ShardTaskList &orig_tasks, ShardTaskList &new_t // When swapping, if the orig_tasks contains fields that need to be preserved after the swap, then swapping with a // new_tasks that does not have those fields will result in clobbering/losing the data after the swap. // The task_list_ should not be lost/clobbered. - new_tasks.task_list_ = std::move(orig_tasks.task_list_); + // This function can be called in the middle of mindrecord's epoch, when orig_tasks.task_list_ is still being + // used by mindrecord op's worker threads. So don't touch its task_list_ since this field should be preserved anyway. - // Now, it's safe to drive the swap. - std::swap(orig_tasks, new_tasks); + std::swap(orig_tasks.categories, new_tasks.categories); + std::swap(orig_tasks.permutation_, new_tasks.permutation_); + std::swap(orig_tasks.sample_ids_, new_tasks.sample_ids_); } void ShardTaskList::PopBack() { task_list_.pop_back(); } diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index d098745fb83..d8b6602e531 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -3052,6 +3052,8 @@ class MindDataset(MappableDataset): plus num_padded should be divisible by num_shards. num_samples (int, optional): The number of samples to be included in the dataset (default=None, all samples). + cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing. + (default=None, which means no cache is used). Raises: ValueError: If num_shards is specified but shard_id is None.
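Taken together with the new Python cache argument documented above, these overloads let a MindRecord leaf sit directly under a cache. A minimal C++ sketch, assuming a running cache server and a valid env_session id (the shard file path is a placeholder):

// Cache client for an existing server session; 0 = no memory cap, false = no spilling.
std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
// MindRecord leaf with the cache attached through the new trailing parameter.
std::shared_ptr<Dataset> ds = MindData("/path/to/imagenet.mindrecord0", {},
                                       std::make_shared<SequentialSampler>(),
                                       nullptr /* padded_sample */, 0 /* num_padded */, some_cache);

The first epoch populates the cache and later epochs are served from it, which is why MindRecordSamplerRT::ResetSampler can now be driven by the cache lookup op in the middle of mindrecord's epoch, as noted earlier in this patch.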
@@ -3068,9 +3070,9 @@ class MindDataset(MappableDataset): @check_minddataset def __init__(self, dataset_file, columns_list=None, num_parallel_workers=None, shuffle=None, num_shards=None, - shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None): + shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None, cache=None): super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples, - shuffle=shuffle, num_shards=num_shards, shard_id=shard_id) + shuffle=shuffle, num_shards=num_shards, shard_id=shard_id, cache=cache) if isinstance(dataset_file, list): self.load_dataset = False else: diff --git a/tests/ut/cpp/dataset/c_api_cache_test.cc b/tests/ut/cpp/dataset/c_api_cache_test.cc index 7da53774817..67fb7cfea81 100644 --- a/tests/ut/cpp/dataset/c_api_cache_test.cc +++ b/tests/ut/cpp/dataset/c_api_cache_test.cc @@ -32,7 +32,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiSamplerNull) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true, "127.0.0.1", 50053, 1, 1); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false, "127.0.0.1", 50053, 1, 1); EXPECT_NE(some_cache, nullptr); // Create an ImageFolder Dataset, this folder_path only has 2 images in it @@ -52,7 +52,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiNestedCache) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create an ImageFolder Dataset, this folder_path only has 2 images in it @@ -80,7 +80,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create an ImageFolder Dataset, this folder_path only has 2 images in it @@ -121,7 +121,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCocoCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a Coco Dataset, this folder_path has 6 images in it @@ -164,7 +164,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMnistCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a Mnist Dataset @@ -205,7 +205,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCelebaCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a CelebA Dataset, this folder_path has 4 records in it @@ -247,7 +247,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheManifestCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); 
+ std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a Manifest Dataset, this file_path has 2 records in it @@ -288,7 +288,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar10CApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a Cifar10 Dataset @@ -329,7 +329,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar100CApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a Cifar100 Dataset @@ -370,7 +370,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheVocCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); // Create a VOC Dataset, this folder_path has 9 records in it @@ -412,7 +412,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) { Status s = GetSessionFromEnv(&env_session); EXPECT_EQ(s, Status::OK()); - std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, true); + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); EXPECT_NE(some_cache, nullptr); std::string folder_path = datasets_root_path_ + "/testAlbum/images"; @@ -449,12 +449,50 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) { iter->Stop(); } +TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMindRecordCApi) { + session_id_type env_session; + Status s = GetSessionFromEnv(&env_session); + EXPECT_EQ(s, Status::OK()); + + std::shared_ptr some_cache = CreateDatasetCache(env_session, 0, false); + EXPECT_NE(some_cache, nullptr); + + // Create a MindData Dataset + // Pass one mindrecord shard file to parse dataset info, and search for other mindrecord files with same dataset info, + // thus all records in imagenet.mindrecord0 ~ imagenet.mindrecord3 will be read + std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0"; + + // Create a MindRecord Dataset, 20 records in it + std::shared_ptr ds = MindData(file_path, {}, std::make_shared(), nullptr, 0, some_cache); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, mindspore::MSTensor> row;
+  ASSERT_OK(iter->GetNextRow(&row));
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    ASSERT_OK(iter->GetNextRow(&row));
+  }
+
+  EXPECT_EQ(i, 20);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
 TEST_F(MindDataTestCacheOp, DISABLED_TestCacheRandomDataCApi) {
   session_id_type env_session;
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a RandomDataset
@@ -496,7 +534,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi1) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a TFRecord Dataset, this file_path has 3 records in it
@@ -539,7 +577,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi2) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a TFRecord Dataset, this file_path has 3 records in it
@@ -590,7 +628,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi3) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a TFRecord Dataset, this file_path has 3 records in it
@@ -637,7 +675,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTextfileCApi) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a TextFile Dataset, this file_path has 3 records in it
@@ -680,7 +718,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCsvCApi) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a CSV Dataset, this file_path has 3 records in it
@@ -724,7 +762,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create a CLUE Dataset, this file_path has 3 records in it
@@ -769,7 +807,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare1) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create an ImageFolder Dataset, this folder_path only has 2 images in it
@@ -821,7 +859,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare2) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create an ImageFolder Dataset, this folder_path only has 2 images in it
@@ -874,7 +912,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) {
   Status s = GetSessionFromEnv(&env_session);
   EXPECT_EQ(s, Status::OK());
 
-  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false);
   EXPECT_NE(some_cache, nullptr);
 
   // Create an ImageFolder Dataset, this folder_path only has 2 images in it
diff --git a/tests/ut/python/cachetests/cachetest_cpp.sh b/tests/ut/python/cachetests/cachetest_cpp.sh
index 1b94cab30be..b2fdf1f577b 100755
--- a/tests/ut/python/cachetests/cachetest_cpp.sh
+++ b/tests/ut/python/cachetests/cachetest_cpp.sh
@@ -29,6 +29,11 @@ UT_TEST_DIR="${BUILD_PATH}/mindspore/tests/ut/cpp"
 DateStamp=$(date +%Y%m%d_%H%M%S);
 CPP_TEST_LOG_OUTPUT="/tmp/ut_tests_cache_${DateStamp}.log"
 
+## prepare data for the dataset & mindrecord tests
+cp -fr ${PROJECT_PATH}/tests/ut/data ${UT_TEST_DIR}
+## prepare the album dataset; it uses absolute paths, so its JSON files have to be generated on the fly
+python ${UT_TEST_DIR}/data/dataset/testAlbum/gen_json.py
+
 # start cache server with a spilling path to be used for all tests
 cmd="${CACHE_ADMIN} --start -s /tmp"
 CacheAdminCmd "${cmd}" 0
diff --git a/tests/ut/python/cachetests/cachetest_py.sh b/tests/ut/python/cachetests/cachetest_py.sh
index de08bce4eec..447de305c4e 100755
--- a/tests/ut/python/cachetests/cachetest_py.sh
+++ b/tests/ut/python/cachetests/cachetest_py.sh
@@ -121,6 +121,9 @@ HandleRcExit $? 0 0
 PytestCmd "test_cache_map.py" "test_cache_map_voc" 1
 HandleRcExit $? 0 0
 
+PytestCmd "test_cache_map.py" "test_cache_map_mindrecord" 1
+HandleRcExit $? 0 0
+
 PytestCmd "test_cache_map.py" "test_cache_map_python_sampler" 1
 HandleRcExit $? 0 0
diff --git a/tests/ut/python/dataset/test_cache_map.py b/tests/ut/python/dataset/test_cache_map.py
index 9d4a3e1260c..24db7dcb311 100644
--- a/tests/ut/python/dataset/test_cache_map.py
+++ b/tests/ut/python/dataset/test_cache_map.py
@@ -413,47 +413,6 @@ def test_cache_map_failure5():
     logger.info('test_cache_failure5 Ended.\n')
 
 
-@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
-def test_cache_map_failure6():
-    """
-    Test no-cache-supporting MindRecord leaf with Map under cache (failure)
-
-       repeat
-         |
-       Cache
-         |
-     Map(resize)
-         |
-     MindRecord
-
-    """
-    logger.info("Test cache failure 6")
-    if "SESSION_ID" in os.environ:
-        session_id = int(os.environ['SESSION_ID'])
-    else:
-        raise RuntimeError("Testcase requires SESSION_ID environment variable")
-
-    some_cache = ds.DatasetCache(session_id=session_id, size=0)
-
-    columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
-    num_readers = 1
-    # The dataset has 5 records
-    data = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, num_readers)
-    resize_op = c_vision.Resize((224, 224))
-
-    data = data.map(input_columns=["img_data"], operations=resize_op, cache=some_cache)
-    data = data.repeat(4)
-
-    with pytest.raises(RuntimeError) as e:
-        num_iter = 0
-        for _ in data.create_dict_iterator():
-            num_iter += 1
-    assert "There is currently no support for MindRecordOp under cache" in str(e.value)
-
-    assert num_iter == 0
-    logger.info('test_cache_failure6 Ended.\n')
-
-
 @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
 def test_cache_map_failure7():
     """
@@ -1997,6 +1956,79 @@ class ReverseSampler(ds.Sampler):
         yield i
 
 
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_map_mindrecord1():
+    """
+    Test mappable mindrecord leaf with cache op right over the leaf
+
+       cache
+         |
+    MindRecord
+    """
+
+    logger.info("Test cache map mindrecord1")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        session_id = 1
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 5 records
+    columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
+    ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, cache=some_cache)
+
+    num_epoch = 4
+    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
+
+    epoch_count = 0
+    for _ in range(num_epoch):
+        assert sum([1 for _ in iter1]) == 5
+        epoch_count += 1
+    assert epoch_count == num_epoch
+
+    logger.info("test_cache_map_mindrecord1 Ended.\n")
+
+
+@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
+def test_cache_map_mindrecord2():
+    """
+    Test mappable mindrecord leaf with the cache op later in the tree above the map(decode)
+
+       cache
+         |
+    Map(decode)
+         |
+    MindRecord
+    """
+
+    logger.info("Test cache map mindrecord2")
+    if "SESSION_ID" in os.environ:
+        session_id = int(os.environ['SESSION_ID'])
+    else:
+        session_id = 1
+
+    some_cache = ds.DatasetCache(session_id=session_id, size=0)
+
+    # This dataset has 5 records
+    columns_list = ["id", "file_name", "label_name", "img_data", "label_data"]
+    ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list)
+
+    decode_op = c_vision.Decode()
+    ds1 = ds1.map(input_columns=["img_data"], operations=decode_op, cache=some_cache)
+
+    num_epoch = 4
+    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
+
+    epoch_count = 0
+    for _ in range(num_epoch):
+        assert sum([1 for _ in iter1]) == 5
+        epoch_count += 1
+    assert epoch_count == num_epoch
+
+    logger.info("test_cache_map_mindrecord2 Ended.\n")
+
+
 @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
 def test_cache_map_python_sampler1():
     """
@@ -2169,7 +2201,6 @@ if __name__ == '__main__':
     test_cache_map_failure3()
     test_cache_map_failure4()
     test_cache_map_failure5()
-    test_cache_map_failure6()
     test_cache_map_failure7()
     test_cache_map_failure8()
     test_cache_map_failure9()
@@ -2210,6 +2241,8 @@ if __name__ == '__main__':
     test_cache_map_cifar4()
     test_cache_map_voc1()
    test_cache_map_voc2()
+    test_cache_map_mindrecord1()
+    test_cache_map_mindrecord2()
     test_cache_map_python_sampler1()
     test_cache_map_python_sampler2()
     test_cache_map_nested_repeat()
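
--
Usage note: with this patch a MindDataset leaf can sit directly under a cache, mirroring the
new test_cache_map_mindrecord1 above. A minimal Python sketch, assuming a cache server has
already been started (e.g. with `cache_admin --start`) and a session generated (`cache_admin -g`);
the session id and shard file path below are placeholders:

    import mindspore.dataset as ds

    # Placeholder session id; use the one reported by `cache_admin -g` in practice.
    some_cache = ds.DatasetCache(session_id=1, size=0)

    # Placeholder shard file path; the cache sits directly over the MindRecord leaf.
    data = ds.MindDataset("/path/to/imagenet.mindrecord0", cache=some_cache)

    num_epoch = 2
    iter1 = data.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
    for _ in range(num_epoch):
        for _ in iter1:
            pass  # after the first epoch, rows are served from the cache server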