core: fix a deterministic race between the old createSubchannel() and Subchannel.requestConnection(). (#5785)

The old createSubchannel() doesn't require being called from
sync-context, thus its implementation schedules start() on
sync-context because start() has such requirement.  However, if a
LoadBalancer call createSubchannel() in sync-context, start() is
queued and only called after the control exits the sync-context.  If
createSubchannel() is immediately followed by other Subchannel methods
that requires start(), such as requestConnection(), they will fail.
This will be a very common issue as most LoadBalancers call
createSubchannel() in the sync-context.

This fix splits out the real work of start() into internalStart() which
is called by the old createSubchannel().
This commit is contained in:
Kun Zhang 2019-05-23 17:10:27 -07:00 committed by GitHub
parent d7ef437dac
commit 881dbeb3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 16 deletions

View File

@ -230,8 +230,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final AtomicBoolean shutdown = new AtomicBoolean(false);
// Must only be mutated and read from syncContext
private boolean shutdownNowed;
// Must only be mutated and read from syncContext
private boolean terminating;
// Must only be mutated from syncContext
private volatile boolean terminating;
// Must be mutated from syncContext
private volatile boolean terminated;
private final CountDownLatch terminatedLatch = new CountDownLatch(1);
@ -1060,7 +1060,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
checkNotNull(addressGroups, "addressGroups");
checkNotNull(attrs, "attrs");
final AbstractSubchannel subchannel = createSubchannelInternal(
final SubchannelImpl subchannel = createSubchannelInternal(
CreateSubchannelArgs.newBuilder()
.setAddresses(addressGroups)
.setAttributes(attrs)
@ -1074,12 +1074,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
};
syncContext.execute(new Runnable() {
@Override
public void run() {
subchannel.start(listener);
}
});
subchannel.internalStart(listener);
return subchannel;
}
@ -1089,7 +1084,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
return createSubchannelInternal(args);
}
private AbstractSubchannel createSubchannelInternal(CreateSubchannelArgs args) {
private SubchannelImpl createSubchannelInternal(CreateSubchannelArgs args) {
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
checkState(!terminated, "Channel is terminated");
return new SubchannelImpl(args, this);
@ -1429,13 +1424,15 @@ final class ManagedChannelImpl extends ManagedChannel implements
subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
}
@Override
public void start(final SubchannelStateListener listener) {
syncContext.throwIfNotInThisSynchronizationContext();
// This can be called either in or outside of syncContext
// TODO(zhangkun83): merge it back into start() once the caller createSubchannel() is deleted.
private void internalStart(final SubchannelStateListener listener) {
checkState(!started, "already started");
checkState(!shutdown, "already shutdown");
started = true;
this.listener = listener;
// TODO(zhangkun): possibly remove the volatile of terminating when this whole method is
// required to be called from syncContext
if (terminating) {
syncContext.execute(new Runnable() {
@Override
@ -1475,7 +1472,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
}
InternalSubchannel internalSubchannel = new InternalSubchannel(
final InternalSubchannel internalSubchannel = new InternalSubchannel(
args.getAddresses(),
authority(),
userAgent,
@ -1498,9 +1495,22 @@ final class ManagedChannelImpl extends ManagedChannel implements
.setSubchannelRef(internalSubchannel)
.build());
channelz.addSubchannel(internalSubchannel);
this.subchannel = internalSubchannel;
subchannels.add(internalSubchannel);
// TODO(zhangkun83): no need to schedule on syncContext when this whole method is required
// to be called from syncContext
syncContext.execute(new Runnable() {
@Override
public void run() {
channelz.addSubchannel(internalSubchannel);
subchannels.add(internalSubchannel);
}
});
}
@Override
public void start(SubchannelStateListener listener) {
syncContext.throwIfNotInThisSynchronizationContext();
internalStart(listener);
}
@Override

View File

@ -373,6 +373,25 @@ public class ManagedChannelImplTest {
}
}
@Deprecated
@Test
public void createSubchannel_old_insideSyncContextFollowedByRequestConnectionShouldSucceed() {
createChannel();
final AtomicReference<Throwable> error = new AtomicReference<>();
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
try {
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
} catch (Throwable e) {
error.set(e);
}
}
});
assertThat(error.get()).isNull();
}
@Deprecated
@Test
@SuppressWarnings("deprecation")