!11631 Add SubsetSampler

From: @hfarahat
Reviewed-by: 
Signed-off-by:
This commit is contained in:
mindspore-ci-bot 2021-01-29 03:48:45 +08:00 committed by Gitee
commit 20c1727b2c
21 changed files with 838 additions and 389 deletions

View File

@ -72,12 +72,17 @@ PYBIND_REGISTER(SequentialSamplerRT, 1, ([](const py::module *m) {
.def(py::init<int64_t, int64_t>());
}));
PYBIND_REGISTER(SubsetRandomSamplerRT, 1, ([](const py::module *m) {
(void)py::class_<SubsetRandomSamplerRT, SamplerRT, std::shared_ptr<SubsetRandomSamplerRT>>(
PYBIND_REGISTER(SubsetRandomSamplerRT, 2, ([](const py::module *m) {
(void)py::class_<SubsetRandomSamplerRT, SubsetSamplerRT, std::shared_ptr<SubsetRandomSamplerRT>>(
*m, "SubsetRandomSampler")
.def(py::init<int64_t, std::vector<int64_t>>());
}));
PYBIND_REGISTER(SubsetSamplerRT, 1, ([](const py::module *m) {
(void)py::class_<SubsetSamplerRT, SamplerRT, std::shared_ptr<SubsetSamplerRT>>(*m, "SubsetSampler")
.def(py::init<int64_t, std::vector<int64_t>>());
}));
PYBIND_REGISTER(WeightedRandomSamplerRT, 1, ([](const py::module *m) {
(void)py::class_<WeightedRandomSamplerRT, SamplerRT, std::shared_ptr<WeightedRandomSamplerRT>>(
*m, "WeightedRandomSampler")

View File

@ -61,8 +61,9 @@ PYBIND_REGISTER(
PYBIND_REGISTER(
ShardSample, 0, ([](const py::module *m) {
(void)py::class_<mindrecord::ShardSample, mindrecord::ShardOperator, std::shared_ptr<mindrecord::ShardSample>>(
*m, "MindrecordSubsetRandomSampler")
.def(py::init<std::vector<int64_t>, uint32_t>());
*m, "MindrecordSubsetSampler")
.def(py::init<std::vector<int64_t>, uint32_t>())
.def(py::init<std::vector<int64_t>>());
}));
PYBIND_REGISTER(ShardSequentialSample, 0, ([](const py::module *m) {

View File

@ -20,6 +20,7 @@
#include "minddata/dataset/engine/datasetops/source/sampler/random_sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h"
@ -121,6 +122,16 @@ std::shared_ptr<SequentialSamplerObj> SequentialSampler(int64_t start_index, int
return sampler;
}
/// Function to create a Subset Sampler.
/// \brief Creates a SubsetSamplerObj over the given index list.
/// \param indices Dataset indices to sample from (consumed by move).
/// \param num_samples Number of samples to draw; 0 means all of indices.
/// \return The sampler object, or nullptr when parameter validation fails.
std::shared_ptr<SubsetSamplerObj> SubsetSampler(std::vector<int64_t> indices, int64_t num_samples) {
  auto sampler = std::make_shared<SubsetSamplerObj>(std::move(indices), num_samples);
  // Reject invalid configurations up front so callers receive nullptr instead of a bad sampler.
  return sampler->ValidateParams().IsError() ? nullptr : sampler;
}
/// Function to create a Subset Random Sampler.
std::shared_ptr<SubsetRandomSamplerObj> SubsetRandomSampler(std::vector<int64_t> indices, int64_t num_samples) {
auto sampler = std::make_shared<SubsetRandomSamplerObj>(std::move(indices), num_samples);
@ -340,11 +351,11 @@ std::shared_ptr<mindrecord::ShardOperator> SequentialSamplerObj::BuildForMindDat
}
#endif
// SubsetRandomSampler
SubsetRandomSamplerObj::SubsetRandomSamplerObj(std::vector<int64_t> indices, int64_t num_samples)
// SubsetSampler
SubsetSamplerObj::SubsetSamplerObj(std::vector<int64_t> indices, int64_t num_samples)
: indices_(std::move(indices)), num_samples_(num_samples) {}
Status SubsetRandomSamplerObj::ValidateParams() {
Status SubsetSamplerObj::ValidateParams() {
if (num_samples_ < 0) {
RETURN_STATUS_UNEXPECTED("SubsetRandomSampler: invalid num_samples: " + std::to_string(num_samples_));
}
@ -352,6 +363,26 @@ Status SubsetRandomSamplerObj::ValidateParams() {
return Status::OK();
}
/// Build the runtime sampler for this IR node and attach any configured child samplers.
std::shared_ptr<SamplerRT> SubsetSamplerObj::SamplerBuild() {
  auto rt_sampler = std::make_shared<dataset::SubsetSamplerRT>(num_samples_, indices_);
  // Children configured on the IR node must be mirrored onto the runtime object.
  BuildChildren(rt_sampler);
  return rt_sampler;
}
#ifndef ENABLE_ANDROID
/// Build the MindRecord-side equivalent of this sampler.
std::shared_ptr<mindrecord::ShardOperator> SubsetSamplerObj::BuildForMindDataset() {
  // MindRecord realizes subset sampling via a ShardSample over the same index list.
  return std::make_shared<mindrecord::ShardSample>(indices_);
}
#endif
// SubsetRandomSampler
// Delegates all storage (indices, num_samples) to the SubsetSamplerObj base; the "random"
// behavior comes from the runtime/mindrecord objects this subclass builds, not from extra state.
SubsetRandomSamplerObj::SubsetRandomSamplerObj(std::vector<int64_t> indices, int64_t num_samples)
    : SubsetSamplerObj(std::move(indices), num_samples) {}
std::shared_ptr<SamplerRT> SubsetRandomSamplerObj::SamplerBuild() {
// runtime sampler object
auto sampler = std::make_shared<dataset::SubsetRandomSamplerRT>(num_samples_, indices_);

View File

@ -8,6 +8,7 @@ set(DATASET_ENGINE_DATASETOPS_SOURCE_SAMPLER_SRC_FILES
sampler.cc
sequential_sampler.cc
subset_random_sampler.cc
subset_sampler.cc
weighted_random_sampler.cc
)

View File

@ -28,99 +28,31 @@ namespace dataset {
// Constructor.
SubsetRandomSamplerRT::SubsetRandomSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer), indices_(indices), sample_id_(0), buffer_id_(0) {}
: SubsetSamplerRT(num_samples, indices, samples_per_buffer) {}
// Initialized this Sampler.
Status SubsetRandomSamplerRT::InitSampler() {
if (is_initialized) {
return Status::OK();
}
CHECK_FAIL_RETURN_UNEXPECTED(
num_rows_ > 0, "Invalid parameter, num_rows must be greater than 0, but got " + std::to_string(num_rows_) + ".\n");
// Special value of 0 for num_samples means that the user wants to sample the entire set of data.
// In this case, the id's are provided by the user. Cap the num_samples on the number of id's given.
if (num_samples_ == 0 || num_samples_ > static_cast<int64_t>(indices_.size())) {
num_samples_ = static_cast<int64_t>(indices_.size());
}
// Initialize random generator with seed from config manager
rand_gen_.seed(GetSeed());
if (samples_per_buffer_ > num_samples_) {
samples_per_buffer_ = num_samples_;
}
// num_samples_ could be smaller than the total number of input id's.
// We will shuffle the full set of id's, but only select the first num_samples_ of them later.
std::shuffle(indices_.begin(), indices_.end(), rand_gen_);
is_initialized = true;
return Status::OK();
return SubsetSamplerRT::InitSampler();
}
// Reset the internal variable to the initial state.
Status SubsetRandomSamplerRT::ResetSampler() {
// Reset the internal counters.
sample_id_ = 0;
buffer_id_ = 0;
// Randomized the indices again.
rand_gen_.seed(GetSeed());
std::shuffle(indices_.begin(), indices_.end(), rand_gen_);
if (HasChildSampler()) {
RETURN_IF_NOT_OK(child_[0]->ResetSampler());
}
return Status::OK();
}
// Get the sample ids.
Status SubsetRandomSamplerRT::GetNextSample(std::unique_ptr<DataBuffer> *out_buffer) {
// All samples have been drawn
if (sample_id_ == num_samples_) {
(*out_buffer) = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagEOE);
} else {
if (HasChildSampler()) {
RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_));
}
(*out_buffer) = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagNone);
std::shared_ptr<Tensor> outputIds;
int64_t last_id = sample_id_ + samples_per_buffer_;
// Handling the return all samples at once, and when last draw is not a full batch.
if (last_id > num_samples_) {
last_id = num_samples_;
}
// Allocate tensor
RETURN_IF_NOT_OK(CreateSamplerTensor(&outputIds, last_id - sample_id_));
// Initialize tensor
auto id_ptr = outputIds->begin<int64_t>();
while (sample_id_ < last_id) {
if (indices_[sample_id_] >= num_rows_) {
std::string err_msg = "Generated indice is out of bound, expect range [0, num_data-1], got indice: " +
std::to_string(indices_[sample_id_]) + ", num_data: " + std::to_string(num_rows_ - 1);
RETURN_STATUS_UNEXPECTED(err_msg);
}
int64_t sampled_id = ((indices_[sample_id_] % num_rows_) + num_rows_) % num_rows_;
if (HasChildSampler()) {
RETURN_IF_NOT_OK(GetAssociatedChildId(&sampled_id, sampled_id));
}
*id_ptr = sampled_id;
id_ptr++;
sample_id_++;
}
// Create a TensorTable from that single tensor and push into DataBuffer
(*out_buffer)->set_tensor_table(std::make_unique<TensorQTable>(1, TensorRow(1, outputIds)));
}
return Status::OK();
return SubsetSamplerRT::ResetSampler();
}
void SubsetRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
@ -134,19 +66,8 @@ void SubsetRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const
Status SubsetRandomSamplerRT::to_json(nlohmann::json *out_json) {
nlohmann::json args;
RETURN_IF_NOT_OK(SubsetSamplerRT::to_json(&args));
args["sampler_name"] = "SubsetRandomSampler";
args["indices"] = indices_;
args["num_samples"] = num_samples_;
if (this->HasChildSampler()) {
std::vector<nlohmann::json> children_args;
for (auto child : child_) {
nlohmann::json child_arg;
RETURN_IF_NOT_OK(child->to_json(&child_arg));
children_args.push_back(child_arg);
}
args["child_sampler"] = children_args;
}
*out_json = args;
return Status::OK();
}
} // namespace dataset

View File

@ -21,39 +21,35 @@
#include <vector>
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h"
namespace mindspore {
namespace dataset {
// Randomly samples elements from a given list of indices, without replacement.
class SubsetRandomSamplerRT : public SamplerRT {
/// Randomly samples elements from a given list of indices, without replacement.
class SubsetRandomSamplerRT : public SubsetSamplerRT {
public:
// Constructor.
// @param num_samples The number of samples to draw. 0 for the full amount.
// @param indices List of indices from where we will randomly draw samples.
// @param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
// When samplesPerBuffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
/// Constructor.
/// \param num_samples The number of samples to draw. 0 for the full amount.
/// \param indices List of indices from where we will randomly draw samples.
/// \param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
/// When samples_per_buffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
SubsetRandomSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
std::int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
// Destructor.
/// Destructor.
~SubsetRandomSamplerRT() = default;
// Initialize the sampler.
// @return Status
/// Initialize the sampler.
/// \return Status
Status InitSampler() override;
// Reset the internal variable to the initial state and reshuffle the indices.
// @return Status
/// Reset the internal variable to the initial state and reshuffle the indices.
/// \return Status
Status ResetSampler() override;
// Get the sample ids.
// @param[out] out_buffer The address of a unique_ptr to DataBuffer where the sample ids will be placed.
// @note the sample ids (int64_t) will be placed in one Tensor and be placed into pBuffer.
Status GetNextSample(std::unique_ptr<DataBuffer> *out_buffer) override;
// Printer for debugging purposes.
// @param out - output stream to write to
// @param show_all - bool to show detailed vs summary
/// Printer for debugging purposes.
/// \param out - output stream to write to
/// \param show_all - bool to show detailed vs summary
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
@ -62,15 +58,6 @@ class SubsetRandomSamplerRT : public SamplerRT {
Status to_json(nlohmann::json *out_json) override;
private:
// A list of indices (already randomized in constructor).
std::vector<int64_t> indices_;
// Current sample id.
int64_t sample_id_;
// Current buffer id.
int64_t buffer_id_;
// A random number generator.
std::mt19937 rand_gen_;
};

View File

@ -0,0 +1,148 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h"
#include <algorithm>
#include <memory>
#include <string>
namespace mindspore {
namespace dataset {
// Constructor.
// Stores a copy of the user-provided index list; iteration counters start at the beginning.
SubsetSamplerRT::SubsetSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices, int64_t samples_per_buffer)
    : SamplerRT(num_samples, samples_per_buffer), indices_(indices), sample_id_(0), buffer_id_(0) {}
// Initialized this Sampler.
/// Lazily finalizes num_samples_ and samples_per_buffer_ against the index list; idempotent.
Status SubsetSamplerRT::InitSampler() {
  // Repeated initialization is a no-op.
  if (is_initialized) {
    return Status::OK();
  }
  CHECK_FAIL_RETURN_UNEXPECTED(
    num_rows_ > 0, "Invalid parameter, num_rows must be greater than 0, but got " + std::to_string(num_rows_) + ".\n");
  // num_samples_ == 0 means "take every provided id"; it is also capped at the number of ids given.
  const int64_t num_indices = static_cast<int64_t>(indices_.size());
  if (num_samples_ == 0 || num_samples_ > num_indices) {
    num_samples_ = num_indices;
  }
  // Never hand out more ids per buffer than there are samples.
  samples_per_buffer_ = std::min(samples_per_buffer_, num_samples_);
  is_initialized = true;
  return Status::OK();
}
// Reset the internal variable to the initial state.
Status SubsetSamplerRT::ResetSampler() {
  // Reset the internal counters so the next GetNextSample starts from the first index.
  sample_id_ = 0;
  buffer_id_ = 0;
  // Propagate the reset so any child sampler restarts in lock-step with this one.
  if (HasChildSampler()) {
    RETURN_IF_NOT_OK(child_[0]->ResetSampler());
  }
  return Status::OK();
}
// Get the sample ids.
// Fills *out_buffer with the next batch of sample ids, or with an EOE buffer once all
// num_samples_ ids have been drawn. Ids are taken from indices_ in order, validated against
// num_rows_, and (when a child sampler is attached) remapped through the child's output.
Status SubsetSamplerRT::GetNextSample(std::unique_ptr<DataBuffer> *out_buffer) {
  // All samples have been drawn
  if (sample_id_ == num_samples_) {
    (*out_buffer) = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagEOE);
  } else {
    if (HasChildSampler()) {
      RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_));
    }
    (*out_buffer) = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagNone);
    std::shared_ptr<Tensor> outputIds;
    int64_t last_id = sample_id_ + samples_per_buffer_;
    // Handling the return all samples at once, and when last draw is not a full batch.
    if (last_id > num_samples_) {
      last_id = num_samples_;
    }
    // Allocate tensor sized to exactly this batch.
    RETURN_IF_NOT_OK(CreateSamplerTensor(&outputIds, last_id - sample_id_));
    // Initialize tensor
    auto id_ptr = outputIds->begin<int64_t>();
    while (sample_id_ < last_id) {
      if (indices_[sample_id_] >= num_rows_ || indices_[sample_id_] < 0) {
        std::string err_msg = "Sample ID (" + std::to_string(indices_[sample_id_]) +
                              ") is out of bound, expected range [0, " + std::to_string(num_rows_ - 1) + "]";
        RETURN_STATUS_UNEXPECTED(err_msg);
      }
      // Normalize into [0, num_rows_); a no-op after the bounds check above, kept for safety.
      int64_t sampled_id = ((indices_[sample_id_] % num_rows_) + num_rows_) % num_rows_;
      if (HasChildSampler()) {
        // Map through the child sampler's ids so stacked samplers compose.
        RETURN_IF_NOT_OK(GetAssociatedChildId(&sampled_id, sampled_id));
      }
      *id_ptr = sampled_id;
      id_ptr++;
      sample_id_++;
    }
    // Create a TensorTable from that single tensor and push into DataBuffer
    (*out_buffer)->set_tensor_table(std::make_unique<TensorQTable>(1, TensorRow(1, outputIds)));
  }
  return Status::OK();
}
/// Print this sampler's name; with show_all, also print the common SamplerRT details.
void SubsetSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
  out << "\nSampler: SubsetSampler";
  if (!show_all) {
    return;
  }
  // Delegate detailed output to the base class; no subset-specific details to add.
  SamplerRT::SamplerPrint(out, show_all);
}
/// Serialize this sampler's configuration (name, indices, num_samples, children) to JSON.
Status SubsetSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "SubsetSampler";
  args["indices"] = indices_;
  args["num_samples"] = num_samples_;
  // Children are serialized recursively under "child_sampler".
  if (HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (const auto &child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(std::move(child_arg));
    }
    args["child_sampler"] = std::move(children_args);
  }
  *out_json = args;
  return Status::OK();
}
/// Compute how many samples this sampler will actually yield for a dataset of num_rows rows:
/// the child sampler's output size (if any), clamped by num_samples_ (when > 0) and by the
/// number of provided indices. Unlike GetNumSamples, this is a computation, not a getter.
int64_t SubsetSamplerRT::CalculateNumSamples(int64_t num_rows) {
  int64_t available = child_.empty() ? num_rows : child_[0]->CalculateNumSamples(num_rows);
  if (num_samples_ > 0 && num_samples_ < available) {
    available = num_samples_;
  }
  return std::min(available, static_cast<int64_t>(indices_.size()));
}
} // namespace dataset
} // namespace mindspore

View File

@ -0,0 +1,84 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_SAMPLER_SUBSET_SAMPLER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_SAMPLER_SUBSET_SAMPLER_H_
#include <limits>
#include <memory>
#include <vector>
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
namespace mindspore {
namespace dataset {
/// Samples elements from a given list of indices.
class SubsetSamplerRT : public SamplerRT {
 public:
  /// Constructor.
  /// \param num_samples The number of elements to sample. 0 for the full amount.
  /// \param indices List of indices to sample from.
  /// \param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
  ///     When samples_per_buffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
  SubsetSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
                  std::int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());

  /// Destructor.
  ~SubsetSamplerRT() = default;

  /// Initialize the sampler (caps num_samples and samples_per_buffer to the index list size).
  /// \return Status
  Status InitSampler() override;

  /// Reset the internal variables to the initial state.
  /// \return Status
  Status ResetSampler() override;

  /// Get the sample ids.
  /// \param[out] out_buffer The address of a unique_ptr to DataBuffer where the sample ids will be placed.
  /// \note The sample ids (int64_t) will be placed in one Tensor and put into out_buffer.
  Status GetNextSample(std::unique_ptr<DataBuffer> *out_buffer) override;

  /// Printer for debugging purposes.
  /// \param out - output stream to write to
  /// \param show_all - bool to show detailed vs summary
  void SamplerPrint(std::ostream &out, bool show_all) const override;

  /// \brief Get the arguments of node
  /// \param[out] out_json JSON string of all attributes
  /// \return Status of the function
  Status to_json(nlohmann::json *out_json) override;

  /// Calculate num samples. Unlike GetNumSamples, it is not a getter and doesn't necessarily return the value of
  /// num_samples_
  /// \param num_rows the size of the dataset this sampler will be applied to.
  /// \return number of samples
  int64_t CalculateNumSamples(int64_t num_rows) override;

 protected:
  /// The list of indices to sample from (order preserved here; subclasses may shuffle it).
  std::vector<int64_t> indices_;

 private:
  /// Current sample id.
  int64_t sample_id_;
  /// Current buffer id.
  int64_t buffer_id_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_SAMPLER_SUBSET_SAMPLER_H_

View File

@ -86,6 +86,7 @@ class PKSamplerObj;
class PreBuiltSamplerObj;
class RandomSamplerObj;
class SequentialSamplerObj;
class SubsetSamplerObj;
class SubsetRandomSamplerObj;
class WeightedRandomSamplerObj;
@ -127,6 +128,13 @@ std::shared_ptr<RandomSamplerObj> RandomSampler(bool replacement = false, int64_
/// \return Shared pointer to the current Sampler.
std::shared_ptr<SequentialSamplerObj> SequentialSampler(int64_t start_index = 0, int64_t num_samples = 0);
/// Function to create a Subset Sampler.
/// \notes Samples the elements from a sequence of indices.
/// \param[in] indices - A vector sequence of indices.
/// \param[in] num_samples - The number of samples to draw (default to all elements).
/// \return Shared pointer to the current Sampler.
std::shared_ptr<SubsetSamplerObj> SubsetSampler(std::vector<int64_t> indices, int64_t num_samples = 0);
/// Function to create a Subset Random Sampler.
/// \notes Samples the elements randomly from a sequence of indices.
/// \param[in] indices - A vector sequence of indices.
@ -293,7 +301,34 @@ class SequentialSamplerObj : public SamplerObj {
int64_t num_samples_;
};
class SubsetRandomSamplerObj : public SamplerObj {
/// IR-level sampler object that samples elements from an explicit list of indices.
class SubsetSamplerObj : public SamplerObj {
 public:
  /// \param indices Dataset indices the sampler will draw from.
  /// \param num_samples Number of samples to draw; 0 means all of the given indices.
  SubsetSamplerObj(std::vector<int64_t> indices, int64_t num_samples);

  ~SubsetSamplerObj() = default;

  /// Build the runtime (execution-tree) sampler corresponding to this object.
  std::shared_ptr<SamplerRT> SamplerBuild() override;

  /// Copy this sampler object, including any child samplers.
  std::shared_ptr<SamplerObj> SamplerCopy() override {
    auto sampler = std::make_shared<SubsetSamplerObj>(indices_, num_samples_);
    for (auto child : children_) {
      sampler->AddChildSampler(child);
    }
    return sampler;
  }

#ifndef ENABLE_ANDROID
  /// Build the MindRecord shard-operator equivalent of this sampler.
  std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override;
#endif

  /// Validate constructor arguments (rejects a negative num_samples).
  Status ValidateParams() override;

 protected:
  // Shared with SubsetRandomSamplerObj, which reuses the same index storage.
  const std::vector<int64_t> indices_;
  int64_t num_samples_;
};
class SubsetRandomSamplerObj : public SubsetSamplerObj {
public:
SubsetRandomSamplerObj(std::vector<int64_t> indices, int64_t num_samples);
@ -313,11 +348,7 @@ class SubsetRandomSamplerObj : public SamplerObj {
std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override;
#endif
Status ValidateParams() override;
private:
const std::vector<int64_t> indices_;
int64_t num_samples_;
};
class WeightedRandomSamplerObj : public SamplerObj {

View File

@ -77,7 +77,7 @@ enum TaskType {
kCommonTask = 0,
kPaddedTask = 1,
};
enum SamplerType { kCustomTopNSampler, kCustomTopPercentSampler, kSubsetRandomSampler, kPKSampler };
enum SamplerType { kCustomTopNSampler, kCustomTopPercentSampler, kSubsetRandomSampler, kPKSampler, kSubsetSampler };
enum ShuffleType { kShuffleCategory, kShuffleSample };
@ -144,7 +144,7 @@ const std::unordered_map<std::string, std::string> kTypesMap = {
{"float16", "float32"}, {"float32", "float32"}, {"float64", "float64"}, {"string", "string"}};
/// \brief split a string using a character
/// \param[in] field target string
/// \param[in] separator a character for spliting
/// \param[in] separator a character for splitting
/// \return vector type result
std::vector<std::string> StringSplit(const std::string &field, char separator);

View File

@ -34,12 +34,16 @@ class __attribute__((visibility("default"))) ShardSample : public ShardOperator
ShardSample(int num, int den, int par, int no_of_samples = 0, int offset = -1);
ShardSample(const std::vector<int64_t> &indices);
ShardSample(const std::vector<int64_t> &indices, uint32_t seed);
~ShardSample() override{};
MSRStatus Execute(ShardTask &tasks) override;
MSRStatus UpdateTasks(ShardTask &tasks, int taking);
MSRStatus SufExecute(ShardTask &tasks) override;
int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) override;

View File

@ -49,13 +49,16 @@ ShardSample::ShardSample(int num, int den, int par, int no_of_samples, int offse
sampler_type_(kCustomTopPercentSampler),
offset_(offset) {}
ShardSample::ShardSample(const std::vector<int64_t> &indices, uint32_t seed)
// Subset sampling over an explicit index list; the fraction-based fields
// (numerator/denominator/partition) are unused for this mode and zeroed.
ShardSample::ShardSample(const std::vector<int64_t> &indices)
    : numerator_(0),
      denominator_(0),
      partition_id_(0),
      no_of_samples_(0),
      indices_(indices),
      sampler_type_(kSubsetSampler) {}
// Random-subset variant: same setup as the plain subset constructor, plus a
// seeded shuffle operator that SufExecute applies after task selection.
ShardSample::ShardSample(const std::vector<int64_t> &indices, uint32_t seed) : ShardSample(indices) {
  sampler_type_ = kSubsetRandomSampler;
  shuffle_op_ = std::make_shared<ShardShuffle>(seed);
}
@ -71,55 +74,17 @@ int64_t ShardSample::GetNumSamples(int64_t dataset_size, int64_t num_classes) {
return dataset_size / denominator_ * numerator_ + 1;
}
}
if (sampler_type_ == kSubsetRandomSampler) {
if (sampler_type_ == kSubsetRandomSampler || sampler_type_ == kSubsetSampler) {
return indices_.size();
}
return 0;
}
MSRStatus ShardSample::Execute(ShardTask &tasks) {
if (offset_ != -1) {
int64_t old_v = 0;
int num_rows_ = static_cast<int>(tasks.Size());
for (int x = 0; x < denominator_; x++) {
int samples_per_buffer_ = (num_rows_ + offset_) / denominator_;
int remainder = (num_rows_ + offset_) % denominator_;
if (x < remainder) samples_per_buffer_++;
if (x < offset_) samples_per_buffer_--;
old_v += samples_per_buffer_;
// nums_per_shard_ is used to save the current shard's ending index
nums_per_shard_.push_back(old_v);
}
}
int no_of_categories = static_cast<int>(tasks.categories);
int total_no = static_cast<int>(tasks.Size()); // make sure task_size
int taking = 0;
if (sampler_type_ == kCustomTopNSampler) { // non sharding case constructor #1
no_of_samples_ = std::min(no_of_samples_, total_no);
taking = no_of_samples_ - no_of_samples_ % no_of_categories;
} else if (sampler_type_ == kSubsetRandomSampler) {
if (indices_.size() > total_no) {
MS_LOG(ERROR) << "parameter indices's size is greater than dataset size.";
return FAILED;
}
} else { // constructor TopPercent
if (numerator_ > 0 && denominator_ > 0 && numerator_ <= denominator_) {
if (numerator_ == 1 && denominator_ > 1) { // sharding
taking = (total_no + denominator_ - 1) / denominator_;
} else { // non sharding
taking = total_no * numerator_ / denominator_;
taking -= (taking % no_of_categories);
}
} else {
MS_LOG(ERROR) << "parameter numerator or denominator is illegal";
return FAILED;
}
}
MSRStatus ShardSample::UpdateTasks(ShardTask &tasks, int taking) {
if (tasks.permutation_.empty()) {
ShardTask new_tasks;
total_no = static_cast<int>(tasks.Size());
if (sampler_type_ == kSubsetRandomSampler) {
int total_no = static_cast<int>(tasks.Size());
if (sampler_type_ == kSubsetRandomSampler || sampler_type_ == kSubsetSampler) {
for (int i = 0; i < indices_.size(); ++i) {
int index = ((indices_[i] % total_no) + total_no) % total_no;
new_tasks.InsertTask(tasks.GetTaskByID(index)); // different mod result between c and python
@ -148,7 +113,7 @@ MSRStatus ShardSample::Execute(ShardTask &tasks) {
if (taking > static_cast<int>(tasks.permutation_.size())) {
return FAILED;
}
total_no = static_cast<int>(tasks.permutation_.size());
int total_no = static_cast<int>(tasks.permutation_.size());
int count = 0;
for (size_t i = partition_id_ * taking; i < (partition_id_ + 1) * taking; i++) {
if (no_of_samples_ != 0 && count == no_of_samples_) break;
@ -160,6 +125,48 @@ MSRStatus ShardSample::Execute(ShardTask &tasks) {
return SUCCESS;
}
// Compute how many tasks to take ("taking") according to the sampler mode, then
// rewrite the task list via UpdateTasks. Returns FAILED on invalid parameters.
MSRStatus ShardSample::Execute(ShardTask &tasks) {
  // When an offset is given, precompute each shard's ending index so sharded reads line up.
  if (offset_ != -1) {
    int64_t old_v = 0;
    int num_rows_ = static_cast<int>(tasks.Size());
    for (int x = 0; x < denominator_; x++) {
      int samples_per_buffer_ = (num_rows_ + offset_) / denominator_;
      int remainder = (num_rows_ + offset_) % denominator_;
      if (x < remainder) samples_per_buffer_++;
      if (x < offset_) samples_per_buffer_--;
      old_v += samples_per_buffer_;
      // nums_per_shard_ is used to save the current shard's ending index
      nums_per_shard_.push_back(old_v);
    }
  }
  int no_of_categories = static_cast<int>(tasks.categories);
  int total_no = static_cast<int>(tasks.Size());  // make sure task_size
  int taking = 0;
  if (sampler_type_ == kCustomTopNSampler) {  // non sharding case constructor #1
    no_of_samples_ = std::min(no_of_samples_, total_no);
    taking = no_of_samples_ - no_of_samples_ % no_of_categories;
  } else if (sampler_type_ == kSubsetRandomSampler || sampler_type_ == kSubsetSampler) {
    // Subset modes take exactly the ids in indices_; only the size is validated here,
    // and "taking" stays 0 because UpdateTasks iterates indices_ directly for these modes.
    if (indices_.size() > total_no) {
      MS_LOG(ERROR) << "parameter indices's size is greater than dataset size.";
      return FAILED;
    }
  } else {  // constructor TopPercent
    if (numerator_ > 0 && denominator_ > 0 && numerator_ <= denominator_) {
      if (numerator_ == 1 && denominator_ > 1) {  // sharding
        taking = (total_no + denominator_ - 1) / denominator_;
      } else {  // non sharding
        taking = total_no * numerator_ / denominator_;
        taking -= (taking % no_of_categories);
      }
    } else {
      MS_LOG(ERROR) << "parameter numerator or denominator is illegal";
      return FAILED;
    }
  }
  return UpdateTasks(tasks, taking);
}
MSRStatus ShardSample::SufExecute(ShardTask &tasks) {
if (sampler_type_ == kSubsetRandomSampler) {
if (SUCCESS != (*shuffle_op_)(tasks)) {

View File

@ -3321,7 +3321,7 @@ class MindDataset(MappableDataset):
logger.warning("WARN: global shuffle is not used.")
if sampler is not None:
if isinstance(sampler, (samplers.SubsetRandomSampler, samplers.PKSampler,
if isinstance(sampler, (samplers.SubsetRandomSampler, samplers.SubsetSampler, samplers.PKSampler,
samplers.DistributedSampler, samplers.RandomSampler,
samplers.SequentialSampler)) is False:
raise ValueError("The sampler is not supported yet.")
@ -3849,9 +3849,7 @@ class GeneratorDataset(MappableDataset):
if hasattr(self, "__total_batch__"):
new_op.__total_batch__ = self.__total_batch__
if new_op.sampler is not None and hasattr(self.source, "__getitem__"):
if isinstance(new_op.sampler, (samplers.SequentialSampler, samplers.DistributedSampler,
samplers.RandomSampler, samplers.SubsetRandomSampler,
samplers.WeightedRandomSampler, samplers.Sampler)):
if isinstance(new_op.sampler, samplers.BuiltinSampler):
if new_op.num_parallel_workers > 1:
sample_fn = SamplerFn(self.source, new_op.num_parallel_workers, self.python_multiprocessing)
new_op.source = (lambda sample_ids: _cpp_sampler_fn_mp(sample_ids, sample_fn))

View File

@ -25,103 +25,6 @@ import mindspore._c_dataengine as cde
import mindspore.dataset as ds
class Sampler:
"""
Base class for user defined sampler.
A user defined sampler can be used with any existing dataset with sampler support.
A required _iter_() method should by overridden by the user for sample index generation.
An optional reset() method can be overridden for per repeat reset,
dataset_size and num_samples will be set by dataset once a dataset iterator is created.
Examples:
>>> import mindspore.dataset as ds
>>>
>>> class ReverseSampler(ds,Sampler):
>>> def __iter__(self):
>>> for i in range(self.dataset_size - 1, -1, -1):
>>> yield i
>>>
>>> ds = ds.ImageFolderDataset(path, sampler=ReverseSampler())
"""
def __init__(self, num_samples=None):
self.dataset_size = 0
self.child_sampler = None
self.num_samples = num_samples
def __iter__(self):
"""
User defined iterator, must be overridden.
_handshake is guaranteed to be called prior to iterator construction.
"""
raise NotImplementedError
def reset(self):
"""
Per repeat reset callback, override this method if necessary
"""
# Initialization handshake callback
# Do not override this method!
def _handshake(self, ds_size, num_samples):
self.dataset_size = ds_size
self.num_samples = num_samples
# Indices fetcher
# Do not override this method!
def _get_indices(self):
sampler_iter = iter(self)
ret = []
for _ in range(self.num_samples):
try:
idx = next(sampler_iter)
ret.append(idx)
except StopIteration:
break
return np.array(ret)
# Instance fetcher
# Do not override this method!
def create(self):
num_samples = self.num_samples if self.num_samples is not None else 0
c_sampler = cde.PythonSampler(num_samples, self)
c_child_sampler = self.create_child()
c_sampler.add_child(c_child_sampler)
return c_sampler
def add_child(self, sampler):
self.child_sampler = sampler
def get_child(self):
return self.child_sampler
def create_child(self):
c_child_sampler = None
if self.child_sampler is not None:
c_child_sampler = self.child_sampler.create()
return c_child_sampler
def is_shuffled(self):
if self.child_sampler is None:
return False
return self.child_sampler.is_shuffled()
def is_sharded(self):
if self.child_sampler is None:
return False
return self.child_sampler.is_sharded()
def get_num_samples(self):
if self.num_samples is None:
return None
return self._get_indices().size
class BuiltinSampler:
"""
Base class for BuiltinSampler.
@ -231,6 +134,89 @@ class BuiltinSampler:
return self.num_samples
class Sampler(BuiltinSampler):
    """
    Base class for user defined sampler.

    A user defined sampler can be used with any existing dataset with sampler support.
    A required __iter__() method should be overridden by the user for sample index
    generation. An optional reset() method can be overridden for per repeat reset.

    dataset_size and num_samples will be set by dataset once a dataset iterator is created.

    Examples:
        >>> import mindspore.dataset as ds
        >>>
        >>> class ReverseSampler(ds.Sampler):
        >>>     def __iter__(self):
        >>>         for i in range(self.dataset_size - 1, -1, -1):
        >>>             yield i
        >>>
        >>> ds = ds.ImageFolderDataset(path, sampler=ReverseSampler())
    """

    def __init__(self, num_samples=None):
        super().__init__(num_samples)
        # Filled in by _handshake() once the dataset reports its size.
        self.dataset_size = 0

    def __iter__(self):
        """
        User defined iterator, must be overridden.
        _handshake is guaranteed to be called prior to iterator construction.
        """
        raise NotImplementedError

    def reset(self):
        """
        Per repeat reset callback, override this method if necessary.
        """

    # Initialization handshake callback
    # Do not override this method!
    def _handshake(self, ds_size, num_samples):
        self.dataset_size = ds_size
        self.num_samples = num_samples

    # Indices fetcher
    # Do not override this method!
    def _get_indices(self):
        # Pull at most num_samples indices from the user iterator; stop early
        # if the user iterator is exhausted first.
        sampler_iter = iter(self)
        ret = []
        for _ in range(self.num_samples):
            try:
                idx = next(sampler_iter)
                ret.append(idx)
            except StopIteration:
                break
        return np.array(ret)

    # Instance fetcher
    # Do not override this method!
    def create(self):
        # 0 signals "no explicit limit" to the C++ runtime when num_samples is None.
        num_samples = self.num_samples if self.num_samples is not None else 0
        c_sampler = cde.PythonSampler(num_samples, self)
        c_child_sampler = self.create_child()
        c_sampler.add_child(c_child_sampler)
        return c_sampler

    def is_shuffled(self):
        # A user sampler is not considered shuffled by itself; defer to child.
        if self.child_sampler is None:
            return False
        return self.child_sampler.is_shuffled()

    def is_sharded(self):
        if self.child_sampler is None:
            return False
        return self.child_sampler.is_sharded()

    def get_num_samples(self):
        if self.num_samples is None:
            return None
        # Materialize the indices to count how many samples will actually be produced.
        return self._get_indices().size
class DistributedSampler(BuiltinSampler):
"""
A sampler that accesses a shard of the dataset.
@ -518,7 +504,69 @@ class SequentialSampler(BuiltinSampler):
return self.child_sampler.is_sharded()
class SubsetRandomSampler(BuiltinSampler):
class SubsetSampler(BuiltinSampler):
    """
    Samples the elements from a sequence of indices.

    Args:
        indices (list[int]): A sequence of indices.
        num_samples (int, optional): Number of elements to sample (default=None, all elements).

    Examples:
        >>> import mindspore.dataset as ds
        >>>
        >>> dataset_dir = "path/to/imagefolder_directory"
        >>>
        >>> indices = [0, 1, 2, 3, 7, 88, 119]
        >>>
        >>> # creates a SubsetSampler, will sample from the provided indices
        >>> sampler = ds.SubsetSampler(indices)
        >>> data = ds.ImageFolderDataset(dataset_dir, num_parallel_workers=8, sampler=sampler)
    """

    def __init__(self, indices, num_samples=None):
        if num_samples is not None and num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples: {}.".format(num_samples))
        # Accept a single scalar index as a convenience.
        self.indices = indices if isinstance(indices, list) else [indices]
        super().__init__(num_samples)

    def create(self):
        # The C++ runtime interprets 0 as "no explicit sample limit".
        limit = 0 if self.num_samples is None else self.num_samples
        sampler = cde.SubsetSampler(limit, self.indices)
        sampler.add_child(self.create_child())
        return sampler

    def is_shuffled(self):
        # Indices are replayed in the order given; never shuffled.
        return False

    def is_sharded(self):
        return False if self.child_sampler is None else self.child_sampler.is_sharded()

    def create_for_minddataset(self):
        # MindDataset goes through the mindrecord-specific sampler binding.
        sampler = cde.MindrecordSubsetSampler(self.indices)
        sampler.add_child(self.create_child_for_minddataset())
        return sampler

    def get_num_samples(self):
        # The effective sample count is capped by the number of indices.
        inherited = super().get_num_samples()
        if inherited is None:
            return len(self.indices)
        return min(inherited, len(self.indices))
class SubsetRandomSampler(SubsetSampler):
"""
Samples the elements randomly from a sequence of indices.
@ -538,18 +586,6 @@ class SubsetRandomSampler(BuiltinSampler):
>>> data = ds.ImageFolderDataset(dataset_dir, num_parallel_workers=8, sampler=sampler)
"""
def __init__(self, indices, num_samples=None):
if num_samples is not None:
if num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples: {}.".format(num_samples))
if not isinstance(indices, list):
indices = [indices]
self.indices = indices
super().__init__(num_samples)
def create(self):
num_samples = self.num_samples if self.num_samples is not None else 0
c_sampler = cde.SubsetRandomSampler(num_samples, self.indices)
@ -560,25 +596,12 @@ class SubsetRandomSampler(BuiltinSampler):
def is_shuffled(self):
return True
def is_sharded(self):
if self.child_sampler is None:
return False
return self.child_sampler.is_sharded()
def create_for_minddataset(self):
c_sampler = cde.MindrecordSubsetRandomSampler(self.indices, ds.config.get_seed())
c_sampler = cde.MindrecordSubsetSampler(self.indices, ds.config.get_seed())
c_child_sampler = self.create_child_for_minddataset()
c_sampler.add_child(c_child_sampler)
return c_sampler
def get_num_samples(self):
num_samples = super().get_num_samples()
if num_samples is None:
return len(self.indices)
return min(len(self.indices), num_samples)
class WeightedRandomSampler(BuiltinSampler):
"""

View File

@ -410,9 +410,7 @@ def check_generatordataset(method):
if sampler is not None:
if isinstance(sampler, samplers.PKSampler):
raise ValueError("GeneratorDataset doesn't support PKSampler.")
if not isinstance(sampler, (samplers.SequentialSampler, samplers.DistributedSampler,
samplers.RandomSampler, samplers.SubsetRandomSampler,
samplers.WeightedRandomSampler, samplers.Sampler)):
if not isinstance(sampler, samplers.BuiltinSampler):
try:
iter(sampler)
except TypeError:

View File

@ -119,6 +119,7 @@ SET(DE_UT_SRCS
status_test.cc
storage_container_test.cc
subset_random_sampler_test.cc
subset_sampler_test.cc
swap_red_blue_test.cc
take_op_test.cc
task_manager_test.cc

View File

@ -242,8 +242,11 @@ TEST_F(MindDataTestPipeline, TestMindDataSuccess6) {
std::shared_ptr<Dataset> ds5 = MindData(file_list, {}, SubsetRandomSampler({0, 1, 2}, 10));
EXPECT_NE(ds5, nullptr);
std::vector<std::shared_ptr<Dataset>> ds = {ds1, ds2, ds3, ds4, ds5};
std::vector<int32_t> expected_samples = {5, 5, 2, 3, 3};
std::shared_ptr<Dataset> ds6 = MindData(file_list, {}, SubsetSampler({1, 2}, 10));
EXPECT_NE(ds5, nullptr);
std::vector<std::shared_ptr<Dataset>> ds = {ds1, ds2, ds3, ds4, ds5, ds6};
std::vector<int32_t> expected_samples = {5, 5, 2, 3, 3, 2};
for (int32_t i = 0; i < ds.size(); i++) {
// Create an iterator over the result of the above dataset

View File

@ -42,6 +42,9 @@ TEST_F(MindDataTestPipeline, TestImageFolderWithSamplers) {
EXPECT_NE(sampl, nullptr);
std::vector<int64_t> indices = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23};
sampl = SubsetSampler(indices);
EXPECT_NE(sampl, nullptr);
sampl = SubsetRandomSampler(indices);
EXPECT_NE(sampl, nullptr);
@ -138,7 +141,7 @@ TEST_F(MindDataTestPipeline, TestCalculateNumSamples) {
EXPECT_NE(sampl4, nullptr);
std::shared_ptr<SamplerRT> sampler_rt4 = sampl4->SamplerBuild();
sampler_rt4->AddChild(sampler_rt3);
EXPECT_EQ(sampler_rt4->CalculateNumSamples(num_rows), 12);
EXPECT_EQ(sampler_rt4->CalculateNumSamples(num_rows), 11);
// Child doesn't have num_samples
std::shared_ptr<SamplerObj> sampl5 = RandomSampler(false);

View File

@ -0,0 +1,144 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/common.h"
#include "gtest/gtest.h"
#include "minddata/dataset/core/constants.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_buffer.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h"
#include "minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h"
#include <vector>
#include <unordered_set>
using namespace mindspore::dataset;
class MindDataTestSubsetSampler : public UT::Common {
 public:
  // Minimal RandomAccessOp stub: it only supplies the row count that
  // SamplerRT::HandshakeRandomAccessOp() reads; no data access is performed.
  class DummyRandomAccessOp : public RandomAccessOp {
   public:
    // explicit: prevent accidental int64_t -> DummyRandomAccessOp conversion.
    explicit DummyRandomAccessOp(int64_t num_rows) {
      num_rows_ = num_rows;  // protected member of the RandomAccessOp base
    }
  };
};
TEST_F(MindDataTestSubsetSampler, TestAllAtOnce) {
  // num_samples == 0 means "sample everything the indices list contains".
  std::vector<int64_t> in({3, 1, 4, 0, 1});
  int64_t num_samples = 0;
  SubsetSamplerRT sampler(num_samples, in);

  DummyRandomAccessOp dummyRandomAccessOp(5);
  sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);

  std::unique_ptr<DataBuffer> db;
  TensorRow row;
  std::vector<int64_t> out;
  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  db->PopRow(&row);
  // Flatten every tensor in the sampled row into `out`.
  for (const auto &t : row) {
    for (auto it = t->begin<int64_t>(); it != t->end<int64_t>(); ++it) {
      out.push_back(*it);
    }
  }
  // A SubsetSampler must return the indices verbatim, in order, duplicates included.
  ASSERT_EQ(in.size(), out.size());
  for (size_t i = 0; i < in.size(); i++) {
    ASSERT_EQ(in[i], out[i]);
  }

  // The next fetch is the end-of-epoch marker.
  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  ASSERT_EQ(db->eoe(), true);
}
TEST_F(MindDataTestSubsetSampler, TestGetNextBuffer) {
  // Large index list that does not divide evenly into buffers, so the
  // last buffer is deliberately short.
  int64_t total_samples = 100000 - 5;
  int64_t samples_per_buffer = 10;
  int64_t num_samples = 0;  // 0 => sample every provided index
  std::vector<int64_t> input(total_samples, 1);
  SubsetSamplerRT sampler(num_samples, input, samples_per_buffer);

  DummyRandomAccessOp dummyRandomAccessOp(total_samples);
  sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);

  std::unique_ptr<DataBuffer> db;
  TensorRow row;
  std::vector<int64_t> out;

  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  int64_t buffer_count = 0;  // number of data buffers fetched before eoe
  while (!db->eoe()) {
    buffer_count++;
    db->PopRow(&row);
    for (const auto &t : row) {
      for (auto it = t->begin<int64_t>(); it != t->end<int64_t>(); ++it) {
        out.push_back(*it);
      }
    }
    db.reset();
    ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  }
  // Expect ceil(total_samples / samples_per_buffer) buffers in total.
  ASSERT_EQ(buffer_count, (total_samples + samples_per_buffer - 1) / samples_per_buffer);
  ASSERT_EQ(input.size(), out.size());
}
TEST_F(MindDataTestSubsetSampler, TestReset) {
  std::vector<int64_t> in({0, 1, 2, 3, 4});
  int64_t num_samples = 0;  // 0 => sample every provided index
  SubsetSamplerRT sampler(num_samples, in);

  DummyRandomAccessOp dummyRandomAccessOp(5);
  sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);

  std::unique_ptr<DataBuffer> db;
  TensorRow row;
  std::vector<int64_t> out;

  // First pass: the sampler must emit the indices verbatim, in order.
  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  db->PopRow(&row);
  for (const auto &t : row) {
    for (auto it = t->begin<int64_t>(); it != t->end<int64_t>(); ++it) {
      out.push_back(*it);
    }
  }
  ASSERT_EQ(in.size(), out.size());
  for (size_t i = 0; i < in.size(); i++) {
    ASSERT_EQ(in[i], out[i]);
  }

  // After a reset, the identical sequence must be replayed from the beginning.
  sampler.ResetSampler();
  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  ASSERT_EQ(db->eoe(), false);
  db->PopRow(&row);
  out.clear();
  for (const auto &t : row) {
    for (auto it = t->begin<int64_t>(); it != t->end<int64_t>(); ++it) {
      out.push_back(*it);
    }
  }
  ASSERT_EQ(in.size(), out.size());
  for (size_t i = 0; i < in.size(); i++) {
    ASSERT_EQ(in[i], out[i]);
  }

  // The buffer following the replay is the end-of-epoch marker.
  ASSERT_EQ(sampler.GetNextSample(&db), Status::OK());
  ASSERT_EQ(db->eoe(), true);
}

View File

@ -61,6 +61,7 @@ def add_and_remove_cv_file():
os.remove("{}".format(x))
os.remove("{}.db".format(x))
def test_cv_minddataset_pk_sample_no_column(add_and_remove_cv_file):
"""tutorial for cv minderdataset."""
num_readers = 4
@ -101,6 +102,7 @@ def test_cv_minddataset_pk_sample_basic(add_and_remove_cv_file):
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
def test_cv_minddataset_pk_sample_shuffle(add_and_remove_cv_file):
"""tutorial for cv minderdataset."""
columns_list = ["data", "file_name", "label"]
@ -142,6 +144,7 @@ def test_cv_minddataset_pk_sample_shuffle_1(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 5
def test_cv_minddataset_pk_sample_shuffle_2(add_and_remove_cv_file):
"""tutorial for cv minderdataset."""
columns_list = ["data", "file_name", "label"]
@ -182,6 +185,7 @@ def test_cv_minddataset_pk_sample_out_of_range_0(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 15
def test_cv_minddataset_pk_sample_out_of_range_1(add_and_remove_cv_file):
"""tutorial for cv minderdataset."""
columns_list = ["data", "file_name", "label"]
@ -201,6 +205,7 @@ def test_cv_minddataset_pk_sample_out_of_range_1(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 15
def test_cv_minddataset_pk_sample_out_of_range_2(add_and_remove_cv_file):
"""tutorial for cv minderdataset."""
columns_list = ["data", "file_name", "label"]
@ -226,22 +231,23 @@ def test_cv_minddataset_subset_random_sample_basic(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
indices = [1, 2, 3, 5, 7]
sampler = ds.SubsetRandomSampler(indices)
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
samplers = (ds.SubsetRandomSampler(indices), ds.SubsetSampler(indices))
for sampler in samplers:
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
def test_cv_minddataset_subset_random_sample_replica(add_and_remove_cv_file):
@ -249,22 +255,23 @@ def test_cv_minddataset_subset_random_sample_replica(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
indices = [1, 2, 2, 5, 7, 9]
sampler = ds.SubsetRandomSampler(indices)
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 6
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 6
samplers = ds.SubsetRandomSampler(indices), ds.SubsetSampler(indices)
for sampler in samplers:
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 6
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 6
def test_cv_minddataset_subset_random_sample_empty(add_and_remove_cv_file):
@ -272,22 +279,23 @@ def test_cv_minddataset_subset_random_sample_empty(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
indices = []
sampler = ds.SubsetRandomSampler(indices)
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 0
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 0
samplers = ds.SubsetRandomSampler(indices), ds.SubsetSampler(indices)
for sampler in samplers:
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 0
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 0
def test_cv_minddataset_subset_random_sample_out_of_range(add_and_remove_cv_file):
@ -295,44 +303,46 @@ def test_cv_minddataset_subset_random_sample_out_of_range(add_and_remove_cv_file
columns_list = ["data", "file_name", "label"]
num_readers = 4
indices = [1, 2, 4, 11, 13]
sampler = ds.SubsetRandomSampler(indices)
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
samplers = ds.SubsetRandomSampler(indices), ds.SubsetSampler(indices)
for sampler in samplers:
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
def test_cv_minddataset_subset_random_sample_negative(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
indices = [1, 2, 4, -1, -2]
sampler = ds.SubsetRandomSampler(indices)
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
samplers = ds.SubsetRandomSampler(indices), ds.SubsetSampler(indices)
for sampler in samplers:
data_set = ds.MindDataset(CV_FILE_NAME + "0", columns_list, num_readers,
sampler=sampler)
assert data_set.get_dataset_size() == 5
num_iter = 0
for item in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
logger.info(
"-------------- cv reader basic: {} ------------------------".format(num_iter))
logger.info(
"-------------- item[data]: {} -----------------------------".format(item["data"]))
logger.info(
"-------------- item[file_name]: {} ------------------------".format(item["file_name"]))
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
num_iter += 1
assert num_iter == 5
def test_cv_minddataset_random_sampler_basic(add_and_remove_cv_file):
@ -359,6 +369,7 @@ def test_cv_minddataset_random_sampler_basic(add_and_remove_cv_file):
assert num_iter == 10
assert new_dataset != [x['file_name'] for x in data]
def test_cv_minddataset_random_sampler_repeat(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
@ -392,6 +403,7 @@ def test_cv_minddataset_random_sampler_repeat(add_and_remove_cv_file):
assert epoch2_dataset not in (epoch1_dataset, epoch3_dataset)
assert epoch3_dataset not in (epoch1_dataset, epoch2_dataset)
def test_cv_minddataset_random_sampler_replacement(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
@ -412,6 +424,7 @@ def test_cv_minddataset_random_sampler_replacement(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 5
def test_cv_minddataset_random_sampler_replacement_false_1(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
@ -432,6 +445,7 @@ def test_cv_minddataset_random_sampler_replacement_false_1(add_and_remove_cv_fil
num_iter += 1
assert num_iter == 2
def test_cv_minddataset_random_sampler_replacement_false_2(add_and_remove_cv_file):
columns_list = ["data", "file_name", "label"]
num_readers = 4
@ -472,7 +486,7 @@ def test_cv_minddataset_sequential_sampler_basic(add_and_remove_cv_file):
logger.info(
"-------------- item[label]: {} ----------------------------".format(item["label"]))
assert item['file_name'] == np.array(
data[num_iter+1]['file_name'], dtype='S')
data[num_iter + 1]['file_name'], dtype='S')
num_iter += 1
assert num_iter == 4
@ -501,6 +515,7 @@ def test_cv_minddataset_sequential_sampler_offeset(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 10
def test_cv_minddataset_sequential_sampler_exceed_size(add_and_remove_cv_file):
data = get_data(CV_DIR_NAME, True)
columns_list = ["data", "file_name", "label"]
@ -671,7 +686,7 @@ def test_cv_minddataset_split_deterministic(add_and_remove_cv_file):
num_iter += 1
assert num_iter == 2
inter_dataset = [x for x in d1_dataset if x in d2_dataset]
assert inter_dataset == [] # intersection of d1 and d2
assert inter_dataset == [] # intersection of d1 and d2
def test_cv_minddataset_split_sharding(add_and_remove_cv_file):
@ -731,7 +746,7 @@ def test_cv_minddataset_split_sharding(add_and_remove_cv_file):
assert len(epoch2_dataset) == 4
assert len(epoch3_dataset) == 4
inter_dataset = [x for x in d1_shard1 if x in epoch1_dataset]
assert inter_dataset == [] # intersection of d1's shard1 and d1's shard2
assert inter_dataset == [] # intersection of d1's shard1 and d1's shard2
assert epoch1_dataset not in (epoch2_dataset, epoch3_dataset)
assert epoch2_dataset not in (epoch1_dataset, epoch3_dataset)
assert epoch3_dataset not in (epoch1_dataset, epoch2_dataset)
@ -777,6 +792,7 @@ def get_data(dir_name, sampler=False):
continue
return data_list
if __name__ == '__main__':
test_cv_minddataset_pk_sample_no_column(add_and_remove_cv_file)
test_cv_minddataset_pk_sample_basic(add_and_remove_cv_file)

View File

@ -165,7 +165,7 @@ def test_python_sampler():
assert list(sp1.get_indices()) == [0, 1, 2, 3, 4]
def test_subset_sampler():
def test_sequential_sampler2():
manifest_file = "../data/dataset/testManifestData/test5trainimgs.json"
map_ = {(172876, 0): 0, (54214, 0): 1, (54214, 1): 2, (173673, 0): 3, (64631, 1): 4}
@ -191,6 +191,48 @@ def test_subset_sampler():
assert test_config(4, None) == [4]
def test_subset_sampler():
    """
    Exercise ds.SubsetSampler with valid and invalid index lists: valid
    configurations must yield the indices in order (truncated to num_samples),
    invalid ones must raise with the expected error message.
    """
    def test_config(indices, num_samples=None, exception_msg=None):
        # Run one pipeline over a 10-element dataset sampled by SubsetSampler.
        # If exception_msg is None, compare output and size against the
        # expected slice of `indices`; otherwise assert the pipeline raises
        # with exception_msg in the error text.
        def pipeline():
            sampler = ds.SubsetSampler(indices, num_samples)
            data = ds.NumpySlicesDataset(list(range(0, 10)), sampler=sampler)
            dataset_size = data.get_dataset_size()
            return [d[0] for d in data.create_tuple_iterator(num_epochs=1, output_numpy=True)], dataset_size

        if exception_msg is None:
            res, size = pipeline()
            assert indices[:num_samples] == res
            assert len(indices[:num_samples]) == size
        else:
            with pytest.raises(Exception) as error_info:
                pipeline()
            print(str(error_info))
            assert exception_msg in str(error_info)

    test_config([1, 2, 3])
    test_config(list(range(10)))
    test_config([0])
    test_config([9])
    test_config(list(range(0, 10, 2)))
    test_config(list(range(1, 10, 2)))
    test_config(list(range(9, 0, -1)))
    test_config(list(range(9, 0, -2)))
    test_config(list(range(8, 0, -2)))
    test_config([0, 9, 3, 2])
    test_config([0, 0, 0, 0])
    test_config([0])
    test_config([0, 9, 3, 2], num_samples=2)
    test_config([0, 9, 3, 2], num_samples=5)
    # Out-of-bound and negative indices must be rejected by the C++ layer.
    test_config([20], exception_msg="Sample ID (20) is out of bound, expected range [0, 9]")
    test_config([10], exception_msg="Sample ID (10) is out of bound, expected range [0, 9]")
    test_config([0, 9, 0, 500], exception_msg="Sample ID (500) is out of bound, expected range [0, 9]")
    test_config([0, 9, -6, 2], exception_msg="Sample ID (-6) is out of bound, expected range [0, 9]")
    # test_config([], exception_msg="Indices list is empty") # temporary until we check with MindDataset
    test_config([0, 9, 3, 2], num_samples=0,
                exception_msg="num_samples should be a positive integer value, but got num_samples: 0.")
def test_sampler_chain():
manifest_file = "../data/dataset/testManifestData/test5trainimgs.json"
map_ = {(172876, 0): 0, (54214, 0): 1, (54214, 1): 2, (173673, 0): 3, (64631, 1): 4}
@ -249,6 +291,7 @@ if __name__ == '__main__':
test_random_sampler_multi_iter(True)
test_sampler_py_api()
test_python_sampler()
test_sequential_sampler2()
test_subset_sampler()
test_sampler_chain()
test_add_sampler_invalid_input()