rls: LruCache interface and implementation (#6799)
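
Adds a package-private LruCache interface plus an abstract LinkedHashLruCache
implementation that combines access-order (LRU) eviction with per-entry
expiration and a periodic background cleanup task, along with a unit test for
LinkedHashLruCache.

A rough sketch of the intended usage (illustration only; the RouteEntry and
RouteEntryCache names below are hypothetical placeholders, not part of this
change). Because the types are package-private, a subclass lives in
io.grpc.rls.internal and only needs to define expiration and, optionally, a
size estimate:

    package io.grpc.rls.internal;

    import io.grpc.internal.TimeProvider;
    import io.grpc.rls.internal.LruCache.EvictionListener;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // hypothetical cached value type, used only for this sketch
    final class RouteEntry {
      final long expireTimeNanos;
      final int sizeBytes;

      RouteEntry(long expireTimeNanos, int sizeBytes) {
        this.expireTimeNanos = expireTimeNanos;
        this.sizeBytes = sizeBytes;
      }
    }

    final class RouteEntryCache extends LinkedHashLruCache<String, RouteEntry> {

      RouteEntryCache(
          long maxSizeBytes,
          EvictionListener<String, RouteEntry> listener,
          ScheduledExecutorService ses,
          TimeProvider timeProvider) {
        // sweep expired entries once a minute in the background
        super(maxSizeBytes, listener, 1, TimeUnit.MINUTES, ses, timeProvider);
      }

      @Override
      protected boolean isExpired(String key, RouteEntry value, long nowNanos) {
        return value.expireTimeNanos <= nowNanos;
      }

      @Override
      protected int estimateSizeOf(String key, RouteEntry value) {
        return value.sizeBytes;
      }
    }

Reads via read()/hasCacheEntry() never return an entry that isExpired()
reports as expired; otherwise expired entries are removed by the periodic
cleaner and, as a last resort, when an insert pushes the cache over its size
limit.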

diff --git a/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java b/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java
new file mode 100644
index 0000000..b2a122e
--- /dev/null
+++ b/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.rls.internal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import io.grpc.internal.TimeProvider;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A LinkedHashLruCache is a least recently used cache with access-order eviction that also
+ * supports per-entry expiration times. When the cache reaches its maximum capacity, it first tries
+ * to remove up to one already-expired entry; if none is found, it evicts the least recently
+ * accessed entry. In addition, the cache proactively removes expired entries on a configured time
+ * interval.
+ */
+@ThreadSafe
+abstract class LinkedHashLruCache<K, V> implements LruCache<K, V> {
+
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final LinkedHashMap<K, SizedValue> delegate;
+  private final PeriodicCleaner periodicCleaner;
+  private final TimeProvider timeProvider;
+  private final EvictionListener<K, SizedValue> evictionListener;
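+  // running total of estimateSizeOf() for all live entries; updated on insert, update and eviction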
+  private final AtomicLong estimatedSizeBytes = new AtomicLong();
+  private long estimatedMaxSizeBytes;
+
+  LinkedHashLruCache(
+      final long estimatedMaxSizeBytes,
+      @Nullable final EvictionListener<K, V> evictionListener,
+      int cleaningInterval,
+      TimeUnit cleaningIntervalUnit,
+      ScheduledExecutorService ses,
+      final TimeProvider timeProvider) {
+    checkState(estimatedMaxSizeBytes > 0, "max estimated cache size should be positive");
+    this.estimatedMaxSizeBytes = estimatedMaxSizeBytes;
+    this.evictionListener = new SizeHandlingEvictionListener(evictionListener);
+    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
+    delegate = new LinkedHashMap<K, SizedValue>(
+        // initial capacity: a rough estimate, but at least the HashMap default of 16
+        Math.max((int) (estimatedMaxSizeBytes / 1000), 16),
+        /* loadFactor= */ 0.75f,
+        /* accessOrder= */ true) {
+      @Override
+      protected boolean removeEldestEntry(Map.Entry<K, SizedValue> eldest) {
+        if (estimatedSizeBytes.get() <= LinkedHashLruCache.this.estimatedMaxSizeBytes) {
+          return false;
+        }
+
+        // first, remove at most 1 expired entry
+        boolean removed = cleanupExpiredEntries(1, timeProvider.currentTimeNanos());
+        // if no expired entry was removed, fall back to size-based eviction if necessary
+        boolean shouldRemove =
+            !removed && shouldInvalidateEldestEntry(eldest.getKey(), eldest.getValue().value);
+        if (shouldRemove) {
+          // remove the entry ourselves so size tracking and the eviction listener stay in sync
+          LinkedHashLruCache.this.invalidate(eldest.getKey(), EvictionType.SIZE);
+        }
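+        // always return false; any removal above already updated the listener and size tracking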
+        return false;
+      }
+    };
+    periodicCleaner = new PeriodicCleaner(ses, cleaningInterval, cleaningIntervalUnit).start();
+  }
+
+  /**
+   * Determines whether the eldest entry should be invalidated when the cache size limit is
+   * reached. Note that the cache is access-ordered, so the eldest is determined by access pattern.
+   */
+  @SuppressWarnings("unused")
+  protected boolean shouldInvalidateEldestEntry(K eldestKey, V eldestValue) {
+    return true;
+  }
+
+  /** Determines whether the entry has already expired. */
+  protected abstract boolean isExpired(K key, V value, long nowNanos);
+
+  /**
+   * Returns the estimated size of an entry, used for size tracking. If this always returns 1 (the
+   * default behavior), the maximum size in bytes behaves like a maximum number of entries.
+   */
+  @SuppressWarnings("unused")
+  protected int estimateSizeOf(K key, V value) {
+    return 1;
+  }
+
+  /**
+   * Updates the size of the entry for the given key, if it exists. This is useful when a cached
+   * value is mutated.
+   */
+  public void updateEntrySize(K key) {
+    synchronized (lock) {
+      SizedValue entry = readInternal(key);
+      if (entry == null) {
+        return;
+      }
+      int prevSize = entry.size;
+      int newSize = estimateSizeOf(key, entry.value);
+      entry.size = newSize;
+      estimatedSizeBytes.addAndGet(newSize - prevSize);
+    }
+  }
+
+  @Override
+  @Nullable
+  public final V cache(K key, V value) {
+    checkNotNull(key, "key");
+    checkNotNull(value, "value");
+    SizedValue existing;
+    int size = estimateSizeOf(key, value);
+    synchronized (lock) {
+      estimatedSizeBytes.addAndGet(size);
+      existing = delegate.put(key, new SizedValue(size, value));
+      if (existing != null) {
+        evictionListener.onEviction(key, existing, EvictionType.REPLACED);
+      }
+    }
+    return existing == null ? null : existing.value;
+  }
+
+  @Override
+  @Nullable
+  @CheckReturnValue
+  public final V read(K key) {
+    SizedValue entry = readInternal(key);
+    if (entry != null) {
+      return entry.value;
+    }
+    return null;
+  }
+
+  @Nullable
+  @CheckReturnValue
+  private SizedValue readInternal(K key) {
+    checkNotNull(key, "key");
+    synchronized (lock) {
+      SizedValue existing = delegate.get(key);
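+      // a read lazily evicts the entry if it has already expired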
+      if (existing != null && isExpired(key, existing.value, timeProvider.currentTimeNanos())) {
+        invalidate(key, EvictionType.EXPIRED);
+        return null;
+      }
+      return existing;
+    }
+  }
+
+  @Override
+  @Nullable
+  public final V invalidate(K key) {
+    return invalidate(key, EvictionType.EXPLICIT);
+  }
+
+  @Nullable
+  private V invalidate(K key, EvictionType cause) {
+    checkNotNull(key, "key");
+    checkNotNull(cause, "cause");
+    synchronized (lock) {
+      SizedValue existing = delegate.remove(key);
+      if (existing != null) {
+        evictionListener.onEviction(key, existing, cause);
+      }
+      return existing == null ? null : existing.value;
+    }
+  }
+
+  @Override
+  public final void invalidateAll(Iterable<K> keys) {
+    checkNotNull(keys, "keys");
+    synchronized (lock) {
+      for (K key : keys) {
+        SizedValue existing = delegate.remove(key);
+        if (existing != null) {
+          evictionListener.onEviction(key, existing, EvictionType.EXPLICIT);
+        }
+      }
+    }
+  }
+
+  @Override
+  @CheckReturnValue
+  public final boolean hasCacheEntry(K key) {
+    // readInternal filters out entries that have already expired
+    return readInternal(key) != null;
+  }
+
+  /** Returns a shallow copy of the values in the cache. */
+  public final List<V> values() {
+    synchronized (lock) {
+      List<V> list = new ArrayList<>(delegate.size());
+      for (SizedValue value : delegate.values()) {
+        list.add(value.value);
+      }
+      return Collections.unmodifiableList(list);
+    }
+  }
+
+  /**
+   * Resizes the cache. If the new size is smaller than the current estimated size, space is freed
+   * by first removing expired entries and then removing the oldest entries in LRU order.
+   */
+  public final void resize(int newSizeBytes) {
+    long now = timeProvider.currentTimeNanos();
+    synchronized (lock) {
+      long estimatedMaxSizeBytesCopy = estimatedMaxSizeBytes;
+      this.estimatedMaxSizeBytes = newSizeBytes;
+      if (estimatedMaxSizeBytesCopy <= newSizeBytes) {
+        // the new size is not smaller, so no cleanup is needed
+        return;
+      }
+      // cleanup expired entries
+      cleanupExpiredEntries(now);
+
+      // cleanup eldest entry until new size limit
+      Iterator<Map.Entry<K, SizedValue>> lruIter = delegate.entrySet().iterator();
+      while (lruIter.hasNext() && this.estimatedSizeBytes.get() > estimatedMaxSizeBytes) {
+        Map.Entry<K, SizedValue> entry = lruIter.next();
+        lruIter.remove();
+        // eviction listener will update the estimatedSizeBytes
+        evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE);
+      }
+    }
+  }
+
+  @Override
+  @CheckReturnValue
+  public final int estimatedSize() {
+    synchronized (lock) {
+      return delegate.size();
+    }
+  }
+
+  private boolean cleanupExpiredEntries(long now) {
+    return cleanupExpiredEntries(Integer.MAX_VALUE, now);
+  }
+
+  // maxExpiredEntries is a number of entries, not a size in bytes
+  private boolean cleanupExpiredEntries(int maxExpiredEntries, long now) {
+    checkArgument(maxExpiredEntries > 0, "maxExpiredEntries must be positive");
+    boolean removedAny = false;
+    synchronized (lock) {
+      Iterator<Map.Entry<K, SizedValue>> lruIter = delegate.entrySet().iterator();
+      while (lruIter.hasNext() && maxExpiredEntries > 0) {
+        Map.Entry<K, SizedValue> entry = lruIter.next();
+        if (isExpired(entry.getKey(), entry.getValue().value, now)) {
+          lruIter.remove();
+          evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPIRED);
+          removedAny = true;
+          maxExpiredEntries--;
+        }
+      }
+    }
+    return removedAny;
+  }
+
+  @Override
+  public final void close() {
+    synchronized (lock) {
+      periodicCleaner.stop();
+      doClose();
+      delegate.clear();
+    }
+  }
+
+  protected void doClose() {}
+
+  /** Periodically cleans up expired entries in the cache. */
+  private final class PeriodicCleaner {
+
+    private final ScheduledExecutorService ses;
+    private final int interval;
+    private final TimeUnit intervalUnit;
+    private ScheduledFuture<?> scheduledFuture;
+
+    PeriodicCleaner(ScheduledExecutorService ses, int interval, TimeUnit intervalUnit) {
+      this.ses = checkNotNull(ses, "ses");
+      checkState(interval > 0, "interval must be positive");
+      this.interval = interval;
+      this.intervalUnit = checkNotNull(intervalUnit, "intervalUnit");
+    }
+
+    PeriodicCleaner start() {
+      checkState(scheduledFuture == null, "cleaning task can be started only once");
+      this.scheduledFuture =
+          ses.scheduleAtFixedRate(new CleaningTask(), interval, interval, intervalUnit);
+      return this;
+    }
+
+    void stop() {
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(false);
+        scheduledFuture = null;
+      }
+    }
+
+    private class CleaningTask implements Runnable {
+
+      @Override
+      public void run() {
+        cleanupExpiredEntries(timeProvider.currentTimeNanos());
+      }
+    }
+  }
+
+  /** An {@link EvictionListener} that also keeps the estimated cache size up to date. */
+  private final class SizeHandlingEvictionListener implements EvictionListener<K, SizedValue> {
+
+    private final EvictionListener<K, V> delegate;
+
+    SizeHandlingEvictionListener(@Nullable EvictionListener<K, V> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void onEviction(K key, SizedValue value, EvictionType cause) {
+      estimatedSizeBytes.addAndGet(-1 * value.size);
+      if (delegate != null) {
+        delegate.onEviction(key, value.value, cause);
+      }
+    }
+  }
+
+  private final class SizedValue {
+    volatile int size;
+    final V value;
+
+    SizedValue(int size, V value) {
+      this.size = size;
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      // NOTE: the size doesn't affect equality
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      LinkedHashLruCache<?, ?>.SizedValue that = (LinkedHashLruCache<?, ?>.SizedValue) o;
+      return Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+      // NOTE: the size doesn't affect hashCode
+      return Objects.hash(value);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("size", size)
+          .add("value", value)
+          .toString();
+    }
+  }
+}
diff --git a/rls/src/main/java/io/grpc/rls/internal/LruCache.java b/rls/src/main/java/io/grpc/rls/internal/LruCache.java
new file mode 100644
index 0000000..0790991
--- /dev/null
+++ b/rls/src/main/java/io/grpc/rls/internal/LruCache.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.rls.internal;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
+
+/** An LruCache is a cache with least recently used eviction. */
+interface LruCache<K, V> {
+
+  /**
+   * Populates a cache entry. If an entry for the given key already exists, its value is replaced
+   * with the new value.
+   *
+   * @return the previous value associated with key, otherwise {@code null}
+   */
+  @Nullable
+  V cache(K key, V value);
+
+  /**
+   * Returns the cached value for the given key if it exists, otherwise {@code null}. This
+   * operation never returns an already-expired entry.
+   */
+  @Nullable
+  @CheckReturnValue
+  V read(K key);
+
+  /**
+   * Invalidates the entry for the given key, if it exists. This operation will trigger {@link
+   * EvictionListener} with {@link EvictionType#EXPLICIT}.
+   *
+   * @return the previous value associated with key, otherwise {@code null}
+   */
+  @Nullable
+  V invalidate(K key);
+
+  /**
+   * Invalidates the cache entries for the given keys. This operation will trigger {@link
+   * EvictionListener} with {@link EvictionType#EXPLICIT}.
+   */
+  void invalidateAll(Iterable<K> keys);
+
+  /** Returns {@code true} if the given key is cached. */
+  @CheckReturnValue
+  boolean hasCacheEntry(K key);
+
+  /**
+   * Returns the estimated number of entries in the cache. Note that this can be larger than the
+   * true size, because it may count entries that have already expired but not yet been evicted.
+   */
+  @CheckReturnValue
+  int estimatedSize();
+
+  /** Closes underlying resources. */
+  void close();
+
+  /** A listener that is notified of cache eviction events. */
+  interface EvictionListener<K, V> {
+
+    /**
+     * Notifies the listener that a cache entry was evicted. Implementations can assume that this
+     * method is called serially. Implementations should be non-blocking; for long-running work,
+     * consider offloading the task to a {@link java.util.concurrent.Executor}.
+     */
+    void onEviction(K key, V value, EvictionType cause);
+  }
+
+  /** Type of cache eviction. */
+  enum EvictionType {
+    /** Explicitly removed by user. */
+    EXPLICIT,
+    /** Evicted due to size limit. */
+    SIZE,
+    /** Evicted due to entry expired. */
+    EXPIRED,
+    /** Removed due to error. */
+    ERROR,
+    /** Evicted by replacement. */
+    REPLACED
+  }
+}
diff --git a/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java b/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java
new file mode 100644
index 0000000..e8252a5
--- /dev/null
+++ b/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.rls.internal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.internal.TimeProvider;
+import io.grpc.rls.internal.LruCache.EvictionListener;
+import io.grpc.rls.internal.LruCache.EvictionType;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class LinkedHashLruCacheTest {
+
+  private static final int MAX_SIZE = 5;
+
+  @Rule
+  public final MockitoRule mocks = MockitoJUnit.rule();
+
+  private final DoNotUseFakeScheduledService fakeScheduledService =
+      mock(DoNotUseFakeScheduledService.class, CALLS_REAL_METHODS);
+  private final TimeProvider timeProvider = fakeScheduledService.getFakeTicker();
+
+  @Mock
+  private EvictionListener<Integer, Entry> evictionListener;
+  private LinkedHashLruCache<Integer, Entry> cache;
+
+  @Before
+  public void setUp() {
+    this.cache = new LinkedHashLruCache<Integer, Entry>(
+        MAX_SIZE,
+        evictionListener,
+        10,
+        TimeUnit.NANOSECONDS,
+        fakeScheduledService,
+        timeProvider) {
+      @Override
+      protected boolean isExpired(Integer key, Entry value, long nowNanos) {
+        return value.expireTime <= nowNanos;
+      }
+    };
+  }
+
+  @Test
+  public void eviction_size() {
+    for (int i = 1; i <= MAX_SIZE; i++) {
+      cache.cache(i, new Entry("Entry" + i, Long.MAX_VALUE));
+    }
+    cache.cache(MAX_SIZE + 1, new Entry("should kick the first", Long.MAX_VALUE));
+
+    verify(evictionListener).onEviction(1, new Entry("Entry1", Long.MAX_VALUE), EvictionType.SIZE);
+    assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE);
+  }
+
+  @Test
+  public void size() {
+    Entry entry1 = new Entry("Entry0", timeProvider.currentTimeNanos() + 10);
+    Entry entry2 = new Entry("Entry1", timeProvider.currentTimeNanos() + 20);
+    cache.cache(0, entry1);
+    cache.cache(1, entry2);
+    assertThat(cache.estimatedSize()).isEqualTo(2);
+
+    assertThat(cache.invalidate(0)).isEqualTo(entry1);
+    assertThat(cache.estimatedSize()).isEqualTo(1);
+
+    assertThat(cache.invalidate(1)).isEqualTo(entry2);
+    assertThat(cache.estimatedSize()).isEqualTo(0);
+  }
+
+  @Test
+  public void eviction_expire() {
+    Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10);
+    Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20);
+    cache.cache(0, toBeEvicted);
+    cache.cache(1, survivor);
+
+    fakeScheduledService.advance(10, TimeUnit.NANOSECONDS);
+    verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.EXPIRED);
+
+    fakeScheduledService.advance(10, TimeUnit.NANOSECONDS);
+    verify(evictionListener).onEviction(1, survivor, EvictionType.EXPIRED);
+  }
+
+  @Test
+  public void eviction_explicit() {
+    Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10);
+    Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20);
+    cache.cache(0, toBeEvicted);
+    cache.cache(1, survivor);
+
+    assertThat(cache.invalidate(0)).isEqualTo(toBeEvicted);
+
+    verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.EXPLICIT);
+  }
+
+  @Test
+  public void eviction_replaced() {
+    Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10);
+    Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20);
+    cache.cache(0, toBeEvicted);
+    cache.cache(0, survivor);
+
+    verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.REPLACED);
+  }
+
+  @Test
+  public void eviction_size_shouldEvictAlreadyExpired() {
+    for (int i = 1; i <= MAX_SIZE; i++) {
+      // the expiration times of the last two entries are <= the current time (already expired)
+      cache.cache(i, new Entry("Entry" + i, timeProvider.currentTimeNanos() + MAX_SIZE - i - 1));
+    }
+    cache.cache(MAX_SIZE + 1, new Entry("should kick the first", Long.MAX_VALUE));
+
+    // should remove MAX_SIZE-1 instead of MAX_SIZE because MAX_SIZE is accessed later
+    verify(evictionListener)
+        .onEviction(eq(MAX_SIZE - 1), any(Entry.class), eq(EvictionType.EXPIRED));
+    assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE);
+  }
+
+  @Test
+  public void eviction_get_shouldNotReturnAlreadyExpired() {
+    for (int i = 1; i <= MAX_SIZE; i++) {
+      // last entry is already expired when added
+      cache.cache(i, new Entry("Entry" + i, timeProvider.currentTimeNanos() + MAX_SIZE - i));
+    }
+
+    assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE);
+    assertThat(cache.read(MAX_SIZE)).isNull();
+    assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE - 1);
+    verify(evictionListener).onEviction(eq(MAX_SIZE), any(Entry.class), eq(EvictionType.EXPIRED));
+  }
+
+  private static final class Entry {
+    String value;
+    long expireTime;
+
+    Entry(String value, long expireTime) {
+      this.value = value;
+      this.expireTime = expireTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Entry entry = (Entry) o;
+      return expireTime == entry.expireTime && Objects.equals(value, entry.value);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(value, expireTime);
+    }
+  }
+
+  /**
+   * A minimal fake implementation of ScheduledExecutorService that supports *only*
+   * scheduleAtFixedRate, with many limitations and assumptions. It is only intended to be used in
+   * this test with a CALLS_REAL_METHODS mock.
+   */
+  private abstract static class DoNotUseFakeScheduledService implements ScheduledExecutorService {
+
+    private long currTimeNanos;
+    private long period;
+    private long nextRun;
+    private AtomicReference<Runnable> command;
+
+    @Override
+    public final ScheduledFuture<?> scheduleAtFixedRate(
+        Runnable command, long initialDelay, long period, TimeUnit unit) {
+      // lazily initialize: field initializers do not run for a CALLS_REAL_METHODS mock
+      if (this.command == null) {
+        this.command = new AtomicReference<>();
+      }
+      checkState(this.command.get() == null, "only can schedule one");
+      checkState(period > 0, "period should be positive");
+      checkState(initialDelay >= 0, "initial delay should be >= 0");
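+      // an initial delay of 0 means run the task once immediately, then repeat every period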
+      if (initialDelay == 0) {
+        initialDelay = period;
+        command.run();
+      }
+      this.command.set(checkNotNull(command, "command"));
+      this.nextRun = checkNotNull(unit, "unit").toNanos(initialDelay) + currTimeNanos;
+      this.period = unit.toNanos(period);
+      return mock(ScheduledFuture.class);
+    }
+
+    TimeProvider getFakeTicker() {
+      return new TimeProvider() {
+        @Override
+        public long currentTimeNanos() {
+          return currTimeNanos;
+        }
+      };
+    }
+
+    void advance(long delta, TimeUnit unit) {
+      // if a command is scheduled, only advance the ticker enough to trigger at most one run
+      boolean scheduled = command != null && command.get() != null;
+      long deltaNanos = unit.toNanos(delta);
+      if (scheduled) {
+        checkArgument(
+            (this.currTimeNanos + deltaNanos) < (nextRun + 2 * period),
+            "Cannot advance ticker because more than one repeated tasks will run");
+        long finalTime = this.currTimeNanos + deltaNanos;
+        if (finalTime >= nextRun) {
+          nextRun += period;
+          this.currTimeNanos = nextRun;
+          command.get().run();
+        }
+        this.currTimeNanos = finalTime;
+      } else {
+        this.currTimeNanos += deltaNanos;
+      }
+    }
+  }
+}