diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h index 2a6ec722890..08fb31d9727 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h @@ -209,6 +209,15 @@ class BatchOp : public ParallelOp, CBatc int64_t GetTreeBatchSize() override; + bool IsPython() const override { +#ifdef ENABLE_PYTHON + if (batch_map_func_ || batch_size_func_) { + return true; + } +#endif + return false; + } + private: // Worker thread for doing the memcpy of batch // @param int32_t param workerId diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h index cd1e6e1f390..cd601d47355 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h @@ -344,6 +344,8 @@ class DatasetOp : public std::enable_shared_from_this { // \brief Remove all callbacks from DatasetOp void ClearCallbacks() { callback_manager_.ClearCallbacks(); } + virtual bool IsPython() const { return false; } + protected: // \brief Removes a parent operator from this operator // \notes External callers do not have access to this function diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h index 0aa67c1edf1..770ee6f9b3c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h @@ -118,6 +118,15 @@ class MapOp : public ParallelOp, TensorRow> { const auto &TFuncs() const { return tfuncs_; } + bool IsPython() const override { + for (const auto &tensorOp : tfuncs_) { + if (tensorOp->Name() == kPyFuncOp) { + return true; + } + } + return false; + } + private: // A helper function to create jobs for workers. Status GenerateWorkerJob(const std::unique_ptr *worker_job); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h index 4272ebda684..1091aa79c60 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h @@ -77,6 +77,8 @@ class GeneratorOp : public PipelineOp, public RandomAccessOp { /// \return Name of the current Op std::string Name() const override { return "GeneratorOp"; } + bool IsPython() const override { return true; } + private: py::function generator_function_; std::vector column_names_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h index fac835e74c5..789f63607e1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h @@ -145,6 +145,15 @@ class ExecutionTree { return out; } + const bool IsPython() { + for (auto itr = this->begin(); itr != this->end(); ++itr) { + if (itr->IsPython()) { + return true; + } + } + return false; + } + /// \brief Given the number of workers, launches the worker entry function for each. Essentially a /// wrapper for the TaskGroup handling that is stored inside the execution tree. /// \param num_workers - The number of workers to launch diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc index 94210893272..f79ba37cb59 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampler.cc @@ -361,7 +361,7 @@ Status CpuSampler::UpdateTaskList() { } } - if (!fetched_all_python_multiprocesses_) { + if (!fetched_all_python_multiprocesses_ && tree->IsPython()) { py::gil_scoped_acquire gil_acquire; py::module ds = py::module::import("mindspore.dataset.engine.datasets"); py::tuple process_info = ds.attr("_get_operator_process")(); diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt index 8a38e54ecd0..73d7630727b 100644 --- a/tests/ut/cpp/dataset/CMakeLists.txt +++ b/tests/ut/cpp/dataset/CMakeLists.txt @@ -104,6 +104,7 @@ SET(DE_UT_SRCS pad_op_test.cc path_test.cc perf_data_test.cc + profiler_test.cc queue_test.cc random_affine_op_test.cc random_color_adjust_op_test.cc diff --git a/tests/ut/cpp/dataset/profiler_test.cc b/tests/ut/cpp/dataset/profiler_test.cc new file mode 100644 index 00000000000..4bfb6c1a45a --- /dev/null +++ b/tests/ut/cpp/dataset/profiler_test.cc @@ -0,0 +1,159 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common/common.h" +#include "minddata/dataset/engine/perf/profiling.h" +#include "minddata/dataset/include/dataset/datasets.h" + +using namespace mindspore::dataset; +using mindspore::LogStream; +using mindspore::MsLogLevel::INFO; + +namespace mindspore { +namespace dataset { +namespace test { +class MindDataTestProfiler : public UT::DatasetOpTesting { + protected: + MindDataTestProfiler() {} + Status DeleteFiles(int file_id = 0) { + std::shared_ptr profiler_manager = GlobalContext::profiling_manager(); + std::string pipeline_file = "./pipeline_profiling_" + std::to_string(file_id) + ".json"; + std::string cpu_util_file = "./minddata_cpu_utilization_" + std::to_string(file_id) + ".json"; + std::string dataset_iterator_file = "./dataset_iterator_profiling_" + std::to_string(file_id) + ".txt"; + if (remove(pipeline_file.c_str()) == 0 && remove(cpu_util_file.c_str()) == 0 && + remove(dataset_iterator_file.c_str()) == 0) { + return Status::OK(); + } else { + RETURN_STATUS_UNEXPECTED("Error deleting profiler files"); + } + } +}; + +/// Feature: MindData Profiling Support +/// Description: Test MindData Profiling with profiling enabled for pipeline with ImageFolder +/// Expectation: Profiling files are created. +TEST_F(MindDataTestProfiler, TestProfilerManager1) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestProfilerManager1."; + + // Enable profiler and check + common::SetEnv("RANK_ID", "1"); + std::shared_ptr profiler_manager = GlobalContext::profiling_manager(); + EXPECT_OK(profiler_manager->Init(".")); + EXPECT_OK(profiler_manager->Start()); + EXPECT_TRUE(profiler_manager->IsProfilingEnable()); + + std::string folder_path = datasets_root_path_ + "/testPK/data/"; + std::shared_ptr ds = ImageFolder(folder_path, true, std::make_shared(0, 2)); + EXPECT_NE(ds, nullptr); + + ds = ds->Repeat(2); + EXPECT_NE(ds, nullptr); + + ds = ds->Shuffle(4); + EXPECT_NE(ds, nullptr); + + // Create objects for the tensor ops + std::shared_ptr one_hot = std::make_shared(10); + EXPECT_NE(one_hot, nullptr); + + // Create a Map operation, this will automatically add a project after map + ds = ds->Map({one_hot}, {"label"}, {"label"}, {"label"}); + EXPECT_NE(ds, nullptr); + + ds = ds->Take(4); + EXPECT_NE(ds, nullptr); + + ds = ds->Batch(2, true); + EXPECT_NE(ds, nullptr); + + // No columns are specified, use all columns + std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + EXPECT_EQ(i, 2); + // Manually terminate the pipeline + iter->Stop(); + + // File_id is expected to equal RANK_ID + EXPECT_OK(DeleteFiles(1)); + + // Disable profiler + EXPECT_OK(profiler_manager->Stop()); + EXPECT_FALSE(profiler_manager->IsProfilingEnable()); +} + +/// Feature: MindData Profiling Support +/// Description: Test MindData Profiling with profiling enabled for pipeline with Mnist +/// Expectation: Profiling files are created. +TEST_F(MindDataTestProfiler, TestProfilerManager2) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestProfilerManager2."; + + // Enable profiler and check + common::SetEnv("RANK_ID", "2"); + std::shared_ptr profiler_manager = GlobalContext::profiling_manager(); + EXPECT_OK(profiler_manager->Init(".")); + EXPECT_OK(profiler_manager->Start()); + EXPECT_TRUE(profiler_manager->IsProfilingEnable()); + + // Create a Mnist Dataset + std::string folder_path = datasets_root_path_ + "/testMnistData/"; + std::shared_ptr ds = Mnist(folder_path, "all", std::make_shared(0, 3)); + EXPECT_NE(ds, nullptr); + + ds = ds->Skip(1); + EXPECT_NE(ds, nullptr); + + ds = ds->Repeat(2); + EXPECT_NE(ds, nullptr); + + ds = ds->Batch(2, false); + EXPECT_NE(ds, nullptr); + + // No columns are specified, use all columns + std::shared_ptr iter = ds->CreateIterator(); + EXPECT_NE(iter, nullptr); + + // Iterate the dataset and get each row + std::vector row; + ASSERT_OK(iter->GetNextRow(&row)); + + uint64_t i = 0; + while (row.size() != 0) { + ASSERT_OK(iter->GetNextRow(&row)); + i++; + } + EXPECT_EQ(i, 2); + // Manually terminate the pipeline + iter->Stop(); + + // File_id is expected to equal RANK_ID + EXPECT_OK(DeleteFiles(2)); + + // Disable profiler + EXPECT_OK(profiler_manager->Stop()); + EXPECT_FALSE(profiler_manager->IsProfilingEnable()); +} +} // namespace test +} // namespace dataset +} // namespace mindspore