From bc369921d6708542eb93da33478762f1162a5805 Mon Sep 17 00:00:00 2001
From: Pavel Begunkov
Date: Tue, 19 Oct 2021 20:31:26 +0100
Subject: [PATCH 1/4] io-wq: max_worker fixes

First, fix the nr_workers checks against max_workers: with max_workers
registration it can easily happen that nr_workers > max_workers.

Also, synchronise writing to acct->max_workers with wqe->lock. It's not
an actual problem, but since we don't care about io_wqe_create_worker(),
it's better than relying on WRITE_ONCE()/READ_ONCE().

Fixes: 2e480058ddc2 ("io-wq: provide a way to limit max number of workers")
Reported-by: Beld Zhang
Signed-off-by: Pavel Begunkov
Link: https://lore.kernel.org/r/11f90e6b49410b7d1a88f5d04fb8d95bb86b8cf3.1634671835.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe
---
 fs/io-wq.c | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 5bf8aa81715e..422a7ed6a9bd 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -253,7 +253,7 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 		pr_warn_once("io-wq is not configured for unbound workers");
 
 	raw_spin_lock(&wqe->lock);
-	if (acct->nr_workers == acct->max_workers) {
+	if (acct->nr_workers >= acct->max_workers) {
 		raw_spin_unlock(&wqe->lock);
 		return true;
 	}
@@ -1291,15 +1291,18 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
 
 	rcu_read_lock();
 	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
 		struct io_wqe_acct *acct;
 
+		raw_spin_lock(&wqe->lock);
 		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-			acct = &wq->wqes[node]->acct[i];
+			acct = &wqe->acct[i];
 			prev = max_t(int, acct->max_workers, prev);
 			if (new_count[i])
 				acct->max_workers = new_count[i];
 			new_count[i] = prev;
 		}
+		raw_spin_unlock(&wqe->lock);
 	}
 	rcu_read_unlock();
 	return 0;

From e139a1ec92f8dbaaa7380d7e7ea17e148d473d06 Mon Sep 17 00:00:00 2001
From: Pavel Begunkov
Date: Tue, 19 Oct 2021 23:43:46 +0100
Subject: [PATCH 2/4] io_uring: apply max_workers limit to all future users

Currently, IORING_REGISTER_IOWQ_MAX_WORKERS applies only to the task
that issued it, which is unexpected for users. If one task creates a
ring, limits workers and then passes it to another task, the limit
won't be applied to the other task.

Another pitfall is that a task should either create a ring or submit
at least one request for IORING_REGISTER_IOWQ_MAX_WORKERS to work at
all, further complicating the picture.

Change the API: save the limits and apply them to all future users.
Note that it should be done before giving away the ring or submitting
new requests, otherwise the result is not guaranteed.
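As an illustration of the required ordering, here is a minimal
userspace sketch using liburing's io_uring_register_iowq_max_workers()
wrapper (available in recent liburing; the limit values and queue
depth are arbitrary, not taken from this patch):

	#include <liburing.h>

	static int setup_shared_ring(struct io_uring *ring)
	{
		/* values[0] = bound workers, values[1] = unbound workers;
		 * a 0 leaves that slot unchanged and reports the old value */
		unsigned int values[2] = { 8, 4 };
		int ret;

		ret = io_uring_queue_init(64, ring, 0);
		if (ret)
			return ret;
		/* register the limit before the ring fd is handed to other
		 * tasks or any requests are submitted */
		return io_uring_register_iowq_max_workers(ring, values);
	}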
Fixes: 2e480058ddc2 ("io-wq: provide a way to limit max number of workers")
Link: https://github.com/axboe/liburing/issues/460
Reported-by: Beld Zhang
Signed-off-by: Pavel Begunkov
Link: https://lore.kernel.org/r/51d0bae97180e08ab722c0d5c93e7439cfb6f697.1634683237.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe
---
 fs/io_uring.c | 29 +++++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index e68d27829bb2..e8b71f14ac8b 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -456,6 +456,8 @@ struct io_ring_ctx {
 		struct work_struct		exit_work;
 		struct list_head		tctx_list;
 		struct completion		ref_comp;
+		u32				iowq_limits[2];
+		bool				iowq_limits_set;
 	};
 };
 
@@ -9638,7 +9640,16 @@ static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx)
 		ret = io_uring_alloc_task_context(current, ctx);
 		if (unlikely(ret))
 			return ret;
+
 		tctx = current->io_uring;
+		if (ctx->iowq_limits_set) {
+			unsigned int limits[2] = { ctx->iowq_limits[0],
+						   ctx->iowq_limits[1], };
+
+			ret = io_wq_max_workers(tctx->io_wq, limits);
+			if (ret)
+				return ret;
+		}
 	}
 	if (!xa_load(&tctx->xa, (unsigned long)ctx)) {
 		node = kmalloc(sizeof(*node), GFP_KERNEL);
@@ -10674,13 +10685,19 @@ static int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 		tctx = current->io_uring;
 	}
 
-	ret = -EINVAL;
-	if (!tctx || !tctx->io_wq)
-		goto err;
+	BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
 
-	ret = io_wq_max_workers(tctx->io_wq, new_count);
-	if (ret)
-		goto err;
+	memcpy(ctx->iowq_limits, new_count, sizeof(new_count));
+	ctx->iowq_limits_set = true;
+
+	ret = -EINVAL;
+	if (tctx && tctx->io_wq) {
+		ret = io_wq_max_workers(tctx->io_wq, new_count);
+		if (ret)
+			goto err;
+	} else {
+		memset(new_count, 0, sizeof(new_count));
+	}
 
 	if (sqd) {
 		mutex_unlock(&sqd->lock);

From 4ea672ab694c23886b52e97cee10dea056e43e62 Mon Sep 17 00:00:00 2001
From: Pavel Begunkov
Date: Wed, 20 Oct 2021 09:53:02 +0100
Subject: [PATCH 3/4] io_uring: fix ltimeout unprep

io_unprep_linked_timeout() is broken: first, it needs to return
REQ_F_ARM_LTIMEOUT back, so that the linked timeout is enqueued and
disarmed. But now that it is refcounted, linked timeouts may not get
executed at all, leaking a request. Just kill the unprep optimisation.
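For reference, one way to exercise this path from userspace is a
pollable request with a linked timeout, e.g. the following liburing
sketch (socket, buffer and timeout values are illustrative only):

	static void submit_recv_with_ltimeout(struct io_uring *ring, int sockfd,
					      void *buf, size_t len)
	{
		struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
		struct io_uring_sqe *sqe;

		sqe = io_uring_get_sqe(ring);
		io_uring_prep_recv(sqe, sockfd, buf, len, 0);
		sqe->flags |= IOSQE_IO_LINK;	/* link the timeout below to this op */

		sqe = io_uring_get_sqe(ring);
		io_uring_prep_link_timeout(sqe, &ts, 0);

		io_uring_submit(ring);
	}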
Fixes: 906c6caaf586 ("io_uring: optimise io_prep_linked_timeout()")
Reported-by: Beld Zhang
Signed-off-by: Pavel Begunkov
Link: https://lore.kernel.org/r/51b8e2bfc4bea8ee625cf2ba62b2a350cc9be031.1634719585.git.asml.silence@gmail.com
Link: https://github.com/axboe/liburing/issues/460
Signed-off-by: Jens Axboe
---
 fs/io_uring.c | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index e8b71f14ac8b..d5cc103224f1 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1370,11 +1370,6 @@ static void io_req_track_inflight(struct io_kiocb *req)
 	}
 }
 
-static inline void io_unprep_linked_timeout(struct io_kiocb *req)
-{
-	req->flags &= ~REQ_F_LINK_TIMEOUT;
-}
-
 static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
 {
 	if (WARN_ON_ONCE(!req->link))
@@ -6985,7 +6980,7 @@ issue_sqe:
 		switch (io_arm_poll_handler(req)) {
 		case IO_APOLL_READY:
 			if (linked_timeout)
-				io_unprep_linked_timeout(req);
+				io_queue_linked_timeout(linked_timeout);
 			goto issue_sqe;
 		case IO_APOLL_ABORTED:
 			/*

From b22fa62a35d7f2029d757a518d78041822b7c7c1 Mon Sep 17 00:00:00 2001
From: Pavel Begunkov
Date: Thu, 21 Oct 2021 13:20:29 +0100
Subject: [PATCH 4/4] io_uring: apply worker limits to previous users

Another change to the io-wq worker limitation API added in 5.15:
apply the limit to all prior users that have already registered a
tctx. The behaviour may be confusing as it is now; in particular,
the change covers the following two cases:

TASK1                   | TASK2
_________________________________________________
ring = create()         |
                        | limit_iowq_workers()
*not limited*           |

TASK1                   | TASK2
_________________________________________________
ring = create()         |
                        | issue_requests()
limit_iowq_workers()    |
                        | *not limited*

A note on locking: it's safe to traverse ->tctx_list as we hold
->uring_lock, but do that after dropping sqd->lock to avoid possible
problems. It's also safe to access tctx->io_wq there because tasks
kill it only after removing themselves from tctx_list, see
io_uring_cancel_generic() -> io_uring_clean_tctx().

Signed-off-by: Pavel Begunkov
Link: https://lore.kernel.org/r/d6e09ecc3545e4dc56e43c906ee3d71b7ae21bed.1634818641.git.asml.silence@gmail.com
Reviewed-by: Hao Xu
Signed-off-by: Jens Axboe
---
 fs/io_uring.c | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index d5cc103224f1..bc18af5e0a93 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -10649,7 +10649,9 @@ static int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
 
 static int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 					void __user *arg)
+	__must_hold(&ctx->uring_lock)
 {
+	struct io_tctx_node *node;
 	struct io_uring_task *tctx = NULL;
 	struct io_sq_data *sqd = NULL;
 	__u32 new_count[2];
@@ -10702,6 +10704,22 @@ static int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 	if (copy_to_user(arg, new_count, sizeof(new_count)))
 		return -EFAULT;
 
+	/* that's it for SQPOLL, only the SQPOLL task creates requests */
+	if (sqd)
+		return 0;
+
+	/* now propagate the restriction to all registered users */
+	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
+		struct io_uring_task *tctx = node->task->io_uring;
+
+		if (WARN_ON_ONCE(!tctx->io_wq))
+			continue;
+
+		for (i = 0; i < ARRAY_SIZE(new_count); i++)
+			new_count[i] = ctx->iowq_limits[i];
+		/* ignore errors, it always returns zero anyway */
+		(void)io_wq_max_workers(tctx->io_wq, new_count);
+	}
 	return 0;
 err:
 	if (sqd) {
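
A usage note on the resulting API: per the io_wq_max_workers() logic
in patch 1, a 0 in a slot means "don't change", and the previous value
is copied back to userspace, so the current limits can be queried
without modifying them. A sketch using the raw register syscall
(assumes a plain, non-SQPOLL ring fd; the helper name is illustrative):

	#include <unistd.h>
	#include <sys/syscall.h>
	#include <linux/io_uring.h>

	/* Read back the current io-wq worker limits without changing them. */
	static int query_iowq_limits(int ring_fd, unsigned int values[2])
	{
		values[0] = 0;	/* bound workers: 0 == leave as-is */
		values[1] = 0;	/* unbound workers: 0 == leave as-is */
		return syscall(__NR_io_uring_register, ring_fd,
			       IORING_REGISTER_IOWQ_MAX_WORKERS, values, 2);
	}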