stub: stabilize StreamObserver APIs

Resolves #1788
This commit is contained in:
ZHANG Dapeng 2021-03-10 13:41:31 -08:00 committed by GitHub
parent bb737a2e31
commit 2ca28a2903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 15 deletions

View File

@ -16,8 +16,6 @@
package io.grpc.stub;
import io.grpc.ExperimentalApi;
import javax.annotation.Nullable;
/**
@ -30,7 +28,6 @@ import javax.annotation.Nullable;
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and make a fake for the server-side.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788")
public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V> {
/**
* Prevent any further processing for this {@code ClientCallStreamObserver}. No further messages
@ -57,10 +54,60 @@ public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V>
*
* <p>This method may only be called during {@link ClientResponseObserver#beforeStart
* ClientResponseObserver.beforeStart()}.
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequestWithInitial(int request) {
throw new UnsupportedOperationException();
}
/**
* If {@code true}, indicates that the observer is capable of sending additional messages
* without requiring excessive buffering internally. This value is just a suggestion and the
* application is free to ignore it, however doing so may result in excessive buffering within the
* observer.
*
* <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
* {@code isReady()} transitions to {@code true}.
*/
@Override
public abstract boolean isReady();
/**
* Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
* changes from {@code false} to {@code true}. While it is not guaranteed that the same
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
* notifications by checking {@code isReady()}'s current value instead of assuming it is now
* {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
* <em>another</em> {@code onReadyHandler} callback.
*
* @param onReadyHandler to call when peer is ready to receive more messages.
*/
@Override
public abstract void setOnReadyHandler(Runnable onReadyHandler);
/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
*
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param count more messages
*/
@Override
public abstract void request(int count);
/**
* Sets message compression for subsequent calls to {@link #onNext}.
*
* @param enable whether to enable compression.
*/
@Override
public abstract void setMessageCompression(boolean enable);
}

View File

@ -16,13 +16,10 @@
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* Specialization of {@link StreamObserver} implemented by clients in order to interact with the
* advanced features of a call such as flow-control.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4693")
public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<RespT> {
/**
* Called by the runtime priot to the start of a call to provide a reference to the

View File

@ -16,8 +16,6 @@
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
@ -28,7 +26,6 @@ import io.grpc.ExperimentalApi;
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and interact with the server using a normal client stub.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788")
public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V> {
/**
@ -89,10 +86,61 @@ public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
* </ul>
* </p>
*
* <p>This API is still a work in-progress and may change in the future.
*/
public void disableAutoRequest() {
throw new UnsupportedOperationException();
}
/**
* If {@code true}, indicates that the observer is capable of sending additional messages
* without requiring excessive buffering internally. This value is just a suggestion and the
* application is free to ignore it, however doing so may result in excessive buffering within the
* observer.
*
* <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
* {@code isReady()} transitions to {@code true}.
*/
@Override
public abstract boolean isReady();
/**
* Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
* changes from {@code false} to {@code true}. While it is not guaranteed that the same
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
* notifications by checking {@code isReady()}'s current value instead of assuming it is now
* {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
* <em>another</em> {@code onReadyHandler} callback.
*
* @param onReadyHandler to call when peer is ready to receive more messages.
*/
@Override
public abstract void setOnReadyHandler(Runnable onReadyHandler);
/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
*
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param count more messages
*/
@Override
public abstract void request(int count);
/**
* Sets message compression for subsequent calls to {@link #onNext}.
*
* @param enable whether to enable compression.
*/
@Override
public abstract void setMessageCompression(boolean enable);
}

View File

@ -390,7 +390,7 @@ public class ClientCallsTest {
ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.disableAutoRequestWithInitial(1);
}
@Override

View File

@ -288,7 +288,7 @@ public class ServerCallsTest {
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
serverCallObserver.disableAutoRequest();
return new ServerCalls.NoopStreamObserver<>();
}
});