netty: use singleton for the custom allocator. (#6526)
The allocator has a circular reference that prevents it from GC'ed,
thus causes memory leak if gRPC Channels are created and shutdown
(even cleanly) on a regular basis.
See https://github.com/netty/netty/issues/6891#issuecomment-457809308
and internal b/146074696.
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index b8de408..0a0f952 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -40,7 +40,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
@@ -75,8 +74,6 @@
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
- private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
@@ -406,7 +403,7 @@
return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
- eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
+ eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
@@ -521,8 +518,6 @@
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
- private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
- private final ByteBufAllocator allocator;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@@ -538,7 +533,6 @@
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
- ObjectPool<? extends ByteBufAllocator> allocatorPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
@@ -548,8 +542,6 @@
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
- this.allocatorPool = allocatorPool;
- this.allocator = allocatorPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
@@ -588,7 +580,7 @@
// TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport(
- serverAddress, channelFactory, channelOptions, group, allocator,
+ serverAddress, channelFactory, channelOptions, group,
localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
@@ -609,7 +601,6 @@
}
closed = true;
- allocatorPool.returnObject(allocator);
protocolNegotiator.close();
groupPool.returnObject(group);
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 61730c8..753c1e4 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -44,7 +44,6 @@
import io.grpc.internal.TransportTracer;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@@ -76,7 +75,6 @@
private final SocketAddress remoteAddress;
private final ChannelFactory<? extends Channel> channelFactory;
private final EventLoopGroup group;
- private final ByteBufAllocator allocator;
private final ProtocolNegotiator negotiator;
private final String authorityString;
private final AsciiString authority;
@@ -108,7 +106,6 @@
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
- ByteBufAllocator allocator,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
@@ -119,7 +116,6 @@
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
- this.allocator = Preconditions.checkNotNull(allocator, "allocator");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.flowControlWindow = flowControlWindow;
@@ -230,7 +226,7 @@
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
- b.option(ALLOCATOR, allocator);
+ b.option(ALLOCATOR, Utils.getByteBufAllocator());
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index ec1dfc2..5c40019 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -38,7 +38,6 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@@ -75,10 +74,8 @@
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
- private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
- private ByteBufAllocator allocator;
private ServerListener listener;
private Channel channel;
private final int flowControlWindow;
@@ -105,7 +102,6 @@
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
- ObjectPool<? extends ByteBufAllocator> allocatorPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@@ -121,10 +117,8 @@
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
- this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
- this.allocator = allocatorPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory;
@@ -163,8 +157,8 @@
listener = checkNotNull(serverListener, "serverListener");
ServerBootstrap b = new ServerBootstrap();
- b.option(ALLOCATOR, allocator);
- b.childOption(ALLOCATOR, allocator);
+ b.option(ALLOCATOR, Utils.getByteBufAllocator());
+ b.childOption(ALLOCATOR, Utils.getByteBufAllocator());
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
@@ -331,13 +325,6 @@
}
} finally {
workerGroup = null;
- try {
- if (allocator != null) {
- allocatorPool.returnObject(allocator);
- }
- } finally {
- allocator = null;
- }
}
}
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 96d087f..d9547c0 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -35,7 +35,6 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -80,8 +79,6 @@
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
- private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
private final List<SocketAddress> listenAddresses = new ArrayList<>();
@@ -544,7 +541,7 @@
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
- workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
+ workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index df8d7a5..c5df0cb 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -86,41 +86,37 @@
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
- public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
- new Resource<ByteBufAllocator>() {
- @Override
- public ByteBufAllocator create() {
- if (Boolean.parseBoolean(
- System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
- int maxOrder;
- if (System.getProperty("io.netty.allocator.maxOrder") == null) {
- // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
- // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
- // 2MiB, thus reducing the maxOrder to 8.
- maxOrder = 8;
- } else {
- maxOrder = PooledByteBufAllocator.defaultMaxOrder();
- }
- return new PooledByteBufAllocator(
- PooledByteBufAllocator.defaultPreferDirect(),
- PooledByteBufAllocator.defaultNumHeapArena(),
- PooledByteBufAllocator.defaultNumDirectArena(),
- PooledByteBufAllocator.defaultPageSize(),
- maxOrder,
- PooledByteBufAllocator.defaultTinyCacheSize(),
- PooledByteBufAllocator.defaultSmallCacheSize(),
- PooledByteBufAllocator.defaultNormalCacheSize(),
- PooledByteBufAllocator.defaultUseCacheForAllThreads());
- } else {
- return ByteBufAllocator.DEFAULT;
- }
- }
+ // This class is initialized on first use, thus provides delayed allocator creation.
+ private static final class ByteBufAllocatorHolder {
+ private static final ByteBufAllocator allocator;
- @Override
- public void close(ByteBufAllocator allocator) {
- // PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
+ static {
+ if (Boolean.parseBoolean(
+ System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
+ int maxOrder;
+ if (System.getProperty("io.netty.allocator.maxOrder") == null) {
+ // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
+ // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
+ // 2MiB, thus reducing the maxOrder to 8.
+ maxOrder = 8;
+ } else {
+ maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
- };
+ allocator = new PooledByteBufAllocator(
+ PooledByteBufAllocator.defaultPreferDirect(),
+ PooledByteBufAllocator.defaultNumHeapArena(),
+ PooledByteBufAllocator.defaultNumDirectArena(),
+ PooledByteBufAllocator.defaultPageSize(),
+ maxOrder,
+ PooledByteBufAllocator.defaultTinyCacheSize(),
+ PooledByteBufAllocator.defaultSmallCacheSize(),
+ PooledByteBufAllocator.defaultNormalCacheSize(),
+ PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ } else {
+ allocator = ByteBufAllocator.DEFAULT;
+ }
+ }
+ }
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
@@ -148,6 +144,10 @@
}
}
+ public static ByteBufAllocator getByteBufAllocator() {
+ return ByteBufAllocatorHolder.allocator;
+ }
+
public static Metadata convertHeaders(Http2Headers http2Headers) {
if (http2Headers instanceof GrpcHttp2InboundHeaders) {
GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers;
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 2642f32..dc1a4b9 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -62,12 +62,9 @@
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
-import io.grpc.internal.SharedResourceHolder;
-import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
@@ -126,7 +123,6 @@
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
- private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
private Runnable tooManyPingsRunnable = new Runnable() {
@@ -157,7 +153,6 @@
}
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
- SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
}
@Test
@@ -195,7 +190,7 @@
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
- allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
+ newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
@@ -440,7 +435,7 @@
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
- new HashMap<ChannelOption<?>, Object>(), group, allocator,
+ new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
@@ -710,7 +705,7 @@
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
- address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
+ address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
@@ -728,8 +723,7 @@
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
- new FixedObjectPool<>(group), new FixedObjectPool<>(group),
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
+ new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index f1f9679..b873785 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -83,7 +83,6 @@
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
protocolNegotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -128,7 +127,6 @@
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -167,7 +165,6 @@
channelOptions,
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@@ -218,7 +215,6 @@
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
- SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),