netty: NettyServer should close ProtocolNegotiator

diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index a1f5347..ec1dfc2 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -288,10 +288,11 @@
         if (stats != null) {
           channelz.removeListenSocket(stats);
         }
+        sharedResourceReferenceCounter.release();
+        protocolNegotiator.close();
         synchronized (NettyServer.this) {
           listener.serverShutdown();
         }
-        sharedResourceReferenceCounter.release();
       }
     });
     try {
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
index c3c668d..7dad1af 100644
--- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
+++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
@@ -38,9 +38,10 @@
   ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler);
 
   /**
-   * Releases resources held by this negotiator. Called when the Channel transitions to terminated.
-   * Is currently only supported on client-side; server-side protocol negotiators will not see this
-   * method called.
+   * Releases resources held by this negotiator. Called when the Channel transitions to terminated
+   * or when InternalServer is shutdown (depending on client or server). That means handlers
+   * returned by {@link #newHandler()} can outlive their parent negotiator on server-side, but not
+   * on client-side.
    */
   void close();
 }
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index 174ca1e..f1f9679 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -36,14 +36,17 @@
 import io.grpc.internal.SharedResourcePool;
 import io.grpc.internal.TransportTracer;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.WriteBufferWaterMark;
+import io.netty.util.AsciiString;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,8 +57,26 @@
   private final InternalChannelz channelz = new InternalChannelz();
 
   @Test
-  public void getPort() throws Exception {
+  public void startStop() throws Exception {
     InetSocketAddress addr = new InetSocketAddress(0);
+
+    class TestProtocolNegotiator implements ProtocolNegotiator {
+      boolean closed;
+
+      @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override public void close() {
+        closed = true;
+      }
+
+      @Override public AsciiString scheme() {
+        return Utils.HTTP;
+      }
+    }
+
+    TestProtocolNegotiator protocolNegotiator = new TestProtocolNegotiator();
     NettyServer ns = new NettyServer(
         addr,
         Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
@@ -63,7 +84,7 @@
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
-        ProtocolNegotiators.plaintext(),
+        protocolNegotiator,
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),
         1, // ignore
@@ -75,6 +96,7 @@
         1, 1, // ignore
         true, 0, // ignore
         channelz);
+    final SettableFuture<Void> serverShutdownCalled = SettableFuture.create();
     ns.start(new ServerListener() {
       @Override
       public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -82,7 +104,9 @@
       }
 
       @Override
-      public void serverShutdown() {}
+      public void serverShutdown() {
+        serverShutdownCalled.set(null);
+      }
     });
 
     // Check that we got an actual port.
@@ -90,6 +114,9 @@
 
     // Cleanup
     ns.shutdown();
+    // serverShutdown() signals that resources are freed
+    serverShutdownCalled.get(1, TimeUnit.SECONDS);
+    assertThat(protocolNegotiator.closed).isTrue();
   }
 
   @Test