dlm: retry failed SCTP sends

Currently if a SCTP send fails, we lose the data we were trying
to send because the writequeue_entry is released when we do the send.
When this happens other nodes will then hang waiting for a reply.

This adds support for SCTP to retry the send operation.

I also removed the retry limit for SCTP use, because we want
to make sure we try every path during init time and for longer
failures we want to continually retry in case paths come back up
while trying other paths. We will do this until userspace tells us
to stop.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Mike Christie 2013-06-14 04:56:13 -05:00 committed by David Teigland
parent 98e1b60ecc
commit 5d6898714f
1 changed files with 75 additions and 29 deletions

View File

@ -607,15 +607,56 @@ static void sctp_init_failed(void)
mutex_unlock(&connections_lock);
}
static void retry_failed_sctp_send(struct connection *recv_con,
struct sctp_send_failed *sn_send_failed,
char *buf)
{
int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
struct dlm_mhandle *mh;
struct connection *con;
char *retry_buf;
int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
log_print("Retry sending %d bytes to node id %d", len, nodeid);
con = nodeid2con(nodeid, 0);
if (!con) {
log_print("Could not look up con for nodeid %d\n",
nodeid);
return;
}
mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
if (!mh) {
log_print("Could not allocate buf for retry.");
return;
}
memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
dlm_lowcomms_commit_buffer(mh);
/*
* If we got a assoc changed event before the send failed event then
* we only need to retry the send.
*/
if (con->sctp_assoc) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
queue_work(send_workqueue, &con->swork);
} else
sctp_init_failed_foreach(con);
}
/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
struct msghdr *msg, char *buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;
if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
switch (sn->sn_header.sn_type) {
case SCTP_SEND_FAILED:
retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
break;
case SCTP_ASSOC_CHANGE:
switch (sn->sn_assoc_change.sac_state) {
case SCTP_COMM_UP:
case SCTP_RESTART:
{
@ -713,14 +754,10 @@ static void process_sctp_notification(struct connection *con,
}
break;
/* We don't know which INIT failed, so clear the PENDING flags
* on them all. if assoc_id is zero then it will then try
* again */
case SCTP_CANT_STR_ASSOC:
{
/* Will retry init when we get the send failed notification */
log_print("Can't start SCTP association - retrying");
sctp_init_failed();
}
break;
@ -729,6 +766,8 @@ static void process_sctp_notification(struct connection *con,
(int)sn->sn_assoc_change.sac_assoc_id,
sn->sn_assoc_change.sac_state);
}
default:
; /* fall through */
}
}
@ -988,6 +1027,24 @@ static void free_entry(struct writequeue_entry *e)
kfree(e);
}
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
* @completed: bytes completed
*
* writequeue_lock must be held.
*/
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
e->offset += completed;
e->len -= completed;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
}
/* Initiate an SCTP association.
This is a special case of send_to_sock() in that we don't yet have a
peeled-off socket for this association, so we use the listening socket
@ -1007,16 +1064,14 @@ static void sctp_init_assoc(struct connection *con)
int addrlen;
struct kvec iov[1];
mutex_lock(&con->sock_mutex);
if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
return;
if (con->retries++ > MAX_CONNECT_RETRIES)
return;
goto unlock;
if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
con->try_new_addr)) {
log_print("no address for nodeid %d", con->nodeid);
return;
goto unlock;
}
base_con = nodeid2con(0, 0);
BUG_ON(base_con == NULL);
@ -1034,17 +1089,17 @@ static void sctp_init_assoc(struct connection *con)
if (list_empty(&con->writequeue)) {
spin_unlock(&con->writequeue_lock);
log_print("writequeue empty for nodeid %d", con->nodeid);
return;
goto unlock;
}
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len;
offset = e->offset;
spin_unlock(&con->writequeue_lock);
/* Send the first block off the write queue */
iov[0].iov_base = page_address(e->page)+offset;
iov[0].iov_len = len;
spin_unlock(&con->writequeue_lock);
if (rem_addr.ss_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
@ -1060,7 +1115,7 @@ static void sctp_init_assoc(struct connection *con)
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo->sinfo_flags |= SCTP_ADDR_OVER;
@ -1075,15 +1130,12 @@ static void sctp_init_assoc(struct connection *con)
}
else {
spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
writequeue_entry_complete(e, ret);
spin_unlock(&con->writequeue_lock);
}
unlock:
mutex_unlock(&con->sock_mutex);
}
/* Connect a new socket to its peer */
@ -1533,13 +1585,7 @@ static void send_to_sock(struct connection *con)
}
spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
writequeue_entry_complete(e, ret);
}
spin_unlock(&con->writequeue_lock);
out: