netty: Differentiate GOAWAY closure status descriptions
With this, it will be clear if the RPC failed because the server didn't
use a double-GOAWAY or if it failed because of MAX_CONCURRENT_STREAMS or
if it was due to a local race. It also fixes the status code to be
UNAVAILABLE except for the RPCs included in the GOAWAY error (modulo the
Netty bug).
Fixes #5855
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index bc2fb51..2ebe001 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -345,7 +345,11 @@
Http2Error(int code, Status status) {
this.code = code;
- this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
+ String description = "HTTP/2 error code: " + this.name();
+ if (status.getDescription() != null) {
+ description += " (" + status.getDescription() + ")";
+ }
+ this.status = status.withDescription(description);
}
/**
diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
index e10150c..1111904 100644
--- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
+++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
@@ -16,6 +16,7 @@
package io.grpc.netty;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Status;
import io.grpc.internal.ManagedClientTransport;
@@ -55,13 +56,16 @@
listener.transportShutdown(s);
}
- public void notifyShutdown(Status s) {
+ /** Returns {@code true} if this was the first shutdown. */
+ @CanIgnoreReturnValue
+ public boolean notifyShutdown(Status s) {
notifyGracefulShutdown(s);
if (shutdownStatus != null) {
- return;
+ return false;
}
shutdownStatus = s;
shutdownThrowable = s.asException();
+ return true;
}
public void notifyInUse(boolean inUse) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 631d9e8..a6929e6 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -130,6 +130,7 @@
private Attributes attributes;
private InternalChannelz.Security securityInfo;
private Status abruptGoAwayStatus;
+ private Status channelInactiveReason;
static NettyClientHandler newHandler(
ClientTransportLifecycleManager lifecycleManager,
@@ -278,7 +279,7 @@
@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
- goingAway(statusFromGoAway(errorCode, debugDataBytes));
+ goingAway(errorCode, debugDataBytes);
if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
String data = new String(debugDataBytes, UTF_8);
logger.log(
@@ -400,8 +401,7 @@
NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
if (stream != null) {
PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
- Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
- .augmentDescription("Received Rst Stream");
+ Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
stream.transportReportStatus(
status,
errorCode == Http2Error.REFUSED_STREAM.code()
@@ -433,6 +433,12 @@
logger.fine("Network channel is closed");
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
lifecycleManager.notifyShutdown(status);
+ final Status streamStatus;
+ if (channelInactiveReason != null) {
+ streamStatus = channelInactiveReason;
+ } else {
+ streamStatus = lifecycleManager.getShutdownStatus();
+ }
try {
cancelPing(lifecycleManager.getShutdownThrowable());
// Report status to the application layer for any open streams
@@ -441,8 +447,7 @@
public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream.TransportState clientStream = clientStream(stream);
if (clientStream != null) {
- clientStream.transportReportStatus(
- lifecycleManager.getShutdownStatus(), false, new Metadata());
+ clientStream.transportReportStatus(streamStatus, false, new Metadata());
}
return true;
}
@@ -630,8 +635,11 @@
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
StreamBufferingEncoder.Http2GoAwayException e =
(StreamBufferingEncoder.Http2GoAwayException) cause;
- lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
- promise.setFailure(lifecycleManager.getShutdownThrowable());
+ Status status = statusFromH2Error(
+ Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
+ e.errorCode(), e.debugData());
+ stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata());
+ promise.setFailure(status.asRuntimeException());
} else {
promise.setFailure(cause);
}
@@ -786,9 +794,20 @@
* Handler for a GOAWAY being received. Fails any streams created after the
* last known stream. May only be called during a read.
*/
- private void goingAway(Status status) {
- lifecycleManager.notifyGracefulShutdown(status);
- abruptGoAwayStatus = status;
+ private void goingAway(long errorCode, byte[] debugData) {
+ Status finalStatus = statusFromH2Error(
+ Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
+ lifecycleManager.notifyGracefulShutdown(finalStatus);
+ abruptGoAwayStatus = statusFromH2Error(
+ Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
+ // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
+ // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
+ // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
+ // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
+ // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
+ // UNAVAILABLE. https://github.com/netty/netty/issues/10670
+ final Status abruptGoAwayStatusConservative = statusFromH2Error(
+ null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
// Try to allocate as many in-flight streams as possible, to reduce race window of
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
@@ -798,9 +817,13 @@
// This can cause reentrancy, but should be minor since it is normal to handle writes in
// response to a read. Also, the call stack is rather shallow at this point
clientWriteQueue.drainNow();
- lifecycleManager.notifyShutdown(status);
+ if (lifecycleManager.notifyShutdown(finalStatus)) {
+ // This is only for the RPCs that are actually covered by the GOAWAY error code. All other
+ // RPCs were not observed by the remote and so should be UNAVAILABLE.
+ channelInactiveReason = statusFromH2Error(
+ null, "Connection closed after GOAWAY", errorCode, debugData);
+ }
- final Status goAwayStatus = lifecycleManager.getShutdownStatus();
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
try {
connection().forEachActiveStream(new Http2StreamVisitor() {
@@ -809,8 +832,13 @@
if (stream.id() > lastKnownStream) {
NettyClientStream.TransportState clientStream = clientStream(stream);
if (clientStream != null) {
+ // RpcProgress _should_ be REFUSED, but we are being conservative. See comment for
+ // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
+ // retries, but our main goal of transparent retries is to resolve the local race. We
+ // still hope/expect servers to use the graceful double-GOAWAY when closing
+ // connections.
clientStream.transportReportStatus(
- goAwayStatus, RpcProgress.REFUSED, false, new Metadata());
+ abruptGoAwayStatusConservative, RpcProgress.PROCESSED, false, new Metadata());
}
stream.close();
}
@@ -829,15 +857,20 @@
}
}
- private Status statusFromGoAway(long errorCode, byte[] debugData) {
- Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
- .augmentDescription("Received Goaway");
+ /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
+ private Status statusFromH2Error(
+ Status.Code statusCode, String context, long errorCode, byte[] debugData) {
+ Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
+ if (statusCode == null) {
+ statusCode = status.getCode();
+ }
+ String debugString = "";
if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it.
- String msg = new String(debugData, UTF_8);
- status = status.augmentDescription(msg);
+ debugString = ", debug data: " + new String(debugData, UTF_8);
}
- return status;
+ return statusCode.toStatus()
+ .withDescription(context + ". " + status.getDescription() + debugString);
}
/**
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 4e469e7..b708d20 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -331,23 +331,12 @@
}
@Test
- public void receivedGoAwayShouldCancelBufferedStream() throws Exception {
- // Force the stream to be buffered.
- receiveMaxConcurrentStreams(0);
- ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
- channelRead(goAwayFrame(0));
- assertTrue(future.isDone());
- assertFalse(future.isSuccess());
- Status status = Status.fromThrowable(future.cause());
- assertEquals(Status.Code.UNAVAILABLE, status.getCode());
- assertEquals("HTTP/2 error code: NO_ERROR\nReceived Goaway", status.getDescription());
- }
-
- @Test
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(streamId - 1));
- verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
+ // This _should_ be REFUSED, but we purposefully use PROCESSED. See comment for
+ // abruptGoAwayStatusConservative in NettyClientHandler
+ verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class));
assertTrue(future.isDone());
}
@@ -386,8 +375,10 @@
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
ArgumentMatchers.<Metadata>notNull());
- assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
- assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+ assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
+ assertEquals(
+ "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, "
+ + "debug data: this is a test",
captor.getValue().getDescription());
assertTrue(future.isDone());
}
@@ -415,15 +406,18 @@
// Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
- verify(streamListener).closed(captor.capture(), same(REFUSED),
+ // See comment for abruptGoAwayStatusConservative in NettyClientHandler
+ verify(streamListener).closed(captor.capture(), same(PROCESSED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
- assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+ assertEquals(
+ "Abrupt GOAWAY closed sent stream. HTTP/2 error code: CANCEL, "
+ + "debug data: this is a test",
captor.getValue().getDescription());
}
@Test
- public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
+ public void receivedGoAwayShouldFailBufferedStreams() throws Exception {
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
@@ -433,8 +427,10 @@
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
- assertEquals(Status.CANCELLED.getCode(), status.getCode());
- assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+ assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
+ assertEquals(
+ "GOAWAY closed buffered stream. HTTP/2 error code: CANCEL, "
+ + "debug data: this is a test",
status.getDescription());
}
@@ -448,8 +444,10 @@
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
- assertEquals(Status.CANCELLED.getCode(), status.getCode());
- assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
+ assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
+ assertEquals(
+ "GOAWAY shut down transport. HTTP/2 error code: CANCEL, "
+ + "debug data: this is a test",
status.getDescription());
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 8762f0b..02b24b7 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -529,7 +529,7 @@
Throwable rootCause = getRootCause(e);
Status status = ((StatusException) rootCause).getStatus();
assertEquals(Status.Code.INTERNAL, status.getCode());
- assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
+ assertEquals("RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR",
status.getDescription());
}
}