mirror of https://github.com/grpc/grpc-java.git
core: Provide DEADLINE_EXCEEDED insights for context deadline
We provided extra details when the RPC is killed by CallOptions' Deadline, but didn't do the same for Context. To avoid duplicating code, things were restructured, including the threading. There are more code flows now, but I think the multi-threading came out more obvious and less error-prone. I didn't change the status when the deadline is already expired, because the text is shared with DelayedClientCall and AbstractInteropTest doesn't distinguish between the two cases. This is a roll-forward that avoids an NPE when cancel() is called without an earlier call to start(), as seen at b/300991330.
This commit is contained in:
parent
51f811df86
commit
3abab95e75
|
@ -28,7 +28,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
|
|||
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
||||
import static java.lang.Math.max;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
|
@ -62,6 +61,7 @@ import java.util.concurrent.Executor;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -82,16 +82,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
private final boolean callExecutorIsDirect;
|
||||
private final CallTracer channelCallsTracer;
|
||||
private final Context context;
|
||||
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
||||
private CancellationHandler cancellationHandler;
|
||||
private final boolean unaryRequest;
|
||||
private CallOptions callOptions;
|
||||
private ClientStream stream;
|
||||
private volatile boolean cancelListenersShouldBeRemoved;
|
||||
private boolean cancelCalled;
|
||||
private boolean halfCloseCalled;
|
||||
private final ClientStreamProvider clientStreamProvider;
|
||||
private final ContextCancellationListener cancellationListener =
|
||||
new ContextCancellationListener();
|
||||
private final ScheduledExecutorService deadlineCancellationExecutor;
|
||||
private boolean fullStreamDecompression;
|
||||
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
||||
|
@ -128,13 +125,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
PerfMark.event("ClientCall.<init>", tag);
|
||||
}
|
||||
|
||||
private final class ContextCancellationListener implements CancellationListener {
|
||||
@Override
|
||||
public void cancelled(Context context) {
|
||||
stream.cancel(statusFromCancelled(context));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provider of {@link ClientStream}s.
|
||||
*/
|
||||
|
@ -252,21 +242,21 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
|
||||
|
||||
Deadline effectiveDeadline = effectiveDeadline();
|
||||
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
|
||||
boolean contextIsDeadlineSource = effectiveDeadline != null
|
||||
&& effectiveDeadline.equals(context.getDeadline());
|
||||
cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
|
||||
boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
|
||||
if (!deadlineExceeded) {
|
||||
logIfContextNarrowedTimeout(
|
||||
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
|
||||
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
|
||||
} else {
|
||||
ClientStreamTracer[] tracers =
|
||||
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
|
||||
String deadlineName =
|
||||
isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
|
||||
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
|
||||
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||
String description = String.format(
|
||||
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
|
||||
+ "Name resolution delay %.9f seconds.", deadlineName,
|
||||
effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
|
||||
cancellationHandler.remainingNanos / NANO_TO_SECS,
|
||||
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
|
||||
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
|
||||
}
|
||||
|
@ -298,21 +288,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
// they receive cancel before start. Issue #1343 has more details
|
||||
|
||||
// Propagate later Context cancellation to the remote side.
|
||||
context.addListener(cancellationListener, directExecutor());
|
||||
if (effectiveDeadline != null
|
||||
// If the context has the effective deadline, we don't need to schedule an extra task.
|
||||
&& !effectiveDeadline.equals(context.getDeadline())
|
||||
// If the channel has been terminated, we don't need to schedule an extra task.
|
||||
&& deadlineCancellationExecutor != null) {
|
||||
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
|
||||
}
|
||||
if (cancelListenersShouldBeRemoved) {
|
||||
// Race detected! ClientStreamListener.closed may have been called before
|
||||
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
|
||||
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
|
||||
// was cancelled.
|
||||
removeContextListenerAndCancelDeadlineFuture();
|
||||
}
|
||||
cancellationHandler.setUp();
|
||||
}
|
||||
|
||||
private void applyMethodConfig() {
|
||||
|
@ -354,54 +330,77 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
}
|
||||
}
|
||||
|
||||
private static void logIfContextNarrowedTimeout(
|
||||
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
|
||||
@Nullable Deadline callDeadline) {
|
||||
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|
||||
|| !effectiveDeadline.equals(outerCallDeadline)) {
|
||||
return;
|
||||
}
|
||||
|
||||
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
|
||||
StringBuilder builder = new StringBuilder(String.format(
|
||||
Locale.US,
|
||||
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
|
||||
if (callDeadline == null) {
|
||||
builder.append(" Explicit call timeout was not set.");
|
||||
} else {
|
||||
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
|
||||
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
|
||||
}
|
||||
|
||||
log.fine(builder.toString());
|
||||
}
|
||||
|
||||
private void removeContextListenerAndCancelDeadlineFuture() {
|
||||
context.removeListener(cancellationListener);
|
||||
ScheduledFuture<?> f = deadlineCancellationFuture;
|
||||
if (f != null) {
|
||||
f.cancel(false);
|
||||
}
|
||||
}
|
||||
|
||||
private class DeadlineTimer implements Runnable {
|
||||
private final class CancellationHandler implements Runnable, CancellationListener {
|
||||
private final boolean contextIsDeadlineSource;
|
||||
private final boolean hasDeadline;
|
||||
private final long remainingNanos;
|
||||
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
||||
private volatile boolean tearDownCalled;
|
||||
|
||||
DeadlineTimer(long remainingNanos) {
|
||||
this.remainingNanos = remainingNanos;
|
||||
/**
 * Captures the deadline state for this call at construction time.
 *
 * @param deadline the deadline for the call; when null, {@code hasDeadline} is false and
 *     {@code remainingNanos} is 0
 * @param contextIsDeadlineSource stored as-is in the field of the same name
 */
CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
  this.contextIsDeadlineSource = contextIsDeadlineSource;
  this.hasDeadline = deadline != null;
  // The remaining time is sampled once, here, and reused afterwards.
  this.remainingNanos = deadline != null ? deadline.timeRemaining(TimeUnit.NANOSECONDS) : 0;
}
|
||||
|
||||
void setUp() {
|
||||
if (tearDownCalled) {
|
||||
return;
|
||||
}
|
||||
if (hasDeadline
|
||||
// If the context has the effective deadline, we don't need to schedule an extra task.
|
||||
&& !contextIsDeadlineSource
|
||||
// If the channel has been terminated, we don't need to schedule an extra task.
|
||||
&& deadlineCancellationExecutor != null) {
|
||||
deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
|
||||
new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
context.addListener(this, directExecutor());
|
||||
if (tearDownCalled) {
|
||||
// Race detected! Re-run to make sure the future is cancelled and context listener removed
|
||||
tearDown();
|
||||
}
|
||||
}
|
||||
|
||||
// May be called multiple times, and race with setUp()
|
||||
void tearDown() {
|
||||
tearDownCalled = true;
|
||||
ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
|
||||
if (deadlineCancellationFuture != null) {
|
||||
deadlineCancellationFuture.cancel(false);
|
||||
}
|
||||
context.removeListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled(Context context) {
|
||||
if (hasDeadline && contextIsDeadlineSource
|
||||
&& context.cancellationCause() instanceof TimeoutException) {
|
||||
stream.cancel(formatDeadlineExceededStatus());
|
||||
return;
|
||||
}
|
||||
stream.cancel(statusFromCancelled(context));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
InsightBuilder insight = new InsightBuilder();
|
||||
stream.appendTimeoutInsight(insight);
|
||||
stream.cancel(formatDeadlineExceededStatus());
|
||||
}
|
||||
|
||||
Status formatDeadlineExceededStatus() {
|
||||
// DelayedStream.cancel() is safe to call from a thread that is different from where the
|
||||
// stream is created.
|
||||
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
|
||||
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
|
||||
|
||||
StringBuilder buf = new StringBuilder();
|
||||
buf.append("deadline exceeded after ");
|
||||
buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
|
||||
buf.append(" deadline exceeded after ");
|
||||
if (remainingNanos < 0) {
|
||||
buf.append('-');
|
||||
}
|
||||
|
@ -409,20 +408,18 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
buf.append(String.format(Locale.US, ".%09d", nanos));
|
||||
buf.append("s. ");
|
||||
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
|
||||
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
|
||||
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
|
||||
buf.append(insight);
|
||||
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
|
||||
if (stream != null) {
|
||||
InsightBuilder insight = new InsightBuilder();
|
||||
stream.appendTimeoutInsight(insight);
|
||||
buf.append(" ");
|
||||
buf.append(insight);
|
||||
}
|
||||
return DEADLINE_EXCEEDED.withDescription(buf.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
|
||||
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
|
||||
return deadlineCancellationExecutor.schedule(
|
||||
new LogExceptionRunnable(
|
||||
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Deadline effectiveDeadline() {
|
||||
// Call options and context are immutable, so we don't need to cache the deadline.
|
||||
|
@ -440,16 +437,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
return deadline0.minimum(deadline1);
|
||||
}
|
||||
|
||||
private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
|
||||
if (deadline0 == null) {
|
||||
return false;
|
||||
}
|
||||
if (deadline1 == null) {
|
||||
return true;
|
||||
}
|
||||
return deadline0.isBefore(deadline1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void request(int numMessages) {
|
||||
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
|
||||
|
@ -493,7 +480,10 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
stream.cancel(status);
|
||||
}
|
||||
} finally {
|
||||
removeContextListenerAndCancelDeadlineFuture();
|
||||
// start() might not have been called
|
||||
if (cancellationHandler != null) {
|
||||
cancellationHandler.tearDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -699,10 +689,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
// description. Since our timer may be delayed in firing, we double-check the deadline and
|
||||
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
|
||||
if (deadline.isExpired()) {
|
||||
InsightBuilder insight = new InsightBuilder();
|
||||
stream.appendTimeoutInsight(insight);
|
||||
status = DEADLINE_EXCEEDED.augmentDescription(
|
||||
"ClientCall was cancelled at or after deadline. " + insight);
|
||||
status = cancellationHandler.formatDeadlineExceededStatus();
|
||||
// Replace trailers to prevent mixing sources of status and trailers.
|
||||
trailers = new Metadata();
|
||||
}
|
||||
|
@ -725,6 +712,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
}
|
||||
|
||||
private void runInternal() {
|
||||
cancellationHandler.tearDown();
|
||||
Status status = savedStatus;
|
||||
Metadata trailers = savedTrailers;
|
||||
if (exceptionStatus != null) {
|
||||
|
@ -737,11 +725,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
// Replace trailers to prevent mixing sources of status and trailers.
|
||||
trailers = new Metadata();
|
||||
}
|
||||
cancelListenersShouldBeRemoved = true;
|
||||
try {
|
||||
closeObserver(observer, status, trailers);
|
||||
} finally {
|
||||
removeContextListenerAndCancelDeadlineFuture();
|
||||
channelCallsTracer.reportCallEnded(status.isOk());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -926,7 +926,7 @@ public class ClientCallImplTest {
|
|||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||
assertThat(statusCaptor.getValue().getDescription())
|
||||
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||
.matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
||||
}
|
||||
|
||||
|
@ -954,7 +954,24 @@ public class ClientCallImplTest {
|
|||
|
||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
|
||||
assertThat(statusCaptor.getValue().getDescription())
|
||||
.matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelWithoutStart() {
|
||||
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
|
||||
|
||||
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
|
||||
method,
|
||||
MoreExecutors.directExecutor(),
|
||||
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
|
||||
clientStreamProvider,
|
||||
deadlineCancellationExecutor,
|
||||
channelCallTracer, configSelector);
|
||||
// Nothing happens as a result, but it shouldn't throw
|
||||
call.cancel("canceled", null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1190,7 +1190,7 @@ public abstract class AbstractInteropTest {
|
|||
assertTrue(desc,
|
||||
// There is a race between client and server-side deadline expiration.
|
||||
// If client expires first, it'd generate this message
|
||||
Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
|
||||
Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc)
|
||||
// If server expires first, it'd reset the stream and client would generate a different
|
||||
// message
|
||||
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
|
||||
|
|
Loading…
Reference in New Issue