improve performance (#501)

This commit is contained in:
qicosmos 2023-11-21 17:38:08 +08:00 committed by GitHub
parent a37a8d3708
commit 493578223e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 559 additions and 253 deletions

View File

@ -14,10 +14,15 @@
* limitations under the License.
*/
#pragma once
#include <async_simple/Promise.h>
#include <async_simple/Traits.h>
#include <async_simple/coro/FutureAwaiter.h>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include "io_context_pool.hpp"
#if defined(YLT_ENABLE_FILE_IO_URING)
#include <asio/random_access_file.hpp>
#include <asio/stream_file.hpp>
@ -25,11 +30,8 @@
#include <async_simple/coro/Lazy.h>
#include <asio/error.hpp>
#include <asio/io_context.hpp>
#include <cstddef>
#include <exception>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
@ -39,69 +41,90 @@
#include <vector>
#include "coro_io.hpp"
#include "io_context_pool.hpp"
namespace coro_io {
#if defined(YLT_ENABLE_FILE_IO_URING)
inline asio::file_base::flags default_flags() {
return asio::stream_file::read_write | asio::stream_file::append |
asio::stream_file::create;
}
#endif
enum class open_mode { read, write };
/*
fopen() mode open() flags
r O_RDONLY
w O_WRONLY | O_CREAT | O_TRUNC
a O_WRONLY | O_CREAT | O_APPEND
r+ O_RDWR
w+ O_RDWR | O_CREAT | O_TRUNC
a+ O_RDWR | O_CREAT | O_APPEND
*/
enum flags {
#if defined(ASIO_WINDOWS)
read_only = 1,
write_only = 2,
read_write = 4,
append = 8,
create = 16,
exclusive = 32,
truncate = 64,
create_write = create | write_only,
create_write_trunc = create | write_only | truncate,
create_read_write_trunc = read_write | create | truncate,
create_read_write_append = read_write | create | append,
sync_all_on_write = 128
#else // defined(ASIO_WINDOWS)
read_only = O_RDONLY,
write_only = O_WRONLY,
read_write = O_RDWR,
append = O_APPEND,
create = O_CREAT,
exclusive = O_EXCL,
truncate = O_TRUNC,
create_write = O_CREAT | O_WRONLY,
create_write_trunc = O_WRONLY | O_CREAT | O_TRUNC,
create_read_write_trunc = O_RDWR | O_CREAT | O_TRUNC,
create_read_write_append = O_RDWR | O_CREAT | O_APPEND,
sync_all_on_write = O_SYNC
#endif // defined(ASIO_WINDOWS)
};
class coro_file {
public:
#if defined(YLT_ENABLE_FILE_IO_URING)
coro_file(
std::string_view filepath, open_mode flags = open_mode::read,
coro_io::ExecutorWrapper<>* executor = coro_io::get_global_executor())
: coro_file(filepath, flags, executor->get_asio_executor()) {}
: coro_file(executor->get_asio_executor()) {}
coro_file(std::string_view filepath, open_mode flags,
asio::io_context::executor_type executor) {
try {
stream_file_ = std::make_unique<asio::stream_file>(executor);
} catch (std::exception& ex) {
std::cout << ex.what() << "\n";
return;
}
std::error_code ec;
stream_file_->open(filepath.data(), default_flags(), ec);
if (ec) {
std::cout << ec.message() << "\n";
}
}
coro_file(asio::io_context::executor_type executor)
: executor_wrapper_(executor) {}
#else
coro_file(std::string_view filepath, open_mode flags = open_mode::read,
coro_io::ExecutorWrapper<>* executor =
coro_file(coro_io::ExecutorWrapper<>* executor =
coro_io::get_global_block_executor())
: coro_file(filepath, flags, executor->get_asio_executor()) {}
: coro_file(executor->get_asio_executor()) {}
coro_file(std::string_view filepath, open_mode flags,
asio::io_context::executor_type executor)
: executor_wrapper_(executor) {
std::ios::openmode open_flags = flags == open_mode::read
? std::ios::binary | std::ios::in
: std::ios::out | std::ios::app;
stream_file_ = std::make_unique<std::fstream>(
std::filesystem::path(filepath), open_flags);
if (!stream_file_->is_open()) {
std::cout << "open file " << filepath << " failed "
<< "\n";
stream_file_.reset();
}
}
coro_file(asio::io_context::executor_type executor)
: executor_wrapper_(executor) {}
#endif
bool is_open() {
bool is_open() { return stream_file_ != nullptr; }
void flush() {
#if defined(YLT_ENABLE_FILE_IO_URING)
return stream_file_ && stream_file_->is_open();
#else
return stream_file_ && stream_file_->is_open();
if (stream_file_) {
auto fptr = stream_file_.get();
#if defined(__GNUC__) and defined(USE_PREAD_WRITE)
int fd = *stream_file_;
fsync(fd);
#else
fflush(fptr);
#endif
}
#endif
}
@ -116,6 +139,37 @@ class coro_file {
}
#if defined(YLT_ENABLE_FILE_IO_URING)
async_simple::coro::Lazy<bool> async_open(std::string_view filepath,
int open_mode = flags::read_write) {
try {
stream_file_ = std::make_unique<asio::stream_file>(
executor_wrapper_.get_asio_executor());
} catch (std::exception& ex) {
std::cout << ex.what() << "\n";
co_return false;
}
std::error_code ec;
stream_file_->open(filepath.data(),
static_cast<asio::file_base::flags>(open_mode), ec);
if (ec) {
std::cout << ec.message() << "\n";
co_return false;
}
co_return true;
}
bool seek(long offset, int whence) {
std::error_code seek_ec;
stream_file_->seek(offset, static_cast<asio::file_base::seek_basis>(whence),
seek_ec);
if (seek_ec) {
return false;
}
return true;
}
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
char* data, size_t size) {
size_t left_size = size;
@ -185,59 +239,148 @@ class coro_file {
co_return std::error_code{};
}
#else
std::string str_mode(int open_mode) {
switch (open_mode) {
case flags::read_only:
return "r";
case flags::create_write:
case flags::write_only:
return "w";
case flags::read_write:
return "r+";
case flags::append:
return "a";
case flags::create_read_write_append:
return "a+";
case flags::truncate:
return "w+";
default:
return "r+";
}
}
#if defined(__GNUC__) and defined(USE_PREAD_WRITE)
async_simple::coro::Lazy<bool> async_open(std::string filepath,
int open_mode = flags::read_write) {
if (stream_file_) {
co_return true;
}
int fd = open(filepath.data(), open_mode);
if (fd < 0) {
co_return false;
}
stream_file_ = std::shared_ptr<int>(new int(fd), [](int* ptr) {
::close(*ptr);
delete ptr;
});
co_return true;
}
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_prw(
auto io_func, bool is_read, size_t offset, char* buf, size_t size) {
std::function<int()> func = [=, this] {
int fd = *stream_file_;
return io_func(fd, buf, size, offset);
};
std::error_code ec{};
size_t op_size = 0;
auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_);
int len = len_val.value();
if (len == 0) {
if (is_read) {
eof_ = true;
}
}
else if (len > 0) {
op_size = len;
}
else {
ec = std::make_error_code(std::errc::io_error);
}
co_return std::make_pair(ec, op_size);
}
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
size_t offset, char* data, size_t size) {
co_return co_await async_prw(pread, true, offset, data, size);
}
async_simple::coro::Lazy<std::error_code> async_write(size_t offset,
const char* data,
size_t size) {
auto result = co_await async_prw(pwrite, false, offset, (char*)data, size);
co_return result.first;
}
#else
bool seek(long offset, int whence) {
return fseek(stream_file_.get(), offset, whence) == 0;
}
async_simple::coro::Lazy<bool> async_open(std::string filepath,
int open_mode = flags::read_write) {
if (stream_file_ != nullptr) {
co_return true;
}
auto result = co_await coro_io::post(
[this, &filepath, open_mode] {
auto fptr = fopen(filepath.data(), str_mode(open_mode).data());
if (fptr == nullptr) {
std::cout << "open file " << filepath << " failed "
<< "\n";
return false;
}
stream_file_ = std::shared_ptr<FILE>(fptr, [](FILE* ptr) {
fclose(ptr);
});
return true;
},
&executor_wrapper_);
co_return result.value();
}
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
char* data, size_t size) {
async_simple::Promise<std::pair<std::error_code, size_t>> promise;
async_read_impl(data, size)
.via(&executor_wrapper_)
.start([&promise](auto&& t) {
if (t.available()) {
promise.setValue(t.value());
auto result = co_await coro_io::post(
[this, data, size] {
auto fptr = stream_file_.get();
size_t read_size = fread(data, sizeof(char), size, fptr);
if (ferror(fptr)) {
return std::pair<std::error_code, size_t>(
std::make_error_code(std::errc::io_error), 0);
}
else {
promise.setValue(std::make_pair(
std::make_error_code(std::errc::io_error), size_t(0)));
}
});
eof_ = feof(fptr);
return std::pair<std::error_code, size_t>(std::error_code{},
read_size);
},
&executor_wrapper_);
co_return co_await promise.getFuture();
co_return result.value();
}
async_simple::coro::Lazy<std::error_code> async_write(const char* data,
size_t size) {
async_simple::Promise<std::error_code> promise;
async_write_impl(data, size)
.via(&executor_wrapper_)
.start([&promise](auto&& t) {
if (t.available()) {
promise.setValue(t.value());
auto result = co_await coro_io::post(
[this, data, size] {
auto fptr = stream_file_.get();
fwrite(data, sizeof(char), size, fptr);
if (ferror(fptr)) {
return std::make_error_code(std::errc::io_error);
}
else {
promise.setValue(std::make_error_code(std::errc::io_error));
}
});
return std::error_code{};
},
&executor_wrapper_);
co_return co_await promise.getFuture();
co_return result.value();
}
#endif
private:
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_impl(
char* data, size_t size) {
stream_file_->read(data, size);
size_t read_size = stream_file_->gcount();
if (!stream_file_ && read_size == 0) {
co_return std::make_pair(std::make_error_code(std::errc::io_error), 0);
}
eof_ = stream_file_->eof();
co_return std::make_pair(std::error_code{}, read_size);
}
async_simple::coro::Lazy<std::error_code> async_write_impl(const char* data,
size_t size) {
stream_file_->write(data, size);
stream_file_->flush();
co_return std::error_code{};
}
#endif
private:
@ -245,10 +388,15 @@ class coro_file {
std::unique_ptr<asio::stream_file> stream_file_;
std::atomic<size_t> seek_offset_ = 0;
#else
std::unique_ptr<std::fstream> stream_file_;
coro_io::ExecutorWrapper<> executor_wrapper_;
#if defined(__GNUC__) and defined(USE_PREAD_WRITE)
std::shared_ptr<int> stream_file_;
#else
std::shared_ptr<FILE> stream_file_;
#endif
#endif
coro_io::ExecutorWrapper<> executor_wrapper_;
std::atomic<bool> eof_ = false;
};
} // namespace coro_io
} // namespace coro_io

View File

@ -104,6 +104,12 @@ struct multipart_t {
size_t size = 0;
};
struct read_result {
std::string_view buf;
bool eof;
std::error_code err;
};
class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
public:
struct config {
@ -681,8 +687,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string filename,
std::string range = "") {
resp_data data{};
auto file = std::make_shared<coro_io::coro_file>(filename,
coro_io::open_mode::write);
auto file = std::make_shared<coro_io::coro_file>();
co_await file->async_open(filename, coro_io::flags::create_write);
if (!file->is_open()) {
data.net_err = std::make_error_code(std::errc::no_such_file_or_directory);
data.status = 404;
@ -751,9 +757,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string_view get_port() { return port_; }
template <typename S, typename File>
template <typename S, typename Source>
async_simple::coro::Lazy<resp_data> async_upload_chunked(
S uri, http_method method, File file,
S uri, http_method method, Source source,
req_content_type content_type = req_content_type::text,
std::unordered_map<std::string, std::string> headers = {}) {
std::shared_ptr<int> guard(nullptr, [this](auto) {
@ -762,6 +768,10 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
});
if (!resp_chunk_str_.empty()) {
resp_chunk_str_.clear();
}
req_context<> ctx{content_type};
resp_data data{};
auto [ok, u] = handle_uri(data, uri);
@ -769,15 +779,16 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{{}, 404};
}
constexpr bool is_stream_file = is_stream_ptr_v<File>;
constexpr bool is_stream_file = is_stream_ptr_v<Source>;
if constexpr (is_stream_file) {
if (!file) {
if (!source) {
co_return resp_data{
std::make_error_code(std::errc::no_such_file_or_directory), 404};
}
}
else {
if (!std::filesystem::exists(file)) {
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
if (!std::filesystem::exists(source)) {
co_return resp_data{
std::make_error_code(std::errc::no_such_file_or_directory), 404};
}
@ -817,18 +828,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string chunk_size_str;
if constexpr (is_stream_file) {
while (!file->eof()) {
while (!source->eof()) {
size_t rd_size =
file->read(file_data.data(), file_data.size()).gcount();
source->read(file_data.data(), file_data.size()).gcount();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, file->eof());
file_data.data(), rd_size, chunk_size_str, source->eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
co_return resp_data{ec, 404};
}
}
}
else {
coro_io::coro_file coro_file(file, coro_io::open_mode::read);
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
coro_io::coro_file coro_file(source, coro_io::flags::read_only);
while (!coro_file.eof()) {
auto [rd_ec, rd_size] =
co_await coro_file.async_read(file_data.data(), file_data.size());
@ -839,6 +851,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
}
else {
std::string chunk_size_str;
while (true) {
auto result = co_await source();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
result.buf.data(), result.buf.size(), chunk_size_str, result.eof);
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
co_return resp_data{ec, 404};
}
if (result.eof) {
break;
}
}
}
bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
@ -1546,10 +1572,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
if (is_file) {
coro_io::coro_file file(part.filename, coro_io::open_mode::read);
coro_io::coro_file file{};
co_await file.async_open(part.filename, coro_io::flags::read_only);
assert(file.is_open());
std::string file_data;
file_data.resize(max_single_part_size_);
detail::resize(file_data, max_single_part_size_);
while (!file.eof()) {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());

View File

@ -49,8 +49,9 @@ void test_read_file() {
ioc.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::read,
ioc.get_executor());
coro_io::coro_file file{};
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
bool r = file.is_open();
if (!file.is_open()) {
return;
@ -84,8 +85,10 @@ void test_write_and_read_file() {
ioc.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write,
ioc.get_executor());
coro_io::coro_file file{ioc.get_executor()};
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
bool r = file.is_open();
if (!file.is_open()) {
return;
@ -106,8 +109,10 @@ void test_write_and_read_file() {
std::cout << ec.message() << "\n";
}
coro_io::coro_file file1(filename, coro_io::open_mode::read,
ioc.get_executor());
coro_io::coro_file file1{ioc.get_executor()};
async_simple::coro::syncAwait(
file1.async_open(filename, coro_io::flags::read_only));
r = file1.is_open();
if (!file1.is_open()) {
return;
@ -134,7 +139,9 @@ void test_read_with_pool() {
std::string filename = "test1.txt";
create_temp_file("test1.txt", 1024);
coro_io::coro_file file(filename);
coro_io::coro_file file{};
async_simple::coro::syncAwait(file.async_open(filename));
bool r = file.is_open();
if (!file.is_open()) {
return;
@ -155,7 +162,10 @@ void test_read_with_pool() {
}
std::string str = "test async write";
coro_io::coro_file file1(filename, coro_io::open_mode::write);
coro_io::coro_file file1{};
async_simple::coro::syncAwait(
file1.async_open(filename, coro_io::flags::create_write));
r = file1.is_open();
if (!file1.is_open()) {
return;
@ -171,7 +181,10 @@ void test_write_with_pool() {
std::string filename = "test1.txt";
create_temp_file("test1.txt", 10);
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file{};
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
bool r = file.is_open();
if (!file.is_open()) {
return;

View File

@ -61,130 +61,125 @@ void create_file(std::string filename, size_t file_size,
return;
}
// TODO: will revert later.
// TEST_CASE("multithread for balance") {
// size_t total = 100;
// std::vector<std::string> filenames;
// for (size_t i = 0; i < total; ++i) {
// filenames.push_back("temp" + std::to_string(i + 1));
// }
//
// std::vector<std::string> write_str_vec;
// char ch = 'a';
// for (int i = 0; i < 26; ++i) {
// std::string str(100, ch + i);
// write_str_vec.push_back(std::move(str));
// }
//
// std::vector<async_simple::coro::Lazy<void>> write_vec;
// auto write_file_func =
// [&write_str_vec](std::string filename,
// int index) mutable -> async_simple::coro::Lazy<void> {
// coro_io::coro_file file(filename, coro_io::open_mode::write,
// coro_io::get_global_block_executor<
// coro_io::multithread_context_pool>());
// CHECK(file.is_open());
//
// size_t id = index % write_str_vec.size();
// auto& str = write_str_vec[id];
// auto ec = co_await file.async_write(str.data(), str.size());
// CHECK(!ec);
// co_return;
// };
//
// for (size_t i = 0; i < total; ++i) {
// write_vec.push_back(write_file_func(filenames[i], i));
// }
//
// auto wait_func =
// [write_vec =
// std::move(write_vec)]() mutable -> async_simple::coro::Lazy<void>
// {
// co_await async_simple::coro::collectAll(std::move(write_vec));
// };
//
// async_simple::coro::syncAwait(wait_func());
//
// std::cout << "write finished\n";
//
// // read and compare
// std::vector<async_simple::coro::Lazy<void>> read_vec;
//
// auto read_file_func =
// [&write_str_vec](std::string filename,
// int index) mutable -> async_simple::coro::Lazy<void> {
// coro_io::coro_file file(filename, coro_io::open_mode::read,
// coro_io::get_global_block_executor<
// coro_io::multithread_context_pool>());
// CHECK(file.is_open());
//
// size_t id = index % write_str_vec.size();
// auto& str = write_str_vec[id];
// std::string buf;
// buf.resize(write_str_vec.back().size());
//
// std::error_code ec;
// size_t read_size;
// std::tie(ec, read_size) = co_await file.async_read(buf.data(),
// buf.size()); CHECK(!ec); bool ok = (str == buf); if (!ok) {
// std::cout << "str: " << str << "\n";
// std::cout << "read buf: " << buf << "\n";
// }
// CHECK(ok);
// co_return;
// };
//
// for (size_t i = 0; i < total; ++i) {
// read_vec.push_back(read_file_func(filenames[i], i));
// }
//
// auto wait_read_func =
// [read_vec =
// std::move(read_vec)]() mutable -> async_simple::coro::Lazy<void> {
// co_await async_simple::coro::collectAll(std::move(read_vec));
// };
//
// async_simple::coro::syncAwait(wait_read_func());
// std::cout << "read finished\n";
//
// std::error_code ec;
// for (auto& filename : filenames) {
// fs::remove(fs::path(filename), ec);
// if (ec) {
// std::cout << "remove file error: " << ec.message() << "\n";
// }
// }
// }
async_simple::coro::Lazy<void> foo() { co_return; }
TEST_CASE("test currentThreadInExecutor") {
CHECK(*coro_io::get_current() == nullptr);
CHECK(coro_io::get_global_executor()->currentContextId() == 0);
CHECK_NOTHROW(
async_simple::coro::syncAwait(foo().via(coro_io::get_global_executor())));
auto executor = coro_io::get_global_executor();
foo().via(executor).start([executor](auto&&) {
auto ptr = &executor->get_asio_executor().context();
CHECK(ptr == *coro_io::get_current());
size_t id = executor->currentContextId();
CHECK(id > 0);
});
void create_files(const std::vector<std::string>& files, size_t file_size) {
std::string content(file_size, 'A');
for (auto& filename : files) {
std::ofstream out(filename, std::ios::binary);
out.write(content.data(), content.size());
}
}
TEST_CASE("read write 100 small files") {
#if defined(__GNUC__) and defined(USE_PREAD_WRITE)
TEST_CASE("coro_file pread and pwrite basic test") {
std::string filename = "test.tmp";
create_files({filename}, 190);
{
coro_io::coro_file file{};
async_simple::coro::syncAwait(
file.async_open(filename.data(), coro_io::flags::read_only));
CHECK(file.is_open());
char buf[100];
auto pair = async_simple::coro::syncAwait(file.async_read(0, buf, 10));
CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA");
CHECK(!file.eof());
pair = async_simple::coro::syncAwait(file.async_read(10, buf, 100));
CHECK(!file.eof());
CHECK(pair.second == 100);
pair = async_simple::coro::syncAwait(file.async_read(110, buf, 100));
CHECK(!file.eof());
CHECK(pair.second == 80);
// only read size equal 0 is eof.
pair = async_simple::coro::syncAwait(file.async_read(200, buf, 100));
CHECK(file.eof());
CHECK(pair.second == 0);
}
{
coro_io::coro_file file{};
async_simple::coro::syncAwait(
file.async_open(filename.data(), coro_io::flags::read_write));
CHECK(file.is_open());
std::string buf = "cccccccccc";
auto ec = async_simple::coro::syncAwait(
file.async_write(0, buf.data(), buf.size()));
CHECK(!ec);
std::string buf1 = "dddddddddd";
ec = async_simple::coro::syncAwait(
file.async_write(10, buf1.data(), buf1.size()));
CHECK(!ec);
char buf2[100];
auto pair = async_simple::coro::syncAwait(file.async_read(0, buf2, 10));
CHECK(!file.eof());
CHECK(std::string_view(buf2, pair.second) == "cccccccccc");
pair = async_simple::coro::syncAwait(file.async_read(10, buf2, 10));
CHECK(!file.eof());
CHECK(std::string_view(buf2, pair.second) == "dddddddddd");
}
}
#else
async_simple::coro::Lazy<void> test_basic_read(std::string filename) {
coro_io::coro_file file{};
co_await file.async_open(filename.data(), coro_io::flags::read_only);
std::string str;
str.resize(200);
{
auto [ec, size] = co_await file.async_read(str.data(), 10);
std::cout << size << ", " << file.eof() << "\n";
}
{
bool ok = file.seek(10, SEEK_CUR);
std::cout << ok << "\n";
}
{
auto [ec, size] = co_await file.async_read(str.data(), str.size());
std::cout << size << ", " << file.eof() << "\n";
}
}
async_simple::coro::Lazy<void> test_basic_write(std::string filename) {
coro_io::coro_file file{};
co_await file.async_open(filename.data(), coro_io::flags::read_write);
std::string str = "hello";
{
auto ec = co_await file.async_write(str.data(), str.size());
std::string result;
result.resize(10);
file.seek(0, SEEK_SET);
auto [rd_ec, size] = co_await file.async_read(result.data(), 5);
std::string_view s(result.data(), size);
CHECK(s == "hello");
}
{
bool ok = file.seek(10, SEEK_SET);
auto ec = co_await file.async_write(str.data(), str.size());
file.seek(10, SEEK_SET);
std::string result;
result.resize(10);
auto [rd_ec, size] = co_await file.async_read(result.data(), 5);
std::string_view s(result.data(), size);
CHECK(s == "hello");
std::cout << ec << "\n";
}
}
TEST_CASE("multithread for balance") {
size_t total = 100;
std::vector<std::string> filenames;
for (size_t i = 0; i < total; ++i) {
filenames.push_back("temp" + std::to_string(i + 1));
}
coro_io::io_context_pool pool(std::thread::hardware_concurrency());
std::thread thd([&pool] {
pool.run();
});
std::vector<std::string> write_str_vec;
char ch = 'a';
for (int i = 0; i < 26; ++i) {
@ -192,13 +187,14 @@ TEST_CASE("read write 100 small files") {
write_str_vec.push_back(std::move(str));
}
std::vector<async_simple::coro::Lazy<void>> write_vec;
std::vector<async_simple::coro::Lazy<void> > write_vec;
auto write_file_func =
[&pool, &write_str_vec](
std::string filename,
int index) mutable -> async_simple::coro::Lazy<void> {
coro_io::coro_file file(filename, coro_io::open_mode::write);
[&write_str_vec](std::string filename,
int index) mutable -> async_simple::coro::Lazy<void> {
coro_io::coro_file file(coro_io::get_global_block_executor<
coro_io::multithread_context_pool>());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
size_t id = index % write_str_vec.size();
@ -221,13 +217,106 @@ TEST_CASE("read write 100 small files") {
async_simple::coro::syncAwait(wait_func());
// read and compare
std::vector<async_simple::coro::Lazy<void>> read_vec;
std::vector<async_simple::coro::Lazy<void> > read_vec;
auto read_file_func =
[&write_str_vec](std::string filename,
int index) mutable -> async_simple::coro::Lazy<void> {
coro_io::coro_file file(coro_io::get_global_block_executor<
coro_io::multithread_context_pool>());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
size_t id = index % write_str_vec.size();
auto& str = write_str_vec[id];
std::string buf;
buf.resize(write_str_vec.back().size());
std::error_code ec;
size_t read_size;
std::tie(ec, read_size) = co_await file.async_read(buf.data(), buf.size());
CHECK(!ec);
CHECK(str == buf);
co_return;
};
for (size_t i = 0; i < total; ++i) {
read_vec.push_back(read_file_func(filenames[i], i));
}
auto wait_read_func =
[read_vec =
std::move(read_vec)]() mutable -> async_simple::coro::Lazy<void> {
co_await async_simple::coro::collectAll(std::move(read_vec));
};
async_simple::coro::syncAwait(wait_read_func());
for (auto& filename : filenames) {
fs::remove(fs::path(filename));
}
}
TEST_CASE("read write 100 small files") {
size_t total = 100;
std::vector<std::string> filenames;
for (size_t i = 0; i < total; ++i) {
filenames.push_back("temp" + std::to_string(i + 1));
}
coro_io::io_context_pool pool(std::thread::hardware_concurrency());
std::thread thd([&pool] {
pool.run();
});
std::vector<std::string> write_str_vec;
char ch = 'a';
for (int i = 0; i < 26; ++i) {
std::string str(100, ch + i);
write_str_vec.push_back(std::move(str));
}
std::vector<async_simple::coro::Lazy<void> > write_vec;
auto write_file_func =
[&pool, &write_str_vec](
std::string filename,
int index) mutable -> async_simple::coro::Lazy<void> {
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
size_t id = index % write_str_vec.size();
auto& str = write_str_vec[id];
auto ec = co_await file.async_write(str.data(), str.size());
CHECK(!ec);
co_return;
};
for (size_t i = 0; i < total; ++i) {
write_vec.push_back(write_file_func(filenames[i], i));
}
auto wait_func =
[write_vec =
std::move(write_vec)]() mutable -> async_simple::coro::Lazy<void> {
co_await async_simple::coro::collectAll(std::move(write_vec));
};
async_simple::coro::syncAwait(wait_func());
// read and compare
std::vector<async_simple::coro::Lazy<void> > read_vec;
auto read_file_func =
[&pool, &write_str_vec](
std::string filename,
int index) mutable -> async_simple::coro::Lazy<void> {
coro_io::coro_file file(filename);
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
size_t id = index % write_str_vec.size();
@ -274,7 +363,9 @@ TEST_CASE("small_file_read_test") {
ioc.run();
});
coro_io::coro_file file(filename);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
char buf[block_size]{};
@ -309,7 +400,9 @@ TEST_CASE("large_file_read_test") {
ioc.run();
});
coro_io::coro_file file(filename);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
char buf[block_size]{};
@ -344,7 +437,9 @@ TEST_CASE("empty_file_read_test") {
ioc.run();
});
coro_io::coro_file file(filename);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
char buf[block_size]{};
@ -375,7 +470,9 @@ TEST_CASE("small_file_read_with_pool_test") {
pool.run();
});
coro_io::coro_file file(filename);
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
char buf[block_size]{};
@ -409,7 +506,9 @@ TEST_CASE("large_file_read_with_pool_test") {
pool.run();
});
coro_io::coro_file file(filename);
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::read_only));
CHECK(file.is_open());
char buf[block_size]{};
@ -441,7 +540,9 @@ TEST_CASE("small_file_write_test") {
ioc.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
char buf[512]{};
@ -454,6 +555,8 @@ TEST_CASE("small_file_write_test") {
std::cout << ec.message() << "\n";
}
file.flush();
std::ifstream is(filename, std::ios::binary);
if (!is.is_open()) {
std::cout << "Failed to open file: " << filename << "\n";
@ -476,6 +579,7 @@ TEST_CASE("small_file_write_test") {
if (ec) {
std::cout << ec.message() << "\n";
}
file.flush();
is.open(filename, std::ios::binary);
if (!is.is_open()) {
std::cout << "Failed to open file: " << filename << "\n";
@ -505,7 +609,9 @@ TEST_CASE("large_file_write_test") {
ioc.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
auto block_vec = create_filled_vec("large_file_write_test");
@ -526,6 +632,7 @@ TEST_CASE("large_file_write_test") {
std::cout << ec.message() << "\n";
}
}
file.flush();
CHECK(fs::file_size(filename) == file_size);
std::ifstream is(filename, std::ios::binary);
if (!is.is_open()) {
@ -557,7 +664,9 @@ TEST_CASE("empty_file_write_test") {
ioc.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file(ioc.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
char buf[512]{};
@ -569,7 +678,7 @@ TEST_CASE("empty_file_write_test") {
if (ec) {
std::cout << ec.message() << "\n";
}
file.flush();
std::ifstream is(filename, std::ios::binary);
if (!is.is_open()) {
std::cout << "Failed to open file: " << filename << "\n";
@ -591,7 +700,9 @@ TEST_CASE("small_file_write_with_pool_test") {
pool.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
char buf[512]{};
@ -603,6 +714,7 @@ TEST_CASE("small_file_write_with_pool_test") {
if (ec) {
std::cout << ec.message() << "\n";
}
file.flush();
std::ifstream is(filename, std::ios::binary);
if (!is.is_open()) {
@ -626,6 +738,7 @@ TEST_CASE("small_file_write_with_pool_test") {
if (ec) {
std::cout << ec.message() << "\n";
}
file.flush();
is.open(filename, std::ios::binary);
if (!is.is_open()) {
std::cout << "Failed to open file: " << filename << "\n";
@ -654,7 +767,9 @@ TEST_CASE("large_file_write_with_pool_test") {
pool.run();
});
coro_io::coro_file file(filename, coro_io::open_mode::write);
coro_io::coro_file file(pool.get_executor());
async_simple::coro::syncAwait(
file.async_open(filename, coro_io::flags::create_write));
CHECK(file.is_open());
auto block_vec = create_filled_vec("large_file_write_with_pool_test");
@ -675,7 +790,9 @@ TEST_CASE("large_file_write_with_pool_test") {
std::cout << ec.message() << "\n";
}
}
CHECK(fs::file_size(filename) == file_size);
file.flush();
size_t sz = fs::file_size(filename);
CHECK(sz == file_size);
std::ifstream is(filename, std::ios::binary);
if (!is.is_open()) {
std::cout << "Failed to open file: " << filename << "\n";
@ -698,3 +815,4 @@ TEST_CASE("large_file_write_with_pool_test") {
file.close();
fs::remove(fs::path(filename));
}
#endif