diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc index 80560a59366..ff0285442de 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc @@ -97,7 +97,67 @@ Status CsvOp::Init() { return Status::OK(); } -int CsvOp::CsvParser::put_record(char c) { +CsvOp::CsvParser::CsvParser(int32_t worker_id, std::shared_ptr connector, int64_t rows_per_buffer, + char field_delim, std::vector> column_default) + : worker_id_(worker_id), + buffer_connector_(connector), + csv_rows_per_buffer_(rows_per_buffer), + csv_field_delim_(field_delim), + column_default_(column_default), + cur_state_(START_OF_FILE), + pos_(0), + cur_row_(0), + cur_col_(0), + total_rows_(0), + start_offset_(0), + end_offset_(std::numeric_limits::max()), + err_message_("unknown") { + cur_buffer_ = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); + InitCsvParser(); +} + +void CsvOp::CsvParser::Reset() { + cur_state_ = START_OF_FILE; + pos_ = 0; + cur_row_ = 0; + cur_col_ = 0; +} + +CsvOp::CsvParser::Message CsvOp::CsvParser::GetMessage(int c) { + if (c == csv_field_delim_) { + return Message::MS_DELIM; + } else if (c == '"') { + return Message::MS_QUOTE; + } else if (c == '\r' || c == '\n') { + return Message::MS_END_OF_LINE; + } else if (c == std::char_traits::eof()) { + return Message::MS_END_OF_FILE; + } else { + return Message::MS_NORMAL; + } +} + +int CsvOp::CsvParser::ProcessMessage(int c) { + Message m = GetMessage(c); + StateDiagram::iterator it = sd.find({cur_state_, m}); + if (it == sd.end()) { + return -1; + } + int ret = it->second.second(*this, c); + cur_state_ = it->second.first; + return ret; +} + +int CsvOp::CsvParser::PutChar(int c) { + if (pos_ >= str_buf_.size()) { + str_buf_.resize(str_buf_.size() * 2); + } + str_buf_[pos_] = c; + pos_++; + return 0; +} + +int CsvOp::CsvParser::PutRecord(int c) { std::string s = std::string(str_buf_.begin(), str_buf_.begin() + pos_); std::shared_ptr t; if (cur_col_ >= column_default_.size()) { @@ -125,7 +185,7 @@ int CsvOp::CsvParser::put_record(char c) { return 0; } -int CsvOp::CsvParser::put_row(char c) { +int CsvOp::CsvParser::PutRow(int c) { if (total_rows_ < start_offset_) { total_rows_++; cur_col_ = 0; @@ -137,7 +197,7 @@ int CsvOp::CsvParser::put_row(char c) { return 0; } - int ret = put_record(c); + int ret = PutRecord(c); if (ret < 0) { return ret; } @@ -162,9 +222,14 @@ int CsvOp::CsvParser::put_row(char c) { return 0; } -int CsvOp::CsvParser::end_file(char c) { +int CsvOp::CsvParser::AddRow(int c) { + total_rows_++; + return 0; +} + +int CsvOp::CsvParser::EndFile(int c) { if (cur_col_ > 0) { - int ret = put_row(c); + int ret = PutRow(c); if (ret < 0) { return ret; } @@ -177,7 +242,18 @@ int CsvOp::CsvParser::end_file(char c) { return 0; } -int CsvOp::CsvParser::countRows(int c) { +int CsvOp::CsvParser::CatchException(int c) { + if (GetMessage(c) == Message::MS_QUOTE && cur_state_ == State::UNQUOTE) { + err_message_ = "Invalid quote in unquote field."; + } else if (GetMessage(c) == Message::MS_END_OF_FILE && cur_state_ == State::QUOTE) { + err_message_ = "Reach the end of file in quote field."; + } else if (GetMessage(c) == Message::MS_NORMAL && cur_state_ == State::SECOND_QUOTE) { + err_message_ = "Receive unquote char in quote field."; + } + return -1; +} + +int CsvOp::CsvParser::CountRows(int c) { Message m; if (c == '"') { m = Message::MS_QUOTE; @@ -194,79 +270,79 @@ int CsvOp::CsvParser::countRows(int c) { return it->second.second(*this, c); } -Status CsvOp::CsvParser::initCsvParser() { +Status CsvOp::CsvParser::InitCsvParser() { str_buf_.resize(CSV_BUFFER_SIZE); // State diagram for counting rows sdl = {// START_OF_FILE - // ┌───────────┬───────────┬───────────────┐ - // │ abc │ " │ \n │ - // ├───────────┼───────────┼───────────────┤ - // │ UNQUOTE │ QUOTE │ START_OF_FILE │ - // ├───────────┼───────────┼───────────────┤ - // | null_func │ null_func │ null_func │ - // └───────────┴───────────┴───────────────┘ - {{State::START_OF_FILE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}}, - {{State::START_OF_FILE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}}, - {{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::null_func}}, + // |---------------------------------------| + // | abc | " | \n | + // |---------------------------------------| + // | UNQUOTE | QUOTE | START_OF_FILE | + // |---------------------------------------| + // | NullFunc | NullFunc | NullFunc | + // |---------------------------------------| + {{State::START_OF_FILE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}}, + {{State::START_OF_FILE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}}, + {{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::NullFunc}}, // UNQUOTE - // ┌───────────┬───────────┬─────────────┐ - // │ abc │ " │ \n │ - // ├───────────┼───────────┼─────────────┤ - // │ UNQUOTE │ QUOTE │ END_OF_LINE │ - // ├───────────┼───────────┼─────────────┤ - // | null_func │ null_func │ add_row │ - // └───────────┴───────────┴─────────────┘ - {{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}}, - {{State::UNQUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}}, - {{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::add_row}}, + // |-------------------------------------| + // | abc | " | \n | + // |-------------------------------------| + // | UNQUOTE | QUOTE | END_OF_LINE | + // |-------------------------------------| + // | NullFunc | NullFunc | AddRow | + // |-------------------------------------| + {{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}}, + {{State::UNQUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}}, + {{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::AddRow}}, // QUOTE - // ┌───────────┬──────────────┬───────────┐ - // │ abc │ " │ \n │ - // ├───────────┼──────────────┼───────────┤ - // │ QUOTE │ SECOND_QUOTE │ QUOTE │ - // ├───────────┼──────────────┼───────────┤ - // | null_func │ null_func │ null_func │ - // └───────────┴──────────────┴───────────┘ - {{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::null_func}}, - {{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::null_func}}, - {{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::null_func}}, + // |--------------------------------------| + // | abc | " | \n | + // |--------------------------------------| + // | QUOTE | SECOND_QUOTE | QUOTE | + // |--------------------------------------| + // | NullFunc | NullFunc | NullFunc | + // |--------------------------------------| + {{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::NullFunc}}, + {{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::NullFunc}}, + {{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::NullFunc}}, // SECOND_QUOTE - // ┌───────────┬───────────┬─────────────┐ - // │ abc │ " │ \n │ - // ├───────────┼───────────┼─────────────┤ - // │ UNQUOTE │ QUOTE │ END_OF_LINE │ - // ├───────────┼───────────┼─────────────┤ - // | null_func │ null_func │ add_row │ - // └───────────┴───────────┴─────────────┘ - {{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}}, - {{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}}, - {{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::add_row}}, + // |-------------------------------------| + // | abc | " | \n | + // |-------------------------------------| + // | UNQUOTE | QUOTE | END_OF_LINE | + // |-------------------------------------| + // | NullFunc | NullFunc | AddRow | + // |-------------------------------------| + {{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}}, + {{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}}, + {{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::AddRow}}, // END_OF_LINE - // ┌───────────┬───────────┬─────────────┐ - // │ abc │ " │ \n │ - // ├───────────┼───────────┼─────────────┤ - // │ UNQUOTE │ QUOTE │ END_OF_LINE │ - // ├───────────┼───────────┼─────────────┤ - // | null_func │ null_func │ null_func │ - // └───────────┴───────────┴─────────────┘ - {{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::null_func}}, - {{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::null_func}}, - {{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::null_func}}}; + // |-------------------------------------| + // | abc | " | \n | + // |-------------------------------------| + // | UNQUOTE | QUOTE | END_OF_LINE | + // |-------------------------------------| + // | NullFunc | NullFunc | NullFunc | + // |-------------------------------------| + {{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}}, + {{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}}, + {{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}}}; // State diagram for CSV parser sd = {// START_OF_FILE - // ┌───────────┬──────────┬──────────┬────────────────┬────────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├───────────┼──────────┼──────────┼────────────────┼────────────────┤ - // │ UNQUOTE │ DELIM │ QUOTE │ START_OF_FILE │ END_OF_FILE │ - // ├───────────┼──────────┼──────────┼────────────────┼────────────────┤ - // | lambda │ lambda │ lambda │ null_func │ null_func │ - // └───────────┴──────────┴──────────┴────────────────┴────────────────┘ + // |-------------------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |-------------------------------------------------------------------| + // | UNQUOTE | DELIM | QUOTE | START_OF_FILE | END_OF_FILE | + // |-------------------------------------------------------------------| + // | lambda | lambda | lambda | NullFunc | NullFunc | + // |-------------------------------------------------------------------| {{State::START_OF_FILE, Message::MS_NORMAL}, {State::UNQUOTE, [this](CsvParser &, char c) -> int { @@ -281,7 +357,7 @@ Status CsvOp::CsvParser::initCsvParser() { [this](CsvParser &, char c) -> int { this->tensor_table_ = std::make_unique(); this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr)); - return this->put_record(c); + return this->PutRecord(c); }}}, {{State::START_OF_FILE, Message::MS_QUOTE}, {State::QUOTE, @@ -291,81 +367,81 @@ Status CsvOp::CsvParser::initCsvParser() { this->pos_ = 0; return 0; }}}, - {{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::null_func}}, - {{State::START_OF_FILE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::null_func}}, + {{State::START_OF_FILE, Message::MS_END_OF_LINE}, {State::START_OF_FILE, &CsvParser::NullFunc}}, + {{State::START_OF_FILE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::NullFunc}}, // UNQUOTE - // ┌───────────┬────────────┬───────────┬─────────────┬────────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├───────────┼────────────┼───────────┼─────────────┼────────────────┤ - // │ UNQUOTE │ DELIM │ EXCEPTION │ END_OF_LINE │ END_OF_FILE │ - // ├───────────┼────────────┼───────────┼─────────────┼────────────────┤ - // | put_char │ put_record │ exception │ put_row │ end_file │ - // └───────────┴────────────┴───────────┴─────────────┴────────────────┘ - {{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::put_char}}, - {{State::UNQUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}}, - {{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}}, - {{State::UNQUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}}, + // |-------------------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |-------------------------------------------------------------------| + // | UNQUOTE | DELIM | EXCEPTION | END_OF_LINE | END_OF_FILE | + // |-------------------------------------------------------------------| + // | PutChar | PutRecord | exception | PutRow | EndFile | + // |-------------------------------------------------------------------| + {{State::UNQUOTE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::PutChar}}, + {{State::UNQUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}}, + {{State::UNQUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}}, + {{State::UNQUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}, // UNQUOTE-Exception - {{State::UNQUOTE, Message::MS_QUOTE}, {State::EXCEPTION, &CsvParser::catch_exception}}, + {{State::UNQUOTE, Message::MS_QUOTE}, {State::EXCEPTION, &CsvParser::CatchException}}, // DELIM - // ┌───────────┬────────────┬───────────┬─────────────┬────────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├───────────┼────────────┼───────────┼─────────────┼────────────────┤ - // │ UNQUOTE │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │ - // ├───────────┼────────────┼───────────┼─────────────┼────────────────┤ - // | put_char │ put_record │ lambda │ put_row │ end_file │ - // └───────────┴────────────┴───────────┴─────────────┴────────────────┘ - {{State::DELIM, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::put_char}}, - {{State::DELIM, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}}, + // |-------------------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |-------------------------------------------------------------------| + // | UNQUOTE | DELIM | QUOTE | END_OF_LINE | END_OF_FILE | + // |-------------------------------------------------------------------| + // | PutChar | PutRecord | lambda | PutRow | EndFile | + // |-------------------------------------------------------------------| + {{State::DELIM, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::PutChar}}, + {{State::DELIM, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}}, {{State::DELIM, Message::MS_QUOTE}, {State::QUOTE, [this](CsvParser &, char c) -> int { this->pos_ = 0; return 0; }}}, - {{State::DELIM, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}}, - {{State::DELIM, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}}, + {{State::DELIM, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}}, + {{State::DELIM, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}, // QUOTE - // ┌───────────┬──────────┬──────────────┬──────────┬────────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├───────────┼──────────┼──────────────┼──────────┼────────────────┤ - // │ QUOTE │ QUOTE │ SECOND_QUOTE │ QUOTE │ EXCEPTION │ - // ├───────────┼──────────┼──────────────┼──────────┼────────────────┤ - // | put_char │ put_char │ null_func │ put_char │ exception │ - // └───────────┴──────────┴──────────────┴──────────┴────────────────┘ - {{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::put_char}}, - {{State::QUOTE, Message::MS_DELIM}, {State::QUOTE, &CsvParser::put_char}}, - {{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::null_func}}, - {{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::put_char}}, + // |-----------------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |-----------------------------------------------------------------| + // | QUOTE | QUOTE | SECOND_QUOTE | QUOTE | EXCEPTION | + // |-----------------------------------------------------------------| + // | PutChar | PutChar | NullFunc | PutChar | exception | + // |-----------------------------------------------------------------| + {{State::QUOTE, Message::MS_NORMAL}, {State::QUOTE, &CsvParser::PutChar}}, + {{State::QUOTE, Message::MS_DELIM}, {State::QUOTE, &CsvParser::PutChar}}, + {{State::QUOTE, Message::MS_QUOTE}, {State::SECOND_QUOTE, &CsvParser::NullFunc}}, + {{State::QUOTE, Message::MS_END_OF_LINE}, {State::QUOTE, &CsvParser::PutChar}}, // QUOTE-Exception - {{State::QUOTE, Message::MS_END_OF_FILE}, {State::EXCEPTION, &CsvParser::catch_exception}}, + {{State::QUOTE, Message::MS_END_OF_FILE}, {State::EXCEPTION, &CsvParser::CatchException}}, // SECOND_QUOTE - // ┌───────────┬────────────┬──────────┬─────────────┬────────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├───────────┼────────────┼──────────┼─────────────┼────────────────┤ - // │ EXCEPTION │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │ - // ├───────────┼────────────┼──────────┼─────────────┼────────────────┤ - // | exception │ put_record │ put_char │ put_row │ end_file │ - // └───────────┴────────────┴──────────┴─────────────┴────────────────┘ - {{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::put_char}}, - {{State::SECOND_QUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::put_record}}, - {{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::put_row}}, - {{State::SECOND_QUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}}, + // |------------------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |------------------------------------------------------------------| + // | EXCEPTION | DELIM | QUOTE | END_OF_LINE | END_OF_FILE | + // |------------------------------------------------------------------| + // | exception | PutRecord | PutChar | PutRow | EndFile | + // |------------------------------------------------------------------| + {{State::SECOND_QUOTE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::PutChar}}, + {{State::SECOND_QUOTE, Message::MS_DELIM}, {State::DELIM, &CsvParser::PutRecord}}, + {{State::SECOND_QUOTE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::PutRow}}, + {{State::SECOND_QUOTE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}, // SECOND_QUOTE-Exception - {{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::EXCEPTION, &CsvParser::catch_exception}}, + {{State::SECOND_QUOTE, Message::MS_NORMAL}, {State::EXCEPTION, &CsvParser::CatchException}}, // END_OF_LINE - // ┌─────────┬────────┬────────┬─────────────┬─────────────┐ - // │ abc │ , │ " │ \n │ EOF │ - // ├─────────┼────────┼────────┼─────────────┼─────────────┤ - // │ UNQUOTE │ DELIM │ QUOTE │ END_OF_LINE │ END_OF_FILE │ - // ├─────────┼────────┼────────┼─────────────┼─────────────┤ - // | lambda │ lambda │ lambda │ null_func │ end_file │ - // └─────────┴────────┴────────┴─────────────┴─────────────┘ + // |-------------------------------------------------------| + // | abc | , | " | \n | EOF | + // |-------------------------------------------------------| + // | UNQUOTE | DELIM | QUOTE | END_OF_LINE | END_OF_FILE | + // |-------------------------------------------------------| + // | lambda | lambda | lambda | NullFunc | EndFile | + // |-------------------------------------------------------| {{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, [this](CsvParser &, char c) -> int { @@ -382,7 +458,7 @@ Status CsvOp::CsvParser::initCsvParser() { if (this->total_rows_ > this->start_offset_ && this->total_rows_ <= this->end_offset_) { this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr)); } - return this->put_record(c); + return this->PutRecord(c); }}}, {{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, @@ -392,8 +468,8 @@ Status CsvOp::CsvParser::initCsvParser() { } return 0; }}}, - {{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::null_func}}, - {{State::END_OF_LINE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::end_file}}}; + {{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}}, + {{State::END_OF_LINE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}}; return Status::OK(); } @@ -409,8 +485,8 @@ Status CsvOp::Reset() { Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, const int64_t end_offset, const int32_t worker_id) { CsvParser csv_parser(worker_id, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_); - csv_parser.setStartOffset(start_offset); - csv_parser.setEndOffset(end_offset); + csv_parser.SetStartOffset(start_offset); + csv_parser.SetEndOffset(end_offset); std::ifstream ifs; ifs.open(file, std::ifstream::in); if (column_name_list_.empty()) { @@ -424,16 +500,16 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons // which is a 32-bit -1, it's not equal to the 8-bit -1 on Euler OS. So instead of char, we use // int to receive its return value. int chr = ifs.get(); - if (csv_parser.processMessage(chr) != 0) { - RETURN_STATUS_UNEXPECTED("Failed to parse file " + file + ":" + std::to_string(csv_parser.total_rows_ + 1) + - ". error message: " + csv_parser.err_message_); + if (csv_parser.ProcessMessage(chr) != 0) { + RETURN_STATUS_UNEXPECTED("Failed to parse file " + file + ":" + std::to_string(csv_parser.GetTotalRows() + 1) + + ". error message: " + csv_parser.GetErrorMessage()); } } } catch (std::invalid_argument &ia) { - std::string err_row = std::to_string(csv_parser.total_rows_ + 1); + std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1); RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", type does not match"); } catch (std::out_of_range &oor) { - std::string err_row = std::to_string(csv_parser.total_rows_ + 1); + std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1); RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", out of range"); } return Status::OK(); @@ -712,12 +788,12 @@ int64_t CsvOp::CountTotalRows(const std::string &file) { csv_parser.Reset(); while (ifs.good()) { int chr = ifs.get(); - if (csv_parser.countRows(chr) != 0) { + if (csv_parser.CountRows(chr) != 0) { break; } } - return csv_parser.total_rows_; + return csv_parser.GetTotalRows(); } // Pushes a control indicator onto the IOBlockQueue for each worker to consume. diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h index 93ed2a754e6..1417d9c8b09 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h @@ -64,52 +64,27 @@ class CsvOp : public ParallelOp { CsvParser() = delete; CsvParser(int32_t worker_id, std::shared_ptr connector, int64_t rows_per_buffer, char field_delim, - std::vector> column_default) - : worker_id_(worker_id), - buffer_connector_(connector), - csv_rows_per_buffer_(rows_per_buffer), - csv_field_delim_(field_delim), - column_default_(column_default), - cur_state_(START_OF_FILE), - pos_(0), - cur_row_(0), - cur_col_(0), - total_rows_(0), - start_offset_(0), - end_offset_(std::numeric_limits::max()), - err_message_("unknown") { - cur_buffer_ = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); - initCsvParser(); - } + std::vector> column_default); ~CsvParser() = default; - void Reset() { - cur_state_ = START_OF_FILE; - pos_ = 0; - cur_row_ = 0; - cur_col_ = 0; - } + void Reset(); - void setStartOffset(int64_t start_offset) { start_offset_ = start_offset; } + void SetStartOffset(int64_t start_offset) { start_offset_ = start_offset; } - void setEndOffset(int64_t end_offset) { end_offset_ = end_offset; } + void SetEndOffset(int64_t end_offset) { end_offset_ = end_offset; } - int processMessage(int c) { - Message m = getMessage(c); - StateDiagram::iterator it = sd.find({cur_state_, m}); - if (it == sd.end()) { - return -1; - } - int ret = it->second.second(*this, static_cast(c)); - cur_state_ = it->second.first; - return ret; - } + int ProcessMessage(int c); - int countRows(int c); + int CountRows(int c); - Status initCsvParser(); + Status InitCsvParser(); + int64_t GetTotalRows() { return total_rows_; } + + std::string GetErrorMessage() { return err_message_; } + + private: enum State : uint8_t { START_OF_FILE = 0, UNQUOTE, @@ -130,55 +105,24 @@ class CsvOp : public ParallelOp { }; typedef std::pair StateMessagePair; - typedef std::pair> StateActionPair; + typedef std::pair> StateActionPair; typedef std::map StateDiagram; - Message getMessage(int c) { - if (c == csv_field_delim_) { - return Message::MS_DELIM; - } else if (c == '"') { - return Message::MS_QUOTE; - } else if (c == '\r' || c == '\n') { - return Message::MS_END_OF_LINE; - } else if (c == std::char_traits::eof()) { - return Message::MS_END_OF_FILE; - } else { - return Message::MS_NORMAL; - } - } + Message GetMessage(int c); - int null_func(char c) { return 0; } + int NullFunc(int c) { return 0; } - int put_char(char c) { - if (pos_ >= str_buf_.size()) { - str_buf_.resize(str_buf_.size() * 2); - } - str_buf_[pos_] = c; - pos_++; - return 0; - } + int PutChar(int c); - int put_record(char c); + int PutRecord(int c); - int put_row(char c); + int PutRow(int c); - int end_file(char c); + int EndFile(int c); - int add_row(char c) { - total_rows_++; - return 0; - } + int AddRow(int c); - int catch_exception(char c) { - if (getMessage(c) == Message::MS_QUOTE && cur_state_ == State::UNQUOTE) { - err_message_ = "Invalid quote in unquote field."; - } else if (getMessage(c) == Message::MS_END_OF_FILE && cur_state_ == State::QUOTE) { - err_message_ = "Reach the end of file in quote field."; - } else if (getMessage(c) == Message::MS_NORMAL && cur_state_ == State::SECOND_QUOTE) { - err_message_ = "Receive unquote char in quote field."; - } - return -1; - } + int CatchException(int c); int32_t worker_id_; std::shared_ptr buffer_connector_; @@ -401,8 +345,8 @@ class CsvOp : public ParallelOp { // Select file and push it to the block queue. // @param file_name - File name. - // @param start_file - If file contains the first sample of data. - // @param end_file - If file contains the end sample of data. + // @param start_offset - If file contains the first sample of data. + // @param end_offset - If file contains the end sample of data. // @param pre_count - Total rows of previous files. // @return Status - the error code returned. bool NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset, int64_t *end_offset,