From 7538f82da916e29d5955b2fdab4efa77d3f2710d Mon Sep 17 00:00:00 2001 From: Zirui Wu Date: Fri, 6 Nov 2020 13:47:18 -0500 Subject: [PATCH] add callback to map's c api fix test err fix ci round 1 fix ci round 2 add col_order to batch_node minor fix --- .../ccsrc/minddata/dataset/api/datasets.cc | 7 ++-- .../ccsrc/minddata/dataset/api/iterator.cc | 7 +++- .../dataset/engine/datasetops/dataset_op.h | 3 ++ .../engine/datasetops/map_op/map_op.cc | 6 +-- .../engine/ir/datasetops/batch_node.cc | 8 ++-- .../dataset/engine/ir/datasetops/batch_node.h | 3 +- .../dataset/engine/ir/datasetops/map_node.cc | 11 +++++- .../dataset/engine/ir/datasetops/map_node.h | 4 +- .../dataset/engine/opt/post/repeat_pass.cc | 2 +- .../engine/opt/pre/cache_transform_pass.cc | 5 ++- .../engine/opt/pre/cache_transform_pass.h | 30 +++++++------- .../ccsrc/minddata/dataset/include/datasets.h | 8 ++-- tests/ut/cpp/dataset/callback_test.cc | 39 +++++++++++++++++-- 13 files changed, 94 insertions(+), 39 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index ccbff9217c2..e71e9b779bb 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -499,9 +499,10 @@ FilterDataset::FilterDataset(std::shared_ptr input, std::function input, std::vector> operations, std::vector input_columns, std::vector output_columns, - const std::vector &project_columns, const std::shared_ptr &cache) { - auto ds = - std::make_shared(input->IRNode(), operations, input_columns, output_columns, project_columns, cache); + const std::vector &project_columns, const std::shared_ptr &cache, + std::vector> callbacks) { + auto ds = std::make_shared(input->IRNode(), operations, input_columns, output_columns, project_columns, + cache, callbacks); ir_node_ = std::static_pointer_cast(ds); } diff --git a/mindspore/ccsrc/minddata/dataset/api/iterator.cc b/mindspore/ccsrc/minddata/dataset/api/iterator.cc index 31a9ffaad59..e9fda03b2c2 100644 --- a/mindspore/ccsrc/minddata/dataset/api/iterator.cc +++ b/mindspore/ccsrc/minddata/dataset/api/iterator.cc @@ -44,7 +44,12 @@ bool Iterator::GetNextRow(TensorVec *row) { } // Shut down the data pipeline. -void Iterator::Stop() { runtime_context_->Terminate(); } +void Iterator::Stop() { + Status rc = runtime_context_->Terminate(); + if (rc.IsError()) { + MS_LOG(ERROR) << rc.ToString(); + } +} // Function to build and launch the execution tree. Status Iterator::BuildAndLaunchTree(std::shared_ptr ds) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h index 6c9fd15dbb0..b1e4ff8d4b2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h @@ -385,6 +385,9 @@ class DatasetOp : public std::enable_shared_from_this { /// \return Status virtual Status WaitForWorkers() { return Status::OK(); } + /// \brief Add callback to DatasetOp, only MapOp supports Callback at the moment + void AddCallbacks(std::vector> callbacks) { callback_manager_.AddCallbacks(callbacks); } + protected: /// \brief Removes a parent operator from this operator /// \notes External callers do not have access to this function diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc index 0fa1efec4b0..3c1e6f4cba4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" #include #include -#include #include #include #include "minddata/dataset/core/config_manager.h" @@ -26,8 +26,6 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" #include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" -#include "minddata/dataset/engine/datasetops/map_op/map_op.h" -#include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/engine/opt/pass.h" #include "minddata/dataset/kernels/tensor_op.h" #include "minddata/dataset/util/task_manager.h" @@ -60,7 +58,7 @@ Status MapOp::Builder::Build(std::shared_ptr *ptr) { RETURN_IF_NOT_OK(sanityCheck()); *ptr = std::make_shared(std::move(build_in_col_names_), std::move(build_out_col_names_), std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_); - (*ptr)->callback_manager_.AddCallbacks(std::move(builder_callbacks_)); + (*ptr)->AddCallbacks(std::move(builder_callbacks_)); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc index 9af28833dec..705b12d9fa6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc @@ -31,13 +31,15 @@ namespace dataset { // constructor #1, called by Pybind BatchNode::BatchNode(std::shared_ptr child, int32_t batch_size, bool drop_remainder, bool pad, const std::vector &in_col_names, const std::vector &out_col_names, - py::function batch_size_func, py::function batch_map_func, + const std::vector &col_order, py::function batch_size_func, + py::function batch_map_func, std::map>> pad_map) : batch_size_(batch_size), drop_remainder_(drop_remainder), pad_(pad), in_col_names_(in_col_names), out_col_names_(out_col_names), + col_order_(col_order), batch_size_func_(batch_size_func), batch_map_func_(batch_map_func), pad_map_(pad_map) { @@ -83,8 +85,8 @@ std::vector> BatchNode::Build() { in_col_names_, out_col_names_, batch_size_func_, batch_map_func_, pad_map_)); // need to insert a project when per_batch_func changes the number of columns - if (!out_col_names_.empty()) { - auto project_op = std::make_shared(out_col_names_); + if (!col_order_.empty()) { + auto project_op = std::make_shared(col_order_); node_ops.push_back(project_op); } #else diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.h index 9156cc66842..0e4d693b469 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.h @@ -34,7 +34,7 @@ class BatchNode : public DatasetNode { /// \brief Constructor #1, for Python API to create a BatchNode BatchNode(std::shared_ptr child, int32_t batch_size, bool drop_remainder, bool pad, const std::vector &in_col_names, const std::vector &out_col_names, - py::function batch_size_func, py::function batch_map_func, + const std::vector &col_order, py::function batch_size_func, py::function batch_map_func, std::map>> pad_map); #endif @@ -58,6 +58,7 @@ class BatchNode : public DatasetNode { bool pad_; std::vector in_col_names_; std::vector out_col_names_; + std::vector col_order_; #ifdef ENABLE_PYTHON py::function batch_size_func_; py::function batch_map_func_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc index 91cc1fe98b2..2362bced009 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc @@ -29,12 +29,14 @@ namespace dataset { MapNode::MapNode(std::shared_ptr child, std::vector> operations, std::vector input_columns, std::vector output_columns, - const std::vector &project_columns, std::shared_ptr cache) + const std::vector &project_columns, std::shared_ptr cache, + std::vector> callbacks) : operations_(operations), input_columns_(input_columns), output_columns_(output_columns), project_columns_(project_columns), - DatasetNode(std::move(cache)) { + DatasetNode(std::move(cache)), + callbacks_(callbacks) { this->children.push_back(child); } @@ -53,6 +55,11 @@ std::vector> MapNode::Build() { // This parameter will be removed with next rebase std::vector col_orders; auto map_op = std::make_shared(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_); + + if (!callbacks_.empty()) { + map_op->AddCallbacks(callbacks_); + } + if (!project_columns_.empty()) { auto project_op = std::make_shared(project_columns_); node_ops.push_back(project_op); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h index aca9a19187b..101e73382a3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.h @@ -31,7 +31,8 @@ class MapNode : public DatasetNode { /// \brief Constructor MapNode(std::shared_ptr child, std::vector> operations, std::vector input_columns = {}, std::vector output_columns = {}, - const std::vector &columns = {}, std::shared_ptr cache = nullptr); + const std::vector &columns = {}, std::shared_ptr cache = nullptr, + std::vector> callbacks = {}); /// \brief Destructor ~MapNode() = default; @@ -49,6 +50,7 @@ class MapNode : public DatasetNode { std::vector input_columns_; std::vector output_columns_; std::vector project_columns_; + std::vector> callbacks_; }; } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/post/repeat_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/post/repeat_pass.cc index c94b34468a8..73be7aebc36 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/post/repeat_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/post/repeat_pass.cc @@ -149,7 +149,7 @@ Status RepeatPass::RunOnNode(std::shared_ptr node, bool *modified) { // We finish the walk of this RepeatOp's descendent nodes. // The total repeats of nodes above this Repeat(n) have nothing to do with this RepeatOp's parameter n. // But num_repeats_ has been multiplied by n during this Repeat(n)'s PreRunOnNode, - // so we devide num_repeats_ by n to be able to correctly set total repeats for nodes above this RepeatOp. + // so we divide num_repeats_ by n to be able to correctly set total repeats for nodes above this RepeatOp. num_repeats_ /= node->num_repeats(); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc index 85891afc026..4d15c775a23 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc @@ -120,7 +120,7 @@ Status CacheTransformPass::CachePass::NonMappableCacheLeafSetup(std::shared_ptr< RETURN_STATUS_UNEXPECTED("There is currently no support for multiple leaf nodes under cache."); } - // Sampler for non mapable dataset only works if there is a downstream cache. Remove it from the leaf + // Sampler for non mappable dataset only works if there is a downstream cache. Remove it from the leaf // as save it for use by cache op in ascendant tree. if (is_caching_) { RETURN_IF_NOT_OK(leaf_op->FetchRemoveSampler(&sampler_)); @@ -261,7 +261,8 @@ Status CacheTransformPass::RunOnTree(ExecutionTree *tree, bool *modified) { // Then, execute the transform for each pair for (auto cache_pair : cache_pass.cache_pairs()) { MS_LOG(DEBUG) << "Cache transform pass: Executing a cache op mappable transform."; - ExecuteCacheTransform(tree, cache_pair.first, cache_pair.second, cache_pair.second->cache_client()); + RETURN_IF_NOT_OK( + ExecuteCacheTransform(tree, cache_pair.first, cache_pair.second, cache_pair.second->cache_client())); } MS_LOG(INFO) << "Pre pass: Cache transform pass complete."; return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.h b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.h index 0c6d288f37d..cb2a4b58412 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.h +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.h @@ -60,95 +60,95 @@ class CacheTransformPass : public TreePass { #ifndef ENABLE_ANDROID - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; #endif - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; #ifdef ENABLE_PYTHON - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; #endif - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return Status RunOnNode(std::shared_ptr node, bool *modified) override; #ifndef ENABLE_ANDROID - /// \brief Perform leaf node cache tranform identifications + /// \brief Perform leaf node cache transform identifications /// \param[in] node The node being visited /// \param[inout] modified Indicator if the node was changed at all /// \return Status The error code return diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h index 9e0eaca18cd..db2a8e8cee1 100644 --- a/mindspore/ccsrc/minddata/dataset/include/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h @@ -276,9 +276,10 @@ class Dataset : public std::enable_shared_from_this { std::vector input_columns = {}, std::vector output_columns = {}, const std::vector &project_columns = {}, - const std::shared_ptr &cache = nullptr) { + const std::shared_ptr &cache = nullptr, + std::vector> callbacks = {}) { return std::make_shared(shared_from_this(), operations, input_columns, output_columns, project_columns, - cache); + cache, callbacks); } /// \brief Function to create a Project Dataset @@ -443,7 +444,8 @@ class MapDataset : public Dataset { public: MapDataset(std::shared_ptr input, std::vector> operations, std::vector input_columns, std::vector output_columns, - const std::vector &project_columns, const std::shared_ptr &cache); + const std::vector &project_columns, const std::shared_ptr &cache, + std::vector> callbacks); }; class ProjectDataset : public Dataset { diff --git a/tests/ut/cpp/dataset/callback_test.cc b/tests/ut/cpp/dataset/callback_test.cc index 9509d081c46..d299d005fd1 100644 --- a/tests/ut/cpp/dataset/callback_test.cc +++ b/tests/ut/cpp/dataset/callback_test.cc @@ -21,6 +21,8 @@ #include "minddata/dataset/callback/ds_callback.h" #include "minddata/dataset/core/client.h" #include "minddata/dataset/engine/datasetops/source/random_data_op.h" +#include "minddata/dataset/include/datasets.h" +#include "minddata/dataset/include/transforms.h" #include "minddata/dataset/kernels/data/no_op.h" #include "utils/log_adapter.h" @@ -149,7 +151,7 @@ TEST_F(MindDataTestCallback, TestBasicCallback) { std::unique_ptr schema = std::make_unique(); TensorShape shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape); - schema->AddColumn(col); + ASSERT_OK(schema->AddColumn(col)); std::shared_ptr leaf; rc = RandomDataOp::Builder().SetRowsPerBuffer(1).SetDataSchema(std::move(schema)).SetTotalRows(44).Build(&leaf); EXPECT_TRUE(rc.IsOk()); @@ -196,7 +198,7 @@ TEST_F(MindDataTestCallback, TestMutiEpochCallback) { std::unique_ptr schema = std::make_unique(); TensorShape shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape); - schema->AddColumn(col); + ASSERT_OK(schema->AddColumn(col)); std::shared_ptr leaf; rc = RandomDataOp::Builder().SetRowsPerBuffer(1).SetDataSchema(std::move(schema)).SetTotalRows(4).Build(&leaf); EXPECT_TRUE(rc.IsOk()); @@ -253,7 +255,7 @@ TEST_F(MindDataTestCallback, TestSelectedCallback) { std::unique_ptr schema = std::make_unique(); TensorShape shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape); - schema->AddColumn(col); + ASSERT_OK(schema->AddColumn(col)); std::shared_ptr leaf; rc = RandomDataOp::Builder().SetRowsPerBuffer(1).SetDataSchema(std::move(schema)).SetTotalRows(4).Build(&leaf); EXPECT_TRUE(rc.IsOk()); @@ -296,3 +298,34 @@ TEST_F(MindDataTestCallback, TestSelectedCallback) { EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs); EXPECT_EQ(tst_cb->all_step_nums(len), all_steps); } + +TEST_F(MindDataTestCallback, TestCAPICallback) { + MS_LOG(INFO) << "Doing: MindDataTestCallback-TestCAPICallback"; + // config callback + std::shared_ptr tst_cb = std::make_shared(64); + std::shared_ptr cb1 = tst_cb; + // config leaf_op, use random_data to avoid I/O + std::shared_ptr schema = std::make_shared(); + ASSERT_TRUE(schema->add_column("label", "uint32", {})); + std::shared_ptr ds = RandomData(44, schema); + ds = ds->Map({transforms::TypeCast("uint64")}, {"label"}, {}, {}, nullptr, {cb1}); + ds = ds->Repeat(2); + + TreeAdapter tree_adapter; + // using tree_adapter to set num_epoch = 1 + ASSERT_OK(tree_adapter.Compile(ds->IRNode(), 1)); + + TensorRow row; + ASSERT_OK(tree_adapter.GetNext(&row)); + while (!row.empty()) { + ASSERT_OK(tree_adapter.GetNext(&row)); + } + std::vector callback_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"}; + std::vector all_steps = {0, 0, 1, 1, 65, 65, 88}; + std::vector all_epochs = {0, 1, 1, 1, 1, 1, 1}; + // doing resize to make sure no unexpected epoch_end or extra epoch_begin is called + size_t len = 7; + EXPECT_EQ(tst_cb->all_names(len), callback_names); + EXPECT_EQ(tst_cb->all_step_nums(len), all_steps); + EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs); +}