!26611 [MD] fix mindrecord log message part01

Merge pull request !26611 from liyong126/fix_mindrecord_log_msg
i-robot 2021-12-01 01:12:48 +00:00 committed by Gitee
commit bf59d75604
7 changed files with 194 additions and 238 deletions

mindspore/ccsrc/minddata/mindrecord/io/shard_writer.cc

@@ -44,19 +44,22 @@ Status ShardWriter::GetFullPathFromFileName(const std::vector<std::string> &path
   // Get full path from file name
   for (const auto &path : paths) {
     CHECK_FAIL_RETURN_UNEXPECTED(CheckIsValidUtf8(path),
-                                 "Invalid data, file name: " + path + " contains invalid uft-8 character.");
+                                 "Invalid file, mindrecord file name: " + path +
+                                   " contains invalid uft-8 character. Please rename mindrecord file name.");
     char resolved_path[PATH_MAX] = {0};
     char buf[PATH_MAX] = {0};
     CHECK_FAIL_RETURN_UNEXPECTED(strncpy_s(buf, PATH_MAX, common::SafeCStr(path), path.length()) == EOK,
-                                 "Failed to call securec func [strncpy_s], path: " + path);
+                                 "[Internal ERROR] Failed to call securec func [strncpy_s], path: " + path);
 #if defined(_WIN32) || defined(_WIN64)
     RETURN_UNEXPECTED_IF_NULL(_fullpath(resolved_path, dirname(&(buf[0])), PATH_MAX));
     RETURN_UNEXPECTED_IF_NULL(_fullpath(resolved_path, common::SafeCStr(path), PATH_MAX));
 #else
-    CHECK_FAIL_RETURN_UNEXPECTED(realpath(dirname(&(buf[0])), resolved_path) != nullptr,
-                                 "Invalid file, path: " + std::string(resolved_path));
+    CHECK_FAIL_RETURN_UNEXPECTED(
+      realpath(dirname(&(buf[0])), resolved_path) != nullptr,
+      "Invalid file, failed to get the realpath of mindrecord files. Please check file path: " +
+        std::string(resolved_path));
     if (realpath(common::SafeCStr(path), resolved_path) == nullptr) {
-      MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check success.";
+      MS_LOG(DEBUG) << "Succeed to check path: " << common::SafeCStr(path);
     }
 #endif
     file_paths_.emplace_back(string(resolved_path));
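
Note: most of the messages touched in this file surface through the Python mindrecord FileWriter API. As a rough reference for the checks above and below, a minimal write flow looks like the sketch here (file name, schema and sample values are illustrative, not taken from this diff):

  from mindspore.mindrecord import FileWriter

  # Hypothetical file name; the utf-8 check above validates it on the C++ side.
  writer = FileWriter(file_name="demo.mindrecord", shard_num=1)
  writer.add_schema({"label": {"type": "int32"}, "data": {"type": "bytes"}}, "demo_schema")
  writer.write_raw_data([{"label": 1, "data": b"\x01\x02"}])
  writer.commit()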
@@ -75,7 +78,8 @@ Status ShardWriter::OpenDataFiles(bool append, bool overwrite) {
     }
     auto realpath = FileUtils::GetRealPath(dir.value().data());
-    CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + file);
+    CHECK_FAIL_RETURN_UNEXPECTED(
+      realpath.has_value(), "Invalid file, failed to get the realpath of mindrecord files. Please check file: " + file);
 
     std::optional<std::string> whole_path = "";
     FileUtils::ConcatDirAndFileName(&realpath, &local_file_name, &whole_path);
@@ -91,19 +95,25 @@ Status ShardWriter::OpenDataFiles(bool append, bool overwrite) {
       if (overwrite) {
         auto res1 = std::remove(whole_path.value().c_str());
-        CHECK_FAIL_RETURN_UNEXPECTED(!std::ifstream(whole_path.value()) == true,
-                                     "Failed to delete file, path: " + file);
+        CHECK_FAIL_RETURN_UNEXPECTED(!std::ifstream(whole_path.value()) == true,
+                                     "Invalid file, failed to remove the old files when trying to overwrite "
+                                     "mindrecord files. Please check file path and permission: " +
+                                       file);
         if (res1 == 0) {
-          MS_LOG(WARNING) << "Succeed to delete file, path: " << file;
+          MS_LOG(WARNING) << "Succeed to remove the old mindrecord files, path: " << file;
         }
         auto db_file = whole_path.value() + ".db";
         auto res2 = std::remove(db_file.c_str());
-        CHECK_FAIL_RETURN_UNEXPECTED(!std::ifstream(whole_path.value() + ".db") == true,
-                                     "Failed to delete db file, path: " + file + ".db");
+        CHECK_FAIL_RETURN_UNEXPECTED(!std::ifstream(whole_path.value() + ".db") == true,
+                                     "Invalid file, failed to remove the old mindrecord meta files when trying to "
+                                     "overwrite mindrecord files. Please check file path and permission: " +
+                                       file + ".db");
         if (res2 == 0) {
-          MS_LOG(WARNING) << "Succeed to delete metadata file, path: " << file + ".db";
+          MS_LOG(WARNING) << "Succeed to remove the old mindrecord metadata files, path: " << file + ".db";
         }
       } else {
-        RETURN_STATUS_UNEXPECTED("Invalid file, Mindrecord files already existed in path: " + file);
+        RETURN_STATUS_UNEXPECTED(
+          "Invalid file, mindrecord files already exist. Please check file path: " + file +
+          +".\nIf you do not want to keep the files, set the 'overwrite' parameter to True and try again.");
       }
     } else {
       fs->close();
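
The reworded "already exist" error points users to the 'overwrite' parameter. A hedged sketch of the Python call it refers to (assuming the 'overwrite' keyword is available on FileWriter in builds that carry this change):

  from mindspore.mindrecord import FileWriter

  # overwrite=True lets the writer remove stale .mindrecord and .db files
  # instead of failing with "mindrecord files already exist".
  writer = FileWriter(file_name="demo.mindrecord", shard_num=1, overwrite=True)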
@@ -112,17 +122,23 @@ Status ShardWriter::OpenDataFiles(bool append, bool overwrite) {
       // open the mindrecord file to write
       fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary | std::ios::trunc);
       if (!fs->good()) {
-        RETURN_STATUS_UNEXPECTED("Failed to open file, path: " + file);
+        RETURN_STATUS_UNEXPECTED(
+          "Invalid file, failed to open files for writing mindrecord files. Please check file path, permission and "
+          "open file limit: " +
+          file);
       }
     } else {
       // open the mindrecord file to append
       fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary);
       if (!fs->good()) {
         fs->close();
-        RETURN_STATUS_UNEXPECTED("Failed to open file for append data, path: " + file);
+        RETURN_STATUS_UNEXPECTED(
+          "Invalid file, failed to open files for appending mindrecord files. Please check file path, permission and "
+          "open file limit: " +
+          file);
       }
     }
-    MS_LOG(INFO) << "Succeed to open shard file, path: " << file;
+    MS_LOG(INFO) << "Succeed to open mindrecord shard file, path: " << file;
     file_streams_.push_back(fs);
   }
   return Status::OK();
@@ -143,7 +159,7 @@ Status ShardWriter::RemoveLockFile() {
 }
 
 Status ShardWriter::InitLockFile() {
-  CHECK_FAIL_RETURN_UNEXPECTED(file_paths_.size() != 0, "Invalid data, file_paths_ is not initialized.");
+  CHECK_FAIL_RETURN_UNEXPECTED(file_paths_.size() != 0, "[Internal ERROR] 'file_paths_' is not initialized.");
 
   lock_file_ = file_paths_[0] + kLockFileSuffix;
   pages_file_ = file_paths_[0] + kPageFileSuffix;
@@ -154,8 +170,8 @@ Status ShardWriter::InitLockFile() {
 Status ShardWriter::Open(const std::vector<std::string> &paths, bool append, bool overwrite) {
   shard_count_ = paths.size();
   CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ <= kMaxSchemaCount,
-                               "Invalid data, schema_count_ must be less than or equal to " +
-                                 std::to_string(kMaxSchemaCount) + ", but got " + std::to_string(schema_count_));
+                               "[Internal ERROR] 'schema_count_' must be less than or equal to " +
+                                 std::to_string(kMaxSchemaCount) + ", but got: " + std::to_string(schema_count_));
 
   // Get full path from file name
   RETURN_IF_NOT_OK(GetFullPathFromFileName(paths));
@@ -167,7 +183,8 @@ Status ShardWriter::Open(const std::vector<std::string> &paths, bool append, boo
 }
 
 Status ShardWriter::OpenForAppend(const std::string &path) {
-  CHECK_FAIL_RETURN_UNEXPECTED(IsLegalFile(path), "Invalid file, path: " + path);
+  CHECK_FAIL_RETURN_UNEXPECTED(
+    IsLegalFile(path), "Invalid file, failed to verify files for append mindrecord files. Please check file: " + path);
   std::shared_ptr<json> header_ptr;
   RETURN_IF_NOT_OK(ShardHeader::BuildSingleHeader(path, &header_ptr));
   auto ds = std::make_shared<std::vector<std::string>>();
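
OpenForAppend backs the Python-side append workflow. Roughly, via the public open_for_append classmethod (file name illustrative; the schema is read back from the existing file, so none is re-added):

  from mindspore.mindrecord import FileWriter

  # IsLegalFile() above is the C++ check behind this call.
  writer = FileWriter.open_for_append("demo.mindrecord")
  writer.write_raw_data([{"label": 2, "data": b"\x03\x04"}])
  writer.commit()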
@@ -262,7 +279,9 @@ void ShardWriter::DeleteErrorData(std::map<uint64_t, std::vector<json>> &raw_dat
     for (auto &subMg : sub_err_mg) {
       int loc = subMg.first;
       std::string message = subMg.second;
-      MS_LOG(ERROR) << "Invalid input, the " << loc + 1 << " th data is invalid, " << message;
+      MS_LOG(ERROR) << "Invalid input, the " << loc + 1
+                    << " th data provided by user is invalid while writing mindrecord files. Please fix the error: "
+                    << message;
       (void)delete_set.insert(loc);
     }
   }
@@ -299,8 +318,8 @@ Status ShardWriter::CheckDataTypeAndValue(const std::string &key, const json &va
         (data_type == "int64" && !data[key].is_number_integer()) ||
         (data_type == "float32" && !data[key].is_number_float()) ||
         (data_type == "float64" && !data[key].is_number_float()) || (data_type == "string" && !data[key].is_string())) {
-      std::string message =
-        "field: " + key + " ,type : " + data_type + " ,value: " + data[key].dump() + " is not matched.";
+      std::string message = "Invalid input, for field: " + key + ", type: " + data_type +
+                            " and value: " + data[key].dump() + " do not match while writing mindrecord files.";
       PopulateMutexErrorData(i, message, err_raw_data);
       RETURN_STATUS_UNEXPECTED(message);
     }
@@ -309,8 +328,8 @@ Status ShardWriter::CheckDataTypeAndValue(const std::string &key, const json &va
       int64_t temp_value = data[key];
       if (static_cast<int64_t>(temp_value) < static_cast<int64_t>(std::numeric_limits<int32_t>::min()) &&
           static_cast<int64_t>(temp_value) > static_cast<int64_t>(std::numeric_limits<int32_t>::max())) {
-        std::string message =
-          "field: " + key + " ,type : " + data_type + " ,value: " + data[key].dump() + " is out of range.";
+        std::string message = "Invalid input, for field: " + key + "and its type: " + data_type +
+                              ", value: " + data[key].dump() + " is out of range while writing mindrecord files.";
         PopulateMutexErrorData(i, message, err_raw_data);
         RETURN_STATUS_UNEXPECTED(message);
       }
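
CheckDataTypeAndValue is what rejects a sample whose value does not match the declared schema type, or whose int32 value is out of range. An illustrative pair of inputs (hypothetical field name):

  schema = {"label": {"type": "int32"}}
  good = {"label": 5}             # matches int32
  bad_type = {"label": "five"}    # string for an int32 field -> "do not match"
  bad_range = {"label": 2 ** 40}  # exceeds int32 limits -> "is out of range"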
@@ -366,7 +385,7 @@ Status ShardWriter::CheckData(const std::map<uint64_t, std::vector<json>> &raw_d
   // calculate start position and end position for each thread
   int batch_size = rawdata_iter->second.size() / shard_count_;
   int thread_num = shard_count_;
-  CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid data, thread_num should be positive.");
+  CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "[Internal ERROR] 'thread_num' should be positive.");
   if (thread_num > kMaxThreadCount) {
     thread_num = kMaxThreadCount;
   }
@@ -387,7 +406,7 @@ Status ShardWriter::CheckData(const std::map<uint64_t, std::vector<json>> &raw_d
   }
   CHECK_FAIL_RETURN_UNEXPECTED(
     thread_num <= kMaxThreadCount,
-    "Invalid data, thread_num should be less than or equal to " + std::to_string(kMaxThreadCount));
+    "[Internal ERROR] 'thread_num' should be less than or equal to " + std::to_string(kMaxThreadCount));
   // Wait for threads done
   for (int x = 0; x < thread_num; ++x) {
     thread_set[x].join();
@@ -404,7 +423,8 @@ Status ShardWriter::ValidateRawData(std::map<uint64_t, std::vector<json>> &raw_d
   RETURN_UNEXPECTED_IF_NULL(count_ptr);
   auto rawdata_iter = raw_data.begin();
   schema_count_ = raw_data.size();
-  CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ > 0, "Invalid data, schema count should be positive.");
+  CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ > 0, "Invalid data, the number of schema should be positive but got: " +
+                                                    std::to_string(schema_count_) + ". Please check the input schema.");
 
   // keep schema_id
   std::set<int64_t> schema_ids;
@@ -412,17 +432,18 @@ Status ShardWriter::ValidateRawData(std::map<uint64_t, std::vector<json>> &raw_d
   // Determine if the number of schemas is the same
   CHECK_FAIL_RETURN_UNEXPECTED(shard_header_->GetSchemas().size() == schema_count_,
-                               "Invalid data, schema count: " + std::to_string(schema_count_) + " is not matched.");
+                               "[Internal ERROR] 'schema_count_' and the schema count in schema: " +
+                                 std::to_string(schema_count_) + " do not match.");
 
   // Determine raw_data size == blob_data size
   CHECK_FAIL_RETURN_UNEXPECTED(raw_data[0].size() == blob_data.size(),
-                               "Invalid data, raw data size: " + std::to_string(raw_data[0].size()) +
+                               "[Internal ERROR] raw data size: " + std::to_string(raw_data[0].size()) +
                                  " is not equal to blob data size: " + std::to_string(blob_data.size()) + ".");
 
   // Determine whether the number of samples corresponding to each schema is the same
   for (rawdata_iter = raw_data.begin(); rawdata_iter != raw_data.end(); ++rawdata_iter) {
     CHECK_FAIL_RETURN_UNEXPECTED(row_count_ == rawdata_iter->second.size(),
-                                 "Invalid data, number of samples: " + std::to_string(rawdata_iter->second.size()) +
-                                   " for schema is not matched.");
+                                 "[Internal ERROR] 'row_count_': " + std::to_string(rawdata_iter->second.size()) +
+                                   " for each schema is not the same.");
     (void)schema_ids.insert(rawdata_iter->first);
   }
   const std::vector<std::shared_ptr<Schema>> &schemas = shard_header_->GetSchemas();
@@ -431,7 +452,7 @@ Status ShardWriter::ValidateRawData(std::map<uint64_t, std::vector<json>> &raw_d
                   [schema_ids](const std::shared_ptr<Schema> &schema) {
                     return schema_ids.find(schema->GetSchemaID()) == schema_ids.end();
                   }),
-    "Invalid data, schema id of data is not matched.");
+    "[Internal ERROR] schema id in 'schemas' can not found in 'schema_ids'.");
   if (!sign) {
     *count_ptr = std::make_shared<std::pair<int, int>>(schema_count_, row_count_);
     return Status::OK();
@@ -487,7 +508,7 @@ Status ShardWriter::LockWriter(bool parallel_writer, std::unique_ptr<int> *fd_pt
     flock(fd, LOCK_EX);
   } else {
     close(fd);
-    RETURN_STATUS_UNEXPECTED("Failed to lock file, path: " + lock_file_);
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to lock file, path: " + lock_file_);
   }
 #endif
@@ -497,20 +518,20 @@ Status ShardWriter::LockWriter(bool parallel_writer, std::unique_ptr<int> *fd_pt
     auto realpath = FileUtils::GetRealPath(file.data());
     if (!realpath.has_value()) {
       close(fd);
-      RETURN_STATUS_UNEXPECTED("Failed to get real path, path: " + file);
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to get real path, path: " + file);
     }
     std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
     fs->open(realpath.value(), std::ios::in | std::ios::out | std::ios::binary);
     if (fs->fail()) {
       close(fd);
-      RETURN_STATUS_UNEXPECTED("Failed to open file, path: " + file);
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open file, path: " + file);
     }
     file_streams_.push_back(fs);
   }
   auto status = shard_header_->FileToPages(pages_file_);
   if (status.IsError()) {
     close(fd);
-    RETURN_STATUS_UNEXPECTED("Error raised in FileToPages function.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Error raised in FileToPages function.");
   }
   *fd_ptr = std::make_unique<int>(fd);
   return Status::OK();
@@ -540,8 +561,9 @@ Status ShardWriter::WriteRawDataPreCheck(std::map<uint64_t, std::vector<json>> &
   // check the free disk size
   std::shared_ptr<uint64_t> size_ptr;
   RETURN_IF_NOT_OK(GetDiskSize(file_paths_[0], kFreeSize, &size_ptr));
-  CHECK_FAIL_RETURN_UNEXPECTED(*size_ptr >= kMinFreeDiskSize,
-                               "No free disk to be used, free disk size: " + std::to_string(*size_ptr));
+  CHECK_FAIL_RETURN_UNEXPECTED(
+    *size_ptr >= kMinFreeDiskSize,
+    "No free disk to be used while writing mindrecord files, available free disk size: " + std::to_string(*size_ptr));
   // compress blob
   if (shard_column_->CheckCompressBlob()) {
     for (auto &blob : blob_data) {
@@ -615,7 +637,7 @@ Status ShardWriter::WriteRawData(std::map<uint64_t, std::vector<json>> &raw_data
   // Serialize raw data
   RETURN_IF_NOT_OK(WriteRawDataPreCheck(raw_data, blob_data, sign, &schema_count, &row_count));
-  CHECK_FAIL_RETURN_UNEXPECTED(row_count >= kInt0, "Invalid data, raw data size should be positive.");
+  CHECK_FAIL_RETURN_UNEXPECTED(row_count >= kInt0, "[Internal ERROR] the size of raw data should be positive.");
   if (row_count == kInt0) {
     return Status::OK();
   }
@@ -676,7 +698,7 @@ Status ShardWriter::ParallelWriteData(const std::vector<std::vector<uint8_t>> &b
   auto shards = BreakIntoShards();
   // define the number of thread
   int thread_num = static_cast<int>(shard_count_);
-  CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid data, thread_num should be positive.");
+  CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "[Internal ERROR] 'thread_num' should be positive.");
   if (thread_num > kMaxThreadCount) {
     thread_num = kMaxThreadCount;
   }
@@ -741,13 +763,13 @@ Status ShardWriter::CutRowGroup(int start_row, int end_row, const std::vector<st
   int page_start_row = start_row;
   CHECK_FAIL_RETURN_UNEXPECTED(start_row <= end_row,
-                               "Invalid data, start row: " + std::to_string(start_row) +
-                                 " should be less than or equal to end row: " + std::to_string(end_row));
+                               "[Internal ERROR] 'start_row': " + std::to_string(start_row) +
+                                 " should be less than or equal to 'end_row': " + std::to_string(end_row));
   CHECK_FAIL_RETURN_UNEXPECTED(
     end_row <= static_cast<int>(blob_data_size_.size()) && end_row <= static_cast<int>(raw_data_size_.size()),
-    "Invalid data, end row: " + std::to_string(end_row) + " should be less than blob data size: " +
-      std::to_string(blob_data_size_.size()) + " and raw data size: " + std::to_string(raw_data_size_.size()) + ".");
+    "[Internal ERROR] 'end_row': " + std::to_string(end_row) + " should be less than 'blob_data_size': " +
+      std::to_string(blob_data_size_.size()) + " and 'raw_data_size': " + std::to_string(raw_data_size_.size()) + ".");
   for (int i = start_row; i < end_row; ++i) {
     // n_byte_blob(0) indicate appendBlobPage
     if (n_byte_blob == 0 || n_byte_blob + blob_data_size_[i] > page_size_ ||
@@ -780,7 +802,7 @@ Status ShardWriter::AppendBlobPage(const int &shard_id, const std::vector<std::v
   auto &io_seekp = file_streams_[shard_id]->seekp(page_size_ * page_id + header_size_ + bytes_page, std::ios::beg);
   if (!io_seekp.good() || io_seekp.fail() || io_seekp.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekg file.");
   }
 
   (void)FlushBlobChunk(file_streams_[shard_id], blob_data, blob_row);
@@ -808,7 +830,7 @@ Status ShardWriter::NewBlobPage(const int &shard_id, const std::vector<std::vect
   auto &io_seekp = file_streams_[shard_id]->seekp(page_size_ * (page_id + 1) + header_size_, std::ios::beg);
   if (!io_seekp.good() || io_seekp.fail() || io_seekp.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekg file.");
   }
 
   (void)FlushBlobChunk(file_streams_[shard_id], blob_data, blob_row);
@@ -847,32 +869,32 @@ Status ShardWriter::ShiftRawPage(const int &shard_id, const std::vector<std::pai
   // Read last row group from previous raw data page
   CHECK_FAIL_RETURN_UNEXPECTED(
     shard_id >= 0 && shard_id < file_streams_.size(),
-    "Invalid data, shard_id should be in range [0, " + std::to_string(file_streams_.size()) + ").");
+    "[Internal ERROR] 'shard_id' should be in range [0, " + std::to_string(file_streams_.size()) + ").");
   auto &io_seekg = file_streams_[shard_id]->seekg(
     page_size_ * last_raw_page_id + header_size_ + last_row_group_id_offset, std::ios::beg);
   if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekg file.");
   }
 
   auto &io_read = file_streams_[shard_id]->read(reinterpret_cast<char *>(&buf[0]), buf.size());
   if (!io_read.good() || io_read.fail() || io_read.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to read file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to read file.");
   }
 
   // Merge into new row group at new raw data page
   auto &io_seekp = file_streams_[shard_id]->seekp(page_size_ * (page_id + 1) + header_size_, std::ios::beg);
   if (!io_seekp.good() || io_seekp.fail() || io_seekp.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekg file.");
   }
 
   auto &io_handle = file_streams_[shard_id]->write(reinterpret_cast<char *>(&buf[0]), buf.size());
   if (!io_handle.good() || io_handle.fail() || io_handle.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to write file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
   }
 
   last_raw_page->DeleteLastGroupId();
   (void)shard_header_->SetPage(last_raw_page);
@@ -935,7 +957,7 @@ Status ShardWriter::AppendRawPage(const int &shard_id, const std::vector<std::pa
     file_streams_[shard_id]->seekp(page_size_ * last_raw_page_id + header_size_ + n_bytes, std::ios::beg);
   if (!io_seekp.good() || io_seekp.fail() || io_seekp.bad()) {
     file_streams_[shard_id]->close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekg file.");
   }
 
   if (chunk_id > 0) {
@@ -958,7 +980,7 @@ Status ShardWriter::FlushBlobChunk(const std::shared_ptr<std::fstream> &out,
                                    const std::pair<int, int> &blob_row) {
   CHECK_FAIL_RETURN_UNEXPECTED(
     blob_row.first <= blob_row.second && blob_row.second <= static_cast<int>(blob_data.size()) && blob_row.first >= 0,
-    "Invalid data, blob_row: " + std::to_string(blob_row.first) + ", " + std::to_string(blob_row.second) +
+    "[Internal ERROR] 'blob_row': " + std::to_string(blob_row.first) + ", " + std::to_string(blob_row.second) +
       " is invalid.");
   for (int j = blob_row.first; j < blob_row.second; ++j) {
     // Write the size of blob
@@ -966,7 +988,7 @@ Status ShardWriter::FlushBlobChunk(const std::shared_ptr<std::fstream> &out,
     auto &io_handle = out->write(reinterpret_cast<char *>(&line_len), kInt64Len);
     if (!io_handle.good() || io_handle.fail() || io_handle.bad()) {
       out->close();
-      RETURN_STATUS_UNEXPECTED("Failed to write file.");
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
     }
 
     // Write the data of blob
@@ -974,7 +996,7 @@ Status ShardWriter::FlushBlobChunk(const std::shared_ptr<std::fstream> &out,
     auto &io_handle_data = out->write(reinterpret_cast<char *>(&line[0]), line_len);
     if (!io_handle_data.good() || io_handle_data.fail() || io_handle_data.bad()) {
       out->close();
-      RETURN_STATUS_UNEXPECTED("Failed to write file.");
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
     }
   }
   return Status::OK();
@@ -990,7 +1012,7 @@ Status ShardWriter::FlushRawChunk(const std::shared_ptr<std::fstream> &out,
     auto &io_handle = out->write(reinterpret_cast<char *>(&line_len), kInt64Len);
     if (!io_handle.good() || io_handle.fail() || io_handle.bad()) {
       out->close();
-      RETURN_STATUS_UNEXPECTED("Failed to write file.");
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
     }
   }
   // Write the data of multi schemas
@@ -999,7 +1021,7 @@ Status ShardWriter::FlushRawChunk(const std::shared_ptr<std::fstream> &out,
     auto &io_handle = out->write(reinterpret_cast<char *>(&line[0]), line.size());
     if (!io_handle.good() || io_handle.fail() || io_handle.bad()) {
       out->close();
-      RETURN_STATUS_UNEXPECTED("Failed to write file.");
+      RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
     }
   }
 }
@@ -1041,32 +1063,32 @@ Status ShardWriter::WriteShardHeader() {
   // Write header data to multi files
   CHECK_FAIL_RETURN_UNEXPECTED(
     shard_count_ <= static_cast<int>(file_streams_.size()) && shard_count_ <= static_cast<int>(shard_header.size()),
-    "Invalid data, shard count should be less than or equal to file size: " + std::to_string(file_streams_.size()) +
-      ", and header size: " + std::to_string(shard_header.size()) + ".");
+    "[Internal ERROR] 'shard_count_' should be less than or equal to 'file_stream_' size: " +
+      std::to_string(file_streams_.size()) + ", and 'shard_header' size: " + std::to_string(shard_header.size()) + ".");
   if (shard_count_ <= kMaxShardCount) {
     for (int shard_id = 0; shard_id < shard_count_; ++shard_id) {
       auto &io_seekp = file_streams_[shard_id]->seekp(0, std::ios::beg);
       if (!io_seekp.good() || io_seekp.fail() || io_seekp.bad()) {
         file_streams_[shard_id]->close();
-        RETURN_STATUS_UNEXPECTED("Failed to seekp file.");
+        RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to seekp file.");
       }
 
       std::vector<uint8_t> bin_header(shard_header[shard_id].begin(), shard_header[shard_id].end());
       uint64_t line_len = bin_header.size();
       if (line_len + kInt64Len > header_size_) {
         file_streams_[shard_id]->close();
-        RETURN_STATUS_UNEXPECTED("shard header is too big.");
+        RETURN_STATUS_UNEXPECTED("[Internal ERROR] shard header is too big.");
       }
 
       auto &io_handle = file_streams_[shard_id]->write(reinterpret_cast<char *>(&line_len), kInt64Len);
       if (!io_handle.good() || io_handle.fail() || io_handle.bad()) {
         file_streams_[shard_id]->close();
-        RETURN_STATUS_UNEXPECTED("Failed to write file.");
+        RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
       }
 
       auto &io_handle_header = file_streams_[shard_id]->write(reinterpret_cast<char *>(&bin_header[0]), line_len);
       if (!io_handle_header.good() || io_handle_header.fail() || io_handle_header.bad()) {
         file_streams_[shard_id]->close();
-        RETURN_STATUS_UNEXPECTED("Failed to write file.");
+        RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to write file.");
       }
       file_streams_[shard_id]->close();
     }
@@ -1100,7 +1122,7 @@ Status ShardWriter::SerializeRawData(std::map<uint64_t, std::vector<json>> &raw_
     // Set obstacles to prevent the main thread from running
     thread_set[x].join();
   }
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(flag_ != true, "Error raised in FillArray function.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(flag_ != true, "[Internal ERROR] Error raised in FillArray function.");
   return Status::OK();
 }
@@ -1111,9 +1133,10 @@ Status ShardWriter::SetRawDataSize(const std::vector<std::vector<uint8_t>> &bin_
       bin_raw_data.begin() + (i * schema_count_), bin_raw_data.begin() + (i * schema_count_) + schema_count_, 0,
       [](uint64_t accumulator, const std::vector<uint8_t> &row) { return accumulator + kInt64Len + row.size(); });
   }
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(
-    *std::max_element(raw_data_size_.begin(), raw_data_size_.end()) <= page_size_,
-    "Invalid data, Page size: " + std::to_string(page_size_) + " is too small to save a raw row!");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(*std::max_element(raw_data_size_.begin(), raw_data_size_.end()) <= page_size_,
+                                 "Invalid data, Page size: " + std::to_string(page_size_) +
+                                   " is too small to save a raw row. Please try to use the mindrecord api "
+                                   "'set_page_size(1<<25)' to enable 64MB page size.");
   return Status::OK();
 }
@@ -1121,16 +1144,17 @@ Status ShardWriter::SetBlobDataSize(const std::vector<std::vector<uint8_t>> &blo
   blob_data_size_ = std::vector<uint64_t>(row_count_);
   (void)std::transform(blob_data.begin(), blob_data.end(), blob_data_size_.begin(),
                        [](const std::vector<uint8_t> &row) { return kInt64Len + row.size(); });
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(
-    *std::max_element(blob_data_size_.begin(), blob_data_size_.end()) <= page_size_,
-    "Invalid data, Page size: " + std::to_string(page_size_) + " is too small to save a blob row!");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(*std::max_element(blob_data_size_.begin(), blob_data_size_.end()) <= page_size_,
+                                 "Invalid data, Page size: " + std::to_string(page_size_) +
+                                   " is too small to save a blob row. Please try to use the mindrecord api "
+                                   "'set_page_size(1<<25)' to enable 64MB page size.");
   return Status::OK();
 }
 
 Status ShardWriter::SetLastRawPage(const int &shard_id, std::shared_ptr<Page> &last_raw_page) {
   // Get last raw page
   auto last_raw_page_id = shard_header_->GetLastPageIdByType(shard_id, kPageTypeRaw);
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(last_raw_page_id >= 0, "Invalid data, last_raw_page_id: " +
-                                                          std::to_string(last_raw_page_id) + " should be positive.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(last_raw_page_id >= 0, "[Internal ERROR] 'last_raw_page_id': " +
+                                                          std::to_string(last_raw_page_id) + " should be positive.");
   RETURN_IF_NOT_OK(shard_header_->GetPage(shard_id, last_raw_page_id, &last_raw_page));
   return Status::OK();
@@ -1139,7 +1163,7 @@ Status ShardWriter::SetLastRawPage(const int &shard_id, std::shared_ptr<Page> &l
 Status ShardWriter::SetLastBlobPage(const int &shard_id, std::shared_ptr<Page> &last_blob_page) {
   // Get last blob page
   auto last_blob_page_id = shard_header_->GetLastPageIdByType(shard_id, kPageTypeBlob);
-  CHECK_FAIL_RETURN_SYNTAX_ERROR(last_blob_page_id >= 0, "Invalid data, last_blob_page_id: " +
-                                                           std::to_string(last_blob_page_id) + " should be positive.");
+  CHECK_FAIL_RETURN_SYNTAX_ERROR(last_blob_page_id >= 0, "[Internal ERROR] 'last_blob_page_id': " +
+                                                           std::to_string(last_blob_page_id) + " should be positive.");
   RETURN_IF_NOT_OK(shard_header_->GetPage(shard_id, last_blob_page_id, &last_blob_page));
   return Status::OK();

mindspore/ccsrc/minddata/mindrecord/meta/shard_header.cc

@@ -61,20 +61,27 @@ Status ShardHeader::InitializeHeader(const std::vector<json> &headers, bool load
 Status ShardHeader::CheckFileStatus(const std::string &path) {
   auto realpath = FileUtils::GetRealPath(path.data());
-  CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + path);
+  CHECK_FAIL_RETURN_UNEXPECTED(
+    realpath.has_value(),
+    "Invalid file, failed to get the realpath of mindrecord files. Please check file path: " + path);
   std::ifstream fin(realpath.value(), std::ios::in | std::ios::binary);
-  CHECK_FAIL_RETURN_UNEXPECTED(fin, "Failed to open file, file path: " + path);
+  CHECK_FAIL_RETURN_UNEXPECTED(fin,
+                               "Invalid file, failed to open files for loading mindrecord files. Please check file "
+                               "path, permission and open file limit: " +
+                                 path);
   // fetch file size
   auto &io_seekg = fin.seekg(0, std::ios::end);
   if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) {
     fin.close();
-    RETURN_STATUS_UNEXPECTED("Failed to seekg file, file path: " + path);
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] failed to seekg file, file path: " + path);
   }
 
   size_t file_size = fin.tellg();
   if (file_size < kMinFileSize) {
     fin.close();
-    RETURN_STATUS_UNEXPECTED("Invalid file content, file " + path + " size is smaller than the lower limit.");
+    RETURN_STATUS_UNEXPECTED("Invalid file, the size of mindrecord file: " + std::to_string(file_size) +
+                             " is smaller than the lower limit: " + std::to_string(kMinFileSize) +
+                             ".\n Please use 'FileWriter' to generate valid mindrecord files.");
   }
   fin.close();
   return Status::OK();
@@ -86,18 +93,23 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
   // read header size
   json json_header;
   std::ifstream fin(common::SafeCStr(path), std::ios::in | std::ios::binary);
-  CHECK_FAIL_RETURN_UNEXPECTED(fin.is_open(), "Failed to open file, file path: " + path);
+  CHECK_FAIL_RETURN_UNEXPECTED(fin.is_open(),
+                               "Invalid file, failed to open files for loading mindrecord files. Please check file "
+                               "path, permission and open file limit: " +
+                                 path);
 
   uint64_t header_size = 0;
   auto &io_read = fin.read(reinterpret_cast<char *>(&header_size), kInt64Len);
   if (!io_read.good() || io_read.fail() || io_read.bad()) {
     fin.close();
-    RETURN_STATUS_UNEXPECTED("Failed to read file, file path: " + path);
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] failed to read file, file path: " + path);
   }
 
   if (header_size > kMaxHeaderSize) {
     fin.close();
-    RETURN_STATUS_UNEXPECTED("Invalid file content, incorrect file or file header is exceeds the upper limit.");
+    RETURN_STATUS_UNEXPECTED(
+      "Invalid file, the size of mindrecord file header is larger than the upper limit. \nPlease use 'FileWriter' to "
+      "generate valid mindrecord files.");
   }
 
   // read header content
@@ -105,7 +117,7 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
   auto &io_read_content = fin.read(reinterpret_cast<char *>(&header_content[0]), header_size);
   if (!io_read_content.good() || io_read_content.fail() || io_read_content.bad()) {
     fin.close();
-    RETURN_STATUS_UNEXPECTED("Failed to read file, file path: " + path);
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to read file, file path: " + path);
   }
   fin.close();
@@ -114,7 +126,8 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
   try {
     json_header = json::parse(raw_header_content);
   } catch (json::parse_error &e) {
-    RETURN_STATUS_UNEXPECTED("Json parse failed: " + std::string(e.what()));
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to parse the metadata in JSON format in the mindrecord files: " +
+                             std::string(e.what()));
   }
   *header_ptr = std::make_shared<json>(json_header);
   return Status::OK();
@@ -165,7 +178,7 @@ Status ShardHeader::BuildDataset(const std::vector<std::string> &file_paths, boo
   }
   if (thread_status) {
     thread_status = false;
-    RETURN_STATUS_UNEXPECTED("Error occurred in GetHeadersOneTask thread.");
+    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Error raised in GetHeadersOneTask function.");
   }
   RETURN_IF_NOT_OK(InitializeHeader(headers, load_dataset));
   return Status::OK();
@@ -186,8 +199,8 @@ void ShardHeader::GetHeadersOneTask(int start, int end, std::vector<json> &heade
     (*header)["shard_addresses"] = realAddresses;
     if (std::find(kSupportedVersion.begin(), kSupportedVersion.end(), (*header)["version"]) ==
         kSupportedVersion.end()) {
-      MS_LOG(ERROR) << "Invalid version, file version " << (*header)["version"].dump() << " can not match lib version "
-                    << kVersion << ".";
+      MS_LOG(ERROR) << "Invalid file, the version of mindrecord files" << (*header)["version"].dump()
+                    << " is not supported.\nPlease use 'FileWriter' to generate valid mindrecord files.";
       thread_status = true;
       return;
     }
@@ -205,8 +218,8 @@ Status ShardHeader::InitByFiles(const std::vector<std::string> &file_paths) {
   shard_addresses_ = std::move(file_names);
   shard_count_ = file_paths.size();
   CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ != 0 && (shard_count_ <= kMaxShardCount),
-                               "Invalid input, The number of MindRecord files " + std::to_string(shard_count_) +
-                                 "is not int range (0, " + std::to_string(kMaxShardCount) + "].");
+                               "[Internal ERROR] 'shard_count_': " + std::to_string(shard_count_) +
+                                 "is not in range (0, " + std::to_string(kMaxShardCount) + "].");
 
   pages_.resize(shard_count_);
   return Status::OK();
 }
@@ -225,9 +238,10 @@ Status ShardHeader::ParseIndexFields(const json &index_fields) {
 Status ShardHeader::ParsePage(const json &pages, int shard_index, bool load_dataset) {
   // set shard_index when load_dataset is false
-  CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount, "Invalid input, The number of MindRecord files " +
-                                                                std::to_string(shard_count_) + "is not int range (0, " +
-                                                                std::to_string(kMaxFileCount) + "].");
+  CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount,
+                               "Invalid file, the number of mindrecord files: " + std::to_string(shard_count_) +
+                                 "is not in range (0, " + std::to_string(kMaxFileCount) +
+                                 "].\nPlease use 'FileWriter' to generate fewer mindrecord files.");
   if (pages_.empty()) {
     pages_.resize(shard_count_);
   }
@@ -261,7 +275,7 @@ Status ShardHeader::ParseStatistics(const json &statistics) {
   for (auto &statistic : statistics) {
     CHECK_FAIL_RETURN_UNEXPECTED(
       statistic.find("desc") != statistic.end() && statistic.find("statistics") != statistic.end(),
-      "Failed to deserialize statistics, statistic info: " + statistics.dump());
+      "[Internal ERROR] Failed to deserialize statistics: " + statistics.dump());
     std::string statistic_description = statistic["desc"].get<std::string>();
     json statistic_body = statistic["statistics"];
     std::shared_ptr<Statistics> parsed_statistic = Statistics::Build(statistic_description, statistic_body);
@@ -276,7 +290,7 @@ Status ShardHeader::ParseSchema(const json &schemas) {
     // change how we get schemaBody once design is finalized
     CHECK_FAIL_RETURN_UNEXPECTED(schema.find("desc") != schema.end() && schema.find("blob_fields") != schema.end() &&
                                    schema.find("schema") != schema.end(),
-                                 "Failed to deserialize schema, schema info: " + schema.dump());
+                                 "[Internal ERROR] Failed to deserialize schema: " + schema.dump());
     std::string schema_description = schema["desc"].get<std::string>();
     std::vector<std::string> blob_fields = schema["blob_fields"].get<std::vector<std::string>>();
     json schema_body = schema["schema"];
@@ -373,7 +387,7 @@ Status ShardHeader::GetPage(const int &shard_id, const int &page_id, std::shared
     return Status::OK();
   }
   page_ptr = nullptr;
-  RETURN_STATUS_UNEXPECTED("Failed to get Page, 'page_id': " + std::to_string(page_id));
+  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to get Page, 'page_id': " + std::to_string(page_id));
 }
 
 Status ShardHeader::SetPage(const std::shared_ptr<Page> &new_page) {
@@ -383,7 +397,7 @@ Status ShardHeader::SetPage(const std::shared_ptr<Page> &new_page) {
     pages_[shard_id][page_id] = new_page;
     return Status::OK();
   }
-  RETURN_STATUS_UNEXPECTED("Failed to set Page, 'page_id': " + std::to_string(page_id));
+  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to set Page, 'page_id': " + std::to_string(page_id));
 }
 
 Status ShardHeader::AddPage(const std::shared_ptr<Page> &new_page) {
@@ -393,7 +407,7 @@ Status ShardHeader::AddPage(const std::shared_ptr<Page> &new_page) {
     pages_[shard_id].push_back(new_page);
     return Status::OK();
   }
-  RETURN_STATUS_UNEXPECTED("Failed to add Page, 'page_id': " + std::to_string(page_id));
+  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to add Page, 'page_id': " + std::to_string(page_id));
 }
 
 int64_t ShardHeader::GetLastPageId(const int &shard_id) {
@@ -419,7 +433,10 @@ int ShardHeader::GetLastPageIdByType(const int &shard_id, const std::string &pag
 Status ShardHeader::GetPageByGroupId(const int &group_id, const int &shard_id, std::shared_ptr<Page> *page_ptr) {
   RETURN_UNEXPECTED_IF_NULL(page_ptr);
-  CHECK_FAIL_RETURN_UNEXPECTED(shard_id < static_cast<int>(pages_.size()), "Shard id is more than sum of shards.");
+  CHECK_FAIL_RETURN_UNEXPECTED(shard_id < static_cast<int>(pages_.size()),
+                               "[Internal ERROR] 'shard_id': " + std::to_string(shard_id) +
+                                 " should be smaller than the size of 'pages_': " + std::to_string(pages_.size()) +
+                                 ".");
   for (uint64_t i = pages_[shard_id].size(); i >= 1; i--) {
     auto page = pages_[shard_id][i - 1];
     if (page->GetPageType() == kPageTypeBlob && page->GetPageTypeID() == group_id) {
@@ -428,17 +445,17 @@ Status ShardHeader::GetPageByGroupId(const int &group_id, const int &shard_id, s
     }
   }
   page_ptr = nullptr;
-  RETURN_STATUS_UNEXPECTED("Failed to get Page, 'group_id': " + std::to_string(group_id));
+  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to get Page, 'group_id': " + std::to_string(group_id));
 }
 
 int ShardHeader::AddSchema(std::shared_ptr<Schema> schema) {
   if (schema == nullptr) {
-    MS_LOG(ERROR) << "The pointer of schema is null.";
+    MS_LOG(ERROR) << "[Internal ERROR] The pointer of schema is NULL.";
     return -1;
   }
 
   if (!schema_.empty()) {
-    MS_LOG(ERROR) << "The schema can not be added twice.";
+    MS_LOG(ERROR) << "The schema is added repeatedly. Please remove the redundant 'add_schema' function.";
     return -1;
   }
@@ -474,11 +491,16 @@ std::shared_ptr<Index> ShardHeader::InitIndexPtr() {
 Status ShardHeader::CheckIndexField(const std::string &field, const json &schema) {
   // check field name is or is not valid
   CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field) != schema.end(),
-                               "Invalid input, field [" + field + "] can not found in schema.");
+                               "Invalid input, 'index_fields': " + field + " can not found in schema: " +
+                                 schema.dump() + ".\n Please use 'add_index' function to add proper 'index_fields'.");
   CHECK_FAIL_RETURN_UNEXPECTED(schema[field]["type"] != "Bytes",
-                               "Invalid input, byte type field [" + field + "] can not set as an index field.");
+                               "Invalid input, type of 'index_fields': " + field +
+                                 " is bytes and can not set as an 'index_fields'.\n Please use 'add_index' function to "
+                                 "add the other 'index_fields'.");
   CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field) == schema.end() || schema[field].find("shape") == schema[field].end(),
-                               "Invalid input, array type field [" + field + "] can not set as an index field.");
+                               "Invalid input, type of 'index_fields': " + field +
+                                 " is array and can not set as an 'index_fields'.\n Please use 'add_index' function to "
+                                 "add the other 'index_fields'.");
   return Status::OK();
 }
@@ -486,7 +508,8 @@ Status ShardHeader::AddIndexFields(const std::vector<std::string> &fields) {
   if (fields.empty()) {
     return Status::OK();
   }
-  CHECK_FAIL_RETURN_UNEXPECTED(!GetSchemas().empty(), "Invalid data, schema is empty.");
+  CHECK_FAIL_RETURN_UNEXPECTED(!GetSchemas().empty(),
+                               "Invalid data, schema is empty. Please use 'add_schema' function to add schema first.");
   // create index Object
   std::shared_ptr<Index> index = InitIndexPtr();
   for (const auto &schemaPtr : schema_) {
@@ -499,8 +522,9 @@ Status ShardHeader::AddIndexFields(const std::vector<std::string> &fields) {
     field_set.insert(item.second);
   }
   for (const auto &field : fields) {
-    CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(),
-                                 "Invalid data, the same index field [" + field + "] can not added twice.");
+    CHECK_FAIL_RETURN_UNEXPECTED(
+      field_set.find(field) == field_set.end(),
+      "The 'index_fields': " + field + " is added repeatedly. Please remove the redundant 'add_index' function.");
     // check field name is or is not valid
     RETURN_IF_NOT_OK(CheckIndexField(field, schema));
     field_set.insert(field);
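
CheckIndexField and AddIndexFields together require that an index field exists in the schema, is neither bytes- nor array-typed, and is not added twice. The Python-side calls they guard look roughly like this (illustrative field names):

  from mindspore.mindrecord import FileWriter

  writer = FileWriter(file_name="demo.mindrecord", shard_num=1)
  writer.add_schema({"label": {"type": "int32"},
                     "img": {"type": "bytes"},                   # bytes: not indexable
                     "vec": {"type": "float32", "shape": [2]}},  # array: not indexable
                    "demo_schema")
  writer.add_index(["label"])  # scalar field present in schema, added once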
@@ -517,7 +541,7 @@ Status ShardHeader::GetAllSchemaID(std::set<uint64_t> &bucket_count) {
   for (const auto &schema : schema_) {
     auto schema_id = schema->GetSchemaID();
     CHECK_FAIL_RETURN_UNEXPECTED(bucket_count.find(schema_id) == bucket_count.end(),
-                                 "Invalid data, duplicate schema exist, schema id: " + std::to_string(schema_id));
+                                 "[Internal ERROR] duplicate schema exist, schema id: " + std::to_string(schema_id));
     bucket_count.insert(schema_id);
   }
   return Status::OK();
@@ -539,19 +563,21 @@ Status ShardHeader::AddIndexFields(std::vector<std::pair<uint64_t, std::string>>
   }
   for (const auto &field : fields) {
     CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(),
-                                 "Invalid data, the same index field [" + field.second + "] can not added twice.");
+                                 "The 'index_fields': " + field.second +
+                                   " is added repeatedly. Please remove the redundant 'add_index' function.");
     uint64_t schema_id = field.first;
     std::string field_name = field.second;
 
     // check schemaId is or is not valid
     CHECK_FAIL_RETURN_UNEXPECTED(bucket_count.find(schema_id) != bucket_count.end(),
-                                 "Invalid data, schema id [" + std::to_string(schema_id) + "] is invalid.");
+                                 "[Internal ERROR] 'schema_id': " + std::to_string(schema_id) + " can not found.");
 
     // check field name is or is not valid
     std::shared_ptr<Schema> schema_ptr;
     RETURN_IF_NOT_OK(GetSchemaByID(schema_id, &schema_ptr));
     json schema = schema_ptr->GetSchema().at("schema");
     CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field_name) != schema.end(),
-                                 "Invalid data, field [" + field_name + "] is not found in schema.");
+                                 "Invalid input, 'index_fields': " + field_name + " can not found in schema: " +
+                                   schema.dump() + ".\n Please use 'add_index' function to add proper 'index_fields'.");
 
     RETURN_IF_NOT_OK(CheckIndexField(field_name, schema));
     field_set.insert(field);
     // add field into index
@ -580,7 +606,7 @@ Status ShardHeader::GetSchemaByID(int64_t schema_id, std::shared_ptr<Schema> *sc
RETURN_UNEXPECTED_IF_NULL(schema_ptr); RETURN_UNEXPECTED_IF_NULL(schema_ptr);
int64_t schema_size = schema_.size(); int64_t schema_size = schema_.size();
CHECK_FAIL_RETURN_UNEXPECTED(schema_id >= 0 && schema_id < schema_size, CHECK_FAIL_RETURN_UNEXPECTED(schema_id >= 0 && schema_id < schema_size,
"Invalid data, schema id [" + std::to_string(schema_id) + "] is not in range [0, " + "[Internal ERROR] 'schema_id': " + std::to_string(schema_id) + " is not in range [0, " +
std::to_string(schema_size) + ")."); std::to_string(schema_size) + ").");
*schema_ptr = schema_.at(schema_id); *schema_ptr = schema_.at(schema_id);
return Status::OK(); return Status::OK();
@ -590,18 +616,20 @@ Status ShardHeader::GetStatisticByID(int64_t statistic_id, std::shared_ptr<Stati
RETURN_UNEXPECTED_IF_NULL(statistics_ptr); RETURN_UNEXPECTED_IF_NULL(statistics_ptr);
int64_t statistics_size = statistics_.size(); int64_t statistics_size = statistics_.size();
CHECK_FAIL_RETURN_UNEXPECTED(statistic_id >= 0 && statistic_id < statistics_size, CHECK_FAIL_RETURN_UNEXPECTED(statistic_id >= 0 && statistic_id < statistics_size,
"Invalid data, statistic id [" + std::to_string(statistic_id) + "[Internal ERROR] 'statistic_id': " + std::to_string(statistic_id) +
"] is not in range [0, " + std::to_string(statistics_size) + ")."); " is not in range [0, " + std::to_string(statistics_size) + ").");
*statistics_ptr = statistics_.at(statistic_id); *statistics_ptr = statistics_.at(statistic_id);
return Status::OK(); return Status::OK();
} }
Status ShardHeader::PagesToFile(const std::string dump_file_name) { Status ShardHeader::PagesToFile(const std::string dump_file_name) {
auto realpath = FileUtils::GetRealPath(dump_file_name.data()); auto realpath = FileUtils::GetRealPath(dump_file_name.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + dump_file_name); CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(),
"[Internal ERROR] Failed to get the realpath of Pages file, path: " + dump_file_name);
// write header content to file, dump whatever is in the file before // write header content to file, dump whatever is in the file before
std::ofstream page_out_handle(realpath.value(), std::ios_base::trunc | std::ios_base::out); std::ofstream page_out_handle(realpath.value(), std::ios_base::trunc | std::ios_base::out);
CHECK_FAIL_RETURN_UNEXPECTED(page_out_handle.good(), "Failed to open page file, path: " + dump_file_name); CHECK_FAIL_RETURN_UNEXPECTED(page_out_handle.good(),
"[Internal ERROR] Failed to open Pages file, path: " + dump_file_name);
auto pages = SerializePage(); auto pages = SerializePage();
for (const auto &shard_pages : pages) { for (const auto &shard_pages : pages) {
page_out_handle << shard_pages << "\n"; page_out_handle << shard_pages << "\n";
@ -615,11 +643,12 @@ Status ShardHeader::FileToPages(const std::string dump_file_name) {
v.clear(); v.clear();
} }
auto realpath = FileUtils::GetRealPath(dump_file_name.data()); auto realpath = FileUtils::GetRealPath(dump_file_name.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + dump_file_name); CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(),
"[Internal ERROR] Failed to get the realpath of Pages file, path: " + dump_file_name);
// attempt to open the file contains the page in json // attempt to open the file contains the page in json
std::ifstream page_in_handle(realpath.value()); std::ifstream page_in_handle(realpath.value());
CHECK_FAIL_RETURN_UNEXPECTED(page_in_handle.good(), CHECK_FAIL_RETURN_UNEXPECTED(page_in_handle.good(),
"Invalid file, page file does not exist, path: " + dump_file_name); "[Internal ERROR] Pages file does not exist, path: " + dump_file_name);
std::string line; std::string line;
while (std::getline(page_in_handle, line)) { while (std::getline(page_in_handle, line)) {
RETURN_IF_NOT_OK(ParsePage(json::parse(line), -1, true)); RETURN_IF_NOT_OK(ParsePage(json::parse(line), -1, true));
@ -633,7 +662,8 @@ Status ShardHeader::Initialize(const std::shared_ptr<ShardHeader> *header_ptr, c
uint64_t &schema_id) { uint64_t &schema_id) {
RETURN_UNEXPECTED_IF_NULL(header_ptr); RETURN_UNEXPECTED_IF_NULL(header_ptr);
auto schema_ptr = Schema::Build("mindrecord", schema); auto schema_ptr = Schema::Build("mindrecord", schema);
CHECK_FAIL_RETURN_UNEXPECTED(schema_ptr != nullptr, "Failed to build schema: " + schema.dump() + "."); CHECK_FAIL_RETURN_UNEXPECTED(schema_ptr != nullptr,
"[Internal ERROR] Failed to build schema: " + schema.dump() + ".");
schema_id = (*header_ptr)->AddSchema(schema_ptr); schema_id = (*header_ptr)->AddSchema(schema_ptr);
// create index // create index
std::vector<std::pair<uint64_t, std::string>> id_index_fields; std::vector<std::pair<uint64_t, std::string>> id_index_fields;
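
The AddIndexFields checks above are reached through the Python add_index API. A minimal sketch of the duplicate-index case they now reject, assuming the usual mindspore.mindrecord FileWriter workflow (the file name, schema, and field names here are hypothetical):

    from mindspore.mindrecord import FileWriter

    writer = FileWriter(file_name="test.mindrecord", shard_num=1)
    writer.add_schema({"file_name": {"type": "string"}, "label": {"type": "int64"}}, "test_schema")
    writer.add_index(["file_name", "label"])
    # Re-adding an already-indexed field should now fail with something like:
    # "The 'index_fields': label is added repeatedly. Please remove the redundant 'add_index' function."
    writer.add_index(["label"])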


@@ -100,8 +100,8 @@ def test_invalid_mindrecord():
         f.write('just for test')
     columns_list = ["data", "file_name", "label"]
     num_readers = 4
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid file "
-                                           "content, incorrect file or file header is exceeds the upper limit."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid file, the size of mindrecord file header "
+                                           "is larger than the upper limit."):
         data_set = ds.MindDataset(file_name, columns_list, num_readers)
         for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
             pass


@@ -123,7 +123,7 @@ def test_cifar100_to_mindrecord_directory(fixture_file):
     when destination path is directory.
     """
     with pytest.raises(RuntimeError,
-                       match="Invalid file, Mindrecord files already existed in path:"):
+                       match="Invalid file, mindrecord files already exist. Please check file path:"):
         cifar100_transformer = Cifar100ToMR(CIFAR100_DIR,
                                             CIFAR100_DIR)
         cifar100_transformer.transform()
@@ -134,7 +134,7 @@ def test_cifar100_to_mindrecord_filename_equals_cifar100(fixture_file):
     when destination path equals source path.
     """
     with pytest.raises(RuntimeError,
-                       match="Invalid file, Mindrecord files already existed in path:"):
+                       match="Invalid file, mindrecord files already exist. Please check file path:"):
         cifar100_transformer = Cifar100ToMR(CIFAR100_DIR,
                                             CIFAR100_DIR + "/train")
         cifar100_transformer.transform()


@@ -135,7 +135,7 @@ def test_cifar10_to_mindrecord_directory(fixture_file):
     when destination path is directory.
     """
     with pytest.raises(RuntimeError,
-                       match="Unexpected error. Invalid file, Mindrecord files already existed in path:"):
+                       match="Unexpected error. Invalid file, mindrecord files already exist. Please check file path:"):
         cifar10_transformer = Cifar10ToMR(CIFAR10_DIR, CIFAR10_DIR)
         cifar10_transformer.transform()
@@ -146,7 +146,7 @@ def test_cifar10_to_mindrecord_filename_equals_cifar10():
     when destination path equals source path.
     """
     with pytest.raises(RuntimeError,
-                       match="Unexpected error. Invalid file, Mindrecord files already existed in path:"):
+                       match="Unexpected error. Invalid file, mindrecord files already exist. Please check file path:"):
         cifar10_transformer = Cifar10ToMR(CIFAR10_DIR,
                                           CIFAR10_DIR + "/data_batch_0")
         cifar10_transformer.transform()
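
As the tests above exercise, the CifarToMR transformers refuse to write into a destination where mindrecord files already exist, including the source directory itself. A minimal sketch of the working pattern, with hypothetical paths:

    from mindspore.mindrecord import Cifar10ToMR

    # The destination must be a fresh file path, not the dataset directory itself.
    transformer = Cifar10ToMR("./cifar10_data", "./output/cifar10.mindrecord")
    transformer.transform()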


@@ -1224,7 +1224,7 @@ def test_cv_file_overwrite_exception_01():
                           "label": {"type": "int64"}, "data": {"type": "bytes"}}
         writer.add_schema(cv_schema_json, "img_schema")
         writer.write_raw_data(data)
-    assert 'Unexpected error. Invalid file, Mindrecord files already existed in path:' in str(err.value)
+    assert 'Unexpected error. Invalid file, mindrecord files already exist. Please check file path:' in str(err.value)
     remove_multi_files(mindrecord_file_name, FILES_NUM)

 def test_cv_file_overwrite_exception_02():
@@ -1243,5 +1243,5 @@ def test_cv_file_overwrite_exception_02():
                           "label": {"type": "int64"}, "data": {"type": "bytes"}}
         writer.add_schema(cv_schema_json, "img_schema")
         writer.write_raw_data(data)
-    assert 'Unexpected error. Invalid file, Mindrecord files already existed in path:' in str(err.value)
+    assert 'Unexpected error. Invalid file, mindrecord files already exist. Please check file path:' in str(err.value)
     remove_multi_files(mindrecord_file_name, FILES_NUM)
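
The overwrite tests above hit the writer-side guard in OpenDataFiles. A hedged sketch of how the new message surfaces when a second writer targets files that already exist (paths hypothetical; the exact raise point is assumed to be during writing, as in the tests):

    from mindspore.mindrecord import FileWriter

    writer = FileWriter("test.mindrecord", shard_num=1)
    writer.add_schema({"label": {"type": "int64"}}, "simple_schema")
    writer.write_raw_data([{"label": 1}])
    writer.commit()

    # Writing again to the same path without removing the old files is
    # expected to raise RuntimeError containing:
    # "Unexpected error. Invalid file, mindrecord files already exist. Please check file path: ..."
    writer2 = FileWriter("test.mindrecord", shard_num=1)
    writer2.add_schema({"label": {"type": "int64"}}, "simple_schema")
    writer2.write_raw_data([{"label": 2}])
    writer2.commit()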


@@ -234,7 +234,7 @@ def test_invalid_mindrecord():
         f.write(dummy)
     with pytest.raises(RuntimeError) as err:
         FileReader(file_name)
-    assert "Unexpected error. Invalid file content, incorrect file or file header" in str(err.value)
+    assert "Invalid file, the size of mindrecord file header is larger than the upper limit." in str(err.value)
     remove_file(file_name)

 def test_invalid_db():
@@ -264,7 +264,7 @@ def test_overwrite_invalid_mindrecord():
         f.write('just for test')
     with pytest.raises(RuntimeError) as err:
         create_cv_mindrecord(1, file_name)
-    assert 'Unexpected error. Invalid file, Mindrecord files already existed in path:' in str(err.value)
+    assert 'Unexpected error. Invalid file, mindrecord files already exist. Please check file path:' in str(err.value)
     remove_file(file_name)

 def test_overwrite_invalid_db():
@@ -278,7 +278,7 @@ def test_overwrite_invalid_db():
         f.write('just for test')
     with pytest.raises(RuntimeError) as err:
         create_cv_mindrecord(1, file_name)
-    assert 'Unexpected error. Invalid file, Mindrecord files already existed in path:' in str(err.value)
+    assert 'Unexpected error. Invalid file, mindrecord files already exist. Please check file path:' in str(err.value)
     remove_file(file_name)

 def test_read_after_close():
def test_read_after_close(): def test_read_after_close():
@@ -560,7 +560,8 @@ def test_write_with_invalid_data():
     mindrecord_file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]

     # field: file_name => filename
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -594,43 +595,9 @@ def test_write_with_invalid_data():
         writer.write_raw_data(data)
         writer.commit()

-    # field: mask => masks
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
-        remove_one_file(mindrecord_file_name)
-        remove_one_file(mindrecord_file_name + ".db")
-
-        data = [{"file_name": "001.jpg", "label": 43, "score": 0.8, "masks": np.array([3, 6, 9], dtype=np.int64),
-                 "segments": np.array([[5.0, 1.6], [65.2, 8.3]], dtype=np.float32),
-                 "data": bytes("image bytes abc", encoding='UTF-8')},
-                {"file_name": "002.jpg", "label": 91, "score": 5.4, "masks": np.array([1, 4, 7], dtype=np.int64),
-                 "segments": np.array([[5.1, 9.1], [2.0, 65.4]], dtype=np.float32),
-                 "data": bytes("image bytes def", encoding='UTF-8')},
-                {"file_name": "003.jpg", "label": 61, "score": 6.4, "masks": np.array([7, 6, 3], dtype=np.int64),
-                 "segments": np.array([[0.0, 5.6], [3.0, 16.3]], dtype=np.float32),
-                 "data": bytes("image bytes ghi", encoding='UTF-8')},
-                {"file_name": "004.jpg", "label": 29, "score": 8.1, "masks": np.array([2, 8, 0], dtype=np.int64),
-                 "segments": np.array([[5.9, 7.2], [4.0, 89.0]], dtype=np.float32),
-                 "data": bytes("image bytes jkl", encoding='UTF-8')},
-                {"file_name": "005.jpg", "label": 78, "score": 7.7, "masks": np.array([3, 1, 2], dtype=np.int64),
-                 "segments": np.array([[0.6, 8.1], [5.3, 49.3]], dtype=np.float32),
-                 "data": bytes("image bytes mno", encoding='UTF-8')},
-                {"file_name": "006.jpg", "label": 37, "score": 9.4, "masks": np.array([7, 6, 7], dtype=np.int64),
-                 "segments": np.array([[4.2, 6.3], [8.9, 81.8]], dtype=np.float32),
-                 "data": bytes("image bytes pqr", encoding='UTF-8')}
-                ]
-        writer = FileWriter(mindrecord_file_name)
-        schema = {"file_name": {"type": "string"},
-                  "label": {"type": "int32"},
-                  "score": {"type": "float64"},
-                  "mask": {"type": "int64", "shape": [-1]},
-                  "segments": {"type": "float32", "shape": [2, 2]},
-                  "data": {"type": "bytes"}}
-        writer.add_schema(schema, "data is so cool")
-        writer.write_raw_data(data)
-        writer.commit()
-
     # field: data => image
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -664,78 +631,9 @@ def test_write_with_invalid_data():
         writer.write_raw_data(data)
         writer.commit()

-    # field: label => labels
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
-        remove_one_file(mindrecord_file_name)
-        remove_one_file(mindrecord_file_name + ".db")
-
-        data = [{"file_name": "001.jpg", "labels": 43, "score": 0.8, "mask": np.array([3, 6, 9], dtype=np.int64),
-                 "segments": np.array([[5.0, 1.6], [65.2, 8.3]], dtype=np.float32),
-                 "data": bytes("image bytes abc", encoding='UTF-8')},
-                {"file_name": "002.jpg", "labels": 91, "score": 5.4, "mask": np.array([1, 4, 7], dtype=np.int64),
-                 "segments": np.array([[5.1, 9.1], [2.0, 65.4]], dtype=np.float32),
-                 "data": bytes("image bytes def", encoding='UTF-8')},
-                {"file_name": "003.jpg", "labels": 61, "score": 6.4, "mask": np.array([7, 6, 3], dtype=np.int64),
-                 "segments": np.array([[0.0, 5.6], [3.0, 16.3]], dtype=np.float32),
-                 "data": bytes("image bytes ghi", encoding='UTF-8')},
-                {"file_name": "004.jpg", "labels": 29, "score": 8.1, "mask": np.array([2, 8, 0], dtype=np.int64),
-                 "segments": np.array([[5.9, 7.2], [4.0, 89.0]], dtype=np.float32),
-                 "data": bytes("image bytes jkl", encoding='UTF-8')},
-                {"file_name": "005.jpg", "labels": 78, "score": 7.7, "mask": np.array([3, 1, 2], dtype=np.int64),
-                 "segments": np.array([[0.6, 8.1], [5.3, 49.3]], dtype=np.float32),
-                 "data": bytes("image bytes mno", encoding='UTF-8')},
-                {"file_name": "006.jpg", "labels": 37, "score": 9.4, "mask": np.array([7, 6, 7], dtype=np.int64),
-                 "segments": np.array([[4.2, 6.3], [8.9, 81.8]], dtype=np.float32),
-                 "data": bytes("image bytes pqr", encoding='UTF-8')}
-                ]
-        writer = FileWriter(mindrecord_file_name)
-        schema = {"file_name": {"type": "string"},
-                  "label": {"type": "int32"},
-                  "score": {"type": "float64"},
-                  "mask": {"type": "int64", "shape": [-1]},
-                  "segments": {"type": "float32", "shape": [2, 2]},
-                  "data": {"type": "bytes"}}
-        writer.add_schema(schema, "data is so cool")
-        writer.write_raw_data(data)
-        writer.commit()
-
-    # field: score => scores
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
-        remove_one_file(mindrecord_file_name)
-        remove_one_file(mindrecord_file_name + ".db")
-
-        data = [{"file_name": "001.jpg", "label": 43, "scores": 0.8, "mask": np.array([3, 6, 9], dtype=np.int64),
-                 "segments": np.array([[5.0, 1.6], [65.2, 8.3]], dtype=np.float32),
-                 "data": bytes("image bytes abc", encoding='UTF-8')},
-                {"file_name": "002.jpg", "label": 91, "scores": 5.4, "mask": np.array([1, 4, 7], dtype=np.int64),
-                 "segments": np.array([[5.1, 9.1], [2.0, 65.4]], dtype=np.float32),
-                 "data": bytes("image bytes def", encoding='UTF-8')},
-                {"file_name": "003.jpg", "label": 61, "scores": 6.4, "mask": np.array([7, 6, 3], dtype=np.int64),
-                 "segments": np.array([[0.0, 5.6], [3.0, 16.3]], dtype=np.float32),
-                 "data": bytes("image bytes ghi", encoding='UTF-8')},
-                {"file_name": "004.jpg", "label": 29, "scores": 8.1, "mask": np.array([2, 8, 0], dtype=np.int64),
-                 "segments": np.array([[5.9, 7.2], [4.0, 89.0]], dtype=np.float32),
-                 "data": bytes("image bytes jkl", encoding='UTF-8')},
-                {"file_name": "005.jpg", "label": 78, "scores": 7.7, "mask": np.array([3, 1, 2], dtype=np.int64),
-                 "segments": np.array([[0.6, 8.1], [5.3, 49.3]], dtype=np.float32),
-                 "data": bytes("image bytes mno", encoding='UTF-8')},
-                {"file_name": "006.jpg", "label": 37, "scores": 9.4, "mask": np.array([7, 6, 7], dtype=np.int64),
-                 "segments": np.array([[4.2, 6.3], [8.9, 81.8]], dtype=np.float32),
-                 "data": bytes("image bytes pqr", encoding='UTF-8')}
-                ]
-        writer = FileWriter(mindrecord_file_name)
-        schema = {"file_name": {"type": "string"},
-                  "label": {"type": "int32"},
-                  "score": {"type": "float64"},
-                  "mask": {"type": "int64", "shape": [-1]},
-                  "segments": {"type": "float32", "shape": [2, 2]},
-                  "data": {"type": "bytes"}}
-        writer.add_schema(schema, "data is so cool")
-        writer.write_raw_data(data)
-        writer.commit()
-
     # string type with int value
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -770,7 +668,8 @@ def test_write_with_invalid_data():
         writer.commit()

     # field with int64 type, but the real data is string
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -805,7 +704,8 @@ def test_write_with_invalid_data():
         writer.commit()

     # bytes field is string
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -840,7 +740,8 @@ def test_write_with_invalid_data():
         writer.commit()

     # field is not numpy type
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")
@@ -875,7 +776,8 @@ def test_write_with_invalid_data():
         writer.commit()

     # not enough field
-    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
+    with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, " \
+                                           "the number of schema should be positive but got:"):
         remove_one_file(mindrecord_file_name)
         remove_one_file(mindrecord_file_name + ".db")