diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index f4d8969f10b..a3dd10ef2de 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -23,6 +23,7 @@ #include "minddata/dataset/core/tensor.h" #include "minddata/dataset/engine/runtime_context.h" +#include "minddata/dataset/include/dataset/constants.h" #include "minddata/dataset/include/dataset/iterator.h" #include "minddata/dataset/include/dataset/samplers.h" #include "minddata/dataset/include/dataset/transforms.h" @@ -1001,19 +1002,7 @@ ManifestDataset::ManifestDataset(const std::vector &dataset_file, const st MindDataDataset::MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache) { - auto sampler_obj = sampler ? sampler->Parse() : nullptr; - nlohmann::json sample = nullptr; - if (padded_sample) { - sample = *padded_sample; - } - auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded, cache); - ir_node_ = std::static_pointer_cast(ds); -} -MindDataDataset::MindDataDataset(const std::vector &dataset_file, - const std::vector> &columns_list, const Sampler *sampler, - const nlohmann::json *padded_sample, int64_t num_padded, + int64_t num_padded, ShuffleMode shuffle_mode, const std::shared_ptr &cache) { auto sampler_obj = sampler ? sampler->Parse() : nullptr; nlohmann::json sample = nullptr; @@ -1021,13 +1010,29 @@ MindDataDataset::MindDataDataset(const std::vector &dataset_file, sample = *padded_sample; } auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded, cache); + sample, num_padded, shuffle_mode, cache); ir_node_ = std::static_pointer_cast(ds); } + +MindDataDataset::MindDataDataset(const std::vector &dataset_file, + const std::vector> &columns_list, const Sampler *sampler, + const nlohmann::json *padded_sample, int64_t num_padded, ShuffleMode shuffle_mode, + const std::shared_ptr &cache) { + auto sampler_obj = sampler ? sampler->Parse() : nullptr; + nlohmann::json sample = nullptr; + if (padded_sample) { + sample = *padded_sample; + } + auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, + sample, num_padded, shuffle_mode, cache); + ir_node_ = std::static_pointer_cast(ds); +} + MindDataDataset::MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache) { + int64_t num_padded, ShuffleMode shuffle_mode, + const std::shared_ptr &cache) { auto sampler_obj = sampler.get().Parse(); nlohmann::json sample = nullptr; if (padded_sample) { @@ -1035,26 +1040,14 @@ MindDataDataset::MindDataDataset(const std::vector &dataset_file, } auto ds = std::make_shared(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, - sample, num_padded, cache); + sample, num_padded, shuffle_mode, cache); ir_node_ = std::static_pointer_cast(ds); } + MindDataDataset::MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache) { - auto sampler_obj = sampler ? 
sampler->Parse() : nullptr; - nlohmann::json sample = nullptr; - if (padded_sample) { - sample = *padded_sample; - } - - auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded, cache); - ir_node_ = std::static_pointer_cast(ds); -} -MindDataDataset::MindDataDataset(const std::vector> &dataset_files, - const std::vector> &columns_list, const Sampler *sampler, - const nlohmann::json *padded_sample, int64_t num_padded, + int64_t num_padded, ShuffleMode shuffle_mode, const std::shared_ptr &cache) { auto sampler_obj = sampler ? sampler->Parse() : nullptr; nlohmann::json sample = nullptr; @@ -1063,20 +1056,37 @@ MindDataDataset::MindDataDataset(const std::vector> &dataset_f } auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded, cache); + sampler_obj, sample, num_padded, shuffle_mode, cache); ir_node_ = std::static_pointer_cast(ds); } + +MindDataDataset::MindDataDataset(const std::vector> &dataset_files, + const std::vector> &columns_list, const Sampler *sampler, + const nlohmann::json *padded_sample, int64_t num_padded, ShuffleMode shuffle_mode, + const std::shared_ptr &cache) { + auto sampler_obj = sampler ? sampler->Parse() : nullptr; + nlohmann::json sample = nullptr; + if (padded_sample) { + sample = *padded_sample; + } + + auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), + sampler_obj, sample, num_padded, shuffle_mode, cache); + ir_node_ = std::static_pointer_cast(ds); +} + MindDataDataset::MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache) { + int64_t num_padded, ShuffleMode shuffle_mode, + const std::shared_ptr &cache) { auto sampler_obj = sampler.get().Parse(); nlohmann::json sample = nullptr; if (padded_sample) { sample = *padded_sample; } auto ds = std::make_shared(VectorCharToString(dataset_files), VectorCharToString(columns_list), - sampler_obj, sample, num_padded, cache); + sampler_obj, sample, num_padded, shuffle_mode, cache); ir_node_ = std::static_pointer_cast(ds); } #endif diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc index 7d1b4137e2d..6811df594a6 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc @@ -173,25 +173,25 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) { (void)py::class_>(*m, "MindDataNode", "to create a MindDataNode") .def(py::init([](std::string dataset_file, py::list columns_list, py::handle sampler, - py::dict padded_sample, int64_t num_padded) { + py::dict padded_sample, int64_t num_padded, ShuffleMode shuffle_mode) { nlohmann::json padded_sample_json; std::map sample_bytes; THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); auto minddata = std::make_shared(dataset_file, toStringVector(columns_list), toSamplerObj(sampler, true), padded_sample_json, - num_padded, nullptr); + num_padded, shuffle_mode, nullptr); minddata->SetSampleBytes(&sample_bytes); THROW_IF_ERROR(minddata->ValidateParams()); return minddata; })) .def(py::init([](py::list 
dataset_file, py::list columns_list, py::handle sampler, - py::dict padded_sample, int64_t num_padded) { + py::dict padded_sample, int64_t num_padded, ShuffleMode shuffle_mode) { nlohmann::json padded_sample_json; std::map sample_bytes; THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); auto minddata = std::make_shared( toStringVector(dataset_file), toStringVector(columns_list), toSamplerObj(sampler, true), - padded_sample_json, num_padded, nullptr); + padded_sample_json, num_padded, shuffle_mode, nullptr); minddata->SetSampleBytes(&sample_bytes); THROW_IF_ERROR(minddata->ValidateParams()); return minddata; diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/mindrecord/include/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/mindrecord/include/bindings.cc index 982afb41a15..38bf983bb6e 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/mindrecord/include/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/mindrecord/include/bindings.cc @@ -18,6 +18,7 @@ #include "minddata/dataset/api/python/pybind_register.h" +#include "minddata/dataset/include/dataset/constants.h" #include "minddata/dataset/util/random.h" #include "minddata/mindrecord/include/shard_distributed_sample.h" #include "minddata/mindrecord/include/shard_operator.h" @@ -84,5 +85,14 @@ PYBIND_REGISTER( })); })); +PYBIND_REGISTER(ShuffleMode, 1, ([](const py::module *m) { + (void)py::enum_(*m, "ShuffleMode", py::arithmetic()) + .value("FALSE", ShuffleMode::kFalse) + .value("FILES", ShuffleMode::kFiles) + .value("GLOBAL", ShuffleMode::kGlobal) + .value("INFILE", ShuffleMode::kInfile) + .export_values(); + })); + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index 2a6abf2ba4b..a087a4a1e0c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -53,6 +53,7 @@ MindRecordOp::Builder::Builder() : build_dataset_file_({}), builder_sampler_(nul build_load_dataset_ = false; build_num_padded_ = 0; build_sample_ = nullptr; + build_shuffle_mode_ = ShuffleMode::kGlobal; } // The builder "build" method creates the final object. 
@@ -75,7 +76,7 @@ Status MindRecordOp::Builder::Build(std::shared_ptr *ptr) { new_mind_record_op = std::make_shared( build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, build_op_connector_queue_size_, - build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_, + build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_, build_shuffle_mode_, std::move(shard_reader), builder_sampler_); RETURN_IF_NOT_OK(new_mind_record_op->Init()); @@ -118,7 +119,8 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector &columns_to_load, const std::vector> &operators, int64_t num_padded, const mindrecord::json &sample_json, const std::map &sample_bytes, - std::unique_ptr shard_reader, std::shared_ptr sampler) + const ShuffleMode shuffle_mode, std::unique_ptr shard_reader, + std::shared_ptr sampler) : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)), dataset_file_(dataset_file), load_dataset_(load_dataset), @@ -129,6 +131,7 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector build_sample_bytes_; std::shared_ptr builder_sampler_; + ShuffleMode build_shuffle_mode_; }; // Constructor of the MindRecordOp. @@ -152,7 +158,8 @@ class MindRecordOp : public MappableLeafOp { int32_t op_connector_queue_size, const std::vector &columns_to_load, const std::vector> &operators, int64_t num_padded_, const mindrecord::json &sample_json, const std::map &sample_bytes_, - std::unique_ptr shard_reader, std::shared_ptr sampler); + const ShuffleMode shuffle_mode_, std::unique_ptr shard_reader, + std::shared_ptr sampler); // Destructor ~MindRecordOp() override; @@ -240,6 +247,8 @@ class MindRecordOp : public MappableLeafOp { std::unique_ptr shard_reader_; std::mutex ended_worker_mutex_; + + ShuffleMode shuffle_mode_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc index 0303c4140c9..405284f653e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.cc @@ -35,7 +35,7 @@ namespace dataset { MindDataNode::MindDataNode(const std::vector &dataset_files, const std::vector &columns_list, const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, - std::shared_ptr cache = nullptr) + ShuffleMode shuffle_mode, std::shared_ptr cache) : MappableSourceNode(std::move(cache)), dataset_file_(std::string()), dataset_files_(dataset_files), @@ -45,11 +45,12 @@ MindDataNode::MindDataNode(const std::vector &dataset_files, const sampler_(std::make_shared()), padded_sample_(padded_sample), sample_bytes_({}), - num_padded_(num_padded) {} + num_padded_(num_padded), + shuffle_mode_(shuffle_mode) {} MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector &columns_list, const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, - std::shared_ptr cache = nullptr) + ShuffleMode shuffle_mode, std::shared_ptr cache) : MappableSourceNode(std::move(cache)), dataset_file_(dataset_file), dataset_files_({}), @@ -59,15 +60,18 @@ MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector()), padded_sample_(padded_sample), sample_bytes_({}), - num_padded_(num_padded) {} + num_padded_(num_padded), + shuffle_mode_(shuffle_mode) {} 
std::shared_ptr MindDataNode::Copy() { std::shared_ptr node; std::shared_ptr sampler = (input_sampler_ == nullptr) ? nullptr : input_sampler_->SamplerCopy(); if (dataset_files_.empty()) { - node = std::make_shared(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, cache_); + node = std::make_shared(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, + shuffle_mode_, cache_); } else { - node = std::make_shared(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, cache_); + node = std::make_shared(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, + shuffle_mode_, cache_); } node->SetSampleBytes(&sample_bytes_); return node; @@ -129,7 +133,7 @@ Status MindDataNode::ValidateParams() { // Helper function to create runtime sampler for minddata dataset Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr &sampler, std::vector> *operators_, - int64_t num_padded) { + int64_t num_padded, ShuffleMode shuffle_mode) { std::shared_ptr op = sampler->BuildForMindDataset(); if (op == nullptr) { std::string err_msg = @@ -140,10 +144,15 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr> stack_ops; while (op != nullptr) { - auto sampler_op = std::dynamic_pointer_cast(op); - if (sampler_op && num_padded > 0) { - sampler_op->SetNumPaddedSamples(num_padded); - stack_ops.push(sampler_op); + // update the shuffle mode for sampler op or shuffle op + if (shuffle_mode != ShuffleMode::kFalse) { + op->UpdateShuffleMode(shuffle_mode); + } + + auto distributed_sampler_op = std::dynamic_pointer_cast(op); + if (distributed_sampler_op && num_padded > 0) { + distributed_sampler_op->SetNumPaddedSamples(num_padded); + stack_ops.push(distributed_sampler_op); } else { stack_ops.push(op); } @@ -160,7 +169,7 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr *sample_bytes) { sample_bytes_ = *sample_bytes; } Status MindDataNode::Build(std::vector> *const node_ops) { - RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_)); + RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_, shuffle_mode_)); std::shared_ptr sampler_rt = nullptr; // Build the sampler IR into a runtime sampler. 
@@ -195,11 +204,11 @@ Status MindDataNode::Build(std::vector> *const node_o std::vector dataset_file_vec_ = {dataset_file_}; mindrecord_op = std::make_shared( num_workers_, dataset_file_vec_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, - padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); + padded_sample_, sample_bytes_, shuffle_mode_, std::move(shard_reader), std::move(sampler_rt)); } else { mindrecord_op = std::make_shared( num_workers_, dataset_files_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, - padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); + padded_sample_, sample_bytes_, shuffle_mode_, std::move(shard_reader), std::move(sampler_rt)); } RETURN_IF_NOT_OK(mindrecord_op->Init()); @@ -226,7 +235,7 @@ Status MindDataNode::GetDatasetSize(const std::shared_ptr &si } int64_t num_rows = -1; std::vector> operators; - RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_)); + RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_, shuffle_mode_)); if (search_for_pattern_) { dataset_files_ = {dataset_file_}; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h index 977f1a81f6b..127ba5b440d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/minddata_node.h @@ -33,12 +33,12 @@ class MindDataNode : public MappableSourceNode { /// \brief Constructor MindDataNode(const std::vector &dataset_files, const std::vector &columns_list, const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, - std::shared_ptr cache); + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, std::shared_ptr cache = nullptr); /// \brief Constructor MindDataNode(const std::string &dataset_file, const std::vector &columns_list, const std::shared_ptr &sampler, nlohmann::json padded_sample, int64_t num_padded, - std::shared_ptr cache); + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, std::shared_ptr cache = nullptr); /// \brief Destructor ~MindDataNode() = default; @@ -72,7 +72,7 @@ class MindDataNode : public MappableSourceNode { /// \return Status Status::OK() if input sampler is valid Status BuildMindDatasetSamplerChain(const std::shared_ptr &sampler, std::vector> *operators_, - int64_t num_padded); + int64_t num_padded, ShuffleMode shuffle_mode); /// \brief Set sample_bytes when padded_sample has py::byte value /// \note Pybind will use this function to set sample_bytes into MindDataNode @@ -118,6 +118,7 @@ class MindDataNode : public MappableSourceNode { std::map sample_bytes_; // enable in python int64_t num_padded_; std::vector> operators_; + ShuffleMode shuffle_mode_; }; } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/include/dataset/constants.h b/mindspore/ccsrc/minddata/dataset/include/dataset/constants.h index 381a1c63903..e4db6f88c00 100644 --- a/mindspore/ccsrc/minddata/dataset/include/dataset/constants.h +++ b/mindspore/ccsrc/minddata/dataset/include/dataset/constants.h @@ -36,7 +36,7 @@ enum class DatasetType { kUnknown, kArrow, kTf }; enum class TensorImpl { kNone, kFlexible, kCv, kNP }; // Possible values for shuffle -enum class ShuffleMode { kFalse = 0, kFiles = 1, kGlobal = 2 }; +enum class ShuffleMode { kFalse = 0, kFiles = 1, kGlobal = 2, kInfile = 3 }; 
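+// kFalse: no shuffle; kFiles: shuffle the file order but keep the row order within each file;
+// kGlobal: shuffle all rows across all files; kInfile: keep the file order but shuffle the rows
+// within each file.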
// Possible values for Border types enum class BorderType { kConstant = 0, kEdge = 1, kReflect = 2, kSymmetric = 3 }; diff --git a/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h b/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h index c999893d9d0..be039601fd4 100644 --- a/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/dataset/datasets.h @@ -1247,25 +1247,31 @@ class MindDataDataset : public Dataset { public: explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache); + int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded, - const std::shared_ptr &cache); + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); explicit MindDataDataset(const std::vector &dataset_file, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache); + int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::shared_ptr &sampler, const nlohmann::json *padded_sample, int64_t num_padded, - const std::shared_ptr &cache); + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded, - const std::shared_ptr &cache); + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); explicit MindDataDataset(const std::vector> &dataset_files, const std::vector> &columns_list, const std::reference_wrapper sampler, const nlohmann::json *padded_sample, - int64_t num_padded, const std::shared_ptr &cache); + int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr); ~MindDataDataset() = default; }; @@ -1284,9 +1290,10 @@ class MindDataDataset : public Dataset { inline std::shared_ptr MindData( const std::string &dataset_file, const std::vector &columns_list = {}, const std::shared_ptr &sampler = std::make_shared(), nlohmann::json *padded_sample = nullptr, - int64_t num_padded = 0, const std::shared_ptr &cache = nullptr) { + int64_t num_padded = 0, ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } /// \brief Function to create a MindDataDataset @@ -1302,9 +1309,10 @@ inline std::shared_ptr MindData( inline std::shared_ptr MindData(const std::string &dataset_file, const std::vector &columns_list, const Sampler *sampler, nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - 
padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } /// \brief Function to create a MindDataDataset @@ -1321,9 +1329,10 @@ inline std::shared_ptr MindData(const std::string &dataset_file const std::vector &columns_list, const std::reference_wrapper sampler, nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, const std::shared_ptr &cache = nullptr) { return std::make_shared(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } /// \brief Function to create a MindDataDataset @@ -1340,9 +1349,10 @@ inline std::shared_ptr MindData(const std::string &dataset_file inline std::shared_ptr MindData( const std::vector &dataset_files, const std::vector &columns_list = {}, const std::shared_ptr &sampler = std::make_shared(), nlohmann::json *padded_sample = nullptr, - int64_t num_padded = 0, const std::shared_ptr &cache = nullptr) { + int64_t num_padded = 0, ShuffleMode shuffle_mode = ShuffleMode::kGlobal, + const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } /// \brief Function to create a MindDataDataset @@ -1357,9 +1367,10 @@ inline std::shared_ptr MindData( inline std::shared_ptr MindData(const std::vector &dataset_files, const std::vector &columns_list, const Sampler *sampler, nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } /// \brief Function to create a MindDataDataset @@ -1375,9 +1386,10 @@ inline std::shared_ptr MindData(const std::vector const std::vector &columns_list, const std::reference_wrapper sampler, nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, + ShuffleMode shuffle_mode = ShuffleMode::kGlobal, const std::shared_ptr &cache = nullptr) { return std::make_shared(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, - padded_sample, num_padded, cache); + padded_sample, num_padded, shuffle_mode, cache); } class MnistDataset : public Dataset { diff --git a/mindspore/ccsrc/minddata/mindrecord/include/shard_operator.h b/mindspore/ccsrc/minddata/mindrecord/include/shard_operator.h index e7719cda9d3..f206ef77ca2 100644 --- a/mindspore/ccsrc/minddata/mindrecord/include/shard_operator.h +++ b/mindspore/ccsrc/minddata/mindrecord/include/shard_operator.h @@ -18,7 +18,9 @@ #define MINDSPORE_CCSRC_MINDDATA_MINDRECORD_INCLUDE_SHARD_OPERATOR_H_ #include +#include #include "minddata/mindrecord/include/shard_task_list.h" +#include "minddata/dataset/include/dataset/constants.h" namespace mindspore { namespace mindrecord { @@ -38,6 +40,7 @@ class __attribute__((visibility("default"))) ShardOperator { } return SUCCESS; } + virtual bool HasChildOp() { return child_op_ != nullptr; } virtual MSRStatus SetChildOp(std::shared_ptr child_op) { @@ -55,8 +58,26 @@ class __attribute__((visibility("default"))) ShardOperator { virtual int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) { return 0; } + virtual void UpdateShuffleMode(dataset::ShuffleMode shuffle_mode) { shuffle_mode_ = 
shuffle_mode; } + + virtual dataset::ShuffleMode GetShuffleMode() { return shuffle_mode_; } + + virtual void SetShardSampleCount(const std::vector &shard_sample_count) { + shard_sample_count_ = shard_sample_count; + } + + virtual std::vector GetShardSampleCount() { return shard_sample_count_; } + private: std::shared_ptr child_op_ = nullptr; + + // indicate shard_id : inc_count + // 0 : 15 - shard0 has 15 samples + // 1 : 41 - shard1 has 26 samples + // 2 : 58 - shard2 has 17 samples + std::vector shard_sample_count_; + + dataset::ShuffleMode shuffle_mode_ = dataset::ShuffleMode::kGlobal; }; } // namespace mindrecord } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/mindrecord/include/shard_shuffle.h b/mindspore/ccsrc/minddata/mindrecord/include/shard_shuffle.h index b22b7d9f87f..1eda526a773 100644 --- a/mindspore/ccsrc/minddata/mindrecord/include/shard_shuffle.h +++ b/mindspore/ccsrc/minddata/mindrecord/include/shard_shuffle.h @@ -39,6 +39,12 @@ class __attribute__((visibility("default"))) ShardShuffle : public ShardOperator // Private helper function MSRStatus CategoryShuffle(ShardTaskList &tasks); + // Keep the file sequence the same but shuffle the data within each file + MSRStatus ShuffleInfile(ShardTaskList &tasks); + + // Shuffle the file sequence but keep the order of data within each file + MSRStatus ShuffleFiles(ShardTaskList &tasks); + uint32_t shuffle_seed_; int64_t no_of_samples_; bool replacement_; diff --git a/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc index f86da22ff58..add6fe6ef5d 100644 --- a/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc @@ -148,6 +148,10 @@ MSRStatus ShardReader::Init(const std::vector &file_paths, bool loa } num_rows_ = 0; auto row_group_summary = ReadRowGroupSummary(); + + // clear the shard_sample_count_, because it will be insert when Launch func + shard_sample_count_.clear(); + for (const auto &rg : row_group_summary) { num_rows_ += std::get<3>(rg); } @@ -305,6 +309,7 @@ std::vector> ShardReader::ReadRowGroupSummar shard_sample_count_.push_back(total_count); } } + return row_group_summary; } @@ -1224,6 +1229,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector(op)) continue; + + if (std::dynamic_pointer_cast(op) || std::dynamic_pointer_cast(op)) { + op->SetShardSampleCount(shard_sample_count_); + } + if (SUCCESS != (*op)(tasks_)) { return FAILED; } diff --git a/mindspore/ccsrc/minddata/mindrecord/meta/shard_distributed_sample.cc b/mindspore/ccsrc/minddata/mindrecord/meta/shard_distributed_sample.cc index 4b4d57ee407..93c3c76ecfd 100644 --- a/mindspore/ccsrc/minddata/mindrecord/meta/shard_distributed_sample.cc +++ b/mindspore/ccsrc/minddata/mindrecord/meta/shard_distributed_sample.cc @@ -72,6 +72,8 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTaskList &tasks) { tasks = task_; } if (shuffle_ == true) { + shuffle_op_->SetShardSampleCount(GetShardSampleCount()); + shuffle_op_->UpdateShuffleMode(GetShuffleMode()); if (SUCCESS != (*shuffle_op_)(tasks)) { return FAILED; } diff --git a/mindspore/ccsrc/minddata/mindrecord/meta/shard_shuffle.cc b/mindspore/ccsrc/minddata/mindrecord/meta/shard_shuffle.cc index b7c149a4197..5cd3f754ac8 100644 --- a/mindspore/ccsrc/minddata/mindrecord/meta/shard_shuffle.cc +++ b/mindspore/ccsrc/minddata/mindrecord/meta/shard_shuffle.cc @@ -66,6 +66,97 @@ MSRStatus ShardShuffle::CategoryShuffle(ShardTaskList &tasks) { return SUCCESS; } +MSRStatus 
ShardShuffle::ShuffleFiles(ShardTaskList &tasks) {
+  if (no_of_samples_ == 0) {
+    no_of_samples_ = static_cast<int64_t>(tasks.Size());
+  }
+  if (no_of_samples_ <= 0) {
+    MS_LOG(ERROR) << "no_of_samples need to be positive.";
+    return FAILED;
+  }
+  auto shard_sample_count = GetShardSampleCount();
+
+  // shuffle the file order
+  std::vector<uint32_t> shuffle_files;
+  for (uint32_t i = 0; i < shard_sample_count.size(); i++) {
+    shuffle_files.push_back(i);
+  }
+  std::shuffle(shuffle_files.begin(), shuffle_files.end(), std::default_random_engine(shuffle_seed_));
+
+  // reconstruct the permutation between files
+  // -- before --
+  // file1: [0, 1, 2]
+  // file2: [3, 4, 5, 6]
+  // file3: [7, 8]
+  // file4: [9, 10]
+  // files: [file1, file2, file3, file4]
+  // permutation: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+  // -- after --
+  // files: [file4, file1, file3, file2]
+  // permutation: [9, 10, 0, 1, 2, 7, 8, 3, 4, 5, 6]
+  auto original_permutation = tasks.permutation_;
+  uint32_t whole_index = 0;
+  for (uint32_t i = 0; i < shuffle_files.size(); i++) {
+    uint32_t start_index = 0;
+    uint32_t current_size = 0;
+    if (shuffle_files[i] == 0) {
+      start_index = 0;
+      current_size = shard_sample_count[shuffle_files[i]];
+    } else {
+      start_index = shard_sample_count[shuffle_files[i] - 1];
+      current_size = shard_sample_count[shuffle_files[i]] - start_index;
+    }
+    std::copy(original_permutation.begin() + start_index, original_permutation.begin() + start_index + current_size,
+              tasks.permutation_.begin() + whole_index);
+    whole_index += current_size;
+  }
+
+  auto total_no = static_cast<int64_t>(tasks.Size());
+  size_t samples_to_assign =
+      (no_of_samples_ > 0 && no_of_samples_ < total_no) ? no_of_samples_ : tasks.sample_ids_.size();
+  ShardTaskList new_tasks;
+  for (size_t i = 0; i < samples_to_assign; ++i) {
+    new_tasks.AssignTask(tasks, tasks.permutation_[i]);
+  }
+  ShardTaskList::TaskListSwap(tasks, new_tasks);
+  return SUCCESS;
+}
+
+MSRStatus ShardShuffle::ShuffleInfile(ShardTaskList &tasks) {
+  if (no_of_samples_ == 0) {
+    no_of_samples_ = static_cast<int64_t>(tasks.Size());
+  }
+  if (no_of_samples_ <= 0) {
+    MS_LOG(ERROR) << "no_of_samples need to be positive.";
+    return FAILED;
+  }
+  // reconstruct the permutation inside each file
+  // -- before --
+  // file1: [0, 1, 2]
+  // file2: [3, 4, 5, 6]
+  // file3: [7, 8]
+  // file4: [9, 10]
+  // files: [file1, file2, file3, file4]
+  // permutation: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+  // -- after --
+  // permutation: [2, 0, 1, 4, 6, 3, 5, 8, 7, 9, 10]
+  auto shard_sample_count = GetShardSampleCount();
+  uint32_t start_index = 0;
+  for (uint32_t i = 0; i < shard_sample_count.size(); i++) {
+    auto current_size = shard_sample_count[i] - start_index;
+    std::shuffle(tasks.permutation_.begin() + start_index, tasks.permutation_.begin() + start_index + current_size,
+                 std::default_random_engine(shuffle_seed_));
+    start_index = shard_sample_count[i];
+  }
+  auto total_no = static_cast<int64_t>(tasks.Size());
+  ShardTaskList new_tasks;
+  size_t samples_to_assign =
+      (no_of_samples_ > 0 && no_of_samples_ < total_no) ? no_of_samples_ : tasks.sample_ids_.size();
+  for (size_t i = 0; i < samples_to_assign; ++i) {
+    new_tasks.AssignTask(tasks, tasks.permutation_[i]);
+  }
+  ShardTaskList::TaskListSwap(tasks, new_tasks);
+  return SUCCESS;
+}
+
 MSRStatus ShardShuffle::Execute(ShardTaskList &tasks) {
   if (reshuffle_each_epoch_) shuffle_seed_++;
   if (tasks.categories < 1) {
@@ -75,28 +166,40 @@ MSRStatus ShardShuffle::Execute(ShardTaskList &tasks) {
     if (tasks.permutation_.empty() == true) {
       tasks.MakePerm();
     }
-    if (replacement_ == true) {
-      ShardTaskList new_tasks;
-      if (no_of_samples_ == 0) no_of_samples_ = static_cast<int64_t>(tasks.sample_ids_.size());
-      if (no_of_samples_ <= 0) {
-        MS_LOG(ERROR) << "no_of_samples need to be positive.";
-        return FAILED;
-      }
-      for (uint32_t i = 0; i < no_of_samples_; ++i) {
-        new_tasks.AssignTask(tasks, tasks.GetRandomTaskID());
-      }
+    if (GetShuffleMode() == dataset::ShuffleMode::kGlobal) {
+      if (replacement_ == true) {
+        ShardTaskList new_tasks;
+        if (no_of_samples_ == 0) no_of_samples_ = static_cast<int64_t>(tasks.sample_ids_.size());
+        if (no_of_samples_ <= 0) {
+          MS_LOG(ERROR) << "no_of_samples need to be positive.";
+          return FAILED;
+        }
+        for (uint32_t i = 0; i < no_of_samples_; ++i) {
+          new_tasks.AssignTask(tasks, tasks.GetRandomTaskID());
+        }
 
-      ShardTaskList::TaskListSwap(tasks, new_tasks);
-    } else {
-      std::shuffle(tasks.permutation_.begin(), tasks.permutation_.end(), std::default_random_engine(shuffle_seed_));
-      auto total_no = static_cast<int64_t>(tasks.Size());
-      ShardTaskList new_tasks;
-      size_t samples_to_assign =
-          (no_of_samples_ > 0 && no_of_samples_ < total_no) ? no_of_samples_ : tasks.sample_ids_.size();
-      for (size_t i = 0; i < samples_to_assign; ++i) {
-        new_tasks.AssignTask(tasks, tasks.permutation_[i]);
+        ShardTaskList::TaskListSwap(tasks, new_tasks);
+      } else {
+        std::shuffle(tasks.permutation_.begin(), tasks.permutation_.end(), std::default_random_engine(shuffle_seed_));
+        auto total_no = static_cast<int64_t>(tasks.Size());
+        ShardTaskList new_tasks;
+        size_t samples_to_assign =
+            (no_of_samples_ > 0 && no_of_samples_ < total_no) ?
no_of_samples_ : tasks.sample_ids_.size(); + for (size_t i = 0; i < samples_to_assign; ++i) { + new_tasks.AssignTask(tasks, tasks.permutation_[i]); + } + ShardTaskList::TaskListSwap(tasks, new_tasks); + } + } else if (GetShuffleMode() == dataset::ShuffleMode::kInfile) { + auto ret = ShuffleInfile(tasks); + if (ret != SUCCESS) { + return ret; + } + } else if (GetShuffleMode() == dataset::ShuffleMode::kFiles) { + auto ret = ShuffleFiles(tasks); + if (ret != SUCCESS) { + return ret; } - ShardTaskList::TaskListSwap(tasks, new_tasks); } } else { // shuffle unit like: (a1, b1, c1),(a2, b2, c2),..., (an, bn, cn) return this->CategoryShuffle(tasks); diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index cae80a46607..7aba4a353d2 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -72,7 +72,41 @@ except ModuleNotFoundError: class Shuffle(str, Enum): GLOBAL: str = "global" - FILES: str = "file" + FILES: str = "files" + INFILE: str = "infile" + + +ShuffleToShuffleMode = {Shuffle.FILES: cde.ShuffleMode.FILES, + Shuffle.GLOBAL: cde.ShuffleMode.GLOBAL, + Shuffle.INFILE: cde.ShuffleMode.INFILE} + + +def shuffle_to_shuffle_mode(shuffle): + """class Shuffle Enum to int""" + shuffle_mode = cde.ShuffleMode.GLOBAL # Global shuffle + if not isinstance(shuffle, Shuffle): + if shuffle is None or shuffle: + shuffle_mode = cde.ShuffleMode.GLOBAL # Global shuffle + else: + shuffle_mode = cde.ShuffleMode.FALSE # No shuffle + else: + shuffle_mode = ShuffleToShuffleMode[shuffle] + return shuffle_mode + + +def shuffle_to_bool(shuffle): + """class Shuffle Enum to bool""" + shuffle_bool = True + if not isinstance(shuffle, Shuffle): + if shuffle is None: + shuffle_bool = None + elif shuffle: + shuffle_bool = True + else: + shuffle_bool = False + else: + shuffle_bool = True + return shuffle_bool @check_zip @@ -1660,8 +1694,8 @@ class SourceDataset(Dataset): self.shard_id = replace_none(shard_id, 0) if shuffle is not None and not isinstance(shuffle, (bool, Shuffle)): - raise TypeError( - "shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or 'Shuffle.FILES'.") + raise TypeError("shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or " + "'Shuffle.FILES' or 'Shuffle.INFILE'.") self.shuffle_flag = 2 # Global shuffle if not isinstance(shuffle, Shuffle): @@ -1674,6 +1708,8 @@ class SourceDataset(Dataset): self.shuffle_flag = 2 # Global shuffle elif shuffle == Shuffle.FILES: self.shuffle_flag = 1 # Files shuffle + elif shuffle == Shuffle.INFILE: + self.shuffle_flag = 3 # Infile shuffle def parse(self, children=None): raise NotImplementedError("Dataset has to implement parse method.") @@ -3121,8 +3157,18 @@ class MindDataset(MappableDataset): it represents for a list of dataset files to be read directly. columns_list (list[str], optional): List of columns to be read (default=None). num_parallel_workers (int, optional): The number of readers (default=None). - shuffle (bool, optional): Whether or not to perform shuffle on the dataset - (default=None, performs shuffle). + shuffle (Union[bool, Shuffle level], optional): Perform reshuffling of the data every epoch + (default=None, performs global shuffle). + If shuffle is False, no shuffling will be performed; + If shuffle is True, the behavior is the same as setting shuffle to be Shuffle.GLOBAL + Otherwise, there are three levels of shuffling: + + - Shuffle.GLOBAL: Global shuffle of all rows of data in dataset. 
+ + - Shuffle.FILES: Shuffle the file sequence but keep the order of data within each file. + + - Shuffle.INFILE: Keep the file sequence the same but shuffle the data within each file. + num_shards (int, optional): Number of shards that the dataset will be divided into (default=None). When this argument is specified, 'num_samples' reflects the max sample number of per shard. shard_id (int, optional): The shard ID within num_shards (default=None). This @@ -3151,20 +3197,23 @@ class MindDataset(MappableDataset): def parse(self, children=None): return cde.MindDataNode(self.dataset_file, self.columns_list, self.sampler, self.new_padded_sample, - self.num_padded) + self.num_padded, shuffle_to_shuffle_mode(self.shuffle_option)) @check_minddataset def __init__(self, dataset_file, columns_list=None, num_parallel_workers=None, shuffle=None, num_shards=None, shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None, cache=None): + if shuffle is not None and not isinstance(shuffle, (bool, Shuffle)): + raise TypeError("shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or " + "'Shuffle.FILES' or 'Shuffle.INFILE'.") + self.shuffle_option = shuffle super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples, - shuffle=shuffle, num_shards=num_shards, shard_id=shard_id, cache=cache) + shuffle=shuffle_to_bool(shuffle), num_shards=num_shards, shard_id=shard_id, cache=cache) if isinstance(dataset_file, list): self.load_dataset = False else: self.load_dataset = True self.dataset_file = dataset_file self.columns_list = replace_none(columns_list, []) - self.shuffle_option = shuffle if shuffle is False: logger.warning("WARN: global shuffle is not used.") diff --git a/mindspore/dataset/engine/samplers.py b/mindspore/dataset/engine/samplers.py index 5e263e92c0d..98c1fb1ae86 100644 --- a/mindspore/dataset/engine/samplers.py +++ b/mindspore/dataset/engine/samplers.py @@ -370,7 +370,8 @@ class DistributedSampler(BuiltinSampler): def parse_for_minddataset(self): num_samples = self.num_samples if self.num_samples is not None else 0 - c_sampler = cde.MindrecordDistributedSampler(self.num_shards, self.shard_id, self.shuffle, + shuffle = self.shuffle if self.shuffle is not None else True + c_sampler = cde.MindrecordDistributedSampler(self.num_shards, self.shard_id, shuffle, self.seed, num_samples, self.offset) c_child_sampler = self.parse_child_for_minddataset() c_sampler.add_child(c_child_sampler) diff --git a/mindspore/dataset/engine/serializer_deserializer.py b/mindspore/dataset/engine/serializer_deserializer.py index 2ee8c148359..c8770e7e75b 100644 --- a/mindspore/dataset/engine/serializer_deserializer.py +++ b/mindspore/dataset/engine/serializer_deserializer.py @@ -404,7 +404,7 @@ def to_policy(op_list): def to_shuffle_mode(shuffle): if shuffle == 2: return "global" - if shuffle == 1: return "file" + if shuffle == 1: return "files" return False diff --git a/tests/ut/cpp/dataset/c_api_cache_test.cc b/tests/ut/cpp/dataset/c_api_cache_test.cc index 67fb7cfea81..343c86afff1 100644 --- a/tests/ut/cpp/dataset/c_api_cache_test.cc +++ b/tests/ut/cpp/dataset/c_api_cache_test.cc @@ -463,7 +463,8 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMindRecordCApi) { std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0"; // Create a MindRecord Dataset, 20 records in it - std::shared_ptr ds = MindData(file_path, {}, std::make_shared(), nullptr, 0, some_cache); + std::shared_ptr ds 
= MindData(file_path, {}, std::make_shared(), nullptr, 0,
+                                          ShuffleMode::kGlobal, some_cache);
   EXPECT_NE(ds, nullptr);
   // Create an iterator over the result of the above dataset
diff --git a/tests/ut/python/dataset/test_minddataset.py b/tests/ut/python/dataset/test_minddataset.py
index d201d4d4b7c..9c470c56b54 100644
--- a/tests/ut/python/dataset/test_minddataset.py
+++ b/tests/ut/python/dataset/test_minddataset.py
@@ -1950,6 +1950,623 @@ def test_write_with_float32_float64_float32_array_float64_array_and_MindDataset(
     os.remove("{}".format(mindrecord_file_name))
     os.remove("{}.db".format(mindrecord_file_name))
+FILES = ["0.mindrecord", "1.mindrecord", "2.mindrecord", "3.mindrecord"]
+ITEMS = [10, 14, 8, 20]
+FILES_ITEMS = {FILES[0]: ITEMS[0], FILES[1]: ITEMS[1], FILES[2]: ITEMS[2], FILES[3]: ITEMS[3]}
+
+@pytest.fixture
+def create_multi_mindrecord_files():
+    """files: {0.mindrecord : 10, 1.mindrecord : 14, 2.mindrecord : 8, 3.mindrecord : 20}"""
+    try:
+        index = 0
+        for filename in FILES_ITEMS:
+            key = filename
+            if os.path.exists(key):
+                os.remove("{}".format(key))
+                os.remove("{}.db".format(key))
+
+            value = FILES_ITEMS[key]
+            data_list = []
+            for i in range(value):
+                data = {}
+                data['id'] = i + index
+                data_list.append(data)
+            index += value
+
+            writer = FileWriter(key)
+            schema = {"id": {"type": "int32"}}
+            writer.add_schema(schema, "data is so cool")
+            writer.write_raw_data(data_list)
+            writer.commit()
+        yield "yield_create_multi_mindrecord_files"
+    except Exception as error:
+        for filename in FILES_ITEMS:
+            if os.path.exists(filename):
+                os.remove("{}".format(filename))
+                os.remove("{}.db".format(filename))
+        raise error
+    else:
+        for filename in FILES_ITEMS:
+            if os.path.exists(filename):
+                os.remove("{}".format(filename))
+                os.remove("{}.db".format(filename))
+
+def test_shuffle_with_global_infile_files(create_multi_mindrecord_files):
+    datas_all = []
+    index = 0
+    for filename in FILES_ITEMS:
+        value = FILES_ITEMS[filename]
+        data_list = []
+        for i in range(value):
+            data = {}
+            data['id'] = np.array(i + index, dtype=np.int32)
+            data_list.append(data)
+        index += value
+        datas_all.append(data_list)
+
+    # no shuffle parameter
+    num_readers = 2
+    data_set = ds.MindDataset(dataset_file=FILES,
+                              num_parallel_workers=num_readers)
+    assert data_set.get_dataset_size() == 52
+    num_iter = 0
+    datas_all_minddataset = []
+    data_list = []
+    for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
+        assert len(item) == 1
+        data_list.append(item)
+        if num_iter == 9:
+            datas_all_minddataset.append(data_list)
+            data_list = []
+        elif num_iter == 23:
+            datas_all_minddataset.append(data_list)
+            data_list = []
+        elif num_iter == 31:
+            datas_all_minddataset.append(data_list)
+            data_list = []
+        elif num_iter == 51:
+            datas_all_minddataset.append(data_list)
+            data_list = []
+        num_iter += 1
+    assert data_set.get_dataset_size() == 52
+
+    assert len(datas_all) == len(datas_all_minddataset)
+    for i, _ in enumerate(datas_all):
+        assert len(datas_all[i]) == len(datas_all_minddataset[i])
+        assert datas_all[i] != datas_all_minddataset[i]
+
+    # shuffle=False
+    num_readers = 2
+    data_set = ds.MindDataset(dataset_file=FILES,
+                              num_parallel_workers=num_readers,
+                              shuffle=False)
+    assert data_set.get_dataset_size() == 52
+    num_iter = 0
+    datas_all_minddataset = []
+    data_list = []
+    for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
+        assert len(item) == 1
+        data_list.append(item)
+        if num_iter == 9:
+            datas_all_minddataset.append(data_list)
+            data_list = []
+        elif
num_iter == 23: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 31: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 51: + datas_all_minddataset.append(data_list) + data_list = [] + num_iter += 1 + assert data_set.get_dataset_size() == 52 + + assert len(datas_all) == len(datas_all_minddataset) + for i, _ in enumerate(datas_all): + assert len(datas_all[i]) == len(datas_all_minddataset[i]) + assert datas_all[i] == datas_all_minddataset[i] + + # shuffle=True + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=True) + assert data_set.get_dataset_size() == 52 + num_iter = 0 + datas_all_minddataset = [] + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + if num_iter == 9: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 23: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 31: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 51: + datas_all_minddataset.append(data_list) + data_list = [] + num_iter += 1 + assert data_set.get_dataset_size() == 52 + + assert len(datas_all) == len(datas_all_minddataset) + for i, _ in enumerate(datas_all): + assert len(datas_all[i]) == len(datas_all_minddataset[i]) + assert datas_all[i] != datas_all_minddataset[i] + + # shuffle=Shuffle.GLOBAL + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.GLOBAL) + assert data_set.get_dataset_size() == 52 + num_iter = 0 + datas_all_minddataset = [] + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + if num_iter == 9: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 23: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 31: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 51: + datas_all_minddataset.append(data_list) + data_list = [] + num_iter += 1 + assert data_set.get_dataset_size() == 52 + + assert len(datas_all) == len(datas_all_minddataset) + for i, _ in enumerate(datas_all): + assert len(datas_all[i]) == len(datas_all_minddataset[i]) + assert datas_all[i] != datas_all_minddataset[i] + + # shuffle=Shuffle.INFILE + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.INFILE) + assert data_set.get_dataset_size() == 52 + num_iter = 0 + datas_all_minddataset = [] + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + if num_iter == 9: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 23: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 31: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 51: + datas_all_minddataset.append(data_list) + data_list = [] + num_iter += 1 + assert data_set.get_dataset_size() == 52 + + def sort_list_with_dict(dict_in_list): + keys = [] + for item in dict_in_list: + for key in item: + keys.append(int(item[key])) + keys.sort() + data_list = [] + for item in keys: + data = {} + data['id'] = np.array(item, dtype=np.int32) + data_list.append(data) + return data_list + + assert len(datas_all) == len(datas_all_minddataset) + for 
i, _ in enumerate(datas_all): + assert len(datas_all[i]) == len(datas_all_minddataset[i]) + assert datas_all[i] != datas_all_minddataset[i] + # order the datas_all_minddataset + new_datas_all_minddataset = sort_list_with_dict(datas_all_minddataset[i]) + assert datas_all[i] == new_datas_all_minddataset + + # shuffle=Shuffle.FILES + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.FILES) + assert data_set.get_dataset_size() == 52 + + num_iter = 0 + data_list = [] + + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert data_set.get_dataset_size() == 52 + + current_shard_size = 0 + current_shard_index = 0 + shard_count = 0 + datas_index = 0 + origin_index = [i for i in range(len(ITEMS))] + current_index = [] + while shard_count < len(ITEMS): + if data_list[datas_index]['id'] < 10: + current_shard_index = 0 + elif data_list[datas_index]['id'] < 24: + current_shard_index = 1 + elif data_list[datas_index]['id'] < 32: + current_shard_index = 2 + elif data_list[datas_index]['id'] < 52: + current_shard_index = 3 + else: + raise ValueError("Index out of range") + current_shard_size = ITEMS[current_shard_index] + + tmp_datas = data_list[datas_index:datas_index + current_shard_size] + current_index.append(current_shard_index) + assert len(datas_all[current_shard_index]) == len(tmp_datas) + assert datas_all[current_shard_index] == tmp_datas + + datas_index += current_shard_size + shard_count += 1 + assert origin_index != current_index + +def test_distributed_shuffle_with_global_infile_files(create_multi_mindrecord_files): + datas_all = [] + datas_all_samples = [] + index = 0 + for filename in FILES_ITEMS: + value = FILES_ITEMS[filename] + data_list = [] + for i in range(value): + data = {} + data['id'] = np.array(i + index, dtype=np.int32) + data_list.append(data) + datas_all_samples.append(data) + index += value + datas_all.append(data_list) + + # no shuffle parameter + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + num_shards=4, + shard_id=3) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert num_iter == 13 + assert data_list != datas_all_samples[3*13:] + + # shuffle=False + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=False, + num_shards=4, + shard_id=2) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert num_iter == 13 + assert data_list == datas_all_samples[2*13:3*13] + + # shuffle=True + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=True, + num_shards=4, + shard_id=1) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert num_iter == 13 + assert data_list != datas_all_samples[1*13:2*13] + + # shuffle=Shuffle.GLOBAL + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + 
shuffle=ds.Shuffle.GLOBAL, + num_shards=4, + shard_id=0) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + data_list = [] + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert num_iter == 13 + assert data_list != datas_all_samples[0:1*13] + + # shuffle=Shuffle.INFILE + output_datas = [] + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.INFILE, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + output_datas.append(item) + num_iter += 1 + assert num_iter == 13 + + num_iter = 0 + datas_all_minddataset = [] + data_list = [] + for item in output_datas: + assert len(item) == 1 + data_list.append(item) + if num_iter == 9: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 23: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 31: + datas_all_minddataset.append(data_list) + data_list = [] + elif num_iter == 51: + datas_all_minddataset.append(data_list) + data_list = [] + num_iter += 1 + assert num_iter == 52 + + def sort_list_with_dict(dict_in_list): + keys = [] + for item in dict_in_list: + for key in item: + keys.append(int(item[key])) + keys.sort() + data_list = [] + for item in keys: + data = {} + data['id'] = np.array(item, dtype=np.int32) + data_list.append(data) + return data_list + + assert len(datas_all) == len(datas_all_minddataset) + for i, _ in enumerate(datas_all): + assert len(datas_all[i]) == len(datas_all_minddataset[i]) + assert datas_all[i] != datas_all_minddataset[i] + # order the datas_all_minddataset + new_datas_all_minddataset = sort_list_with_dict(datas_all_minddataset[i]) + assert datas_all[i] == new_datas_all_minddataset + + # shuffle=Shuffle.FILES + data_list = [] + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.FILES, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + num_iter = 0 + for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True): + assert len(item) == 1 + data_list.append(item) + num_iter += 1 + assert num_iter == 13 + assert len(data_list) == 52 + + current_shard_size = 0 + current_shard_index = 0 + shard_count = 0 + datas_index = 0 + origin_index = [i for i in range(len(ITEMS))] + current_index = [] + while shard_count < len(ITEMS): + if data_list[datas_index]['id'] < 10: + current_shard_index = 0 + elif data_list[datas_index]['id'] < 24: + current_shard_index = 1 + elif data_list[datas_index]['id'] < 32: + current_shard_index = 2 + elif data_list[datas_index]['id'] < 52: + current_shard_index = 3 + else: + raise ValueError("Index out of range") + current_shard_size = ITEMS[current_shard_index] + + tmp_datas = data_list[datas_index:datas_index + current_shard_size] + current_index.append(current_shard_index) + assert len(datas_all[current_shard_index]) == len(tmp_datas) + assert datas_all[current_shard_index] == tmp_datas + + datas_index += current_shard_size + shard_count += 1 + assert origin_index != current_index + +def test_distributed_shuffle_with_multi_epochs(create_multi_mindrecord_files): + datas_all = [] + datas_all_samples = [] + index = 0 + for filename in FILES_ITEMS: + value = 
FILES_ITEMS[filename] + data_list = [] + for i in range(value): + data = {} + data['id'] = np.array(i + index, dtype=np.int32) + data_list.append(data) + datas_all_samples.append(data) + index += value + datas_all.append(data_list) + + epoch_size = 3 + + # no shuffle parameter + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + data_list = [] + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + new_datas = [] + for item in dataset_iter: + assert len(item) == 1 + new_datas.append(item) + num_iter += 1 + assert num_iter == 13 + assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13] + assert data_list != new_datas + data_list = new_datas + + # shuffle=False + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=False, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + data_list = [] + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + new_datas = [] + for item in dataset_iter: + assert len(item) == 1 + new_datas.append(item) + num_iter += 1 + assert num_iter == 13 + assert new_datas == datas_all_samples[shard_id*13:(shard_id+1)*13] + + # shuffle=True + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=True, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + data_list = [] + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + new_datas = [] + for item in dataset_iter: + assert len(item) == 1 + new_datas.append(item) + num_iter += 1 + assert num_iter == 13 + assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13] + assert data_list != new_datas + data_list = new_datas + + # shuffle=Shuffle.GLOBAL + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.GLOBAL, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + data_list = [] + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + new_datas = [] + for item in dataset_iter: + assert len(item) == 1 + new_datas.append(item) + num_iter += 1 + assert num_iter == 13 + assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13] + assert data_list != new_datas + data_list = new_datas + + # shuffle=Shuffle.INFILE + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.INFILE, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + data_list = [] + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + new_datas = [] + for item in dataset_iter: + assert len(item) == 1 + new_datas.append(item) + num_iter += 1 + assert num_iter == 13 + assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13] + assert data_list != 
new_datas + data_list = new_datas + + # shuffle=Shuffle.FILES + datas_epoch1 = [] + datas_epoch2 = [] + datas_epoch3 = [] + for shard_id in range(4): + num_readers = 2 + data_set = ds.MindDataset(dataset_file=FILES, + num_parallel_workers=num_readers, + shuffle=ds.Shuffle.FILES, + num_shards=4, + shard_id=shard_id) + assert data_set.get_dataset_size() == 13 + dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True) + for epoch in range(epoch_size): # 3 epoch + num_iter = 0 + for item in dataset_iter: + assert len(item) == 1 + if epoch == 0: + datas_epoch1.append(item) + elif epoch == 1: + datas_epoch2.append(item) + elif epoch == 2: + datas_epoch3.append(item) + num_iter += 1 + assert num_iter == 13 + assert datas_epoch1 not in (datas_epoch2, datas_epoch3) + assert datas_epoch2 not in (datas_epoch1, datas_epoch3) + assert datas_epoch3 not in (datas_epoch2, datas_epoch1) if __name__ == '__main__': test_nlp_compress_data(add_and_remove_nlp_compress_file) @@ -1983,3 +2600,6 @@ if __name__ == '__main__': test_write_with_multi_array_and_MindDataset() test_numpy_generic() test_write_with_float32_float64_float32_array_float64_array_and_MindDataset() + test_shuffle_with_global_infile_files(create_multi_mindrecord_files) + test_distributed_shuffle_with_global_infile_files(create_multi_mindrecord_files) + test_distributed_shuffle_with_multi_epochs(create_multi_mindrecord_files)
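
Usage sketch for the new shuffle levels (illustrative, not part of the patch: the file names below are placeholders, while the MindDataset arguments and Shuffle values are the ones added in this change):

import mindspore.dataset as ds

# Assumes these MindRecord files already exist, e.g. written with
# mindspore.mindrecord.FileWriter as in the fixture above.
files = ["0.mindrecord", "1.mindrecord", "2.mindrecord", "3.mindrecord"]

# Default behaviour (shuffle=None or shuffle=True): global shuffle of all rows.
global_ds = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.GLOBAL)

# Shuffle the file order but keep the row order inside each file.
files_ds = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.FILES)

# Keep the file order but shuffle the rows inside each file (new in this change).
infile_ds = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.INFILE)

for item in infile_ds.create_dict_iterator(num_epochs=1, output_numpy=True):
    print(item["id"])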