profiler test

This commit is contained in:
zetongzhao 2021-10-29 15:16:47 -04:00
parent c063fe67b9
commit 2de8142c44
8 changed files with 192 additions and 1 deletions

View File

@ -209,6 +209,15 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
int64_t GetTreeBatchSize() override;
bool IsPython() const override {
#ifdef ENABLE_PYTHON
if (batch_map_func_ || batch_size_func_) {
return true;
}
#endif
return false;
}
private:
// Worker thread for doing the memcpy of batch
// @param int32_t param workerId

View File

@ -344,6 +344,8 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// \brief Remove all callbacks from DatasetOp
void ClearCallbacks() { callback_manager_.ClearCallbacks(); }
virtual bool IsPython() const { return false; }
protected:
// \brief Removes a parent operator from this operator
// \notes External callers do not have access to this function

View File

@ -118,6 +118,15 @@ class MapOp : public ParallelOp<std::unique_ptr<MapWorkerJob>, TensorRow> {
const auto &TFuncs() const { return tfuncs_; }
bool IsPython() const override {
for (const auto &tensorOp : tfuncs_) {
if (tensorOp->Name() == kPyFuncOp) {
return true;
}
}
return false;
}
private:
// A helper function to create jobs for workers.
Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job);

View File

@ -77,6 +77,8 @@ class GeneratorOp : public PipelineOp, public RandomAccessOp {
/// \return Name of the current Op
std::string Name() const override { return "GeneratorOp"; }
bool IsPython() const override { return true; }
private:
py::function generator_function_;
std::vector<std::string> column_names_;

View File

@ -145,6 +145,15 @@ class ExecutionTree {
return out;
}
const bool IsPython() {
for (auto itr = this->begin(); itr != this->end(); ++itr) {
if (itr->IsPython()) {
return true;
}
}
return false;
}
/// \brief Given the number of workers, launches the worker entry function for each. Essentially a
/// wrapper for the TaskGroup handling that is stored inside the execution tree.
/// \param num_workers - The number of workers to launch

View File

@ -361,7 +361,7 @@ Status CpuSampler::UpdateTaskList() {
}
}
if (!fetched_all_python_multiprocesses_) {
if (!fetched_all_python_multiprocesses_ && tree->IsPython()) {
py::gil_scoped_acquire gil_acquire;
py::module ds = py::module::import("mindspore.dataset.engine.datasets");
py::tuple process_info = ds.attr("_get_operator_process")();

View File

@ -104,6 +104,7 @@ SET(DE_UT_SRCS
pad_op_test.cc
path_test.cc
perf_data_test.cc
profiler_test.cc
queue_test.cc
random_affine_op_test.cc
random_color_adjust_op_test.cc

View File

@ -0,0 +1,159 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/common.h"
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/include/dataset/datasets.h"
using namespace mindspore::dataset;
using mindspore::LogStream;
using mindspore::MsLogLevel::INFO;
namespace mindspore {
namespace dataset {
namespace test {
class MindDataTestProfiler : public UT::DatasetOpTesting {
protected:
MindDataTestProfiler() {}
Status DeleteFiles(int file_id = 0) {
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
std::string pipeline_file = "./pipeline_profiling_" + std::to_string(file_id) + ".json";
std::string cpu_util_file = "./minddata_cpu_utilization_" + std::to_string(file_id) + ".json";
std::string dataset_iterator_file = "./dataset_iterator_profiling_" + std::to_string(file_id) + ".txt";
if (remove(pipeline_file.c_str()) == 0 && remove(cpu_util_file.c_str()) == 0 &&
remove(dataset_iterator_file.c_str()) == 0) {
return Status::OK();
} else {
RETURN_STATUS_UNEXPECTED("Error deleting profiler files");
}
}
};
/// Feature: MindData Profiling Support
/// Description: Test MindData Profiling with profiling enabled for pipeline with ImageFolder
/// Expectation: Profiling files are created.
TEST_F(MindDataTestProfiler, TestProfilerManager1) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestProfilerManager1.";
// Enable profiler and check
common::SetEnv("RANK_ID", "1");
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
EXPECT_OK(profiler_manager->Init("."));
EXPECT_OK(profiler_manager->Start());
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
std::string folder_path = datasets_root_path_ + "/testPK/data/";
std::shared_ptr<Dataset> ds = ImageFolder(folder_path, true, std::make_shared<SequentialSampler>(0, 2));
EXPECT_NE(ds, nullptr);
ds = ds->Repeat(2);
EXPECT_NE(ds, nullptr);
ds = ds->Shuffle(4);
EXPECT_NE(ds, nullptr);
// Create objects for the tensor ops
std::shared_ptr<TensorTransform> one_hot = std::make_shared<transforms::OneHot>(10);
EXPECT_NE(one_hot, nullptr);
// Create a Map operation, this will automatically add a project after map
ds = ds->Map({one_hot}, {"label"}, {"label"}, {"label"});
EXPECT_NE(ds, nullptr);
ds = ds->Take(4);
EXPECT_NE(ds, nullptr);
ds = ds->Batch(2, true);
EXPECT_NE(ds, nullptr);
// No columns are specified, use all columns
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
uint64_t i = 0;
while (row.size() != 0) {
ASSERT_OK(iter->GetNextRow(&row));
i++;
}
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter->Stop();
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(1));
// Disable profiler
EXPECT_OK(profiler_manager->Stop());
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
}
/// Feature: MindData Profiling Support
/// Description: Test MindData Profiling with profiling enabled for pipeline with Mnist
/// Expectation: Profiling files are created.
TEST_F(MindDataTestProfiler, TestProfilerManager2) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestProfilerManager2.";
// Enable profiler and check
common::SetEnv("RANK_ID", "2");
std::shared_ptr<ProfilingManager> profiler_manager = GlobalContext::profiling_manager();
EXPECT_OK(profiler_manager->Init("."));
EXPECT_OK(profiler_manager->Start());
EXPECT_TRUE(profiler_manager->IsProfilingEnable());
// Create a Mnist Dataset
std::string folder_path = datasets_root_path_ + "/testMnistData/";
std::shared_ptr<Dataset> ds = Mnist(folder_path, "all", std::make_shared<SequentialSampler>(0, 3));
EXPECT_NE(ds, nullptr);
ds = ds->Skip(1);
EXPECT_NE(ds, nullptr);
ds = ds->Repeat(2);
EXPECT_NE(ds, nullptr);
ds = ds->Batch(2, false);
EXPECT_NE(ds, nullptr);
// No columns are specified, use all columns
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// Iterate the dataset and get each row
std::vector<mindspore::MSTensor> row;
ASSERT_OK(iter->GetNextRow(&row));
uint64_t i = 0;
while (row.size() != 0) {
ASSERT_OK(iter->GetNextRow(&row));
i++;
}
EXPECT_EQ(i, 2);
// Manually terminate the pipeline
iter->Stop();
// File_id is expected to equal RANK_ID
EXPECT_OK(DeleteFiles(2));
// Disable profiler
EXPECT_OK(profiler_manager->Stop());
EXPECT_FALSE(profiler_manager->IsProfilingEnable());
}
} // namespace test
} // namespace dataset
} // namespace mindspore