From 0457d018efd0d30771d588b2364bd8df92a1c8bd Mon Sep 17 00:00:00 2001
From: jiangshuqiang
Date: Tue, 19 Jul 2022 11:46:41 +0800
Subject: [PATCH] fix batch ops

---
 .../dataset/engine/datasetops/batch_op.cc     | 124 +++++++++++-------
 .../dataset/engine/datasetops/batch_op.h      |  14 +-
 tests/ut/python/dataset/test_var_batch_map.py | 122 +++++++++++++++--
 3 files changed, 198 insertions(+), 62 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc
index e0c82f94523..a4e90a2cdec 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc
@@ -187,48 +187,58 @@ Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *d
   auto num_columns = (*src)->front().size();
   for (size_t i = 0; i < num_columns; i++) {
-    std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i);  // first row, column i
-    TensorShape first_shape = first_tensor->shape();
-    DataType first_type = first_tensor->type();
-    TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
-    std::shared_ptr<Tensor> new_tensor;
-    if (first_type.IsNumeric()) {  // numeric tensor
-      RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
-      dsize_t j = 0;
-      for (auto row : **src) {
-        std::shared_ptr<Tensor> old_tensor = row.at(i);  // row j, column i
-        if (old_tensor->shape() == first_shape) {  // check the newly popped rows have the same dim as the first
-          if (new_shape.NumOfElements() != 0) {
-            RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
-          }
-          // Don't do anything if the tensor has no data
-        } else {
-          std::stringstream shape1, shape2;
-          first_shape.Print(shape1);
-          old_tensor->shape().Print(shape2);
-          RETURN_STATUS_UNEXPECTED(
-            "Inconsistent batch shapes, batch operation expect same shape for each data row, "
-            "but got inconsistent shape in column " +
-            std::to_string(i) + ", expected shape for this column is:" + shape1.str() + ", got shape:" + shape2.str());
-        }
-      }
-    } else {  // handle string column differently
-      std::vector<std::string> strings;
-      for (dsize_t j = 0; j < batch_size; j++) {
-        std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(i);
-        for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
-          strings.emplace_back(*itr);
-        }
-      }
-      RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
-    }
+    std::shared_ptr<Tensor> new_tensor;
+    RETURN_IF_NOT_OK(ConvertRowsToTensor(src, &new_tensor, batch_size, i));
     dest->emplace_back(new_tensor);
   }
   return Status::OK();
 }
 
+Status BatchOp::ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *src, std::shared_ptr<Tensor> *dst,
+                                    dsize_t batch_size, size_t col) {
+  RETURN_UNEXPECTED_IF_NULL(src);
+  RETURN_UNEXPECTED_IF_NULL(dst);
+  std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(col);  // first row, column col
+  TensorShape first_shape = first_tensor->shape();
+  DataType first_type = first_tensor->type();
+  TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
+
+  std::shared_ptr<Tensor> new_tensor;
+  if (first_type.IsNumeric()) {  // numeric tensor
+    RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
+    dsize_t j = 0;
+    for (auto row : **src) {
+      std::shared_ptr<Tensor> old_tensor = row.at(col);  // row j, column col
+      if (old_tensor->shape() == first_shape) {  // check the newly popped rows have the same dim as the first
+        if (new_shape.NumOfElements() != 0) {
+          RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
+        }
+        // Don't do anything if the tensor has no data
+      } else {
+        std::stringstream shape1, shape2;
+        first_shape.Print(shape1);
+        old_tensor->shape().Print(shape2);
+        RETURN_STATUS_UNEXPECTED(
+          "Inconsistent batch shapes, batch operation expects the same shape for each data row, "
+          "but got inconsistent shape in column " +
+          std::to_string(col) + ", expected shape for this column is:" + shape1.str() + ", got shape:" + shape2.str());
+      }
+    }
+  } else {  // handle string column differently
+    std::vector<std::string> strings;
+    for (dsize_t j = 0; j < batch_size; j++) {
+      std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(col);
+      for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
+        strings.emplace_back(*itr);
+      }
+    }
+    RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
+  }
+  *dst = std::move(new_tensor);
+  return Status::OK();
+}
+
 Status BatchOp::WorkerEntry(int32_t workerId) {
   TaskManager::FindMe()->Post();
   std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair;
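[Note for reviewers] The hunk above factors the per-column batching logic out of BatchRows into the new ConvertRowsToTensor helper so that MapColumns (later in this file) can reuse it for columns that per_batch_map does not handle. Conceptually, a numeric column is batched by stacking the per-row tensors along a new leading batch axis, after checking that every row matches the first row's shape; string columns are instead flattened element by element into one tensor of the batched shape. A minimal NumPy sketch of the numeric path (the function and variable names here are illustrative, not part of the patch):

    import numpy as np

    def convert_rows_to_tensor(rows, col):
        # Stack column `col` of every row along a new leading batch axis,
        # mirroring Tensor::CreateEmpty + InsertTensor in the C++ helper.
        column = [row[col] for row in rows]
        first_shape = column[0].shape
        for t in column:
            if t.shape != first_shape:
                raise ValueError("Inconsistent batch shapes in column %d" % col)
        return np.stack(column)  # shape: (batch_size, *first_shape)

    rows = [(np.array([1]), np.array([1])), (np.array([2]), np.array([4]))]
    print(convert_rows_to_tensor(rows, 0))  # [[1] [2]], shape (2, 1)
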
@@ -252,15 +262,16 @@ Status BatchOp::WorkerEntry(int32_t workerId) {
 Status BatchOp::MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row) {
   RETURN_UNEXPECTED_IF_NULL(table_pair.first);
+  bool concat_batch = false;
 #ifdef ENABLE_PYTHON
   if (batch_map_func_) {
-    RETURN_IF_NOT_OK(MapColumns(&table_pair));
+    RETURN_IF_NOT_OK(MapColumns(&table_pair, &concat_batch));
   }  // pass it through pyfunc
 #endif
   if (pad_) {
     RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));
   }  // do padding if needed
-  RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size(), concat_batch_));
+  RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size(), concat_batch));
   return Status::OK();
 }
 
@@ -272,7 +283,7 @@ Status BatchOp::EoeReceived(int32_t) {
 }
 
 #ifdef ENABLE_PYTHON
-Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
+Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair, bool *concat_batch) {
   RETURN_UNEXPECTED_IF_NULL(table_pair);
   RETURN_UNEXPECTED_IF_NULL(table_pair->first);
   std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
@@ -291,21 +302,29 @@
     }
   }
 
-  RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));
+  RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second, concat_batch));
+
+  // If concat batch rows, the num_rows should be 1.
+  if (*concat_batch) {
+    num_rows = 1;
+  }
 
   auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
-  // If concat batch rows, the num_rows should be 1.
-  if (concat_batch_) {
-    out_q_table = std::make_unique<TensorQTable>(1, TensorRow(column_name_id_map_.size(), nullptr));
-  }
 
   for (const auto &itr : child_map_) {
     auto col_itr = in_col_name_id.find(itr.first);
     if (col_itr == in_col_name_id.end()) {  // col needs to be prepared for per_batch_map
       // col needs to be placed into the out table
       size_t col_id = column_name_id_map_[itr.first];
-      for (size_t i = 0; i < num_rows; i++) {
-        (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
+      if (*concat_batch) {
+        std::shared_ptr<Tensor> new_tensor;
+        RETURN_IF_NOT_OK(
+          ConvertRowsToTensor(&in_q_table, &new_tensor, in_q_table->size(), static_cast<size_t>(itr.second)));
+        (*out_q_table)[0][col_id] = std::move(new_tensor);
+      } else {
+        for (size_t i = 0; i < num_rows; i++) {
+          (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
+        }
       }
     }
   }
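[Note for reviewers] Two things change in MapColumns. First, because the concat decision is now known before the output table is allocated, num_rows is fixed up front instead of re-allocating out_q_table afterwards. Second, pass-through columns (columns visible from the child but not handled by per_batch_map, like col3 in the new tests) are collapsed into a single row with ConvertRowsToTensor whenever the mapped columns were concatenated, so every column of the output keeps the same row count. A rough sketch of that rule, with hypothetical names:

    import numpy as np

    def pass_through_column(in_rows, concat_batch):
        # In concat mode the whole batch becomes one row, so the untouched
        # column must be batched into a single tensor as well.
        if concat_batch:
            return [np.stack(in_rows)]  # one output row holding the batch
        return list(in_rows)            # otherwise: one output row per input row

    col3 = [np.array([16]), np.array([25]), np.array([36])]
    print(pass_through_column(col3, concat_batch=True))   # [array([[16], [25], [36]])]
    print(pass_through_column(col3, concat_batch=False))  # three separate rows
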
@@ -314,7 +333,7 @@
   for (size_t i = 0; i < out_cols.size(); i++) {
     size_t col_id = column_name_id_map_[out_col_names_[i]];
     size_t row_id = 0;
-    CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size() || concat_batch_,
+    CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size(),
                                  "Invalid data, column: " + out_col_names_[i] +
                                    " expects: " + std::to_string(num_rows) +
                                    " rows returned from 'per_batch_map', got: " + std::to_string(out_cols[i].size()));
@@ -371,7 +390,7 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
   return Status(StatusCode::kSuccess, "batch_size function call succeeded.");
 }
 
-Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
+Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info, bool *concat_batch) {
   RETURN_UNEXPECTED_IF_NULL(input);
   RETURN_UNEXPECTED_IF_NULL(output);
   {
@@ -405,6 +424,14 @@ Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBat
       "should be " + std::to_string(out_col_names_.size()) +
       " , but got: " + std::to_string(ret_tuple.size()));
 
+    bool all_array = true;
+    for (size_t i = 0; i < ret_tuple.size(); i++) {
+      if (!py::isinstance<py::array>(ret_tuple[i])) {
+        all_array = false;
+        break;
+      }
+    }
+    *concat_batch = all_array;
     for (size_t i = 0; i < ret_tuple.size(); i++) {
       TensorRow output_batch;
       // If user returns a type that is neither a list nor an array, issue a error msg.
@@ -413,8 +440,7 @@
                      << " returned by per_batch_map is not a list, this could lead to conversion failure.";
       }
 
-      if (py::isinstance<py::array>(ret_tuple[i])) {
-        concat_batch_ = true;
+      if (*concat_batch) {
        std::shared_ptr<Tensor> out;
        // If concat batch rows, the batch map function result should be in 1 row.
        RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py::cast<py::array>(ret_tuple[i]), &out));
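[Note for reviewers] This is the core behavioral fix. The old code latched the member flag concat_batch_ as soon as any single column returned by per_batch_map was a NumPy array; the flag was shared op-wide state, persisted across batches, and forced the row-count check above to be weakened with `|| concat_batch_`. The new code makes the decision locally on each invocation, and only enables concat mode when every returned column is an array, which lets the strict row-count check be restored. In Python terms (illustrative only):

    import numpy as np

    def decide_concat(ret_tuple):
        # Concat mode only when *every* per_batch_map output column is an ndarray.
        return all(isinstance(col, np.ndarray) for col in ret_tuple)

    assert decide_concat((np.zeros(3), np.ones(3)))      # all arrays -> one concatenated row
    assert not decide_concat((np.zeros(3), [0.0, 1.0]))  # mixed -> ordinary row-per-element path
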
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h
index 2f7f9c60629..20556700448 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h
@@ -202,6 +202,15 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
   static Status BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size,
                           bool concat_batch = false);
 
+  // Convert the rows of one column into a single batched tensor
+  // @param const std::unique_ptr<TensorQTable> *src - table that has the rows for batching
+  // @param std::shared_ptr<Tensor> *dst - output tensor holding the batched rows
+  // @param dsize_t batch_size - number of rows to batch
+  // @param size_t col - index of the column to convert
+  // @return Status The status code returned
+  static Status ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *src, std::shared_ptr<Tensor> *dst,
+                                    dsize_t batch_size, size_t col);
+
   // @param table
   // @param const PadInfo &pad_info pad info
   // @param const std::unordered_map<std::string, int32_t> &column_name_id_map - column names to index mapping
@@ -242,7 +251,7 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
   // Function that calls pyfunc to perform map on batch
   // @param (std::pair<std::unique_ptr<TensorQTable>, batch_stats> *table_pair - contains un-batched tensor
   // @return Status The status code returned
-  Status MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair);
+  Status MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair, bool *concat_batch);
 #endif
 
   // @param const PadInfo &pad_info pad info to unpack
@@ -272,7 +281,7 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
   // Invoke batch map function with current BatchInfo to generate tensors to batch.
   // @return Status The status code returned
-  Status InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info);
+  Status InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info, bool *concat_batch);
 #endif
 
   Status SendWaitFlagToWorker(int32_t worker_id) override;
@@ -281,7 +290,6 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
   Status ComputeColMap() override;
 
   int32_t start_batch_size_;
-  bool concat_batch_ = false;    // bool for whether to concat batch rows
   const bool drop_;              // bool for whether to drop remainder or not
   const bool pad_;               // bool for whether to perform padding on tensor
   std::vector<std::string> in_col_names_;  // input column name for per_batch_map
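[Note for reviewers] The user-visible contract that the updated test below pins down: when per_batch_map returns one NumPy array per output column, the whole batch is emitted as a single concatenated row; when it returns lists, each list element remains its own row. A hedged usage sketch against the public dataset API (column name and data are illustrative):

    import numpy as np
    import mindspore.dataset as ds

    def gen():
        for i in range(1, 4):
            yield (np.array([i]),)

    def concat_map(col1, batch_info):
        # Returning a plain ndarray per output column triggers concat mode.
        return (np.concatenate(col1, axis=0),)

    data = ds.GeneratorDataset(gen, ["col1"])
    data = data.batch(batch_size=3, input_columns=["col1"], per_batch_map=concat_map)
    for row in data.create_dict_iterator(num_epochs=1, output_numpy=True):
        print(row["col1"])  # [1 2 3] -- one row holding the whole batch
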
diff --git a/tests/ut/python/dataset/test_var_batch_map.py b/tests/ut/python/dataset/test_var_batch_map.py
index 33cbbc9b836..a9106304eee 100644
--- a/tests/ut/python/dataset/test_var_batch_map.py
+++ b/tests/ut/python/dataset/test_var_batch_map.py
@@ -471,25 +471,127 @@ def test_multi_col_concat_map():
     """
     def gen_2_cols(num):
         for i in range(1, 1 + num):
-            yield np.array([i]), np.array([i ** 2])
+            yield np.array([i]), np.array([i * 2]), np.array([i ** 2])
 
-    def concat_col(col1, col2, batch_info):
+    def concat(args):
         arg_list = []
-        for arg in [col1, col2]:
+        for arg in args:
             rows = []
             for value in arg:
                 rows.append(value)
             arg_list.append(np.array(np.concatenate(rows, axis=0)))
-        return tuple(arg_list)
+        return arg_list
 
-    dst = ds.GeneratorDataset((lambda: gen_2_cols(3)), ["col1", "col2"])
-    dst = dst.batch(batch_size=3, input_columns=["col1", "col2"], output_columns=["col1", "col2"],
-                    per_batch_map=concat_col)
+    def append_batch_array(args):
+        arg_list = []
+        for arg in args:
+            rows = []
+            for value in arg:
+                rows.append(value)
+            arg_list.append(np.array(rows))
+        return arg_list
+
+    def append_batch_list(args):
+        arg_list = []
+        for arg in args:
+            rows = []
+            for value in arg:
+                rows.append(value)
+            arg_list.append(rows)
+        return arg_list
+
+    def concat_all_col(col1, col2, col3, batch_info):
+        return tuple(concat([col1, col2, col3]))
+
+    def concat_less_col(col1, col2, batch_info):
+        return tuple(concat([col1, col2]))
+
+    def concat_more_col(col1, col2, batch_info):
+        return tuple(concat([col1, col2, col1]))
+
+    def append_all_col_list(col1, col2, col3, batch_info):
+        return tuple(append_batch_list([col1, col2, col3]))
+
+    def append_less_col_list(col1, col2, batch_info):
+        return tuple(append_batch_list([col1, col2]))
+
+    def append_more_col_list(col1, col2, batch_info):
+        return tuple(append_batch_list([col1, col2, col1]))
+
+    def append_all_col_array(col1, col2, col3, batch_info):
+        return tuple(append_batch_array([col1, col2, col3]))
+
+    def append_less_col_array(col1, col2, batch_info):
+        return tuple(append_batch_array([col1, col2]))
+
+    def append_more_col_array(col1, col2, batch_info):
+        return tuple(append_batch_array([col1, col2, col1]))
+
+    def batch_map(num, all_col, batch_size, icol, ocol, pfunc, res):
+        dst = ds.GeneratorDataset((lambda: gen_2_cols(num)), all_col)
+        dst = dst.batch(batch_size=batch_size, input_columns=icol, output_columns=ocol,
+                        per_batch_map=pfunc)
+        for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
+            res.append(row)
+
+    # test all column
     res = []
-    for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
-        res.append(row)
+    batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"], concat_all_col, res)
+    assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [2, 4, 6]) and \
+        np.array_equal(res[0]["col3"], [1, 4, 9])
+    # test less column
+    res = []
+    batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], concat_less_col, res)
+    assert np.array_equal(res[1]["col1"], [4, 5, 6]) and np.array_equal(res[1]["col2"], [8, 10, 12]) and \
+        np.array_equal(res[1]["col3"], [[16], [25], [36]])
+    # test more column
+    res = []
+    batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], concat_more_col, res)
+    assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [2, 4, 6]) and \
+        np.array_equal(res[0]["col3"], [[1], [4], [9]]) and np.array_equal(res[0]["col4"], [1, 2, 3])
+    assert np.array_equal(res[2]["col1"], [7, 8]) and np.array_equal(res[2]["col2"], [14, 16]) and \
+        np.array_equal(res[2]["col3"], [[49], [64]]) and np.array_equal(res[2]["col4"], [7, 8])
 
-    assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [1, 4, 9])
+    # test all column
+    res = []
+    batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"],
+              append_all_col_array, res)
+    assert np.array_equal(res[0]["col1"], [[1], [2], [3]]) and np.array_equal(res[0]["col2"], [[2], [4], [6]]) and \
+        np.array_equal(res[0]["col3"], [[1], [4], [9]])
+    list_res = []
+    batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"],
+              append_all_col_list, list_res)
+    assert np.array_equal(res[0]["col1"], list_res[0]["col1"]) and \
+        np.array_equal(res[0]["col2"], list_res[0]["col2"]) \
+        and np.array_equal(res[0]["col3"], list_res[0]["col3"])
+
+    # test less column
+    res = []
+    batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], append_less_col_array, res)
+    assert np.array_equal(res[1]["col1"], [[4], [5], [6]]) and np.array_equal(res[1]["col2"], [[8], [10], [12]]) and \
+        np.array_equal(res[1]["col3"], [[16], [25], [36]])
+    list_res = []
+    batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], append_less_col_list, list_res)
+    assert np.array_equal(res[1]["col1"], list_res[1]["col1"]) and \
+        np.array_equal(res[1]["col2"], list_res[1]["col2"]) \
+        and np.array_equal(res[1]["col3"], list_res[1]["col3"])
+
+    # test more column
+    res = []
+    batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], append_more_col_array, res)
+    assert np.array_equal(res[0]["col1"], [[1], [2], [3]]) and np.array_equal(res[0]["col2"], [[2], [4], [6]]) and \
+        np.array_equal(res[0]["col3"], [[1], [4], [9]]) and np.array_equal(res[0]["col4"], [[1], [2], [3]])
+    assert np.array_equal(res[2]["col1"], [[7], [8]]) and np.array_equal(res[2]["col2"], [[14], [16]]) and \
+        np.array_equal(res[2]["col3"], [[49], [64]]) and np.array_equal(res[2]["col4"], [[7], [8]])
+    list_res = []
+    batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], append_more_col_list,
+              list_res)
+    assert np.array_equal(res[0]["col1"], list_res[0]["col1"]) and \
+        np.array_equal(res[0]["col2"], list_res[0]["col2"]) \
+        and np.array_equal(res[0]["col3"], list_res[0]["col3"])
+    assert np.array_equal(res[2]["col1"], list_res[2]["col1"]) and \
+        np.array_equal(res[2]["col2"], list_res[2]["col2"]) \
+        and np.array_equal(res[2]["col3"], list_res[2]["col3"])
 
 
 def test_exceptions_2():