!6017 per_batch_map needs to support x input columns and y output columns
Merge pull request !6017 from ZiruiWu/per_batch_map_multi_col
commit 4c0b3c1bd3

@@ -1548,7 +1548,7 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordDataset::Build() {
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
-  // Create and initalize TFReaderOp
+  // Create and initialize TFReaderOp
   std::shared_ptr<TFReaderOp> tf_reader_op = std::make_shared<TFReaderOp>(
     num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files, std::move(data_schema),
     connector_que_size_, columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, nullptr);

@@ -1672,11 +1672,14 @@ std::vector<std::shared_ptr<DatasetOp>> BatchDataset::Build() {
 #ifdef ENABLE_PYTHON
   py::function noop;
   node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
-                                               cols_to_map_, noop, noop, pad_map_));
+                                               cols_to_map_, cols_to_map_, noop, noop, pad_map_));
 #else
   node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
                                                cols_to_map_, pad_map_));
 #endif
 
+  // Until py::function is implemented for C++ API, there is no need for a project op to be inserted after batch
+  // because project is only needed when batch op performs per_batch_map. This per_batch_map is a pyfunc
   return node_ops;
 }

@@ -1685,7 +1688,10 @@ bool BatchDataset::ValidateParams() {
     MS_LOG(ERROR) << "Batch: batch_size should be positive integer, but got: " << batch_size_;
     return false;
   }
-
+  if (!cols_to_map_.empty()) {
+    MS_LOG(ERROR) << "cols_to_map functionality is not implemented in C++; this should be left empty.";
+    return false;
+  }
   return true;
 }

@@ -767,7 +767,7 @@ Status DEPipeline::ParseMapOp(const py::dict &args, std::shared_ptr<DatasetOp> *
         tensor_op_list.push_back(tensor_op);
       }
     }
-    if (tensor_op_list.empty()) RETURN_STATUS_UNEXPECTED("Error: tensor_op is invalid or not set.");
+    CHECK_FAIL_RETURN_UNEXPECTED(!tensor_op_list.empty(), "Error: tensor_op is invalid or not set.");
     (void)map_builder.SetTensorFuncs(std::move(tensor_op_list));
   } else if (key == "cache") {
     cache_client = value.cast<std::shared_ptr<CacheClient>>();

@@ -913,6 +913,7 @@ Status DEPipeline::ParseGeneratorOp(const py::dict &args, std::shared_ptr<Datase
 Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp> *top,
                                 std::shared_ptr<DatasetOp> *bottom) {
   std::shared_ptr<BatchOp::Builder> builder;
+  std::vector<std::string> project_columns;
   if (py::isinstance<py::int_>(args["batch_size"])) {
     batch_size_ = ToInt(args["batch_size"]);
     CHECK_FAIL_RETURN_UNEXPECTED(batch_size_ > 0, "Error: batch_size is invalid.");

@@ -921,10 +922,8 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     builder = std::make_shared<BatchOp::Builder>(1);
     (void)builder->SetBatchSizeFunc(args["batch_size"].cast<py::function>());
   } else {
-    std::string err_msg = "Error: batch_size is neither an Integer nor a python function";
-    RETURN_STATUS_UNEXPECTED(err_msg);
+    RETURN_STATUS_UNEXPECTED("Error: batch_size is neither an Integer nor a python function.");
   }
 
   for (auto arg : args) {
     std::string key = py::str(arg.first);
     py::handle value = arg.second;

@@ -936,7 +935,11 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     } else if (key == "per_batch_map") {
       (void)builder->SetBatchMapFunc(value.cast<py::function>());
     } else if (key == "input_columns") {
-      (void)builder->SetColumnsToMap(ToStringVector(value));
+      (void)builder->SetInColNames(ToStringVector(value));
+    } else if (key == "output_columns") {
+      (void)builder->SetOutColNames(ToStringVector(value));
+    } else if (key == "column_order") {
+      project_columns = ToStringVector(value);
     } else if (key == "pad_info") {
       PadInfo pad_info;
       RETURN_IF_NOT_OK(ParsePadInfo(value, &pad_info));

@@ -945,9 +948,21 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     }
   }
 
-  std::shared_ptr<BatchOp> op;
-  RETURN_IF_NOT_OK(builder->Build(&op));
-  *top = op;
+  std::shared_ptr<BatchOp> batch_op;
+  RETURN_IF_NOT_OK(builder->Build(&batch_op));
+  *top = batch_op;
+
+  // Add a project op over top of the batch if the user wanted to reposition the columns after per_batch_map
+  if (!project_columns.empty()) {
+    ProjectOp::Builder proj_builder(project_columns);
+    std::shared_ptr<ProjectOp> proj_op;
+    RETURN_IF_NOT_OK(proj_builder.Build(&proj_op));
+    RETURN_IF_NOT_OK(tree_->AssociateNode(batch_op));
+    RETURN_IF_NOT_OK(tree_->AssociateNode(proj_op));
+    RETURN_IF_NOT_OK(proj_op->AddChild(batch_op));
+    *top = proj_op;
+    *bottom = batch_op;
+  }
   return Status::OK();
 }

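The ProjectOp grafted on top of the BatchOp here is what makes the new column_order argument take effect. A minimal sketch of the resulting user-visible behavior, adapted from the test_multi_col_map() test added below (gen_2_cols and split_col come from that test):

import numpy as np
import mindspore.dataset as ds

def gen_2_cols(num):
    for i in range(1, 1 + num):
        yield (np.array([i]), np.array([i ** 2]))

def split_col(col, batch_info):
    # one input column in, two output columns out
    return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col])

data = ds.GeneratorDataset((lambda: gen_2_cols(2)), ["col1", "col2"])
# column_order repositions columns after per_batch_map; internally a ProjectOp sits over the BatchOp
data = data.batch(batch_size=2, input_columns=["col2"], output_columns=["col_x", "col_y"],
                  per_batch_map=split_col, column_order=["col_x", "col_y", "col1"])
for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
    print(list(row.keys()))  # ['col_x', 'col_y', 'col1']
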
@@ -95,7 +95,7 @@ Status IteratorBase::GetNextAsOrderedPair(std::vector<std::pair<std::string, std
   if (column_order_.empty()) {
     const int32_t invalid_col_id = -1;
     column_order_.resize(num_cols, {std::string(), invalid_col_id});
-    for (const auto itr : col_name_id_map_) {
+    for (const auto &itr : col_name_id_map_) {
       int32_t ind = itr.second;
       CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds.");
       column_order_[ind] = std::make_pair(itr.first, ind);

@@ -40,8 +40,8 @@ Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
   RETURN_IF_NOT_OK(SanityCheck());
 #ifdef ENABLE_PYTHON
   *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
-                                   builder_num_workers_, builder_cols_to_map_, builder_batch_size_func_,
-                                   builder_batch_map_func_, builder_pad_map_);
+                                   builder_num_workers_, builder_in_names_, builder_out_names_,
+                                   builder_batch_size_func_, builder_batch_map_func_, builder_pad_map_);
 #else
   *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
                                    builder_num_workers_, builder_cols_to_map_, builder_pad_map_);

@@ -65,18 +65,20 @@ Status BatchOp::Builder::SanityCheck() {
 
 #ifdef ENABLE_PYTHON
 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
-                 const std::vector<std::string> &cols_to_map, py::function batch_size_func, py::function batch_map_func,
-                 PadInfo pad_map)
+                 const std::vector<std::string> &in_col, const std::vector<std::string> &out_col,
+                 py::function batch_size_func, py::function batch_map_func, PadInfo pad_map)
     : ParallelOp(num_workers, op_queue_size),
       start_batch_size_(batch_size),
       drop_(drop),
       pad_(pad),
-      pyfunc_column_names_(cols_to_map),
+      in_col_names_(in_col),
+      out_col_names_(out_col),
       batch_size_func_(batch_size_func),
       batch_map_func_(batch_map_func),
       pad_info_(pad_map) {
   worker_queues_.Init(num_workers, op_queue_size);
 }
 // if PYTHON is disabled. per_batch_map can't be used
 #else
 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
                  const std::vector<std::string> &cols_to_map, PadInfo pad_map)

@@ -236,7 +238,7 @@ Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatc
                                   std::unique_ptr<DataBuffer> *db) {
   RETURN_UNEXPECTED_IF_NULL(table_pair.first);
 #ifdef ENABLE_PYTHON
-  if (!pyfunc_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair));  // pass it through pyfunc
+  if (!in_col_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair));  // pass it through pyfunc
 #endif
   if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));  // do padding if needed
   (*db) = std::make_unique<DataBuffer>(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone);

@@ -264,33 +266,40 @@ Status BatchOp::EoeReceived(int32_t) {
 
 #ifdef ENABLE_PYTHON
 Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
-  TensorBatchTable input_table;
-  input_table.reserve(pyfunc_column_names_.size());
-  for (std::string col_name : pyfunc_column_names_) {
-    if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) {
-      RETURN_STATUS_UNEXPECTED("Invalid parameter, column name: '" + col_name + "' does not exist.\n");
-    }
-    TensorBatch tensor_batch;
-    tensor_batch.reserve(table_pair->first->size());
-    size_t col_idx = static_cast<size_t>(column_name_id_map_[col_name]);
-    for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) {
-      tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx]));
-    }
-    input_table.push_back(std::move(tensor_batch));
-  }
+  std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
+  size_t num_rows = in_q_table->size();
+  auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
+  TensorTable in_cols(in_col_names_.size(), TensorRow(num_rows, nullptr)), out_cols;
+
+  std::unordered_map<std::string, size_t> in_col_name_id;  // name of columns that need to be fed to per-batch_map
+  for (size_t i = 0; i < in_col_names_.size(); i++) in_col_name_id.insert({in_col_names_[i], i});
+
+  for (const auto &itr : child_map_) {
+    auto col_itr = in_col_name_id.find(itr.first);
+    if (col_itr != in_col_name_id.end()) {  // col needs to be prepared for per_batch_map
+      for (size_t i = 0; i < num_rows; i++) {
+        in_cols[col_itr->second][i] = std::move((*in_q_table)[i][itr.second]);
+      }
+    } else {  // col needs to be placed into the out table
+      size_t col_id = column_name_id_map_[itr.first];
+      for (size_t i = 0; i < num_rows; i++) {
+        (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
+      }
+    }
+  }
 
-  // Perform batch map
-  TensorBatchTable output_table;
-  RETURN_IF_NOT_OK(InvokeBatchMapFunc(&input_table, &output_table, table_pair->second));
+  in_q_table.reset();  // release the input table
+  RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));
 
   // Write back to TensorQTable
-  for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) {
-    size_t col_idx = static_cast<size_t>(column_name_id_map_[pyfunc_column_names_[input_idx]]);
+  for (size_t i = 0; i < out_cols.size(); i++) {
+    size_t col_id = column_name_id_map_[out_col_names_[i]];
     size_t row_id = 0;
-    for (TensorRow &row : *(table_pair->first)) {
-      row[col_idx] = std::move(output_table[input_idx][row_id++]);
+    for (auto &t_row : *out_q_table) {
+      t_row[col_id] = out_cols[i][row_id++];
     }
   }
+
+  table_pair->first = std::move(out_q_table);
   return Status::OK();
 }
 #endif

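Conceptually, the rewritten MapColumns routes whole columns instead of overwriting them in place: the input columns are pulled out of the incoming batch as per-column lists, every other column is moved to the output table untouched, and the tuple returned by per_batch_map is written back under out_col_names_. A plain-Python sketch of that routing (the batch is modeled as a dict of column name -> list of rows; map_columns is illustrative only, not the actual C++):

def map_columns(batch, in_col_names, out_col_names, per_batch_map, batch_info):
    in_cols = [batch.pop(name) for name in in_col_names]  # columns handed to per_batch_map
    out_table = dict(batch)                               # pass-through columns, moved as-is
    mapped = per_batch_map(*in_cols, batch_info)          # tuple: one list of arrays per output column
    assert len(mapped) == len(out_col_names), "Incorrect number of columns returned."
    for name, col in zip(out_col_names, mapped):
        out_table[name] = col                             # write back under the output names
    return out_table
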
@@ -333,7 +342,7 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
   return Status(StatusCode::kOK, "Batch size func call succeed");
 }
 
-Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *output, CBatchInfo info) {
+Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
   {
     // Acquire Python GIL
     py::gil_scoped_acquire gil_acquire;

@@ -357,11 +366,10 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
       py::object ret_py_obj = batch_map_func_(*input_args);
       // Parse batch map return value
       py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
-      if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
-        return Status(StatusCode::kPyFuncException, "Invalid parameter, batch map function should return a tuple.");
-      }
+      CHECK_FAIL_RETURN_UNEXPECTED(py::isinstance<py::tuple>(ret_tuple), "Batch map function should return a tuple");
+      CHECK_FAIL_RETURN_UNEXPECTED(ret_tuple.size() == out_col_names_.size(), "Incorrect number of columns returned.");
       for (size_t i = 0; i < ret_tuple.size(); i++) {
-        TensorBatch output_batch;
+        TensorRow output_batch;
         py::list output_list = py::cast<py::list>(ret_tuple[i]);
         for (size_t j = 0; j < output_list.size(); j++) {
           std::shared_ptr<Tensor> out;

@@ -377,7 +385,7 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
                       "Invalid parameter, batch map function should return a tuple of list of numpy array.");
       }
     }
-  return Status(StatusCode::kOK);
+  return Status::OK();
 }
 #endif

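The two CHECKs above define the contract for per_batch_map's return value: it must be a tuple with exactly one entry per name in output_columns. A hedged sketch of the failure mode when that count is off (column names and the expected message are taken from the new test asserts below; the exception type seen at iteration time is typically RuntimeError, which those tests also catch):

import numpy as np
import mindspore.dataset as ds

def gen_2_cols(num):
    for i in range(1, 1 + num):
        yield (np.array([i]), np.array([i ** 2]))

def split_col(col, batch_info):
    return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col])  # returns 2 columns

data = ds.GeneratorDataset((lambda: gen_2_cols(2)), ["col1", "col2"])
# three output names for a pyfunc that returns only two columns
data = data.batch(batch_size=2, input_columns=["col2"],
                  output_columns=["col3", "col4", "col5"], per_batch_map=split_col)
try:
    for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        pass
except RuntimeError as e:
    print(e)  # "Incorrect number of columns returned."
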
@@ -386,7 +394,7 @@ Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &
   RETURN_UNEXPECTED_IF_NULL(table);  // placeholder for now, might need this in the future
   CHECK_FAIL_RETURN_UNEXPECTED(
     (*table)->front().size() == column_name_id_map.size(),
-    "Invaid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
+    "Invalid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
       std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size()));
   std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
                                                 0);  // value to pad each column's tensor with, default 0

@@ -468,5 +476,57 @@ Status BatchOp::Accept(NodePass *p, bool *modified) {
   return p->RunOnNode(shared_from_base<BatchOp>(), modified);
 }
 
+Status BatchOp::ComputeColMap() {
+  CHECK_FAIL_RETURN_UNEXPECTED(child_.size() == 1,
+                               "Batch has " + std::to_string(child_.size()) + " child/children, expects only 1 child.");
+  CHECK_FAIL_RETURN_UNEXPECTED(!(child_[0]->column_name_id_map().empty()), "BatchOp child map is empty.");
+
+  if (in_col_names_.empty()) {  // if per_batch_map is not set, do not need to deal with out_col_names
+    column_name_id_map_ = child_[0]->column_name_id_map();
+    return Status::OK();
+  }
+
+  // from this point onward, per_batch_map is needed, therefore, child_map_ must be set
+  child_map_ = child_[0]->column_name_id_map();
+
+  // following logic deals with per_batch_map
+  bool col_name_flag = (out_col_names_.empty() || out_col_names_ == in_col_names_);  // true if col name is unchanged
+
+  // column names are unchanged
+  if (col_name_flag) {
+    if (out_col_names_.empty()) out_col_names_ = in_col_names_;
+    column_name_id_map_ = child_map_;
+    return Status::OK();
+  }
+
+  // column names are changed from this point onward, this map is the child_map without input cols for per_batch_map
+  auto child_map_no_in_col = child_map_;
+  for (const auto &col : in_col_names_) {
+    const auto itr = child_map_no_in_col.find(col);
+    CHECK_FAIL_RETURN_UNEXPECTED(itr != child_map_no_in_col.end(), "col:" + col + " doesn't exist.");
+    child_map_no_in_col.erase(itr);
+  }
+
+  // col names are changed
+  if (out_col_names_.size() == in_col_names_.size()) {  // column names changed, but same number of columns
+    // the following code renames the input keys to output keys. ["a","b"] -> ["b", "a"] is allowed
+    column_name_id_map_ = child_map_no_in_col;
+    for (auto i = 0; i < in_col_names_.size(); i++) {
+      column_name_id_map_[out_col_names_[i]] = child_map_[in_col_names_[i]];
+    }
+  } else {  // number of columns are different, put the output column names first, then the original ones
+    for (const std::string &col : out_col_names_) {
+      column_name_id_map_.insert({col, column_name_id_map_.size()});
+    }
+    for (const auto &itr : child_map_no_in_col) {
+      column_name_id_map_.insert({itr.first, column_name_id_map_.size()});
+    }
+  }
+
+  CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.size() == (child_map_no_in_col.size() + out_col_names_.size()),
+                               "Key error in column_name_id_map_. output_columns is NOT set correctly!");
+  return Status::OK();
+}
+
 }  // namespace dataset
 }  // namespace mindspore

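Spelled out, ComputeColMap above distinguishes three cases: column names unchanged, renamed one-for-one, or a different number of outputs (output names first, untouched columns after). A plain-Python sketch of the same rules (compute_col_map is illustrative only; the real code uses unordered maps, with column_order available to fix the final ordering):

def compute_col_map(child_map, in_cols, out_cols):
    # child_map maps column name -> column index in the child op's output
    if not out_cols or out_cols == in_cols:
        return dict(child_map)                      # names unchanged: keep the child's map
    no_in = {k: v for k, v in child_map.items() if k not in in_cols}
    if len(out_cols) == len(in_cols):               # same count: rename input keys to output keys
        col_map = dict(no_in)
        for i_name, o_name in zip(in_cols, out_cols):
            col_map[o_name] = child_map[i_name]
        return col_map
    col_map = {}                                    # different count: output columns first,
    for name in list(out_cols) + list(no_in):       # then the untouched columns
        col_map[name] = len(col_map)
    return col_map

assert compute_col_map({"col1": 0, "col2": 1}, ["col2"], ["col_x", "col_y"]) == \
       {"col_x": 0, "col_y": 1, "col1": 2}
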
@@ -36,8 +36,6 @@ namespace mindspore {
 namespace dataset {
 class DataBuffer;
 
-using TensorBatch = TensorRow;
-using TensorBatchTable = std::vector<TensorBatch>;
 using PadInfo = std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>>;
 
 class BatchOp : public ParallelOp {

@@ -81,11 +79,17 @@ class BatchOp : public ParallelOp {
       return *this;
     }
 
-    // set columns to perform map on
-    // @param const std::vector<std::string> & cols_to_map - name of columns to perform map on
-    // @return Builder & reference to builder class object
-    Builder &SetColumnsToMap(const std::vector<std::string> &cols_to_map) {
-      builder_cols_to_map_ = cols_to_map;
+    /// \param in_col_name
+    /// \return Builder & reference to builder class object
+    Builder &SetInColNames(const std::vector<std::string> &in_col_name) {
+      builder_in_names_ = in_col_name;
+      return *this;
+    }
+
+    /// \param out_col_name
+    /// \return Builder & reference to builder class object
+    Builder &SetOutColNames(const std::vector<std::string> &out_col_name) {
+      builder_out_names_ = out_col_name;
       return *this;
     }

@@ -121,7 +125,8 @@ class BatchOp : public ParallelOp {
     int32_t builder_batch_size_;
     int32_t builder_num_workers_;
     int32_t builder_op_connector_size_;
-    std::vector<std::string> builder_cols_to_map_;
+    std::vector<std::string> builder_in_names_;
+    std::vector<std::string> builder_out_names_;
     PadInfo builder_pad_map_;
 #ifdef ENABLE_PYTHON
     py::function builder_batch_size_func_;

@@ -149,14 +154,10 @@ class BatchOp : public ParallelOp {
   };
 
 #ifdef ENABLE_PYTHON
-  // BatchOp constructor
-  // @param int32_t batch_size
-  // @param bool drop
-  // @param int32_t op_queue_size
-  // @param int32_t rows_per_buf
-  // @param int32_t num_workers
+
   BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
-          const std::vector<std::string> &, py::function batch_size_func, py::function batch_map_func, PadInfo pad_map);
+          const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
+          py::function batch_size_func, py::function batch_map_func, PadInfo pad_map);
 #else
   BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
           const std::vector<std::string> &, PadInfo pad_map);

@@ -218,6 +219,9 @@ class BatchOp : public ParallelOp {
   static Status PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
                            const std::unordered_map<std::string, int32_t> &column_name_id_map);
 
+ protected:
+  Status ComputeColMap() override;
+
  private:
   // Worker thread for doing the memcpy of batch
   // @param int32_t param workerId

@@ -270,11 +274,13 @@ class BatchOp : public ParallelOp {
 #endif
 
   int32_t start_batch_size_;
-  bool drop_;                                      // bool for whether to drop remainder or not
-  bool pad_;                                       // bool for whether to perform padding on tensor
-  std::vector<std::string> pyfunc_column_names_;   // Name of the columns to perform map op on
-  PadInfo pad_info_;                               // column names to perform padding on
-  std::unique_ptr<ChildIterator> child_iterator_;  // child iterator for fetching TensorRows 1 by 1
+  const bool drop_;                                // bool for whether to drop remainder or not
+  const bool pad_;                                 // bool for whether to perform padding on tensor
+  const std::vector<std::string> in_col_names_;    // input column name for per_batch_map
+  std::vector<std::string> out_col_names_;         // output column name for per_batch_map
+  PadInfo pad_info_;                               // column names to perform padding on
+  std::unique_ptr<ChildIterator> child_iterator_;  // child iterator for fetching TensorRows 1 by 1
+  std::unordered_map<std::string, int32_t> child_map_;  // col_name_id_map of the child node
   QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_;  // internal queue for syncing worker
 #ifdef ENABLE_PYTHON
   py::function batch_size_func_;  // Function pointer of batch size function

@@ -58,6 +58,7 @@ try:
 except ModuleNotFoundError:
     context = None
 
+
 class Shuffle(str, Enum):
     GLOBAL: str = "global"
     FILES: str = "file"

@@ -274,14 +275,14 @@ class Dataset:
                 of Tensors on a given column. The number of lists should match with number of entries in input_columns.
                 The last parameter of the callable should always be a BatchInfo object.
             input_columns (list[str], optional): List of names of the input columns. The size of the list should
-                match with signature of the per_batch_map callable.
-            output_columns (list[str], optional): [Not currently implemented] List of names assigned to the columns
+                match with signature of per_batch_map callable.
+            output_columns (list[str], optional): List of names assigned to the columns
                 outputted by the last operation. This parameter is mandatory if len(input_columns) !=
                 len(output_columns). The size of this list must match the number of output
                 columns of the last operation. (default=None, output columns will have the same
                 name as the input columns, i.e., the columns will be replaced).
-            column_order (list[str], optional): [Not currently implemented] List of all the desired columns to
-                propagate to the child node. This list must be a subset of all the columns in the dataset after
+            column_order (list[str], optional): List of all the desired columns to propagate to
+                the child node. This list must be a subset of all the columns in the dataset after
                 all operations are applied. The order of the columns in each row propagated to the
                 child node follow the order they appear in this list. The parameter is mandatory
                 if the len(input_columns) != len(output_columns). (default=None, all columns

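An end-to-end example of the parameters as now documented, adapted from the new test_multi_col_map() below (gen_2_cols and merge_col come from that test): two input columns are merged into a single output column, so output_columns names the one result.

import numpy as np
import mindspore.dataset as ds

def gen_2_cols(num):
    for i in range(1, 1 + num):
        yield (np.array([i]), np.array([i ** 2]))

def merge_col(col1, col2, batch_info):
    # one merged output column per batch
    return ([np.array(v + col2[k]) for k, v in enumerate(col1)],)

data = ds.GeneratorDataset((lambda: gen_2_cols(4)), ["col1", "col2"])
data = data.batch(batch_size=4, input_columns=["col1", "col2"],
                  output_columns=["merged"], per_batch_map=merge_col)
for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
    print(row["merged"])  # [[2], [6], [12], [20]]
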
@@ -1703,9 +1704,9 @@ class BatchDataset(DatasetOp):
         self.batch_size = batch_size
         self.drop_remainder = drop_remainder
         self.per_batch_map = per_batch_map
-        self.input_columns = input_columns
-        self.output_columns = output_columns
-        self.column_order = column_order
+        self.input_columns = input_columns if not isinstance(input_columns, str) else [input_columns]
+        self.output_columns = output_columns if not isinstance(output_columns, str) else [output_columns]
+        self.column_order = column_order if not isinstance(column_order, str) else [column_order]
         self.pad_info = pad_info
         self.children.append(input_dataset)
         input_dataset.parent.append(self)

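The effect of the new isinstance checks in miniature: a bare string is promoted to a single-name list, while lists and None pass through untouched (normalize here is an illustrative stand-in for the inline expressions above):

def normalize(cols):
    return cols if not isinstance(cols, str) else [cols]

assert normalize("col1") == ["col1"]
assert normalize(["col1", "col2"]) == ["col1", "col2"]
assert normalize(None) is None  # None is not a str, so it passes through
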
@@ -1717,6 +1718,8 @@ class BatchDataset(DatasetOp):
         args["drop_remainder"] = self.drop_remainder
         args["per_batch_map"] = self.per_batch_map
         args["input_columns"] = self.input_columns
+        args["output_columns"] = self.output_columns
+        args["column_order"] = self.column_order
         args["pad_info"] = self.pad_info
         return args

@@ -276,6 +276,7 @@ def check_save(method):
 
     return new_method
 
+
 def check_iterator(method):
     """A wrapper that wraps a parameter checker around the original create_tuple_iterator and create_dict_iterator."""

@@ -529,10 +530,10 @@ def check_batch(method):
             raise ValueError("the signature of per_batch_map should match with input columns")
 
         if output_columns is not None:
-            raise ValueError("output_columns is currently not implemented.")
+            check_columns(output_columns, "output_columns")
 
         if column_order is not None:
-            raise ValueError("column_order is currently not implemented.")
+            check_columns(column_order, "column_order")
 
         return method(self, *args, **kwargs)

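With the blanket not-implemented errors gone, output_columns and column_order are validated by check_columns like any other column argument, so a bad type is reported instead of an unconditional rejection. A sketch from the caller's side (the message text comes from the new test asserts below; those tests catch ValueError, RuntimeError, and TypeError alike):

import numpy as np
import mindspore.dataset as ds

def gen_2_cols(num):
    for i in range(1, 1 + num):
        yield (np.array([i]), np.array([i ** 2]))

def split_col(col, batch_info):
    return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col])

data = ds.GeneratorDataset((lambda: gen_2_cols(2)), ["col1", "col2"])
try:
    data = data.batch(2, input_columns=["col2"], output_columns=233, per_batch_map=split_col)
except (TypeError, ValueError) as e:
    print(e)  # "output_columns with value 233 is not of type ..."
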
@@ -449,22 +449,6 @@ def test_batch_exception_13():
         logger.info("Got an exception in DE: {}".format(str(e)))
         assert "shard_id" in str(e)
 
-    # test non-functional parameters
-    try:
-        data1 = data1.batch(batch_size, output_columns="3")
-        sum([1 for _ in data1])
-
-    except ValueError as e:
-        logger.info("Got an exception in DE: {}".format(str(e)))
-        assert "output_columns is currently not implemented." in str(e)
-
-    try:
-        data1 = data1.batch(batch_size, column_order="3")
-        sum([1 for _ in data1])
-
-    except ValueError as e:
-        logger.info("Got an exception in DE: {}".format(str(e)))
-        assert "column_order is currently not implemented." in str(e)
-
 
 def test_batch_exception_14():
     batch_size = 2

@@ -289,11 +289,11 @@ def test_exception():
 
     def bad_batch_size(batchInfo):
         raise StopIteration
-        #return batchInfo.get_batch_num()
+        # return batchInfo.get_batch_num()
 
     def bad_map_func(col, batchInfo):
         raise StopIteration
-        #return (col,)
+        # return (col,)
 
     data1 = ds.GeneratorDataset((lambda: gen(100)), ["num"]).batch(bad_batch_size)
     try:

@@ -312,6 +312,68 @@ def test_exception():
         pass
 
 
+def test_multi_col_map():
+    def gen_2_cols(num):
+        for i in range(1, 1 + num):
+            yield (np.array([i]), np.array([i ** 2]))
+
+    def split_col(col, batchInfo):
+        return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col])
+
+    def merge_col(col1, col2, batchInfo):
+        merged = []
+        for k, v in enumerate(col1):
+            merged.append(np.array(v + col2[k]))
+        return (merged,)
+
+    def swap_col(col1, col2, batchInfo):
+        return ([np.copy(a) for a in col2], [np.copy(b) for b in col1])
+
+    def batch_map_config(num, s, f, in_nms, out_nms, col_order=None):
+        try:
+            dst = ds.GeneratorDataset((lambda: gen_2_cols(num)), ["col1", "col2"])
+            dst = dst.batch(batch_size=s, input_columns=in_nms, output_columns=out_nms, per_batch_map=f,
+                            column_order=col_order)
+            res = []
+            for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
+                res.append(row)
+            return res
+        except (ValueError, RuntimeError, TypeError) as e:
+            return str(e)
+
+    # split 1 col into 2 cols
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"])[0]
+    assert np.array_equal(res["col1"], [[1], [2]])
+    assert np.array_equal(res["col_x"], [[1], [4]]) and np.array_equal(res["col_y"], [[-1], [-4]])
+
+    # merge 2 cols into 1 col
+    res = batch_map_config(4, 4, merge_col, ["col1", "col2"], ["merged"])[0]
+    assert np.array_equal(res["merged"], [[2], [6], [12], [20]])
+
+    # swap once
+    res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col1", "col2"])[0]
+    assert np.array_equal(res["col1"], [[1], [4], [9]]) and np.array_equal(res["col2"], [[1], [2], [3]])
+
+    # swap twice
+    res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col2", "col1"])[0]
+    assert np.array_equal(res["col2"], [[1], [4], [9]]) and np.array_equal(res["col1"], [[1], [2], [3]])
+
+    # test project after map
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col_x", "col_y", "col1"])[0]
+    assert list(res.keys()) == ["col_x", "col_y", "col1"]
+
+    # test the insertion order is maintained
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col1", "col_x", "col_y"])[0]
+    assert list(res.keys()) == ["col1", "col_x", "col_y"]
+
+    # test exceptions
+    assert "output_columns with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], 233)
+    assert "column_order with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], ["col1"], 233)
+    assert "output_columns is NOT set correctly" in batch_map_config(2, 2, split_col, ["col2"], ["col1"])
+    assert "Incorrect number of columns" in batch_map_config(2, 2, split_col, ["col2"], ["col3", "col4", "col5"])
+    assert "col-1 doesn't exist" in batch_map_config(2, 2, split_col, ["col-1"], ["col_x", "col_y"])
+
+
 if __name__ == '__main__':
     logger.info("Running test_var_batch_map.py test_batch_corner_cases() function")
     test_batch_corner_cases()

@@ -333,3 +395,6 @@ if __name__ == '__main__':
 
     logger.info("Running test_var_batch_map.py test_exception() function")
     test_exception()
+
+    logger.info("Running test_var_batch_map.py test_multi_col_map() function")
+    test_multi_col_map()