alts: Make concurrent handshake limit part of ALTS instead of TSI
The handshake limit is more a property of ALTS than TSI. This allows
other TSI implementations to accept a high connection rate (b/179376431)
diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
index ce718f4..0ea1624 100644
--- a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
+++ b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
@@ -50,6 +50,10 @@
// TODO(carl-mastrangelo): rename this AltsProtocolNegotiators.
public final class AltsProtocolNegotiator {
private static final Logger logger = Logger.getLogger(AltsProtocolNegotiator.class.getName());
+ // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
+ // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
+ // there is an async TSI handshaking API to avoid the blocking.
+ private static final AsyncSemaphore handshakeSemaphore = new AsyncSemaphore(32);
@Grpc.TransportAttr
public static final Attributes.Key<TsiPeer> TSI_PEER_KEY = Attributes.Key.create("TSI_PEER");
@@ -110,8 +114,8 @@
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
- ChannelHandler thh =
- new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+ ChannelHandler thh = new TsiHandshakeHandler(
+ gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
return wuah;
}
@@ -165,8 +169,8 @@
TsiHandshaker handshaker = handshakerFactory.newHandshaker(/* authority= */ null);
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
- ChannelHandler thh =
- new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+ ChannelHandler thh = new TsiHandshakeHandler(
+ gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
return wuah;
}
@@ -259,8 +263,8 @@
|| isXdsDirectPath) {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
- securityHandler =
- new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+ securityHandler = new TsiHandshakeHandler(
+ gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
} else {
securityHandler = InternalProtocolNegotiators.clientTlsHandler(
gnh, sslContext, grpcHandler.getAuthority());
diff --git a/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java
new file mode 100644
index 0000000..3ccdcfc
--- /dev/null
+++ b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.alts.internal;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.LinkedList;
+import java.util.Queue;
+import javax.annotation.concurrent.GuardedBy;
+
+/** Provides a semaphore primitive, without blocking waiting on permits. */
+final class AsyncSemaphore {
+ private final Object lock = new Object();
+ @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
+ private final Queue<ChannelPromise> queue = new LinkedList<>();
+ @GuardedBy("lock")
+ private int permits;
+
+ public AsyncSemaphore(int permits) {
+ this.permits = permits;
+ }
+
+ public ChannelFuture acquire(ChannelHandlerContext ctx) {
+ synchronized (lock) {
+ if (permits > 0) {
+ permits--;
+ return ctx.newSucceededFuture();
+ }
+ ChannelPromise promise = ctx.newPromise();
+ queue.add(promise);
+ return promise;
+ }
+ }
+
+ public void release() {
+ ChannelPromise next;
+ synchronized (lock) {
+ next = queue.poll();
+ if (next == null) {
+ permits++;
+ return;
+ }
+ }
+ next.setSuccess();
+ }
+}
diff --git a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
index 1b4737f..f2e19d3 100644
--- a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
+++ b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
@@ -35,12 +35,9 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.security.GeneralSecurityException;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import javax.annotation.Nullable;
/**
@@ -82,14 +79,11 @@
}
private static final int HANDSHAKE_FRAME_SIZE = 1024;
- // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
- // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
- // there is an async TSI handshaking API to avoid the blocking.
- private static final AsyncSemaphore semaphore = new AsyncSemaphore(32);
private final NettyTsiHandshaker handshaker;
private final HandshakeValidator handshakeValidator;
private final ChannelHandler next;
+ private final AsyncSemaphore semaphore;
private ProtocolNegotiationEvent pne;
private boolean semaphoreAcquired;
@@ -99,9 +93,20 @@
*/
public TsiHandshakeHandler(
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator) {
+ this(next, handshaker, handshakeValidator, null);
+ }
+
+ /**
+ * Constructs a TsHandshakeHandler. If a semaphore is provided, a permit from the semaphore is
+ * required to start the handshake and is returned when the handshake ends.
+ */
+ public TsiHandshakeHandler(
+ ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator,
+ AsyncSemaphore semaphore) {
this.handshaker = checkNotNull(handshaker, "handshaker");
this.handshakeValidator = checkNotNull(handshakeValidator, "handshakeValidator");
this.next = checkNotNull(next, "next");
+ this.semaphore = semaphore;
}
@Override
@@ -152,7 +157,7 @@
pne = (ProtocolNegotiationEvent) evt;
InternalProtocolNegotiators.negotiationLogger(ctx)
.log(ChannelLogLevel.INFO, "TsiHandshake started");
- ChannelFuture acquire = semaphore.acquire(ctx);
+ ChannelFuture acquire = semaphoreAcquire(ctx);
if (acquire.isSuccess()) {
semaphoreAcquired = true;
sendHandshake(ctx);
@@ -164,7 +169,7 @@
return;
}
if (ctx.isRemoved()) {
- semaphore.release();
+ semaphoreRelease();
return;
}
semaphoreAcquired = true;
@@ -222,44 +227,23 @@
@Override
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
if (semaphoreAcquired) {
- semaphore.release();
+ semaphoreRelease();
semaphoreAcquired = false;
}
handshaker.close();
}
- private static class AsyncSemaphore {
- private final Object lock = new Object();
- @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
- private final Queue<ChannelPromise> queue = new LinkedList<>();
- private int permits;
-
- public AsyncSemaphore(int permits) {
- this.permits = permits;
+ private ChannelFuture semaphoreAcquire(ChannelHandlerContext ctx) {
+ if (semaphore == null) {
+ return ctx.newSucceededFuture();
+ } else {
+ return semaphore.acquire(ctx);
}
+ }
- public ChannelFuture acquire(ChannelHandlerContext ctx) {
- synchronized (lock) {
- if (permits > 0) {
- permits--;
- return ctx.newSucceededFuture();
- }
- ChannelPromise promise = ctx.newPromise();
- queue.add(promise);
- return promise;
- }
- }
-
- public void release() {
- ChannelPromise next;
- synchronized (lock) {
- next = queue.poll();
- if (next == null) {
- permits++;
- return;
- }
- }
- next.setSuccess();
+ private void semaphoreRelease() {
+ if (semaphore != null) {
+ semaphore.release();
}
}
}