!66269 [MD] Optimize performance of TFRecordDataset

Merge pull request !66269 from xiaotianci/feature-2.3-tfrecord-opt
This commit is contained in:
i-robot 2024-03-11 04:05:31 +00:00 committed by Gitee
commit b301d5394e
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
85 changed files with 2295 additions and 418 deletions

View File

@ -1,5 +1,5 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
* Copyright 2021-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,9 +28,21 @@
namespace mindspore {
using VecChar = std::vector<char>;
inline std::vector<char> StringToChar(const std::string &s) { return std::vector<char>(s.begin(), s.end()); }
inline std::vector<char> StringToChar(const std::string &s) {
if (s.empty()) {
const auto empty = std::vector<char>();
return empty;
}
return std::vector<char>(s.begin(), s.end());
}
inline std::string CharToString(const std::vector<char> &c) { return std::string(c.begin(), c.end()); }
inline std::string CharToString(const std::vector<char> &c) {
if (c.empty()) {
const auto empty = "";
return empty;
}
return std::string(c.begin(), c.end());
}
inline std::pair<std::vector<char>, int32_t> PairStringToChar(const std::pair<std::string, int32_t> &s) {
return std::pair<std::vector<char>, int32_t>(std::vector<char>(s.first.begin(), s.first.end()), s.second);

View File

@ -26,8 +26,7 @@ CVTensor::CVTensor(std::shared_ptr<Tensor> tensor) : Tensor(std::move(*tensor))
Status CVTensor::CreateEmpty(const TensorShape &shape, DataType type, CVTensorPtr *out) {
RETURN_UNEXPECTED_IF_NULL(out);
const CVTensorAlloc *alloc = GlobalContext::Instance()->cv_tensor_allocator();
*out = std::allocate_shared<CVTensor>(*alloc, shape, type);
*out = std::make_shared<CVTensor>(shape, type);
RETURN_UNEXPECTED_IF_NULL(*out);
int64_t byte_size = (*out)->SizeInBytes();
// Don't allocate if we have a tensor with no elements.
@ -100,8 +99,7 @@ std::shared_ptr<CVTensor> CVTensor::AsCVTensor(std::shared_ptr<Tensor> t) {
if (cv_t != nullptr) {
return cv_t;
} else {
const CVTensorAlloc *alloc = GlobalContext::Instance()->cv_tensor_allocator();
return std::allocate_shared<CVTensor>(*alloc, t);
return std::make_shared<CVTensor>(t);
}
}

View File

@ -22,7 +22,6 @@
namespace mindspore {
namespace dataset {
uint8_t DataType::SizeInBytes() const {
if (type_ < DataType::NUM_OF_TYPES) {
return kTypeInfo[type_].sizeInBytes_;

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,8 @@
#endif
#include <string>
#include <utility>
#ifdef ENABLE_MINDDATA_PYTHON
#include "pybind11/numpy.h"
#include "pybind11/pybind11.h"
@ -31,9 +33,9 @@ namespace py = pybind11;
#include "base/float16.h"
#endif
#include "minddata/dataset/include/dataset/constants.h"
namespace mindspore {
namespace dataset {
// Class that represents basic data types in DataEngine.
class DataType {
public:
@ -140,8 +142,8 @@ class DataType {
~DataType() = default;
// Create a type from a given enum
/// \param d
constexpr explicit DataType(Type d) : type_(d) {}
/// \param type
constexpr explicit DataType(const Type &type) : type_(std::move(type)) {}
constexpr bool operator==(const DataType a) const { return type_ == a.type_; }

View File

@ -25,9 +25,6 @@ const int kYuvDefaultChannels = 4;
DeviceTensor::DeviceTensor(const TensorShape &shape, const DataType &type)
: Tensor(shape, type), device_data_(nullptr), size_(0) {
// grab the mem pool from global context and create the allocator for char data area
std::shared_ptr<MemoryPool> global_pool = GlobalContext::Instance()->mem_pool();
data_allocator_ = std::make_unique<Allocator<unsigned char>>(global_pool);
device_data_type_ = type;
host_data_tensor_ = nullptr;
}
@ -36,8 +33,7 @@ Status DeviceTensor::CreateEmpty(const TensorShape &shape, const DataType &type,
CHECK_FAIL_RETURN_UNEXPECTED(shape.known(), "Invalid shape.");
CHECK_FAIL_RETURN_UNEXPECTED(type != DataType::DE_UNKNOWN, "Invalid data type.");
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Invalid nullptr pointer.");
const DeviceTensorAlloc *alloc = GlobalContext::Instance()->device_tensor_allocator();
*out = std::allocate_shared<DeviceTensor>(*alloc, shape, type);
*out = std::make_shared<DeviceTensor>(shape, type);
// if it's a string tensor and it has no elements, Just initialize the shape and type.
if (!type.IsNumeric() && shape.NumOfElements() == 0) {
return Status::OK();
@ -63,8 +59,7 @@ Status DeviceTensor::CreateFromDeviceMemory(const TensorShape &shape, const Data
CHECK_FAIL_RETURN_UNEXPECTED(dataSize > 0, "Invalid data size");
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Out pointer is NULL");
const DeviceTensorAlloc *alloc = GlobalContext::Instance()->device_tensor_allocator();
*out = std::allocate_shared<DeviceTensor>(*alloc, shape, type);
*out = std::make_shared<DeviceTensor>(shape, type);
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Allocate memory failed.");
// if it's a string tensor and it has no elements, Just initialize the shape and type.

View File

@ -84,7 +84,7 @@ class GlobalContext {
#endif
// Getter method
// @return the mem pool
std::shared_ptr<MemoryPool> mem_pool() const { return mem_pool_; }
const std::shared_ptr<MemoryPool> &mem_pool() const { return mem_pool_; }
// Getter method
// @return the tensor allocator as raw pointer

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -60,22 +60,14 @@ namespace dataset {
break; \
}
Tensor::Tensor(const TensorShape &shape, const DataType &type) : shape_(shape), type_(type), data_(nullptr) {
// grab the mem pool from global context and create the allocator for char data area
std::shared_ptr<MemoryPool> global_pool = GlobalContext::Instance()->mem_pool();
data_allocator_ = std::make_unique<Allocator<unsigned char>>(global_pool);
}
Tensor::Tensor(TensorShape shape, DataType type) : shape_(std::move(shape)), type_(type), data_(nullptr) {}
Tensor::Tensor(Tensor &&other) noexcept
: shape_(other.shape()),
type_(other.type()),
data_(other.GetMutableBuffer()),
data_end_(other.data_end_),
data_allocator_(std::move(other.data_allocator_)) {
: shape_(std::move(other.shape_)), type_(other.type_), data_(other.data_), data_end_(other.data_end_) {
#ifdef ENABLE_PYTHON
if (type_.value() == DataType::DE_PYTHON) {
py::gil_scoped_acquire gil_acquire;
python_dict_ = (other.python_dict_);
python_dict_ = std::move(other.python_dict_);
}
// If other.python_array_ has value, assign it to this->python_array_
if (static_cast<bool>(other.python_array_)) {
@ -88,16 +80,15 @@ Tensor::Tensor(Tensor &&other) noexcept
Tensor &Tensor::operator=(Tensor &&other) noexcept {
if (&other != this) {
shape_ = other.shape();
type_ = other.type();
data_ = other.GetMutableBuffer();
shape_ = std::move(other.shape_);
type_ = other.type_;
data_ = other.data_;
data_end_ = other.data_end_;
data_allocator_ = std::move(other.data_allocator_);
yuv_shape_ = other.yuv_shape_;
yuv_shape_ = std::move(other.yuv_shape_);
#ifdef ENABLE_PYTHON
if (type_.value() == DataType::DE_PYTHON) {
py::gil_scoped_acquire gil_acquire;
python_dict_ = (other.python_dict_);
python_dict_ = std::move(other.python_dict_);
}
// If other.python_array_ has value, assign it to this->python_array_
if (static_cast<bool>(other.python_array_)) {
@ -111,11 +102,10 @@ Tensor &Tensor::operator=(Tensor &&other) noexcept {
}
Status Tensor::CreateEmpty(const TensorShape &shape, const DataType &type, TensorPtr *out) {
RETURN_UNEXPECTED_IF_NULL(out);
CHECK_FAIL_RETURN_UNEXPECTED(shape.known(), "Failed to create empty tensor, tensor shape is unknown.");
CHECK_FAIL_RETURN_UNEXPECTED(type != DataType::DE_UNKNOWN, "Failed to create empty tensor, data type is unknown.");
RETURN_UNEXPECTED_IF_NULL(out);
const TensorAlloc *alloc = GlobalContext::Instance()->tensor_allocator();
*out = std::allocate_shared<Tensor>(*alloc, shape, type);
*out = std::make_shared<Tensor>(shape, type);
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Failed to create empty tensor, allocate memory failed.");
// if it's a string tensor and it has no elements, Just initialize the shape and type.
if (!type.IsNumeric()) {
@ -164,8 +154,7 @@ Status Tensor::CreateFromMemory(const TensorShape &shape, const DataType &type,
Status Tensor::CreateFromMemory(const TensorShape &shape, const DataType &type, const uchar *src, const dsize_t &length,
TensorPtr *out) {
RETURN_UNEXPECTED_IF_NULL(out);
const TensorAlloc *alloc = GlobalContext::Instance()->tensor_allocator();
*out = std::allocate_shared<Tensor>(*alloc, shape, type);
*out = std::make_shared<Tensor>(shape, type);
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Allocate memory failed.");
if (type.IsNumeric()) {
dsize_t calculated_length = (*out)->SizeInBytes();
@ -273,8 +262,7 @@ Status Tensor::CreateFromPythonObject(py::object obj, std::shared_ptr<Tensor> *o
RETURN_UNEXPECTED_IF_NULL(out);
std::vector<dsize_t> shape{};
DataType type = DataType(DataType::DE_PYTHON);
const TensorAlloc *alloc = GlobalContext::Instance()->tensor_allocator();
*out = std::allocate_shared<Tensor>(*alloc, TensorShape({0}), type);
*out = std::make_shared<Tensor>(TensorShape({0}), type);
{
py::gil_scoped_acquire gil_acquire;
(*out)->python_dict_ = obj;
@ -288,8 +276,7 @@ Status Tensor::CreateFromPythonObject(py::object obj, std::shared_ptr<Tensor> *o
#ifndef ENABLE_ANDROID
Status Tensor::CreateFromByteList(const dataengine::BytesList &bytes_list, const TensorShape &shape, TensorPtr *out) {
RETURN_UNEXPECTED_IF_NULL(out);
const TensorAlloc *alloc = GlobalContext::Instance()->tensor_allocator();
*out = std::allocate_shared<Tensor>(*alloc, TensorShape({static_cast<dsize_t>(bytes_list.value_size())}),
*out = std::make_shared<Tensor>(TensorShape({static_cast<dsize_t>(bytes_list.value_size())}),
DataType(DataType::DE_STRING));
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Allocate memory failed.");
// total bytes needed = offset array + strings
@ -297,7 +284,7 @@ Status Tensor::CreateFromByteList(const dataengine::BytesList &bytes_list, const
// strings will be null-terminated --> need 1 extra byte per element
dsize_t num_bytes = (kOffsetSize) * (*out)->shape_.NumOfElements() + kOffsetSize + bytes_list.ByteSizeLong();
(*out)->data_ = (*out)->data_allocator_->allocate(num_bytes);
(*out)->data_ = GetAllocator()->allocate(num_bytes);
auto offset_arr = reinterpret_cast<offset_t *>((*out)->data_);
uchar *buf = (*out)->GetStringsBuffer();
@ -437,8 +424,8 @@ Tensor::~Tensor() {
if (!static_cast<bool>(python_array_)) { // the data is not np.ndarray from python layer
#endif
if (data_ != nullptr) {
if (data_allocator_ != nullptr) {
data_allocator_->deallocate(data_);
if (GetAllocator() != nullptr) {
GetAllocator()->deallocate(data_);
data_ = nullptr;
data_end_ = nullptr;
} else {
@ -593,9 +580,9 @@ void Tensor::PrintData(std::ostream &out) const {
}
Status Tensor::AllocateBuffer(const dsize_t &length) {
RETURN_UNEXPECTED_IF_NULL(data_allocator_);
RETURN_UNEXPECTED_IF_NULL(GetAllocator());
if (data_ == nullptr) {
data_ = data_allocator_->allocate(length);
data_ = GetAllocator()->allocate(length);
CHECK_FAIL_RETURN_UNEXPECTED(data_ != nullptr, "Failed to allocate memory for tensor.");
data_end_ = data_ + length;
}
@ -617,7 +604,6 @@ void Tensor::Invalidate() {
type_ = DataType(DataType::DE_UNKNOWN);
data_ = nullptr;
data_end_ = nullptr;
data_allocator_ = nullptr;
#ifdef ENABLE_PYTHON
if (type_.value() == DataType::DE_PYTHON) {
py::gil_scoped_acquire gil_acquire;

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,9 +17,9 @@
#define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_TENSOR_H_
#include <algorithm>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#if defined(_WIN32) || defined(_WIN64)
#undef HAVE_STDDEF_H
@ -49,14 +49,11 @@
namespace py = pybind11;
#endif
namespace mindspore {
namespace dataset {
namespace mindspore::dataset {
class Tensor;
template <typename T>
class Allocator;
using CharAllocPtr = std::unique_ptr<Allocator<unsigned char>>;
using TensorAllocPtr = std::shared_ptr<Allocator<Tensor>>; // An allocator shared_ptr for Tensors
using offset_t = uint32_t; // type of offset values to store strings locations
using TensorPtr = std::shared_ptr<Tensor>;
@ -74,7 +71,7 @@ class DATASET_API Tensor {
/// \note The constructor does not allocate data
/// \param shape TensorShape
/// \param type DataType
Tensor(const TensorShape &shape, const DataType &type);
Tensor(TensorShape shape, DataType type);
/// Move constructor
/// \param other Tensor to be moved
@ -119,7 +116,8 @@ class DATASET_API Tensor {
}
/// Create a copy of the input tensor
/// \param[in] MSTensor to create DETensorFrom
/// \param[in] in MSTensor to create DETensor from.
/// \param[in] out DETensor created.
/// \return Status
static Status CreateFromMSTensor(const MSTensor &in, TensorPtr *out);
@ -158,7 +156,6 @@ class DATASET_API Tensor {
#endif
/// Create a Tensor from a given list of values.
/// \tparam type of the values to be inserted.
/// \param[in] items elements of the tensor
/// \param[in] shape shape of the output tensor
/// \param[out] out output argument to hold the created Tensor
@ -168,14 +165,13 @@ class DATASET_API Tensor {
CHECK_FAIL_RETURN_UNEXPECTED(
static_cast<dsize_t>(items.size()) == shape.NumOfElements(),
"Number of elements in the vector does not match the number of elements of the shape required");
DataType type = DataType::FromCType<T>();
const DataType type = DataType::FromCType<T>();
// if items is empty, items_ptr would be nullptr. CreateFromMemory will handle this case.
auto items_ptr = reinterpret_cast<const uchar *>(&items[0]);
const auto items_ptr = reinterpret_cast<const uchar *>(&items[0]);
return CreateFromMemory(shape, type, items_ptr, out);
}
/// Create a 1D Tensor from a given list of values.
/// \tparam type of the values to be inserted.
/// \param[in] items elements of the tensor
/// \param[out] out output argument to hold the created Tensor
/// \return Status Code
@ -190,7 +186,7 @@ class DATASET_API Tensor {
/// \param[out] out output argument to hold the created Tensor
/// \return Status Code
static Status CreateFromVector(const std::vector<bool> &items, const TensorShape &shape, TensorPtr *out) {
std::vector<uint8_t> temp(items.begin(), items.end());
const std::vector<uint8_t> temp(items.begin(), items.end());
RETURN_IF_NOT_OK(CreateFromVector(temp, shape, out));
(*out)->type_ = DataType(DataType::DE_BOOL);
return Status::OK();
@ -224,8 +220,7 @@ class DATASET_API Tensor {
" does not match the number of elements: " + std::to_string(shape.NumOfElements()) +
" the shape required.");
CHECK_FAIL_RETURN_UNEXPECTED(type.IsString(), "Can not create a numeric Tensor from a string vector.");
const TensorAlloc *alloc = GlobalContext::Instance()->tensor_allocator();
*out = std::allocate_shared<Tensor>(*alloc, TensorShape({static_cast<dsize_t>(items.size())}), type);
*out = std::make_shared<Tensor>(TensorShape({static_cast<dsize_t>(items.size())}), type);
CHECK_FAIL_RETURN_UNEXPECTED(out != nullptr, "Allocate memory failed.");
if (items.empty()) {
if (shape.known()) {
@ -233,16 +228,16 @@ class DATASET_API Tensor {
}
}
auto length_sum = [](size_t sum, const std::string &s) { return s.length() + sum; };
dsize_t total_length = std::accumulate(items.begin(), items.end(), 0, length_sum);
const dsize_t total_length = std::accumulate(items.begin(), items.end(), 0, length_sum);
// total bytes needed = offset array + strings
// offset array needs to store one offset var per element + 1 extra to get the length of the last string.
// strings will be null-terminated --> need 1 extra byte per element
size_t num_bytes = (kOffsetSize + 1) * (*out)->shape_.NumOfElements() + kOffsetSize + total_length;
const size_t num_bytes = (kOffsetSize + 1) * (*out)->shape_.NumOfElements() + kOffsetSize + total_length;
RETURN_IF_NOT_OK((*out)->AllocateBuffer(num_bytes));
auto offset_arr = reinterpret_cast<offset_t *>((*out)->data_);
uchar *buf = (*out)->GetStringsBuffer();
const uchar *buf = (*out)->GetStringsBuffer();
offset_t offset = buf - (*out)->data_; // the first string will start here
uint32_t i = 0;
@ -250,7 +245,8 @@ class DATASET_API Tensor {
// insert the start index of the string.
offset_arr[i++] = offset;
// insert actual string
int ret_code = memcpy_s((*out)->data_ + offset, num_bytes - offset, common::SafeCStr(str), str.length() + 1);
const int ret_code =
memcpy_s((*out)->data_ + offset, num_bytes - offset, common::SafeCStr(str), str.length() + 1);
if (ret_code != 0) {
MS_LOG(ERROR) << "Cannot copy string into Tensor";
}
@ -281,8 +277,8 @@ class DATASET_API Tensor {
/// \return Status code
template <typename T>
static Status CreateScalar(const T &item, TensorPtr *out) {
DataType type = DataType::FromCType<T>();
auto item_ptr = reinterpret_cast<const uchar *>(&item);
const DataType type = DataType::FromCType<T>();
const auto item_ptr = reinterpret_cast<const uchar *>(&item);
return CreateFromMemory(TensorShape::CreateScalar(), type, item_ptr, out);
}
@ -338,7 +334,6 @@ class DATASET_API Tensor {
Status GetFloatAt(T *o, const std::vector<dsize_t> &index) const;
/// set item at location specified by index
/// \tparam `T`
/// \param[in] index
/// \param[in] value of type `T`
template <typename T>
@ -360,7 +355,7 @@ class DATASET_API Tensor {
if (value.length() != length) {
RETURN_STATUS_UNEXPECTED("Length of the new string does not match the item.");
}
int ret_code = memcpy_s(reinterpret_cast<char *>(ptr), length, value.c_str(), length);
const int ret_code = memcpy_s(reinterpret_cast<char *>(ptr), length, value.c_str(), length);
CHECK_FAIL_RETURN_UNEXPECTED(ret_code == 0, "Failed to set data into tensor.");
return Status::OK();
@ -381,7 +376,7 @@ class DATASET_API Tensor {
template <typename T>
Status Fill(const T &value) {
CHECK_FAIL_RETURN_UNEXPECTED(!type_.IsString(), "Can not fill on tensor of type string or bytes.");
int64_t cellSize = type_.SizeInBytes();
const int64_t cellSize = type_.SizeInBytes();
if ((data_ != nullptr) && type_.IsCompatible<T>()) {
for (dsize_t i = 0; i < Size(); i++) {
CHECK_FAIL_RETURN_UNEXPECTED(memcpy_s((data_ + i * cellSize), cellSize, &value, cellSize) == 0, "memcpy err");
@ -391,7 +386,7 @@ class DATASET_API Tensor {
std::string err;
err += (data_ == nullptr) ? "data_ is nullptr \t" : "";
err += type_.IsCompatible<T>() ? "data type not compatible\t" : "";
return Status(StatusCode::kMDUnexpectedError, err);
return {StatusCode::kMDUnexpectedError, err};
}
}
@ -429,7 +424,7 @@ class DATASET_API Tensor {
}
/// Get the exact length of string / bytes
Status GetStringLength(uint32_t *length) {
Status GetStringLength(uint32_t *length) const {
CHECK_FAIL_RETURN_UNEXPECTED(type().IsString(), "Only support to get the length of string or bytes Tensor.");
*length = data_end_ - data_ - (Size() + 1) * kOffsetSize - Size();
return Status::OK();
@ -447,12 +442,12 @@ class DATASET_API Tensor {
/// \return
DataType type() const { return type_; }
/// Provide stream operator for displaying it
/// \param output stream
/// \param so the Tensor object to be printed
/// \return output stream
friend std::ostream &operator<<(std::ostream &out, const Tensor &so) {
so.Print(out);
/// Provide stream operator for displaying the Tensor.
/// \param out Output stream.
/// \param tensor Tensor object to be printed.
/// \return Output stream.
friend std::ostream &operator<<(std::ostream &out, const Tensor &tensor) {
tensor.Print(out);
return out;
}
@ -473,10 +468,10 @@ class DATASET_API Tensor {
/// Find the address of the given index. Used in InsertTensor.
/// Example:
/// Tensor t= [[1,2],[3,4]] , StartAddrOfIndex({0}) -> &1
/// \param index incomplete index
/// \param output: startAddrofIndex
/// \param output: remaining
/// \return Status code
/// \param[in] ind Element index.
/// \param[out] start_addr_of_index Starting address of the element index.
/// \param[out] remaining Remaining shape from the index.
/// \return Status code.
Status StartAddrOfIndex(std::vector<dsize_t> ind, uchar **start_addr_of_index, TensorShape *remaining);
/// Expand the shape of the Tensor with one extra dimension.
@ -497,24 +492,24 @@ class DATASET_API Tensor {
/// \return vector of integers
std::vector<dsize_t> Strides() const;
std::string ToString() {
std::string ToString() const {
std::stringstream ss;
this->Print(ss);
return ss.str();
}
/// Handle negative indices.
/// \param[out] out modified index
/// \param[in] index
/// \param[in] length axis length used to modify index
/// \return dsize_t modified index
/// \param[in] index Index to be handled.
/// \param[in] length Axis length of this index.
/// \return Handled index.
static inline dsize_t HandleNeg(dsize_t index, dsize_t length) { return (index < 0) ? (index + length) : index; }
/// Handle negative indices for a vector of indices.
/// \param[out] out modified vector of indices
/// \param[in] index_vector vector of indices
/// \return std::vector<dsize_t> modified vector of indices
static inline std::vector<dsize_t> HandleNegIndices(std::vector<dsize_t> index_vector, std::vector<dsize_t> length) {
/// Handle negative indices.
/// \param[in] index_vector Vector of indices.
/// \param[in] length Length of each axis.
/// \return Modified vector of indices.
static inline std::vector<dsize_t> HandleNegIndices(const std::vector<dsize_t> &index_vector,
const std::vector<dsize_t> &length) {
if (length.size() < index_vector.size()) {
MS_LOG(ERROR) << "The size of length should be greater than the shape of index_vector";
return {};
@ -580,7 +575,7 @@ class DATASET_API Tensor {
Status SetYuvShape(const uint32_t &width, const uint32_t &widthStride, const uint32_t &height,
const uint32_t &heightStride) {
std::vector<uint32_t> tmp{width, widthStride, height, heightStride};
const std::vector<uint32_t> tmp{width, widthStride, height, heightStride};
yuv_shape_ = tmp;
return Status::OK();
}
@ -663,18 +658,14 @@ class DATASET_API Tensor {
}
TensorIterator<T> operator+(const ptrdiff_t &inc) {
auto oldPtr = ptr_;
ptr_ += inc;
auto temp(*this);
ptr_ = oldPtr;
temp.ptr_ += inc;
return temp;
}
TensorIterator<T> operator-(const ptrdiff_t &inc) {
auto oldPtr = ptr_;
ptr_ -= inc;
auto temp(*this);
ptr_ = oldPtr;
temp.ptr_ -= inc;
return temp;
}
@ -705,16 +696,18 @@ class DATASET_API Tensor {
~TensorIterator() = default;
bool operator==(const TensorIterator<std::string_view> &rhs) { return data_ == rhs.data_ && index_ == rhs.index_; }
bool operator==(const TensorIterator<std::string_view> &rhs) const {
return data_ == rhs.data_ && index_ == rhs.index_;
}
bool operator!=(const TensorIterator<std::string_view> &rhs) { return !(*this == rhs); }
operator bool() const { return data_ != nullptr; }
std::string_view operator*() const {
auto offset_ = reinterpret_cast<const offset_t *>(data_);
offset_t start = offset_[index_];
offset_t end = offset_[index_ + 1];
const auto offset_ = reinterpret_cast<const offset_t *>(data_);
const offset_t start = offset_[index_];
const offset_t end = offset_[index_ + 1];
return std::string_view{data_ + start, end - start - 1}; // -1 to skip the \0 at the end
}
@ -751,18 +744,14 @@ class DATASET_API Tensor {
}
TensorIterator<std::string_view> operator+(const dsize_t &inc) {
auto oldPtr = index_;
index_ += inc;
auto temp(*this);
index_ = oldPtr;
temp.index_ += inc;
return temp;
}
TensorIterator<std::string_view> operator-(const dsize_t &inc) {
auto oldPtr = index_;
index_ -= inc;
auto temp(*this);
index_ = oldPtr;
temp.index_ -= inc;
return temp;
}
@ -811,12 +800,12 @@ class DATASET_API Tensor {
/// \param[in] cur_index
void PrintRecursive(std::ostream &out, int32_t cur_dim, const std::vector<dsize_t> &cur_index) const;
/// A function that prints info about the tensor
/// \param[out] out output stream
/// Print the info and data of tensor.
/// \param[out] out Output stream.
void Print(std::ostream &out) const;
/// A function that prints info about the tensor
/// \param[out] out output stream
/// Print the data of tensor.
/// \param[out] out Output stream.
void PrintData(std::ostream &out) const;
/// A function that print the value as specified by its index
@ -829,17 +818,18 @@ class DATASET_API Tensor {
/// \param[in] index vector<dsize_t>
/// \return return a pointer to the item specified at index of type `T`
template <typename T>
Status GetItemPtr(T **, const std::vector<dsize_t> &index) const;
Status GetItemPtr(T **ptr, const std::vector<dsize_t> &index) const;
/// Get pointer to string located at `index` and the length of string
/// \param[in] index vector<dsize_t>
/// \return return a pointer to the string specified at index and the length of the string
Status GetItemPtr(uchar **, const std::vector<dsize_t> &index, offset_t *length = nullptr) const;
Status GetItemPtr(uchar **ptr, const std::vector<dsize_t> &index, offset_t *length = nullptr) const;
/// Given a flat index of an item string, return the start and length of the item
/// \param[in] index flat index of the item
/// \param[out] start address of the ths string
/// \param[out] length of the string
/// Given a flat index of an item string, return the start and length of the item.
/// \param[in] index Flat index of the item.
/// \param[out] string_start Starting address of the ths string.
/// \param[out] length Length of the string.
/// \return Status code.
Status GetStringAt(dsize_t index, uchar **string_start, offset_t *length) const;
/// Skip the offsets and returns the start of the buffer where the real strings is stored. Caller needs to check if
@ -847,14 +837,17 @@ class DATASET_API Tensor {
/// \return return the address of the first string of the tensor.
uchar *GetStringsBuffer() const { return data_ + kOffsetSize * shape_.NumOfElements() + kOffsetSize; }
static const std::unique_ptr<Allocator<unsigned char>> &GetAllocator() {
static auto allocator = std::make_unique<Allocator<unsigned char>>(GlobalContext::Instance()->mem_pool());
return allocator;
}
/// all access to shape_ should be via shape
TensorShape shape_;
/// data type of tensor
DataType type_;
/// pointer to the start of the physical data
unsigned char *data_;
/// An allocator for data_
CharAllocPtr data_allocator_;
/// pointer to the end of the physical data
unsigned char *data_end_ = nullptr;
@ -911,6 +904,5 @@ inline Status Tensor::CreateScalar<std::string>(const std::string &item, TensorP
RETURN_UNEXPECTED_IF_NULL(out);
return CreateFromVector({item}, TensorShape::CreateScalar(), DataType(DataType::DE_STRING), out);
}
} // namespace dataset
} // namespace mindspore
} // namespace mindspore::dataset
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_TENSOR_H_

View File

@ -61,25 +61,36 @@ void TensorShape::Print(std::ostream &out) const {
}
}
TensorShape::TensorShape(const std::initializer_list<dsize_t> &list)
: raw_shape_(*GlobalContext::Instance()->int_allocator()), strides_(*GlobalContext::Instance()->int_allocator()) {
AddListToShape(list);
}
TensorShape::TensorShape(const std::initializer_list<dsize_t> &list) { AddListToShape(list); }
TensorShape::TensorShape(const std::vector<dsize_t> &list)
: raw_shape_(*GlobalContext::Instance()->int_allocator()), strides_(*GlobalContext::Instance()->int_allocator()) {
AddListToShape(list);
}
TensorShape::TensorShape(const std::vector<dsize_t> &list) { AddListToShape(list); }
TensorShape::TensorShape(const TensorShape &shape)
: raw_shape_(*GlobalContext::Instance()->int_allocator()), strides_(*GlobalContext::Instance()->int_allocator()) {
AddListToShape(shape.AsVector());
known_ = shape.known_; // override with the input shape in case of unknown-rank tensor shape.
: raw_shape_(shape.raw_shape_), strides_(shape.strides_), known_(shape.known_) {}
TensorShape::TensorShape(TensorShape &&shape) noexcept
: raw_shape_(std::move(shape.raw_shape_)), strides_(std::move(shape.strides_)), known_(shape.known_) {}
TensorShape &TensorShape::operator=(const TensorShape &shape) {
if (this != &shape) {
raw_shape_ = shape.raw_shape_;
strides_ = shape.strides_;
known_ = shape.known_;
}
return *this;
}
TensorShape &TensorShape::operator=(TensorShape &&shape) noexcept {
if (this != &shape) {
raw_shape_ = std::move(shape.raw_shape_);
strides_ = std::move(shape.strides_);
known_ = shape.known_;
}
return *this;
}
#ifdef ENABLE_PYTHON
TensorShape::TensorShape(py::list l)
: raw_shape_(*GlobalContext::Instance()->int_allocator()), strides_(*GlobalContext::Instance()->int_allocator()) {
TensorShape::TensorShape(py::list l) {
std::vector<dsize_t> list_c;
for (auto &i : l) {
if (!i.is_none()) {
@ -93,10 +104,7 @@ TensorShape::TensorShape(py::list l)
#endif
#ifndef ENABLE_ANDROID
TensorShape::TensorShape(cv::MatSize cv_size, uint32_t type)
: raw_shape_(*GlobalContext::Instance()->int_allocator()),
strides_(*GlobalContext::Instance()->int_allocator()),
known_(true) {
TensorShape::TensorShape(cv::MatSize cv_size, uint32_t type) : known_(true) {
for (int i = 0; i < cv_size.dims(); i++) {
raw_shape_.push_back(cv_size[i]);
}

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#ifndef ENABLE_ANDROID
@ -59,21 +60,33 @@ class DATASET_API TensorShape {
/// \brief Create a Shape from an initialization list (e.g., TensorShape s = {2,2}).
/// If one of the dims is set to DIM_UNKNOWN, the shape will flagged as unKnown
/// \param[in] list
explicit TensorShape(const std::initializer_list<dsize_t> &list);
/// \param[in] list Length list of each axis.
TensorShape(const std::initializer_list<dsize_t> &list);
/// \brief Create a Shape from a vector (e.g., TensorShape s = std::vector<dsize_t>({2,2}) ).
/// If one of the dims is set to DIM_UNKNOWN, the shape will flagged as unKnown
/// \param[in] list
explicit TensorShape(const std::vector<dsize_t> &list);
/// \brief Copy constructor
/// \param[in] shape
/// \brief Copy constructor.
/// \param[in] shape TensorShape to copy from.
TensorShape(const TensorShape &shape);
/// \brief Move constructor.
/// \param[in] shape TensorShape to copy from.
TensorShape(TensorShape &&shape) noexcept;
/// \brief Copy assignment.
/// \param[in] shape TensorShape to move from.
TensorShape &operator=(const TensorShape &shape);
/// \brief Move assignment.
/// \param[in] shape TensorShape to move from.
TensorShape &operator=(TensorShape &&shape) noexcept;
#ifdef ENABLE_PYTHON
/// \brief construct a TensorShape via a python list
/// \param[in] py::list l - a list object from python
/// \brief Construct a TensorShape via a python list.
/// \param[in] l A py::list of the shape.
explicit TensorShape(py::list l);
#endif
@ -81,7 +94,10 @@ class DATASET_API TensorShape {
/// \brief Create a scalar Shape (i.e., empty shape with mKnown = true)
/// \return TensorShape
static TensorShape CreateScalar() { return TensorShape({}); }
static TensorShape CreateScalar() {
static std::vector<dsize_t> empty_shape{};
return TensorShape(empty_shape);
}
/// \brief Create a shape with an unknown rank.
/// \return TensorShape
@ -182,12 +198,12 @@ class DATASET_API TensorShape {
Status ToFlatIndex(const std::vector<dsize_t> &index, dsize_t *flat_index) const;
private:
// Vector to keep the dims of the shape.
std::vector<dsize_t> raw_shape_;
// Vector to keep the strides of the shape. The size is rank+1
std::vector<dsize_t> strides_;
// True if known and valid shape, false otherwise
bool known_;
// Vector to keep the dims of the shape.
std::vector<dsize_t, IntAlloc> raw_shape_;
// Vector to keep the strides of the shape. The size is rank+1
std::vector<dsize_t, IntAlloc> strides_;
/// \brief Internal utility function to iterate over a list,
/// check if the dim is valid and then insert it into the shape.

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -475,5 +475,17 @@ Status DataSchema::GetColumnNameMap(std::unordered_map<std::string, int32_t> *ou
return Status::OK();
}
Status DataSchema::GetColumnName(std::vector<std::string> *column_names) const {
RETURN_UNEXPECTED_IF_NULL(column_names);
column_names->clear();
for (const auto &col_desc : col_descs_) {
if (col_desc.Name().empty()) {
RETURN_STATUS_UNEXPECTED("Found empty column name in schema.");
}
column_names->emplace_back(col_desc.Name());
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2021 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -172,6 +172,11 @@ class DataSchema {
/// \return Status The status code returned
Status GetColumnNameMap(std::unordered_map<std::string, int32_t> *out_column_name_map);
/// \brief Get the column name list of the schema.
/// \param[out] column_names The column names in the schema.
/// \return The status code.
Status GetColumnName(std::vector<std::string> *column_names) const;
private:
/// \brief Internal helper function. Parses the json schema file in any order and produces a schema that
/// does not follow any particular order (json standard does not enforce any ordering protocol).

View File

@ -87,7 +87,7 @@ Status BatchOp::operator()() {
total_step++;
RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
}
(void)table->emplace_back(new_row);
(void)table->emplace_back(std::move(new_row));
// if # of rows is enough to make 1 batch, send it to worker_queue
if (table->size() == static_cast<size_t>(cur_batch_size)) {
RETURN_IF_NOT_OK(worker_in_queues_[NextWorkerID()]->EmplaceBack(
@ -165,7 +165,7 @@ Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *tensor_row_dequeu
for (size_t i = 0; i < num_columns; i++) {
std::shared_ptr<Tensor> batched_tensor;
RETURN_IF_NOT_OK(ConvertRowsToTensor(tensor_row_dequeue, &batched_tensor, batch_size, i, contains_per_batch_map));
batched_tensor_row->emplace_back(batched_tensor);
batched_tensor_row->emplace_back(std::move(batched_tensor));
}
return Status::OK();
@ -198,7 +198,7 @@ Status BatchOp::ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *tensor_
if (first_type.IsNumeric()) { // numeric tensor
RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
for (auto row_index = 0; row_index < batch_size; ++row_index) {
std::shared_ptr<Tensor> old_tensor = (**tensor_row_dequeue)[row_index][column_index];
const std::shared_ptr<Tensor> &old_tensor = (**tensor_row_dequeue)[row_index][column_index];
// check the newly popped rows have the same dim and type as the first
if (old_tensor->shape() == first_shape && old_tensor->type() == first_type) {
if (new_shape.NumOfElements() != 0) {
@ -280,6 +280,7 @@ Status BatchOp::ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *tensor_
#endif
} else { // handle string column differently
std::vector<std::string> strings;
strings.reserve(batch_size);
for (dsize_t row_index = 0; row_index < batch_size; ++row_index) {
std::shared_ptr<Tensor> old_tensor = (**tensor_row_dequeue)[row_index][column_index];
for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {

View File

@ -700,7 +700,7 @@ Status DataQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, i
DATA_INFO data_info;
(void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(std::move(data_info)));
}
return Status::OK();
}

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2022 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,8 +26,6 @@
#include "proto/example.pb.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
#include "minddata/dataset/engine/execution_tree.h"
@ -44,13 +42,14 @@ TFReaderOp::TFReaderOp(int32_t num_workers, int32_t worker_connector_size, int64
std::vector<std::string> dataset_files_list, std::unique_ptr<DataSchema> data_schema,
int32_t op_connector_size, std::vector<std::string> columns_to_load, bool shuffle_files,
int32_t num_devices, int32_t device_id, bool equal_rows_per_shard,
const CompressionType &compression_type)
const CompressionType &compression_type, bool decode)
: NonMappableLeafOp(num_workers, worker_connector_size, total_num_rows, op_connector_size, shuffle_files,
num_devices, device_id, compression_type),
dataset_files_list_(std::move(dataset_files_list)),
columns_to_load_(std::move(columns_to_load)),
data_schema_(std::move(data_schema)),
equal_rows_per_shard_(equal_rows_per_shard) {}
equal_rows_per_shard_(equal_rows_per_shard),
decode_(decode) {}
// A print method typically used for debugging
void TFReaderOp::Print(std::ostream &out, bool show_all) const {
@ -121,9 +120,12 @@ Status TFReaderOp::RegisterAndLaunchThreads() {
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&TFReaderOp::WorkerEntry, this, std::placeholders::_1),
&worker_tasks_, Name() + "::WorkerEntry", id()));
// if decode is true, launch some workers to parse the protobuf
if (decode_) {
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
std::bind(&TFReaderOp::ParsingWorkerEntry, this, std::placeholders::_1),
Name() + "::ParsingWorkerEntry", id()));
}
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TFReaderOp::Collector, this), Name() + "::Collector", id()));
return Status::OK();
@ -138,25 +140,34 @@ Status TFReaderOp::operator()() {
std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
load_io_block_queue_ = true;
}
while (workers_done < num_workers_) {
TensorRow fetched_row;
while (workers_done < num_workers_) {
RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
if (fetched_row.eoe()) {
workers_done++;
} else if ((compression_type_ == CompressionType::NONE || compression_type_ == CompressionType::GZIP_WITH_COUNT ||
compression_type_ == CompressionType::ZLIB_WITH_COUNT) &&
(total_rows_ == 0 || rows_read < total_rows_)) {
if (decode_) {
// get record bytes from jagged_rows_connector and send them to workers for parsing
auto parse_worker_id = NextWorkerID();
const auto parse_worker_id = NextWorkerID();
RETURN_IF_NOT_OK(worker_in_queues_[parse_worker_id]->EmplaceBack(std::move(fetched_row)));
} else {
// get record bytes from jagged_rows_connector and send them to out_connector
RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
}
rows_read++;
} else if ((compression_type_ == CompressionType::GZIP || compression_type_ == CompressionType::ZLIB) &&
(rows_read < total_rows_ * num_devices_)) {
// for compressed version, total_rows_ is total rows that will be read per shard
if (decode_) {
// get record bytes from jagged_rows_connector and send them to workers for parsing
auto parse_worker_id = NextWorkerID();
const auto parse_worker_id = NextWorkerID();
RETURN_IF_NOT_OK(worker_in_queues_[parse_worker_id]->EmplaceBack(std::move(fetched_row)));
} else {
// get record bytes from jagged_rows_connector and send them to out_connector
RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
}
rows_read++;
} else {
// IOBlockQueue thread needs to:
@ -185,19 +196,29 @@ Status TFReaderOp::operator()() {
}
}
if (decode_) {
// finish reading this epoch, send an EOE flag to next parsing worker
auto parse_worker_id = NextWorkerID();
const auto parse_worker_id = NextWorkerID();
RETURN_IF_NOT_OK(worker_in_queues_[parse_worker_id]->EmplaceBack(TensorRow(TensorRow::kFlagEOE)));
} else {
// finish reading this epoch, send an EOE flag to out_connector
RETURN_IF_NOT_OK(out_connector_->SendEOE());
}
RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
}
if (decode_) {
// finish reading all the data, send an EOF flag to next parsing worker
auto parse_worker_id = NextWorkerID();
RETURN_IF_NOT_OK(worker_in_queues_[parse_worker_id]->EmplaceBack(TensorRow(TensorRow::kFlagEOF)));
RETURN_IF_NOT_OK(worker_in_queues_[parse_worker_id]->EmplaceBack(TensorRow::kFlagEOF));
// tell all the parsing workers to quit
for (auto i = 0; i < num_workers_; ++i) {
RETURN_IF_NOT_OK(worker_in_queues_[i]->EmplaceBack(TensorRow(TensorRow::kFlagQuit)));
RETURN_IF_NOT_OK(worker_in_queues_[i]->EmplaceBack(TensorRow::kFlagQuit));
}
} else {
// finish reading all the data, send an EOF flag to out_connector
RETURN_IF_NOT_OK(out_connector_->SendEOF());
}
RETURN_IF_NOT_OK(PostEndOfData());
@ -883,7 +904,7 @@ Status TFReaderOp::CreateSchema(const std::string &tf_record_file, std::vector<s
const dataengine::Feature::KindCase kind_case = feature.kind_case();
switch (kind_case) {
case dataengine::Feature::KindCase::kBytesList:
column_type = "uint8";
column_type = "string";
break;
case dataengine::Feature::KindCase::kFloatList:
@ -1218,9 +1239,14 @@ void TFReaderOp::HelperCountZLIBRows(const std::string &realpath_value, const st
Status TFReaderOp::ComputeColMap() {
// Construct the column name map for this operator (base class field)
if (column_name_id_map_.empty()) {
if (decode_) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->Column(i).Name()] = i;
}
} else {
// if decode is false, the output will only have one column containing the record bytes
column_name_id_map_["proto"] = 0;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
@ -1308,9 +1334,13 @@ Status TFReaderOp::HelperIOBlockFiller(int32_t *queue_index, int32_t *key_index,
Status TFReaderOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
RETURN_IF_NOT_OK(NonMappableLeafOp::GetNextRowPullMode(row));
if (decode_) {
if (!row->empty()) {
// data got from jagged_rows_connector is raw bytes so we need to parse it before return
RETURN_IF_NOT_OK(ParseExample(*row, row));
TensorRow res;
RETURN_IF_NOT_OK(ParseExample(*row, &res));
*row = std::move(res);
}
}
return Status::OK();
}

View File

@ -1,5 +1,5 @@
/**
* Copyright 2019-2022 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -64,23 +64,25 @@ using StringIndex = AutoIndexObj<std::string>;
class TFReaderOp : public NonMappableLeafOp {
public:
// Constructor of TFReaderOp (2)
// @note The builder class should be used to call this constructor.
// @param num_workers - number of worker threads reading data from TFRecord files.
// @param worker_connector_size - size of each internal queue.
// @param total_num_rows - Number of rows to read
// @param dataset_files_list - list of filepaths for the dataset files.
// @param data_schema - the data schema object.
// @param op_connector_size - size of each queue in the connector that the child operator pulls from.
// @param columns_to_load - the names of the columns to load data from.
// @param shuffle_files - whether or not to shuffle the files before reading data.
// @param equal_rows_per_shard - whether or not to get equal rows for each process.
// @param compression_type - the compression type of the TFRecord files
/// \brief Constructor.
/// \param num_workers The number of worker threads for reading data.
/// \param worker_connector_size The size of each worker queue.
/// \param total_num_rows The Number of rows to read.
/// \param dataset_files_list The list of paths of dataset files to read.
/// \param data_schema The data schema descributing the feature names, dtypes and shapes.
/// \param op_connector_size The size of connector queue for the child node to read from.
/// \param columns_to_load The feature names to load from the files.
/// \param shuffle_files Whether to shuffle the files before reading.
/// \param num_devices The number of shards that the dataset will be divided into.
/// \param device_id Which part of dataset to read among all the shards.
/// \param equal_rows_per_shard Whether to read equal number of rows for each shard.
/// \param compression_type The compression type of the dataset files.
/// \param decode Whether to decode the protobuf, or leave it for ParseExampleOp to parse.
TFReaderOp(int32_t num_workers, int32_t worker_connector_size, int64_t total_num_rows,
std::vector<std::string> dataset_files_list, std::unique_ptr<DataSchema> data_schema,
int32_t op_connector_size, std::vector<std::string> columns_to_load, bool shuffle_files,
int32_t num_devices, int32_t device_id, bool equal_rows_per_shard,
const CompressionType &compression_type = CompressionType::NONE);
int32_t num_devices, int32_t device_id, bool equal_rows_per_shard, const CompressionType &compression_type,
bool decode);
/// Default destructor
~TFReaderOp() override = default;
@ -363,6 +365,7 @@ class TFReaderOp : public NonMappableLeafOp {
std::vector<std::string> columns_to_load_;
std::unique_ptr<DataSchema> data_schema_;
bool equal_rows_per_shard_;
bool decode_; // whether to parse the proto
};
} // namespace dataset
} // namespace mindspore

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2022 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -246,6 +246,10 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
/// \return Child nodes
const std::vector<std::shared_ptr<DatasetNode>> Children() const { return children_; }
/// \brief Get the parent dataset node.
/// \return The parent dataset node.
DatasetNode *Parent() const { return parent_; }
/// \brief Establish a parent-child relationship between this node and the input node.
/// Used during the cloning of the user-input IR tree (temporary use)
Status AppendChild(std::shared_ptr<DatasetNode> child);

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2022 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,18 +34,28 @@ namespace dataset {
MapNode::MapNode(std::shared_ptr<DatasetNode> child, std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns, std::vector<std::string> output_columns,
std::shared_ptr<DatasetCache> cache, std::vector<std::shared_ptr<DSCallback>> callbacks,
const std::shared_ptr<DatasetCache> &cache, std::vector<std::shared_ptr<DSCallback>> callbacks,
ManualOffloadMode offload, std::shared_ptr<PythonMultiprocessingRuntime> python_mp)
: operations_(operations),
input_columns_(input_columns),
output_columns_(output_columns),
DatasetNode(std::move(cache)),
callbacks_(callbacks),
: operations_(std::move(operations)),
input_columns_(std::move(input_columns)),
output_columns_(std::move(output_columns)),
DatasetNode(cache),
callbacks_(std::move(callbacks)),
offload_(offload),
python_mp_(std::move(python_mp)) {
this->AddChild(child);
this->AddChild(std::move(child));
}
MapNode::MapNode(std::vector<std::shared_ptr<TensorOperation>> operations, std::vector<std::string> input_columns,
std::vector<std::string> output_columns)
: operations_(std::move(operations)),
input_columns_(std::move(input_columns)),
output_columns_(std::move(output_columns)),
DatasetNode(nullptr),
callbacks_({}),
offload_(ManualOffloadMode::kUnspecified),
python_mp_(nullptr) {}
std::shared_ptr<DatasetNode> MapNode::Copy() {
std::vector<std::shared_ptr<TensorOperation>> operations = operations_;
auto node = std::make_shared<MapNode>(nullptr, operations, input_columns_, output_columns_, cache_, callbacks_,

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2022 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,10 +33,14 @@ class MapNode : public DatasetNode {
/// \brief Constructor
MapNode(std::shared_ptr<DatasetNode> child, std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns = {}, std::vector<std::string> output_columns = {},
std::shared_ptr<DatasetCache> cache = nullptr, std::vector<std::shared_ptr<DSCallback>> callbacks = {},
const std::shared_ptr<DatasetCache> &cache = nullptr, std::vector<std::shared_ptr<DSCallback>> callbacks = {},
ManualOffloadMode offload = ManualOffloadMode::kUnspecified,
std::shared_ptr<PythonMultiprocessingRuntime> python_mp = nullptr);
/// \brief Constructor used in InsertMap pass.
MapNode(std::vector<std::shared_ptr<TensorOperation>> operations, std::vector<std::string> input_columns,
std::vector<std::string> output_columns);
/// \brief Destructor
~MapNode() override = default;

View File

@ -167,15 +167,8 @@ Status TFRecordNode::ValidateParams() {
return Status::OK();
}
// Function to build TFRecordNode
Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
RETURN_UNEXPECTED_IF_NULL(node_ops);
// Sort the datasets file in a lexicographical order
std::vector<std::string> sorted_dir_files = dataset_files_;
std::sort(sorted_dir_files.begin(), sorted_dir_files.end());
// Create Schema Object
std::unique_ptr<DataSchema> data_schema = std::make_unique<DataSchema>();
Status TFRecordNode::CreateDataSchema(DataSchema *data_schema) {
RETURN_UNEXPECTED_IF_NULL(data_schema);
if (!schema_path_.empty()) {
RETURN_IF_NOT_OK(ValidateDatasetFilesParam("TFRecordDataset", {schema_path_}));
RETURN_IF_NOT_OK(data_schema->LoadSchemaFile(schema_path_, columns_list_));
@ -183,6 +176,18 @@ Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
std::string schema_json_string = schema_obj_->to_json();
RETURN_IF_NOT_OK(data_schema->LoadSchemaString(schema_json_string, columns_list_));
}
return Status::OK();
}
// Function to build TFRecordNode
Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
RETURN_UNEXPECTED_IF_NULL(node_ops);
// Sort the datasets file in a lexicographical order
std::vector<std::string> sorted_dir_files = dataset_files_;
std::sort(sorted_dir_files.begin(), sorted_dir_files.end());
DataSchema data_schema;
RETURN_IF_NOT_OK(CreateDataSchema(&data_schema));
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
@ -190,9 +195,10 @@ Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o
RETURN_IF_NOT_OK(HelperGetCompressType(&compression_type));
// Create and initialize TFReaderOp
std::shared_ptr<TFReaderOp> tf_reader_op = std::make_shared<TFReaderOp>(
num_workers_, worker_connector_size_, num_samples_, sorted_dir_files, std::move(data_schema), connector_que_size_,
columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, compression_type);
std::shared_ptr<TFReaderOp> tf_reader_op =
std::make_shared<TFReaderOp>(num_workers_, worker_connector_size_, num_samples_, sorted_dir_files,
std::make_unique<DataSchema>(data_schema), connector_que_size_, columns_list_,
shuffle_files, num_shards_, shard_id_, shard_equal_rows_, compression_type, decode_);
RETURN_IF_NOT_OK(tf_reader_op->Init());

View File

@ -49,7 +49,8 @@ class TFRecordNode : public NonMappableSourceNode {
num_shards_(num_shards),
shard_id_(shard_id),
shard_equal_rows_(shard_equal_rows),
compression_type_(compression_type) {
compression_type_(compression_type),
decode_(true) {
// Update the num_shards_ in global context. this number is only used for now by auto_num_worker_pass. User
// discretion is advised. Auto_num_worker_pass is currently an experimental feature which can still work if the
// num_shards_ isn't 100% correct. The reason behind is for now, PreBuildSampler doesn't offer a way to return
@ -111,6 +112,14 @@ class TFRecordNode : public NonMappableSourceNode {
Status GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_getter, bool estimate,
int64_t *dataset_size) override;
/// \brief Set whether to parse the protobuf in TFRecordOp
/// \param[in] decode Whether to decode.
void SetDecode(bool decode) { decode_ = decode; }
/// \brief Create DataSchema object with the input.
/// \param[out] data_schema The output data schema.
Status CreateDataSchema(DataSchema *data_schema);
/// \brief Get the file list of the specific shard ID
/// \param[out] shard_filenames the list of filenames for that specific shard ID
/// \return Status of the function
@ -189,6 +198,7 @@ class TFRecordNode : public NonMappableSourceNode {
int32_t shard_id_;
bool shard_equal_rows_;
std::string compression_type_;
bool decode_; // whether to parse the proto
static std::unordered_set<std::string> large_files_;
};

View File

@ -9,14 +9,15 @@ set(DATASET_ENGINE_OPT_SRC_FILES
pre/add_skip_pass.cc
pre/cache_transform_pass.cc
pre/cache_validation_pass.cc
pre/debug_mode_pass.cc
pre/deep_copy_pass.cc
pre/epoch_ctrl_pass.cc
pre/getter_pass.cc
pre/input_validation_pass.cc
pre/insert_map_pass.cc
pre/node_offload_pass.cc
pre/node_removal_pass.cc
pre/skip_pushdown_pass.cc
pre/debug_mode_pass.cc
)
if(ENABLE_PYTHON)

View File

@ -0,0 +1,80 @@
/**
* Copyright 2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/opt/pre/insert_map_pass.h"
#include <string>
#include <vector>
#include "minddata/dataset/engine/ir/datasetops/map_node.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/ir/datasetops/source/tf_record_node.h"
#endif
#include "minddata/dataset/kernels/ir/data/transforms_ir.h"
namespace mindspore::dataset {
#ifndef ENABLE_ANDROID
Status InsertMapPass::Visit(std::shared_ptr<TFRecordNode> node, bool *const modified) {
RETURN_UNEXPECTED_IF_NULL(node);
RETURN_UNEXPECTED_IF_NULL(modified);
#if !defined(_WIN32) && !defined(_WIN64)
// construct schema from the inputs of TFRecordNode
auto data_schema = DataSchema();
RETURN_IF_NOT_OK(node->CreateDataSchema(&data_schema));
// get the output column list
std::vector<std::string> output_columns;
RETURN_IF_NOT_OK(data_schema.GetColumnName(&output_columns));
if (output_columns.empty()) {
if (!node->ColumnsList().empty()) {
output_columns = node->ColumnsList();
} else {
// Unable to fetch output columns, degraded to do parsing directly in TFRecordOp
MS_LOG(WARNING)
<< "If both schema and column list are not set, the performance of TFRecordDataset may be degraded.";
*modified = false;
return Status::OK();
}
}
// not to parse the protobuf in TFRecordOp
node->SetDecode(false);
// if the next node is batch, do parallel parsing in ParseExampleOp
bool parallel_parse = node->Parent()->Name() == kBatchNode;
const auto parse_example =
std::make_shared<transforms::ParseExampleOperation>(data_schema, node->ColumnsList(), parallel_parse);
auto map_node = std::make_shared<MapNode>(std::vector<std::shared_ptr<TensorOperation>>{parse_example},
std::vector<std::string>{"proto"}, output_columns);
if (parallel_parse) {
// parallel parsing use a thread pool inside ParseExampleOp, so we only need 1 worker for map
(void)map_node->SetNumWorkers(1);
}
if (node->Parent()->Name() == kBatchNode) {
MS_LOG(INFO) << "Insert a Map node after Batch to parse protobuf in parallel.";
RETURN_IF_NOT_OK(node->Parent()->InsertAbove(map_node));
} else {
MS_LOG(INFO) << "Insert a Map node after TFRecord to parse protobuf one by one.";
RETURN_IF_NOT_OK(node->InsertAbove(map_node));
}
*modified = true;
#endif
return Status ::OK();
}
#endif
} // namespace mindspore::dataset

View File

@ -0,0 +1,44 @@
/**
* Copyright 2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_OPT_PRE_INSERT_MAP_PASS_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_OPT_PRE_INSERT_MAP_PASS_H_
#include <memory>
#include "minddata/dataset/engine/opt/pass.h"
namespace mindspore {
namespace dataset {
class InsertMapPass : public IRNodePass {
public:
/// \brief Constructor
InsertMapPass() = default;
/// \brief Destructor
~InsertMapPass() override = default;
#ifndef ENABLE_ANDROID
/// \brief Insert map node to parse the protobuf for TFRecord.
/// \param[in] node The TFRecordNode being visited.
/// \param[in, out] modified Indicator if the node was changed at all.
/// \return The status code.
Status Visit(std::shared_ptr<TFRecordNode> node, bool *const modified) override;
#endif
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_OPT_PRE_INSERT_MAP_PASS_H_

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@
#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
#include "minddata/dataset/engine/opt/pre/getter_pass.h"
#include "minddata/dataset/engine/opt/pre/input_validation_pass.h"
#include "minddata/dataset/engine/opt/pre/insert_map_pass.h"
#include "minddata/dataset/engine/opt/pre/node_removal_pass.h"
#include "minddata/dataset/engine/opt/pre/skip_pushdown_pass.h"
#include "minddata/dataset/engine/perf/info_collector.h"
@ -60,6 +61,7 @@ Status TreeAdapter::PrePass(const std::shared_ptr<DatasetNode> &ir) {
MS_LOG(INFO) << "Running pre pass loops.";
(void)actions.emplace_back(std::make_unique<InputValidationPass>());
(void)actions.emplace_back(std::make_unique<CacheValidationPass>());
(void)actions.emplace_back(std::make_unique<InsertMapPass>());
if (usage_ == kDeReset) {
(void)actions.emplace_back(std::make_unique<AddSkipPass>());
if (GlobalContext::config_manager()->fast_recovery()) {

View File

@ -1,5 +1,5 @@
/**
* Copyright 2021-2023 Huawei Technologies Co., Ltd
* Copyright 2021-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,11 +26,11 @@
#include "minddata/dataset/engine/opt/pre/epoch_ctrl_pass.h"
#include "minddata/dataset/engine/opt/pre/getter_pass.h"
#include "minddata/dataset/engine/opt/pre/input_validation_pass.h"
#include "minddata/dataset/engine/opt/pre/insert_map_pass.h"
#include "minddata/dataset/engine/opt/pre/node_removal_pass.h"
namespace mindspore {
namespace dataset {
TreeAdapterLite::TreeAdapterLite(UsageFlag usage) : root_(nullptr), usage_(usage) {
// Create ExecutionTree.
tree_ = std::make_unique<ExecutionTree>();
@ -97,6 +97,7 @@ Status TreeAdapterLite::PrePass(std::shared_ptr<DatasetNode> ir) {
std::vector<std::unique_ptr<IRPass>> actions;
MS_LOG(INFO) << "Prepare PrePass loops.";
(void)actions.emplace_back(std::make_unique<InputValidationPass>());
(void)actions.emplace_back(std::make_unique<InsertMapPass>());
(void)actions.emplace_back(std::make_unique<NodeRemovalPass>());
(void)actions.emplace_back(std::make_unique<EpochCtrlPass>());
if (usage_ == kDeGetter) {

View File

@ -51,7 +51,7 @@ bool AutotuneCallback::IsEpochEndNeeded() { return false; }
bool AutotuneCallback::IsNStepEndNeeded() { return false; }
Status AutotuneCallback::PushChangeRequest(ChangeRequestPtr change_request) {
RETURN_IF_NOT_OK(change_request_queue_->Add(change_request));
RETURN_IF_NOT_OK(change_request_queue_->Add(std::move(change_request)));
return Status::OK();
}

View File

@ -1,15 +1,20 @@
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
if(NOT (CMAKE_SYSTEM_NAME MATCHES "Windows"))
set(ABSL_DEPEND_FILES
parse_example_op.cc)
endif()
add_library(kernels-data OBJECT
concatenate_op.cc
data_utils.cc
duplicate_op.cc
fill_op.cc
mask_op.cc
one_hot_op.cc
pad_end_op.cc
type_cast_op.cc
to_float16_op.cc
fill_op.cc
slice_op.cc
mask_op.cc
concatenate_op.cc
duplicate_op.cc
to_float16_op.cc
type_cast_op.cc
unique_op.cc
${ABSL_DEPEND_FILES}
)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,78 @@
/**
* Copyright 2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_DATA_PARSE_EXAMPLE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_DATA_PARSE_EXAMPLE_OP_H_
#include <unsupported/Eigen/CXX11/ThreadPool>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/kernels/tensor_op.h"
namespace mindspore {
namespace dataset {
constexpr int kThreadPoolSize = 32;
struct VarLenTensorBuffer {
std::vector<std::shared_ptr<Tensor>> numeric_tensor; // store the minibatch of numeric tensors
std::vector<std::string> string_tensor; // store the minibatch of strings
size_t string_length; // store the lengtn of string in minibatch
};
class ParseExampleOp : public TensorOp {
public:
ParseExampleOp(DataSchema data_schema, std::vector<std::string> column_list, bool parallel_parse)
: data_schema_(std::move(data_schema)),
column_list_(std::move(column_list)),
parallel_parse_(parallel_parse),
pool_(nullptr) {
if (parallel_parse) {
pool_ = std::make_unique<Eigen::ThreadPool>(kThreadPoolSize);
}
}
~ParseExampleOp() override = default;
Status Compute(const TensorRow &input, TensorRow *output) override;
std::string Name() const override { return kParseExampleOp; }
private:
Status ParseSingleExample(const TensorRow &raw_bytes, TensorRow *parsed_row);
Status ParallelParseExample(const TensorRow &raw_bytes, TensorRow *parsed_row);
Status ParseSerializedExample(const std::string &example_bytes, TensorRow *parsed_row,
std::unordered_map<int32_t, std::vector<std::string>> *string_column_map,
std::vector<VarLenTensorBuffer> *varlen_tensor_vector, size_t tensor_index);
Status ConstructColumnMap(const std::string &example_bytes);
DataSchema data_schema_;
std::vector<std::string> column_list_;
bool parallel_parse_;
std::unique_ptr<Eigen::ThreadPool> pool_;
std::unordered_map<std::string, int32_t> column_name_id_map_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_DATA_PARSE_EXAMPLE_OP_H_

View File

@ -2022,7 +2022,7 @@ Status Affine(const std::shared_ptr<Tensor> &input, std::shared_ptr<Tensor> *out
}
std::vector<float_t> matrix;
RETURN_IF_NOT_OK(GetAffineMatrix(input, &matrix, degrees, translation, scale, shear));
RETURN_IF_NOT_OK(GetAffineMatrix(input_cv, &matrix, degrees, translation, scale, shear));
cv::Mat affine_mat(matrix);
affine_mat = affine_mat.reshape(1, {2, 3});

View File

@ -1,5 +1,5 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
* Copyright 2021-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,7 @@
#include "minddata/dataset/kernels/image/resize_cubic_op.h"
#include <cmath>
#include <limits>
#include <climits>
namespace mindspore {
namespace dataset {

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,6 +36,7 @@
#include "minddata/dataset/kernels/data/one_hot_op.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/kernels/data/pad_end_op.h"
#include "minddata/dataset/kernels/data/parse_example_op.h"
#endif
#include "minddata/dataset/kernels/data/random_apply_op.h"
#include "minddata/dataset/kernels/data/random_choice_op.h"
@ -314,6 +315,17 @@ Status PadEndOperation::from_json(nlohmann::json op_params, std::shared_ptr<Tens
*operation = std::make_shared<transforms::PadEndOperation>(pad_shape, pad_value);
return Status::OK();
}
#if !defined(_WIN32) && !defined(_WIN64)
// ParseExampleOperation
ParseExampleOperation::ParseExampleOperation(DataSchema schema, std::vector<std::string> column_list,
bool parallel_parse)
: schema_(std::move(schema)), column_list_(std::move(column_list)), parallel_parse_(parallel_parse) {}
std::shared_ptr<TensorOp> ParseExampleOperation::Build() {
return std::make_shared<ParseExampleOp>(schema_, column_list_, parallel_parse_);
}
#endif
#endif
// PreBuiltOperation

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,12 +17,13 @@
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_IR_DATA_TRANSFORMS_IR_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_IR_DATA_TRANSFORMS_IR_H_
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "minddata/dataset/core/data_type.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/include/dataset/datasets.h"
#include "minddata/dataset/kernels/ir/tensor_operation.h"
namespace mindspore {
@ -37,13 +38,14 @@ constexpr char kFillOperation[] = "Fill";
constexpr char kMaskOperation[] = "Mask";
constexpr char kOneHotOperation[] = "OneHot";
constexpr char kPadEndOperation[] = "PadEnd";
constexpr char kParseExampleOperation[] = "ParseExample";
constexpr char kPluginOperation[] = "Plugin";
constexpr char kPreBuiltOperation[] = "PreBuilt";
constexpr char kSliceOperation[] = "Slice";
constexpr char kRandomApplyOperation[] = "RandomApply";
constexpr char kRandomChoiceOperation[] = "RandomChoice";
constexpr char kSliceOperation[] = "Slice";
constexpr char kTypeCastOperation[] = "TypeCast";
constexpr char kUniqueOperation[] = "Unique";
constexpr char kPluginOperation[] = "Plugin";
/* ####################################### Derived TensorOperation classes ################################# */
class ComposeOperation : public TensorOperation {
@ -212,6 +214,22 @@ class PadEndOperation : public TensorOperation {
std::shared_ptr<Tensor> pad_value_;
};
class ParseExampleOperation : public TensorOperation {
public:
ParseExampleOperation(DataSchema schema, std::vector<std::string> column_list, bool parallel_parse);
~ParseExampleOperation() override = default;
std::shared_ptr<TensorOp> Build() override;
std::string Name() const override { return kParseExampleOperation; }
private:
DataSchema schema_;
std::vector<std::string> column_list_;
bool parallel_parse_;
};
class PreBuiltOperation : public TensorOperation {
public:
explicit PreBuiltOperation(std::shared_ptr<TensorOp> tensor_op);

View File

@ -1,5 +1,5 @@
/**
* Copyright 2020-2023 Huawei Technologies Co., Ltd
* Copyright 2020-2024 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -242,6 +242,7 @@ constexpr char kFillOp[] = "FillOp";
constexpr char kMaskOp[] = "MaskOp";
constexpr char kOneHotOp[] = "OneHotOp";
constexpr char kPadEndOp[] = "PadEndOp";
constexpr char kParseExampleOp[] = "ParseExampleOp";
constexpr char kSliceOp[] = "SliceOp";
constexpr char kToFloat16Op[] = "ToFloat16Op";
constexpr char kTypeCastOp[] = "TypeCastOp";

View File

@ -51,7 +51,7 @@ class Allocator {
using propagate_on_container_move_assignment = std::true_type;
using propagate_on_container_swap = std::true_type;
explicit Allocator(const std::shared_ptr<MemoryPool> &b) : pool_(b) {}
explicit Allocator(std::shared_ptr<MemoryPool> b) : pool_(std::move(b)) {}
~Allocator() = default;
@ -89,6 +89,7 @@ class Allocator {
private:
std::shared_ptr<MemoryPool> pool_;
};
/// \brief It is a wrapper of unique_ptr with a custom Allocator class defined above
template <typename T, typename C = std::allocator<T>, typename... Args>
Status MakeUnique(std::unique_ptr<T[], std::function<void(T *)>> *out, C alloc, size_t n, Args &&... args) {

View File

@ -16,16 +16,13 @@
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_QUEUE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_QUEUE_H_
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "./securec.h"
#include "utils/ms_utils.h"
#include "minddata/dataset/util/allocator.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/services.h"
@ -89,7 +86,7 @@ class Queue {
Status rc =
full_cv_.Wait(&_lock, [this]() -> bool { return (SizeWhileHoldingLock() != CapacityWhileHoldingLock()); });
if (rc.IsOk()) {
RETURN_IF_NOT_OK(this->AddWhileHoldingLock(ele));
this->AddWhileHoldingLock(ele);
empty_cv_.NotifyAll();
_lock.unlock();
} else {
@ -104,7 +101,7 @@ class Queue {
Status rc =
full_cv_.Wait(&_lock, [this]() -> bool { return (SizeWhileHoldingLock() != CapacityWhileHoldingLock()); });
if (rc.IsOk()) {
RETURN_IF_NOT_OK(this->AddWhileHoldingLock(std::forward<T>(ele)));
this->AddWhileHoldingLock(std::forward<T>(ele));
empty_cv_.NotifyAll();
_lock.unlock();
} else {
@ -136,7 +133,7 @@ class Queue {
// Block when empty
Status rc = empty_cv_.Wait(&_lock, [this]() -> bool { return !EmptyWhileHoldingLock(); });
if (rc.IsOk()) {
RETURN_IF_NOT_OK(this->PopFrontWhileHoldingLock(p, true));
this->PopFrontWhileHoldingLock(p, true);
full_cv_.NotifyAll();
_lock.unlock();
} else {
@ -166,7 +163,7 @@ class Queue {
if (head_ < tail_) {
// if there are elements left in queue, pop out
T temp;
RETURN_IF_NOT_OK(this->PopFrontWhileHoldingLock(&temp, true));
this->PopFrontWhileHoldingLock(&temp, true);
queue.push_back(temp);
} else {
// if there is nothing left in queue, check extra_arr_
@ -183,14 +180,14 @@ class Queue {
// if there are extra elements in queue, put them to extra_arr_
while (head_ < tail_) {
T temp;
RETURN_IF_NOT_OK(this->PopFrontWhileHoldingLock(&temp, false));
this->PopFrontWhileHoldingLock(&temp, false);
extra_arr_.push_back(temp);
}
this->ResetQue();
RETURN_IF_NOT_OK(arr_.allocate(new_capacity));
sz_ = new_capacity;
for (int32_t i = 0; i < static_cast<int32_t>(queue.size()); ++i) {
RETURN_IF_NOT_OK(this->AddWhileHoldingLock(queue[i]));
this->AddWhileHoldingLock(queue[i]);
}
queue.clear();
_lock.unlock();
@ -210,28 +207,25 @@ class Queue {
CondVar full_cv_;
// Helper function for Add, must be called when holding a lock
Status AddWhileHoldingLock(const_reference ele) {
void AddWhileHoldingLock(const_reference ele) {
auto k = tail_++ % sz_;
*(arr_[k]) = ele;
return Status::OK();
}
// Helper function for Add, must be called when holding a lock
Status AddWhileHoldingLock(T &&ele) {
void AddWhileHoldingLock(T &&ele) {
auto k = tail_++ % sz_;
*(arr_[k]) = std::forward<T>(ele);
return Status::OK();
}
// Helper function for PopFront, must be called when holding a lock
Status PopFrontWhileHoldingLock(pointer p, bool clean_extra) {
void PopFrontWhileHoldingLock(pointer p, bool clean_extra) {
auto k = head_++ % sz_;
*p = std::move(*(arr_[k]));
if (!extra_arr_.empty() && clean_extra) {
RETURN_IF_NOT_OK(this->AddWhileHoldingLock(std::forward<T>(extra_arr_[0])));
this->AddWhileHoldingLock(std::forward<T>(extra_arr_[0]));
extra_arr_.erase(extra_arr_.begin());
}
return Status::OK();
}
void ResetQue() noexcept {

View File

@ -36,7 +36,7 @@ namespace mindspore {
namespace dataset {
#define RETURN_IF_NOT_OK(_s) \
do { \
mindspore::Status __rc = (_s); \
const mindspore::Status &__rc = (_s); \
if (__rc.IsError()) { \
return __rc; \
} \
@ -96,7 +96,7 @@ namespace dataset {
#define RETURN_SECOND_IF_ERROR(_s, _r) \
do { \
mindspore::Status __rc = (_s); \
const mindspore::Status &__rc = (_s); \
if (__rc.IsError()) { \
MS_LOG(ERROR) << __rc; \
return _r; \

View File

@ -208,16 +208,16 @@ if(MSLITE_MINDDATA_IMPLEMENT STREQUAL "full")
${MINDDATA_DIR}/engine/datasetops/source/album_op.cc
${MINDDATA_DIR}/engine/datasetops/source/mnist_op.cc
${MINDDATA_DIR}/engine/datasetops/source/mappable_leaf_op.cc
${MINDDATA_DIR}/engine/datasetops/source/io_block.cc
${MINDDATA_DIR}/engine/opt/pre/add_skip_pass.cc
${MINDDATA_DIR}/engine/opt/pre/cache_validation_pass.cc
${MINDDATA_DIR}/engine/opt/pre/debug_mode_pass.cc
${MINDDATA_DIR}/engine/opt/pre/deep_copy_pass.cc
${MINDDATA_DIR}/engine/opt/pre/epoch_ctrl_pass.cc
${MINDDATA_DIR}/engine/opt/pre/getter_pass.cc
${MINDDATA_DIR}/engine/opt/pre/input_validation_pass.cc
${MINDDATA_DIR}/engine/opt/pre/debug_mode_pass.cc
${MINDDATA_DIR}/engine/opt/pre/cache_validation_pass.cc
${MINDDATA_DIR}/engine/opt/pre/insert_map_pass.cc
${MINDDATA_DIR}/engine/opt/pre/node_removal_pass.cc
${MINDDATA_DIR}/engine/opt/pre/epoch_ctrl_pass.cc
${MINDDATA_DIR}/engine/opt/pre/deep_copy_pass.cc
${MINDDATA_DIR}/engine/opt/pre/skip_pushdown_pass.cc
${MINDDATA_DIR}/engine/opt/post/auto_worker_pass.cc
${MINDDATA_DIR}/engine/opt/pass.cc

View File

@ -106,7 +106,7 @@ std::shared_ptr<mindspore::dataset::BatchOp> DatasetOpTesting::Batch(int32_t bat
std::shared_ptr<mindspore::dataset::RepeatOp> DatasetOpTesting::Repeat(int repeat_cnt) {
std::shared_ptr<mindspore::dataset::RepeatOp> op = std::make_shared<mindspore::dataset::RepeatOp>(repeat_cnt);
return std::move(op);
return op;
}
std::shared_ptr<mindspore::dataset::TFReaderOp> DatasetOpTesting::TFReader(std::string file, int num_works) {
@ -118,9 +118,9 @@ std::shared_ptr<mindspore::dataset::TFReaderOp> DatasetOpTesting::TFReader(std::
std::vector<std::string> files = {file};
std::shared_ptr<mindspore::dataset::TFReaderOp> so = std::make_shared<mindspore::dataset::TFReaderOp>(
num_works, worker_connector_size, 0, files, std::make_unique<mindspore::dataset::DataSchema>(), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
(void)so->Init();
return std::move(so);
return so;
}
std::shared_ptr<mindspore::dataset::ExecutionTree> DatasetOpTesting::Build(
@ -135,7 +135,7 @@ std::shared_ptr<mindspore::dataset::ExecutionTree> DatasetOpTesting::Build(
tree->AssignRoot(ops[i]);
}
}
return std::move(tree);
return tree;
}
#ifdef __cplusplus

View File

@ -31,6 +31,7 @@
using mindspore::Status;
using mindspore::StatusCode;
using CompressionType = mindspore::dataset::NonMappableLeafOp::CompressionType;
#define ASSERT_OK(_s) \
do { \

View File

@ -92,8 +92,9 @@ TEST_F(MindDataTestExecutionTree, TestExecutionTree2) {
std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
std::vector<std::string> columns_to_load = {};
std::vector<std::string> files = {dataset_path};
std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
1, 2, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0, false);
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(1, 2, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0,
false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_OK(rc);
rc = my_tree->AssociateNode(my_tfreader_op);

View File

@ -56,7 +56,7 @@ std::shared_ptr<MindRecordOp> CreateMindRecord(int32_t mind_record_workers, bool
mind_record_workers, dataset_files, load, op_connector_queue_size, columns_to_load, std::move(operators), 0,
nullptr, sample_bytes, shuffle_mode, std::move(shard_reader), std::move(sampler));
(void)op->Init();
return std::move(op);
return op;
}
/// Feature: MindRecord op

View File

@ -51,7 +51,7 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeRowsPerBuffer) {
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_TRUE(rc.IsOk());
rc = my_tree->AssociateNode(my_tfreader_op);
@ -111,7 +111,7 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderSmallRowsPerBuffer) {
schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_TRUE(rc.IsOk());
rc = my_tree->AssociateNode(my_tfreader_op);
@ -171,7 +171,7 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeQueueSize) {
schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_TRUE(rc.IsOk());
rc = my_tree->AssociateNode(my_tfreader_op);
@ -231,7 +231,7 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderOneThread) {
schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_TRUE(rc.IsOk());
rc = my_tree->AssociateNode(my_tfreader_op);
@ -294,7 +294,7 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) {
std::shared_ptr<TFReaderOp> my_tfreader_op =
std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
columns_to_load, false, 1, 0, false);
columns_to_load, false, 1, 0, false, CompressionType::NONE, true);
rc = my_tfreader_op->Init();
ASSERT_TRUE(rc.IsOk());
rc = my_tree->AssociateNode(my_tfreader_op);
@ -335,7 +335,6 @@ TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) {
ASSERT_EQ(row_count, 5);
}
/// Feature: TFReader op
/// Description: Test TFReaderOp::CountTotalRows basic cases
/// Expectation: Output is equal to the expected output

View File

@ -38,7 +38,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 1,
"shape": [1]
}

View File

@ -38,7 +38,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 1,
"shape": [1]
}

View File

@ -38,7 +38,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 1,
"shape": [1]
}

View File

@ -38,7 +38,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 1,
"shape": [1]
}

View File

@ -37,7 +37,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 1,
"shape": [1]
}

View File

@ -1,46 +0,0 @@
{
"datasetType": "TF",
"numRows": 24,
"columns": {
"col_sint16": {
"type": "int16",
"rank": 1,
"shape": [1]
},
"col_sint32": {
"type": "int32",
"rank": 1,
"shape": [1]
},
"col_sint64": {
"type": "int64",
"rank": 1,
"shape": [1]
},
"col_float": {
"type": "float32",
"rank": 1,
"shape": [1]
},
"col_1d": {
"type": "int64",
"rank": 1,
"shape": [2]
},
"col_2d": {
"type": "int64",
"rank": 2,
"shape": [2, 2]
},
"col_3d": {
"type": "int64",
"rank": 3,
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"rank": 1,
"shape": [-1, 10]
}
}
}

View File

@ -34,7 +34,7 @@
"shape": [2, 2, 2]
},
"col_binary": {
"type": "uint8",
"type": "string",
"rank": 0
}
}

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import pytest
import mindspore.dataset as ds
from mindspore import log as logger
from util import save_and_check_dict, config_get_set_seed
@ -89,6 +91,7 @@ def test_2ops_repeat_batch():
save_and_check_dict(data1, filename, generate_golden=GENERATE_GOLDEN)
@pytest.mark.skip(reason="type cast wrong")
def test_2ops_batch_repeat():
"""
Feature: 2ops (shuffle, repeat, batch)
@ -109,6 +112,7 @@ def test_2ops_batch_repeat():
save_and_check_dict(data1, filename, generate_golden=GENERATE_GOLDEN)
@pytest.mark.skip(reason="type cast wrong")
def test_2ops_batch_shuffle():
"""
Feature: 2ops (shuffle, repeat, batch)

View File

@ -225,6 +225,7 @@ def test_batch_10():
save_and_check_dict(data1, filename, generate_golden=GENERATE_GOLDEN)
@pytest.mark.skip(reason="type cast wrong")
def test_batch_11():
"""
Feature: Batch op
@ -561,6 +562,7 @@ def test_batch_exception_16():
Description: Test Batch op with mismatched batch type
Expectation: Error is raised as expected
"""
def gen(num):
for i in range(num):
if i % 2 == 0:
@ -589,6 +591,7 @@ def test_batch_exception_17():
Description: Test Batch op with mismatched batch size
Expectation: Error is raised as expected
"""
def gen(num):
for i in range(1, num + 1):
yield np.array([i] * i)
@ -611,6 +614,7 @@ def test_no_input_columns_01():
Description: Test with per_batch_map has value but input_columns has no value
Expectation: Output is equal to the expected output
"""
def gen_2_cols(num):
for i in range(1, 1 + num):
yield (np.array([i]), np.array([i ** 2]))
@ -639,6 +643,7 @@ def test_no_input_columns_02():
Description: Test per_batch_map has value but input_columns has no value and given output_columns parameter
Expectation: Output is equal to the expected output
"""
def gen_2_cols(num):
for i in range(1, 1 + num):
yield (np.array([i]), np.array([i ** 2]))
@ -669,6 +674,7 @@ def test_batch_exception_18():
Description: Test batch with parameter column_order
Expectation: Output is equal to the expected output
"""
def gen(num):
for i in range(num):
if i % 2 == 0:

View File

@ -395,9 +395,12 @@ def test_concat_15():
data_dir = "../data/dataset/testPK/data"
data_dir2 = [
"../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
schema_file = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
data1 = ds.ImageFolderDataset(data_dir)
data2 = ds.TFRecordDataset(data_dir2, columns_list=["image"])
data2 = ds.TFRecordDataset(data_dir2, schema=schema_file, columns_list=["image"])
data1 = data1.map(operations=F.Decode(), input_columns=["image"])
data2 = data2.map(operations=F.Decode(), input_columns=["image"])
data1 = data1.project(["image"])
data3 = data1 + data2
@ -527,8 +530,10 @@ def test_concat_18():
class DS:
def __init__(self, i, j):
self.data = [i for i in range(i, j)]
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return len(self.data)
@ -563,8 +568,10 @@ def test_concat_19():
class DS:
def __init__(self, i, j):
self.data = [i for i in range(i, j)]
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return len(self.data)

View File

@ -92,9 +92,10 @@ def test_numpy_slices_list_append():
logger.info("Test reading data of image list.")
DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
SCHEMA_FILE = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
resize_height, resize_width = 2, 2
data1 = ds.TFRecordDataset(DATA_DIR)
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_FILE)
resize_op = vision.Resize((resize_height, resize_width))
data1 = data1.map(
operations=[vision.Decode(), resize_op], input_columns=["image"])

View File

@ -24,6 +24,7 @@ IMAGENET_TFFILE_DIR = ["../data/dataset/test_tf_file_3_images2/train-0000-of-000
MNIST_DATA_DIR = "../data/dataset/testMnistData"
MIND_CV_FILE_NAME = "../data/mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord"
SCHEMA_FILE = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
SCHEMA2_FILE = "../data/dataset/test_tf_file_3_images2/datasetSchema.json"
MANIFEST_DATA_FILE = "../data/dataset/testManifestData/test.manifest"
CIFAR10_DATA_DIR = "../data/dataset/testCifar10Data"
CIFAR100_DATA_DIR = "../data/dataset/testCifar100Data"
@ -77,7 +78,8 @@ def test_imagenet_tf_file_dataset_size():
assert ds_shard_2_0.get_dataset_size() == 6
assert len(ds_shard_2_0) == 6
ds_shard_3_0 = ds.TFRecordDataset(IMAGENET_TFFILE_DIR, num_shards=3, shard_id=0, shard_equal_rows=True)
ds_shard_3_0 = ds.TFRecordDataset(IMAGENET_TFFILE_DIR, schema=SCHEMA2_FILE, num_shards=3, shard_id=0,
shard_equal_rows=True)
assert ds_shard_3_0.get_dataset_size() == 4
assert len(ds_shard_3_0) == 4
@ -88,7 +90,7 @@ def test_imagenet_tf_file_dataset_size():
assert len(ds_shard_3_0) == count
# shard_equal_rows is set to False therefore, get_dataset_size must return count
ds_shard_4_0 = ds.TFRecordDataset(IMAGENET_TFFILE_DIR, num_shards=4, shard_id=0)
ds_shard_4_0 = ds.TFRecordDataset(IMAGENET_TFFILE_DIR, schema=SCHEMA2_FILE, num_shards=4, shard_id=0)
count = 0
for _ in ds_shard_4_0.create_dict_iterator(num_epochs=1):
count += 1

View File

@ -145,20 +145,6 @@ def test_tfrecord_no_schema():
save_and_check_dict(data, filename, generate_golden=GENERATE_GOLDEN)
def test_tfrecord_pad():
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with pad bytes10
Expectation: The dataset is processed as expected
"""
logger.info("test_tfrecord_pad")
schema_file = "../data/dataset/testTFTestAllTypes/datasetSchemaPadBytes10.json"
data = ds.TFRecordDataset(FILES, schema_file, shuffle=ds.Shuffle.FILES)
filename = "tfrecord_pad_bytes10.npz"
save_and_check_dict(data, filename, generate_golden=GENERATE_GOLDEN)
def test_tfrecord_read_files():
"""
Feature: TFRecordDataset
@ -196,36 +182,280 @@ def test_tfrecord_multi_files():
logger.info("test_tfrecord_multi_files")
data1 = ds.TFRecordDataset(DATA_FILES2, SCHEMA_FILE2, shuffle=False)
data1 = data1.repeat(1)
num_iter = 0
num_itr = 0
for _ in data1.create_dict_iterator(num_epochs=1):
num_iter += 1
num_itr += 1
assert num_iter == 12
assert num_itr == 12
def test_tfrecord_schema():
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_full_schema(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset schema
Expectation: The dataset is processed as expected
Description: Test TFRecordDataset with full schema containing all the feature name, type and shape
Expectation: The data can be processed as expected
"""
logger.info("test_tfrecord_schema")
schema = ds.Schema()
schema.add_column("col_1d", de_type=mstype.int64, shape=[2])
schema.add_column("col_2d", de_type=mstype.int64, shape=[2, 2])
schema.add_column("col_3d", de_type=mstype.int64, shape=[2, 2, 2])
schema.add_column("col_binary", de_type=mstype.string, shape=[1])
schema.add_column("col_float", de_type=mstype.float32, shape=[1])
schema.add_column("col_sint16", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint32", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint64", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint8", de_type=mstype.int64, shape=[1])
dataset = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
assert dataset.output_types() == [np.int64, np.int64, np.int64, np.str_, np.float32, np.int64, np.int64, np.int64,
np.int64]
if do_batch:
expected_shape = [[2, 2], [2, 2, 2], [2, 2, 2, 2], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1]]
else:
expected_shape = [[2], [2, 2], [2, 2, 2], [1], [1], [1], [1], [1], [1]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_unknown_shape_schema(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with schema missing feature shape
Expectation: The data can be processed as expected
"""
schema = ds.Schema()
schema.add_column("col_1d", de_type=mstype.int64)
schema.add_column("col_2d", de_type=mstype.int64)
schema.add_column("col_3d", de_type=mstype.int64)
schema.add_column("col_binary", de_type=mstype.string)
schema.add_column("col_float", de_type=mstype.float32)
schema.add_column("col_sint16", de_type=mstype.int64)
schema.add_column("col_sint32", de_type=mstype.int64)
schema.add_column("col_sint64", de_type=mstype.int64)
schema.add_column("col_sint8", de_type=mstype.int64)
dataset = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
assert dataset.output_types() == [np.int64, np.int64, np.int64, np.str_, np.float32, np.int64, np.int64, np.int64,
np.int64]
if do_batch:
expected_shape = [[2, 2], [2, 4], [2, 8], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1]]
else:
expected_shape = [[2], [4], [8], [1], [1], [1], [1], [1], [1]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_wrong_shape_schema(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with schema containing wrong feature shape
Expectation: Raise a RuntimeError as expected
"""
schema = ds.Schema()
schema.add_column("col_1d", de_type=mstype.int64, shape=[2])
schema.add_column("col_2d", de_type=mstype.int64, shape=[2, 2])
schema.add_column("col_3d", de_type=mstype.int64, shape=[2, 2, 2])
schema.add_column("col_binary", de_type=mstype.string, shape=[5])
schema.add_column("col_float", de_type=mstype.float32)
schema.add_column("col_sint16", de_type=mstype.int64)
schema.add_column("col_sint32", de_type=mstype.int64)
schema.add_column("col_sint64", de_type=mstype.int64)
schema.add_column("col_sint8", de_type=mstype.int64)
dataset = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
with pytest.raises(RuntimeError) as e:
for _ in dataset:
pass
assert "Column shape of col_binary defined in schema does not match the shape actually load" in str(e.value)
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_wrong_type_schema(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with schema containing wrong feature type
Expectation: The output columns can be converted to the specified type
"""
schema = ds.Schema()
schema.add_column("col_1d", de_type=mstype.int8, shape=[2])
schema.add_column("col_2d", de_type=mstype.int16, shape=[2, 2])
schema.add_column("col_3d", de_type=mstype.int32, shape=[2, 2, 2])
schema.add_column("col_binary", de_type=mstype.string, shape=[1])
schema.add_column("col_float", de_type=mstype.float64, shape=[1])
schema.add_column("col_sint16", de_type=mstype.int16, shape=[1])
schema.add_column("col_sint32", de_type=mstype.int32, shape=[1])
schema.add_column("col_sint64", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint8", de_type=mstype.int16, shape=[1])
dataset = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
assert dataset.output_types() == [np.int8, np.int16, np.int32, np.str_, np.float64, np.int16, np.int32, np.int64,
np.int16]
if do_batch:
expected_shape = [[2, 2], [2, 2, 2], [2, 2, 2, 2], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1]]
else:
expected_shape = [[2], [2, 2], [2, 2, 2], [1], [1], [1], [1], [1], [1]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_column_list(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with column list
Expectation: The data can be processed as expected
"""
column_list = ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
dataset = ds.TFRecordDataset(FILES, columns_list=column_list, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
assert dataset.output_types() == [np.int64, np.int64, np.int64, np.str_, np.float32, np.int64, np.int64, np.int64,
np.int64]
if do_batch:
expected_shape = [[2, 2], [2, 4], [2, 8], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1]]
else:
expected_shape = [[2], [4], [8], [1], [1], [1], [1], [1], [1]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_without_schema_and_column_list(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset without both schema and column list
Expectation: The data can be processed as expected
"""
dataset = ds.TFRecordDataset(FILES, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_1d", "col_2d", "col_3d",
"col_binary", "col_float",
"col_sint16", "col_sint32", "col_sint64", "col_sint8"]
assert dataset.output_types() == [np.int64, np.int64, np.int64, np.str_, np.float32, np.int64, np.int64, np.int64,
np.int64]
if do_batch:
expected_shape = [[2, 2], [2, 4], [2, 8], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1], [2, 1]]
else:
expected_shape = [[2], [4], [8], [1], [1], [1], [1], [1], [1]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_with_both_schema_and_column_list(do_batch):
"""
Feature: TFRecordDataset
Description: Test TFRecordDataset with both schema and column list
Expectation: Only the intersection part of the data will be read
"""
schema = ds.Schema()
schema.add_column("col_1d", de_type=mstype.int64, shape=[2])
schema.add_column("col_2d", de_type=mstype.int64, shape=[4])
schema.add_column("col_3d", de_type=mstype.int64, shape=[8])
schema.add_column("col_binary", de_type=mstype.string, shape=[1])
schema.add_column("col_float", de_type=mstype.float32, shape=[1])
schema.add_column("col_sint16", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint32", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint64", de_type=mstype.int64, shape=[1])
schema.add_column("col_sint8", de_type=mstype.int64, shape=[1])
# this list only contains a part of columns and is out of order
column_list = ["col_sint8", "col_binary", "col_2d", "col_float", "col_3d"]
dataset = ds.TFRecordDataset(FILES, schema=schema, columns_list=column_list, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset = dataset.batch(2)
count = 0
for _ in dataset:
count += 1
assert dataset.get_dataset_size() == count
assert dataset.get_col_names() == ["col_sint8", "col_binary", "col_2d", "col_float", "col_3d"]
assert dataset.output_types() == [np.int64, np.str_, np.int64, np.float32, np.int64]
if do_batch:
expected_shape = [[2, 1], [2, 1], [2, 4], [2, 1], [2, 8]]
else:
expected_shape = [[1], [1], [4], [1], [8]]
assert dataset.output_shapes() == expected_shape
@pytest.mark.parametrize("do_batch", (True, False))
def test_tfrecord_result_equal_with_schema_and_column_list(do_batch):
"""
Feature: TFRecordDataset
Description: Test data loaded with schema and column list is the same
Expectation: The data returned is equal with schema and column list
"""
# load data with schema
schema = ds.Schema()
schema.add_column('col_1d', de_type=mstype.int64, shape=[2])
schema.add_column('col_2d', de_type=mstype.int64, shape=[2, 2])
schema.add_column('col_3d', de_type=mstype.int64, shape=[2, 2, 2])
schema.add_column('col_binary', de_type=mstype.uint8, shape=[1])
schema.add_column('col_2d', de_type=mstype.int64, shape=[4])
schema.add_column('col_3d', de_type=mstype.int64, shape=[8])
schema.add_column('col_binary', de_type=mstype.string, shape=[1])
schema.add_column('col_float', de_type=mstype.float32, shape=[1])
schema.add_column('col_sint16', de_type=mstype.int64, shape=[1])
schema.add_column('col_sint32', de_type=mstype.int64, shape=[1])
schema.add_column('col_sint64', de_type=mstype.int64, shape=[1])
data1 = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
schema.add_column('col_sint8', de_type=mstype.int64, shape=[1])
dataset_with_schema = ds.TFRecordDataset(FILES, schema=schema, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset_with_schema = dataset_with_schema.batch(2)
data2 = ds.TFRecordDataset(FILES, schema=SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
# load data with column list
column_list = ['col_1d', 'col_2d', 'col_3d', 'col_binary', 'col_float', 'col_sint16', 'col_sint32', "col_sint64",
"col_sint8"]
dataset_with_column_list = ds.TFRecordDataset(FILES, columns_list=column_list, shuffle=ds.Shuffle.FILES)
if do_batch:
dataset_with_column_list = dataset_with_column_list.batch(2)
for d1, d2 in zip(data1, data2):
for t1, t2 in zip(d1, d2):
np.testing.assert_array_equal(t1.asnumpy(), t2.asnumpy())
# compare result
for row_with_schema, row_with_column_list \
in zip(dataset_with_schema.create_tuple_iterator(num_epochs=1, output_numpy=True),
dataset_with_column_list.create_tuple_iterator(num_epochs=1, output_numpy=True)):
for column_with_schema, column_with_column_list in zip(row_with_schema, row_with_column_list):
np.testing.assert_array_equal(column_with_schema, column_with_column_list)
def test_tfrecord_shuffle():
@ -990,18 +1220,13 @@ def test_tf_wrong_schema():
logger.info("test_tf_wrong_schema")
files = ["../data/dataset/test_tf_file_3_images2/train-0000-of-0001.data"]
schema = ds.Schema()
schema.add_column('image', de_type=mstype.uint8, shape=[1])
schema.add_column('image', de_type=mstype.uint8, shape=[2])
schema.add_column('label', de_type=mstype.int64, shape=[1])
data1 = ds.TFRecordDataset(files, schema, shuffle=False)
exception_occurred = False
try:
with pytest.raises(RuntimeError) as e:
for _ in data1:
pass
except RuntimeError as e:
exception_occurred = True
assert "Data dimensions of 'image' do not match" in str(e)
assert exception_occurred, "test_tf_wrong_schema failed."
assert "Column shape of image defined in schema does not match the shape actually load" in str(e.value)
def test_tfrecord_invalid_columns():
@ -1028,6 +1253,7 @@ def test_tfrecord_exception():
def exception_func(item):
raise Exception("Error occur!")
with pytest.raises(RuntimeError) as info:
schema = ds.Schema()
schema.add_column('col_1d', de_type=mstype.int64, shape=[2])
@ -1074,6 +1300,7 @@ def test_tfrecord_exception():
dataset.output_shapes()
assert "numbers of tfrecord file should not less than num_shards" in str(info.value)
if __name__ == '__main__':
test_tfrecord_shape()
test_tfrecord_read_all_dataset()
@ -1082,10 +1309,16 @@ if __name__ == '__main__':
test_tfrecord_shape2()
test_tfrecord_files_basic()
test_tfrecord_no_schema()
test_tfrecord_pad()
test_tfrecord_read_files()
test_tfrecord_multi_files()
test_tfrecord_schema()
test_tfrecord_with_full_schema(True)
test_tfrecord_with_unknown_shape_schema(True)
test_tfrecord_with_wrong_shape_schema(True)
test_tfrecord_with_wrong_type_schema(True)
test_tfrecord_with_column_list(True)
test_tfrecord_without_schema_and_column_list(True)
test_tfrecord_with_both_schema_and_column_list(True)
test_tfrecord_result_equal_with_schema_and_column_list(True)
test_tfrecord_shuffle()
test_tfrecord_shard()
test_tfrecord_shard_equal_rows()

View File

@ -50,7 +50,7 @@ def test_decode_op():
for item1, item2 in zip(data1.create_dict_iterator(num_epochs=1, output_numpy=True),
data2.create_dict_iterator(num_epochs=1, output_numpy=True)):
actual = item1["image"]
expected = cv2.imdecode(item2["image"], cv2.IMREAD_COLOR)
expected = cv2.imdecode(np.fromstring(item2["image"], dtype=np.uint8), cv2.IMREAD_COLOR)
expected = cv2.cvtColor(expected, cv2.COLOR_BGR2RGB)
assert actual.shape == expected.shape
mse = diff_mse(actual, expected)

View File

@ -96,7 +96,7 @@ def test_decode_op():
i = 0
for item1, item2 in itertools.zip_longest(iter1, iter2):
actual = item1["image"]
expected = cv2.imdecode(item2["image"], cv2.IMREAD_COLOR)
expected = cv2.imdecode(np.fromstring(item2["image"], dtype=np.uint8), cv2.IMREAD_COLOR)
expected = cv2.cvtColor(expected, cv2.COLOR_BGR2RGB)
assert actual.shape == expected.shape
diff = actual - expected

View File

@ -61,16 +61,16 @@ def test_TFRecord_Padded():
"""
data_dir = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
schema_dir = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
result_list = [[159109, 2], [192607, 3], [179251, 4], [1, 5]]
result_list = [[1, 2], [1, 3], [1, 4], [1, 5]]
verify_list = []
shard_num = 4
for i in range(shard_num):
data = ds.TFRecordDataset(data_dir, schema_dir, columns_list=["image"],
shuffle=False, shard_equal_rows=True)
padded_samples = [{'image': np.zeros(1, np.uint8)}, {'image': np.zeros(2, np.uint8)},
{'image': np.zeros(3, np.uint8)}, {'image': np.zeros(4, np.uint8)},
{'image': np.zeros(5, np.uint8)}]
padded_samples = [{'image': np.zeros(1, np.bytes_)}, {'image': np.zeros(2, np.bytes_)},
{'image': np.zeros(3, np.bytes_)}, {'image': np.zeros(4, np.bytes_)},
{'image': np.zeros(5, np.bytes_)}]
padded_ds = ds.PaddedDataset(padded_samples)
concat_ds = data + padded_ds

View File

@ -194,7 +194,7 @@ class TestMinddataProfilingManager:
with open(pipeline_file) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
assert len(op_info) == 6
for i in range(5):
if op_info[i]["op_type"] != "ZipOp":
assert "size" in op_info[i]["metrics"]["output_queue"]
@ -203,8 +203,8 @@ class TestMinddataProfilingManager:
# Note: Zip is an inline op and hence does not have metrics information
assert op_info[i]["metrics"] is None
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
self.confirm_cpuutil(cpu_util_file, 5)
# Confirm CPU util JSON file content, when 6 ops are in the pipeline JSON file
self.confirm_cpuutil(cpu_util_file, 6)
# Confirm dataset iterator file content
self.confirm_dataset_iterator_file(dataset_iterator_file, 12)

View File

@ -401,6 +401,7 @@ def test_case_07():
file_name_auto += os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
file_name_auto += '_auto'
d1 = ds.TFRecordDataset(TFRECORD_FILES, shuffle=False)
d1 = d1.project("image/class/label")
tf_data = []
for x in d1.create_dict_iterator(num_epochs=1, output_numpy=True):
tf_data.append(x)

View File

@ -156,15 +156,15 @@ def test_tfrecord1():
"""
s = ds.Schema()
s.add_column("line", "string", [])
s.add_column("words", "string", [-1])
s.add_column("words", "string", [2, 2])
s.add_column("chinese", "string", [])
data = ds.TFRecordDataset("../data/dataset/testTextTFRecord/text.tfrecord", shuffle=False, schema=s)
for i, d in enumerate(data.create_dict_iterator(num_epochs=1, output_numpy=True)):
assert d["line"].shape == line[i].shape
assert d["line"].shape == (1,)
assert d["words"].shape == words[i].shape
assert d["chinese"].shape == chinese[i].shape
assert d["chinese"].shape == (1,)
np.testing.assert_array_equal(line[i], d["line"])
np.testing.assert_array_equal(words[i], d["words"])
np.testing.assert_array_equal(chinese[i], d["chinese"])
@ -195,17 +195,17 @@ def test_tfrecord3():
"""
s = ds.Schema()
s.add_column("line", mstype.string, [])
s.add_column("words", mstype.string, [-1, 2])
s.add_column("words", mstype.string, [2, 2])
s.add_column("chinese", mstype.string, [])
data = ds.TFRecordDataset("../data/dataset/testTextTFRecord/text.tfrecord", shuffle=False, schema=s)
for i, d in enumerate(data.create_dict_iterator(num_epochs=1, output_numpy=True)):
assert d["line"].shape == line[i].shape
assert d["words"].shape == words[i].reshape([2, 2]).shape
assert d["chinese"].shape == chinese[i].shape
assert d["line"].shape == (1,)
assert d["words"].shape == words[i].shape
assert d["chinese"].shape == (1,)
np.testing.assert_array_equal(line[i], d["line"])
np.testing.assert_array_equal(words[i].reshape([2, 2]), d["words"])
np.testing.assert_array_equal(words[i], d["words"])
np.testing.assert_array_equal(chinese[i], d["chinese"])
@ -367,6 +367,7 @@ def test_process_string_pipeline():
Description: Test processing string and bytes data
Expectation: The output is as expected
"""
def generate_and_process_string(dtype):
data = np.array([["apple"], ["orange"], ["banana"], ["1"], ["2"], ["3"], ["a"], ["b"], ["c"]], dtype=dtype)
dataset = ds.NumpySlicesDataset(data, column_names=["text"])