enhance: enable shuffle infile / files for MindRecord dataset

jonyguo 2021-05-10 17:00:36 +08:00
parent 0d4ad73034
commit 9452142b50
19 changed files with 972 additions and 104 deletions
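The user-facing effect, as a minimal Python sketch (file names are placeholders; the ds.Shuffle usage matches the tests added at the end of this commit):

import mindspore.dataset as ds

# hypothetical MindRecord shard files; any existing set of shards works
files = ["0.mindrecord", "1.mindrecord", "2.mindrecord", "3.mindrecord"]

# default / shuffle=True: global shuffle over all rows
data_set = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.GLOBAL)
# shuffle the file order, keep the row order within each file
data_set = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.FILES)
# keep the file order, shuffle the rows within each file
data_set = ds.MindDataset(dataset_file=files, shuffle=ds.Shuffle.INFILE)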

View File

@ -23,6 +23,7 @@
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/runtime_context.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/include/dataset/iterator.h"
#include "minddata/dataset/include/dataset/samplers.h"
#include "minddata/dataset/include/dataset/transforms.h"
@ -1001,19 +1002,7 @@ ManifestDataset::ManifestDataset(const std::vector<char> &dataset_file, const st
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded,
int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
@ -1021,13 +1010,29 @@ MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded, cache);
sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler.get().Parse();
nlohmann::json sample = nullptr;
if (padded_sample) {
@ -1035,26 +1040,14 @@ MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file,
}
auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj,
sample, num_padded, cache);
sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded,
int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
@ -1063,20 +1056,37 @@ MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_f
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded, cache);
sampler_obj, sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler ? sampler->Parse() : nullptr;
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) {
int64_t num_padded, ShuffleMode shuffle_mode,
const std::shared_ptr<DatasetCache> &cache) {
auto sampler_obj = sampler.get().Parse();
nlohmann::json sample = nullptr;
if (padded_sample) {
sample = *padded_sample;
}
auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list),
sampler_obj, sample, num_padded, cache);
sampler_obj, sample, num_padded, shuffle_mode, cache);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
#endif

View File

@ -173,25 +173,25 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) {
(void)py::class_<MindDataNode, DatasetNode, std::shared_ptr<MindDataNode>>(*m, "MindDataNode",
"to create a MindDataNode")
.def(py::init([](std::string dataset_file, py::list columns_list, py::handle sampler,
py::dict padded_sample, int64_t num_padded) {
py::dict padded_sample, int64_t num_padded, ShuffleMode shuffle_mode) {
nlohmann::json padded_sample_json;
std::map<std::string, std::string> sample_bytes;
THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes));
auto minddata = std::make_shared<MindDataNode>(dataset_file, toStringVector(columns_list),
toSamplerObj(sampler, true), padded_sample_json,
num_padded, nullptr);
num_padded, shuffle_mode, nullptr);
minddata->SetSampleBytes(&sample_bytes);
THROW_IF_ERROR(minddata->ValidateParams());
return minddata;
}))
.def(py::init([](py::list dataset_file, py::list columns_list, py::handle sampler,
py::dict padded_sample, int64_t num_padded) {
py::dict padded_sample, int64_t num_padded, ShuffleMode shuffle_mode) {
nlohmann::json padded_sample_json;
std::map<std::string, std::string> sample_bytes;
THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes));
auto minddata = std::make_shared<MindDataNode>(
toStringVector(dataset_file), toStringVector(columns_list), toSamplerObj(sampler, true),
padded_sample_json, num_padded, nullptr);
padded_sample_json, num_padded, shuffle_mode, nullptr);
minddata->SetSampleBytes(&sample_bytes);
THROW_IF_ERROR(minddata->ValidateParams());
return minddata;

View File

@ -18,6 +18,7 @@
#include "minddata/dataset/api/python/pybind_register.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/util/random.h"
#include "minddata/mindrecord/include/shard_distributed_sample.h"
#include "minddata/mindrecord/include/shard_operator.h"
@ -84,5 +85,14 @@ PYBIND_REGISTER(
}));
}));
PYBIND_REGISTER(ShuffleMode, 1, ([](const py::module *m) {
(void)py::enum_<ShuffleMode>(*m, "ShuffleMode", py::arithmetic())
.value("FALSE", ShuffleMode::kFalse)
.value("FILES", ShuffleMode::kFiles)
.value("GLOBAL", ShuffleMode::kGlobal)
.value("INFILE", ShuffleMode::kInfile)
.export_values();
}));
} // namespace dataset
} // namespace mindspore
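Once registered, the enum is reachable from Python through the private bindings; a quick sanity check, assuming the usual mindspore._c_dataengine import used by the dataset package:

import mindspore._c_dataengine as cde  # assumed binding module path

# py::arithmetic() makes the values int-convertible; kInfile is 3 in the C++ enum
assert int(cde.ShuffleMode.INFILE) == 3
assert int(cde.ShuffleMode.FILES) == 1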

View File

@ -53,6 +53,7 @@ MindRecordOp::Builder::Builder() : build_dataset_file_({}), builder_sampler_(nul
build_load_dataset_ = false;
build_num_padded_ = 0;
build_sample_ = nullptr;
build_shuffle_mode_ = ShuffleMode::kGlobal;
}
// The builder "build" method creates the final object.
@ -75,7 +76,7 @@ Status MindRecordOp::Builder::Build(std::shared_ptr<MindRecordOp> *ptr) {
new_mind_record_op = std::make_shared<MindRecordOp>(
build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, build_op_connector_queue_size_,
build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_,
build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_, build_shuffle_mode_,
std::move(shard_reader), builder_sampler_);
RETURN_IF_NOT_OK(new_mind_record_op->Init());
@ -118,7 +119,8 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::str
int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler)
const ShuffleMode shuffle_mode, std::unique_ptr<ShardReader> shard_reader,
std::shared_ptr<SamplerRT> sampler)
: MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
dataset_file_(dataset_file),
load_dataset_(load_dataset),
@ -129,6 +131,7 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::str
num_padded_(num_padded),
sample_json_(sample_json),
sample_bytes_(sample_bytes),
shuffle_mode_(shuffle_mode),
shard_reader_(std::move(shard_reader)) {
io_block_queues_.Init(num_workers_, op_connector_queue_size);
epoch_sync_flag_ = true; // MindRecordOp needs to turn this flag on, otherwise, calling ShuffleTask() before all

View File

@ -116,6 +116,11 @@ class MindRecordOp : public MappableLeafOp {
return *this;
}
Builder &SetShuffleMode(const ShuffleMode shuffle_mode) {
build_shuffle_mode_ = shuffle_mode;
return *this;
}
Status SanityCheck() const;
static int32_t num_mind_record_workers() { return kDefaultMindRecordWorkers; }
@ -138,6 +143,7 @@ class MindRecordOp : public MappableLeafOp {
py::handle build_sample_;
std::map<std::string, std::string> build_sample_bytes_;
std::shared_ptr<SamplerRT> builder_sampler_;
ShuffleMode build_shuffle_mode_;
};
// Constructor of the MindRecordOp.
@ -152,7 +158,8 @@ class MindRecordOp : public MappableLeafOp {
int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded_,
const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes_,
std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler);
const ShuffleMode shuffle_mode_, std::unique_ptr<ShardReader> shard_reader,
std::shared_ptr<SamplerRT> sampler);
// Destructor
~MindRecordOp() override;
@ -240,6 +247,8 @@ class MindRecordOp : public MappableLeafOp {
std::unique_ptr<ShardReader> shard_reader_;
std::mutex ended_worker_mutex_;
ShuffleMode shuffle_mode_;
};
} // namespace dataset
} // namespace mindspore

View File

@ -35,7 +35,7 @@ namespace dataset {
MindDataNode::MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache = nullptr)
ShuffleMode shuffle_mode, std::shared_ptr<DatasetCache> cache)
: MappableSourceNode(std::move(cache)),
dataset_file_(std::string()),
dataset_files_(dataset_files),
@ -45,11 +45,12 @@ MindDataNode::MindDataNode(const std::vector<std::string> &dataset_files, const
sampler_(std::make_shared<MindRecordSamplerObj>()),
padded_sample_(padded_sample),
sample_bytes_({}),
num_padded_(num_padded) {}
num_padded_(num_padded),
shuffle_mode_(shuffle_mode) {}
MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache = nullptr)
ShuffleMode shuffle_mode, std::shared_ptr<DatasetCache> cache)
: MappableSourceNode(std::move(cache)),
dataset_file_(dataset_file),
dataset_files_({}),
@ -59,15 +60,18 @@ MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector<st
sampler_(std::make_shared<MindRecordSamplerObj>()),
padded_sample_(padded_sample),
sample_bytes_({}),
num_padded_(num_padded) {}
num_padded_(num_padded),
shuffle_mode_(shuffle_mode) {}
std::shared_ptr<DatasetNode> MindDataNode::Copy() {
std::shared_ptr<MindDataNode> node;
std::shared_ptr<SamplerObj> sampler = (input_sampler_ == nullptr) ? nullptr : input_sampler_->SamplerCopy();
if (dataset_files_.empty()) {
node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, cache_);
node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_,
shuffle_mode_, cache_);
} else {
node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, cache_);
node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_,
shuffle_mode_, cache_);
}
node->SetSampleBytes(&sample_bytes_);
return node;
@ -129,7 +133,7 @@ Status MindDataNode::ValidateParams() {
// Helper function to create runtime sampler for minddata dataset
Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerObj> &sampler,
std::vector<std::shared_ptr<mindrecord::ShardOperator>> *operators_,
int64_t num_padded) {
int64_t num_padded, ShuffleMode shuffle_mode) {
std::shared_ptr<mindrecord::ShardOperator> op = sampler->BuildForMindDataset();
if (op == nullptr) {
std::string err_msg =
@ -140,10 +144,15 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerO
}
std::stack<std::shared_ptr<mindrecord::ShardOperator>> stack_ops;
while (op != nullptr) {
auto sampler_op = std::dynamic_pointer_cast<mindrecord::ShardDistributedSample>(op);
if (sampler_op && num_padded > 0) {
sampler_op->SetNumPaddedSamples(num_padded);
stack_ops.push(sampler_op);
// update the shuffle mode for sampler op or shuffle op
if (shuffle_mode != ShuffleMode::kFalse) {
op->UpdateShuffleMode(shuffle_mode);
}
auto distributed_sampler_op = std::dynamic_pointer_cast<mindrecord::ShardDistributedSample>(op);
if (distributed_sampler_op && num_padded > 0) {
distributed_sampler_op->SetNumPaddedSamples(num_padded);
stack_ops.push(distributed_sampler_op);
} else {
stack_ops.push(op);
}
@ -160,7 +169,7 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerO
void MindDataNode::SetSampleBytes(std::map<std::string, std::string> *sample_bytes) { sample_bytes_ = *sample_bytes; }
Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_));
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_, shuffle_mode_));
std::shared_ptr<SamplerRT> sampler_rt = nullptr;
// Build the sampler IR into a runtime sampler.
@ -195,11 +204,11 @@ Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
std::vector<std::string> dataset_file_vec_ = {dataset_file_};
mindrecord_op = std::make_shared<MindRecordOp>(
num_workers_, dataset_file_vec_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt));
padded_sample_, sample_bytes_, shuffle_mode_, std::move(shard_reader), std::move(sampler_rt));
} else {
mindrecord_op = std::make_shared<MindRecordOp>(
num_workers_, dataset_files_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_,
padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt));
padded_sample_, sample_bytes_, shuffle_mode_, std::move(shard_reader), std::move(sampler_rt));
}
RETURN_IF_NOT_OK(mindrecord_op->Init());
@ -226,7 +235,7 @@ Status MindDataNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &si
}
int64_t num_rows = -1;
std::vector<std::shared_ptr<ShardOperator>> operators;
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_));
RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_, shuffle_mode_));
if (search_for_pattern_) {
dataset_files_ = {dataset_file_};

View File

@ -33,12 +33,12 @@ class MindDataNode : public MappableSourceNode {
/// \brief Constructor
MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache);
ShuffleMode shuffle_mode = ShuffleMode::kGlobal, std::shared_ptr<DatasetCache> cache = nullptr);
/// \brief Constructor
MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list,
const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded,
std::shared_ptr<DatasetCache> cache);
ShuffleMode shuffle_mode = ShuffleMode::kGlobal, std::shared_ptr<DatasetCache> cache = nullptr);
/// \brief Destructor
~MindDataNode() = default;
@ -72,7 +72,7 @@ class MindDataNode : public MappableSourceNode {
/// \return Status Status::OK() if input sampler is valid
Status BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerObj> &sampler,
std::vector<std::shared_ptr<mindrecord::ShardOperator>> *operators_,
int64_t num_padded);
int64_t num_padded, ShuffleMode shuffle_mode);
/// \brief Set sample_bytes when padded_sample has py::byte value
/// \note Pybind will use this function to set sample_bytes into MindDataNode
@ -118,6 +118,7 @@ class MindDataNode : public MappableSourceNode {
std::map<std::string, std::string> sample_bytes_; // enable in python
int64_t num_padded_;
std::vector<std::shared_ptr<ShardOperator>> operators_;
ShuffleMode shuffle_mode_;
};
} // namespace dataset

View File

@ -36,7 +36,7 @@ enum class DatasetType { kUnknown, kArrow, kTf };
enum class TensorImpl { kNone, kFlexible, kCv, kNP };
// Possible values for shuffle
enum class ShuffleMode { kFalse = 0, kFiles = 1, kGlobal = 2 };
enum class ShuffleMode { kFalse = 0, kFiles = 1, kGlobal = 2, kInfile = 3 };
// Possible values for Border types
enum class BorderType { kConstant = 0, kEdge = 1, kReflect = 2, kSymmetric = 3 };

View File

@ -1247,25 +1247,31 @@ class MindDataDataset : public Dataset {
public:
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const std::shared_ptr<Sampler> &sampler,
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list, const Sampler *sampler,
const nlohmann::json *padded_sample, int64_t num_padded,
const std::shared_ptr<DatasetCache> &cache);
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files,
const std::vector<std::vector<char>> &columns_list,
const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample,
int64_t num_padded, const std::shared_ptr<DatasetCache> &cache);
int64_t num_padded, ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr);
~MindDataDataset() = default;
};
@ -1284,9 +1290,10 @@ class MindDataDataset : public Dataset {
inline std::shared_ptr<MindDataDataset> MindData(
const std::string &dataset_file, const std::vector<std::string> &columns_list = {},
const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr,
int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) {
int64_t num_padded = 0, ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
/// \brief Function to create a MindDataDataset
@ -1302,9 +1309,10 @@ inline std::shared_ptr<MindDataDataset> MindData(
inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file,
const std::vector<std::string> &columns_list, const Sampler *sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
/// \brief Function to create a MindDataDataset
@ -1321,9 +1329,10 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file
const std::vector<std::string> &columns_list,
const std::reference_wrapper<Sampler> sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
/// \brief Function to create a MindDataDataset
@ -1340,9 +1349,10 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file
inline std::shared_ptr<MindDataDataset> MindData(
const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list = {},
const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr,
int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) {
int64_t num_padded = 0, ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
/// \brief Function to create a MindDataDataset
@ -1357,9 +1367,10 @@ inline std::shared_ptr<MindDataDataset> MindData(
inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> &dataset_files,
const std::vector<std::string> &columns_list, const Sampler *sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
/// \brief Function to create a MindDataDataset
@ -1375,9 +1386,10 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string>
const std::vector<std::string> &columns_list,
const std::reference_wrapper<Sampler> sampler,
nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0,
ShuffleMode shuffle_mode = ShuffleMode::kGlobal,
const std::shared_ptr<DatasetCache> &cache = nullptr) {
return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler,
padded_sample, num_padded, cache);
padded_sample, num_padded, shuffle_mode, cache);
}
class MnistDataset : public Dataset {

View File

@ -18,7 +18,9 @@
#define MINDSPORE_CCSRC_MINDDATA_MINDRECORD_INCLUDE_SHARD_OPERATOR_H_
#include <memory>
#include <vector>
#include "minddata/mindrecord/include/shard_task_list.h"
#include "minddata/dataset/include/dataset/constants.h"
namespace mindspore {
namespace mindrecord {
@ -38,6 +40,7 @@ class __attribute__((visibility("default"))) ShardOperator {
}
return SUCCESS;
}
virtual bool HasChildOp() { return child_op_ != nullptr; }
virtual MSRStatus SetChildOp(std::shared_ptr<ShardOperator> child_op) {
@ -55,8 +58,26 @@ class __attribute__((visibility("default"))) ShardOperator {
virtual int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) { return 0; }
virtual void UpdateShuffleMode(dataset::ShuffleMode shuffle_mode) { shuffle_mode_ = shuffle_mode; }
virtual dataset::ShuffleMode GetShuffleMode() { return shuffle_mode_; }
virtual void SetShardSampleCount(const std::vector<uint32_t> &shard_sample_count) {
shard_sample_count_ = shard_sample_count;
}
virtual std::vector<uint32_t> GetShardSampleCount() { return shard_sample_count_; }
private:
std::shared_ptr<ShardOperator> child_op_ = nullptr;
// cumulative sample count indexed by shard_id (shard_id : inc_count), e.g.:
// 0 : 15 - shard0 has 15 samples
// 1 : 41 - shard1 has 26 samples
// 2 : 58 - shard2 has 17 samples
std::vector<uint32_t> shard_sample_count_;
dataset::ShuffleMode shuffle_mode_ = dataset::ShuffleMode::kGlobal;
};
} // namespace mindrecord
} // namespace mindspore
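shard_sample_count_ holds cumulative totals rather than per-shard sizes; a small Python sketch (helper name is illustrative, not part of the code base) of recovering each shard's start offset and size from it:

def shard_ranges(shard_sample_count):
    # cumulative [15, 41, 58] -> [(0, 15), (15, 26), (41, 17)]
    ranges, start = [], 0
    for cumulative in shard_sample_count:
        ranges.append((start, cumulative - start))  # shard covers [start, cumulative)
        start = cumulative
    return ranges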

View File

@ -39,6 +39,12 @@ class __attribute__((visibility("default"))) ShardShuffle : public ShardOperator
// Private helper function
MSRStatus CategoryShuffle(ShardTaskList &tasks);
// Keep the file sequence the same but shuffle the data within each file
MSRStatus ShuffleInfile(ShardTaskList &tasks);
// Shuffle the file sequence but keep the order of data within each file
MSRStatus ShuffleFiles(ShardTaskList &tasks);
uint32_t shuffle_seed_;
int64_t no_of_samples_;
bool replacement_;

View File

@ -148,6 +148,10 @@ MSRStatus ShardReader::Init(const std::vector<std::string> &file_paths, bool loa
}
num_rows_ = 0;
auto row_group_summary = ReadRowGroupSummary();
// clear shard_sample_count_ because it will be filled in again when the Launch function runs
shard_sample_count_.clear();
for (const auto &rg : row_group_summary) {
num_rows_ += std::get<3>(rg);
}
@ -305,6 +309,7 @@ std::vector<std::tuple<int, int, int, uint64_t>> ShardReader::ReadRowGroupSummar
shard_sample_count_.push_back(total_count);
}
}
return row_group_summary;
}
@ -1224,6 +1229,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
break;
}
}
if (-1 == category_operator) {
if (lazy_load_ == false) {
if (SUCCESS != CreateTasksByRow(row_group_summary, operators)) {
@ -1254,6 +1260,11 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
for (uint32_t operator_no = 0; operator_no < operators.size(); operator_no++) {
const auto &op = operators[operator_no];
if (std::dynamic_pointer_cast<ShardCategory>(op)) continue;
if (std::dynamic_pointer_cast<ShardDistributedSample>(op) || std::dynamic_pointer_cast<ShardShuffle>(op)) {
op->SetShardSampleCount(shard_sample_count_);
}
if (SUCCESS != (*op)(tasks_)) {
return FAILED;
}

View File

@ -72,6 +72,8 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTaskList &tasks) {
tasks = task_;
}
if (shuffle_ == true) {
shuffle_op_->SetShardSampleCount(GetShardSampleCount());
shuffle_op_->UpdateShuffleMode(GetShuffleMode());
if (SUCCESS != (*shuffle_op_)(tasks)) {
return FAILED;
}

View File

@ -66,6 +66,97 @@ MSRStatus ShardShuffle::CategoryShuffle(ShardTaskList &tasks) {
return SUCCESS;
}
MSRStatus ShardShuffle::ShuffleFiles(ShardTaskList &tasks) {
if (no_of_samples_ == 0) {
no_of_samples_ = static_cast<int>(tasks.Size());
}
if (no_of_samples_ <= 0) {
MS_LOG(ERROR) << "no_of_samples needs to be positive.";
return FAILED;
}
auto shard_sample_count = GetShardSampleCount();
// shuffle the file indexes
std::vector<uint32_t> shuffle_files;
for (uint32_t i = 0; i < shard_sample_count.size(); i++) {
shuffle_files.push_back(i);
}
std::shuffle(shuffle_files.begin(), shuffle_files.end(), std::default_random_engine(shuffle_seed_));
// reconstruct the permutation between files
// -- before --
// file1: [0, 1, 2]
// file2: [3, 4, 5, 6]
// file3: [7, 8]
// file4: [9, 10]
// files: [file1, file2, file3, file4]
// permutation: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// -- after --
// files: [file4, file1, file3, file2]
// permutation : [9, 10, 0, 1, 2, 7, 8, 3, 4, 5, 6]
auto original_permutation = tasks.permutation_;
uint32_t whole_index = 0;
for (uint32_t i = 0; i < shuffle_files.size(); i++) {
uint32_t start_index = 0;
uint32_t current_size = 0;
if (shuffle_files[i] == 0) {
start_index = 0;
current_size = shard_sample_count[shuffle_files[i]];
} else {
start_index = shard_sample_count[shuffle_files[i] - 1];
current_size = shard_sample_count[shuffle_files[i]] - start_index;
}
std::copy(original_permutation.begin() + start_index, original_permutation.begin() + start_index + current_size,
tasks.permutation_.begin() + whole_index);
whole_index += current_size;
}
auto total_no = static_cast<int64_t>(tasks.Size());
size_t samples_to_assign =
(no_of_samples_ > 0 && no_of_samples_ < total_no) ? no_of_samples_ : tasks.sample_ids_.size();
ShardTaskList new_tasks;
for (size_t i = 0; i < samples_to_assign; ++i) {
new_tasks.AssignTask(tasks, tasks.permutation_[i]);
}
ShardTaskList::TaskListSwap(tasks, new_tasks);
return SUCCESS;
}
MSRStatus ShardShuffle::ShuffleInfile(ShardTaskList &tasks) {
if (no_of_samples_ == 0) {
no_of_samples_ = static_cast<int>(tasks.Size());
}
if (no_of_samples_ <= 0) {
MS_LOG(ERROR) << "no_of_samples needs to be positive.";
return FAILED;
}
// reconstruct the permutation in file
// -- before --
// file1: [0, 1, 2]
// file2: [3, 4, 5, 6]
// file3: [7, 8]
// file4: [9, 10]
// files: [file1, file2, file3, file4]
// permutation: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// -- after --
// permutation: [2, 0, 1, 4, 6, 3, 5, 8, 7, 9, 10]
auto shard_sample_count = GetShardSampleCount();
uint32_t start_index = 0;
for (uint32_t i = 0; i < shard_sample_count.size(); i++) {
auto current_size = shard_sample_count[i] - start_index;
std::shuffle(tasks.permutation_.begin() + start_index, tasks.permutation_.begin() + start_index + current_size,
std::default_random_engine(shuffle_seed_));
start_index = shard_sample_count[i];
}
auto total_no = static_cast<int64_t>(tasks.Size());
ShardTaskList new_tasks;
size_t samples_to_assign =
(no_of_samples_ > 0 && no_of_samples_ < total_no) ? no_of_samples_ : tasks.sample_ids_.size();
for (size_t i = 0; i < samples_to_assign; ++i) {
new_tasks.AssignTask(tasks, tasks.permutation_[i]);
}
ShardTaskList::TaskListSwap(tasks, new_tasks);
return SUCCESS;
}
MSRStatus ShardShuffle::Execute(ShardTaskList &tasks) {
if (reshuffle_each_epoch_) shuffle_seed_++;
if (tasks.categories < 1) {
@ -75,6 +166,7 @@ MSRStatus ShardShuffle::Execute(ShardTaskList &tasks) {
if (tasks.permutation_.empty() == true) {
tasks.MakePerm();
}
if (GetShuffleMode() == dataset::ShuffleMode::kGlobal) {
if (replacement_ == true) {
ShardTaskList new_tasks;
if (no_of_samples_ == 0) no_of_samples_ = static_cast<int>(tasks.sample_ids_.size());
@ -98,6 +190,17 @@ MSRStatus ShardShuffle::Execute(ShardTaskList &tasks) {
}
ShardTaskList::TaskListSwap(tasks, new_tasks);
}
} else if (GetShuffleMode() == dataset::ShuffleMode::kInfile) {
auto ret = ShuffleInfile(tasks);
if (ret != SUCCESS) {
return ret;
}
} else if (GetShuffleMode() == dataset::ShuffleMode::kFiles) {
auto ret = ShuffleFiles(tasks);
if (ret != SUCCESS) {
return ret;
}
}
} else { // shuffle unit like: (a1, b1, c1),(a2, b2, c2),..., (an, bn, cn)
return this->CategoryShuffle(tasks);
}
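To make the two new permutation rules concrete, a minimal Python sketch of the same reconstruction the C++ above performs on tasks.permutation_, using the cumulative counts from the comment examples (function names are illustrative, not part of the code base):

import random

def shuffle_files(permutation, cumulative, seed):
    """ShuffleMode.kFiles: shuffle the file order, keep row order inside each file."""
    file_order = list(range(len(cumulative)))
    random.Random(seed).shuffle(file_order)
    result = []
    for f in file_order:
        start = 0 if f == 0 else cumulative[f - 1]
        result.extend(permutation[start:cumulative[f]])
    return result

def shuffle_infile(permutation, cumulative, seed):
    """ShuffleMode.kInfile: keep the file order, shuffle rows inside each file."""
    result = list(permutation)
    start = 0
    for end in cumulative:
        segment = result[start:end]
        random.Random(seed).shuffle(segment)  # fresh engine per file, as in the C++
        result[start:end] = segment
        start = end
    return result

# cumulative [3, 7, 9, 11] encodes files of sizes 3, 4, 2, 2 (the comment examples above)
print(shuffle_files(list(range(11)), [3, 7, 9, 11], seed=1))
print(shuffle_infile(list(range(11)), [3, 7, 9, 11], seed=1))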

View File

@ -72,7 +72,41 @@ except ModuleNotFoundError:
class Shuffle(str, Enum):
GLOBAL: str = "global"
FILES: str = "file"
FILES: str = "files"
INFILE: str = "infile"
ShuffleToShuffleMode = {Shuffle.FILES: cde.ShuffleMode.FILES,
Shuffle.GLOBAL: cde.ShuffleMode.GLOBAL,
Shuffle.INFILE: cde.ShuffleMode.INFILE}
def shuffle_to_shuffle_mode(shuffle):
"""class Shuffle Enum to int"""
shuffle_mode = cde.ShuffleMode.GLOBAL # Global shuffle
if not isinstance(shuffle, Shuffle):
if shuffle is None or shuffle:
shuffle_mode = cde.ShuffleMode.GLOBAL # Global shuffle
else:
shuffle_mode = cde.ShuffleMode.FALSE # No shuffle
else:
shuffle_mode = ShuffleToShuffleMode[shuffle]
return shuffle_mode
def shuffle_to_bool(shuffle):
"""class Shuffle Enum to bool"""
shuffle_bool = True
if not isinstance(shuffle, Shuffle):
if shuffle is None:
shuffle_bool = None
elif shuffle:
shuffle_bool = True
else:
shuffle_bool = False
else:
shuffle_bool = True
return shuffle_bool
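A standalone mirror of how these two helpers partition the shuffle argument, runnable without the cde bindings (names mirror the code above; the helper is illustrative only):

from enum import Enum

class Shuffle(str, Enum):  # mirrors the enum above
    GLOBAL = "global"
    FILES = "files"
    INFILE = "infile"

def mode_name(shuffle):
    # same branching as shuffle_to_shuffle_mode: None/True -> GLOBAL, False -> FALSE
    if not isinstance(shuffle, Shuffle):
        return "GLOBAL" if (shuffle is None or shuffle) else "FALSE"
    return shuffle.name

assert mode_name(None) == "GLOBAL"
assert mode_name(False) == "FALSE"
assert mode_name(Shuffle.INFILE) == "INFILE"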
@check_zip
@ -1660,8 +1694,8 @@ class SourceDataset(Dataset):
self.shard_id = replace_none(shard_id, 0)
if shuffle is not None and not isinstance(shuffle, (bool, Shuffle)):
raise TypeError(
"shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or 'Shuffle.FILES'.")
raise TypeError("shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or "
"'Shuffle.FILES' or 'Shuffle.INFILE'.")
self.shuffle_flag = 2 # Global shuffle
if not isinstance(shuffle, Shuffle):
@ -1674,6 +1708,8 @@ class SourceDataset(Dataset):
self.shuffle_flag = 2 # Global shuffle
elif shuffle == Shuffle.FILES:
self.shuffle_flag = 1 # Files shuffle
elif shuffle == Shuffle.INFILE:
self.shuffle_flag = 3 # Infile shuffle
def parse(self, children=None):
raise NotImplementedError("Dataset has to implement parse method.")
@ -3121,8 +3157,18 @@ class MindDataset(MappableDataset):
it represents for a list of dataset files to be read directly.
columns_list (list[str], optional): List of columns to be read (default=None).
num_parallel_workers (int, optional): The number of readers (default=None).
shuffle (bool, optional): Whether or not to perform shuffle on the dataset
(default=None, performs shuffle).
shuffle (Union[bool, Shuffle level], optional): Perform reshuffling of the data every epoch
(default=None, performs global shuffle).
If shuffle is False, no shuffling will be performed;
If shuffle is True, the behavior is the same as setting shuffle to be Shuffle.GLOBAL;
Otherwise, there are three levels of shuffling:
- Shuffle.GLOBAL: Global shuffle of all rows of data in dataset.
- Shuffle.FILES: Shuffle the file sequence but keep the order of data within each file.
- Shuffle.INFILE: Keep the file sequence the same but shuffle the data within each file.
num_shards (int, optional): Number of shards that the dataset will be divided into (default=None).
When this argument is specified, 'num_samples' reflects the maximum sample number per shard.
shard_id (int, optional): The shard ID within num_shards (default=None). This
@ -3151,20 +3197,23 @@ class MindDataset(MappableDataset):
def parse(self, children=None):
return cde.MindDataNode(self.dataset_file, self.columns_list, self.sampler, self.new_padded_sample,
self.num_padded)
self.num_padded, shuffle_to_shuffle_mode(self.shuffle_option))
@check_minddataset
def __init__(self, dataset_file, columns_list=None, num_parallel_workers=None, shuffle=None, num_shards=None,
shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None, cache=None):
if shuffle is not None and not isinstance(shuffle, (bool, Shuffle)):
raise TypeError("shuffle must be of boolean or enum of 'Shuffle' values like 'Shuffle.GLOBAL' or "
"'Shuffle.FILES' or 'Shuffle.INFILE'.")
self.shuffle_option = shuffle
super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples,
shuffle=shuffle, num_shards=num_shards, shard_id=shard_id, cache=cache)
shuffle=shuffle_to_bool(shuffle), num_shards=num_shards, shard_id=shard_id, cache=cache)
if isinstance(dataset_file, list):
self.load_dataset = False
else:
self.load_dataset = True
self.dataset_file = dataset_file
self.columns_list = replace_none(columns_list, [])
self.shuffle_option = shuffle
if shuffle is False:
logger.warning("WARN: global shuffle is not used.")

View File

@ -370,7 +370,8 @@ class DistributedSampler(BuiltinSampler):
def parse_for_minddataset(self):
num_samples = self.num_samples if self.num_samples is not None else 0
c_sampler = cde.MindrecordDistributedSampler(self.num_shards, self.shard_id, self.shuffle,
shuffle = self.shuffle if self.shuffle is not None else True
c_sampler = cde.MindrecordDistributedSampler(self.num_shards, self.shard_id, shuffle,
self.seed, num_samples, self.offset)
c_child_sampler = self.parse_child_for_minddataset()
c_sampler.add_child(c_child_sampler)

View File

@ -404,7 +404,7 @@ def to_policy(op_list):
def to_shuffle_mode(shuffle):
if shuffle == 2: return "global"
if shuffle == 1: return "file"
if shuffle == 1: return "files"
return False

View File

@ -463,7 +463,8 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMindRecordCApi) {
std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0";
// Create a MindRecord Dataset, 20 records in it
std::shared_ptr<Dataset> ds = MindData(file_path, {}, std::make_shared<RandomSampler>(), nullptr, 0, some_cache);
std::shared_ptr<Dataset> ds = MindData(file_path, {}, std::make_shared<RandomSampler>(), nullptr, 0,
ShuffleMode::kGlobal, some_cache);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset

View File

@ -1950,6 +1950,623 @@ def test_write_with_float32_float64_float32_array_float64_array_and_MindDataset(
os.remove("{}".format(mindrecord_file_name))
os.remove("{}.db".format(mindrecord_file_name))
FILES = ["0.mindrecord", "1.mindrecord", "2.mindrecord", "3.mindrecord"]
ITEMS = [10, 14, 8, 20]
FILES_ITEMS = {FILES[0]: ITEMS[0], FILES[1]: ITEMS[1], FILES[2]: ITEMS[2], FILES[3]: ITEMS[3]}
@pytest.fixture
def create_multi_mindrecord_files():
"""files: {0.mindrecord : 10, 1.mindrecord : 14, 2.mindrecord : 8, 3.mindrecord : 20}"""
try:
index = 0
for filename in FILES_ITEMS:
key = filename
if os.path.exists(key):
os.remove("{}".format(key))
os.remove("{}.db".format(key))
value = FILES_ITEMS[key]
data_list = []
for i in range(value):
data = {}
data['id'] = i + index
data_list.append(data)
index += value
writer = FileWriter(key)
schema = {"id": {"type": "int32"}}
writer.add_schema(schema, "data is so cool")
writer.write_raw_data(data_list)
writer.commit()
yield "yield_create_multi_mindrecord_files"
except Exception as error:
for filename in FILES_ITEMS:
if os.path.exists(filename):
os.remove("{}".format(filename))
os.remove("{}.db".format(filename))
raise error
else:
for filename in FILES_ITEMS:
if os.path.exists(filename):
os.remove("{}".format(filename))
os.remove("{}.db".format(filename))
def test_shuffle_with_global_infile_files(create_multi_mindrecord_files):
datas_all = []
index = 0
for filename in FILES_ITEMS:
value = FILES_ITEMS[filename]
data_list = []
for i in range(value):
data = {}
data['id'] = np.array(i + index, dtype=np.int32)
data_list.append(data)
index += value
datas_all.append(data_list)
# no shuffle parameter
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers)
assert data_set.get_dataset_size() == 52
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert data_set.get_dataset_size() == 52
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] != datas_all_minddataset[i]
# shuffle=False
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=False)
assert data_set.get_dataset_size() == 52
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert data_set.get_dataset_size() == 52
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] == datas_all_minddataset[i]
# shuffle=True
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=True)
assert data_set.get_dataset_size() == 52
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert data_set.get_dataset_size() == 52
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] != datas_all_minddataset[i]
# shuffle=Shuffle.GLOBAL
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.GLOBAL)
assert data_set.get_dataset_size() == 52
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert data_set.get_dataset_size() == 52
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] != datas_all_minddataset[i]
# shuffle=Shuffle.INFILE
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.INFILE)
assert data_set.get_dataset_size() == 52
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert data_set.get_dataset_size() == 52
def sort_list_with_dict(dict_in_list):
keys = []
for item in dict_in_list:
for key in item:
keys.append(int(item[key]))
keys.sort()
data_list = []
for item in keys:
data = {}
data['id'] = np.array(item, dtype=np.int32)
data_list.append(data)
return data_list
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] != datas_all_minddataset[i]
# order the datas_all_minddataset
new_datas_all_minddataset = sort_list_with_dict(datas_all_minddataset[i])
assert datas_all[i] == new_datas_all_minddataset
# shuffle=Shuffle.FILES
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.FILES)
assert data_set.get_dataset_size() == 52
num_iter = 0
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert data_set.get_dataset_size() == 52
current_shard_size = 0
current_shard_index = 0
shard_count = 0
datas_index = 0
origin_index = [i for i in range(len(ITEMS))]
current_index = []
while shard_count < len(ITEMS):
if data_list[datas_index]['id'] < 10:
current_shard_index = 0
elif data_list[datas_index]['id'] < 24:
current_shard_index = 1
elif data_list[datas_index]['id'] < 32:
current_shard_index = 2
elif data_list[datas_index]['id'] < 52:
current_shard_index = 3
else:
raise ValueError("Index out of range")
current_shard_size = ITEMS[current_shard_index]
tmp_datas = data_list[datas_index:datas_index + current_shard_size]
current_index.append(current_shard_index)
assert len(datas_all[current_shard_index]) == len(tmp_datas)
assert datas_all[current_shard_index] == tmp_datas
datas_index += current_shard_size
shard_count += 1
assert origin_index != current_index
def test_distributed_shuffle_with_global_infile_files(create_multi_mindrecord_files):
datas_all = []
datas_all_samples = []
index = 0
for filename in FILES_ITEMS:
value = FILES_ITEMS[filename]
data_list = []
for i in range(value):
data = {}
data['id'] = np.array(i + index, dtype=np.int32)
data_list.append(data)
datas_all_samples.append(data)
index += value
datas_all.append(data_list)
# no shuffle parameter
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
num_shards=4,
shard_id=3)
assert data_set.get_dataset_size() == 13
num_iter = 0
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert num_iter == 13
assert data_list != datas_all_samples[3*13:]
# shuffle=False
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=False,
num_shards=4,
shard_id=2)
assert data_set.get_dataset_size() == 13
num_iter = 0
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert num_iter == 13
assert data_list == datas_all_samples[2*13:3*13]
# shuffle=True
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=True,
num_shards=4,
shard_id=1)
assert data_set.get_dataset_size() == 13
num_iter = 0
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert num_iter == 13
assert data_list != datas_all_samples[1*13:2*13]
# shuffle=Shuffle.GLOBAL
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.GLOBAL,
num_shards=4,
shard_id=0)
assert data_set.get_dataset_size() == 13
num_iter = 0
data_list = []
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert num_iter == 13
assert data_list != datas_all_samples[0:1*13]
# shuffle=Shuffle.INFILE
output_datas = []
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.INFILE,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
output_datas.append(item)
num_iter += 1
assert num_iter == 13
num_iter = 0
datas_all_minddataset = []
data_list = []
for item in output_datas:
assert len(item) == 1
data_list.append(item)
if num_iter == 9:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 23:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 31:
datas_all_minddataset.append(data_list)
data_list = []
elif num_iter == 51:
datas_all_minddataset.append(data_list)
data_list = []
num_iter += 1
assert num_iter == 52
def sort_list_with_dict(dict_in_list):
keys = []
for item in dict_in_list:
for key in item:
keys.append(int(item[key]))
keys.sort()
data_list = []
for item in keys:
data = {}
data['id'] = np.array(item, dtype=np.int32)
data_list.append(data)
return data_list
assert len(datas_all) == len(datas_all_minddataset)
for i, _ in enumerate(datas_all):
assert len(datas_all[i]) == len(datas_all_minddataset[i])
assert datas_all[i] != datas_all_minddataset[i]
# order the datas_all_minddataset
new_datas_all_minddataset = sort_list_with_dict(datas_all_minddataset[i])
assert datas_all[i] == new_datas_all_minddataset
# shuffle=Shuffle.FILES
data_list = []
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.FILES,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 1
data_list.append(item)
num_iter += 1
assert num_iter == 13
assert len(data_list) == 52
current_shard_size = 0
current_shard_index = 0
shard_count = 0
datas_index = 0
origin_index = [i for i in range(len(ITEMS))]
current_index = []
while shard_count < len(ITEMS):
if data_list[datas_index]['id'] < 10:
current_shard_index = 0
elif data_list[datas_index]['id'] < 24:
current_shard_index = 1
elif data_list[datas_index]['id'] < 32:
current_shard_index = 2
elif data_list[datas_index]['id'] < 52:
current_shard_index = 3
else:
raise ValueError("Index out of range")
current_shard_size = ITEMS[current_shard_index]
tmp_datas = data_list[datas_index:datas_index + current_shard_size]
current_index.append(current_shard_index)
assert len(datas_all[current_shard_index]) == len(tmp_datas)
assert datas_all[current_shard_index] == tmp_datas
datas_index += current_shard_size
shard_count += 1
assert origin_index != current_index
def test_distributed_shuffle_with_multi_epochs(create_multi_mindrecord_files):
datas_all = []
datas_all_samples = []
index = 0
for filename in FILES_ITEMS:
value = FILES_ITEMS[filename]
data_list = []
for i in range(value):
data = {}
data['id'] = np.array(i + index, dtype=np.int32)
data_list.append(data)
datas_all_samples.append(data)
index += value
datas_all.append(data_list)
epoch_size = 3
# no shuffle parameter
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
data_list = []
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
new_datas = []
for item in dataset_iter:
assert len(item) == 1
new_datas.append(item)
num_iter += 1
assert num_iter == 13
assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13]
assert data_list != new_datas
data_list = new_datas
# shuffle=False
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=False,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
data_list = []
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
new_datas = []
for item in dataset_iter:
assert len(item) == 1
new_datas.append(item)
num_iter += 1
assert num_iter == 13
assert new_datas == datas_all_samples[shard_id*13:(shard_id+1)*13]
# shuffle=True
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=True,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
data_list = []
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
new_datas = []
for item in dataset_iter:
assert len(item) == 1
new_datas.append(item)
num_iter += 1
assert num_iter == 13
assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13]
assert data_list != new_datas
data_list = new_datas
# shuffle=Shuffle.GLOBAL
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.GLOBAL,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
data_list = []
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
new_datas = []
for item in dataset_iter:
assert len(item) == 1
new_datas.append(item)
num_iter += 1
assert num_iter == 13
assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13]
assert data_list != new_datas
data_list = new_datas
# shuffle=Shuffle.INFILE
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.INFILE,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
data_list = []
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
new_datas = []
for item in dataset_iter:
assert len(item) == 1
new_datas.append(item)
num_iter += 1
assert num_iter == 13
assert new_datas != datas_all_samples[shard_id*13:(shard_id+1)*13]
assert data_list != new_datas
data_list = new_datas
# shuffle=Shuffle.FILES
datas_epoch1 = []
datas_epoch2 = []
datas_epoch3 = []
for shard_id in range(4):
num_readers = 2
data_set = ds.MindDataset(dataset_file=FILES,
num_parallel_workers=num_readers,
shuffle=ds.Shuffle.FILES,
num_shards=4,
shard_id=shard_id)
assert data_set.get_dataset_size() == 13
dataset_iter = data_set.create_dict_iterator(num_epochs=epoch_size, output_numpy=True)
for epoch in range(epoch_size): # 3 epoch
num_iter = 0
for item in dataset_iter:
assert len(item) == 1
if epoch == 0:
datas_epoch1.append(item)
elif epoch == 1:
datas_epoch2.append(item)
elif epoch == 2:
datas_epoch3.append(item)
num_iter += 1
assert num_iter == 13
assert datas_epoch1 not in (datas_epoch2, datas_epoch3)
assert datas_epoch2 not in (datas_epoch1, datas_epoch3)
assert datas_epoch3 not in (datas_epoch2, datas_epoch1)
if __name__ == '__main__':
test_nlp_compress_data(add_and_remove_nlp_compress_file)
@ -1983,3 +2600,6 @@ if __name__ == '__main__':
test_write_with_multi_array_and_MindDataset()
test_numpy_generic()
test_write_with_float32_float64_float32_array_float64_array_and_MindDataset()
test_shuffle_with_global_infile_files(create_multi_mindrecord_files)
test_distributed_shuffle_with_global_infile_files(create_multi_mindrecord_files)
test_distributed_shuffle_with_multi_epochs(create_multi_mindrecord_files)