core: In PF, pass around SubchannelData instead of Subchannel

Each usage of the subchannel immediately looked up the SubchannelData.
This commit is contained in:
Eric Anderson 2024-06-28 23:24:11 -07:00
parent f9b072cfe2
commit 15456f8f0a
1 changed files with 24 additions and 30 deletions

View File

@ -214,14 +214,13 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
} }
void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
ConnectivityState newState = stateInfo.getState(); ConnectivityState newState = stateInfo.getState();
SubchannelData subchannelData = subchannels.get(getAddress(subchannel));
// Shutdown channels/previously relevant subchannels can still callback with state updates. // Shutdown channels/previously relevant subchannels can still callback with state updates.
// To prevent pickers from returning these obsolete subchannels, this logic // To prevent pickers from returning these obsolete subchannels, this logic
// is included to check if the current list of active subchannels includes this subchannel. // is included to check if the current list of active subchannels includes this subchannel.
if (subchannelData == null || subchannelData.getSubchannel() != subchannel) { if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
return; return;
} }
@ -269,7 +268,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
case READY: case READY:
shutdownRemaining(subchannelData); shutdownRemaining(subchannelData);
addressIndex.seekTo(getAddress(subchannel)); addressIndex.seekTo(getAddress(subchannelData.subchannel));
rawConnectivityState = READY; rawConnectivityState = READY;
updateHealthCheckedState(subchannelData); updateHealthCheckedState(subchannelData);
break; break;
@ -277,7 +276,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
case TRANSIENT_FAILURE: case TRANSIENT_FAILURE:
// If we are looking at current channel, request a connection if possible // If we are looking at current channel, request a connection if possible
if (addressIndex.isValid() if (addressIndex.isValid()
&& subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) { && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
if (addressIndex.increment()) { if (addressIndex.increment()) {
cancelScheduleTask(); cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses requestConnection(); // is recursive so might hit the end of the addresses
@ -317,7 +316,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel))); new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
} else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) { } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError( updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
subchannelData.healthListener.healthStateInfo.getStatus()))); subchannelData.healthStateInfo.getStatus())));
} else if (concludedState != TRANSIENT_FAILURE) { } else if (concludedState != TRANSIENT_FAILURE) {
updateBalancingState(subchannelData.getHealthState(), updateBalancingState(subchannelData.getHealthState(),
new Picker(PickResult.withNoResult())); new Picker(PickResult.withNoResult()));
@ -377,25 +376,24 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
return; return;
} }
Subchannel subchannel; SocketAddress currentAddress = addressIndex.getCurrentAddress();
SocketAddress currentAddress; SubchannelData subchannelData = subchannels.get(currentAddress);
currentAddress = addressIndex.getCurrentAddress(); if (subchannelData == null) {
subchannel = subchannels.containsKey(currentAddress) subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
? subchannels.get(currentAddress).getSubchannel() }
: createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
ConnectivityState subchannelState = subchannels.get(currentAddress).getState(); ConnectivityState subchannelState = subchannelData.getState();
switch (subchannelState) { switch (subchannelState) {
case IDLE: case IDLE:
subchannel.requestConnection(); subchannelData.subchannel.requestConnection();
subchannels.get(currentAddress).updateState(CONNECTING); subchannelData.updateState(CONNECTING);
scheduleNextConnection(); scheduleNextConnection();
break; break;
case CONNECTING: case CONNECTING:
if (enableHappyEyeballs) { if (enableHappyEyeballs) {
scheduleNextConnection(); scheduleNextConnection();
} else { } else {
subchannel.requestConnection(); subchannelData.subchannel.requestConnection();
} }
break; break;
case TRANSIENT_FAILURE: case TRANSIENT_FAILURE:
@ -455,7 +453,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
} }
} }
private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) { private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
HealthListener hcListener = new HealthListener(); HealthListener hcListener = new HealthListener();
final Subchannel subchannel = helper.createSubchannel( final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder() CreateSubchannelArgs.newBuilder()
@ -467,15 +465,15 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
log.warning("Was not able to create subchannel for " + addr); log.warning("Was not able to create subchannel for " + addr);
throw new IllegalStateException("Can't create subchannel"); throw new IllegalStateException("Can't create subchannel");
} }
SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener); SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
hcListener.subchannelData = subchannelData; hcListener.subchannelData = subchannelData;
subchannels.put(addr, subchannelData); subchannels.put(addr, subchannelData);
Attributes scAttrs = subchannel.getAttributes(); Attributes scAttrs = subchannel.getAttributes();
if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) { if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY); subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
} }
subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo)); subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
return subchannel; return subchannelData;
} }
private boolean isPassComplete() { private boolean isPassComplete() {
@ -492,17 +490,15 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
} }
private final class HealthListener implements SubchannelStateListener { private final class HealthListener implements SubchannelStateListener {
private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
private SubchannelData subchannelData; private SubchannelData subchannelData;
@Override @Override
public void onSubchannelState(ConnectivityStateInfo newState) { public void onSubchannelState(ConnectivityStateInfo newState) {
log.log(Level.FINE, "Received health status {0} for subchannel {1}", log.log(Level.FINE, "Received health status {0} for subchannel {1}",
new Object[]{newState, subchannelData.subchannel}); new Object[]{newState, subchannelData.subchannel});
healthStateInfo = newState; subchannelData.healthStateInfo = newState;
try { try {
SubchannelData curSubChanData = subchannels.get(addressIndex.getCurrentAddress()); if (subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
if (curSubChanData != null && curSubChanData.healthListener == this) {
updateHealthCheckedState(subchannelData); updateHealthCheckedState(subchannelData);
} }
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
@ -663,14 +659,12 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private static final class SubchannelData { private static final class SubchannelData {
private final Subchannel subchannel; private final Subchannel subchannel;
private ConnectivityState state; private ConnectivityState state;
private final HealthListener healthListener;
private boolean completedConnectivityAttempt = false; private boolean completedConnectivityAttempt = false;
private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
public SubchannelData(Subchannel subchannel, ConnectivityState state, public SubchannelData(Subchannel subchannel, ConnectivityState state) {
HealthListener subchannelHealthListener) {
this.subchannel = subchannel; this.subchannel = subchannel;
this.state = state; this.state = state;
this.healthListener = subchannelHealthListener;
} }
public Subchannel getSubchannel() { public Subchannel getSubchannel() {
@ -695,7 +689,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
} }
private ConnectivityState getHealthState() { private ConnectivityState getHealthState() {
return healthListener.healthStateInfo.getState(); return healthStateInfo.getState();
} }
} }