A couple minor fixes to Channel for bugs exposed while integration testing with Netty transport.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69939586
This commit is contained in:
nathanmittler 2014-06-25 09:18:42 -07:00 committed by Eric Anderson
parent 6933667178
commit f4694f57f9
2 changed files with 16 additions and 10 deletions

View File

@ -52,9 +52,13 @@ public final class ChannelImpl extends AbstractService implements Channel {
@Override
protected synchronized void doStop() {
activeTransport.stopAsync();
activeTransport = null;
// The last TransportListener will call notifyStopped().
if (activeTransport != null) {
activeTransport.stopAsync();
activeTransport = null;
// The last TransportListener will call notifyStopped().
} else {
notifyStopped();
}
}
@Override
@ -115,7 +119,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final SerializingExecutor executor;
private final SerializingExecutor callExecutor;
// TODO(user): Consider moving flow control notification/management to Call itself.
private final Collection<SettableFuture<Void>> inProcessFutures
= Collections.synchronizedSet(new HashSet<SettableFuture<Void>>());
@ -123,7 +127,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor) {
this.method = method;
this.executor = executor;
this.callExecutor = executor;
}
@Override
@ -134,8 +138,11 @@ public final class ChannelImpl extends AbstractService implements Channel {
@Override
public void cancel() {
Preconditions.checkState(stream != null, "Not started");
stream.cancel();
// Cancel is called in exception handling cases, so it may be the case that the
// stream was never successfully created.
if (stream != null) {
stream.cancel();
}
}
@Override
@ -200,7 +207,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
private ListenableFuture<Void> dispatchCallable(
final Callable<ListenableFuture<Void>> callable) {
final SettableFuture<Void> ours = SettableFuture.create();
executor.execute(new Runnable() {
callExecutor.execute(new Runnable() {
@Override
public void run() {
try {
@ -252,7 +259,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
future.cancel(false);
}
inProcessFutures.clear();
executor.execute(new Runnable() {
callExecutor.execute(new Runnable() {
@Override
public void run() {
observer.onClose(status);

View File

@ -18,7 +18,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Utility functions for processing different call idioms. We have one-to-one correspondence
* between utilities in this class and the potential signatures in a generated stub class so