core,okhttp: add TransportTracer to okhttpclient (#3809)
diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index 555d634..8b2bcf1 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -16,6 +16,7 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
@@ -86,7 +87,6 @@
void cancel(Status status);
}
- @Nullable // okhttp does not support transportTracer yet
private final TransportTracer transportTracer;
private final Framer framer;
private boolean useGet;
@@ -102,11 +102,11 @@
protected AbstractClientStream(
WritableBufferAllocator bufferAllocator,
StatsTraceContext statsTraceCtx,
- @Nullable TransportTracer transportTracer,
+ TransportTracer transportTracer,
Metadata headers,
boolean useGet) {
- Preconditions.checkNotNull(headers, "headers");
- this.transportTracer = transportTracer;
+ checkNotNull(headers, "headers");
+ this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.useGet = useGet;
if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@@ -217,9 +217,9 @@
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
- @Nullable TransportTracer transportTracer) {
+ TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
- this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
+ this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}
private void setFullStreamDecompression(boolean fullStreamDecompression) {
@@ -229,13 +229,13 @@
private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
Preconditions.checkState(this.listener == null, "Already called start");
this.decompressorRegistry =
- Preconditions.checkNotNull(decompressorRegistry, "decompressorRegistry");
+ checkNotNull(decompressorRegistry, "decompressorRegistry");
}
@VisibleForTesting
public final void setListener(ClientStreamListener listener) {
Preconditions.checkState(this.listener == null, "Already called setListener");
- this.listener = Preconditions.checkNotNull(listener, "listener");
+ this.listener = checkNotNull(listener, "listener");
}
@Override
@@ -308,7 +308,7 @@
* @param frame the received data frame. Its ownership is transferred to this method.
*/
protected void inboundDataReceived(ReadableBuffer frame) {
- Preconditions.checkNotNull(frame, "frame");
+ checkNotNull(frame, "frame");
boolean needToCloseFrame = true;
try {
if (statusReported) {
@@ -332,8 +332,8 @@
* @param status the status extracted from the trailers
*/
protected void inboundTrailersReceived(Metadata trailers, Status status) {
- Preconditions.checkNotNull(status, "status");
- Preconditions.checkNotNull(trailers, "trailers");
+ checkNotNull(status, "status");
+ checkNotNull(trailers, "trailers");
if (statusReported) {
log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
new Object[]{status, trailers});
@@ -356,8 +356,8 @@
*/
public final void transportReportStatus(final Status status, boolean stopDelivery,
final Metadata trailers) {
- Preconditions.checkNotNull(status, "status");
- Preconditions.checkNotNull(trailers, "trailers");
+ checkNotNull(status, "status");
+ checkNotNull(trailers, "trailers");
// If stopDelivery, we continue in case previous invocation is waiting for stall
if (statusReported && !stopDelivery) {
return;
@@ -404,8 +404,8 @@
private byte[] payload;
public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
- this.headers = Preconditions.checkNotNull(headers, "headers");
- this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
+ this.headers = checkNotNull(headers, "headers");
+ this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 8bb9df5..7f6ec15 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -24,7 +24,6 @@
import io.grpc.Compressor;
import io.grpc.Decompressor;
import java.io.InputStream;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
@@ -108,7 +107,6 @@
private Deframer deframer;
private final Object onReadyLock = new Object();
private final StatsTraceContext statsTraceCtx;
- @Nullable // okhttp transports don't trace yet
private final TransportTracer transportTracer;
/**
@@ -133,9 +131,9 @@
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
- @Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet
+ TransportTracer transportTracer) {
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
- this.transportTracer = transportTracer;
+ this.transportTracer = checkNotNull(transportTracer, "transportTracer");
deframer = new MessageDeframer(
this,
Codec.Identity.NONE,
@@ -234,9 +232,7 @@
allocated = true;
}
notifyIfReady();
- if (transportTracer != null) {
- transportTracer.reportStreamStarted();
- }
+ transportTracer.reportStreamStarted();
}
/**
diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java
index 6fb09f4..a941b86 100644
--- a/core/src/main/java/io/grpc/internal/MessageDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java
@@ -88,8 +88,6 @@
private Listener listener;
private int maxInboundMessageSize;
private final StatsTraceContext statsTraceCtx;
- // transportTracer is nullable until it is integrated with client transports
- @Nullable
private final TransportTracer transportTracer;
private final String debugString;
private Decompressor decompressor;
@@ -123,13 +121,13 @@
Decompressor decompressor,
int maxMessageSize,
StatsTraceContext statsTraceCtx,
- @Nullable TransportTracer transportTracer,
+ TransportTracer transportTracer,
String debugString) {
this.listener = checkNotNull(listener, "sink");
this.decompressor = checkNotNull(decompressor, "decompressor");
this.maxInboundMessageSize = maxMessageSize;
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
- this.transportTracer = transportTracer;
+ this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.debugString = debugString;
}
@@ -395,9 +393,7 @@
currentMessageSeqNo++;
statsTraceCtx.inboundMessage(currentMessageSeqNo);
- if (transportTracer != null) {
- transportTracer.reportMessageReceived();
- }
+ transportTracer.reportMessageReceived();
// Continue reading the frame body.
state = State.BODY;
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index 3dd05a6..41ffc0e 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -38,6 +38,7 @@
import io.grpc.internal.ProxyParameters;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
+import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.Platform;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -127,6 +128,13 @@
}
}
+ @VisibleForTesting
+ final OkHttpChannelBuilder setTransportTracerFactory(
+ TransportTracer.Factory transportTracerFactory) {
+ this.transportTracerFactory = transportTracerFactory;
+ return this;
+ }
+
/**
* Override the default executor necessary for internal transport use.
*
@@ -310,7 +318,8 @@
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(transportExecutor,
createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
- enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
+ enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
+ transportTracerFactory);
}
@Override
@@ -376,6 +385,7 @@
static final class OkHttpTransportFactory implements ClientTransportFactory {
private final Executor executor;
private final boolean usingSharedExecutor;
+ private final TransportTracer.Factory transportTracerFactory;
@Nullable
private final SSLSocketFactory socketFactory;
@Nullable
@@ -398,7 +408,8 @@
boolean enableKeepAlive,
long keepAliveTimeNanos,
long keepAliveTimeoutNanos,
- boolean keepAliveWithoutCalls) {
+ boolean keepAliveWithoutCalls,
+ TransportTracer.Factory transportTracerFactory) {
this.socketFactory = socketFactory;
this.hostnameVerifier = hostnameVerifier;
this.connectionSpec = connectionSpec;
@@ -409,6 +420,8 @@
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
usingSharedExecutor = executor == null;
+ this.transportTracerFactory =
+ Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory");
if (usingSharedExecutor) {
// The executor was unspecified, using the shared executor.
this.executor = SharedResourceHolder.get(SHARED_EXECUTOR);
@@ -444,7 +457,8 @@
proxy == null ? null : proxy.proxyAddress,
proxy == null ? null : proxy.username,
proxy == null ? null : proxy.password,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ transportTracerFactory.create());
if (enableKeepAlive) {
transport.enableKeepAlive(
true, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls);
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
index ee4a221..8349fd6 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
@@ -27,6 +27,7 @@
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header;
@@ -69,11 +70,12 @@
int maxMessageSize,
String authority,
String userAgent,
- StatsTraceContext statsTraceCtx) {
+ StatsTraceContext statsTraceCtx,
+ TransportTracer transportTracer) {
super(
new OkHttpWritableBufferAllocator(),
statsTraceCtx,
- /*transportTracer=*/ null,
+ transportTracer,
headers,
method.isSafe());
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
@@ -152,6 +154,7 @@
synchronized (state.lock) {
state.sendBuffer(buffer, endOfStream, flush);
+ getTransportTracer().reportMessageSent(numMessages);
}
}
@@ -200,7 +203,7 @@
AsyncFrameWriter frameWriter,
OutboundFlowController outboundFlow,
OkHttpClientTransport transport) {
- super(maxMessageSize, statsTraceCtx, /*transportTracer=*/ null);
+ super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
this.lock = checkNotNull(lock, "lock");
this.frameWriter = frameWriter;
this.outboundFlow = outboundFlow;
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index 579aa71..2e4e6ad 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -46,6 +46,7 @@
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameReader;
@@ -181,6 +182,8 @@
@Nullable
private final String proxyPassword;
private final Runnable tooManyPingsRunnable;
+ @GuardedBy("lock")
+ private final TransportTracer transportTracer;
// The following fields should only be used for test.
Runnable connectingCallback;
@@ -190,7 +193,8 @@
Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
@Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername,
- @Nullable String proxyPassword, Runnable tooManyPingsRunnable) {
+ @Nullable String proxyPassword, Runnable tooManyPingsRunnable,
+ TransportTracer transportTracer) {
this.address = Preconditions.checkNotNull(address, "address");
this.defaultAuthority = authority;
this.maxMessageSize = maxMessageSize;
@@ -209,6 +213,8 @@
this.proxyPassword = proxyPassword;
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+ this.transportTracer = Preconditions.checkNotNull(transportTracer);
+ initTransportTracer();
}
/**
@@ -226,7 +232,8 @@
@Nullable Runnable connectingCallback,
SettableFuture<Void> connectedFuture,
int maxMessageSize,
- Runnable tooManyPingsRunnable) {
+ Runnable tooManyPingsRunnable,
+ TransportTracer transportTracer) {
address = null;
this.maxMessageSize = maxMessageSize;
defaultAuthority = "notarealauthority:80";
@@ -246,6 +253,23 @@
this.proxyPassword = null;
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+ this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
+ initTransportTracer();
+ }
+
+ private void initTransportTracer() {
+ synchronized (lock) { // to make @GuardedBy linter happy
+ transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
+ @Override
+ public TransportTracer.FlowControlWindows read() {
+ synchronized (lock) {
+ long local = -1; // okhttp does not track the local window size
+ long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
+ return new TransportTracer.FlowControlWindows(local, remote);
+ }
+ }
+ });
+ }
}
/**
@@ -286,6 +310,7 @@
stopwatch.start();
p = ping = new Http2Ping(data, stopwatch);
writePing = true;
+ transportTracer.reportKeepAliveSent();
}
}
if (writePing) {
@@ -302,8 +327,18 @@
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
- return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this,
- outboundFlow, lock, maxMessageSize, defaultAuthority, userAgent, statsTraceCtx);
+ return new OkHttpClientStream(
+ method,
+ headers,
+ frameWriter,
+ OkHttpClientTransport.this,
+ outboundFlow,
+ lock,
+ maxMessageSize,
+ defaultAuthority,
+ userAgent,
+ statsTraceCtx,
+ transportTracer);
}
@GuardedBy("lock")
@@ -858,9 +893,11 @@
@Override
public Future<InternalTransportStats> getTransportStats() {
- SettableFuture<InternalTransportStats> ret = SettableFuture.create();
- ret.set(null);
- return ret;
+ synchronized (lock) {
+ SettableFuture<InternalTransportStats> ret = SettableFuture.create();
+ ret.set(transportTracer.getStats());
+ return ret;
+ }
}
/**
diff --git a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
index 983f24a..71cdfe3 100644
--- a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
+++ b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java
@@ -23,7 +23,9 @@
public final class AccessProtectedHack {
public static InternalServer serverBuilderBuildTransportServer(
AbstractServerImplBuilder<?> builder,
- List<ServerStreamTracer.Factory> streamTracerFactories) {
+ List<ServerStreamTracer.Factory> streamTracerFactories,
+ TransportTracer.Factory transportTracerFactory) {
+ builder.transportTracerFactory = transportTracerFactory;
return builder.buildTransportServer(streamTracerFactories);
}
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
index 1a249c4..007e35f 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
@@ -33,6 +33,7 @@
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.Header;
import java.io.ByteArrayInputStream;
@@ -62,6 +63,7 @@
@Captor private ArgumentCaptor<List<Header>> headersCaptor;
private final Object lock = new Object();
+ private final TransportTracer transportTracer = new TransportTracer();
private MethodDescriptor<?, ?> methodDescriptor;
private OkHttpClientStream stream;
@@ -76,8 +78,18 @@
.setResponseMarshaller(marshaller)
.build();
- stream = new OkHttpClientStream(methodDescriptor, new Metadata(), frameWriter, transport,
- flowController, lock, MAX_MESSAGE_SIZE, "localhost", "userAgent", StatsTraceContext.NOOP);
+ stream = new OkHttpClientStream(
+ methodDescriptor,
+ new Metadata(),
+ frameWriter,
+ transport,
+ flowController,
+ lock,
+ MAX_MESSAGE_SIZE,
+ "localhost",
+ "userAgent",
+ StatsTraceContext.NOOP,
+ transportTracer);
}
@Test
@@ -134,7 +146,7 @@
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
- StatsTraceContext.NOOP);
+ StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener());
stream.transportState().start(3);
@@ -149,7 +161,7 @@
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
- StatsTraceContext.NOOP);
+ StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener());
stream.transportState().start(3);
@@ -177,7 +189,7 @@
.build();
stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
- StatsTraceContext.NOOP);
+ StatsTraceContext.NOOP, transportTracer);
stream.start(new BaseClientStreamListener());
// GET streams send headers after halfClose is called.
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
index 5bc0ed5..2ecfc77 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
@@ -54,6 +54,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.InternalStatus;
+import io.grpc.InternalTransportStats;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@@ -65,6 +66,7 @@
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
@@ -92,6 +94,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLSocketFactory;
import okio.Buffer;
import okio.ByteString;
import org.junit.After;
@@ -127,9 +131,16 @@
@Mock
private ManagedClientTransport.Listener transportListener;
+
+ private final SSLSocketFactory sslSocketFactory = null;
+ private final HostnameVerifier hostnameVerifier = null;
+ private final InetSocketAddress proxyAddr = null;
+ private final String proxyUser = null;
+ private final String proxyPassword = null;
+ private final TransportTracer transportTracer = new TransportTracer();
private OkHttpClientTransport clientTransport;
private MockFrameReader frameReader;
- private ExecutorService executor;
+ private ExecutorService executor = Executors.newCachedThreadPool();
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private SettableFuture<Void> connectedFuture;
private DelayConnectedCallback delayConnectedCallback;
@@ -143,7 +154,6 @@
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- executor = Executors.newCachedThreadPool();
when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE);
frameReader = new MockFrameReader();
}
@@ -192,7 +202,8 @@
connectingCallback,
connectedFuture,
maxMessageSize,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ new TransportTracer());
clientTransport.start(transportListener);
if (waitingForConnected) {
connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
@@ -203,10 +214,19 @@
public void testToString() throws Exception {
InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
clientTransport = new OkHttpClientTransport(
- address, "hostname", null /* agent */, executor, /* sslSocketFactory */ null,
- /* hostnameVerifier */null,
- Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC), DEFAULT_MAX_MESSAGE_SIZE,
- null, null, null, tooManyPingsRunnable);
+ address,
+ "hostname",
+ /*agent=*/ null,
+ executor,
+ sslSocketFactory,
+ hostnameVerifier,
+ Utils.convertSpec(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC),
+ DEFAULT_MAX_MESSAGE_SIZE,
+ proxyAddr,
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
String s = clientTransport.toString();
assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
assertTrue("Unexpected: " + s, s.contains(address.toString()));
@@ -525,6 +545,30 @@
}
@Test
+ public void transportTracer_windowSizeDefault() throws Exception {
+ initTransport();
+ InternalTransportStats stats = clientTransport.getTransportStats().get();
+ assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+ // okhttp does not track local window sizes
+ assertEquals(-1, stats.localFlowControlWindow);
+ }
+
+ @Test
+ public void transportTracer_windowSize_remote() throws Exception {
+ initTransport();
+ InternalTransportStats before = clientTransport.getTransportStats().get();
+ assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
+ // okhttp does not track local window sizes
+ assertEquals(-1, before.localFlowControlWindow);
+
+ frameHandler().windowUpdate(0, 1000);
+ InternalTransportStats after = clientTransport.getTransportStats().get();
+ assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
+ // okhttp does not track local window sizes
+ assertEquals(-1, after.localFlowControlWindow);
+ }
+
+ @Test
public void windowUpdate() throws Exception {
initTransport();
MockStreamListener listener1 = new MockStreamListener();
@@ -1234,9 +1278,11 @@
initTransport();
PingCallbackImpl callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
// add'l ping will be added as listener to outstanding operation
PingCallbackImpl callback2 = new PingCallbackImpl();
clientTransport.ping(callback2, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
@@ -1269,6 +1315,7 @@
// now that previous ping is done, next request returns a different future
callback1 = new PingCallbackImpl();
clientTransport.ping(callback1, MoreExecutors.directExecutor());
+ assertEquals(2, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback1.invocationCount);
shutdownAndVerify();
}
@@ -1278,6 +1325,7 @@
initTransport();
PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount);
clientTransport.shutdown(SHUTDOWN_REASON);
@@ -1289,6 +1337,7 @@
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
@@ -1300,6 +1349,7 @@
initTransport();
PingCallbackImpl callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(0, callback.invocationCount);
clientTransport.onException(new IOException());
@@ -1312,6 +1362,7 @@
// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
+ assertEquals(1, clientTransport.getTransportStats().get().keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
@@ -1392,14 +1443,15 @@
"invalid_authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
- null,
- null,
- null,
- tooManyPingsRunnable);
+ proxyAddr,
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
String host = clientTransport.getOverridenHost();
int port = clientTransport.getOverridenPort();
@@ -1415,14 +1467,15 @@
"authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
- null,
- null,
- null,
- tooManyPingsRunnable);
+ proxyAddr,
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ new TransportTracer());
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
@@ -1446,14 +1499,15 @@
"authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
- null,
- null,
- tooManyPingsRunnable);
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@@ -1496,14 +1550,15 @@
"authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
- null,
- null,
- tooManyPingsRunnable);
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@@ -1545,14 +1600,15 @@
"authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
(InetSocketAddress) serverSocket.getLocalSocketAddress(),
- null,
- null,
- tooManyPingsRunnable);
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
clientTransport.start(transportListener);
Socket sock = serverSocket.accept();
@@ -1576,14 +1632,15 @@
"authority",
"userAgent",
executor,
- null,
- null,
+ sslSocketFactory,
+ hostnameVerifier,
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE,
InetSocketAddress.createUnresolved("unresolvedproxy", 80),
- null,
- null,
- tooManyPingsRunnable);
+ proxyUser,
+ proxyPassword,
+ tooManyPingsRunnable,
+ transportTracer);
clientTransport.start(transportListener);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
index 7609d0e..7797cd5 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
@@ -19,12 +19,15 @@
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AccessProtectedHack;
import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.AbstractTransportTest;
import io.grpc.netty.NettyServerBuilder;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@@ -34,10 +37,19 @@
/** Unit tests for OkHttp transport. */
@RunWith(JUnit4.class)
public class OkHttpTransportTest extends AbstractTransportTest {
+ private final FakeClock fakeClock = new FakeClock();
+ private final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
+ new TransportTracer.TimeProvider() {
+ @Override
+ public long currentTimeMillis() {
+ return fakeClock.currentTimeMillis();
+ }
+ });
private ClientTransportFactory clientFactory = OkHttpChannelBuilder
// Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0)
.negotiationType(NegotiationType.PLAINTEXT)
+ .setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportFactory();
@After
@@ -51,7 +63,8 @@
NettyServerBuilder
.forPort(0)
.flowControlWindow(65 * 1024),
- streamTracerFactories);
+ streamTracerFactories,
+ fakeClockTransportTracer);
}
@Override
@@ -62,7 +75,8 @@
NettyServerBuilder
.forPort(port)
.flowControlWindow(65 * 1024),
- streamTracerFactories);
+ streamTracerFactories,
+ fakeClockTransportTracer);
}
@Override
@@ -80,9 +94,24 @@
null /* proxy */);
}
+ @Override
+ protected void advanceClock(long offset, TimeUnit unit) {
+ fakeClock.forwardNanos(unit.toNanos(offset));
+ }
+
+ @Override
+ protected long currentTimeMillis() {
+ return fakeClock.currentTimeMillis();
+ }
+
// TODO(ejona): Flaky/Broken
@Test
@Ignore
@Override
public void flowControlPushBack() {}
+
+ @Override
+ protected boolean haveTransportTracer() {
+ return true;
+ }
}