From ef31a5f2ae2014d706453d1866b6b8f9de195369 Mon Sep 17 00:00:00 2001 From: nathanmittler Date: Tue, 3 Jun 2014 07:48:25 -0700 Subject: [PATCH] First cut at the GRPC Transport interfaces for Java. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=68458345 --- .../net/stubby/newtransport/ClientStream.java | 16 +++++ .../stubby/newtransport/ClientTransport.java | 30 ++++++++ .../stubby/newtransport/ServerListener.java | 17 +++++ .../net/stubby/newtransport/ServerStream.java | 19 +++++ .../newtransport/ServerTransportListener.java | 18 +++++ .../net/stubby/newtransport/Stream.java | 70 +++++++++++++++++++ .../stubby/newtransport/StreamListener.java | 57 +++++++++++++++ .../net/stubby/newtransport/StreamState.java | 26 +++++++ 8 files changed, 253 insertions(+) create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ClientStream.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ServerListener.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/Stream.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/StreamState.java diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientStream.java new file mode 100644 index 0000000000..7011608bea --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientStream.java @@ -0,0 +1,16 @@ +package com.google.net.stubby.newtransport; + + +/** + * Extension of {@link Stream} to support client-side termination semantics. + */ +public interface ClientStream extends Stream { + + /** + * Used to abnormally terminate the stream. Any internally buffered messages are dropped. After + * this is called, no further messages may be sent and no further {@link StreamListener} callbacks + * (with the exception of onClosed) will be invoked for this stream. Any frames received for this + * stream after returning from this method will be discarded. + */ + void cancel(); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java new file mode 100644 index 0000000000..3df8a2f541 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java @@ -0,0 +1,30 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.util.concurrent.Service; +import com.google.net.stubby.stub.MethodDescriptor; + +/** + * The client-side transport encapsulating a single connection to a remote server. Allows creation + * of new {@link Stream} instances for communication with the server. + *

+ * Transport life cycle (i.e. startup/shutdown) is exposed via the {@link Service} interface. + * Observers of the transport life-cycle may be added via {@link Service#addListener}. + */ +public interface ClientTransport extends Service { + + /** + * Creates a new stream for sending messages to the remote end-point. If the service is already + * stopped, throws an {@link IllegalStateException}. + * TODO(user): Consider also throwing for stopping. + *

+ * This method returns immediately and does not wait for any validation of the request. If + * creation fails for any reason, {@link StreamListener#closed} will be called to provide the + * error information. Any sent messages for this stream will be buffered until creation has + * completed (either successfully or unsuccessfully). + * + * @param method the descriptor of the remote method to be called for this stream. + * @param listener the listener for the newly created stream. + * @return the newly created stream. + */ + ClientStream newStream(MethodDescriptor method, StreamListener listener); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerListener.java new file mode 100644 index 0000000000..19f1c7a01f --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerListener.java @@ -0,0 +1,17 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.util.concurrent.Service; + +/** + * A listener to a server for transport creation events. + */ +public interface ServerListener { + + /** + * Called upon the establishment of a new client connection. + * + * @param transport the new transport to be observed. + * @return a listener for stream creation events on the transport. + */ + ServerTransportListener transportCreated(Service transport); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java new file mode 100644 index 0000000000..bf8a723abd --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java @@ -0,0 +1,19 @@ +package com.google.net.stubby.newtransport; + +import com.google.net.stubby.transport.Transport.Status; + + +/** + * Extension of {@link Stream} to support server-side termination semantics. + */ +public interface ServerStream extends Stream { + + /** + * Closes the local side of this stream. A status code of + * {@link com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the + * local side of the stream (i.e. half-closed). Any other value implies abnormal termination. + * + * @param status details for the closure of the local-side of this stream. + */ + void close(Status status); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java new file mode 100644 index 0000000000..11a2642429 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java @@ -0,0 +1,18 @@ +package com.google.net.stubby.newtransport; + +import com.google.net.stubby.stub.MethodDescriptor; + +/** + * A observer of a server-side transport for stream creation events. + */ +public interface ServerTransportListener { + + /** + * Called when a new stream was created by the remote client. + * + * @param stream the newly created stream. + * @param method the method being called on the server. + * @return a listener for events on the new stream. + */ + StreamListener streamCreated(ServerStream stream, MethodDescriptor method); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java new file mode 100644 index 0000000000..4c37fb3866 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java @@ -0,0 +1,70 @@ +package com.google.net.stubby.newtransport; + +import java.io.InputStream; + +/** + * A single stream of communication between two end-points within a transport. + */ +public interface Stream> { + + /** + * Gets the current state of this stream. + */ + StreamState state(); + + /** + * Closes the local side of this stream and flushes any remaining messages. After this is called, + * no further messages may be sent on this stream, but additional messages may be received until + * the remote end-point is closed. Calling this method automatically causes a {@link #flush()} to + * occur, so this method may block if awaiting resources. + */ + void close(); + + /** + * Writes the context name/value pair to the remote end-point. This method may block if awaiting + * resources. + * + * @param name the unique application-defined name for the context propery. + * @param value the value of the context property. + * @param offset the offset within the value array that is the start of the value. + * @param length the length of the value starting from the offset index. + * @return this stream instance. + */ + T writeContext(String name, byte[] value, int offset, int length); + + /** + * Writes the context name/value pair to the remote end-point. This method may block if awaiting + * resources. + * + * @param name the unique application-defined name for the context propery. + * @param value the value of the context property. + * @param length the length of the {@link InputStream}. + * @return this stream instance. + */ + T writeContext(String name, InputStream value, int length); + + /** + * Writes a message payload to the remote end-point. This method may block if awaiting resources. + * + * @param message array containing the serialized message to be sent + * @param offset the offset within the message array that is the start of the value. + * @param length the length of the message starting from the offset index. + * @return this stream instance. + */ + T writeMessage(byte[] message, int offset, int length); + + /** + * Writes a message payload to the remote end-point. This method may block if awaiting resources. + * + * @param message stream containing the serialized message to be sent + * @param length the length of the {@link InputStream}. + * @return this stream instance. + */ + T writeMessage(InputStream message, int length); + + /** + * Flushes any internally buffered messages to the remote end-point. This method may block if + * awaiting resources. + */ + T flush(); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java new file mode 100644 index 0000000000..98bbe08334 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java @@ -0,0 +1,57 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.net.stubby.transport.Transport.Status; + +import java.io.InputStream; + +/** + * An observer of {@link Stream} events. + */ +public interface StreamListener { + + /** + * Called upon receiving context information from the remote end-point. The {@link InputStream} is + * non-blocking and contains the entire context. + *

+ * This method is called within the context of the transport thread. It is guaranteed to only have + * one concurrent callback at a time. + * + * @param name the unique name of the context + * @param value the value of the context. + * @param length the length of the value {@link InputStream}. + * @return a future that can be observed by flow control to determine when the context has been + * processed by the application. If {@code null}, processing of this context is assumed to + * be complete upon returning from this method. + */ + ListenableFuture contextRead(String name, InputStream value, int length); + + /** + * Called upon receiving a message from the remote end-point. The {@link InputStream} is + * non-blocking and contains the entire message. + *

+ * This method is called within the context of the transport thread. It is guaranteed to only have + * one concurrent callback at a time. + * + * @param message the bytes of the message. + * @param length the length of the message {@link InputStream}. + * @return a future that can be observed by flow control to determine when the message has been + * processed by the application. If {@code null}, processing of this message is assumed to + * be complete upon returning from this method. + */ + ListenableFuture messageRead(InputStream message, int length); + + /** + * Called when the remote side of the transport closed. A status code of + * {@link com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the + * remote side of the stream (i.e. half-closed). Any other value implies abnormal termination. If + * the remote end-point was abnormally terminated, no further messages will be received on the + * stream. + *

+ * This method is called within the context of the transport thread. It is guaranteed to only have + * one concurrent callback at a time. + * + * @param status details of the remote stream closure. + */ + void closed(Status status); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamState.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamState.java new file mode 100644 index 0000000000..163680b78b --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamState.java @@ -0,0 +1,26 @@ +package com.google.net.stubby.newtransport; + +/** + * The state of a single {@link Stream} within a transport. + */ +public enum StreamState { + /** + * The stream is open for write by both endpoints. + */ + OPEN, + + /** + * Only the remote endpoint may send data. The local endpoint may only read. + */ + READ_ONLY, + + /** + * Only the local endpoint may send data. The remote endpoint may only read. + */ + WRITE_ONLY, + + /** + * Neither endpoint may send data. + */ + CLOSED +}