mirror of https://github.com/grpc/grpc-java.git
core: replumb RetryPolicy
- replumbed `RetryPolicy` with `MethodInfo` without breaking the existing `RetryPolicyTest`. - moved `ServiceConfigInterceptor.MethodInfo.RetryPolicy` out as a top level class so that `RetriableStream` does not import `ServiceConfigInterceptor`.
This commit is contained in:
parent
3f63cd0ad4
commit
6d7e19cbe5
|
@ -127,7 +127,11 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
int maxHedgedAttempts = 5;
|
||||
long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES;
|
||||
long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES;
|
||||
boolean retryDisabled = true; // TODO(zdapeng): default to false
|
||||
boolean retryEnabled = false; // TODO(zdapeng): default to true
|
||||
// Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
|
||||
// what should be the desired behavior for retry + stats/tracing.
|
||||
// TODO(zdapeng): delete me
|
||||
boolean temporarilyDisableRetry;
|
||||
|
||||
Channelz channelz = Channelz.instance();
|
||||
|
||||
|
@ -315,13 +319,13 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
|
||||
@Override
|
||||
public final T disableRetry() {
|
||||
retryDisabled = true;
|
||||
retryEnabled = false;
|
||||
return thisT();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final T enableRetry() {
|
||||
retryDisabled = false;
|
||||
retryEnabled = true;
|
||||
return thisT();
|
||||
}
|
||||
|
||||
|
@ -398,8 +402,9 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
final List<ClientInterceptor> getEffectiveInterceptors() {
|
||||
List<ClientInterceptor> effectiveInterceptors =
|
||||
new ArrayList<ClientInterceptor>(this.interceptors);
|
||||
temporarilyDisableRetry = false;
|
||||
if (statsEnabled) {
|
||||
retryDisabled = true;
|
||||
temporarilyDisableRetry = true;
|
||||
CensusStatsModule censusStats = this.censusStatsOverride;
|
||||
if (censusStats == null) {
|
||||
censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
|
||||
|
@ -410,7 +415,7 @@ public abstract class AbstractManagedChannelImplBuilder
|
|||
0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
|
||||
}
|
||||
if (tracingEnabled) {
|
||||
retryDisabled = true;
|
||||
temporarilyDisableRetry = true;
|
||||
CensusTracingModule censusTracing =
|
||||
new CensusTracingModule(Tracing.getTracer(),
|
||||
Tracing.getPropagationComponent().getBinaryFormat());
|
||||
|
|
|
@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
|
|||
import static io.grpc.ConnectivityState.IDLE;
|
||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT;
|
||||
import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
|
@ -55,8 +55,6 @@ import io.grpc.Status;
|
|||
import io.grpc.internal.Channelz.ChannelStats;
|
||||
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
|
||||
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicies;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||
import io.grpc.internal.RetriableStream.Throttle;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -135,7 +133,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
|
||||
private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
|
||||
|
||||
private final ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor();
|
||||
private final ServiceConfigInterceptor serviceConfigInterceptor;
|
||||
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
|
||||
|
@ -213,12 +211,9 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
@Nullable
|
||||
private Throttle throttle;
|
||||
|
||||
private final int maxRetryAttempts;
|
||||
private final int maxHedgedAttempts;
|
||||
private final long perRpcBufferLimit;
|
||||
private final long channelBufferLimit;
|
||||
|
||||
private RetryPolicies retryPolicies;
|
||||
// Temporary false flag that can skip the retry code path.
|
||||
private final boolean retryEnabled;
|
||||
|
||||
|
@ -481,11 +476,10 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
final Metadata headers,
|
||||
final Context context) {
|
||||
checkState(retryEnabled, "retry should be enabled");
|
||||
RetryPolicy retryPolicy = retryPolicies == null ? DEFAULT : retryPolicies.get(method);
|
||||
return new RetriableStream<ReqT>(
|
||||
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
|
||||
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
|
||||
retryPolicy, throttle) {
|
||||
callOptions.getOption(RETRY_POLICY_KEY), throttle) {
|
||||
@Override
|
||||
Status prestart() {
|
||||
return uncommittedRetriableStreamsRegistry.add(this);
|
||||
|
@ -539,6 +533,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
this.backoffPolicyProvider = backoffPolicyProvider;
|
||||
this.transportFactory =
|
||||
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
|
||||
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
|
||||
serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled, builder.maxRetryAttempts);
|
||||
Channel channel = new RealChannel();
|
||||
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
|
||||
if (builder.binlogProvider != null) {
|
||||
|
@ -576,11 +572,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
|
||||
this.userAgent = builder.userAgent;
|
||||
|
||||
this.maxRetryAttempts = builder.maxRetryAttempts;
|
||||
this.maxHedgedAttempts = builder.maxHedgedAttempts;
|
||||
this.channelBufferLimit = builder.retryBufferSize;
|
||||
this.perRpcBufferLimit = builder.perRpcBufferLimit;
|
||||
this.retryEnabled = !builder.retryDisabled;
|
||||
|
||||
this.callTracerFactory = callTracerFactory;
|
||||
channelCallTracer = callTracerFactory.create();
|
||||
|
@ -1171,7 +1164,6 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
try {
|
||||
serviceConfigInterceptor.handleUpdate(serviceConfig);
|
||||
if (retryEnabled) {
|
||||
retryPolicies = getRetryPolicies(config);
|
||||
throttle = getThrottle(config);
|
||||
}
|
||||
} catch (RuntimeException re) {
|
||||
|
@ -1231,12 +1223,6 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(zdapeng): test retryEnabled = true/flase really works as expected
|
||||
private RetryPolicies getRetryPolicies(Attributes config) {
|
||||
return ServiceConfigUtil.getRetryPolicies(
|
||||
config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG), maxRetryAttempts);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Throttle getThrottle(Attributes config) {
|
||||
return ServiceConfigUtil.getThrottlePolicy(
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
|
@ -45,10 +44,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/** A logical {@link ClientStream} that is retriable. */
|
||||
abstract class RetriableStream<ReqT> implements ClientStream {
|
||||
|
@ -68,7 +65,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
// Must not modify it.
|
||||
private final Metadata headers;
|
||||
private final RetryPolicy retryPolicy;
|
||||
private final RetryPolicy.Provider retryPolicyProvider;
|
||||
private RetryPolicy retryPolicy;
|
||||
|
||||
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
|
||||
private final Object lock = new Object();
|
||||
|
@ -94,13 +92,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
|
||||
private ClientStreamListener masterListener;
|
||||
private Future<?> scheduledRetry;
|
||||
private double nextBackoffIntervalInSeconds;
|
||||
private long nextBackoffIntervalNanos;
|
||||
|
||||
RetriableStream(
|
||||
MethodDescriptor<ReqT, ?> method, Metadata headers,
|
||||
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
|
||||
Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
|
||||
RetryPolicy retryPolicy, @Nullable Throttle throttle) {
|
||||
RetryPolicy.Provider retryPolicyProvider, @Nullable Throttle throttle) {
|
||||
this.method = method;
|
||||
this.channelBufferUsed = channelBufferUsed;
|
||||
this.perRpcBufferLimit = perRpcBufferLimit;
|
||||
|
@ -108,8 +106,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
this.callExecutor = callExecutor;
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
this.headers = headers;
|
||||
this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy");
|
||||
nextBackoffIntervalInSeconds = retryPolicy.initialBackoffInSeconds;
|
||||
this.retryPolicyProvider = checkNotNull(retryPolicyProvider, "retryPolicyProvider");
|
||||
this.throttle = throttle;
|
||||
}
|
||||
|
||||
|
@ -572,6 +569,12 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
// TODO(zdapeng): cancel all scheduled hedges (TBD)
|
||||
} else {
|
||||
noMoreTransparentRetry = true;
|
||||
|
||||
if (retryPolicy == null) {
|
||||
retryPolicy = retryPolicyProvider.get();
|
||||
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
|
||||
}
|
||||
|
||||
RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
|
||||
if (retryPlan.shouldRetry) {
|
||||
// The check state.winningSubstream == null, checking if is not already committed, is
|
||||
|
@ -591,8 +594,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
});
|
||||
}
|
||||
},
|
||||
retryPlan.backoffInMillis,
|
||||
TimeUnit.MILLISECONDS);
|
||||
retryPlan.backoffNanos,
|
||||
TimeUnit.NANOSECONDS);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -617,45 +620,46 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
// TODO(zdapeng): add HedgingPolicy as param
|
||||
private RetryPlan makeRetryDecision(RetryPolicy retryPolicy, Status status, Metadata trailer) {
|
||||
boolean shouldRetry = false;
|
||||
long backoffInMillis = 0L;
|
||||
long backoffNanos = 0L;
|
||||
boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
|
||||
|
||||
String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
|
||||
Integer pushback = null;
|
||||
Integer pushbackMillis = null;
|
||||
if (pushbackStr != null) {
|
||||
try {
|
||||
pushback = Integer.valueOf(pushbackStr);
|
||||
pushbackMillis = Integer.valueOf(pushbackStr);
|
||||
} catch (NumberFormatException e) {
|
||||
pushback = -1;
|
||||
pushbackMillis = -1;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isThrottled = false;
|
||||
if (throttle != null) {
|
||||
if (isRetryableStatusCode || (pushback != null && pushback < 0)) {
|
||||
if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
|
||||
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
|
||||
}
|
||||
}
|
||||
|
||||
if (retryPolicy.maxAttempts > substream.previousAttempts + 1 && !isThrottled) {
|
||||
if (pushback == null) {
|
||||
if (pushbackMillis == null) {
|
||||
if (isRetryableStatusCode) {
|
||||
shouldRetry = true;
|
||||
backoffInMillis = (long) (nextBackoffIntervalInSeconds * 1000D * random.nextDouble());
|
||||
nextBackoffIntervalInSeconds = Math.min(
|
||||
nextBackoffIntervalInSeconds * retryPolicy.backoffMultiplier,
|
||||
retryPolicy.maxBackoffInSeconds);
|
||||
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
|
||||
nextBackoffIntervalNanos = Math.min(
|
||||
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
|
||||
retryPolicy.maxBackoffNanos);
|
||||
|
||||
} // else no retry
|
||||
} else if (pushback >= 0) {
|
||||
} else if (pushbackMillis >= 0) {
|
||||
shouldRetry = true;
|
||||
backoffInMillis = pushback;
|
||||
nextBackoffIntervalInSeconds = retryPolicy.initialBackoffInSeconds;
|
||||
backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
|
||||
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
|
||||
} // else no retry
|
||||
} // else no retry
|
||||
|
||||
// TODO(zdapeng): transparent retry
|
||||
// TODO(zdapeng): hedging
|
||||
return new RetryPlan(shouldRetry, backoffInMillis);
|
||||
return new RetryPlan(shouldRetry, backoffNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -972,72 +976,14 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
|||
}
|
||||
}
|
||||
|
||||
interface RetryPolicies {
|
||||
@Nonnull
|
||||
RetryPolicy get(MethodDescriptor<?, ?> method);
|
||||
}
|
||||
|
||||
@Immutable
|
||||
static final class RetryPolicy {
|
||||
private final int maxAttempts;
|
||||
private final double initialBackoffInSeconds;
|
||||
private final double maxBackoffInSeconds;
|
||||
private final double backoffMultiplier;
|
||||
private final Collection<Status.Code> retryableStatusCodes;
|
||||
|
||||
RetryPolicy(
|
||||
int maxAttempts, double initialBackoffInSeconds, double maxBackoffInSeconds,
|
||||
double backoffMultiplier, Collection<Status.Code> retryableStatusCodes) {
|
||||
checkArgument(maxAttempts >= 1, "maxAttempts");
|
||||
this.maxAttempts = maxAttempts;
|
||||
checkArgument(initialBackoffInSeconds >= 0D, "initialBackoffInSeconds");
|
||||
this.initialBackoffInSeconds = initialBackoffInSeconds;
|
||||
checkArgument(
|
||||
maxBackoffInSeconds >= initialBackoffInSeconds,
|
||||
"maxBackoffInSeconds should be at least initialBackoffInSeconds");
|
||||
this.maxBackoffInSeconds = maxBackoffInSeconds;
|
||||
checkArgument(backoffMultiplier > 0D, "backoffMultiplier");
|
||||
this.backoffMultiplier = backoffMultiplier;
|
||||
this.retryableStatusCodes = Collections.unmodifiableSet(
|
||||
new HashSet<Status.Code>(checkNotNull(retryableStatusCodes, "retryableStatusCodes")));
|
||||
}
|
||||
|
||||
/** No retry. */
|
||||
static final RetryPolicy DEFAULT =
|
||||
new RetryPolicy(1, 0, 0, 1, Collections.<Status.Code>emptyList());
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof RetryPolicy)) {
|
||||
return false;
|
||||
}
|
||||
RetryPolicy that = (RetryPolicy) o;
|
||||
return maxAttempts == that.maxAttempts
|
||||
&& Double.compare(backoffMultiplier, that.backoffMultiplier) == 0
|
||||
&& Double.compare(initialBackoffInSeconds, that.initialBackoffInSeconds) == 0
|
||||
&& Double.compare(maxBackoffInSeconds, that.maxBackoffInSeconds) == 0
|
||||
&& Objects.equal(retryableStatusCodes, that.retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
maxAttempts, initialBackoffInSeconds, maxBackoffInSeconds, backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class RetryPlan {
|
||||
final boolean shouldRetry;
|
||||
// TODO(zdapeng) boolean hasHedging
|
||||
final long backoffInMillis;
|
||||
final long backoffNanos;
|
||||
|
||||
RetryPlan(boolean shouldRetry, long backoffInMillis) {
|
||||
RetryPlan(boolean shouldRetry, long backoffNanos) {
|
||||
this.shouldRetry = shouldRetry;
|
||||
this.backoffInMillis = backoffInMillis;
|
||||
this.backoffNanos = backoffNanos;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* Retry policy data object.
|
||||
*/
|
||||
@Immutable
|
||||
final class RetryPolicy {
|
||||
final int maxAttempts;
|
||||
final long initialBackoffNanos;
|
||||
final long maxBackoffNanos;
|
||||
final double backoffMultiplier;
|
||||
final Set<Code> retryableStatusCodes;
|
||||
|
||||
/** No retry. */
|
||||
static final RetryPolicy DEFAULT =
|
||||
new RetryPolicy(1, 0, 0, 1, Collections.<Status.Code>emptySet());
|
||||
|
||||
/**
|
||||
* The caller is supposed to have validated the arguments and handled throwing exception or
|
||||
* logging warnings already, so we avoid repeating args check here.
|
||||
*/
|
||||
RetryPolicy(
|
||||
int maxAttempts,
|
||||
long initialBackoffNanos,
|
||||
long maxBackoffNanos,
|
||||
double backoffMultiplier,
|
||||
@Nonnull Set<Code> retryableStatusCodes) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.initialBackoffNanos = initialBackoffNanos;
|
||||
this.maxBackoffNanos = maxBackoffNanos;
|
||||
this.backoffMultiplier = backoffMultiplier;
|
||||
this.retryableStatusCodes = ImmutableSet.copyOf(retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
maxAttempts,
|
||||
initialBackoffNanos,
|
||||
maxBackoffNanos,
|
||||
backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof RetryPolicy)) {
|
||||
return false;
|
||||
}
|
||||
RetryPolicy that = (RetryPolicy) other;
|
||||
return Objects.equal(this.maxAttempts, that.maxAttempts)
|
||||
&& Objects.equal(this.initialBackoffNanos, that.initialBackoffNanos)
|
||||
&& Objects.equal(this.maxBackoffNanos, that.maxBackoffNanos)
|
||||
&& Double.compare(this.backoffMultiplier, that.backoffMultiplier) == 0
|
||||
&& Objects.equal(this.retryableStatusCodes, that.retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("maxAttempts", maxAttempts)
|
||||
.add("initialBackoffNanos", initialBackoffNanos)
|
||||
.add("maxBackoffNanos", maxBackoffNanos)
|
||||
.add("backoffMultiplier", backoffMultiplier)
|
||||
.add("retryableStatusCodes", retryableStatusCodes)
|
||||
.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Providers the most suitable retry policy for a call when this will have to provide a retry
|
||||
* policy.
|
||||
*/
|
||||
interface Provider {
|
||||
|
||||
/**
|
||||
* This method is used no more than once for each call.
|
||||
*/
|
||||
@Nonnull
|
||||
RetryPolicy get();
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.CheckForNull;
|
||||
|
||||
/**
|
||||
* Modifies RPCs in in conformance with a Service Config.
|
||||
|
@ -56,7 +57,17 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
final AtomicReference<Map<String, MethodInfo>> serviceMap
|
||||
= new AtomicReference<Map<String, MethodInfo>>();
|
||||
|
||||
ServiceConfigInterceptor() {}
|
||||
private final boolean retryEnabled;
|
||||
private final int maxRetryAttemptsLimit;
|
||||
|
||||
// Setting this to true and observing this equal to true are run in different threads.
|
||||
private volatile boolean nameResolveComplete;
|
||||
|
||||
ServiceConfigInterceptor(
|
||||
boolean retryEnabled, int maxRetryAttemptsLimit) {
|
||||
this.retryEnabled = retryEnabled;
|
||||
this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
|
||||
}
|
||||
|
||||
void handleUpdate(Map<String, Object> serviceConfig) {
|
||||
Map<String, MethodInfo> newServiceMethodConfigs = new HashMap<String, MethodInfo>();
|
||||
|
@ -69,11 +80,12 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
ServiceConfigUtil.getMethodConfigFromServiceConfig(serviceConfig);
|
||||
if (methodConfigs == null) {
|
||||
logger.log(Level.FINE, "No method configs found, skipping");
|
||||
nameResolveComplete = true;
|
||||
return;
|
||||
}
|
||||
|
||||
for (Map<String, Object> methodConfig : methodConfigs) {
|
||||
MethodInfo info = new MethodInfo(methodConfig);
|
||||
MethodInfo info = new MethodInfo(methodConfig, retryEnabled, maxRetryAttemptsLimit);
|
||||
|
||||
List<Map<String, Object>> nameList =
|
||||
ServiceConfigUtil.getNameListFromMethodConfig(methodConfig);
|
||||
|
@ -104,10 +116,11 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
// Okay, service config is good, swap it.
|
||||
serviceMethodMap.set(Collections.unmodifiableMap(newServiceMethodConfigs));
|
||||
serviceMap.set(Collections.unmodifiableMap(newServiceConfigs));
|
||||
nameResolveComplete = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent of MethodConfig from a ServiceConfig.
|
||||
* Equivalent of MethodConfig from a ServiceConfig with restrictions from Channel setting.
|
||||
*/
|
||||
static final class MethodInfo {
|
||||
final Long timeoutNanos;
|
||||
|
@ -116,7 +129,13 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
final Integer maxOutboundMessageSize;
|
||||
final RetryPolicy retryPolicy;
|
||||
|
||||
MethodInfo(Map<String, Object> methodConfig) {
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect.
|
||||
*/
|
||||
MethodInfo(
|
||||
Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit) {
|
||||
timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig);
|
||||
waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig);
|
||||
maxInboundMessageSize =
|
||||
|
@ -134,14 +153,16 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
"maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize);
|
||||
}
|
||||
|
||||
Map<String, Object> policy = ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig);
|
||||
retryPolicy = policy == null ? null : new RetryPolicy(policy);
|
||||
Map<String, Object> policy =
|
||||
retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null;
|
||||
retryPolicy =
|
||||
policy == null ? RetryPolicy.DEFAULT : retryPolicy(policy, maxRetryAttemptsLimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
timeoutNanos, waitForReady, maxInboundMessageSize, maxOutboundMessageSize);
|
||||
timeoutNanos, waitForReady, maxInboundMessageSize, maxOutboundMessageSize, retryPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,7 +174,8 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
return Objects.equal(this.timeoutNanos, that.timeoutNanos)
|
||||
&& Objects.equal(this.waitForReady, that.waitForReady)
|
||||
&& Objects.equal(this.maxInboundMessageSize, that.maxInboundMessageSize)
|
||||
&& Objects.equal(this.maxOutboundMessageSize, that.maxOutboundMessageSize);
|
||||
&& Objects.equal(this.maxOutboundMessageSize, that.maxOutboundMessageSize)
|
||||
&& Objects.equal(this.retryPolicy, that.retryPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,125 +185,95 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
.add("waitForReady", waitForReady)
|
||||
.add("maxInboundMessageSize", maxInboundMessageSize)
|
||||
.add("maxOutboundMessageSize", maxOutboundMessageSize)
|
||||
.add("retryPolicy", retryPolicy)
|
||||
.toString();
|
||||
}
|
||||
|
||||
static final class RetryPolicy {
|
||||
final int maxAttempts;
|
||||
final long initialBackoffNanos;
|
||||
final long maxBackoffNanos;
|
||||
final double backoffMultiplier;
|
||||
final Set<Code> retryableStatusCodes;
|
||||
private static RetryPolicy retryPolicy(Map<String, Object> retryPolicy, int maxAttemptsLimit) {
|
||||
int maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
RetryPolicy(Map<String, Object> retryPolicy) {
|
||||
maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
long initialBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getInitialBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"initialBackoff cannot be empty");
|
||||
checkArgument(
|
||||
initialBackoffNanos > 0,
|
||||
"initialBackoffNanos must be greater than 0: %s",
|
||||
initialBackoffNanos);
|
||||
|
||||
initialBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getInitialBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"initialBackoff cannot be empty");
|
||||
checkArgument(
|
||||
initialBackoffNanos > 0,
|
||||
"initialBackoffNanos must be greater than 0: %s",
|
||||
initialBackoffNanos);
|
||||
long maxBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getMaxBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"maxBackoff cannot be empty");
|
||||
checkArgument(
|
||||
maxBackoffNanos > 0, "maxBackoff must be greater than 0: %s", maxBackoffNanos);
|
||||
|
||||
maxBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getMaxBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"maxBackoff cannot be empty");
|
||||
checkArgument(
|
||||
maxBackoffNanos > 0, "maxBackoff must be greater than 0: %s", maxBackoffNanos);
|
||||
double backoffMultiplier = checkNotNull(
|
||||
ServiceConfigUtil.getBackoffMultiplierFromRetryPolicy(retryPolicy),
|
||||
"backoffMultiplier cannot be empty");
|
||||
checkArgument(
|
||||
backoffMultiplier > 0,
|
||||
"backoffMultiplier must be greater than 0: %s",
|
||||
backoffMultiplier);
|
||||
|
||||
backoffMultiplier = checkNotNull(
|
||||
ServiceConfigUtil.getBackoffMultiplierFromRetryPolicy(retryPolicy),
|
||||
"backoffMultiplier cannot be empty");
|
||||
checkArgument(
|
||||
backoffMultiplier > 0,
|
||||
"backoffMultiplier must be greater than 0: %s",
|
||||
backoffMultiplier);
|
||||
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
retryableStatusCodes = Collections.unmodifiableSet(codes);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
RetryPolicy(
|
||||
int maxAttempts,
|
||||
long initialBackoffNanos,
|
||||
long maxBackoffNanos,
|
||||
double backoffMultiplier,
|
||||
Set<Code> retryableStatusCodes) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
this.initialBackoffNanos = initialBackoffNanos;
|
||||
this.maxBackoffNanos = maxBackoffNanos;
|
||||
this.backoffMultiplier = backoffMultiplier;
|
||||
this.retryableStatusCodes = retryableStatusCodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
maxAttempts,
|
||||
initialBackoffNanos,
|
||||
maxBackoffNanos,
|
||||
backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof RetryPolicy)) {
|
||||
return false;
|
||||
}
|
||||
RetryPolicy that = (RetryPolicy) other;
|
||||
return Objects.equal(this.maxAttempts, that.maxAttempts)
|
||||
&& Objects.equal(this.initialBackoffNanos, that.initialBackoffNanos)
|
||||
&& Objects.equal(this.maxBackoffNanos, that.maxBackoffNanos)
|
||||
&& Objects.equal(this.backoffMultiplier, that.backoffMultiplier)
|
||||
&& Objects.equal(this.retryableStatusCodes, that.retryableStatusCodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("maxAttempts", maxAttempts)
|
||||
.add("initialBackoffNanos", initialBackoffNanos)
|
||||
.add("maxBackoffNanos", maxBackoffNanos)
|
||||
.add("backoffMultiplier", backoffMultiplier)
|
||||
.add("retryableStatusCodes", retryableStatusCodes)
|
||||
.toString();
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes);
|
||||
|
||||
return new RetryPolicy(
|
||||
maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
}
|
||||
|
||||
static final CallOptions.Key<MethodInfo.RetryPolicy> RETRY_POLICY_KEY =
|
||||
static final CallOptions.Key<RetryPolicy.Provider> RETRY_POLICY_KEY =
|
||||
CallOptions.Key.of("internal-retry-policy", null);
|
||||
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
||||
Map<String, MethodInfo> localServiceMethodMap = serviceMethodMap.get();
|
||||
MethodInfo info = null;
|
||||
if (localServiceMethodMap != null) {
|
||||
info = localServiceMethodMap.get(method.getFullMethodName());
|
||||
}
|
||||
if (info == null) {
|
||||
Map<String, MethodInfo> localServiceMap = serviceMap.get();
|
||||
if (localServiceMap != null) {
|
||||
info = localServiceMap.get(
|
||||
MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
|
||||
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
||||
if (retryEnabled) {
|
||||
if (nameResolveComplete) {
|
||||
final RetryPolicy retryPolicy = getRetryPolicyFromConfig(method);
|
||||
final class ImmediateRetryPolicyProvider implements RetryPolicy.Provider {
|
||||
@Override
|
||||
public RetryPolicy get() {
|
||||
return retryPolicy;
|
||||
}
|
||||
}
|
||||
|
||||
callOptions = callOptions.withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider());
|
||||
} else {
|
||||
final class DelayedRetryPolicyProvider implements RetryPolicy.Provider {
|
||||
/**
|
||||
* Returns RetryPolicy.DEFAULT if name resolving is not complete at the moment the method
|
||||
* is invoked, otherwise returns the RetryPolicy computed from service config.
|
||||
*
|
||||
* <p>Note that this method is used no more than once for each call.
|
||||
*/
|
||||
@Override
|
||||
public RetryPolicy get() {
|
||||
if (!nameResolveComplete) {
|
||||
return RetryPolicy.DEFAULT;
|
||||
}
|
||||
return getRetryPolicyFromConfig(method);
|
||||
}
|
||||
}
|
||||
|
||||
callOptions = callOptions.withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider());
|
||||
}
|
||||
}
|
||||
|
||||
MethodInfo info = getMethodInfo(method);
|
||||
if (info == null) {
|
||||
return next.newCall(method, callOptions);
|
||||
}
|
||||
|
@ -316,10 +308,33 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
|
||||
}
|
||||
}
|
||||
if (info.retryPolicy != null) {
|
||||
callOptions = callOptions.withOption(RETRY_POLICY_KEY, info.retryPolicy);
|
||||
}
|
||||
|
||||
return next.newCall(method, callOptions);
|
||||
}
|
||||
|
||||
@CheckForNull
|
||||
private MethodInfo getMethodInfo(MethodDescriptor<?, ?> method) {
|
||||
Map<String, MethodInfo> localServiceMethodMap = serviceMethodMap.get();
|
||||
MethodInfo info = null;
|
||||
if (localServiceMethodMap != null) {
|
||||
info = localServiceMethodMap.get(method.getFullMethodName());
|
||||
}
|
||||
if (info == null) {
|
||||
Map<String, MethodInfo> localServiceMap = serviceMap.get();
|
||||
if (localServiceMap != null) {
|
||||
info = localServiceMap.get(
|
||||
MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
|
||||
}
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
RetryPolicy getRetryPolicyFromConfig(MethodDescriptor<?, ?> method) {
|
||||
MethodInfo info = getMethodInfo(method);
|
||||
if (info == null || info.retryPolicy == null) {
|
||||
return RetryPolicy.DEFAULT;
|
||||
}
|
||||
return info.retryPolicy;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,17 +20,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.math.LongMath.checkedAdd;
|
||||
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicies;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||
import io.grpc.internal.RetriableStream.Throttle;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -65,109 +58,6 @@ final class ServiceConfigUtil {
|
|||
|
||||
private ServiceConfigUtil() {}
|
||||
|
||||
/**
|
||||
* Gets retry policies from the service config.
|
||||
*
|
||||
* @throws ClassCastException if the service config doesn't parse properly
|
||||
*/
|
||||
static RetryPolicies getRetryPolicies(
|
||||
@Nullable Map<String, Object> serviceConfig, int maxAttemptsLimit) {
|
||||
final Map<String, RetryPolicy> fullMethodNameMap = new HashMap<String, RetryPolicy>();
|
||||
final Map<String, RetryPolicy> serviceNameMap = new HashMap<String, RetryPolicy>();
|
||||
|
||||
if (serviceConfig != null) {
|
||||
|
||||
/* schema as follows
|
||||
{
|
||||
"methodConfig": [
|
||||
{
|
||||
"name": [
|
||||
{
|
||||
"service": string,
|
||||
"method": string, // Optional
|
||||
}
|
||||
],
|
||||
"retryPolicy": {
|
||||
"maxAttempts": number,
|
||||
"initialBackoff": string, // Long decimal with "s" appended
|
||||
"maxBackoff": string, // Long decimal with "s" appended
|
||||
"backoffMultiplier": number
|
||||
"retryableStatusCodes": []
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
*/
|
||||
|
||||
if (serviceConfig.containsKey("methodConfig")) {
|
||||
List<Object> methodConfigs = getList(serviceConfig, "methodConfig");
|
||||
for (int i = 0; i < methodConfigs.size(); i++) {
|
||||
Map<String, Object> methodConfig = getObject(methodConfigs, i);
|
||||
if (methodConfig.containsKey("retryPolicy")) {
|
||||
Map<String, Object> retryPolicy = getObject(methodConfig, "retryPolicy");
|
||||
|
||||
int maxAttempts = getDouble(retryPolicy, "maxAttempts").intValue();
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
String initialBackoffStr = getString(retryPolicy, "initialBackoff");
|
||||
checkState(
|
||||
initialBackoffStr.charAt(initialBackoffStr.length() - 1) == 's',
|
||||
"invalid value of initialBackoff");
|
||||
double initialBackoff =
|
||||
Double.parseDouble(initialBackoffStr.substring(0, initialBackoffStr.length() - 1));
|
||||
|
||||
String maxBackoffStr = getString(retryPolicy, "maxBackoff");
|
||||
checkState(
|
||||
maxBackoffStr.charAt(maxBackoffStr.length() - 1) == 's',
|
||||
"invalid value of maxBackoff");
|
||||
double maxBackoff =
|
||||
Double.parseDouble(maxBackoffStr.substring(0, maxBackoffStr.length() - 1));
|
||||
|
||||
double backoffMultiplier = getDouble(retryPolicy, "backoffMultiplier");
|
||||
|
||||
List<Object> retryableStatusCodes = getList(retryPolicy, "retryableStatusCodes");
|
||||
Set<Code> codeSet = new HashSet<Code>(retryableStatusCodes.size());
|
||||
for (int j = 0; j < retryableStatusCodes.size(); j++) {
|
||||
String code = getString(retryableStatusCodes, j);
|
||||
codeSet.add(Code.valueOf(code));
|
||||
}
|
||||
|
||||
RetryPolicy pojoPolicy = new RetryPolicy(
|
||||
maxAttempts, initialBackoff, maxBackoff, backoffMultiplier, codeSet);
|
||||
|
||||
List<Object> names = getList(methodConfig, "name");
|
||||
for (int j = 0; j < names.size(); j++) {
|
||||
Map<String, Object> name = getObject(names, j);
|
||||
String service = getString(name, "service");
|
||||
if (name.containsKey("method")) {
|
||||
String method = getString(name, "method");
|
||||
fullMethodNameMap.put(
|
||||
MethodDescriptor.generateFullMethodName(service, method), pojoPolicy);
|
||||
} else {
|
||||
serviceNameMap.put(service, pojoPolicy);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new RetryPolicies() {
|
||||
@Override
|
||||
public RetryPolicy get(MethodDescriptor<?, ?> method) {
|
||||
RetryPolicy retryPolicy = fullMethodNameMap.get(method.getFullMethodName());
|
||||
if (retryPolicy == null) {
|
||||
retryPolicy = serviceNameMap
|
||||
.get(MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
|
||||
}
|
||||
if (retryPolicy == null) {
|
||||
retryPolicy = RetryPolicy.DEFAULT;
|
||||
}
|
||||
return retryPolicy;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Nullable
|
||||
static Throttle getThrottlePolicy(@Nullable Map<String, Object> serviceConfig) {
|
||||
String retryThrottlingKey = "retryThrottling";
|
||||
|
|
|
@ -393,16 +393,16 @@ public class AbstractManagedChannelImplBuilderTest {
|
|||
Builder builder = new Builder("target");
|
||||
|
||||
builder.enableRetry();
|
||||
assertFalse(builder.retryDisabled);
|
||||
assertTrue(builder.retryEnabled);
|
||||
|
||||
builder.disableRetry();
|
||||
assertTrue(builder.retryDisabled);
|
||||
assertFalse(builder.retryEnabled);
|
||||
|
||||
builder.enableRetry();
|
||||
assertFalse(builder.retryDisabled);
|
||||
assertTrue(builder.retryEnabled);
|
||||
|
||||
builder.disableRetry();
|
||||
assertTrue(builder.retryDisabled);
|
||||
assertFalse(builder.retryEnabled);
|
||||
}
|
||||
|
||||
static class Builder extends AbstractManagedChannelImplBuilder<Builder> {
|
||||
|
|
|
@ -39,6 +39,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
|
@ -52,12 +53,10 @@ import io.grpc.Status;
|
|||
import io.grpc.Status.Code;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||
import io.grpc.internal.RetriableStream.Throttle;
|
||||
import io.grpc.internal.StreamListener.MessageProducer;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -86,8 +85,8 @@ public class RetriableStreamTest {
|
|||
private static final long PER_RPC_BUFFER_LIMIT = 1000;
|
||||
private static final long CHANNEL_BUFFER_LIMIT = 2000;
|
||||
private static final int MAX_ATTEMPTS = 6;
|
||||
private static final double INITIAL_BACKOFF_IN_SECONDS = 100D;
|
||||
private static final double MAX_BACKOFF_IN_SECONDS = 700D;
|
||||
private static final long INITIAL_BACKOFF_IN_SECONDS = 100;
|
||||
private static final long MAX_BACKOFF_IN_SECONDS = 700;
|
||||
private static final double BACKOFF_MULTIPLIER = 2D;
|
||||
private static final double FAKE_RANDOM = .5D;
|
||||
|
||||
|
@ -107,8 +106,11 @@ public class RetriableStreamTest {
|
|||
private static final Code NON_RETRIABLE_STATUS_CODE = Code.INTERNAL;
|
||||
private static final RetryPolicy RETRY_POLICY =
|
||||
new RetryPolicy(
|
||||
MAX_ATTEMPTS, INITIAL_BACKOFF_IN_SECONDS, MAX_BACKOFF_IN_SECONDS, BACKOFF_MULTIPLIER,
|
||||
Arrays.asList(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
|
||||
MAX_ATTEMPTS,
|
||||
TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS),
|
||||
TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS),
|
||||
BACKOFF_MULTIPLIER,
|
||||
ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
|
||||
|
||||
private final RetriableStreamRecorder retriableStreamRecorder =
|
||||
mock(RetriableStreamRecorder.class);
|
||||
|
@ -128,10 +130,18 @@ public class RetriableStreamTest {
|
|||
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
|
||||
Executor callExecutor,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
RetryPolicy retryPolicy,
|
||||
final RetryPolicy retryPolicy,
|
||||
@Nullable Throttle throttle) {
|
||||
super(method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor,
|
||||
scheduledExecutorService, retryPolicy, throttle);
|
||||
super(
|
||||
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor,
|
||||
scheduledExecutorService,
|
||||
new RetryPolicy.Provider() {
|
||||
@Override
|
||||
public RetryPolicy get() {
|
||||
return retryPolicy;
|
||||
}
|
||||
},
|
||||
throttle);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,24 +16,30 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
|
||||
import static java.lang.Double.parseDouble;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicies;
|
||||
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||
import io.grpc.internal.RetriableStream.Throttle;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
/** Unit tests for RetryPolicy. */
|
||||
@RunWith(JUnit4.class)
|
||||
|
@ -55,37 +61,96 @@ public class RetryPolicyTest {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
|
||||
RetryPolicies retryPolicies = ServiceConfigUtil.getRetryPolicies(serviceConfig, 4);
|
||||
assertNotNull(retryPolicies);
|
||||
|
||||
ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
|
||||
true /* retryEnabled */, 4 /* maxRetryAttemptsLimit */);
|
||||
serviceConfigInterceptor.handleUpdate(serviceConfig);
|
||||
|
||||
MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
|
||||
|
||||
MethodDescriptor<Void, Void> method = builder.setFullMethodName("not/exist").build();
|
||||
assertEquals(
|
||||
RetryPolicy.DEFAULT,
|
||||
retryPolicies.get(builder.setFullMethodName("not/exist").build()));
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
|
||||
method = builder.setFullMethodName("not_exist/Foo1").build();
|
||||
assertEquals(
|
||||
RetryPolicy.DEFAULT,
|
||||
retryPolicies.get(builder.setFullMethodName("not_exist/Foo1").build()));
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
|
||||
method = builder.setFullMethodName("SimpleService1/not_exist").build();
|
||||
|
||||
assertEquals(
|
||||
new RetryPolicy(
|
||||
3, parseDouble("2.1"), parseDouble("2.2"), parseDouble("3"),
|
||||
Arrays.asList(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)),
|
||||
retryPolicies.get(builder.setFullMethodName("SimpleService1/not_exist").build()));
|
||||
3,
|
||||
TimeUnit.MILLISECONDS.toNanos(2100),
|
||||
TimeUnit.MILLISECONDS.toNanos(2200),
|
||||
parseDouble("3"),
|
||||
ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)),
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
|
||||
method = builder.setFullMethodName("SimpleService1/Foo1").build();
|
||||
assertEquals(
|
||||
new RetryPolicy(
|
||||
4, parseDouble(".1"), parseDouble("1"), parseDouble("2"),
|
||||
Arrays.asList(Code.UNAVAILABLE)),
|
||||
retryPolicies.get(builder.setFullMethodName("SimpleService1/Foo1").build()));
|
||||
4,
|
||||
TimeUnit.MILLISECONDS.toNanos(100),
|
||||
TimeUnit.MILLISECONDS.toNanos(1000),
|
||||
parseDouble("2"),
|
||||
ImmutableSet.of(Code.UNAVAILABLE)),
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
|
||||
method = builder.setFullMethodName("SimpleService2/not_exist").build();
|
||||
assertEquals(
|
||||
RetryPolicy.DEFAULT,
|
||||
retryPolicies.get(builder.setFullMethodName("SimpleService2/not_exist").build()));
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
|
||||
method = builder.setFullMethodName("SimpleService2/Foo2").build();
|
||||
assertEquals(
|
||||
new RetryPolicy(
|
||||
4, parseDouble(".1"), parseDouble("1"), parseDouble("2"),
|
||||
Arrays.asList(Code.UNAVAILABLE)),
|
||||
retryPolicies.get(builder.setFullMethodName("SimpleService2/Foo2").build()));
|
||||
4,
|
||||
TimeUnit.MILLISECONDS.toNanos(100),
|
||||
TimeUnit.MILLISECONDS.toNanos(1000),
|
||||
parseDouble("2"),
|
||||
ImmutableSet.of(Code.UNAVAILABLE)),
|
||||
serviceConfigInterceptor.getRetryPolicyFromConfig(method));
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getRetryPolicies_retryDisabled() throws Exception {
|
||||
Channel channel = mock(Channel.class);
|
||||
ArgumentCaptor<CallOptions> callOptionsCap = ArgumentCaptor.forClass(CallOptions.class);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
reader = new BufferedReader(new InputStreamReader(RetryPolicyTest.class.getResourceAsStream(
|
||||
"/io/grpc/internal/test_retry_service_config.json"), "UTF-8"));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
sb.append(line).append('\n');
|
||||
}
|
||||
Object serviceConfigObj = JsonParser.parse(sb.toString());
|
||||
assertTrue(serviceConfigObj instanceof Map);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
|
||||
|
||||
ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
|
||||
false /* retryEnabled */, 4 /* maxRetryAttemptsLimit */);
|
||||
serviceConfigInterceptor.handleUpdate(serviceConfig);
|
||||
|
||||
MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
|
||||
|
||||
MethodDescriptor<Void, Void> method =
|
||||
builder.setFullMethodName("SimpleService1/Foo1").build();
|
||||
|
||||
serviceConfigInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
|
||||
verify(channel).newCall(eq(method), callOptionsCap.capture());
|
||||
assertThat(callOptionsCap.getValue().getOption(RETRY_POLICY_KEY)).isNull();
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
|
@ -58,7 +59,8 @@ public class ServiceConfigInterceptorTest {
|
|||
MockitoAnnotations.initMocks(this);
|
||||
}
|
||||
|
||||
private final ServiceConfigInterceptor interceptor = new ServiceConfigInterceptor();
|
||||
private final ServiceConfigInterceptor interceptor = new ServiceConfigInterceptor(
|
||||
true /* retryEnabled */, 5 /* maxRetryAttemptsLimit */);
|
||||
|
||||
private final String fullMethodName =
|
||||
MethodDescriptor.generateFullMethodName("service", "method");
|
||||
|
@ -98,6 +100,16 @@ public class ServiceConfigInterceptorTest {
|
|||
assertThat(callOptionsCap.getValue().isWaitForReady()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleUpdateNotCalledBeforeInterceptCall() {
|
||||
interceptor.interceptCall(methodDescriptor, CallOptions.DEFAULT.withoutWaitForReady(), channel);
|
||||
|
||||
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
|
||||
assertThat(callOptionsCap.getValue().isWaitForReady()).isFalse();
|
||||
assertThat(callOptionsCap.getValue().getOption(RETRY_POLICY_KEY).get())
|
||||
.isEqualTo(RetryPolicy.DEFAULT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void withMaxRequestSize() {
|
||||
JsonObj name = new JsonObj("service", "service");
|
||||
|
@ -351,9 +363,9 @@ public class ServiceConfigInterceptorTest {
|
|||
assertThat(interceptor.serviceMethodMap.get())
|
||||
.containsExactly(
|
||||
methodDescriptor.getFullMethodName(),
|
||||
new MethodInfo(methodConfig));
|
||||
assertThat(interceptor.serviceMap.get())
|
||||
.containsExactly("service2", new MethodInfo(methodConfig));
|
||||
new MethodInfo(methodConfig, false, 1));
|
||||
assertThat(interceptor.serviceMap.get()).containsExactly(
|
||||
"service2", new MethodInfo(methodConfig, false, 1));
|
||||
}
|
||||
|
||||
|
||||
|
@ -364,7 +376,7 @@ public class ServiceConfigInterceptorTest {
|
|||
|
||||
thrown.expectMessage("Duration value is out of range");
|
||||
|
||||
new MethodInfo(methodConfig);
|
||||
new MethodInfo(methodConfig, false, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -372,7 +384,7 @@ public class ServiceConfigInterceptorTest {
|
|||
JsonObj name = new JsonObj("service", "service");
|
||||
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "315576000000s");
|
||||
|
||||
MethodInfo info = new MethodInfo(methodConfig);
|
||||
MethodInfo info = new MethodInfo(methodConfig, false, 1);
|
||||
|
||||
assertThat(info.timeoutNanos).isEqualTo(Long.MAX_VALUE);
|
||||
}
|
||||
|
@ -386,7 +398,7 @@ public class ServiceConfigInterceptorTest {
|
|||
thrown.expect(IllegalArgumentException.class);
|
||||
thrown.expectMessage("exceeds bounds");
|
||||
|
||||
new MethodInfo(methodConfig);
|
||||
new MethodInfo(methodConfig, false, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -397,7 +409,7 @@ public class ServiceConfigInterceptorTest {
|
|||
thrown.expect(IllegalArgumentException.class);
|
||||
thrown.expectMessage("exceeds bounds");
|
||||
|
||||
new MethodInfo(methodConfig);
|
||||
new MethodInfo(methodConfig, false, 1);
|
||||
}
|
||||
|
||||
private static final class NoopMarshaller implements MethodDescriptor.Marshaller<Void> {
|
||||
|
|
|
@ -41,7 +41,7 @@
|
|||
"waitForReady":true,
|
||||
"retryPolicy":{
|
||||
"maxAttempts":5,
|
||||
"initialBackoff":".1s",
|
||||
"initialBackoff":"0.1s",
|
||||
"maxBackoff":"1s",
|
||||
"backoffMultiplier":2,
|
||||
"retryableStatusCodes":[
|
||||
|
|
Loading…
Reference in New Issue