okhttp: Remove RPCs-before-ready tests

In the olden days, before LB policies, transports had to accept RPCs as
soon as they were created. This hasn't been true for a very long time,
so remove the tests.

Since a978c9ed we're using real, legit code flows in the tests. This
allowed TSAN to discover that `attributes` is racy when read when
creating a new stream before the transport is ready. We could use a lock
or volatile, but the value of the attributes would still be incorrect
for any RPCs that are created before the transport is ready.

Since there's now only one test that delays the connection, I inline the
support code.
This commit is contained in:
Eric Anderson 2022-04-07 13:30:25 -07:00 committed by GitHub
parent 5351fb9c25
commit 054cb49b49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 89 deletions

View File

@ -45,7 +45,6 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@ -171,7 +170,6 @@ public class OkHttpClientTransportTest {
private ExecutorService executor = Executors.newCachedThreadPool();
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private SettableFuture<Void> connectedFuture;
private DelayConnectedCallback delayConnectedCallback;
private Runnable tooManyPingsRunnable = new Runnable() {
@Override public void run() {
throw new AssertionError();
@ -204,15 +202,6 @@ public class OkHttpClientTransportTest {
startTransport(startId, null, true, null);
}
private void initTransportAndDelayConnected() throws Exception {
delayConnectedCallback = new DelayConnectedCallback();
startTransport(
DEFAULT_START_STREAM_ID,
delayConnectedCallback,
false,
null);
}
private void startTransport(int startId, @Nullable Runnable connectingCallback,
boolean waitingForConnected, String userAgent)
throws Exception {
@ -1681,70 +1670,17 @@ public class OkHttpClientTransportTest {
shutdownAndVerify();
}
@Test
public void writeBeforeConnected() throws Exception {
initTransportAndDelayConnected();
reset(frameWriter);
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
stream.writeMessage(input);
stream.flush();
// The message should be queued.
verifyNoMoreInteractions(frameWriter);
allowTransportConnected();
// The queued message should be sent out.
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH));
Buffer sentFrame = capturedBuffer.poll();
assertEquals(createMessageFrame(message), sentFrame);
stream.cancel(Status.CANCELLED);
shutdownAndVerify();
}
@Test
public void cancelBeforeConnected() throws Exception {
initTransportAndDelayConnected();
reset(frameWriter);
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
stream.writeMessage(input);
stream.flush();
stream.cancel(Status.CANCELLED);
verifyNoMoreInteractions(frameWriter);
allowTransportConnected();
verifyNoMoreInteractions(frameWriter);
shutdownAndVerify();
}
@Test
public void shutdownDuringConnecting() throws Exception {
initTransportAndDelayConnected();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
SettableFuture<Void> delayed = SettableFuture.create();
Runnable connectingCallback = () -> Futures.getUnchecked(delayed);
startTransport(
DEFAULT_START_STREAM_ID,
connectingCallback,
false,
null);
clientTransport.shutdown(SHUTDOWN_REASON);
allowTransportConnected();
// The new stream should be failed, but not the pending stream.
assertNewStreamFail();
verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
assertEquals(1, activeStreamCount());
stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
delayed.set(null);
shutdownAndVerify();
}
@ -2375,10 +2311,6 @@ public class OkHttpClientTransportTest {
}
}
private void allowTransportConnected() {
delayConnectedCallback.allowConnected();
}
private void shutdownAndVerify() {
clientTransport.shutdown(SHUTDOWN_REASON);
assertEquals(0, activeStreamCount());
@ -2390,19 +2322,6 @@ public class OkHttpClientTransportTest {
frameReader.assertClosed();
}
private static class DelayConnectedCallback implements Runnable {
SettableFuture<Void> delayed = SettableFuture.create();
@Override
public void run() {
Futures.getUnchecked(delayed);
}
void allowConnected() {
delayed.set(null);
}
}
private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj)
throws ExecutionException, InterruptedException {
return obj.getStats().get().data;