alts: Make concurrent handshake limit part of ALTS instead of TSI

The handshake limit is more a property of ALTS than TSI. This allows
other TSI implementations to accept a high connection rate (b/179376431)
diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
index ce718f4..0ea1624 100644
--- a/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
+++ b/alts/src/main/java/io/grpc/alts/internal/AltsProtocolNegotiator.java
@@ -50,6 +50,10 @@
 // TODO(carl-mastrangelo): rename this AltsProtocolNegotiators.
 public final class AltsProtocolNegotiator {
   private static final Logger logger = Logger.getLogger(AltsProtocolNegotiator.class.getName());
+  // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
+  // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
+  // there is an async TSI handshaking API to avoid the blocking.
+  private static final AsyncSemaphore handshakeSemaphore = new AsyncSemaphore(32);
 
   @Grpc.TransportAttr
   public static final Attributes.Key<TsiPeer> TSI_PEER_KEY = Attributes.Key.create("TSI_PEER");
@@ -110,8 +114,8 @@
       TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
       NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
       ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
-      ChannelHandler thh =
-          new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+      ChannelHandler thh = new TsiHandshakeHandler(
+          gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
       ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
       return wuah;
     }
@@ -165,8 +169,8 @@
       TsiHandshaker handshaker = handshakerFactory.newHandshaker(/* authority= */ null);
       NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
       ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
-      ChannelHandler thh =
-          new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+      ChannelHandler thh = new TsiHandshakeHandler(
+          gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
       ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
       return wuah;
     }
@@ -259,8 +263,8 @@
           || isXdsDirectPath) {
         TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
         NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
-        securityHandler =
-            new TsiHandshakeHandler(gnh, nettyHandshaker, new AltsHandshakeValidator());
+        securityHandler = new TsiHandshakeHandler(
+            gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
       } else {
         securityHandler = InternalProtocolNegotiators.clientTlsHandler(
             gnh, sslContext, grpcHandler.getAuthority());
diff --git a/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java
new file mode 100644
index 0000000..3ccdcfc
--- /dev/null
+++ b/alts/src/main/java/io/grpc/alts/internal/AsyncSemaphore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.alts.internal;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.LinkedList;
+import java.util.Queue;
+import javax.annotation.concurrent.GuardedBy;
+
+/** Provides a semaphore primitive, without blocking waiting on permits. */
+final class AsyncSemaphore {
+  private final Object lock = new Object();
+  @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
+  private final Queue<ChannelPromise> queue = new LinkedList<>();
+  @GuardedBy("lock")
+  private int permits;
+
+  public AsyncSemaphore(int permits) {
+    this.permits = permits;
+  }
+
+  public ChannelFuture acquire(ChannelHandlerContext ctx) {
+    synchronized (lock) {
+      if (permits > 0) {
+        permits--;
+        return ctx.newSucceededFuture();
+      }
+      ChannelPromise promise = ctx.newPromise();
+      queue.add(promise);
+      return promise;
+    }
+  }
+
+  public void release() {
+    ChannelPromise next;
+    synchronized (lock) {
+      next = queue.poll();
+      if (next == null) {
+        permits++;
+        return;
+      }
+    }
+    next.setSuccess();
+  }
+}
diff --git a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
index 1b4737f..f2e19d3 100644
--- a/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
+++ b/alts/src/main/java/io/grpc/alts/internal/TsiHandshakeHandler.java
@@ -35,12 +35,9 @@
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import java.security.GeneralSecurityException;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import javax.annotation.Nullable;
 
 /**
@@ -82,14 +79,11 @@
   }
 
   private static final int HANDSHAKE_FRAME_SIZE = 1024;
-  // Avoid performing too many handshakes in parallel, as it may cause queuing in the handshake
-  // server and cause unbounded blocking on the event loop (b/168808426). This is a workaround until
-  // there is an async TSI handshaking API to avoid the blocking.
-  private static final AsyncSemaphore semaphore = new AsyncSemaphore(32);
 
   private final NettyTsiHandshaker handshaker;
   private final HandshakeValidator handshakeValidator;
   private final ChannelHandler next;
+  private final AsyncSemaphore semaphore;
 
   private ProtocolNegotiationEvent pne;
   private boolean semaphoreAcquired;
@@ -99,9 +93,20 @@
    */
   public TsiHandshakeHandler(
       ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator) {
+    this(next, handshaker, handshakeValidator, null);
+  }
+
+  /**
+   * Constructs a TsHandshakeHandler. If a semaphore is provided, a permit from the semaphore is
+   * required to start the handshake and is returned when the handshake ends.
+   */
+  public TsiHandshakeHandler(
+      ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator,
+      AsyncSemaphore semaphore) {
     this.handshaker = checkNotNull(handshaker, "handshaker");
     this.handshakeValidator = checkNotNull(handshakeValidator, "handshakeValidator");
     this.next = checkNotNull(next, "next");
+    this.semaphore = semaphore;
   }
 
   @Override
@@ -152,7 +157,7 @@
       pne = (ProtocolNegotiationEvent) evt;
       InternalProtocolNegotiators.negotiationLogger(ctx)
           .log(ChannelLogLevel.INFO, "TsiHandshake started");
-      ChannelFuture acquire = semaphore.acquire(ctx);
+      ChannelFuture acquire = semaphoreAcquire(ctx);
       if (acquire.isSuccess()) {
         semaphoreAcquired = true;
         sendHandshake(ctx);
@@ -164,7 +169,7 @@
               return;
             }
             if (ctx.isRemoved()) {
-              semaphore.release();
+              semaphoreRelease();
               return;
             }
             semaphoreAcquired = true;
@@ -222,44 +227,23 @@
   @Override
   protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
     if (semaphoreAcquired) {
-      semaphore.release();
+      semaphoreRelease();
       semaphoreAcquired = false;
     }
     handshaker.close();
   }
 
-  private static class AsyncSemaphore {
-    private final Object lock = new Object();
-    @SuppressWarnings("JdkObsolete") // LinkedList avoids high watermark memory issues
-    private final Queue<ChannelPromise> queue = new LinkedList<>();
-    private int permits;
-
-    public AsyncSemaphore(int permits) {
-      this.permits = permits;
+  private ChannelFuture semaphoreAcquire(ChannelHandlerContext ctx) {
+    if (semaphore == null) {
+      return ctx.newSucceededFuture();
+    } else {
+      return semaphore.acquire(ctx);
     }
+  }
 
-    public ChannelFuture acquire(ChannelHandlerContext ctx) {
-      synchronized (lock) {
-        if (permits > 0) {
-          permits--;
-          return ctx.newSucceededFuture();
-        }
-        ChannelPromise promise = ctx.newPromise();
-        queue.add(promise);
-        return promise;
-      }
-    }
-
-    public void release() {
-      ChannelPromise next;
-      synchronized (lock) {
-        next = queue.poll();
-        if (next == null) {
-          permits++;
-          return;
-        }
-      }
-      next.setSuccess();
+  private void semaphoreRelease() {
+    if (semaphore != null) {
+      semaphore.release();
     }
   }
 }