batch with padding implemented

support for 1 specific dimension to be None, added validator

fix various CI complaints

another round of CI fixes

ci

refactor parts of the code

code refactor

ci fix

comments added, fix bugs

address review comments

address review comments

review comments

added simple perf test script

update pad code

perf improvement
This commit is contained in:
Zirui Wu 2020-04-09 17:38:42 -04:00
parent 16930c562d
commit c2d364a573
10 changed files with 588 additions and 110 deletions

View File

@ -207,6 +207,8 @@ int DEPipeline::GetBatchSize() const { return batch_size_; }
int DEPipeline::GetRepeatCount() const { return repeat_num_; }
float ToFloat(const py::handle &handle) { return py::reinterpret_borrow<py::float_>(handle); }
int ToInt(const py::handle &handle) { return py::reinterpret_borrow<py::int_>(handle); }
bool ToBool(const py::handle &handle) { return py::reinterpret_borrow<py::bool_>(handle); }
@ -621,6 +623,21 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
if (key == "input_columns") {
(void)builder->SetColumnsToMap(ToStringVector(value));
}
if (key == "pad_info") {
std::map<std::string, std::pair<TensorShape, float>> pad_info;
for (auto p : py::reinterpret_borrow<py::dict>(value)) {
if (!p.second.is_none()) {
py::tuple tp = py::reinterpret_borrow<py::tuple>(p.second);
CHECK_FAIL_RETURN_UNEXPECTED(tp.size() == 2, "tuple in pad_info must be (list,int) or (list,float)");
TensorShape shape = tp[0].is_none() ? TensorShape::CreateUnknownRankShape() : TensorShape(tp[0]);
float pad_val = tp[1].is_none() ? 0 : ToFloat(tp[1]);
(void)pad_info.insert({ToString(p.first), {shape, pad_val}});
} else { // tuple is None
(void)pad_info.insert({ToString(p.first), {TensorShape({}), 0}});
}
}
(void)builder->SetPaddingMap(pad_info, true);
}
}
}

View File

@ -93,10 +93,10 @@ class Tensor {
// Copy raw data of a array based on shape and strides to the destination pointer
// @param dst Pointer to the destination array where the content is to be copied
// @param src Pointer to the source of stided array to be copied
// @param src Pointer to the source of strided array to be copied
// @param shape - shape of the source array
// @param strides - strides of the source array
// @param type_size - number of bytes needed to store one array elment's type
// @param type_size - number of bytes needed to store one array element's type
// @return Status Code
static Status CopyStridedArray(unsigned char *dst, unsigned char *src, std::vector<dsize_t> shape,
std::vector<dsize_t> strides, uint8_t type_size);
@ -138,10 +138,10 @@ class Tensor {
return Status::OK();
}
// fill tensor with Zeros
Status Zero() {
dsize_t size = SizeInBytes();
int retCode = memset_sp(StartAddr(), size, 0, size);
if (retCode != 0) return Status(StatusCode::kUnexpectedError, "Failed to fill tensor with zeroes.");
CHECK_FAIL_RETURN_UNEXPECTED(memset_sp(StartAddr(), size, 0, size) == 0, "Failed to fill tensor with zeroes.");
return Status::OK();
}
@ -154,10 +154,7 @@ class Tensor {
int64_t cellSize = type_.SizeInBytes();
if ((data_ != nullptr) && type_.IsCompatible<T>()) {
for (dsize_t i = 0; i < Size(); i++) {
int retCode = memcpy_s((data_ + i * cellSize), cellSize, &value, cellSize);
if (retCode != 0) {
return Status(StatusCode::kUnexpectedError, "Failed to fill tensor.");
}
CHECK_FAIL_RETURN_UNEXPECTED(memcpy_s((data_ + i * cellSize), cellSize, &value, cellSize) == 0, "memcpy err");
}
return Status::OK();
} else {

View File

@ -87,8 +87,12 @@ TensorShape::TensorShape(const TensorShape &shape) : raw_shape_(*GlobalContext::
TensorShape::TensorShape(py::list l) : raw_shape_(*GlobalContext::Instance()->int_allocator()) {
std::vector<dsize_t> list_c;
for (auto i : l) {
list_c.push_back(i.cast<int>());
for (auto &i : l) {
if (!i.is_none()) {
list_c.push_back(i.cast<int>());
} else {
list_c.push_back(TensorShape::kDimUnknown);
}
}
AddListToShape(list_c);
}

View File

@ -65,6 +65,10 @@ class TensorShape {
// @param shape
TensorShape(const TensorShape &shape);
// construct a TensorShape via a python list
// @param py::list l - a list object from python
explicit TensorShape(py::list l);
~TensorShape() = default;
// Create a scalar Shape (i.e., empty shape with mKnown = true)
@ -142,8 +146,6 @@ class TensorShape {
return out;
}
explicit TensorShape(py::list l);
py::list AsPyList();
// Checks if the given index is a valid index for this tensor.

View File

@ -14,15 +14,20 @@
* limitations under the License.
*/
#include "dataset/engine/datasetops/batch_op.h"
#include <utility>
#include <iomanip>
#include "common/utils.h"
#include "dataset/core/pybind_support.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
using float16 = Eigen::half;
namespace mindspore {
namespace dataset {
BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false) {
BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false), builder_pad_(false), builder_pad_map_({}) {
builder_batch_size_ = batch_size;
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
builder_num_workers_ = cfg->num_parallel_workers();
@ -31,8 +36,9 @@ BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false) {
Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
RETURN_IF_NOT_OK(SanityCheck());
*ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_op_connector_size_, builder_num_workers_,
builder_cols_to_map_, builder_batch_size_func_, builder_batch_map_func_);
*ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
builder_num_workers_, builder_cols_to_map_, builder_batch_size_func_,
builder_batch_map_func_, builder_pad_map_);
return Status::OK();
}
@ -44,14 +50,17 @@ Status BatchOp::Builder::SanityCheck() {
return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err));
}
BatchOp::BatchOp(int32_t batch_size, bool drop, int32_t op_queue_size, int32_t num_workers,
const std::vector<std::string> &cols_to_map, py::function batch_size_func, py::function batch_map_func)
// Constructor for BatchOp.
// @param batch_size - number of rows each batch is created with
// @param drop - whether to drop the last incomplete batch
// @param pad - whether to pad the columns listed in pad_map
// @param op_queue_size - size of the output connector
// @param num_workers - number of parallel workers
// @param cols_to_map - column names passed through the per-batch pyfunc
// @param batch_size_func - python callable returning the batch size
// @param batch_map_func - python callable applied to each batch
// @param pad_map - column name -> (pad shape, pad value) mapping
BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
                 const std::vector<std::string> &cols_to_map, py::function batch_size_func, py::function batch_map_func,
                 std::map<std::string, std::pair<TensorShape, float>> pad_map)
    : ParallelOp(num_workers, op_queue_size),
      start_batch_size_(batch_size),
      drop_(drop),
      pad_(pad),
      pyfunc_column_names_(cols_to_map),
      // members are initialized in declaration order (see batch_op.h: pad_info_ is
      // declared before the py::function members); list them in that order to avoid -Wreorder
      pad_info_(std::move(pad_map)),  // pad_map is taken by value, so move instead of copy
      batch_size_func_(batch_size_func),
      batch_map_func_(batch_map_func) {
  worker_queues_.Init(num_workers, op_queue_size);
}
@ -181,7 +190,8 @@ Status BatchOp::WorkerEntry(int32_t workerId) {
Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair,
std::unique_ptr<DataBuffer> *db) {
RETURN_UNEXPECTED_IF_NULL(table_pair.first);
if (!input_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair)); // pass it through pyfunc
if (!pyfunc_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair)); // pass it through pyfunc
if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair)); // do padding if needed
(*db) = std::make_unique<DataBuffer>(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone);
std::unique_ptr<TensorQTable> dest_table = std::make_unique<TensorQTable>();
RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size()));
@ -206,8 +216,8 @@ Status BatchOp::EoeReceived(int32_t) {
Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
TensorBatchTable input_table;
input_table.reserve(input_column_names_.size());
for (std::string col_name : input_column_names_) {
input_table.reserve(pyfunc_column_names_.size());
for (std::string col_name : pyfunc_column_names_) {
if (column_name_map_.find(col_name) == column_name_map_.end()) {
RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n");
}
@ -225,8 +235,8 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
RETURN_IF_NOT_OK(InvokeBatchMapFunc(&input_table, &output_table, table_pair->second));
// Write back to TensorQTable
for (size_t input_idx = 0; input_idx < input_column_names_.size(); input_idx++) {
size_t col_idx = static_cast<size_t>(column_name_map_[input_column_names_[input_idx]]);
for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) {
size_t col_idx = static_cast<size_t>(column_name_map_[pyfunc_column_names_[input_idx]]);
size_t row_id = 0;
for (TensorRow &row : *(table_pair->first)) {
row[col_idx] = std::move(output_table[input_idx][row_id++]);
@ -290,8 +300,8 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
py::object ret_py_obj = batch_map_func_(*input_args);
// Parse batch map return value
py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
if (ret_tuple.size() != input_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
return Status(StatusCode::kPyFuncException, "Batch map function should return an tuple if size(input_columns)");
if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
return Status(StatusCode::kPyFuncException, "Batch map function should return a tuple");
}
for (size_t i = 0; i < ret_tuple.size(); i++) {
TensorBatch output_batch;
@ -311,5 +321,142 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
}
return Status(StatusCode::kOK);
}
// Pad src up to pad_shape with pad_val, returning the result through *dst.
// If src is a scalar or already has exactly pad_shape, *dst simply aliases src (no copy).
// Otherwise a new tensor of pad_shape is allocated, filled entirely with pad_val
// (dispatched on the element type), and src's contents are copied into its leading
// region by PadHelper using precomputed element strides.
// @param src - tensor to pad from (must not be nullptr)
// @param dst - output padded tensor (must not be nullptr)
// @param pad_shape - target shape; must have the same rank as src
// @param pad_val - fill value, converted to src's element type
// @return Status - The error code return
Status BatchOp::PadTensor(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> *dst,
                          const std::vector<dsize_t> &pad_shape, float pad_val) {
  CHECK_FAIL_RETURN_UNEXPECTED(src != nullptr && dst != nullptr, "tensor can't be nullptr");
  if (src->Rank() == 0 || src->shape().AsVector() == pad_shape) {
    (*dst) = src;  // if no padding, copy the pointer
  } else {
    CHECK_FAIL_RETURN_UNEXPECTED(src->Rank() == pad_shape.size(), "Pad to diff rank not allowed");
    RETURN_IF_NOT_OK(Tensor::CreateTensor(dst, TensorImpl::kFlexible, TensorShape(pad_shape), src->type()));
    auto tensor_type = src->type().value();
    if (pad_val == 0) {  // if pad with zero, don't care what type it is
      RETURN_IF_NOT_OK((*dst)->Zero());
    } else if (tensor_type == DataType::DE_INT8) {
      RETURN_IF_NOT_OK((*dst)->Fill<int8_t>(pad_val));
    } else if (tensor_type == DataType::DE_BOOL) {
      RETURN_IF_NOT_OK((*dst)->Fill<bool>(pad_val));
    } else if (tensor_type == DataType::DE_UINT8) {
      RETURN_IF_NOT_OK((*dst)->Fill<uint8_t>(pad_val));
    } else if (tensor_type == DataType::DE_INT16) {
      RETURN_IF_NOT_OK((*dst)->Fill<int16_t>(pad_val));
    } else if (tensor_type == DataType::DE_FLOAT16) {
      RETURN_IF_NOT_OK((*dst)->Fill<float16>(static_cast<float16>(pad_val)));
    } else if (tensor_type == DataType::DE_UINT16) {
      RETURN_IF_NOT_OK((*dst)->Fill<uint16_t>(pad_val));
    } else if (tensor_type == DataType::DE_INT32) {
      RETURN_IF_NOT_OK((*dst)->Fill<int32_t>(pad_val));
    } else if (tensor_type == DataType::DE_UINT32) {
      RETURN_IF_NOT_OK((*dst)->Fill<uint32_t>(pad_val));
    } else if (tensor_type == DataType::DE_INT64) {
      RETURN_IF_NOT_OK((*dst)->Fill<int64_t>(pad_val));
    } else if (tensor_type == DataType::DE_UINT64) {
      RETURN_IF_NOT_OK((*dst)->Fill<uint64_t>(pad_val));
    } else if (tensor_type == DataType::DE_FLOAT32) {
      RETURN_IF_NOT_OK((*dst)->Fill<float>(pad_val));
    } else if (tensor_type == DataType::DE_FLOAT64) {
      RETURN_IF_NOT_OK((*dst)->Fill<double>(pad_val));
    } else {
      RETURN_STATUS_UNEXPECTED("Incorrect/Unknown tensor type");
    }
    // element strides of src and dst, built from the innermost dimension outward;
    // cur_ind starts at the origin for the recursive copy
    std::vector<dsize_t> cur_ind(src->Rank(), 0), src_s(src->Rank(), 1), dst_s(src->Rank(), 1);
    for (dsize_t i = src->Rank() - 2; i >= 0; i--) {
      src_s[i] = src->shape()[i + 1] * src_s[i + 1];
      dst_s[i] = pad_shape[i + 1] * dst_s[i + 1];
    }
    RETURN_IF_NOT_OK(PadHelper(src, *dst, cur_ind, src_s, dst_s, 0));
  }
  return Status::OK();
}
// Pad the selected columns of the batch so every row's tensor in a column shares one shape.
// Target shapes come from pad_info_ (via UnpackPadInfo); any dimension left as -1
// (None in python) is replaced by the maximum extent of that dimension over the batch.
// @param table_pair - the batch (TensorQTable + batch info) to pad in place
// @return Status - The error code return
Status BatchOp::PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
  RETURN_UNEXPECTED_IF_NULL(table_pair);  // placeholder for now, might need this in the future
  CHECK_FAIL_RETURN_UNEXPECTED(table_pair->first->front().size() == column_name_map_.size(), "col_name_map mismatch");
  std::vector<float> pad_vals(column_name_map_.size(), 0);  // value to pad each column's tensor with, default 0
  std::set<int32_t> pad_cols;
  // padded_shape provided by user, maximum shapes of current batch of tensors
  std::vector<std::vector<dsize_t>> pad_shapes(column_name_map_.size()), max_shapes(column_name_map_.size());
  RETURN_IF_NOT_OK(UnpackPadInfo(&pad_cols, &pad_vals, &pad_shapes));
  // init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well
  for (size_t col_id : pad_cols) {
    max_shapes[col_id] = std::vector<dsize_t>(table_pair->first->front()[col_id]->Rank(), -1);
    if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id];  // fill pad shape with -1
    CHECK_FAIL_RETURN_UNEXPECTED(pad_shapes[col_id].size() == max_shapes[col_id].size(), "wrong rank in pad_shape");
  }
  // calculate maximum shape for each column that needs to be padded
  for (const TensorRow &row : *(table_pair->first)) {  // iterate over each row in a batch
    for (size_t col_id : pad_cols) {                   // iterate over each tensor in a row
      CHECK_FAIL_RETURN_UNEXPECTED(row[col_id]->Rank() == max_shapes[col_id].size(),
                                   "Tensor to be padded together need to have the same rank");
      for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) {  // pick the largest number in each dimension
        max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]);
      }
    }
  }
  // if user sets a dimension to -1 (None in python), use the max value for current dimension
  for (size_t col_id : pad_cols) {
    for (size_t dim = 0; dim < pad_shapes[col_id].size(); dim++) {
      if (pad_shapes[col_id][dim] < 0) pad_shapes[col_id][dim] = max_shapes[col_id][dim];
    }
  }
  // call pad on each tensor that needs to be padded
  for (TensorRow &row : *(table_pair->first)) {
    for (size_t col_id : pad_cols) {
      std::shared_ptr<Tensor> pad_tensor;
      RETURN_IF_NOT_OK(PadTensor(row[col_id], &pad_tensor, pad_shapes[col_id], pad_vals[col_id]));
      row[col_id] = pad_tensor;
    }
  }
  return Status::OK();
}
// Translate pad_info_ (column name -> (shape, value)) into per-column-id outputs.
// An empty pad_info_ means every column gets padded with the default value 0 and
// a shape derived entirely from the batch (pad_shapes entry left empty).
// @param pad_cols - output set of column ids to pad
// @param pad_vals - output fill value per column id (pre-sized by caller)
// @param pad_shapes - output target shape per column id; empty vector means "derive from batch"
// @return Status - The error code return
Status BatchOp::UnpackPadInfo(std::set<int32_t> *pad_cols, std::vector<float> *pad_vals,
                              std::vector<std::vector<dsize_t>> *pad_shapes) {
  if (pad_info_.empty()) {  // if pad_info empty, pad every column automatically
    for (dsize_t col_id = 0; col_id < column_name_map_.size(); col_id++) {
      pad_cols->insert(col_id);
    }
  } else {
    for (auto p : pad_info_) {
      CHECK_FAIL_RETURN_UNEXPECTED(column_name_map_.find(p.first) != column_name_map_.end(),
                                   "no column exists with name:" + p.first);
      dsize_t col_id = static_cast<dsize_t>(column_name_map_[p.first]);
      CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound");
      pad_cols->insert(col_id);
      (*pad_vals)[col_id] = p.second.second;              // set pad values
      (*pad_shapes)[col_id] = p.second.first.AsVector();  // empty vector if shape is unknown
    }
  }
  return Status::OK();
}
// Recursively copy src's contents into the pre-filled dst using element strides.
// Walks every dimension except the last; at the last dimension it memcpy's one
// contiguous run of min(src_extent, dst_extent) elements at the flat offset
// computed from cur_ind and the stride vectors. Only meant to be called by
// PadTensor, which guarantees matching ranks and valid strides.
// @param src - tensor to copy from
// @param dst - destination tensor (already allocated and filled with the pad value)
// @param cur_ind - current multi-dimensional index (recursion state, passed by value)
// @param src_s - element strides of src per dimension
// @param dst_s - element strides of dst per dimension
// @param cur_dim - dimension currently being walked
// @return Status - The error code return
Status BatchOp::PadHelper(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> dst, std::vector<dsize_t> cur_ind,
                          const std::vector<dsize_t> &src_s, const std::vector<dsize_t> &dst_s, size_t cur_dim) {
  if (cur_dim == src->Rank() - 1) {  // if this is the last dimension, copy the data
    uint8_t type_size = src->type().SizeInBytes();
    size_t len = std::min(src->shape()[cur_dim], dst->shape()[cur_dim]) * type_size;
    dsize_t src_flat_ind = 0, dst_flat_ind = 0;
    // convert the multi-dimensional index into flat byte offsets via the strides
    for (size_t i = 0; i < src->Rank(); i++) {
      src_flat_ind += src_s[i] * cur_ind[i];
      dst_flat_ind += dst_s[i] * cur_ind[i];
    }
    unsigned char *src_addr = src->StartAddr() + src_flat_ind * type_size;
    unsigned char *dst_addr = dst->StartAddr() + dst_flat_ind * type_size;
    CHECK_FAIL_RETURN_UNEXPECTED(memcpy_s(dst_addr, len, src_addr, len) == 0, "memcpy error");
  } else {  // not the last dimension, keep doing recursion
    dsize_t min_ind = std::min(dst->shape()[cur_dim], src->shape()[cur_dim]);
    for (dsize_t i = 0; i < min_ind; i++) {
      cur_ind[cur_dim] = i;
      RETURN_IF_NOT_OK(PadHelper(src, dst, cur_ind, src_s, dst_s, cur_dim + 1));
    }
  }
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -16,8 +16,11 @@
#ifndef DATASET_ENGINE_DATASETOPS_BATCH_OP_H_
#define DATASET_ENGINE_DATASETOPS_BATCH_OP_H_
#include <algorithm>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
@ -44,10 +47,6 @@ class BatchOp : public ParallelOp {
// @param int32_t batch_size
explicit Builder(int32_t batch_size);
// Builder constructor for Batch, batch size function needs to be specified
// @param py::function batch_size_func
explicit Builder(py::function batch_size_func);
// Default destructor
~Builder() = default;
@ -67,6 +66,12 @@ class BatchOp : public ParallelOp {
return *this;
}
// set padding configuration for batch
// @param pad_map - column name to (target shape, fill value) mapping
// @param pad - whether padding is enabled (default true)
// @return Builder & reference to builder class object
Builder &SetPaddingMap(const std::map<std::string, std::pair<TensorShape, float>> &pad_map, bool pad = true) {
  builder_pad_map_ = pad_map;
  builder_pad_ = pad;
  return *this;
}
// set connector size for batch
// @param int32_t op_conn_size
// @return Builder & reference to builder class object
@ -109,11 +114,12 @@ class BatchOp : public ParallelOp {
Status SanityCheck();
bool builder_drop_;
bool builder_pad_;
int32_t builder_batch_size_;
int32_t builder_num_workers_;
int32_t builder_op_connector_size_;
std::vector<std::string> builder_cols_to_map_;
std::map<std::string, std::pair<TensorShape, float>> builder_pad_map_;
py::function builder_batch_size_func_;
py::function builder_batch_map_func_;
};
@ -143,8 +149,9 @@ class BatchOp : public ParallelOp {
// @param int32_t op_queue_size
// @param int32_t rows_per_buf
// @param int32_t num_workers
BatchOp(int32_t batch_size, bool drop, int32_t op_queue_size, int32_t num_workers, const std::vector<std::string> &,
py::function batch_size_func, py::function batch_map_func);
BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
const std::vector<std::string> &, py::function batch_size_func, py::function batch_map_func,
std::map<std::string, std::pair<TensorShape, float>> pad_map);
// BatchOp destructor
~BatchOp() {}
@ -176,7 +183,28 @@ class BatchOp : public ParallelOp {
// @return Status - The error code return
Status operator()() override;
// Pad input tensor according pad_shape, need to have same rank.
// @param std::shared_ptr<Tensor> src - tensor to pad from
// @param std::shared_ptr<Tensor> *dst - return tensor padded
// @param std::vector<dsize_t> pad_shape - shape to pad to
// @param float pad_val - value to pad with
// @return - The error code return
Status PadTensor(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> *dst, const std::vector<dsize_t> &pad_shape,
float pad_val);
private:
// recursive helper function. This function could be very expensive if called on a multi-dimensional tensor
// it is only meant to be called by PadTensor.
// @tparam T - type of tensor and fill value
// @param std::shared_ptr<Tensor> src - Tensor to pad from
// @param std::shared_ptr<Tensor>* dst - Tensor to pad to, return value
// @param std::vector<dsize_t> cur_ind - recursion helper
// @param T pad_val - value to pad tensor with
// @param size_t cur_dim - recursion helper
// @return Status - The error code return
Status PadHelper(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> dst, std::vector<dsize_t> cur_ind,
const std::vector<dsize_t> &src_s, const std::vector<dsize_t> &dst_s, size_t cur_dim = 0);
// Worker thread for doing the memcpy of batch
// @param int32_t param workerId
// @return Status - The error code return
@ -199,6 +227,16 @@ class BatchOp : public ParallelOp {
// @return Status - The error code return
Status MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair);
// @param std::set<int32_t> *cols, col ids to perform pad on
// @param std::vector<float> *vals, default padding value for each column
// @param std::vector<std::vector<dsize_t>> *shapes, padding shape specified by user
// @return Status - The error code return
Status UnpackPadInfo(std::set<int32_t> *cols, std::vector<float> *vals, std::vector<std::vector<dsize_t>> *shapes);
// @param table_pair
// @return Status - The error code return
Status PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair);
// the number of thread pulling from the mOutConnector of the Op below
// @return int32_t, 1
int32_t num_consumers() const override { return 1; }
@ -220,19 +258,15 @@ class BatchOp : public ParallelOp {
Status InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info);
int32_t start_batch_size_;
bool drop_;
// Name of the columns to perform map op on
std::vector<std::string> input_column_names_;
// Iterator for fetching
std::unique_ptr<ChildIterator> child_iterator_;
// Map of column_name: column_index
std::unordered_map<std::string, int32_t> column_name_map_;
// Internal queue for task distribution
QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_;
// Function pointer of batch size function
py::function batch_size_func_;
// Function pointer of per batch map function
py::function batch_map_func_;
bool drop_; // bool for whether to drop remainder or not
bool pad_; // bool for whether to perform padding on tensor
std::vector<std::string> pyfunc_column_names_; // Name of the columns to perform map op on
std::map<std::string, std::pair<TensorShape, float>> pad_info_; // column names to perform padding on
std::unique_ptr<ChildIterator> child_iterator_; // child iterator for fetching TensorRows 1 by 1
std::unordered_map<std::string, int32_t> column_name_map_; // Map of column_name: column_index
QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_; // internal queue for syncing worker
py::function batch_size_func_; // Function pointer of batch size function
py::function batch_map_func_; // Function pointer of per batch map function
};
} // namespace dataset
} // namespace mindspore

View File

@ -40,7 +40,8 @@ from mindspore._c_expression import typing
from mindspore import log as logger
from . import samplers
from .iterators import DictIterator, TupleIterator
from .validators import check, check_batch, check_shuffle, check_map, check_filter, check_repeat, check_skip, check_zip, check_rename, \
from .validators import check, check_batch, check_shuffle, check_map, check_filter, check_repeat, check_skip, check_zip, \
check_rename, \
check_take, check_project, check_imagefolderdatasetv2, check_mnist_cifar_dataset, check_manifestdataset, \
check_tfrecorddataset, check_vocdataset, check_celebadataset, check_minddataset, check_generatordataset, \
check_sync_wait, check_zip_dataset, check_add_column, check_textfiledataset
@ -163,7 +164,7 @@ class Dataset:
@check_batch
def batch(self, batch_size, drop_remainder=False, num_parallel_workers=None, per_batch_map=None,
input_columns=None):
input_columns=None, pad_info=None):
"""
Combines batch_size number of consecutive rows into batches.
@ -181,7 +182,7 @@ class Dataset:
drop_remainder (bool, optional): Determines whether or not to drop the last
possibly incomplete batch (default=False). If True, and if there are less
than batch_size rows available to make the last batch, then those rows will
be dropped and not propogated to the child node.
be dropped and not propagated to the child node.
num_parallel_workers (int, optional): Number of workers to process the Dataset in parallel (default=None).
per_batch_map (callable, optional): Per batch map callable. A callable which takes
(list[Tensor], list[Tensor], ..., BatchInfo) as input parameters. Each list[Tensor] represent a batch of
@ -189,6 +190,8 @@ class Dataset:
last parameter of the callable should always be a BatchInfo object.
input_columns (list of string, optional): List of names of the input columns. The size of the list should
match with signature of per_batch_map callable.
pad_info (dict, optional): Whether to perform padding on selected columns. pad_info={"col1":([224,224],0)}
would pad column with name "col1" to a tensor of size [224,224] and fill the missing with 0.
Returns:
BatchDataset, dataset batched.
@ -200,7 +203,8 @@ class Dataset:
>>> # and drops the last incomplete batch if there is one.
>>> data = data.batch(100, True)
"""
return BatchDataset(self, batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns)
return BatchDataset(self, batch_size, drop_remainder, num_parallel_workers, per_batch_map, input_columns,
pad_info)
@check_sync_wait
def sync_wait(self, condition_name, num_batch=1, callback=None):
@ -1026,13 +1030,26 @@ class BatchDataset(DatasetOp):
Args:
input_dataset (Dataset): Input Dataset to be batched.
batch_size (int): The size of the batch.
drop_remainder (bool, optional): Whether drop the remainder batch of data (drop_remainder=False).
If True, the last incomplete batch will be dropped.
batch_size (int or function): The number of rows each batch is created with. An
int or callable which takes exactly 1 parameter, BatchInfo.
drop_remainder (bool, optional): Determines whether or not to drop the last
possibly incomplete batch (default=False). If True, and if there are less
than batch_size rows available to make the last batch, then those rows will
be dropped and not propagated to the child node.
num_parallel_workers (int, optional): Number of workers to process the Dataset in parallel (default=None).
per_batch_map (callable, optional): Per batch map callable. A callable which takes
(list[Tensor], list[Tensor], ..., BatchInfo) as input parameters. Each list[Tensor] represent a batch of
Tensors on a given column. The number of lists should match with number of entries in input_columns. The
last parameter of the callable should always be a BatchInfo object.
input_columns (list of string, optional): List of names of the input columns. The size of the list should
match with signature of per_batch_map callable.
pad_info (dict, optional): Whether to perform padding on selected columns. pad_info={"col1":([224,224],0)}
would pad column with name "col1" to a tensor of size [224,224] and fill the missing with 0.
"""
def __init__(self, input_dataset, batch_size, drop_remainder=False, num_parallel_workers=None,
per_batch_map=None, input_columns=None):
per_batch_map=None, input_columns=None, pad_info=None):
super().__init__(num_parallel_workers)
if BatchDataset._is_ancestor_of_repeat(input_dataset):
@ -1044,6 +1061,7 @@ class BatchDataset(DatasetOp):
self.drop_remainder = drop_remainder
self.per_batch_map = per_batch_map
self.input_columns = input_columns
self.pad_info = pad_info
self.input.append(input_dataset)
input_dataset.output.append(self)
self._input_indexs = input_dataset.input_indexs
@ -1054,6 +1072,7 @@ class BatchDataset(DatasetOp):
args["drop_remainder"] = self.drop_remainder
args["per_batch_map"] = self.per_batch_map
args["input_columns"] = self.input_columns
args["pad_info"] = self.pad_info
return args
def get_dataset_size(self):
@ -2702,6 +2721,7 @@ class TFRecordDataset(SourceDataset):
>>> # 3) get all rows from dataset_files with schema file "./schema.json":
>>> tfdataset = ds.TFRecordDataset(dataset_files=dataset_files, schema="./schema.json")
"""
@check_tfrecorddataset
def __init__(self, dataset_files, schema=None, columns_list=None, num_samples=None, num_parallel_workers=None,
shuffle=Shuffle.GLOBAL, num_shards=None, shard_id=None, shard_equal_rows=False):
@ -3551,6 +3571,7 @@ class CelebADataset(SourceDataset):
args["shard_id"] = self.shard_id
return args
class TextFileDataset(SourceDataset):
"""
A source dataset that reads and parses datasets stored on disk in text format.

View File

@ -324,6 +324,7 @@ def check_sampler_shuffle_shard_options(param_dict):
def check_imagefolderdatasetv2(method):
"""A wrapper that wrap a parameter checker to the original Dataset(ImageFolderDatasetV2)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -356,6 +357,7 @@ def check_imagefolderdatasetv2(method):
def check_mnist_cifar_dataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(ManifestDataset, Cifar10/100Dataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -382,6 +384,7 @@ def check_mnist_cifar_dataset(method):
def check_manifestdataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(ManifestDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -414,6 +417,7 @@ def check_manifestdataset(method):
def check_tfrecorddataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(TFRecordDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -444,6 +448,7 @@ def check_tfrecorddataset(method):
def check_vocdataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(VOCDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -470,6 +475,7 @@ def check_vocdataset(method):
def check_celebadataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(CelebADataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -510,6 +516,7 @@ def check_celebadataset(method):
def check_minddataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(MindDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -541,6 +548,7 @@ def check_minddataset(method):
def check_generatordataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(GeneratorDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -628,8 +636,25 @@ def check_columns(columns, name):
raise TypeError("{} should be either a list of strings or a single string.".format(name))
def check_pad_info(key, val):
    """Check one (column_name -> (pad_shape, pad_value)) entry of pad_info in batch.

    Args:
        key: column name, must be a string.
        val: None, or a 2-tuple (pad_shape, pad_value). pad_shape is a list whose
            dims are positive ints (a dim may be None, meaning "pad to the batch
            maximum"); pad_value is an int or float (None means pad with 0).
    """
    check_type(key, "key in pad_info", str)
    if val is not None:
        # verify val is a tuple BEFORE calling len() on it, so a non-sized value
        # (e.g. an int) raises a clear type error instead of a confusing len() failure
        check_type(val, "value in pad_info", tuple)
        assert len(val) == 2, "value of pad_info should be a tuple of size 2"
        if val[0] is not None:
            check_type(val[0], "pad_shape", list)
            for dim in val[0]:
                if dim is not None:
                    check_type(dim, "dim in pad_shape", int)
                    assert dim > 0, "pad shape should be positive integers"
        if val[1] is not None:
            check_type(val[1], "pad_value", (int, float))
def check_batch(method):
"""check the input arguments of batch."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -648,6 +673,14 @@ def check_batch(method):
check_param_type(nreq_param_bool, param_dict, bool)
if (param_dict.get('pad_info') is not None) and (param_dict.get('per_batch_map') is not None):
raise ValueError("pad_info and per_batch_map can't both be set")
if param_dict.get('pad_info') is not None:
check_type(param_dict["pad_info"], "pad_info", dict)
for k, v in param_dict.get('pad_info').items():
check_pad_info(k, v)
for param_name in nreq_param_columns:
param = param_dict.get(param_name)
if param is not None:
@ -687,6 +720,7 @@ def check_sync_wait(method):
def check_shuffle(method):
"""check the input arguments of shuffle."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -705,6 +739,7 @@ def check_shuffle(method):
def check_map(method):
"""check the input arguments of map."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -729,6 +764,7 @@ def check_map(method):
def check_filter(method):
""""check the input arguments of filter."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -749,6 +785,7 @@ def check_filter(method):
def check_repeat(method):
"""check the input arguments of repeat."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -764,6 +801,7 @@ def check_repeat(method):
def check_skip(method):
"""check the input arguments of skip."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -780,6 +818,7 @@ def check_skip(method):
def check_take(method):
"""check the input arguments of take."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -794,6 +833,7 @@ def check_take(method):
def check_zip(method):
"""check the input arguments of zip."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -811,6 +851,7 @@ def check_zip(method):
def check_zip_dataset(method):
"""check the input arguments of zip method in `Dataset`."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -830,6 +871,7 @@ def check_zip_dataset(method):
def check_rename(method):
"""check the input arguments of rename."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -849,6 +891,7 @@ def check_rename(method):
def check_project(method):
"""check the input arguments of project."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -876,6 +919,7 @@ def check_shape(shape, name):
def check_add_column(method):
"""check the input arguments of add_column."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)
@ -905,6 +949,7 @@ def check_add_column(method):
def check_textfiledataset(method):
"""A wrapper that wrap a parameter checker to the original Dataset(TextFileDataset)."""
@wraps(method)
def new_method(*args, **kwargs):
param_dict = make_param_dict(method, args, kwargs)

View File

@ -30,16 +30,14 @@ namespace common = mindspore::common;
namespace de = mindspore::dataset;
using namespace mindspore::dataset;
using mindspore::MsLogLevel::ERROR;
using mindspore::ExceptionType::NoExceptionType;
using mindspore::LogStream;
using mindspore::ExceptionType::NoExceptionType;
using mindspore::MsLogLevel::ERROR;
// Test fixture for BatchOp unit tests; inherits the common dataset-op test
// setup (e.g. datasets_root_path_) from UT::DatasetOpTesting.
class MindDataTestBatchOp : public UT::DatasetOpTesting {
 protected:
};
std::shared_ptr<de::BatchOp> Batch(int32_t batch_size = 1, bool drop = false, int rows_per_buf = 2) {
Status rc;
std::shared_ptr<de::BatchOp> op;
@ -93,10 +91,8 @@ TEST_F(MindDataTestBatchOp, TestSimpleBatch) {
rc = di.GetNextAsMap(&tensor_map);
EXPECT_TRUE(rc.IsOk());
std::shared_ptr<de::Tensor> t;
rc = de::Tensor::CreateTensor(&t,
TensorImpl::kFlexible, de::TensorShape({12, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) payload);
rc = de::Tensor::CreateTensor(&t, TensorImpl::kFlexible, de::TensorShape({12, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)payload);
EXPECT_TRUE(rc.IsOk());
// verify the actual data in Tensor is correct
EXPECT_EQ(*t == *tensor_map["col_sint64"], true);
@ -111,7 +107,6 @@ TEST_F(MindDataTestBatchOp, TestSimpleBatch) {
EXPECT_EQ(success, true);
}
TEST_F(MindDataTestBatchOp, TestRepeatBatchDropTrue) {
std::string schema_file = datasets_root_path_ + "/testBatchDataset";
bool success = false;
@ -125,20 +120,14 @@ TEST_F(MindDataTestBatchOp, TestRepeatBatchDropTrue) {
-9223372036854775807 - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 9223372036854775807};
de::DatasetIterator di(tree);
std::shared_ptr<de::Tensor> t1, t2, t3;
rc = de::Tensor::CreateTensor(&t1,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) payload);
rc = de::Tensor::CreateTensor(&t1, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)payload);
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t2,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 7));
rc = de::Tensor::CreateTensor(&t2, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 7));
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t3,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 2));
rc = de::Tensor::CreateTensor(&t3, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 2));
EXPECT_TRUE(rc.IsOk());
TensorMap tensor_map;
@ -163,7 +152,6 @@ TEST_F(MindDataTestBatchOp, TestRepeatBatchDropTrue) {
EXPECT_EQ(success, true);
}
TEST_F(MindDataTestBatchOp, TestRepeatBatchDropFalse) {
std::string schema_file = datasets_root_path_ + "/testBatchDataset";
bool success = false;
@ -177,25 +165,17 @@ TEST_F(MindDataTestBatchOp, TestRepeatBatchDropFalse) {
-9223372036854775807 - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 9223372036854775807};
de::DatasetIterator di(tree);
std::shared_ptr<de::Tensor> t1, t2, t3, t4;
rc = de::Tensor::CreateTensor(&t1,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) payload);
rc = de::Tensor::CreateTensor(&t1, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)payload);
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t2,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 7));
rc = de::Tensor::CreateTensor(&t2, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 7));
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t3,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 2));
rc = de::Tensor::CreateTensor(&t3, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 2));
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t4,
TensorImpl::kFlexible, de::TensorShape({3, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 9));
rc = de::Tensor::CreateTensor(&t4, TensorImpl::kFlexible, de::TensorShape({3, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 9));
EXPECT_TRUE(rc.IsOk());
TensorMap tensor_map;
@ -224,7 +204,6 @@ TEST_F(MindDataTestBatchOp, TestRepeatBatchDropFalse) {
EXPECT_EQ(success, true);
}
TEST_F(MindDataTestBatchOp, TestBatchDropFalseRepeat) {
std::string schema_file = datasets_root_path_ + "/testBatchDataset";
bool success = false;
@ -238,15 +217,11 @@ TEST_F(MindDataTestBatchOp, TestBatchDropFalseRepeat) {
-9223372036854775807 - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 9223372036854775807};
de::DatasetIterator di(tree);
std::shared_ptr<de::Tensor> t1, t2;
rc = de::Tensor::CreateTensor(&t1,
TensorImpl::kFlexible, de::TensorShape({7, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) payload);
rc = de::Tensor::CreateTensor(&t1, TensorImpl::kFlexible, de::TensorShape({7, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)payload);
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t2,
TensorImpl::kFlexible, de::TensorShape({5, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 7));
rc = de::Tensor::CreateTensor(&t2, TensorImpl::kFlexible, de::TensorShape({5, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 7));
EXPECT_TRUE(rc.IsOk());
TensorMap tensor_map;
@ -275,7 +250,6 @@ TEST_F(MindDataTestBatchOp, TestBatchDropFalseRepeat) {
EXPECT_EQ(success, true);
}
TEST_F(MindDataTestBatchOp, TestBatchDropTrueRepeat) {
std::string schema_file = datasets_root_path_ + "/testBatchDataset";
bool success = false;
@ -289,15 +263,11 @@ TEST_F(MindDataTestBatchOp, TestBatchDropTrueRepeat) {
-9223372036854775807 - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 9223372036854775807};
de::DatasetIterator di(tree);
std::shared_ptr<de::Tensor> t1, t2;
rc = de::Tensor::CreateTensor(&t1,
TensorImpl::kFlexible, de::TensorShape({5, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) payload);
rc = de::Tensor::CreateTensor(&t1, TensorImpl::kFlexible, de::TensorShape({5, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)payload);
EXPECT_TRUE(rc.IsOk());
rc = de::Tensor::CreateTensor(&t2,
TensorImpl::kFlexible, de::TensorShape({5, 1}),
de::DataType(DataType::DE_INT64),
(unsigned char *) (payload + 5));
rc = de::Tensor::CreateTensor(&t2, TensorImpl::kFlexible, de::TensorShape({5, 1}), de::DataType(DataType::DE_INT64),
(unsigned char *)(payload + 5));
EXPECT_TRUE(rc.IsOk());
TensorMap tensor_map;
@ -325,3 +295,31 @@ TEST_F(MindDataTestBatchOp, TestBatchDropTrueRepeat) {
}
EXPECT_EQ(success, true);
}
// Batch 12 rows and pad column "col_1d" from shape {2} out to {4} with fill
// value -1, then verify the padded batch matches the expected payload.
TEST_F(MindDataTestBatchOp, TestSimpleBatchPadding) {
  std::string schema_file = datasets_root_path_ + "/testBatchDataset";
  std::shared_ptr<BatchOp> op;
  std::map<std::string, std::pair<TensorShape, float>> m;
  m.insert({"col_1d", std::make_pair(TensorShape({4}), -1)});
  de::BatchOp::Builder(12).SetDrop(false).SetPaddingMap(m, true).Build(&op);
  auto tree = Build({Storage(schema_file), op});
  tree->Prepare();
  Status rc = tree->Launch();
  if (rc.IsError()) {
    MS_LOG(ERROR) << "Return code error detected during tree launch: " << rc.ToString() << ".";
  } else {
    // Each original row [x, y] becomes [x, y, -1, -1]; 12 rows per batch.
    int64_t payload[] = {-9223372036854775807 - 1, 1, -1, -1, 2, 3, -1, -1, 4, 5, -1, -1, 6, 7, -1, -1,
                         8, 9, -1, -1, 10, 11, -1, -1, 12, 13, -1, -1, 14, 15, -1, -1,
                         16, 17, -1, -1, 18, 19, -1, -1, 20, 21, -1, -1, 22, 23, -1, -1};
    std::shared_ptr<de::Tensor> t;
    rc = de::Tensor::CreateTensor(&t, TensorImpl::kFlexible, de::TensorShape({12, 4}), de::DataType(DataType::DE_INT64),
                                  (unsigned char *)payload);
    EXPECT_TRUE(rc.IsOk());  // consistent with sibling tests: verify tensor creation succeeded
    de::DatasetIterator di(tree);
    TensorMap tensor_map;
    rc = di.GetNextAsMap(&tensor_map);
    EXPECT_TRUE(rc.IsOk());
    EXPECT_TRUE((*t) == (*(tensor_map["col_1d"])));
    // A second fetch should yield an empty map (end of data, drop = false).
    rc = di.GetNextAsMap(&tensor_map);
    EXPECT_TRUE(tensor_map.size() == 0);
    EXPECT_TRUE(rc.IsOk());
  }
}

View File

@ -0,0 +1,213 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import mindspore.dataset as ds
import numpy as np
import time
# This UT test tests the following cases
# 1. padding: input_shape=[x] output_shape=[y] where y > x
# 2. padding in one dimension and truncate in the other. input_shape=[x1,x2] output_shape=[y1,y2] y1>x1 and y2<x2
# 3. automatic padding for a specific column
# 4. default setting for all columns
# 5. test None in different places
# this generator function yield two columns
# col1d: [0],[1], [2], [3]
# col2d: [[100],[200]], [[101],[201]], [[102],[202]], [[103],[203]]
def gen_2cols(num):
    """Yield `num` rows of two columns: a 1-d [i] and a 2x1 [[i+100], [i+200]]."""
    for idx in range(num):
        first = np.array([idx])
        second = np.array([[idx + 100], [idx + 200]])
        yield (first, second)
# this generator function yield one column of variable shapes
# col: [0], [0,1], [0,1,2], [0,1,2,3]
def gen_var_col(num):
    """Yield `num` single-column rows of growing length: [0], [0,1], ..."""
    for idx in range(num):
        yield (np.array(list(range(idx + 1))),)
# this generator function yield two columns of variable shapes
# col1: [0], [0,1], [0,1,2], [0,1,2,3]
# col2: [100], [100,101], [100,101,102], [100,101,102,103]
def gen_var_cols(num):
    """Yield `num` two-column rows of growing length; col2 = col1 + 100."""
    for idx in range(num):
        base = list(range(idx + 1))
        yield (np.array(base), np.array([100 + v for v in base]))
# this generator function yield two columns of variable shapes
# col1: [[0]], [[0,1]], [[0,1,2]], [[0,1,2,3]]
# col2: [[100]], [[100,101]], [[100,101,102]], [[100,101,102,103]]
def gen_var_cols_2d(num):
    """Yield `num` two-column rows shaped (1, i+1); col2 = col1 + 100."""
    for idx in range(num):
        base = list(range(idx + 1))
        yield (np.array([base]), np.array([[100 + v for v in base]]))
def test_batch_padding_01():
    """Pad both columns to fixed shapes with distinct fill values."""
    dataset = ds.GeneratorDataset((lambda: gen_2cols(2)), ["col1d", "col2d"])
    dataset = dataset.batch(batch_size=2, drop_remainder=False,
                            pad_info={"col2d": ([2, 2], -2), "col1d": ([2], -1)})
    dataset = dataset.repeat(2)
    for row in dataset.create_dict_iterator():
        assert np.array_equal([[0, -1], [1, -1]], row["col1d"])
        assert np.array_equal([[[100, -2], [200, -2]], [[101, -2], [201, -2]]], row["col2d"])
def test_batch_padding_02():
    """Pad one dim and truncate the other on col2d; leave col1d untouched."""
    dataset = ds.GeneratorDataset((lambda: gen_2cols(2)), ["col1d", "col2d"])
    dataset = dataset.batch(batch_size=2, drop_remainder=False, pad_info={"col2d": ([1, 2], -2)})
    dataset = dataset.repeat(2)
    for row in dataset.create_dict_iterator():
        assert np.array_equal([[0], [1]], row["col1d"])
        assert np.array_equal([[[100, -2]], [[101, -2]]], row["col2d"])
def test_batch_padding_03():
    """Automatic pad shape (None): pad each batch to its longest row."""
    dataset = ds.GeneratorDataset((lambda: gen_var_col(4)), ["col"])
    dataset = dataset.batch(batch_size=2, drop_remainder=False, pad_info={"col": (None, -1)})  # pad automatically
    dataset = dataset.repeat(2)
    captured = dict()
    for idx, row in enumerate(dataset.create_dict_iterator()):
        captured[idx] = row["col"].copy()
    assert np.array_equal(captured[0], [[0, -1], [0, 1]])
    assert np.array_equal(captured[1], [[0, 1, 2, -1], [0, 1, 2, 3]])
    assert np.array_equal(captured[2], [[0, -1], [0, 1]])
    assert np.array_equal(captured[3], [[0, 1, 2, -1], [0, 1, 2, 3]])
def test_batch_padding_04():
    """Empty pad_info dict: every column is padded automatically with 0."""
    dataset = ds.GeneratorDataset((lambda: gen_var_cols(2)), ["col1", "col2"])
    dataset = dataset.batch(batch_size=2, drop_remainder=False, pad_info={})  # pad automatically
    dataset = dataset.repeat(2)
    for row in dataset.create_dict_iterator():
        assert np.array_equal(row["col1"], [[0, 0], [0, 1]])
        assert np.array_equal(row["col2"], [[100, 0], [100, 101]])
def test_batch_padding_05():
    """None appearing in different places: a None dim and a fully None shape."""
    dataset = ds.GeneratorDataset((lambda: gen_var_cols_2d(3)), ["col1", "col2"])
    dataset = dataset.batch(batch_size=3, drop_remainder=False,
                            pad_info={"col2": ([2, None], -2), "col1": (None, -1)})  # pad automatically
    for row in dataset.create_dict_iterator():
        assert np.array_equal(row["col1"], [[[0, -1, -1]], [[0, 1, -1]], [[0, 1, 2]]])
        assert np.array_equal(row["col2"], [[[100, -2, -2], [-2, -2, -2]], [[100, 101, -2], [-2, -2, -2]],
                                            [[100, 101, 102], [-2, -2, -2]]])
def batch_padding_performance_3d():
    """Manual perf probe: time batch() with 3d pad_info over repeated cifar data.

    Not an automated test (not prefixed test_); run by hand and uncomment the
    print to see timings.
    """
    cifar10_dir = "../data/dataset/testCifar10Data"
    data1 = ds.Cifar10Dataset(cifar10_dir, shuffle=False)  # shape = [32,32,3]
    data1 = data1.repeat(24)
    pad_info = {"image": ([36, 36, 3], 0)}
    # pad_info = None
    data1 = data1.batch(batch_size=24, drop_remainder=True, pad_info=pad_info)
    start_time = time.time()
    num_batches = 0
    # removed an unused `ret = []` local; the loop only counts batches
    for _ in data1.create_dict_iterator():
        num_batches += 1
    res = "total number of batch:" + str(num_batches) + " time elapsed:" + str(time.time() - start_time)
    # print(res)
def batch_padding_performance_1d():
    """Manual perf probe: time batch() with 1d pad_info over flattened cifar data."""
    cifar10_dir = "../data/dataset/testCifar10Data"
    dataset = ds.Cifar10Dataset(cifar10_dir, shuffle=False)  # shape = [32,32,3]
    dataset = dataset.repeat(24)
    dataset = dataset.map(input_columns="image", operations=(lambda x: x.reshape(-1)))
    pad_info = {"image": ([3888], 0)}  # 3888 =36*36*3
    # pad_info = None
    dataset = dataset.batch(batch_size=24, drop_remainder=True, pad_info=pad_info)
    start_time = time.time()
    num_batches = 0
    for _ in dataset.create_dict_iterator():
        num_batches += 1
    res = "total number of batch:" + str(num_batches) + " time elapsed:" + str(time.time() - start_time)
    # print(res)
def batch_pyfunc_padding_3d():
    """Manual perf probe: pad 3d images via a map() pyfunc instead of pad_info."""
    cifar10_dir = "../data/dataset/testCifar10Data"
    dataset = ds.Cifar10Dataset(cifar10_dir, shuffle=False)  # shape = [32,32,3]
    dataset = dataset.repeat(24)
    # pad_info = {"image": ([36, 36, 3], 0)}
    dataset = dataset.map(input_columns="image", operations=(lambda x: np.pad(x, ((0, 4), (0, 4), (0, 0)))),
                          python_multiprocessing=False)
    dataset = dataset.batch(batch_size=24, drop_remainder=True)
    start_time = time.time()
    num_batches = 0
    for _ in dataset.create_dict_iterator():
        num_batches += 1
    res = "total number of batch:" + str(num_batches) + " time elapsed:" + str(time.time() - start_time)
    # print(res)
def batch_pyfunc_padding_1d():
    """Manual perf probe: flatten then pad via a map() pyfunc instead of pad_info."""
    cifar10_dir = "../data/dataset/testCifar10Data"
    dataset = ds.Cifar10Dataset(cifar10_dir, shuffle=False)  # shape = [32,32,3]
    dataset = dataset.repeat(24)
    dataset = dataset.map(input_columns="image", operations=(lambda x: x.reshape(-1)))
    dataset = dataset.map(input_columns="image", operations=(lambda x: np.pad(x, (0, 816))), python_multiprocessing=False)
    dataset = dataset.batch(batch_size=24, drop_remainder=True)
    start_time = time.time()
    num_batches = 0
    for _ in dataset.create_dict_iterator():
        num_batches += 1
    res = "total number of batch:" + str(num_batches) + " time elapsed:" + str(time.time() - start_time)
    # print(res)
# this function runs pad_batch and numpy.pad, then compares the results
def test_pad_via_map():
    """Verify batch() with pad_info produces the same output as map()+np.pad.

    Both pipelines flatten each cifar image to 1d; one pads explicitly with
    np.pad in a map op, the other lets batch() pad via pad_info. The resulting
    batches must be identical.
    """
    cifar10_dir = "../data/dataset/testCifar10Data"

    def pad_map_config():
        # pad via an explicit np.pad pyfunc before batching
        data1 = ds.Cifar10Dataset(cifar10_dir, shuffle=False, num_samples=1000)  # shape = [32,32,3]
        data1 = data1.map(input_columns="image", operations=(lambda x: x.reshape(-1)))  # reshape to 1d
        data1 = data1.map(input_columns="image", operations=(lambda x: np.pad(x, (0, 816))))
        data1 = data1.batch(batch_size=25, drop_remainder=True)
        res = []
        for data in data1.create_dict_iterator():
            res.append(data["image"])
        return res

    def pad_batch_config():
        # pad inside batch() via pad_info
        data2 = ds.Cifar10Dataset(cifar10_dir, shuffle=False, num_samples=1000)  # shape = [32,32,3]
        data2 = data2.map(input_columns="image", operations=(lambda x: x.reshape(-1)))  # reshape to 1d
        data2 = data2.batch(batch_size=25, drop_remainder=True, pad_info={"image": ([3888], 0)})
        res = []
        for data in data2.create_dict_iterator():
            res.append(data["image"])
        return res

    res_from_map = pad_map_config()
    res_from_batch = pad_batch_config()
    # bug fix: the original compared res_from_batch's length against itself,
    # which is always true; compare the two pipelines instead
    assert len(res_from_map) == len(res_from_batch)
    for batch_map, batch_pad in zip(res_from_map, res_from_batch):
        assert np.array_equal(batch_map, batch_pad)
# Manual entry point: runs the correctness tests; the perf probes stay
# commented out because they are timing experiments, not assertions.
if __name__ == '__main__':
    test_batch_padding_01()
    test_batch_padding_02()
    test_batch_padding_03()
    test_batch_padding_04()
    test_batch_padding_05()
    # batch_padding_performance_3d()
    # batch_padding_performance_1d()
    # batch_pyfunc_padding_3d()
    # batch_pyfunc_padding_1d()
    test_pad_via_map()