Merge DelayedStream's setError() into cancel()

DelayedClientTransport.PendingStream will override cancel(), which has a
clearer semantic.

Also permitting all status codes except OK in ClientStream.cancel(),
instead of just 4 codes.
diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index 563cf6c..2feb1b7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -34,7 +34,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -292,7 +291,7 @@
    */
   @Override
   public final void cancel(Status reason) {
-    checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason");
+    checkArgument(!reason.isOk(), "Should not cancel with OK status");
     cancelled = true;
     sendCancel(reason);
     dispose();
diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java
index ee5f5a6..9d875a2 100644
--- a/core/src/main/java/io/grpc/internal/ClientStream.java
+++ b/core/src/main/java/io/grpc/internal/ClientStream.java
@@ -46,9 +46,7 @@
    * period until {@link ClientStreamListener#closed} is called. This method is safe to be called
    * at any time and multiple times and from any thread.
    *
-   * @param reason must have {@link io.grpc.Status.Code#CANCELLED},
-   *     {@link io.grpc.Status.Code#DEADLINE_EXCEEDED}, {@link io.grpc.Status.Code#INTERNAL},
-   *     or {@link io.grpc.Status.Code#UNKNOWN}
+   * @param reason must be non-OK
    */
   void cancel(Status reason);
 
diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
index 0b329d1..6b861a4 100644
--- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
+++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
@@ -95,9 +95,7 @@
         return pendingStream;
       }
     }
-    DelayedStream stream = new DelayedStream();
-    stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown"));
-    return stream;
+    return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown"));
   }
 
   @Override
@@ -164,7 +162,7 @@
     }
     if (savedPendingStreams != null) {
       for (PendingStream stream : savedPendingStreams) {
-        stream.setError(status);
+        stream.cancel(status);
       }
       listener.transportTerminated();
     }
@@ -245,10 +243,9 @@
       setStream(transport.newStream(method, headers));
     }
 
-    // TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be
-    // overriden. Make it clear or find another method to override.
     @Override
-    void setError(Status reason) {
+    public void cancel(Status reason) {
+      super.cancel(reason);
       synchronized (lock) {
         if (pendingStreams != null) {
           pendingStreams.remove(this);
@@ -258,7 +255,6 @@
           }
         }
       }
-      super.setError(reason);
     }
   }
 
diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java
index a18f7a8..c615293 100644
--- a/core/src/main/java/io/grpc/internal/DelayedStream.java
+++ b/core/src/main/java/io/grpc/internal/DelayedStream.java
@@ -34,6 +34,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Preconditions;
+
 import io.grpc.Compressor;
 import io.grpc.Decompressor;
 import io.grpc.Metadata;
@@ -53,8 +55,6 @@
  * DelayedStream} may be internally altered by different threads, thus internal synchronization is
  * necessary.
  */
-// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer
-// needed by ClientCallImpl as we move away from ListenableFuture<ClientTransport>
 class DelayedStream implements ClientStream {
 
   // set to non null once both listener and realStream are valid.  After this point it is safe
@@ -163,13 +163,16 @@
     startedRealStream = realStream;
   }
 
-  void setStream(ClientStream stream) {
+  /**
+   * Transfers all pending and future requests and mutations to the given stream.
+   *
+   * <p>No-op if either this method or {@link #cancel} have already been called.
+   */
+  final void setStream(ClientStream stream) {
     synchronized (this) {
-      if (error != null) {
-        // If there is an error, unstartedStream will be a Noop.
+      if (error != null || realStream != null) {
         return;
       }
-      checkState(realStream == null, "Stream already created: %s", realStream);
       realStream = checkNotNull(stream, "stream");
       // listener can only be non-null if start has already been called.
       if (listener != null) {
@@ -178,21 +181,6 @@
     }
   }
 
-  void setError(Status reason) {
-    synchronized (this) {
-      // If the client has already cancelled the stream don't bother keeping the next error.
-      if (error == null) {
-        error = checkNotNull(reason);
-        realStream = NoopClientStream.INSTANCE;
-        if (listener != null) {
-          listener.closed(error, new Metadata());
-          // call startStream anyways to drain pending messages.
-          startStream();
-        }
-      }
-    }
-  }
-
   @Override
   public void writeMessage(InputStream message) {
     if (startedRealStream == null) {
@@ -221,15 +209,34 @@
 
   @Override
   public void cancel(Status reason) {
-    if (startedRealStream == null) {
+    // At least one of them is null.
+    ClientStream streamToBeCancelled = startedRealStream;
+    ClientStreamListener listenerToBeCalled = null;
+    if (streamToBeCancelled == null) {
       synchronized (this) {
-        if (startedRealStream == null) {
-          setError(reason);
-          return;
-        }
+        if (realStream != null) {
+          // realStream already set. Just cancel it.
+          streamToBeCancelled = realStream;
+        } else if (error == null) {
+          // Neither realStream and error are set. Will set the error and call the listener if
+          // it's set.
+          error = checkNotNull(reason);
+          realStream = NoopClientStream.INSTANCE;
+          if (listener != null) {
+            // call startStream anyways to drain pending messages.
+            startStream();
+            listenerToBeCalled = listener;
+          }
+        }  // else: error already set, do nothing.
       }
     }
-    startedRealStream.cancel(reason);
+    if (listenerToBeCalled != null) {
+      Preconditions.checkState(streamToBeCancelled == null, "unexpected streamToBeCancelled");
+      listenerToBeCalled.closed(reason, new Metadata());
+    }
+    if (streamToBeCancelled != null) {
+      streamToBeCancelled.cancel(reason);
+    }
   }
 
   @Override
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 8ea81b0..7d28bc8 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -32,8 +32,6 @@
 package io.grpc.internal;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static io.grpc.Status.Code.CANCELLED;
-import static io.grpc.Status.Code.DEADLINE_EXCEEDED;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -51,9 +49,7 @@
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.EnumSet;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -146,12 +142,6 @@
    */
   public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;
 
-  /**
-   * The set of valid status codes for client cancellation.
-   */
-  public static final Set<Status.Code> CANCEL_REASONS =
-          EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN);
-
   public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults();
 
   public static final Joiner ACCEPT_ENCODING_JOINER = Joiner.on(',');
diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
index 3bd8db5..4ccb4e8 100644
--- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
@@ -83,13 +83,12 @@
   };
 
   @Test
-  public void cancel_onlyExpectedCodesAccepted() {
+  public void cancel_doNotAcceptOk() {
     for (Code code : Code.values()) {
       ClientStreamListener listener = new BaseClientStreamListener();
       AbstractClientStream<Integer> stream = new BaseAbstractClientStream<Integer>(allocator);
       stream.start(listener);
-      if (code == Code.DEADLINE_EXCEEDED || code == Code.CANCELLED || code == Code.INTERNAL
-          || code == Code.UNKNOWN) {
+      if (code != Code.OK) {
         stream.cancel(Status.fromCodeValue(code.value()));
       } else {
         try {
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index 8b6d47e..da059b9 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -109,7 +109,6 @@
 
   @Mock private ClientStreamListener streamListener;
   @Mock private ClientTransport clientTransport;
-  @Mock private DelayedStream delayedStream;
   @Captor private ArgumentCaptor<Status> statusCaptor;
 
   @Mock
diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
index b35e935..229a643 100644
--- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
@@ -33,6 +33,7 @@
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -138,21 +139,41 @@
   }
 
   @Test
-  public void setStream_cantCreateTwice() {
+  public void startThenCancelled() {
     stream.start(listener);
-    // The first call will be a success
-    stream.setStream(realStream);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Stream already created");
-
-    stream.setStream(realStream);
+    stream.cancel(Status.CANCELLED);
+    verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
   }
 
   @Test
-  public void streamCancelled() {
+  public void startThenSetStreamThenCancelled() {
+    stream.start(listener);
+    stream.setStream(realStream);
+    stream.cancel(Status.CANCELLED);
+    verify(realStream).start(same(listener));
+    verify(realStream).cancel(same(Status.CANCELLED));
+  }
+
+  @Test
+  public void setStreamThenStartThenCancelled() {
+    stream.setStream(realStream);
     stream.start(listener);
     stream.cancel(Status.CANCELLED);
+    verify(realStream).start(same(listener));
+    verify(realStream).cancel(same(Status.CANCELLED));
+  }
+
+  @Test
+  public void setStreamThenCancelled() {
+    stream.setStream(realStream);
+    stream.cancel(Status.CANCELLED);
+    verify(realStream).cancel(same(Status.CANCELLED));
+  }
+
+  @Test
+  public void cancelledThenStart() {
+    stream.cancel(Status.CANCELLED);
+    stream.start(listener);
     verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
   }
 }
diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
index 351498f..2540075 100644
--- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
+++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
@@ -31,8 +31,6 @@
 
 package io.grpc.netty;
 
-import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
-
 import com.google.common.base.Preconditions;
 
 import io.grpc.Status;
@@ -47,8 +45,7 @@
   CancelClientStreamCommand(NettyClientStream stream, Status reason) {
     this.stream = Preconditions.checkNotNull(stream, "stream");
     Preconditions.checkNotNull(reason);
-    Preconditions.checkArgument(CANCEL_REASONS.contains(reason.getCode()),
-            "Invalid cancellation reason");
+    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
     this.reason = reason;
   }