io_uring: ctx-wide rsrc nodes

If we're going to ever support multiple types of resources we need
shared rsrc nodes to not bloat requests, that is implemented in this
patch. It also gives a nicer API and saves one pointer dereference
in io_req_set_rsrc_node().

We may say that all requests bound to a resource belong to one and only
one rsrc node, and considering that nodes are removed and recycled
strictly in-order, this separates requests into generations, where
generation are changed on each node switch (i.e. io_rsrc_node_switch()).

The API is simple, io_rsrc_node_switch() switches to a new generation if
needed, and also optionally kills a passed in io_rsrc_data. Each call to
io_rsrc_node_switch() have to be preceded with
io_rsrc_node_switch_start(). The start function is idempotent and should
not necessarily be followed by switch.

One difference is that once a node was set it will always retain a valid
rsrc node, even on unregister. It may be a nuisance at the moment, but
makes much sense for multiple types of resources. Another thing changed
is that nodes are bound to/associated with a io_rsrc_data later just
before killing (i.e. switching).

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/7e9c693b4b9a2f47aa784b616ce29843021bb65a.1617287883.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Pavel Begunkov 2021-04-01 15:43:46 +01:00 committed by Jens Axboe
parent e7c78371bb
commit a7f0ed5acd
1 changed files with 38 additions and 37 deletions

View File

@ -235,7 +235,6 @@ struct io_rsrc_data {
struct io_ring_ctx *ctx;
rsrc_put_fn *do_put;
struct io_rsrc_node *node;
struct percpu_ref refs;
struct completion done;
bool quiesce;
@ -448,6 +447,7 @@ struct io_ring_ctx {
struct llist_head rsrc_put_llist;
struct list_head rsrc_ref_list;
spinlock_t rsrc_ref_lock;
struct io_rsrc_node *rsrc_node;
struct io_rsrc_node *rsrc_backup_node;
struct io_restriction restrictions;
@ -1080,7 +1080,7 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req)
struct io_ring_ctx *ctx = req->ctx;
if (!req->fixed_rsrc_refs) {
req->fixed_rsrc_refs = &ctx->file_data->node->refs;
req->fixed_rsrc_refs = &ctx->rsrc_node->refs;
percpu_ref_get(req->fixed_rsrc_refs);
}
}
@ -7093,36 +7093,32 @@ static inline void io_rsrc_ref_unlock(struct io_ring_ctx *ctx)
spin_unlock_bh(&ctx->rsrc_ref_lock);
}
static void io_rsrc_node_set(struct io_ring_ctx *ctx,
struct io_rsrc_data *rsrc_data)
static void io_rsrc_node_switch(struct io_ring_ctx *ctx,
struct io_rsrc_data *data_to_kill)
{
struct io_rsrc_node *rsrc_node = ctx->rsrc_backup_node;
WARN_ON_ONCE(!ctx->rsrc_backup_node);
WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node);
WARN_ON_ONCE(!rsrc_node);
if (data_to_kill) {
struct io_rsrc_node *rsrc_node = ctx->rsrc_node;
ctx->rsrc_backup_node = NULL;
rsrc_node->rsrc_data = rsrc_data;
rsrc_node->rsrc_data = data_to_kill;
io_rsrc_ref_lock(ctx);
list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
io_rsrc_ref_unlock(ctx);
io_rsrc_ref_lock(ctx);
rsrc_data->node = rsrc_node;
list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
io_rsrc_ref_unlock(ctx);
percpu_ref_get(&rsrc_data->refs);
percpu_ref_get(&data_to_kill->refs);
percpu_ref_kill(&rsrc_node->refs);
ctx->rsrc_node = NULL;
}
if (!ctx->rsrc_node) {
ctx->rsrc_node = ctx->rsrc_backup_node;
ctx->rsrc_backup_node = NULL;
}
}
static void io_rsrc_node_kill(struct io_ring_ctx *ctx, struct io_rsrc_data *data)
{
struct io_rsrc_node *ref_node = NULL;
io_rsrc_ref_lock(ctx);
ref_node = data->node;
data->node = NULL;
io_rsrc_ref_unlock(ctx);
if (ref_node)
percpu_ref_kill(&ref_node->refs);
}
static int io_rsrc_node_prealloc(struct io_ring_ctx *ctx)
static int io_rsrc_node_switch_start(struct io_ring_ctx *ctx)
{
if (ctx->rsrc_backup_node)
return 0;
@ -7139,10 +7135,11 @@ static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ct
data->quiesce = true;
do {
ret = io_rsrc_node_prealloc(ctx);
ret = io_rsrc_node_switch_start(ctx);
if (ret)
break;
io_rsrc_node_kill(ctx, data);
io_rsrc_node_switch(ctx, data);
percpu_ref_kill(&data->refs);
flush_delayed_work(&ctx->rsrc_put_work);
@ -7151,7 +7148,6 @@ static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ct
break;
percpu_ref_resurrect(&data->refs);
io_rsrc_node_set(ctx, data);
reinit_completion(&data->done);
mutex_unlock(&ctx->uring_lock);
@ -7632,7 +7628,7 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return -EINVAL;
if (nr_args > IORING_MAX_FIXED_FILES)
return -EMFILE;
ret = io_rsrc_node_prealloc(ctx);
ret = io_rsrc_node_switch_start(ctx);
if (ret)
return ret;
@ -7694,7 +7690,7 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return ret;
}
io_rsrc_node_set(ctx, file_data);
io_rsrc_node_switch(ctx, NULL);
return ret;
out_fput:
for (i = 0; i < ctx->nr_user_files; i++) {
@ -7783,7 +7779,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
return -EOVERFLOW;
if (done > ctx->nr_user_files)
return -EINVAL;
err = io_rsrc_node_prealloc(ctx);
err = io_rsrc_node_switch_start(ctx);
if (err)
return err;
@ -7802,7 +7798,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
if (*file_slot) {
file = (struct file *) ((unsigned long) *file_slot & FFS_MASK);
err = io_queue_rsrc_removal(data, data->node, file);
err = io_queue_rsrc_removal(data, ctx->rsrc_node, file);
if (err)
break;
*file_slot = NULL;
@ -7837,10 +7833,8 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
}
}
if (needs_switch) {
percpu_ref_kill(&data->node->refs);
io_rsrc_node_set(ctx, data);
}
if (needs_switch)
io_rsrc_node_switch(ctx, data);
return done ? done : err;
}
@ -8514,8 +8508,15 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_eventfd_unregister(ctx);
io_destroy_buffers(ctx);
/* there are no registered resources left, nobody uses it */
if (ctx->rsrc_node)
io_rsrc_node_destroy(ctx->rsrc_node);
if (ctx->rsrc_backup_node)
io_rsrc_node_destroy(ctx->rsrc_backup_node);
flush_delayed_work(&ctx->rsrc_put_work);
WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
#if defined(CONFIG_UNIX)
if (ctx->ring_sock) {