core: move closed check from Stream.isReady() to Call.isReady() (#8566)
This fixes data race described in #8565.
We are doubtful whether checking closed in isReady() is necessary (#3201 might be a requirement), but it was easier to just maintain the existing behavior than think heavily about it.
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index e066018..69df1ee 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -91,9 +91,6 @@
@Override
public boolean isReady() {
- if (framer().isClosed()) {
- return false;
- }
return transportState().isReady();
}
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 6f850ad..db1a992 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -544,6 +544,9 @@
@Override
public boolean isReady() {
+ if (halfCloseCalled) {
+ return false;
+ }
return stream.isReady();
}
diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java
index deba21c..b31aadd 100644
--- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java
@@ -194,6 +194,9 @@
@Override
public boolean isReady() {
+ if (closeCalled) {
+ return false;
+ }
return stream.isReady();
}
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index ecf7a90..e409f2f 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -1023,6 +1023,24 @@
}
@Test
+ public void halfClosedShouldNotBeReady() {
+ when(stream.isReady()).thenReturn(true);
+ ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
+ method,
+ MoreExecutors.directExecutor(),
+ baseCallOptions,
+ clientStreamProvider,
+ deadlineCancellationExecutor,
+ channelCallTracer, configSelector);
+
+ call.start(callListener, new Metadata());
+ assertThat(call.isReady()).isTrue();
+
+ call.halfClose();
+ assertThat(call.isReady()).isFalse();
+ }
+
+ @Test
public void startAddsMaxSize() {
CallOptions callOptions =
baseCallOptions.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2);
diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
index edf303a..9c25f47 100644
--- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java
@@ -334,6 +334,8 @@
when(stream.isReady()).thenReturn(true);
assertTrue(call.isReady());
+ call.close(Status.OK, new Metadata());
+ assertFalse(call.isReady());
}
@Test
diff --git a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
index 0f7d54e..dc245b3 100644
--- a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java
@@ -139,13 +139,6 @@
}
@Test
- public void closedShouldNotBeReady() throws IOException {
- assertTrue(stream.isReady());
- closeStream();
- assertFalse(stream.isReady());
- }
-
- @Test
public void notifiedOnReadyAfterWriteCompletes() throws IOException {
sendHeadersIfServer();
assertTrue(stream.isReady());