diff --git a/mindspore/ccsrc/dataset/CMakeLists.txt b/mindspore/ccsrc/dataset/CMakeLists.txt index 068aec8873f..da0741e5051 100644 --- a/mindspore/ccsrc/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/CMakeLists.txt @@ -62,6 +62,7 @@ add_dependencies(engine-datasetops-source core) add_dependencies(engine-datasetops-source-sampler core) add_dependencies(engine-datasetops core) add_dependencies(engine-opt core) +add_dependencies(engine-perf core) add_dependencies(engine-gnn core) add_dependencies(engine core) add_dependencies(text core) @@ -81,6 +82,7 @@ set(submodules $ $ $ + $ $ $ $ diff --git a/mindspore/ccsrc/dataset/api/python_bindings.cc b/mindspore/ccsrc/dataset/api/python_bindings.cc index 57fbaea0278..04f89c7e259 100644 --- a/mindspore/ccsrc/dataset/api/python_bindings.cc +++ b/mindspore/ccsrc/dataset/api/python_bindings.cc @@ -239,11 +239,13 @@ void bindTensor(py::module *m) { .def("set_worker_connector_size", &ConfigManager::set_worker_connector_size) .def("set_op_connector_size", &ConfigManager::set_op_connector_size) .def("set_seed", &ConfigManager::set_seed) + .def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval) .def("get_rows_per_buffer", &ConfigManager::rows_per_buffer) .def("get_num_parallel_workers", &ConfigManager::num_parallel_workers) .def("get_worker_connector_size", &ConfigManager::worker_connector_size) .def("get_op_connector_size", &ConfigManager::op_connector_size) .def("get_seed", &ConfigManager::seed) + .def("get_monitor_sampling_interval", &ConfigManager::monitor_sampling_interval) .def("load", [](ConfigManager &c, std::string s) { (void)c.LoadFile(s); }); (void)py::class_>(*m, "Tensor", py::buffer_protocol()) diff --git a/mindspore/ccsrc/dataset/core/config_manager.cc b/mindspore/ccsrc/dataset/core/config_manager.cc index 3f659555f4b..8732a4c4b0a 100644 --- a/mindspore/ccsrc/dataset/core/config_manager.cc +++ b/mindspore/ccsrc/dataset/core/config_manager.cc @@ -88,5 +88,7 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector uint32_t ConfigManager::seed() const { return seed_; } void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } + +void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/core/config_manager.h b/mindspore/ccsrc/dataset/core/config_manager.h index 654d5f930c4..807591daa16 100644 --- a/mindspore/ccsrc/dataset/core/config_manager.h +++ b/mindspore/ccsrc/dataset/core/config_manager.h @@ -111,12 +111,21 @@ class ConfigManager { // @param seed - The default seed to use void set_seed(uint32_t seed); + // setter function + // @param interval - The sampling interval (in milliseconds) to apply to the config + void set_monitor_sampling_interval(uint32_t interval); + + // getter function + // @return The interval (in milliseconds) of monitor sampling + int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; } + private: int32_t rows_per_buffer_{kCfgRowsPerBuffer}; int32_t num_parallel_workers_{kCfgParallelWorkers}; int32_t worker_connector_size_{kCfgWorkerConnectorSize}; int32_t op_connector_size_{kCfgOpConnectorSize}; uint32_t seed_{kCfgDefaultSeed}; + uint32_t monitor_sampling_interval_{kCfgMonitorSamplingInterval}; // Private helper function that takes a nlohmann json format and populates the settings // @param j - The json nlohmann json info diff --git a/mindspore/ccsrc/dataset/core/constants.h b/mindspore/ccsrc/dataset/core/constants.h index
9c0e24acc63..ca854e29fcf 100644 --- a/mindspore/ccsrc/dataset/core/constants.h +++ b/mindspore/ccsrc/dataset/core/constants.h @@ -47,6 +47,7 @@ constexpr uint32_t kCfgParallelWorkers = 4; constexpr uint32_t kCfgWorkerConnectorSize = 16; constexpr uint32_t kCfgOpConnectorSize = 16; constexpr uint32_t kCfgDefaultSeed = std::mt19937::default_seed; +constexpr uint32_t kCfgMonitorSamplingInterval = 10; // Invalid OpenCV type should not be from 0 to 7 (opencv4/opencv2/core/hal/interface.h) constexpr uint8_t kCVInvalidType = 255; diff --git a/mindspore/ccsrc/dataset/engine/CMakeLists.txt b/mindspore/ccsrc/dataset/engine/CMakeLists.txt index e7b5e682f38..66f95d09266 100644 --- a/mindspore/ccsrc/dataset/engine/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/engine/CMakeLists.txt @@ -1,6 +1,7 @@ add_subdirectory(datasetops) add_subdirectory(opt) add_subdirectory(gnn) +add_subdirectory(perf) if (ENABLE_TDTQUE) add_subdirectory(tdt) endif () @@ -16,7 +17,7 @@ add_library(engine OBJECT target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS}) if (ENABLE_TDTQUE) - add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn) + add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn engine-perf) else() - add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn) + add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf) endif () diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc index 011e60cc246..3d50d25590a 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc @@ -83,7 +83,14 @@ Status IteratorBase::FetchNextTensorRow(TensorRow *out_row) { } // Constructor of the DatasetIterator -DatasetIterator::DatasetIterator(std::shared_ptr exe_tree) : IteratorBase(), root_(exe_tree->root()) {} +DatasetIterator::DatasetIterator(std::shared_ptr exe_tree) + : IteratorBase(), root_(exe_tree->root()), tracing_(nullptr), cur_batch_num_(0), cur_connector_size_(0) { + std::shared_ptr node; + Status s = exe_tree->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node); + if (s.IsOk()) { + tracing_ = std::dynamic_pointer_cast(node); + } +} DatasetIterator::~DatasetIterator() = default; @@ -101,6 +108,10 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { // Check if we need to get a new DataBuffer to iterate. if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) { + if (tracing_ != nullptr) { + cur_connector_size_ = root_->ConnectorSize(); + cur_connector_capacity_ = root_->ConnectorCapacity(); + } RETURN_IF_NOT_OK(root_->GetNextBuffer(&curr_buffer_)); // Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually @@ -121,6 +132,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { } eof_handled_ = true; curr_buffer_.reset(); // explicitly free the eof buffer + // Set tree to Finished state + root_->Tree()->SetFinished(); return Status::OK(); } @@ -131,13 +144,18 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { // flow of an eof up the pipeline by itself. 
eof_handled_ = true; curr_buffer_.reset(); // explicitly free the eof buffer + // Set tree to Finished state + root_->Tree()->SetFinished(); return Status::OK(); } } // If we got this far, now it's time to pop that next row for return to caller RETURN_IF_NOT_OK(curr_buffer_->PopRow(out_row)); - + if (tracing_ != nullptr) { + cur_batch_num_++; + tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_); + } return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.h b/mindspore/ccsrc/dataset/engine/dataset_iterator.h index ddd4883a86c..8c9c0062f0d 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.h +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.h @@ -24,6 +24,7 @@ #include "dataset/core/tensor.h" #include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/execution_tree.h" +#include "dataset/engine/perf/dataset_iterator_tracing.h" namespace mindspore { namespace dataset { @@ -109,6 +110,10 @@ class DatasetIterator : public IteratorBase { private: std::shared_ptr root_; // saves the root of the executionTree TensorRow device_queue_row_; + std::shared_ptr tracing_; // trace profiling data + int32_t cur_batch_num_; // current batch number,used for profiling + int32_t cur_connector_size_; // current connector size of root op,used for profiling + int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling }; // The ChildIterator derived class is for fetching rows from intermediate nodes of execution tree. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h index d1d7e232cf4..aa0a45d4912 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h @@ -189,6 +189,10 @@ class BatchOp : public ParallelOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "BatchOp"; } + private: // Worker thread for doing the memcpy of batch // @param int32_t param workerId diff --git a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h index 9afadab39a2..e2d68e182a0 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h @@ -81,6 +81,10 @@ class ConcatOp : public PipelineOp { // before providing their own implementations. Status PrepareNodePostAction() override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ConcatOp"; } + private: Status Verify(int32_t id, const std::unique_ptr &buf); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc index 9ee6e706aad..e105c554f2d 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc @@ -38,6 +38,7 @@ DatasetOp::DatasetOp(int32_t op_connector_size) tree_(nullptr), state_(OpState::kDeOpIdle), op_ctrl_flags_(kDeOpNone), + out_connector_(nullptr), first_fetch_(true) { // The operator starts out with an invalid operator id. The only way to // get it out of invalid state is to assign the operator to an execution tree. 
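The ConfigManager and constants changes above give the performance monitor its sampling period: kCfgMonitorSamplingInterval defaults to 10 and the Monitor thread sleeps that many milliseconds between samples. A minimal usage sketch, assuming the corresponding Python wrappers in mindspore.dataset.config (added outside this hunk) forward to the set_monitor_sampling_interval / get_monitor_sampling_interval bindings registered above:

import mindspore.dataset as ds

# Sampling interval is in milliseconds; the default comes from kCfgMonitorSamplingInterval (10).
ds.config.set_monitor_sampling_interval(50)
assert ds.config.get_monitor_sampling_interval() == 50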
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h index 29b59ba2f73..57ca40c2a02 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h @@ -51,7 +51,7 @@ class DatasetOp : public std::enable_shared_from_this { }; // Flags that control operator runtime behaviours - enum OpState { kDeOpRunning = 0, kDeOpIdle = 1 }; + enum OpState { kDeOpRunning = 0, kDeOpIdle = 1, kDeOpTerminated }; // Constructor // @param op_connector_size - The size for the output connector of this operator. @@ -213,11 +213,23 @@ class DatasetOp : public std::enable_shared_from_this { // Getter function // @return connector size of current op - virtual int32_t ConnectorSize() const { return out_connector_->size(); } + int32_t ConnectorSize() const { + if (!inlined()) { + return out_connector_->size(); + } + // Return -1 for inlined op + return -1; + } // Getter function // @return connector capacity of current op - virtual int32_t ConnectorCapacity() const { return out_connector_->capacity(); } + int32_t ConnectorCapacity() const { + if (!inlined()) { + return out_connector_->capacity(); + } + // Return -1 for inlined op + return -1; + } // Getter function // @return connector size of child op @@ -228,7 +240,7 @@ class DatasetOp : public std::enable_shared_from_this { int32_t ChildOpConnectorCapacity(int32_t child_index = 0) const { return child_[child_index]->ConnectorCapacity(); } // Children Getter - // @return Vector or Children + // @return Vector of Children std::vector> Children() const { return child_; } // Base method for NodePass visit. // @param p - The NodePass to call. // @return Status of the node visit virtual Status Accept(NodePass *p, bool *modified); + // Op name getter + // @return Name of the current Op + virtual std::string Name() const { return "DatasetOp"; } + + // Execution Tree getter + // @return Pointer to the ExecutionTree the current op belongs to, no ownership + ExecutionTree *Tree() { return tree_; } + protected: // Adds a parent operator to this operator // @notes External callers do not have access to this function. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc index 3857accbefa..a2b55ecfb24 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc @@ -13,25 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -#include "dataset/engine/datasetops/device_queue_op.h" + #include #include #include - #include "dataset/core/config_manager.h" #include "dataset/core/global_context.h" +#include "dataset/engine/datasetops/device_queue_op.h" #include "dataset/engine/data_buffer.h" #include "dataset/engine/dataset_iterator.h" +#include "dataset/engine/opt/pass.h" +#include "dataset/engine/perf/profiling.h" +#include "dataset/engine/perf/device_queue_tracing.h" #include "dataset/util/status.h" #include "dataset/util/task_manager.h" -#include "dataset/engine/opt/pass.h" -#include "dataset/util/profiling.h" namespace mindspore { namespace dataset { -#define DEVICE_QUEUE_PROFILING_DATA(type, subtype, batch_num, value) \ - std::to_string(type) + " " + std::to_string(subtype) + " " + std::to_string(batch_num) + " " + std::to_string(value) - DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size, int32_t op_connector_size, int64_t num_batch) : PipelineOp(op_connector_size), @@ -101,22 +99,16 @@ Status DeviceQueueOp::SendDataToAscend() { MS_LOG(INFO) << "Device queue, sending data to Ascend."; int64_t total_batch = 0; bool is_break_loop = false; - double batch_start_time, tdt_start_time, end_time; + double batch_start_time, end_time; int32_t batch_cost, tdt_cost; int32_t connector_size = 0; int32_t connector_capacity; - std::shared_ptr profiling_node; - bool isProfilingEnable = ProfilingManager::GetInstance().IsProfilingEnable(); + std::shared_ptr profiling_node; + bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable(); if (isProfilingEnable) { - std::string file_name = "critical_point_profiling"; - // Here can determine performance bottleneck is in pipeline or in tdt. - // Context format of this file "type subtype batchnum value" - // type:0: time, 1: queue depth - // subtype:0: pipeline time, 1: push tdt time, 2: all time - // batchnum: batch number - // value: value of time(ms) or queue depth - profiling_node = std::make_shared(file_name, device_id_); - RETURN_IF_NOT_OK(ProfilingManager::GetInstance().RegisterProfilingNode(&profiling_node)); + std::shared_ptr node; + RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node)); + profiling_node = std::dynamic_pointer_cast(node); batch_start_time = ProfilingTime::GetCurMilliSecond(); connector_capacity = ChildOpConnectorCapacity(); } @@ -129,29 +121,23 @@ Status DeviceQueueOp::SendDataToAscend() { TensorRow currRow; for (int row_id = 0; row_id < current_buffer->NumRows() && !is_break_loop; row_id++) { RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); - if (isProfilingEnable) { - tdt_start_time = ProfilingTime::GetCurMilliSecond(); - } - auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_); + auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost); if (status == TdtStatus::FAILED) { return Status(StatusCode::kTDTPushFailure, "TDT Push Failed"); } if (isProfilingEnable) { end_time = ProfilingTime::GetCurMilliSecond(); - tdt_cost = (int32_t)(end_time - tdt_start_time); // record push tdt time - profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost)); + profiling_node->Record(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost); batch_cost = (int32_t)(end_time - batch_start_time); // record batch time - profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, BATCH_TIME, total_batch + 1, batch_cost)); + profiling_node->Record(TIME, 
BATCH_TIME, total_batch + 1, batch_cost); // record pipeline time - profiling_node->Record( - DEVICE_QUEUE_PROFILING_DATA(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost)); + profiling_node->Record(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost); batch_start_time = end_time; // record connector depth - profiling_node->Record( - DEVICE_QUEUE_PROFILING_DATA(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size)); + profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size); } total_batch++; if (num_batch_ > 0 && total_batch == num_batch_) { @@ -171,9 +157,7 @@ Status DeviceQueueOp::SendDataToAscend() { RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } - if (isProfilingEnable) { - profiling_node->SaveToFile(); - } + tree_->SetFinished(); MS_LOG(INFO) << "Device queue total batch is " << total_batch << ", number of batches is " << num_batch_ << "."; return Status::OK(); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h index ebbcd16cc3a..6fec18986ee 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h @@ -140,6 +140,10 @@ class DeviceQueueOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "DeviceQueueOp"; } + private: // Name: checkExceptions(DataBuffer); // Description: Check whether the dataBuffer meets the condition for performing DeviceQueueOp diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h index cd6c01da90f..36f70cb82f5 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h @@ -127,6 +127,10 @@ class FilterOp : public ParallelOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "FilterOp"; } + private: // predicate_func python callable which returns a boolean value. py::function predicate_func_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h index f903881ca24..8bec6179e30 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h @@ -177,6 +177,10 @@ class MapOp : public ParallelOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "MapOp"; } + private: // Local queues where worker threads can pop from. // Popping directly from the Connector can block if the previous designated threads haven't pop. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h index 3940b9adc71..fc491a6b5ae 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h @@ -107,6 +107,10 @@ class ProjectOp : public PipelineOp { // @return - Status of the node visit. 
Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ProjectOp"; } + private: std::vector columns_to_project_; std::vector projected_column_indices_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h index 2bd4875fda7..ce7d59fa6b4 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h @@ -116,6 +116,10 @@ class RenameOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "RenameOp"; } + protected: // Rename core functionality Status RenameColumns(); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h index 1bcfade0b8b..233bc17e453 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h @@ -124,9 +124,9 @@ class RepeatOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; - virtual int32_t ConnectorSize() const { return child_[0]->ConnectorSize(); } - - virtual int32_t ConnectorCapacity() const { return child_[0]->ConnectorCapacity(); } + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "RepeatOp"; } private: int32_t max_repeats_; // The number of repeats that the user requested diff --git a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h index baabad758c6..e04cb41a60a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h @@ -161,6 +161,10 @@ class ShuffleOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ShuffleOp"; } + private: // Private function to add a new row to the shuffle buffer. // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.h b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.h index 40db770642c..0812fe51e71 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.h @@ -80,6 +80,10 @@ class SkipOp : public PipelineOp { // @return - Status of the node visit. 
Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "SkipOp"; } + private: int32_t max_skips_; // The number of skips that the user requested int32_t skip_count_; // A counter for the current number of executed skips diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h index f92ba94df75..7d2d44ceb76 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h @@ -169,6 +169,10 @@ class CelebAOp : public ParallelOp, RandomAccessOp { // @return Status - The error code return Status AddIOBlock(std::unique_ptr *data_buffer); + // Op name getter + // @return Name of the current Op + std::string Name() const { return "CelebAOp"; } + private: // Called first when function is called // @return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h index 35c0121818e..62c20ac4010 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h @@ -155,6 +155,10 @@ class CifarOp : public ParallelOp, public RandomAccessOp { // @return static Status CountTotalRows(const std::string &dir, bool isCIFAR10, int64_t *count); + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "CifarOp"; } + private: // Initialize Sampler, calls sampler->Init() within // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h index afeff29b869..b96f41289bd 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h @@ -127,6 +127,10 @@ class GeneratorOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "GeneratorOp"; } + private: py::function generator_function_; std::vector column_names_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h index 7acecedba61..9aab856faaa 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h @@ -210,6 +210,10 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { // @return - Status of the node visit. 
Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ImageFolderOp"; } + private: // Initialize Sampler, calls sampler->Init() within // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h index 5283a3ca380..0726a45c577 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h @@ -172,6 +172,10 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { static Status GetClassIndexing(const std::string &file, const py::dict &dict, const std::string &usage, std::map *output_class_indexing); + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ManifestOp"; } + private: // Initialize Sampler, calls sampler->Init() within // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h index 77b25139303..b704240aaa6 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h @@ -218,6 +218,10 @@ class MindRecordOp : public ParallelOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "MindRecordOp"; } + private: Status GetBufferFromReader(std::unique_ptr *fetched_buffer, int64_t buffer_id, int32_t worker_id); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h index 4a31b7cf666..5b63b92ab2f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h @@ -152,6 +152,10 @@ class MnistOp : public ParallelOp, public RandomAccessOp { // @return static Status CountTotalRows(const std::string &dir, int64_t *count); + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "MnistOp"; } + private: // Initialize Sampler, calls sampler->Init() within // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h index 92d05d7318d..48cfb0be516 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h @@ -189,6 +189,10 @@ class RandomDataOp : public ParallelOp { */ int64_t GetTotalRows() const { return total_rows_; } + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "RandomDataOp"; } + private: /** * The entry point code for when workers are launched diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h index 63dae54930d..2445c5c6be2 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h @@ -169,6 +169,10 @@ class TextFileOp : public ParallelOp { // @return Status - the error coed returned. 
static Status CountAllFileRows(const std::vector &files, int64_t *count); + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "TextFileOp"; } + private: // The entry point for when workers are launched. // @param worker_id - the id of the worker that is executing this function. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h index 17a76f2c3d7..17728136c7a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h @@ -228,6 +228,10 @@ class TFReaderOp : public ParallelOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "TFReaderOp"; } + private: // The entry point for when workers are launched. // @param worker_id - the id of the worker that is executing this function. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h index 8a823614619..79e31a6bf8a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h @@ -205,6 +205,10 @@ class VOCOp : public ParallelOp, public RandomAccessOp { static Status GetClassIndexing(const std::string &dir, const std::string &task_type, const std::string &task_mode, const py::dict &dict, std::map *output_class_indexing); + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "VOCOp"; } + private: // Initialize Sampler, calls sampler->Init() within // @return Status - The error code return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/take_op.h b/mindspore/ccsrc/dataset/engine/datasetops/take_op.h index 64ba8e69e00..4eeb55bdd06 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/take_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/take_op.h @@ -90,6 +90,10 @@ class TakeOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "TakeOp"; } + private: int32_t max_takes_; // The number of takes that the user requested int32_t take_count_; // A counter for the current number of executed takes diff --git a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h index 1140a98dd73..2a7c3e90c6a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h @@ -110,6 +110,10 @@ class ZipOp : public PipelineOp { // @return - Status of the node visit. 
Status Accept(NodePass *p, bool *modified) override; + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "ZipOp"; } + private: // Handles preprocessing of the main loop, used when starting new epoch Status prepare(TensorQTable *const table); diff --git a/mindspore/ccsrc/dataset/engine/execution_tree.cc b/mindspore/ccsrc/dataset/engine/execution_tree.cc index acd32b24c29..345f9758890 100644 --- a/mindspore/ccsrc/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/dataset/engine/execution_tree.cc @@ -19,9 +19,8 @@ #include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/datasetops/shuffle_op.h" #include "dataset/util/task_manager.h" -#include "dataset/util/profiling.h" - -#include "dataset/engine/opt/util/printer_pass.h" +#include "dataset/engine/perf/profiling.h" +#include "dataset/engine/perf/monitor.h" namespace mindspore { namespace dataset { @@ -30,6 +29,8 @@ ExecutionTree::ExecutionTree() : id_count_(0) { tg_ = std::make_unique(); tree_state_ = kDeTStateInit; prepare_flags_ = kDePrepNone; + perf_monitor_ = std::make_unique(this); + profiling_manager_ = std::make_unique(this); } // Destructor @@ -121,6 +122,15 @@ Status ExecutionTree::Launch() { } std::ostringstream ss; ss << *this; + + // Profiling infrastructures need to be initialized before Op launching + if (profiling_manager_->IsProfilingEnable()) { + // Setup profiling manager + RETURN_IF_NOT_OK(profiling_manager_->Initialize()); + // Launch Monitor Thread + RETURN_IF_NOT_OK(tg_->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_))); + } + MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str(); for (auto itr = this->begin(); itr != this->end(); ++itr) { // An inlined operator is one that has an output connector size of 0, and it does not @@ -133,7 +143,9 @@ Status ExecutionTree::Launch() { // Set the state of the Operator as running. 
This only matters in Leaf ops, CacheOp and TakeOp } } + tree_state_ = kDeTStateExecuting; + return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/execution_tree.h b/mindspore/ccsrc/dataset/engine/execution_tree.h index f0c894f05bf..e1c5e8ff540 100644 --- a/mindspore/ccsrc/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/dataset/engine/execution_tree.h @@ -23,12 +23,14 @@ #include #include "dataset/engine/datasetops/dataset_op.h" #include "dataset/util/status.h" +#include "mindspore/ccsrc/dataset/engine/perf/profiling.h" namespace mindspore { namespace dataset { // Forward declares class TaskGroup; class DatasetOp; +class Monitor; class ExecutionTree { public: @@ -40,11 +42,12 @@ class ExecutionTree { // State flags for the lifecycle of the tree enum TreeState { - kDeTStateInit = 0, // The freshly initialized state after construction - kDeTStateBuilding, // The tree is being built, nodes are being added - kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare - kDeTStateReady, // The tree has been prepared and is ready to be launched - kDeTStateExecuting // The tree has been launched and is executing + kDeTStateInit = 0, // The freshly initialized state after construction + kDeTStateBuilding, // The tree is being built, nodes are being added + kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare + kDeTStateReady, // The tree has been prepared and is ready to be launched + kDeTStateExecuting, // The tree has been launched and is executing + kDeTStateFinished // The tree has been drained, dataset iterator received EOF }; class Iterator { @@ -120,7 +123,7 @@ class ExecutionTree { // Returns an iterator positioned at the start // @return Iterator - The iterator ExecutionTree::Iterator begin(const std::shared_ptr &root = nullptr) const { - return Iterator((root == nullptr) ? root_ : root); + return Iterator(root == nullptr ? root_ : root); } // Returns an iterator positioned at the end @@ -207,6 +210,16 @@ class ExecutionTree { // @return raw pointer to the TaskGroup TaskGroup *AllTasks() const { return tg_.get(); } + // Return if the ExecutionTree is finished (iterator receives EOF). + // @return Bool - true if the ExecutionTree is finished + bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; } + + // Set the ExecutionTree to Finished state.
+ void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; } + + // Getter for profiling manager, no ownership + ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); } + private: // A helper functions for doing the recursive printing // @param dataset_op - The dataset op to print @@ -222,6 +235,8 @@ class ExecutionTree { uint32_t prepare_flags_; // Flags used during tree prepare TreeState tree_state_; // Tracking the current tree state std::stack> repeat_stack_; // A stack used during prepare phase + std::unique_ptr perf_monitor_; // Performance Monitor + std::unique_ptr profiling_manager_; // Profiling manager }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt b/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt new file mode 100644 index 00000000000..0b67469d2db --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt @@ -0,0 +1,6 @@ +add_library(engine-perf OBJECT + profiling.cc + monitor.cc + device_queue_tracing.cc + connector_size.cc + dataset_iterator_tracing.cc) diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_size.cc b/mindspore/ccsrc/dataset/engine/perf/connector_size.cc new file mode 100644 index 00000000000..862ec51c49b --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/connector_size.cc @@ -0,0 +1,89 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/engine/perf/connector_size.h" + +#include +#include +#include +#include +#include "dataset/core/config_manager.h" +#include "dataset/engine/execution_tree.h" +#include "dataset/util/path.h" + +using json = nlohmann::json; +namespace mindspore { +namespace dataset { +using Qrow = std::vector; + +// Sample action +Status ConnectorSize::Sample() { + Qrow cur_row; + std::transform(tree_->begin(), tree_->end(), std::back_inserter(cur_row), + [](DatasetOp &op) { return op.ConnectorSize(); }); + // Push new row of sample + sample_table_.push_back(cur_row); + return Status::OK(); +} + +// JSON serializer helper function +json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector &size) { + auto children = node.Children(); + std::vector children_id; + std::transform(children.begin(), children.end(), std::back_inserter(children_id), + [](std::shared_ptr op) -> int32_t { return op->id(); }); + json json_node; + json_node["op_id"] = node.id(); + json_node["op_type"] = node.Name(); + json_node["num_workers"] = node.num_workers(); + json metrics; + // DeviceQueueOp is a special op,it is not inlined but its output queue is invalid. + // So we should not output its queue size. 
+ if (!node.inlined() && node.Name() != "DeviceQueueOp") { + metrics["output_queue"] = {{"size", size}, {"length", node.ConnectorCapacity()}}; + } + json_node["metrics"] = metrics; + if (!children_id.empty()) { + json_node["children"] = children_id; + } + + return json_node; +} + +// Save profiling data to file +Status ConnectorSize::SaveToFile() { + std::ofstream os(file_path_, std::ios::trunc); + uint32_t idx = 0; + json output; + std::shared_ptr cfg = GlobalContext::config_manager(); + output["sampling_interval"] = cfg->monitor_sampling_interval(); + // Traverse the ExecutionTree for JSON node generation + for (auto &node : *tree_) { + std::vector cur_queue_size; + std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size), + [&](const ConnectorSizeSample &sample) { return sample[idx]; }); + json json_node = ParseOpInfo(node, cur_queue_size); + output["op_info"].push_back(json_node); + idx++; + } + os << output; + return Status::OK(); +} +Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) { + file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString(); + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_size.h b/mindspore/ccsrc/dataset/engine/perf/connector_size.h new file mode 100644 index 00000000000..910b349bf64 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/connector_size.h @@ -0,0 +1,68 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_QUEUE_DEPTH_H +#define MINDSPORE_QUEUE_DEPTH_H + +#include +#include +#include +#include "dataset/engine/perf/profiling.h" +#include "dataset/engine/datasetops/dataset_op.h" + +using json = nlohmann::json; + +namespace mindspore { +namespace dataset { +class ExecutionTree; + +// Connector size sampling samples the output connector size of each op in the pipeline. +// It supports JSON serialization for external usage. +class ConnectorSize : public Sampling { + // Connector size sampling data is stored as a 2D vector + // op_0 ... op_m + // sample_0 size_0_0 ... size_m_0 + // ... ... ... ... + // sample_n size_0_n ... size_m_n + // + // A circular buffer will be implemented in the future to make this table more flexible. + using ConnectorSizeSample = std::vector; + using ConnectorSizeSampleTable = std::vector; + + public: + explicit ConnectorSize(ExecutionTree *tree) : tree_(tree) {} + + // Driver function for connector size sampling.
+ // This function samples the connector size of every node within the ExecutionTree + Status Sample() override; + + std::string Name() const override { return kConnectorSizeSamplingName; }; + + // Save sampling data to file + // @return Status - The error code return + Status SaveToFile() override; + + Status Init(const std::string &dir_path, const std::string &device_id); + + // Parse op information and transform to json format + json ParseOpInfo(const DatasetOp &node, const std::vector &size); + + private: + ExecutionTree *tree_ = nullptr; // ExecutionTree pointer + ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling +}; +} // namespace dataset +} // namespace mindspore +#endif // MINDSPORE_QUEUE_DEPTH_H diff --git a/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.cc b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.cc new file mode 100644 index 00000000000..99b0c2d7e08 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.cc @@ -0,0 +1,64 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include "dataset/engine/perf/dataset_iterator_tracing.h" +#include "dataset/util/path.h" + +namespace mindspore { +namespace dataset { + +Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, + const int32_t value) { + // Format: "type extra-info batch-num value" + // type: 0: time, 1: connector size + // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time + // if type is 1 - connector capacity + // batch-num: batch number + // value: if type is 0 - value is time(ms) + // if type is 1 - value is connector size + // Examples: + // 0 0 20 10 - The 20th batch took 10ms to get data from pipeline. + // 1 64 20 5 - Connector size is 5 when getting the 20th batch. Connector capacity is 64.
+ std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + + std::to_string(value); + value_.emplace_back(data); + return Status::OK(); +} + +Status DatasetIteratorTracing::SaveToFile() { + if (value_.empty()) { + return Status::OK(); + } + + std::ofstream handle(file_path_, std::ios::trunc); + if (!handle.is_open()) { + RETURN_STATUS_UNEXPECTED("Profiling file can not be opened."); + } + for (auto value : value_) { + handle << value << "\n"; + } + handle.close(); + + return Status::OK(); +} + +Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) { + file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).toString(); + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h new file mode 100644 index 00000000000..4967d3a747c --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h @@ -0,0 +1,51 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_DATASET_ITERATOR_TRACING_H +#define MINDSPORE_DATASET_ITERATOR_TRACING_H + +#include +#include +#include "dataset/engine/perf/profiling.h" + +namespace mindspore { +namespace dataset { +class DatasetIteratorTracing : public Tracing { + public: + // Constructor + DatasetIteratorTracing() = default; + + // Destructor + ~DatasetIteratorTracing() = default; + + // Record tracing data + // @return Status - The error code return + Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value); + + std::string Name() const override { return kDatasetIteratorTracingName; }; + + // Save tracing data to file + // @return Status - The error code return + Status SaveToFile() override; + + Status Init(const std::string &dir_path, const std::string &device_id); + + private: + std::vector value_; +}; +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_DATASET_ITERATOR_TRACING_H diff --git a/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.cc b/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.cc new file mode 100644 index 00000000000..de5fad66bac --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.cc @@ -0,0 +1,64 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "dataset/engine/perf/device_queue_tracing.h" +#include "dataset/util/path.h" +namespace mindspore { +namespace dataset { + +Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, + const int32_t value) { + // Format: "type extra-info batch-num value" + // type: 0: time, 1: connector size + // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time + // if type is 1 - connector capacity + // batch-num: batch number + // value: if type is 0 - value is time(ms) + // if type is 1 - value is connector size + // Examples: + // 0 0 20 10 - The 20th batch took 10ms to get data from pipeline. + // 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64. + std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + + std::to_string(value); + value_.emplace_back(data); + return Status::OK(); +} + +Status DeviceQueueTracing::SaveToFile() { + if (value_.empty()) { + return Status::OK(); + } + + std::ofstream handle(file_path_, std::ios::trunc); + if (!handle.is_open()) { + RETURN_STATUS_UNEXPECTED("Profiling file can not be opened."); + } + for (auto value : value_) { + handle << value << "\n"; + } + handle.close(); + + return Status::OK(); +} + +Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) { + file_path_ = (Path(dir_path) / Path("critical_point_profiling_" + device_id + ".txt")).toString(); + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.h b/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.h new file mode 100644 index 00000000000..54638b6f429 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.h @@ -0,0 +1,52 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MINDSPORE_DEVICE_QUEUE_TRACING_H +#define MINDSPORE_DEVICE_QUEUE_TRACING_H + +#include +#include +#include "dataset/engine/perf/profiling.h" + +namespace mindspore { +namespace dataset { +class DeviceQueueTracing : public Tracing { + public: + // Constructor + DeviceQueueTracing() = default; + + // Destructor + ~DeviceQueueTracing() = default; + + // Record tracing data + // @return Status - The error code return + Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value); + + std::string Name() const override { return "Device Queue Tracing"; }; + + // Save tracing data to file + // @return Status - The error code return + Status SaveToFile() override; + + Status Init(const std::string &dir_path, const std::string &device_id); + + private: + std::vector value_; +}; +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_DEVICE_QUEUE_TRACING_H diff --git a/mindspore/ccsrc/dataset/engine/perf/monitor.cc b/mindspore/ccsrc/dataset/engine/perf/monitor.cc new file mode 100644 index 00000000000..704a0e8c96c --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/monitor.cc @@ -0,0 +1,50 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "dataset/core/config_manager.h" +#include "dataset/engine/perf/monitor.h" +#include "dataset/engine/execution_tree.h" + +namespace mindspore { +namespace dataset { + +Monitor::Monitor(ExecutionTree *tree) : tree_(tree) { + std::shared_ptr cfg = GlobalContext::config_manager(); + sampling_interval_ = cfg->monitor_sampling_interval(); +} + +Status Monitor::operator()() { + // Register this thread with TaskManager to receive proper interrupt signal. + TaskManager::FindMe()->Post(); + + // Keep sampling if + // 1) Monitor Task is not interrupted by TaskManager AND + // 2) Iterator has not received EOF + while (!this_thread::is_interrupted() && !(tree_->isFinished())) { + for (auto &node : tree_->GetProfilingManager()->GetSamplingNodes()) { + RETURN_IF_NOT_OK(node.second->Sample()); + std::this_thread::sleep_for(std::chrono::milliseconds(sampling_interval_)); + } + } + + // Output all profiling data upon request. + tree_->GetProfilingManager()->SaveProfilingData(); + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/monitor.h b/mindspore/ccsrc/dataset/engine/perf/monitor.h new file mode 100644 index 00000000000..de6615f7cdb --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/monitor.h @@ -0,0 +1,52 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_MONITOR_H +#define MINDSPORE_MONITOR_H + +#include +#include +#include +#include "dataset/util/status.h" +#include "dataset/engine/perf/profiling.h" + +namespace mindspore { +namespace dataset { +class ExecutionTree; +class Monitor { + public: + // Monitor object constructor + explicit Monitor(ExecutionTree *tree); + + Monitor() = default; + + // Functor for Perf Monitor main loop. + // This function will be the entry point of Mindspore::Dataset::Task + Status operator()(); + + int64_t GetSamplingInterval() { return sampling_interval_; } + + private: + int64_t cur_row_; + int64_t max_samples_; + int64_t sampling_interval_; + ExecutionTree *tree_; + std::vector> sampling_list_; +}; +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_MONITOR_H diff --git a/mindspore/ccsrc/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/dataset/engine/perf/profiling.cc new file mode 100644 index 00000000000..4786b8dd694 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/profiling.cc @@ -0,0 +1,153 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "dataset/engine/perf/profiling.h" + +#include +#include +#include +#include "common/utils.h" +#include "dataset/util/path.h" +#include "dataset/engine/perf/monitor.h" +#include "dataset/engine/perf/device_queue_tracing.h" +#include "dataset/engine/perf/connector_size.h" +#include "dataset/engine/perf/dataset_iterator_tracing.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace dataset { + +bool ProfilingManager::IsProfilingEnable() const { + auto profiling = common::GetEnv("PROFILING_MODE"); + if (profiling.empty() || profiling != "true") { + return false; + } + return true; +} + +Status ProfilingManager::Initialize() { + // Register nodes based on config + std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR"); + if (dir.empty()) { + RETURN_STATUS_UNEXPECTED("Profiling dir is not set."); + } + char real_path[PATH_MAX] = {0}; + if (dir.size() >= PATH_MAX) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#if defined(_WIN32) || defined(_WIN64) + if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#else + if (realpath(common::SafeCStr(dir), real_path) == nullptr) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#endif + dir_path_ = real_path; + + // If DEVICE_ID is not set,defult value is 0 + device_id_ = common::GetEnv("DEVICE_ID"); + if (device_id_.empty()) { + device_id_ = "0"; + } + + // Register all profiling node. + // device_queue node is used for graph mode + std::shared_ptr device_queue_tracing = std::make_shared(); + RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing)); + // dataset_iterator node is used for graph mode + std::shared_ptr dataset_iterator_tracing = std::make_shared(); + RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing)); + + std::shared_ptr monitor_sampling = std::make_shared(tree_); + RETURN_IF_NOT_OK(RegisterSamplingNode(monitor_sampling)); + + return Status::OK(); +} + +// Profiling node registration +Status ProfilingManager::RegisterTracingNode(std::shared_ptr node) { + // Check if node with the same name has already been registered. + auto exist = tracing_nodes_.find(node->Name()); + if (exist != tracing_nodes_.end()) { + return Status(StatusCode::kProfilingError, "Profiling node already exist: " + node->Name()); + } + // Register the node with its name as key. + RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_)); + tracing_nodes_[node->Name()] = node; + return Status::OK(); +} + +// Profiling node getter +Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr *node) { + // Check if node with the same name has already been registered. + auto exist = tracing_nodes_.find(name); + if (exist == tracing_nodes_.end()) { + return Status(StatusCode::kProfilingError, "Profiling node does not exist: " + name); + } + // Fetch node. + *node = tracing_nodes_[name]; + return Status::OK(); +} + +// Profiling node registration +Status ProfilingManager::RegisterSamplingNode(std::shared_ptr node) { + // Check if node with the same name has already been registered. + auto exist = sampling_nodes_.find(node->Name()); + if (exist != sampling_nodes_.end()) { + return Status(StatusCode::kProfilingError, "Profiling node already exist: " + node->Name()); + } + // Register the node with its name as key. 
+ RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_)); + sampling_nodes_[node->Name()] = node; + return Status::OK(); +} + +// Profiling node getter +Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr *node) { + // Check if node with the same name has already been registered. + auto exist = sampling_nodes_.find(name); + if (exist == sampling_nodes_.end()) { + return Status(StatusCode::kProfilingError, "Profiling node does not exist: " + name); + } + // Fetch node. + *node = sampling_nodes_[name]; + return Status::OK(); +} + +Status ProfilingManager::SaveProfilingData() { + if (!IsProfilingEnable()) { + return Status::OK(); + } + MS_LOG(INFO) << "Start to save profiling data."; + for (auto node : tracing_nodes_) { + RETURN_IF_NOT_OK(node.second->SaveToFile()); + } + for (auto node : sampling_nodes_) { + RETURN_IF_NOT_OK(node.second->SaveToFile()); + } + MS_LOG(INFO) << "Save profiling data end."; + + return Status::OK(); +} + +double ProfilingTime::GetCurMilliSecond() { + struct timeval tv = {0, 0}; + (void)gettimeofday(&tv, nullptr); + return tv.tv_sec * 1000 + tv.tv_usec / 1000; +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/profiling.h b/mindspore/ccsrc/dataset/engine/perf/profiling.h new file mode 100644 index 00000000000..787ea688ba7 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/profiling.h @@ -0,0 +1,140 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_PROFILE_H_ +#define DATASET_UTIL_PROFILE_H_ +
+#include +#include +#include +#include +#include "dataset/util/status.h" + +namespace mindspore { +namespace dataset { + +class Monitor; +class ExecutionTree; + +const char kDeviceQueueTracingName[] = "Device Queue Tracing"; +const char kDatasetIteratorTracingName[] = "Dataset Iterator Tracing"; +const char kConnectorSizeSamplingName[] = "Connector Size Sampling"; + +// Profiling is the base class for a basic unit of profiling action +// This base class encapsulates the serialization output logic +class Profiling : std::enable_shared_from_this { + public: + // Constructor + Profiling() = default; + + // Destructor + virtual ~Profiling() = default; + + virtual Status Init(const std::string &dir_path, const std::string &device_id) = 0; + + // Default serialization file generator + virtual Status SaveToFile() = 0; + + // Profiling name + virtual std::string Name() const = 0; + + protected: + std::string file_path_; +}; + +// Sampling is a class of profiling which generates samples periodically. +class Sampling : public Profiling { + public: + // Sampling action function. This function will be invoked by the performance monitor thread. + virtual Status Sample() = 0; +}; + +// Tracing is a class of profiling which records samples upon request. +class Tracing : public Profiling { + // Tracing does not define a fixed interface, to allow flexibility in data recording.
+}; + +// ProfilingManager is a class that manages all profiling infrastructure +// It serves the following purposes: +// 1) Fetch profiling configs from global contexts +// 2) Set up all profiling nodes based on config +// 3) Provide access to profiling nodes for profiling actions +// 4) Manage profiling data serialization process +class ProfilingManager { + public: + explicit ProfilingManager(ExecutionTree *tree) : tree_(tree) {} + + ~ProfilingManager() = default; + + Status Initialize(); + + // Save profile data to file + // @return Status - The error code return + Status SaveProfilingData(); + + // Sampling node getter + // @param name - The name of the requested node + // @param node - Pointer to the shared pointer for the Sampling node + // @return Status - The error code return + Status GetSamplingNode(const std::string &name, std::shared_ptr *node); + + // Tracing node getter + // @param name - The name of the requested node + // @param node - Pointer to the shared pointer for the Tracing node + // @return Status - The error code return + Status GetTracingNode(const std::string &name, std::shared_ptr *node); + + // Whether profiling is enabled. + bool IsProfilingEnable() const; + + std::unordered_map> &GetSamplingNodes() { return sampling_nodes_; } + + private: + std::unordered_map> tracing_nodes_; + + std::unordered_map> sampling_nodes_; + + // Register profile node to tree + // @param node - Profiling node + // @return Status - The error code return + Status RegisterTracingNode(std::shared_ptr node); + + // Register profile node to tree + // @param node - Profiling node + // @return Status - The error code return + Status RegisterSamplingNode(std::shared_ptr node); + + ExecutionTree *tree_ = nullptr;  // ExecutionTree pointer + std::string dir_path_;  // where to create the profiling file + std::string device_id_;  // used when creating the profiling file, filename_deviceid.suffix +}; + +enum ProfilingType { TIME, CONNECTOR_DEPTH }; + +enum ProfilingTimeSubType { + PIPELINE_TIME, + TDT_PUSH_TIME, + BATCH_TIME, + INVALID_TIME, +}; + +class ProfilingTime { + public: + static double GetCurMilliSecond(); +}; + +} // namespace dataset +} // namespace mindspore +#endif diff --git a/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc b/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc index e457de52ae8..51fff6bbd20 100644 --- a/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc +++ b/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc @@ -16,6 +16,7 @@ #include "dataset/engine/tdt/tdt_plugin.h" #include "common/utils.h" #include "utils/log_adapter.h" +#include "dataset/engine/perf/profiling.h" namespace mindspore { namespace dataset { @@ -28,18 +29,26 @@ std::shared_ptr TdtPlugin::GetInstance() { return instance_ptr_; } -TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name) { +TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time) { MS_LOG(INFO) << "TDT channel name is " << channel_name << "."; std::vector items; + double start_time; auto ret = translate(ts_row, items); if (ret != SUCCESS) { MS_LOG(ERROR) << "TDT converting tensor failed!"; return FAILED; } + if (profiling) { + start_time = ProfilingTime::GetCurMilliSecond(); + } if (tdt::TdtHostPushData(channel_name, items) != 0) { MS_LOG(ERROR) << "TDT pushing data failed!"; return FAILED; } + if (profiling) { + double end_time = ProfilingTime::GetCurMilliSecond(); + time = (int32_t)(end_time - start_time); + } return SUCCESS; } diff --git
a/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h b/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h index a25deb4aab3..03c118b59c2 100644 --- a/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h +++ b/mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h @@ -37,7 +37,7 @@ class TdtPlugin { public: static std::shared_ptr GetInstance(); - TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name); + TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time); private: TdtPlugin() {} diff --git a/mindspore/ccsrc/dataset/util/CMakeLists.txt b/mindspore/ccsrc/dataset/util/CMakeLists.txt index 6f5e37e88f3..b36d612435a 100644 --- a/mindspore/ccsrc/dataset/util/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/util/CMakeLists.txt @@ -14,5 +14,4 @@ add_library(utils OBJECT status.cc path.cc wait_post.cc - sig_handler.cc - profiling.cc) + sig_handler.cc) diff --git a/mindspore/ccsrc/dataset/util/profiling.cc b/mindspore/ccsrc/dataset/util/profiling.cc deleted file mode 100644 index 8d9aad74f2c..00000000000 --- a/mindspore/ccsrc/dataset/util/profiling.cc +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "dataset/util/profiling.h" - -#include -#include -#include -#include "dataset/util/path.h" -#include "common/utils.h" -#include "utils/log_adapter.h" - -namespace mindspore { -namespace dataset { -Profiling::Profiling(const std::string &file_name, const int32_t device_id) - : file_name_(file_name), device_id_(device_id) {} - -Status Profiling::Init() { - std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR"); - if (dir.empty()) { - RETURN_STATUS_UNEXPECTED("Profiling dir is not set."); - } - char real_path[PATH_MAX] = {0}; - if (dir.size() >= PATH_MAX) { - RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); - } -#if defined(_WIN32) || defined(_WIN64) - if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) { - RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); - } -#else - if (realpath(common::SafeCStr(dir), real_path) == nullptr) { - RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); - } -#endif - file_path_ = (Path(real_path) / Path(file_name_ + "_" + std::to_string(device_id_) + ".txt")).toString(); - return Status::OK(); -} - -Status Profiling::Record(const std::string &data) { - value_.emplace_back(data); - return Status::OK(); -} - -Status Profiling::SaveToFile() { - if (file_name_.empty()) { - RETURN_STATUS_UNEXPECTED("Profiling file name has not been set."); - } - std::ofstream handle(file_path_, std::ios::app); - if (!handle.is_open()) { - RETURN_STATUS_UNEXPECTED("Profiling file can not be opened."); - } - for (auto value : value_) { - handle << value << "\n"; - } - handle.close(); - - return Status::OK(); -} - -ProfilingManager &ProfilingManager::GetInstance() { - static ProfilingManager instance; - return instance; -} - -bool ProfilingManager::IsProfilingEnable() const { - auto profiling =
common::GetEnv("PROFILING_MODE"); - if (profiling.empty() || profiling != "true") { - return false; - } - - return true; -} - -Status ProfilingManager::RegisterProfilingNode(std::shared_ptr *node) { - RETURN_IF_NOT_OK((*node)->Init()); - profiling_node_.emplace_back(*node); - return Status::OK(); -} - -Status ProfilingManager::SaveProfilingData() { - if (!IsProfilingEnable()) { - return Status::OK(); - } - MS_LOG(INFO) << "Start to save profile data."; - for (auto node : profiling_node_) { - RETURN_IF_NOT_OK(node->SaveToFile()); - } - MS_LOG(INFO) << "Save profile data end."; - - return Status::OK(); -} - -double ProfilingTime::GetCurMilliSecond() { - struct timeval tv = {0, 0}; - (void)gettimeofday(&tv, nullptr); - return tv.tv_sec * 1000 + tv.tv_usec / 1000; -} -} // namespace dataset -} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/profiling.h b/mindspore/ccsrc/dataset/util/profiling.h deleted file mode 100644 index 4de2a1e052f..00000000000 --- a/mindspore/ccsrc/dataset/util/profiling.h +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef DATASET_UTIL_PROFILE_H_ -#define DATASET_UTIL_PROFILE_H_ - -#include -#include -#include -#include "dataset/util/status.h" - -namespace mindspore { -namespace dataset { -enum ProfilingType { - TIME, - CONNECTOR_DEPTH, -}; - -enum ProfilingTimeSubType { - PIPELINE_TIME, - TDT_PUSH_TIME, - BATCH_TIME, - INVALID_TIME, -}; - -class Profiling { - public: - // Constructor - Profiling() = default; - - // Constructor if need save profile data to file - Profiling(const std::string &file_name, const int32_t device_id); - - // Destructor - ~Profiling() = default; - - Status Init(); - - // Record profile data - Status Record(const std::string &data); - - // Save profile data to file if necessary - Status SaveToFile(); - - private: - std::vector value_; - std::string file_name_; - std::string file_path_; - int32_t device_id_; -}; - -class ProfilingManager { - public: - ProfilingManager() = default; - ~ProfilingManager() = default; - - static ProfilingManager &GetInstance(); - - // Save profile data to file - // @return Status - The error code return - Status SaveProfilingData(); - - // Register profile node to tree - // @param node - Profiling node - // @return Status - The error code return - Status RegisterProfilingNode(std::shared_ptr *node); - - bool IsProfilingEnable() const; - - private: - std::vector> profiling_node_; -}; - -class ProfilingTime { - public: - static double GetCurMilliSecond(); -}; -} // namespace dataset -} // namespace mindspore -#endif diff --git a/mindspore/ccsrc/dataset/util/status.cc b/mindspore/ccsrc/dataset/util/status.cc index 84d8ee582c9..27e9dfbc83b 100644 --- a/mindspore/ccsrc/dataset/util/status.cc +++ b/mindspore/ccsrc/dataset/util/status.cc @@ -45,6 +45,9 @@ std::string CodeAsString(const StatusCode c) { case StatusCode::kDuplicateKey: s = "Duplicate key"; break; + case StatusCode::kProfilingError: + s = "Error 
encountered while profiling"; + break; case StatusCode::kUnexpectedError: default: s = "Unexpected error"; diff --git a/mindspore/ccsrc/dataset/util/status.h b/mindspore/ccsrc/dataset/util/status.h index 38ed1fef899..a98b537b56c 100644 --- a/mindspore/ccsrc/dataset/util/status.h +++ b/mindspore/ccsrc/dataset/util/status.h @@ -70,6 +70,7 @@ enum class StatusCode : char { kPythonInterpreterFailure = 7, kTDTPushFailure = 8, kFileNotExist = 9, + kProfilingError = 10, // Make this error code the last one. Add new error code above it. kUnexpectedError = 127 }; diff --git a/mindspore/dataset/core/configuration.py b/mindspore/dataset/core/configuration.py index 38b25368b34..d3175cd1819 100644 --- a/mindspore/dataset/core/configuration.py +++ b/mindspore/dataset/core/configuration.py @@ -125,6 +125,35 @@ class ConfigurationManager: """ return self.config.get_num_parallel_workers() + def set_monitor_sampling_interval(self, interval): + """ + Set the default interval(ms) of monitor sampling. + + Args: + interval: interval(ms) to be used to performance monitor sampling. + + Raises: + ValueError: If interval is invalid (<= 0 or > MAX_INT_32). + + Examples: + >>> import mindspore.dataset as ds + >>> con = ds.engine.ConfigurationManager() + >>> # sets the new interval value. + >>> con.set_monitor_sampling_interval(100) + """ + if interval <= 0 or interval > INT32_MAX: + raise ValueError("Interval given is not within the required range") + self.config.set_monitor_sampling_interval(interval) + + def get_monitor_sampling_interval(self): + """ + Get the default interval of performance monitor sampling. + + Returns: + Interval: interval(ms) of performance monitor sampling. + """ + return self.config.get_monitor_sampling_interval() + def __str__(self): """ String representation of the configurations. diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py new file mode 100644 index 00000000000..fca0a4c1dcc --- /dev/null +++ b/tests/ut/python/dataset/test_profiling.py @@ -0,0 +1,119 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Testing profiling support in DE +""" +import os +import numpy as np +import mindspore.dataset as ds + +FILES = ["../data/dataset/testTFTestAllTypes/test.data"] +DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" +SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" + +PIPELINE_FILE = "./pipeline_profiling_1.json" +DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt" + + +def test_profiling_simple_pipeline(): + """ + Generator -> Shuffle -> Batch + """ + os.environ['PROFILING_MODE'] = 'true' + os.environ['MINDDATA_PROFILING_DIR'] = '.' 
+ os.environ['DEVICE_ID'] = '1' + + source = [(np.array([x]),) for x in range(1024)] + data1 = ds.GeneratorDataset(source, ["data"]) + data1 = data1.shuffle(64) + data1 = data1.batch(32) + + for _ in data1: + pass + + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) + assert os.path.exists(DATASET_ITERATOR_FILE) is True + os.remove(DATASET_ITERATOR_FILE) + del os.environ['PROFILING_MODE'] + del os.environ['MINDDATA_PROFILING_DIR'] + + +def test_profiling_complex_pipeline(): + """ + Generator -> Map -> + -> Zip + TFReader -> Shuffle -> + """ + os.environ['PROFILING_MODE'] = 'true' + os.environ['MINDDATA_PROFILING_DIR'] = '.' + os.environ['DEVICE_ID'] = '1' + + source = [(np.array([x]),) for x in range(1024)] + data1 = ds.GeneratorDataset(source, ["gen"]) + data1 = data1.map("gen", operations=[(lambda x: x + 1)]) + + pattern = DATASET_ROOT + "/test.data" + data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES) + data2 = data2.shuffle(4) + + data3 = ds.zip((data1, data2)) + + for _ in data3: + pass + + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) + assert os.path.exists(DATASET_ITERATOR_FILE) is True + os.remove(DATASET_ITERATOR_FILE) + del os.environ['PROFILING_MODE'] + del os.environ['MINDDATA_PROFILING_DIR'] + + +def test_profiling_sampling_interval(): + """ + Test non-default monitor sampling interval + """ + os.environ['PROFILING_MODE'] = 'true' + os.environ['MINDDATA_PROFILING_DIR'] = '.' + os.environ['DEVICE_ID'] = '1' + interval_origin = ds.config.get_monitor_sampling_interval() + + ds.config.set_monitor_sampling_interval(30) + interval = ds.config.get_monitor_sampling_interval() + assert interval == 30 + + source = [(np.array([x]),) for x in range(1024)] + data1 = ds.GeneratorDataset(source, ["data"]) + data1 = data1.shuffle(64) + data1 = data1.batch(32) + + for _ in data1: + pass + + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) + assert os.path.exists(DATASET_ITERATOR_FILE) is True + os.remove(DATASET_ITERATOR_FILE) + + ds.config.set_monitor_sampling_interval(interval_origin) + del os.environ['PROFILING_MODE'] + del os.environ['MINDDATA_PROFILING_DIR'] + + +if __name__ == "__main__":
 + test_profiling_simple_pipeline() + test_profiling_complex_pipeline() + test_profiling_sampling_interval()
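
The tests above also serve as a usage recipe for the new MindData profiling support. The following is an illustrative sketch only (not part of this patch); it simply condenses the environment variables, config API, and output file names already used in test_profiling.py, and assumes DEVICE_ID is set to "1" as in the tests.

# Illustrative sketch, mirroring the setup used in test_profiling.py above.
import os
import numpy as np
import mindspore.dataset as ds

os.environ['PROFILING_MODE'] = 'true'        # enable MindData profiling
os.environ['MINDDATA_PROFILING_DIR'] = '.'   # directory where profiling files are written
os.environ['DEVICE_ID'] = '1'                # suffix used in the profiling file names

ds.config.set_monitor_sampling_interval(30)  # performance monitor sampling interval, in ms

source = [(np.array([x]),) for x in range(1024)]
data = ds.GeneratorDataset(source, ["data"]).shuffle(64).batch(32)
for _ in data:                               # iterating the pipeline drives the profiling nodes
    pass

# Output files, named as in the tests above (device id 1):
assert os.path.exists("./pipeline_profiling_1.json")
assert os.path.exists("./dataset_iterator_profiling_1.txt")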