core,okhttp: add TransportTracer to okhttpclient (#3809)

diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index 555d634..8b2bcf1 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -16,6 +16,7 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
 
@@ -86,7 +87,6 @@
     void cancel(Status status);
   }
 
-  @Nullable // okhttp does not support transportTracer yet
   private final TransportTracer transportTracer;
   private final Framer framer;
   private boolean useGet;
@@ -102,11 +102,11 @@
   protected AbstractClientStream(
       WritableBufferAllocator bufferAllocator,
       StatsTraceContext statsTraceCtx,
-      @Nullable TransportTracer transportTracer,
+      TransportTracer transportTracer,
       Metadata headers,
       boolean useGet) {
-    Preconditions.checkNotNull(headers, "headers");
-    this.transportTracer = transportTracer;
+    checkNotNull(headers, "headers");
+    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
     this.useGet = useGet;
     if (!useGet) {
       framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@@ -217,9 +217,9 @@
     protected TransportState(
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        @Nullable TransportTracer transportTracer) {
+        TransportTracer transportTracer) {
       super(maxMessageSize, statsTraceCtx, transportTracer);
-      this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
+      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
     }
 
     private void setFullStreamDecompression(boolean fullStreamDecompression) {
@@ -229,13 +229,13 @@
     private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
       Preconditions.checkState(this.listener == null, "Already called start");
       this.decompressorRegistry =
-          Preconditions.checkNotNull(decompressorRegistry, "decompressorRegistry");
+          checkNotNull(decompressorRegistry, "decompressorRegistry");
     }
 
     @VisibleForTesting
     public final void setListener(ClientStreamListener listener) {
       Preconditions.checkState(this.listener == null, "Already called setListener");
-      this.listener = Preconditions.checkNotNull(listener, "listener");
+      this.listener = checkNotNull(listener, "listener");
     }
 
     @Override
@@ -308,7 +308,7 @@
      * @param frame the received data frame. Its ownership is transferred to this method.
      */
     protected void inboundDataReceived(ReadableBuffer frame) {
-      Preconditions.checkNotNull(frame, "frame");
+      checkNotNull(frame, "frame");
       boolean needToCloseFrame = true;
       try {
         if (statusReported) {
@@ -332,8 +332,8 @@
      * @param status the status extracted from the trailers
      */
     protected void inboundTrailersReceived(Metadata trailers, Status status) {
-      Preconditions.checkNotNull(status, "status");
-      Preconditions.checkNotNull(trailers, "trailers");
+      checkNotNull(status, "status");
+      checkNotNull(trailers, "trailers");
       if (statusReported) {
         log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
             new Object[]{status, trailers});
@@ -356,8 +356,8 @@
      */
     public final void transportReportStatus(final Status status, boolean stopDelivery,
         final Metadata trailers) {
-      Preconditions.checkNotNull(status, "status");
-      Preconditions.checkNotNull(trailers, "trailers");
+      checkNotNull(status, "status");
+      checkNotNull(trailers, "trailers");
       // If stopDelivery, we continue in case previous invocation is waiting for stall
       if (statusReported && !stopDelivery) {
         return;
@@ -404,8 +404,8 @@
     private byte[] payload;
 
     public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
-      this.headers = Preconditions.checkNotNull(headers, "headers");
-      this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
+      this.headers = checkNotNull(headers, "headers");
+      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
     }
 
     @Override
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 8bb9df5..7f6ec15 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -24,7 +24,6 @@
 import io.grpc.Compressor;
 import io.grpc.Decompressor;
 import java.io.InputStream;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 /**
@@ -108,7 +107,6 @@
     private Deframer deframer;
     private final Object onReadyLock = new Object();
     private final StatsTraceContext statsTraceCtx;
-    @Nullable // okhttp transports don't trace yet
     private final TransportTracer transportTracer;
 
     /**
@@ -133,9 +131,9 @@
     protected TransportState(
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        @Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet
+        TransportTracer transportTracer) {
       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
-      this.transportTracer = transportTracer;
+      this.transportTracer = checkNotNull(transportTracer, "transportTracer");
       deframer = new MessageDeframer(
           this,
           Codec.Identity.NONE,
@@ -234,9 +232,7 @@
         allocated = true;
       }
       notifyIfReady();
-      if (transportTracer != null) {
-        transportTracer.reportStreamStarted();
-      }
+      transportTracer.reportStreamStarted();
     }
 
     /**
diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java
index 6fb09f4..a941b86 100644
--- a/core/src/main/java/io/grpc/internal/MessageDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java
@@ -88,8 +88,6 @@
   private Listener listener;
   private int maxInboundMessageSize;
   private final StatsTraceContext statsTraceCtx;
-  // transportTracer is nullable until it is integrated with client transports
-  @Nullable
   private final TransportTracer transportTracer;
   private final String debugString;
   private Decompressor decompressor;
@@ -123,13 +121,13 @@
       Decompressor decompressor,
       int maxMessageSize,
       StatsTraceContext statsTraceCtx,
-      @Nullable TransportTracer transportTracer,
+      TransportTracer transportTracer,
       String debugString) {
     this.listener = checkNotNull(listener, "sink");
     this.decompressor = checkNotNull(decompressor, "decompressor");
     this.maxInboundMessageSize = maxMessageSize;
     this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
-    this.transportTracer = transportTracer;
+    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
     this.debugString = debugString;
   }
 
@@ -395,9 +393,7 @@
 
     currentMessageSeqNo++;
     statsTraceCtx.inboundMessage(currentMessageSeqNo);
-    if (transportTracer != null) {
-      transportTracer.reportMessageReceived();
-    }
+    transportTracer.reportMessageReceived();
     // Continue reading the frame body.
     state = State.BODY;
   }
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index 3dd05a6..41ffc0e 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -38,6 +38,7 @@
 import io.grpc.internal.ProxyParameters;
 import io.grpc.internal.SharedResourceHolder;
 import io.grpc.internal.SharedResourceHolder.Resource;
+import io.grpc.internal.TransportTracer;
 import io.grpc.okhttp.internal.Platform;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -127,6 +128,13 @@
     }
   }
 
+  @VisibleForTesting
+  final OkHttpChannelBuilder setTransportTracerFactory(
+      TransportTracer.Factory transportTracerFactory) {
+    this.transportTracerFactory = transportTracerFactory;
+    return this;
+  }
+
   /**
    * Override the default executor necessary for internal transport use.
    *
@@ -310,7 +318,8 @@
     boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
     return new OkHttpTransportFactory(transportExecutor,
         createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
-        enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
+        enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
+        transportTracerFactory);
   }
 
   @Override
@@ -376,6 +385,7 @@
   static final class OkHttpTransportFactory implements ClientTransportFactory {
     private final Executor executor;
     private final boolean usingSharedExecutor;
+    private final TransportTracer.Factory transportTracerFactory;
     @Nullable
     private final SSLSocketFactory socketFactory;
     @Nullable
@@ -398,7 +408,8 @@
         boolean enableKeepAlive,
         long keepAliveTimeNanos,
         long keepAliveTimeoutNanos,
-        boolean keepAliveWithoutCalls) {
+        boolean keepAliveWithoutCalls,
+        TransportTracer.Factory transportTracerFactory) {
       this.socketFactory = socketFactory;
       this.hostnameVerifier = hostnameVerifier;
       this.connectionSpec = connectionSpec;
@@ -409,6 +420,8 @@
       this.keepAliveWithoutCalls = keepAliveWithoutCalls;
 
       usingSharedExecutor = executor == null;
+      this.transportTracerFactory =
+          Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory");
       if (usingSharedExecutor) {
         // The executor was unspecified, using the shared executor.
         this.executor = SharedResourceHolder.get(SHARED_EXECUTOR);
@@ -444,7 +457,8 @@
           proxy == null ? null : proxy.proxyAddress,
           proxy == null ? null : proxy.username,
           proxy == null ? null : proxy.password,
-          tooManyPingsRunnable);
+          tooManyPingsRunnable,
+          transportTracerFactory.create());
       if (enableKeepAlive) {
         transport.enableKeepAlive(
             true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls);
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
index ee4a221..8349fd6 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
@@ -27,6 +27,7 @@
 import io.grpc.internal.AbstractClientStream;
 import io.grpc.internal.Http2ClientStreamTransportState;
 import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
 import io.grpc.internal.WritableBuffer;
 import io.grpc.okhttp.internal.framed.ErrorCode;
 import io.grpc.okhttp.internal.framed.Header;
@@ -69,11 +70,12 @@
       int maxMessageSize,
       String authority,
       String userAgent,
-      StatsTraceContext statsTraceCtx) {
+      StatsTraceContext statsTraceCtx,
+      TransportTracer transportTracer) {
     super(
         new OkHttpWritableBufferAllocator(),
         statsTraceCtx,
-        /*transportTracer=*/ null,
+        transportTracer,
         headers,
         method.isSafe());
     this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
@@ -152,6 +154,7 @@
 
       synchronized (state.lock) {
         state.sendBuffer(buffer, endOfStream, flush);
+        getTransportTracer().reportMessageSent(numMessages);
       }
     }
 
@@ -200,7 +203,7 @@
         AsyncFrameWriter frameWriter,
         OutboundFlowController outboundFlow,
         OkHttpClientTransport transport) {
-      super(maxMessageSize, statsTraceCtx,  /*transportTracer=*/ null);
+      super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
       this.lock = checkNotNull(lock, "lock");
       this.frameWriter = frameWriter;
       this.outboundFlow = outboundFlow;
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index 579aa71..2e4e6ad 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -46,6 +46,7 @@
 import io.grpc.internal.SerializingExecutor;
 import io.grpc.internal.SharedResourceHolder;
 import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
 import io.grpc.okhttp.internal.ConnectionSpec;
 import io.grpc.okhttp.internal.framed.ErrorCode;
 import io.grpc.okhttp.internal.framed.FrameReader;
@@ -181,6 +182,8 @@
   @Nullable
   private final String proxyPassword;
   private final Runnable tooManyPingsRunnable;
+  @GuardedBy("lock")
+  private final TransportTracer transportTracer;
 
   // The following fields should only be used for test.
   Runnable connectingCallback;
@@ -190,7 +193,8 @@
       Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
       @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
       int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername,
-      @Nullable String proxyPassword, Runnable tooManyPingsRunnable) {
+      @Nullable String proxyPassword, Runnable tooManyPingsRunnable,
+      TransportTracer transportTracer) {
     this.address = Preconditions.checkNotNull(address, "address");
     this.defaultAuthority = authority;
     this.maxMessageSize = maxMessageSize;
@@ -209,6 +213,8 @@
     this.proxyPassword = proxyPassword;
     this.tooManyPingsRunnable =
         Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+    this.transportTracer = Preconditions.checkNotNull(transportTracer);
+    initTransportTracer();
   }
 
   /**
@@ -226,7 +232,8 @@
       @Nullable Runnable connectingCallback,
       SettableFuture<Void> connectedFuture,
       int maxMessageSize,
-      Runnable tooManyPingsRunnable) {
+      Runnable tooManyPingsRunnable,
+      TransportTracer transportTracer) {
     address = null;
     this.maxMessageSize = maxMessageSize;
     defaultAuthority = "notarealauthority:80";
@@ -246,6 +253,23 @@
     this.proxyPassword = null;
     this.tooManyPingsRunnable =
         Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
+    initTransportTracer();
+  }
+
+  private void initTransportTracer() {
+    synchronized (lock) { // to make @GuardedBy linter happy
+      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
+        @Override
+        public TransportTracer.FlowControlWindows read() {
+          synchronized (lock) {
+            long local = -1; // okhttp does not track the local window size
+            long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
+            return new TransportTracer.FlowControlWindows(local, remote);
+          }
+        }
+      });
+    }
   }
 
   /**
@@ -286,6 +310,7 @@
         stopwatch.start();
         p = ping = new Http2Ping(data, stopwatch);
         writePing = true;
+        transportTracer.reportKeepAliveSent();
       }
     }
     if (writePing) {
@@ -302,8 +327,18 @@
     Preconditions.checkNotNull(method, "method");
     Preconditions.checkNotNull(headers, "headers");
     StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
-    return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this,
-        outboundFlow, lock, maxMessageSize, defaultAuthority, userAgent, statsTraceCtx);
+    return new OkHttpClientStream(
+        method,
+        headers,
+        frameWriter,
+        OkHttpClientTransport.this,
+        outboundFlow,
+        lock,
+        maxMessageSize,
+        defaultAuthority,
+        userAgent,
+        statsTraceCtx,
+        transportTracer);
   }
 
   @GuardedBy("lock")
@@ -858,9 +893,11 @@
 
   @Override
   public Future<InternalTransportStats> getTransportStats() {
-    SettableFuture<InternalTransportStats> ret = SettableFuture.create();
-    ret.set(null);
-    return ret;
+    synchronized (lock) {
+      SettableFuture<InternalTransportStats> ret = SettableFuture.create();
+      ret.set(transportTracer.getStats());
+      return ret;
+    }
   }
 
   /**
diff --git a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
index 983f24a..71cdfe3 100644
--- a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
+++ b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
@@ -23,7 +23,9 @@
 public final class AccessProtectedHack {
   public static InternalServer serverBuilderBuildTransportServer(
       AbstractServerImplBuilder<?> builder,
-      List<ServerStreamTracer.Factory> streamTracerFactories) {
+      List<ServerStreamTracer.Factory> streamTracerFactories,
+      TransportTracer.Factory transportTracerFactory) {
+    builder.transportTracerFactory = transportTracerFactory;
     return builder.buildTransportServer(streamTracerFactories);
   }
 
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
index 1a249c4..007e35f 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
@@ -33,6 +33,7 @@
 import io.grpc.internal.ClientStreamListener;
 import io.grpc.internal.GrpcUtil;
 import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
 import io.grpc.okhttp.internal.framed.ErrorCode;
 import io.grpc.okhttp.internal.framed.Header;
 import java.io.ByteArrayInputStream;
@@ -62,6 +63,7 @@
   @Captor private ArgumentCaptor<List<Header>> headersCaptor;
 
   private final Object lock = new Object();
+  private final TransportTracer transportTracer = new TransportTracer();
 
   private MethodDescriptor<?, ?> methodDescriptor;
   private OkHttpClientStream stream;
@@ -76,8 +78,18 @@
         .setResponseMarshaller(marshaller)
         .build();
 
-    stream = new OkHttpClientStream(methodDescriptor, new Metadata(), frameWriter, transport,
-        flowController, lock, MAX_MESSAGE_SIZE, "localhost", "userAgent", StatsTraceContext.NOOP);
+    stream = new OkHttpClientStream(
+        methodDescriptor,
+        new Metadata(),
+        frameWriter,
+        transport,
+        flowController,
+        lock,
+        MAX_MESSAGE_SIZE,
+        "localhost",
+        "userAgent",
+        StatsTraceContext.NOOP,
+        transportTracer);
   }
 
   @Test
@@ -134,7 +146,7 @@
     metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
     stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
         flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
-        StatsTraceContext.NOOP);
+        StatsTraceContext.NOOP, transportTracer);
     stream.start(new BaseClientStreamListener());
     stream.transportState().start(3);
 
@@ -149,7 +161,7 @@
     metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
     stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
         flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
-        StatsTraceContext.NOOP);
+        StatsTraceContext.NOOP, transportTracer);
     stream.start(new BaseClientStreamListener());
     stream.transportState().start(3);
 
@@ -177,7 +189,7 @@
         .build();
     stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport,
         flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
-        StatsTraceContext.NOOP);
+        StatsTraceContext.NOOP, transportTracer);
     stream.start(new BaseClientStreamListener());
 
     // GET streams send headers after halfClose is called.
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
index 5bc0ed5..2ecfc77 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
@@ -54,6 +54,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.CallOptions;
 import io.grpc.InternalStatus;
+import io.grpc.InternalTransportStats;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import io.grpc.MethodDescriptor.MethodType;
@@ -65,6 +66,7 @@
 import io.grpc.internal.ClientTransport;
 import io.grpc.internal.GrpcUtil;
 import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.TransportTracer;
 import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
 import io.grpc.okhttp.internal.ConnectionSpec;
 import io.grpc.okhttp.internal.framed.ErrorCode;
@@ -92,6 +94,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLSocketFactory;
 import okio.Buffer;
 import okio.ByteString;
 import org.junit.After;
@@ -127,9 +131,16 @@
 
   @Mock
   private ManagedClientTransport.Listener transportListener;
+
+  private final SSLSocketFactory sslSocketFactory = null;
+  private final HostnameVerifier hostnameVerifier = null;
+  private final InetSocketAddress proxyAddr = null;
+  private final String proxyUser = null;
+  private final String proxyPassword = null;
+  private final TransportTracer transportTracer = new TransportTracer();
   private OkHttpClientTransport clientTransport;
   private MockFrameReader frameReader;
-  private ExecutorService executor;
+  private ExecutorService executor = Executors.newCachedThreadPool();
   private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
   private SettableFuture<Void> connectedFuture;
   private DelayConnectedCallback delayConnectedCallback;
@@ -143,7 +154,6 @@
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
-    executor = Executors.newCachedThreadPool();
     when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE);
     frameReader = new MockFrameReader();
   }
@@ -192,7 +202,8 @@
         connectingCallback,
         connectedFuture,
         maxMessageSize,
-        tooManyPingsRunnable);
+        tooManyPingsRunnable,
+        new TransportTracer());
     clientTransport.start(transportListener);
     if (waitingForConnected) {
       connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
@@ -203,10 +214,19 @@
   public void testToString() throws Exception {
     InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
     clientTransport = new OkHttpClientTransport(
-        address, "hostname", null /* agent */, executor, /* sslSocketFactory */ null,
-        /* hostnameVerifier */null,
-        Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), DEFAULT_MAX_MESSAGE_SIZE,
-        null, null, null, tooManyPingsRunnable);
+        address,
+        "hostname",
+        /*agent=*/ null,
+        executor,
+        sslSocketFactory,
+        hostnameVerifier,
+        Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC),
+        DEFAULT_MAX_MESSAGE_SIZE,
+        proxyAddr,
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
     String s = clientTransport.toString();
     assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
     assertTrue("Unexpected: " + s, s.contains(address.toString()));
@@ -525,6 +545,30 @@
   }
 
   @Test
+  public void transportTracer_windowSizeDefault() throws Exception {
+    initTransport();
+    InternalTransportStats stats = clientTransport.getTransportStats().get();
+    assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+    // okhttp does not track local window sizes
+    assertEquals(-1, stats.localFlowControlWindow);
+  }
+
+  @Test
+  public void transportTracer_windowSize_remote() throws Exception {
+    initTransport();
+    InternalTransportStats before = clientTransport.getTransportStats().get();
+    assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
+    // okhttp does not track local window sizes
+    assertEquals(-1, before.localFlowControlWindow);
+
+    frameHandler().windowUpdate(0, 1000);
+    InternalTransportStats after = clientTransport.getTransportStats().get();
+    assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
+    // okhttp does not track local window sizes
+    assertEquals(-1, after.localFlowControlWindow);
+  }
+
+  @Test
   public void windowUpdate() throws Exception {
     initTransport();
     MockStreamListener listener1 = new MockStreamListener();
@@ -1234,9 +1278,11 @@
     initTransport();
     PingCallbackImpl callback1 = new PingCallbackImpl();
     clientTransport.ping(callback1, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
     // add'l ping will be added as listener to outstanding operation
     PingCallbackImpl callback2 = new PingCallbackImpl();
     clientTransport.ping(callback2, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
 
     ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
     ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
@@ -1269,6 +1315,7 @@
     // now that previous ping is done, next request returns a different future
     callback1 = new PingCallbackImpl();
     clientTransport.ping(callback1, MoreExecutors.directExecutor());
+    assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent);
     assertEquals(0, callback1.invocationCount);
     shutdownAndVerify();
   }
@@ -1278,6 +1325,7 @@
     initTransport();
     PingCallbackImpl callback = new PingCallbackImpl();
     clientTransport.ping(callback, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
     assertEquals(0, callback.invocationCount);
 
     clientTransport.shutdown(SHUTDOWN_REASON);
@@ -1289,6 +1337,7 @@
     // now that handler is in terminal state, all future pings fail immediately
     callback = new PingCallbackImpl();
     clientTransport.ping(callback, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
     assertEquals(1, callback.invocationCount);
     assertTrue(callback.failureCause instanceof StatusException);
     assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
@@ -1300,6 +1349,7 @@
     initTransport();
     PingCallbackImpl callback = new PingCallbackImpl();
     clientTransport.ping(callback, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
     assertEquals(0, callback.invocationCount);
 
     clientTransport.onException(new IOException());
@@ -1312,6 +1362,7 @@
     // now that handler is in terminal state, all future pings fail immediately
     callback = new PingCallbackImpl();
     clientTransport.ping(callback, MoreExecutors.directExecutor());
+    assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
     assertEquals(1, callback.invocationCount);
     assertTrue(callback.failureCause instanceof StatusException);
     assertEquals(Status.Code.UNAVAILABLE,
@@ -1392,14 +1443,15 @@
         "invalid_authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
-        null,
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyAddr,
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
 
     String host = clientTransport.getOverridenHost();
     int port = clientTransport.getOverridenPort();
@@ -1415,14 +1467,15 @@
         "authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
-        null,
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyAddr,
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        new TransportTracer());
 
     ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
     clientTransport.start(listener);
@@ -1446,14 +1499,15 @@
         "authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
         (InetSocketAddress) serverSocket.getLocalSocketAddress(),
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
     clientTransport.start(transportListener);
 
     Socket sock = serverSocket.accept();
@@ -1496,14 +1550,15 @@
         "authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
         (InetSocketAddress) serverSocket.getLocalSocketAddress(),
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
     clientTransport.start(transportListener);
 
     Socket sock = serverSocket.accept();
@@ -1545,14 +1600,15 @@
         "authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
         (InetSocketAddress) serverSocket.getLocalSocketAddress(),
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
     clientTransport.start(transportListener);
 
     Socket sock = serverSocket.accept();
@@ -1576,14 +1632,15 @@
         "authority",
         "userAgent",
         executor,
-        null,
-        null,
+        sslSocketFactory,
+        hostnameVerifier,
         ConnectionSpec.CLEARTEXT,
         DEFAULT_MAX_MESSAGE_SIZE,
         InetSocketAddress.createUnresolved("unresolvedproxy", 80),
-        null,
-        null,
-        tooManyPingsRunnable);
+        proxyUser,
+        proxyPassword,
+        tooManyPingsRunnable,
+        transportTracer);
     clientTransport.start(transportListener);
 
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
index 7609d0e..7797cd5 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
@@ -19,12 +19,15 @@
 import io.grpc.ServerStreamTracer;
 import io.grpc.internal.AccessProtectedHack;
 import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.FakeClock;
 import io.grpc.internal.InternalServer;
 import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.TransportTracer;
 import io.grpc.internal.testing.AbstractTransportTest;
 import io.grpc.netty.NettyServerBuilder;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -34,10 +37,19 @@
 /** Unit tests for OkHttp transport. */
 @RunWith(JUnit4.class)
 public class OkHttpTransportTest extends AbstractTransportTest {
+  private final FakeClock fakeClock = new FakeClock();
+  private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
+      new TransportTracer.TimeProvider() {
+        @Override
+        public long currentTimeMillis() {
+          return fakeClock.currentTimeMillis();
+        }
+      });
   private ClientTransportFactory clientFactory = OkHttpChannelBuilder
       // Although specified here, address is ignored because we never call build.
       .forAddress("localhost", 0)
       .negotiationType(NegotiationType.PLAINTEXT)
+      .setTransportTracerFactory(fakeClockTransportTracer)
       .buildTransportFactory();
 
   @After
@@ -51,7 +63,8 @@
         NettyServerBuilder
           .forPort(0)
           .flowControlWindow(65 * 1024),
-        streamTracerFactories);
+        streamTracerFactories,
+        fakeClockTransportTracer);
   }
 
   @Override
@@ -62,7 +75,8 @@
         NettyServerBuilder
             .forPort(port)
             .flowControlWindow(65 * 1024),
-        streamTracerFactories);
+        streamTracerFactories,
+        fakeClockTransportTracer);
   }
 
   @Override
@@ -80,9 +94,24 @@
         null /* proxy */);
   }
 
+  @Override
+  protected void advanceClock(long offset, TimeUnit unit) {
+    fakeClock.forwardNanos(unit.toNanos(offset));
+  }
+
+  @Override
+  protected long currentTimeMillis() {
+    return fakeClock.currentTimeMillis();
+  }
+
   // TODO(ejona): Flaky/Broken
   @Test
   @Ignore
   @Override
   public void flowControlPushBack() {}
+
+  @Override
+  protected boolean haveTransportTracer() {
+    return true;
+  }
 }