core: retry part 5 continued, logic on retry policy

Implement logic for making retry decision using RetryPolicy.
This commit is contained in:
ZHANG Dapeng 2018-02-07 18:00:04 -08:00 committed by GitHub
parent f44fc50310
commit ad62cc2775
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 502 additions and 113 deletions

View File

@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -52,6 +53,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@VisibleForTesting
static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
@VisibleForTesting
static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
private static final Status CANCELLED_BECAUSE_COMMITTED =
Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
@ -81,6 +85,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private ClientStreamListener masterListener;
private Future<?> scheduledRetry;
private double nextBackoffIntervalInSeconds;
RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers,
@ -95,6 +100,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
this.scheduledExecutorService = scheduledExecutorService;
this.headers = headers;
this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy");
nextBackoffIntervalInSeconds = retryPolicy.initialBackoffInSeconds;
}
@Nullable // null if already committed
@ -485,11 +491,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return Attributes.EMPTY;
}
// TODO(zdapeng): implement retry policy.
// Retry policy is obtained from the combination of the name resolver plus channel builder, and
// passed all the way down to this class.
boolean shouldRetry() {
return false;
private static Random random = new Random();
@VisibleForTesting
static void setRandom(Random random) {
RetriableStream.random = random;
}
boolean hasHedging() {
@ -532,36 +538,82 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return;
}
if (state.winningSubstream == null && shouldRetry()) {
// The check state.winningSubstream == null, checking if is not already committed, is racy,
// but is still safe b/c the retry will also handle committed/cancellation
// TODO(zdapeng): compute backoff
long backoffInMillis = 0L;
scheduledRetry = scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
scheduledRetry = null;
callExecutor.execute(new Runnable() {
@Override
public void run() {
retry(substream.previousAttempts + 1);
}
});
}
},
backoffInMillis,
TimeUnit.MILLISECONDS);
} else if (!hasHedging()) {
if (state.winningSubstream == null) {
RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
scheduledRetry = scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
scheduledRetry = null;
callExecutor.execute(new Runnable() {
@Override
public void run() {
retry(substream.previousAttempts + 1);
}
});
}
},
retryPlan.backoffInMillis,
TimeUnit.MILLISECONDS);
return;
}
}
if (!hasHedging()) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
masterListener.closed(status, trailers);
}
}
// TODO(zdapeng): in hedge case, if this is a fatal status, cancel all the other attempts, and
// close the masterListener.
}
/**
* Decides in current situation whether or not the RPC should retry and if it should retry how
* long the backoff should be. The decision does not take the commitment status into account, so
* caller should check it separately.
*/
// TODO(zdapeng): add HedgingPolicy as param
private RetryPlan makeRetryDecision(RetryPolicy retryPolicy, Status status, Metadata trailer) {
boolean shouldRetry = false;
long backoffInMillis = 0L;
if (retryPolicy.maxAttempts > substream.previousAttempts + 1) {
String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
if (pushbackStr == null) {
if (retryPolicy.retryableStatusCodes.contains(status.getCode())) {
shouldRetry = true;
backoffInMillis = (long) (nextBackoffIntervalInSeconds * 1000D * random.nextDouble());
nextBackoffIntervalInSeconds = Math.min(
nextBackoffIntervalInSeconds * retryPolicy.backoffMultiplier,
retryPolicy.maxBackoffInSeconds);
} // else no retry
} else {
int pushback;
try {
pushback = Integer.parseInt(pushbackStr);
} catch (NumberFormatException e) {
pushback = -1;
}
if (pushback >= 0) {
shouldRetry = true;
backoffInMillis = pushback;
nextBackoffIntervalInSeconds = retryPolicy.initialBackoffInSeconds;
} // else no retry
}
}
// TODO(zdapeng): transparent retry
// TODO(zdapeng): hedging
// TODO(zdapeng): throttling
return new RetryPlan(shouldRetry, backoffInMillis);
}
@Override
public void messagesAvailable(MessageProducer producer) {
State savedState = state;
@ -772,7 +824,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
}
/**
* Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
* the Channel. There should be a single instance of it for each channel.
@ -837,4 +888,15 @@ abstract class RetriableStream<ReqT> implements ClientStream {
retryableStatusCodes);
}
}
private static final class RetryPlan {
final boolean shouldRetry;
// TODO(zdapeng) boolean hasHedging
final long backoffInMillis;
RetryPlan(boolean shouldRetry, long backoffInMillis) {
this.shouldRetry = shouldRetry;
this.backoffInMillis = backoffInMillis;
}
}
}

View File

@ -54,7 +54,10 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -71,9 +74,34 @@ public class RetriableStreamTest {
private static final DecompressorRegistry DECOMPRESSOR_REGISTRY =
DecompressorRegistry.getDefaultInstance();
private static final int MAX_INBOUND_MESSAGE_SIZE = 1234;
private static final int MAX_OUTNBOUND_MESSAGE_SIZE = 5678;
private static final int MAX_OUTBOUND_MESSAGE_SIZE = 5678;
private static final long PER_RPC_BUFFER_LIMIT = 1000;
private static final long CHANNEL_BUFFER_LIMIT = 2000;
private static final int MAX_ATTEMPTS = 6;
private static final double INITIAL_BACKOFF_IN_SECONDS = 100D;
private static final double MAX_BACKOFF_IN_SECONDS = 700D;
private static final double BACKOFF_MULTIPLIER = 2D;
private static final double FAKE_RANDOM = .5D;
static {
RetriableStream.setRandom(
// not random
new Random() {
@Override
public double nextDouble() {
return FAKE_RANDOM;
}
});
}
private static final Code RETRIABLE_STATUS_CODE_1 = Code.UNAVAILABLE;
private static final Code RETRIABLE_STATUS_CODE_2 = Code.DATA_LOSS;
private static final Code NON_RETRIABLE_STATUS_CODE = Code.INTERNAL;
private static final RetryPolicy RETRY_POLICY =
new RetryPolicy(
MAX_ATTEMPTS, INITIAL_BACKOFF_IN_SECONDS, MAX_BACKOFF_IN_SECONDS, BACKOFF_MULTIPLIER,
Arrays.asList(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
private final RetriableStreamRecorder retriableStreamRecorder =
mock(RetriableStreamRecorder.class);
private final ClientStreamListener masterListener = mock(ClientStreamListener.class);
@ -89,8 +117,7 @@ public class RetriableStreamTest {
private final RetriableStream<String> retriableStream =
new RetriableStream<String>(
method, new Metadata(),channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(),
new RetryPolicy(4, 100D, 300D, 2D, Arrays.asList(Code.UNAVAILABLE, Code.DATA_LOSS))) {
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY) {
@Override
void postCommit() {
retriableStreamRecorder.postCommit();
@ -109,20 +136,23 @@ public class RetriableStreamTest {
Status prestart() {
return retriableStreamRecorder.prestart();
}
@Override
boolean shouldRetry() {
return retriableStreamRecorder.shouldRetry();
}
};
private ClientStreamTracer bufferSizeTracer;
@After
public void tearDown() {
assertEquals(0, fakeClock.numPendingTasks());
}
@Test
public void retry_everythingDrained() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
InOrder inOrder = inOrder(retriableStreamRecorder, masterListener, mockStream1);
InOrder inOrder =
inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3);
// stream settings before start
retriableStream.setAuthority(AUTHORITY);
@ -132,7 +162,7 @@ public class RetriableStreamTest {
retriableStream.setFullStreamDecompression(true);
retriableStream.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
retriableStream.setMessageCompression(true);
retriableStream.setMaxOutboundMessageSize(MAX_OUTNBOUND_MESSAGE_SIZE);
retriableStream.setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
retriableStream.setMessageCompression(false);
inOrder.verifyNoMoreInteractions();
@ -150,7 +180,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream1).setFullStreamDecompression(true);
inOrder.verify(mockStream1).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream1).setMessageCompression(true);
inOrder.verify(mockStream1).setMaxOutboundMessageSize(MAX_OUTNBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream1).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream1).setMessageCompression(false);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
@ -174,15 +204,19 @@ public class RetriableStreamTest {
inOrder.verifyNoMoreInteractions();
// retry1
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
inOrder = inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2);
doReturn(true).when(retriableStreamRecorder).shouldRetry();
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).shouldRetry();
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
fakeClock.forwardNanos(0L);
// send more messages during backoff
retriableStream.sendMessage("msg1 during backoff1");
retriableStream.sendMessage("msg2 during backoff1");
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(1);
inOrder.verify(mockStream2).setAuthority(AUTHORITY);
inOrder.verify(mockStream2).setCompressor(COMPRESSOR);
@ -191,7 +225,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2).setFullStreamDecompression(true);
inOrder.verify(mockStream2).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream2).setMessageCompression(true);
inOrder.verify(mockStream2).setMaxOutboundMessageSize(MAX_OUTNBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream2).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream2).setMessageCompression(false);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
@ -202,6 +236,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2, times(2)).flush();
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(456);
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
inOrder.verifyNoMoreInteractions();
// send more messages
@ -213,16 +248,22 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
// retry2
ClientStream mockStream3 = mock(ClientStream.class);
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
inOrder =
inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3);
doReturn(true).when(retriableStreamRecorder).shouldRetry();
sublistenerCaptor2.getValue().closed(Status.UNAVAILABLE, new Metadata());
sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).shouldRetry();
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
fakeClock.forwardNanos(0L);
// send more messages during backoff
retriableStream.sendMessage("msg1 during backoff2");
retriableStream.sendMessage("msg2 during backoff2");
retriableStream.sendMessage("msg3 during backoff2");
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
inOrder.verifyNoMoreInteractions();
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(2);
inOrder.verify(mockStream3).setAuthority(AUTHORITY);
inOrder.verify(mockStream3).setCompressor(COMPRESSOR);
@ -231,7 +272,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream3).setFullStreamDecompression(true);
inOrder.verify(mockStream3).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream3).setMessageCompression(true);
inOrder.verify(mockStream3).setMaxOutboundMessageSize(MAX_OUTNBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream3).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
inOrder.verify(mockStream3).setMessageCompression(false);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
@ -242,14 +283,13 @@ public class RetriableStreamTest {
inOrder.verify(mockStream3, times(2)).flush();
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).request(456);
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
inOrder.verifyNoMoreInteractions();
// no more retry
doReturn(false).when(retriableStreamRecorder).shouldRetry();
sublistenerCaptor3.getValue().closed(Status.UNAVAILABLE, new Metadata());
sublistenerCaptor3.getValue().closed(
Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata());
inOrder.verify(retriableStreamRecorder).shouldRetry();
inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();
@ -289,12 +329,11 @@ public class RetriableStreamTest {
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
// TODO(zdapeng): forward backoff ticker w/ right amount
doReturn(true).when(retriableStreamRecorder).shouldRetry();
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -328,12 +367,12 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder).postCommit();
Status status = Status.UNAVAILABLE;
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
Metadata metadata = new Metadata();
sublistenerCaptor1.getValue().closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
verify(masterListener).closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
}
@Test
@ -349,12 +388,10 @@ public class RetriableStreamTest {
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
// TODO(zdapeng): forward backoff ticker w/ right amount
doReturn(true).when(retriableStreamRecorder).shouldRetry();
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -366,14 +403,13 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder).postCommit();
// closed
doReturn(false).when(retriableStreamRecorder).shouldRetry();
Status status = Status.UNAVAILABLE;
// closed even with retriable status
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
Metadata metadata = new Metadata();
sublistenerCaptor2.getValue().closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
verify(masterListener).closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
}
@Test
@ -397,10 +433,8 @@ public class RetriableStreamTest {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
// closed
// even shouldRetry() returns true
doReturn(true).when(retriableStreamRecorder).shouldRetry();
Status status = Status.UNAVAILABLE;
// closed even with retriable status
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
Metadata metadata = new Metadata();
sublistenerCaptor1.getValue().closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
@ -419,12 +453,10 @@ public class RetriableStreamTest {
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
// TODO(zdapeng): forward backoff ticker w/ right amount
doReturn(true).when(retriableStreamRecorder).shouldRetry();
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -441,8 +473,7 @@ public class RetriableStreamTest {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
// closed
doReturn(false).when(retriableStreamRecorder).shouldRetry();
Status status = Status.UNAVAILABLE;
Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
Metadata metadata = new Metadata();
sublistenerCaptor2.getValue().closed(status, metadata);
inOrder.verify(retriableStreamRecorder, never()).postCommit();
@ -461,7 +492,7 @@ public class RetriableStreamTest {
verify(mockStream1).start(sublistenerCaptor1.capture());
// closed
Status status = Status.UNAVAILABLE;
Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
Metadata metadata = new Metadata();
sublistenerCaptor1.getValue().closed(status, metadata);
@ -486,13 +517,10 @@ public class RetriableStreamTest {
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
// TODO(zdapeng): forward backoff ticker w/ right amount
fakeClock.forwardNanos(0L);
doReturn(true).when(retriableStreamRecorder).shouldRetry();
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -500,8 +528,7 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder, never()).postCommit();
// closed
doReturn(false).when(retriableStreamRecorder).shouldRetry();
Status status = Status.UNAVAILABLE;
Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
Metadata metadata = new Metadata();
sublistenerCaptor2.getValue().closed(status, metadata);
@ -515,7 +542,28 @@ public class RetriableStreamTest {
@Test
public void retry_cancelWhileBackoff() {
// TODO(zdapeng)
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
// cancel while backoff
assertEquals(1, fakeClock.numPendingTasks());
verify(retriableStreamRecorder, never()).postCommit();
retriableStream.cancel(Status.CANCELLED);
verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1);
verifyNoMoreInteractions(mockStream2);
}
@Test
@ -582,14 +630,16 @@ public class RetriableStreamTest {
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1"
// retry
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
doReturn(true).when(retriableStreamRecorder).shouldRetry();
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
// send more requests during backoff
retriableStream.request(789);
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(sublistenerCaptor2.get());
inOrder.verify(mockStream2).request(3);
inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); // msg "substream1 request 3"
@ -598,9 +648,11 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2).request(1);
// msg "substream1 request 1"
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(789);
// msg "substream2 request 3"
// msg "substream2 request 2"
inOrder.verify(mockStream2, times(3)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(100);
verify(mockStream2).cancel(cancelStatus);
@ -691,17 +743,21 @@ public class RetriableStreamTest {
readiness.add(retriableStream.isReady()); // expected true
// retry
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
doReturn(true).when(retriableStreamRecorder).shouldRetry();
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
doReturn(false).when(mockStream1).isReady(); // mockStream1 closed, so isReady false
sublistenerCaptor1.get().closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
sublistenerCaptor1.get().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
// send more requests during backoff
retriableStream.request(789);
readiness.add(retriableStream.isReady()); // expected false b/c in backoff
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
readiness.add(retriableStream.isReady()); // expected true
assertThat(readiness).containsExactly(false, true, false, true).inOrder();
assertThat(readiness).containsExactly(false, true, false, false, true).inOrder();
}
@Test
@ -733,12 +789,11 @@ public class RetriableStreamTest {
@Override
public void start(ClientStreamListener listener) {
// closed while draning
listener.closed(Status.UNAVAILABLE, new Metadata());
listener.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
}
}));
final ClientStream mockStream3 = mock(ClientStream.class);
doReturn(true).when(retriableStreamRecorder).shouldRetry();
when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1, mockStream2, mockStream3);
@ -753,17 +808,24 @@ public class RetriableStreamTest {
ClientStreamListener listener1 = sublistenerCaptor1.getValue();
// retry
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
listener1.closed(Status.UNAVAILABLE, new Metadata());
fakeClock.forwardNanos(0L);
listener1.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
// send requests during backoff
retriableStream.request(3);
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
retriableStream.request(1);
verify(mockStream1, never()).request(1);
verify(mockStream2, never()).request(1);
verify(mockStream1, never()).request(anyInt());
verify(mockStream2, never()).request(anyInt());
verify(mockStream3).request(3);
verify(mockStream3).request(1);
}
// TODO(zdapeng): test buffer limit exceeded during backoff
@Test
public void perRpcBufferLimitExceeded() {
ClientStream mockStream1 = mock(ClientStream.class);
@ -783,6 +845,42 @@ public class RetriableStreamTest {
assertEquals(0, channelBufferUsed.addAndGet(0));
}
@Test
public void perRpcBufferLimitExceededDuringBackoff() {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
// retry
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
// bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed
assertEquals(1, fakeClock.numPendingTasks());
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder, never()).postCommit();
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class));
// bufferLimitExceeded
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
verify(retriableStreamRecorder, never()).postCommit();
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1);
verifyNoMoreInteractions(mockStream2);
}
@Test
public void channelBufferLimitExceeded() {
ClientStream mockStream1 = mock(ClientStream.class);
@ -814,6 +912,237 @@ public class RetriableStreamTest {
assertNull(originalHeaders.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
}
@Test
public void expBackoff_maxBackoff_maxRetryAttempts() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
ClientStream mockStream4 = mock(ClientStream.class);
ClientStream mockStream5 = mock(ClientStream.class);
ClientStream mockStream6 = mock(ClientStream.class);
ClientStream mockStream7 = mock(ClientStream.class);
InOrder inOrder = inOrder(
mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn(
mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
retriableStream.start(masterListener);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verifyNoMoreInteractions();
// retry1
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verifyNoMoreInteractions();
// retry2
sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(2);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verifyNoMoreInteractions();
// retry3
sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(3);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verifyNoMoreInteractions();
// retry4
sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(4);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
inOrder.verifyNoMoreInteractions();
// retry5
sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(5);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
inOrder.verifyNoMoreInteractions();
// can not retry any more
verify(retriableStreamRecorder, never()).postCommit();
sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
verify(retriableStreamRecorder).postCommit();
inOrder.verifyNoMoreInteractions();
}
@Test
public void pushback() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
ClientStream mockStream4 = mock(ClientStream.class);
ClientStream mockStream5 = mock(ClientStream.class);
ClientStream mockStream6 = mock(ClientStream.class);
ClientStream mockStream7 = mock(ClientStream.class);
InOrder inOrder = inOrder(
mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn(
mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
retriableStream.start(masterListener);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verifyNoMoreInteractions();
// retry1
int pushbackInMillis = 123;
Metadata headers = new Metadata();
headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(1);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verifyNoMoreInteractions();
// retry2
pushbackInMillis = 4567 * 1000;
headers = new Metadata();
headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), headers);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(2);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verifyNoMoreInteractions();
// retry3
sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(3);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verifyNoMoreInteractions();
// retry4
sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(4);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
inOrder.verifyNoMoreInteractions();
// retry5
sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(
(long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
- 1L,
TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(5);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
inOrder.verifyNoMoreInteractions();
// can not retry any more even pushback is positive
pushbackInMillis = 4567 * 1000;
headers = new Metadata();
headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
verify(retriableStreamRecorder, never()).postCommit();
sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
verify(retriableStreamRecorder).postCommit();
inOrder.verifyNoMoreInteractions();
}
@Test
public void pushback_noRetry() {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(anyInt());
retriableStream.start(masterListener);
assertEquals(0, fakeClock.numPendingTasks());
verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
verify(retriableStreamRecorder, never()).postCommit();
// pushback no retry
Metadata headers = new Metadata();
headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "");
sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
verify(retriableStreamRecorder, never()).newSubstream(1);
verify(retriableStreamRecorder).postCommit();
}
/**
* Used to stub a retriable stream as well as to record methods of the retriable stream being
* called.
@ -824,7 +1153,5 @@ public class RetriableStreamTest {
ClientStream newSubstream(int previousAttempts);
Status prestart();
boolean shouldRetry();
}
}