First drop of MapOp refactor to support device augment.

Refactored MapOp to in preparation to support device augment

Minor fix
This commit is contained in:
anthonyaje 2020-07-14 17:16:31 -04:00
parent f2401cd0f9
commit a7eaf2736a
19 changed files with 435 additions and 154 deletions

View File

@ -66,6 +66,7 @@ add_dependencies(kernels core)
add_dependencies(engine-datasetops-source core)
add_dependencies(engine-datasetops-source-sampler core)
add_dependencies(engine-datasetops core)
add_dependencies(engine-datasetops-mapop core)
add_dependencies(engine-opt core)
add_dependencies(engine-perf core)
add_dependencies(engine-gnn core)
@ -89,6 +90,7 @@ set(submodules
$<TARGET_OBJECTS:cpp-API>
$<TARGET_OBJECTS:engine-datasetops-source>
$<TARGET_OBJECTS:engine-datasetops-source-sampler>
$<TARGET_OBJECTS:engine-datasetops-mapop>
$<TARGET_OBJECTS:engine-gnn>
$<TARGET_OBJECTS:engine-perf>
$<TARGET_OBJECTS:engine-datasetops>

View File

@ -27,7 +27,7 @@
#include "minddata/dataset/engine/datasetops/source/voc_op.h"
// Dataset operator headers (in alphabetical order)
#include "minddata/dataset/engine/datasetops/batch_op.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/engine/datasetops/project_op.h"
#include "minddata/dataset/engine/datasetops/rename_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
@ -537,9 +537,6 @@ std::vector<std::shared_ptr<DatasetOp>> MapDataset::Build() {
// A vector containing shared pointer to the Dataset Ops that this object will create
std::vector<std::shared_ptr<DatasetOp>> node_ops;
// Currently default is true, and this is not exposed to user.
bool perf_mode = true;
std::vector<std::shared_ptr<TensorOp>> tensor_ops;
// Build tensorOp from tensorOperation vector
@ -550,8 +547,7 @@ std::vector<std::shared_ptr<DatasetOp>> MapDataset::Build() {
// This parameter will be removed with next rebase
std::vector<std::string> col_orders;
auto map_op =
std::make_shared<MapOp>(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_, perf_mode);
auto map_op = std::make_shared<MapOp>(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_);
if (!project_columns_.empty()) {
auto project_op = std::make_shared<ProjectOp>(project_columns_);
node_ops.push_back(project_op);

View File

@ -39,7 +39,7 @@
#include "minddata/dataset/engine/datasetops/batch_op.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/engine/datasetops/project_op.h"
#include "minddata/dataset/engine/datasetops/rename_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"

View File

@ -26,6 +26,9 @@ namespace dataset {
using uchar = unsigned char;
using dsize_t = int64_t;
// Target devices to perform map operation
enum class MapTargetDevice { kCpu, kGpu, kDvpp };
// Possible dataset types for holding the data and client type
enum class DatasetType { kUnknown, kArrow, kTf };

View File

@ -19,8 +19,8 @@ target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS})
if (ENABLE_TDTQUE)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn engine-perf
engine-cache-client engine-cache-server)
engine-cache-client engine-cache-server engine-datasetops-mapop)
else ()
add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf
engine-cache-client engine-cache-server)
engine-cache-client engine-cache-server engine-datasetops-mapop)
endif ()

View File

@ -1,4 +1,5 @@
add_subdirectory(source)
add_subdirectory(map_op)
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
@ -9,7 +10,6 @@ set(DATASET_ENGINE_DATASETOPS_SRC_FILES
pipeline_op.cc
batch_op.cc
device_queue_op.cc
map_op.cc
project_op.cc
rename_op.cc
repeat_op.cc
@ -37,4 +37,3 @@ if (ENABLE_PYTHON)
endif()
add_library(engine-datasetops OBJECT ${DATASET_ENGINE_DATASETOPS_SRC_FILES})

View File

@ -0,0 +1,10 @@
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
set(DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES
map_op.cc
cpu_map_job.cc
gpu_map_job.cc
)
add_library(engine-datasetops-mapop OBJECT ${DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES})

View File

@ -0,0 +1,56 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <vector>
#include <utility>
#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
namespace mindspore {
namespace dataset {
// Constructor
CpuMapJob::CpuMapJob() = default;
// Constructor
CpuMapJob::CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(operations) {}
// Destructor
CpuMapJob::~CpuMapJob() = default;
// A function to execute a cpu map job
Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
int32_t num_rows = in.size();
for (int32_t row = 0; row < num_rows; row++) {
TensorRow input_row = in[row];
TensorRow result_row;
for (size_t i = 0; i < ops_.size(); i++) {
// Call compute function for cpu
RETURN_IF_NOT_OK(ops_[i]->Compute(input_row, &result_row));
// Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list.
if (i + 1 < ops_.size()) {
input_row = std::move(result_row);
}
}
out->push_back(std::move(result_row));
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -0,0 +1,43 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_
#define DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_
#include <memory>
#include <vector>
#include "minddata/dataset/engine/datasetops/map_op/map_job.h"
namespace mindspore {
namespace dataset {
class CpuMapJob : public MapJob {
public:
// Constructor
CpuMapJob();
// Constructor
explicit CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations);
// Destructor
~CpuMapJob();
// A pure virtual run function to execute a cpu map job
Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) override;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_

View File

@ -0,0 +1,37 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <vector>
#include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h"
namespace mindspore {
namespace dataset {
// Constructor
GpuMapJob::GpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(operations) {}
// Destructor
GpuMapJob::~GpuMapJob() = default;
// A function to execute a cpu map job
Status GpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
// Do nothing for now
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

View File

@ -0,0 +1,40 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_
#define DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_
#include <memory>
#include <vector>
#include "minddata/dataset/engine/datasetops/map_op/map_job.h"
namespace mindspore {
namespace dataset {
class GpuMapJob : public MapJob {
public:
// Constructor
explicit GpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations);
// Destructor
~GpuMapJob();
// A pure virtual run function to execute a cpu map job
Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) override;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_

View File

@ -0,0 +1,55 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_
#define DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_
#include <memory>
#include <vector>
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/core/tensor_row.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
namespace dataset {
class MapJob {
public:
// Constructor
explicit MapJob(std::vector<std::shared_ptr<TensorOp>> operations) : ops_(operations) {}
// Constructor
MapJob() = default;
// Destructor
~MapJob() = default;
Status AddOperation(std::shared_ptr<TensorOp> operation) {
ops_.push_back(operation);
return Status::OK();
}
// A pure virtual run function to execute a particular map job
virtual Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) = 0;
protected:
std::vector<std::shared_ptr<TensorOp>> ops_;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/datasetops/map_op.h"
#include <algorithm>
#include <cstring>
#include <iomanip>
#include <iostream>
@ -28,6 +28,9 @@
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/opt/pass.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
#include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "utils/log_adapter.h"
#include "minddata/dataset/util/task_manager.h"
@ -35,7 +38,7 @@
namespace mindspore {
namespace dataset {
// Builder constructor. Creates the builder object.
MapOp::Builder::Builder() : build_perf_mode_(true) {
MapOp::Builder::Builder() {
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
build_num_workers_ = cfg->num_parallel_workers();
build_op_connector_size_ = cfg->op_connector_size();
@ -54,31 +57,27 @@ Status MapOp::Builder::sanityCheck() const {
Status MapOp::Builder::Build(std::shared_ptr<MapOp> *ptr) {
RETURN_IF_NOT_OK(sanityCheck());
*ptr = std::make_shared<MapOp>(std::move(build_in_col_names_), std::move(build_out_col_names_),
std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_,
build_perf_mode_);
std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_);
return Status::OK();
}
// Constructor of MapOp
MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size,
bool perf_mode)
std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size)
: ParallelOp(num_workers, op_connector_size),
tfuncs_(std::move(tensor_funcs)),
in_columns_(in_col_names),
out_columns_(out_col_names),
perf_mode_(perf_mode) {
out_columns_(out_col_names) {
// If caller didn't specify the out_col_names, assume they are same as the in_columns.
if (out_columns_.empty() || out_columns_[0].empty()) {
out_columns_ = in_columns_;
}
MS_LOG(DEBUG) << "Performance Mode in map operator is " << perf_mode_ << ".";
}
// The number of threads consuming data from previous op's output Connector.
int32_t MapOp::num_consumers() const {
// When Performance Mode is on, there is only one thread consuming from the previous Connector.
return perf_mode_ == true ? 1 : num_workers_;
return 1;
}
// A print method typically used for debugging
@ -106,36 +105,98 @@ void MapOp::Print(std::ostream &out, bool show_all) const {
}
}
// A helper function that fetch worker map job from local queues and extract the data and map job list
Status MapOp::FetchNextWork(uint32_t worker_id, std::unique_ptr<DataBuffer> *db,
std::vector<std::shared_ptr<MapJob>> *job_list) {
std::unique_ptr<MapWorkerJob> worker_job;
// Fetch the next worker job and data buffer
RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job));
// Extract the databuffer and job list from the map worker job.
*db = std::move(worker_job->databuffer);
*job_list = std::move(worker_job->jobs);
return Status::OK();
}
Status MapOp::GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job) {
std::shared_ptr<MapJob> map_job = nullptr;
MapTargetDevice prev_target;
for (size_t i = 0; i < tfuncs_.size(); i++) {
// Currently we only have CPU as the device target
// In the future, we will have heuristic or control from user to select target device
// MapTargetDevice target_device;
// RETURN_IF_NOT_OK(SelectTarget(tfuncs_[i], &target_device));
MapTargetDevice target_device = MapTargetDevice::kCpu;
switch (target_device) {
case MapTargetDevice::kCpu:
// If there is no existing map_job, we will create one.
// map_job could be nullptr when we are at the first tensor op or when the target device of the prev op
// is different with that of the current op.
if (map_job == nullptr) {
map_job = std::make_shared<CpuMapJob>();
}
map_job->AddOperation(tfuncs_[i]);
break;
case MapTargetDevice::kGpu:
break;
case MapTargetDevice::kDvpp:
break;
default:
break;
}
// Push map_job into worker_job if one of the two conditions is true:
// 1) It is the last tensor operation in tfuncs_
// 2) The the target device of the current tensor operation is different with previous one
if ((i + 1 == tfuncs_.size()) || ((i != 0) && (prev_target != target_device))) {
(*worker_job)->jobs.push_back(std::move(map_job));
}
prev_target = target_device;
}
return Status::OK();
}
// This class functor will provide the master loop that drives the logic for performing the work
Status MapOp::operator()() {
if (perf_mode_) {
// Create and register the local queues.
local_queues_.Init(num_workers_, oc_queue_size_);
Status rc = local_queues_.Register(tree_->AllTasks());
if (rc.IsError()) {
TaskManager::FindMe()->Post();
return rc;
}
// Create and register the local queues.
local_queues_.Init(num_workers_, oc_queue_size_);
Status rc = local_queues_.Register(tree_->AllTasks());
if (rc.IsError()) {
TaskManager::FindMe()->Post();
return rc;
}
// The operator class just starts off threads by calling the tree_ function
Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1));
rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1));
// Synchronize with TaskManager
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(rc);
if (perf_mode_) {
int64_t que_id = 0;
std::unique_ptr<DataBuffer> buff;
bool is_eof = false;
// Draining output connector of the previous op and distribute it to local queues.
// Stop when all worker threads are finished (received EOF).
while (!is_eof) {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0));
is_eof = buff->eof();
RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(buff)));
que_id = (que_id + 1) % num_workers_;
}
int64_t que_id = 0;
std::unique_ptr<DataBuffer> buff;
bool is_eof = false;
// Drain output connector of the previous op, generate jobs for worker threads, and distribute them via local queues
// Stop when all worker threads are finished (received EOF)
while (!is_eof) {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0));
is_eof = buff->eof();
// Create an empty map worker job to be populated by a databuffer and map jobs
std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>();
worker_job->databuffer = std::move(buff);
// Populate map worker job for a worker to execute
RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job));
// Push map worker job to the corresponding worker's queue
RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(worker_job)));
que_id = (que_id + 1) % num_workers_;
}
return Status::OK();
@ -148,12 +209,11 @@ Status MapOp::operator()() {
Status MapOp::WorkerEntry(int32_t worker_id) {
// Handshake with TaskManager that thread creation is successful.
TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> in_buffer;
// Getting a databuffer to work on.
// Perform the first fetch here outside of the loop. This allows us to execute one-time only
// initializations that happen after the first fetch.
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
std::unique_ptr<DataBuffer> in_buffer;
std::vector<std::shared_ptr<MapJob>> job_list;
// Fetch next data buffer and map job list
RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list));
// Sanity check the databuffer.
// Special case: if there's more threads than buffers, some threads simply get the final control
@ -175,7 +235,8 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
if (in_buffer->eoe()) {
// Calling base class EoeReceived to forward eoe buffer.
RETURN_IF_NOT_OK(EoeReceived(worker_id));
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
// Fetch next data buffer and map job list
RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list));
continue;
} else if (in_buffer->eof()) {
// Calling base class EofReceived to forward eof buffer.
@ -185,76 +246,77 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
std::unique_ptr<TensorQTable> new_tensor_table(std::make_unique<TensorQTable>());
// Perform the compute function of TensorOp(s) and store the result in new_tensor_table.
RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get()));
RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get(), job_list));
// Replace the TensorTable in DataBuffer with the new one.
in_buffer->set_tensor_table(std::move(new_tensor_table));
// Push the buffer onto the connector for next operator to consume.
RETURN_IF_NOT_OK(out_connector_->Add(static_cast<int>(worker_id), std::move(in_buffer)));
// Fetch the next buffer and loop back to the top.
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
// Fetch next data buffer and map job list
RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list));
}
return Status::OK();
}
Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table) {
// Getting number of rows and cols in this buffer.
Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table,
const std::vector<std::shared_ptr<MapJob>> &job_list) {
int32_t num_rows = in_buffer->NumRows();
int32_t num_cols = in_buffer->NumCols();
std::vector<TensorRow> job_input_table;
std::vector<TensorRow> original_table;
// Prepare the data that we need from in_buffer
for (int32_t r = 0; r < num_rows; r++) {
// to_process : A vector of Tensors only holding cols in input_columns.
// result_row; : A vector of Tensors to hold the result after Compute().
// cur_row : A vector of Tensors holding all the columns from DataBuffer.
TensorRow to_process, result_row, cur_row;
// cur_row : A vector of Tensors holding all the cols from DataBuffer.
TensorRow to_process, cur_row;
RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row));
// From the current row, select the Tensor that need to be passed to TensorOp
(void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process),
[&cur_row](const auto &it) { return std::move(cur_row[it]); });
job_input_table.push_back(std::move(to_process));
original_table.push_back(std::move(cur_row));
}
// Populate the Tensor from the current row to be processed by TensorOp
for (const auto &idx : to_process_indices_) {
to_process.push_back(std::move(cur_row[idx]));
}
// Looping over multiple TensorOps supplied in to MapOp.
// The assumption is that the result of one TensorOp matches the required input to the next TensorOp.
for (size_t i = 0; i < tfuncs_.size(); i++) {
// TensorOp can operate on single col or multiple cols. MapOp always call compute for multiple cols.
// TensorOp base class will call the single column Compute() depending on the ops.
// Note: The columns of the result_row is not preallocated, the compute function of each tensor op are
// required to resize/push back the result_row
RETURN_IF_NOT_OK(tfuncs_[i]->Compute(to_process, &result_row));
// Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list.
if (i + 1 < tfuncs_.size()) {
to_process = std::move(result_row);
}
}
if (out_columns_.size() != result_row.size()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Result of a tensorOp doesn't match output column names");
}
if (in_columns_.size() == out_columns_.size()) {
for (size_t i = 0; i < result_row.size(); i++) {
cur_row[to_process_indices_[i]] = std::move(result_row[i]);
}
new_tensor_table->push_back(std::move(cur_row));
} else {
// Add the columns we did not touch to the result_row.
for (int32_t i = 0; i < num_cols; i++) {
if (keep_input_columns_[i]) {
result_row.push_back(std::move(cur_row[i]));
}
}
// Add this final result_row to our new TensorTable.
new_tensor_table->push_back(std::move(result_row));
// Variable to keep the result after executing the job.
std::vector<TensorRow> result_table;
// Executing the list of jobs
for (size_t i = 0; i < job_list.size(); i++) {
// Executre MapJob.
RETURN_IF_NOT_OK(job_list[i]->Run(job_input_table, &result_table));
// Assign the pocessed data as an input for the next job processing, except for the last TensorOp in the list.
if (i + 1 < job_list.size()) {
job_input_table = std::move(result_table);
}
}
// Sanity check a row in result_table
if (!result_table.empty() && out_columns_.size() != result_table[0].size()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Result of a tensorOp doesn't match output column names");
}
// Merging the data processed by job (result_table) with the data that are not used.
for (int32_t r = 0; r < num_rows; r++) {
TensorRow out_row;
if (in_columns_.size() == out_columns_.size()) {
// Place the processed tensor back into the original index of the input tensor
for (size_t i = 0; i < result_table[r].size(); i++) {
original_table[r][to_process_indices_[i]] = std::move(result_table[r][i]);
}
out_row = std::move(original_table[r]);
} else {
// Append the data in the original table that we did not use to the end of each row in result_table.
for (int32_t i = 0; i < num_cols; i++) {
if (keep_input_columns_[i]) {
result_table[r].push_back(std::move(original_table[r][i]));
}
}
out_row = std::move(result_table[r]);
}
// Add this final out_row to our new TensorTable.
new_tensor_table->push_back(std::move(out_row));
}
return Status::OK();
}
@ -288,13 +350,11 @@ Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &
Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) {
// If input_columns is empty(), The col at index-0 will be picked.
if (in_columns_.empty()) {
for (const auto &pair : *col_name_id_map) {
if (pair.second == 0) {
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
in_columns_.push_back(pair.first);
break;
}
}
auto itr =
std::find_if(col_name_id_map->begin(), col_name_id_map->end(), [](const auto &it) { return it.second == 0; });
CHECK_FAIL_RETURN_UNEXPECTED(itr != col_name_id_map->end(), "Column name id map doesn't have id 0");
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
in_columns_.push_back(itr->first);
// If caller didn't specify the out_col_names, assume they are same as the input_columns.
// This was done in the constructor, but if input columns was empty to start we have to redo it here.

View File

@ -24,6 +24,7 @@
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/engine/datasetops/map_op/map_job.h"
namespace mindspore {
namespace dataset {
@ -107,13 +108,6 @@ class MapOp : public ParallelOp {
return *this;
}
// Setter method.
// @return Builder setter method returns reference to the builder.
Builder &SetPerformanceMode(bool perf_mode) {
build_perf_mode_ = perf_mode;
return *this;
}
// The builder "build" method creates the final object.
// @param ptr The shared_ptr to the new MapOp object
// @return Status
@ -125,7 +119,6 @@ class MapOp : public ParallelOp {
std::vector<std::shared_ptr<TensorOp>> build_tensor_funcs_;
int32_t build_num_workers_;
int32_t build_op_connector_size_;
bool build_perf_mode_; // Default true.
// Check if the required parameters are set by the builder.
// @return Status The error code return
@ -140,8 +133,7 @@ class MapOp : public ParallelOp {
// @param num_workers The number of worker threads.
// @param op_connector_size The size of each queue in the connector.
MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size,
bool perf_mode);
std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size);
// Destructor
~MapOp() = default;
@ -164,6 +156,8 @@ class MapOp : public ParallelOp {
// Class functor operator () override.
// All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work
// This main thread creates local queues, pulls databuffers from the previous
// op's Connector and distributes them to the local queues. Workers pull from the local queues.
// @return Status The error code return
Status operator()() override;
@ -189,12 +183,24 @@ class MapOp : public ParallelOp {
const auto &TFuncs() const { return tfuncs_; }
private:
// Local queues where worker threads can pop from.
// Popping directly from the Connector can block if the previous designated threads haven't pop.
// Setting the size of these queues to 0 is essentially the same as pulling directly from Connector.
QueueList<std::unique_ptr<DataBuffer>> local_queues_;
// A unit of job for map worker thread.
// MapWorkerJob holds a list of MapJob where each MapJob can be a CpuMapJob, GpuMapJob or DvppMapJob.
struct MapWorkerJob {
std::vector<std::shared_ptr<MapJob>> jobs;
std::unique_ptr<DataBuffer> databuffer;
};
// Static variables to be ready by worker threads, no modification and readonly
// A helper function to create jobs for workers.
Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job);
// A helper function that fetch worker map job from local queues and extract the data and map job list
Status FetchNextWork(uint32_t worker_id, std::unique_ptr<DataBuffer> *db,
std::vector<std::shared_ptr<MapJob>> *job_list);
// Local queues where worker threads get a job from
QueueList<std::unique_ptr<MapWorkerJob>> local_queues_;
// Tensorops to be read and applied by worker threads
std::vector<std::shared_ptr<TensorOp>> tfuncs_;
// Variable to store the column name that the tensorOps are consuming
@ -209,13 +215,6 @@ class MapOp : public ParallelOp {
// Indices of the columns to process.
std::vector<size_t> to_process_indices_;
// Performance mode is when the main thread creates local queues, pulls databuffers from the previous
// op's Connector and distributes them to the local queues. Workers pull from the local queues.
// If this flag is false, each worker pulls directly from the Connector. This use less resources
// (thread and memory), but when the computation cost is heavy (e.g. DecodeOp) and fluctuating, it can
// cause additional blocking because pop calls to Connector from the threads are synchronized to enforce the order.
bool perf_mode_;
// Private function for worker/thread to loop continuously. It comprises the main
// logic of MapOp: getting the data from previous Op, validating user specified column names,
// applying a list of TensorOps to each of the data, process the results and then
@ -224,25 +223,12 @@ class MapOp : public ParallelOp {
// @return Status The error code return
Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_
// Private helper function for getting the next buffer
// When PerformanceMode is enabled, workers pop from the local queue.
// Otherwise, workers pop from the first child output Connector.
// @param p_buffer - the buffer to return
// @return Status return code
Status FetchNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id) {
if (perf_mode_) {
RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(p_buffer));
} else {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id));
}
return Status::OK();
}
// Private function for worker thread to perform TensorOp's compute function and get the result.
// @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory
// and is not shared with other threads.
// @param[out] new_tensor_table A new Tensor Table to be populated in this function.
Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table);
Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table,
const std::vector<std::shared_ptr<MapJob>> &job_list);
// Private function that create the final column name to index mapping and
// get indices of the columns this mapop does not use.

View File

@ -17,7 +17,7 @@
#include <memory>
#include "minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.h"
#include "minddata/dataset/kernels/image/decode_op.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/kernels/image/random_crop_decode_resize_op.h"
namespace mindspore {

View File

@ -24,7 +24,7 @@
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include "minddata/dataset/engine/datasetops/project_op.h"
#include "minddata/dataset/engine/datasetops/rename_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"

View File

@ -645,16 +645,14 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize) {
map_decode_builder.SetInColNames({"image"})
.SetOutColNames({})
.SetTensorFuncs(func_list)
.SetNumWorkers(14)
.SetPerformanceMode(false);
.SetNumWorkers(14);
rc = map_decode_builder.Build(&map_decode_map);
EXPECT_TRUE(rc.IsOk());
map_resize_builder.SetInColNames({"image"})
.SetOutColNames({})
.SetTensorFuncs(func_list2)
.SetNumWorkers(15)
.SetPerformanceMode(false);
.SetNumWorkers(15);
rc = map_resize_builder.Build(&map_resize_op);
EXPECT_TRUE(rc.IsOk());
@ -739,5 +737,3 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize_NoInputColumns) {
}
EXPECT_TRUE(i == 88);
}

View File

@ -19,7 +19,6 @@
#include <string>
#include "minddata/dataset/core/client.h"
#include "minddata/dataset/core/constants.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/rename_op.h"
#include "common/common.h"
#include "common/utils.h"

View File

@ -23,7 +23,6 @@
#include <thread>
#include "minddata/dataset/core/client.h"
#include "minddata/dataset/core/constants.h"
#include "minddata/dataset/engine/datasetops/map_op.h"
#include "minddata/dataset/engine/datasetops/zip_op.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/core/config_manager.h"