diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt index 861caa85eb..1baea261f2 100644 --- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt @@ -66,6 +66,7 @@ add_dependencies(kernels core) add_dependencies(engine-datasetops-source core) add_dependencies(engine-datasetops-source-sampler core) add_dependencies(engine-datasetops core) +add_dependencies(engine-datasetops-mapop core) add_dependencies(engine-opt core) add_dependencies(engine-perf core) add_dependencies(engine-gnn core) @@ -89,6 +90,7 @@ set(submodules $ $ $ + $ $ $ $ diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index 7265271098..b02de78d74 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -27,7 +27,7 @@ #include "minddata/dataset/engine/datasetops/source/voc_op.h" // Dataset operator headers (in alphabetical order) #include "minddata/dataset/engine/datasetops/batch_op.h" -#include "minddata/dataset/engine/datasetops/map_op.h" +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" #include "minddata/dataset/engine/datasetops/project_op.h" #include "minddata/dataset/engine/datasetops/rename_op.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" @@ -537,9 +537,6 @@ std::vector> MapDataset::Build() { // A vector containing shared pointer to the Dataset Ops that this object will create std::vector> node_ops; - // Currently default is true, and this is not exposed to user. - bool perf_mode = true; - std::vector> tensor_ops; // Build tensorOp from tensorOperation vector @@ -550,8 +547,7 @@ std::vector> MapDataset::Build() { // This parameter will be removed with next rebase std::vector col_orders; - auto map_op = - std::make_shared(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_, perf_mode); + auto map_op = std::make_shared(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_); if (!project_columns_.empty()) { auto project_op = std::make_shared(project_columns_); node_ops.push_back(project_op); diff --git a/mindspore/ccsrc/minddata/dataset/core/client.h b/mindspore/ccsrc/minddata/dataset/core/client.h index 5e3907562e..b538bb20e1 100644 --- a/mindspore/ccsrc/minddata/dataset/core/client.h +++ b/mindspore/ccsrc/minddata/dataset/core/client.h @@ -39,7 +39,7 @@ #include "minddata/dataset/engine/datasetops/batch_op.h" #include "minddata/dataset/engine/datasetops/dataset_op.h" #include "minddata/dataset/engine/datasetops/device_queue_op.h" -#include "minddata/dataset/engine/datasetops/map_op.h" +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" #include "minddata/dataset/engine/datasetops/project_op.h" #include "minddata/dataset/engine/datasetops/rename_op.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/core/constants.h b/mindspore/ccsrc/minddata/dataset/core/constants.h index 39573dbc90..8c8c0044a6 100644 --- a/mindspore/ccsrc/minddata/dataset/core/constants.h +++ b/mindspore/ccsrc/minddata/dataset/core/constants.h @@ -26,6 +26,9 @@ namespace dataset { using uchar = unsigned char; using dsize_t = int64_t; +// Target devices to perform map operation +enum class MapTargetDevice { kCpu, kGpu, kDvpp }; + // Possible dataset types for holding the data and client type enum class DatasetType { kUnknown, kArrow, kTf }; diff --git a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt index e3ead16d05..1a9dd56024 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt @@ -19,8 +19,8 @@ target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS}) if (ENABLE_TDTQUE) add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn engine-perf - engine-cache-client engine-cache-server) + engine-cache-client engine-cache-server engine-datasetops-mapop) else () add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf - engine-cache-client engine-cache-server) + engine-cache-client engine-cache-server engine-datasetops-mapop) endif () diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/datasetops/CMakeLists.txt index 96446f167a..ef97f3322e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(source) +add_subdirectory(map_op) file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) @@ -9,7 +10,6 @@ set(DATASET_ENGINE_DATASETOPS_SRC_FILES pipeline_op.cc batch_op.cc device_queue_op.cc - map_op.cc project_op.cc rename_op.cc repeat_op.cc @@ -37,4 +37,3 @@ if (ENABLE_PYTHON) endif() add_library(engine-datasetops OBJECT ${DATASET_ENGINE_DATASETOPS_SRC_FILES}) - diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/CMakeLists.txt new file mode 100644 index 0000000000..a877016c9c --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/CMakeLists.txt @@ -0,0 +1,10 @@ +file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") +set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) + +set(DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES + map_op.cc + cpu_map_job.cc + gpu_map_job.cc + ) + +add_library(engine-datasetops-mapop OBJECT ${DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES}) diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.cc new file mode 100644 index 0000000000..8b6b753aae --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.cc @@ -0,0 +1,56 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" + +namespace mindspore { +namespace dataset { + +// Constructor +CpuMapJob::CpuMapJob() = default; + +// Constructor +CpuMapJob::CpuMapJob(std::vector> operations) : MapJob(operations) {} + +// Destructor +CpuMapJob::~CpuMapJob() = default; + +// A function to execute a cpu map job +Status CpuMapJob::Run(std::vector in, std::vector *out) { + int32_t num_rows = in.size(); + for (int32_t row = 0; row < num_rows; row++) { + TensorRow input_row = in[row]; + TensorRow result_row; + for (size_t i = 0; i < ops_.size(); i++) { + // Call compute function for cpu + RETURN_IF_NOT_OK(ops_[i]->Compute(input_row, &result_row)); + + // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list. + if (i + 1 < ops_.size()) { + input_row = std::move(result_row); + } + } + out->push_back(std::move(result_row)); + } + + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.h new file mode 100644 index 0000000000..330b676865 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.h @@ -0,0 +1,43 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ +#define DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ + +#include +#include +#include "minddata/dataset/engine/datasetops/map_op/map_job.h" + +namespace mindspore { +namespace dataset { +class CpuMapJob : public MapJob { + public: + // Constructor + CpuMapJob(); + + // Constructor + explicit CpuMapJob(std::vector> operations); + + // Destructor + ~CpuMapJob(); + + // A pure virtual run function to execute a cpu map job + Status Run(std::vector in, std::vector *out) override; +}; + +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.cc new file mode 100644 index 0000000000..64502d6da2 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.cc @@ -0,0 +1,37 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" + +namespace mindspore { +namespace dataset { + +// Constructor +GpuMapJob::GpuMapJob(std::vector> operations) : MapJob(operations) {} + +// Destructor +GpuMapJob::~GpuMapJob() = default; + +// A function to execute a cpu map job +Status GpuMapJob::Run(std::vector in, std::vector *out) { + // Do nothing for now + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.h new file mode 100644 index 0000000000..743c5104c9 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/gpu_map_job.h @@ -0,0 +1,40 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ +#define DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ + +#include +#include +#include "minddata/dataset/engine/datasetops/map_op/map_job.h" + +namespace mindspore { +namespace dataset { +class GpuMapJob : public MapJob { + public: + // Constructor + explicit GpuMapJob(std::vector> operations); + + // Destructor + ~GpuMapJob(); + + // A pure virtual run function to execute a cpu map job + Status Run(std::vector in, std::vector *out) override; +}; + +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_job.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_job.h new file mode 100644 index 0000000000..fd05dfd53f --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_job.h @@ -0,0 +1,55 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ +#define DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ + +#include +#include + +#include "minddata/dataset/kernels/tensor_op.h" +#include "minddata/dataset/core/tensor.h" +#include "minddata/dataset/core/tensor_row.h" +#include "minddata/dataset/util/status.h" + +namespace mindspore { +namespace dataset { +class MapJob { + public: + // Constructor + explicit MapJob(std::vector> operations) : ops_(operations) {} + + // Constructor + MapJob() = default; + + // Destructor + ~MapJob() = default; + + Status AddOperation(std::shared_ptr operation) { + ops_.push_back(operation); + return Status::OK(); + } + + // A pure virtual run function to execute a particular map job + virtual Status Run(std::vector in, std::vector *out) = 0; + + protected: + std::vector> ops_; +}; + +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc similarity index 62% rename from mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.cc rename to mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc index bfd0962ae8..89c06f4917 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "minddata/dataset/engine/datasetops/map_op.h" +#include #include #include #include @@ -28,6 +28,9 @@ #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/engine/opt/pass.h" +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" +#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" +#include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" #include "minddata/dataset/kernels/tensor_op.h" #include "utils/log_adapter.h" #include "minddata/dataset/util/task_manager.h" @@ -35,7 +38,7 @@ namespace mindspore { namespace dataset { // Builder constructor. Creates the builder object. -MapOp::Builder::Builder() : build_perf_mode_(true) { +MapOp::Builder::Builder() { std::shared_ptr cfg = GlobalContext::config_manager(); build_num_workers_ = cfg->num_parallel_workers(); build_op_connector_size_ = cfg->op_connector_size(); @@ -54,31 +57,27 @@ Status MapOp::Builder::sanityCheck() const { Status MapOp::Builder::Build(std::shared_ptr *ptr) { RETURN_IF_NOT_OK(sanityCheck()); *ptr = std::make_shared(std::move(build_in_col_names_), std::move(build_out_col_names_), - std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_, - build_perf_mode_); + std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_); return Status::OK(); } // Constructor of MapOp MapOp::MapOp(const std::vector &in_col_names, const std::vector &out_col_names, - std::vector> tensor_funcs, int32_t num_workers, int32_t op_connector_size, - bool perf_mode) + std::vector> tensor_funcs, int32_t num_workers, int32_t op_connector_size) : ParallelOp(num_workers, op_connector_size), tfuncs_(std::move(tensor_funcs)), in_columns_(in_col_names), - out_columns_(out_col_names), - perf_mode_(perf_mode) { + out_columns_(out_col_names) { // If caller didn't specify the out_col_names, assume they are same as the in_columns. if (out_columns_.empty() || out_columns_[0].empty()) { out_columns_ = in_columns_; } - MS_LOG(DEBUG) << "Performance Mode in map operator is " << perf_mode_ << "."; } // The number of threads consuming data from previous op's output Connector. int32_t MapOp::num_consumers() const { // When Performance Mode is on, there is only one thread consuming from the previous Connector. - return perf_mode_ == true ? 1 : num_workers_; + return 1; } // A print method typically used for debugging @@ -106,36 +105,98 @@ void MapOp::Print(std::ostream &out, bool show_all) const { } } +// A helper function that fetch worker map job from local queues and extract the data and map job list +Status MapOp::FetchNextWork(uint32_t worker_id, std::unique_ptr *db, + std::vector> *job_list) { + std::unique_ptr worker_job; + // Fetch the next worker job and data buffer + RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job)); + // Extract the databuffer and job list from the map worker job. + *db = std::move(worker_job->databuffer); + *job_list = std::move(worker_job->jobs); + + return Status::OK(); +} + +Status MapOp::GenerateWorkerJob(const std::unique_ptr *worker_job) { + std::shared_ptr map_job = nullptr; + MapTargetDevice prev_target; + for (size_t i = 0; i < tfuncs_.size(); i++) { + // Currently we only have CPU as the device target + // In the future, we will have heuristic or control from user to select target device + // MapTargetDevice target_device; + // RETURN_IF_NOT_OK(SelectTarget(tfuncs_[i], &target_device)); + MapTargetDevice target_device = MapTargetDevice::kCpu; + + switch (target_device) { + case MapTargetDevice::kCpu: + // If there is no existing map_job, we will create one. + // map_job could be nullptr when we are at the first tensor op or when the target device of the prev op + // is different with that of the current op. + if (map_job == nullptr) { + map_job = std::make_shared(); + } + map_job->AddOperation(tfuncs_[i]); + break; + + case MapTargetDevice::kGpu: + break; + + case MapTargetDevice::kDvpp: + break; + + default: + break; + } + + // Push map_job into worker_job if one of the two conditions is true: + // 1) It is the last tensor operation in tfuncs_ + // 2) The the target device of the current tensor operation is different with previous one + if ((i + 1 == tfuncs_.size()) || ((i != 0) && (prev_target != target_device))) { + (*worker_job)->jobs.push_back(std::move(map_job)); + } + + prev_target = target_device; + } + + return Status::OK(); +} + // This class functor will provide the master loop that drives the logic for performing the work Status MapOp::operator()() { - if (perf_mode_) { - // Create and register the local queues. - local_queues_.Init(num_workers_, oc_queue_size_); - Status rc = local_queues_.Register(tree_->AllTasks()); - if (rc.IsError()) { - TaskManager::FindMe()->Post(); - return rc; - } + // Create and register the local queues. + local_queues_.Init(num_workers_, oc_queue_size_); + Status rc = local_queues_.Register(tree_->AllTasks()); + if (rc.IsError()) { + TaskManager::FindMe()->Post(); + return rc; } // The operator class just starts off threads by calling the tree_ function - Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); + rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); // Synchronize with TaskManager TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(rc); - if (perf_mode_) { - int64_t que_id = 0; - std::unique_ptr buff; - bool is_eof = false; - // Draining output connector of the previous op and distribute it to local queues. - // Stop when all worker threads are finished (received EOF). - while (!is_eof) { - RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0)); - is_eof = buff->eof(); - RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(buff))); - que_id = (que_id + 1) % num_workers_; - } + int64_t que_id = 0; + std::unique_ptr buff; + bool is_eof = false; + // Drain output connector of the previous op, generate jobs for worker threads, and distribute them via local queues + // Stop when all worker threads are finished (received EOF) + while (!is_eof) { + RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0)); + is_eof = buff->eof(); + + // Create an empty map worker job to be populated by a databuffer and map jobs + std::unique_ptr worker_job = std::make_unique(); + worker_job->databuffer = std::move(buff); + + // Populate map worker job for a worker to execute + RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job)); + + // Push map worker job to the corresponding worker's queue + RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(worker_job))); + que_id = (que_id + 1) % num_workers_; } return Status::OK(); @@ -148,12 +209,11 @@ Status MapOp::operator()() { Status MapOp::WorkerEntry(int32_t worker_id) { // Handshake with TaskManager that thread creation is successful. TaskManager::FindMe()->Post(); - std::unique_ptr in_buffer; - // Getting a databuffer to work on. - // Perform the first fetch here outside of the loop. This allows us to execute one-time only - // initializations that happen after the first fetch. - RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); + std::unique_ptr in_buffer; + std::vector> job_list; + // Fetch next data buffer and map job list + RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); // Sanity check the databuffer. // Special case: if there's more threads than buffers, some threads simply get the final control @@ -175,7 +235,8 @@ Status MapOp::WorkerEntry(int32_t worker_id) { if (in_buffer->eoe()) { // Calling base class EoeReceived to forward eoe buffer. RETURN_IF_NOT_OK(EoeReceived(worker_id)); - RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); + // Fetch next data buffer and map job list + RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); continue; } else if (in_buffer->eof()) { // Calling base class EofReceived to forward eof buffer. @@ -185,76 +246,77 @@ Status MapOp::WorkerEntry(int32_t worker_id) { std::unique_ptr new_tensor_table(std::make_unique()); // Perform the compute function of TensorOp(s) and store the result in new_tensor_table. - RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get())); - + RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get(), job_list)); // Replace the TensorTable in DataBuffer with the new one. in_buffer->set_tensor_table(std::move(new_tensor_table)); - // Push the buffer onto the connector for next operator to consume. RETURN_IF_NOT_OK(out_connector_->Add(static_cast(worker_id), std::move(in_buffer))); - - // Fetch the next buffer and loop back to the top. - RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); + // Fetch next data buffer and map job list + RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); } - return Status::OK(); } -Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table) { - // Getting number of rows and cols in this buffer. +Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table, + const std::vector> &job_list) { int32_t num_rows = in_buffer->NumRows(); int32_t num_cols = in_buffer->NumCols(); + std::vector job_input_table; + std::vector original_table; + + // Prepare the data that we need from in_buffer for (int32_t r = 0; r < num_rows; r++) { // to_process : A vector of Tensors only holding cols in input_columns. - // result_row; : A vector of Tensors to hold the result after Compute(). - // cur_row : A vector of Tensors holding all the columns from DataBuffer. - TensorRow to_process, result_row, cur_row; + // cur_row : A vector of Tensors holding all the cols from DataBuffer. + TensorRow to_process, cur_row; RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row)); + // From the current row, select the Tensor that need to be passed to TensorOp + (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process), + [&cur_row](const auto &it) { return std::move(cur_row[it]); }); + job_input_table.push_back(std::move(to_process)); + original_table.push_back(std::move(cur_row)); + } - // Populate the Tensor from the current row to be processed by TensorOp - for (const auto &idx : to_process_indices_) { - to_process.push_back(std::move(cur_row[idx])); - } - - // Looping over multiple TensorOps supplied in to MapOp. - // The assumption is that the result of one TensorOp matches the required input to the next TensorOp. - for (size_t i = 0; i < tfuncs_.size(); i++) { - // TensorOp can operate on single col or multiple cols. MapOp always call compute for multiple cols. - // TensorOp base class will call the single column Compute() depending on the ops. - // Note: The columns of the result_row is not preallocated, the compute function of each tensor op are - // required to resize/push back the result_row - RETURN_IF_NOT_OK(tfuncs_[i]->Compute(to_process, &result_row)); - - // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list. - if (i + 1 < tfuncs_.size()) { - to_process = std::move(result_row); - } - } - - if (out_columns_.size() != result_row.size()) { - return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, - "Result of a tensorOp doesn't match output column names"); - } - - if (in_columns_.size() == out_columns_.size()) { - for (size_t i = 0; i < result_row.size(); i++) { - cur_row[to_process_indices_[i]] = std::move(result_row[i]); - } - new_tensor_table->push_back(std::move(cur_row)); - } else { - // Add the columns we did not touch to the result_row. - for (int32_t i = 0; i < num_cols; i++) { - if (keep_input_columns_[i]) { - result_row.push_back(std::move(cur_row[i])); - } - } - - // Add this final result_row to our new TensorTable. - new_tensor_table->push_back(std::move(result_row)); + // Variable to keep the result after executing the job. + std::vector result_table; + // Executing the list of jobs + for (size_t i = 0; i < job_list.size(); i++) { + // Executre MapJob. + RETURN_IF_NOT_OK(job_list[i]->Run(job_input_table, &result_table)); + // Assign the pocessed data as an input for the next job processing, except for the last TensorOp in the list. + if (i + 1 < job_list.size()) { + job_input_table = std::move(result_table); } } + // Sanity check a row in result_table + if (!result_table.empty() && out_columns_.size() != result_table[0].size()) { + return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, + "Result of a tensorOp doesn't match output column names"); + } + + // Merging the data processed by job (result_table) with the data that are not used. + for (int32_t r = 0; r < num_rows; r++) { + TensorRow out_row; + if (in_columns_.size() == out_columns_.size()) { + // Place the processed tensor back into the original index of the input tensor + for (size_t i = 0; i < result_table[r].size(); i++) { + original_table[r][to_process_indices_[i]] = std::move(result_table[r][i]); + } + out_row = std::move(original_table[r]); + } else { + // Append the data in the original table that we did not use to the end of each row in result_table. + for (int32_t i = 0; i < num_cols; i++) { + if (keep_input_columns_[i]) { + result_table[r].push_back(std::move(original_table[r][i])); + } + } + out_row = std::move(result_table[r]); + } + // Add this final out_row to our new TensorTable. + new_tensor_table->push_back(std::move(out_row)); + } return Status::OK(); } @@ -288,13 +350,11 @@ Status MapOp::ValidateInColumns(const std::unordered_map & Status MapOp::InitPrivateVariable(std::unordered_map *col_name_id_map) { // If input_columns is empty(), The col at index-0 will be picked. if (in_columns_.empty()) { - for (const auto &pair : *col_name_id_map) { - if (pair.second == 0) { - MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; - in_columns_.push_back(pair.first); - break; - } - } + auto itr = + std::find_if(col_name_id_map->begin(), col_name_id_map->end(), [](const auto &it) { return it.second == 0; }); + CHECK_FAIL_RETURN_UNEXPECTED(itr != col_name_id_map->end(), "Column name id map doesn't have id 0"); + MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; + in_columns_.push_back(itr->first); // If caller didn't specify the out_col_names, assume they are same as the input_columns. // This was done in the constructor, but if input columns was empty to start we have to redo it here. diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h similarity index 85% rename from mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.h rename to mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h index 7cb5807738..77ee94d86d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h @@ -24,6 +24,7 @@ #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/kernels/tensor_op.h" #include "minddata/dataset/util/queue.h" +#include "minddata/dataset/engine/datasetops/map_op/map_job.h" namespace mindspore { namespace dataset { @@ -107,13 +108,6 @@ class MapOp : public ParallelOp { return *this; } - // Setter method. - // @return Builder setter method returns reference to the builder. - Builder &SetPerformanceMode(bool perf_mode) { - build_perf_mode_ = perf_mode; - return *this; - } - // The builder "build" method creates the final object. // @param ptr The shared_ptr to the new MapOp object // @return Status @@ -125,7 +119,6 @@ class MapOp : public ParallelOp { std::vector> build_tensor_funcs_; int32_t build_num_workers_; int32_t build_op_connector_size_; - bool build_perf_mode_; // Default true. // Check if the required parameters are set by the builder. // @return Status The error code return @@ -140,8 +133,7 @@ class MapOp : public ParallelOp { // @param num_workers The number of worker threads. // @param op_connector_size The size of each queue in the connector. MapOp(const std::vector &in_col_names, const std::vector &out_col_names, - std::vector> tensor_funcs, int32_t num_workers, int32_t op_connector_size, - bool perf_mode); + std::vector> tensor_funcs, int32_t num_workers, int32_t op_connector_size); // Destructor ~MapOp() = default; @@ -164,6 +156,8 @@ class MapOp : public ParallelOp { // Class functor operator () override. // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will // provide the master loop that drives the logic for performing the work + // This main thread creates local queues, pulls databuffers from the previous + // op's Connector and distributes them to the local queues. Workers pull from the local queues. // @return Status The error code return Status operator()() override; @@ -189,12 +183,24 @@ class MapOp : public ParallelOp { const auto &TFuncs() const { return tfuncs_; } private: - // Local queues where worker threads can pop from. - // Popping directly from the Connector can block if the previous designated threads haven't pop. - // Setting the size of these queues to 0 is essentially the same as pulling directly from Connector. - QueueList> local_queues_; + // A unit of job for map worker thread. + // MapWorkerJob holds a list of MapJob where each MapJob can be a CpuMapJob, GpuMapJob or DvppMapJob. + struct MapWorkerJob { + std::vector> jobs; + std::unique_ptr databuffer; + }; - // Static variables to be ready by worker threads, no modification and readonly + // A helper function to create jobs for workers. + Status GenerateWorkerJob(const std::unique_ptr *worker_job); + + // A helper function that fetch worker map job from local queues and extract the data and map job list + Status FetchNextWork(uint32_t worker_id, std::unique_ptr *db, + std::vector> *job_list); + + // Local queues where worker threads get a job from + QueueList> local_queues_; + + // Tensorops to be read and applied by worker threads std::vector> tfuncs_; // Variable to store the column name that the tensorOps are consuming @@ -209,13 +215,6 @@ class MapOp : public ParallelOp { // Indices of the columns to process. std::vector to_process_indices_; - // Performance mode is when the main thread creates local queues, pulls databuffers from the previous - // op's Connector and distributes them to the local queues. Workers pull from the local queues. - // If this flag is false, each worker pulls directly from the Connector. This use less resources - // (thread and memory), but when the computation cost is heavy (e.g. DecodeOp) and fluctuating, it can - // cause additional blocking because pop calls to Connector from the threads are synchronized to enforce the order. - bool perf_mode_; - // Private function for worker/thread to loop continuously. It comprises the main // logic of MapOp: getting the data from previous Op, validating user specified column names, // applying a list of TensorOps to each of the data, process the results and then @@ -224,25 +223,12 @@ class MapOp : public ParallelOp { // @return Status The error code return Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_ - // Private helper function for getting the next buffer - // When PerformanceMode is enabled, workers pop from the local queue. - // Otherwise, workers pop from the first child output Connector. - // @param p_buffer - the buffer to return - // @return Status return code - Status FetchNextBuffer(std::unique_ptr *p_buffer, int32_t worker_id) { - if (perf_mode_) { - RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(p_buffer)); - } else { - RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id)); - } - return Status::OK(); - } - // Private function for worker thread to perform TensorOp's compute function and get the result. // @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory // and is not shared with other threads. // @param[out] new_tensor_table A new Tensor Table to be populated in this function. - Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table); + Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table, + const std::vector> &job_list); // Private function that create the final column name to index mapping and // get indices of the columns this mapop does not use. diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.cc index d8ce2dd863..fc0eb027b6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.cc @@ -17,7 +17,7 @@ #include #include "minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.h" #include "minddata/dataset/kernels/image/decode_op.h" -#include "minddata/dataset/engine/datasetops/map_op.h" +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" #include "minddata/dataset/kernels/image/random_crop_decode_resize_op.h" namespace mindspore { diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pass.cc index ca4c87f67b..07ee10307a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pass.cc @@ -24,7 +24,7 @@ #include "minddata/dataset/engine/datasetops/dataset_op.h" #include "minddata/dataset/engine/datasetops/device_queue_op.h" #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h" -#include "minddata/dataset/engine/datasetops/map_op.h" +#include "minddata/dataset/engine/datasetops/map_op/map_op.h" #include "minddata/dataset/engine/datasetops/project_op.h" #include "minddata/dataset/engine/datasetops/rename_op.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" diff --git a/tests/ut/cpp/dataset/map_op_test.cc b/tests/ut/cpp/dataset/map_op_test.cc index 4e9cfe9ec9..0cee75b264 100644 --- a/tests/ut/cpp/dataset/map_op_test.cc +++ b/tests/ut/cpp/dataset/map_op_test.cc @@ -645,16 +645,14 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize) { map_decode_builder.SetInColNames({"image"}) .SetOutColNames({}) .SetTensorFuncs(func_list) - .SetNumWorkers(14) - .SetPerformanceMode(false); + .SetNumWorkers(14); rc = map_decode_builder.Build(&map_decode_map); EXPECT_TRUE(rc.IsOk()); map_resize_builder.SetInColNames({"image"}) .SetOutColNames({}) .SetTensorFuncs(func_list2) - .SetNumWorkers(15) - .SetPerformanceMode(false); + .SetNumWorkers(15); rc = map_resize_builder.Build(&map_resize_op); EXPECT_TRUE(rc.IsOk()); @@ -739,5 +737,3 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize_NoInputColumns) { } EXPECT_TRUE(i == 88); } - - diff --git a/tests/ut/cpp/dataset/rename_op_test.cc b/tests/ut/cpp/dataset/rename_op_test.cc index ac64346c26..56d3c01f61 100644 --- a/tests/ut/cpp/dataset/rename_op_test.cc +++ b/tests/ut/cpp/dataset/rename_op_test.cc @@ -19,7 +19,6 @@ #include #include "minddata/dataset/core/client.h" #include "minddata/dataset/core/constants.h" -#include "minddata/dataset/engine/datasetops/map_op.h" #include "minddata/dataset/engine/datasetops/rename_op.h" #include "common/common.h" #include "common/utils.h" diff --git a/tests/ut/cpp/dataset/zip_op_test.cc b/tests/ut/cpp/dataset/zip_op_test.cc index 8d74cb0969..90b363b35e 100644 --- a/tests/ut/cpp/dataset/zip_op_test.cc +++ b/tests/ut/cpp/dataset/zip_op_test.cc @@ -23,7 +23,6 @@ #include #include "minddata/dataset/core/client.h" #include "minddata/dataset/core/constants.h" -#include "minddata/dataset/engine/datasetops/map_op.h" #include "minddata/dataset/engine/datasetops/zip_op.h" #include "minddata/dataset/core/tensor.h" #include "minddata/dataset/core/config_manager.h"