core: allow setting custom Deadline.Ticker to InProcessServerBuilder (#6034)
ServerImpl uses that ticker to create incoming Deadlines. This feature is specifically restricted to in-process, as it can also customize ScheduledExecutorService, and them together can fake out the clock which is useful in tests. On the other hand, a fake Ticker won't work with Netty's ScheduledExecutorService.
Also improved mismatch detection, documentation and tests in Deadline.
diff --git a/context/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java
index fadbe50..b330081 100644
--- a/context/src/main/java/io/grpc/Deadline.java
+++ b/context/src/main/java/io/grpc/Deadline.java
@@ -44,6 +44,8 @@
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
+ *
+ * @since 1.24.0
*/
public static Ticker getSystemTicker() {
return SYSTEM_TICKER;
@@ -64,6 +66,11 @@
/**
* Create a deadline that will expire at the specified offset based on the given {@link Ticker}.
*
+ * <p><strong>CAUTION</strong>: Only deadlines created with the same {@link Ticker} instance can
+ * be compared by methods like {@link #minimum}, {@link #isBefore} and {@link #compareTo}. Custom
+ * Tickers should only be used in tests where you fake out the clock. Always use the {@link
+ * #getSystemTicker system ticker} in production, or serious errors may occur.
+ *
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
@@ -72,6 +79,8 @@
* @param units The time unit for the duration.
* @param ticker Where this deadline refer the current time
* @return A new deadline.
+ *
+ * @since 1.24.0
*/
public static Deadline after(long duration, TimeUnit units, Ticker ticker) {
checkNotNull(units, "units");
@@ -111,19 +120,22 @@
}
/**
- * Is {@code this} deadline before another.
+ * Is {@code this} deadline before another. Two deadlines must be created using the same {@link
+ * Ticker}.
*/
public boolean isBefore(Deadline other) {
- assert this.ticker == other.ticker : "Tickers don't match";
+ checkTicker(other);
return this.deadlineNanos - other.deadlineNanos < 0;
}
/**
- * Return the minimum deadline of {@code this} or an other deadline.
+ * Return the minimum deadline of {@code this} or an other deadline. They must be created using
+ * the same {@link Ticker}.
+ *
* @param other deadline to compare with {@code this}.
*/
public Deadline minimum(Deadline other) {
- assert this.ticker == other.ticker : "Tickers don't match";
+ checkTicker(other);
return isBefore(other) ? this : other;
}
@@ -157,6 +169,11 @@
/**
* Schedule a task to be run when the deadline expires.
+ *
+ * <p>Note if this deadline was created with a custom {@link Ticker}, the {@code scheduler}'s
+ * underlying clock should be synchronized with that Ticker. Otherwise the task won't be run at
+ * the expected point of time.
+ *
* @param task to run on expiration
* @param scheduler used to execute the task
* @return {@link ScheduledFuture} which can be used to cancel execution of the task
@@ -182,12 +199,20 @@
buf.append(String.format(".%09d", nanos));
}
buf.append("s from now");
+ if (ticker != SYSTEM_TICKER) {
+ buf.append(" (ticker=" + ticker + ")");
+ }
return buf.toString();
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Both deadlines must be created with the same {@link Ticker}.
+ */
@Override
public int compareTo(Deadline that) {
- assert this.ticker == that.ticker : "Tickers don't match";
+ checkTicker(that);
long diff = this.deadlineNanos - that.deadlineNanos;
if (diff < 0) {
return -1;
@@ -200,12 +225,18 @@
/**
* Time source representing nanoseconds since fixed but arbitrary point in time.
*
+ * <p>DO NOT use custom {@link Ticker} implementations in production, because deadlines created
+ * with custom tickers are incompatible with those created with the system ticker. Always use
+ * the {@link #getSystemTicker system ticker} whenever you need to provide one in production code.
+ *
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
*
* <p>In general implementations should be thread-safe, unless it's implemented and used in a
* localized environment (like unit tests) where you are sure the usages are synchronized.
+ *
+ * @since 1.24.0
*/
public abstract static class Ticker {
/** Returns the number of nanoseconds since this source's epoch. */
@@ -225,4 +256,12 @@
}
return reference;
}
+
+ private void checkTicker(Deadline other) {
+ if (ticker != other.ticker) {
+ throw new AssertionError(
+ "Tickers (" + ticker + " and " + other.ticker + ") don't match."
+ + " Custom Ticker should only be used in tests!");
+ }
+ }
}
diff --git a/context/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java
index 48e172d..5f35918 100644
--- a/context/src/test/java/io/grpc/DeadlineTest.java
+++ b/context/src/test/java/io/grpc/DeadlineTest.java
@@ -17,6 +17,7 @@
package io.grpc;
import static com.google.common.truth.Truth.assertAbout;
+import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.DeadlineSubject.deadline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -69,6 +70,18 @@
}
@Test
+ public void minimum() {
+ Deadline d1 = Deadline.after(1, TimeUnit.MINUTES, ticker);
+ Deadline d2 = Deadline.after(2, TimeUnit.MINUTES, ticker);
+ Deadline d3 = Deadline.after(3, TimeUnit.MINUTES, ticker);
+
+ assertThat(d1.minimum(d2)).isSameInstanceAs(d1);
+ assertThat(d2.minimum(d1)).isSameInstanceAs(d1);
+ assertThat(d3.minimum(d2)).isSameInstanceAs(d2);
+ assertThat(d2.minimum(d3)).isSameInstanceAs(d2);
+ }
+
+ @Test
public void timeCanOverflow() {
ticker.reset(Long.MAX_VALUE);
Deadline d = Deadline.after(10, TimeUnit.DAYS, ticker);
@@ -209,9 +222,15 @@
}
@Test
+ public void toString_systemTickerNotShown() {
+ Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS);
+ assertThat(d.toString()).endsWith("s from now");
+ }
+
+ @Test
public void toString_exact() {
Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker);
- assertEquals("0s from now", d.toString());
+ assertEquals("0s from now (ticker=FAKE_TICKER)", d.toString());
}
@Test
@@ -219,17 +238,17 @@
Deadline d;
d = Deadline.after(-1, TimeUnit.MINUTES, ticker);
- assertEquals("-60s from now", d.toString());
+ assertEquals("-60s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1, TimeUnit.MILLISECONDS, ticker);
- assertEquals("-0.001000000s from now", d.toString());
+ assertEquals("-0.001000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-500, TimeUnit.MILLISECONDS, ticker);
- assertEquals("-0.500000000s from now", d.toString());
+ assertEquals("-0.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1000, TimeUnit.MILLISECONDS, ticker);
- assertEquals("-1s from now", d.toString());
+ assertEquals("-1s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1500, TimeUnit.MILLISECONDS, ticker);
- assertEquals("-1.500000000s from now", d.toString());
+ assertEquals("-1.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1023456789, TimeUnit.NANOSECONDS, ticker);
- assertEquals("-1.023456789s from now", d.toString());
+ assertEquals("-1.023456789s from now (ticker=FAKE_TICKER)", d.toString());
}
@Test
@@ -256,9 +275,39 @@
}
@Test
+ public void tickersDontMatch() {
+ Deadline d1 = Deadline.after(10, TimeUnit.SECONDS);
+ Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ boolean success = false;
+ try {
+ d1.compareTo(d2);
+ success = true;
+ } catch (AssertionError e) {
+ // Expected
+ }
+ assertFalse(success);
+
+ try {
+ d1.minimum(d2);
+ success = true;
+ } catch (AssertionError e) {
+ // Expected
+ }
+ assertFalse(success);
+
+ try {
+ d1.isBefore(d2);
+ success = true;
+ } catch (AssertionError e) {
+ // Expected
+ }
+ assertFalse(success);
+ }
+
+ @Test
public void toString_before() {
Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker);
- assertEquals("0.000012000s from now", d.toString());
+ assertEquals("0.000012000s from now (ticker=FAKE_TICKER)", d.toString());
}
private static class FakeTicker extends Deadline.Ticker {
@@ -279,5 +328,10 @@
}
this.time += unit.toNanos(period);
}
+
+ @Override
+ public String toString() {
+ return "FAKE_TICKER";
+ }
}
}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index ed27f3f..603a794 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
@@ -126,6 +127,24 @@
}
/**
+ * Provides a custom deadline ticker that this server will use to create incoming {@link
+ * Deadline}s.
+ *
+ * <p>This is intended for unit tests that fake out the clock. You should also have a fake {@link
+ * ScheduledExecutorService} whose clock is synchronized with this ticker and set it to {@link
+ * #scheduledExecutorService}. DO NOT use this in production.
+ *
+ * @return this
+ * @see Deadline#after(long, TimeUnit, Deadline.Ticker)
+ *
+ * @since 1.24.0
+ */
+ public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
+ setDeadlineTicker(ticker);
+ return this;
+ }
+
+ /**
* Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
* the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
*
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 04e3fbc..c53c79a 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -25,6 +25,7 @@
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
+import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
@@ -78,6 +79,7 @@
DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
+ Deadline.Ticker ticker = Deadline.getSystemTicker();
@Nullable private CensusStatsModule censusStatsOverride;
private boolean statsEnabled = true;
private boolean recordStartedRpcs = true;
@@ -216,6 +218,13 @@
tracingEnabled = value;
}
+ /**
+ * Sets a custom deadline ticker. This should only be called from InProcessServerBuilder.
+ */
+ protected void setDeadlineTicker(Deadline.Ticker ticker) {
+ this.ticker = checkNotNull(ticker, "ticker");
+ }
+
@Override
public final Server build() {
ServerImpl server = new ServerImpl(
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index abbd4ec..471507e 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -16,6 +16,7 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.Contexts.statusFromCancelled;
@@ -33,6 +34,7 @@
import io.grpc.BinaryLog;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
+import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
@@ -123,6 +125,7 @@
private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
+ private final Deadline.Ticker ticker;
/**
* Construct a server.
@@ -157,6 +160,7 @@
this.binlog = builder.binlog;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
+ this.ticker = checkNotNull(builder.ticker, "ticker");
channelz.addServer(this);
}
@@ -578,8 +582,10 @@
return baseContext.withCancellation();
}
- Context.CancellableContext context = baseContext.withDeadlineAfter(
- timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
+ Context.CancellableContext context =
+ baseContext.withDeadline(
+ Deadline.after(timeoutNanos, NANOSECONDS, ticker),
+ transport.getScheduledExecutorService());
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java
index ae8a418..d708af5 100644
--- a/core/src/test/java/io/grpc/internal/FakeClock.java
+++ b/core/src/test/java/io/grpc/internal/FakeClock.java
@@ -20,6 +20,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
+import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -61,6 +62,13 @@
}
};
+ private final Deadline.Ticker deadlineTicker =
+ new Deadline.Ticker() {
+ @Override public long nanoTime() {
+ return currentTimeNanos;
+ }
+ };
+
private final Supplier<Stopwatch> stopwatchSupplier =
new Supplier<Stopwatch>() {
@Override public Stopwatch get() {
@@ -230,6 +238,13 @@
}
/**
+ * Deadline ticker of the FakeClock.
+ */
+ public Deadline.Ticker getDeadlineTicker() {
+ return deadlineTicker;
+ }
+
+ /**
* Run all due tasks. Immediately due tasks that are queued during the process also get executed.
*
* @return the number of tasks run by this call
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index dd6aa46..1a79f93 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.InternalChannelz.id;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
+import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -199,6 +200,7 @@
public void startUp() throws IOException {
MockitoAnnotations.initMocks(this);
builder.channelz = channelz;
+ builder.ticker = timer.getDeadlineTicker();
streamTracerFactories = Arrays.asList(streamTracerFactory);
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
@@ -977,10 +979,11 @@
verify(stream, times(0)).close(isA(Status.class), ArgumentMatchers.<Metadata>isNotNull());
}
- private ServerStreamListener testClientClose_setup(
+ private ServerStreamListener testStreamClose_setup(
final AtomicReference<ServerCall<String, Integer>> callReference,
final AtomicReference<Context> context,
- final AtomicBoolean contextCancelled) throws Exception {
+ final AtomicBoolean contextCancelled,
+ @Nullable Long timeoutNanos) throws Exception {
createAndStartServer();
callListener = new ServerCall.Listener<String>() {
@Override
@@ -1011,6 +1014,9 @@
= transportServer.registerNewServerTransport(new SimpleServerTransport());
transportListener.transportReady(Attributes.EMPTY);
Metadata requestHeaders = new Metadata();
+ if (timeoutNanos != null) {
+ requestHeaders.put(TIMEOUT_KEY, timeoutNanos);
+ }
StatsTraceContext statsTraceCtx =
StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders);
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
@@ -1025,14 +1031,14 @@
}
@Test
- public void testClientClose_cancelTriggersImmediateCancellation() throws Exception {
+ public void testStreamClose_clientCancelTriggersImmediateCancellation() throws Exception {
AtomicBoolean contextCancelled = new AtomicBoolean(false);
AtomicReference<Context> context = new AtomicReference<>();
AtomicReference<ServerCall<String, Integer>> callReference
= new AtomicReference<>();
- ServerStreamListener streamListener = testClientClose_setup(callReference,
- context, contextCancelled);
+ ServerStreamListener streamListener = testStreamClose_setup(callReference,
+ context, contextCancelled, null);
// For close status being non OK:
// isCancelled is expected to be true immediately after calling closed(), without needing
@@ -1048,14 +1054,14 @@
}
@Test
- public void testClientClose_OkTriggersDelayedCancellation() throws Exception {
+ public void testStreamClose_clientOkTriggersDelayedCancellation() throws Exception {
AtomicBoolean contextCancelled = new AtomicBoolean(false);
AtomicReference<Context> context = new AtomicReference<>();
AtomicReference<ServerCall<String, Integer>> callReference
= new AtomicReference<>();
- ServerStreamListener streamListener = testClientClose_setup(callReference,
- context, contextCancelled);
+ ServerStreamListener streamListener = testStreamClose_setup(callReference,
+ context, contextCancelled, null);
// For close status OK:
// isCancelled is expected to be true after all pending work is done
@@ -1072,6 +1078,27 @@
}
@Test
+ public void testStreamClose_deadlineExceededTriggersImmediateCancellation() throws Exception {
+ AtomicBoolean contextCancelled = new AtomicBoolean(false);
+ AtomicReference<Context> context = new AtomicReference<>();
+ AtomicReference<ServerCall<String, Integer>> callReference
+ = new AtomicReference<>();
+
+ testStreamClose_setup(callReference, context, contextCancelled, 50L);
+
+ timer.forwardNanos(49);
+
+ assertFalse(callReference.get().isCancelled());
+ assertFalse(context.get().isCancelled());
+
+ assertEquals(1, timer.forwardNanos(1));
+
+ assertTrue(callReference.get().isCancelled());
+ assertTrue(context.get().isCancelled());
+ assertTrue(contextCancelled.get());
+ }
+
+ @Test
public void getPort() throws Exception {
final InetSocketAddress addr = new InetSocketAddress(65535);
transportServer = new SimpleServer() {