diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 4adec0dac2..2ba8a78dcc 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -184,7 +184,8 @@ public abstract class AbstractClientStream extends AbstractStream private ClientStreamListener listener; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); - private Runnable deliveryStalledTask; + private boolean deframerClosed = false; + private Runnable deframerClosedTask; /** * Whether the stream is closed from the transport's perspective. This can differ from {@link @@ -210,18 +211,14 @@ public abstract class AbstractClientStream extends AbstractStream } @Override - public final void deliveryStalled() { - if (deliveryStalledTask != null) { - deliveryStalledTask.run(); - deliveryStalledTask = null; + public void deframerClosed(boolean hasPartialMessageIgnored) { + deframerClosed = true; + if (deframerClosedTask != null) { + deframerClosedTask.run(); + deframerClosedTask = null; } } - @Override - public final void endOfStream() { - deliveryStalled(); - } - @Override protected final ClientStreamListener listener() { return listener; @@ -266,7 +263,7 @@ public abstract class AbstractClientStream extends AbstractStream } needToCloseFrame = false; - deframe(frame, false); + deframe(frame); } finally { if (needToCloseFrame) { frame.close(); @@ -314,18 +311,18 @@ public abstract class AbstractClientStream extends AbstractStream statusReported = true; onStreamDeallocated(); - // If not stopping delivery, then we must wait until the deframer is stalled (i.e., it has no - // complete messages to deliver). - if (stopDelivery || isDeframerStalled()) { - deliveryStalledTask = null; + if (deframerClosed) { + deframerClosedTask = null; closeListener(status, trailers); } else { - deliveryStalledTask = new Runnable() { - @Override - public void run() { - closeListener(status, trailers); - } - }; + deframerClosedTask = + new Runnable() { + @Override + public void run() { + closeListener(status, trailers); + } + }; + closeDeframer(stopDelivery); } } @@ -337,7 +334,6 @@ public abstract class AbstractClientStream extends AbstractStream private void closeListener(Status status, Metadata trailers) { if (!listenerClosed) { listenerClosed = true; - closeDeframer(); statsTraceCtx.streamClosed(status); listener().closed(status, trailers); } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index 14486e202c..5cad715226 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -182,6 +182,11 @@ public abstract class AbstractServerStream extends AbstractStream private ServerStreamListener listener; private final StatsTraceContext statsTraceCtx; + private boolean endOfStream = false; + private boolean deframerClosed = false; + private boolean immediateCloseRequested = false; + private Runnable deframerClosedTask; + protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) { super(maxMessageSize, statsTraceCtx); this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); @@ -202,12 +207,24 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - public void deliveryStalled() {} - - @Override - public void endOfStream() { - closeDeframer(); - listener().halfClosed(); + public void deframerClosed(boolean hasPartialMessage) { + deframerClosed = true; + if (endOfStream) { + if (!immediateCloseRequested && hasPartialMessage) { + // We've received the entire stream and have data available but we don't have + // enough to read the next frame ... this is bad. + deframeFailed(Status.INTERNAL + .withDescription("Encountered end-of-stream mid-frame") + .asRuntimeException()); + deframerClosedTask = null; + return; + } + listener.halfClosed(); + } + if (deframerClosedTask != null) { + deframerClosedTask.run(); + deframerClosedTask = null; + } } @Override @@ -224,8 +241,13 @@ public abstract class AbstractServerStream extends AbstractStream * @param endOfStream {@code true} if no more data will be received on the stream. */ public void inboundDataReceived(ReadableBuffer frame, boolean endOfStream) { + Preconditions.checkState(!this.endOfStream, "Past end of stream"); // Deframe the message. If a failure occurs, deframeFailed will be called. - deframe(frame, endOfStream); + deframe(frame); + if (endOfStream) { + this.endOfStream = true; + closeDeframer(false); + } } /** @@ -238,9 +260,22 @@ public abstract class AbstractServerStream extends AbstractStream * * @param status the error status. Must not be {@link Status#OK}. */ - public final void transportReportStatus(Status status) { + public final void transportReportStatus(final Status status) { Preconditions.checkArgument(!status.isOk(), "status must not be OK"); - closeListener(status); + if (deframerClosed) { + deframerClosedTask = null; + closeListener(status); + } else { + deframerClosedTask = + new Runnable() { + @Override + public void run() { + closeListener(status); + } + }; + immediateCloseRequested = true; + closeDeframer(true); + } } /** @@ -249,7 +284,20 @@ public abstract class AbstractServerStream extends AbstractStream * #transportReportStatus}. */ public void complete() { - closeListener(Status.OK); + if (deframerClosed) { + deframerClosedTask = null; + closeListener(Status.OK); + } else { + deframerClosedTask = + new Runnable() { + @Override + public void run() { + closeListener(Status.OK); + } + }; + immediateCloseRequested = true; + closeDeframer(true); + } } /** @@ -263,7 +311,6 @@ public abstract class AbstractServerStream extends AbstractStream } listenerClosed = true; onStreamDeallocated(); - closeDeframer(); listener().closed(newStatus); } } diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 74dfef4891..55fffcd66d 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -147,7 +147,7 @@ public abstract class AbstractStream implements Stream { } /** - * Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed. + * Called when a {@link #deframe(ReadableBuffer)} operation failed. * * @param cause the actual failure */ @@ -156,31 +156,36 @@ public abstract class AbstractStream implements Stream { /** * Closes this deframer and frees any resources. After this method is called, additional calls * will have no effect. + * + *
When {@code stopDelivery} is false, the deframer will wait to close until any already + * queued messages have been delivered. + * + *
The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
+ *
+ * @param stopDelivery interrupt pending deliveries and close immediately
*/
- protected final void closeDeframer() {
- deframer.close();
- }
-
- /**
- * Indicates whether delivery is currently stalled, pending receipt of more data.
- */
- protected final boolean isDeframerStalled() {
- return deframer.isStalled();
+ protected final void closeDeframer(boolean stopDelivery) {
+ if (stopDelivery) {
+ deframer.close();
+ } else {
+ deframer.closeWhenComplete();
+ }
}
/**
* Called to parse a received frame and attempt delivery of any completed
* messages. Must be called from the transport thread.
*/
- protected final void deframe(ReadableBuffer frame, boolean endOfStream) {
+ protected final void deframe(ReadableBuffer frame) {
if (deframer.isClosed()) {
frame.close();
return;
}
try {
- deframer.deframe(frame, endOfStream);
+ deframer.deframe(frame);
} catch (Throwable t) {
deframeFailed(t);
+ deframer.close(); // unrecoverable state
}
}
@@ -196,6 +201,7 @@ public abstract class AbstractStream implements Stream {
deframer.request(numMessages);
} catch (Throwable t) {
deframeFailed(t);
+ deframer.close(); // unrecoverable state
}
}
diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java
index 4e3f1868bb..875258e337 100644
--- a/core/src/main/java/io/grpc/internal/MessageDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java
@@ -58,20 +58,14 @@ public class MessageDeframer implements Closeable {
/**
* Called to deliver the next complete message.
*
- * @param is stream containing the message.
+ * @param message stream containing the message.
*/
- void messageRead(InputStream is);
+ void messageRead(InputStream message);
/**
- * Called when end-of-stream has not yet been reached but there are no complete messages
- * remaining to be delivered.
+ * Called when the deframer closes.
*/
- void deliveryStalled();
-
- /**
- * Called when the stream is complete and all messages have been successfully delivered.
- */
- void endOfStream();
+ void deframerClosed(boolean hasPartialMessage);
}
private enum State {
@@ -86,13 +80,13 @@ public class MessageDeframer implements Closeable {
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag;
- private boolean endOfStream;
private CompositeReadableBuffer nextFrame;
private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
private long pendingDeliveries;
- private boolean deliveryStalled = true;
private boolean inDelivery = false;
+ private boolean closeWhenComplete = false;
+
/**
* Create a deframer.
*
@@ -147,24 +141,18 @@ public class MessageDeframer implements Closeable {
* Adds the given data to this deframer and attempts delivery to the listener.
*
* @param data the raw data read from the remote endpoint. Must be non-null.
- * @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from
- * the remote endpoint. End of stream should not be used in the event of a transport
- * error, such as a stream reset.
- * @throws IllegalStateException if {@link #close()} has been called previously or if
- * this method has previously been called with {@code endOfStream=true}.
+ * @throws IllegalStateException if {@link #close()} or {@link #closeWhenComplete()} has been
+ * called previously.
*/
- public void deframe(ReadableBuffer data, boolean endOfStream) {
+ public void deframe(ReadableBuffer data) {
Preconditions.checkNotNull(data, "data");
boolean needToCloseData = true;
try {
- checkNotClosed();
- Preconditions.checkState(!this.endOfStream, "Past end of stream");
+ checkNotClosedOrScheduledToClose();
unprocessed.addBuffer(data);
needToCloseData = false;
- // Indicate that all of the data for this stream has been received.
- this.endOfStream = endOfStream;
deliver();
} finally {
if (needToCloseData) {
@@ -173,12 +161,17 @@ public class MessageDeframer implements Closeable {
}
}
- /**
- * Indicates whether delivery is currently stalled, pending receipt of more data. This means
- * that no additional data can be delivered to the application.
- */
- public boolean isStalled() {
- return deliveryStalled;
+ /** Close when any messages currently in unprocessed have been requested and delivered. */
+ public void closeWhenComplete() {
+ if (unprocessed == null) {
+ return;
+ }
+ boolean stalled = unprocessed.readableBytes() == 0;
+ if (stalled) {
+ close();
+ } else {
+ closeWhenComplete = true;
+ }
}
/**
@@ -187,6 +180,10 @@ public class MessageDeframer implements Closeable {
*/
@Override
public void close() {
+ if (isClosed()) {
+ return;
+ }
+ boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
try {
if (unprocessed != null) {
unprocessed.close();
@@ -198,6 +195,7 @@ public class MessageDeframer implements Closeable {
unprocessed = null;
nextFrame = null;
}
+ listener.deframerClosed(hasPartialMessage);
}
/**
@@ -210,8 +208,9 @@ public class MessageDeframer implements Closeable {
/**
* Throws if this deframer has already been closed.
*/
- private void checkNotClosed() {
+ private void checkNotClosedOrScheduledToClose() {
Preconditions.checkState(!isClosed(), "MessageDeframer is already closed");
+ Preconditions.checkState(!closeWhenComplete, "MessageDeframer is scheduled to close");
}
/**
@@ -253,26 +252,8 @@ public class MessageDeframer implements Closeable {
* be in unprocessed.
*/
boolean stalled = unprocessed.readableBytes() == 0;
-
- if (endOfStream && stalled) {
- boolean havePartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
- if (!havePartialMessage) {
- listener.endOfStream();
- deliveryStalled = false;
- return;
- } else {
- // We've received the entire stream and have data available but we don't have
- // enough to read the next frame ... this is bad.
- throw Status.INTERNAL.withDescription(
- debugString + ": Encountered end-of-stream mid-frame").asRuntimeException();
- }
- }
-
- // If we're transitioning to the stalled state, notify the listener.
- boolean previouslyStalled = deliveryStalled;
- deliveryStalled = stalled;
- if (stalled && !previouslyStalled) {
- listener.deliveryStalled();
+ if (closeWhenComplete && stalled) {
+ close();
}
} finally {
inDelivery = false;
diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
index ecd65bb137..1e6b8a0c9b 100644
--- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
@@ -200,7 +200,7 @@ public class AbstractClientStreamTest {
// on the transport thread.
stream.transportState().requestMessagesFromDeframer(1);
// Send first byte of 2 byte message
- stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false);
+ stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}));
Status status = Status.INTERNAL;
// Simulate getting a reset
stream.transportState().transportReportStatus(status, false /*stop delivery*/, new Metadata());
diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
index 5da2c260dc..48e641c359 100644
--- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
@@ -27,15 +27,18 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.AbstractServerStream.TransportState;
import io.grpc.internal.MessageFramerTest.ByteWritableBuffer;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -48,6 +51,7 @@ import org.mockito.ArgumentCaptor;
*/
@RunWith(JUnit4.class)
public class AbstractServerStreamTest {
+ private static final int TIMEOUT_MS = 1000;
private static final int MAX_MESSAGE_SIZE = 100;
@Rule public final ExpectedException thrown = ExpectedException.none();
@@ -90,6 +94,59 @@ public class AbstractServerStreamTest {
assertNull("no message expected", streamListenerMessageQueue.poll());
}
+ @Test
+ public void queuedBytesInDeframerShouldNotBlockComplete() throws Exception {
+ final SettableFuture