From e76b8e7ee8017d3b29eac698a9be4b96a989b72b Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 5 Aug 2015 13:24:41 -0700 Subject: [PATCH] Renamed Server payload to message --- .../benchmarks/netty/AbstractBenchmark.java | 34 +++++++++---------- ...wControlledMessagesPerSecondBenchmark.java | 4 +-- .../SingleThreadBlockingQpsBenchmark.java | 4 +-- .../StreamingPingPongsPerSecondBenchmark.java | 4 +-- .../StreamingResponseBandwidthBenchmark.java | 4 +-- .../netty/UnaryCallQpsBenchmark.java | 4 +-- .../UnaryCallResponseBandwidthBenchmark.java | 4 +-- .../java/io/grpc/ForwardingServerCall.java | 4 +-- .../io/grpc/ForwardingServerCallListener.java | 4 +-- core/src/main/java/io/grpc/ServerCall.java | 26 +++++++------- core/src/main/java/io/grpc/ServerImpl.java | 14 ++++---- .../src/test/java/io/grpc/ServerImplTest.java | 6 ++-- .../header/HeaderServerInterceptor.java | 4 +-- .../main/java/io/grpc/stub/ServerCalls.java | 12 +++---- .../main/java/io/grpc/testing/TestUtils.java | 4 +-- 15 files changed, 66 insertions(+), 66 deletions(-) diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java index 2ed262c720..fdc88e05c3 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java @@ -51,15 +51,15 @@ import java.util.concurrent.atomic.AtomicReference; public abstract class AbstractBenchmark { /** - * Standard payload sizes. + * Standard message sizes. */ - public enum PayloadSize { - // Max out at 1MB to avoid creating payloads larger than Netty's buffer pool can handle + public enum MessageSize { + // Max out at 1MB to avoid creating messages larger than Netty's buffer pool can handle // by default SMALL(10), MEDIUM(1024), LARGE(65536), JUMBO(1048576); private final int bytes; - PayloadSize(int bytes) { + MessageSize(int bytes) { this.bytes = bytes; } @@ -172,8 +172,8 @@ public abstract class AbstractBenchmark { */ public void setup(ExecutorType clientExecutor, ExecutorType serverExecutor, - PayloadSize requestSize, - PayloadSize responseSize, + MessageSize requestSize, + MessageSize responseSize, FlowWindowSize windowSize, ChannelType channelType, int maxConcurrentStreams, @@ -246,10 +246,10 @@ public abstract class AbstractBenchmark { call.request(1); return new ServerCall.Listener() { @Override - public void onPayload(ByteBuf payload) { + public void onMessage(ByteBuf message) { // no-op - payload.release(); - call.sendPayload(response.slice()); + message.release(); + call.sendMessage(response.slice()); } @Override @@ -277,9 +277,9 @@ public abstract class AbstractBenchmark { call.request(1); return new ServerCall.Listener() { @Override - public void onPayload(ByteBuf payload) { - payload.release(); - call.sendPayload(response.slice()); + public void onMessage(ByteBuf message) { + message.release(); + call.sendMessage(response.slice()); // Request next message call.request(1); } @@ -310,10 +310,10 @@ public abstract class AbstractBenchmark { call.request(1); return new ServerCall.Listener() { @Override - public void onPayload(ByteBuf payload) { - payload.release(); + public void onMessage(ByteBuf message) { + message.release(); while (call.isReady()) { - call.sendPayload(response.slice()); + call.sendMessage(response.slice()); } // Request next message call.request(1); @@ -337,7 +337,7 @@ public abstract class AbstractBenchmark { @Override public void onReady() { while (call.isReady()) { - call.sendPayload(response.slice()); + call.sendMessage(response.slice()); } } }; @@ -510,7 +510,7 @@ public abstract class AbstractBenchmark { @Override public ByteBuf parse(InputStream stream) { try { - // We don't do anything with the payload and it's already been read into buffers + // We don't do anything with the message and it's already been read into buffers // so just skip copying it. stream.skip(stream.available()); return EMPTY_BYTE_BUF; diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java index 178608cfaf..6a70c88d51 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java @@ -31,7 +31,7 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark public ExecutorType clientExecutor = ExecutorType.DIRECT; @Param({"SMALL"}) - public PayloadSize responseSize = PayloadSize.SMALL; + public MessageSize responseSize = MessageSize.SMALL; private static AtomicLong callCounter; private AtomicBoolean completed; @@ -61,7 +61,7 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark public void setup() throws Exception { super.setup(clientExecutor, ExecutorType.DIRECT, - PayloadSize.SMALL, + MessageSize.SMALL, responseSize, FlowWindowSize.MEDIUM, ChannelType.NIO, diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java index 66e59e87cb..4d131f8915 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java @@ -28,8 +28,8 @@ public class SingleThreadBlockingQpsBenchmark extends AbstractBenchmark { public void setup() throws Exception { super.setup(ExecutorType.DIRECT, ExecutorType.DIRECT, - PayloadSize.SMALL, - PayloadSize.SMALL, + MessageSize.SMALL, + MessageSize.SMALL, FlowWindowSize.MEDIUM, ChannelType.NIO, 1, diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java index 45726c2781..02637e0222 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java @@ -55,8 +55,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { public void setup() throws Exception { super.setup(ExecutorType.DIRECT, ExecutorType.DIRECT, - PayloadSize.SMALL, - PayloadSize.SMALL, + MessageSize.SMALL, + MessageSize.SMALL, FlowWindowSize.MEDIUM, ChannelType.NIO, maxConcurrentStreams, diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java index bfcd44e1a5..15e01745c5 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java @@ -26,7 +26,7 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { public int maxConcurrentStreams = 1; @Param({"LARGE", "JUMBO"}) - public PayloadSize responseSize = PayloadSize.JUMBO; + public MessageSize responseSize = MessageSize.JUMBO; @Param({"MEDIUM", "LARGE", "JUMBO"}) public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; @@ -59,7 +59,7 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { public void setup() throws Exception { super.setup(ExecutorType.DIRECT, ExecutorType.DIRECT, - PayloadSize.SMALL, + MessageSize.SMALL, responseSize, clientInboundFlowWindow, ChannelType.NIO, diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java index 7e063cecb4..172b909028 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java @@ -85,8 +85,8 @@ public class UnaryCallQpsBenchmark extends AbstractBenchmark { public void setup() throws Exception { super.setup(ExecutorType.DIRECT, ExecutorType.DIRECT, - PayloadSize.SMALL, - PayloadSize.SMALL, + MessageSize.SMALL, + MessageSize.SMALL, FlowWindowSize.LARGE, ChannelType.NIO, maxConcurrentStreams, diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java index 118209bd79..3cf0d4bb1d 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java @@ -26,7 +26,7 @@ public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark { public int maxConcurrentStreams = 1; @Param({"LARGE", "JUMBO"}) - public PayloadSize responseSize = PayloadSize.JUMBO; + public MessageSize responseSize = MessageSize.JUMBO; @Param({"MEDIUM", "LARGE", "JUMBO"}) public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; @@ -60,7 +60,7 @@ public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark { public void setup() throws Exception { super.setup(ExecutorType.DIRECT, ExecutorType.DIRECT, - PayloadSize.SMALL, + MessageSize.SMALL, responseSize, clientInboundFlowWindow, ChannelType.NIO, diff --git a/core/src/main/java/io/grpc/ForwardingServerCall.java b/core/src/main/java/io/grpc/ForwardingServerCall.java index 05fac3901d..f0a27bee03 100644 --- a/core/src/main/java/io/grpc/ForwardingServerCall.java +++ b/core/src/main/java/io/grpc/ForwardingServerCall.java @@ -51,8 +51,8 @@ public abstract class ForwardingServerCall extends ServerCall { } @Override - public void sendPayload(RespT payload) { - delegate().sendPayload(payload); + public void sendMessage(RespT message) { + delegate().sendMessage(message); } @Override diff --git a/core/src/main/java/io/grpc/ForwardingServerCallListener.java b/core/src/main/java/io/grpc/ForwardingServerCallListener.java index 72f86e430f..2fe78bb644 100644 --- a/core/src/main/java/io/grpc/ForwardingServerCallListener.java +++ b/core/src/main/java/io/grpc/ForwardingServerCallListener.java @@ -42,8 +42,8 @@ public abstract class ForwardingServerCallListener extends ServerCall.List protected abstract ServerCall.Listener delegate(); @Override - public void onPayload(ReqT payload) { - delegate().onPayload(payload); + public void onMessage(ReqT message) { + delegate().onMessage(message); } @Override diff --git a/core/src/main/java/io/grpc/ServerCall.java b/core/src/main/java/io/grpc/ServerCall.java index 8cd7272c7d..379155ffa1 100644 --- a/core/src/main/java/io/grpc/ServerCall.java +++ b/core/src/main/java/io/grpc/ServerCall.java @@ -38,10 +38,10 @@ package io.grpc; * requests and responses. This API is generally intended for use by generated handlers, * but applications may use it directly if they need to. * - *

Headers must be sent before any payloads, which must be sent before closing. + *

Headers must be sent before any messages, which must be sent before closing. * *

No generic method for determining message receipt or providing acknowledgement is provided. - * Applications are expected to utilize normal payload messages for such signals, as a response + * Applications are expected to utilize normal messages for such signals, as a response * naturally acknowledges its request. * *

Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe. @@ -52,7 +52,7 @@ public abstract class ServerCall { /** * Callbacks for consuming incoming RPC messages. * - *

Any contexts are guaranteed to arrive before any payloads, which are guaranteed before half + *

Any contexts are guaranteed to arrive before any messages, which are guaranteed before half * close, which is guaranteed before completion. * *

Implementations are free to block for extended periods of time. Implementations are not @@ -66,9 +66,9 @@ public abstract class ServerCall { * A request message has been received. For streaming calls, there may be zero or more request * messages. * - * @param payload a received request message. + * @param message a received request message. */ - public abstract void onPayload(RequestT payload); + public abstract void onMessage(RequestT message); /** * The client completed all message sending. However, the call may still be cancelled. @@ -94,7 +94,7 @@ public abstract class ServerCall { /** * This indicates that the call is now capable of sending additional messages (via - * {@link #sendPayload}) without requiring excessive buffering internally. This event is + * {@link #sendMessage}) without requiring excessive buffering internally. This event is * just a suggestion and the application is free to ignore it, however doing so may * result in excessive buffering within the call. */ @@ -103,7 +103,7 @@ public abstract class ServerCall { /** * Requests up to the given number of messages from the call to be delivered to - * {@link Listener#onPayload(Object)}. Once {@code numMessages} have been delivered + * {@link Listener#onMessage(Object)}. Once {@code numMessages} have been delivered * no further request messages will be delivered until more messages are requested by * calling this method again. * @@ -114,23 +114,23 @@ public abstract class ServerCall { public abstract void request(int numMessages); /** - * Send response header metadata prior to sending a response payload. This method may - * only be called once and cannot be called after calls to {@link #sendPayload} or {@link #close}. + * Send response header metadata prior to sending a response message. This method may + * only be called once and cannot be called after calls to {@link #sendMessage} or {@link #close}. * * @param headers metadata to send prior to any response body. - * @throws IllegalStateException if {@code close} has been called, a payload has been sent, or + * @throws IllegalStateException if {@code close} has been called, a message has been sent, or * headers have already been sent */ public abstract void sendHeaders(Metadata.Headers headers); /** - * Send a response message. Payload messages are the primary form of communication associated with + * Send a response message. Messages are the primary form of communication associated with * RPCs. Multiple response messages may exist for streaming calls. * - * @param payload response message. + * @param message response message. * @throws IllegalStateException if call is {@link #close}d */ - public abstract void sendPayload(ResponseT payload); + public abstract void sendMessage(ResponseT message); /** * If {@code true}, indicates that the call is capable of sending additional messages diff --git a/core/src/main/java/io/grpc/ServerImpl.java b/core/src/main/java/io/grpc/ServerImpl.java index 00631875a7..8415a6b46f 100644 --- a/core/src/main/java/io/grpc/ServerImpl.java +++ b/core/src/main/java/io/grpc/ServerImpl.java @@ -472,7 +472,7 @@ public final class ServerImpl extends Server { private volatile boolean cancelled; private boolean sendHeadersCalled; private boolean closeCalled; - private boolean sendPayloadCalled; + private boolean sendMessageCalled; public ServerCallImpl(ServerStream stream, MethodDescriptor method) { this.stream = stream; @@ -488,18 +488,18 @@ public final class ServerImpl extends Server { public void sendHeaders(Metadata.Headers headers) { Preconditions.checkState(!sendHeadersCalled, "sendHeaders has already been called"); Preconditions.checkState(!closeCalled, "call is closed"); - Preconditions.checkState(!sendPayloadCalled, "sendPayload has already been called"); + Preconditions.checkState(!sendMessageCalled, "sendMessage has already been called"); sendHeadersCalled = true; stream.writeHeaders(headers); } @Override - public void sendPayload(RespT payload) { + public void sendMessage(RespT message) { Preconditions.checkState(!closeCalled, "call is closed"); - sendPayloadCalled = true; + sendMessageCalled = true; try { - InputStream message = method.streamResponse(payload); - stream.writeMessage(message); + InputStream resp = method.streamResponse(message); + stream.writeMessage(resp); stream.flush(); } catch (Throwable t) { close(Status.fromThrowable(t), new Metadata.Trailers()); @@ -549,7 +549,7 @@ public final class ServerImpl extends Server { return; } - listener.onPayload(method.parseRequest(message)); + listener.onMessage(method.parseRequest(message)); } finally { try { message.close(); diff --git a/core/src/test/java/io/grpc/ServerImplTest.java b/core/src/test/java/io/grpc/ServerImplTest.java index 7b86d7aec4..79d6070788 100644 --- a/core/src/test/java/io/grpc/ServerImplTest.java +++ b/core/src/test/java/io/grpc/ServerImplTest.java @@ -208,9 +208,9 @@ public class ServerImplTest { String order = "Lots of pizza, please"; streamListener.messageRead(STRING_MARSHALLER.stream(order)); - verify(callListener, timeout(2000)).onPayload(order); + verify(callListener, timeout(2000)).onMessage(order); - call.sendPayload(314); + call.sendMessage(314); ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(InputStream.class); verify(stream).writeMessage(inputCaptor.capture()); verify(stream).flush(); @@ -220,7 +220,7 @@ public class ServerImplTest { executeBarrier(executor).await(); verify(callListener).onHalfClose(); - call.sendPayload(50); + call.sendMessage(50); verify(stream, times(2)).writeMessage(inputCaptor.capture()); verify(stream, times(2)).flush(); assertEquals(50, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue()); diff --git a/examples/src/main/java/io/grpc/examples/header/HeaderServerInterceptor.java b/examples/src/main/java/io/grpc/examples/header/HeaderServerInterceptor.java index 28a6a11ffb..b3e69a8b90 100644 --- a/examples/src/main/java/io/grpc/examples/header/HeaderServerInterceptor.java +++ b/examples/src/main/java/io/grpc/examples/header/HeaderServerInterceptor.java @@ -69,11 +69,11 @@ public class HeaderServerInterceptor implements ServerInterceptor { } @Override - public void sendPayload(RespT payload) { + public void sendMessage(RespT message) { if (!sentHeaders) { sendHeaders(new Metadata.Headers()); } - super.sendPayload(payload); + super.sendMessage(message); } @Override diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 16f519af63..dad836d65e 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -125,12 +125,12 @@ public class ServerCalls { String fullMethodName, final ServerCall call, Metadata.Headers headers) { final ResponseObserver responseObserver = new ResponseObserver(call); // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client - // sends more than 1 requests, we will catch it in onPayload() and emit INVALID_ARGUMENT. + // sends more than 1 requests, we will catch it in onMessage() and emit INVALID_ARGUMENT. call.request(2); return new EmptyServerCallListener() { ReqT request; @Override - public void onPayload(ReqT request) { + public void onMessage(ReqT request) { if (this.request == null) { // We delay calling method.invoke() until onHalfClose(), because application may call // close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose(). @@ -138,7 +138,7 @@ public class ServerCalls { } else { call.close( Status.INVALID_ARGUMENT.withDescription( - "More than one request payloads for unary call or server streaming call"), + "More than one request messages for unary call or server streaming call"), new Metadata.Trailers()); } } @@ -180,7 +180,7 @@ public class ServerCalls { boolean halfClosed = false; @Override - public void onPayload(ReqT request) { + public void onMessage(ReqT request) { requestObserver.onValue(request); // Request delivery of the next inbound message. @@ -226,7 +226,7 @@ public class ServerCalls { if (cancelled) { throw Status.CANCELLED.asRuntimeException(); } - call.sendPayload(response); + call.sendMessage(response); // Request delivery of the next inbound message. call.request(1); @@ -249,7 +249,7 @@ public class ServerCalls { private static class EmptyServerCallListener extends ServerCall.Listener { @Override - public void onPayload(ReqT request) { + public void onMessage(ReqT request) { } @Override diff --git a/testing/src/main/java/io/grpc/testing/TestUtils.java b/testing/src/main/java/io/grpc/testing/TestUtils.java index dbfe1e824b..d8b50d5756 100644 --- a/testing/src/main/java/io/grpc/testing/TestUtils.java +++ b/testing/src/main/java/io/grpc/testing/TestUtils.java @@ -96,11 +96,11 @@ public class TestUtils { } @Override - public void sendPayload(RespT payload) { + public void sendMessage(RespT message) { if (!sentHeaders) { sendHeaders(new Metadata.Headers()); } - super.sendPayload(payload); + super.sendMessage(message); } @Override