netty: Rely on ChannelFactory in NettyServer instead of dynamic classes
Fixes #5649
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 72c5c27..f0cfea9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -38,6 +38,7 @@
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
@@ -66,7 +67,7 @@
private final InternalLogId logId;
private final SocketAddress address;
- private final Class<? extends ServerChannel> channelType;
+ private final ChannelFactory<? extends ServerChannel> channelFactory;
private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
@@ -95,7 +96,7 @@
new AtomicReference<>();
NettyServer(
- SocketAddress address, Class<? extends ServerChannel> channelType,
+ SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
@@ -109,7 +110,7 @@
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
InternalChannelz channelz) {
this.address = address;
- this.channelType = checkNotNull(channelType, "channelType");
+ this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
@@ -155,7 +156,7 @@
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
- b.channel(channelType);
+ b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
@@ -170,7 +171,7 @@
b.childHandler(new ChannelInitializer<Channel>() {
@Override
- public void initChannel(Channel ch) throws Exception {
+ public void initChannel(Channel ch) {
ChannelPromise channelDone = ch.newPromise();
@@ -217,7 +218,7 @@
* Releases the event loop if the channel is "done", possibly due to the channel closing.
*/
final class LoopReleaser implements ChannelFutureListener {
- boolean done;
+ private boolean done;
@Override
public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 94d68f2..22fab43 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -25,7 +25,6 @@
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
@@ -36,8 +35,10 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
+import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
@@ -79,7 +80,9 @@
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private final List<SocketAddress> listenAddresses = new ArrayList<>();
- private Class<? extends ServerChannel> channelType = null;
+
+ private ChannelFactory<? extends ServerChannel> channelFactory =
+ Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
@@ -91,7 +94,7 @@
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
- private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
+ private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
private long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
@@ -142,8 +145,14 @@
}
/**
- * Specify the channel type to use, by default we use {@link NioServerSocketChannel} or {@code
- * EpollServerSocketChannel}.
+ * Specifies the channel type to use, by default we use {@code EpollServerSocketChannel} if
+ * available, otherwise using {@link NioServerSocketChannel}.
+ *
+ * <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
+ * {@link ServerChannel} implementation has no no-args constructor.
+ *
+ * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
+ * when the channel is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
@@ -151,7 +160,26 @@
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
*/
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
- this.channelType = Preconditions.checkNotNull(channelType, "channelType");
+ checkNotNull(channelType, "channelType");
+ return channelFactory(new ReflectiveChannelFactory<>(channelType));
+ }
+
+ /**
+ * Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is
+ * usually only used if the specific {@code ServerChannel} requires complex logic which requires
+ * additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link
+ * #channelType(Class)}.
+ *
+ * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
+ * when the channel is built, the builder will use the default one which is static.
+ *
+ * <p>You must also provide corresponding {@link EventLoopGroup} using {@link
+ * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
+ * example, if the factory creates {@link NioServerSocketChannel} you must use {@link
+ * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
+ */
+ public NettyServerBuilder channelFactory(ChannelFactory<? extends ServerChannel> channelFactory) {
+ this.channelFactory = checkNotNull(channelFactory, "channelFactory");
return this;
}
@@ -499,16 +527,13 @@
ProtocolNegotiator negotiator = protocolNegotiator;
if (negotiator == null) {
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
- ProtocolNegotiators.serverPlaintext();
+ ProtocolNegotiators.serverPlaintext();
}
- Class<? extends ServerChannel> resolvedChannelType =
- channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType;
-
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
- listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
+ listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
@@ -521,10 +546,10 @@
@VisibleForTesting
void assertEventLoopsAndChannelType() {
- boolean allProvided = channelType != null
+ boolean allProvided = channelFactory != Utils.DEFAULT_SERVER_CHANNEL_FACTORY
&& bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
- boolean nonProvided = channelType == null
+ boolean nonProvided = channelFactory == Utils.DEFAULT_SERVER_CHANNEL_FACTORY
&& bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
&& workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
checkState(
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index 76f40b0..41d5699 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -36,8 +36,10 @@
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -81,7 +83,7 @@
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
- public static final Class<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_TYPE;
+ public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
@Nullable
@@ -90,8 +92,8 @@
static {
// Decide default channel types and EventLoopGroup based on Epoll availability
if (isEpollAvailable()) {
- DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType();
DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType();
+ DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory<>(epollServerChannelType());
EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor();
DEFAULT_BOSS_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", EventLoopGroupType.EPOLL);
@@ -99,7 +101,7 @@
= new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", EventLoopGroupType.EPOLL);
} else {
logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause());
- DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class;
+ DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory();
DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class;
DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP;
DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP;
@@ -290,6 +292,15 @@
}
}
+ private static ChannelFactory<ServerChannel> nioServerChannelFactory() {
+ return new ChannelFactory<ServerChannel>() {
+ @Override
+ public ServerChannel newChannel() {
+ return new NioServerSocketChannel();
+ }
+ };
+ }
+
/**
* Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise
* null.
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 75e40bb..a32f7e0 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -719,7 +719,7 @@
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(new InetSocketAddress(0)),
- NioServerSocketChannel.class,
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index b4fa966..51f0a50 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -58,7 +58,7 @@
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
- Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+ Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -96,7 +96,7 @@
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
- Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+ Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -134,7 +134,7 @@
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
- Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+ Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
channelOptions,
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -184,7 +184,7 @@
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
- Utils.DEFAULT_SERVER_CHANNEL_TYPE,
+ Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java
index bb66d29..c5d5113 100644
--- a/netty/src/test/java/io/grpc/netty/UtilsTest.java
+++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java
@@ -29,9 +29,11 @@
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -204,13 +206,13 @@
}
@Test
- public void defaultServerChannelType_whenEpollIsAvailable() {
+ public void defaultServerChannelFactory_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
- Class<? extends Channel> clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
+ ChannelFactory<? extends ServerChannel> channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
- assertThat(clientChannelType.getName())
- .isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
+ assertThat(channelFactory.toString())
+ .isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)");
}
@Test