From 0201c5a9a7551c736d6f63476b145eca4f17c3b6 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 29 May 2020 15:05:46 -0700 Subject: [PATCH] stub: Have disableAutoRequest() on server-side This splits server-side flow control from client-side, but tailors the API for each case. Client-side continues having disableAutoRequestWithInitial(). While client-side could have disableAutoRequest(), it seems like it will only rarely be used and disableAutoRequestWithInitial(0) isn't that bad. So we leave it off for now; we can always add it in the future. --- .../ManualFlowControlServer.java | 2 +- .../services/ProtoReflectionService.java | 3 +- .../java/io/grpc/stub/CallStreamObserver.java | 32 ++----------------- .../grpc/stub/ClientCallStreamObserver.java | 15 +++++++++ .../grpc/stub/ServerCallStreamObserver.java | 22 +++++++++++++ .../main/java/io/grpc/stub/ServerCalls.java | 11 +++---- .../java/io/grpc/stub/ClientCallsTest.java | 2 +- .../java/io/grpc/stub/ServerCallsTest.java | 6 ++-- 8 files changed, 51 insertions(+), 42 deletions(-) diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java index b8fbff65a3..de8142596e 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java @@ -39,7 +39,7 @@ public class ManualFlowControlServer { // stream's flow control using the response stream's observer, but this is the way it is. final ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) responseObserver; - serverCallStreamObserver.disableAutoRequestWithInitial(0); + serverCallStreamObserver.disableAutoRequest(); // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked // when the consuming side has enough buffer space to receive more messages. diff --git a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java index 2eaacf1790..4a7840a3ad 100644 --- a/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java +++ b/services/src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java @@ -133,7 +133,8 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef ProtoReflectionStreamObserver requestObserver = new ProtoReflectionStreamObserver(getRefreshedIndex(), serverCallStreamObserver); serverCallStreamObserver.setOnReadyHandler(requestObserver); - serverCallStreamObserver.disableAutoRequestWithInitial(1); + serverCallStreamObserver.disableAutoRequest(); + serverCallStreamObserver.request(1); return requestObserver; } diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index 878ba8f80a..7b3d4e55b3 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -107,38 +107,12 @@ public abstract class CallStreamObserver implements StreamObserver { * *

* - *

To migrate to {@link #disableAutoRequestWithInitial} on the server side, call - * {@code disableAutoRequestWithInitial(0)} as {@code disableAutoInboundFlowControl} - * already disables all inbound requests. On the client side, {@code - * disableAutoRequestWithInitial(1)} should be called to maintain existing behavior as - * {@code disableAutoInboundFlowControl} does not disable the initial request. + *

This API is being replaced, but is not yet deprecated. On server-side it being replaced + * with {@link ServerCallStreamObserver#disableAutoRequest}. On client-side {@link + * ClientCallStreamObserver#disableAutoRequestWithInitial disableAutoRequestWithInitial(1)}. */ public abstract void disableAutoInboundFlowControl(); - /** - * Disables automatic flow control where an additional message is requested to be read after a - * call to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. A - * number of initial requests to make when the call is started may be specified. - * - *

On client-side this method may only be called during {@link - * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial - * call to the application, before the service returns its {@code StreamObserver}. - * - *

Note that for server-side cases where the message is received before the handler is invoked, - * this method will have no effect. This is true for: - * - *

    - *
  • {@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.
  • - *
  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • - *
- *

- * - *

This API is still a work in-progress and will likely change in the future. - */ - public void disableAutoRequestWithInitial(int request) { - throw new UnsupportedOperationException(); - } - /** * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' * {@link StreamObserver}. diff --git a/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java index 36fe05b733..e18d19fc37 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java @@ -48,4 +48,19 @@ public abstract class ClientCallStreamObserver extends CallStreamObserver * @param cause if not {@code null}, will appear as the cause of the CANCELLED status */ public abstract void cancel(@Nullable String message, @Nullable Throwable cause); + + /** + * Swaps to manual flow control where no message will be delivered to {@link + * StreamObserver#onNext(Object)} unless it is {@link #request request()}ed. Since {@code + * request()} may not be called before the call is started, a number of initial requests may be + * specified. + * + *

This method may only be called during {@link ClientResponseObserver#beforeStart + * ClientResponseObserver.beforeStart()}. + * + *

This API is still a work in-progress and may change in the future. + */ + public void disableAutoRequestWithInitial(int request) { + throw new UnsupportedOperationException(); + } } diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 7f52b7d24c..9190aa580e 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -64,4 +64,26 @@ public abstract class ServerCallStreamObserver extends CallStreamObserver * @throws IllegalArgumentException if the compressor name can not be found. */ public abstract void setCompression(String compression); + + /** + * Swaps to manual flow control where no message will be delivered to {@link + * StreamObserver#onNext(Object)} unless it is {@link #request request()}ed. + * + *

It may only be called during the initial call to the application, before the service returns + * its {@code StreamObserver}. + * + *

Note that for cases where the message is received before the service handler is invoked, + * this method will have no effect. This is true for: + * + *

    + *
  • {@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.
  • + *
  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • + *
+ *

+ * + *

This API is still a work in-progress and may change in the future. + */ + public void disableAutoRequest() { + throw new UnsupportedOperationException(); + } } diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 13dba8ea1d..8e5ed1e106 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -223,8 +223,8 @@ public final class ServerCalls { new ServerCallStreamObserverImpl<>(call); StreamObserver requestObserver = method.invoke(responseObserver); responseObserver.freeze(); - if (responseObserver.initialRequest > 0) { - call.request(responseObserver.initialRequest); + if (responseObserver.autoRequestEnabled) { + call.request(1); } return new StreamingServerCallListener(requestObserver, responseObserver, call); } @@ -308,7 +308,6 @@ public final class ServerCalls { final ServerCall call; volatile boolean cancelled; private boolean frozen; - private int initialRequest = 1; private boolean autoRequestEnabled = true; private boolean sentHeaders; private Runnable onReadyHandler; @@ -403,14 +402,12 @@ public final class ServerCalls { @Deprecated @Override public void disableAutoInboundFlowControl() { - disableAutoRequestWithInitial(0); + disableAutoRequest(); } @Override - public void disableAutoRequestWithInitial(int request) { + public void disableAutoRequest() { checkState(!frozen, "Cannot disable auto flow control after initialization"); - Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative"); - initialRequest = request; autoRequestEnabled = false; } diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index 2daf11ac48..6b54ba1f62 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -643,7 +643,7 @@ public class ClientCallsTest { public StreamObserver invoke(StreamObserver responseObserver) { final ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.disableAutoRequestWithInitial(0); + serverCallObserver.disableAutoRequest(); observerFuture.set(serverCallObserver); return new StreamObserver() { @Override diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 775c86717d..29981932fb 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -276,7 +276,7 @@ public class ServerCallsTest { callHandler.startCall(serverCall, new Metadata()); callListener.onMessage(1); try { - callObserver.get().disableAutoRequestWithInitial(0); + callObserver.get().disableAutoRequest(); fail("Cannot set onCancel handler after service invocation"); } catch (IllegalStateException expected) { // Expected @@ -315,7 +315,7 @@ public class ServerCallsTest { public StreamObserver invoke(StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.disableAutoRequestWithInitial(0); + serverCallObserver.disableAutoRequest(); return new ServerCalls.NoopStreamObserver<>(); } }); @@ -338,7 +338,7 @@ public class ServerCallsTest { public void invoke(Integer req, StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.disableAutoRequestWithInitial(0); + serverCallObserver.disableAutoRequest(); } }); callHandler.startCall(serverCall, new Metadata());