alts: handle inline flushes on close in frame handler
gRPC issues flushes after close in the WriteQueue, which can show up as an NPE in the framer. This was thought to have been handled, by checking to see if there were any pending writes, but if the close() call gets far enough, the writes will be null. This causes an NPE when the flush comes though.
The issue is difficult to reproduce, and I think my test case emulates the failure. EmbeddedChannel is different than the normal Channels we use, making the precise ordering tough. The test case isn't exactly what the production code would do, but it does have the same ordering.
cc @jiangtaoli2016
Sample Stack trace:
```
Jun 10, 2019 2:09:03 PM io.grpc.ChannelLogger log
FINEST: [OobChannel<10>] Entering SHUTDOWN state
Jun 10, 2019 2:09:03 PM io.grpc.ChannelLogger log
FINEST: [Subchannel-OOB<11>: (fake-authority-that-is-always-the-same)] NettyClientTransport<14>: (/0:0:0:0:0:0:0:1:20008) SHUTDOWN with UNAVAILABLE(OobChannel is shutdown)
Jun 10, 2019 2:09:03 PM io.grpc.netty.NettyClientHandler close
FINE: Network channel being closed by the application.
Jun 10, 2019 2:09:03 PM io.grpc.internal.ClientCallImpl logIfContextNarrowedTimeout
FINE: Call timeout set to '4999299080' ns, due to context deadline. Explicit call timeout was not set.
Jun 10, 2019 2:09:03 PM io.netty.handler.codec.http2.Http2FrameLogger logGoAway
FINE: [id: 0x4bcebba6, L:/0:0:0:0:0:0:0:1:33296 - R:/0:0:0:0:0:0:0:1:20008] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
Jun 10, 2019 2:09:03 PM io.grpc.netty.NettyClientHandler onConnectionError
FINE: Caught a connection error
java.lang.NullPointerException
at io.grpc.alts.internal.TsiFrameHandler.flush(TsiFrameHandler.java:126)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:754)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:746)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:732)
at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:754)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:746)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:732)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
at io.grpc.netty.WriteQueue.flush(WriteQueue.java:124)
at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 10, 2019 2:09:03 PM io.netty.channel.AbstractChannelHandlerContext notifyHandlerException
WARNING: An exception was thrown by a user handler while handling an exceptionCaught event
java.lang.NullPointerException
at io.grpc.alts.internal.TsiFrameHandler.flush(TsiFrameHandler.java:126)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:754)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:746)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:732)
at io.netty.handler.codec.http2.Http2ConnectionHandler.onError(Http2ConnectionHandler.java:629)
at io.grpc.netty.AbstractNettyHandler.exceptionCaught(AbstractNettyHandler.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268)
at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:836)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:746)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:732)
at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:754)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:746)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:732)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
at io.grpc.netty.WriteQueue.flush(WriteQueue.java:124)
at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 10, 2019 2:09:03 PM io.grpc.netty.NettyClientHandler channelInactive
FINE: Network channel is closed
Jun 10, 2019 2:09:03 PM io.grpc.ChannelLogger log
FINEST: [Subchannel-OOB<11>: (fake-authority-that-is-always-the-same)] NettyClientTransport<14>: (/0:0:0:0:0:0:0:1:20008) Terminated
Jun 10, 2019 2:09:03 PM io.grpc.ChannelLogger log
FINEST: [Subchannel-OOB<11>: (fake-authority-that-is-always-the-same)] Terminated
```
diff --git a/alts/src/main/java/io/grpc/alts/internal/TsiFrameHandler.java b/alts/src/main/java/io/grpc/alts/internal/TsiFrameHandler.java
index 70b30fe..2294d7a 100644
--- a/alts/src/main/java/io/grpc/alts/internal/TsiFrameHandler.java
+++ b/alts/src/main/java/io/grpc/alts/internal/TsiFrameHandler.java
@@ -83,11 +83,7 @@
@Override
public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
- if (pendingUnprotectedWrites != null && !pendingUnprotectedWrites.isEmpty()) {
- pendingUnprotectedWrites.removeAndFailAll(
- new ChannelException("Pending write on removal of TSI handler"));
- }
- destroyProtector();
+ destroyProtectorAndWrites();
}
@Override
@@ -115,15 +111,14 @@
} catch (GeneralSecurityException e) {
logger.log(Level.FINE, "Ignored error on flush before close", e);
} finally {
- pendingUnprotectedWrites = null;
- destroyProtector();
+ destroyProtectorAndWrites();
}
}
@Override
@SuppressWarnings("FutureReturnValueIgnored") // for aggregatePromise.doneAllocatingPromises
public void flush(final ChannelHandlerContext ctx) throws GeneralSecurityException {
- if (pendingUnprotectedWrites.isEmpty()) {
+ if (pendingUnprotectedWrites == null || pendingUnprotectedWrites.isEmpty()) {
// Return early if there's nothing to write. Otherwise protector.protectFlush() below may
// not check for "no-data" and go on writing the 0-byte "data" to the socket with the
// protection framing.
@@ -184,7 +179,15 @@
ctx.read();
}
- private void destroyProtector() {
+ private void destroyProtectorAndWrites() {
+ try {
+ if (pendingUnprotectedWrites != null && !pendingUnprotectedWrites.isEmpty()) {
+ pendingUnprotectedWrites.removeAndFailAll(
+ new ChannelException("Pending write on teardown of TSI handler"));
+ }
+ } finally {
+ pendingUnprotectedWrites = null;
+ }
if (protector != null) {
try {
protector.destroy();
diff --git a/alts/src/test/java/io/grpc/alts/internal/TsiFrameHandlerTest.java b/alts/src/test/java/io/grpc/alts/internal/TsiFrameHandlerTest.java
index df2314b..f9ed0fa 100644
--- a/alts/src/test/java/io/grpc/alts/internal/TsiFrameHandlerTest.java
+++ b/alts/src/test/java/io/grpc/alts/internal/TsiFrameHandlerTest.java
@@ -22,6 +22,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import java.security.GeneralSecurityException;
@@ -86,6 +89,30 @@
channel.checkException();
}
+ @Test
+ public void flushAfterCloseShouldWork() throws InterruptedException {
+ ByteBuf msg = Unpooled.copiedBuffer("message after handshake failed", CharsetUtil.UTF_8);
+ channel.write(msg);
+
+ channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+ @Override
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ // We have to call flush while doing a close, since close() tears down the pipeline
+ // immediately after.
+ channel.flush();
+ super.close(ctx, promise);
+ }
+ });
+
+ assertThat(channel.outboundMessages()).isEmpty();
+
+ channel.close().sync();
+ Object actual = channel.readOutbound();
+
+ assertWithMessage("pending write should be flushed on close").that(actual).isEqualTo(msg);
+ channel.checkException();
+ }
+
private static final class IdentityFrameProtector implements TsiFrameProtector {
@Override