tipc: move link input queue to tipc_node At present, the link input queue and the name distributor receive queues are fields aggregated in struct tipc_link. This is a hazard, because a link might be deleted while a receiving socket still keeps reference to one of the queues. This commit fixes this bug. However, rather than adding yet another reference counter to the critical data path, we move the two queues to safe ground inside struct tipc_node, which is already protected, and let the link code only handle references to the queues. This is also in line with planned later changes in this area. Reviewed-by: Ying Xue <[email protected]> Signed-off-by: Jon Maloy <[email protected]> Signed-off-by: David S. Miller <[email protected]>
diff --git a/net/tipc/link.c b/net/tipc/link.c index 03372a7..f8e0e2c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c
@@ -227,7 +227,9 @@ */ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, struct tipc_bearer *b_ptr, - const struct tipc_media_addr *media_addr) + const struct tipc_media_addr *media_addr, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq) { struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); struct tipc_link *l_ptr; @@ -289,8 +291,9 @@ __skb_queue_head_init(&l_ptr->backlogq); __skb_queue_head_init(&l_ptr->deferdq); skb_queue_head_init(&l_ptr->wakeupq); - skb_queue_head_init(&l_ptr->inputq); - skb_queue_head_init(&l_ptr->namedq); + l_ptr->inputq = inputq; + l_ptr->namedq = namedq; + skb_queue_head_init(l_ptr->inputq); link_reset_statistics(l_ptr); tipc_node_attach_link(n_ptr, l_ptr); setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); @@ -391,8 +394,8 @@ if ((pnd[imp] + l->backlog[imp].len) >= lim) break; skb_unlink(skb, &l->wakeupq); - skb_queue_tail(&l->inputq, skb); - l->owner->inputq = &l->inputq; + skb_queue_tail(l->inputq, skb); + l->owner->inputq = l->inputq; l->owner->action_flags |= TIPC_MSG_EVT; } } @@ -465,7 +468,7 @@ __skb_queue_purge(&l_ptr->transmq); __skb_queue_purge(&l_ptr->deferdq); if (!owner->inputq) - owner->inputq = &l_ptr->inputq; + owner->inputq = l_ptr->inputq; skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); if (!skb_queue_empty(owner->inputq)) owner->action_flags |= TIPC_MSG_EVT; @@ -962,7 +965,7 @@ /* Is it still in the input queue ? */ post_synch = mod(pl->rcv_nxt - l->synch_point) - 1; - if (skb_queue_len(&pl->inputq) > post_synch) + if (skb_queue_len(pl->inputq) > post_synch) return false; synched: l->flags &= ~LINK_SYNCHING; @@ -1141,16 +1144,16 @@ case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: case CONN_MANAGER: - if (tipc_skb_queue_tail(&link->inputq, skb, dport)) { - node->inputq = &link->inputq; + if (tipc_skb_queue_tail(link->inputq, skb, dport)) { + node->inputq = link->inputq; node->action_flags |= TIPC_MSG_EVT; } return true; case NAME_DISTRIBUTOR: node->bclink.recv_permitted = true; - node->namedq = &link->namedq; - skb_queue_tail(&link->namedq, skb); - if (skb_queue_len(&link->namedq) == 1) + node->namedq = link->namedq; + skb_queue_tail(link->namedq, skb); + if (skb_queue_len(link->namedq) == 1) node->action_flags |= TIPC_NAMED_MSG_EVT; return true; case MSG_BUNDLER: