Merge DelayedStream's setError() into cancel()
DelayedClientTransport.PendingStream will override cancel(), which has a
clearer semantic.
Also permitting all status codes except OK in ClientStream.cancel(),
instead of just 4 codes.
diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index 563cf6c..2feb1b7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -34,7 +34,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -292,7 +291,7 @@
*/
@Override
public final void cancel(Status reason) {
- checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason");
+ checkArgument(!reason.isOk(), "Should not cancel with OK status");
cancelled = true;
sendCancel(reason);
dispose();
diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java
index ee5f5a6..9d875a2 100644
--- a/core/src/main/java/io/grpc/internal/ClientStream.java
+++ b/core/src/main/java/io/grpc/internal/ClientStream.java
@@ -46,9 +46,7 @@
* period until {@link ClientStreamListener#closed} is called. This method is safe to be called
* at any time and multiple times and from any thread.
*
- * @param reason must have {@link io.grpc.Status.Code#CANCELLED},
- * {@link io.grpc.Status.Code#DEADLINE_EXCEEDED}, {@link io.grpc.Status.Code#INTERNAL},
- * or {@link io.grpc.Status.Code#UNKNOWN}
+ * @param reason must be non-OK
*/
void cancel(Status reason);
diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
index 0b329d1..6b861a4 100644
--- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
+++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
@@ -95,9 +95,7 @@
return pendingStream;
}
}
- DelayedStream stream = new DelayedStream();
- stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown"));
- return stream;
+ return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown"));
}
@Override
@@ -164,7 +162,7 @@
}
if (savedPendingStreams != null) {
for (PendingStream stream : savedPendingStreams) {
- stream.setError(status);
+ stream.cancel(status);
}
listener.transportTerminated();
}
@@ -245,10 +243,9 @@
setStream(transport.newStream(method, headers));
}
- // TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be
- // overriden. Make it clear or find another method to override.
@Override
- void setError(Status reason) {
+ public void cancel(Status reason) {
+ super.cancel(reason);
synchronized (lock) {
if (pendingStreams != null) {
pendingStreams.remove(this);
@@ -258,7 +255,6 @@
}
}
}
- super.setError(reason);
}
}
diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java
index a18f7a8..c615293 100644
--- a/core/src/main/java/io/grpc/internal/DelayedStream.java
+++ b/core/src/main/java/io/grpc/internal/DelayedStream.java
@@ -34,6 +34,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.Metadata;
@@ -53,8 +55,6 @@
* DelayedStream} may be internally altered by different threads, thus internal synchronization is
* necessary.
*/
-// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer
-// needed by ClientCallImpl as we move away from ListenableFuture<ClientTransport>
class DelayedStream implements ClientStream {
// set to non null once both listener and realStream are valid. After this point it is safe
@@ -163,13 +163,16 @@
startedRealStream = realStream;
}
- void setStream(ClientStream stream) {
+ /**
+ * Transfers all pending and future requests and mutations to the given stream.
+ *
+ * <p>No-op if either this method or {@link #cancel} have already been called.
+ */
+ final void setStream(ClientStream stream) {
synchronized (this) {
- if (error != null) {
- // If there is an error, unstartedStream will be a Noop.
+ if (error != null || realStream != null) {
return;
}
- checkState(realStream == null, "Stream already created: %s", realStream);
realStream = checkNotNull(stream, "stream");
// listener can only be non-null if start has already been called.
if (listener != null) {
@@ -178,21 +181,6 @@
}
}
- void setError(Status reason) {
- synchronized (this) {
- // If the client has already cancelled the stream don't bother keeping the next error.
- if (error == null) {
- error = checkNotNull(reason);
- realStream = NoopClientStream.INSTANCE;
- if (listener != null) {
- listener.closed(error, new Metadata());
- // call startStream anyways to drain pending messages.
- startStream();
- }
- }
- }
- }
-
@Override
public void writeMessage(InputStream message) {
if (startedRealStream == null) {
@@ -221,15 +209,34 @@
@Override
public void cancel(Status reason) {
- if (startedRealStream == null) {
+ // At least one of them is null.
+ ClientStream streamToBeCancelled = startedRealStream;
+ ClientStreamListener listenerToBeCalled = null;
+ if (streamToBeCancelled == null) {
synchronized (this) {
- if (startedRealStream == null) {
- setError(reason);
- return;
- }
+ if (realStream != null) {
+ // realStream already set. Just cancel it.
+ streamToBeCancelled = realStream;
+ } else if (error == null) {
+ // Neither realStream and error are set. Will set the error and call the listener if
+ // it's set.
+ error = checkNotNull(reason);
+ realStream = NoopClientStream.INSTANCE;
+ if (listener != null) {
+ // call startStream anyways to drain pending messages.
+ startStream();
+ listenerToBeCalled = listener;
+ }
+ } // else: error already set, do nothing.
}
}
- startedRealStream.cancel(reason);
+ if (listenerToBeCalled != null) {
+ Preconditions.checkState(streamToBeCancelled == null, "unexpected streamToBeCancelled");
+ listenerToBeCalled.closed(reason, new Metadata());
+ }
+ if (streamToBeCancelled != null) {
+ streamToBeCancelled.cancel(reason);
+ }
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 8ea81b0..7d28bc8 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -32,8 +32,6 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
-import static io.grpc.Status.Code.CANCELLED;
-import static io.grpc.Status.Code.DEADLINE_EXCEEDED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -51,9 +49,7 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.EnumSet;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -146,12 +142,6 @@
*/
public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;
- /**
- * The set of valid status codes for client cancellation.
- */
- public static final Set<Status.Code> CANCEL_REASONS =
- EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN);
-
public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults();
public static final Joiner ACCEPT_ENCODING_JOINER = Joiner.on(',');
diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
index 3bd8db5..4ccb4e8 100644
--- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
@@ -83,13 +83,12 @@
};
@Test
- public void cancel_onlyExpectedCodesAccepted() {
+ public void cancel_doNotAcceptOk() {
for (Code code : Code.values()) {
ClientStreamListener listener = new BaseClientStreamListener();
AbstractClientStream<Integer> stream = new BaseAbstractClientStream<Integer>(allocator);
stream.start(listener);
- if (code == Code.DEADLINE_EXCEEDED || code == Code.CANCELLED || code == Code.INTERNAL
- || code == Code.UNKNOWN) {
+ if (code != Code.OK) {
stream.cancel(Status.fromCodeValue(code.value()));
} else {
try {
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index 8b6d47e..da059b9 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -109,7 +109,6 @@
@Mock private ClientStreamListener streamListener;
@Mock private ClientTransport clientTransport;
- @Mock private DelayedStream delayedStream;
@Captor private ArgumentCaptor<Status> statusCaptor;
@Mock
diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
index b35e935..229a643 100644
--- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
@@ -33,6 +33,7 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -138,21 +139,41 @@
}
@Test
- public void setStream_cantCreateTwice() {
+ public void startThenCancelled() {
stream.start(listener);
- // The first call will be a success
- stream.setStream(realStream);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Stream already created");
-
- stream.setStream(realStream);
+ stream.cancel(Status.CANCELLED);
+ verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
}
@Test
- public void streamCancelled() {
+ public void startThenSetStreamThenCancelled() {
+ stream.start(listener);
+ stream.setStream(realStream);
+ stream.cancel(Status.CANCELLED);
+ verify(realStream).start(same(listener));
+ verify(realStream).cancel(same(Status.CANCELLED));
+ }
+
+ @Test
+ public void setStreamThenStartThenCancelled() {
+ stream.setStream(realStream);
stream.start(listener);
stream.cancel(Status.CANCELLED);
+ verify(realStream).start(same(listener));
+ verify(realStream).cancel(same(Status.CANCELLED));
+ }
+
+ @Test
+ public void setStreamThenCancelled() {
+ stream.setStream(realStream);
+ stream.cancel(Status.CANCELLED);
+ verify(realStream).cancel(same(Status.CANCELLED));
+ }
+
+ @Test
+ public void cancelledThenStart() {
+ stream.cancel(Status.CANCELLED);
+ stream.start(listener);
verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
}
}
diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
index 351498f..2540075 100644
--- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
+++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
@@ -31,8 +31,6 @@
package io.grpc.netty;
-import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
-
import com.google.common.base.Preconditions;
import io.grpc.Status;
@@ -47,8 +45,7 @@
CancelClientStreamCommand(NettyClientStream stream, Status reason) {
this.stream = Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(reason);
- Preconditions.checkArgument(CANCEL_REASONS.contains(reason.getCode()),
- "Invalid cancellation reason");
+ Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
this.reason = reason;
}