!5496 Change the argument type of CSV state function from char to int

Merge pull request !5496 from jiangzhiwen/bug/csv_char_to_int
This commit is contained in:
mindspore-ci-bot 2020-08-31 12:45:46 +08:00 committed by Gitee
commit 6e452d829d
2 changed files with 232 additions and 212 deletions

View File

@ -97,7 +97,67 @@ Status CsvOp::Init() {
return Status::OK();
}
int CsvOp::CsvParser::put_record(char c) {
CsvOp::CsvParser::CsvParser(int32_t worker_id, std::shared_ptr<JaggedConnector> connector, int64_t rows_per_buffer,
char field_delim, std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default)
: worker_id_(worker_id),
buffer_connector_(connector),
csv_rows_per_buffer_(rows_per_buffer),
csv_field_delim_(field_delim),
column_default_(column_default),
cur_state_(START_OF_FILE),
pos_(0),
cur_row_(0),
cur_col_(0),
total_rows_(0),
start_offset_(0),
end_offset_(std::numeric_limits<int64_t>::max()),
err_message_("unknown") {
cur_buffer_ = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
InitCsvParser();
}
void CsvOp::CsvParser::Reset() {
cur_state_ = START_OF_FILE;
pos_ = 0;
cur_row_ = 0;
cur_col_ = 0;
}
CsvOp::CsvParser::Message CsvOp::CsvParser::GetMessage(int c) {
if (c == csv_field_delim_) {
return Message::MS_DELIM;
} else if (c == '"') {
return Message::MS_QUOTE;
} else if (c == '\r' || c == '\n') {
return Message::MS_END_OF_LINE;
} else if (c == std::char_traits<char>::eof()) {
return Message::MS_END_OF_FILE;
} else {
return Message::MS_NORMAL;
}
}
int CsvOp::CsvParser::ProcessMessage(int c) {
Message m = GetMessage(c);
StateDiagram::iterator it = sd.find({cur_state_, m});
if (it == sd.end()) {
return -1;
}
int ret = it->second.second(*this, c);
cur_state_ = it->second.first;
return ret;
}
int CsvOp::CsvParser::PutChar(int c) {
if (pos_ >= str_buf_.size()) {
str_buf_.resize(str_buf_.size() * 2);
}
str_buf_[pos_] = c;
pos_++;
return 0;
}
int CsvOp::CsvParser::PutRecord(int c) {
std::string s = std::string(str_buf_.begin(), str_buf_.begin() + pos_);
std::shared_ptr<Tensor> t;
if (cur_col_ >= column_default_.size()) {
@ -125,7 +185,7 @@ int CsvOp::CsvParser::put_record(char c) {
return 0;
}
int CsvOp::CsvParser::put_row(char c) {
int CsvOp::CsvParser::PutRow(int c) {
if (total_rows_ < start_offset_) {
total_rows_++;
cur_col_ = 0;
@ -137,7 +197,7 @@ int CsvOp::CsvParser::put_row(char c) {
return 0;
}
int ret = put_record(c);
int ret = PutRecord(c);
if (ret < 0) {
return ret;
}
@ -162,9 +222,14 @@ int CsvOp::CsvParser::put_row(char c) {
return 0;
}
int CsvOp::CsvParser::end_file(char c) {
int CsvOp::CsvParser::AddRow(int c) {
total_rows_++;
return 0;
}
int CsvOp::CsvParser::EndFile(int c) {
if (cur_col_ > 0) {
int ret = put_row(c);
int ret = PutRow(c);
if (ret < 0) {
return ret;
}
@ -177,7 +242,18 @@ int CsvOp::CsvParser::end_file(char c) {
return 0;
}
int CsvOp::CsvParser::countRows(int c) {
int CsvOp::CsvParser::CatchException(int c) {
if (GetMessage(c) == Message::MS_QUOTE && cur_state_ == State::UNQUOTE) {
err_message_ = "Invalid quote in unquote field.";
} else if (GetMessage(c) == Message::MS_END_OF_FILE && cur_state_ == State::QUOTE) {
err_message_ = "Reach the end of file in quote field.";
} else if (GetMessage(c) == Message::MS_NORMAL && cur_state_ == State::SECOND_QUOTE) {
err_message_ = "Receive unquote char in quote field.";
}
return -1;
}
int CsvOp::CsvParser::CountRows(int c) {
Message m;
if (c == '"') {
m = Message::MS_QUOTE;
@ -194,79 +270,79 @@ int CsvOp::CsvParser::countRows(int c) {
return it->second.second(*this, c);
}
Status CsvOp::CsvParser::initCsvParser() {
Status CsvOp::CsvParser::InitCsvParser() {
str_buf_.resize(CSV_BUFFER_SIZE);
// State diagram for counting rows
sdl = {// START_OF_FILE
// ┌───────────┬───────────┬───────────────┐
// │ abc │ " │ \n │
// ├───────────┼───────────┼───────────────┤
// │ UNQUOTE │ QUOTE │ START_OF_FILE │
// ├───────────┼───────────┼───────────────┤
// | null_func │ null_func │ null_func │
// └───────────┴───────────┴───────────────┘
{{State::START_OF_FILE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}},
{{State::START_OF_FILE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}},
{{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::null_func}},
// |---------------------------------------|
// | abc | " | \n |
// |---------------------------------------|
// | UNQUOTE | QUOTE | START_OF_FILE |
// |---------------------------------------|
// | NullFunc | NullFunc | NullFunc |
// |---------------------------------------|
{{State::START_OF_FILE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}},
{{State::START_OF_FILE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::NullFunc}},
// UNQUOTE
// ┌───────────┬───────────┬─────────────┐
// │ abc │ " │ \n │
// ├───────────┼───────────┼─────────────┤
// │ UNQUOTE │ QUOTE │ END_OF_LINE │
// ├───────────┼───────────┼─────────────┤
// | null_func │ null_func │ add_row │
// └───────────┴───────────┴─────────────┘
{{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}},
{{State::UNQUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}},
{{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::add_row}},
// |-------------------------------------|
// | abc | " | \n |
// |-------------------------------------|
// | UNQUOTE | QUOTE | END_OF_LINE |
// |-------------------------------------|
// | NullFunc | NullFunc | AddRow |
// |-------------------------------------|
{{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}},
{{State::UNQUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::AddRow}},
// QUOTE
// ┌───────────┬──────────────┬───────────┐
// │ abc │ " │ \n │
// ├───────────┼──────────────┼───────────┤
// │ QUOTE │ SECOND_QUOTE │ QUOTE │
// ├───────────┼──────────────┼───────────┤
// | null_func │ null_func │ null_func │
// └───────────┴──────────────┴───────────┘
{{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::null_func}},
{{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::null_func}},
{{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::null_func}},
// |--------------------------------------|
// | abc | " | \n |
// |--------------------------------------|
// | QUOTE | SECOND_QUOTE | QUOTE |
// |--------------------------------------|
// | NullFunc | NullFunc | NullFunc |
// |--------------------------------------|
{{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::NullFunc}},
{{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::NullFunc}},
// SECOND_QUOTE
// ┌───────────┬───────────┬─────────────┐
// │ abc │ " │ \n │
// ├───────────┼───────────┼─────────────┤
// │ UNQUOTE │ QUOTE │ END_OF_LINE │
// ├───────────┼───────────┼─────────────┤
// | null_func │ null_func │ add_row │
// └───────────┴───────────┴─────────────┘
{{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}},
{{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}},
{{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::add_row}},
// |-------------------------------------|
// | abc | " | \n |
// |-------------------------------------|
// | UNQUOTE | QUOTE | END_OF_LINE |
// |-------------------------------------|
// | NullFunc | NullFunc | AddRow |
// |-------------------------------------|
{{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}},
{{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::AddRow}},
// END_OF_LINE
// ┌───────────┬───────────┬─────────────┐
// │ abc │ " │ \n │
// ├───────────┼───────────┼─────────────┤
// │ UNQUOTE │ QUOTE │ END_OF_LINE │
// ├───────────┼───────────┼─────────────┤
// | null_func │ null_func │ null_func │
// └───────────┴───────────┴─────────────┘
{{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}},
{{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}},
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::null_func}}};
// |-------------------------------------|
// | abc | " | \n |
// |-------------------------------------|
// | UNQUOTE | QUOTE | END_OF_LINE |
// |-------------------------------------|
// | NullFunc | NullFunc | NullFunc |
// |-------------------------------------|
{{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}}};
// State diagram for CSV parser
sd = {// START_OF_FILE
// ┌───────────┬──────────┬──────────┬────────────────┬────────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├───────────┼──────────┼──────────┼────────────────┼────────────────┤
// │ UNQUOTE │ DELIM │ QUOTE │ START_OF_FILE │ END_OF_FILE │
// ├───────────┼──────────┼──────────┼────────────────┼────────────────┤
// | lambda │ lambda │ lambda │ null_func │ null_func │
// └───────────┴──────────┴──────────┴────────────────┴────────────────┘
// |-------------------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |-------------------------------------------------------------------|
// | UNQUOTE | DELIM | QUOTE | START_OF_FILE | END_OF_FILE |
// |-------------------------------------------------------------------|
// | lambda | lambda | lambda | NullFunc | NullFunc |
// |-------------------------------------------------------------------|
{{State::START_OF_FILE, Message::MS_NORMAL},
{State::UNQUOTE,
[this](CsvParser &, char c) -> int {
@ -281,7 +357,7 @@ Status CsvOp::CsvParser::initCsvParser() {
[this](CsvParser &, char c) -> int {
this->tensor_table_ = std::make_unique<TensorQTable>();
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
return this->put_record(c);
return this->PutRecord(c);
}}},
{{State::START_OF_FILE, Message::MS_QUOTE},
{State::QUOTE,
@ -291,81 +367,81 @@ Status CsvOp::CsvParser::initCsvParser() {
this->pos_ = 0;
return 0;
}}},
{{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::null_func}},
{{State::START_OF_FILE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::null_func}},
{{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::NullFunc}},
{{State::START_OF_FILE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::NullFunc}},
// UNQUOTE
// ┌───────────┬────────────┬───────────┬─────────────┬────────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├───────────┼────────────┼───────────┼─────────────┼────────────────┤
// │ UNQUOTE │ DELIM │ EXCEPTION │ END_OF_LINE │ END_OF_FILE │
// ├───────────┼────────────┼───────────┼─────────────┼────────────────┤
// | put_char │ put_record │ exception │ put_row │ end_file │
// └───────────┴────────────┴───────────┴─────────────┴────────────────┘
{{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::put_char}},
{{State::UNQUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}},
{{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}},
{{State::UNQUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}},
// |-------------------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |-------------------------------------------------------------------|
// | UNQUOTE | DELIM | EXCEPTION | END_OF_LINE | END_OF_FILE |
// |-------------------------------------------------------------------|
// | PutChar | PutRecord | exception | PutRow | EndFile |
// |-------------------------------------------------------------------|
{{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::PutChar}},
{{State::UNQUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}},
{{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}},
{{State::UNQUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}},
// UNQUOTE-Exception
{{State::UNQUOTE, Message::MS_QUOTE}, {State::EXCEPTION, &CsvParser::catch_exception}},
{{State::UNQUOTE, Message::MS_QUOTE}, {State::EXCEPTION, &CsvParser::CatchException}},
// DELIM
// ┌───────────┬────────────┬───────────┬─────────────┬────────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├───────────┼────────────┼───────────┼─────────────┼────────────────┤
// │ UNQUOTE │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │
// ├───────────┼────────────┼───────────┼─────────────┼────────────────┤
// | put_char │ put_record │ lambda │ put_row │ end_file │
// └───────────┴────────────┴───────────┴─────────────┴────────────────┘
{{State::DELIM, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::put_char}},
{{State::DELIM, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}},
// |-------------------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |-------------------------------------------------------------------|
// | UNQUOTE | DELIM | QUOTE | END_OF_LINE | END_OF_FILE |
// |-------------------------------------------------------------------|
// | PutChar | PutRecord | lambda | PutRow | EndFile |
// |-------------------------------------------------------------------|
{{State::DELIM, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::PutChar}},
{{State::DELIM, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}},
{{State::DELIM, Message::MS_QUOTE},
{State::QUOTE,
[this](CsvParser &, char c) -> int {
this->pos_ = 0;
return 0;
}}},
{{State::DELIM, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}},
{{State::DELIM, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}},
{{State::DELIM, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}},
{{State::DELIM, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}},
// QUOTE
// ┌───────────┬──────────┬──────────────┬──────────┬────────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├───────────┼──────────┼──────────────┼──────────┼────────────────┤
// │ QUOTE │ QUOTE │ SECOND_QUOTE │ QUOTE │ EXCEPTION │
// ├───────────┼──────────┼──────────────┼──────────┼────────────────┤
// | put_char │ put_char │ null_func │ put_char │ exception │
// └───────────┴──────────┴──────────────┴──────────┴────────────────┘
{{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::put_char}},
{{State::QUOTE, Message::MS_DELIM}, {State::QUOTE, &CsvParser::put_char}},
{{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::null_func}},
{{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::put_char}},
// |-----------------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |-----------------------------------------------------------------|
// | QUOTE | QUOTE | SECOND_QUOTE | QUOTE | EXCEPTION |
// |-----------------------------------------------------------------|
// | PutChar | PutChar | NullFunc | PutChar | exception |
// |-----------------------------------------------------------------|
{{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::PutChar}},
{{State::QUOTE, Message::MS_DELIM}, {State::QUOTE, &CsvParser::PutChar}},
{{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::NullFunc}},
{{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::PutChar}},
// QUOTE-Exception
{{State::QUOTE, Message::MS_END_OF_FILE}, {State::EXCEPTION, &CsvParser::catch_exception}},
{{State::QUOTE, Message::MS_END_OF_FILE}, {State::EXCEPTION, &CsvParser::CatchException}},
// SECOND_QUOTE
// ┌───────────┬────────────┬──────────┬─────────────┬────────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├───────────┼────────────┼──────────┼─────────────┼────────────────┤
// │ EXCEPTION │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │
// ├───────────┼────────────┼──────────┼─────────────┼────────────────┤
// | exception │ put_record │ put_char │ put_row │ end_file │
// └───────────┴────────────┴──────────┴─────────────┴────────────────┘
{{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::put_char}},
{{State::SECOND_QUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}},
{{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}},
{{State::SECOND_QUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}},
// |------------------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |------------------------------------------------------------------|
// | EXCEPTION | DELIM | QUOTE | END_OF_LINE | END_OF_FILE |
// |------------------------------------------------------------------|
// | exception | PutRecord | PutChar | PutRow | EndFile |
// |------------------------------------------------------------------|
{{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::PutChar}},
{{State::SECOND_QUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}},
{{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}},
{{State::SECOND_QUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}},
// SECOND_QUOTE-Exception
{{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::EXCEPTION, &CsvParser::catch_exception}},
{{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::EXCEPTION, &CsvParser::CatchException}},
// END_OF_LINE
// ┌─────────┬────────┬────────┬─────────────┬─────────────┐
// │ abc │ , │ " │ \n │ EOF │
// ├─────────┼────────┼────────┼─────────────┼─────────────┤
// │ UNQUOTE │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │
// ├─────────┼────────┼────────┼─────────────┼─────────────┤
// | lambda │ lambda │ lambda │ null_func │ end_file │
// └─────────┴────────┴────────┴─────────────┴─────────────┘
// |-------------------------------------------------------|
// | abc | , | " | \n | EOF |
// |-------------------------------------------------------|
// | UNQUOTE | DELIM | QUOTE | END_OF_LINE | END_OF_FILE |
// |-------------------------------------------------------|
// | lambda | lambda | lambda | NullFunc | EndFile |
// |-------------------------------------------------------|
{{State::END_OF_LINE, Message::MS_NORMAL},
{State::UNQUOTE,
[this](CsvParser &, char c) -> int {
@ -382,7 +458,7 @@ Status CsvOp::CsvParser::initCsvParser() {
if (this->total_rows_ > this->start_offset_ && this->total_rows_ <= this->end_offset_) {
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
}
return this->put_record(c);
return this->PutRecord(c);
}}},
{{State::END_OF_LINE, Message::MS_QUOTE},
{State::QUOTE,
@ -392,8 +468,8 @@ Status CsvOp::CsvParser::initCsvParser() {
}
return 0;
}}},
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::null_func}},
{{State::END_OF_LINE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}}};
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}};
return Status::OK();
}
@ -409,8 +485,8 @@ Status CsvOp::Reset() {
Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, const int64_t end_offset,
const int32_t worker_id) {
CsvParser csv_parser(worker_id, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_);
csv_parser.setStartOffset(start_offset);
csv_parser.setEndOffset(end_offset);
csv_parser.SetStartOffset(start_offset);
csv_parser.SetEndOffset(end_offset);
std::ifstream ifs;
ifs.open(file, std::ifstream::in);
if (column_name_list_.empty()) {
@ -424,16 +500,16 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons
// which is a 32-bit -1, it's not equal to the 8-bit -1 on Euler OS. So instead of char, we use
// int to receive its return value.
int chr = ifs.get();
if (csv_parser.processMessage(chr) != 0) {
RETURN_STATUS_UNEXPECTED("Failed to parse file " + file + ":" + std::to_string(csv_parser.total_rows_ + 1) +
". error message: " + csv_parser.err_message_);
if (csv_parser.ProcessMessage(chr) != 0) {
RETURN_STATUS_UNEXPECTED("Failed to parse file " + file + ":" + std::to_string(csv_parser.GetTotalRows() + 1) +
". error message: " + csv_parser.GetErrorMessage());
}
}
} catch (std::invalid_argument &ia) {
std::string err_row = std::to_string(csv_parser.total_rows_ + 1);
std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1);
RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", type does not match");
} catch (std::out_of_range &oor) {
std::string err_row = std::to_string(csv_parser.total_rows_ + 1);
std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1);
RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", out of range");
}
return Status::OK();
@ -712,12 +788,12 @@ int64_t CsvOp::CountTotalRows(const std::string &file) {
csv_parser.Reset();
while (ifs.good()) {
int chr = ifs.get();
if (csv_parser.countRows(chr) != 0) {
if (csv_parser.CountRows(chr) != 0) {
break;
}
}
return csv_parser.total_rows_;
return csv_parser.GetTotalRows();
}
// Pushes a control indicator onto the IOBlockQueue for each worker to consume.

View File

@ -64,52 +64,27 @@ class CsvOp : public ParallelOp {
CsvParser() = delete;
CsvParser(int32_t worker_id, std::shared_ptr<JaggedConnector> connector, int64_t rows_per_buffer, char field_delim,
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default)
: worker_id_(worker_id),
buffer_connector_(connector),
csv_rows_per_buffer_(rows_per_buffer),
csv_field_delim_(field_delim),
column_default_(column_default),
cur_state_(START_OF_FILE),
pos_(0),
cur_row_(0),
cur_col_(0),
total_rows_(0),
start_offset_(0),
end_offset_(std::numeric_limits<int64_t>::max()),
err_message_("unknown") {
cur_buffer_ = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
initCsvParser();
}
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default);
~CsvParser() = default;
void Reset() {
cur_state_ = START_OF_FILE;
pos_ = 0;
cur_row_ = 0;
cur_col_ = 0;
}
void Reset();
void setStartOffset(int64_t start_offset) { start_offset_ = start_offset; }
void SetStartOffset(int64_t start_offset) { start_offset_ = start_offset; }
void setEndOffset(int64_t end_offset) { end_offset_ = end_offset; }
void SetEndOffset(int64_t end_offset) { end_offset_ = end_offset; }
int processMessage(int c) {
Message m = getMessage(c);
StateDiagram::iterator it = sd.find({cur_state_, m});
if (it == sd.end()) {
return -1;
}
int ret = it->second.second(*this, static_cast<char>(c));
cur_state_ = it->second.first;
return ret;
}
int ProcessMessage(int c);
int countRows(int c);
int CountRows(int c);
Status initCsvParser();
Status InitCsvParser();
int64_t GetTotalRows() { return total_rows_; }
std::string GetErrorMessage() { return err_message_; }
private:
enum State : uint8_t {
START_OF_FILE = 0,
UNQUOTE,
@ -130,55 +105,24 @@ class CsvOp : public ParallelOp {
};
typedef std::pair<State, Message> StateMessagePair;
typedef std::pair<State, std::function<int(CsvParser &, char)>> StateActionPair;
typedef std::pair<State, std::function<int(CsvParser &, int)>> StateActionPair;
typedef std::map<StateMessagePair, StateActionPair> StateDiagram;
Message getMessage(int c) {
if (c == csv_field_delim_) {
return Message::MS_DELIM;
} else if (c == '"') {
return Message::MS_QUOTE;
} else if (c == '\r' || c == '\n') {
return Message::MS_END_OF_LINE;
} else if (c == std::char_traits<char>::eof()) {
return Message::MS_END_OF_FILE;
} else {
return Message::MS_NORMAL;
}
}
Message GetMessage(int c);
int null_func(char c) { return 0; }
int NullFunc(int c) { return 0; }
int put_char(char c) {
if (pos_ >= str_buf_.size()) {
str_buf_.resize(str_buf_.size() * 2);
}
str_buf_[pos_] = c;
pos_++;
return 0;
}
int PutChar(int c);
int put_record(char c);
int PutRecord(int c);
int put_row(char c);
int PutRow(int c);
int end_file(char c);
int EndFile(int c);
int add_row(char c) {
total_rows_++;
return 0;
}
int AddRow(int c);
int catch_exception(char c) {
if (getMessage(c) == Message::MS_QUOTE && cur_state_ == State::UNQUOTE) {
err_message_ = "Invalid quote in unquote field.";
} else if (getMessage(c) == Message::MS_END_OF_FILE && cur_state_ == State::QUOTE) {
err_message_ = "Reach the end of file in quote field.";
} else if (getMessage(c) == Message::MS_NORMAL && cur_state_ == State::SECOND_QUOTE) {
err_message_ = "Receive unquote char in quote field.";
}
return -1;
}
int CatchException(int c);
int32_t worker_id_;
std::shared_ptr<JaggedConnector> buffer_connector_;
@ -401,8 +345,8 @@ class CsvOp : public ParallelOp {
// Select file and push it to the block queue.
// @param file_name - File name.
// @param start_file - If file contains the first sample of data.
// @param end_file - If file contains the end sample of data.
// @param start_offset - If file contains the first sample of data.
// @param end_offset - If file contains the end sample of data.
// @param pre_count - Total rows of previous files.
// @return Status - the error code returned.
bool NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset, int64_t *end_offset,