tipc: eliminate delayed link deletion at link failover

When a bearer is disabled manually, all its links have to be reset
and deleted. However, if there is a remaining, parallel link ready
to take over a deleted link's traffic, we currently delay the deletion
of the removed link until the failover procedure is finished. This
is because the remaining link needs to access state from the reset
link, such as the last received packet number, and any partially
reassembled buffer, in order to perform a successful failover.

In this commit, we instead move the state data over to the remaining
link, so that it can complete the procedure autonomously, without
accessing any data on the old link. This means that we can now
delete all links pertaining to a bearer immediately when that bearer
is disabled. This saves us some unnecessary complexity in such
situations.

We also choose to change the confusing definitions CHANGEOVER_PROTOCOL,
ORIGINAL_MSG and DUPLICATE_MSG to the more descriptive TUNNEL_PROTOCOL,
FAILOVER_MSG and SYNCH_MSG respectively.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c697cf6..b1e1795 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -89,17 +89,9 @@
 #define  TIMEOUT_EVT     560817u	/* link timer expired */
 
 /*
- * The following two 'message types' is really just implementation
- * data conveniently stored in the message header.
- * They must not be considered part of the protocol
+ * State value stored in 'failover_pkts'
  */
-#define OPEN_MSG   0
-#define CLOSED_MSG 1
-
-/*
- * State value stored in 'exp_msg_count'
- */
-#define START_CHANGEOVER 100000u
+#define FIRST_FAILOVER 0xffffu
 
 static void link_handle_out_of_seq_msg(struct tipc_link *link,
 				       struct sk_buff *skb);
@@ -113,8 +105,7 @@
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
-static bool tipc_link_failover_rcv(struct tipc_node *node,
-				   struct sk_buff **skb);
+static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
 /*
  *  Simple link routines
  */
@@ -332,15 +323,19 @@
 }
 
 /**
- * link_delete - Conditional deletion of link.
- *               If timer still running, real delete is done when it expires
- * @link: link to be deleted
+ * tipc_link_delete - Delete a link
+ * @l: link to be deleted
  */
-void tipc_link_delete(struct tipc_link *link)
+void tipc_link_delete(struct tipc_link *l)
 {
-	tipc_link_reset_fragments(link);
-	tipc_node_detach_link(link->owner, link);
-	tipc_link_put(link);
+	tipc_link_reset(l);
+	if (del_timer(&l->timer))
+		tipc_link_put(l);
+	l->flags |= LINK_STOPPED;
+	/* Delete link now, or when timer is finished: */
+	tipc_link_reset_fragments(l);
+	tipc_node_detach_link(l->owner, l);
+	tipc_link_put(l);
 }
 
 void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
@@ -349,23 +344,12 @@
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_link *link;
 	struct tipc_node *node;
-	bool del_link;
 
 	rcu_read_lock();
 	list_for_each_entry_rcu(node, &tn->node_list, list) {
 		tipc_node_lock(node);
 		link = node->links[bearer_id];
-		if (!link) {
-			tipc_node_unlock(node);
-			continue;
-		}
-		del_link = !tipc_link_is_up(link) && !link->exp_msg_count;
-		tipc_link_reset(link);
-		if (del_timer(&link->timer))
-			tipc_link_put(link);
-		link->flags |= LINK_STOPPED;
-		/* Delete link now, or when failover is finished: */
-		if (shutting_down || !tipc_node_is_up(node) || del_link)
+		if (link)
 			tipc_link_delete(link);
 		tipc_node_unlock(node);
 	}
@@ -472,9 +456,9 @@
 void tipc_link_reset(struct tipc_link *l_ptr)
 {
 	u32 prev_state = l_ptr->state;
-	u32 checkpoint = l_ptr->next_in_no;
 	int was_active_link = tipc_link_is_active(l_ptr);
 	struct tipc_node *owner = l_ptr->owner;
+	struct tipc_link *pl = tipc_parallel_link(l_ptr);
 
 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 
@@ -492,11 +476,15 @@
 	tipc_node_link_down(l_ptr->owner, l_ptr);
 	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
 
-	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
-		l_ptr->reset_checkpoint = checkpoint;
-		l_ptr->exp_msg_count = START_CHANGEOVER;
+	if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
+		l_ptr->flags |= LINK_FAILINGOVER;
+		l_ptr->failover_checkpt = l_ptr->next_in_no;
+		pl->failover_pkts = FIRST_FAILOVER;
+		pl->failover_checkpt = l_ptr->next_in_no;
+		pl->failover_skb = l_ptr->reasm_buf;
+	} else {
+		kfree_skb(l_ptr->reasm_buf);
 	}
-
 	/* Clean up all queues, except inputq: */
 	__skb_queue_purge(&l_ptr->transmq);
 	__skb_queue_purge(&l_ptr->deferdq);
@@ -506,6 +494,7 @@
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
 	tipc_link_purge_backlog(l_ptr);
+	l_ptr->reasm_buf = NULL;
 	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
@@ -557,8 +546,7 @@
 	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
 		return;		/* Not yet. */
 
-	/* Check whether changeover is going on */
-	if (l_ptr->exp_msg_count) {
+	if (l_ptr->flags & LINK_FAILINGOVER) {
 		if (event == TIMEOUT_EVT)
 			link_set_timer(l_ptr, cont_intv);
 		return;
@@ -1242,7 +1230,7 @@
 			node->action_flags |= TIPC_NAMED_MSG_EVT;
 		return true;
 	case MSG_BUNDLER:
-	case CHANGEOVER_PROTOCOL:
+	case TUNNEL_PROTOCOL:
 	case MSG_FRAGMENTER:
 	case BCAST_PROTOCOL:
 		return false;
@@ -1269,14 +1257,14 @@
 		return;
 
 	switch (msg_user(msg)) {
-	case CHANGEOVER_PROTOCOL:
+	case TUNNEL_PROTOCOL:
 		if (msg_dup(msg)) {
 			link->flags |= LINK_SYNCHING;
 			link->synch_point = msg_seqno(msg_get_wrapped(msg));
 			kfree_skb(skb);
 			break;
 		}
-		if (!tipc_link_failover_rcv(node, &skb))
+		if (!tipc_link_failover_rcv(link, &skb))
 			break;
 		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
 			tipc_data_input(link, skb);
@@ -1391,8 +1379,8 @@
 	u32 msg_size = sizeof(l_ptr->proto_msg);
 	int r_flag;
 
-	/* Don't send protocol message during link changeover */
-	if (l_ptr->exp_msg_count)
+	/* Don't send protocol message during link failover */
+	if (l_ptr->flags & LINK_FAILINGOVER)
 		return;
 
 	/* Abort non-RESET send if communication with node is prohibited */
@@ -1444,7 +1432,7 @@
 		}
 		l_ptr->stats.sent_states++;
 	} else {		/* RESET_MSG or ACTIVATE_MSG */
-		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
+		msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));
 		msg_set_seq_gap(msg, 0);
 		msg_set_next_sent(msg, 1);
 		msg_set_probe(msg, 0);
@@ -1486,8 +1474,7 @@
 	u32 msg_tol;
 	struct tipc_msg *msg = buf_msg(buf);
 
-	/* Discard protocol message during link changeover */
-	if (l_ptr->exp_msg_count)
+	if (l_ptr->flags & LINK_FAILINGOVER)
 		goto exit;
 
 	if (l_ptr->net_plane != msg_net_plane(msg))
@@ -1659,8 +1646,8 @@
 	if (!tunnel)
 		return;
 
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
+		      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);
 	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
 	tipc_link_purge_backlog(l_ptr);
 	msgcount = skb_queue_len(&l_ptr->transmq);
@@ -1722,8 +1709,8 @@
 	struct sk_buff_head *queue = &link->transmq;
 	int mcnt;
 
-	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
+		      SYNCH_MSG, INT_H_SIZE, link->addr);
 	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
 	msg_set_msgcnt(&tnl_hdr, mcnt);
 	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
@@ -1756,36 +1743,37 @@
 	goto tunnel_queue;
 }
 
-/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
+/*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
  *  Owner node is locked.
  */
-static bool tipc_link_failover_rcv(struct tipc_node *node,
+static bool tipc_link_failover_rcv(struct tipc_link *link,
 				   struct sk_buff **skb)
 {
 	struct tipc_msg *msg = buf_msg(*skb);
 	struct sk_buff *iskb = NULL;
-	struct tipc_link *link = NULL;
+	struct tipc_link *pl = NULL;
 	int bearer_id = msg_bearer_id(msg);
 	int pos = 0;
 
-	if (msg_type(msg) != ORIGINAL_MSG) {
+	if (msg_type(msg) != FAILOVER_MSG) {
 		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
 		goto exit;
 	}
 	if (bearer_id >= MAX_BEARERS)
 		goto exit;
-	link = node->links[bearer_id];
-	if (!link)
-		goto exit;
-	if (tipc_link_is_up(link))
-		tipc_link_reset(link);
 
-	/* First failover packet? */
-	if (link->exp_msg_count == START_CHANGEOVER)
-		link->exp_msg_count = msg_msgcnt(msg);
+	if (bearer_id == link->bearer_id)
+		goto exit;
+
+	pl = link->owner->links[bearer_id];
+	if (pl && tipc_link_is_up(pl))
+		tipc_link_reset(pl);
+
+	if (link->failover_pkts == FIRST_FAILOVER)
+		link->failover_pkts = msg_msgcnt(msg);
 
 	/* Should we expect an inner packet? */
-	if (!link->exp_msg_count)
+	if (!link->failover_pkts)
 		goto exit;
 
 	if (!tipc_msg_extract(*skb, &iskb, &pos)) {
@@ -1793,22 +1781,22 @@
 		*skb = NULL;
 		goto exit;
 	}
-	link->exp_msg_count--;
+	link->failover_pkts--;
 	*skb = NULL;
 
-	/* Was packet already delivered? */
-	if (less(buf_seqno(iskb), link->reset_checkpoint)) {
+	/* Was this packet already delivered? */
+	if (less(buf_seqno(iskb), link->failover_checkpt)) {
 		kfree_skb(iskb);
 		iskb = NULL;
 		goto exit;
 	}
 	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
 		link->stats.recv_fragments++;
-		tipc_buf_append(&link->reasm_buf, &iskb);
+		tipc_buf_append(&link->failover_skb, &iskb);
 	}
 exit:
-	if (link && (!link->exp_msg_count) && (link->flags & LINK_STOPPED))
-		tipc_link_delete(link);
+	if (!link->failover_pkts && pl)
+		pl->flags &= ~LINK_FAILINGOVER;
 	kfree_skb(*skb);
 	*skb = iskb;
 	return *skb;