diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc index 7f44027c5ae..737d20b4fbb 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/engine/ir/datasetops/source/bindings.cc @@ -191,18 +191,19 @@ PYBIND_REGISTER( PYBIND_REGISTER(GeneratorNode, 2, ([](const py::module *m) { (void)py::class_>( *m, "GeneratorNode", "to create a GeneratorNode") - .def( - py::init([](py::function generator_function, const std::vector &column_names, - const std::vector &column_types, int64_t dataset_len, py::handle sampler) { - auto gen = std::make_shared(generator_function, column_names, column_types, - dataset_len, toSamplerObj(sampler)); - THROW_IF_ERROR(gen->ValidateParams()); - return gen; - })) - .def(py::init([](py::function generator_function, const std::shared_ptr schema, - int64_t dataset_len, py::handle sampler) { + .def(py::init([](py::function generator_function, const std::vector &column_names, + const std::vector &column_types, int64_t dataset_len, py::handle sampler, + uint32_t num_parallel_workers) { auto gen = - std::make_shared(generator_function, schema, dataset_len, toSamplerObj(sampler)); + std::make_shared(generator_function, column_names, column_types, dataset_len, + toSamplerObj(sampler), num_parallel_workers); + THROW_IF_ERROR(gen->ValidateParams()); + return gen; + })) + .def(py::init([](py::function generator_function, const std::shared_ptr schema, + int64_t dataset_len, py::handle sampler, uint32_t num_parallel_workers) { + auto gen = std::make_shared(generator_function, schema, dataset_len, + toSamplerObj(sampler), num_parallel_workers); THROW_IF_ERROR(gen->ValidateParams()); return gen; })); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 497537051f0..a82025244f9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -171,8 +171,10 @@ Status DeviceQueueOp::operator()() { Status DeviceQueueOp::SendDataToAscend() { MS_LOG(INFO) << "Device queue, sending data to Ascend."; #ifndef ENABLE_SECURITY - uint64_t batch_start_time, end_time; - uint64_t batch_record_start, batch_record_end; + uint64_t batch_start_time = 0; + uint64_t end_time = 0; + uint64_t batch_record_start = 0; + uint64_t batch_record_end = 0; #endif int64_t send_batch = 0; int32_t tdt_cost = 0; @@ -221,11 +223,9 @@ Status DeviceQueueOp::SendDataToAscend() { #ifndef ENABLE_SECURITY DetectPerBatchTime(&batch_record_start, &batch_record_end); #endif + PrintBeginInfoWhenFirstBatch(first_push_flag_); RETURN_IF_NOT_OK(SendRowToTdt(curr_row, is_profiling_enable, &tdt_cost)); - if (first_push_flag_ != true) { - MS_LOG(INFO) << "Loading dataset and push first batch into device successful."; - first_push_flag_ = true; - } + PrintEndInfoWhenFirstBatch(&first_push_flag_); #ifndef ENABLE_SECURITY ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time, connector_capacity, connector_size); @@ -581,11 +581,9 @@ Status DeviceQueueOp::SendDataToGPU() { #ifndef ENABLE_SECURITY DetectPerBatchTime(&batch_record_start, &batch_record_end); #endif + PrintBeginInfoWhenFirstBatch(first_push_flag_); RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row))); - if (first_push_flag_ != true) { - MS_LOG(INFO) << "Loading dataset and push first batch into device successful."; - first_push_flag_ = true; - } + PrintEndInfoWhenFirstBatch(&first_push_flag_); #ifndef ENABLE_SECURITY batch_record_start = ProfilingTime::GetCurMilliSecond(); #endif @@ -727,6 +725,23 @@ void DeviceQueueOp::DetectPerBatchTime(uint64_t *start_time, uint64_t *end_time) " performance(with creating dataset iterator) and optimize it."; } } + +void DeviceQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) { + if (first_push_flag != true) { + MS_LOG(INFO) << "Loading dataset and begin to push first batch into device ..."; + } +} + +void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) { + if (!first_push_flag) { + MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr"; + return; + } + if (*first_push_flag != true) { + MS_LOG(INFO) << "Loading dataset and push first batch into device successful."; + *first_push_flag = true; + } +} #endif } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index c5bdf732852..33d47883d55 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -119,10 +119,18 @@ class DeviceQueueOp : public PipelineOp { // Description: Auto filter metadata column before sending to device. Status FilterMetadata(TensorRow *row); - // Name: checkExceptions(TensorRow); - // Description: Check whether the TensorRow meets the condition for performing DeviceQueueOp + // Name: CheckExceptions(TensorRow); + // Description: Check whether the TensorRow meets the condition for performing DeviceQueueOp Status CheckExceptions(const TensorRow &row) const; + // Name: PrintBeginInfoWhenFirstBatch(bool) + // Description: Print info when first batch begin to send in sink_mode + void PrintBeginInfoWhenFirstBatch(const bool &first_push_flag); + + // Name: PrintEndInfoWhenFirstBatch(bool) + // Description: Print info when first batch send successful in sink_mode + void PrintEndInfoWhenFirstBatch(bool *first_push_flag); + private: #ifdef ENABLE_TDTQUE void WaitContinueSignal() const; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc index ea9b0c26c23..a01191e35d9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc @@ -14,9 +14,10 @@ * limitations under the License. */ #include "minddata/dataset/engine/datasetops/source/generator_op.h" -#include -#include "minddata/dataset/core/global_context.h" +#include + +#include "minddata/dataset/core/global_context.h" #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/task_manager.h" @@ -24,13 +25,14 @@ namespace mindspore { namespace dataset { GeneratorOp::GeneratorOp(py::function generator_function, std::vector column_names, std::vector column_types, int32_t prefetch_size, int32_t connector_size, - std::shared_ptr sampler) + std::shared_ptr sampler, uint32_t num_parallel_workers) : PipelineOp(connector_size, std::move(sampler)), generator_function_(generator_function), column_names_(column_names), column_types_(std::move(column_types)), prefetch_size_(prefetch_size), - generator_counter_(0) {} + generator_counter_(0), + num_parallel_workers_(num_parallel_workers) {} void GeneratorOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { @@ -174,7 +176,15 @@ Status GeneratorOp::operator()() { return Status(StatusCode::kMDPythonInterpreterFailure, "Python Interpreter is finalized"); } try { + auto start = ProfilingTime::GetCurMilliSecond(); RETURN_IF_NOT_OK(PyRowToTensorRow(generator_.attr("__next__")(), &new_row)); + auto end = ProfilingTime::GetCurMilliSecond(); + if ((end - start) / num_parallel_workers_ > kGetItemTimeOutMilliSeconds) { + MS_LOG(WARNING) << "Bad performance attention, it takes more than 25 seconds to generator.__next__ new row, " + "which might cause `GetNext` timeout problem when sink_mode=True. You can increase the " + "parameter num_parallel_workers in GeneratorDataset / optimize the efficiency of " + "obtaining samples in the user-defined generator function."; + } generator_counter_++; } catch (py::error_already_set &e) { eoe = e.matches(PyExc_StopIteration); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h index 051b9eec520..4272ebda684 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h @@ -36,11 +36,13 @@ namespace mindspore { namespace dataset { #pragma GCC visibility push(hidden) +constexpr int32_t kGetItemTimeOutMilliSeconds = 25000; + class GeneratorOp : public PipelineOp, public RandomAccessOp { public: GeneratorOp(py::function generator_function, std::vector column_names, std::vector column_types, int32_t prefetch_size, int32_t connector_size, - std::shared_ptr sampler); + std::shared_ptr sampler, uint32_t num_parallel_workers); ~GeneratorOp() = default; @@ -81,6 +83,7 @@ class GeneratorOp : public PipelineOp, public RandomAccessOp { std::vector column_types_; int32_t prefetch_size_; int64_t generator_counter_; + uint32_t num_parallel_workers_; py::object generator_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc index 12215fa93ea..c828170e890 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc @@ -16,6 +16,8 @@ #include "minddata/dataset/engine/ir/datasetops/source/generator_node.h" +#include + #include "minddata/dataset/engine/datasetops/repeat_op.h" #include "minddata/dataset/engine/datasetops/source/generator_op.h" #include "minddata/dataset/engine/opt/pass.h" @@ -26,30 +28,33 @@ namespace dataset { GeneratorNode::GeneratorNode(py::function generator_function, const std::vector &column_names, const std::vector &column_types, int64_t source_len, - std::shared_ptr sampler) + std::shared_ptr sampler, uint32_t num_parallel_workers) : MappableSourceNode(), generator_function_(generator_function), column_names_(column_names), column_types_(column_types), reset_ancestor_(nullptr), sampler_(std::move(sampler)), - source_len_(source_len) {} + source_len_(source_len), + num_parallel_workers_(num_parallel_workers) {} GeneratorNode::GeneratorNode(py::function generator_function, const std::shared_ptr &schema, - int64_t source_len, std::shared_ptr sampler) + int64_t source_len, std::shared_ptr sampler, uint32_t num_parallel_workers) : MappableSourceNode(), generator_function_(generator_function), schema_(schema), reset_ancestor_(nullptr), sampler_(std::move(sampler)), - source_len_(source_len) {} + source_len_(source_len), + num_parallel_workers_(num_parallel_workers) {} std::shared_ptr GeneratorNode::Copy() { std::shared_ptr node; if (schema_ == nullptr) { - node = std::make_shared(generator_function_, column_names_, column_types_, source_len_, sampler_); + node = std::make_shared(generator_function_, column_names_, column_types_, source_len_, sampler_, + num_parallel_workers_); } else { - node = std::make_shared(generator_function_, schema_, source_len_, sampler_); + node = std::make_shared(generator_function_, schema_, source_len_, sampler_, num_parallel_workers_); } return node; } @@ -78,8 +83,8 @@ Status GeneratorNode::Build(std::vector> *const node_ // GeneratorOp's constructor takes in a prefetch_size, which isn't being set by user nor is it being used by // GeneratorOp internally. Here it is given a zero which is the default in generator builder - std::shared_ptr op = std::make_shared(generator_function_, column_names_, column_types_, 0, - connector_que_size_, sampler_rt); + std::shared_ptr op = std::make_shared( + generator_function_, column_names_, column_types_, 0, connector_que_size_, sampler_rt, num_parallel_workers_); // set the number of rows from source length op->SetNumRows(source_len_); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.h index 0054cf972da..da1e99627e8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.h +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.h @@ -34,11 +34,12 @@ class GeneratorNode : public MappableSourceNode { public: /// \brief Constructor GeneratorNode(py::function generator_function, const std::vector &column_names, - const std::vector &column_types, int64_t source_len, std::shared_ptr sampler); + const std::vector &column_types, int64_t source_len, std::shared_ptr sampler, + uint32_t num_parallel_workers); /// \brief Constructor GeneratorNode(py::function generator_function, const std::shared_ptr &schema, int64_t source_len, - std::shared_ptr sampler); + std::shared_ptr sampler, uint32_t num_parallel_workers); /// \brief Destructor ~GeneratorNode() = default; @@ -107,6 +108,7 @@ class GeneratorNode : public MappableSourceNode { std::shared_ptr schema_; std::shared_ptr reset_ancestor_; // updated its immediate Repeat/EpochCtrl ancestor in GeneratorNodePass std::shared_ptr sampler_; + uint32_t num_parallel_workers_; int64_t source_len_; // Length of the dataset source provided by the user, -1 means it's unknown /// \brief Base-class override for accepting IRNodePass visitor diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index 4fd815a98c8..39d60bd3239 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -4326,11 +4326,12 @@ class GeneratorDataset(MappableDataset): def parse(self, children=None): if self.schema is None: return cde.GeneratorNode(self.prepared_source, self.column_names, self.column_types, self.source_len, - self.sampler) + self.sampler, self.num_parallel_workers) schema = self.schema if isinstance(schema, Schema): schema = self.schema.cpp_schema - return cde.GeneratorNode(self.prepared_source, schema, self.source_len, self.sampler) + return cde.GeneratorNode(self.prepared_source, schema, self.source_len, self.sampler, + self.num_parallel_workers) class TFRecordDataset(SourceDataset): diff --git a/mindspore/train/model.py b/mindspore/train/model.py index 50e457eb533..3961d97d37a 100644 --- a/mindspore/train/model.py +++ b/mindspore/train/model.py @@ -380,6 +380,7 @@ class Model: epoch_num=epoch_num, dataset_helper=dataset_helper) train_dataset._dataset_helper = dataset_helper + train_dataset._warmup_epoch = epoch def _init(self, train_dataset=None, valid_dataset=None, sink_size=-1, epoch=1): """ @@ -701,6 +702,12 @@ class Model: dataset_sink_mode = Validator.check_bool(dataset_sink_mode) if isinstance(self._train_network, nn.GraphCell) and dataset_sink_mode is True: raise ValueError("Dataset sink mode is currently not supported when training with a GraphCell.") + + if hasattr(train_dataset, '_warmup_epoch') and train_dataset._warmup_epoch != epoch: + raise ValueError("Use Model.build to initialize model, but the value of parameter `epoch` in Model.build " + "is not equal to value in Model.train, got {} and {} separately." + .format(train_dataset._warmup_epoch, epoch)) + Validator.check_is_int(sink_size) dataset_size = train_dataset.get_dataset_size() if dataset_size == 0: