diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h
index a41ecd7447e..74ba18949fd 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h
@@ -49,10 +49,8 @@ class ParallelOp : public DatasetOp {
       epoch_sync_flag_(false),
       next_worker_id_(0) {
     // reduce excessive memory usage with high parallelism
-    // when num_workers > 4, reduce op_connector_size to have similar total size if there were only 4 workers
     constexpr int32_t worker_limit = 4;
     if (num_workers_ > worker_limit) {
-      oc_queue_size_ = std::max(1, op_connector_size * worker_limit / num_workers_);
       worker_connector_size_ = std::max(1, op_connector_size * worker_limit / num_workers_);
     }
   }
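Note: the retained branch scales only the per-worker input queues; the op's output connector now keeps its configured size. A minimal standalone sketch of the scaling rule (illustrative only: the constant and the integer arithmetic mirror the hunk above, while the free function ScaledQueueSize is invented for this note):

  #include <algorithm>
  #include <cstdint>

  // With more than 4 workers, shrink each queue so the total buffered rows stay
  // roughly what 4 workers would hold: size * 4 / num_workers (integer division),
  // clamped to at least 1.
  int32_t ScaledQueueSize(int32_t op_connector_size, int32_t num_workers) {
    constexpr int32_t kWorkerLimit = 4;
    if (num_workers <= kWorkerLimit) {
      return op_connector_size;
    }
    return std::max(1, op_connector_size * kWorkerLimit / num_workers);
  }

  // For example, ScaledQueueSize(16, 8) == 8 and ScaledQueueSize(16, 64) == 1.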
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
index 3922c81ec91..3ad0a4ff847 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
@@ -67,7 +67,7 @@ class ClueOp : public NonMappableLeafOp {

   // Op name getter
   // @return Name of the current Op
-  std::string Name() const override { return "ClueOp"; }
+  std::string Name() const override { return "CLUEOp"; }

 private:
   // Reads a clue file and loads the data into multiple TensorRows.
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h
index 79866201865..98fa4b3ae22 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h
@@ -181,7 +181,7 @@ class CsvOp : public NonMappableLeafOp {

   /// Op name getter
   /// @return Name of the current Op
-  std::string Name() const override { return "CsvOp"; }
+  std::string Name() const override { return "CSVOp"; }

   // DatasetName name getter
   // \return DatasetName of the current Op
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc
index 8e3bbd68441..13d73f82ec6 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/batch_node.cc
@@ -61,6 +61,8 @@ std::shared_ptr<DatasetNode> BatchNode::Copy() {
 #else
   auto node = std::make_shared<BatchNode>(nullptr, batch_size_, drop_remainder_);
 #endif
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -155,6 +157,7 @@ Status BatchNode::AcceptAfter(IRNodePass *const p, bool *const modified) {
 Status BatchNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["batch_size"] = batch_size_;
   args["drop_remainder"] = drop_remainder_;
 #ifdef ENABLE_PYTHON
@@ -170,12 +173,14 @@ Status BatchNode::to_json(nlohmann::json *out_json) {
 Status BatchNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> ds,
                             std::shared_ptr<DatasetNode> *result) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kBatchNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kBatchNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "batch_size", kBatchNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "drop_remainder", kBatchNode));
   int32_t batch_size = json_obj["batch_size"];
   bool drop_remainder = json_obj["drop_remainder"];
   *result = std::make_shared<BatchNode>(ds, batch_size, drop_remainder);
   (*result)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (*result)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
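Note: the serde contract these batch_node.cc hunks implement is symmetric; to_json() writes the new field and from_json() first validates that the key exists, then replays it through the setter, so a missing "connector_queue_size" is an error rather than a silent default. A reduced sketch of that round trip (hypothetical free functions and struct; the real code lives in the node classes and returns Status):

  #include <cstdint>
  #include <nlohmann/json.hpp>

  struct NodeParams {
    int32_t num_workers;
    int32_t connector_queue_size;
  };

  nlohmann::json ToJson(const NodeParams &p) {
    nlohmann::json args;
    args["num_parallel_workers"] = p.num_workers;
    args["connector_queue_size"] = p.connector_queue_size;  // field added by this patch
    return args;
  }

  // Mirrors the ValidateParamInJson calls: reject the input if a key is absent.
  bool FromJson(const nlohmann::json &j, NodeParams *p) {
    if (!j.contains("num_parallel_workers") || !j.contains("connector_queue_size")) {
      return false;
    }
    p->num_workers = j["num_parallel_workers"];
    p->connector_queue_size = j["connector_queue_size"];
    return true;
  }

One apparent consequence of the added validation calls: pipelines serialized before this change lack the key and would no longer deserialize unchanged.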
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.cc
index 63e290b79ed..b1b8bdbaddb 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.cc
@@ -228,6 +228,11 @@ std::shared_ptr<DatasetNode> DatasetNode::SetNumWorkers(int32_t num_workers) {
   return shared_from_this();
 }

+std::shared_ptr<DatasetNode> DatasetNode::SetConnectorQueueSize(int32_t connector_queue_size) {
+  connector_que_size_ = connector_queue_size;
+  return shared_from_this();
+}
+
 std::shared_ptr<DatasetNode> DatasetNode::SetDatasetCache(const std::shared_ptr<DatasetCache> &cache) {
   cache_ = cache;
   return shared_from_this();
 }
@@ -657,6 +662,7 @@ Status DatasetNode::ValidateParams() {
 Status DatasetNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   *out_json = args;
   return Status::OK();
 }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h
index 0508431a2f5..60e66c93d10 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/dataset_node.h
@@ -303,6 +303,9 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   /// \brief Getter of the number of workers
   int32_t NumWorkers() const { return num_workers_; }

+  /// \brief Getter of the connector queue size
+  int32_t ConnectorQueueSize() { return connector_que_size_; }
+
   /// \brief Getter of dataset cache
   std::shared_ptr<DatasetCache> GetDatasetCache() { return cache_; }

@@ -311,6 +314,8 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   /// \return Shared pointer to the original object
   std::shared_ptr<DatasetNode> SetNumWorkers(int32_t num_workers);

+  std::shared_ptr<DatasetNode> SetConnectorQueueSize(int32_t connector_queue_size);
+
   /// \brief Setter function for DatasetCache
   /// \param[in] cache Shared pointer to DatasetCache
   /// \return Shared pointer to the original object
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc
index 2fed0075008..f15254370d0 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/map_node.cc
@@ -50,6 +50,8 @@ std::shared_ptr<DatasetNode> MapNode::Copy() {
   std::vector<std::shared_ptr<TensorOperation>> operations = operations_;
   auto node = std::make_shared<MapNode>(nullptr, operations, input_columns_, output_columns_, project_columns_,
                                         cache_, callbacks_, offload_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -80,8 +82,6 @@ Status MapNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
       RETURN_STATUS_UNEXPECTED("MapNode containing random operation is not supported as a descendant of cache.");
     }
   }
-  // This parameter will be removed with next rebase
-  std::vector<std::string> col_orders;
   auto map_op = std::make_shared<MapOp>(input_columns_, output_columns_, tensor_ops, num_workers_,
                                         connector_que_size_);
   if (!callbacks_.empty()) {
@@ -156,6 +156,7 @@ Status MapNode::to_json(nlohmann::json *out_json) {
   RETURN_UNEXPECTED_IF_NULL(out_json);
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["input_columns"] = input_columns_;
   args["output_columns"] = output_columns_;
   args["project_columns"] = project_columns_;
@@ -192,6 +193,7 @@ Status MapNode::to_json(nlohmann::json *out_json) {
 Status MapNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> ds,
                           std::shared_ptr<DatasetNode> *result) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kMapNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kMapNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "input_columns", kMapNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "output_columns", kMapNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "project_columns", kMapNode));
@@ -203,6 +205,7 @@ Status MapNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode>
   RETURN_IF_NOT_OK(Serdes::ConstructTensorOps(json_obj["operations"], &operations));
   *result = std::make_shared<MapNode>(ds, operations, input_columns, output_columns, project_columns);
   (*result)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (*result)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/ag_news_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/ag_news_node.cc
index 27261dd12a5..f6d856f26fe 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/ag_news_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/ag_news_node.cc
@@ -45,6 +45,8 @@ AGNewsNode::AGNewsNode(const std::string &dataset_dir, int64_t num_samples, Shuf
 std::shared_ptr<DatasetNode> AGNewsNode::Copy() {
   auto node =
     std::make_shared<AGNewsNode>(dataset_dir_, num_samples_, shuffle_, usage_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -131,6 +133,7 @@ Status AGNewsNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size
 Status AGNewsNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["num_samples"] = num_samples_;
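Note: every Copy() override in this patch gains the same two calls because Copy() rebuilds the node through its constructor, and neither num_workers_ nor connector_que_size_ is a constructor argument; without the explicit re-apply, a copied IR node would fall back to the defaults. A schematic of the pattern (FooNode is a placeholder, not a real class):

  std::shared_ptr<DatasetNode> FooNode::Copy() {
    auto node = std::make_shared<FooNode>(dataset_dir_, usage_, sampler_, cache_);
    // The constructor arguments above do not carry these two fields,
    // so they are copied over by hand.
    node->SetNumWorkers(num_workers_);
    node->SetConnectorQueueSize(connector_que_size_);
    return node;
  }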
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
index 634643bc11c..156340c9566 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
@@ -45,6 +45,8 @@ AlbumNode::AlbumNode(const std::string &dataset_dir, const std::string &data_sch
 std::shared_ptr<DatasetNode> AlbumNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<AlbumNode>(dataset_dir_, schema_path_, column_names_, decode_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -132,6 +134,7 @@ Status AlbumNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["decode"] = decode_;
   args["data_schema"] = schema_path_;
@@ -148,6 +151,7 @@ Status AlbumNode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status AlbumNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kAlbumNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kAlbumNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kAlbumNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "data_schema", kAlbumNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "column_names", kAlbumNode));
@@ -163,6 +167,7 @@ Status AlbumNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode
   *ds = std::make_shared<AlbumNode>(dataset_dir, data_schema, column_names, decode, sampler, cache);
   (void)((*ds)->SetNumWorkers(json_obj["num_parallel_workers"]));
+  (void)((*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]));
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/amazon_review_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/amazon_review_node.cc
index cac6650c608..69fcc777b37 100755
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/amazon_review_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/amazon_review_node.cc
@@ -44,6 +44,8 @@ AmazonReviewNode::AmazonReviewNode(const std::string &dataset_dir, const std::st
 std::shared_ptr<DatasetNode> AmazonReviewNode::Copy() {
   auto node =
     std::make_shared<AmazonReviewNode>(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -133,6 +135,7 @@ Status AmazonReviewNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter>
 Status AmazonReviewNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["num_samples"] = num_samples_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/caltech256_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/caltech256_node.cc
index 921318cc724..b33d6a21a59 100755
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/caltech256_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/caltech256_node.cc
@@ -103,6 +103,7 @@ Status Caltech256Node::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["decode"] = decode_;
   if (cache_ != nullptr) {
@@ -117,6 +118,7 @@ Status Caltech256Node::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status Caltech256Node::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCaltech256Node));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCaltech256Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCaltech256Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "decode", kCaltech256Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kCaltech256Node));
@@ -127,7 +129,8 @@ Status Caltech256Node::from_json(nlohmann::json json_obj, std::shared_ptr<Datase
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<Caltech256Node>(dataset_dir, decode, sampler, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/celeba_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/celeba_node.cc
index c23fafc7375..79e703548b7 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/celeba_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/celeba_node.cc
@@ -46,6 +46,8 @@ CelebANode::CelebANode(const std::string &dataset_dir, const std::string &usage,
 std::shared_ptr<DatasetNode> CelebANode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<CelebANode>(dataset_dir_, usage_, sampler, decode_, extensions_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -176,6 +178,7 @@ Status CelebANode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["decode"] = decode_;
   args["extensions"] = extensions_;
@@ -192,6 +195,7 @@ Status CelebANode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status CelebANode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCelebANode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCelebANode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCelebANode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kCelebANode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kCelebANode));
@@ -206,7 +210,8 @@ Status CelebANode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNod
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<CelebANode>(dataset_dir, usage, sampler, decode, extension, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
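Note on the recurring change from (*ds)->SetNumWorkers(...) to (void)(*ds)->SetNumWorkers(...): both setters return shared_from_this() so calls can be chained, and when the returned pointer is unused the explicit (void) cast documents the discard, presumably to keep static-analysis checks for ignored return values quiet. In miniature:

  // Return value is a std::shared_ptr<DatasetNode> meant for chaining;
  // cast to void to mark it as intentionally unused.
  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);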
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar100_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar100_node.cc
index 048884bec64..fb8370b6a2a 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar100_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar100_node.cc
@@ -38,6 +38,8 @@ Cifar100Node::Cifar100Node(const std::string &dataset_dir, const std::string &us
 std::shared_ptr<DatasetNode> Cifar100Node::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<Cifar100Node>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -110,6 +112,7 @@ Status Cifar100Node::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
@@ -124,6 +127,7 @@ Status Cifar100Node::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status Cifar100Node::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCifar100Node));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCifar100Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCifar100Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kCifar100Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kCifar100Node));
@@ -134,7 +138,8 @@ Status Cifar100Node::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetN
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<Cifar100Node>(dataset_dir, usage, sampler, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar10_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar10_node.cc
index 2da1b871fb2..115d5baa98f 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar10_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cifar10_node.cc
@@ -38,6 +38,8 @@ Cifar10Node::Cifar10Node(const std::string &dataset_dir, const std::string &usag
 std::shared_ptr<DatasetNode> Cifar10Node::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<Cifar10Node>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -111,6 +113,7 @@ Status Cifar10Node::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
@@ -125,6 +128,7 @@ Status Cifar10Node::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status Cifar10Node::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCifar10Node));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCifar10Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCifar10Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kCifar10Node));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kCifar10Node));
@@ -135,7 +139,8 @@ Status Cifar10Node::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNo
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<Cifar10Node>(dataset_dir, usage, sampler, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cityscapes_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cityscapes_node.cc
index 40f990a0272..e42987ad293 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cityscapes_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cityscapes_node.cc
@@ -40,6 +40,8 @@ CityscapesNode::CityscapesNode(const std::string &dataset_dir, const std::string
 std::shared_ptr<DatasetNode> CityscapesNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<CityscapesNode>(dataset_dir_, usage_, quality_mode_, task_, decode_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -127,6 +129,7 @@ Status CityscapesNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["quality_mode"] = quality_mode_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
index ea5db97c65c..66139df3d35 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
@@ -40,6 +40,8 @@ CLUENode::CLUENode(const std::vector<std::string> clue_files, std::string task,
 std::shared_ptr<DatasetNode> CLUENode::Copy() {
   auto node =
     std::make_shared<CLUENode>(dataset_files_, task_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -223,6 +225,7 @@ Status CLUENode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_g
 Status CLUENode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_files_;
   args["task"] = task_;
   args["usage"] = usage_;
@@ -241,6 +244,7 @@
 Status CLUENode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCLUENode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCLUENode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCLUENode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "task", kCLUENode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kCLUENode));
@@ -258,7 +262,8 @@ Status CLUENode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode>
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<CLUENode>(dataset_files, task, usage, num_samples, shuffle, num_shards, shard_id, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 // Note: The following two functions are common among NonMappableSourceNode and should be promoted to its parent
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cmu_arctic_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cmu_arctic_node.cc
index a2e171627a5..a91a1deac1b 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cmu_arctic_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/cmu_arctic_node.cc
@@ -34,6 +34,8 @@ void CMUArcticNode::Print(std::ostream &out) const { out << Name(); }
 std::shared_ptr<DatasetNode> CMUArcticNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<CMUArcticNode>(dataset_dir_, name_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -102,6 +104,7 @@ Status CMUArcticNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["name"] = name_;
   if (cache_ != nullptr) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/coco_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/coco_node.cc
index 12fa15d94a9..14f6bb85351 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/coco_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/coco_node.cc
@@ -46,6 +46,8 @@ std::shared_ptr<DatasetNode> CocoNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node =
     std::make_shared<CocoNode>(dataset_dir_, annotation_file_, task_, decode_, sampler, cache_, extra_metadata_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -179,6 +181,7 @@ Status CocoNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["annotation_file"] = annotation_file_;
   args["task"] = task_;
@@ -196,6 +199,7 @@ Status CocoNode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status CocoNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCocoNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCocoNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kCocoNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "annotation_file", kCocoNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "task", kCocoNode));
@@ -212,7 +216,8 @@ Status CocoNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode>
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   bool extra_metadata = json_obj["extra_metadata"];
   *ds = std::make_shared<CocoNode>(dataset_dir, annotation_file, task, decode, sampler, cache, extra_metadata);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/conll2000_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/conll2000_node.cc
index 885914256a0..1b957597c39 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/conll2000_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/conll2000_node.cc
@@ -46,6 +46,8 @@ CoNLL2000Node::CoNLL2000Node(const std::string &dataset_dir, const std::string &
 std::shared_ptr<DatasetNode> CoNLL2000Node::Copy() {
   auto node =
     std::make_shared<CoNLL2000Node>(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }
@@ -142,6 +144,7 @@ Status CoNLL2000Node::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &s
 Status CoNLL2000Node::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["num_samples"] = num_samples_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
index b456bd80e17..367e2fb5d84 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
@@ -49,6 +49,8 @@ CSVNode::CSVNode(const std::vector<std::string> &csv_files, char field_delim,
 std::shared_ptr<DatasetNode> CSVNode::Copy() {
   auto node = std::make_shared<CSVNode>(dataset_files_, field_delim_, column_defaults_, column_names_, num_samples_,
                                         shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -163,6 +165,7 @@ Status CSVNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_ge
 Status CSVNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_files"] = dataset_files_;
   args["field_delim"] = std::string(1, field_delim_);
   args["column_names"] = column_names_;
@@ -181,6 +184,7 @@
 Status CSVNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kCSVNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kCSVNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_files", kCSVNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "field_delim", kCSVNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "column_names", kCSVNode));
@@ -200,7 +204,8 @@ Status CSVNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode>
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<CSVNode>(dataset_files, field_delim.c_str()[0], column_defaults, column_names, num_samples,
                                   shuffle, num_shards, shard_id, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/dbpedia_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/dbpedia_node.cc
index fba36d2cff7..f92ad609829 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/dbpedia_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/dbpedia_node.cc
@@ -46,6 +46,8 @@ DBpediaNode::DBpediaNode(const std::string &dataset_dir, const std::string &usag
 std::shared_ptr<DatasetNode> DBpediaNode::Copy() {
   auto node =
     std::make_shared<DBpediaNode>(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -163,6 +165,7 @@ Status DBpediaNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &siz
 Status DBpediaNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["num_samples"] = num_samples_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/div2k_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/div2k_node.cc
index 7040202fb12..38891ca4ae0 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/div2k_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/div2k_node.cc
@@ -41,6 +41,8 @@ DIV2KNode::DIV2KNode(const std::string &dataset_dir, const std::string &usage, c
 std::shared_ptr<DatasetNode> DIV2KNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<DIV2KNode>(dataset_dir_, usage_, downgrade_, scale_, decode_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -136,6 +138,7 @@ Status DIV2KNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["downgrade"] = downgrade_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/emnist_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/emnist_node.cc
index 020c53f1f9b..ccc56809ba7 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/emnist_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/emnist_node.cc
@@ -33,6 +33,8 @@ EMnistNode::EMnistNode(const std::string &dataset_dir, const std::string &name,
 std::shared_ptr<DatasetNode> EMnistNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<EMnistNode>(dataset_dir_, name_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -105,6 +107,7 @@ Status EMnistNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["name"] = name_;
   args["usage"] = usage_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/en_wik9_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/en_wik9_node.cc
index 51fb4ff4177..712c044d36d 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/en_wik9_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/en_wik9_node.cc
@@ -46,6 +46,8 @@ EnWik9Node::EnWik9Node(const std::string &dataset_dir, int32_t num_samples, Shuf
 std::shared_ptr<DatasetNode> EnWik9Node::Copy() {
   auto node =
     std::make_shared<EnWik9Node>(dataset_dir_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -135,6 +137,7 @@ Status EnWik9Node::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size
 Status EnWik9Node::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["num_samples"] = num_samples_;
   args["shuffle"] = shuffle_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fake_image_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fake_image_node.cc
index 4088e01d825..b99fdbb967f 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fake_image_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fake_image_node.cc
@@ -39,6 +39,8 @@ FakeImageNode::FakeImageNode(int32_t num_images, const std::vector<int32_t> &ima
 std::shared_ptr<DatasetNode> FakeImageNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<FakeImageNode>(num_images_, image_size_, num_classes_, base_seed_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -119,7 +121,7 @@ Status FakeImageNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
-
+  args["connector_queue_size"] = connector_que_size_;
   args["num_images"] = num_images_;
   args["image_size"] = image_size_;
   args["num_classes"] = num_classes_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fashion_mnist_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fashion_mnist_node.cc
index 99da401fbdf..a800ed86235 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fashion_mnist_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/fashion_mnist_node.cc
@@ -33,6 +33,8 @@ FashionMnistNode::FashionMnistNode(const std::string &dataset_dir, const std::st
 std::shared_ptr<DatasetNode> FashionMnistNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<FashionMnistNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -100,6 +102,7 @@ Status FashionMnistNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/flickr_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/flickr_node.cc
index 7ff09bcf280..4cf7ee1cb62 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/flickr_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/flickr_node.cc
@@ -42,6 +42,8 @@ FlickrNode::FlickrNode(const std::string &dataset_dir, const std::string &annota
 std::shared_ptr<DatasetNode> FlickrNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<FlickrNode>(dataset_dir_, annotation_file_, decode_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -133,6 +135,7 @@ Status FlickrNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["annotation_file"] = annotation_file_;
   args["decode"] = decode_;
@@ -147,6 +150,7 @@ Status FlickrNode::to_json(nlohmann::json *out_json) {
 Status FlickrNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kFlickrNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kFlickrNode));
   CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("dataset_dir") != json_obj.end(), "Failed to find dataset_dir");
   CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("annotation_file") != json_obj.end(), "Failed to find annotation_file");
   CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("decode") != json_obj.end(), "Failed to find decode");
@@ -158,7 +162,8 @@ Status FlickrNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNod
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<FlickrNode>(dataset_dir, annotation_file, decode, sampler, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 }  // namespace dataset
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc
index a157baaef85..c130f4992fd 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/generator_node.cc
@@ -55,6 +55,8 @@ std::shared_ptr<DatasetNode> GeneratorNode::Copy() {
   } else {
     node = std::make_shared<GeneratorNode>(generator_function_, schema_, source_len_, sampler_, num_parallel_workers_);
   }
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/gtzan_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/gtzan_node.cc
index 888cf7ffce2..7ff244d22a8 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/gtzan_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/gtzan_node.cc
@@ -34,6 +34,8 @@ void GTZANNode::Print(std::ostream &out) const { out << Name(); }
 std::shared_ptr<DatasetNode> GTZANNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<GTZANNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -96,6 +98,7 @@ Status GTZANNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/image_folder_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/image_folder_node.cc
index cb8abe9cac2..c97356336a3 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/image_folder_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/image_folder_node.cc
@@ -48,6 +48,8 @@ std::shared_ptr<DatasetNode> ImageFolderNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<ImageFolderNode>(dataset_dir_, decode_, sampler, recursive_, exts_,
                                                 class_indexing_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -115,6 +117,7 @@ Status ImageFolderNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["recursive"] = recursive_;
   args["decode"] = decode_;
@@ -132,6 +135,7 @@ Status ImageFolderNode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status ImageFolderNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kImageFolderNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kImageFolderNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kImageFolderNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "decode", kImageFolderNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kImageFolderNode));
@@ -154,7 +158,8 @@ Status ImageFolderNode::from_json(nlohmann::json json_obj, std::shared_ptr<Datas
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<ImageFolderNode>(dataset_dir, decode, sampler, recursive, extension, class_indexing, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/imdb_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/imdb_node.cc
index 35e6584c9c5..3c9278f3da3 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/imdb_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/imdb_node.cc
@@ -38,6 +38,8 @@ IMDBNode::IMDBNode(const std::string &dataset_dir, const std::string &usage, std
 std::shared_ptr<DatasetNode> IMDBNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<IMDBNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -106,6 +108,7 @@ Status IMDBNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
@@ -121,6 +124,7 @@ Status IMDBNode::to_json(nlohmann::json *out_json) {
 Status IMDBNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_UNEXPECTED_IF_NULL(ds);
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kIMDBNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kIMDBNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kIMDBNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kIMDBNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kIMDBNode));
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2016_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2016_node.cc
index d2bb6645777..9d9f9082ab9 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2016_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2016_node.cc
@@ -59,6 +59,8 @@ IWSLT2016Node::IWSLT2016Node(const std::string &dataset_dir, const std::string &
 std::shared_ptr<DatasetNode> IWSLT2016Node::Copy() {
   auto node = std::make_shared<IWSLT2016Node>(dataset_dir_, usage_, language_pair_, valid_set_, test_set_,
                                               num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -157,6 +159,7 @@ Status IWSLT2016Node::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &s
 Status IWSLT2016Node::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["language_pair"] = language_pair_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2017_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2017_node.cc
index 45c122e6a3a..840e1205dbc 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2017_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/iwslt2017_node.cc
@@ -57,6 +57,8 @@ IWSLT2017Node::IWSLT2017Node(const std::string &dataset_dir, const std::string &
 std::shared_ptr<DatasetNode> IWSLT2017Node::Copy() {
   auto node = std::make_shared<IWSLT2017Node>(dataset_dir_, usage_, language_pair_, num_samples_, shuffle_,
                                               num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -150,6 +152,7 @@ Status IWSLT2017Node::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &s
 Status IWSLT2017Node::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   args["language_pair"] = language_pair_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/kmnist_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/kmnist_node.cc
index b315dab8b37..2c7ffcd6752 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/kmnist_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/kmnist_node.cc
@@ -33,6 +33,8 @@ KMnistNode::KMnistNode(const std::string &dataset_dir, const std::string &usage,
 std::shared_ptr<DatasetNode> KMnistNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<KMnistNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -100,6 +102,7 @@ Status KMnistNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/libri_tts_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/libri_tts_node.cc
index 4b7d6bd7821..c1166045aae 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/libri_tts_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/libri_tts_node.cc
@@ -34,6 +34,8 @@ void LibriTTSNode::Print(std::ostream &out) const { out << Name(); }
 std::shared_ptr<DatasetNode> LibriTTSNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<LibriTTSNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -107,6 +109,7 @@ Status LibriTTSNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/lj_speech_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/lj_speech_node.cc
index a6929c46088..cc129268f21 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/lj_speech_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/lj_speech_node.cc
@@ -30,6 +30,8 @@ LJSpeechNode::LJSpeechNode(const std::string &dataset_dir, std::shared_ptr
 std::shared_ptr<DatasetNode> LJSpeechNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<LJSpeechNode>(dataset_dir_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -104,6 +106,7 @@ Status LJSpeechNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   if (cache_ != nullptr) {
     nlohmann::json cache_args;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/manifest_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/manifest_node.cc
index 116e6d81118..207382d63f9 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/manifest_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/manifest_node.cc
@@ -45,6 +45,8 @@ ManifestNode::ManifestNode(const std::string &dataset_file, const std::string &u
 std::shared_ptr<DatasetNode> ManifestNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<ManifestNode>(dataset_file_, usage_, sampler, class_index_, decode_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -138,6 +140,7 @@ Status ManifestNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_file"] = dataset_file_;
   args["usage"] = usage_;
   args["class_indexing"] = class_index_;
@@ -155,6 +158,7 @@ Status ManifestNode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status ManifestNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kManifestNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kManifestNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_file", kManifestNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kManifestNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kManifestNode));
@@ -175,7 +179,8 @@ Status ManifestNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetN
   std::shared_ptr<DatasetCache> cache = nullptr;
   RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache));
   *ds = std::make_shared<ManifestNode>(dataset_file, usage, sampler, class_indexing, decode, cache);
-  (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/mnist_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/mnist_node.cc
index b29535e2a1e..241b7fa48c0 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/mnist_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/mnist_node.cc
@@ -37,6 +37,8 @@ MnistNode::MnistNode(std::string dataset_dir, std::string usage, std::shared_ptr
 std::shared_ptr<DatasetNode> MnistNode::Copy() {
   std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy();
   auto node = std::make_shared<MnistNode>(dataset_dir_, usage_, sampler, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -62,7 +64,6 @@ Status MnistNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops)
     schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
   std::shared_ptr<SamplerRT> sampler_rt = nullptr;
   RETURN_IF_NOT_OK(sampler_->SamplerBuild(&sampler_rt));
-
   auto op = std::make_shared<MnistOp>(usage_, num_workers_, dataset_dir_, connector_que_size_, std::move(schema),
                                       std::move(sampler_rt));
   op->SetTotalRepeats(GetTotalRepeats());
@@ -104,6 +105,7 @@ Status MnistNode::to_json(nlohmann::json *out_json) {
   RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args));
   args["sampler"] = sampler_args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["usage"] = usage_;
   if (cache_ != nullptr) {
@@ -118,6 +120,7 @@ Status MnistNode::to_json(nlohmann::json *out_json) {
 #ifndef ENABLE_ANDROID
 Status MnistNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kMnistNode));
+  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kMnistNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kMnistNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kMnistNode));
   RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "sampler", kMnistNode));
@@ -129,6 +132,7 @@ Status MnistNode::from_json(nlohmann::json json_obj, std::shared_ptr<DatasetNode
   *ds = std::make_shared<MnistNode>(dataset_dir, usage, sampler, cache);
   (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]);
+  (*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]);
   return Status::OK();
 }
 #endif
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/multi30k_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/multi30k_node.cc
index bfe43688b0f..6edf7a14321 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/multi30k_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/multi30k_node.cc
@@ -50,6 +50,8 @@ void Multi30kNode::Print(std::ostream &out) const {
 std::shared_ptr<DatasetNode> Multi30kNode::Copy() {
   auto node = std::make_shared<Multi30kNode>(dataset_dir_, usage_, language_pair_, num_samples_, shuffle_,
                                              num_shards_, shard_id_, cache_);
+  node->SetNumWorkers(num_workers_);
+  node->SetConnectorQueueSize(connector_que_size_);
   return node;
 }

@@ -139,6 +141,7 @@ Status Multi30kNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &si
 Status Multi30kNode::to_json(nlohmann::json *out_json) {
   nlohmann::json args;
   args["num_parallel_workers"] = num_workers_;
+  args["connector_queue_size"] = connector_que_size_;
   args["dataset_dir"] = dataset_dir_;
   args["num_samples"] = num_samples_;
   args["shuffle"] = shuffle_;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/penn_treebank_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/penn_treebank_node.cc
index 23680314b73..a25ebd41090 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/penn_treebank_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/penn_treebank_node.cc
@@ -49,6 +49,8 @@ PennTreebankNode::PennTreebankNode(const std::string &dataset_dir, const std::st
 std::shared_ptr<DatasetNode> PennTreebankNode::Copy() {
   auto node =
     std::make_shared<PennTreebankNode>(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_);
+ node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -131,6 +133,7 @@ Status PennTreebankNode::GetDatasetSize(const std::shared_ptr Status PennTreebankNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/photo_tour_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/photo_tour_node.cc index d716484e1f1..6ef4b4c4778 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/photo_tour_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/photo_tour_node.cc @@ -33,6 +33,8 @@ PhotoTourNode::PhotoTourNode(const std::string &dataset_dir, const std::string & std::shared_ptr PhotoTourNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, name_, usage_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -116,6 +118,7 @@ Status PhotoTourNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["name"] = name_; args["usage"] = usage_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/places365_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/places365_node.cc index 30bd48b4228..b233615ce14 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/places365_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/places365_node.cc @@ -38,6 +38,8 @@ Places365Node::Places365Node(const std::string &dataset_dir, const std::string & std::shared_ptr Places365Node::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, usage_, small_, decode_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -103,6 +105,7 @@ Status Places365Node::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["small"] = small_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/qmnist_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/qmnist_node.cc index 3e0b7ce89b1..bff14ba2c8f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/qmnist_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/qmnist_node.cc @@ -40,6 +40,8 @@ QMnistNode::QMnistNode(const std::string &dataset_dir, const std::string &usage, std::shared_ptr QMnistNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? 
nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, usage_, compat_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -114,6 +116,7 @@ Status QMnistNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["compat"] = compat_; @@ -129,6 +132,7 @@ Status QMnistNode::to_json(nlohmann::json *out_json) { #ifndef ENABLE_ANDROID Status QMnistNode::from_json(nlohmann::json json_obj, std::shared_ptr *ds) { RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kQMnistNode)); + RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kQMnistNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kQMnistNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kQMnistNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "compat", kQMnistNode)); @@ -141,7 +145,8 @@ Status QMnistNode::from_json(nlohmann::json json_obj, std::shared_ptr cache = nullptr; RETURN_IF_NOT_OK(DatasetCache::from_json(json_obj, &cache)); *ds = std::make_shared(dataset_dir, usage, compat, sampler, cache); - (*ds)->SetNumWorkers(json_obj["num_parallel_workers"]); + (void)(*ds)->SetNumWorkers(json_obj["num_parallel_workers"]); + (void)(*ds)->SetConnectorQueueSize(json_obj["connector_queue_size"]); return Status::OK(); } #endif diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sbu_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sbu_node.cc index 0f2d3b6b0c7..65eedd89cef 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sbu_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sbu_node.cc @@ -33,6 +33,8 @@ SBUNode::SBUNode(const std::string &dataset_dir, bool decode, const std::shared_ std::shared_ptr SBUNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, decode_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -108,6 +110,7 @@ Status SBUNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["decode"] = decode_; if (cache_ != nullptr) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/semeion_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/semeion_node.cc index d52e600422a..54c06024348 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/semeion_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/semeion_node.cc @@ -31,6 +31,8 @@ SemeionNode::SemeionNode(const std::string &dataset_dir, const std::shared_ptr SemeionNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? 
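/* Editor's note: every leaf node touched by this patch now serializes "connector_queue_size" next to "num_parallel_workers", and from_json restores both through SetNumWorkers/SetConnectorQueueSize, so tuned values survive a save/load round trip. For illustration only (the field values below are hypothetical, not taken from this patch), a serialized MnistDataset node would now look like:
     {
       "op_type": "MnistDataset",
       "num_parallel_workers": 8,
       "connector_queue_size": 16,
       "dataset_dir": "/path/to/mnist",
       "usage": "train",
       "sampler": { ... }
     }
*/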
nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -98,6 +100,7 @@ Status SemeionNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; if (cache_ != nullptr) { nlohmann::json cache_args; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sogou_news_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sogou_news_node.cc index 317eee06972..57e8f2a7d44 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sogou_news_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/sogou_news_node.cc @@ -46,6 +46,8 @@ SogouNewsNode::SogouNewsNode(const std::string &dataset_dir, const std::string & std::shared_ptr SogouNewsNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -136,6 +138,7 @@ Status SogouNewsNode::GetDatasetSize(const std::shared_ptr &s Status SogouNewsNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/speech_commands_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/speech_commands_node.cc index 2e987979eeb..e887c6cd4fc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/speech_commands_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/speech_commands_node.cc @@ -28,6 +28,8 @@ SpeechCommandsNode::SpeechCommandsNode(const std::string &dataset_dir, const std std::shared_ptr SpeechCommandsNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? 
nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, usage_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -104,6 +106,7 @@ Status SpeechCommandsNode::to_json(nlohmann::json *out_json) { args["sampler"] = sampler_args; args["usage"] = usage_; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; if (cache_ != nullptr) { nlohmann::json cache_args; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/squad_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/squad_node.cc index f4ad050aa70..3374c4aebfc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/squad_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/squad_node.cc @@ -46,6 +46,8 @@ void SQuADNode::Print(std::ostream &out) const { std::shared_ptr SQuADNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -137,6 +139,7 @@ Status SQuADNode::GetDatasetSize(const std::shared_ptr &size_ Status SQuADNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/stl10_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/stl10_node.cc index 9eaa374d693..5d3b4364437 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/stl10_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/stl10_node.cc @@ -33,6 +33,8 @@ STL10Node::STL10Node(const std::string &dataset_dir, const std::string &usage, s std::shared_ptr STL10Node::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, usage_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -107,6 +109,7 @@ Status STL10Node::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; if (cache_ != nullptr) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tedlium_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tedlium_node.cc index 212979e695c..c6195225fe4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tedlium_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tedlium_node.cc @@ -37,6 +37,8 @@ TedliumNode::TedliumNode(const std::string &dataset_dir, const std::string &rele std::shared_ptr TedliumNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? 
nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, release_, usage_, extensions_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -136,6 +138,7 @@ Status TedliumNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["release"] = release_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc index 5de7ffd2e67..1a30bde4c35 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc @@ -43,6 +43,8 @@ TextFileNode::TextFileNode(std::vector dataset_files, int32_t num_s std::shared_ptr TextFileNode::Copy() { auto node = std::make_shared(dataset_files_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -132,6 +134,7 @@ Status TextFileNode::GetDatasetSize(const std::shared_ptr &si Status TextFileNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_files"] = dataset_files_; args["num_samples"] = num_samples_; args["shuffle"] = shuffle_; @@ -148,6 +151,7 @@ Status TextFileNode::to_json(nlohmann::json *out_json) { Status TextFileNode::from_json(nlohmann::json json_obj, std::shared_ptr *ds) { RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kTextFileNode)); + RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kTextFileNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_files", kTextFileNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_samples", kTextFileNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "shuffle", kTextFileNode)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc index 15f749f3130..75c6f67c034 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc @@ -41,6 +41,8 @@ std::shared_ptr TFRecordNode::Copy() { node = std::make_shared(dataset_files_, schema_path_, columns_list_, num_samples_, shuffle_, num_shards_, shard_id_, shard_equal_rows_, cache_); } + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -177,6 +179,7 @@ Status TFRecordNode::to_json(nlohmann::json *out_json) { RETURN_UNEXPECTED_IF_NULL(out_json); nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_files"] = dataset_files_; args["columns_list"] = columns_list_; args["num_samples"] = num_samples_; @@ -206,6 +209,7 @@ Status TFRecordNode::to_json(nlohmann::json *out_json) { Status TFRecordNode::from_json(nlohmann::json json_obj, std::shared_ptr *ds) { RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kTFRecordNode)); + RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, 
"connector_queue_size", kTFRecordNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_files", kTFRecordNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "columns_list", kTFRecordNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_samples", kTFRecordNode)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/udpos_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/udpos_node.cc index b91900efbcf..b6a6547af6f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/udpos_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/udpos_node.cc @@ -44,6 +44,8 @@ UDPOSNode::UDPOSNode(const std::string &dataset_dir, const std::string &usage, i std::shared_ptr UDPOSNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -136,6 +138,7 @@ Status UDPOSNode::GetDatasetSize(const std::shared_ptr &size_ Status UDPOSNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/usps_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/usps_node.cc index 599009f6922..aad0f04c0a1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/usps_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/usps_node.cc @@ -45,6 +45,8 @@ USPSNode::USPSNode(const std::string &dataset_dir, const std::string &usage, int std::shared_ptr USPSNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -125,6 +127,7 @@ Status USPSNode::GetDatasetSize(const std::shared_ptr &size_g Status USPSNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/voc_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/voc_node.cc index c1e485c2513..8cefb2e901a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/voc_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/voc_node.cc @@ -48,6 +48,8 @@ std::shared_ptr VOCNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? 
nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, task_, usage_, class_index_, decode_, sampler, cache_, extra_metadata_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -167,6 +169,7 @@ Status VOCNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["task"] = task_; args["usage"] = usage_; @@ -185,6 +188,7 @@ #ifndef ENABLE_ANDROID Status VOCNode::from_json(nlohmann::json json_obj, std::shared_ptr *ds) { RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "num_parallel_workers", kVOCNode)); + RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "connector_queue_size", kVOCNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "dataset_dir", kVOCNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "task", kVOCNode)); RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "usage", kVOCNode)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wider_face_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wider_face_node.cc index 094f78422de..9e790c35b72 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wider_face_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wider_face_node.cc @@ -36,6 +36,8 @@ WIDERFaceNode::WIDERFaceNode(const std::string &dataset_dir, const std::string & std::shared_ptr WIDERFaceNode::Copy() { std::shared_ptr sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); auto node = std::make_shared(dataset_dir_, usage_, decode_, sampler, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -116,6 +118,7 @@ Status WIDERFaceNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["decode"] = decode_; args["usage"] = usage_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wiki_text_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wiki_text_node.cc index 2bc014d7775..5e2503851bd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wiki_text_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/wiki_text_node.cc @@ -49,6 +49,8 @@ WikiTextNode::WikiTextNode(const std::string &dataset_dir, const std::string &us std::shared_ptr WikiTextNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -131,6 +133,7 @@ Status WikiTextNode::GetDatasetSize(const std::shared_ptr &si Status WikiTextNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yahoo_answers_node.cc 
b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yahoo_answers_node.cc index 0036d6d4832..4a81de906b8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yahoo_answers_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yahoo_answers_node.cc @@ -47,6 +47,8 @@ YahooAnswersNode::YahooAnswersNode(const std::string &dataset_dir, const std::st std::shared_ptr YahooAnswersNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -170,6 +172,7 @@ Status YahooAnswersNode::GetDatasetSize(const std::shared_ptr Status YahooAnswersNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yelp_review_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yelp_review_node.cc index 1ed2d7aabdd..eefa111f0ba 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yelp_review_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yelp_review_node.cc @@ -47,6 +47,8 @@ YelpReviewNode::YelpReviewNode(const std::string &dataset_dir, const std::string std::shared_ptr YelpReviewNode::Copy() { auto node = std::make_shared(dataset_dir_, usage_, num_samples_, shuffle_, num_shards_, shard_id_, cache_); + node->SetNumWorkers(num_workers_); + node->SetConnectorQueueSize(connector_que_size_); return node; } @@ -134,6 +136,7 @@ Status YelpReviewNode::GetDatasetSize(const std::shared_ptr & Status YelpReviewNode::to_json(nlohmann::json *out_json) { nlohmann::json args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; args["usage"] = usage_; args["num_samples"] = num_samples_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yes_no_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yes_no_node.cc index bee2291bd45..91ed4c8a90f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yes_no_node.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/yes_no_node.cc @@ -102,6 +102,7 @@ Status YesNoNode::to_json(nlohmann::json *out_json) { RETURN_IF_NOT_OK(sampler_->to_json(&sampler_args)); args["sampler"] = sampler_args; args["num_parallel_workers"] = num_workers_; + args["connector_queue_size"] = connector_que_size_; args["dataset_dir"] = dataset_dir_; if (cache_ != nullptr) { nlohmann::json cache_args; diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/deep_copy_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/deep_copy_pass.cc index b1790e7bf92..76b85bbe5d9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/deep_copy_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/deep_copy_pass.cc @@ -48,10 +48,11 @@ Status DeepCopyPass::Visit(std::shared_ptr node, bool *const modifi // Clone a new copy of this node std::shared_ptr new_node = node->Copy(); - // Temporary fix to set the num_workers to each cloned node. + // Temporary fix to set the num_workers and connector_queue_size to each cloned node. 
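// Editor's note: the surrounding comment suggests hoisting this into the base class. A minimal sketch of
// that improvement (hypothetical helper, not part of this patch) would let each derived Copy() end with
// `return CopyProperties(node);` instead of repeating both setters:
//
//   std::shared_ptr<DatasetNode> DatasetNode::CopyProperties(std::shared_ptr<DatasetNode> node) const {
//     node->SetNumWorkers(num_workers_);
//     node->SetConnectorQueueSize(connector_que_size_);
//     return node;
//   }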
// This can be improved by adding a new method in the base class DatasetNode to transfer the properties to // the cloned node. Each derived class's Copy() will need to include this method. new_node->SetNumWorkers(node->NumWorkers()); + new_node->SetConnectorQueueSize(node->ConnectorQueueSize()); // This method below assumes a DFS walk and from the first child to the last child. // Future: A more robust implementation that does not depend on the above assumption. RETURN_IF_NOT_OK(parent_->AppendChild(new_node)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc index 868079da2f2..06524d42edb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc @@ -21,10 +21,11 @@ #include #include #include +#include #ifndef ENABLE_ANDROID #include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h" +#include "minddata/dataset/engine/serdes.h" #endif - #include "minddata/dataset/util/task_manager.h" namespace mindspore { @@ -53,8 +54,22 @@ Status AutoTune::Main() { } else if (step_gap_ == 0) { mode_ = AutoTuneMode::kAutoTuneModeEpoch; } + const bool nodes_offloaded = !tree_adapter_->GetOffloadJson().empty(); + if (nodes_offloaded) { + // When nodes are offloaded they are removed from the optimized IR tree. + // Serializing the optimized IR Tree and then deserializing will not work. + MS_LOG(WARNING) << "Some nodes have been offloaded. AutoTune is unable to write the autotune configuration to " + "disk. Disable offload to prevent this from happening."; + } + bool output_final_config = save_autoconfig_ && !nodes_offloaded; + bool output_intermediate_config = save_intermediate_autoconfig_ && output_final_config; Status rc; + int loop_cnt = 0; while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) { +#ifndef ENABLE_ANDROID + auto last_epoch = cur_epoch_; + auto last_step = cur_step_; +#endif if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) { rc = RunIterationEpoch(); } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) { @@ -65,6 +80,16 @@ Status AutoTune::Main() { RETURN_IF_NOT_OK(profiling_manager_->Stop()); break; } +#ifndef ENABLE_ANDROID + if (last_epoch != cur_epoch_ || last_step != cur_step_) { + if (output_intermediate_config && + (SaveAutotuneConfig(tree_adapter_->tree_->GetUniqueId() + "_autotune_" + std::to_string(loop_cnt) + ".json") + .IsError())) { + MS_LOG(WARNING) << "Failed to write current iteration autotune configuration to disk"; + } + ++loop_cnt; + } +#endif rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->monitor_sampling_interval()); // the thread may be interrupted for tree termination when waiting (we should not report error in this case) if (rc.IsError() && rc != StatusCode::kMDInterrupted) { @@ -79,11 +104,43 @@ Status AutoTune::Main() { << "mindspore.dataset.config.set_num_parallel_workers"; MS_LOG(INFO) << "Suggest to choose maximum prefetch_size from tuned result and set by global setting API: " << "mindspore.dataset.config.set_prefetch_size"; +#ifndef ENABLE_ANDROID + if (output_final_config && (SaveAutotuneConfig(autotune_json_filepath_).IsError())) { + MS_LOG(WARNING) << "Failed to write final autotune configuration to disk"; + } +#endif return Status::OK(); } -void AutoTune::PrintTreeConfiguration() { - ExecutionTree *tree = tree_adapter_->tree_.get(); +#ifndef ENABLE_ANDROID +Status AutoTune::SaveAutotuneConfig(const std::string &file_name) { + 
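// Editor's note, descriptive summary of the body below: (1) SetAutotuneConfigJson lazily serializes the
// optimized IR tree into autotune_config_json_ exactly once; (2) Serdes::UpdateOptimizedIRTreeJSON
// overwrites each op's num_parallel_workers / connector_queue_size with the currently tuned values taken
// from ops_; (3) Serdes::SaveJSONToFile writes the updated JSON to file_name.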
RETURN_IF_NOT_OK(SetAutotuneConfigJson()); + // The Execution Tree is built by visiting the optimized IR Tree in DFS order. + // So we visit the optimized IR tree in DFS order and try to match each IR node with its corresponding dataset op. + RETURN_IF_NOT_OK(Serdes::UpdateOptimizedIRTreeJSON(&autotune_config_json_, ops_)); + RETURN_IF_NOT_OK(Serdes::SaveJSONToFile(autotune_config_json_, file_name)); + return Status::OK(); +} + +Status AutoTune::SetAutotuneConfigJson() { + if (autotune_config_json_.empty()) { + nlohmann::json out_json; + RETURN_IF_NOT_OK(Serdes::SaveToJSON(tree_adapter_->RootIRNode(), "", &out_json)); + // We do not want to serialize TransferNode/DeviceQueueOp + if (out_json["op_type"] == kTransferNode) { + CHECK_FAIL_RETURN_UNEXPECTED( + out_json["children"].size() == 1, + "Expected Transfer node to have exactly 1 child but it has " + std::to_string(out_json["children"].size())); + out_json = out_json["children"][0]; + } + autotune_config_json_ = std::move(out_json); + } + return Status::OK(); +} +#endif + +void AutoTune::PrintTreeConfiguration() const { + ExecutionTree const *tree = tree_adapter_->tree_.get(); for (auto itr = tree->begin(); itr != tree->end(); itr++) { if (!itr->inlined() && itr->Name() != "DeviceQueueOp") { MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers() @@ -106,7 +163,7 @@ Status AutoTune::LaunchThread() { } Status AutoTune::CollectOpsInfo() { - ExecutionTree *tree = tree_adapter_->tree_.get(); + ExecutionTree const *tree = tree_adapter_->tree_.get(); RETURN_UNEXPECTED_IF_NULL(tree); for (auto itr = tree->begin(); itr != tree->end(); ++itr) { ops_[itr->id()] = itr.get(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h index 3246189863b..2bdeb09d65f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h @@ -51,7 +51,18 @@ class AutoTune { Status Main(); /// \brief Helper to print the tree configuration - void PrintTreeConfiguration(); + void PrintTreeConfiguration() const; + +#ifndef ENABLE_ANDROID + /// \brief Serialize the dataset and save the AT config (workers and queue size) to a json file + /// \param file_name Name of the file + /// \return Status object + Status SaveAutotuneConfig(const std::string &file_name); + + /// Setter for autotune_config_json_ + /// \return Status code + Status SetAutotuneConfigJson(); +#endif /// Function to collect info from the tree /// \return Status code @@ -195,8 +206,14 @@ class AutoTune { /// True if should save AutoTune configuration bool save_autoconfig_; + /// Flag to enable saving of intermediate autotune config to disk + bool save_intermediate_autoconfig_{false}; + /// Filepath name of the final AutoTune Configuration JSON file std::string autotune_json_filepath_; + + /// Serialized json of the optimized ir tree that holds the updated configuration (workers and queue size) + nlohmann::json autotune_config_json_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/serdes.cc b/mindspore/ccsrc/minddata/dataset/engine/serdes.cc index 08ac35693c9..f0f1bc4904e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/serdes.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/serdes.cc @@ -30,6 +30,13 @@ std::map node, const std::string &filename, nlohmann::json *out_json) { RETURN_UNEXPECTED_IF_NULL(node); RETURN_UNEXPECTED_IF_NULL(out_json); + // If an optimized IR Tree is sent 
(use-case for MD AutoTune), ignore Top and EpochCtrl nodes + if (node->Name() == "Top" || node->Name() == "EpochCtrl") { + CHECK_FAIL_RETURN_UNEXPECTED( + node->Children().size() == 1, + "Expected " + node->Name() + " to have exactly 1 child but it has " + std::to_string(node->Children().size())); + return SaveToJSON(node->Children()[0], filename, out_json); + } // Dump attributes of current node to json string nlohmann::json args; RETURN_IF_NOT_OK(node->to_json(&args)); @@ -55,7 +62,7 @@ Status Serdes::SaveToJSON(std::shared_ptr node, const std::string & return Status::OK(); } -Status Serdes::SaveJSONToFile(nlohmann::json json_string, const std::string &file_name) { +Status Serdes::SaveJSONToFile(const nlohmann::json &json_string, const std::string &file_name) { try { std::optional dir = ""; std::optional local_file_name = ""; @@ -116,7 +123,7 @@ Status Serdes::ConstructPipeline(nlohmann::json json_obj, std::shared_ptr> datasets; - for (auto child_json_obj : json_obj["children"]) { + for (const auto &child_json_obj : json_obj["children"]) { RETURN_IF_NOT_OK(ConstructPipeline(child_json_obj, &child_ds)); datasets.push_back(child_ds); } @@ -380,6 +387,64 @@ Status Serdes::ParseMindIRPreprocess(const std::vector &map_json_st return Status::OK(); } +Status Serdes::UpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json, + const std::map> &op_map) { + RETURN_UNEXPECTED_IF_NULL(serialized_json); + int32_t op_id = 0; + return RecurseUpdateOptimizedIRTreeJSON(serialized_json, &op_id, op_map); +} + +bool IsDatasetOpMatchIRNode(std::string_view ir_node_name, std::string_view dataset_op_name) { + // Helper function to match IR Node name to its dataset op name + if (ir_node_name == kSyncWaitNode) { + return dataset_op_name == kBarrierOp; + } else if (ir_node_name == kCifar10Node || ir_node_name == kCifar100Node) { + return dataset_op_name == "CifarOp"; + } else if (ir_node_name == kMindDataNode) { + return dataset_op_name == "MindRecordOp"; + } else if (ir_node_name == kRandomNode) { + return dataset_op_name == "RandomDataOp"; + } else if (ir_node_name == kTFRecordNode) { + return dataset_op_name == "TFReaderOp"; + } else if (ir_node_name == kIWSLT2016Node || ir_node_name == kIWSLT2017Node) { + return dataset_op_name == "IWSLTOp"; + } else { + // Generic way of matching, special cases handled above. Special cases will evolve over time. 
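// Editor's note, example of the generic rule: the IR node "Batch" and the dataset op "BatchOp" both
// reduce to "Batch" and therefore match, whereas "TFRecordDataset" would reduce to "TFRecord" while
// "TFReaderOp" reduces to "TFReader", which is why TFRecord is handled as a special case above.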
+ return ir_node_name.substr(0, ir_node_name.find("Dataset")) == + dataset_op_name.substr(0, dataset_op_name.find("Op")); + } +} + +Status Serdes::RecurseUpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json, int32_t *op_id, + const std::map> &op_map) { + RETURN_UNEXPECTED_IF_NULL(serialized_json); + RETURN_UNEXPECTED_IF_NULL(op_id); + + std::string ir_node_name = (*serialized_json)["op_type"]; + MS_LOG(INFO) << "Visiting IR Node: " << ir_node_name; + // Each IR Node should have a corresponding dataset op in the execution tree, but the reverse is not necessarily true + while (!IsDatasetOpMatchIRNode(ir_node_name, op_map.find(*op_id)->second->Name())) { + // During the construction of the execution tree, extra dataset ops may have been inserted + // Skip dataset ops until we get to the expected op + MS_LOG(INFO) << "\tSkipping dataset op: " << op_map.find(*op_id)->second->NameWithID(); + ++(*op_id); + CHECK_FAIL_RETURN_UNEXPECTED(static_cast<size_t>(*op_id) < op_map.size(), "op_id is out of bounds"); + } + MS_LOG(INFO) << "\tMatch found for IR Node: " << ir_node_name + << " with dataset op: " << op_map.find(*op_id)->second->NameWithID(); + if (!op_map.find(*op_id)->second->inlined() && serialized_json->contains("num_parallel_workers") && + serialized_json->contains("connector_queue_size")) { + (*serialized_json)["num_parallel_workers"] = op_map.find(*op_id)->second->NumWorkers(); + (*serialized_json)["connector_queue_size"] = op_map.find(*op_id)->second->ConnectorCapacity(); + } + ++(*op_id); + auto num_children = (*serialized_json)["children"].size(); + for (size_t i = 0; i < num_children; ++i) { + RETURN_IF_NOT_OK(RecurseUpdateOptimizedIRTreeJSON(&(*serialized_json)["children"][i], op_id, op_map)); + } + return Status::OK(); +} + // In the current stage, there is a cyclic dependency between libmindspore.so and c_dataengine.so, // so we make a C function here and have it dlopen'd by libmindspore.so to avoid linking explicitly; // this will be fixed after decoupling libmindspore.so into multiple submodules diff --git a/mindspore/ccsrc/minddata/dataset/engine/serdes.h b/mindspore/ccsrc/minddata/dataset/engine/serdes.h index 1e6d1ba657d..3321bccc8e8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/serdes.h +++ b/mindspore/ccsrc/minddata/dataset/engine/serdes.h @@ -154,6 +154,13 @@ class Serdes { /// \return Status The status code returned static Status SaveToJSON(std::shared_ptr node, const std::string &filename, nlohmann::json *out_json); + /// \brief Function to update the parameters [num_parallel_workers, connector_queue_size] in the serialized JSON /// object of the optimized IR tree + /// \param[in, out] serialized_json The optimized IR tree JSON node + /// \param[in] op_map An ID to DatasetOp mapping + static Status UpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json, + const std::map> &op_map); + /// \brief Function to de-serialize a JSON file to an IR tree /// \param[in] json_filepath input path of json file /// \param[out] ds The deserialized dataset @@ -185,13 +192,13 @@ static Status ParseMindIRPreprocess(const std::vector &map_json_string, std::vector> *data_graph); - protected: /// \brief Helper function to save JSON to a file /// \param[in] json_string The JSON string to be saved to the file /// \param[in] file_name The file name /// \return Status The status code returned - static Status SaveJSONToFile(nlohmann::json json_string, const std::string &file_name); + static Status SaveJSONToFile(const nlohmann::json &json_string, const std::string &file_name); + protected: /// \brief Function to 
determine the type of the node - dataset node if no dataset exists, or operation node /// \param[in] child_ds children datasets that are already created /// \param[in] json_obj json object to read out type of the node @@ -221,6 +228,14 @@ class Serdes { static std::map *operation)> InitializeFuncPtr(); + /// \brief Helper function to perform recursive DFS on the optimized IR tree and to match each IR node with its + /// corresponding dataset op + /// \param [in, out] serialized_json The optimized IR tree JSON node + /// \param [in, out] op_id The id in the execution tree from where to continue the IR Node - DatasetOp matching search + /// \param [in] op_map An ID to DatasetOp mapping + static Status RecurseUpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json, int32_t *op_id, + const std::map> &op_map); + private: static std::map *operation)> func_ptr_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc index 592d466fb2f..2ae699105dd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc @@ -168,14 +168,11 @@ Status TreeAdapter::Build(std::shared_ptr root_ir) { RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_op)); RETURN_IF_NOT_OK(tree_->AssignRoot(root_op)); - // Note: We will gradually move the pre pass, optimizer pass, and post pass - // on ExecutionTree to perform on IR tree. // Prepare the tree RETURN_IF_NOT_OK(tree_->Prepare()); // After the tree is prepared, the col_name_id_map can safely be obtained column_name_map_ = tree_->root()->column_name_id_map(); - return Status::OK(); } @@ -219,7 +216,6 @@ Status TreeAdapter::Compile(std::shared_ptr input_ir, int32_t num_e RETURN_IF_NOT_OK(Build(root_ir_)); tree_state_ = kCompileStateReady; - return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h index 41ddb4abd1f..038c4499dfc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h @@ -62,6 +62,8 @@ class TreeAdapter { // Return the root node of the IR after it is cloned from the parsed IR tree std::shared_ptr RootIRNode() const { return root_ir_; } + const ExecutionTree *GetExecutionTree() const { return tree_.get(); } + // This is the main method TreeConsumer uses to interact with TreeAdapter // 1. GetNext will Launch() the ExeTree on its first call by iterator (tree is already prepared) // 2. 
GetNext will return empty row when eoe/eof is obtained @@ -87,7 +89,6 @@ class TreeAdapter { // Return Offload Json nlohmann::json GetOffloadJson(); - #ifndef ENABLE_SECURITY /// \brief Setter for Profiling Manager Status SetProfilingManagerPtr(std::shared_ptr profiling_manager, @@ -119,8 +120,8 @@ std::unordered_map column_name_map_; std::shared_ptr root_ir_; - std::unique_ptr tree_; // current connector capacity of root op, used for profiling - bool optimize_; // Flag to enable optional optimization pass + std::unique_ptr tree_; + bool optimize_; // Flag to enable optional optimization pass #ifndef ENABLE_SECURITY std::shared_ptr profiling_manager_; // Profiling manager std::shared_ptr tracing_; // trace profiling data diff --git a/tests/st/dataset/test_gpu_autotune.py b/tests/st/dataset/test_gpu_autotune.py index 1099b76ae7f..04ed8391ae1 100644 --- a/tests/st/dataset/test_gpu_autotune.py +++ b/tests/st/dataset/test_gpu_autotune.py @@ -45,7 +45,7 @@ def create_model(): return model_simple -def create_dataset(data_path, batch_size=32, repeat_size=1, num_parallel_workers=1): +def create_dataset(data_path, batch_size=32, num_parallel_workers=1): """ Create dataset for train or test """ @@ -71,12 +71,7 @@ def create_dataset(data_path, batch_size=32, repeat_size=1, num_parallel_workers mnist_ds = mnist_ds.map(operations=rescale_op, input_columns="image", num_parallel_workers=num_parallel_workers) mnist_ds = mnist_ds.map(operations=rescale_nml_op, input_columns="image", num_parallel_workers=num_parallel_workers) mnist_ds = mnist_ds.map(operations=hwc2chw_op, input_columns="image", num_parallel_workers=num_parallel_workers) - - # Apply DatasetOps - buffer_size = 10000 - mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size) mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True) - mnist_ds = mnist_ds.repeat(repeat_size) return mnist_ds @@ -85,29 +80,44 @@ @pytest.mark.platform_x86_gpu_training @pytest.mark.env_onecard @pytest.mark.forked -def test_autotune_train_simple_model(): +def test_autotune_train_simple_model(tmp_path): """ Feature: Dataset AutoTune - Description: Test Dataset AutoTune for Training of a Simple Model - Expectation: Training completes successfully + Description: Test Dataset AutoTune for training of a simple model and deserialization of the written AutoTune config file + Expectation: Training and data deserialization complete successfully """ original_seed = ds.config.get_seed() set_seed(1) context.set_context(mode=context.GRAPH_MODE, device_target="GPU") context.set_context(enable_graph_kernel=True) + at_config_filename = "test_autotune_train_simple_model_at_config.json" # Enable Dataset AutoTune original_autotune = ds.config.get_enable_autotune() - ds.config.set_enable_autotune(True) + ds.config.set_enable_autotune(True, str(tmp_path) + at_config_filename) - ds_train = create_dataset(os.path.join("/home/workspace/mindspore_dataset/mnist", "train"), 32, 1) + ds_train = create_dataset(os.path.join("/home/workspace/mindspore_dataset/mnist", "train"), 32) model = create_model() - print("Start Training.") + print("Start training.") epoch_size = 10 + start_time = time.time() model.train(epoch_size, ds_train) - print("Training is finished.") + print("Training finished. 
Took {}s".format(time.time() - start_time)) + + ds.config.set_enable_autotune(False) + + ds_train_deserialized = ds.deserialize(json_filepath=str(tmp_path) + at_config_filename) + + num = 0 + for data1, data2 in zip(ds_train.create_dict_iterator(num_epochs=1, output_numpy=True), + ds_train_deserialized.create_dict_iterator(num_epochs=1, output_numpy=True)): + np.testing.assert_array_equal(data1['image'], data2['image']) + np.testing.assert_array_equal(data1['label'], data2['label']) + num += 1 + + assert num == 1875 # Restore settings ds.config.set_enable_autotune(original_autotune) @@ -188,5 +198,5 @@ if __name__ == "__main__": - test_autotune_train_simple_model() + test_autotune_train_simple_model("") test_autotune_pymultiproc_train_simple_model() diff --git a/tests/ut/cpp/dataset/ir_tree_adapter_test.cc b/tests/ut/cpp/dataset/ir_tree_adapter_test.cc index f928bacf2bf..d9d2c87eb54 100644 --- a/tests/ut/cpp/dataset/ir_tree_adapter_test.cc +++ b/tests/ut/cpp/dataset/ir_tree_adapter_test.cc @@ -21,24 +21,15 @@ #include "minddata/dataset/include/dataset/transforms.h" // IR non-leaf nodes -#include "minddata/dataset/engine/ir/datasetops/batch_node.h" #include "minddata/dataset/engine/ir/datasetops/bucket_batch_by_length_node.h" -#include "minddata/dataset/engine/ir/datasetops/concat_node.h" -#include "minddata/dataset/engine/ir/datasetops/map_node.h" -#include "minddata/dataset/engine/ir/datasetops/project_node.h" -#include "minddata/dataset/engine/ir/datasetops/rename_node.h" -#include "minddata/dataset/engine/ir/datasetops/shuffle_node.h" -#include "minddata/dataset/engine/ir/datasetops/skip_node.h" -#include "minddata/dataset/engine/ir/datasetops/zip_node.h" #include "minddata/dataset/engine/tree_modifier.h" +#include "minddata/dataset/engine/serdes.h" using namespace mindspore::dataset; using mindspore::dataset::Tensor; -class MindDataTestTreeAdapter : public UT::DatasetOpTesting { - protected: -}; +class MindDataTestTreeAdapter : public UT::DatasetOpTesting {}; TEST_F(MindDataTestTreeAdapter, TestSimpleTreeAdapter) { MS_LOG(INFO) << "Doing MindDataTestTreeAdapter-TestSimpleTreeAdapter."; @@ -148,6 +139,121 @@ TEST_F(MindDataTestTreeAdapter, TestProjectMapTreeAdapter) { EXPECT_TRUE(err_msg.find("EOF buffer encountered.") != err_msg.npos); } +// Feature: Test for serializing and deserializing an optimized IR tree after the tree has been modified with +// TreeModifier, or in other words indirectly through AutoTune. +// Description: Create a simple tree, modify the workers and queue size, serialize the optimized IR tree, obtain a new +// tree via deserialization, and then compare the output of serializing the new optimized IR tree with that of the first tree. +// Expectation: No failures. 
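Editor's note: the test below drives by hand the same sequence that AutoTune::SaveAutotuneConfig runs internally. A condensed sketch of that sequence, using only names introduced in this patch (the tree_adapter setup and the output file name are assumed):

  // Serialize the optimized IR tree, fold in the tuned values, and persist the result.
  nlohmann::json cfg;
  RETURN_IF_NOT_OK(Serdes::SaveToJSON(tree_adapter->RootIRNode(), "", &cfg));
  std::map<int32_t, std::shared_ptr<DatasetOp>> op_map;  // execution-tree op id -> dataset op
  const ExecutionTree *tree = tree_adapter->GetExecutionTree();
  for (auto itr = tree->begin(); itr != tree->end(); ++itr) {
    op_map[itr->id()] = itr.get();
  }
  RETURN_IF_NOT_OK(Serdes::UpdateOptimizedIRTreeJSON(&cfg, op_map));
  RETURN_IF_NOT_OK(Serdes::SaveJSONToFile(cfg, "at_config.json"));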
+TEST_F(MindDataTestTreeAdapter, TestOptimizedTreeSerializeDeserializeForAutoTune) { + MS_LOG(INFO) << "Doing MindDataTestTreeAdapter-TestOptimizedTreeSerializeDeserializeForAutoTune."; + + // Create a CSVDataset, with single CSV file + std::string train_file = datasets_root_path_ + "/testCSV/1.csv"; + std::vector column_names = {"col1", "col2", "col3", "col4"}; + std::shared_ptr ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse); + ASSERT_NE(ds, nullptr); + ds = ds->Project({"col1"}); + ASSERT_NE(ds, nullptr); + ds = ds->Repeat(2); + ASSERT_NE(ds, nullptr); + auto to_number = std::make_shared(mindspore::DataType::kNumberTypeInt32); + ASSERT_NE(to_number, nullptr); + ds = ds->Map({to_number}, {"col1"}, {"col1"}); + ds->SetNumWorkers(1); + ds = ds->Batch(1); + ds->SetNumWorkers(1); + + // Create a tree adapter and compile the IR Tree + auto tree_adapter1 = std::make_shared(); + ASSERT_OK(tree_adapter1->Compile(ds->IRNode(), 1)); + + // Change num_parallel_workers and connector_queue_size for some ops + auto tree_modifier = std::make_unique(tree_adapter1.get()); + tree_modifier->AddChangeRequest(1, std::make_shared(10)); + tree_modifier->AddChangeRequest(1, std::make_shared(20)); + tree_modifier->AddChangeRequest(0, std::make_shared(100)); + tree_modifier->AddChangeRequest(0, std::make_shared(10)); + + std::vector expected_result = {1, 5, 9, 1, 5, 9}; + TensorRow row; + + uint64_t i = 0; + ASSERT_OK(tree_adapter1->GetNext(&row)); + while (!row.empty()) { + auto tensor = row[0]; + int32_t num; + ASSERT_OK(tensor->GetItemAt(&num, {0})); + EXPECT_EQ(num, expected_result[i]); + ASSERT_OK(tree_adapter1->GetNext(&row)); + i++; + } + // Expect 6 samples + EXPECT_EQ(i, 6); + + // Serialize the optimized IR Tree + nlohmann::json out_json; + ASSERT_OK(Serdes::SaveToJSON(tree_adapter1->RootIRNode(), "", &out_json)); + + // Check that updated values of num_parallel_workers and connector_queue_size are not reflected in the json + EXPECT_EQ(out_json["op_type"], "Batch"); + EXPECT_NE(out_json["num_parallel_workers"], 10); + EXPECT_NE(out_json["connector_queue_size"], 100); + + EXPECT_EQ(out_json["children"][0]["op_type"], "Map"); + EXPECT_NE(out_json["children"][0]["num_parallel_workers"], 10); + EXPECT_NE(out_json["children"][0]["connector_queue_size"], 20); + + // Create an op_id to dataset op mapping + std::map> op_mapping; + auto tree = tree_adapter1->GetExecutionTree(); + ASSERT_NE(tree, nullptr); + + for (auto itr = tree->begin(); itr != tree->end(); ++itr) { + op_mapping[itr->id()] = itr.get(); + } + + // Update the serialized JSON object of the optimized IR tree + ASSERT_OK(Serdes::UpdateOptimizedIRTreeJSON(&out_json, op_mapping)); + + // Check that updated values of num_parallel_workers and connector_queue_size are reflected in the json now + EXPECT_EQ(out_json["op_type"], "Batch"); + EXPECT_EQ(out_json["num_parallel_workers"], 10); + EXPECT_EQ(out_json["connector_queue_size"], 100); + + EXPECT_EQ(out_json["children"][0]["op_type"], "Map"); + EXPECT_EQ(out_json["children"][0]["num_parallel_workers"], 10); + EXPECT_EQ(out_json["children"][0]["connector_queue_size"], 20); + + // Deserialize the above updated serialized optimized IR Tree + std::shared_ptr deserialized_node; + ASSERT_OK(Serdes::ConstructPipeline(out_json, &deserialized_node)); + + // Create a new tree adapter and compile the IR Tree obtained from deserialization above + auto tree_adapter2 = std::make_shared(); + ASSERT_OK(tree_adapter2->Compile(deserialized_node, 1)); + + // Serialize the new optimized IR Tree + 
nlohmann::json out_json1; + ASSERT_OK(Serdes::SaveToJSON(tree_adapter2->RootIRNode(), "", &out_json1)); + + // Ensure that both the serialized outputs are equal + EXPECT_TRUE(out_json == out_json1); + + i = 0; + ASSERT_OK(tree_adapter2->GetNext(&row)); + while (!row.empty()) { + auto tensor = row[0]; + int32_t num; + ASSERT_OK(tensor->GetItemAt(&num, {0})); + EXPECT_EQ(num, expected_result[i]); + ASSERT_OK(tree_adapter2->GetNext(&row)); + i++; + } + + // Expect 6 samples + EXPECT_EQ(i, 6); +} + // Feature: Basic test for TreeModifier // Description: Create simple tree and modify the tree by adding workers, change queue size and then removing workers // Expectation: No failures. @@ -193,7 +299,7 @@ TEST_F(MindDataTestTreeAdapter, TestSimpleTreeModifier) { uint64_t i = 0; ASSERT_OK(tree_adapter->GetNext(&row)); - while (row.size() != 0) { + while (!row.empty()) { auto tensor = row[0]; int32_t num; ASSERT_OK(tensor->GetItemAt(&num, {0})); @@ -232,7 +338,7 @@ TEST_F(MindDataTestTreeAdapter, TestTreeModifierMindRecord) { // Iterate the dataset and collect the file_names in the dataset ASSERT_OK(tree_adapter->GetNext(&row)); uint64_t i = 0; - while (row.size() != 0) { + while (!row.empty()) { auto tensor = row[0]; std::string_view sv; ASSERT_OK(tensor->GetItemAt(&sv, {})); @@ -255,7 +361,7 @@ TEST_F(MindDataTestTreeAdapter, TestTreeModifierMindRecord) { i = 0; ASSERT_OK(tree_adapter2->GetNext(&row)); - while (row.size() != 0) { + while (!row.empty()) { auto tensor = row[0]; std::string_view sv; ASSERT_OK(tensor->GetItemAt(&sv, {})); @@ -278,7 +384,7 @@ TEST_F(MindDataTestTreeAdapter, TestTreeModifierMindRecord) { i = 0; ASSERT_OK(tree_adapter3->GetNext(&row)); - while (row.size() != 0) { + while (!row.empty()) { auto tensor = row[0]; std::string_view sv; ASSERT_OK(tensor->GetItemAt(&sv, {})); @@ -290,4 +396,4 @@ TEST_F(MindDataTestTreeAdapter, TestTreeModifierMindRecord) { } // Expect 20 samples EXPECT_EQ(i, 20); -} \ No newline at end of file +} diff --git a/tests/ut/data/dataset/tf_file_dataset/pyvision_dataset_pipeline.json b/tests/ut/data/dataset/tf_file_dataset/pyvision_dataset_pipeline.json index 7f13754314e..518525d4555 100644 --- a/tests/ut/data/dataset/tf_file_dataset/pyvision_dataset_pipeline.json +++ b/tests/ut/data/dataset/tf_file_dataset/pyvision_dataset_pipeline.json @@ -1 +1 @@ 
-{"callback":[],"children":[{"callback":[],"children":[{"children":[],"columns_list":["image","label"],"dataset_files":["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"],"num_parallel_workers":8,"num_samples":0,"num_shards":1,"op_type":"TFRecordDataset","schema_file_path":"../data/dataset/test_tf_file_3_images/datasetSchema.json","shard_equal_rows":false,"shard_id":0,"shuffle":0,"shuffle_files":false,"shuffle_global":false}],"input_columns":["image"],"num_parallel_workers":8,"op_type":"Map","operations":[{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"Compose","tensor_op_params":{"random":false,"transforms":[{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"Decode","tensor_op_params":{"random":false}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"CenterCrop","tensor_op_params":{"random":false,"size":[32,32]}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"ToTensor","tensor_op_params":{"output_type":"float32","random":false}}]}}],"output_columns":["image"],"project_columns":[]}],"input_columns":["image"],"num_parallel_workers":8,"op_type":"Map","operations":[{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"RandomApply","tensor_op_params":{"prob":0.5,"transforms":[{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"RandomColorAdjust","tensor_op_params":{"brightness":[1,1],"contrast":[1,1],"hue":[0,0],"saturation":[1,1]}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"FiveCrop","tensor_op_params":{"random":false,"size":1}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"Grayscale","tensor_op_params":{"num_output_channels":1,"random":false}},{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"OneHotOp","tensor_op_params":{"num_classes":1,"random":false,"smoothing_rate":0.0}}]}}],"output_columns":["image"],"project_columns":[]} \ No newline at end of file 
+{"callback":[],"children":[{"callback":[],"children":[{"children":[],"columns_list":["image","label"],"dataset_files":["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"],"num_parallel_workers":8,"connector_queue_size":8,"num_samples":0,"num_shards":1,"op_type":"TFRecordDataset","schema_file_path":"../data/dataset/test_tf_file_3_images/datasetSchema.json","shard_equal_rows":false,"shard_id":0,"shuffle":0,"shuffle_files":false,"shuffle_global":false}],"input_columns":["image"],"num_parallel_workers":8,"connector_queue_size":8,"op_type":"Map","operations":[{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"Compose","tensor_op_params":{"random":false,"transforms":[{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"Decode","tensor_op_params":{"random":false}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"CenterCrop","tensor_op_params":{"random":false,"size":[32,32]}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"ToTensor","tensor_op_params":{"output_type":"float32","random":false}}]}}],"output_columns":["image"],"project_columns":[]}],"input_columns":["image"],"num_parallel_workers":8,"connector_queue_size":8,"op_type":"Map","operations":[{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"RandomApply","tensor_op_params":{"prob":0.5,"transforms":[{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"RandomColorAdjust","tensor_op_params":{"brightness":[1,1],"contrast":[1,1],"hue":[0,0],"saturation":[1,1]}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"FiveCrop","tensor_op_params":{"random":false,"size":1}},{"python_module":"mindspore.dataset.vision.py_transforms","tensor_op_name":"Grayscale","tensor_op_params":{"num_output_channels":1,"random":false}},{"python_module":"mindspore.dataset.transforms.py_transforms","tensor_op_name":"OneHotOp","tensor_op_params":{"num_classes":1,"random":false,"smoothing_rate":0.0}}]}}],"output_columns":["image"],"project_columns":[]} \ No newline at end of file diff --git a/tests/ut/python/dataset/test_autotune_saveload.py b/tests/ut/python/dataset/test_autotune_saveload.py index aaf087e91cb..898dd497c2f 100644 --- a/tests/ut/python/dataset/test_autotune_saveload.py +++ b/tests/ut/python/dataset/test_autotune_saveload.py @@ -16,12 +16,15 @@ Test Dataset AutoTune's Save and Load Configuration support """ import filecmp + import numpy as np import pytest import mindspore.dataset as ds import mindspore.dataset.transforms.c_transforms as c_transforms +import mindspore.dataset.vision.c_transforms as c_vision MNIST_DATA_DIR = "../data/dataset/testMnistData" +DATA_DIR = "../data/dataset/testPK/data" @pytest.mark.forked @@ -36,6 +39,7 @@ class TestAutotuneSaveLoad: Description: Test save final config with GeneratorDataset pipeline: Generator -> Shuffle -> Batch Expectation: pipeline runs successfully """ + original_autotune = ds.config.get_enable_autotune() ds.config.set_enable_autotune(True, str(tmp_path) + "test_autotune_generator_atfinal.json") source = [(np.array([x]),) for x in range(1024)] @@ -50,17 +54,18 @@ class TestAutotuneSaveLoad: for _ in itr: pass - ds.config.set_enable_autotune(False) + ds.config.set_enable_autotune(original_autotune) @staticmethod - def skip_test_autotune_mnist_pipeline(tmp_path): + def test_autotune_mnist_pipeline(tmp_path): """ Feature: Autotuning Description: Test save final config with Mnist pipeline: Mnist -> Batch -> Map 
         Expectation: pipeline runs successfully
         """
+        original_autotune = ds.config.get_enable_autotune()
         ds.config.set_enable_autotune(True, str(tmp_path) + "test_autotune_mnist_pipeline_atfinal.json")
-
+        original_seed = ds.config.get_seed()
         ds.config.set_seed(1)
 
         data1 = ds.MnistDataset(MNIST_DATA_DIR, num_samples=100)
@@ -74,7 +79,7 @@ class TestAutotuneSaveLoad:
         for _ in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
             pass
 
-        ds.config.set_enable_autotune(False)
+        ds.config.set_enable_autotune(original_autotune)
 
         # Confirm final AutoTune config file is identical to the serialized file.
         assert filecmp.cmp(str(tmp_path) + "test_autotune_mnist_pipeline_atfinal.json",
@@ -91,6 +96,8 @@ class TestAutotuneSaveLoad:
             num += 1
         assert num == 10
 
+        ds.config.set_seed(original_seed)
+
     @staticmethod
     def test_autotune_save_overwrite_generator(tmp_path):
         """
@@ -102,7 +109,7 @@ class TestAutotuneSaveLoad:
         source = [(np.array([x]),) for x in range(1024)]
 
         at_final_json_filename = "test_autotune_save_overwrite_generator_atfinal.json"
-
+        original_autotune = ds.config.get_enable_autotune()
         ds.config.set_enable_autotune(True, str(tmp_path) + at_final_json_filename)
 
         data1 = ds.GeneratorDataset(source, ["data"])
@@ -120,20 +127,22 @@ class TestAutotuneSaveLoad:
         for _ in data2.create_dict_iterator(num_epochs=1, output_numpy=True):
             pass
 
-        ds.config.set_enable_autotune(False)
+        ds.config.set_enable_autotune(original_autotune)
 
     @staticmethod
-    def skip_test_autotune_save_overwrite_mnist(tmp_path):
+    def test_autotune_save_overwrite_mnist(tmp_path):
         """
         Feature: Autotuning
         Description: Test set_enable_autotune and existing json_filepath is overwritten
         Expectation: set_enable_autotune() executes successfully with file-exist warning produced.
             Execution of 2nd pipeline overwrites AutoTune configuration file of 1st pipeline.
         """
+        original_seed = ds.config.get_seed()
         ds.config.set_seed(1)
         at_final_json_filename = "test_autotune_save_overwrite_mnist_atfinal.json"
 
         # Pipeline#1
+        original_autotune = ds.config.get_enable_autotune()
         ds.config.set_enable_autotune(True, str(tmp_path) + at_final_json_filename)
 
         data1 = ds.MnistDataset(MNIST_DATA_DIR, num_samples=100)
@@ -170,3 +179,41 @@ class TestAutotuneSaveLoad:
         # Confirm the serialized files for the 2 different pipelines are different
         assert not filecmp.cmp(str(tmp_path) + "test_autotune_save_overwrite_mnist_serialized1.json",
                                str(tmp_path) + "test_autotune_save_overwrite_mnist_serialized2.json")
+
+        ds.config.set_seed(original_seed)
+        ds.config.set_enable_autotune(original_autotune)
+
+    @staticmethod
+    def test_autotune_warning_with_offload(tmp_path, capfd):
+        """
+        Feature: Autotuning
+        Description: Test autotune config saving with offload=True
+        Expectation: Autotune should not write the config file and print a log message
+        """
+        original_seed = ds.config.get_seed()
+        ds.config.set_seed(1)
+        at_final_json_filename = "test_autotune_warning_with_offload_config.json"
+        config_path = tmp_path / at_final_json_filename
+        original_autotune = ds.config.get_enable_autotune()
+        ds.config.set_enable_autotune(True, str(config_path))
+
+        # Dataset with offload activated.
+        dataset = ds.ImageFolderDataset(DATA_DIR)
+        dataset = dataset.map(operations=[c_vision.Decode()], input_columns="image")
+        dataset = dataset.map(operations=[c_vision.HWC2CHW()], input_columns="image", offload=True)
+        dataset = dataset.batch(8, drop_remainder=True)
+
+        for _ in dataset.create_tuple_iterator(num_epochs=1, output_numpy=True):
+            pass
+
+        _, err = capfd.readouterr()
+
+        assert "Some nodes have been offloaded. AutoTune is unable to write the autotune configuration to disk. " \
+               "Disable offload to prevent this from happening." in err
+
+        with pytest.raises(FileNotFoundError):
+            with open(config_path) as _:
+                pass
+
+        ds.config.set_enable_autotune(original_autotune)
+        ds.config.set_seed(original_seed)
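
Review note on the test changes above: every test now snapshots the global settings it mutates (ds.config.get_enable_autotune(), and ds.config.get_seed() where the seed is set) and restores them at the end, so state no longer leaks between tests in this forked class. A minimal sketch of how this save/restore boilerplate could be factored into a pytest fixture is below; preserve_ds_config is a hypothetical helper name, not part of this change, and only uses the ds.config getters/setters already exercised by the tests.

    # Hypothetical refactor sketch, not part of this diff: a yield fixture that
    # snapshots and restores the global dataset settings these tests mutate.
    import pytest
    import mindspore.dataset as ds

    @pytest.fixture
    def preserve_ds_config():
        # Snapshot the global state before the test body runs.
        original_seed = ds.config.get_seed()
        original_autotune = ds.config.get_enable_autotune()
        yield
        # Teardown after yield runs regardless of the test outcome.
        ds.config.set_seed(original_seed)
        ds.config.set_enable_autotune(original_autotune)

A test would then take preserve_ds_config as a parameter and drop its manual snapshot/restore lines; this would also restore the config when a test fails midway, which the current inline pattern does not.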