netty: make unexpected reads fail negotiation, and log close failures

In case a negotiating handler misses a read, and it reaches the WBAEH, it should cause a failure. Also, if closing the channel fails while handling another error, log the second failure.
diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
index 103d300..8312c9f 100644
--- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
+++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
@@ -19,6 +19,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import io.grpc.Status;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -34,7 +36,7 @@
 
 /**
  * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
- * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
+ * {@link #failWrites(Throwable)} is called. This handler allows us to
  * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
  * i.e.  before it's active or the TLS Handshake is complete.
  */
@@ -96,7 +98,16 @@
     // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
     // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
     if (ctx.channel().isActive() && previousFailure == null) {
-      ctx.close();
+      final class LogOnFailure implements ChannelFutureListener {
+        @Override
+        public void operationComplete(ChannelFuture future) {
+          if (!future.isSuccess()) {
+            logger.log(Level.FINE, "Failed closing channel", future.cause());
+          }
+        }
+      }
+
+      ctx.close().addListener(new LogOnFailure());
     }
   }
 
@@ -139,6 +150,26 @@
     promise.addListener(new ConnectListener());
   }
 
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    try {
+      if (logger.isLoggable(Level.FINE)) {
+        Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
+        logger.log(
+            Level.FINE,
+            "Unexpected channelRead()->{0} reached end of pipeline {1}",
+            new Object[] {loggedMsg, ctx.pipeline().names()});
+      }
+      exceptionCaught(
+          ctx,
+          Status.INTERNAL.withDescription(
+              "channelRead() missed by ProtocolNegotiator handler: " + msg)
+              .asRuntimeException());
+    } finally {
+      ReferenceCountUtil.safeRelease(msg);
+    }
+  }
+
   /**
    * Calls to this method will not trigger an immediate flush. The flush will be deferred until
    * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
diff --git a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
index da03639..e9d94b5 100644
--- a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java
@@ -27,6 +27,7 @@
 import io.grpc.Status.Code;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
@@ -345,4 +346,38 @@
     assertTrue(flush.get());
     assertThat(chan.pipeline()).doesNotContain(handler);
   }
+
+  @Test
+  public void uncaughtReadFails() throws Exception {
+    WriteBufferingAndExceptionHandler handler =
+        new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
+    LocalAddress addr = new LocalAddress("local");
+    ChannelFuture cf = new Bootstrap()
+        .channel(LocalChannel.class)
+        .handler(handler)
+        .group(group)
+        .register();
+    chan = cf.channel();
+    cf.sync();
+    ChannelFuture sf = new ServerBootstrap()
+        .channel(LocalServerChannel.class)
+        .childHandler(new ChannelHandlerAdapter() {})
+        .group(group)
+        .bind(addr);
+    server = sf.channel();
+    sf.sync();
+
+    ChannelFuture wf = chan.writeAndFlush(new Object());
+    chan.connect(addr);
+    chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));
+
+    try {
+      wf.sync();
+      fail();
+    } catch (Exception e) {
+      Status status = Status.fromThrowable(e);
+      assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
+      assertThat(status.getDescription()).contains("channelRead() missed");
+    }
+  }
 }