io_uring: support true async buffered reads, if file provides it

If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt
the buffered read to an io-wq worker. Instead we can rely on page
unlocking callbacks to support retry based async IO. This is a lot more
efficient than doing async thread offload.

The retry is done similarly to how we handle poll based retry. From
the unlock callback, we simply queue the retry to a task_work based
handler.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Jens Axboe 2020-05-22 09:24:42 -06:00
parent d1932dc3dc
commit bcf5a06304
1 changed files with 135 additions and 4 deletions

View File

@ -78,6 +78,7 @@
#include <linux/fs_struct.h>
#include <linux/splice.h>
#include <linux/task_work.h>
#include <linux/pagemap.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
@ -503,6 +504,8 @@ struct io_async_rw {
struct iovec *iov;
ssize_t nr_segs;
ssize_t size;
struct wait_page_queue wpq;
struct callback_head task_work;
};
struct io_async_ctx {
@ -2750,6 +2753,126 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
return 0;
}
static void __io_async_buf_error(struct io_kiocb *req, int error)
{
struct io_ring_ctx *ctx = req->ctx;
spin_lock_irq(&ctx->completion_lock);
io_cqring_fill_event(req, error);
io_commit_cqring(ctx);
spin_unlock_irq(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
req_set_fail_links(req);
io_double_put_req(req);
}
static void io_async_buf_cancel(struct callback_head *cb)
{
struct io_async_rw *rw;
struct io_kiocb *req;
rw = container_of(cb, struct io_async_rw, task_work);
req = rw->wpq.wait.private;
__io_async_buf_error(req, -ECANCELED);
}
static void io_async_buf_retry(struct callback_head *cb)
{
struct io_async_rw *rw;
struct io_ring_ctx *ctx;
struct io_kiocb *req;
rw = container_of(cb, struct io_async_rw, task_work);
req = rw->wpq.wait.private;
ctx = req->ctx;
__set_current_state(TASK_RUNNING);
if (!io_sq_thread_acquire_mm(ctx, req)) {
mutex_lock(&ctx->uring_lock);
__io_queue_sqe(req, NULL);
mutex_unlock(&ctx->uring_lock);
} else {
__io_async_buf_error(req, -EFAULT);
}
}
static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
int sync, void *arg)
{
struct wait_page_queue *wpq;
struct io_kiocb *req = wait->private;
struct io_async_rw *rw = &req->io->rw;
struct wait_page_key *key = arg;
struct task_struct *tsk;
int ret;
wpq = container_of(wait, struct wait_page_queue, wait);
ret = wake_page_match(wpq, key);
if (ret != 1)
return ret;
list_del_init(&wait->entry);
init_task_work(&rw->task_work, io_async_buf_retry);
/* submit ref gets dropped, acquire a new one */
refcount_inc(&req->refs);
tsk = req->task;
ret = task_work_add(tsk, &rw->task_work, true);
if (unlikely(ret)) {
/* queue just for cancelation */
init_task_work(&rw->task_work, io_async_buf_cancel);
tsk = io_wq_get_task(req->ctx->io_wq);
task_work_add(tsk, &rw->task_work, true);
}
wake_up_process(tsk);
return 1;
}
static bool io_rw_should_retry(struct io_kiocb *req)
{
struct kiocb *kiocb = &req->rw.kiocb;
int ret;
/* never retry for NOWAIT, we just complete with -EAGAIN */
if (req->flags & REQ_F_NOWAIT)
return false;
/* already tried, or we're doing O_DIRECT */
if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
return false;
/*
* just use poll if we can, and don't attempt if the fs doesn't
* support callback based unlocks
*/
if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
return false;
/*
* If request type doesn't require req->io to defer in general,
* we need to allocate it here
*/
if (!req->io && __io_alloc_async_ctx(req))
return false;
ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
io_async_buf_func, req);
if (!ret) {
io_get_req_task(req);
return true;
}
return false;
}
static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
{
if (req->file->f_op->read_iter)
return call_read_iter(req->file, &req->rw.kiocb, iter);
return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
}
static int io_read(struct io_kiocb *req, bool force_nonblock)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
@ -2784,10 +2907,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
unsigned long nr_segs = iter.nr_segs;
ssize_t ret2 = 0;
if (req->file->f_op->read_iter)
ret2 = call_read_iter(req->file, kiocb, &iter);
else
ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
ret2 = io_iter_do_read(req, &iter);
/* Catch -EAGAIN return for forced non-blocking submission */
if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
@ -2804,6 +2924,17 @@ copy_iov:
if (!(req->flags & REQ_F_NOWAIT) &&
!file_can_poll(req->file))
req->flags |= REQ_F_MUST_PUNT;
/* if we can retry, do so with the callbacks armed */
if (io_rw_should_retry(req)) {
ret2 = io_iter_do_read(req, &iter);
if (ret2 == -EIOCBQUEUED) {
goto out_free;
} else if (ret2 != -EAGAIN) {
kiocb_done(kiocb, ret2);
goto out_free;
}
}
kiocb->ki_flags &= ~IOCB_WAITQ;
return -EAGAIN;
}
}