netty, alts: expose ProtectedPromise, and writeBufferedAndRemove methods (#5542)
diff --git a/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java b/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
index a19a816..e204acd 100644
--- a/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
+++ b/alts/src/main/java/io/grpc/alts/internal/ProtectedPromise.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkState;
+import io.grpc.Internal;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
@@ -34,14 +35,15 @@
*
* <p>NOTE: this code is based on code in Netty's {@code Http2CodecUtil}.
*/
-final class ProtectedPromise extends DefaultChannelPromise {
+@Internal
+public final class ProtectedPromise extends DefaultChannelPromise {
private final List<ChannelPromise> unprotectedPromises;
private int expectedCount;
private int successfulCount;
private int failureCount;
private boolean doneAllocating;
- ProtectedPromise(Channel channel, EventExecutor executor, int numUnprotectedPromises) {
+ public ProtectedPromise(Channel channel, EventExecutor executor, int numUnprotectedPromises) {
super(channel, executor);
unprotectedPromises = new ArrayList<>(numUnprotectedPromises);
}
@@ -50,7 +52,7 @@
* Adds a promise for a pending unprotected write. This will be notified after all of the writes
* complete.
*/
- void addUnprotectedPromise(ChannelPromise promise) {
+ public void addUnprotectedPromise(ChannelPromise promise) {
unprotectedPromises.add(promise);
}
@@ -60,7 +62,7 @@
*
* @return {@code this} promise.
*/
- ChannelPromise newPromise() {
+ public ChannelPromise newPromise() {
checkState(!doneAllocating, "Done allocating. No more promises can be allocated.");
expectedCount++;
return this;
@@ -72,7 +74,7 @@
*
* @return {@code this} promise.
*/
- ChannelPromise doneAllocatingPromises() {
+ public ChannelPromise doneAllocatingPromises() {
if (!doneAllocating) {
doneAllocating = true;
if (successfulCount == expectedCount) {
diff --git a/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java b/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java
new file mode 100644
index 0000000..0e09590
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/InternalWriteBufferingAndExceptionHandlerUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import io.grpc.Internal;
+import io.netty.channel.Channel;
+
+/** Utility class for {@link WriteBufferingAndExceptionHandler}. */
+@Internal
+public final class InternalWriteBufferingAndExceptionHandlerUtils {
+
+ /**
+ * Writes buffered data and removes {@link WriteBufferingAndExceptionHandler} from {@link
+ * io.netty.channel.ChannelPipeline}.
+ *
+ * <p>Internal use only. Do not use.
+ */
+ public static void writeBufferingAndRemove(Channel channel) {
+ NettyClientHandler.writeBufferingAndRemove(channel);
+ }
+}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 90890df..5cf74d8 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -18,6 +18,7 @@
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
import static io.netty.util.CharsetUtil.UTF_8;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -445,12 +446,17 @@
this.attributes = this.attributes.toBuilder().setAll(attributes).build();
this.securityInfo = securityInfo;
super.handleProtocolNegotiationCompleted(attributes, securityInfo);
- // Once protocol negotiator is complete, release all writes and remove the buffer.
+ writeBufferingAndRemove(ctx().channel());
+ }
+
+ static void writeBufferingAndRemove(Channel channel) {
+ checkNotNull(channel, "channel");
ChannelHandlerContext handlerCtx =
- ctx().pipeline().context(WriteBufferingAndExceptionHandler.class);
- if (handlerCtx != null) {
- ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
+ channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
+ if (handlerCtx == null) {
+ return;
}
+ ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
}
@Override