forked from mindspore-Ecosystem/mindspore
per_batch_map supports x num_columns to y num_columns
Squashed commit messages: decided to use project after batch as well; per_batch_map 1->x column mapping works; add more test cases; project after map for column order; fix CI; address review comments; fix build error; fix test case failure.
This commit is contained in:
parent 16b77da7dd
commit 0cab373223
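In short: per_batch_map can now consume x input columns and emit y output columns, with output_columns naming the results and column_order choosing the final layout. A minimal sketch (not part of the diff; column names are illustrative) of the new user-facing behavior:

    import numpy as np
    import mindspore.dataset as ds

    def gen(num):
        for i in range(num):
            yield (np.array([i]),)

    # per_batch_map receives one list of arrays per input column plus a BatchInfo object;
    # returning a 2-tuple of lists turns 1 input column into 2 output columns (x -> y).
    def one_to_two(col, batch_info):
        return ([np.copy(arr) for arr in col], [np.copy(arr * 2) for arr in col])

    data = ds.GeneratorDataset((lambda: gen(6)), ["num"])
    data = data.batch(batch_size=3, input_columns=["num"], output_columns=["orig", "doubled"],
                      per_batch_map=one_to_two, column_order=["doubled", "orig"])
    for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        print(list(row.keys()))  # ['doubled', 'orig']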
@@ -1548,7 +1548,7 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordDataset::Build() {
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
-  // Create and initalize TFReaderOp
+  // Create and initialize TFReaderOp
   std::shared_ptr<TFReaderOp> tf_reader_op = std::make_shared<TFReaderOp>(
     num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files, std::move(data_schema),
     connector_que_size_, columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, nullptr);
@@ -1672,11 +1672,14 @@ std::vector<std::shared_ptr<DatasetOp>> BatchDataset::Build() {
 #ifdef ENABLE_PYTHON
   py::function noop;
   node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
-                                               cols_to_map_, noop, noop, pad_map_));
+                                               cols_to_map_, cols_to_map_, noop, noop, pad_map_));
 #else
   node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
                                                cols_to_map_, pad_map_));
 #endif
+
+  // Until py::function is implemented for C++ API, there is no need for a project op to be inserted after batch
+  // because project is only needed when batch op performs per_batch_map. This per_batch_map is a pyfunc
   return node_ops;
 }
@@ -1685,7 +1688,10 @@ bool BatchDataset::ValidateParams() {
     MS_LOG(ERROR) << "Batch: batch_size should be positive integer, but got: " << batch_size_;
     return false;
   }
+  if (!cols_to_map_.empty()) {
+    MS_LOG(ERROR) << "cols_to_map functionality is not implemented in C++; this should be left empty.";
+    return false;
+  }
   return true;
 }
@@ -767,7 +767,7 @@ Status DEPipeline::ParseMapOp(const py::dict &args, std::shared_ptr<DatasetOp> *
         tensor_op_list.push_back(tensor_op);
       }
     }
-    if (tensor_op_list.empty()) RETURN_STATUS_UNEXPECTED("Error: tensor_op is invalid or not set.");
+    CHECK_FAIL_RETURN_UNEXPECTED(!tensor_op_list.empty(), "Error: tensor_op is invalid or not set.");
     (void)map_builder.SetTensorFuncs(std::move(tensor_op_list));
   } else if (key == "cache") {
     cache_client = value.cast<std::shared_ptr<CacheClient>>();
@@ -913,6 +913,7 @@ Status DEPipeline::ParseGeneratorOp(const py::dict &args, std::shared_ptr<Datase
 Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp> *top,
                                 std::shared_ptr<DatasetOp> *bottom) {
   std::shared_ptr<BatchOp::Builder> builder;
+  std::vector<std::string> project_columns;
   if (py::isinstance<py::int_>(args["batch_size"])) {
     batch_size_ = ToInt(args["batch_size"]);
     CHECK_FAIL_RETURN_UNEXPECTED(batch_size_ > 0, "Error: batch_size is invalid.");
@@ -921,10 +922,8 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     builder = std::make_shared<BatchOp::Builder>(1);
     (void)builder->SetBatchSizeFunc(args["batch_size"].cast<py::function>());
   } else {
-    std::string err_msg = "Error: batch_size is neither an Integer nor a python function";
-    RETURN_STATUS_UNEXPECTED(err_msg);
+    RETURN_STATUS_UNEXPECTED("Error: batch_size is neither an Integer nor a python function.");
   }
 
   for (auto arg : args) {
     std::string key = py::str(arg.first);
     py::handle value = arg.second;
@@ -936,7 +935,11 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     } else if (key == "per_batch_map") {
       (void)builder->SetBatchMapFunc(value.cast<py::function>());
     } else if (key == "input_columns") {
-      (void)builder->SetColumnsToMap(ToStringVector(value));
+      (void)builder->SetInColNames(ToStringVector(value));
+    } else if (key == "output_columns") {
+      (void)builder->SetOutColNames(ToStringVector(value));
+    } else if (key == "column_order") {
+      project_columns = ToStringVector(value);
     } else if (key == "pad_info") {
       PadInfo pad_info;
       RETURN_IF_NOT_OK(ParsePadInfo(value, &pad_info));
@@ -945,9 +948,21 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr<DatasetOp>
     }
   }
 
-  std::shared_ptr<BatchOp> op;
-  RETURN_IF_NOT_OK(builder->Build(&op));
-  *top = op;
+  std::shared_ptr<BatchOp> batch_op;
+  RETURN_IF_NOT_OK(builder->Build(&batch_op));
+  *top = batch_op;
+
+  // Add a project op over top of the batch if the user wanted to reposition the columns after per_batch_map
+  if (!project_columns.empty()) {
+    ProjectOp::Builder proj_builder(project_columns);
+    std::shared_ptr<ProjectOp> proj_op;
+    RETURN_IF_NOT_OK(proj_builder.Build(&proj_op));
+    RETURN_IF_NOT_OK(tree_->AssociateNode(batch_op));
+    RETURN_IF_NOT_OK(tree_->AssociateNode(proj_op));
+    RETURN_IF_NOT_OK(proj_op->AddChild(batch_op));
+    *top = proj_op;
+    *bottom = batch_op;
+  }
   return Status::OK();
 }
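Note on the hunk above: column_order is honored by stacking a ProjectOp on top of the BatchOp, which is why *top and *bottom can now differ. A rough Python model of what that projection does to each batched row (hypothetical helper, not MindSpore API):

    def project(rows, column_order):
        # Keep only the requested columns, in the requested order, like the inserted ProjectOp.
        return [{name: row[name] for name in column_order} for row in rows]

    batched = [{"col1": 1, "col_x": 2, "col_y": 3}]
    print(project(batched, ["col_x", "col_y", "col1"]))  # [{'col_x': 2, 'col_y': 3, 'col1': 1}]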
@@ -95,7 +95,7 @@ Status IteratorBase::GetNextAsOrderedPair(std::vector<std::pair<std::string, std
   if (column_order_.empty()) {
     const int32_t invalid_col_id = -1;
     column_order_.resize(num_cols, {std::string(), invalid_col_id});
-    for (const auto itr : col_name_id_map_) {
+    for (const auto &itr : col_name_id_map_) {
      int32_t ind = itr.second;
       CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds.");
       column_order_[ind] = std::make_pair(itr.first, ind);
@@ -40,8 +40,8 @@ Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
   RETURN_IF_NOT_OK(SanityCheck());
 #ifdef ENABLE_PYTHON
   *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
-                                   builder_num_workers_, builder_cols_to_map_, builder_batch_size_func_,
-                                   builder_batch_map_func_, builder_pad_map_);
+                                   builder_num_workers_, builder_in_names_, builder_out_names_,
+                                   builder_batch_size_func_, builder_batch_map_func_, builder_pad_map_);
 #else
   *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
                                    builder_num_workers_, builder_cols_to_map_, builder_pad_map_);
@@ -65,18 +65,20 @@ Status BatchOp::Builder::SanityCheck() {
 
 #ifdef ENABLE_PYTHON
 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
-                 const std::vector<std::string> &cols_to_map, py::function batch_size_func, py::function batch_map_func,
-                 PadInfo pad_map)
+                 const std::vector<std::string> &in_col, const std::vector<std::string> &out_col,
+                 py::function batch_size_func, py::function batch_map_func, PadInfo pad_map)
     : ParallelOp(num_workers, op_queue_size),
       start_batch_size_(batch_size),
       drop_(drop),
       pad_(pad),
-      pyfunc_column_names_(cols_to_map),
+      in_col_names_(in_col),
+      out_col_names_(out_col),
       batch_size_func_(batch_size_func),
       batch_map_func_(batch_map_func),
       pad_info_(pad_map) {
   worker_queues_.Init(num_workers, op_queue_size);
 }
+// if PYTHON is disabled. per_batch_map can't be used
 #else
 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
                  const std::vector<std::string> &cols_to_map, PadInfo pad_map)
@@ -236,7 +238,7 @@ Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatc
                                   std::unique_ptr<DataBuffer> *db) {
   RETURN_UNEXPECTED_IF_NULL(table_pair.first);
 #ifdef ENABLE_PYTHON
-  if (!pyfunc_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair));  // pass it through pyfunc
+  if (!in_col_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair));  // pass it through pyfunc
 #endif
   if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));  // do padding if needed
   (*db) = std::make_unique<DataBuffer>(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone);
@ -264,33 +266,40 @@ Status BatchOp::EoeReceived(int32_t) {
|
|||
|
||||
#ifdef ENABLE_PYTHON
|
||||
Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
|
||||
TensorBatchTable input_table;
|
||||
input_table.reserve(pyfunc_column_names_.size());
|
||||
for (std::string col_name : pyfunc_column_names_) {
|
||||
if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) {
|
||||
RETURN_STATUS_UNEXPECTED("Invalid parameter, column name: '" + col_name + "' does not exist.\n");
|
||||
std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
|
||||
size_t num_rows = in_q_table->size();
|
||||
auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
|
||||
TensorTable in_cols(in_col_names_.size(), TensorRow(num_rows, nullptr)), out_cols;
|
||||
|
||||
std::unordered_map<std::string, size_t> in_col_name_id; // name of columns that need to be fed to per-batch_map
|
||||
for (size_t i = 0; i < in_col_names_.size(); i++) in_col_name_id.insert({in_col_names_[i], i});
|
||||
|
||||
for (const auto &itr : child_map_) {
|
||||
auto col_itr = in_col_name_id.find(itr.first);
|
||||
if (col_itr != in_col_name_id.end()) { // col needs to be prepared for per_batch_map
|
||||
for (size_t i = 0; i < num_rows; i++) {
|
||||
in_cols[col_itr->second][i] = std::move((*in_q_table)[i][itr.second]);
|
||||
}
|
||||
} else { // col needs to be placed into the out table
|
||||
size_t col_id = column_name_id_map_[itr.first];
|
||||
for (size_t i = 0; i < num_rows; i++) {
|
||||
(*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
|
||||
}
|
||||
}
|
||||
TensorBatch tensor_batch;
|
||||
tensor_batch.reserve(table_pair->first->size());
|
||||
size_t col_idx = static_cast<size_t>(column_name_id_map_[col_name]);
|
||||
for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) {
|
||||
tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx]));
|
||||
}
|
||||
input_table.push_back(std::move(tensor_batch));
|
||||
}
|
||||
|
||||
// Perform batch map
|
||||
TensorBatchTable output_table;
|
||||
RETURN_IF_NOT_OK(InvokeBatchMapFunc(&input_table, &output_table, table_pair->second));
|
||||
in_q_table.reset(); // release the input table
|
||||
RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));
|
||||
|
||||
// Write back to TensorQTable
|
||||
for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) {
|
||||
size_t col_idx = static_cast<size_t>(column_name_id_map_[pyfunc_column_names_[input_idx]]);
|
||||
for (size_t i = 0; i < out_cols.size(); i++) {
|
||||
size_t col_id = column_name_id_map_[out_col_names_[i]];
|
||||
size_t row_id = 0;
|
||||
for (TensorRow &row : *(table_pair->first)) {
|
||||
row[col_idx] = std::move(output_table[input_idx][row_id++]);
|
||||
for (auto &t_row : *out_q_table) {
|
||||
t_row[col_id] = out_cols[i][row_id++];
|
||||
}
|
||||
}
|
||||
|
||||
table_pair->first = std::move(out_q_table);
|
||||
return Status::OK();
|
||||
}
|
||||
#endif
|
||||
|
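The rewritten MapColumns above splits each batch into the columns fed to per_batch_map and the pass-through columns, then rebuilds the output table from both. A simplified Python model of that flow (helper and names are hypothetical, not the C++ API):

    def map_columns(batch, in_cols, out_cols, per_batch_map, batch_info):
        # batch: dict mapping column name -> list of per-row arrays (one whole batch).
        # Pass-through columns move to the output table untouched.
        out = {name: vals for name, vals in batch.items() if name not in in_cols}
        # Input columns are handed to per_batch_map as whole batches, in declared order.
        mapped = per_batch_map(*[batch[name] for name in in_cols], batch_info)
        assert len(mapped) == len(out_cols), "Incorrect number of columns returned."
        out.update(dict(zip(out_cols, mapped)))
        return out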
@@ -333,7 +342,7 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
   return Status(StatusCode::kOK, "Batch size func call succeed");
 }
 
-Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *output, CBatchInfo info) {
+Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
   {
     // Acquire Python GIL
     py::gil_scoped_acquire gil_acquire;
@@ -357,11 +366,10 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
     py::object ret_py_obj = batch_map_func_(*input_args);
     // Parse batch map return value
     py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
-    if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
-      return Status(StatusCode::kPyFuncException, "Invalid parameter, batch map function should return a tuple.");
-    }
+    CHECK_FAIL_RETURN_UNEXPECTED(py::isinstance<py::tuple>(ret_tuple), "Batch map function should return a tuple");
+    CHECK_FAIL_RETURN_UNEXPECTED(ret_tuple.size() == out_col_names_.size(), "Incorrect number of columns returned.");
     for (size_t i = 0; i < ret_tuple.size(); i++) {
-      TensorBatch output_batch;
+      TensorRow output_batch;
       py::list output_list = py::cast<py::list>(ret_tuple[i]);
       for (size_t j = 0; j < output_list.size(); j++) {
         std::shared_ptr<Tensor> out;
@@ -377,7 +385,7 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
                "Invalid parameter, batch map function should return a tuple of list of numpy array.");
     }
   }
-  return Status(StatusCode::kOK);
+  return Status::OK();
 }
 #endif
@@ -386,7 +394,7 @@ Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &
   RETURN_UNEXPECTED_IF_NULL(table);  // placeholder for now, might need this in the future
   CHECK_FAIL_RETURN_UNEXPECTED(
     (*table)->front().size() == column_name_id_map.size(),
-    "Invaid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
+    "Invalid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
       std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size()));
   std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
                                                 0);  // value to pad each column's tensor with, default 0
@ -468,5 +476,57 @@ Status BatchOp::Accept(NodePass *p, bool *modified) {
|
|||
return p->RunOnNode(shared_from_base<BatchOp>(), modified);
|
||||
}
|
||||
|
||||
Status BatchOp::ComputeColMap() {
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(child_.size() == 1,
|
||||
"Batch has " + std::to_string(child_.size()) + " child/children, expects only 1 child.");
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(!(child_[0]->column_name_id_map().empty()), "BatchOp child map is empty.");
|
||||
|
||||
if (in_col_names_.empty()) { // if per_batch_map is not set, do not need to deal with out_col_names
|
||||
column_name_id_map_ = child_[0]->column_name_id_map();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// from this point onward, per_batch_map is needed, therefore, child_map_ must be set
|
||||
child_map_ = child_[0]->column_name_id_map();
|
||||
|
||||
// following logic deals with per_batch_map
|
||||
bool col_name_flag = (out_col_names_.empty() || out_col_names_ == in_col_names_); // true if col name is unchanged
|
||||
|
||||
// column names are unchanged
|
||||
if (col_name_flag) {
|
||||
if (out_col_names_.empty()) out_col_names_ = in_col_names_;
|
||||
column_name_id_map_ = child_map_;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// column names are changed from this point onward, this map is the child_map without input cols for per_batch_map
|
||||
auto child_map_no_in_col = child_map_;
|
||||
for (const auto &col : in_col_names_) {
|
||||
const auto itr = child_map_no_in_col.find(col);
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(itr != child_map_no_in_col.end(), "col:" + col + " doesn't exist.");
|
||||
child_map_no_in_col.erase(itr);
|
||||
}
|
||||
|
||||
// col names are changed
|
||||
if (out_col_names_.size() == in_col_names_.size()) { // column names changed, but same number of columns
|
||||
// the following code rename the input keys to output keys. ["a","b"] -> ["b", "a"] is allowed
|
||||
column_name_id_map_ = child_map_no_in_col;
|
||||
for (auto i = 0; i < in_col_names_.size(); i++) {
|
||||
column_name_id_map_[out_col_names_[i]] = child_map_[in_col_names_[i]];
|
||||
}
|
||||
} else { // number of columns are different, put the output column names first, then the original ones
|
||||
for (const std::string &col : out_col_names_) {
|
||||
column_name_id_map_.insert({col, column_name_id_map_.size()});
|
||||
}
|
||||
for (const auto &itr : child_map_no_in_col) {
|
||||
column_name_id_map_.insert({itr.first, column_name_id_map_.size()});
|
||||
}
|
||||
}
|
||||
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.size() == (child_map_no_in_col.size() + out_col_names_.size()),
|
||||
"Key error in column_name_id_map_. output_columns is NOT set correctly!");
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
|
|
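ComputeColMap above defines the column bookkeeping for every per_batch_map configuration. A Python sketch of the same rules (a model for predicting the resulting layout, not MindSpore code):

    def compute_col_map(child_map, in_cols, out_cols):
        # child_map: dict of column name -> id coming from the child op.
        if not in_cols:  # no per_batch_map: inherit the child map as-is
            return dict(child_map)
        if not out_cols or out_cols == in_cols:  # column names unchanged
            return dict(child_map)
        no_in = {k: v for k, v in child_map.items() if k not in in_cols}
        if len(out_cols) == len(in_cols):  # same count: rename in place (swaps allowed)
            col_map = dict(no_in)
            for i_name, o_name in zip(in_cols, out_cols):
                col_map[o_name] = child_map[i_name]
            return col_map
        col_map = {}  # different count: output columns first, then the remaining ones
        for name in list(out_cols) + list(no_in):
            col_map[name] = len(col_map)
        return col_map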
@@ -36,8 +36,6 @@ namespace mindspore {
 namespace dataset {
 class DataBuffer;
 
-using TensorBatch = TensorRow;
-using TensorBatchTable = std::vector<TensorBatch>;
 using PadInfo = std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>>;
 
 class BatchOp : public ParallelOp {
@@ -81,11 +79,17 @@ class BatchOp : public ParallelOp {
       return *this;
     }
 
-    // set columns to perform map on
-    // @param const std::vector<std::string> & cols_to_map - name of columns to perform map on
-    // @return Builder & reference to builder class object
-    Builder &SetColumnsToMap(const std::vector<std::string> &cols_to_map) {
-      builder_cols_to_map_ = cols_to_map;
+    /// \param in_col_name
+    /// \return Builder & reference to builder class object
+    Builder &SetInColNames(const std::vector<std::string> &in_col_name) {
+      builder_in_names_ = in_col_name;
       return *this;
     }
+
+    /// \param out_col_name
+    /// \return Builder & reference to builder class object
+    Builder &SetOutColNames(const std::vector<std::string> &out_col_name) {
+      builder_out_names_ = out_col_name;
+      return *this;
+    }
@@ -121,7 +125,8 @@ class BatchOp : public ParallelOp {
     int32_t builder_batch_size_;
     int32_t builder_num_workers_;
     int32_t builder_op_connector_size_;
-    std::vector<std::string> builder_cols_to_map_;
+    std::vector<std::string> builder_in_names_;
+    std::vector<std::string> builder_out_names_;
     PadInfo builder_pad_map_;
 #ifdef ENABLE_PYTHON
     py::function builder_batch_size_func_;
@@ -149,14 +154,10 @@ class BatchOp : public ParallelOp {
   };
 
 #ifdef ENABLE_PYTHON
-  // BatchOp constructor
-  // @param int32_t batch_size
-  // @param bool drop
-  // @param int32_t op_queue_size
-  // @param int32_t rows_per_buf
-  // @param int32_t num_workers
+
   BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
-          const std::vector<std::string> &, py::function batch_size_func, py::function batch_map_func, PadInfo pad_map);
+          const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
+          py::function batch_size_func, py::function batch_map_func, PadInfo pad_map);
 #else
   BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
           const std::vector<std::string> &, PadInfo pad_map);
@@ -218,6 +219,9 @@ class BatchOp : public ParallelOp {
   static Status PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
                            const std::unordered_map<std::string, int32_t> &column_name_id_map);
 
+ protected:
+  Status ComputeColMap() override;
+
  private:
   // Worker thread for doing the memcpy of batch
   // @param int32_t param workerId
@@ -270,11 +274,13 @@ class BatchOp : public ParallelOp {
 #endif
 
   int32_t start_batch_size_;
-  bool drop_;                                      // bool for whether to drop remainder or not
-  bool pad_;                                       // bool for whether to perform padding on tensor
-  std::vector<std::string> pyfunc_column_names_;   // Name of the columns to perform map op on
-  PadInfo pad_info_;                               // column names to perform padding on
-  std::unique_ptr<ChildIterator> child_iterator_;  // child iterator for fetching TensorRows 1 by 1
+  const bool drop_;                                // bool for whether to drop remainder or not
+  const bool pad_;                                 // bool for whether to perform padding on tensor
+  const std::vector<std::string> in_col_names_;    // input column name for per_batch_map
+  std::vector<std::string> out_col_names_;         // output column name for per_batch_map
+  PadInfo pad_info_;                               // column names to perform padding on
+  std::unique_ptr<ChildIterator> child_iterator_;  // child iterator for fetching TensorRows 1 by 1
+  std::unordered_map<std::string, int32_t> child_map_;  // col_name_id_map of the child node
   QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_;  // internal queue for syncing worker
 #ifdef ENABLE_PYTHON
   py::function batch_size_func_;  // Function pointer of batch size function
@@ -55,6 +55,7 @@ try:
 except ModuleNotFoundError:
     context = None
 
+
 class Shuffle(str, Enum):
     GLOBAL: str = "global"
     FILES: str = "file"
@@ -271,14 +272,14 @@ class Dataset:
                 of Tensors on a given column. The number of lists should match with number of entries in input_columns.
                 The last parameter of the callable should always be a BatchInfo object.
             input_columns (list[str], optional): List of names of the input columns. The size of the list should
-                match with signature of the per_batch_map callable.
-            output_columns (list[str], optional): [Not currently implemented] List of names assigned to the columns
+                match with signature of per_batch_map callable.
+            output_columns (list[str], optional): List of names assigned to the columns
                 outputted by the last operation. This parameter is mandatory if len(input_columns) !=
                 len(output_columns). The size of this list must match the number of output
                 columns of the last operation. (default=None, output columns will have the same
                 name as the input columns, i.e., the columns will be replaced).
-            column_order (list[str], optional): [Not currently implemented] List of all the desired columns to
-                propagate to the child node. This list must be a subset of all the columns in the dataset after
+            column_order (list[str], optional): List of all the desired columns to propagate to
+                the child node. This list must be a subset of all the columns in the dataset after
                 all operations are applied. The order of the columns in each row propagated to the
                 child node follow the order they appear in this list. The parameter is mandatory
                 if the len(input_columns) != len(output_columns). (default=None, all columns
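With output_columns and column_order implemented, the contract documented above can be exercised end to end. A hedged example (dataset and column names invented for illustration) that merges two input columns into one output column, mirroring the merge case in the new tests:

    import numpy as np
    import mindspore.dataset as ds

    def gen_pairs(num):
        for i in range(1, num + 1):
            yield (np.array([i]), np.array([i ** 2]))

    # Two input columns in, a 1-tuple out: len(input_columns) != len(output_columns),
    # so output_columns (and column_order) are mandatory per the docstring.
    def merge(col1, col2, batch_info):
        return ([a + b for a, b in zip(col1, col2)],)

    data = ds.GeneratorDataset((lambda: gen_pairs(4)), ["col1", "col2"])
    data = data.batch(batch_size=2, input_columns=["col1", "col2"], output_columns=["merged"],
                      per_batch_map=merge, column_order=["merged"])
    for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        print(row["merged"])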
@@ -1700,9 +1701,9 @@ class BatchDataset(DatasetOp):
         self.batch_size = batch_size
         self.drop_remainder = drop_remainder
         self.per_batch_map = per_batch_map
-        self.input_columns = input_columns
-        self.output_columns = output_columns
-        self.column_order = column_order
+        self.input_columns = input_columns if not isinstance(input_columns, str) else [input_columns]
+        self.output_columns = output_columns if not isinstance(output_columns, str) else [output_columns]
+        self.column_order = column_order if not isinstance(column_order, str) else [column_order]
         self.pad_info = pad_info
         self.children.append(input_dataset)
         input_dataset.parent.append(self)
@@ -1714,6 +1715,8 @@ class BatchDataset(DatasetOp):
         args["drop_remainder"] = self.drop_remainder
         args["per_batch_map"] = self.per_batch_map
         args["input_columns"] = self.input_columns
+        args["output_columns"] = self.output_columns
+        args["column_order"] = self.column_order
         args["pad_info"] = self.pad_info
         return args
@@ -276,6 +276,7 @@ def check_save(method):
 
     return new_method
 
+
 def check_iterator(method):
     """A wrapper that wraps a parameter checker around the original create_tuple_iterator and create_dict_iterator."""
@@ -529,10 +530,10 @@ def check_batch(method):
             raise ValueError("the signature of per_batch_map should match with input columns")
 
         if output_columns is not None:
-            raise ValueError("output_columns is currently not implemented.")
+            check_columns(output_columns, "output_columns")
 
         if column_order is not None:
-            raise ValueError("column_order is currently not implemented.")
+            check_columns(column_order, "column_order")
 
         return method(self, *args, **kwargs)
@@ -449,22 +449,6 @@ def test_batch_exception_13():
         logger.info("Got an exception in DE: {}".format(str(e)))
         assert "shard_id" in str(e)
 
-    # test non-functional parameters
-    try:
-        data1 = data1.batch(batch_size, output_columns="3")
-        sum([1 for _ in data1])
-
-    except ValueError as e:
-        logger.info("Got an exception in DE: {}".format(str(e)))
-        assert "output_columns is currently not implemented." in str(e)
-
-    try:
-        data1 = data1.batch(batch_size, column_order="3")
-        sum([1 for _ in data1])
-
-    except ValueError as e:
-        logger.info("Got an exception in DE: {}".format(str(e)))
-        assert "column_order is currently not implemented." in str(e)
-
 
 def test_batch_exception_14():
     batch_size = 2
@@ -289,11 +289,11 @@ def test_exception():
 
     def bad_batch_size(batchInfo):
         raise StopIteration
-        #return batchInfo.get_batch_num()
+        # return batchInfo.get_batch_num()
 
     def bad_map_func(col, batchInfo):
         raise StopIteration
-        #return (col,)
+        # return (col,)
 
     data1 = ds.GeneratorDataset((lambda: gen(100)), ["num"]).batch(bad_batch_size)
     try:
@@ -312,6 +312,68 @@ def test_exception():
         pass
 
 
+def test_multi_col_map():
+    def gen_2_cols(num):
+        for i in range(1, 1 + num):
+            yield (np.array([i]), np.array([i ** 2]))
+
+    def split_col(col, batchInfo):
+        return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col])
+
+    def merge_col(col1, col2, batchInfo):
+        merged = []
+        for k, v in enumerate(col1):
+            merged.append(np.array(v + col2[k]))
+        return (merged,)
+
+    def swap_col(col1, col2, batchInfo):
+        return ([np.copy(a) for a in col2], [np.copy(b) for b in col1])
+
+    def batch_map_config(num, s, f, in_nms, out_nms, col_order=None):
+        try:
+            dst = ds.GeneratorDataset((lambda: gen_2_cols(num)), ["col1", "col2"])
+            dst = dst.batch(batch_size=s, input_columns=in_nms, output_columns=out_nms, per_batch_map=f,
+                            column_order=col_order)
+            res = []
+            for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
+                res.append(row)
+            return res
+        except (ValueError, RuntimeError, TypeError) as e:
+            return str(e)
+
+    # split 1 col into 2 cols
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"])[0]
+    assert np.array_equal(res["col1"], [[1], [2]])
+    assert np.array_equal(res["col_x"], [[1], [4]]) and np.array_equal(res["col_y"], [[-1], [-4]])
+
+    # merge 2 cols into 1 col
+    res = batch_map_config(4, 4, merge_col, ["col1", "col2"], ["merged"])[0]
+    assert np.array_equal(res["merged"], [[2], [6], [12], [20]])
+
+    # swap once
+    res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col1", "col2"])[0]
+    assert np.array_equal(res["col1"], [[1], [4], [9]]) and np.array_equal(res["col2"], [[1], [2], [3]])
+
+    # swap twice
+    res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col2", "col1"])[0]
+    assert np.array_equal(res["col2"], [[1], [4], [9]]) and np.array_equal(res["col1"], [[1], [2], [3]])
+
+    # test project after map
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col_x", "col_y", "col1"])[0]
+    assert list(res.keys()) == ["col_x", "col_y", "col1"]
+
+    # test the insertion order is maintained
+    res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col1", "col_x", "col_y"])[0]
+    assert list(res.keys()) == ["col1", "col_x", "col_y"]
+
+    # test exceptions
+    assert "output_columns with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], 233)
+    assert "column_order with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], ["col1"], 233)
+    assert "output_columns is NOT set correctly" in batch_map_config(2, 2, split_col, ["col2"], ["col1"])
+    assert "Incorrect number of columns" in batch_map_config(2, 2, split_col, ["col2"], ["col3", "col4", "col5"])
+    assert "col-1 doesn't exist" in batch_map_config(2, 2, split_col, ["col-1"], ["col_x", "col_y"])
+
+
 if __name__ == '__main__':
     logger.info("Running test_var_batch_map.py test_batch_corner_cases() function")
     test_batch_corner_cases()
@@ -333,3 +395,6 @@ if __name__ == '__main__':
 
     logger.info("Running test_var_batch_map.py test_exception() function")
     test_exception()
+
+    logger.info("Running test_var_batch_map.py test_multi_col_map() function")
+    test_multi_col_map()