Renamed Server payload to message

This commit is contained in:
Carl Mastrangelo 2015-08-05 13:24:41 -07:00
parent 67fc45d036
commit e76b8e7ee8
15 changed files with 66 additions and 66 deletions

View File

@ -51,15 +51,15 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractBenchmark { public abstract class AbstractBenchmark {
/** /**
* Standard payload sizes. * Standard message sizes.
*/ */
public enum PayloadSize { public enum MessageSize {
// Max out at 1MB to avoid creating payloads larger than Netty's buffer pool can handle // Max out at 1MB to avoid creating messages larger than Netty's buffer pool can handle
// by default // by default
SMALL(10), MEDIUM(1024), LARGE(65536), JUMBO(1048576); SMALL(10), MEDIUM(1024), LARGE(65536), JUMBO(1048576);
private final int bytes; private final int bytes;
PayloadSize(int bytes) { MessageSize(int bytes) {
this.bytes = bytes; this.bytes = bytes;
} }
@ -172,8 +172,8 @@ public abstract class AbstractBenchmark {
*/ */
public void setup(ExecutorType clientExecutor, public void setup(ExecutorType clientExecutor,
ExecutorType serverExecutor, ExecutorType serverExecutor,
PayloadSize requestSize, MessageSize requestSize,
PayloadSize responseSize, MessageSize responseSize,
FlowWindowSize windowSize, FlowWindowSize windowSize,
ChannelType channelType, ChannelType channelType,
int maxConcurrentStreams, int maxConcurrentStreams,
@ -246,10 +246,10 @@ public abstract class AbstractBenchmark {
call.request(1); call.request(1);
return new ServerCall.Listener<ByteBuf>() { return new ServerCall.Listener<ByteBuf>() {
@Override @Override
public void onPayload(ByteBuf payload) { public void onMessage(ByteBuf message) {
// no-op // no-op
payload.release(); message.release();
call.sendPayload(response.slice()); call.sendMessage(response.slice());
} }
@Override @Override
@ -277,9 +277,9 @@ public abstract class AbstractBenchmark {
call.request(1); call.request(1);
return new ServerCall.Listener<ByteBuf>() { return new ServerCall.Listener<ByteBuf>() {
@Override @Override
public void onPayload(ByteBuf payload) { public void onMessage(ByteBuf message) {
payload.release(); message.release();
call.sendPayload(response.slice()); call.sendMessage(response.slice());
// Request next message // Request next message
call.request(1); call.request(1);
} }
@ -310,10 +310,10 @@ public abstract class AbstractBenchmark {
call.request(1); call.request(1);
return new ServerCall.Listener<ByteBuf>() { return new ServerCall.Listener<ByteBuf>() {
@Override @Override
public void onPayload(ByteBuf payload) { public void onMessage(ByteBuf message) {
payload.release(); message.release();
while (call.isReady()) { while (call.isReady()) {
call.sendPayload(response.slice()); call.sendMessage(response.slice());
} }
// Request next message // Request next message
call.request(1); call.request(1);
@ -337,7 +337,7 @@ public abstract class AbstractBenchmark {
@Override @Override
public void onReady() { public void onReady() {
while (call.isReady()) { while (call.isReady()) {
call.sendPayload(response.slice()); call.sendMessage(response.slice());
} }
} }
}; };
@ -510,7 +510,7 @@ public abstract class AbstractBenchmark {
@Override @Override
public ByteBuf parse(InputStream stream) { public ByteBuf parse(InputStream stream) {
try { try {
// We don't do anything with the payload and it's already been read into buffers // We don't do anything with the message and it's already been read into buffers
// so just skip copying it. // so just skip copying it.
stream.skip(stream.available()); stream.skip(stream.available());
return EMPTY_BYTE_BUF; return EMPTY_BYTE_BUF;

View File

@ -31,7 +31,7 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
public ExecutorType clientExecutor = ExecutorType.DIRECT; public ExecutorType clientExecutor = ExecutorType.DIRECT;
@Param({"SMALL"}) @Param({"SMALL"})
public PayloadSize responseSize = PayloadSize.SMALL; public MessageSize responseSize = MessageSize.SMALL;
private static AtomicLong callCounter; private static AtomicLong callCounter;
private AtomicBoolean completed; private AtomicBoolean completed;
@ -61,7 +61,7 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
public void setup() throws Exception { public void setup() throws Exception {
super.setup(clientExecutor, super.setup(clientExecutor,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
responseSize, responseSize,
FlowWindowSize.MEDIUM, FlowWindowSize.MEDIUM,
ChannelType.NIO, ChannelType.NIO,

View File

@ -28,8 +28,8 @@ public class SingleThreadBlockingQpsBenchmark extends AbstractBenchmark {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(ExecutorType.DIRECT, super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
PayloadSize.SMALL, MessageSize.SMALL,
FlowWindowSize.MEDIUM, FlowWindowSize.MEDIUM,
ChannelType.NIO, ChannelType.NIO,
1, 1,

View File

@ -55,8 +55,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(ExecutorType.DIRECT, super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
PayloadSize.SMALL, MessageSize.SMALL,
FlowWindowSize.MEDIUM, FlowWindowSize.MEDIUM,
ChannelType.NIO, ChannelType.NIO,
maxConcurrentStreams, maxConcurrentStreams,

View File

@ -26,7 +26,7 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
public int maxConcurrentStreams = 1; public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"}) @Param({"LARGE", "JUMBO"})
public PayloadSize responseSize = PayloadSize.JUMBO; public MessageSize responseSize = MessageSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"}) @Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
@ -59,7 +59,7 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(ExecutorType.DIRECT, super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
responseSize, responseSize,
clientInboundFlowWindow, clientInboundFlowWindow,
ChannelType.NIO, ChannelType.NIO,

View File

@ -85,8 +85,8 @@ public class UnaryCallQpsBenchmark extends AbstractBenchmark {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(ExecutorType.DIRECT, super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
PayloadSize.SMALL, MessageSize.SMALL,
FlowWindowSize.LARGE, FlowWindowSize.LARGE,
ChannelType.NIO, ChannelType.NIO,
maxConcurrentStreams, maxConcurrentStreams,

View File

@ -26,7 +26,7 @@ public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark {
public int maxConcurrentStreams = 1; public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"}) @Param({"LARGE", "JUMBO"})
public PayloadSize responseSize = PayloadSize.JUMBO; public MessageSize responseSize = MessageSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"}) @Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
@ -60,7 +60,7 @@ public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark {
public void setup() throws Exception { public void setup() throws Exception {
super.setup(ExecutorType.DIRECT, super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT, ExecutorType.DIRECT,
PayloadSize.SMALL, MessageSize.SMALL,
responseSize, responseSize,
clientInboundFlowWindow, clientInboundFlowWindow,
ChannelType.NIO, ChannelType.NIO,

View File

@ -51,8 +51,8 @@ public abstract class ForwardingServerCall<RespT> extends ServerCall<RespT> {
} }
@Override @Override
public void sendPayload(RespT payload) { public void sendMessage(RespT message) {
delegate().sendPayload(payload); delegate().sendMessage(message);
} }
@Override @Override

View File

@ -42,8 +42,8 @@ public abstract class ForwardingServerCallListener<ReqT> extends ServerCall.List
protected abstract ServerCall.Listener<ReqT> delegate(); protected abstract ServerCall.Listener<ReqT> delegate();
@Override @Override
public void onPayload(ReqT payload) { public void onMessage(ReqT message) {
delegate().onPayload(payload); delegate().onMessage(message);
} }
@Override @Override

View File

@ -38,10 +38,10 @@ package io.grpc;
* requests and responses. This API is generally intended for use by generated handlers, * requests and responses. This API is generally intended for use by generated handlers,
* but applications may use it directly if they need to. * but applications may use it directly if they need to.
* *
* <p>Headers must be sent before any payloads, which must be sent before closing. * <p>Headers must be sent before any messages, which must be sent before closing.
* *
* <p>No generic method for determining message receipt or providing acknowledgement is provided. * <p>No generic method for determining message receipt or providing acknowledgement is provided.
* Applications are expected to utilize normal payload messages for such signals, as a response * Applications are expected to utilize normal messages for such signals, as a response
* naturally acknowledges its request. * naturally acknowledges its request.
* *
* <p>Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe. * <p>Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
@ -52,7 +52,7 @@ public abstract class ServerCall<ResponseT> {
/** /**
* Callbacks for consuming incoming RPC messages. * Callbacks for consuming incoming RPC messages.
* *
* <p>Any contexts are guaranteed to arrive before any payloads, which are guaranteed before half * <p>Any contexts are guaranteed to arrive before any messages, which are guaranteed before half
* close, which is guaranteed before completion. * close, which is guaranteed before completion.
* *
* <p>Implementations are free to block for extended periods of time. Implementations are not * <p>Implementations are free to block for extended periods of time. Implementations are not
@ -66,9 +66,9 @@ public abstract class ServerCall<ResponseT> {
* A request message has been received. For streaming calls, there may be zero or more request * A request message has been received. For streaming calls, there may be zero or more request
* messages. * messages.
* *
* @param payload a received request message. * @param message a received request message.
*/ */
public abstract void onPayload(RequestT payload); public abstract void onMessage(RequestT message);
/** /**
* The client completed all message sending. However, the call may still be cancelled. * The client completed all message sending. However, the call may still be cancelled.
@ -94,7 +94,7 @@ public abstract class ServerCall<ResponseT> {
/** /**
* This indicates that the call is now capable of sending additional messages (via * This indicates that the call is now capable of sending additional messages (via
* {@link #sendPayload}) without requiring excessive buffering internally. This event is * {@link #sendMessage}) without requiring excessive buffering internally. This event is
* just a suggestion and the application is free to ignore it, however doing so may * just a suggestion and the application is free to ignore it, however doing so may
* result in excessive buffering within the call. * result in excessive buffering within the call.
*/ */
@ -103,7 +103,7 @@ public abstract class ServerCall<ResponseT> {
/** /**
* Requests up to the given number of messages from the call to be delivered to * Requests up to the given number of messages from the call to be delivered to
* {@link Listener#onPayload(Object)}. Once {@code numMessages} have been delivered * {@link Listener#onMessage(Object)}. Once {@code numMessages} have been delivered
* no further request messages will be delivered until more messages are requested by * no further request messages will be delivered until more messages are requested by
* calling this method again. * calling this method again.
* *
@ -114,23 +114,23 @@ public abstract class ServerCall<ResponseT> {
public abstract void request(int numMessages); public abstract void request(int numMessages);
/** /**
* Send response header metadata prior to sending a response payload. This method may * Send response header metadata prior to sending a response message. This method may
* only be called once and cannot be called after calls to {@link #sendPayload} or {@link #close}. * only be called once and cannot be called after calls to {@link #sendMessage} or {@link #close}.
* *
* @param headers metadata to send prior to any response body. * @param headers metadata to send prior to any response body.
* @throws IllegalStateException if {@code close} has been called, a payload has been sent, or * @throws IllegalStateException if {@code close} has been called, a message has been sent, or
* headers have already been sent * headers have already been sent
*/ */
public abstract void sendHeaders(Metadata.Headers headers); public abstract void sendHeaders(Metadata.Headers headers);
/** /**
* Send a response message. Payload messages are the primary form of communication associated with * Send a response message. Messages are the primary form of communication associated with
* RPCs. Multiple response messages may exist for streaming calls. * RPCs. Multiple response messages may exist for streaming calls.
* *
* @param payload response message. * @param message response message.
* @throws IllegalStateException if call is {@link #close}d * @throws IllegalStateException if call is {@link #close}d
*/ */
public abstract void sendPayload(ResponseT payload); public abstract void sendMessage(ResponseT message);
/** /**
* If {@code true}, indicates that the call is capable of sending additional messages * If {@code true}, indicates that the call is capable of sending additional messages

View File

@ -472,7 +472,7 @@ public final class ServerImpl extends Server {
private volatile boolean cancelled; private volatile boolean cancelled;
private boolean sendHeadersCalled; private boolean sendHeadersCalled;
private boolean closeCalled; private boolean closeCalled;
private boolean sendPayloadCalled; private boolean sendMessageCalled;
public ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method) { public ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method) {
this.stream = stream; this.stream = stream;
@ -488,18 +488,18 @@ public final class ServerImpl extends Server {
public void sendHeaders(Metadata.Headers headers) { public void sendHeaders(Metadata.Headers headers) {
Preconditions.checkState(!sendHeadersCalled, "sendHeaders has already been called"); Preconditions.checkState(!sendHeadersCalled, "sendHeaders has already been called");
Preconditions.checkState(!closeCalled, "call is closed"); Preconditions.checkState(!closeCalled, "call is closed");
Preconditions.checkState(!sendPayloadCalled, "sendPayload has already been called"); Preconditions.checkState(!sendMessageCalled, "sendMessage has already been called");
sendHeadersCalled = true; sendHeadersCalled = true;
stream.writeHeaders(headers); stream.writeHeaders(headers);
} }
@Override @Override
public void sendPayload(RespT payload) { public void sendMessage(RespT message) {
Preconditions.checkState(!closeCalled, "call is closed"); Preconditions.checkState(!closeCalled, "call is closed");
sendPayloadCalled = true; sendMessageCalled = true;
try { try {
InputStream message = method.streamResponse(payload); InputStream resp = method.streamResponse(message);
stream.writeMessage(message); stream.writeMessage(resp);
stream.flush(); stream.flush();
} catch (Throwable t) { } catch (Throwable t) {
close(Status.fromThrowable(t), new Metadata.Trailers()); close(Status.fromThrowable(t), new Metadata.Trailers());
@ -549,7 +549,7 @@ public final class ServerImpl extends Server {
return; return;
} }
listener.onPayload(method.parseRequest(message)); listener.onMessage(method.parseRequest(message));
} finally { } finally {
try { try {
message.close(); message.close();

View File

@ -208,9 +208,9 @@ public class ServerImplTest {
String order = "Lots of pizza, please"; String order = "Lots of pizza, please";
streamListener.messageRead(STRING_MARSHALLER.stream(order)); streamListener.messageRead(STRING_MARSHALLER.stream(order));
verify(callListener, timeout(2000)).onPayload(order); verify(callListener, timeout(2000)).onMessage(order);
call.sendPayload(314); call.sendMessage(314);
ArgumentCaptor<InputStream> inputCaptor = ArgumentCaptor.forClass(InputStream.class); ArgumentCaptor<InputStream> inputCaptor = ArgumentCaptor.forClass(InputStream.class);
verify(stream).writeMessage(inputCaptor.capture()); verify(stream).writeMessage(inputCaptor.capture());
verify(stream).flush(); verify(stream).flush();
@ -220,7 +220,7 @@ public class ServerImplTest {
executeBarrier(executor).await(); executeBarrier(executor).await();
verify(callListener).onHalfClose(); verify(callListener).onHalfClose();
call.sendPayload(50); call.sendMessage(50);
verify(stream, times(2)).writeMessage(inputCaptor.capture()); verify(stream, times(2)).writeMessage(inputCaptor.capture());
verify(stream, times(2)).flush(); verify(stream, times(2)).flush();
assertEquals(50, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue()); assertEquals(50, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue());

View File

@ -69,11 +69,11 @@ public class HeaderServerInterceptor implements ServerInterceptor {
} }
@Override @Override
public void sendPayload(RespT payload) { public void sendMessage(RespT message) {
if (!sentHeaders) { if (!sentHeaders) {
sendHeaders(new Metadata.Headers()); sendHeaders(new Metadata.Headers());
} }
super.sendPayload(payload); super.sendMessage(message);
} }
@Override @Override

View File

@ -125,12 +125,12 @@ public class ServerCalls {
String fullMethodName, final ServerCall<RespT> call, Metadata.Headers headers) { String fullMethodName, final ServerCall<RespT> call, Metadata.Headers headers) {
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call); final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, we will catch it in onPayload() and emit INVALID_ARGUMENT. // sends more than 1 requests, we will catch it in onMessage() and emit INVALID_ARGUMENT.
call.request(2); call.request(2);
return new EmptyServerCallListener<ReqT>() { return new EmptyServerCallListener<ReqT>() {
ReqT request; ReqT request;
@Override @Override
public void onPayload(ReqT request) { public void onMessage(ReqT request) {
if (this.request == null) { if (this.request == null) {
// We delay calling method.invoke() until onHalfClose(), because application may call // We delay calling method.invoke() until onHalfClose(), because application may call
// close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose(). // close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose().
@ -138,7 +138,7 @@ public class ServerCalls {
} else { } else {
call.close( call.close(
Status.INVALID_ARGUMENT.withDescription( Status.INVALID_ARGUMENT.withDescription(
"More than one request payloads for unary call or server streaming call"), "More than one request messages for unary call or server streaming call"),
new Metadata.Trailers()); new Metadata.Trailers());
} }
} }
@ -180,7 +180,7 @@ public class ServerCalls {
boolean halfClosed = false; boolean halfClosed = false;
@Override @Override
public void onPayload(ReqT request) { public void onMessage(ReqT request) {
requestObserver.onValue(request); requestObserver.onValue(request);
// Request delivery of the next inbound message. // Request delivery of the next inbound message.
@ -226,7 +226,7 @@ public class ServerCalls {
if (cancelled) { if (cancelled) {
throw Status.CANCELLED.asRuntimeException(); throw Status.CANCELLED.asRuntimeException();
} }
call.sendPayload(response); call.sendMessage(response);
// Request delivery of the next inbound message. // Request delivery of the next inbound message.
call.request(1); call.request(1);
@ -249,7 +249,7 @@ public class ServerCalls {
private static class EmptyServerCallListener<ReqT> extends ServerCall.Listener<ReqT> { private static class EmptyServerCallListener<ReqT> extends ServerCall.Listener<ReqT> {
@Override @Override
public void onPayload(ReqT request) { public void onMessage(ReqT request) {
} }
@Override @Override

View File

@ -96,11 +96,11 @@ public class TestUtils {
} }
@Override @Override
public void sendPayload(RespT payload) { public void sendMessage(RespT message) {
if (!sentHeaders) { if (!sentHeaders) {
sendHeaders(new Metadata.Headers()); sendHeaders(new Metadata.Headers());
} }
super.sendPayload(payload); super.sendMessage(message);
} }
@Override @Override