netty, alts: expose ProtectedPromise, and writeBufferedAndRemove methods (#5542)

diff --git a/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java b/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
index a19a816..e204acd 100644
--- a/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
+++ b/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
@@ -18,6 +18,7 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import io.grpc.Internal;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
@@ -34,14 +35,15 @@
  *
  * <p>NOTE: this code is based on code in Netty's {@code Http2CodecUtil}.
  */
-final class ProtectedPromise extends DefaultChannelPromise {
+@Internal
+public final class ProtectedPromise extends DefaultChannelPromise {
   private final List<ChannelPromise> unprotectedPromises;
   private int expectedCount;
   private int successfulCount;
   private int failureCount;
   private boolean doneAllocating;
 
-  ProtectedPromise(Channel channel, EventExecutor executor, int numUnprotectedPromises) {
+  public ProtectedPromise(Channel channel, EventExecutor executor, int numUnprotectedPromises) {
     super(channel, executor);
     unprotectedPromises = new ArrayList<>(numUnprotectedPromises);
   }
@@ -50,7 +52,7 @@
    * Adds a promise for a pending unprotected write. This will be notified after all of the writes
    * complete.
    */
-  void addUnprotectedPromise(ChannelPromise promise) {
+  public void addUnprotectedPromise(ChannelPromise promise) {
     unprotectedPromises.add(promise);
   }
 
@@ -60,7 +62,7 @@
    *
    * @return {@code this} promise.
    */
-  ChannelPromise newPromise() {
+  public ChannelPromise newPromise() {
     checkState(!doneAllocating, "Done allocating. No more promises can be allocated.");
     expectedCount++;
     return this;
@@ -72,7 +74,7 @@
    *
    * @return {@code this} promise.
    */
-  ChannelPromise doneAllocatingPromises() {
+  public ChannelPromise doneAllocatingPromises() {
     if (!doneAllocating) {
       doneAllocating = true;
       if (successfulCount == expectedCount) {
diff --git a/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java b/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java
new file mode 100644
index 0000000..0e09590
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import io.grpc.Internal;
+import io.netty.channel.Channel;
+
+/** Utility class for {@link WriteBufferingAndExceptionHandler}. */
+@Internal
+public final class InternalWriteBufferingAndExceptionHandlerUtils {
+
+  /**
+   * Writes buffered data and removes {@link WriteBufferingAndExceptionHandler} from {@link
+   * io.netty.channel.ChannelPipeline}.
+   *
+   * <p>Internal use only. Do not use.
+   */
+  public static void writeBufferingAndRemove(Channel channel) {
+    NettyClientHandler.writeBufferingAndRemove(channel);
+  }
+}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 90890df..5cf74d8 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -18,6 +18,7 @@
 
 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
 import static io.netty.util.CharsetUtil.UTF_8;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -445,12 +446,17 @@
     this.attributes = this.attributes.toBuilder().setAll(attributes).build();
     this.securityInfo = securityInfo;
     super.handleProtocolNegotiationCompleted(attributes, securityInfo);
-    // Once protocol negotiator is complete, release all writes and remove the buffer.
+    writeBufferingAndRemove(ctx().channel());
+  }
+
+  static void writeBufferingAndRemove(Channel channel) {
+    checkNotNull(channel, "channel");
     ChannelHandlerContext handlerCtx =
-        ctx().pipeline().context(WriteBufferingAndExceptionHandler.class);
-    if (handlerCtx != null) {
-      ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
+        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
+    if (handlerCtx == null) {
+      return;
     }
+    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
   }
 
   @Override