!23310 [MD] fix error message of mindrecord

Merge pull request !23310 from liyong126/fix_log_msg
i-robot 2021-09-16 00:45:11 +00:00 committed by Gitee
commit 57b8c8e2d9
17 changed files with 386 additions and 302 deletions
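The changes below converge on a small set of message patterns: "Failed to <action>, <detail>", "Invalid data/input, <value> ...", and "Succeed to <action>", always embedding the offending value, path, or limit. As a rough illustration of that convention, here is a minimal standalone C++ sketch; it uses plain functions and made-up demo limits, not the CHECK_FAIL_RETURN_UNEXPECTED / RETURN_STATUS_UNEXPECTED macros or the real kMinPageSize / kMaxPageSize constants from the MindRecord sources.

#include <cstdint>
#include <iostream>
#include <string>

// Demo limits for illustration only; the real constants live in the MindRecord sources.
constexpr uint64_t kDemoMinPageSize = 32 * 1024;          // 32KB
constexpr uint64_t kDemoMaxPageSize = 256 * 1024 * 1024;  // 256MB

// Build a message in the "Invalid data, <value> should be in range [...]" style used throughout this PR.
std::string PageSizeError(uint64_t page_size) {
  return "Invalid data, page size: " + std::to_string(page_size) + " should be in range [" +
         std::to_string(kDemoMinPageSize) + ", " + std::to_string(kDemoMaxPageSize) + "].";
}

int main() {
  uint64_t page_size = 1024;  // deliberately too small
  if (page_size < kDemoMinPageSize || page_size > kDemoMaxPageSize) {
    std::cerr << PageSizeError(page_size) << std::endl;
    return 1;
  }
  std::cout << "Succeed to set page size: " << page_size << std::endl;
  return 0;
}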

View File

@ -348,7 +348,7 @@ json ToJsonImpl(const py::handle &obj) {
}
return out;
}
MS_LOG(ERROR) << "Python to json failed, obj is: " << py::cast<std::string>(obj);
MS_LOG(ERROR) << "Failed to convert Python object to json, object is: " << py::cast<std::string>(obj);
return json();
}
} // namespace detail

View File

@ -62,22 +62,22 @@ Status GetFileName(const std::string &path, std::shared_ptr<std::string> *fn_ptr
char real_path[PATH_MAX] = {0};
char buf[PATH_MAX] = {0};
if (strncpy_s(buf, PATH_MAX, common::SafeCStr(path), path.length()) != EOK) {
RETURN_STATUS_UNEXPECTED("Securec func [strncpy_s] failed, path: " + path);
RETURN_STATUS_UNEXPECTED("Failed to call securec func [strncpy_s], path: " + path);
}
char tmp[PATH_MAX] = {0};
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(tmp, dirname(&(buf[0])), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Invalid file path, path: " + std::string(buf));
RETURN_STATUS_UNEXPECTED("Invalid file, path: " + std::string(buf));
}
if (_fullpath(real_path, common::SafeCStr(path), PATH_MAX) == nullptr) {
MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check successfully";
MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check success.";
}
#else
if (realpath(dirname(&(buf[0])), tmp) == nullptr) {
RETURN_STATUS_UNEXPECTED(std::string("Invalid file path, path: ") + buf);
RETURN_STATUS_UNEXPECTED(std::string("Invalid file, path: ") + buf);
}
if (realpath(common::SafeCStr(path), real_path) == nullptr) {
MS_LOG(DEBUG) << "Path: " << path << "check successfully";
MS_LOG(DEBUG) << "Path: " << path << "check success.";
}
#endif
std::string s = real_path;
@ -102,17 +102,17 @@ Status GetParentDir(const std::string &path, std::shared_ptr<std::string> *pd_pt
char tmp[PATH_MAX] = {0};
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(tmp, dirname(&(buf[0])), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Invalid file path, path: " + std::string(buf));
RETURN_STATUS_UNEXPECTED("Invalid file, path: " + std::string(buf));
}
if (_fullpath(real_path, common::SafeCStr(path), PATH_MAX) == nullptr) {
MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check successfully";
MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check success.";
}
#else
if (realpath(dirname(&(buf[0])), tmp) == nullptr) {
RETURN_STATUS_UNEXPECTED(std::string("Invalid file path, path: ") + buf);
RETURN_STATUS_UNEXPECTED(std::string("Invalid file, path: ") + buf);
}
if (realpath(common::SafeCStr(path), real_path) == nullptr) {
MS_LOG(DEBUG) << "Path: " << path << "check successfully";
MS_LOG(DEBUG) << "Path: " << path << "check success.";
}
#endif
std::string s = real_path;
@ -173,7 +173,7 @@ Status GetDiskSize(const std::string &str_dir, const DiskSizeType &disk_type, st
uint64_t ll_count = 0;
struct statfs disk_info;
if (statfs(common::SafeCStr(str_dir), &disk_info) == -1) {
RETURN_STATUS_UNEXPECTED("Get disk size error.");
RETURN_STATUS_UNEXPECTED("Failed to get disk size.");
}
switch (disk_type) {
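
For context on the GetDiskSize change above: statfs fills a struct with the filesystem block size and free block count, and the free size is their product. Below is a minimal Linux-only sketch that reports the failure in the new wording; it is a standalone demo, not the MindRecord Status / RETURN_STATUS_UNEXPECTED machinery.

#include <sys/statfs.h>
#include <cstdint>
#include <iostream>
#include <string>

// Return free bytes of the filesystem containing `dir`, or an error message via `err`.
bool GetFreeDiskBytes(const std::string &dir, uint64_t *free_bytes, std::string *err) {
  struct statfs disk_info;
  if (statfs(dir.c_str(), &disk_info) == -1) {
    *err = "Failed to get disk size.";
    return false;
  }
  *free_bytes = static_cast<uint64_t>(disk_info.f_bavail) * static_cast<uint64_t>(disk_info.f_bsize);
  return true;
}

int main() {
  uint64_t free_bytes = 0;
  std::string err;
  if (!GetFreeDiskBytes("/tmp", &free_bytes, &err)) {
    std::cerr << err << std::endl;
    return 1;
  }
  std::cout << "Free disk size: " << free_bytes << " bytes" << std::endl;
  return 0;
}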

View File

@ -56,7 +56,7 @@ ShardReader::ShardReader()
Status ShardReader::GetMeta(const std::string &file_path, std::shared_ptr<json> meta_data_ptr,
std::shared_ptr<std::vector<std::string>> *addresses_ptr) {
RETURN_UNEXPECTED_IF_NULL(addresses_ptr);
CHECK_FAIL_RETURN_UNEXPECTED(IsLegalFile(file_path), "Invalid file path: " + file_path);
CHECK_FAIL_RETURN_UNEXPECTED(IsLegalFile(file_path), "Invalid file, path: " + file_path);
std::shared_ptr<json> header_ptr;
RETURN_IF_NOT_OK(ShardHeader::BuildSingleHeader(file_path, &header_ptr));
@ -79,13 +79,14 @@ Status ShardReader::Init(const std::vector<std::string> &file_paths, bool load_d
} else if (file_paths.size() >= 1 && load_dataset == false) {
file_paths_ = file_paths;
} else {
RETURN_STATUS_UNEXPECTED("Error in parameter file_path or load_dataset.");
RETURN_STATUS_UNEXPECTED("Invalid data, number of MindRecord files [" + std::to_string(file_paths.size()) +
"] or 'load_dataset' [" + std::to_string(load_dataset) + "]is invalid.");
}
for (const auto &file : file_paths_) {
auto meta_data_ptr = std::make_shared<json>();
RETURN_IF_NOT_OK(GetMeta(file, meta_data_ptr, &addresses_ptr));
CHECK_FAIL_RETURN_UNEXPECTED(*meta_data_ptr == *first_meta_data_ptr,
"Mindrecord files meta information is different.");
"Invalid data, MindRecord files meta data is not consistent.");
sqlite3 *db = nullptr;
RETURN_IF_NOT_OK(VerifyDataset(&db, file));
database_paths_.push_back(db);
@ -113,20 +114,21 @@ Status ShardReader::Init(const std::vector<std::string> &file_paths, bool load_d
if (num_rows_ > LAZY_LOAD_THRESHOLD) {
lazy_load_ = true;
MS_LOG(WARNING) << "The number of samples is larger than " << LAZY_LOAD_THRESHOLD
<< ", enable lazy load mode. If you want to speed up data loading, "
<< "it is recommended that you save multiple samples into one record when creating mindrecord file,"
<< " so that you can enable fast loading mode, and don't forget to adjust your batch size "
<< "according to the current samples.";
MS_LOG(WARNING)
<< "The number of samples is larger than " << LAZY_LOAD_THRESHOLD
<< ", enable lazy load mode. If you want to speed up data loading, "
<< "it is recommended that you save multiple samples into one record when creating MindRecord files,"
<< " so that you can enable fast loading mode, and don't forget to adjust your batch size "
<< "according to the current samples.";
}
auto disk_size = page_size_ * row_group_summary.size();
auto compression_size = shard_header_->GetCompressionSize();
total_blob_size_ = disk_size + compression_size;
MS_LOG(INFO) << "Blob data size, on disk: " << disk_size << " , additional uncompression: " << compression_size
<< " , Total: " << total_blob_size_;
MS_LOG(INFO) << "Blob data size on disk: " << disk_size << " , additional uncompression size: " << compression_size
<< " , Total blob size: " << total_blob_size_;
MS_LOG(INFO) << "Get meta from mindrecord file & index file successfully.";
MS_LOG(INFO) << "Succeed to get meta from mindrecord file & index file.";
return Status::OK();
}
@ -135,26 +137,27 @@ Status ShardReader::VerifyDataset(sqlite3 **db, const string &file) {
// sqlite3_open create a database if not found, use sqlite3_open_v2 instead of it
CHECK_FAIL_RETURN_UNEXPECTED(
sqlite3_open_v2(common::SafeCStr(file + ".db"), db, SQLITE_OPEN_READONLY, nullptr) == SQLITE_OK,
"Invalid database file: " + file + ".db, error: " + sqlite3_errmsg(*db));
MS_LOG(DEBUG) << "Opened database successfully";
"Invalid database file, path: " + file + ".db, " + sqlite3_errmsg(*db));
MS_LOG(DEBUG) << "Succeed to Open database, path: " << file << ".db.";
string sql = "SELECT NAME from SHARD_NAME;";
std::vector<std::vector<std::string>> name;
char *errmsg = nullptr;
if (sqlite3_exec(*db, common::SafeCStr(sql), SelectCallback, &name, &errmsg) != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in execute sql: [ " << sql + " ], error: " << errmsg;
oss << "Failed to execute sql [ " << sql + " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(*db);
RETURN_STATUS_UNEXPECTED(oss.str());
} else {
MS_LOG(DEBUG) << "Get " << static_cast<int>(name.size()) << " records from index.";
MS_LOG(DEBUG) << "Succeed to get " << static_cast<int>(name.size()) << " records from index.";
std::shared_ptr<std::string> fn_ptr;
RETURN_IF_NOT_OK(GetFileName(file, &fn_ptr));
if (name.empty() || name[0][0] != *fn_ptr) {
sqlite3_free(errmsg);
sqlite3_close(*db);
RETURN_STATUS_UNEXPECTED("Invalid file, DB file can not match file: " + file);
RETURN_STATUS_UNEXPECTED("Invalid database file, shard name [" + *fn_ptr + "] can not match [" + name[0][0] +
"].");
}
}
return Status::OK();
@ -171,7 +174,7 @@ Status ShardReader::CheckColumnList(const std::vector<std::string> &selected_col
}
}
CHECK_FAIL_RETURN_UNEXPECTED(!std::any_of(std::begin(inSchema), std::end(inSchema), [](int x) { return x == 0; }),
"Column not found in schema.");
"Invalid data, column is not found in schema.");
return Status::OK();
}
@ -186,7 +189,7 @@ Status ShardReader::Open() {
}
auto realpath = FileUtils::GetRealPath(dir.value().data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + file);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + file);
std::optional<std::string> whole_path = "";
FileUtils::ConcatDirAndFileName(&realpath, &local_file_name, &whole_path);
@ -194,12 +197,11 @@ Status ShardReader::Open() {
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(whole_path.value(), std::ios::in | std::ios::binary);
if (!fs->good()) {
CHECK_FAIL_RETURN_UNEXPECTED(
!fs->fail(),
"Maybe reach the maximum number of open files, use \"ulimit -a\" to view \"open files\" and further resize");
RETURN_STATUS_UNEXPECTED("Failed to open file: " + file);
RETURN_STATUS_UNEXPECTED(
"Failed to open file: " + file +
", reach the maximum number of open files, use \"ulimit -a\" to view \"open files\" and further resize");
}
MS_LOG(INFO) << "Open shard file successfully.";
MS_LOG(INFO) << "Succeed to open shard file.";
file_streams_.push_back(fs);
}
return Status::OK();
@ -218,7 +220,7 @@ Status ShardReader::Open(int n_consumer) {
}
auto realpath = FileUtils::GetRealPath(dir.value().data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + file);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + file);
std::optional<std::string> whole_path = "";
FileUtils::ConcatDirAndFileName(&realpath, &local_file_name, &whole_path);
@ -226,14 +228,13 @@ Status ShardReader::Open(int n_consumer) {
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(whole_path.value(), std::ios::in | std::ios::binary);
if (!fs->good()) {
CHECK_FAIL_RETURN_UNEXPECTED(
!fs->fail(),
"Maybe reach the maximum number of open files, use \"ulimit -a\" to view \"open files\" and further resize");
RETURN_STATUS_UNEXPECTED("Failed to open file: " + file);
RETURN_STATUS_UNEXPECTED(
"Failed to open file: " + file +
", reach the maximum number of open files, use \"ulimit -a\" to view \"open files\" and further resize");
}
file_streams_random_[j].push_back(fs);
}
MS_LOG(INFO) << "Open shard file successfully.";
MS_LOG(INFO) << "Succeed to open file, path: " << file;
}
return Status::OK();
}
@ -255,7 +256,7 @@ void ShardReader::FileStreamsOperator() {
if (database_paths_[i] != nullptr) {
auto ret = sqlite3_close(database_paths_[i]);
if (ret != SQLITE_OK) {
MS_LOG(ERROR) << "Close db failed. Error code: " << ret << ".";
MS_LOG(ERROR) << "Failed to close database, error code: " << ret << ".";
}
database_paths_[i] = nullptr;
}
@ -387,13 +388,15 @@ Status ShardReader::ConvertLabelToJson(const std::vector<std::vector<std::string
}
} catch (std::out_of_range &e) {
fs->close();
RETURN_STATUS_UNEXPECTED("Out of range: " + std::string(e.what()));
RETURN_STATUS_UNEXPECTED("Out of range exception raised in ConvertLabelToJson function, " +
std::string(e.what()));
} catch (std::invalid_argument &e) {
fs->close();
RETURN_STATUS_UNEXPECTED("Invalid argument: " + std::string(e.what()));
RETURN_STATUS_UNEXPECTED("Invalid argument exception raised in ConvertLabelToJson function, " +
std::string(e.what()));
} catch (...) {
fs->close();
RETURN_STATUS_UNEXPECTED("Exception was caught while convert label to json.");
RETURN_STATUS_UNEXPECTED("Unknown exception raised in ConvertLabelToJson function");
}
}
@ -410,20 +413,21 @@ Status ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql, con
int rc = sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, &labels, &errmsg);
if (rc != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in execute sql: [ " << sql + " ], error: " << errmsg;
oss << "Failed to execute sql [ " << sql + " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
RETURN_STATUS_UNEXPECTED(oss.str());
}
MS_LOG(INFO) << "Get " << static_cast<int>(labels.size()) << " records from shard " << shard_id << " index.";
MS_LOG(INFO) << "Succeed to get " << static_cast<int>(labels.size()) << " records from shard "
<< std::to_string(shard_id) << " index.";
std::string file_name = file_paths_[shard_id];
auto realpath = FileUtils::GetRealPath(file_name.data());
if (!realpath.has_value()) {
sqlite3_free(errmsg);
sqlite3_close(db);
RETURN_STATUS_UNEXPECTED("Get real path failed, path=" + file_name);
RETURN_STATUS_UNEXPECTED("Failed to get real path, path: " + file_name);
}
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
@ -432,7 +436,7 @@ Status ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql, con
if (!fs->good()) {
sqlite3_free(errmsg);
sqlite3_close(db);
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open file: " + file_name);
RETURN_STATUS_UNEXPECTED("Failed to open file, path: " + file_name);
}
}
sqlite3_free(errmsg);
@ -446,7 +450,7 @@ Status ShardReader::GetAllClasses(const std::string &category_field,
index_columns[field.second] = field.first;
}
CHECK_FAIL_RETURN_UNEXPECTED(index_columns.find(category_field) != index_columns.end(),
"Index field " + category_field + " does not exist.");
"Invalid data, index field " + category_field + " does not exist.");
std::shared_ptr<std::string> fn_ptr;
RETURN_IF_NOT_OK(
ShardIndexGenerator::GenerateFieldName(std::make_pair(index_columns[category_field], category_field), &fn_ptr));
@ -474,10 +478,11 @@ void ShardReader::GetClassesInShard(sqlite3 *db, int shard_id, const std::string
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
MS_LOG(ERROR) << "Error in select sql statement, sql: " << common::SafeCStr(sql) << ", error: " << errmsg;
MS_LOG(ERROR) << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
return;
}
MS_LOG(INFO) << "Get " << static_cast<int>(columns.size()) << " records from shard " << shard_id << " index.";
MS_LOG(INFO) << "Succeed to get " << static_cast<int>(columns.size()) << " records from shard "
<< std::to_string(shard_id) << " index.";
std::lock_guard<std::mutex> lck(shard_locker_);
for (int i = 0; i < static_cast<int>(columns.size()); ++i) {
category_ptr->emplace(columns[i][0]);
@ -620,13 +625,13 @@ std::vector<std::vector<uint64_t>> ShardReader::GetImageOffset(int page_id, int
char *errmsg = nullptr;
int rc = sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, &image_offsets, &errmsg);
if (rc != SQLITE_OK) {
MS_LOG(ERROR) << "Error in select statement, sql: " << sql << ", error: " << errmsg;
MS_LOG(ERROR) << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
return std::vector<std::vector<uint64_t>>();
} else {
MS_LOG(DEBUG) << "Get " << static_cast<int>(image_offsets.size()) << " records from index.";
MS_LOG(DEBUG) << "Succeed to get " << static_cast<int>(image_offsets.size()) << " records from index.";
}
std::vector<std::vector<uint64_t>> res;
for (int i = static_cast<int>(image_offsets.size()) - 1; i >= 0; i--) res.emplace_back(std::vector<uint64_t>{0, 0});
@ -665,9 +670,9 @@ Status ShardReader::GetPagesByCategory(int shard_id, const std::pair<std::string
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
RETURN_STATUS_UNEXPECTED("Error in select statement, sql: " + sql + ", error: " + ss);
RETURN_STATUS_UNEXPECTED(std::string("Failed to execute sql [") + common::SafeCStr(sql) + " ], " + ss);
} else {
MS_LOG(DEBUG) << "Get " << page_ids.size() << "pages from index.";
MS_LOG(DEBUG) << "Succeed to get " << page_ids.size() << "pages from index.";
}
for (int i = 0; i < static_cast<int>(page_ids.size()); ++i) {
(*pages_ptr)->emplace_back(std::stoull(page_ids[i][0]));
@ -708,11 +713,11 @@ Status ShardReader::QueryWithCriteria(sqlite3 *db, const string &sql, const stri
std::shared_ptr<std::vector<std::vector<std::string>>> labels_ptr) {
sqlite3_stmt *stmt = nullptr;
if (sqlite3_prepare_v2(db, common::SafeCStr(sql), -1, &stmt, 0) != SQLITE_OK) {
RETURN_STATUS_UNEXPECTED(std::string("SQL error: could not prepare statement, sql: ") + sql);
RETURN_STATUS_UNEXPECTED("Failed to prepare statement sql [ " + sql + " ].");
}
int index = sqlite3_bind_parameter_index(stmt, ":criteria");
if (sqlite3_bind_text(stmt, index, common::SafeCStr(criteria), -1, SQLITE_STATIC) != SQLITE_OK) {
RETURN_STATUS_UNEXPECTED("SQL error: could not bind parameter, index: " + std::to_string(index) +
RETURN_STATUS_UNEXPECTED("Failed to bind parameter of sql, index: " + std::to_string(index) +
", field value: " + criteria);
}
int rc = sqlite3_step(stmt);
@ -735,11 +740,11 @@ Status ShardReader::GetLabelsFromBinaryFile(int shard_id, const std::vector<std:
RETURN_UNEXPECTED_IF_NULL(labels_ptr);
std::string file_name = file_paths_[shard_id];
auto realpath = FileUtils::GetRealPath(file_name.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + file_name);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path=" + file_name);
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(realpath.value(), std::ios::in | std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(fs->good(), "Invalid file, failed to open file: " + file_name);
CHECK_FAIL_RETURN_UNEXPECTED(fs->good(), "Failed to open file, path: " + file_name);
// init the return
for (unsigned int i = 0; i < label_offsets.size(); ++i) {
(*labels_ptr)->emplace_back(json{});
@ -755,13 +760,13 @@ Status ShardReader::GetLabelsFromBinaryFile(int shard_id, const std::vector<std:
auto &io_seekg = fs->seekg(page_size_ * raw_page_id + header_size_ + label_start, std::ios::beg);
if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) {
fs->close();
RETURN_STATUS_UNEXPECTED("Failed to seekg file.");
RETURN_STATUS_UNEXPECTED("Failed to seekg file, path: " + file_name);
}
auto &io_read = fs->read(reinterpret_cast<char *>(&label_raw[0]), len);
if (!io_read.good() || io_read.fail() || io_read.bad()) {
fs->close();
RETURN_STATUS_UNEXPECTED("Failed to read file.");
RETURN_STATUS_UNEXPECTED("Failed to read file, path: " + file_name);
}
json label_json = json::from_msgpack(label_raw);
@ -793,13 +798,13 @@ Status ShardReader::GetLabelsFromPage(int page_id, int shard_id, const std::vect
int rc = sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, label_offset_ptr.get(), &errmsg);
if (rc != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in execute sql: [ " << sql + " ], error: " << errmsg;
oss << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
RETURN_STATUS_UNEXPECTED(oss.str());
}
MS_LOG(DEBUG) << "Get " << label_offset_ptr->size() << " records from index.";
MS_LOG(DEBUG) << "Succeed to get " << label_offset_ptr->size() << " records from index.";
sqlite3_free(errmsg);
}
// get labels from binary file
@ -832,13 +837,13 @@ Status ShardReader::GetLabels(int page_id, int shard_id, const std::vector<std::
int rc = sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, labels.get(), &errmsg);
if (rc != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in execute sql: [ " << sql + " ], error: " << errmsg;
oss << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
RETURN_STATUS_UNEXPECTED(oss.str());
} else {
MS_LOG(DEBUG) << "Get " << static_cast<int>(labels->size()) << " records from index.";
MS_LOG(DEBUG) << "Succeed to get " << static_cast<int>(labels->size()) << " records from index.";
}
sqlite3_free(errmsg);
}
@ -885,7 +890,7 @@ int64_t ShardReader::GetNumClasses(const std::string &category_field) {
}
if (map_schema_id_fields.find(category_field) == map_schema_id_fields.end()) {
MS_LOG(ERROR) << "Field " << category_field << " does not exist.";
MS_LOG(ERROR) << "Invalid data, field " << category_field << " does not exist.";
return -1;
}
std::shared_ptr<std::string> fn_ptr;
@ -898,8 +903,7 @@ int64_t ShardReader::GetNumClasses(const std::string &category_field) {
for (int x = 0; x < shard_count; x++) {
int rc = sqlite3_open_v2(common::SafeCStr(file_paths_[x] + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
if (SQLITE_OK != rc) {
MS_LOG(ERROR) << "Invalid file, failed to open database: " << file_paths_[x] + ".db, error: "
<< sqlite3_errmsg(db);
MS_LOG(ERROR) << "Failed to open database: " << file_paths_[x] + ".db, " << sqlite3_errmsg(db);
return -1;
}
threads[x] = std::thread(&ShardReader::GetClassesInShard, this, db, x, sql, category_ptr);
@ -930,7 +934,6 @@ Status ShardReader::CountTotalRows(const std::vector<std::string> &file_paths, b
num_samples = op->GetNumSamples(num_samples, 0);
if (num_padded > 0 && root == true) {
num_samples += num_padded;
MS_LOG(DEBUG) << "Padding samples work on shuffle sampler.";
root = false;
}
} else if (std::dynamic_pointer_cast<ShardCategory>(op)) {
@ -943,8 +946,9 @@ Status ShardReader::CountTotalRows(const std::vector<std::string> &file_paths, b
if (tmp != 0 && num_samples != -1) {
num_samples = std::min(num_samples, tmp);
}
CHECK_FAIL_RETURN_UNEXPECTED(num_samples != -1, "Number of samples exceeds the upper limit: " +
std::to_string(std::numeric_limits<int64_t>::max()));
CHECK_FAIL_RETURN_UNEXPECTED(
num_samples != -1, "Invalid input, number of samples: " + std::to_string(num_samples) +
" exceeds the upper limit: " + std::to_string(std::numeric_limits<int64_t>::max()));
}
} else if (std::dynamic_pointer_cast<ShardSample>(op)) {
if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
@ -952,8 +956,9 @@ Status ShardReader::CountTotalRows(const std::vector<std::string> &file_paths, b
if (root == true) {
sampler_op->SetNumPaddedSamples(num_padded);
num_samples = op->GetNumSamples(num_samples, 0);
CHECK_FAIL_RETURN_UNEXPECTED(
num_samples != -1, "Dataset size plus number of padded samples is not divisible by number of shards.");
CHECK_FAIL_RETURN_UNEXPECTED(num_samples != -1, "Invalid data, dataset size plus number of padded samples: " +
std::to_string(num_samples) +
" can not be divisible by number of shards.");
root = false;
}
} else {
@ -1013,13 +1018,14 @@ Status ShardReader::Launch(bool is_sample_read) {
// Start provider consumer threads
thread_set_ = std::vector<std::thread>(n_consumer_);
CHECK_FAIL_RETURN_UNEXPECTED(n_consumer_ > 0 && n_consumer_ <= kMaxConsumerCount,
"Number of consumer is out of range.");
"Invalid data, number of consumer: " + std::to_string(n_consumer_) +
" exceeds the upper limit: " + std::to_string(kMaxConsumerCount));
for (int x = 0; x < n_consumer_; ++x) {
thread_set_[x] = std::thread(&ShardReader::ConsumerByRow, this, x);
}
MS_LOG(INFO) << "Launch read thread successfully.";
MS_LOG(INFO) << "Succeed to launch read thread.";
return Status::OK();
}
@ -1032,15 +1038,16 @@ Status ShardReader::CreateTasksByCategory(const std::shared_ptr<ShardOperator> &
if (std::dynamic_pointer_cast<ShardPkSample>(op)) {
num_samples = std::dynamic_pointer_cast<ShardPkSample>(op)->GetNumSamples();
CHECK_FAIL_RETURN_UNEXPECTED(
num_samples >= 0, "Invalid parameter, num_samples must be greater than or equal to 0, but got " + num_samples);
num_samples >= 0,
"Invalid input, num_samples must be greater than or equal to 0, but got " + std::to_string(num_samples));
}
CHECK_FAIL_RETURN_UNEXPECTED(num_elements > 0,
"Invalid parameter, num_elements must be greater than 0, but got " + num_elements);
CHECK_FAIL_RETURN_UNEXPECTED(
num_elements > 0, "Invalid input, num_elements must be greater than 0, but got " + std::to_string(num_elements));
if (categories.empty() == true) {
std::string category_field = category_op->GetCategoryField();
int64_t num_categories = category_op->GetNumCategories();
CHECK_FAIL_RETURN_UNEXPECTED(num_categories > 0,
"Invalid parameter, num_categories must be greater than 0, but got " + num_elements);
CHECK_FAIL_RETURN_UNEXPECTED(num_categories > 0, "Invalid input, num_categories must be greater than 0, but got " +
std::to_string(num_categories));
auto category_ptr = std::make_shared<std::set<std::string>>();
RETURN_IF_NOT_OK(GetAllClasses(category_field, category_ptr));
int i = 0;
@ -1095,12 +1102,14 @@ Status ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, int,
RETURN_IF_NOT_OK(ReadAllRowGroup(selected_columns_, &row_group_ptr));
auto &offsets = std::get<0>(*row_group_ptr);
auto &local_columns = std::get<1>(*row_group_ptr);
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount, "shard count is out of range.");
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount,
"Invalid data, number of shards: " + std::to_string(shard_count_) +
" exceeds the upper limit: " + std::to_string(kMaxFileCount));
int sample_count = 0;
for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
sample_count += offsets[shard_id].size();
}
MS_LOG(DEBUG) << "There are " << sample_count << " records in the dataset.";
MS_LOG(DEBUG) << "Succeed to get " << sample_count << " records from dataset.";
// Init the tasks_ size
tasks_.ResizeTask(sample_count);
@ -1131,9 +1140,11 @@ Status ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, int,
Status ShardReader::CreateLazyTasksByRow(const std::vector<std::tuple<int, int, int, uint64_t>> &row_group_summary,
const std::vector<std::shared_ptr<ShardOperator>> &operators) {
CheckIfColumnInIndex(selected_columns_);
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount, "shard count is out of range.");
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount,
"Invalid data, number of shards: " + std::to_string(shard_count_) +
" exceeds the upper limit: " + std::to_string(kMaxFileCount));
uint32_t sample_count = shard_sample_count_[shard_sample_count_.size() - 1];
MS_LOG(DEBUG) << "There are " << sample_count << " records in the dataset.";
MS_LOG(DEBUG) << "Succeed to get " << sample_count << " records from dataset.";
// Init the tasks_ size
tasks_.ResizeTask(sample_count);
@ -1189,7 +1200,7 @@ Status ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, uint
} else {
RETURN_IF_NOT_OK(CreateTasksByCategory(operators[category_operator]));
}
MS_LOG(DEBUG) << "Created initial list of tasks. There are " << tasks_.Size() << " to start with before sampling.";
MS_LOG(DEBUG) << "Succeed to create " << tasks_.Size() << " initial task to start with before sampling.";
tasks_.InitSampleIds();
for (uint32_t operator_no = 0; operator_no < operators.size(); operator_no++) {
@ -1206,8 +1217,8 @@ Status ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, uint
if (tasks_.permutation_.empty()) tasks_.MakePerm();
num_rows_ = tasks_.Size();
MS_LOG(INFO) << "Total rows is " << num_rows_
<< " and total amount sampled initially is: " << tasks_.sample_ids_.size();
MS_LOG(INFO) << "The total number of samples is " << num_rows_
<< ", the number of samples after sampling is: " << tasks_.sample_ids_.size();
return Status::OK();
}
@ -1216,7 +1227,9 @@ Status ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_id,
std::shared_ptr<TASK_CONTENT> *task_content_ptr) {
RETURN_UNEXPECTED_IF_NULL(task_content_ptr);
// All tasks are done
CHECK_FAIL_RETURN_UNEXPECTED(task_id < static_cast<int>(tasks_.Size()), "task id is out of range.");
CHECK_FAIL_RETURN_UNEXPECTED(
task_id < static_cast<int>(tasks_.Size()),
"Invalid data, task id: " + std::to_string(task_id) + " exceeds the upper limit: " + std::to_string(tasks_.Size()));
uint32_t shard_id = 0;
uint32_t group_id = 0;
uint32_t blob_start = 0;
@ -1259,7 +1272,7 @@ Status ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_id,
// read the blob from data file
std::shared_ptr<Page> page_ptr;
RETURN_IF_NOT_OK(shard_header_->GetPageByGroupId(group_id, shard_id, &page_ptr));
MS_LOG(DEBUG) << "Success to get page by group id.";
MS_LOG(DEBUG) << "Success to get page by group id: " << group_id;
// Pack image list
std::vector<uint8_t> images(blob_end - blob_start);
@ -1306,7 +1319,7 @@ void ShardReader::ConsumerByRow(int consumer_id) {
auto task_content_ptr =
std::make_shared<TASK_CONTENT>(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>());
if (ConsumerOneTask(tasks_.sample_ids_[sample_id_pos], consumer_id, &task_content_ptr).IsError()) {
MS_LOG(ERROR) << "Error in ConsumerOneTask.";
MS_LOG(ERROR) << "Error raised in ConsumerOneTask function.";
return;
}
const auto &batch = (*task_content_ptr).second;
@ -1407,12 +1420,12 @@ void ShardReader::ShuffleTask() {
if (std::dynamic_pointer_cast<ShardShuffle>(op) && has_sharding == false) {
auto s = (*op)(tasks_);
if (s.IsError()) {
MS_LOG(WARNING) << "Redo randomSampler failed.";
MS_LOG(WARNING) << "Failed to redo randomSampler in new epoch.";
}
} else if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
auto s = (*op)(tasks_);
if (s.IsError()) {
MS_LOG(WARNING) << "Redo distributeSampler failed.";
MS_LOG(WARNING) << "Failed to redo distributeSampler in new epoch.";
}
}
}
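
Most of the shard_reader.cc edits above settle on one sqlite3 error-handling pattern: run the query, and on failure report "Failed to execute sql [ <sql> ], <errmsg>", then free the error string and close the handle. The following is a minimal self-contained sketch of that pattern; the database path and table are placeholders, and SelectRows only mimics the shape of the SelectCallback used in the real code.

#include <sqlite3.h>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Collect each returned row as a vector of strings.
static int SelectRows(void *data, int argc, char **argv, char **) {
  auto *rows = static_cast<std::vector<std::vector<std::string>> *>(data);
  std::vector<std::string> row;
  for (int i = 0; i < argc; ++i) row.emplace_back(argv[i] ? argv[i] : "NULL");
  rows->push_back(std::move(row));
  return 0;
}

int main() {
  sqlite3 *db = nullptr;
  // Read-only open, as in VerifyDataset; "demo.db" is a placeholder path.
  if (sqlite3_open_v2("demo.db", &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
    std::cerr << "Invalid database file, path: demo.db, " << sqlite3_errmsg(db) << std::endl;
    sqlite3_close(db);
    return 1;
  }
  std::string sql = "SELECT NAME from SHARD_NAME;";
  std::vector<std::vector<std::string>> rows;
  char *errmsg = nullptr;
  if (sqlite3_exec(db, sql.c_str(), SelectRows, &rows, &errmsg) != SQLITE_OK) {
    std::ostringstream oss;
    oss << "Failed to execute sql [ " << sql << " ], " << errmsg;
    sqlite3_free(errmsg);
    sqlite3_close(db);
    std::cerr << oss.str() << std::endl;
    return 1;
  }
  std::cout << "Succeed to get " << rows.size() << " records from index." << std::endl;
  sqlite3_close(db);
  return 0;
}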

View File

@ -45,13 +45,13 @@ Status ShardSegment::GetCategoryFields(std::shared_ptr<vector<std::string>> *fie
int rc = sqlite3_exec(database_paths_[0], common::SafeCStr(sql), SelectCallback, &field_names, &errmsg);
if (rc != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in select statement, sql: " << sql + ", error: " << errmsg;
oss << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(database_paths_[0]);
database_paths_[0] = nullptr;
RETURN_STATUS_UNEXPECTED(oss.str());
} else {
MS_LOG(INFO) << "Get " << static_cast<int>(field_names.size()) << " records from index.";
MS_LOG(INFO) << "Succeed to get " << static_cast<int>(field_names.size()) << " records from index.";
}
uint32_t idx = kStartFieldId;
@ -60,7 +60,8 @@ Status ShardSegment::GetCategoryFields(std::shared_ptr<vector<std::string>> *fie
sqlite3_free(errmsg);
sqlite3_close(database_paths_[0]);
database_paths_[0] = nullptr;
RETURN_STATUS_UNEXPECTED("idx is out of range.");
RETURN_STATUS_UNEXPECTED("Invalid data, field_names size must be greater than 1, but got " +
std::to_string(field_names[idx].size()));
}
candidate_category_fields_.push_back(field_names[idx][1]);
idx += 2;
@ -79,19 +80,16 @@ Status ShardSegment::SetCategoryField(std::string category_field) {
current_category_field_ = category_field;
return Status::OK();
}
RETURN_STATUS_UNEXPECTED("Field " + category_field + " is not a candidate category field.");
RETURN_STATUS_UNEXPECTED("Invalid data, field '" + category_field + "' is not a candidate category field.");
}
Status ShardSegment::ReadCategoryInfo(std::shared_ptr<std::string> *category_ptr) {
RETURN_UNEXPECTED_IF_NULL(category_ptr);
MS_LOG(INFO) << "Read category begin";
auto category_info_ptr = std::make_shared<CATEGORY_INFO>();
RETURN_IF_NOT_OK(WrapCategoryInfo(&category_info_ptr));
// Convert category info to json string
*category_ptr = std::make_shared<std::string>(ToJsonForCategory(*category_info_ptr));
MS_LOG(INFO) << "Read category end";
return Status::OK();
}
@ -99,7 +97,7 @@ Status ShardSegment::WrapCategoryInfo(std::shared_ptr<CATEGORY_INFO> *category_i
RETURN_UNEXPECTED_IF_NULL(category_info_ptr);
std::map<std::string, int> counter;
CHECK_FAIL_RETURN_UNEXPECTED(ValidateFieldName(current_category_field_),
"Category field error from index, it is: " + current_category_field_);
"Invalid data, field: " + current_category_field_ + "is invalid.");
std::string sql = "SELECT " + current_category_field_ + ", COUNT(" + current_category_field_ +
") AS `value_occurrence` FROM indexes GROUP BY " + current_category_field_ + ";";
@ -109,13 +107,13 @@ Status ShardSegment::WrapCategoryInfo(std::shared_ptr<CATEGORY_INFO> *category_i
char *errmsg = nullptr;
if (sqlite3_exec(db, common::SafeCStr(sql), SelectCallback, &field_count, &errmsg) != SQLITE_OK) {
std::ostringstream oss;
oss << "Error in select statement, sql: " << sql + ", error: " << errmsg;
oss << "Failed to execute sql [ " << common::SafeCStr(sql) << " ], " << errmsg;
sqlite3_free(errmsg);
sqlite3_close(db);
db = nullptr;
RETURN_STATUS_UNEXPECTED(oss.str());
} else {
MS_LOG(INFO) << "Get " << static_cast<int>(field_count.size()) << " records from index.";
MS_LOG(INFO) << "Succeed to get " << static_cast<int>(field_count.size()) << " records from index.";
}
for (const auto &field : field_count) {
@ -156,13 +154,14 @@ Status ShardSegment::ReadAtPageById(int64_t category_id, int64_t page_no, int64_
auto category_info_ptr = std::make_shared<CATEGORY_INFO>();
RETURN_IF_NOT_OK(WrapCategoryInfo(&category_info_ptr));
CHECK_FAIL_RETURN_UNEXPECTED(category_id < static_cast<int>(category_info_ptr->size()) && category_id >= 0,
"Invalid category id, id: " + std::to_string(category_id));
"Invalid data, category_id: " + std::to_string(category_id) +
" must be in the range [0, " + std::to_string(category_info_ptr->size()) + "].");
int total_rows_in_category = std::get<2>((*category_info_ptr)[category_id]);
// Quit if category not found or page number is out of range
CHECK_FAIL_RETURN_UNEXPECTED(total_rows_in_category > 0 && page_no >= 0 && n_rows_of_page > 0 &&
page_no * n_rows_of_page < total_rows_in_category,
"Invalid page no / page size, page no: " + std::to_string(page_no) +
", page size: " + std::to_string(n_rows_of_page));
"Invalid data, page no: " + std::to_string(page_no) +
"or page size: " + std::to_string(n_rows_of_page) + " is invalid.");
auto row_group_summary = ReadRowGroupSummary();
@ -234,7 +233,7 @@ Status ShardSegment::ReadAtPageByName(std::string category_name, int64_t page_no
}
}
RETURN_STATUS_UNEXPECTED("Category name can not match.");
RETURN_STATUS_UNEXPECTED("category_name: " + category_name + " could not found.");
}
Status ShardSegment::ReadAllAtPageById(int64_t category_id, int64_t page_no, int64_t n_rows_of_page,
@ -243,15 +242,15 @@ Status ShardSegment::ReadAllAtPageById(int64_t category_id, int64_t page_no, int
auto category_info_ptr = std::make_shared<CATEGORY_INFO>();
RETURN_IF_NOT_OK(WrapCategoryInfo(&category_info_ptr));
CHECK_FAIL_RETURN_UNEXPECTED(category_id < static_cast<int64_t>(category_info_ptr->size()),
"Invalid category id: " + std::to_string(category_id));
"Invalid data, category_id: " + std::to_string(category_id) +
" must be in the range [0, " + std::to_string(category_info_ptr->size()) + "].");
int total_rows_in_category = std::get<2>((*category_info_ptr)[category_id]);
// Quit if category not found or page number is out of range
CHECK_FAIL_RETURN_UNEXPECTED(total_rows_in_category > 0 && page_no >= 0 && n_rows_of_page > 0 &&
page_no * n_rows_of_page < total_rows_in_category,
"Invalid page no / page size / total size, page no: " + std::to_string(page_no) +
", page size of page: " + std::to_string(n_rows_of_page) +
", total size: " + std::to_string(total_rows_in_category));
"Invalid data, page no: " + std::to_string(page_no) +
"or page size: " + std::to_string(n_rows_of_page) + " is invalid.");
auto row_group_summary = ReadRowGroupSummary();
int i_start = page_no * n_rows_of_page;
@ -278,7 +277,7 @@ Status ShardSegment::ReadAllAtPageById(int64_t category_id, int64_t page_no, int
continue;
}
CHECK_FAIL_RETURN_UNEXPECTED(number_of_rows <= static_cast<int>(labels.size()),
"Invalid row number of page: " + number_of_rows);
"Invalid data, number_of_rows: " + std::to_string(number_of_rows) + " is invalid.");
for (int i = 0; i < number_of_rows; ++i, ++idx) {
if (idx >= i_start && idx < i_end) {
auto images_ptr = std::make_shared<std::vector<uint8_t>>();
@ -305,7 +304,7 @@ Status ShardSegment::ReadAllAtPageByName(std::string category_name, int64_t page
break;
}
}
CHECK_FAIL_RETURN_UNEXPECTED(category_id != -1, "Invalid category name.");
CHECK_FAIL_RETURN_UNEXPECTED(category_id != -1, "category_name: " + category_name + " could not found.");
return ReadAllAtPageById(category_id, page_no, n_rows_of_page, pages_ptr);
}
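
The ReadAtPageById / ReadAllAtPageById changes above validate that category_id falls in [0, number of categories) and that the requested page actually intersects the category before reading. A small standalone sketch of that bounds logic (plain integers and strings, not the CHECK_FAIL_RETURN_UNEXPECTED macro) follows.

#include <cstdint>
#include <iostream>
#include <string>

// Return an empty string if the request is valid, otherwise a message in the new style.
std::string CheckPageRequest(int64_t category_id, int64_t num_categories, int64_t total_rows_in_category,
                             int64_t page_no, int64_t n_rows_of_page) {
  if (category_id < 0 || category_id >= num_categories) {
    return "Invalid data, category_id: " + std::to_string(category_id) + " must be in the range [0, " +
           std::to_string(num_categories) + ").";
  }
  if (total_rows_in_category <= 0 || page_no < 0 || n_rows_of_page <= 0 ||
      page_no * n_rows_of_page >= total_rows_in_category) {
    return "Invalid data, page no: " + std::to_string(page_no) + " or page size: " +
           std::to_string(n_rows_of_page) + " is invalid.";
  }
  return "";
}

int main() {
  // 3 categories, 10 rows in category 1, asking for page 4 of size 3 (rows 12..14) -> out of range.
  std::string err = CheckPageRequest(1, 3, 10, 4, 3);
  std::cout << (err.empty() ? "Succeed to check page request." : err) << std::endl;
  return 0;
}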

View File

@ -43,11 +43,12 @@ ShardWriter::~ShardWriter() {
Status ShardWriter::GetFullPathFromFileName(const std::vector<std::string> &paths) {
// Get full path from file name
for (const auto &path : paths) {
CHECK_FAIL_RETURN_UNEXPECTED(CheckIsValidUtf8(path), "The filename contains invalid uft-8 data: " + path);
CHECK_FAIL_RETURN_UNEXPECTED(CheckIsValidUtf8(path),
"Invalid data, file name: " + path + " contains invalid uft-8 character.");
char resolved_path[PATH_MAX] = {0};
char buf[PATH_MAX] = {0};
CHECK_FAIL_RETURN_UNEXPECTED(strncpy_s(buf, PATH_MAX, common::SafeCStr(path), path.length()) == EOK,
"Secure func failed");
"Failed to call securec func [strncpy_s], path: " + path);
#if defined(_WIN32) || defined(_WIN64)
RETURN_UNEXPECTED_IF_NULL(_fullpath(resolved_path, dirname(&(buf[0])), PATH_MAX));
RETURN_UNEXPECTED_IF_NULL(_fullpath(resolved_path, common::SafeCStr(path), PATH_MAX));
@ -55,7 +56,7 @@ Status ShardWriter::GetFullPathFromFileName(const std::vector<std::string> &path
CHECK_FAIL_RETURN_UNEXPECTED(realpath(dirname(&(buf[0])), resolved_path) != nullptr,
"Invalid file, path: " + std::string(resolved_path));
if (realpath(common::SafeCStr(path), resolved_path) == nullptr) {
MS_LOG(DEBUG) << "Path " << resolved_path;
MS_LOG(DEBUG) << "Path: " << common::SafeCStr(path) << "check success.";
}
#endif
file_paths_.emplace_back(string(resolved_path));
@ -74,7 +75,7 @@ Status ShardWriter::OpenDataFiles(bool append) {
}
auto realpath = FileUtils::GetRealPath(dir.value().data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + file);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + file);
std::optional<std::string> whole_path = "";
FileUtils::ConcatDirAndFileName(&realpath, &local_file_name, &whole_path);
@ -85,23 +86,23 @@ Status ShardWriter::OpenDataFiles(bool append) {
fs->open(whole_path.value(), std::ios::in | std::ios::binary);
if (fs->good()) {
fs->close();
RETURN_STATUS_UNEXPECTED("MindRecord file already existed, please delete file: " + file);
RETURN_STATUS_UNEXPECTED("Invalid file, Mindrecord files already existed in path: " + file);
}
fs->close();
// open the mindrecord file to write
fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary | std::ios::trunc);
if (!fs->good()) {
RETURN_STATUS_UNEXPECTED("MindRecord file could not opened: " + file);
RETURN_STATUS_UNEXPECTED("Failed to open file, path: " + file);
}
} else {
// open the mindrecord file to append
fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary);
if (!fs->good()) {
fs->close();
RETURN_STATUS_UNEXPECTED("MindRecord file could not opened for append: " + file);
RETURN_STATUS_UNEXPECTED("Failed to open file for append data, path: " + file);
}
}
MS_LOG(INFO) << "Open shard file successfully.";
MS_LOG(INFO) << "Succeed to open shard file, path: " << file;
file_streams_.push_back(fs);
}
return Status::OK();
@ -111,18 +112,18 @@ Status ShardWriter::RemoveLockFile() {
// Remove temporary file
int ret = std::remove(pages_file_.c_str());
if (ret == 0) {
MS_LOG(DEBUG) << "Remove page file.";
MS_LOG(DEBUG) << "Succeed to remove page file, path: " << pages_file_;
}
ret = std::remove(lock_file_.c_str());
if (ret == 0) {
MS_LOG(DEBUG) << "Remove lock file.";
MS_LOG(DEBUG) << "Succeed to remove lock file, path: " << lock_file_;
}
return Status::OK();
}
Status ShardWriter::InitLockFile() {
CHECK_FAIL_RETURN_UNEXPECTED(file_paths_.size() != 0, "File path not initialized.");
CHECK_FAIL_RETURN_UNEXPECTED(file_paths_.size() != 0, "Invalid data, file_paths_ is not initialized.");
lock_file_ = file_paths_[0] + kLockFileSuffix;
pages_file_ = file_paths_[0] + kPageFileSuffix;
@ -132,10 +133,9 @@ Status ShardWriter::InitLockFile() {
Status ShardWriter::Open(const std::vector<std::string> &paths, bool append) {
shard_count_ = paths.size();
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxShardCount && shard_count_ != 0,
"The Shard Count greater than max value(1000) or equal to 0, but got " + shard_count_);
CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ <= kMaxSchemaCount,
"The schema Count greater than max value(1), but got " + schema_count_);
"Invalid data, schema_count_ must be less than or equal to " +
std::to_string(kMaxSchemaCount) + ", but got " + std::to_string(schema_count_));
// Get full path from file name
RETURN_IF_NOT_OK(GetFullPathFromFileName(paths));
@ -147,7 +147,7 @@ Status ShardWriter::Open(const std::vector<std::string> &paths, bool append) {
}
Status ShardWriter::OpenForAppend(const std::string &path) {
CHECK_FAIL_RETURN_UNEXPECTED(IsLegalFile(path), "Invalid file pacth.");
CHECK_FAIL_RETURN_UNEXPECTED(IsLegalFile(path), "Invalid file, path: " + path);
std::shared_ptr<json> header_ptr;
RETURN_IF_NOT_OK(ShardHeader::BuildSingleHeader(path, &header_ptr));
auto ds = std::make_shared<std::vector<std::string>>();
@ -171,7 +171,7 @@ Status ShardWriter::Commit() {
RETURN_IF_NOT_OK(shard_header_->FileToPages(pages_file_));
}
RETURN_IF_NOT_OK(WriteShardHeader());
MS_LOG(INFO) << "Write metadata successfully.";
MS_LOG(INFO) << "Succeed to write meta data.";
// Remove lock file
RETURN_IF_NOT_OK(RemoveLockFile());
@ -183,7 +183,7 @@ Status ShardWriter::SetShardHeader(std::shared_ptr<ShardHeader> header_data) {
// set fields in mindrecord when empty
std::vector<std::pair<uint64_t, std::string>> fields = header_data->GetFields();
if (fields.empty()) {
MS_LOG(DEBUG) << "Missing index fields by user, auto generate index fields.";
MS_LOG(DEBUG) << "Index field is not set, it will be generated automatically.";
std::vector<std::shared_ptr<Schema>> schemas = header_data->GetSchemas();
for (const auto &schema : schemas) {
json jsonSchema = schema->GetSchema()["schema"];
@ -213,8 +213,10 @@ Status ShardWriter::SetShardHeader(std::shared_ptr<ShardHeader> header_data) {
Status ShardWriter::SetHeaderSize(const uint64_t &header_size) {
// header_size [16KB, 128MB]
CHECK_FAIL_RETURN_UNEXPECTED(header_size >= kMinHeaderSize && header_size <= kMaxHeaderSize,
"Header size should between 16KB and 128MB.");
CHECK_FAIL_RETURN_UNEXPECTED(header_size % 4 == 0, "Header size should be divided by four.");
"Invalid data, header size: " + std::to_string(header_size) + " should be in range [" +
std::to_string(kMinHeaderSize) + "MB, " + std::to_string(kMaxHeaderSize) + "MB].");
CHECK_FAIL_RETURN_UNEXPECTED(
header_size % 4 == 0, "Invalid data, header size " + std::to_string(header_size) + " should be divided by four.");
header_size_ = header_size;
return Status::OK();
}
@ -222,8 +224,10 @@ Status ShardWriter::SetHeaderSize(const uint64_t &header_size) {
Status ShardWriter::SetPageSize(const uint64_t &page_size) {
// PageSize [32KB, 256MB]
CHECK_FAIL_RETURN_UNEXPECTED(page_size >= kMinPageSize && page_size <= kMaxPageSize,
"Page size should between 16KB and 256MB.");
CHECK_FAIL_RETURN_UNEXPECTED(page_size % 4 == 0, "Page size should be divided by four.");
"Invalid data, page size: " + std::to_string(page_size) + " should be in range [" +
std::to_string(kMinPageSize) + "MB, " + std::to_string(kMaxPageSize) + "MB].");
CHECK_FAIL_RETURN_UNEXPECTED(page_size % 4 == 0,
"Invalid data, page size " + std::to_string(page_size) + " should be divided by four.");
page_size_ = page_size;
return Status::OK();
}
@ -238,7 +242,7 @@ void ShardWriter::DeleteErrorData(std::map<uint64_t, std::vector<json>> &raw_dat
for (auto &subMg : sub_err_mg) {
int loc = subMg.first;
std::string message = subMg.second;
MS_LOG(ERROR) << "For schema " << id << ", " << loc + 1 << " th data is wrong: " << message;
MS_LOG(ERROR) << "Invalid input, the " << loc + 1 << " th data is invalid, " << message;
(void)delete_set.insert(loc);
}
}
@ -275,7 +279,8 @@ Status ShardWriter::CheckDataTypeAndValue(const std::string &key, const json &va
(data_type == "int64" && !data[key].is_number_integer()) ||
(data_type == "float32" && !data[key].is_number_float()) ||
(data_type == "float64" && !data[key].is_number_float()) || (data_type == "string" && !data[key].is_string())) {
std::string message = "field: " + key + " type : " + data_type + " value: " + data[key].dump() + " is not matched";
std::string message =
"field: " + key + ", type: " + data_type + ", value: " + data[key].dump() + " do not match.";
PopulateMutexErrorData(i, message, err_raw_data);
RETURN_STATUS_UNEXPECTED(message);
}
@ -285,7 +290,7 @@ Status ShardWriter::CheckDataTypeAndValue(const std::string &key, const json &va
if (static_cast<int64_t>(temp_value) < static_cast<int64_t>(std::numeric_limits<int32_t>::min()) &&
static_cast<int64_t>(temp_value) > static_cast<int64_t>(std::numeric_limits<int32_t>::max())) {
std::string message =
"field: " + key + " type : " + data_type + " value: " + data[key].dump() + " is out of range";
"field: " + key + " ,type : " + data_type + " ,value: " + data[key].dump() + " is out of range.";
PopulateMutexErrorData(i, message, err_raw_data);
RETURN_STATUS_UNEXPECTED(message);
}
@ -305,7 +310,7 @@ void ShardWriter::CheckSliceData(int start_row, int end_row, json schema, const
std::string key = iter.key();
json value = iter.value();
if (data.find(key) == data.end()) {
std::string message = "there is not '" + key + "' object in the raw data";
std::string message = "'" + key + "' object can not found in data: " + value.dump();
PopulateMutexErrorData(i, message, err_raw_data);
break;
}
@ -341,7 +346,7 @@ Status ShardWriter::CheckData(const std::map<uint64_t, std::vector<json>> &raw_d
// calculate start position and end position for each thread
int batch_size = rawdata_iter->second.size() / shard_count_;
int thread_num = shard_count_;
CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid thread number.");
CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid data, thread_num should be positive.");
if (thread_num > kMaxThreadCount) {
thread_num = kMaxThreadCount;
}
@ -360,7 +365,9 @@ Status ShardWriter::CheckData(const std::map<uint64_t, std::vector<json>> &raw_d
thread_set[x] = std::thread(&ShardWriter::CheckSliceData, this, start_row, end_row, schema,
std::ref(sub_raw_data), std::ref(sub_err_mg));
}
CHECK_FAIL_RETURN_UNEXPECTED(thread_num <= kMaxThreadCount, "Invalid thread number.");
CHECK_FAIL_RETURN_UNEXPECTED(
thread_num <= kMaxThreadCount,
"Invalid data, thread_num should be less than or equal to " + std::to_string(kMaxThreadCount));
// Wait for threads done
for (int x = 0; x < thread_num; ++x) {
thread_set[x].join();
@ -377,22 +384,25 @@ Status ShardWriter::ValidateRawData(std::map<uint64_t, std::vector<json>> &raw_d
RETURN_UNEXPECTED_IF_NULL(count_ptr);
auto rawdata_iter = raw_data.begin();
schema_count_ = raw_data.size();
CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ > 0, "Data size is not positive.");
CHECK_FAIL_RETURN_UNEXPECTED(schema_count_ > 0, "Invalid data, schema count should be positive.");
// keep schema_id
std::set<int64_t> schema_ids;
row_count_ = (rawdata_iter->second).size();
MS_LOG(DEBUG) << "Schema count is " << schema_count_;
// Determine if the number of schemas is the same
CHECK_FAIL_RETURN_UNEXPECTED(shard_header_->GetSchemas().size() == schema_count_,
"Data size is not equal with the schema size");
"Invalid data, schema count: " + std::to_string(schema_count_) + " is not matched.");
// Determine raw_data size == blob_data size
CHECK_FAIL_RETURN_UNEXPECTED(raw_data[0].size() == blob_data.size(), "Raw data size is not equal blob data size");
CHECK_FAIL_RETURN_UNEXPECTED(raw_data[0].size() == blob_data.size(),
"Invalid data, raw data size: " + std::to_string(raw_data[0].size()) +
" is not equal to blob data size: " + std::to_string(blob_data.size()) + ".");
// Determine whether the number of samples corresponding to each schema is the same
for (rawdata_iter = raw_data.begin(); rawdata_iter != raw_data.end(); ++rawdata_iter) {
CHECK_FAIL_RETURN_UNEXPECTED(row_count_ == rawdata_iter->second.size(), "Data size is not equal");
CHECK_FAIL_RETURN_UNEXPECTED(
row_count_ == rawdata_iter->second.size(),
"Invalid data, number of samples: " + std::to_string(rawdata_iter->second.size()) + " for schemais not matched.");
(void)schema_ids.insert(rawdata_iter->first);
}
const std::vector<std::shared_ptr<Schema>> &schemas = shard_header_->GetSchemas();
@ -401,7 +411,7 @@ Status ShardWriter::ValidateRawData(std::map<uint64_t, std::vector<json>> &raw_d
[schema_ids](const std::shared_ptr<Schema> &schema) {
return schema_ids.find(schema->GetSchemaID()) == schema_ids.end();
}),
"Input rawdata schema id do not match real schema id.");
"Invalid data, schema id of data is not matched.");
if (!sign) {
*count_ptr = std::make_shared<std::pair<int, int>>(schema_count_, row_count_);
return Status::OK();
@ -448,8 +458,8 @@ Status ShardWriter::LockWriter(bool parallel_writer, std::unique_ptr<int> *fd_pt
}
#if defined(_WIN32) || defined(_WIN64)
MS_LOG(DEBUG) << "Lock file done by python.";
const int fd = 0;
MS_LOG(DEBUG) << "Lock file done by Python.";
#else
const int fd = open(lock_file_.c_str(), O_WRONLY | O_CREAT, 0666);
@ -457,7 +467,7 @@ Status ShardWriter::LockWriter(bool parallel_writer, std::unique_ptr<int> *fd_pt
flock(fd, LOCK_EX);
} else {
close(fd);
RETURN_STATUS_UNEXPECTED("Shard writer failed when locking file.");
RETURN_STATUS_UNEXPECTED("Failed to lock file, path: " + lock_file_);
}
#endif
@ -467,20 +477,20 @@ Status ShardWriter::LockWriter(bool parallel_writer, std::unique_ptr<int> *fd_pt
auto realpath = FileUtils::GetRealPath(file.data());
if (!realpath.has_value()) {
close(fd);
RETURN_STATUS_UNEXPECTED("Get real path failed, path=" + file);
RETURN_STATUS_UNEXPECTED("Failed to get real path, path: " + file);
}
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(realpath.value(), std::ios::in | std::ios::out | std::ios::binary);
if (fs->fail()) {
close(fd);
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open file: " + file);
RETURN_STATUS_UNEXPECTED("Failed to open file, path: " + file);
}
file_streams_.push_back(fs);
}
auto status = shard_header_->FileToPages(pages_file_);
if (status.IsError()) {
close(fd);
RETURN_STATUS_UNEXPECTED("Error in FileToPages.");
RETURN_STATUS_UNEXPECTED("Error raised in FileToPages function.");
}
*fd_ptr = std::make_unique<int>(fd);
return Status::OK();
@ -495,7 +505,8 @@ Status ShardWriter::UnlockWriter(int fd, bool parallel_writer) {
file_streams_[i]->close();
}
#if defined(_WIN32) || defined(_WIN64)
MS_LOG(DEBUG) << "Unlock file done by python.";
MS_LOG(DEBUG) << "Unlock file done by Python.";
#else
flock(fd, LOCK_UN);
close(fd);
@ -509,7 +520,8 @@ Status ShardWriter::WriteRawDataPreCheck(std::map<uint64_t, std::vector<json>> &
// check the free disk size
std::shared_ptr<uint64_t> size_ptr;
RETURN_IF_NOT_OK(GetDiskSize(file_paths_[0], kFreeSize, &size_ptr));
CHECK_FAIL_RETURN_UNEXPECTED(*size_ptr >= kMinFreeDiskSize, "IO error / there is no free disk to be used");
CHECK_FAIL_RETURN_UNEXPECTED(*size_ptr >= kMinFreeDiskSize,
"No free disk to be used, free disk size: " + std::to_string(*size_ptr));
// compress blob
if (shard_column_->CheckCompressBlob()) {
for (auto &blob : blob_data) {
@ -583,7 +595,7 @@ Status ShardWriter::WriteRawData(std::map<uint64_t, std::vector<json>> &raw_data
// Serialize raw data
RETURN_IF_NOT_OK(WriteRawDataPreCheck(raw_data, blob_data, sign, &schema_count, &row_count));
CHECK_FAIL_RETURN_UNEXPECTED(row_count >= kInt0, "Raw data size is not positive.");
CHECK_FAIL_RETURN_UNEXPECTED(row_count >= kInt0, "Invalid data, waw data size should be positive.");
if (row_count == kInt0) {
return Status::OK();
}
@ -596,7 +608,7 @@ Status ShardWriter::WriteRawData(std::map<uint64_t, std::vector<json>> &raw_data
RETURN_IF_NOT_OK(SetBlobDataSize(blob_data));
// Write data to disk with multi threads
RETURN_IF_NOT_OK(ParallelWriteData(blob_data, bin_raw_data));
MS_LOG(INFO) << "Write " << bin_raw_data.size() << " records successfully.";
MS_LOG(INFO) << "Succeed to write " << bin_raw_data.size() << " records.";
RETURN_IF_NOT_OK(UnlockWriter(*fd_ptr, parallel_writer));
@ -644,7 +656,7 @@ Status ShardWriter::ParallelWriteData(const std::vector<std::vector<uint8_t>> &b
auto shards = BreakIntoShards();
// define the number of thread
int thread_num = static_cast<int>(shard_count_);
CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid thread number.");
CHECK_FAIL_RETURN_UNEXPECTED(thread_num > 0, "Invalid data, thread_num should be positive.");
if (thread_num > kMaxThreadCount) {
thread_num = kMaxThreadCount;
}
@ -708,11 +720,14 @@ Status ShardWriter::CutRowGroup(int start_row, int end_row, const std::vector<st
auto n_byte_raw = last_raw_page_size - last_raw_offset;
int page_start_row = start_row;
CHECK_FAIL_RETURN_UNEXPECTED(start_row <= end_row, "Invalid start row.");
CHECK_FAIL_RETURN_UNEXPECTED(start_row <= end_row,
"Invalid data, start row: " + std::to_string(start_row) +
" should be less than or equal to end row: " + std::to_string(end_row));
CHECK_FAIL_RETURN_UNEXPECTED(
end_row <= static_cast<int>(blob_data_size_.size()) && end_row <= static_cast<int>(raw_data_size_.size()),
"Invalid end row.");
"Invalid data, end row: " + std::to_string(end_row) + " should be less than blob data size: " +
std::to_string(blob_data_size_.size()) + " and raw data size: " + std::to_string(raw_data_size_.size()) + ".");
for (int i = start_row; i < end_row; ++i) {
// n_byte_blob(0) indicate appendBlobPage
if (n_byte_blob == 0 || n_byte_blob + blob_data_size_[i] > page_size_ ||
@ -810,7 +825,9 @@ Status ShardWriter::ShiftRawPage(const int &shard_id, const std::vector<std::pai
std::vector<uint8_t> buf(shift_size);
// Read last row group from previous raw data page
CHECK_FAIL_RETURN_UNEXPECTED(shard_id >= 0 && shard_id < file_streams_.size(), "Invalid shard id");
CHECK_FAIL_RETURN_UNEXPECTED(
shard_id >= 0 && shard_id < file_streams_.size(),
"Invalid data, shard_id should be in range [0, " + std::to_string(file_streams_.size()) + ").");
auto &io_seekg = file_streams_[shard_id]->seekg(
page_size_ * last_raw_page_id + header_size_ + last_row_group_id_offset, std::ios::beg);
@ -921,7 +938,8 @@ Status ShardWriter::FlushBlobChunk(const std::shared_ptr<std::fstream> &out,
const std::pair<int, int> &blob_row) {
CHECK_FAIL_RETURN_UNEXPECTED(
blob_row.first <= blob_row.second && blob_row.second <= static_cast<int>(blob_data.size()) && blob_row.first >= 0,
"Invalid blob row");
"Invalid data, blob_row: " + std::to_string(blob_row.first) + ", " + std::to_string(blob_row.second) +
" is invalid.");
for (int j = blob_row.first; j < blob_row.second; ++j) {
// Write the size of blob
uint64_t line_len = blob_data[j].size();
@ -1003,7 +1021,8 @@ Status ShardWriter::WriteShardHeader() {
// Write header data to multi files
CHECK_FAIL_RETURN_UNEXPECTED(
shard_count_ <= static_cast<int>(file_streams_.size()) && shard_count_ <= static_cast<int>(shard_header.size()),
"Invalid shard count");
"Invalid data, shard count should be less than or equal to file size: " + std::to_string(file_streams_.size()) +
", and header size: " + std::to_string(shard_header.size()) + ".");
if (shard_count_ <= kMaxShardCount) {
for (int shard_id = 0; shard_id < shard_count_; ++shard_id) {
auto &io_seekp = file_streams_[shard_id]->seekp(0, std::ios::beg);
@ -1061,7 +1080,7 @@ Status ShardWriter::SerializeRawData(std::map<uint64_t, std::vector<json>> &raw_
// Set obstacles to prevent the main thread from running
thread_set[x].join();
}
CHECK_FAIL_RETURN_SYNTAX_ERROR(flag_ != true, "Error in FailArray");
CHECK_FAIL_RETURN_SYNTAX_ERROR(flag_ != true, "Error raised in FillArray function.");
return Status::OK();
}
@ -1072,8 +1091,9 @@ Status ShardWriter::SetRawDataSize(const std::vector<std::vector<uint8_t>> &bin_
bin_raw_data.begin() + (i * schema_count_), bin_raw_data.begin() + (i * schema_count_) + schema_count_, 0,
[](uint64_t accumulator, const std::vector<uint8_t> &row) { return accumulator + kInt64Len + row.size(); });
}
CHECK_FAIL_RETURN_SYNTAX_ERROR(*std::max_element(raw_data_size_.begin(), raw_data_size_.end()) <= page_size_,
"Page size is too small to save a row!");
CHECK_FAIL_RETURN_SYNTAX_ERROR(
*std::max_element(raw_data_size_.begin(), raw_data_size_.end()) <= page_size_,
"Invalid data, Page size: " + std::to_string(page_size_) + " is too small to save a raw row!");
return Status::OK();
}
@ -1081,15 +1101,16 @@ Status ShardWriter::SetBlobDataSize(const std::vector<std::vector<uint8_t>> &blo
blob_data_size_ = std::vector<uint64_t>(row_count_);
(void)std::transform(blob_data.begin(), blob_data.end(), blob_data_size_.begin(),
[](const std::vector<uint8_t> &row) { return kInt64Len + row.size(); });
CHECK_FAIL_RETURN_SYNTAX_ERROR(*std::max_element(blob_data_size_.begin(), blob_data_size_.end()) <= page_size_,
"Page size is too small to save a row!");
CHECK_FAIL_RETURN_SYNTAX_ERROR(
*std::max_element(blob_data_size_.begin(), blob_data_size_.end()) <= page_size_,
"Invalid data, Page size: " + std::to_string(page_size_) + " is too small to save a blob row!");
return Status::OK();
}
Status ShardWriter::SetLastRawPage(const int &shard_id, std::shared_ptr<Page> &last_raw_page) {
// Get last raw page
auto last_raw_page_id = shard_header_->GetLastPageIdByType(shard_id, kPageTypeRaw);
CHECK_FAIL_RETURN_SYNTAX_ERROR(last_raw_page_id >= 0, "Invalid last_raw_page_id.");
CHECK_FAIL_RETURN_SYNTAX_ERROR(last_raw_page_id >= 0, "Invalid data, last_raw_page_id should be positive.");
RETURN_IF_NOT_OK(shard_header_->GetPage(shard_id, last_raw_page_id, &last_raw_page));
return Status::OK();
}
@ -1097,7 +1118,7 @@ Status ShardWriter::SetLastRawPage(const int &shard_id, std::shared_ptr<Page> &l
Status ShardWriter::SetLastBlobPage(const int &shard_id, std::shared_ptr<Page> &last_blob_page) {
// Get last blob page
auto last_blob_page_id = shard_header_->GetLastPageIdByType(shard_id, kPageTypeBlob);
CHECK_FAIL_RETURN_SYNTAX_ERROR(last_blob_page_id >= 0, "Invalid last_blob_page_id.");
CHECK_FAIL_RETURN_SYNTAX_ERROR(last_blob_page_id >= 0, "Invalid data, last_blob_page_id should be positive.");
RETURN_IF_NOT_OK(shard_header_->GetPage(shard_id, last_blob_page_id, &last_blob_page));
return Status::OK();
}
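
The ShardWriter hunks above converge on one message convention: name the offending value, embed it with std::to_string, and state the valid range. Below is a minimal standalone sketch of that pattern; SimpleStatus and SKETCH_CHECK are simplified placeholders for illustration only, not MindSpore's real Status type or the CHECK_FAIL_RETURN_UNEXPECTED macro.

#include <iostream>
#include <string>

// Simplified placeholder for mindspore::Status, used only in this sketch.
struct SimpleStatus {
  bool ok;
  std::string msg;
};

// Simplified placeholder for CHECK_FAIL_RETURN_UNEXPECTED: on failure, return
// a status that carries the formatted message.
#define SKETCH_CHECK(cond, message)          \
  do {                                       \
    if (!(cond)) {                           \
      return SimpleStatus{false, (message)}; \
    }                                        \
  } while (false)

SimpleStatus CheckShardId(int shard_id, size_t shard_count) {
  SKETCH_CHECK(shard_id >= 0 && static_cast<size_t>(shard_id) < shard_count,
               "Invalid data, shard_id: " + std::to_string(shard_id) + " should be in range [0, " +
                   std::to_string(shard_count) + ").");
  return SimpleStatus{true, ""};
}

int main() {
  std::cout << CheckShardId(5, 4).msg << std::endl;  // prints the range-style error message
  return 0;
}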

View File

@ -81,7 +81,7 @@ Status ShardColumn::GetColumnTypeByName(const std::string &column_name, ColumnDa
RETURN_UNEXPECTED_IF_NULL(column_category);
// Skip if column not found
*column_category = CheckColumnName(column_name);
CHECK_FAIL_RETURN_UNEXPECTED(*column_category != ColumnNotFound, "Invalid column category.");
CHECK_FAIL_RETURN_UNEXPECTED(*column_category != ColumnNotFound, "Invalid data, column [" + column_name + "] can not be found.");
// Get data type and size
auto column_id = column_name_id_[column_name];
@ -101,7 +101,7 @@ Status ShardColumn::GetColumnValueByName(const std::string &column_name, const s
RETURN_UNEXPECTED_IF_NULL(column_shape);
// Skip if column not found
auto column_category = CheckColumnName(column_name);
CHECK_FAIL_RETURN_UNEXPECTED(column_category != ColumnNotFound, "Invalid column category.");
CHECK_FAIL_RETURN_UNEXPECTED(column_category != ColumnNotFound, "Invalid data, column [" + column_name + "] can not be found.");
// Get data type and size
auto column_id = column_name_id_[column_name];
*column_data_type = column_data_type_[column_id];
@ -133,8 +133,9 @@ Status ShardColumn::GetColumnFromJson(const std::string &column_name, const json
// Initialize num bytes
*n_bytes = ColumnDataTypeSize[column_data_type];
auto json_column_value = columns_json[column_name];
CHECK_FAIL_RETURN_UNEXPECTED(json_column_value.is_string() || json_column_value.is_number(),
"Conversion to string or number failed (" + json_column_value.dump() + ").");
CHECK_FAIL_RETURN_UNEXPECTED(
json_column_value.is_string() || json_column_value.is_number(),
"Invalid data, column value [" + json_column_value.dump() + "] is not string or number.");
switch (column_data_type) {
case ColumnFloat32: {
return GetFloat<float>(data_ptr, json_column_value, false);
@ -184,7 +185,8 @@ Status ShardColumn::GetFloat(std::unique_ptr<unsigned char[]> *data_ptr, const j
array_data[0] = json_column_value.get<float>();
}
} catch (json::exception &e) {
RETURN_STATUS_UNEXPECTED("Conversion to float failed (" + json_column_value.dump() + ").");
RETURN_STATUS_UNEXPECTED("Failed to convert [" + json_column_value.dump() + "] to float, " +
std::string(e.what()));
}
}
@ -219,17 +221,17 @@ Status ShardColumn::GetInt(std::unique_ptr<unsigned char[]> *data_ptr, const jso
temp_value = static_cast<int64_t>(std::stoull(string_value));
}
} catch (std::invalid_argument &e) {
RETURN_STATUS_UNEXPECTED("Conversion to int failed: " + std::string(e.what()));
RETURN_STATUS_UNEXPECTED("Failed to convert [" + string_value + "] to int, " + std::string(e.what()));
} catch (std::out_of_range &e) {
RETURN_STATUS_UNEXPECTED("Conversion to int failed: " + std::string(e.what()));
RETURN_STATUS_UNEXPECTED("Failed to convert [" + string_value + "] to int, " + std::string(e.what()));
}
} else {
RETURN_STATUS_UNEXPECTED("Conversion to int failed.");
RETURN_STATUS_UNEXPECTED("Invalid data, column value [" + json_column_value.dump() + "] is not string or number.");
}
if ((less_than_zero && temp_value < static_cast<int64_t>(std::numeric_limits<T>::min())) ||
(!less_than_zero && static_cast<uint64_t>(temp_value) > static_cast<uint64_t>(std::numeric_limits<T>::max()))) {
RETURN_STATUS_UNEXPECTED("Conversion to int failed, out of range.");
RETURN_STATUS_UNEXPECTED("Invalid data, column value [" + std::to_string(temp_value) + "] is out of range.");
}
array_data[0] = static_cast<T>(temp_value);
@ -310,7 +312,7 @@ std::vector<uint8_t> ShardColumn::CompressBlob(const std::vector<uint8_t> &blob,
dst_blob.insert(dst_blob.end(), dst_blob_slice.begin(), dst_blob_slice.end());
i_src += kInt64Len + num_bytes;
}
MS_LOG(DEBUG) << "Compress all blob from " << blob.size() << " to " << dst_blob.size() << ".";
MS_LOG(DEBUG) << "Compress blob data from " << blob.size() << " to " << dst_blob.size() << ".";
*compression_size = static_cast<int64_t>(blob.size()) - static_cast<int64_t>(dst_blob.size());
return dst_blob;
}
@ -406,7 +408,7 @@ Status ShardColumn::UncompressInt(const uint64_t &column_id, std::unique_ptr<uns
if (*num_bytes == 0) {
return Status::OK();
}
CHECK_FAIL_RETURN_UNEXPECTED(memcpy_s(data_ptr->get(), *num_bytes, data, *num_bytes) == 0, "Failed to copy data!");
CHECK_FAIL_RETURN_UNEXPECTED(memcpy_s(data_ptr->get(), *num_bytes, data, *num_bytes) == 0, "Failed to copy data.");
return Status::OK();
}
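
The GetInt hunks above combine std::stoll/std::stoull parsing, explicit handling of std::invalid_argument and std::out_of_range, and a numeric_limits check before narrowing to the target integer type. A self-contained sketch of the same pattern follows; the helper name and return convention are illustrative only, not the real ShardColumn code.

#include <cstdint>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>

// Convert a decimal string to a narrower signed integer type T, reporting
// conversion and range failures through an error message instead of throwing.
template <typename T>
bool ToIntChecked(const std::string &value, T *out, std::string *err) {
  int64_t temp = 0;
  try {
    temp = static_cast<int64_t>(std::stoll(value));
  } catch (std::invalid_argument &e) {
    *err = "Failed to convert [" + value + "] to int, " + std::string(e.what());
    return false;
  } catch (std::out_of_range &e) {
    *err = "Failed to convert [" + value + "] to int, " + std::string(e.what());
    return false;
  }
  if (temp < static_cast<int64_t>(std::numeric_limits<T>::min()) ||
      temp > static_cast<int64_t>(std::numeric_limits<T>::max())) {
    *err = "Invalid data, column value [" + std::to_string(temp) + "] is out of range.";
    return false;
  }
  *out = static_cast<T>(temp);
  return true;
}

int main() {
  int32_t v = 0;
  std::string err;
  std::cout << ToIntChecked<int32_t>("42", &v, &err) << " " << v << std::endl;           // prints: 1 42
  std::cout << ToIntChecked<int32_t>("3000000000", &v, &err) << " " << err << std::endl;  // prints: 0 and the range message
  return 0;
}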

View File

@ -58,8 +58,10 @@ int64_t ShardDistributedSample::GetNumSamples(int64_t dataset_size, int64_t num_
Status ShardDistributedSample::PreExecute(ShardTaskList &tasks) {
auto total_no = tasks.Size();
if (no_of_padded_samples_ > 0 && first_epoch_) {
CHECK_FAIL_RETURN_UNEXPECTED(total_no % denominator_ == 0,
"Dataset size plus number of padded samples is not divisible by number of shards.");
CHECK_FAIL_RETURN_UNEXPECTED(
total_no % denominator_ == 0,
"Invalid input, number of padding samples: " + std::to_string(no_of_padded_samples_) +
" plus dataset size is not divisible by num_shards: " + std::to_string(denominator_) + ".");
}
if (first_epoch_) {
first_epoch_ = false;

View File

@ -61,20 +61,20 @@ Status ShardHeader::InitializeHeader(const std::vector<json> &headers, bool load
Status ShardHeader::CheckFileStatus(const std::string &path) {
auto realpath = FileUtils::GetRealPath(path.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path: " + path);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + path);
std::ifstream fin(realpath.value(), std::ios::in | std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(fin, "Failed to open file. path: " + path);
CHECK_FAIL_RETURN_UNEXPECTED(fin, "Failed to open file, file path: " + path);
// fetch file size
auto &io_seekg = fin.seekg(0, std::ios::end);
if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) {
fin.close();
RETURN_STATUS_UNEXPECTED("File seekg failed. path: " + path);
RETURN_STATUS_UNEXPECTED("Failed to seekg file, file path: " + path);
}
size_t file_size = fin.tellg();
if (file_size < kMinFileSize) {
fin.close();
RETURN_STATUS_UNEXPECTED("Invalid file. path: " + path);
RETURN_STATUS_UNEXPECTED("Invalid file content, file " + path + " size is smaller than the lower limit.");
}
fin.close();
return Status::OK();
@ -86,18 +86,18 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
// read header size
json json_header;
std::ifstream fin(common::SafeCStr(path), std::ios::in | std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(fin.is_open(), "Failed to open file. path: " + path);
CHECK_FAIL_RETURN_UNEXPECTED(fin.is_open(), "Failed to open file, file path: " + path);
uint64_t header_size = 0;
auto &io_read = fin.read(reinterpret_cast<char *>(&header_size), kInt64Len);
if (!io_read.good() || io_read.fail() || io_read.bad()) {
fin.close();
RETURN_STATUS_UNEXPECTED("File read failed");
RETURN_STATUS_UNEXPECTED("Failed to read file, file path: " + path);
}
if (header_size > kMaxHeaderSize) {
fin.close();
RETURN_STATUS_UNEXPECTED("Invalid file content. path: " + path);
RETURN_STATUS_UNEXPECTED("Invalid file content, incorrect file or file header is exceeds the upper limit.");
}
// read header content
@ -105,7 +105,7 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
auto &io_read_content = fin.read(reinterpret_cast<char *>(&header_content[0]), header_size);
if (!io_read_content.good() || io_read_content.fail() || io_read_content.bad()) {
fin.close();
RETURN_STATUS_UNEXPECTED("File read failed. path: " + path);
RETURN_STATUS_UNEXPECTED("Failed to read file, file path: " + path);
}
fin.close();
@ -114,7 +114,7 @@ Status ShardHeader::ValidateHeader(const std::string &path, std::shared_ptr<json
try {
json_header = json::parse(raw_header_content);
} catch (json::parse_error &e) {
RETURN_STATUS_UNEXPECTED("Json parse error: " + std::string(e.what()));
RETURN_STATUS_UNEXPECTED("Json parse failed: " + std::string(e.what()));
}
*header_ptr = std::make_shared<json>(json_header);
return Status::OK();
@ -178,15 +178,16 @@ void ShardHeader::GetHeadersOneTask(int start, int end, std::vector<json> &heade
}
for (int x = start; x < end; ++x) {
std::shared_ptr<json> header;
if (ValidateHeader(realAddresses[x], &header).IsError()) {
auto status = ValidateHeader(realAddresses[x], &header);
if (status.IsError()) {
thread_status = true;
return;
}
(*header)["shard_addresses"] = realAddresses;
if (std::find(kSupportedVersion.begin(), kSupportedVersion.end(), (*header)["version"]) ==
kSupportedVersion.end()) {
MS_LOG(ERROR) << "Version wrong, file version is: " << (*header)["version"].dump()
<< ", lib version is: " << kVersion;
MS_LOG(ERROR) << "Invalid version, file version " << (*header)["version"].dump() << " can not match lib version "
<< kVersion << ".";
thread_status = true;
return;
}
@ -204,7 +205,8 @@ Status ShardHeader::InitByFiles(const std::vector<std::string> &file_paths) {
shard_addresses_ = std::move(file_names);
shard_count_ = file_paths.size();
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ != 0 && (shard_count_ <= kMaxShardCount),
"shard count is invalid. shard count: " + std::to_string(shard_count_));
"Invalid input, The number of MindRecord files " + std::to_string(shard_count_) +
"is not int range (0, " + std::to_string(kMaxShardCount) + "].");
pages_.resize(shard_count_);
return Status::OK();
}
@ -223,10 +225,10 @@ Status ShardHeader::ParseIndexFields(const json &index_fields) {
Status ShardHeader::ParsePage(const json &pages, int shard_index, bool load_dataset) {
// set shard_index when load_dataset is false
CHECK_FAIL_RETURN_UNEXPECTED(
shard_count_ <= kMaxFileCount,
"The number of mindrecord files is greater than max value: " + std::to_string(kMaxFileCount));
if (pages_.empty() && shard_count_ <= kMaxFileCount) {
CHECK_FAIL_RETURN_UNEXPECTED(shard_count_ <= kMaxFileCount, "Invalid input, the number of MindRecord files " +
std::to_string(shard_count_) + " is not in range (0, " +
std::to_string(kMaxFileCount) + "].");
if (pages_.empty()) {
pages_.resize(shard_count_);
}
@ -259,7 +261,7 @@ Status ShardHeader::ParseStatistics(const json &statistics) {
for (auto &statistic : statistics) {
CHECK_FAIL_RETURN_UNEXPECTED(
statistic.find("desc") != statistic.end() && statistic.find("statistics") != statistic.end(),
"Deserialize statistics failed, statistic: " + statistics.dump());
"Failed to deserialize statistics, statistic info: " + statistics.dump());
std::string statistic_description = statistic["desc"].get<std::string>();
json statistic_body = statistic["statistics"];
std::shared_ptr<Statistics> parsed_statistic = Statistics::Build(statistic_description, statistic_body);
@ -274,7 +276,7 @@ Status ShardHeader::ParseSchema(const json &schemas) {
// change how we get schemaBody once design is finalized
CHECK_FAIL_RETURN_UNEXPECTED(schema.find("desc") != schema.end() && schema.find("blob_fields") != schema.end() &&
schema.find("schema") != schema.end(),
"Deserialize schema failed. schema: " + schema.dump());
"Failed to deserialize schema, schema info: " + schema.dump());
std::string schema_description = schema["desc"].get<std::string>();
std::vector<std::string> blob_fields = schema["blob_fields"].get<std::vector<std::string>>();
json schema_body = schema["schema"];
@ -371,7 +373,7 @@ Status ShardHeader::GetPage(const int &shard_id, const int &page_id, std::shared
return Status::OK();
}
page_ptr = nullptr;
RETURN_STATUS_UNEXPECTED("Failed to Get Page.");
RETURN_STATUS_UNEXPECTED("Failed to get Page, 'page_id': " + std::to_string(page_id));
}
Status ShardHeader::SetPage(const std::shared_ptr<Page> &new_page) {
@ -381,7 +383,7 @@ Status ShardHeader::SetPage(const std::shared_ptr<Page> &new_page) {
pages_[shard_id][page_id] = new_page;
return Status::OK();
}
RETURN_STATUS_UNEXPECTED("Failed to Set Page.");
RETURN_STATUS_UNEXPECTED("Failed to set Page, 'page_id': " + std::to_string(page_id));
}
Status ShardHeader::AddPage(const std::shared_ptr<Page> &new_page) {
@ -391,7 +393,7 @@ Status ShardHeader::AddPage(const std::shared_ptr<Page> &new_page) {
pages_[shard_id].push_back(new_page);
return Status::OK();
}
RETURN_STATUS_UNEXPECTED("Failed to Add Page.");
RETURN_STATUS_UNEXPECTED("Failed to add Page, 'page_id': " + std::to_string(page_id));
}
int64_t ShardHeader::GetLastPageId(const int &shard_id) {
@ -426,17 +428,17 @@ Status ShardHeader::GetPageByGroupId(const int &group_id, const int &shard_id, s
}
}
page_ptr = nullptr;
RETURN_STATUS_UNEXPECTED("Failed to get page by group id: " + group_id);
RETURN_STATUS_UNEXPECTED("Failed to get Page, 'group_id': " + std::to_string(group_id));
}
int ShardHeader::AddSchema(std::shared_ptr<Schema> schema) {
if (schema == nullptr) {
MS_LOG(ERROR) << "Schema is illegal";
MS_LOG(ERROR) << "The pointer of schema is null.";
return -1;
}
if (!schema_.empty()) {
MS_LOG(ERROR) << "Only support one schema";
MS_LOG(ERROR) << "The schema can not be added twice.";
return -1;
}
@ -471,10 +473,12 @@ std::shared_ptr<Index> ShardHeader::InitIndexPtr() {
Status ShardHeader::CheckIndexField(const std::string &field, const json &schema) {
// check field name is or is not valid
CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field) != schema.end(), "Filed can not found in schema.");
CHECK_FAIL_RETURN_UNEXPECTED(schema[field]["type"] != "Bytes", "bytes can not be as index field.");
CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field) != schema.end(),
"Invalid input, field [" + field + "] can not found in schema.");
CHECK_FAIL_RETURN_UNEXPECTED(schema[field]["type"] != "Bytes",
"Invalid input, byte type field [" + field + "] can not set as an index field.");
CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field) == schema.end() || schema[field].find("shape") == schema[field].end(),
"array can not be as index field.");
"Invalid input, array type field [" + field + "] can not set as an index field.");
return Status::OK();
}
@ -482,7 +486,7 @@ Status ShardHeader::AddIndexFields(const std::vector<std::string> &fields) {
if (fields.empty()) {
return Status::OK();
}
CHECK_FAIL_RETURN_UNEXPECTED(!GetSchemas().empty(), "Schema is empty.");
CHECK_FAIL_RETURN_UNEXPECTED(!GetSchemas().empty(), "Invalid data, schema is empty.");
// create index Object
std::shared_ptr<Index> index = InitIndexPtr();
for (const auto &schemaPtr : schema_) {
@ -495,7 +499,8 @@ Status ShardHeader::AddIndexFields(const std::vector<std::string> &fields) {
field_set.insert(item.second);
}
for (const auto &field : fields) {
CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(), "Add same index field twice.");
CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(),
"Invalid data, the same index field [" + field + "] can not added twice.");
// check field name is or is not valid
RETURN_IF_NOT_OK(CheckIndexField(field, schema));
field_set.insert(field);
@ -510,9 +515,10 @@ Status ShardHeader::AddIndexFields(const std::vector<std::string> &fields) {
Status ShardHeader::GetAllSchemaID(std::set<uint64_t> &bucket_count) {
// get all schema id
for (const auto &schema : schema_) {
auto bucket_it = bucket_count.find(schema->GetSchemaID());
CHECK_FAIL_RETURN_UNEXPECTED(bucket_it == bucket_count.end(), "Schema duplication.");
bucket_count.insert(schema->GetSchemaID());
auto schema_id = schema->GetSchemaID();
CHECK_FAIL_RETURN_UNEXPECTED(bucket_count.find(schema_id) == bucket_count.end(),
"Invalid data, duplicate schema exist, schema id: " + std::to_string(schema_id));
bucket_count.insert(schema_id);
}
return Status::OK();
}
@ -532,18 +538,20 @@ Status ShardHeader::AddIndexFields(std::vector<std::pair<uint64_t, std::string>>
field_set.insert(item);
}
for (const auto &field : fields) {
CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(), "Add same index field twice.");
CHECK_FAIL_RETURN_UNEXPECTED(field_set.find(field) == field_set.end(),
"Invalid data, the same index field [" + field.second + "] can not added twice.");
uint64_t schema_id = field.first;
std::string field_name = field.second;
// check schemaId is or is not valid
CHECK_FAIL_RETURN_UNEXPECTED(bucket_count.find(schema_id) != bucket_count.end(), "Invalid schema id: " + schema_id);
CHECK_FAIL_RETURN_UNEXPECTED(bucket_count.find(schema_id) != bucket_count.end(),
"Invalid data, schema id [" + std::to_string(schema_id) + "] is invalid.");
// check field name is or is not valid
std::shared_ptr<Schema> schema_ptr;
RETURN_IF_NOT_OK(GetSchemaByID(schema_id, &schema_ptr));
json schema = schema_ptr->GetSchema().at("schema");
CHECK_FAIL_RETURN_UNEXPECTED(schema.find(field_name) != schema.end(),
"Schema " + std::to_string(schema_id) + " do not contain the field: " + field_name);
"Invalid data, field [" + field_name + "] is not found in schema.");
RETURN_IF_NOT_OK(CheckIndexField(field_name, schema));
field_set.insert(field);
// add field into index
@ -570,8 +578,10 @@ std::shared_ptr<Index> ShardHeader::GetIndex() { return index_; }
Status ShardHeader::GetSchemaByID(int64_t schema_id, std::shared_ptr<Schema> *schema_ptr) {
RETURN_UNEXPECTED_IF_NULL(schema_ptr);
int64_t schemaSize = schema_.size();
CHECK_FAIL_RETURN_UNEXPECTED(schema_id >= 0 && schema_id < schemaSize, "schema id is invalid.");
int64_t schema_size = schema_.size();
CHECK_FAIL_RETURN_UNEXPECTED(schema_id >= 0 && schema_id < schema_size,
"Invalid data, schema id [" + std::to_string(schema_id) + "] is not in range [0, " +
std::to_string(schema_size) + ").");
*schema_ptr = schema_.at(schema_id);
return Status::OK();
}
@ -579,17 +589,19 @@ Status ShardHeader::GetSchemaByID(int64_t schema_id, std::shared_ptr<Schema> *sc
Status ShardHeader::GetStatisticByID(int64_t statistic_id, std::shared_ptr<Statistics> *statistics_ptr) {
RETURN_UNEXPECTED_IF_NULL(statistics_ptr);
int64_t statistics_size = statistics_.size();
CHECK_FAIL_RETURN_UNEXPECTED(statistic_id >= 0 && statistic_id < statistics_size, "statistic id is invalid.");
CHECK_FAIL_RETURN_UNEXPECTED(statistic_id >= 0 && statistic_id < statistics_size,
"Invalid data, statistic id [" + std::to_string(statistic_id) +
"] is not in range [0, " + std::to_string(statistics_size) + ").");
*statistics_ptr = statistics_.at(statistic_id);
return Status::OK();
}
Status ShardHeader::PagesToFile(const std::string dump_file_name) {
auto realpath = FileUtils::GetRealPath(dump_file_name.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + dump_file_name);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + dump_file_name);
// write header content to file, dump whatever is in the file before
std::ofstream page_out_handle(realpath.value(), std::ios_base::trunc | std::ios_base::out);
CHECK_FAIL_RETURN_UNEXPECTED(page_out_handle.good(), "Failed to open page file.");
CHECK_FAIL_RETURN_UNEXPECTED(page_out_handle.good(), "Failed to open page file, path: " + dump_file_name);
auto pages = SerializePage();
for (const auto &shard_pages : pages) {
page_out_handle << shard_pages << "\n";
@ -603,10 +615,11 @@ Status ShardHeader::FileToPages(const std::string dump_file_name) {
v.clear();
}
auto realpath = FileUtils::GetRealPath(dump_file_name.data());
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Get real path failed, path=" + dump_file_name);
CHECK_FAIL_RETURN_UNEXPECTED(realpath.has_value(), "Failed to get real path, path: " + dump_file_name);
// attempt to open the file contains the page in json
std::ifstream page_in_handle(realpath.value());
CHECK_FAIL_RETURN_UNEXPECTED(page_in_handle.good(), "No page file exists.");
CHECK_FAIL_RETURN_UNEXPECTED(page_in_handle.good(),
"Invalid file, page file does not exist, path: " + dump_file_name);
std::string line;
while (std::getline(page_in_handle, line)) {
RETURN_IF_NOT_OK(ParsePage(json::parse(line), -1, true));
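
The ValidateHeader change above keeps json::parse inside a try/catch on json::parse_error and surfaces e.what() through the returned status instead of letting the exception escape. A standalone sketch of the same pattern, assuming the nlohmann/json single-header library bundled with MindRecord is available on the include path:

#include <iostream>
#include <string>
#include <nlohmann/json.hpp>

using json = nlohmann::json;

// Parse raw header bytes into json, converting parse errors into a message
// (mirrors the error handling in ValidateHeader above).
bool ParseHeader(const std::string &raw_header_content, json *out, std::string *err) {
  try {
    *out = json::parse(raw_header_content);
  } catch (json::parse_error &e) {
    *err = "Json parse failed: " + std::string(e.what());
    return false;
  }
  return true;
}

int main() {
  json header;
  std::string err;
  std::cout << ParseHeader(R"({"key": "value"})", &header, &err) << std::endl;     // prints: 1
  std::cout << ParseHeader("not json", &header, &err) << " " << err << std::endl;  // prints: 0 and the parse message
  return 0;
}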

View File

@ -110,7 +110,6 @@ Status ShardSample::UpdateTasks(ShardTaskList &tasks, int taking) {
ShardTaskList::TaskListSwap(tasks, new_tasks);
} else {
ShardTaskList new_tasks;
CHECK_FAIL_RETURN_UNEXPECTED(taking <= static_cast<int>(tasks.sample_ids_.size()), "taking is out of range.");
int total_no = static_cast<int>(tasks.permutation_.size());
int cnt = 0;
for (size_t i = partition_id_ * taking; i < (partition_id_ + 1) * taking; i++) {
@ -145,7 +144,8 @@ Status ShardSample::Execute(ShardTaskList &tasks) {
taking = no_of_samples_ - no_of_samples_ % no_of_categories;
} else if (sampler_type_ == kSubsetRandomSampler || sampler_type_ == kSubsetSampler) {
CHECK_FAIL_RETURN_UNEXPECTED(indices_.size() <= static_cast<size_t>(total_no),
"Parameter indices's size is greater than dataset size.");
"Invalid input, indices size: " + std::to_string(indices_.size()) +
" need to be less than dataset size: " + std::to_string(total_no) + ".");
} else { // constructor TopPercent
if (numerator_ > 0 && denominator_ > 0 && numerator_ <= denominator_) {
if (numerator_ == 1 && denominator_ > 1) { // sharding
@ -155,7 +155,9 @@ Status ShardSample::Execute(ShardTaskList &tasks) {
taking -= (taking % no_of_categories);
}
} else {
RETURN_STATUS_UNEXPECTED("Parameter numerator or denominator is invalid.");
RETURN_STATUS_UNEXPECTED("Invalid input, numerator: " + std::to_string(numerator_) +
" need to be positive and be less than denominator: " + std::to_string(denominator_) +
".");
}
}
return UpdateTasks(tasks, taking);

View File

@ -67,20 +67,20 @@ std::vector<std::string> Schema::PopulateBlobFields(json schema) {
bool Schema::ValidateNumberShape(const json &it_value) {
if (it_value.find("shape") == it_value.end()) {
MS_LOG(ERROR) << it_value["type"].dump() << " supports shape only.";
MS_LOG(ERROR) << "Invalid data, 'shape' object can not found in " << it_value.dump();
return false;
}
auto shape = it_value["shape"];
if (!shape.is_array()) {
MS_LOG(ERROR) << "Shape " << it_value["type"].dump() << ", format is wrong.";
MS_LOG(ERROR) << "Invalid data, shape [" << it_value["shape"].dump() << "] is invalid.";
return false;
}
int num_negtive_one = 0;
for (const auto &i : shape) {
if (i == 0 || i < -1) {
MS_LOG(ERROR) << "Shape " << it_value["shape"].dump() << ", dimension is wrong.";
MS_LOG(ERROR) << "Invalid data, shape [" << it_value["shape"].dump() << "]dimension is invalid.";
return false;
}
if (i == -1) {
@ -89,7 +89,8 @@ bool Schema::ValidateNumberShape(const json &it_value) {
}
if (num_negtive_one > 1) {
MS_LOG(ERROR) << "Shape " << it_value["shape"].dump() << ", have at most 1 variable-length dimension.";
MS_LOG(ERROR) << "Invalid data, shape [" << it_value["shape"].dump()
<< "] have more than 1 variable dimension(-1).";
return false;
}
@ -98,25 +99,26 @@ bool Schema::ValidateNumberShape(const json &it_value) {
bool Schema::Validate(json schema) {
if (schema.size() == kInt0) {
MS_LOG(ERROR) << "Schema is null";
MS_LOG(ERROR) << "Invalid data, schema is empty.";
return false;
}
for (json::iterator it = schema.begin(); it != schema.end(); ++it) {
// make sure schema key name must be composed of '0-9' or 'a-z' or 'A-Z' or '_'
if (!ValidateFieldName(it.key())) {
MS_LOG(ERROR) << "Field name must be composed of '0-9' or 'a-z' or 'A-Z' or '_', fieldName: " << it.key();
MS_LOG(ERROR) << "Invalid data, field [" << it.key()
<< "] in schema is not composed of '0-9' or 'a-z' or 'A-Z' or '_'.";
return false;
}
json it_value = it.value();
if (it_value.find("type") == it_value.end()) {
MS_LOG(ERROR) << "No 'type' field exist: " << it_value.dump();
MS_LOG(ERROR) << "Invalid data, 'type' object can not found in field [" << it_value.dump() << "].";
return false;
}
if (kFieldTypeSet.find(it_value["type"]) == kFieldTypeSet.end()) {
MS_LOG(ERROR) << "Wrong type: " << it_value["type"].dump();
MS_LOG(ERROR) << "Invalid data, type [" << it_value["type"].dump() << "] is not supported.";
return false;
}
@ -125,12 +127,12 @@ bool Schema::Validate(json schema) {
}
if (it_value["type"] == "bytes" || it_value["type"] == "string") {
MS_LOG(ERROR) << it_value["type"].dump() << " can not 1 field only.";
MS_LOG(ERROR) << "Invalid data, field [" << it_value.dump() << "] is invalid.";
return false;
}
if (it_value.size() != kInt2) {
MS_LOG(ERROR) << it_value["type"].dump() << " can have at most 2 fields.";
MS_LOG(ERROR) << "Invalid data, field [" << it_value.dump() << "] is invalid.";
return false;
}

View File

@ -58,8 +58,6 @@ Status ShardSequentialSample::Execute(ShardTaskList &tasks) {
ShardTaskList::TaskListSwap(tasks, new_tasks);
} else { // shuffled
ShardTaskList new_tasks;
CHECK_FAIL_RETURN_UNEXPECTED(taking <= static_cast<int64_t>(tasks.permutation_.size()),
"Taking is out of task range.");
total_no = static_cast<int64_t>(tasks.permutation_.size());
for (size_t i = offset_; i < taking + offset_; ++i) {
new_tasks.AssignTask(tasks, tasks.permutation_[i % total_no]);

View File

@ -69,7 +69,8 @@ Status ShardShuffle::ShuffleFiles(ShardTaskList &tasks) {
if (no_of_samples_ == 0) {
no_of_samples_ = static_cast<int>(tasks.Size());
}
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Parameter no_of_samples need to be positive.");
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Invalid input, Number of samples [" +
std::to_string(no_of_samples_) + "] need to be positive.");
auto shard_sample_cout = GetShardSampleCount();
// shuffle the files index
@ -122,7 +123,8 @@ Status ShardShuffle::ShuffleInfile(ShardTaskList &tasks) {
if (no_of_samples_ == 0) {
no_of_samples_ = static_cast<int>(tasks.Size());
}
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Parameter no_of_samples need to be positive.");
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Invalid input, Number of samples [" +
std::to_string(no_of_samples_) + "] need to be positive.");
// reconstruct the permutation in file
// -- before --
// file1: [0, 1, 2]
@ -153,8 +155,12 @@ Status ShardShuffle::ShuffleInfile(ShardTaskList &tasks) {
}
Status ShardShuffle::Execute(ShardTaskList &tasks) {
if (reshuffle_each_epoch_) shuffle_seed_++;
CHECK_FAIL_RETURN_UNEXPECTED(tasks.categories >= 1, "Task category is invalid.");
if (reshuffle_each_epoch_) {
shuffle_seed_++;
}
CHECK_FAIL_RETURN_UNEXPECTED(
tasks.categories >= 1,
"Invalid data, task categories [" + std::to_string(tasks.categories) + "] need to be larger than 1.");
if (shuffle_type_ == kShuffleSample) { // shuffle each sample
if (tasks.permutation_.empty() == true) {
tasks.MakePerm();
@ -163,7 +169,8 @@ Status ShardShuffle::Execute(ShardTaskList &tasks) {
if (replacement_ == true) {
ShardTaskList new_tasks;
if (no_of_samples_ == 0) no_of_samples_ = static_cast<int>(tasks.sample_ids_.size());
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Parameter no_of_samples need to be positive.");
CHECK_FAIL_RETURN_UNEXPECTED(no_of_samples_ > 0, "Invalid input, Number of samples [" +
std::to_string(no_of_samples_) + "] need to be positive.");
for (uint32_t i = 0; i < no_of_samples_; ++i) {
new_tasks.AssignTask(tasks, tasks.GetRandomTaskID());
}
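
In the replacement_ branch above, Execute draws no_of_samples_ task ids at random with replacement via GetRandomTaskID. For comparison only (this is not the ShardShuffle implementation), the same idea expressed with the standard library:

#include <cstdint>
#include <iostream>
#include <random>
#include <vector>

// Draw `count` indices from [0, population) uniformly, with replacement,
// so the same index may be picked more than once.
std::vector<size_t> SampleWithReplacement(size_t population, size_t count, uint32_t seed) {
  std::mt19937 gen(seed);
  std::uniform_int_distribution<size_t> dist(0, population - 1);
  std::vector<size_t> picked;
  picked.reserve(count);
  for (size_t i = 0; i < count; ++i) {
    picked.push_back(dist(gen));
  }
  return picked;
}

int main() {
  // 5 samples drawn from 3 task ids; duplicates are expected.
  for (size_t id : SampleWithReplacement(3, 5, 1)) {
    std::cout << id << " ";
  }
  std::cout << std::endl;
  return 0;
}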

View File

@ -50,11 +50,11 @@ int64_t Statistics::GetStatisticsID() const { return statistics_id_; }
bool Statistics::Validate(const json &statistics) {
if (statistics.size() != kInt1) {
MS_LOG(ERROR) << "Statistics object is null";
MS_LOG(ERROR) << "Invalid data, 'statistics' is empty.";
return false;
}
if (statistics.find("level") == statistics.end()) {
MS_LOG(ERROR) << "There is not 'level' object in statistic";
MS_LOG(ERROR) << "Invalid data, 'level' object can not found in statistic";
return false;
}
return LevelRecursive(statistics["level"]);
@ -66,18 +66,18 @@ bool Statistics::LevelRecursive(json level) {
json a = it.value();
if (a.size() == kInt2) {
if ((a.find("key") == a.end()) || (a.find("count") == a.end())) {
MS_LOG(ERROR) << "The node field is 2, but 'key'/'count' is not existed";
MS_LOG(ERROR) << "Invalid data, the node field is 2, but 'key'/'count' object does not existed";
return false;
}
} else if (a.size() == kInt3) {
if ((a.find("key") == a.end()) || (a.find("count") == a.end()) || a.find("level") == a.end()) {
MS_LOG(ERROR) << "The node field is 3, but 'key'/'count'/'level' is not existed";
MS_LOG(ERROR) << "Invalid data, the node field is 3, but 'key'/'count'/'level' object does not existed";
return false;
} else {
ini = LevelRecursive(a.at("level"));
}
} else {
MS_LOG(ERROR) << "The node field is not equal 2/3";
MS_LOG(ERROR) << "Invalid data, the node field is not equal to 2 or 3";
return false;
}
}

View File

@ -32,8 +32,10 @@ def create_cv_mindrecord(files_num):
if os.path.exists("{}.db".format(CV_FILE_NAME)):
os.remove("{}.db".format(CV_FILE_NAME))
writer = FileWriter(CV_FILE_NAME, files_num)
cv_schema_json = {"file_name": {"type": "string"}, "label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name": "001.jpg", "label": 43, "data": bytes('0xffsafdafda', encoding='utf-8')}]
cv_schema_json = {"file_name": {"type": "string"},
"label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name": "001.jpg", "label": 43,
"data": bytes('0xffsafdafda', encoding='utf-8')}]
writer.add_schema(cv_schema_json, "img_schema")
writer.add_index(["file_name", "label"])
writer.write_raw_data(data)
@ -47,8 +49,10 @@ def create_diff_schema_cv_mindrecord(files_num):
if os.path.exists("{}.db".format(CV1_FILE_NAME)):
os.remove("{}.db".format(CV1_FILE_NAME))
writer = FileWriter(CV1_FILE_NAME, files_num)
cv_schema_json = {"file_name_1": {"type": "string"}, "label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name_1": "001.jpg", "label": 43, "data": bytes('0xffsafdafda', encoding='utf-8')}]
cv_schema_json = {"file_name_1": {"type": "string"},
"label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name_1": "001.jpg", "label": 43,
"data": bytes('0xffsafdafda', encoding='utf-8')}]
writer.add_schema(cv_schema_json, "img_schema")
writer.add_index(["file_name_1", "label"])
writer.write_raw_data(data)
@ -63,8 +67,10 @@ def create_diff_page_size_cv_mindrecord(files_num):
os.remove("{}.db".format(CV1_FILE_NAME))
writer = FileWriter(CV1_FILE_NAME, files_num)
writer.set_page_size(1 << 26) # 64MB
cv_schema_json = {"file_name": {"type": "string"}, "label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name": "001.jpg", "label": 43, "data": bytes('0xffsafdafda', encoding='utf-8')}]
cv_schema_json = {"file_name": {"type": "string"},
"label": {"type": "int32"}, "data": {"type": "bytes"}}
data = [{"file_name": "001.jpg", "label": 43,
"data": bytes('0xffsafdafda', encoding='utf-8')}]
writer.add_schema(cv_schema_json, "img_schema")
writer.add_index(["file_name", "label"])
writer.write_raw_data(data)
@ -77,7 +83,8 @@ def test_cv_lack_json():
columns_list = ["data", "file_name", "label"]
num_readers = 4
with pytest.raises(Exception):
ds.MindDataset(CV_FILE_NAME, "no_exist.json", columns_list, num_readers)
ds.MindDataset(CV_FILE_NAME, "no_exist.json",
columns_list, num_readers)
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
@ -95,8 +102,10 @@ def test_invalid_mindrecord():
f.write('just for test')
columns_list = ["data", "file_name", "label"]
num_readers = 4
with pytest.raises(RuntimeError, match="Unexpected error. Invalid file content. path:"):
data_set = ds.MindDataset('dummy.mindrecord', columns_list, num_readers)
with pytest.raises(RuntimeError, match="Unexpected error. Invalid file "
"content, incorrect file or file header is exceeds the upper limit."):
data_set = ds.MindDataset(
'dummy.mindrecord', columns_list, num_readers)
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
pass
os.remove('dummy.mindrecord')
@ -107,7 +116,7 @@ def test_minddataset_lack_db():
os.remove("{}.db".format(CV_FILE_NAME))
columns_list = ["data", "file_name", "label"]
num_readers = 4
with pytest.raises(RuntimeError, match="Unexpected error. Invalid database file:"):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid database file, path:"):
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
@ -127,7 +136,8 @@ def test_cv_minddataset_pk_sample_error_class_column():
num_readers = 4
sampler = ds.PKSampler(5, None, True, 'no_exist_column')
with pytest.raises(RuntimeError, match="Unexpected error. Failed to launch read threads."):
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers, sampler=sampler)
data_set = ds.MindDataset(
CV_FILE_NAME, columns_list, num_readers, sampler=sampler)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
num_iter += 1
@ -155,7 +165,8 @@ def test_cv_minddataset_reader_different_schema():
create_diff_schema_cv_mindrecord(1)
columns_list = ["data", "label"]
num_readers = 4
with pytest.raises(RuntimeError, match="Mindrecord files meta information is different"):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, "
"MindRecord files meta data is not consistent."):
data_set = ds.MindDataset([CV_FILE_NAME, CV1_FILE_NAME], columns_list,
num_readers)
num_iter = 0
@ -172,7 +183,8 @@ def test_cv_minddataset_reader_different_page_size():
create_diff_page_size_cv_mindrecord(1)
columns_list = ["data", "label"]
num_readers = 4
with pytest.raises(RuntimeError, match="Mindrecord files meta information is different"):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, "
"MindRecord files meta data is not consistent."):
data_set = ds.MindDataset([CV_FILE_NAME, CV1_FILE_NAME], columns_list,
num_readers)
num_iter = 0
@ -189,12 +201,14 @@ def test_minddataset_invalidate_num_shards():
columns_list = ["data", "label"]
num_readers = 4
with pytest.raises(Exception) as error_info:
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers, True, 1, 2)
data_set = ds.MindDataset(
CV_FILE_NAME, columns_list, num_readers, True, 1, 2)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1):
num_iter += 1
try:
assert 'Input shard_id is not within the required interval of [0, 0].' in str(error_info.value)
assert 'Input shard_id is not within the required interval of [0, 0].' in str(
error_info.value)
except Exception as error:
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
@ -209,12 +223,14 @@ def test_minddataset_invalidate_shard_id():
columns_list = ["data", "label"]
num_readers = 4
with pytest.raises(Exception) as error_info:
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers, True, 1, -1)
data_set = ds.MindDataset(
CV_FILE_NAME, columns_list, num_readers, True, 1, -1)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1):
num_iter += 1
try:
assert 'Input shard_id is not within the required interval of [0, 0].' in str(error_info.value)
assert 'Input shard_id is not within the required interval of [0, 0].' in str(
error_info.value)
except Exception as error:
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
@ -229,24 +245,28 @@ def test_minddataset_shard_id_bigger_than_num_shard():
columns_list = ["data", "label"]
num_readers = 4
with pytest.raises(Exception) as error_info:
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers, True, 2, 2)
data_set = ds.MindDataset(
CV_FILE_NAME, columns_list, num_readers, True, 2, 2)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1):
num_iter += 1
try:
assert 'Input shard_id is not within the required interval of [0, 1].' in str(error_info.value)
assert 'Input shard_id is not within the required interval of [0, 1].' in str(
error_info.value)
except Exception as error:
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
raise error
with pytest.raises(Exception) as error_info:
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, num_readers, True, 2, 5)
data_set = ds.MindDataset(
CV_FILE_NAME, columns_list, num_readers, True, 2, 5)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1):
num_iter += 1
try:
assert 'Input shard_id is not within the required interval of [0, 1].' in str(error_info.value)
assert 'Input shard_id is not within the required interval of [0, 1].' in str(
error_info.value)
except Exception as error:
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
@ -274,7 +294,8 @@ def test_cv_minddataset_partition_num_samples_equals_0():
with pytest.raises(ValueError) as error_info:
partitions(5)
try:
assert 'num_samples exceeds the boundary between 0 and 9223372036854775807(INT64_MAX)' in str(error_info.value)
assert 'num_samples exceeds the boundary between 0 and 9223372036854775807(INT64_MAX)' in str(
error_info.value)
except Exception as error:
os.remove(CV_FILE_NAME)
os.remove("{}.db".format(CV_FILE_NAME))
@ -294,19 +315,22 @@ def test_mindrecord_exception():
columns_list = ["data", "file_name", "label"]
with pytest.raises(RuntimeError, match="The corresponding data files"):
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, shuffle=False)
data_set = data_set.map(operations=exception_func, input_columns=["data"], num_parallel_workers=1)
data_set = data_set.map(operations=exception_func, input_columns=["data"],
num_parallel_workers=1)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
num_iter += 1
with pytest.raises(RuntimeError, match="The corresponding data files"):
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, shuffle=False)
data_set = data_set.map(operations=exception_func, input_columns=["file_name"], num_parallel_workers=1)
data_set = data_set.map(operations=exception_func, input_columns=["file_name"],
num_parallel_workers=1)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
num_iter += 1
with pytest.raises(RuntimeError, match="The corresponding data files"):
data_set = ds.MindDataset(CV_FILE_NAME, columns_list, shuffle=False)
data_set = data_set.map(operations=exception_func, input_columns=["label"], num_parallel_workers=1)
data_set = data_set.map(operations=exception_func, input_columns=["label"],
num_parallel_workers=1)
num_iter = 0
for _ in data_set.create_dict_iterator(num_epochs=1, output_numpy=True):
num_iter += 1

View File

@ -119,7 +119,7 @@ def test_cifar100_to_mindrecord_directory(fixture_file):
when destination path is directory.
"""
with pytest.raises(RuntimeError,
match="MindRecord file already existed, please delete file:"):
match="Invalid file, Mindrecord files already existed in path:"):
cifar100_transformer = Cifar100ToMR(CIFAR100_DIR,
CIFAR100_DIR)
cifar100_transformer.transform()
@ -130,7 +130,7 @@ def test_cifar100_to_mindrecord_filename_equals_cifar100(fixture_file):
when destination path equals source path.
"""
with pytest.raises(RuntimeError,
match="indRecord file already existed, please delete file:"):
match="Invalid file, Mindrecord files already existed in path:"):
cifar100_transformer = Cifar100ToMR(CIFAR100_DIR,
CIFAR100_DIR + "/train")
cifar100_transformer.transform()

View File

@ -147,7 +147,7 @@ def test_cifar10_to_mindrecord_directory(fixture_file):
when destination path is directory.
"""
with pytest.raises(RuntimeError,
match="MindRecord file already existed, please delete file:"):
match="Unexpected error. Invalid file, Mindrecord files already existed in path:"):
cifar10_transformer = Cifar10ToMR(CIFAR10_DIR, CIFAR10_DIR)
cifar10_transformer.transform()
@ -158,7 +158,7 @@ def test_cifar10_to_mindrecord_filename_equals_cifar10():
when destination path equals source path.
"""
with pytest.raises(RuntimeError,
match="MindRecord file already existed, please delete file:"):
match="Unexpected error. Invalid file, Mindrecord files already existed in path:"):
cifar10_transformer = Cifar10ToMR(CIFAR10_DIR,
CIFAR10_DIR + "/data_batch_0")
cifar10_transformer.transform()

View File

@ -108,7 +108,7 @@ def test_lack_partition_and_db():
with pytest.raises(RuntimeError) as err:
reader = FileReader('dummy.mindrecord')
reader.close()
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_lack_db(fixture_cv_file):
"""test file reader when db file does not exist."""
@ -117,7 +117,7 @@ def test_lack_db(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME)
reader.close()
assert 'Unexpected error. Invalid database file:' in str(err.value)
assert 'Unexpected error. Invalid database file, path:' in str(err.value)
def test_lack_some_partition_and_db(fixture_cv_file):
"""test file reader when some partition and db do not exist."""
@ -129,7 +129,7 @@ def test_lack_some_partition_and_db(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME + "0")
reader.close()
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_lack_some_partition_first(fixture_cv_file):
"""test file reader when first partition does not exist."""
@ -140,7 +140,7 @@ def test_lack_some_partition_first(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME + "0")
reader.close()
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_lack_some_partition_middle(fixture_cv_file):
"""test file reader when some partition does not exist."""
@ -151,7 +151,7 @@ def test_lack_some_partition_middle(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME + "0")
reader.close()
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_lack_some_partition_last(fixture_cv_file):
"""test file reader when last partition does not exist."""
@ -162,7 +162,7 @@ def test_lack_some_partition_last(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME + "0")
reader.close()
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_mindpage_lack_some_partition(fixture_cv_file):
"""test page reader when some partition does not exist."""
@ -172,7 +172,7 @@ def test_mindpage_lack_some_partition(fixture_cv_file):
os.remove("{}".format(paths[0]))
with pytest.raises(RuntimeError) as err:
MindPage(CV_FILE_NAME + "0")
assert 'Unexpected error. Invalid file path:' in str(err.value)
assert 'Unexpected error. Invalid file, path:' in str(err.value)
def test_lack_some_db(fixture_cv_file):
"""test file reader when some db does not exist."""
@ -183,7 +183,7 @@ def test_lack_some_db(fixture_cv_file):
with pytest.raises(RuntimeError) as err:
reader = FileReader(CV_FILE_NAME + "0")
reader.close()
assert 'Unexpected error. Invalid database file:' in str(err.value)
assert 'Unexpected error. Invalid database file, path:' in str(err.value)
def test_invalid_mindrecord():
@ -193,7 +193,7 @@ def test_invalid_mindrecord():
f.write(dummy)
with pytest.raises(RuntimeError) as err:
FileReader(CV_FILE_NAME)
assert 'Unexpected error. Invalid file content. path:' in str(err.value)
assert "Unexpected error. Invalid file content, incorrect file or file header" in str(err.value)
os.remove(CV_FILE_NAME)
def test_invalid_db(fixture_cv_file):
@ -204,7 +204,7 @@ def test_invalid_db(fixture_cv_file):
f.write('just for test')
with pytest.raises(RuntimeError) as err:
FileReader('imagenet.mindrecord')
assert 'Unexpected error. Error in execute sql:' in str(err.value)
assert "Unexpected error. Failed to execute sql [ SELECT NAME from SHARD_NAME; ], " in str(err.value)
def test_overwrite_invalid_mindrecord(fixture_cv_file):
"""test file writer when overwrite invalid mindreocrd file."""
@ -212,8 +212,7 @@ def test_overwrite_invalid_mindrecord(fixture_cv_file):
f.write('just for test')
with pytest.raises(RuntimeError) as err:
create_cv_mindrecord(1)
assert 'Unexpected error. MindRecord file already existed, please delete file:' \
in str(err.value)
assert 'Unexpected error. Invalid file, Mindrecord files already existed in path:' in str(err.value)
def test_overwrite_invalid_db(fixture_cv_file):
"""test file writer when overwrite invalid db file."""
@ -291,7 +290,8 @@ def test_mindpage_pageno_pagesize_not_int(fixture_cv_file):
with pytest.raises(ParamValueError):
reader.read_at_page_by_name("822", 0, "qwer")
with pytest.raises(RuntimeError, match="Unexpected error. Invalid category id:"):
with pytest.raises(RuntimeError, match=r"Unexpected error. Invalid data, "
r"category_id: 99999 must be in the range \[0, 10\]."):
reader.read_at_page_by_id(99999, 0, 1)
@ -309,10 +309,11 @@ def test_mindpage_filename_not_exist(fixture_cv_file):
info = reader.read_category_info()
logger.info("category info: {}".format(info))
with pytest.raises(RuntimeError, match="Unexpected error. Invalid category id:"):
with pytest.raises(RuntimeError, match=r"Unexpected error. Invalid data, "
r"category_id: 9999 must be in the range \[0, 10\]."):
reader.read_at_page_by_id(9999, 0, 1)
with pytest.raises(RuntimeError, match="Unexpected error. Invalid category name."):
with pytest.raises(RuntimeError, match="Unexpected error. category_name: abc.jpg could not found."):
reader.read_at_page_by_name("abc.jpg", 0, 1)
with pytest.raises(ParamValueError):
@ -464,7 +465,7 @@ def test_write_with_invalid_data():
mindrecord_file_name = "test.mindrecord"
# field: file_name => filename
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -499,7 +500,7 @@ def test_write_with_invalid_data():
writer.commit()
# field: mask => masks
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -534,7 +535,7 @@ def test_write_with_invalid_data():
writer.commit()
# field: data => image
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -569,7 +570,7 @@ def test_write_with_invalid_data():
writer.commit()
# field: label => labels
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -604,7 +605,7 @@ def test_write_with_invalid_data():
writer.commit()
# field: score => scores
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -639,7 +640,7 @@ def test_write_with_invalid_data():
writer.commit()
# string type with int value
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -674,7 +675,7 @@ def test_write_with_invalid_data():
writer.commit()
# field with int64 type, but the real data is string
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -709,7 +710,7 @@ def test_write_with_invalid_data():
writer.commit()
# bytes field is string
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -744,7 +745,7 @@ def test_write_with_invalid_data():
writer.commit()
# field is not numpy type
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")
@ -779,7 +780,7 @@ def test_write_with_invalid_data():
writer.commit()
# not enough field
with pytest.raises(RuntimeError, match="Unexpected error. Data size is not positive."):
with pytest.raises(RuntimeError, match="Unexpected error. Invalid data, schema count should be positive."):
remove_one_file(mindrecord_file_name)
remove_one_file(mindrecord_file_name + ".db")