netty: Differentiate GOAWAY closure status descriptions

With this, it will be clear if the RPC failed because the server didn't
use a double-GOAWAY or if it failed because of MAX_CONCURRENT_STREAMS or
if it was due to a local race. It also fixes the status code to be
UNAVAILABLE except for the RPCs included in the GOAWAY error (modulo the
Netty bug).

Fixes #5855
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index bc2fb51..2ebe001 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -345,7 +345,11 @@
 
     Http2Error(int code, Status status) {
       this.code = code;
-      this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
+      String description = "HTTP/2 error code: " + this.name();
+      if (status.getDescription() != null) {
+        description += " (" + status.getDescription() + ")";
+      }
+      this.status = status.withDescription(description);
     }
 
     /**
diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
index e10150c..1111904 100644
--- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
+++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
@@ -16,6 +16,7 @@
 
 package io.grpc.netty;
 
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.grpc.Status;
 import io.grpc.internal.ManagedClientTransport;
 
@@ -55,13 +56,16 @@
     listener.transportShutdown(s);
   }
 
-  public void notifyShutdown(Status s) {
+  /** Returns {@code true} if was the first shutdown. */
+  @CanIgnoreReturnValue
+  public boolean notifyShutdown(Status s) {
     notifyGracefulShutdown(s);
     if (shutdownStatus != null) {
-      return;
+      return false;
     }
     shutdownStatus = s;
     shutdownThrowable = s.asException();
+    return true;
   }
 
   public void notifyInUse(boolean inUse) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 631d9e8..a6929e6 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -130,6 +130,7 @@
   private Attributes attributes;
   private InternalChannelz.Security securityInfo;
   private Status abruptGoAwayStatus;
+  private Status channelInactiveReason;
 
   static NettyClientHandler newHandler(
       ClientTransportLifecycleManager lifecycleManager,
@@ -278,7 +279,7 @@
       @Override
       public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
         byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
-        goingAway(statusFromGoAway(errorCode, debugDataBytes));
+        goingAway(errorCode, debugDataBytes);
         if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
           String data = new String(debugDataBytes, UTF_8);
           logger.log(
@@ -400,8 +401,7 @@
     NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
     if (stream != null) {
       PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
-      Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
-          .augmentDescription("Received Rst Stream");
+      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
       stream.transportReportStatus(
           status,
           errorCode == Http2Error.REFUSED_STREAM.code()
@@ -433,6 +433,12 @@
       logger.fine("Network channel is closed");
       Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
       lifecycleManager.notifyShutdown(status);
+      final Status streamStatus;
+      if (channelInactiveReason != null) {
+        streamStatus = channelInactiveReason;
+      } else {
+        streamStatus = lifecycleManager.getShutdownStatus();
+      }
       try {
         cancelPing(lifecycleManager.getShutdownThrowable());
         // Report status to the application layer for any open streams
@@ -441,8 +447,7 @@
           public boolean visit(Http2Stream stream) throws Http2Exception {
             NettyClientStream.TransportState clientStream = clientStream(stream);
             if (clientStream != null) {
-              clientStream.transportReportStatus(
-                  lifecycleManager.getShutdownStatus(), false, new Metadata());
+              clientStream.transportReportStatus(streamStatus, false, new Metadata());
             }
             return true;
           }
@@ -630,8 +635,11 @@
               if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
                 StreamBufferingEncoder.Http2GoAwayException e =
                     (StreamBufferingEncoder.Http2GoAwayException) cause;
-                lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
-                promise.setFailure(lifecycleManager.getShutdownThrowable());
+                Status status = statusFromH2Error(
+                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
+                    e.errorCode(), e.debugData());
+                stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata());
+                promise.setFailure(status.asRuntimeException());
               } else {
                 promise.setFailure(cause);
               }
@@ -786,9 +794,20 @@
    * Handler for a GOAWAY being received. Fails any streams created after the
    * last known stream. May only be called during a read.
    */
-  private void goingAway(Status status) {
-    lifecycleManager.notifyGracefulShutdown(status);
-    abruptGoAwayStatus = status;
+  private void goingAway(long errorCode, byte[] debugData) {
+    Status finalStatus = statusFromH2Error(
+        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
+    lifecycleManager.notifyGracefulShutdown(finalStatus);
+    abruptGoAwayStatus = statusFromH2Error(
+        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
+    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
+    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
+    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
+    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
+    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
+    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
+    final Status abruptGoAwayStatusConservative = statusFromH2Error(
+        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
     // Try to allocate as many in-flight streams as possible, to reduce race window of
     // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
     // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
@@ -798,9 +817,13 @@
     // This can cause reentrancy, but should be minor since it is normal to handle writes in
     // response to a read. Also, the call stack is rather shallow at this point
     clientWriteQueue.drainNow();
-    lifecycleManager.notifyShutdown(status);
+    if (lifecycleManager.notifyShutdown(finalStatus)) {
+      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
+      // RPCs were not observed by the remote and so should be UNAVAILABLE.
+      channelInactiveReason = statusFromH2Error(
+          null, "Connection closed after GOAWAY", errorCode, debugData);
+    }
 
-    final Status goAwayStatus = lifecycleManager.getShutdownStatus();
     final int lastKnownStream = connection().local().lastStreamKnownByPeer();
     try {
       connection().forEachActiveStream(new Http2StreamVisitor() {
@@ -809,8 +832,13 @@
           if (stream.id() > lastKnownStream) {
             NettyClientStream.TransportState clientStream = clientStream(stream);
             if (clientStream != null) {
+              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
+              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
+              // retries, but our main goal of transporent retries is to resolve the local race. We
+              // still hope/expect servers to use the graceful double-GOAWAY when closing
+              // connections.
               clientStream.transportReportStatus(
-                  goAwayStatus, RpcProgress.REFUSED, false, new Metadata());
+                  abruptGoAwayStatusConservative, RpcProgress.PROCESSED, false, new Metadata());
             }
             stream.close();
           }
@@ -829,15 +857,20 @@
     }
   }
 
-  private Status statusFromGoAway(long errorCode, byte[] debugData) {
-    Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
-        .augmentDescription("Received Goaway");
+  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
+  private Status statusFromH2Error(
+      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
+    Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
+    if (statusCode == null) {
+      statusCode = status.getCode();
+    }
+    String debugString = "";
     if (debugData != null && debugData.length > 0) {
       // If a debug message was provided, use it.
-      String msg = new String(debugData, UTF_8);
-      status = status.augmentDescription(msg);
+      debugString = ", debug data: " + new String(debugData, UTF_8);
     }
-    return status;
+    return statusCode.toStatus()
+        .withDescription(context + ". " + status.getDescription() + debugString);
   }
 
   /**
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 4e469e7..b708d20 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -331,23 +331,12 @@
   }
 
   @Test
-  public void receivedGoAwayShouldCancelBufferedStream() throws Exception {
-    // Force the stream to be buffered.
-    receiveMaxConcurrentStreams(0);
-    ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
-    channelRead(goAwayFrame(0));
-    assertTrue(future.isDone());
-    assertFalse(future.isSuccess());
-    Status status = Status.fromThrowable(future.cause());
-    assertEquals(Status.Code.UNAVAILABLE, status.getCode());
-    assertEquals("HTTP/2 error code: NO_ERROR\nReceived Goaway", status.getDescription());
-  }
-
-  @Test
   public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
     ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
     channelRead(goAwayFrame(streamId - 1));
-    verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
+    // This _should_ be REFUSED, but we purposefully use PROCESSED. See comment for
+    // abruptGoAwayStatusConservative in NettyClientHandler
+    verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class));
     assertTrue(future.isDone());
   }
 
@@ -386,8 +375,10 @@
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
     verify(streamListener).closed(captor.capture(), same(REFUSED),
         ArgumentMatchers.<Metadata>notNull());
-    assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
-    assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+    assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
+    assertEquals(
+        "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, "
+          + "debug data: this is a test",
         captor.getValue().getDescription());
     assertTrue(future.isDone());
   }
@@ -415,15 +406,18 @@
     // Read a GOAWAY that indicates our stream was never processed by the server.
     channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
-    verify(streamListener).closed(captor.capture(), same(REFUSED),
+    // See comment for abruptGoAwayStatusConservative in NettyClientHandler
+    verify(streamListener).closed(captor.capture(), same(PROCESSED),
         ArgumentMatchers.<Metadata>notNull());
     assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
-    assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+    assertEquals(
+        "Abrupt GOAWAY closed sent stream. HTTP/2 error code: CANCEL, "
+          + "debug data: this is a test",
         captor.getValue().getDescription());
   }
 
   @Test
-  public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
+  public void receivedGoAwayShouldFailBufferedStreams() throws Exception {
     receiveMaxConcurrentStreams(0);
 
     ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
@@ -433,8 +427,10 @@
     assertTrue(future.isDone());
     assertFalse(future.isSuccess());
     Status status = Status.fromThrowable(future.cause());
-    assertEquals(Status.CANCELLED.getCode(), status.getCode());
-    assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+    assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
+    assertEquals(
+        "GOAWAY closed buffered stream. HTTP/2 error code: CANCEL, "
+          + "debug data: this is a test",
         status.getDescription());
   }
 
@@ -448,8 +444,10 @@
     assertTrue(future.isDone());
     assertFalse(future.isSuccess());
     Status status = Status.fromThrowable(future.cause());
-    assertEquals(Status.CANCELLED.getCode(), status.getCode());
-    assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+    assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
+    assertEquals(
+        "GOAWAY shut down transport. HTTP/2 error code: CANCEL, "
+          + "debug data: this is a test",
         status.getDescription());
   }
 
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 8762f0b..02b24b7 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -529,7 +529,7 @@
       Throwable rootCause = getRootCause(e);
       Status status = ((StatusException) rootCause).getStatus();
       assertEquals(Status.Code.INTERNAL, status.getCode());
-      assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
+      assertEquals("RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR",
           status.getDescription());
     }
   }