stub: Have disableAutoRequest() on server-side

This splits server-side flow control from client-side, but tailors the API for
each case. Client-side continues having disableAutoRequestWithInitial(). While
client-side could have disableAutoRequest(), it seems like it will only rarely
be used and disableAutoRequestWithInitial(0) isn't that bad. So we leave it off
for now; we can always add it in the future.
This commit is contained in:
Eric Anderson 2020-05-29 15:05:46 -07:00 committed by GitHub
parent 0cb91d97bf
commit 0201c5a9a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 51 additions and 42 deletions

View File

@ -39,7 +39,7 @@ public class ManualFlowControlServer {
// stream's flow control using the response stream's observer, but this is the way it is.
final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
(ServerCallStreamObserver<HelloReply>) responseObserver;
serverCallStreamObserver.disableAutoRequestWithInitial(0);
serverCallStreamObserver.disableAutoRequest();
// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.

View File

@ -133,7 +133,8 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(getRefreshedIndex(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoRequestWithInitial(1);
serverCallStreamObserver.disableAutoRequest();
serverCallStreamObserver.request(1);
return requestObserver;
}

View File

@ -107,38 +107,12 @@ public abstract class CallStreamObserver<V> implements StreamObserver<V> {
* </ul>
* </p>
*
* <p>To migrate to {@link #disableAutoRequestWithInitial} on the server side, call
* {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl}
* already disables all inbound requests. On the client side, {@code
* disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as
* {@code disableAutoInboundFlowControl} does not disable the initial request.
* <p>This API is being replaced, but is not yet deprecated. On server-side it being replaced
* with {@link ServerCallStreamObserver#disableAutoRequest}. On client-side {@link
* ClientCallStreamObserver#disableAutoRequestWithInitial disableAutoRequestWithInitial(1)}.
*/
public abstract void disableAutoInboundFlowControl();
/**
* Disables automatic flow control where an additional message is requested to be read after a
* call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A
* number of initial requests to make when the call is started may be specified.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Note that for server-side cases where the message is received before the handler is invoked,
* this method will have no effect. This is true for:
*
* <ul>
* <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and will likely change in the future.
*/
public void disableAutoRequestWithInitial(int request) {
throw new UnsupportedOperationException();
}
/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.

View File

@ -48,4 +48,19 @@ public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V>
* @param cause if not {@code null}, will appear as the cause of the CANCELLED status
*/
public abstract void cancel(@Nullable String message, @Nullable Throwable cause);
/**
* Swaps to manual flow control where no message will be delivered to {@link
* StreamObserver#onNext(Object)} unless it is {@link #request request()}ed. Since {@code
* request()} may not be called before the call is started, a number of initial requests may be
* specified.
*
* <p>This method may only be called during {@link ClientResponseObserver#beforeStart
* ClientResponseObserver.beforeStart()}.
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequestWithInitial(int request) {
throw new UnsupportedOperationException();
}
}

View File

@ -64,4 +64,26 @@ public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V>
* @throws IllegalArgumentException if the compressor name can not be found.
*/
public abstract void setCompression(String compression);
/**
* Swaps to manual flow control where no message will be delivered to {@link
* StreamObserver#onNext(Object)} unless it is {@link #request request()}ed.
*
* <p>It may only be called during the initial call to the application, before the service returns
* its {@code StreamObserver}.
*
* <p>Note that for cases where the message is received before the service handler is invoked,
* this method will have no effect. This is true for:
*
* <ul>
* <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequest() {
throw new UnsupportedOperationException();
}
}

View File

@ -223,8 +223,8 @@ public final class ServerCalls {
new ServerCallStreamObserverImpl<>(call);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.initialRequest > 0) {
call.request(responseObserver.initialRequest);
if (responseObserver.autoRequestEnabled) {
call.request(1);
}
return new StreamingServerCallListener(requestObserver, responseObserver, call);
}
@ -308,7 +308,6 @@ public final class ServerCalls {
final ServerCall<ReqT, RespT> call;
volatile boolean cancelled;
private boolean frozen;
private int initialRequest = 1;
private boolean autoRequestEnabled = true;
private boolean sentHeaders;
private Runnable onReadyHandler;
@ -403,14 +402,12 @@ public final class ServerCalls {
@Deprecated
@Override
public void disableAutoInboundFlowControl() {
disableAutoRequestWithInitial(0);
disableAutoRequest();
}
@Override
public void disableAutoRequestWithInitial(int request) {
public void disableAutoRequest() {
checkState(!frozen, "Cannot disable auto flow control after initialization");
Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
initialRequest = request;
autoRequestEnabled = false;
}

View File

@ -643,7 +643,7 @@ public class ClientCallsTest {
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
final ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoRequestWithInitial(0);
serverCallObserver.disableAutoRequest();
observerFuture.set(serverCallObserver);
return new StreamObserver<Integer>() {
@Override

View File

@ -276,7 +276,7 @@ public class ServerCallsTest {
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().disableAutoRequestWithInitial(0);
callObserver.get().disableAutoRequest();
fail("Cannot set onCancel handler after service invocation");
} catch (IllegalStateException expected) {
// Expected
@ -315,7 +315,7 @@ public class ServerCallsTest {
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoRequestWithInitial(0);
serverCallObserver.disableAutoRequest();
return new ServerCalls.NoopStreamObserver<>();
}
});
@ -338,7 +338,7 @@ public class ServerCallsTest {
public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoRequestWithInitial(0);
serverCallObserver.disableAutoRequest();
}
});
callHandler.startCall(serverCall, new Metadata());