netty: Rely on ChannelFactory in NettyServer instead of dynamic classes

Fixes #5649
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 72c5c27..f0cfea9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -38,6 +38,7 @@
 import io.grpc.internal.TransportTracer;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
@@ -66,7 +67,7 @@
 
   private final InternalLogId logId;
   private final SocketAddress address;
-  private final Class<? extends ServerChannel> channelType;
+  private final ChannelFactory<? extends ServerChannel> channelFactory;
   private final Map<ChannelOption<?>, ?> channelOptions;
   private final ProtocolNegotiator protocolNegotiator;
   private final int maxStreamsPerConnection;
@@ -95,7 +96,7 @@
       new AtomicReference<>();
 
   NettyServer(
-      SocketAddress address, Class<? extends ServerChannel> channelType,
+      SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
       Map<ChannelOption<?>, ?> channelOptions,
       ObjectPool<? extends EventLoopGroup> bossGroupPool,
       ObjectPool<? extends EventLoopGroup> workerGroupPool,
@@ -109,7 +110,7 @@
       boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
       InternalChannelz channelz) {
     this.address = address;
-    this.channelType = checkNotNull(channelType, "channelType");
+    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
     checkNotNull(channelOptions, "channelOptions");
     this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
     this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
@@ -155,7 +156,7 @@
 
     ServerBootstrap b = new ServerBootstrap();
     b.group(bossGroup, workerGroup);
-    b.channel(channelType);
+    b.channelFactory(channelFactory);
     // For non-socket based channel, the option will be ignored.
     b.option(SO_BACKLOG, 128);
     b.childOption(SO_KEEPALIVE, true);
@@ -170,7 +171,7 @@
 
     b.childHandler(new ChannelInitializer<Channel>() {
       @Override
-      public void initChannel(Channel ch) throws Exception {
+      public void initChannel(Channel ch) {
 
         ChannelPromise channelDone = ch.newPromise();
 
@@ -217,7 +218,7 @@
          * Releases the event loop if the channel is "done", possibly due to the channel closing.
          */
         final class LoopReleaser implements ChannelFutureListener {
-          boolean done;
+          private boolean done;
 
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 94d68f2..22fab43 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -25,7 +25,6 @@
 import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.grpc.ExperimentalApi;
 import io.grpc.Internal;
@@ -36,8 +35,10 @@
 import io.grpc.internal.KeepAliveManager;
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.SharedResourcePool;
+import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ReflectiveChannelFactory;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.ssl.SslContext;
@@ -79,7 +80,9 @@
       SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
 
   private final List<SocketAddress> listenAddresses = new ArrayList<>();
-  private Class<? extends ServerChannel> channelType = null;
+
+  private ChannelFactory<? extends ServerChannel> channelFactory =
+      Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
   private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
   private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
       DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
@@ -91,7 +94,7 @@
   private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
   private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
   private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
-  private long keepAliveTimeInNanos =  DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
+  private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
   private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
   private long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
   private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
@@ -142,8 +145,14 @@
   }
 
   /**
-   * Specify the channel type to use, by default we use {@link NioServerSocketChannel} or {@code
-   * EpollServerSocketChannel}.
+   * Specifies the channel type to use, by default we use {@code EpollServerSocketChannel} if
+   * available, otherwise using {@link NioServerSocketChannel}.
+   *
+   * <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
+   * {@link ServerChannel} implementation has no no-args constructor.
+   *
+   * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
+   * when the channel is built, the builder will use the default one which is static.
    *
    * <p>You must also provide corresponding {@link EventLoopGroup} using {@link
    * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
@@ -151,7 +160,26 @@
    * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
    */
   public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
-    this.channelType = Preconditions.checkNotNull(channelType, "channelType");
+    checkNotNull(channelType, "channelType");
+    return channelFactory(new ReflectiveChannelFactory<>(channelType));
+  }
+
+  /**
+   * Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is
+   * usually only used if the specific {@code ServerChannel} requires complex logic which requires
+   * additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link
+   * #channelType(Class)}.
+   *
+   * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
+   * when the channel is built, the builder will use the default one which is static.
+   *
+   * <p>You must also provide corresponding {@link EventLoopGroup} using {@link
+   * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
+   * example, if the factory creates {@link NioServerSocketChannel} you must use {@link
+   * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
+   */
+  public NettyServerBuilder channelFactory(ChannelFactory<? extends ServerChannel> channelFactory) {
+    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
     return this;
   }
 
@@ -499,16 +527,13 @@
     ProtocolNegotiator negotiator = protocolNegotiator;
     if (negotiator == null) {
       negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
-              ProtocolNegotiators.serverPlaintext();
+          ProtocolNegotiators.serverPlaintext();
     }
 
-    Class<? extends ServerChannel> resolvedChannelType =
-        channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType;
-
     List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
     for (SocketAddress listenAddress : listenAddresses) {
       NettyServer transportServer = new NettyServer(
-          listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
+          listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
           workerEventLoopGroupPool, negotiator, streamTracerFactories,
           getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
           maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
@@ -521,10 +546,10 @@
 
   @VisibleForTesting
   void assertEventLoopsAndChannelType() {
-    boolean allProvided = channelType != null
+    boolean allProvided = channelFactory != Utils.DEFAULT_SERVER_CHANNEL_FACTORY
         && bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
         && workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
-    boolean nonProvided = channelType == null
+    boolean nonProvided = channelFactory == Utils.DEFAULT_SERVER_CHANNEL_FACTORY
         && bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
         && workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
     checkState(
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index 76f40b0..41d5699 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -36,8 +36,10 @@
 import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ReflectiveChannelFactory;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -81,7 +83,7 @@
   public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
   public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
 
-  public static final Class<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_TYPE;
+  public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
   public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
 
   @Nullable
@@ -90,8 +92,8 @@
   static {
     // Decide default channel types and EventLoopGroup based on Epoll availability
     if (isEpollAvailable()) {
-      DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType();
       DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType();
+      DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory<>(epollServerChannelType());
       EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor();
       DEFAULT_BOSS_EVENT_LOOP_GROUP
         = new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", EventLoopGroupType.EPOLL);
@@ -99,7 +101,7 @@
         = new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", EventLoopGroupType.EPOLL);
     } else {
       logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause());
-      DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class;
+      DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory();
       DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class;
       DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP;
       DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP;
@@ -290,6 +292,15 @@
     }
   }
 
+  private static ChannelFactory<ServerChannel> nioServerChannelFactory() {
+    return new ChannelFactory<ServerChannel>() {
+      @Override
+      public ServerChannel newChannel() {
+        return new NioServerSocketChannel();
+      }
+    };
+  }
+
   /**
    * Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise
    * null.
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 75e40bb..a32f7e0 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -719,7 +719,7 @@
   private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
     server = new NettyServer(
         TestUtils.testServerAddress(new InetSocketAddress(0)),
-        NioServerSocketChannel.class,
+        new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
         new HashMap<ChannelOption<?>, Object>(),
         new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
         Collections.<ServerStreamTracer.Factory>emptyList(),
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index b4fa966..51f0a50 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -58,7 +58,7 @@
     InetSocketAddress addr = new InetSocketAddress(0);
     NettyServer ns = new NettyServer(
         addr,
-        Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+        Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -96,7 +96,7 @@
     InetSocketAddress addr = new InetSocketAddress(0);
     NettyServer ns = new NettyServer(
         addr,
-        Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+        Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -134,7 +134,7 @@
     InetSocketAddress addr = new InetSocketAddress(0);
     NettyServer ns = new NettyServer(
         addr,
-        Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+        Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
         channelOptions,
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -184,7 +184,7 @@
     InetSocketAddress addr = new InetSocketAddress(0);
     NettyServer ns = new NettyServer(
         addr,
-        Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+        Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java
index bb66d29..c5d5113 100644
--- a/netty/src/test/java/io/grpc/netty/UtilsTest.java
+++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java
@@ -29,9 +29,11 @@
 import io.grpc.Status;
 import io.grpc.internal.GrpcUtil;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ConnectTimeoutException;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -204,13 +206,13 @@
   }
 
   @Test
-  public void defaultServerChannelType_whenEpollIsAvailable() {
+  public void defaultServerChannelFactory_whenEpollIsAvailable() {
     assume().that(Utils.isEpollAvailable()).isTrue();
 
-    Class<? extends Channel> clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
+    ChannelFactory<? extends ServerChannel> channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
 
-    assertThat(clientChannelType.getName())
-        .isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
+    assertThat(channelFactory.toString())
+        .isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)");
   }
 
   @Test