core: Fix NPE race during hedging

The problem was one hedge was committed before another had drained
start(). This was not testable because HedgingRunnable checks whether
scheduledHedgingRef is cancelled, which is racy, but there's no way to
deterministically trigger either race.

The same problem couldn't be triggered with retries because only one
attempt will be draining at a time. Retries with cancellation also
couldn't trigger it, for the surprising reason that the noop stream used
in cancel() wasn't considered drained.

This commit marks the noop stream as drained with cancel(), which allows
memory to be garbage collected sooner and exposes the race for tests.
That then showed the stream as hanging, because inFlightSubStreams
wasn't being decremented.

Fixes #9185
This commit is contained in:
Eric Anderson 2023-04-03 15:18:54 -07:00
parent 1c6a7412bb
commit 8d98e5ff7f
2 changed files with 66 additions and 20 deletions

View File

@ -282,14 +282,12 @@ abstract class RetriableStream<ReqT> implements ClientStream {
synchronized (lock) {
savedState = state;
if (streamStarted) {
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
@ -326,15 +324,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
if (streamStarted) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
}
}
@ -344,6 +340,10 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return;
}
if (!streamStarted) {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
substream.stream.start(new Sublistener(substream));
}
substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
@ -484,6 +484,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
if (cancelled) {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
newSubstream.stream.start(new Sublistener(newSubstream));
newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
return;
}
@ -507,6 +509,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
Runnable runnable = commit(noopSubstream);
if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;

View File

@ -188,7 +188,7 @@ public class RetriableStreamTest {
}
}
private final RetriableStream<String> retriableStream =
private RetriableStream<String> retriableStream =
newThrottledRetriableStream(null /* throttle */);
private final RetriableStream<String> hedgingStream =
newThrottledHedgingStream(null /* throttle */);
@ -196,10 +196,13 @@ public class RetriableStreamTest {
private ClientStreamTracer bufferSizeTracer;
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) {
return newThrottledRetriableStream(throttle, MoreExecutors.directExecutor());
}
private RetriableStream<String> newThrottledRetriableStream(Throttle throttle, Executor drainer) {
return new RecordedRetriableStream(
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
null, throttle);
drainer, fakeClock.getScheduledExecutorService(), RETRY_POLICY, null, throttle);
}
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
@ -598,6 +601,44 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder, never()).postCommit();
}
@Test
public void transparentRetry_cancel_race() {
FakeClock drainer = new FakeClock();
retriableStream = newThrottledRetriableStream(null, drainer.getScheduledExecutorService());
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
InOrder inOrder = inOrder(retriableStreamRecorder);
retriableStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry, but don't drain
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
sublistenerCaptor1.getValue().closed(
Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
assertEquals(1, drainer.numPendingTasks());
// cancel
retriableStream.cancel(Status.CANCELLED);
// drain transparent retry
drainer.runDueTasks();
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockStream2).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
sublistenerCaptor2.getValue().closed(statusCaptor.getValue(), PROCESSED, new Metadata());
verify(masterListener).closed(same(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
}
@Test
public void unretriableClosed_cancel() {
ClientStream mockStream1 = mock(ClientStream.class);