c++ api for CSV Dataset

This commit is contained in:
jiangzhiwen 2020-08-18 22:31:44 +08:00
parent 3259dafa7e
commit 39df25f803
6 changed files with 662 additions and 10 deletions

View File

@ -25,6 +25,7 @@
#include "minddata/dataset/engine/datasetops/source/cifar_op.h"
#include "minddata/dataset/engine/datasetops/source/clue_op.h"
#include "minddata/dataset/engine/datasetops/source/coco_op.h"
#include "minddata/dataset/engine/datasetops/source/csv_op.h"
#include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
#include "minddata/dataset/engine/datasetops/source/manifest_op.h"
#include "minddata/dataset/engine/datasetops/source/mnist_op.h"
@ -161,6 +162,18 @@ std::shared_ptr<CocoDataset> Coco(const std::string &dataset_dir, const std::str
return ds->ValidateParams() ? ds : nullptr;
}
// Function to create a CSVDataset.
std::shared_ptr<CSVDataset> CSV(const std::vector<std::string> &dataset_files, char field_delim,
const std::vector<std::shared_ptr<CsvBase>> &column_defaults,
const std::vector<std::string> &column_names, int64_t num_samples, ShuffleMode shuffle,
int32_t num_shards, int32_t shard_id) {
auto ds = std::make_shared<CSVDataset>(dataset_files, field_delim, column_defaults, column_names, num_samples,
shuffle, num_shards, shard_id);
// Call derived class validation method.
return ds->ValidateParams() ? ds : nullptr;
}
// Function to create a ImageFolderDataset.
std::shared_ptr<ImageFolderDataset> ImageFolder(const std::string &dataset_dir, bool decode,
const std::shared_ptr<SamplerObj> &sampler,
@ -1021,6 +1034,84 @@ std::vector<std::shared_ptr<DatasetOp>> CocoDataset::Build() {
return node_ops;
}
// Constructor for CSVDataset
CSVDataset::CSVDataset(const std::vector<std::string> &csv_files, char field_delim,
const std::vector<std::shared_ptr<CsvBase>> &column_defaults,
const std::vector<std::string> &column_names, int64_t num_samples, ShuffleMode shuffle,
int32_t num_shards, int32_t shard_id)
: dataset_files_(csv_files),
field_delim_(field_delim),
column_defaults_(column_defaults),
column_names_(column_names),
num_samples_(num_samples),
shuffle_(shuffle),
num_shards_(num_shards),
shard_id_(shard_id) {}
bool CSVDataset::ValidateParams() {
if (!ValidateDatasetFilesParam("CSVDataset", dataset_files_)) {
return false;
}
if (field_delim_ == '"' || field_delim_ == '\r' || field_delim_ == '\n') {
MS_LOG(ERROR) << "CSVDataset: The field delimiter should not be \", \\r, \\n";
return false;
}
if (num_samples_ < -1) {
MS_LOG(ERROR) << "CSVDataset: Invalid number of samples: " << num_samples_;
return false;
}
if (!ValidateDatasetShardParams("CSVDataset", num_shards_, shard_id_)) {
return false;
}
return true;
}
// Function to build CSVDataset
std::vector<std::shared_ptr<DatasetOp>> CSVDataset::Build() {
// A vector containing shared pointer to the Dataset Ops that this object will create
std::vector<std::shared_ptr<DatasetOp>> node_ops;
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default_list;
for (auto v : column_defaults_) {
if (v->type == CsvType::INT) {
column_default_list.push_back(
std::make_shared<CsvOp::Record<int>>(CsvOp::INT, std::dynamic_pointer_cast<CsvRecord<int>>(v)->value));
} else if (v->type == CsvType::FLOAT) {
column_default_list.push_back(
std::make_shared<CsvOp::Record<float>>(CsvOp::FLOAT, std::dynamic_pointer_cast<CsvRecord<float>>(v)->value));
} else if (v->type == CsvType::STRING) {
column_default_list.push_back(std::make_shared<CsvOp::Record<std::string>>(
CsvOp::STRING, std::dynamic_pointer_cast<CsvRecord<std::string>>(v)->value));
}
}
std::shared_ptr<CsvOp> csv_op = std::make_shared<CsvOp>(
dataset_files_, field_delim_, column_default_list, column_names_, num_workers_, rows_per_buffer_, num_samples_,
worker_connector_size_, connector_que_size_, shuffle_files, num_shards_, shard_id_);
RETURN_EMPTY_IF_ERROR(csv_op->Init());
if (shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;
// First, get the number of rows in the dataset
RETURN_EMPTY_IF_ERROR(CsvOp::CountAllFileRows(dataset_files_, column_names_.empty(), &num_rows));
// Add the shuffle op after this op
RETURN_EMPTY_IF_ERROR(AddShuffleOp(dataset_files_.size(), num_shards_, num_rows, 0, connector_que_size_,
rows_per_buffer_, &shuffle_op));
node_ops.push_back(shuffle_op);
}
node_ops.push_back(csv_op);
return node_ops;
}
ImageFolderDataset::ImageFolderDataset(std::string dataset_dir, bool decode, std::shared_ptr<SamplerObj> sampler,
bool recursive, std::set<std::string> extensions,
std::map<std::string, int32_t> class_indexing)

View File

@ -51,6 +51,8 @@ class Cifar10Dataset;
class Cifar100Dataset;
class CLUEDataset;
class CocoDataset;
class CSVDataset;
class CsvBase;
class ImageFolderDataset;
class ManifestDataset;
class MnistDataset;
@ -114,13 +116,13 @@ std::shared_ptr<Cifar100Dataset> Cifar100(const std::string &dataset_dir,
/// \param[in] usage Be used to "train", "test" or "eval" data (default="train").
/// \param[in] num_samples The number of samples to be included in the dataset.
/// (Default = 0 means all samples.)
/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode.kGlobal)
/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal)
/// Can be any of:
/// ShuffleMode.kFalse - No shuffling is performed.
/// ShuffleMode.kFiles - Shuffle files only.
/// ShuffleMode.kGlobal - Shuffle both the files and samples.
/// ShuffleMode::kFalse - No shuffling is performed.
/// ShuffleMode::kFiles - Shuffle files only.
/// ShuffleMode::kGlobal - Shuffle both the files and samples.
/// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1)
/// \param[in] shard_id The shard ID within num_shards. This argument should be
/// \param[in] shard_id The shard ID within num_shards. This argument should be
/// specified only when num_shards is also specified. (Default = 0)
/// \return Shared pointer to the current CLUEDataset
std::shared_ptr<CLUEDataset> CLUE(const std::vector<std::string> &dataset_files, const std::string &task = "AFQMC",
@ -148,6 +150,32 @@ std::shared_ptr<CocoDataset> Coco(const std::string &dataset_dir, const std::str
const std::string &task = "Detection", const bool &decode = false,
const std::shared_ptr<SamplerObj> &sampler = nullptr);
/// \brief Function to create a CSVDataset
/// \notes The generated dataset has a variable number of columns
/// \param[in] dataset_files List of files to be read to search for a pattern of files. The list
/// will be sorted in a lexicographical order.
/// \param[in] field_delim A char that indicates the delimiter to separate fields (default=',').
/// \param[in] column_defaults List of default values for the CSV field (default={}). Each item in the list is
/// either a valid type (float, int, or string). If this is not provided, treats all columns as string type.
/// \param[in] column_names List of column names of the dataset (default={}). If this is not provided, infers the
/// column_names from the first row of CSV file.
/// \param[in] num_samples The number of samples to be included in the dataset.
/// (Default = -1 means all samples.)
/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal)
/// Can be any of:
/// ShuffleMode::kFalse - No shuffling is performed.
/// ShuffleMode::kFiles - Shuffle files only.
/// ShuffleMode::kGlobal - Shuffle both the files and samples.
/// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1)
/// \param[in] shard_id The shard ID within num_shards. This argument should be
/// specified only when num_shards is also specified. (Default = 0)
/// \return Shared pointer to the current Dataset
std::shared_ptr<CSVDataset> CSV(const std::vector<std::string> &dataset_files, char field_delim = ',',
const std::vector<std::shared_ptr<CsvBase>> &column_defaults = {},
const std::vector<std::string> &column_names = {}, int64_t num_samples = -1,
ShuffleMode shuffle = ShuffleMode::kGlobal, int32_t num_shards = 1,
int32_t shard_id = 0);
/// \brief Function to create an ImageFolderDataset
/// \notes A source dataset that reads images from a tree of directories
/// All images within one folder have the same label
@ -217,13 +245,13 @@ std::shared_ptr<RandomDataset> RandomData(const int32_t &total_rows = 0, T schem
/// will be sorted in a lexicographical order.
/// \param[in] num_samples The number of samples to be included in the dataset.
/// (Default = 0 means all samples.)
/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode.kGlobal)
/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal)
/// Can be any of:
/// ShuffleMode.kFalse - No shuffling is performed.
/// ShuffleMode.kFiles - Shuffle files only.
/// ShuffleMode.kGlobal - Shuffle both the files and samples.
/// ShuffleMode::kFalse - No shuffling is performed.
/// ShuffleMode::kFiles - Shuffle files only.
/// ShuffleMode::kGlobal - Shuffle both the files and samples.
/// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1)
/// \param[in] shard_id The shard ID within num_shards. This argument should be
/// \param[in] shard_id The shard ID within num_shards. This argument should be
/// specified only when num_shards is also specified. (Default = 0)
/// \return Shared pointer to the current TextFileDataset
std::shared_ptr<TextFileDataset> TextFile(const std::vector<std::string> &dataset_files, int32_t num_samples = 0,
@ -572,6 +600,57 @@ class CocoDataset : public Dataset {
std::shared_ptr<SamplerObj> sampler_;
};
/// \brief Record type for CSV
enum CsvType : uint8_t { INT = 0, FLOAT, STRING };
/// \brief Base class of CSV Record
struct CsvBase {
public:
CsvBase() = default;
explicit CsvBase(CsvType t) : type(t) {}
virtual ~CsvBase() {}
CsvType type;
};
/// \brief CSV Record that can represent integer, float and string.
template <typename T>
class CsvRecord : public CsvBase {
public:
CsvRecord() = default;
CsvRecord(CsvType t, T v) : CsvBase(t), value(v) {}
~CsvRecord() {}
T value;
};
class CSVDataset : public Dataset {
public:
/// \brief Constructor
CSVDataset(const std::vector<std::string> &dataset_files, char field_delim,
const std::vector<std::shared_ptr<CsvBase>> &column_defaults, const std::vector<std::string> &column_names,
int64_t num_samples, ShuffleMode shuffle, int32_t num_shards, int32_t shard_id);
/// \brief Destructor
~CSVDataset() = default;
/// \brief a base class override function to create the required runtime dataset op objects for this class
/// \return shared pointer to the list of newly created DatasetOps
std::vector<std::shared_ptr<DatasetOp>> Build() override;
/// \brief Parameters validation
/// \return bool true if all the params are valid
bool ValidateParams() override;
private:
std::vector<std::string> dataset_files_;
char field_delim_;
std::vector<std::shared_ptr<CsvBase>> column_defaults_;
std::vector<std::string> column_names_;
int64_t num_samples_;
ShuffleMode shuffle_;
int32_t num_shards_;
int32_t shard_id_;
};
/// \class ImageFolderDataset
/// \brief A Dataset derived class to represent ImageFolder dataset
class ImageFolderDataset : public Dataset {

View File

@ -103,7 +103,9 @@ SET(DE_UT_SRCS
c_api_dataset_cifar_test.cc
c_api_dataset_clue_test.cc
c_api_dataset_coco_test.cc
c_api_dataset_csv_test.cc
c_api_dataset_filetext_test.cc
c_api_dataset_manifest_test.cc
c_api_dataset_randomdata_test.cc
c_api_dataset_voc_test.cc
c_api_datasets_test.cc

View File

@ -0,0 +1,475 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/common.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/include/datasets.h"
using namespace mindspore::dataset::api;
using mindspore::dataset::ShuffleMode;
using mindspore::dataset::Tensor;
using mindspore::dataset::GlobalContext;
class MindDataTestPipeline : public UT::DatasetOpTesting {
protected:
};
TEST_F(MindDataTestPipeline, TestCSVDatasetBasic) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetBasic.";
// Create a CSVDataset, with single CSV file
std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, -1, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"1", "2", "3", "4"},
{"5", "6", "7", "8"},
{"9", "10", "11", "12"},
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 3 samples
EXPECT_EQ(i, 3);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestCSVDatasetMultiFiles) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetMultiFiles.";
// Set configuration
uint32_t original_seed = GlobalContext::config_manager()->seed();
uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
GlobalContext::config_manager()->set_seed(111);
GlobalContext::config_manager()->set_num_parallel_workers(4);
// Create a CSVDataset, with single CSV file
std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, -1, ShuffleMode::kGlobal);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"17", "18", "19", "20"},
{"1", "2", "3", "4"},
{"5", "6", "7", "8"},
{"13", "14", "15", "16"},
{"21", "22", "23", "24"},
{"9", "10", "11", "12"},
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 6 samples
EXPECT_EQ(i, 6);
// Manually terminate the pipeline
iter->Stop();
// Restore configuration
GlobalContext::config_manager()->set_seed(original_seed);
GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
}
TEST_F(MindDataTestPipeline, TestCSVDatasetNumSamples) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetNumSamples.";
// Create a CSVDataset, with single CSV file
std::string file = datasets_root_path_ + "/testCSV/1.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, 2, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"1", "2", "3", "4"},
{"5", "6", "7", "8"}
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 2 samples
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestCSVDatasetDistribution) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDistribution.";
// Create a CSVDataset, with single CSV file
std::string file = datasets_root_path_ + "/testCSV/1.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 2, 0);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"1", "2", "3", "4"},
{"5", "6", "7", "8"}
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 2 samples
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestCSVDatasetType) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetType.";
// Create a CSVDataset, with single CSV file
std::string file = datasets_root_path_ + "/testCSV/default.csv";
std::vector<std::shared_ptr<CsvBase>> colum_type = {
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
std::make_shared<CsvRecord<int>>(CsvType::INT, 0),
std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 0.0),
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
};
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({file}, ',', colum_type, column_names, -1, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
std::vector<std::vector<std::shared_ptr<CsvBase>>> expected = {
{
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
std::make_shared<CsvRecord<int>>(CsvType::INT, 2),
std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 3.0),
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
},
{
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "a"),
std::make_shared<CsvRecord<int>>(CsvType::INT, 4),
std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 5.0),
std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "b"),
},
};
EXPECT_NE(row.find("col1"), row.end());
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
if (colum_type[j]->type == CsvType::INT) {
int val;
text->GetItemAt(&val, {0});
EXPECT_EQ(val, std::dynamic_pointer_cast<CsvRecord<int>>(expected[i][j])->value);
} else if (colum_type[j]->type == CsvType::FLOAT) {
float val;
text->GetItemAt(&val, {0});
EXPECT_EQ(val, std::dynamic_pointer_cast<CsvRecord<float>>(expected[i][j])->value);
} else if (colum_type[j]->type == CsvType::STRING) {
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), std::dynamic_pointer_cast<CsvRecord<std::string>>(expected[i][j])->value.c_str());
}
}
iter->GetNextRow(&row);
i++;
}
// Expect 2 samples
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestCSVDatasetHeader) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetHeader.";
// Create a CSVDataset, with single CSV file
std::string train_file = datasets_root_path_ + "/testCSV/header.csv";
std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, {});
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"a", "b", "c", "d"},
};
uint64_t i = 0;
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 3 samples
EXPECT_EQ(i, 1);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestCSVDatasetException) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetException.";
// Create a CSV Dataset
std::string file = datasets_root_path_ + "/testCSV/1.csv";
std::string invalid_csv_file = "./NotExistFile";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
// Test empty file list
std::shared_ptr<Dataset> ds0 = CSV({});
EXPECT_EQ(ds0, nullptr);
// Test invalid file
std::shared_ptr<Dataset> ds1 = CSV({invalid_csv_file});
EXPECT_EQ(ds1, nullptr);
// Test invalid num_samples < -1
std::shared_ptr<Dataset> ds2 = CSV({file}, ',', {}, column_names, -2);
EXPECT_EQ(ds2, nullptr);
// Test invalid num_shards < 1
std::shared_ptr<Dataset> ds3 = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 0);
EXPECT_EQ(ds3, nullptr);
// Test invalid shard_id >= num_shards
std::shared_ptr<Dataset> ds4 = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 2, 2);
EXPECT_EQ(ds4, nullptr);
// Test invalid field_delim
std::shared_ptr<Dataset> ds5 = CSV({file}, '"', {}, column_names);
EXPECT_EQ(ds5, nullptr);
}
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFiles) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFiles.";
// Set configuration
uint32_t original_seed = GlobalContext::config_manager()->seed();
uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
GlobalContext::config_manager()->set_seed(130);
GlobalContext::config_manager()->set_num_parallel_workers(4);
// Create a CSVDataset, with single CSV file
std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, -1, ShuffleMode::kFiles);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"13", "14", "15", "16"},
{"1", "2", "3", "4"},
{"17", "18", "19", "20"},
{"5", "6", "7", "8"},
{"21", "22", "23", "24"},
{"9", "10", "11", "12"},
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 6 samples
EXPECT_EQ(i, 6);
// Manually terminate the pipeline
iter->Stop();
// Restore configuration
GlobalContext::config_manager()->set_seed(original_seed);
GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
}
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleGlobal) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleGlobal.";
// Test CSV Dataset with GLOBLE shuffle
// Set configuration
uint32_t original_seed = GlobalContext::config_manager()->seed();
uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
GlobalContext::config_manager()->set_seed(135);
GlobalContext::config_manager()->set_num_parallel_workers(4);
// Create a CSVFile Dataset, with single CSV file
std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, -1, ShuffleMode::kGlobal);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
EXPECT_NE(row.find("col1"), row.end());
std::vector<std::vector<std::string>> expected_result = {
{"5", "6", "7", "8"},
{"9", "10", "11", "12"},
{"1", "2", "3", "4"}
};
uint64_t i = 0;
while (row.size() != 0) {
for (int j = 0; j < column_names.size(); j++) {
auto text = row[column_names[j]];
std::string_view sv;
text->GetItemAt(&sv, {0});
std::string ss(sv);
EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
}
iter->GetNextRow(&row);
i++;
}
// Expect 3 samples
EXPECT_EQ(i, 3);
// Manually terminate the pipeline
iter->Stop();
// Restore configuration
GlobalContext::config_manager()->set_seed(original_seed);
GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
}

View File

@ -0,0 +1,3 @@
13,14,15,16
17,18,19,20
21,22,23,24
1 13 14 15 16
2 17 18 19 20
3 21 22 23 24

View File

@ -0,0 +1,2 @@
,2,3.0,
a,4,5,b
1 2 3.0
2 a 4 5 b