netty: use singleton for the custom allocator. (#6526)

The allocator has a circular reference that prevents it from GC'ed,
thus causes memory leak if gRPC Channels are created and shutdown
(even cleanly) on a regular basis.

See https://github.com/netty/netty/issues/6891#issuecomment-457809308
and internal b/146074696.
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index b8de408..0a0f952 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -40,7 +40,6 @@
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.SharedResourcePool;
 import io.grpc.internal.TransportTracer;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
@@ -75,8 +74,6 @@
       new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
   private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
       SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
-  private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
-      SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
 
   private final Map<ChannelOption<?>, Object> channelOptions =
       new HashMap<>();
@@ -406,7 +403,7 @@
 
     return new NettyTransportFactory(
         negotiator, channelFactory, channelOptions,
-        eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
+        eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
         maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
         transportTracerFactory, localSocketPicker, useGetForSafeMethods);
   }
@@ -521,8 +518,6 @@
     private final Map<ChannelOption<?>, ?> channelOptions;
     private final ObjectPool<? extends EventLoopGroup> groupPool;
     private final EventLoopGroup group;
-    private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
-    private final ByteBufAllocator allocator;
     private final int flowControlWindow;
     private final int maxMessageSize;
     private final int maxHeaderListSize;
@@ -538,7 +533,6 @@
     NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
         ChannelFactory<? extends Channel> channelFactory,
         Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
-        ObjectPool<? extends ByteBufAllocator> allocatorPool,
         int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
         long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
         TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
@@ -548,8 +542,6 @@
       this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
       this.groupPool = groupPool;
       this.group = groupPool.getObject();
-      this.allocatorPool = allocatorPool;
-      this.allocator = allocatorPool.getObject();
       this.flowControlWindow = flowControlWindow;
       this.maxMessageSize = maxMessageSize;
       this.maxHeaderListSize = maxHeaderListSize;
@@ -588,7 +580,7 @@
 
       // TODO(carl-mastrangelo): Pass channelLogger in.
       NettyClientTransport transport = new NettyClientTransport(
-          serverAddress, channelFactory, channelOptions, group, allocator,
+          serverAddress, channelFactory, channelOptions, group,
           localNegotiator, flowControlWindow,
           maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
           keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
@@ -609,7 +601,6 @@
       }
       closed = true;
 
-      allocatorPool.returnObject(allocator);
       protocolNegotiator.close();
       groupPool.returnObject(group);
     }
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 61730c8..753c1e4 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -44,7 +44,6 @@
 import io.grpc.internal.TransportTracer;
 import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelFuture;
@@ -76,7 +75,6 @@
   private final SocketAddress remoteAddress;
   private final ChannelFactory<? extends Channel> channelFactory;
   private final EventLoopGroup group;
-  private final ByteBufAllocator allocator;
   private final ProtocolNegotiator negotiator;
   private final String authorityString;
   private final AsciiString authority;
@@ -108,7 +106,6 @@
   NettyClientTransport(
       SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
       Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
-      ByteBufAllocator allocator,
       ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
       int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
       boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
@@ -119,7 +116,6 @@
     this.negotiationScheme = this.negotiator.scheme();
     this.remoteAddress = Preconditions.checkNotNull(address, "address");
     this.group = Preconditions.checkNotNull(group, "group");
-    this.allocator = Preconditions.checkNotNull(allocator, "allocator");
     this.channelFactory = channelFactory;
     this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
     this.flowControlWindow = flowControlWindow;
@@ -230,7 +226,7 @@
     ChannelHandler negotiationHandler = negotiator.newHandler(handler);
 
     Bootstrap b = new Bootstrap();
-    b.option(ALLOCATOR, allocator);
+    b.option(ALLOCATOR, Utils.getByteBufAllocator());
     b.attr(LOGGER_KEY, channelLogger);
     b.group(eventLoop);
     b.channelFactory(channelFactory);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index ec1dfc2..5c40019 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -38,7 +38,6 @@
 import io.grpc.internal.ServerTransportListener;
 import io.grpc.internal.TransportTracer;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelFuture;
@@ -75,10 +74,8 @@
   private final int maxStreamsPerConnection;
   private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
   private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
-  private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
   private EventLoopGroup bossGroup;
   private EventLoopGroup workerGroup;
-  private ByteBufAllocator allocator;
   private ServerListener listener;
   private Channel channel;
   private final int flowControlWindow;
@@ -105,7 +102,6 @@
       Map<ChannelOption<?>, ?> channelOptions,
       ObjectPool<? extends EventLoopGroup> bossGroupPool,
       ObjectPool<? extends EventLoopGroup> workerGroupPool,
-      ObjectPool<? extends ByteBufAllocator> allocatorPool,
       ProtocolNegotiator protocolNegotiator,
       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
       TransportTracer.Factory transportTracerFactory,
@@ -121,10 +117,8 @@
     this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
     this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
     this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
-    this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
     this.bossGroup = bossGroupPool.getObject();
     this.workerGroup = workerGroupPool.getObject();
-    this.allocator = allocatorPool.getObject();
     this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
     this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
     this.transportTracerFactory = transportTracerFactory;
@@ -163,8 +157,8 @@
     listener = checkNotNull(serverListener, "serverListener");
 
     ServerBootstrap b = new ServerBootstrap();
-    b.option(ALLOCATOR, allocator);
-    b.childOption(ALLOCATOR, allocator);
+    b.option(ALLOCATOR, Utils.getByteBufAllocator());
+    b.childOption(ALLOCATOR, Utils.getByteBufAllocator());
     b.group(bossGroup, workerGroup);
     b.channelFactory(channelFactory);
     // For non-socket based channel, the option will be ignored.
@@ -331,13 +325,6 @@
           }
         } finally {
           workerGroup = null;
-          try  {
-            if (allocator != null) {
-              allocatorPool.returnObject(allocator);
-            }
-          } finally {
-            allocator = null;
-          }
         }
       }
     }
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 96d087f..d9547c0 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -35,7 +35,6 @@
 import io.grpc.internal.KeepAliveManager;
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.SharedResourcePool;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -80,8 +79,6 @@
       SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
   private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
       SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
-  private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
-      SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
 
   private final List<SocketAddress> listenAddresses = new ArrayList<>();
 
@@ -544,7 +541,7 @@
     for (SocketAddress listenAddress : listenAddresses) {
       NettyServer transportServer = new NettyServer(
           listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
-          workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
+          workerEventLoopGroupPool, negotiator, streamTracerFactories,
           getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
           maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
           maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index df8d7a5..c5df0cb 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -86,41 +86,37 @@
   public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
   public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
 
-  public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
-      new Resource<ByteBufAllocator>() {
-        @Override
-        public ByteBufAllocator create() {
-          if (Boolean.parseBoolean(
-                  System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
-            int maxOrder;
-            if (System.getProperty("io.netty.allocator.maxOrder") == null) {
-              // See the implementation of PooledByteBufAllocator.  DEFAULT_MAX_ORDER in there is
-              // 11, which makes chunk size to be 8192 << 11 = 16 MiB.  We want the chunk size to be
-              // 2MiB, thus reducing the maxOrder to 8.
-              maxOrder = 8;
-            } else {
-              maxOrder = PooledByteBufAllocator.defaultMaxOrder();
-            }
-            return new PooledByteBufAllocator(
-                PooledByteBufAllocator.defaultPreferDirect(),
-                PooledByteBufAllocator.defaultNumHeapArena(),
-                PooledByteBufAllocator.defaultNumDirectArena(),
-                PooledByteBufAllocator.defaultPageSize(),
-                maxOrder,
-                PooledByteBufAllocator.defaultTinyCacheSize(),
-                PooledByteBufAllocator.defaultSmallCacheSize(),
-                PooledByteBufAllocator.defaultNormalCacheSize(),
-                PooledByteBufAllocator.defaultUseCacheForAllThreads());
-          } else {
-            return ByteBufAllocator.DEFAULT;
-          }
-        }
+  // This class is initialized on first use, thus provides delayed allocator creation.
+  private static final class ByteBufAllocatorHolder {
+    private static final ByteBufAllocator allocator;
 
-        @Override
-        public void close(ByteBufAllocator allocator) {
-          // PooledByteBufAllocator doesn't provide a shutdown method.  Leaving it to GC.
+    static {
+      if (Boolean.parseBoolean(
+              System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
+        int maxOrder;
+        if (System.getProperty("io.netty.allocator.maxOrder") == null) {
+          // See the implementation of PooledByteBufAllocator.  DEFAULT_MAX_ORDER in there is
+          // 11, which makes chunk size to be 8192 << 11 = 16 MiB.  We want the chunk size to be
+          // 2MiB, thus reducing the maxOrder to 8.
+          maxOrder = 8;
+        } else {
+          maxOrder = PooledByteBufAllocator.defaultMaxOrder();
         }
-      };
+        allocator = new PooledByteBufAllocator(
+            PooledByteBufAllocator.defaultPreferDirect(),
+            PooledByteBufAllocator.defaultNumHeapArena(),
+            PooledByteBufAllocator.defaultNumDirectArena(),
+            PooledByteBufAllocator.defaultPageSize(),
+            maxOrder,
+            PooledByteBufAllocator.defaultTinyCacheSize(),
+            PooledByteBufAllocator.defaultSmallCacheSize(),
+            PooledByteBufAllocator.defaultNormalCacheSize(),
+            PooledByteBufAllocator.defaultUseCacheForAllThreads());
+      } else {
+        allocator = ByteBufAllocator.DEFAULT;
+      }
+    }
+  }
 
   public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
   public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
@@ -148,6 +144,10 @@
     }
   }
 
+  public static ByteBufAllocator getByteBufAllocator() {
+    return ByteBufAllocatorHolder.allocator;
+  }
+
   public static Metadata convertHeaders(Http2Headers http2Headers) {
     if (http2Headers instanceof GrpcHttp2InboundHeaders) {
       GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers;
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 2642f32..dc1a4b9 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -62,12 +62,9 @@
 import io.grpc.internal.ServerStreamListener;
 import io.grpc.internal.ServerTransport;
 import io.grpc.internal.ServerTransportListener;
-import io.grpc.internal.SharedResourceHolder;
-import io.grpc.internal.SharedResourcePool;
 import io.grpc.internal.TransportTracer;
 import io.grpc.internal.testing.TestUtils;
 import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelConfig;
 import io.netty.channel.ChannelDuplexHandler;
@@ -126,7 +123,6 @@
   private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
       new LinkedBlockingQueue<>();
   private final NioEventLoopGroup group = new NioEventLoopGroup(1);
-  private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
   private final EchoServerListener serverListener = new EchoServerListener();
   private final InternalChannelz channelz = new InternalChannelz();
   private Runnable tooManyPingsRunnable = new Runnable() {
@@ -157,7 +153,6 @@
     }
 
     group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
-    SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
   }
 
   @Test
@@ -195,7 +190,7 @@
     channelOptions.put(ChannelOption.SO_LINGER, soLinger);
     NettyClientTransport transport = new NettyClientTransport(
         address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
-        allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
+        newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
         null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
         new SocketPicker(), new FakeChannelLogger(), false);
@@ -440,7 +435,7 @@
     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
     NettyClientTransport transport = new NettyClientTransport(
         address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
-        new HashMap<ChannelOption<?>, Object>(), group, allocator,
+        new HashMap<ChannelOption<?>, Object>(), group,
         newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
         null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
@@ -710,7 +705,7 @@
       keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
     }
     NettyClientTransport transport = new NettyClientTransport(
-        address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
+        address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
         negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
         keepAliveTimeNano, keepAliveTimeoutNano,
         false, authority, userAgent, tooManyPingsRunnable,
@@ -728,8 +723,7 @@
         TestUtils.testServerAddress(new InetSocketAddress(0)),
         new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
         new HashMap<ChannelOption<?>, Object>(),
-        new FixedObjectPool<>(group), new FixedObjectPool<>(group),
-        SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
+        new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),
         maxStreamsPerConnection,
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index f1f9679..b873785 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -83,7 +83,6 @@
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
-        SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
         protocolNegotiator,
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),
@@ -128,7 +127,6 @@
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
-        SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
         ProtocolNegotiators.plaintext(),
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),
@@ -167,7 +165,6 @@
         channelOptions,
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
-        SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
         ProtocolNegotiators.plaintext(),
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),
@@ -218,7 +215,6 @@
         new HashMap<ChannelOption<?>, Object>(),
         SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
         SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
-        SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
         ProtocolNegotiators.plaintext(),
         Collections.<ServerStreamTracer.Factory>emptyList(),
         TransportTracer.getDefaultFactory(),