diff --git a/fs/io_uring.c b/fs/io_uring.c
index b94d57c1b0e5..a8413f006417 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -611,6 +611,7 @@ struct io_sr_msg {
 	int				msg_flags;
 	int				bgid;
 	size_t				len;
+	size_t				done_io;
 };
 
 struct io_open {
@@ -781,6 +782,7 @@ enum {
 	REQ_F_SKIP_LINK_CQES_BIT,
 	REQ_F_SINGLE_POLL_BIT,
 	REQ_F_DOUBLE_POLL_BIT,
+	REQ_F_PARTIAL_IO_BIT,
 	/* keep async read/write and isreg together and in order */
 	REQ_F_SUPPORT_NOWAIT_BIT,
 	REQ_F_ISREG_BIT,
@@ -843,6 +845,8 @@ enum {
 	REQ_F_SINGLE_POLL	= BIT(REQ_F_SINGLE_POLL_BIT),
 	/* double poll may active */
 	REQ_F_DOUBLE_POLL	= BIT(REQ_F_DOUBLE_POLL_BIT),
+	/* request has already done partial IO */
+	REQ_F_PARTIAL_IO	= BIT(REQ_F_PARTIAL_IO_BIT),
 };
 
 struct async_poll {
@@ -923,7 +927,6 @@ struct io_kiocb {
 	struct io_wq_work_node		comp_list;
 	atomic_t			refs;
 	atomic_t			poll_refs;
-	struct io_kiocb			*link;
 	struct io_task_work		io_task_work;
 	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
 	struct hlist_node		hash_node;
@@ -931,9 +934,11 @@ struct io_kiocb {
 	struct async_poll		*apoll;
 	/* opcode allocated if it needs to store data for async defer */
 	void				*async_data;
-	/* custom credentials, valid IFF REQ_F_CREDS is set */
 	/* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */
 	struct io_buffer		*kbuf;
+	/* linked requests, IFF REQ_F_HARDLINK or REQ_F_LINK are set */
+	struct io_kiocb			*link;
+	/* custom credentials, valid IFF REQ_F_CREDS is set */
 	const struct cred		*creds;
 	struct io_wq_work		work;
 };
@@ -962,6 +967,7 @@ struct io_op_def {
 	/* set if opcode supports polled "wait" */
 	unsigned		pollin : 1;
 	unsigned		pollout : 1;
+	unsigned		poll_exclusive : 1;
 	/* op supports buffer selection */
 	unsigned		buffer_select : 1;
 	/* do prep async if is going to be punted */
@@ -1056,6 +1062,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.pollin			= 1,
+		.poll_exclusive		= 1,
 	},
 	[IORING_OP_ASYNC_CANCEL] = {
 		.audit_skip		= 1,
@@ -1330,6 +1337,8 @@ static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 
 static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
 {
+	lockdep_assert_held(&req->ctx->completion_lock);
+
 	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
 		return 0;
 	return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
@@ -1362,6 +1371,8 @@ static inline unsigned int io_put_kbuf(struct io_kiocb *req,
 		cflags = __io_put_kbuf(req, &ctx->io_buffers_comp);
 		spin_unlock(&ctx->completion_lock);
 	} else {
+		lockdep_assert_held(&req->ctx->uring_lock);
+
 		cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache);
 	}
 
@@ -1382,7 +1393,7 @@ static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
 	return NULL;
 }
 
-static void io_kbuf_recycle(struct io_kiocb *req)
+static void io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_buffer_list *bl;
@@ -1390,6 +1401,12 @@ static void io_kbuf_recycle(struct io_kiocb *req)
 
 	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
 		return;
+	/* don't recycle if we already did IO to this buffer */
+	if (req->flags & REQ_F_PARTIAL_IO)
+		return;
+
+	if (issue_flags & IO_URING_F_UNLOCKED)
+		mutex_lock(&ctx->uring_lock);
 
 	lockdep_assert_held(&ctx->uring_lock);
 
@@ -1398,6 +1415,9 @@ static void io_kbuf_recycle(struct io_kiocb *req)
 	list_add(&buf->list, &bl->buf_list);
 	req->flags &= ~REQ_F_BUFFER_SELECTED;
 	req->kbuf = NULL;
+
+	if (issue_flags & IO_URING_F_UNLOCKED)
+		mutex_unlock(&ctx->uring_lock);
 }
 
 static bool io_match_task(struct io_kiocb *head, struct task_struct *task,
@@ -2104,6 +2124,12 @@ static void __io_req_complete_post(struct io_kiocb *req, s32 res,
 			}
 		}
 		io_req_put_rsrc(req, ctx);
+		/*
+		 * Selected buffer deallocation in io_clean_op() assumes that
+		 * we don't hold ->completion_lock. Clean them here to avoid
+		 * deadlocks.
+		 */
+		io_put_kbuf_comp(req);
 		io_dismantle_req(req);
 		io_put_task(req->task, 1);
 		wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
@@ -2148,7 +2174,7 @@ static inline void io_req_complete(struct io_kiocb *req, s32 res)
 static void io_req_complete_failed(struct io_kiocb *req, s32 res)
 {
 	req_set_fail(req);
-	io_req_complete_post(req, res, io_put_kbuf(req, 0));
+	io_req_complete_post(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
 }
 
 static void io_req_complete_fail_submit(struct io_kiocb *req)
@@ -2437,6 +2463,8 @@ static void handle_prev_tw_list(struct io_wq_work_node *node,
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
+		prefetch(container_of(next, struct io_kiocb, io_task_work.node));
+
 		if (req->ctx != *ctx) {
 			if (unlikely(!*uring_locked && *ctx))
 				ctx_commit_and_unlock(*ctx);
@@ -2469,6 +2497,8 @@ static void handle_tw_list(struct io_wq_work_node *node,
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
+		prefetch(container_of(next, struct io_kiocb, io_task_work.node));
+
 		if (req->ctx != *ctx) {
 			ctx_flush_and_put(*ctx, locked);
 			*ctx = req->ctx;
@@ -2974,8 +3004,12 @@ static bool io_rw_should_reissue(struct io_kiocb *req)
 
 static bool __io_complete_rw_common(struct io_kiocb *req, long res)
 {
-	if (req->rw.kiocb.ki_flags & IOCB_WRITE)
+	if (req->rw.kiocb.ki_flags & IOCB_WRITE) {
 		kiocb_end_write(req);
+		fsnotify_modify(req->file);
+	} else {
+		fsnotify_access(req->file);
+	}
 	if (unlikely(res != req->result)) {
 		if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
 		    io_rw_should_reissue(req)) {
@@ -4439,9 +4473,6 @@ static int io_msg_ring_prep(struct io_kiocb *req,
 		     sqe->splice_fd_in || sqe->buf_index || sqe->personality))
 		return -EINVAL;
 
-	if (req->file->f_op != &io_uring_fops)
-		return -EBADFD;
-
 	req->msg.user_data = READ_ONCE(sqe->off);
 	req->msg.len = READ_ONCE(sqe->len);
 	return 0;
@@ -4451,14 +4482,18 @@ static int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_ring_ctx *target_ctx;
 	struct io_msg *msg = &req->msg;
-	int ret = -EOVERFLOW;
 	bool filled;
+	int ret;
 
+	ret = -EBADFD;
+	if (req->file->f_op != &io_uring_fops)
+		goto done;
+
+	ret = -EOVERFLOW;
 	target_ctx = req->file->private_data;
 
 	spin_lock(&target_ctx->completion_lock);
-	filled = io_fill_cqe_aux(target_ctx, msg->user_data, msg->len,
-					IORING_CQE_F_MSG);
+	filled = io_fill_cqe_aux(target_ctx, msg->user_data, msg->len, 0);
 	io_commit_cqring(target_ctx);
 	spin_unlock(&target_ctx->completion_lock);
 
@@ -4467,6 +4502,9 @@ static int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
 		ret = 0;
 	}
 
+done:
+	if (ret < 0)
+		req_set_fail(req);
 	__io_req_complete(req, issue_flags, ret, 0);
 	return 0;
 }
@@ -4537,6 +4575,8 @@ static int io_fallocate(struct io_kiocb *req, unsigned int issue_flags)
 				req->sync.len);
 	if (ret < 0)
 		req_set_fail(req);
+	else
+		fsnotify_modify(req->file);
 	io_req_complete(req, ret);
 	return 0;
 }
@@ -5419,12 +5459,21 @@ static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	if (req->ctx->compat)
 		sr->msg_flags |= MSG_CMSG_COMPAT;
 #endif
+	sr->done_io = 0;
 	return 0;
 }
 
+static bool io_net_retry(struct socket *sock, int flags)
+{
+	if (!(flags & MSG_WAITALL))
+		return false;
+	return sock->type == SOCK_STREAM || sock->type == SOCK_SEQPACKET;
+}
+
 static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_async_msghdr iomsg, *kmsg;
+	struct io_sr_msg *sr = &req->sr_msg;
 	struct socket *sock;
 	struct io_buffer *kbuf;
 	unsigned flags;
@@ -5467,6 +5516,11 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 			return io_setup_async_msg(req, kmsg);
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
+		if (ret > 0 && io_net_retry(sock, flags)) {
+			sr->done_io += ret;
+			req->flags |= REQ_F_PARTIAL_IO;
+			return io_setup_async_msg(req, kmsg);
+		}
 		req_set_fail(req);
 	} else if ((flags & MSG_WAITALL) && (kmsg->msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
 		req_set_fail(req);
@@ -5476,6 +5530,10 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	if (kmsg->free_iov)
 		kfree(kmsg->free_iov);
 	req->flags &= ~REQ_F_NEED_CLEANUP;
+	if (ret >= 0)
+		ret += sr->done_io;
+	else if (sr->done_io)
+		ret = sr->done_io;
 	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
 	return 0;
 }
@@ -5526,12 +5584,23 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 			return -EAGAIN;
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
+		if (ret > 0 && io_net_retry(sock, flags)) {
+			sr->len -= ret;
+			sr->buf += ret;
+			sr->done_io += ret;
+			req->flags |= REQ_F_PARTIAL_IO;
+			return -EAGAIN;
+		}
 		req_set_fail(req);
 	} else if ((flags & MSG_WAITALL) && (msg.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
 out_free:
 		req_set_fail(req);
 	}
 
+	if (ret >= 0)
+		ret += sr->done_io;
+	else if (sr->done_io)
+		ret = sr->done_io;
 	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
 	return 0;
 }
@@ -5569,9 +5638,6 @@ static int io_accept(struct io_kiocb *req, unsigned int issue_flags)
 	struct file *file;
 	int ret, fd;
 
-	if (req->file->f_flags & O_NONBLOCK)
-		req->flags |= REQ_F_NOWAIT;
-
 	if (!fixed) {
 		fd = __get_unused_fd_flags(accept->flags, accept->nofile);
 		if (unlikely(fd < 0))
@@ -5801,7 +5867,7 @@ struct io_poll_table {
 };
 
 #define IO_POLL_CANCEL_FLAG	BIT(31)
-#define IO_POLL_REF_MASK	((1u << 20)-1)
+#define IO_POLL_REF_MASK	GENMASK(30, 0)
 
 /*
  * If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
@@ -6035,10 +6101,13 @@ static void io_poll_cancel_req(struct io_kiocb *req)
 	io_poll_execute(req, 0, 0);
 }
 
+#define wqe_to_req(wait)	((void *)((unsigned long) (wait)->private & ~1))
+#define wqe_is_double(wait)	((unsigned long) (wait)->private & 1)
+
 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 			void *key)
 {
-	struct io_kiocb *req = wait->private;
+	struct io_kiocb *req = wqe_to_req(wait);
 	struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
 						 wait);
 	__poll_t mask = key_to_poll(key);
@@ -6076,7 +6145,10 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 		if (mask && poll->events & EPOLLONESHOT) {
 			list_del_init(&poll->wait.entry);
 			poll->head = NULL;
-			req->flags &= ~REQ_F_SINGLE_POLL;
+			if (wqe_is_double(wait))
+				req->flags &= ~REQ_F_DOUBLE_POLL;
+			else
+				req->flags &= ~REQ_F_SINGLE_POLL;
 		}
 		__io_poll_execute(req, mask, poll->events);
 	}
@@ -6088,6 +6160,7 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
 			    struct io_poll_iocb **poll_ptr)
 {
 	struct io_kiocb *req = pt->req;
+	unsigned long wqe_private = (unsigned long) req;
 
 	/*
 	 * The file being polled uses multiple waitqueues for poll handling
@@ -6113,6 +6186,8 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
 			pt->error = -ENOMEM;
 			return;
 		}
+		/* mark as double wq entry */
+		wqe_private |= 1;
 		req->flags |= REQ_F_DOUBLE_POLL;
 		io_init_poll_iocb(poll, first->events, first->wait.func);
 		*poll_ptr = poll;
@@ -6123,7 +6198,7 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
 	req->flags |= REQ_F_SINGLE_POLL;
 	pt->nr_entries++;
 	poll->head = head;
-	poll->wait.private = req;
+	poll->wait.private = (void *) wqe_private;
 
 	if (poll->events & EPOLLEXCLUSIVE)
 		add_wait_queue_exclusive(head, &poll->wait);
@@ -6150,7 +6225,6 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
 	INIT_HLIST_NODE(&req->hash_node);
 	io_init_poll_iocb(poll, mask, io_poll_wake);
 	poll->file = req->file;
-	poll->wait.private = req;
 
 	ipt->pt._key = mask;
 	ipt->req = req;
@@ -6238,7 +6312,8 @@ static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
 	} else {
 		mask |= POLLOUT | POLLWRNORM;
 	}
-
+	if (def->poll_exclusive)
+		mask |= EPOLLEXCLUSIVE;
 	if (!(issue_flags & IO_URING_F_UNLOCKED) &&
 	    !list_empty(&ctx->apoll_cache)) {
 		apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
@@ -6254,6 +6329,8 @@ static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
 	req->flags |= REQ_F_POLLED;
 	ipt.pt._qproc = io_async_queue_proc;
 
+	io_kbuf_recycle(req, issue_flags);
+
 	ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask);
 	if (ret || ipt.error)
 		return ret ? IO_APOLL_READY : IO_APOLL_ABORTED;
@@ -6281,6 +6358,7 @@ static __cold bool io_poll_remove_all(struct io_ring_ctx *ctx,
 		list = &ctx->cancel_hash[i];
 		hlist_for_each_entry_safe(req, tmp, list, hash_node) {
 			if (io_match_task_safe(req, tsk, cancel_all)) {
+				hlist_del_init(&req->hash_node);
 				io_poll_cancel_req(req);
 				found = true;
 			}
@@ -7075,8 +7153,11 @@ fail:
 
 static void io_clean_op(struct io_kiocb *req)
 {
-	if (req->flags & REQ_F_BUFFER_SELECTED)
+	if (req->flags & REQ_F_BUFFER_SELECTED) {
+		spin_lock(&req->ctx->completion_lock);
 		io_put_kbuf_comp(req);
+		spin_unlock(&req->ctx->completion_lock);
+	}
 
 	if (req->flags & REQ_F_NEED_CLEANUP) {
 		switch (req->opcode) {
@@ -7505,11 +7586,9 @@ static void io_queue_sqe_arm_apoll(struct io_kiocb *req)
 		 * Queued up for async execution, worker will release
 		 * submit reference when the iocb is actually submitted.
 		 */
-		io_kbuf_recycle(req);
 		io_queue_async_work(req, NULL);
 		break;
 	case IO_APOLL_OK:
-		io_kbuf_recycle(req);
 		break;
 	}
 
@@ -8053,6 +8132,13 @@ static int io_sq_thread(void *data)
 					needs_sched = false;
 					break;
 				}
+
+				/*
+				 * Ensure the store of the wakeup flag is not
+				 * reordered with the load of the SQ tail
+				 */
+				smp_mb();
+
 				if (io_sqring_entries(ctx)) {
 					needs_sched = false;
 					break;
@@ -8782,6 +8868,7 @@ static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
 			fput(fpl->fp[i]);
 	} else {
 		kfree_skb(skb);
+		free_uid(fpl->user);
 		kfree(fpl);
 	}
 
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index d2be4eb22008..784adc6f6ed2 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -201,11 +201,9 @@ struct io_uring_cqe {
  *
  * IORING_CQE_F_BUFFER	If set, the upper 16 bits are the buffer ID
  * IORING_CQE_F_MORE	If set, parent SQE will generate more CQE entries
- * IORING_CQE_F_MSG	If set, CQE was generated with IORING_OP_MSG_RING
  */
 #define IORING_CQE_F_BUFFER		(1U << 0)
 #define IORING_CQE_F_MORE		(1U << 1)
-#define IORING_CQE_F_MSG		(1U << 2)
 
 enum {
 	IORING_CQE_BUFFER_SHIFT		= 16,