core: move closed check from Stream.isReady() to Call.isReady() (#8566)

This fixes data race described in #8565.

We are doubtful whether checking closed in isReady() is necessary (#3201 might be a requirement), but it was easier to just maintain the existing behavior than think heavily about it.
This commit is contained in:
ZHANG Dapeng 2021-09-29 09:42:59 -07:00 committed by GitHub
parent f57de6bd03
commit 28f2647aaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 26 additions and 10 deletions

View File

@ -91,9 +91,6 @@ public abstract class AbstractStream implements Stream {
@Override
public boolean isReady() {
if (framer().isClosed()) {
return false;
}
return transportState().isReady();
}

View File

@ -544,6 +544,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public boolean isReady() {
if (halfCloseCalled) {
return false;
}
return stream.isReady();
}

View File

@ -194,6 +194,9 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public boolean isReady() {
if (closeCalled) {
return false;
}
return stream.isReady();
}

View File

@ -1022,6 +1022,24 @@ public class ClientCallImplTest {
assertSame(cause, status.getCause());
}
@Test
public void halfClosedShouldNotBeReady() {
when(stream.isReady()).thenReturn(true);
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
clientStreamProvider,
deadlineCancellationExecutor,
channelCallTracer, configSelector);
call.start(callListener, new Metadata());
assertThat(call.isReady()).isTrue();
call.halfClose();
assertThat(call.isReady()).isFalse();
}
@Test
public void startAddsMaxSize() {
CallOptions callOptions =

View File

@ -334,6 +334,8 @@ public class ServerCallImplTest {
when(stream.isReady()).thenReturn(true);
assertTrue(call.isReady());
call.close(Status.OK, new Metadata());
assertFalse(call.isReady());
}
@Test

View File

@ -138,13 +138,6 @@ public abstract class NettyStreamTestBase<T extends Stream> {
assertTrue(stream.isReady());
}
@Test
public void closedShouldNotBeReady() throws IOException {
assertTrue(stream.isReady());
closeStream();
assertFalse(stream.isReady());
}
@Test
public void notifiedOnReadyAfterWriteCompletes() throws IOException {
sendHeadersIfServer();