stub: disable exception if onCancelHandler set

This restrains a cancellation Exception when an onCancelHandler
is set in ServerCallStreamObserverImpl.

Signed-off-by: Venil Noronha <veniln@vmware.com>
This commit is contained in:
Venil Noronha 2018-11-30 16:36:12 -08:00 committed by Eric Anderson
parent 3dab7aed2f
commit 3f4c3f0406
3 changed files with 82 additions and 5 deletions

View File

@ -39,12 +39,15 @@ public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V>
public abstract boolean isCancelled();
/**
* Set a {@link Runnable} that will be called if the calls {@link #isCancelled()} state
* Set a {@link Runnable} that will be called if the calls {@link #isCancelled()} state
* changes from {@code false} to {@code true}. It is guaranteed that execution of the
* {@link Runnable} are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>Note that the handler may be called some time after {@link #isCancelled} has transitioned to
* {@code true} as other callbacks may still be executing in the 'inbound' observer.
* <p>Note that the handler may be called some time after {@link #isCancelled()} has transitioned
* to {@code true} as other callbacks may still be executing in the 'inbound' observer.
*
* <p>Setting the onCancelHandler will suppress the on-cancel exception thrown by
* {@link #onNext()}.
*
* @param onCancelHandler to call when client has cancelled the call.
*/

View File

@ -332,7 +332,10 @@ public final class ServerCalls {
@Override
public void onNext(RespT response) {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
return;
}
if (!sentHeaders) {
call.sendHeaders(new Metadata());
@ -353,7 +356,9 @@ public final class ServerCalls {
@Override
public void onCompleted() {
if (cancelled) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
} else {
call.close(Status.OK, new Metadata());
}

View File

@ -33,6 +33,7 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.ByteArrayInputStream;
@ -134,6 +135,74 @@ public class ServerCallsTest {
assertThat(serverCall.requestCalls).containsExactly(1, 1).inOrder();
}
@Test
public void noCancellationExceptionIfOnCancelHandlerSet() throws Exception {
final AtomicBoolean onCancelCalled = new AtomicBoolean();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
serverCallObserver.setOnCancelHandler(new Runnable() {
@Override
public void run() {
onCancelCalled.set(true);
}
});
return new ServerCalls.NoopStreamObserver<Integer>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onReady();
callListener.onCancel();
assertTrue(onCancelCalled.get());
serverCall.isCancelled = true;
assertTrue(callObserver.get().isCancelled());
callObserver.get().onNext(null);
callObserver.get().onCompleted();
}
@Test
public void expectCancellationExceptionIfOnCancelHandlerNotSet() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
return new ServerCalls.NoopStreamObserver<Integer>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
callListener.onReady();
callListener.onCancel();
serverCall.isCancelled = true;
assertTrue(callObserver.get().isCancelled());
try {
callObserver.get().onNext(null);
fail("Expected cancellation exception when onCallHandler not set");
} catch (StatusRuntimeException expected) {
// Expected
}
try {
callObserver.get().onCompleted();
fail("Expected cancellation exception when onCallHandler not set");
} catch (StatusRuntimeException expected) {
// Expected
}
}
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =