chore: handle FlowControllerTest thread start variation by polling (#1586)

* chore: handle FlowControllerTest thread start variation by polling

* chore: add license to new files

* chore: use assertThrows and ensure original assertion is cause

* chore: simplify assertByPolling to method with 2 args

* chore: lint

* chore: fix javadoc
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java
new file mode 100644
index 0000000..d392a73
--- /dev/null
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * Blocks the current thread to poll the given assertion every 10ms until it's successful or the
+ * timeout is exceeded. Expected usage:
+ *
+ * <pre>{@code
+ * assertByPolling(Duration.ofSeconds(2), () -> assertThat(...));
+ * }</pre>
+ */
+public class AssertByPolling {
+
+  public static void assertByPolling(Duration timeout, Runnable assertion)
+      throws InterruptedException {
+    Objects.requireNonNull(timeout, "Timeout must not be null");
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (true) {
+      try {
+        assertion.run();
+        return; // Success
+
+      } catch (AssertionError err) {
+        if (stopwatch.elapsed(MILLISECONDS) < timeout.toMillis()) {
+          MILLISECONDS.sleep(10);
+        } else {
+          throw new AssertionError("Timeout waiting for successful assertion.", err);
+        }
+      }
+    }
+  }
+}
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java
new file mode 100644
index 0000000..eabf5ed
--- /dev/null
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static com.google.api.gax.batching.AssertByPolling.assertByPolling;
+
+import com.google.common.truth.Truth;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AssertByPollingTest {
+
+  @Test
+  public void testFailsWhenTimeoutExceeded() {
+    AssertionError error =
+        Assert.assertThrows(
+            AssertionError.class,
+            () -> assertByPolling(Duration.ofNanos(2), () -> Truth.assertThat(1).isAtLeast(2)));
+
+    Throwable cause = error.getCause();
+    Truth.assertThat(cause).isInstanceOf(AssertionError.class);
+    // Error provides original assertion failure that never came true.
+    Truth.assertThat(cause.getMessage()).contains("expected to be at least");
+  }
+
+  @Test
+  public void testImmediateSuccessSucceedsRegardlessOfTimeout() throws InterruptedException {
+    Runnable succeedsAfter1ms =
+        () -> {
+          try {
+            TimeUnit.MILLISECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+          }
+        };
+    Duration timeout = Duration.ofNanos(0);
+    assertByPolling(timeout, succeedsAfter1ms);
+  }
+
+  @Test
+  public void testSucceedsThirdTime() throws InterruptedException {
+    AtomicInteger attempt = new AtomicInteger(1);
+    AtomicInteger numFailures = new AtomicInteger(0);
+    Runnable succeedsThirdTime =
+        () -> {
+          if (attempt.getAndIncrement() < 3) {
+            numFailures.incrementAndGet();
+            Assert.fail();
+          }
+        };
+
+    Duration timeout = Duration.ofMillis(100);
+    assertByPolling(timeout, succeedsThirdTime);
+    Truth.assertThat(numFailures.get()).isEqualTo(2);
+  }
+}
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
index cb82d28..534ad3c 100644
--- a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
@@ -29,6 +29,7 @@
  */
 package com.google.api.gax.batching;
 
+import static com.google.api.gax.batching.AssertByPolling.assertByPolling;
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,6 +42,7 @@
 import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
 import com.google.common.util.concurrent.SettableFuture;
 import java.lang.Thread.State;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -522,10 +524,10 @@
     final AtomicInteger totalDecreased = new AtomicInteger(0);
     final AtomicInteger releasedCounter = new AtomicInteger(0);
 
-    List<Future> reserveThreads =
+    List<Future<?>> reserveThreads =
         testConcurrentUpdates(
             flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
-    for (Future t : reserveThreads) {
+    for (Future<?> t : reserveThreads) {
       t.get(200, TimeUnit.MILLISECONDS);
     }
     assertEquals(reserveThreads.size(), releasedCounter.get());
@@ -555,10 +557,10 @@
     AtomicInteger totalIncreased = new AtomicInteger(0);
     AtomicInteger totalDecreased = new AtomicInteger(0);
     AtomicInteger releasedCounter = new AtomicInteger(0);
-    List<Future> reserveThreads =
+    List<Future<?>> reserveThreads =
         testConcurrentUpdates(
             flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
-    for (Future t : reserveThreads) {
+    for (Future<?> t : reserveThreads) {
       t.get(200, TimeUnit.MILLISECONDS);
     }
     assertEquals(reserveThreads.size(), releasedCounter.get());
@@ -596,19 +598,18 @@
     // will be blocked by reserve 10
     Thread t =
         new Thread(
-            new Runnable() {
-              @Override
-              public void run() {
-                try {
-                  flowController.reserve(0, 100);
-                } catch (FlowControlException e) {
-                }
+            () -> {
+              try {
+                flowController.reserve(0, 100);
+              } catch (FlowControlException e) {
+                throw new AssertionError(e);
               }
             });
     t.start();
+
     // wait for thread to start, and check it should be blocked
-    Thread.sleep(50);
-    assertEquals(State.WAITING, t.getState());
+    assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));
+
     // increase and decrease should not be blocked
     int increase = 5, decrease = 20;
     flowController.decreaseThresholds(0, decrease);
@@ -641,19 +642,18 @@
                 .build());
     Thread t =
         new Thread(
-            new Runnable() {
-              @Override
-              public void run() {
-                try {
-                  flowController.reserve(initial + 10, 10);
-                } catch (FlowControlException e) {
-                }
+            () -> {
+              try {
+                flowController.reserve(initial + 10, 10);
+              } catch (FlowControlException e) {
+                throw new AssertionError(e);
               }
             });
     t.start();
+
     // wait for thread to start, and check it should be blocked
-    Thread.sleep(50);
-    assertEquals(State.WAITING, t.getState());
+    assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));
+
     // increase and decrease should not be blocked
     int increase = 5, decrease = 20;
     flowController.decreaseThresholds(decrease, 0);
@@ -734,7 +734,7 @@
         .isAtLeast(currentTime);
   }
 
-  private List<Future> testConcurrentUpdates(
+  private List<Future<?>> testConcurrentUpdates(
       final FlowController flowController,
       final int increaseStepRange,
       final int decreaseStepRange,
@@ -774,8 +774,8 @@
             }
           }
         };
-    List<Future> updateFuture = new ArrayList<>();
-    List<Future> reserveReleaseFuture = new ArrayList<>();
+    List<Future<?>> updateFuture = new ArrayList<>();
+    List<Future<?>> reserveReleaseFuture = new ArrayList<>();
     ExecutorService executors = Executors.newFixedThreadPool(10);
     ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
     for (int i = 0; i < 5; i++) {
@@ -783,7 +783,7 @@
       updateFuture.add(executors.submit(decreaseRunnable));
       reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
     }
-    for (Future t : updateFuture) {
+    for (Future<?> t : updateFuture) {
       t.get(50, TimeUnit.MILLISECONDS);
     }
     executors.shutdown();