netty: Reduce race window size between GOAWAY and new streams
The race between new streams and transport shutdown is #2562, but it is still
far from being generally solved. This reduces the race window of new streams
from (transport selection → stream created on network thread) to (transport
selection → stream enqueued on network thread). Since only a single thread now
needs to do work in the stream creation race window, the window should be
dramatically smaller.
This only reduces GOAWAY races when the server performs a graceful shutdown
(using two GOAWAYs), as that is the only non-racy way on-the-wire to shut down a
connection in HTTP/2.
diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
index e9f7040..e10150c 100644
--- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
+++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java
@@ -43,14 +43,25 @@
listener.transportReady();
}
- public void notifyShutdown(Status s) {
+ /**
+ * Marks transport as shutdown, but does not set the error status. This must eventually be
+ * followed by a call to notifyShutdown.
+ *
+ * <p>If the transport is already marked as shutdown this is a no-op. Otherwise it sets the
+ * shutdown flag and reports {@code s} to the listener via {@code transportShutdown}.
+ *
+ * @param s the status passed to the listener's {@code transportShutdown} callback
+ */
+ public void notifyGracefulShutdown(Status s) {
if (transportShutdown) {
return;
}
transportShutdown = true;
+ listener.transportShutdown(s);
+ }
+
+ // Marks the transport as shutdown (delegating to notifyGracefulShutdown, which notifies the
+ // listener if that has not happened yet) and records s as the shutdown status together with
+ // its exception form. If a shutdown status has already been recorded, it is left unchanged.
+ public void notifyShutdown(Status s) {
+ notifyGracefulShutdown(s);
+ if (shutdownStatus != null) {
+ return;
+ }
shutdownStatus = s;
shutdownThrowable = s.asException();
- listener.transportShutdown(s);
}
public void notifyInUse(boolean inUse) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index f04b3e6..677358f 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -755,10 +755,21 @@
/**
* Handler for a GOAWAY being received. Fails any streams created after the
- * last known stream.
+ * last known stream. May only be called during a read.
*/
private void goingAway(Status status) {
+ lifecycleManager.notifyGracefulShutdown(status);
+ // Try to allocate as many in-flight streams as possible, to reduce race window of
+ // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
+ // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
+ // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
+ // processed and thus this processing must be in-line before processing additional reads.
+
+ // This can cause reentrancy, but should be minor since it is normal to handle writes in
+ // response to a read. Also, the call stack is rather shallow at this point.
+ clientWriteQueue.drainNow();
lifecycleManager.notifyShutdown(status);
+
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
try {
diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java
index cfbfe75..f80e3fc 100644
--- a/netty/src/main/java/io/grpc/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java
@@ -102,6 +102,19 @@
}
/**
+ * Executes enqueued work directly on the current thread. This can be used to trigger writes
+ * before performing additional reads. Must be called from the event loop. This method makes no
+ * guarantee that the work queue is empty when it returns.
+ *
+ * <p>If the queue has no head element this returns without doing anything; otherwise it
+ * delegates to {@code flush()}.
+ *
+ * @throws IllegalStateException if the calling thread is not in the channel's event loop
+ */
+ void drainNow() {
+ Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
+ if (queue.peek() == null) {
+ return;
+ }
+ flush();
+ }
+
+ /**
* Process the queue of commands and dispatch them to the stream. This method is only
* called in the event loop
*/
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index dfdfcf4..c5f8132 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -362,6 +362,19 @@
}
@Test
+ // Enqueues a create-stream command on the write queue, then reads a GOAWAY frame built with
+ // streamId, and checks that the stream listener is never told the stream closed and that the
+ // enqueued command's future is done afterwards.
+ public void receivedGoAwayShouldNotAffectRacingQueuedStreamId() throws Exception {
+ // This command has not actually been executed yet
+ ChannelFuture future = writeQueue().enqueue(
+ newCreateStreamCommand(grpcHeaders, streamTransportState), true);
+ channelRead(goAwayFrame(streamId));
+ // Neither overload of closed() may have been invoked on the stream listener.
+ verify(streamListener, never())
+ .closed(any(Status.class), any(Metadata.class));
+ verify(streamListener, never())
+ .closed(any(Status.class), any(RpcProgress.class), any(Metadata.class));
+ // The queued command must have been processed by the time the read returns.
+ assertTrue(future.isDone());
+ }
+
+ @Test
public void receivedResetWithRefuseCode() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(rstStreamFrame(streamId, (int) Http2Error.REFUSED_STREAM.code() ));
diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
index 1e08704..fab3c76 100644
--- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
@@ -203,6 +203,10 @@
}
}
+ /** Exposes the {@code writeQueue} field to subclasses. */
+ protected final WriteQueue writeQueue() {
+ return writeQueue;
+ }
+
protected final T handler() {
return handler;
}