netty: Reduce race window size between GOAWAY and new streams

The race between new streams and transport shutdown is #2562, but it is still
far from being generally solved. This reduces the race window of new streams
from (transport selection → stream created on network thread) to (transport
selection → stream enqueued on network thread). Since only a single thread now
needs to do work in the stream creation race window, the window should be
dramatically smaller.

This only reduces GOAWAY races when the server performs a graceful shutdown
(using two GOAWAYs), as that is the only non-racy way on-the-wire to shutdown a
connection in HTTP/2.
diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
index e9f7040..e10150c 100644
--- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
+++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
@@ -43,14 +43,25 @@
     listener.transportReady();
   }
 
-  public void notifyShutdown(Status s) {
+  /**
+   * Marks transport as shutdown, but does not set the error status. This must eventually be
+   * followed by a call to notifyShutdown.
+   */
+  public void notifyGracefulShutdown(Status s) {
     if (transportShutdown) {
       return;
     }
     transportShutdown = true;
+    listener.transportShutdown(s);
+  }
+
+  public void notifyShutdown(Status s) {
+    notifyGracefulShutdown(s);
+    if (shutdownStatus != null) {
+      return;
+    }
     shutdownStatus = s;
     shutdownThrowable = s.asException();
-    listener.transportShutdown(s);
   }
 
   public void notifyInUse(boolean inUse) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index f04b3e6..677358f 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -755,10 +755,21 @@
 
   /**
    * Handler for a GOAWAY being received. Fails any streams created after the
-   * last known stream.
+   * last known stream. May only be called during a read.
    */
   private void goingAway(Status status) {
+    lifecycleManager.notifyGracefulShutdown(status);
+    // Try to allocate as many in-flight streams as possible, to reduce race window of
+    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
+    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
+    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
+    // processed and thus this processing must be in-line before processing additional reads.
+
+    // This can cause reentrancy, but should be minor since it is normal to handle writes in
+    // response to a read. Also, the call stack is rather shallow at this point
+    clientWriteQueue.drainNow();
     lifecycleManager.notifyShutdown(status);
+
     final Status goAwayStatus = lifecycleManager.getShutdownStatus();
     final int lastKnownStream = connection().local().lastStreamKnownByPeer();
     try {
diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java
index cfbfe75..f80e3fc 100644
--- a/netty/src/main/java/io/grpc/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java
@@ -102,6 +102,19 @@
   }
 
   /**
+   * Executes enqueued work directly on the current thread. This can be used to trigger writes
+   * before performing additional reads. Must be called from the event loop. This method makes no
+   * guarantee that the work queue is empty when it returns.
+   */
+  void drainNow() {
+    Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
+    if (queue.peek() == null) {
+      return;
+    }
+    flush();
+  }
+
+  /**
    * Process the queue of commands and dispatch them to the stream. This method is only
    * called in the event loop
    */
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index dfdfcf4..c5f8132 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -362,6 +362,19 @@
   }
 
   @Test
+  public void receivedGoAwayShouldNotAffectRacingQueuedStreamId() throws Exception {
+    // This command has not actually been executed yet
+    ChannelFuture future = writeQueue().enqueue(
+        newCreateStreamCommand(grpcHeaders, streamTransportState), true);
+    channelRead(goAwayFrame(streamId));
+    verify(streamListener, never())
+        .closed(any(Status.class), any(Metadata.class));
+    verify(streamListener, never())
+        .closed(any(Status.class), any(RpcProgress.class), any(Metadata.class));
+    assertTrue(future.isDone());
+  }
+
+  @Test
   public void receivedResetWithRefuseCode() throws Exception {
     ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
     channelRead(rstStreamFrame(streamId, (int) Http2Error.REFUSED_STREAM.code() ));
diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
index 1e08704..fab3c76 100644
--- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
@@ -203,6 +203,10 @@
     }
   }
 
+  protected final WriteQueue writeQueue() {
+    return writeQueue;
+  }
+
   protected final T handler() {
     return handler;
   }