chore: handle FlowControllerTest thread start variation by polling (#1586)
* chore: handle FlowControllerTest thread start variation by polling
* chore: add license to new files
* chore: use assertThrows and ensure original assertion is cause
* chore: simplify assertByPolling to method with 2 args
* chore: lint
* chore: fix javadoc
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java
new file mode 100644
index 0000000..d392a73
--- /dev/null
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * Blocks the current thread to poll the given assertion every 10ms until it's successful or the
+ * timeout is exceeded. Expected usage:
+ *
+ * <pre>{@code
+ * assertByPolling(Duration.ofSeconds(2), () -> assertThat(...));
+ * }</pre>
+ */
+public class AssertByPolling {
+
+ public static void assertByPolling(Duration timeout, Runnable assertion)
+ throws InterruptedException {
+ Objects.requireNonNull(timeout, "Timeout must not be null");
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ try {
+ assertion.run();
+ return; // Success
+
+ } catch (AssertionError err) {
+ if (stopwatch.elapsed(MILLISECONDS) < timeout.toMillis()) {
+ MILLISECONDS.sleep(10);
+ } else {
+ throw new AssertionError("Timeout waiting for successful assertion.", err);
+ }
+ }
+ }
+ }
+}
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java
new file mode 100644
index 0000000..eabf5ed
--- /dev/null
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static com.google.api.gax.batching.AssertByPolling.assertByPolling;
+
+import com.google.common.truth.Truth;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AssertByPollingTest {
+
+ @Test
+ public void testFailsWhenTimeoutExceeded() {
+ AssertionError error =
+ Assert.assertThrows(
+ AssertionError.class,
+ () -> assertByPolling(Duration.ofNanos(2), () -> Truth.assertThat(1).isAtLeast(2)));
+
+ Throwable cause = error.getCause();
+ Truth.assertThat(cause).isInstanceOf(AssertionError.class);
+ // Error provides original assertion failure that never came true.
+ Truth.assertThat(cause.getMessage()).contains("expected to be at least");
+ }
+
+ @Test
+ public void testImmediateSuccessSucceedsRegardlessOfTimeout() throws InterruptedException {
+ Runnable succeedsAfter1ms =
+ () -> {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ };
+ Duration timeout = Duration.ofNanos(0);
+ assertByPolling(timeout, succeedsAfter1ms);
+ }
+
+ @Test
+ public void testSucceedsThirdTime() throws InterruptedException {
+ AtomicInteger attempt = new AtomicInteger(1);
+ AtomicInteger numFailures = new AtomicInteger(0);
+ Runnable succeedsThirdTime =
+ () -> {
+ if (attempt.getAndIncrement() < 3) {
+ numFailures.incrementAndGet();
+ Assert.fail();
+ }
+ };
+
+ Duration timeout = Duration.ofMillis(100);
+ assertByPolling(timeout, succeedsThirdTime);
+ Truth.assertThat(numFailures.get()).isEqualTo(2);
+ }
+}
diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
index cb82d28..534ad3c 100644
--- a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
+++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
@@ -29,6 +29,7 @@
*/
package com.google.api.gax.batching;
+import static com.google.api.gax.batching.AssertByPolling.assertByPolling;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,6 +42,7 @@
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.common.util.concurrent.SettableFuture;
import java.lang.Thread.State;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -522,10 +524,10 @@
final AtomicInteger totalDecreased = new AtomicInteger(0);
final AtomicInteger releasedCounter = new AtomicInteger(0);
- List<Future> reserveThreads =
+ List<Future<?>> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
- for (Future t : reserveThreads) {
+ for (Future<?> t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
@@ -555,10 +557,10 @@
AtomicInteger totalIncreased = new AtomicInteger(0);
AtomicInteger totalDecreased = new AtomicInteger(0);
AtomicInteger releasedCounter = new AtomicInteger(0);
- List<Future> reserveThreads =
+ List<Future<?>> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
- for (Future t : reserveThreads) {
+ for (Future<?> t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
@@ -596,19 +598,18 @@
// will be blocked by reserve 10
Thread t =
new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- flowController.reserve(0, 100);
- } catch (FlowControlException e) {
- }
+ () -> {
+ try {
+ flowController.reserve(0, 100);
+ } catch (FlowControlException e) {
+ throw new AssertionError(e);
}
});
t.start();
+
// wait for thread to start, and check it should be blocked
- Thread.sleep(50);
- assertEquals(State.WAITING, t.getState());
+ assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));
+
// increase and decrease should not be blocked
int increase = 5, decrease = 20;
flowController.decreaseThresholds(0, decrease);
@@ -641,19 +642,18 @@
.build());
Thread t =
new Thread(
- new Runnable() {
- @Override
- public void run() {
- try {
- flowController.reserve(initial + 10, 10);
- } catch (FlowControlException e) {
- }
+ () -> {
+ try {
+ flowController.reserve(initial + 10, 10);
+ } catch (FlowControlException e) {
+ throw new AssertionError(e);
}
});
t.start();
+
// wait for thread to start, and check it should be blocked
- Thread.sleep(50);
- assertEquals(State.WAITING, t.getState());
+ assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));
+
// increase and decrease should not be blocked
int increase = 5, decrease = 20;
flowController.decreaseThresholds(decrease, 0);
@@ -734,7 +734,7 @@
.isAtLeast(currentTime);
}
- private List<Future> testConcurrentUpdates(
+ private List<Future<?>> testConcurrentUpdates(
final FlowController flowController,
final int increaseStepRange,
final int decreaseStepRange,
@@ -774,8 +774,8 @@
}
}
};
- List<Future> updateFuture = new ArrayList<>();
- List<Future> reserveReleaseFuture = new ArrayList<>();
+ List<Future<?>> updateFuture = new ArrayList<>();
+ List<Future<?>> reserveReleaseFuture = new ArrayList<>();
ExecutorService executors = Executors.newFixedThreadPool(10);
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
@@ -783,7 +783,7 @@
updateFuture.add(executors.submit(decreaseRunnable));
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
}
- for (Future t : updateFuture) {
+ for (Future<?> t : updateFuture) {
t.get(50, TimeUnit.MILLISECONDS);
}
executors.shutdown();