SUNRPC: Use poll() to fix up the socket requeue races
Because we clear XPRT_SOCK_DATA_READY before reading, we can end up with a situation where new data arrives, causing xs_data_ready() to queue up a second receive worker job for the same socket, which then immediately gets stuck waiting on the transport receive mutex. The fix is to only clear XPRT_SOCK_DATA_READY once we're done reading, and then to use poll() to check if we might need to queue up a new job in order to deal with any new data. Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
This commit is contained in:
parent
a1231fda7e
commit
0ffe86f480
|
@ -656,13 +656,34 @@ out_err:
|
||||||
return ret != 0 ? ret : -ESHUTDOWN;
|
return ret != 0 ? ret : -ESHUTDOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static __poll_t xs_poll_socket(struct sock_xprt *transport)
|
||||||
|
{
|
||||||
|
return transport->sock->ops->poll(NULL, transport->sock, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool xs_poll_socket_readable(struct sock_xprt *transport)
|
||||||
|
{
|
||||||
|
__poll_t events = xs_poll_socket(transport);
|
||||||
|
|
||||||
|
return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void xs_poll_check_readable(struct sock_xprt *transport)
|
||||||
|
{
|
||||||
|
|
||||||
|
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
|
||||||
|
if (!xs_poll_socket_readable(transport))
|
||||||
|
return;
|
||||||
|
if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
|
||||||
|
queue_work(xprtiod_workqueue, &transport->recv_worker);
|
||||||
|
}
|
||||||
|
|
||||||
static void xs_stream_data_receive(struct sock_xprt *transport)
|
static void xs_stream_data_receive(struct sock_xprt *transport)
|
||||||
{
|
{
|
||||||
size_t read = 0;
|
size_t read = 0;
|
||||||
ssize_t ret = 0;
|
ssize_t ret = 0;
|
||||||
|
|
||||||
mutex_lock(&transport->recv_mutex);
|
mutex_lock(&transport->recv_mutex);
|
||||||
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
|
|
||||||
if (transport->sock == NULL)
|
if (transport->sock == NULL)
|
||||||
goto out;
|
goto out;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -672,6 +693,7 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
|
||||||
read += ret;
|
read += ret;
|
||||||
cond_resched();
|
cond_resched();
|
||||||
}
|
}
|
||||||
|
xs_poll_check_readable(transport);
|
||||||
out:
|
out:
|
||||||
mutex_unlock(&transport->recv_mutex);
|
mutex_unlock(&transport->recv_mutex);
|
||||||
trace_xs_stream_read_data(&transport->xprt, ret, read);
|
trace_xs_stream_read_data(&transport->xprt, ret, read);
|
||||||
|
@ -1362,7 +1384,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
|
||||||
int err;
|
int err;
|
||||||
|
|
||||||
mutex_lock(&transport->recv_mutex);
|
mutex_lock(&transport->recv_mutex);
|
||||||
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
|
|
||||||
sk = transport->inet;
|
sk = transport->inet;
|
||||||
if (sk == NULL)
|
if (sk == NULL)
|
||||||
goto out;
|
goto out;
|
||||||
|
@ -1374,6 +1395,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
|
||||||
consume_skb(skb);
|
consume_skb(skb);
|
||||||
cond_resched();
|
cond_resched();
|
||||||
}
|
}
|
||||||
|
xs_poll_check_readable(transport);
|
||||||
out:
|
out:
|
||||||
mutex_unlock(&transport->recv_mutex);
|
mutex_unlock(&transport->recv_mutex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue