core: allow setting custom Deadline.Ticker to InProcessServerBuilder (#6034)

ServerImpl uses that ticker to create incoming Deadlines. This feature is specifically restricted to in-process, as it can also customize ScheduledExecutorService, and them together can fake out the clock which is useful in tests. On the other hand, a fake Ticker won't work with Netty's ScheduledExecutorService.

Also improved mismatch detection, documentation and tests in Deadline.
diff --git a/context/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java
index fadbe50..b330081 100644
--- a/context/src/main/java/io/grpc/Deadline.java
+++ b/context/src/main/java/io/grpc/Deadline.java
@@ -44,6 +44,8 @@
    * <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change.  If you'd like it to be
    * stabilized or have any feedback, please
    * <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
+   *
+   * @since 1.24.0
    */
   public static Ticker getSystemTicker() {
     return SYSTEM_TICKER;
@@ -64,6 +66,11 @@
   /**
    * Create a deadline that will expire at the specified offset based on the given {@link Ticker}.
    *
+   * <p><strong>CAUTION</strong>: Only deadlines created with the same {@link Ticker} instance can
+   * be compared by methods like {@link #minimum}, {@link #isBefore} and {@link #compareTo}.  Custom
+   * Tickers should only be used in tests where you fake out the clock.  Always use the {@link
+   * #getSystemTicker system ticker} in production, or serious errors may occur.
+   *
    * <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change.  If you'd like it to be
    * stabilized or have any feedback, please
    * <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
@@ -72,6 +79,8 @@
    * @param units The time unit for the duration.
    * @param ticker Where this deadline refer the current time
    * @return A new deadline.
+   *
+   * @since 1.24.0
    */
   public static Deadline after(long duration, TimeUnit units, Ticker ticker) {
     checkNotNull(units, "units");
@@ -111,19 +120,22 @@
   }
 
   /**
-   * Is {@code this} deadline before another.
+   * Is {@code this} deadline before another.  Two deadlines must be created using the same {@link
+   * Ticker}.
    */
   public boolean isBefore(Deadline other) {
-    assert this.ticker == other.ticker : "Tickers don't match";
+    checkTicker(other);
     return this.deadlineNanos - other.deadlineNanos < 0;
   }
 
   /**
-   * Return the minimum deadline of {@code this} or an other deadline.
+   * Return the minimum deadline of {@code this} or an other deadline.  They must be created using
+   * the same {@link Ticker}.
+   *
    * @param other deadline to compare with {@code this}.
    */
   public Deadline minimum(Deadline other) {
-    assert this.ticker == other.ticker : "Tickers don't match";
+    checkTicker(other);
     return isBefore(other) ? this : other;
   }
 
@@ -157,6 +169,11 @@
 
   /**
    * Schedule a task to be run when the deadline expires.
+   *
+   * <p>Note if this deadline was created with a custom {@link Ticker}, the {@code scheduler}'s
+   * underlying clock should be synchronized with that Ticker.  Otherwise the task won't be run at
+   * the expected point of time.
+   *
    * @param task to run on expiration
    * @param scheduler used to execute the task
    * @return {@link ScheduledFuture} which can be used to cancel execution of the task
@@ -182,12 +199,20 @@
       buf.append(String.format(".%09d", nanos));
     }
     buf.append("s from now");
+    if (ticker != SYSTEM_TICKER) {
+      buf.append(" (ticker=" + ticker + ")");
+    }
     return buf.toString();
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Both deadlines must be created with the same {@link Ticker}.
+   */
   @Override
   public int compareTo(Deadline that) {
-    assert this.ticker == that.ticker : "Tickers don't match";
+    checkTicker(that);
     long diff = this.deadlineNanos - that.deadlineNanos;
     if (diff < 0) {
       return -1;
@@ -200,12 +225,18 @@
   /**
    * Time source representing nanoseconds since fixed but arbitrary point in time.
    *
+   * <p>DO NOT use custom {@link Ticker} implementations in production, because deadlines created
+   * with custom tickers are incompatible with those created with the system ticker.  Always use
+   * the {@link #getSystemTicker system ticker} whenever you need to provide one in production code.
+   *
    * <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change.  If you'd like it to be
    * stabilized or have any feedback, please
    * <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
    *
    * <p>In general implementations should be thread-safe, unless it's implemented and used in a
    * localized environment (like unit tests) where you are sure the usages are synchronized.
+   *
+   * @since 1.24.0
    */
   public abstract static class Ticker {
     /** Returns the number of nanoseconds since this source's epoch. */
@@ -225,4 +256,12 @@
     }
     return reference;
   }
+
+  private void checkTicker(Deadline other) {
+    if (ticker != other.ticker) {
+      throw new AssertionError(
+          "Tickers (" + ticker + " and " + other.ticker + ") don't match."
+          + " Custom Ticker should only be used in tests!");
+    }
+  }
 }
diff --git a/context/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java
index 48e172d..5f35918 100644
--- a/context/src/test/java/io/grpc/DeadlineTest.java
+++ b/context/src/test/java/io/grpc/DeadlineTest.java
@@ -17,6 +17,7 @@
 package io.grpc;
 
 import static com.google.common.truth.Truth.assertAbout;
+import static com.google.common.truth.Truth.assertThat;
 import static io.grpc.testing.DeadlineSubject.deadline;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,6 +70,18 @@
   }
 
   @Test
+  public void minimum() {
+    Deadline d1 = Deadline.after(1, TimeUnit.MINUTES, ticker);
+    Deadline d2 = Deadline.after(2, TimeUnit.MINUTES, ticker);
+    Deadline d3 = Deadline.after(3, TimeUnit.MINUTES, ticker);
+
+    assertThat(d1.minimum(d2)).isSameInstanceAs(d1);
+    assertThat(d2.minimum(d1)).isSameInstanceAs(d1);
+    assertThat(d3.minimum(d2)).isSameInstanceAs(d2);
+    assertThat(d2.minimum(d3)).isSameInstanceAs(d2);
+  }
+
+  @Test
   public void timeCanOverflow() {
     ticker.reset(Long.MAX_VALUE);
     Deadline d = Deadline.after(10, TimeUnit.DAYS, ticker);
@@ -209,9 +222,15 @@
   }
 
   @Test
+  public void toString_systemTickerNotShown() {
+    Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS);
+    assertThat(d.toString()).endsWith("s from now");
+  }
+
+  @Test
   public void toString_exact() {
     Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker);
-    assertEquals("0s from now", d.toString());
+    assertEquals("0s from now (ticker=FAKE_TICKER)", d.toString());
   }
 
   @Test
@@ -219,17 +238,17 @@
     Deadline d;
 
     d = Deadline.after(-1, TimeUnit.MINUTES, ticker);
-    assertEquals("-60s from now", d.toString());
+    assertEquals("-60s from now (ticker=FAKE_TICKER)", d.toString());
     d = Deadline.after(-1, TimeUnit.MILLISECONDS, ticker);
-    assertEquals("-0.001000000s from now", d.toString());
+    assertEquals("-0.001000000s from now (ticker=FAKE_TICKER)", d.toString());
     d = Deadline.after(-500, TimeUnit.MILLISECONDS, ticker);
-    assertEquals("-0.500000000s from now", d.toString());
+    assertEquals("-0.500000000s from now (ticker=FAKE_TICKER)", d.toString());
     d = Deadline.after(-1000, TimeUnit.MILLISECONDS, ticker);
-    assertEquals("-1s from now", d.toString());
+    assertEquals("-1s from now (ticker=FAKE_TICKER)", d.toString());
     d = Deadline.after(-1500, TimeUnit.MILLISECONDS, ticker);
-    assertEquals("-1.500000000s from now", d.toString());
+    assertEquals("-1.500000000s from now (ticker=FAKE_TICKER)", d.toString());
     d = Deadline.after(-1023456789, TimeUnit.NANOSECONDS, ticker);
-    assertEquals("-1.023456789s from now", d.toString());
+    assertEquals("-1.023456789s from now (ticker=FAKE_TICKER)", d.toString());
   }
 
   @Test
@@ -256,9 +275,39 @@
   }
 
   @Test
+  public void tickersDontMatch() {
+    Deadline d1 = Deadline.after(10, TimeUnit.SECONDS);
+    Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+    boolean success = false;
+    try {
+      d1.compareTo(d2);
+      success = true;
+    } catch (AssertionError e) {
+      // Expected
+    }
+    assertFalse(success);
+
+    try {
+      d1.minimum(d2);
+      success = true;
+    } catch (AssertionError e) {
+      // Expected
+    }
+    assertFalse(success);
+
+    try {
+      d1.isBefore(d2);
+      success = true;
+    } catch (AssertionError e) {
+      // Expected
+    }
+    assertFalse(success);
+  }
+
+  @Test
   public void toString_before() {
     Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker);
-    assertEquals("0.000012000s from now", d.toString());
+    assertEquals("0.000012000s from now (ticker=FAKE_TICKER)", d.toString());
   }
 
   private static class FakeTicker extends Deadline.Ticker {
@@ -279,5 +328,10 @@
       }
       this.time += unit.toNanos(period);
     }
+
+    @Override
+    public String toString() {
+      return "FAKE_TICKER";
+    }
   }
 }
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index ed27f3f..603a794 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -19,6 +19,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Preconditions;
+import io.grpc.Deadline;
 import io.grpc.ExperimentalApi;
 import io.grpc.ServerStreamTracer;
 import io.grpc.internal.AbstractServerImplBuilder;
@@ -126,6 +127,24 @@
   }
 
   /**
+   * Provides a custom deadline ticker that this server will use to create incoming {@link
+   * Deadline}s.
+   *
+   * <p>This is intended for unit tests that fake out the clock.  You should also have a fake {@link
+   * ScheduledExecutorService} whose clock is synchronized with this ticker and set it to {@link
+   * #scheduledExecutorService}. DO NOT use this in production.
+   *
+   * @return this
+   * @see Deadline#after(long, TimeUnit, Deadline.Ticker)
+   *
+   * @since 1.24.0
+   */
+  public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
+    setDeadlineTicker(ticker);
+    return this;
+  }
+
+  /**
    * Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
    * the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
    *
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 04e3fbc..c53c79a 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -25,6 +25,7 @@
 import io.grpc.BindableService;
 import io.grpc.CompressorRegistry;
 import io.grpc.Context;
+import io.grpc.Deadline;
 import io.grpc.DecompressorRegistry;
 import io.grpc.HandlerRegistry;
 import io.grpc.InternalChannelz;
@@ -78,6 +79,7 @@
   DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
   CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
   long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
+  Deadline.Ticker ticker = Deadline.getSystemTicker();
   @Nullable private CensusStatsModule censusStatsOverride;
   private boolean statsEnabled = true;
   private boolean recordStartedRpcs = true;
@@ -216,6 +218,13 @@
     tracingEnabled = value;
   }
 
+  /**
+   * Sets a custom deadline ticker.  This should only be called from InProcessServerBuilder.
+   */
+  protected void setDeadlineTicker(Deadline.Ticker ticker) {
+    this.ticker = checkNotNull(ticker, "ticker");
+  }
+
   @Override
   public final Server build() {
     ServerImpl server = new ServerImpl(
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index abbd4ec..471507e 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -16,6 +16,7 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.grpc.Contexts.statusFromCancelled;
@@ -33,6 +34,7 @@
 import io.grpc.BinaryLog;
 import io.grpc.CompressorRegistry;
 import io.grpc.Context;
+import io.grpc.Deadline;
 import io.grpc.Decompressor;
 import io.grpc.DecompressorRegistry;
 import io.grpc.HandlerRegistry;
@@ -123,6 +125,7 @@
 
   private final InternalChannelz channelz;
   private final CallTracer serverCallTracer;
+  private final Deadline.Ticker ticker;
 
   /**
    * Construct a server.
@@ -157,6 +160,7 @@
     this.binlog = builder.binlog;
     this.channelz = builder.channelz;
     this.serverCallTracer = builder.callTracerFactory.create();
+    this.ticker = checkNotNull(builder.ticker, "ticker");
 
     channelz.addServer(this);
   }
@@ -578,8 +582,10 @@
         return baseContext.withCancellation();
       }
 
-      Context.CancellableContext context = baseContext.withDeadlineAfter(
-          timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
+      Context.CancellableContext context =
+          baseContext.withDeadline(
+              Deadline.after(timeoutNanos, NANOSECONDS, ticker),
+              transport.getScheduledExecutorService());
       final class ServerStreamCancellationListener implements Context.CancellationListener {
         @Override
         public void cancelled(Context context) {
diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java
index ae8a418..d708af5 100644
--- a/core/src/test/java/io/grpc/internal/FakeClock.java
+++ b/core/src/test/java/io/grpc/internal/FakeClock.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Supplier;
 import com.google.common.base.Ticker;
 import com.google.common.util.concurrent.AbstractFuture;
+import io.grpc.Deadline;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -61,6 +62,13 @@
         }
       };
 
+  private final Deadline.Ticker deadlineTicker =
+      new Deadline.Ticker() {
+        @Override public long nanoTime() {
+          return currentTimeNanos;
+        }
+      };
+
   private final Supplier<Stopwatch> stopwatchSupplier =
       new Supplier<Stopwatch>() {
         @Override public Stopwatch get() {
@@ -230,6 +238,13 @@
   }
 
   /**
+   * Deadline ticker of the FakeClock.
+   */
+  public Deadline.Ticker getDeadlineTicker() {
+    return deadlineTicker;
+  }
+
+  /**
    * Run all due tasks. Immediately due tasks that are queued during the process also get executed.
    *
    * @return the number of tasks run by this call
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index dd6aa46..1a79f93 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -19,6 +19,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static io.grpc.InternalChannelz.id;
 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
+import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -199,6 +200,7 @@
   public void startUp() throws IOException {
     MockitoAnnotations.initMocks(this);
     builder.channelz = channelz;
+    builder.ticker = timer.getDeadlineTicker();
     streamTracerFactories = Arrays.asList(streamTracerFactory);
     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
     when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
@@ -977,10 +979,11 @@
     verify(stream, times(0)).close(isA(Status.class), ArgumentMatchers.<Metadata>isNotNull());
   }
 
-  private ServerStreamListener testClientClose_setup(
+  private ServerStreamListener testStreamClose_setup(
       final AtomicReference<ServerCall<String, Integer>> callReference,
       final AtomicReference<Context> context,
-      final AtomicBoolean contextCancelled) throws Exception {
+      final AtomicBoolean contextCancelled,
+      @Nullable Long timeoutNanos) throws Exception {
     createAndStartServer();
     callListener = new ServerCall.Listener<String>() {
       @Override
@@ -1011,6 +1014,9 @@
         = transportServer.registerNewServerTransport(new SimpleServerTransport());
     transportListener.transportReady(Attributes.EMPTY);
     Metadata requestHeaders = new Metadata();
+    if (timeoutNanos != null) {
+      requestHeaders.put(TIMEOUT_KEY, timeoutNanos);
+    }
     StatsTraceContext statsTraceCtx =
         StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders);
     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
@@ -1025,14 +1031,14 @@
   }
 
   @Test
-  public void testClientClose_cancelTriggersImmediateCancellation() throws Exception {
+  public void testStreamClose_clientCancelTriggersImmediateCancellation() throws Exception {
     AtomicBoolean contextCancelled = new AtomicBoolean(false);
     AtomicReference<Context> context = new AtomicReference<>();
     AtomicReference<ServerCall<String, Integer>> callReference
         = new AtomicReference<>();
 
-    ServerStreamListener streamListener = testClientClose_setup(callReference,
-        context, contextCancelled);
+    ServerStreamListener streamListener = testStreamClose_setup(callReference,
+        context, contextCancelled, null);
 
     // For close status being non OK:
     // isCancelled is expected to be true immediately after calling closed(), without needing
@@ -1048,14 +1054,14 @@
   }
 
   @Test
-  public void testClientClose_OkTriggersDelayedCancellation() throws Exception {
+  public void testStreamClose_clientOkTriggersDelayedCancellation() throws Exception {
     AtomicBoolean contextCancelled = new AtomicBoolean(false);
     AtomicReference<Context> context = new AtomicReference<>();
     AtomicReference<ServerCall<String, Integer>> callReference
         = new AtomicReference<>();
 
-    ServerStreamListener streamListener = testClientClose_setup(callReference,
-        context, contextCancelled);
+    ServerStreamListener streamListener = testStreamClose_setup(callReference,
+        context, contextCancelled, null);
 
     // For close status OK:
     // isCancelled is expected to be true after all pending work is done
@@ -1072,6 +1078,27 @@
   }
 
   @Test
+  public void testStreamClose_deadlineExceededTriggersImmediateCancellation() throws Exception {
+    AtomicBoolean contextCancelled = new AtomicBoolean(false);
+    AtomicReference<Context> context = new AtomicReference<>();
+    AtomicReference<ServerCall<String, Integer>> callReference
+        = new AtomicReference<>();
+
+    testStreamClose_setup(callReference, context, contextCancelled, 50L);
+
+    timer.forwardNanos(49);
+
+    assertFalse(callReference.get().isCancelled());
+    assertFalse(context.get().isCancelled());
+
+    assertEquals(1, timer.forwardNanos(1));
+    
+    assertTrue(callReference.get().isCancelled());
+    assertTrue(context.get().isCancelled());
+    assertTrue(contextCancelled.get());
+  }
+
+  @Test
   public void getPort() throws Exception {
     final InetSocketAddress addr = new InetSocketAddress(65535);
     transportServer = new SimpleServer() {