From 6163a194e02ab6cab2758b277a0ae082378dd4e6 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:08 -0400 Subject: [PATCH 1/8] tipc: decrease connection flow control window Memory overhead when allocating big buffers for data transfer may be quite significant. E.g., truesize of a 64 KB buffer turns out to be 132 KB, 2 x the requested size. This invalidates the "worst case" calculation we have been using to determine the default socket receive buffer limit, which is based on the assumption that 1024x64KB = 67MB buffers may be queued up on a socket. Since TIPC connections cannot survive hitting the buffer limit, we have to compensate for this overhead. We do that in this commit by dividing the fix connection flow control window from 1024 (2*512) messages to 512 (2*256). Since older version nodes send out acks at 512 message intervals, compatibility with such nodes is guaranteed, although performance may be non-optimal in such cases. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/core.c | 7 ++++--- net/tipc/port.h | 9 +++++---- net/tipc/socket.c | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/net/tipc/core.c b/net/tipc/core.c index 57f8ae9aa466..676d18015dd8 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -154,10 +154,11 @@ static int __init tipc_init(void) tipc_max_ports = CONFIG_TIPC_PORTS; tipc_net_id = 4711; - sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE; - sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 << + sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << + TIPC_LOW_IMPORTANCE; + sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << TIPC_CRITICAL_IMPORTANCE; - sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT; + sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT; res = tipc_core_start(); if (res) diff --git a/net/tipc/port.h b/net/tipc/port.h index a00397393bd1..5dfd165df1d7 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -42,9 +42,10 @@ #include "msg.h" #include "node_subscr.h" -#define TIPC_FLOW_CONTROL_WIN 512 -#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) +#define TIPC_CONNACK_INTV 256 +#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) +#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ + SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) /** * struct tipc_port - TIPC port structure @@ -187,7 +188,7 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) static inline int tipc_port_congested(struct tipc_port *p_ptr) { - return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); + return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN); } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3f9912f87d0d..8685daf060f9 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1101,7 +1101,7 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + (++port->conn_unacked >= TIPC_CONNACK_INTV)) tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } @@ -1210,7 +1210,7 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } From 4f4482dcd9a0606a30541ff165ddaca64748299b Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:09 -0400 Subject: [PATCH 2/8] tipc: compensate for double accounting in socket rcv buffer The function net/core/sock.c::__release_sock() runs a tight loop to move buffers from the socket backlog queue to the receive queue. As a security measure, sk_backlog.len of the receiving socket is not set to zero until after the loop is finished, i.e., until the whole backlog queue has been transferred to the receive queue. During this transfer, the data that has already been moved is counted both in the backlog queue and the receive queue, hence giving an incorrect picture of the available queue space for new arriving buffers. This leads to unnecessary rejection of buffers by sk_add_backlog(), which in TIPC leads to unnecessarily broken connections. In this commit, we compensate for this double accounting by adding a counter that keeps track of it. The function socket.c::backlog_rcv() receives buffers one by one from __release_sock(), and adds them to the socket receive queue. If the transfer is successful, it increases a new atomic counter 'tipc_sock::dupl_rcvcnt' with 'truesize' of the transferred buffer. If a new buffer arrives during this transfer and finds the socket busy (owned), we attempt to add it to the backlog. However, when sk_add_backlog() is called, we adjust the 'limit' parameter with the value of the new counter, so that the risk of inadvertent rejection is eliminated. It should be noted that this change does not invalidate the original purpose of zeroing 'sk_backlog.len' after the full transfer. We set an upper limit for dupl_rcvcnt, so that if a 'wild' sender (i.e., one that doesn't respect the send window) keeps pumping in buffers to sk_add_backlog(), he will eventually reach an upper limit, (2 x TIPC_CONN_OVERLOAD_LIMIT). After that, no messages can be added to the backlog, and the connection will be broken. Ordinary, well- behaved senders will never reach this buffer limit at all. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/socket.c | 28 +++++++++++++++++++--------- net/tipc/socket.h | 2 ++ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 8685daf060f9..249500614568 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,5 +1,5 @@ /* - * net/tipc/socket.c: TIPC socket API +* net/tipc/socket.c: TIPC socket API * * Copyright (c) 2001-2007, 2012-2014, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems @@ -45,7 +45,7 @@ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ -static int backlog_rcv(struct sock *sk, struct sk_buff *skb); +static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); @@ -196,11 +196,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sock->state = state; sock_init_data(sock, sk); - sk->sk_backlog_rcv = backlog_rcv; + sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; - tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; + tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + atomic_set(&tsk->dupl_rcvcnt, 0); tipc_port_unlock(port); if (sock->state == SS_READY) { @@ -1416,7 +1417,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) } /** - * backlog_rcv - handle incoming message from backlog queue + * tipc_backlog_rcv - handle incoming message from backlog queue * @sk: socket * @buf: message * @@ -1424,13 +1425,18 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) * * Returns 0 */ -static int backlog_rcv(struct sock *sk, struct sk_buff *buf) +static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) { u32 res; + struct tipc_sock *tsk = tipc_sk(sk); res = filter_rcv(sk, buf); - if (res) + if (unlikely(res)) tipc_reject_msg(buf, res); + + if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(buf->truesize, &tsk->dupl_rcvcnt); + return 0; } @@ -1445,8 +1451,9 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf) */ u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) { + struct tipc_sock *tsk = tipc_sk(sk); u32 res; - + uint limit; /* * Process message if socket is unlocked; otherwise add to backlog queue * @@ -1457,7 +1464,10 @@ u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) if (!sock_owned_by_user(sk)) { res = filter_rcv(sk, buf); } else { - if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf))) + if (sk->sk_backlog.len == 0) + atomic_set(&tsk->dupl_rcvcnt, 0); + limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); + if (sk_add_backlog(sk, buf, limit)) res = TIPC_ERR_OVERLOAD; else res = TIPC_OK; diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 74e5c7f195a6..86c27cc51e33 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -44,12 +44,14 @@ * @port: port - interacts with 'sk' and with the rest of the TIPC stack * @peer_name: the peer of the connection, if any * @conn_timeout: the time we can wait for an unresponded setup request + * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue */ struct tipc_sock { struct sock sk; struct tipc_port port; unsigned int conn_timeout; + atomic_t dupl_rcvcnt; }; static inline struct tipc_sock *tipc_sk(const struct sock *sk) From ec37dcd382b4f3673bfbf36ccd348ef48f98ffe3 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:10 -0400 Subject: [PATCH 3/8] tipc: don't record link RESET or ACTIVATE messages as traffic In the current code, all incoming LINK_PROTOCOL messages, irrespective of type, nudge the "last message received" checkpoint, informing the link state machine that a message was received from the peer since last supervision timeout event. This inhibits the link from starting probing the peer unnecessarily. However, not only STATE messages are recorded as legitimate incoming traffic this way, but even RESET and ACTIVATE messages, which in reality are there to inform the link that the peer endpoint has been reset. At the same time, some RESET messages may be dropped instead of causing a link reset. This happens when the link endpoint thinks it is fully up and working, and the session number of the RESET is lower than or equal to the current link session. In such cases the RESET is perceived as a delayed remnant from an earlier session, or the current one, and dropped. Now, if a TIPC module is removed and then immediately reinserted, e.g. when using a script, RESET messages may arrive at the peer link endpoint before this one has had time to discover the failure. The RESET may be dropped because of the session number, but only after it has been recorded as a legitimate traffic event. Hence, the receiving link will not start probing, and not discover that the peer endpoint is down, at the same time ignoring the periodic RESET messages coming from that endpoint. We have ended up in a stale state where a failed link cannot be re-established. In this commit, we remedy this by nudging the checkpoint only for received STATE messages, not for RESET or ACTIVATE messages. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 2140837fbab9..6cf7938784c6 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1831,9 +1831,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) if (l_ptr->exp_msg_count) goto exit; - /* record unnumbered packet arrival (force mismatch on next timeout) */ - l_ptr->checkpoint--; - if (l_ptr->net_plane != msg_net_plane(msg)) if (tipc_own_addr > msg_prevnode(msg)) l_ptr->net_plane = msg_net_plane(msg); @@ -1909,6 +1906,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) tipc_link_reset(l_ptr); /* Enforce change to take effect */ break; } + + /* Record reception; force mismatch at next timeout: */ + l_ptr->checkpoint--; + link_state_event(l_ptr, TRAFFIC_MSG_EVT); l_ptr->stats.recv_states++; if (link_reset_unknown(l_ptr)) From 5074ab89c555dd130ceeac129546670423d634b8 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:11 -0400 Subject: [PATCH 4/8] tipc: mark head of reassembly buffer as non-linear The message reassembly function does not update the 'len' and 'data_len' fields of the head skbuff correctly when fragments are chained to it. This may sometimes lead to obsure errors, such as fragment reordering when we receive fragments which are cloned buffers. This commit fixes this, by ensuring that the two fields are updated correctly. Suggested-by: Eric Dumazet Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/net/tipc/link.c b/net/tipc/link.c index 6cf7938784c6..9272d4cc0225 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2341,6 +2341,8 @@ int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail, (*tail)->next = frag; *tail = frag; (*head)->truesize += frag->truesize; + (*head)->data_len += frag->len; + (*head)->len += frag->len; } if (fragid == LAST_FRAGMENT) { *fbuf = *head; From 37e22164a8a3c39bdad45aa463b1e69a1fdf4110 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:12 -0400 Subject: [PATCH 5/8] tipc: rename and move message reassembly function The function tipc_link_frag_rcv() is in reality a re-entrant generic message reassemby function that has nothing in particular to do with the link, where it is defined now. This becomes obvious when we see the need to call the function from other places in the code. In this commit rename it to tipc_buf_append() and move it to the file msg.c. We also simplify its signature by moving the tail pointer to the control block of the head buffer, hence making the head buffer self-contained. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 10 +++----- net/tipc/core.h | 1 + net/tipc/link.c | 67 ++++-------------------------------------------- net/tipc/link.h | 14 ++-------- net/tipc/msg.c | 55 ++++++++++++++++++++++++++++++++++++++- net/tipc/msg.h | 5 +++- net/tipc/node.c | 7 +++-- net/tipc/node.h | 6 ++--- 8 files changed, 74 insertions(+), 91 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a0978d0890cb..671f9817b4f4 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -506,18 +506,14 @@ receive: tipc_node_unlock(node); tipc_link_bundle_rcv(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { - int ret; - ret = tipc_link_frag_rcv(&node->bclink.reasm_head, - &node->bclink.reasm_tail, - &buf); - if (ret == LINK_REASM_ERROR) + tipc_buf_append(&node->bclink.reasm_buf, &buf); + if (unlikely(!buf && !node->bclink.reasm_buf)) goto unlock; tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_fragments++; - if (ret == LINK_REASM_COMPLETE) { + if (buf) { bcl->stats.recv_fragmented++; - /* Point msg to inner header */ msg = buf_msg(buf); tipc_bclink_unlock(); goto receive; diff --git a/net/tipc/core.h b/net/tipc/core.h index ae55d37267e6..2e00682bc455 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -187,6 +187,7 @@ static inline void k_term_timer(struct timer_list *timer) struct tipc_skb_cb { void *handle; bool deferred; + struct sk_buff *tail; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) diff --git a/net/tipc/link.c b/net/tipc/link.c index 9272d4cc0225..24d058796cd9 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -398,9 +398,8 @@ static void link_release_outqueue(struct tipc_link *l_ptr) */ void tipc_link_reset_fragments(struct tipc_link *l_ptr) { - kfree_skb(l_ptr->reasm_head); - l_ptr->reasm_head = NULL; - l_ptr->reasm_tail = NULL; + kfree_skb(l_ptr->reasm_buf); + l_ptr->reasm_buf = NULL; } /** @@ -1573,17 +1572,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) } msg = buf_msg(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { - int rc; - l_ptr->stats.recv_fragments++; - rc = tipc_link_frag_rcv(&l_ptr->reasm_head, - &l_ptr->reasm_tail, - &buf); - if (rc == LINK_REASM_COMPLETE) { + if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) { l_ptr->stats.recv_fragmented++; msg = buf_msg(buf); } else { - if (rc == LINK_REASM_ERROR) + if (!l_ptr->reasm_buf) tipc_link_reset(l_ptr); tipc_node_unlock(n_ptr); continue; @@ -2169,9 +2163,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, } if (msg_user(msg) == MSG_FRAGMENTER) { l_ptr->stats.recv_fragments++; - tipc_link_frag_rcv(&l_ptr->reasm_head, - &l_ptr->reasm_tail, - &buf); + tipc_buf_append(&l_ptr->reasm_buf, &buf); } } exit: @@ -2309,55 +2301,6 @@ static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) return dsz; } -/* tipc_link_frag_rcv(): Called with node lock on. Returns - * the reassembled buffer if message is complete. - */ -int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff **fbuf) -{ - struct sk_buff *frag = *fbuf; - struct tipc_msg *msg = buf_msg(frag); - u32 fragid = msg_type(msg); - bool headstolen; - int delta; - - skb_pull(frag, msg_hdr_sz(msg)); - if (fragid == FIRST_FRAGMENT) { - if (*head || skb_unclone(frag, GFP_ATOMIC)) - goto out_free; - *head = frag; - skb_frag_list_init(*head); - *fbuf = NULL; - return 0; - } else if (*head && - skb_try_coalesce(*head, frag, &headstolen, &delta)) { - kfree_skb_partial(frag, headstolen); - } else { - if (!*head) - goto out_free; - if (!skb_has_frag_list(*head)) - skb_shinfo(*head)->frag_list = frag; - else - (*tail)->next = frag; - *tail = frag; - (*head)->truesize += frag->truesize; - (*head)->data_len += frag->len; - (*head)->len += frag->len; - } - if (fragid == LAST_FRAGMENT) { - *fbuf = *head; - *tail = *head = NULL; - return LINK_REASM_COMPLETE; - } - *fbuf = NULL; - return 0; -out_free: - pr_warn_ratelimited("Link unable to reassemble fragmented message\n"); - kfree_skb(*fbuf); - *fbuf = NULL; - return LINK_REASM_ERROR; -} - static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) { if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) diff --git a/net/tipc/link.h b/net/tipc/link.h index 7ba73fa6b81e..200d518b218e 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -40,11 +40,6 @@ #include "msg.h" #include "node.h" -/* Link reassembly status codes - */ -#define LINK_REASM_ERROR -1 -#define LINK_REASM_COMPLETE 1 - /* Out-of-range value for link sequence numbers */ #define INVALID_LINK_SEQ 0x10000 @@ -140,8 +135,7 @@ struct tipc_stats { * @next_out: ptr to first unsent outbound message in queue * @waiting_ports: linked list of ports waiting for link congestion to abate * @long_msg_seq_no: next identifier to use for outbound fragmented messages - * @reasm_head: list head of partially reassembled inbound message fragments - * @reasm_tail: last fragment received + * @reasm_buf: head of partially reassembled inbound message fragments * @stats: collects statistics regarding link activity */ struct tipc_link { @@ -204,8 +198,7 @@ struct tipc_link { /* Fragmentation/reassembly */ u32 long_msg_seq_no; - struct sk_buff *reasm_head; - struct sk_buff *reasm_tail; + struct sk_buff *reasm_buf; /* Statistics */ struct tipc_stats stats; @@ -242,9 +235,6 @@ int tipc_link_iovec_xmit_fast(struct tipc_port *sender, struct iovec const *msg_sect, unsigned int len, u32 destnode); void tipc_link_bundle_rcv(struct sk_buff *buf); -int tipc_link_frag_rcv(struct sk_buff **reasm_head, - struct sk_buff **reasm_tail, - struct sk_buff **fbuf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); void tipc_link_push_queue(struct tipc_link *l_ptr); diff --git a/net/tipc/msg.c b/net/tipc/msg.c index e525f8ce1dee..8be6e94a1ca9 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -1,7 +1,7 @@ /* * net/tipc/msg.c: TIPC message header routines * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -99,3 +99,56 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, } return dsz; } + +/* tipc_buf_append(): Append a buffer to the fragment list of another buffer + * Let first buffer become head buffer + * Returns 1 and sets *buf to headbuf if chain is complete, otherwise 0 + * Leaves headbuf pointer at NULL if failure + */ +int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) +{ + struct sk_buff *head = *headbuf; + struct sk_buff *frag = *buf; + struct sk_buff *tail; + struct tipc_msg *msg = buf_msg(frag); + u32 fragid = msg_type(msg); + bool headstolen; + int delta; + + skb_pull(frag, msg_hdr_sz(msg)); + + if (fragid == FIRST_FRAGMENT) { + if (head || skb_unclone(frag, GFP_ATOMIC)) + goto out_free; + head = *headbuf = frag; + skb_frag_list_init(head); + return 0; + } + if (!head) + goto out_free; + tail = TIPC_SKB_CB(head)->tail; + if (skb_try_coalesce(head, frag, &headstolen, &delta)) { + kfree_skb_partial(frag, headstolen); + } else { + if (!skb_has_frag_list(head)) + skb_shinfo(head)->frag_list = frag; + else + tail->next = frag; + head->truesize += frag->truesize; + head->data_len += frag->len; + head->len += frag->len; + TIPC_SKB_CB(head)->tail = frag; + } + if (fragid == LAST_FRAGMENT) { + *buf = head; + TIPC_SKB_CB(head)->tail = NULL; + *headbuf = NULL; + return 1; + } + *buf = NULL; + return 0; +out_free: + pr_warn_ratelimited("Unable to build fragment list\n"); + kfree_skb(*buf); + return 0; +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 76d1269b9443..503511903d1d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, Ericsson AB + * Copyright (c) 2000-2007, 2014, Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -711,4 +711,7 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, unsigned int len, int max_size, struct sk_buff **buf); + +int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); + #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index facd5611e785..5b44c3041be4 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -286,10 +286,9 @@ static void node_lost_contact(struct tipc_node *n_ptr) kfree_skb_list(n_ptr->bclink.deferred_head); n_ptr->bclink.deferred_size = 0; - if (n_ptr->bclink.reasm_head) { - kfree_skb(n_ptr->bclink.reasm_head); - n_ptr->bclink.reasm_head = NULL; - n_ptr->bclink.reasm_tail = NULL; + if (n_ptr->bclink.reasm_buf) { + kfree_skb(n_ptr->bclink.reasm_buf); + n_ptr->bclink.reasm_buf = NULL; } tipc_bclink_remove_node(n_ptr->addr); diff --git a/net/tipc/node.h b/net/tipc/node.h index 5454edf994c3..9087063793f2 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -69,8 +69,7 @@ enum { * @deferred_size: number of OOS b'cast messages in deferred queue * @deferred_head: oldest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node - * @reasm_head: broadcast reassembly queue head from node - * @reasm_tail: last broadcast fragment received from node + * @reasm_buf: broadcast reassembly queue head from node * @recv_permitted: true if node is allowed to receive b'cast messages */ struct tipc_node_bclink { @@ -81,8 +80,7 @@ struct tipc_node_bclink { u32 deferred_size; struct sk_buff *deferred_head; struct sk_buff *deferred_tail; - struct sk_buff *reasm_head; - struct sk_buff *reasm_tail; + struct sk_buff *reasm_buf; bool recv_permitted; }; From 38504c28a201a80d12a6a0f821fecb331cb1f223 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:13 -0400 Subject: [PATCH 6/8] tipc: improve and extend media address conversion functions TIPC currently handles two media specific addresses: Ethernet MAC addresses and InfiniBand addresses. Those are kept in three different formats: 1) A "raw" format as obtained from the device. This format is known only by the media specific adapter code in eth_media.c and ib_media.c. 2) A "generic" internal format, in the form of struct tipc_media_addr, which can be referenced and passed around by the generic media- unaware code. 3) A serialized version of the latter, to be conveyed in neighbor discovery messages. Conversion between the three formats can only be done by the media specific code, so we have function pointers for this purpose in struct tipc_media. Here, the media adapters can install their own conversion functions at startup. We now introduce a new such function, 'raw2addr()', whose purpose is to convert from format 1 to format 2 above. We also try to as far as possible uniform commenting, variable names and usage of these functions, with the purpose of making them more comprehensible. We can now also remove the function tipc_l2_media_addr_set(), whose job is done better by the new function. Finally, we expand the field for serialized addresses (format 3) in discovery messages from 20 to 32 bytes. This is permitted according to the spec, and reduces the risk of problems when we add new media in the future. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bearer.c | 36 ++++++------------------------- net/tipc/bearer.h | 35 ++++++++++++++++-------------- net/tipc/core.h | 1 + net/tipc/discover.c | 2 +- net/tipc/eth_media.c | 51 +++++++++++++++++++++++++++----------------- net/tipc/ib_media.c | 34 +++++++++++++++++++---------- 6 files changed, 83 insertions(+), 76 deletions(-) diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index f3259d4133b6..264474394f9f 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -411,28 +411,6 @@ int tipc_disable_bearer(const char *name) return res; } - -/* tipc_l2_media_addr_set - initialize Ethernet media address structure - * - * Media-dependent "value" field stores MAC address in first 6 bytes - * and zeroes out the remaining bytes. - */ -void tipc_l2_media_addr_set(const struct tipc_bearer *b, - struct tipc_media_addr *a, char *mac) -{ - int len = b->media->hwaddr_len; - - if (unlikely(sizeof(a->value) < len)) { - WARN_ONCE(1, "Media length invalid\n"); - return; - } - - memcpy(a->value, mac, len); - memset(a->value + len, 0, sizeof(a->value) - len); - a->media_id = b->media->type_id; - a->broadcast = !memcmp(mac, b->bcast_addr.value, len); -} - int tipc_enable_l2_media(struct tipc_bearer *b) { struct net_device *dev; @@ -443,21 +421,21 @@ int tipc_enable_l2_media(struct tipc_bearer *b) if (!dev) return -ENODEV; - /* Associate TIPC bearer with Ethernet bearer */ + /* Associate TIPC bearer with L2 bearer */ rcu_assign_pointer(b->media_ptr, dev); - memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value)); + memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); b->bcast_addr.media_id = b->media->type_id; b->bcast_addr.broadcast = 1; b->mtu = dev->mtu; - tipc_l2_media_addr_set(b, &b->addr, (char *)dev->dev_addr); + b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); rcu_assign_pointer(dev->tipc_ptr, b); return 0; } -/* tipc_disable_l2_media - detach TIPC bearer from an Ethernet interface +/* tipc_disable_l2_media - detach TIPC bearer from an L2 interface * - * Mark Ethernet bearer as inactive so that incoming buffers are thrown away, + * Mark L2 bearer as inactive so that incoming buffers are thrown away, * then get worker thread to complete bearer cleanup. (Can't do cleanup * here because cleanup code needs to sleep and caller holds spinlocks.) */ @@ -473,7 +451,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b) } /** - * tipc_l2_send_msg - send a TIPC packet out over an Ethernet interface + * tipc_l2_send_msg - send a TIPC packet out over an L2 interface * @buf: the packet to be sent * @b_ptr: the bearer through which the packet is to be sent * @dest: peer destination address @@ -597,7 +575,7 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt, tipc_reset_bearer(b_ptr); break; case NETDEV_CHANGEADDR: - tipc_l2_media_addr_set(b_ptr, &b_ptr->addr, + b_ptr->media->raw2addr(b_ptr, &b_ptr->addr, (char *)dev->dev_addr); tipc_reset_bearer(b_ptr); break; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index a983b3005e71..78fccc49de23 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -42,14 +42,12 @@ #define MAX_BEARERS 2 #define MAX_MEDIA 2 -/* - * Identifiers associated with TIPC message header media address info - * - * - address info field is 20 bytes long - * - media type identifier located at offset 3 - * - remaining bytes vary according to media type +/* Identifiers associated with TIPC message header media address info + * - address info field is 32 bytes long + * - the field's actual content and length is defined per media + * - remaining unused bytes in the field are set to zero */ -#define TIPC_MEDIA_ADDR_SIZE 20 +#define TIPC_MEDIA_ADDR_SIZE 32 #define TIPC_MEDIA_TYPE_OFFSET 3 /* @@ -77,9 +75,10 @@ struct tipc_bearer; * @send_msg: routine which handles buffer transmission * @enable_media: routine which enables a media * @disable_media: routine which disables a media - * @addr2str: routine which converts media address to string - * @addr2msg: routine which converts media address to protocol message area - * @msg2addr: routine which converts media address from protocol message area + * @addr2str: convert media address format to string + * @addr2msg: convert from media addr format to discovery msg addr format + * @msg2addr: convert from discovery msg addr format to media addr format + * @raw2addr: convert from raw addr format to media addr format * @priority: default link (and bearer) priority * @tolerance: default time (in ms) before declaring link failure * @window: default window (in packets) before declaring link congestion @@ -93,10 +92,16 @@ struct tipc_media { struct tipc_media_addr *dest); int (*enable_media)(struct tipc_bearer *b_ptr); void (*disable_media)(struct tipc_bearer *b_ptr); - int (*addr2str)(struct tipc_media_addr *a, char *str_buf, int str_size); - int (*addr2msg)(struct tipc_media_addr *a, char *msg_area); - int (*msg2addr)(const struct tipc_bearer *b_ptr, - struct tipc_media_addr *a, char *msg_area); + int (*addr2str)(struct tipc_media_addr *addr, + char *strbuf, + int bufsz); + int (*addr2msg)(char *msg, struct tipc_media_addr *addr); + int (*msg2addr)(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg); + int (*raw2addr)(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *raw); u32 priority; u32 tolerance; u32 window; @@ -175,8 +180,6 @@ int tipc_media_set_priority(const char *name, u32 new_value); int tipc_media_set_window(const char *name, u32 new_value); void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); struct sk_buff *tipc_media_get_names(void); -void tipc_l2_media_addr_set(const struct tipc_bearer *b, - struct tipc_media_addr *a, char *mac); int tipc_enable_l2_media(struct tipc_bearer *b); void tipc_disable_l2_media(struct tipc_bearer *b); int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, diff --git a/net/tipc/core.h b/net/tipc/core.h index 2e00682bc455..bb26ed1ee966 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -57,6 +57,7 @@ #include #include #include +#include #define TIPC_MOD_VER "2.0.0" diff --git a/net/tipc/discover.c b/net/tipc/discover.c index bd35c4a0746f..b15cbedfea13 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -83,7 +83,7 @@ static void tipc_disc_init_msg(struct sk_buff *buf, u32 type, msg_set_node_sig(msg, tipc_random); msg_set_dest_domain(msg, dest_domain); msg_set_bc_netid(msg, tipc_net_id); - b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg)); + b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); } /** diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index 67cf3f935dba..5e1426f1751f 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -1,7 +1,7 @@ /* * net/tipc/eth_media.c: Ethernet bearer support for TIPC * - * Copyright (c) 2001-2007, 2013, Ericsson AB + * Copyright (c) 2001-2007, 2013-2014, Ericsson AB * Copyright (c) 2005-2008, 2011-2013, Wind River Systems * All rights reserved. * @@ -37,39 +37,52 @@ #include "core.h" #include "bearer.h" -#define ETH_ADDR_OFFSET 4 /* message header offset of MAC address */ +#define ETH_ADDR_OFFSET 4 /* MAC addr position inside address field */ -/* convert Ethernet address to string */ -static int tipc_eth_addr2str(struct tipc_media_addr *a, char *str_buf, - int str_size) +/* Convert Ethernet address (media address format) to string */ +static int tipc_eth_addr2str(struct tipc_media_addr *addr, + char *strbuf, int bufsz) { - if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ + if (bufsz < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ return 1; - sprintf(str_buf, "%pM", a->value); + sprintf(strbuf, "%pM", addr->value); return 0; } -/* convert Ethernet address format to message header format */ -static int tipc_eth_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* Convert from media address format to discovery message addr format */ +static int tipc_eth_addr2msg(char *msg, struct tipc_media_addr *addr) { - memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); - msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; - memcpy(msg_area + ETH_ADDR_OFFSET, a->value, ETH_ALEN); + memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); + msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; + memcpy(msg + ETH_ADDR_OFFSET, addr->value, ETH_ALEN); return 0; } -/* convert message header address format to Ethernet format */ -static int tipc_eth_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* Convert raw mac address format to media addr format */ +static int tipc_eth_raw2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) { - if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH) - return 1; + char bcast_mac[ETH_ALEN] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; - tipc_l2_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET); + memset(addr, 0, sizeof(*addr)); + ether_addr_copy(addr->value, msg); + addr->media_id = TIPC_MEDIA_TYPE_ETH; + addr->broadcast = !memcmp(addr->value, bcast_mac, ETH_ALEN); return 0; } +/* Convert discovery msg addr format to Ethernet media addr format */ +static int tipc_eth_msg2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) +{ + /* Skip past preamble: */ + msg += ETH_ADDR_OFFSET; + return tipc_eth_raw2addr(b, addr, msg); +} + /* Ethernet media registration info */ struct tipc_media eth_media_info = { .send_msg = tipc_l2_send_msg, @@ -78,6 +91,7 @@ struct tipc_media eth_media_info = { .addr2str = tipc_eth_addr2str, .addr2msg = tipc_eth_addr2msg, .msg2addr = tipc_eth_msg2addr, + .raw2addr = tipc_eth_raw2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, @@ -85,4 +99,3 @@ struct tipc_media eth_media_info = { .hwaddr_len = ETH_ALEN, .name = "eth" }; - diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c index 844a77e25828..8522eef9c136 100644 --- a/net/tipc/ib_media.c +++ b/net/tipc/ib_media.c @@ -42,7 +42,7 @@ #include "core.h" #include "bearer.h" -/* convert InfiniBand address to string */ +/* convert InfiniBand address (media address format) media address to string */ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) { @@ -54,23 +54,35 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, return 0; } -/* convert InfiniBand address format to message header format */ -static int tipc_ib_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* Convert from media address format to discovery message addr format */ +static int tipc_ib_addr2msg(char *msg, struct tipc_media_addr *addr) { - memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); - msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_IB; - memcpy(msg_area, a->value, INFINIBAND_ALEN); + memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); + memcpy(msg, addr->value, INFINIBAND_ALEN); return 0; } -/* convert message header address format to InfiniBand format */ -static int tipc_ib_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* Convert raw InfiniBand address format to media addr format */ +static int tipc_ib_raw2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) { - tipc_l2_media_addr_set(tb_ptr, a, msg_area); + memset(addr, 0, sizeof(*addr)); + memcpy(addr->value, msg, INFINIBAND_ALEN); + addr->media_id = TIPC_MEDIA_TYPE_IB; + addr->broadcast = !memcmp(msg, b->bcast_addr.value, + INFINIBAND_ALEN); return 0; } +/* Convert discovery msg addr format to InfiniBand media addr format */ +static int tipc_ib_msg2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) +{ + return tipc_ib_raw2addr(b, addr, msg); +} + /* InfiniBand media registration info */ struct tipc_media ib_media_info = { .send_msg = tipc_l2_send_msg, @@ -79,6 +91,7 @@ struct tipc_media ib_media_info = { .addr2str = tipc_ib_addr2str, .addr2msg = tipc_ib_addr2msg, .msg2addr = tipc_ib_msg2addr, + .raw2addr = tipc_ib_raw2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, @@ -86,4 +99,3 @@ struct tipc_media ib_media_info = { .hwaddr_len = INFINIBAND_ALEN, .name = "ib" }; - From c82910e2a8d6fc9dd321a1f30dd4e89fb779cfe1 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:14 -0400 Subject: [PATCH 7/8] tipc: clean up neigbor discovery message reception The function tipc_disc_rcv(), which is handling received neighbor discovery messages, is perceived as messy, and it is hard to verify its correctness by code inspection. The fact that the task it is set to resolve is fairly complex does not make the situation better. In this commit we try to take a more systematic approach to the problem. We define a decision machine which takes three state flags as input, and produces three action flags as output. We then walk through all permutations of the state flags, and for each of them we describe verbally what is going on, plus that we set zero or more of the action flags. The action flags indicate what should be done once the decision machine has finished its job, while the last part of the function deals with performing those actions. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/discover.c | 219 ++++++++++++++++++++++---------------------- 1 file changed, 111 insertions(+), 108 deletions(-) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index b15cbedfea13..aa722a42ef8b 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -1,7 +1,7 @@ /* * net/tipc/discover.c * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2014, Ericsson AB * Copyright (c) 2005-2006, 2010-2011, Wind River Systems * All rights reserved. * @@ -106,147 +106,150 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr, } /** - * tipc_disc_rcv - handle incoming link setup message (request or response) + * tipc_disc_rcv - handle incoming discovery message (request or response) * @buf: buffer containing message - * @b_ptr: bearer that message arrived on + * @bearer: bearer that message arrived on */ -void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr) +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *bearer) { - struct tipc_node *n_ptr; + struct tipc_node *node; struct tipc_link *link; - struct tipc_media_addr media_addr; + struct tipc_media_addr maddr; struct sk_buff *rbuf; struct tipc_msg *msg = buf_msg(buf); - u32 dest = msg_dest_domain(msg); - u32 orig = msg_prevnode(msg); + u32 ddom = msg_dest_domain(msg); + u32 onode = msg_prevnode(msg); u32 net_id = msg_bc_netid(msg); - u32 type = msg_type(msg); + u32 mtyp = msg_type(msg); u32 signature = msg_node_sig(msg); - int addr_mismatch; - int link_fully_up; + bool addr_match = false; + bool sign_match = false; + bool link_up = false; + bool accept_addr = false; + bool accept_sign = false; + bool respond = false; - media_addr.broadcast = 1; - b_ptr->media->msg2addr(b_ptr, &media_addr, msg_media_addr(msg)); + bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg)); kfree_skb(buf); /* Ensure message from node is valid and communication is permitted */ if (net_id != tipc_net_id) return; - if (media_addr.broadcast) + if (maddr.broadcast) return; - if (!tipc_addr_domain_valid(dest)) + if (!tipc_addr_domain_valid(ddom)) return; - if (!tipc_addr_node_valid(orig)) - return; - if (orig == tipc_own_addr) { - if (memcmp(&media_addr, &b_ptr->addr, sizeof(media_addr))) - disc_dupl_alert(b_ptr, tipc_own_addr, &media_addr); - return; - } - if (!tipc_in_scope(dest, tipc_own_addr)) - return; - if (!tipc_in_scope(b_ptr->domain, orig)) + if (!tipc_addr_node_valid(onode)) return; - /* Locate structure corresponding to requesting node */ - n_ptr = tipc_node_find(orig); - if (!n_ptr) { - n_ptr = tipc_node_create(orig); - if (!n_ptr) - return; + if (in_own_node(onode)) { + if (memcmp(&maddr, &bearer->addr, sizeof(maddr))) + disc_dupl_alert(bearer, tipc_own_addr, &maddr); + return; } - tipc_node_lock(n_ptr); + if (!tipc_in_scope(ddom, tipc_own_addr)) + return; + if (!tipc_in_scope(bearer->domain, onode)) + return; + + /* Locate, or if necessary, create, node: */ + node = tipc_node_find(onode); + if (!node) + node = tipc_node_create(onode); + if (!node) + return; + + tipc_node_lock(node); + link = node->links[bearer->identity]; /* Prepare to validate requesting node's signature and media address */ - link = n_ptr->links[b_ptr->identity]; - addr_mismatch = (link != NULL) && - memcmp(&link->media_addr, &media_addr, sizeof(media_addr)); + sign_match = (signature == node->signature); + addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr)); + link_up = link && tipc_link_is_up(link); - /* - * Ensure discovery message's signature is correct - * - * If signature is incorrect and there is no working link to the node, - * accept the new signature but invalidate all existing links to the - * node so they won't re-activate without a new discovery message. - * - * If signature is incorrect and the requested link to the node is - * working, accept the new signature. (This is an instance of delayed - * rediscovery, where a link endpoint was able to re-establish contact - * with its peer endpoint on a node that rebooted before receiving a - * discovery message from that node.) - * - * If signature is incorrect and there is a working link to the node - * that is not the requested link, reject the request (must be from - * a duplicate node). - */ - if (signature != n_ptr->signature) { - if (n_ptr->working_links == 0) { - struct tipc_link *curr_link; - int i; - for (i = 0; i < MAX_BEARERS; i++) { - curr_link = n_ptr->links[i]; - if (curr_link) { - memset(&curr_link->media_addr, 0, - sizeof(media_addr)); - tipc_link_reset(curr_link); - } - } - addr_mismatch = (link != NULL); - } else if (tipc_link_is_up(link) && !addr_mismatch) { - /* delayed rediscovery */ - } else { - disc_dupl_alert(b_ptr, orig, &media_addr); - tipc_node_unlock(n_ptr); - return; - } - n_ptr->signature = signature; + /* These three flags give us eight permutations: */ + + if (sign_match && addr_match && link_up) { + /* All is fine. Do nothing. */ + } else if (sign_match && addr_match && !link_up) { + /* Respond. The link will come up in due time */ + respond = true; + } else if (sign_match && !addr_match && link_up) { + /* Peer has changed i/f address without rebooting. + * If so, the link will reset soon, and the next + * discovery will be accepted. So we can ignore it. + * It may also be an cloned or malicious peer having + * chosen the same node address and signature as an + * existing one. + * Ignore requests until the link goes down, if ever. + */ + disc_dupl_alert(bearer, onode, &maddr); + } else if (sign_match && !addr_match && !link_up) { + /* Peer link has changed i/f address without rebooting. + * It may also be a cloned or malicious peer; we can't + * distinguish between the two. + * The signature is correct, so we must accept. + */ + accept_addr = true; + respond = true; + } else if (!sign_match && addr_match && link_up) { + /* Peer node rebooted. Two possibilities: + * - Delayed re-discovery; this link endpoint has already + * reset and re-established contact with the peer, before + * receiving a discovery message from that node. + * (The peer happened to receive one from this node first). + * - The peer came back so fast that our side has not + * discovered it yet. Probing from this side will soon + * reset the link, since there can be no working link + * endpoint at the peer end, and the link will re-establish. + * Accept the signature, since it comes from a known peer. + */ + accept_sign = true; + } else if (!sign_match && addr_match && !link_up) { + /* The peer node has rebooted. + * Accept signature, since it is a known peer. + */ + accept_sign = true; + respond = true; + } else if (!sign_match && !addr_match && link_up) { + /* Peer rebooted with new address, or a new/duplicate peer. + * Ignore until the link goes down, if ever. + */ + disc_dupl_alert(bearer, onode, &maddr); + } else if (!sign_match && !addr_match && !link_up) { + /* Peer rebooted with new address, or it is a new peer. + * Accept signature and address. + */ + accept_sign = true; + accept_addr = true; + respond = true; } - /* - * Ensure requesting node's media address is correct - * - * If media address doesn't match and the link is working, reject the - * request (must be from a duplicate node). - * - * If media address doesn't match and the link is not working, accept - * the new media address and reset the link to ensure it starts up - * cleanly. - */ - if (addr_mismatch) { - if (tipc_link_is_up(link)) { - disc_dupl_alert(b_ptr, orig, &media_addr); - tipc_node_unlock(n_ptr); - return; - } else { - memcpy(&link->media_addr, &media_addr, - sizeof(media_addr)); + if (accept_sign) + node->signature = signature; + + if (accept_addr) { + if (!link) + link = tipc_link_create(node, bearer, &maddr); + if (link) { + memcpy(&link->media_addr, &maddr, sizeof(maddr)); tipc_link_reset(link); + } else { + respond = false; } } - /* Create a link endpoint for this bearer, if necessary */ - if (!link) { - link = tipc_link_create(n_ptr, b_ptr, &media_addr); - if (!link) { - tipc_node_unlock(n_ptr); - return; - } - } - - /* Accept discovery message & send response, if necessary */ - link_fully_up = link_working_working(link); - - if ((type == DSC_REQ_MSG) && !link_fully_up) { + /* Send response, if necessary */ + if (respond && (mtyp == DSC_REQ_MSG)) { rbuf = tipc_buf_acquire(INT_H_SIZE); if (rbuf) { - tipc_disc_init_msg(rbuf, DSC_RESP_MSG, b_ptr); - tipc_bearer_send(b_ptr->identity, rbuf, &media_addr); + tipc_disc_init_msg(rbuf, DSC_RESP_MSG, bearer); + tipc_bearer_send(bearer->identity, rbuf, &maddr); kfree_skb(rbuf); } } - - tipc_node_unlock(n_ptr); + tipc_node_unlock(node); } /** From 9816f0615d549b948a76e6d2385159b4366e4658 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Wed, 14 May 2014 05:39:15 -0400 Subject: [PATCH 8/8] tipc: merge port message reception into socket reception function In order to reduce complexity and save a call level during message reception at port/socket level, we remove the function tipc_port_rcv() and merge its functionality into tipc_sk_rcv(). Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 3 ++- net/tipc/net.c | 3 ++- net/tipc/port.c | 39 ++++--------------------------- net/tipc/port.h | 1 - net/tipc/socket.c | 59 ++++++++++++++++++++++++++++++----------------- net/tipc/socket.h | 2 +- 6 files changed, 47 insertions(+), 60 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 24d058796cd9..ad2c57f5868d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -37,6 +37,7 @@ #include "core.h" #include "link.h" #include "port.h" +#include "socket.h" #include "name_distr.h" #include "discover.h" #include "config.h" @@ -1590,7 +1591,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: tipc_node_unlock(n_ptr); - tipc_port_rcv(buf); + tipc_sk_rcv(buf); continue; case MSG_BUNDLER: l_ptr->stats.recv_bundles++; diff --git a/net/tipc/net.c b/net/tipc/net.c index f8fc95d58c0d..f64375e7f99f 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -39,6 +39,7 @@ #include "name_distr.h" #include "subscr.h" #include "port.h" +#include "socket.h" #include "node.h" #include "config.h" @@ -141,7 +142,7 @@ void tipc_net_route_msg(struct sk_buff *buf) if (msg_mcast(msg)) tipc_port_mcast_rcv(buf, NULL); else if (msg_destport(msg)) - tipc_port_rcv(buf); + tipc_sk_rcv(buf); else net_route_named_msg(buf); return; diff --git a/net/tipc/port.c b/net/tipc/port.c index 5c14c7801ee6..5fd7acce01ea 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -165,7 +165,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) msg_set_destnode(msg, tipc_own_addr); if (dp->count == 1) { msg_set_destport(msg, dp->ports[0]); - tipc_port_rcv(buf); + tipc_sk_rcv(buf); tipc_port_list_free(dp); return; } @@ -180,7 +180,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) if ((index == 0) && (cnt != 0)) item = item->next; msg_set_destport(buf_msg(b), item->ports[index]); - tipc_port_rcv(b); + tipc_sk_rcv(b); } } exit: @@ -343,7 +343,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) /* send returned message & dispose of rejected message */ src_node = msg_prevnode(msg); if (in_own_node(src_node)) - tipc_port_rcv(rbuf); + tipc_sk_rcv(rbuf); else tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); exit: @@ -754,37 +754,6 @@ int tipc_port_shutdown(u32 ref) return tipc_port_disconnect(ref); } -/** - * tipc_port_rcv - receive message from lower layer and deliver to port user - */ -int tipc_port_rcv(struct sk_buff *buf) -{ - struct tipc_port *p_ptr; - struct tipc_msg *msg = buf_msg(buf); - u32 destport = msg_destport(msg); - u32 dsz = msg_data_sz(msg); - u32 err; - - /* forward unresolved named message */ - if (unlikely(!destport)) { - tipc_net_route_msg(buf); - return dsz; - } - - /* validate destination & pass to port, otherwise reject message */ - p_ptr = tipc_port_lock(destport); - if (likely(p_ptr)) { - err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf); - tipc_port_unlock(p_ptr); - if (likely(!err)) - return dsz; - } else { - err = TIPC_ERR_NO_PORT; - } - - return tipc_reject_msg(buf, err); -} - /* * tipc_port_iovec_rcv: Concatenate and deliver sectioned * message for this node. @@ -798,7 +767,7 @@ static int tipc_port_iovec_rcv(struct tipc_port *sender, res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); if (likely(buf)) - tipc_port_rcv(buf); + tipc_sk_rcv(buf); return res; } diff --git a/net/tipc/port.h b/net/tipc/port.h index 5dfd165df1d7..cf4ca5b1d9a4 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -135,7 +135,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); /* * TIPC messaging routines */ -int tipc_port_rcv(struct sk_buff *buf); int tipc_send(struct tipc_port *port, struct iovec const *msg_sect, diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 249500614568..ac08966f2858 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1442,39 +1442,56 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) /** * tipc_sk_rcv - handle incoming message - * @sk: socket receiving message - * @buf: message - * - * Called with port lock already taken. - * - * Returns TIPC error status code (TIPC_OK if message is not to be rejected) + * @buf: buffer containing arriving message + * Consumes buffer + * Returns 0 if success, or errno: -EHOSTUNREACH */ -u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) +int tipc_sk_rcv(struct sk_buff *buf) { - struct tipc_sock *tsk = tipc_sk(sk); - u32 res; + struct tipc_sock *tsk; + struct tipc_port *port; + struct sock *sk; + u32 dport = msg_destport(buf_msg(buf)); + int err = TIPC_OK; uint limit; - /* - * Process message if socket is unlocked; otherwise add to backlog queue - * - * This code is based on sk_receive_skb(), but must be distinct from it - * since a TIPC-specific filter/reject mechanism is utilized - */ + + /* Forward unresolved named message */ + if (unlikely(!dport)) { + tipc_net_route_msg(buf); + return 0; + } + + /* Validate destination */ + port = tipc_port_lock(dport); + if (unlikely(!port)) { + err = TIPC_ERR_NO_PORT; + goto exit; + } + + tsk = tipc_port_to_sock(port); + sk = &tsk->sk; + + /* Queue message */ bh_lock_sock(sk); + if (!sock_owned_by_user(sk)) { - res = filter_rcv(sk, buf); + err = filter_rcv(sk, buf); } else { if (sk->sk_backlog.len == 0) atomic_set(&tsk->dupl_rcvcnt, 0); limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); if (sk_add_backlog(sk, buf, limit)) - res = TIPC_ERR_OVERLOAD; - else - res = TIPC_OK; + err = TIPC_ERR_OVERLOAD; } - bh_unlock_sock(sk); - return res; + bh_unlock_sock(sk); + tipc_port_unlock(port); + + if (likely(!err)) + return 0; +exit: + tipc_reject_msg(buf, err); + return -EHOSTUNREACH; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 86c27cc51e33..3afcd2a70b31 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -69,6 +69,6 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) tsk->sk.sk_write_space(&tsk->sk); } -u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf); +int tipc_sk_rcv(struct sk_buff *buf); #endif