From 39df25f803f009b72aa71e1a6be818951651781a Mon Sep 17 00:00:00 2001 From: jiangzhiwen Date: Tue, 18 Aug 2020 22:31:44 +0800 Subject: [PATCH] c++ api for CSV Dataset --- .../ccsrc/minddata/dataset/api/datasets.cc | 91 ++++ .../ccsrc/minddata/dataset/include/datasets.h | 99 +++- tests/ut/cpp/dataset/CMakeLists.txt | 2 + .../ut/cpp/dataset/c_api_dataset_csv_test.cc | 475 ++++++++++++++++++ tests/ut/data/dataset/testCSV/append.csv | 3 + tests/ut/data/dataset/testCSV/default.csv | 2 + 6 files changed, 662 insertions(+), 10 deletions(-) create mode 100644 tests/ut/cpp/dataset/c_api_dataset_csv_test.cc create mode 100644 tests/ut/data/dataset/testCSV/append.csv create mode 100644 tests/ut/data/dataset/testCSV/default.csv diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index b937830ae2d..a4434afbb23 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -25,6 +25,7 @@ #include "minddata/dataset/engine/datasetops/source/cifar_op.h" #include "minddata/dataset/engine/datasetops/source/clue_op.h" #include "minddata/dataset/engine/datasetops/source/coco_op.h" +#include "minddata/dataset/engine/datasetops/source/csv_op.h" #include "minddata/dataset/engine/datasetops/source/image_folder_op.h" #include "minddata/dataset/engine/datasetops/source/manifest_op.h" #include "minddata/dataset/engine/datasetops/source/mnist_op.h" @@ -161,6 +162,18 @@ std::shared_ptr Coco(const std::string &dataset_dir, const std::str return ds->ValidateParams() ? ds : nullptr; } +// Function to create a CSVDataset. 
+std::shared_ptr CSV(const std::vector &dataset_files, char field_delim, + const std::vector> &column_defaults, + const std::vector &column_names, int64_t num_samples, ShuffleMode shuffle, + int32_t num_shards, int32_t shard_id) { + auto ds = std::make_shared(dataset_files, field_delim, column_defaults, column_names, num_samples, + shuffle, num_shards, shard_id); + + // Call derived class validation method. + return ds->ValidateParams() ? ds : nullptr; +} + // Function to create a ImageFolderDataset. std::shared_ptr ImageFolder(const std::string &dataset_dir, bool decode, const std::shared_ptr &sampler, @@ -1021,6 +1034,84 @@ std::vector> CocoDataset::Build() { return node_ops; } +// Constructor for CSVDataset +CSVDataset::CSVDataset(const std::vector &csv_files, char field_delim, + const std::vector> &column_defaults, + const std::vector &column_names, int64_t num_samples, ShuffleMode shuffle, + int32_t num_shards, int32_t shard_id) + : dataset_files_(csv_files), + field_delim_(field_delim), + column_defaults_(column_defaults), + column_names_(column_names), + num_samples_(num_samples), + shuffle_(shuffle), + num_shards_(num_shards), + shard_id_(shard_id) {} + +bool CSVDataset::ValidateParams() { + if (!ValidateDatasetFilesParam("CSVDataset", dataset_files_)) { + return false; + } + + if (field_delim_ == '"' || field_delim_ == '\r' || field_delim_ == '\n') { + MS_LOG(ERROR) << "CSVDataset: The field delimiter should not be \", \\r, \\n"; + return false; + } + + if (num_samples_ < -1) { + MS_LOG(ERROR) << "CSVDataset: Invalid number of samples: " << num_samples_; + return false; + } + + if (!ValidateDatasetShardParams("CSVDataset", num_shards_, shard_id_)) { + return false; + } + + return true; +} + +// Function to build CSVDataset +std::vector> CSVDataset::Build() { + // A vector containing shared pointer to the Dataset Ops that this object will create + std::vector> node_ops; + + bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == 
ShuffleMode::kFiles); + std::vector> column_default_list; + for (auto v : column_defaults_) { + if (v->type == CsvType::INT) { + column_default_list.push_back( + std::make_shared>(CsvOp::INT, std::dynamic_pointer_cast>(v)->value)); + } else if (v->type == CsvType::FLOAT) { + column_default_list.push_back( + std::make_shared>(CsvOp::FLOAT, std::dynamic_pointer_cast>(v)->value)); + } else if (v->type == CsvType::STRING) { + column_default_list.push_back(std::make_shared>( + CsvOp::STRING, std::dynamic_pointer_cast>(v)->value)); + } + } + + std::shared_ptr csv_op = std::make_shared( + dataset_files_, field_delim_, column_default_list, column_names_, num_workers_, rows_per_buffer_, num_samples_, + worker_connector_size_, connector_que_size_, shuffle_files, num_shards_, shard_id_); + RETURN_EMPTY_IF_ERROR(csv_op->Init()); + if (shuffle_ == ShuffleMode::kGlobal) { + // Inject ShuffleOp + std::shared_ptr shuffle_op = nullptr; + int64_t num_rows = 0; + + // First, get the number of rows in the dataset + RETURN_EMPTY_IF_ERROR(CsvOp::CountAllFileRows(dataset_files_, column_names_.empty(), &num_rows)); + + // Add the shuffle op after this op + RETURN_EMPTY_IF_ERROR(AddShuffleOp(dataset_files_.size(), num_shards_, num_rows, 0, connector_que_size_, + rows_per_buffer_, &shuffle_op)); + node_ops.push_back(shuffle_op); + } + + node_ops.push_back(csv_op); + return node_ops; +} + ImageFolderDataset::ImageFolderDataset(std::string dataset_dir, bool decode, std::shared_ptr sampler, bool recursive, std::set extensions, std::map class_indexing) diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h index 57893c2d6c8..1f12cf0c0c6 100644 --- a/mindspore/ccsrc/minddata/dataset/include/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h @@ -51,6 +51,8 @@ class Cifar10Dataset; class Cifar100Dataset; class CLUEDataset; class CocoDataset; +class CSVDataset; +class CsvBase; class ImageFolderDataset; class 
ManifestDataset; class MnistDataset; @@ -114,13 +116,13 @@ std::shared_ptr Cifar100(const std::string &dataset_dir, /// \param[in] usage Be used to "train", "test" or "eval" data (default="train"). /// \param[in] num_samples The number of samples to be included in the dataset. /// (Default = 0 means all samples.) -/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode.kGlobal) +/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal) /// Can be any of: -/// ShuffleMode.kFalse - No shuffling is performed. -/// ShuffleMode.kFiles - Shuffle files only. -/// ShuffleMode.kGlobal - Shuffle both the files and samples. +/// ShuffleMode::kFalse - No shuffling is performed. +/// ShuffleMode::kFiles - Shuffle files only. +/// ShuffleMode::kGlobal - Shuffle both the files and samples. /// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1) -/// \param[in] shard_id The shard ID within num_shards. This argument should be +/// \param[in] shard_id The shard ID within num_shards. This argument should be /// specified only when num_shards is also specified. (Default = 0) /// \return Shared pointer to the current CLUEDataset std::shared_ptr CLUE(const std::vector &dataset_files, const std::string &task = "AFQMC", @@ -148,6 +150,32 @@ std::shared_ptr Coco(const std::string &dataset_dir, const std::str const std::string &task = "Detection", const bool &decode = false, const std::shared_ptr &sampler = nullptr); +/// \brief Function to create a CSVDataset +/// \notes The generated dataset has a variable number of columns +/// \param[in] dataset_files List of files to be read to search for a pattern of files. The list +/// will be sorted in a lexicographical order. +/// \param[in] field_delim A char that indicates the delimiter to separate fields (default=','). +/// \param[in] column_defaults List of default values for the CSV field (default={}). 
Each item in the list is +/// of a valid type (float, int, or string). If this is not provided, treats all columns as string type. +/// \param[in] column_names List of column names of the dataset (default={}). If this is not provided, infers the +/// column_names from the first row of CSV file. +/// \param[in] num_samples The number of samples to be included in the dataset. +/// (Default = -1 means all samples.) +/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal) +/// Can be any of: +/// ShuffleMode::kFalse - No shuffling is performed. +/// ShuffleMode::kFiles - Shuffle files only. +/// ShuffleMode::kGlobal - Shuffle both the files and samples. +/// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1) +/// \param[in] shard_id The shard ID within num_shards. This argument should be +/// specified only when num_shards is also specified. (Default = 0) +/// \return Shared pointer to the current Dataset +std::shared_ptr CSV(const std::vector &dataset_files, char field_delim = ',', + const std::vector> &column_defaults = {}, + const std::vector &column_names = {}, int64_t num_samples = -1, + ShuffleMode shuffle = ShuffleMode::kGlobal, int32_t num_shards = 1, + int32_t shard_id = 0); + /// \brief Function to create an ImageFolderDataset /// \notes A source dataset that reads images from a tree of directories /// All images within one folder have the same label @@ -217,13 +245,13 @@ std::shared_ptr RandomData(const int32_t &total_rows = 0, T schem /// will be sorted in a lexicographical order. /// \param[in] num_samples The number of samples to be included in the dataset. /// (Default = 0 means all samples.) -/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode.kGlobal) +/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode::kGlobal) /// Can be any of: -/// ShuffleMode.kFalse - No shuffling is performed. 
-/// ShuffleMode.kFiles - Shuffle files only. -/// ShuffleMode.kGlobal - Shuffle both the files and samples. +/// ShuffleMode::kFalse - No shuffling is performed. +/// ShuffleMode::kFiles - Shuffle files only. +/// ShuffleMode::kGlobal - Shuffle both the files and samples. /// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1) -/// \param[in] shard_id The shard ID within num_shards. This argument should be +/// \param[in] shard_id The shard ID within num_shards. This argument should be /// specified only when num_shards is also specified. (Default = 0) /// \return Shared pointer to the current TextFileDataset std::shared_ptr TextFile(const std::vector &dataset_files, int32_t num_samples = 0, @@ -572,6 +600,57 @@ class CocoDataset : public Dataset { std::shared_ptr sampler_; }; +/// \brief Record type for CSV +enum CsvType : uint8_t { INT = 0, FLOAT, STRING }; + +/// \brief Base class of CSV Record +struct CsvBase { + public: + CsvBase() = default; + explicit CsvBase(CsvType t) : type(t) {} + virtual ~CsvBase() {} + CsvType type; +}; + +/// \brief CSV Record that can represent integer, float and string. 
+template +class CsvRecord : public CsvBase { + public: + CsvRecord() = default; + CsvRecord(CsvType t, T v) : CsvBase(t), value(v) {} + ~CsvRecord() {} + T value; +}; + +class CSVDataset : public Dataset { + public: + /// \brief Constructor + CSVDataset(const std::vector &dataset_files, char field_delim, + const std::vector> &column_defaults, const std::vector &column_names, + int64_t num_samples, ShuffleMode shuffle, int32_t num_shards, int32_t shard_id); + + /// \brief Destructor + ~CSVDataset() = default; + + /// \brief a base class override function to create the required runtime dataset op objects for this class + /// \return shared pointer to the list of newly created DatasetOps + std::vector> Build() override; + + /// \brief Parameters validation + /// \return bool true if all the params are valid + bool ValidateParams() override; + + private: + std::vector dataset_files_; + char field_delim_; + std::vector> column_defaults_; + std::vector column_names_; + int64_t num_samples_; + ShuffleMode shuffle_; + int32_t num_shards_; + int32_t shard_id_; +}; + /// \class ImageFolderDataset /// \brief A Dataset derived class to represent ImageFolder dataset class ImageFolderDataset : public Dataset { diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt index 20976398bb9..fbcd1dbc75f 100644 --- a/tests/ut/cpp/dataset/CMakeLists.txt +++ b/tests/ut/cpp/dataset/CMakeLists.txt @@ -103,7 +103,9 @@ SET(DE_UT_SRCS c_api_dataset_cifar_test.cc c_api_dataset_clue_test.cc c_api_dataset_coco_test.cc + c_api_dataset_csv_test.cc c_api_dataset_filetext_test.cc + c_api_dataset_manifest_test.cc c_api_dataset_randomdata_test.cc c_api_dataset_voc_test.cc c_api_datasets_test.cc diff --git a/tests/ut/cpp/dataset/c_api_dataset_csv_test.cc b/tests/ut/cpp/dataset/c_api_dataset_csv_test.cc new file mode 100644 index 00000000000..f005cf301f7 --- /dev/null +++ b/tests/ut/cpp/dataset/c_api_dataset_csv_test.cc @@ -0,0 +1,475 @@ +/** + * Copyright 2020 Huawei 
Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common/common.h" +#include "minddata/dataset/core/config_manager.h" +#include "minddata/dataset/core/global_context.h" +#include "minddata/dataset/include/datasets.h" + +using namespace mindspore::dataset::api; +using mindspore::dataset::ShuffleMode; +using mindspore::dataset::Tensor; +using mindspore::dataset::GlobalContext; + +class MindDataTestPipeline : public UT::DatasetOpTesting { + protected: +}; + +TEST_F(MindDataTestPipeline, TestCSVDatasetBasic) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetBasic."; + + // Create a CSVDataset, with single CSV file + std::string train_file = datasets_root_path_ + "/testCSV/1.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({train_file}, ',', {}, column_names, -1, ShuffleMode::kFalse); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"1", "2", "3", "4"}, + {"5", "6", "7", "8"}, + {"9", "10", "11", "12"}, + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 3 samples + EXPECT_EQ(i, 3); + + // Manually terminate the pipeline + iter->Stop(); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetMultiFiles) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetMultiFiles."; + + // Set configuration + uint32_t original_seed = GlobalContext::config_manager()->seed(); + uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers(); + MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers; + GlobalContext::config_manager()->set_seed(111); + GlobalContext::config_manager()->set_num_parallel_workers(4); + + // Create a CSVDataset, with multiple CSV files + std::string file1 = datasets_root_path_ + "/testCSV/1.csv"; + std::string file2 = datasets_root_path_ + "/testCSV/append.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({file1, file2}, ',', {}, column_names, -1, ShuffleMode::kGlobal); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"17", "18", "19", "20"}, + {"1", "2", "3", "4"}, + {"5", "6", "7", "8"}, + {"13", "14", "15", "16"}, + {"21", "22", "23", "24"}, + {"9", "10", "11", "12"}, + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 6 samples + EXPECT_EQ(i, 6); + + // Manually terminate the pipeline + iter->Stop(); + + // Restore configuration + GlobalContext::config_manager()->set_seed(original_seed); + GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetNumSamples) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetNumSamples."; + + // Create a CSVDataset, with single CSV file + std::string file = datasets_root_path_ + "/testCSV/1.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({file}, ',', {}, column_names, 2, ShuffleMode::kFalse); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"1", "2", "3", "4"}, + {"5", "6", "7", "8"} + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 2 samples + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetDistribution) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDistribution."; + + // Create a CSVDataset, with single CSV file + std::string file = datasets_root_path_ + "/testCSV/1.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 2, 0); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"1", "2", "3", "4"}, + {"5", "6", "7", "8"} + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 2 samples + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetType) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetType."; + + // Create a CSVDataset, with single CSV file + std::string file = datasets_root_path_ + "/testCSV/default.csv"; + std::vector> colum_type = { + std::make_shared>(CsvType::STRING, ""), + std::make_shared>(CsvType::INT, 0), + std::make_shared>(CsvType::FLOAT, 0.0), + std::make_shared>(CsvType::STRING, ""), + }; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({file}, ',', colum_type, column_names, -1, ShuffleMode::kFalse); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + std::vector>> expected = { + { + std::make_shared>(CsvType::STRING, ""), + std::make_shared>(CsvType::INT, 2), + std::make_shared>(CsvType::FLOAT, 3.0), + std::make_shared>(CsvType::STRING, ""), + }, + { + std::make_shared>(CsvType::STRING, "a"), + std::make_shared>(CsvType::INT, 4), + std::make_shared>(CsvType::FLOAT, 5.0), + std::make_shared>(CsvType::STRING, "b"), + }, + }; + EXPECT_NE(row.find("col1"), row.end()); + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + if (colum_type[j]->type == CsvType::INT) { + int val; + text->GetItemAt(&val, {0}); + EXPECT_EQ(val, std::dynamic_pointer_cast>(expected[i][j])->value); + } else if (colum_type[j]->type == CsvType::FLOAT) { + float val; + text->GetItemAt(&val, {0}); + EXPECT_EQ(val, std::dynamic_pointer_cast>(expected[i][j])->value); + } else if (colum_type[j]->type == CsvType::STRING) { + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), std::dynamic_pointer_cast>(expected[i][j])->value.c_str()); + } + } + iter->GetNextRow(&row); + i++; + } + + // Expect 2 samples + EXPECT_EQ(i, 2); + + // Manually terminate the pipeline + iter->Stop(); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetHeader) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetHeader."; + + // Create a CSVDataset, with single CSV file + std::string train_file = datasets_root_path_ + "/testCSV/header.csv"; + std::shared_ptr ds = CSV({train_file}, ',', {}, {}); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. 
+ std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"a", "b", "c", "d"}, + }; + + uint64_t i = 0; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 1 sample + EXPECT_EQ(i, 1); + + // Manually terminate the pipeline + iter->Stop(); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetException) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetException."; + // Create a CSV Dataset + std::string file = datasets_root_path_ + "/testCSV/1.csv"; + std::string invalid_csv_file = "./NotExistFile"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + + // Test empty file list + std::shared_ptr ds0 = CSV({}); + EXPECT_EQ(ds0, nullptr); + + // Test invalid file + std::shared_ptr ds1 = CSV({invalid_csv_file}); + EXPECT_EQ(ds1, nullptr); + + // Test invalid num_samples < -1 + std::shared_ptr ds2 = CSV({file}, ',', {}, column_names, -2); + EXPECT_EQ(ds2, nullptr); + + // Test invalid num_shards < 1 + std::shared_ptr ds3 = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 0); + EXPECT_EQ(ds3, nullptr); + + // Test invalid shard_id >= num_shards + std::shared_ptr ds4 = CSV({file}, ',', {}, column_names, -1, ShuffleMode::kFalse, 2, 2); + EXPECT_EQ(ds4, nullptr); + + // Test invalid field_delim + std::shared_ptr ds5 = CSV({file}, '"', {}, column_names); + EXPECT_EQ(ds5, nullptr); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFiles) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFiles."; + + // Set configuration + 
uint32_t original_seed = GlobalContext::config_manager()->seed(); + uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers(); + MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers; + GlobalContext::config_manager()->set_seed(130); + GlobalContext::config_manager()->set_num_parallel_workers(4); + + // Create a CSVDataset, with multiple CSV files + std::string file1 = datasets_root_path_ + "/testCSV/1.csv"; + std::string file2 = datasets_root_path_ + "/testCSV/append.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({file1, file2}, ',', {}, column_names, -1, ShuffleMode::kFiles); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"13", "14", "15", "16"}, + {"1", "2", "3", "4"}, + {"17", "18", "19", "20"}, + {"5", "6", "7", "8"}, + {"21", "22", "23", "24"}, + {"9", "10", "11", "12"}, + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 6 samples + EXPECT_EQ(i, 6); + + // Manually terminate the pipeline + iter->Stop(); + + // Restore configuration + GlobalContext::config_manager()->set_seed(original_seed); + GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers); +} + +TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleGlobal) { + MS_LOG(INFO) << "Doing 
MindDataTestPipeline-TestCSVDatasetShuffleGlobal."; + // Test CSV Dataset with GLOBAL shuffle + + // Set configuration + uint32_t original_seed = GlobalContext::config_manager()->seed(); + uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers(); + MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers; + GlobalContext::config_manager()->set_seed(135); + GlobalContext::config_manager()->set_num_parallel_workers(4); + + // Create a CSVFile Dataset, with single CSV file + std::string train_file = datasets_root_path_ + "/testCSV/1.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({train_file}, ',', {}, column_names, -1, ShuffleMode::kGlobal); + EXPECT_NE(ds, nullptr); + + // Create an iterator over the result of the above dataset + // This will trigger the creation of the Execution Tree and launch it. + std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::unordered_map> row; + iter->GetNextRow(&row); + EXPECT_NE(row.find("col1"), row.end()); + std::vector> expected_result = { + {"5", "6", "7", "8"}, + {"9", "10", "11", "12"}, + {"1", "2", "3", "4"} + }; + + uint64_t i = 0; + while (row.size() != 0) { + for (int j = 0; j < column_names.size(); j++) { + auto text = row[column_names[j]]; + std::string_view sv; + text->GetItemAt(&sv, {0}); + std::string ss(sv); + EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str()); + } + iter->GetNextRow(&row); + i++; + } + + // Expect 3 samples + EXPECT_EQ(i, 3); + + // Manually terminate the pipeline + iter->Stop(); + + // Restore configuration + GlobalContext::config_manager()->set_seed(original_seed); + GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers); +} diff --git a/tests/ut/data/dataset/testCSV/append.csv b/tests/ut/data/dataset/testCSV/append.csv new file mode 100644 index 
00000000000..558432d388a --- /dev/null +++ b/tests/ut/data/dataset/testCSV/append.csv @@ -0,0 +1,3 @@ +13,14,15,16 +17,18,19,20 +21,22,23,24 diff --git a/tests/ut/data/dataset/testCSV/default.csv b/tests/ut/data/dataset/testCSV/default.csv new file mode 100644 index 00000000000..457834e599e --- /dev/null +++ b/tests/ut/data/dataset/testCSV/default.csv @@ -0,0 +1,2 @@ +,2,3.0, +a,4,5,b \ No newline at end of file