fix batch ops

This commit is contained in:
jiangshuqiang 2022-07-19 11:46:41 +08:00
parent 5dadaf70c3
commit 0457d018ef
3 changed files with 198 additions and 62 deletions

View File

@ -187,48 +187,58 @@ Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *d
auto num_columns = (*src)->front().size();
for (size_t i = 0; i < num_columns; i++) {
std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i); // first row, column i
TensorShape first_shape = first_tensor->shape();
DataType first_type = first_tensor->type();
TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
std::shared_ptr<Tensor> new_tensor;
if (first_type.IsNumeric()) { // numeric tensor
RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
dsize_t j = 0;
for (auto row : **src) {
std::shared_ptr<Tensor> old_tensor = row.at(i); // row j, column i
if (old_tensor->shape() == first_shape) { // check the newly popped rows have the same dim as the first
if (new_shape.NumOfElements() != 0) {
RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
}
// Don't do anything if the tensor has no data
} else {
std::stringstream shape1, shape2;
first_shape.Print(shape1);
old_tensor->shape().Print(shape2);
RETURN_STATUS_UNEXPECTED(
"Inconsistent batch shapes, batch operation expect same shape for each data row, "
"but got inconsistent shape in column " +
std::to_string(i) + ", expected shape for this column is:" + shape1.str() + ", got shape:" + shape2.str());
}
}
} else { // handle string column differently
std::vector<std::string> strings;
for (dsize_t j = 0; j < batch_size; j++) {
std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(i);
for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
strings.emplace_back(*itr);
}
}
RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
}
RETURN_IF_NOT_OK(ConvertRowsToTensor(src, &new_tensor, batch_size, i));
dest->emplace_back(new_tensor);
}
return Status::OK();
}
// Stack column `col` of the rows in `*src` into a single tensor whose shape is the first row's shape with
// `batch_size` prepended as the leading dimension.
// @param const std::unique_ptr<TensorQTable> *src - table that has the rows for batching; `src`, `*src` must be
//     non-null and the table must contain at least one row
// @param std::shared_ptr<Tensor> *dst - receives the newly created tensor on success, untouched on failure
// @param dsize_t batch_size - size of the leading dimension prepended to the first row's shape
// @param size_t col - index of the column to convert
// @return Status The status code returned
Status BatchOp::ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *src, std::shared_ptr<Tensor> *dst,
                                    dsize_t batch_size, size_t col) {
  RETURN_UNEXPECTED_IF_NULL(src);
  RETURN_UNEXPECTED_IF_NULL(*src);
  RETURN_UNEXPECTED_IF_NULL(dst);
  // Report an empty table through Status instead of letting at(0) throw.
  const size_t num_rows = (*src)->size();
  if (num_rows == 0) {
    RETURN_STATUS_UNEXPECTED("Invalid data, batch operation got an empty table, there is no row to convert.");
  }
  const std::shared_ptr<Tensor> &first_tensor = (*src)->at(0).at(col);  // first row, column col
  TensorShape first_shape = first_tensor->shape();
  DataType first_type = first_tensor->type();
  TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
  std::shared_ptr<Tensor> new_tensor;
  if (first_type.IsNumeric()) {  // numeric tensor
    RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
    dsize_t j = 0;
    // Iterate by const reference: copying a whole row per iteration is pure overhead.
    for (const auto &row : **src) {
      const std::shared_ptr<Tensor> &old_tensor = row.at(col);  // row j, column col
      if (old_tensor->shape() == first_shape) {  // check the newly popped rows have the same dim as the first
        if (new_shape.NumOfElements() != 0) {
          RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
        }
        // Don't do anything if the tensor has no data
      } else {
        std::stringstream shape1, shape2;
        first_shape.Print(shape1);
        old_tensor->shape().Print(shape2);
        RETURN_STATUS_UNEXPECTED(
          "Inconsistent batch shapes, batch operation expect same shape for each data row, "
          "but got inconsistent shape in column " +
          std::to_string(col) + ", expected shape for this column is:" + shape1.str() + ", got shape:" + shape2.str());
      }
    }
  } else {  // handle string column differently
    // The loop below indexes rows [0, batch_size); make sure they all exist before calling at(j).
    if (batch_size > 0 && static_cast<size_t>(batch_size) > num_rows) {
      RETURN_STATUS_UNEXPECTED("Invalid data, batch size: " + std::to_string(batch_size) +
                               " exceeds the number of rows: " + std::to_string(num_rows) + " available in column " +
                               std::to_string(col) + ".");
    }
    std::vector<std::string> strings;
    for (dsize_t j = 0; j < batch_size; j++) {
      const std::shared_ptr<Tensor> &old_tensor = (*src)->at(j).at(col);
      for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
        strings.emplace_back(*itr);
      }
    }
    RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
  }
  *dst = std::move(new_tensor);
  return Status::OK();
}
Status BatchOp::WorkerEntry(int32_t workerId) {
TaskManager::FindMe()->Post();
std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair;
@ -252,15 +262,16 @@ Status BatchOp::WorkerEntry(int32_t workerId) {
// Build one batched TensorRow from a table of un-batched rows: optionally run the per-batch map function,
// optionally pad, then hand the table to BatchRows.
// @param table_pair - pair of (table of rows to batch, CBatchInfo for this batch), taken by value
// @param new_row - output row, passed to BatchRows as its destination
// @return Status The status code returned
// NOTE(review): the two consecutive MapColumns calls and the two consecutive BatchRows calls below look like the
// before/after versions of the same statement (this text reads like a rendered diff) - confirm which one is live.
Status BatchOp::MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row) {
RETURN_UNEXPECTED_IF_NULL(table_pair.first);
// Local flag: its address is given to MapColumns and its value is forwarded to BatchRows.
bool concat_batch = false;
#ifdef ENABLE_PYTHON
if (batch_map_func_) {
RETURN_IF_NOT_OK(MapColumns(&table_pair));
RETURN_IF_NOT_OK(MapColumns(&table_pair, &concat_batch));
} // pass it through pyfun
#endif
if (pad_) {
RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));
} // do padding if needed
// The current number of rows in the table is used as the batch size.
RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size(), concat_batch_));
RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size(), concat_batch));
return Status::OK();
}
@ -272,7 +283,7 @@ Status BatchOp::EoeReceived(int32_t) {
}
#ifdef ENABLE_PYTHON
Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair, bool *concat_batch) {
RETURN_UNEXPECTED_IF_NULL(table_pair);
RETURN_UNEXPECTED_IF_NULL(table_pair->first);
std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
@ -291,21 +302,29 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
}
}
RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));
RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second, concat_batch));
// If concat batch rows, the num_rows should be 1.
if (*concat_batch) {
num_rows = 1;
}
auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
// If concat batch rows, the num_rows should be 1.
if (concat_batch_) {
out_q_table = std::make_unique<TensorQTable>(1, TensorRow(column_name_id_map_.size(), nullptr));
}
for (const auto &itr : child_map_) {
auto col_itr = in_col_name_id.find(itr.first);
if (col_itr == in_col_name_id.end()) { // col needs to be prepared for per_batch_map
// col needs to be placed into the out table
size_t col_id = column_name_id_map_[itr.first];
for (size_t i = 0; i < num_rows; i++) {
(*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
if (*concat_batch) {
std::shared_ptr<Tensor> new_tensor;
RETURN_IF_NOT_OK(
ConvertRowsToTensor(&in_q_table, &new_tensor, in_q_table->size(), static_cast<size_t>(itr.second)));
(*out_q_table)[0][col_id] = std::move(new_tensor);
} else {
for (size_t i = 0; i < num_rows; i++) {
(*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
}
}
}
}
@ -314,7 +333,7 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
for (size_t i = 0; i < out_cols.size(); i++) {
size_t col_id = column_name_id_map_[out_col_names_[i]];
size_t row_id = 0;
CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size() || concat_batch_,
CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size(),
"Invalid data, column: " + out_col_names_[i] +
" expects: " + std::to_string(num_rows) +
" rows returned from 'per_batch_map', got: " + std::to_string(out_cols[i].size()));
@ -371,7 +390,7 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
return Status(StatusCode::kSuccess, "batch_size function call succeeded.");
}
Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info, bool *concat_batch) {
RETURN_UNEXPECTED_IF_NULL(input);
RETURN_UNEXPECTED_IF_NULL(output);
{
@ -405,6 +424,14 @@ Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBat
"should be " +
std::to_string(out_col_names_.size()) +
" , but got: " + std::to_string(ret_tuple.size()));
bool all_array = true;
for (size_t i = 0; i < ret_tuple.size(); i++) {
if (!py::isinstance<py::array>(ret_tuple[i])) {
all_array = false;
break;
}
}
*concat_batch = all_array;
for (size_t i = 0; i < ret_tuple.size(); i++) {
TensorRow output_batch;
// If user returns a type that is neither a list nor an array, issue a error msg.
@ -413,8 +440,7 @@ Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBat
<< " returned by per_batch_map is not a list, this could lead to conversion failure.";
}
if (py::isinstance<py::array>(ret_tuple[i])) {
concat_batch_ = true;
if (*concat_batch) {
std::shared_ptr<Tensor> out;
// If concat batch rows, the batch map function result should be in 1 row.
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py::cast<py::array>(ret_tuple[i]), &out));

View File

@ -202,6 +202,15 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
static Status BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size,
bool concat_batch = false);
// Convert one column of the rows in a table into a single tensor with a leading batch dimension
// @param const std::unique_ptr<TensorQTable> *src - table that has the rows for batching
// @param std::shared_ptr<Tensor> *dst - output pointer that receives the newly created tensor
// @param dsize_t batch_size - size of the dimension prepended to the shape of the first row's tensor
// @param size_t col - index of the column to convert
// @return Status The status code returned
static Status ConvertRowsToTensor(const std::unique_ptr<TensorQTable> *src, std::shared_ptr<Tensor> *dst,
dsize_t batch_size, size_t col);
// @param table
// @param const PadInfo &pad_info pad info
// @param const std::unordered_map<std::string, int32_t>& column_name_id_map - column names to index mapping
@ -242,7 +251,7 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
// Function that calls pyfunc to perform map on batch
// @param std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair - contains un-batched tensor
// @param bool *concat_batch - (two-argument form only) output flag that is passed on to InvokeBatchMapFunc to be
//     filled in; when it comes back true the mapped output table is built with a single row
// @return Status The status code returned
Status MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair);
Status MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair, bool *concat_batch);
#endif
// @param const PadInfo &pad_info pad info to unpack
@ -272,7 +281,7 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
// Invoke batch map function with current BatchInfo to generate tensors to batch.
// @param TensorTable *input - columns handed to the batch map function; must not be null
// @param TensorTable *output - receives the columns returned by the batch map function; must not be null
// @param CBatchInfo info - batch info for the current batch
// @param bool *concat_batch - (four-argument form only) output flag, set to true only when every element of the
//     tuple returned by the batch map function is a numpy array
// @return Status The status code returned
Status InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info);
Status InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info, bool *concat_batch);
#endif
Status SendWaitFlagToWorker(int32_t worker_id) override;
@ -281,7 +290,6 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc
Status ComputeColMap() override;
int32_t start_batch_size_;
bool concat_batch_ = false; // bool for whether to concat batch rows
const bool drop_; // bool for whether to drop remainder or not
const bool pad_; // bool for whether to perform padding on tensor
std::vector<std::string> in_col_names_; // input column name for per_batch_map

View File

@ -471,25 +471,127 @@ def test_multi_col_concat_map():
"""
def gen_2_cols(num):
for i in range(1, 1 + num):
yield np.array([i]), np.array([i ** 2])
yield np.array([i]), np.array([i * 2]), np.array([i ** 2])
def concat_col(col1, col2, batch_info):
def concat(args):
arg_list = []
for arg in [col1, col2]:
for arg in args:
rows = []
for value in arg:
rows.append(value)
arg_list.append(np.array(np.concatenate(rows, axis=0)))
return tuple(arg_list)
return arg_list
dst = ds.GeneratorDataset((lambda: gen_2_cols(3)), ["col1", "col2"])
dst = dst.batch(batch_size=3, input_columns=["col1", "col2"], output_columns=["col1", "col2"],
per_batch_map=concat_col)
def append_batch_array(args):
arg_list = []
for arg in args:
rows = []
for value in arg:
rows.append(value)
arg_list.append(np.array(rows))
return arg_list
def append_batch_list(args):
arg_list = []
for arg in args:
rows = []
for value in arg:
rows.append(value)
arg_list.append(rows)
return arg_list
def concat_all_col(col1, col2, col3, batch_info):
return tuple(concat([col1, col2, col3]))
def concat_less_col(col1, col2, batch_info):
return tuple(concat([col1, col2]))
def concat_more_col(col1, col2, batch_info):
return tuple(concat([col1, col2, col1]))
def append_all_col_list(col1, col2, col3, batch_info):
return tuple(append_batch_list([col1, col2, col3]))
def append_less_col_list(col1, col2, batch_info):
return tuple(append_batch_list([col1, col2]))
def append_more_col_list(col1, col2, batch_info):
return tuple(append_batch_list([col1, col2, col1]))
def append_all_col_array(col1, col2, col3, batch_info):
return tuple(append_batch_array([col1, col2, col3]))
def append_less_col_array(col1, col2, batch_info):
return tuple(append_batch_array([col1, col2]))
def append_more_col_array(col1, col2, batch_info):
return tuple(append_batch_array([col1, col2, col1]))
def batch_map(num, all_col, batch_size, icol, ocol, pfunc, res):
dst = ds.GeneratorDataset((lambda: gen_2_cols(num)), all_col)
dst = dst.batch(batch_size=batch_size, input_columns=icol, output_columns=ocol,
per_batch_map=pfunc)
for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
res.append(row)
# test all column
res = []
for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True):
res.append(row)
batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"], concat_all_col, res)
assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [2, 4, 6]) and \
np.array_equal(res[0]["col3"], [1, 4, 9])
# test less column
res = []
batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], concat_less_col, res)
assert np.array_equal(res[1]["col1"], [4, 5, 6]) and np.array_equal(res[1]["col2"], [8, 10, 12]) and \
np.array_equal(res[1]["col3"], [[16], [25], [36]])
# test more column
res = []
batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], concat_more_col, res)
assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [2, 4, 6]) and \
np.array_equal(res[0]["col3"], [[1], [4], [9]]) and np.array_equal(res[0]["col4"], [1, 2, 3])
assert np.array_equal(res[2]["col1"], [7, 8]) and np.array_equal(res[2]["col2"], [14, 16]) and \
np.array_equal(res[2]["col3"], [[49], [64]]) and np.array_equal(res[2]["col4"], [7, 8])
assert np.array_equal(res[0]["col1"], [1, 2, 3]) and np.array_equal(res[0]["col2"], [1, 4, 9])
# test all column
res = []
batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"],
append_all_col_array, res)
assert np.array_equal(res[0]["col1"], [[1], [2], [3]]) and np.array_equal(res[0]["col2"], [[2], [4], [6]]) and \
np.array_equal(res[0]["col3"], [[1], [4], [9]])
list_res = []
batch_map(3, ["col1", "col2", "col3"], 3, ["col1", "col2", "col3"], ["col1", "col2", "col3"],
append_all_col_list, list_res)
assert np.array_equal(res[0]["col1"], list_res[0]["col1"]) and \
np.array_equal(res[0]["col2"], list_res[0]["col2"]) \
and np.array_equal(res[0]["col3"], list_res[0]["col3"])
# test less column
res = []
batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], append_less_col_array, res)
assert np.array_equal(res[1]["col1"], [[4], [5], [6]]) and np.array_equal(res[1]["col2"], [[8], [10], [12]]) and \
np.array_equal(res[1]["col3"], [[16], [25], [36]])
list_res = []
batch_map(6, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2"], append_less_col_list, list_res)
assert np.array_equal(res[1]["col1"], list_res[1]["col1"]) and \
np.array_equal(res[1]["col2"], list_res[1]["col2"]) \
and np.array_equal(res[1]["col3"], list_res[1]["col3"])
# test more column
res = []
batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], append_more_col_array, res)
assert np.array_equal(res[0]["col1"], [[1], [2], [3]]) and np.array_equal(res[0]["col2"], [[2], [4], [6]]) and \
np.array_equal(res[0]["col3"], [[1], [4], [9]]) and np.array_equal(res[0]["col4"], [[1], [2], [3]])
assert np.array_equal(res[2]["col1"], [[7], [8]]) and np.array_equal(res[2]["col2"], [[14], [16]]) and \
np.array_equal(res[2]["col3"], [[49], [64]]) and np.array_equal(res[2]["col4"], [[7], [8]])
list_res = []
batch_map(8, ["col1", "col2", "col3"], 3, ["col1", "col2"], ["col1", "col2", "col4"], append_more_col_list,
list_res)
assert np.array_equal(res[0]["col1"], list_res[0]["col1"]) and \
np.array_equal(res[0]["col2"], list_res[0]["col2"]) \
and np.array_equal(res[0]["col3"], list_res[0]["col3"])
assert np.array_equal(res[2]["col1"], list_res[2]["col1"]) and \
np.array_equal(res[2]["col2"], list_res[2]["col2"]) \
and np.array_equal(res[2]["col3"], list_res[2]["col3"])
def test_exceptions_2():