core: move closed check from Stream.isReady() to Call.isReady() (#8566)

This fixes data race described in #8565.

We are doubtful whether checking closed in isReady() is necessary (#3201 might be a requirement), but it was easier to just maintain the existing behavior than think heavily about it.
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index e066018..69df1ee 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -91,9 +91,6 @@
 
   @Override
   public boolean isReady() {
-    if (framer().isClosed()) {
-      return false;
-    }
     return transportState().isReady();
   }
 
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 6f850ad..db1a992 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -544,6 +544,9 @@
 
   @Override
   public boolean isReady() {
+    if (halfCloseCalled) {
+      return false;
+    }
     return stream.isReady();
   }
 
diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java
index deba21c..b31aadd 100644
--- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java
@@ -194,6 +194,9 @@
 
   @Override
   public boolean isReady() {
+    if (closeCalled) {
+      return false;
+    }
     return stream.isReady();
   }
 
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index ecf7a90..e409f2f 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -1023,6 +1023,24 @@
   }
 
   @Test
+  public void halfClosedShouldNotBeReady() {
+    when(stream.isReady()).thenReturn(true);
+    ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
+        method,
+        MoreExecutors.directExecutor(),
+        baseCallOptions,
+        clientStreamProvider,
+        deadlineCancellationExecutor,
+        channelCallTracer, configSelector);
+
+    call.start(callListener, new Metadata());
+    assertThat(call.isReady()).isTrue();
+
+    call.halfClose();
+    assertThat(call.isReady()).isFalse();
+  }
+
+  @Test
   public void startAddsMaxSize() {
     CallOptions callOptions =
         baseCallOptions.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2);
diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
index edf303a..9c25f47 100644
--- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
@@ -334,6 +334,8 @@
     when(stream.isReady()).thenReturn(true);
 
     assertTrue(call.isReady());
+    call.close(Status.OK, new Metadata());
+    assertFalse(call.isReady());
   }
 
   @Test
diff --git a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
index 0f7d54e..dc245b3 100644
--- a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
@@ -139,13 +139,6 @@
   }
 
   @Test
-  public void closedShouldNotBeReady() throws IOException {
-    assertTrue(stream.isReady());
-    closeStream();
-    assertFalse(stream.isReady());
-  }
-
-  @Test
   public void notifiedOnReadyAfterWriteCompletes() throws IOException {
     sendHeadersIfServer();
     assertTrue(stream.isReady());