mirror of https://github.com/grpc/grpc-java.git
This reverts commit fe46edac
This commit is contained in:
parent
7dbf12fe2a
commit
258a95d0c4
|
@ -401,7 +401,6 @@ public final class ClientCalls {
|
||||||
private final CallToStreamObserverAdapter<ReqT> adapter;
|
private final CallToStreamObserverAdapter<ReqT> adapter;
|
||||||
private final boolean streamingResponse;
|
private final boolean streamingResponse;
|
||||||
private boolean firstResponseReceived;
|
private boolean firstResponseReceived;
|
||||||
private RespT unaryMessage;
|
|
||||||
|
|
||||||
// Non private to avoid synthetic class
|
// Non private to avoid synthetic class
|
||||||
StreamObserverToCallListenerAdapter(
|
StreamObserverToCallListenerAdapter(
|
||||||
|
@ -432,13 +431,7 @@ public final class ClientCalls {
|
||||||
.asRuntimeException();
|
.asRuntimeException();
|
||||||
}
|
}
|
||||||
firstResponseReceived = true;
|
firstResponseReceived = true;
|
||||||
|
observer.onNext(message);
|
||||||
if (streamingResponse) {
|
|
||||||
observer.onNext(message);
|
|
||||||
} else {
|
|
||||||
// will send message in onClose() for unary calls.
|
|
||||||
unaryMessage = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamingResponse && adapter.autoFlowControlEnabled) {
|
if (streamingResponse && adapter.autoFlowControlEnabled) {
|
||||||
// Request delivery of the next inbound message.
|
// Request delivery of the next inbound message.
|
||||||
|
@ -448,28 +441,10 @@ public final class ClientCalls {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(Status status, Metadata trailers) {
|
public void onClose(Status status, Metadata trailers) {
|
||||||
Throwable error = null;
|
|
||||||
if (status.isOk()) {
|
if (status.isOk()) {
|
||||||
if (!streamingResponse) {
|
|
||||||
if (unaryMessage != null) {
|
|
||||||
try {
|
|
||||||
observer.onNext(unaryMessage);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
error = t;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error = Status.INTERNAL.withDescription("Response message is null for unary call")
|
|
||||||
.asRuntimeException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error = status.asRuntimeException(trailers);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (error == null) {
|
|
||||||
observer.onCompleted();
|
observer.onCompleted();
|
||||||
} else {
|
} else {
|
||||||
observer.onError(error);
|
observer.onError(status.asRuntimeException(trailers));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,123 +99,6 @@ public class ClientCallsTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void unaryAsyncCallStatusIsOkWithMessageSuccess() throws Exception {
|
|
||||||
Integer req = 2;
|
|
||||||
final String resp = "bar";
|
|
||||||
final Status status = Status.OK;
|
|
||||||
final Metadata trailers = new Metadata();
|
|
||||||
final List<String> actualResponse = new ArrayList<>();
|
|
||||||
final List<Boolean> completed = new ArrayList<>();
|
|
||||||
|
|
||||||
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
|
||||||
@Override
|
|
||||||
public void start(ClientCall.Listener<String> listener, Metadata headers) {
|
|
||||||
listener.onMessage(resp);
|
|
||||||
listener.onClose(status, trailers);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
|
||||||
@Override
|
|
||||||
public void onNext(String value) {
|
|
||||||
actualResponse.add(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
fail("Should not reach here");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCompleted() {
|
|
||||||
completed.add(true);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
|
||||||
assertThat(actualResponse).hasSize(1);
|
|
||||||
assertEquals(resp, actualResponse.get(0));
|
|
||||||
assertThat(completed).hasSize(1);
|
|
||||||
assertThat(completed.get(0)).isTrue();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void unaryAsyncCallStatusIsOkWithNullMessageGetError() throws Exception {
|
|
||||||
Integer req = 2;
|
|
||||||
final Status status = Status.OK;
|
|
||||||
final Metadata trailers = new Metadata();
|
|
||||||
final List<Throwable> expected = new ArrayList<>();
|
|
||||||
|
|
||||||
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
|
||||||
@Override
|
|
||||||
public void start(ClientCall.Listener<String> listener, Metadata headers) {
|
|
||||||
listener.onMessage(null);
|
|
||||||
listener.onClose(status, trailers);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
|
||||||
@Override
|
|
||||||
public void onNext(String value) {
|
|
||||||
fail("Should not reach here");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
expected.add(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCompleted() {
|
|
||||||
fail("Should not reach here");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
|
||||||
assertThat(expected).hasSize(1);
|
|
||||||
assertThat(expected.get(0)).hasMessageThat()
|
|
||||||
.isEqualTo("INTERNAL: Response message is null for unary call");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void unaryAsyncCallStatusIsNotOkWithMessageDoNotSendMessage() throws Exception {
|
|
||||||
Integer req = 2;
|
|
||||||
final Status status = Status.INTERNAL.withDescription("Unique status");
|
|
||||||
final String resp = "bar";
|
|
||||||
final Metadata trailers = new Metadata();
|
|
||||||
final List<Throwable> expected = new ArrayList<>();
|
|
||||||
|
|
||||||
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
|
||||||
@Override
|
|
||||||
public void start(io.grpc.ClientCall.Listener<String> listener, Metadata headers) {
|
|
||||||
listener.onMessage(resp);
|
|
||||||
listener.onClose(status, trailers);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
|
||||||
@Override
|
|
||||||
public void onNext(String value) {
|
|
||||||
fail("Should not reach here");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
expected.add(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCompleted() {
|
|
||||||
fail("Should not reach here");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
|
||||||
assertThat(expected).hasSize(1);
|
|
||||||
assertThat(expected.get(0)).hasMessageThat().isEqualTo("INTERNAL: Unique status");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void unaryBlockingCallSuccess() throws Exception {
|
public void unaryBlockingCallSuccess() throws Exception {
|
||||||
Integer req = 2;
|
Integer req = 2;
|
||||||
|
|
Loading…
Reference in New Issue