netty: make unexpected reads fail negotiation, and log close failures
In case a negotiating handler misses a read, and it reaches the WBAEH, it should cause a failure. Also, if closing the channel fails while handling another error, log the second failure.
diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
index 103d300..8312c9f 100644
--- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
+++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
@@ -19,6 +19,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Status;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -34,7 +36,7 @@
/**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
- * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
+ * {@link #failWrites(Throwable)} is called. This handler allows us to
* write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
* i.e. before it's active or the TLS Handshake is complete.
*/
@@ -96,7 +98,16 @@
// 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
// 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
if (ctx.channel().isActive() && previousFailure == null) {
- ctx.close();
+ final class LogOnFailure implements ChannelFutureListener {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ logger.log(Level.FINE, "Failed closing channel", future.cause());
+ }
+ }
+ }
+
+ ctx.close().addListener(new LogOnFailure());
}
}
@@ -139,6 +150,26 @@
promise.addListener(new ConnectListener());
}
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ try {
+ if (logger.isLoggable(Level.FINE)) {
+ Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
+ logger.log(
+ Level.FINE,
+ "Unexpected channelRead()->{0} reached end of pipeline {1}",
+ new Object[] {loggedMsg, ctx.pipeline().names()});
+ }
+ exceptionCaught(
+ ctx,
+ Status.INTERNAL.withDescription(
+ "channelRead() missed by ProtocolNegotiator handler: " + msg)
+ .asRuntimeException());
+ } finally {
+ ReferenceCountUtil.safeRelease(msg);
+ }
+ }
+
/**
* Calls to this method will not trigger an immediate flush. The flush will be deferred until
* {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
diff --git a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
index da03639..e9d94b5 100644
--- a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
@@ -27,6 +27,7 @@
import io.grpc.Status.Code;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
@@ -345,4 +346,38 @@
assertTrue(flush.get());
assertThat(chan.pipeline()).doesNotContain(handler);
}
+
+ @Test
+ public void uncaughtReadFails() throws Exception {
+ WriteBufferingAndExceptionHandler handler =
+ new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
+ LocalAddress addr = new LocalAddress("local");
+ ChannelFuture cf = new Bootstrap()
+ .channel(LocalChannel.class)
+ .handler(handler)
+ .group(group)
+ .register();
+ chan = cf.channel();
+ cf.sync();
+ ChannelFuture sf = new ServerBootstrap()
+ .channel(LocalServerChannel.class)
+ .childHandler(new ChannelHandlerAdapter() {})
+ .group(group)
+ .bind(addr);
+ server = sf.channel();
+ sf.sync();
+
+ ChannelFuture wf = chan.writeAndFlush(new Object());
+ chan.connect(addr);
+ chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));
+
+ try {
+ wf.sync();
+ fail();
+ } catch (Exception e) {
+ Status status = Status.fromThrowable(e);
+ assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
+ assertThat(status.getDescription()).contains("channelRead() missed");
+ }
+ }
}