diff --git a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt index 0b8d526f56f..688a802204e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt @@ -14,6 +14,7 @@ add_library(engine OBJECT data_buffer.cc data_schema.cc dataset_iterator.cc + tree_adapter.cc ) if (ENABLE_PYTHON) diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc new file mode 100644 index 00000000000..993ce85914f --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc @@ -0,0 +1,93 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "minddata/dataset/engine/tree_adapter.h" + +#include "minddata/dataset/core/client.h" +#include "minddata/dataset/include/datasets.h" + +namespace mindspore { +namespace dataset { + +Status TreeAdapter::BuildAndPrepare(std::shared_ptr root_ir, int32_t num_epoch) { + // Check whether this function has been called before. If so, return fail + CHECK_FAIL_RETURN_UNEXPECTED(tree_ == nullptr, "ExecutionTree is already built."); + RETURN_UNEXPECTED_IF_NULL(root_ir); + // GlobalInit, might need to be moved to the proper place once RuntimeConext is complete + RETURN_IF_NOT_OK(GlobalInit()); + + // this will evolve in the long run + tree_ = std::make_unique(); + + std::shared_ptr root_op; + RETURN_IF_NOT_OK(DFSBuildTree(root_ir, &root_op)); + RETURN_IF_NOT_OK(tree_->AssignRoot(root_op)); + + // Prepare the tree + RETURN_IF_NOT_OK(tree_->Prepare(num_epoch)); + + // after the tree is prepared, the col_name_id_map can safely be obtained + column_name_map_ = tree_->root()->column_name_id_map(); + + return Status::OK(); +} + +Status TreeAdapter::GetNext(TensorRow *row) { + RETURN_UNEXPECTED_IF_NULL(tree_); + RETURN_UNEXPECTED_IF_NULL(row); + row->clear(); // make sure row is empty + // cur_db_ being a nullptr means this is the first call to get_next, launch ExecutionTree + if (cur_db_ == nullptr) { + RETURN_IF_NOT_OK(tree_->Launch()); + RETURN_IF_NOT_OK(tree_->root()->GetNextBuffer(&cur_db_)); // first buf can't be eof or empty buf with none flag + RETURN_OK_IF_TRUE(cur_db_->eoe()); // return empty tensor if 1st buf is a ctrl buf (no rows) + } + + CHECK_FAIL_RETURN_UNEXPECTED(!cur_db_->eof(), "EOF has already been reached."); + + if (cur_db_->NumRows() == 0) { // a new row is fetched if cur buf is empty or a ctrl buf + RETURN_IF_NOT_OK(tree_->root()->GetNextBuffer(&cur_db_)); + RETURN_OK_IF_TRUE(cur_db_->eoe() || cur_db_->eof()); // return empty if this new buffer is a ctrl flag + } + + RETURN_IF_NOT_OK(cur_db_->PopRow(row)); + return Status::OK(); +} + +Status TreeAdapter::DFSBuildTree(std::shared_ptr ir, std::shared_ptr *op) { + std::vector> ops = ir->Build(); + CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node."); + + (*op) = ops.front(); // return the first op to be added as child by the caller of this function + RETURN_IF_NOT_OK(tree_->AssociateNode(*op)); + + for (size_t i = 1; i < ops.size(); i++) { + RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i])); + RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i])); + } + + // build the children of ir, once they return, add the return value to *op + for (std::shared_ptr child_ir : ir->children) { + std::shared_ptr child_op; + RETURN_IF_NOT_OK(DFSBuildTree(child_ir, &child_op)); + RETURN_IF_NOT_OK(ops.back()->AddChild(child_op)); // append children to the last of ops + } + + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h new file mode 100644 index 00000000000..38907e82e7d --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h @@ -0,0 +1,66 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_ + +#include +#include +#include +#include +#include + +#include "minddata/dataset/engine/execution_tree.h" +#include "minddata/dataset/include/datasets.h" + +namespace mindspore { +namespace dataset { +class TreeAdapter { + public: + TreeAdapter() = default; + + ~TreeAdapter() = default; + + // This will construct a ExeTree from a Dataset root and Prepare() the ExeTree + // This function is only meant to be called once and needs to be called before GetNext + // ExeTree will be launched when the first GetNext is called + Status BuildAndPrepare(std::shared_ptr root, int32_t num_epoch = -1); + + // This is the main method TreeConsumer uses to interact with TreeAdapter + // 1. GetNext will Launch() the ExeTree on its first call by iterator (tree is already prepared) + // 2. GetNext will return empty row when eoe/eof is obtained + Status GetNext(TensorRow *); + + // this function will return the column_name_map once BuildAndPrepare() is called + std::unordered_map GetColumnNameMap() const { return column_name_map_; } + + // this function returns the TaskGroup associated with ExeTree, this is needed by DeviceQueueConsumer + // to be able to launch a thread. BuildAndPrepare needs to be called before this function + TaskGroup *AllTasks() const { return tree_ != nullptr ? tree_->AllTasks() : nullptr; } + + private: + // this RECURSIVE function converts IR nodes into DatasetOp in ExecutionTree. ir could build a vector of ops. In + // such case, the first node is returned. Op is added as child when the current function returns. + Status DFSBuildTree(std::shared_ptr ir, std::shared_ptr *op); + + std::unique_ptr cur_db_; + std::unordered_map column_name_map_; + std::unique_ptr tree_; +}; +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_H_ diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h index 229de349570..527f050c4f6 100644 --- a/mindspore/ccsrc/minddata/dataset/include/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h @@ -45,6 +45,7 @@ class DatasetOp; class DataSchema; class Tensor; class TensorShape; +class TreeAdapter; #ifndef ENABLE_ANDROID class Vocab; #endif @@ -458,7 +459,9 @@ std::shared_ptr Zip(const std::vector> &datase /// \brief A base class to represent a dataset in the data pipeline. class Dataset : public std::enable_shared_from_this { public: + // need friend class so they can access the children_ field friend class Iterator; + friend class mindspore::dataset::TreeAdapter; /// \brief Constructor Dataset(); diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt index 96c6012dbf8..3c21213759c 100644 --- a/tests/ut/cpp/dataset/CMakeLists.txt +++ b/tests/ut/cpp/dataset/CMakeLists.txt @@ -70,12 +70,13 @@ SET(DE_UT_SRCS stand_alone_samplers_test.cc status_test.cc task_manager_test.cc - tensor_test.cc tensor_row_test.cc tensor_string_test.cc + tensor_test.cc tensorshape_test.cc tfReader_op_test.cc to_float16_op_test.cc + tree_adapter_test.cc type_cast_op_test.cc zip_op_test.cc random_resize_op_test.cc @@ -112,8 +113,8 @@ SET(DE_UT_SRCS c_api_dataset_csv_test.cc c_api_dataset_manifest_test.cc c_api_dataset_randomdata_test.cc - c_api_dataset_textfile_test.cc - c_api_dataset_tfrecord_test.cc + c_api_dataset_textfile_test.cc + c_api_dataset_tfrecord_test.cc c_api_dataset_voc_test.cc c_api_datasets_test.cc c_api_dataset_iterator_test.cc diff --git a/tests/ut/cpp/dataset/tree_adapter_test.cc b/tests/ut/cpp/dataset/tree_adapter_test.cc new file mode 100644 index 00000000000..583750aa635 --- /dev/null +++ b/tests/ut/cpp/dataset/tree_adapter_test.cc @@ -0,0 +1,133 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "minddata/dataset/engine/tree_adapter.h" +#include "common/common.h" +#include "minddata/dataset/core/tensor_row.h" +#include "minddata/dataset/include/datasets.h" +#include "minddata/dataset/include/transforms.h" + +using namespace mindspore::dataset; +using mindspore::dataset::Tensor; + +class MindDataTestTreeAdapter : public UT::DatasetOpTesting { + protected: +}; + +TEST_F(MindDataTestTreeAdapter, TestSimpleTreeAdapter) { + MS_LOG(INFO) << "Doing MindDataTestTreeAdapter-TestSimpleTreeAdapter."; + + // Create a Mnist Dataset + std::string folder_path = datasets_root_path_ + "/testMnistData/"; + std::shared_ptr ds = Mnist(folder_path, "all", api::SequentialSampler(0, 4)); + EXPECT_NE(ds, nullptr); + + ds = ds->Batch(2); + EXPECT_NE(ds, nullptr); + + mindspore::dataset::TreeAdapter tree_adapter; + + Status rc = tree_adapter.BuildAndPrepare(ds, 1); + + EXPECT_TRUE(rc.IsOk()); + + const std::unordered_map map = {{"label", 1}, {"image", 0}}; + EXPECT_EQ(tree_adapter.GetColumnNameMap(), map); + + std::vector row_sizes = {2, 2, 0, 0}; + + TensorRow row; + for (size_t sz : row_sizes) { + rc = tree_adapter.GetNext(&row); + EXPECT_TRUE(rc.IsOk()); + EXPECT_EQ(row.size(), sz); + } + + rc = tree_adapter.GetNext(&row); + EXPECT_TRUE(rc.IsError()); + const std::string err_msg = rc.ToString(); + EXPECT_TRUE(err_msg.find("EOF has already been reached") != err_msg.npos); +} + +TEST_F(MindDataTestTreeAdapter, TestTreeAdapterWithRepeat) { + MS_LOG(INFO) << "Doing MindDataTestTreeAdapter-TestTreeAdapterWithRepeat."; + + // Create a Mnist Dataset + std::string folder_path = datasets_root_path_ + "/testMnistData/"; + std::shared_ptr ds = Mnist(folder_path, "all", api::SequentialSampler(0, 3)); + EXPECT_NE(ds, nullptr); + + ds = ds->Batch(2, false); + EXPECT_NE(ds, nullptr); + + mindspore::dataset::TreeAdapter tree_adapter; + + Status rc = tree_adapter.BuildAndPrepare(ds, 2); + EXPECT_TRUE(rc.IsOk()); + + const std::unordered_map map = tree_adapter.GetColumnNameMap(); + EXPECT_EQ(tree_adapter.GetColumnNameMap(), map); + + std::vector row_sizes = {2, 2, 0, 2, 2, 0, 0}; + + TensorRow row; + for (size_t sz : row_sizes) { + rc = tree_adapter.GetNext(&row); + EXPECT_TRUE(rc.IsOk()); + EXPECT_EQ(row.size(), sz); + } + rc = tree_adapter.GetNext(&row); + const std::string err_msg = rc.ToString(); + EXPECT_TRUE(err_msg.find("EOF has already been reached") != err_msg.npos); +} + +TEST_F(MindDataTestTreeAdapter, TestProjectMapTreeAdapter) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestProjectMap."; + + // Create an ImageFolder Dataset + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, api::SequentialSampler(0, 2)); + EXPECT_NE(ds, nullptr); + + // Create objects for the tensor ops + std::shared_ptr one_hot = api::transforms::OneHot(10); + EXPECT_NE(one_hot, nullptr); + + // Create a Map operation, this will automatically add a project after map + ds = ds->Map({one_hot}, {"label"}, {"label"}, {"label"}); + EXPECT_NE(ds, nullptr); + + mindspore::dataset::TreeAdapter tree_adapter; + + Status rc = tree_adapter.BuildAndPrepare(ds, 2); + + EXPECT_TRUE(rc.IsOk()); + + const std::unordered_map map = {{"label", 0}}; + EXPECT_EQ(tree_adapter.GetColumnNameMap(), map); + + std::vector row_sizes = {1, 1, 0, 1, 1, 0, 0}; + TensorRow row; + + for (size_t sz : row_sizes) { + rc = tree_adapter.GetNext(&row); + EXPECT_TRUE(rc.IsOk()); + EXPECT_EQ(row.size(), sz); + } + rc = tree_adapter.GetNext(&row); + const std::string err_msg = rc.ToString(); + EXPECT_TRUE(err_msg.find("EOF has already been reached") != err_msg.npos); +}