util: Improve RoundRobinLoadBalancer scalability with stickiness

- Rework stickiness picker logic to be non-blocking
- Stash `Subchannel` ref in an attribute rather than dedicated map
This commit is contained in:
Nick Hill 2018-06-29 09:49:23 -07:00 committed by ZHANG Dapeng
parent ac55604527
commit b2dd6ae7f0
1 changed files with 64 additions and 42 deletions

View File

@ -45,11 +45,14 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -99,6 +102,8 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
// package-private to avoid synthetic access
static final Attributes.Key<Ref<Subchannel>> STICKY_REF = Attributes.Key.create("sticky-ref");
private static final Logger logger = Logger.getLogger(RoundRobinLoadBalancer.class.getName());
@ -144,16 +149,23 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
// NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel
// doesn't need them. They're describing the resolved server list but we're not taking
// any action based on this information.
Attributes subchannelAttrs = Attributes.newBuilder()
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
// NB(lukaszx0): because attributes are immutable we can't set new value for the key
// after creation but since we can mutate the values we leverge that and set
// after creation but since we can mutate the values we leverage that and set
// AtomicReference which will allow mutating state info for given channel.
.set(
STATE_INFO, new Ref<ConnectivityStateInfo>(ConnectivityStateInfo.forNonError(IDLE)))
.build();
.set(STATE_INFO,
new Ref<ConnectivityStateInfo>(ConnectivityStateInfo.forNonError(IDLE)));
Subchannel subchannel =
checkNotNull(helper.createSubchannel(addressGroup, subchannelAttrs), "subchannel");
Ref<Subchannel> stickyRef = null;
if (stickinessState != null) {
subchannelAttrs.set(STICKY_REF, stickyRef = new Ref<Subchannel>(null));
}
Subchannel subchannel = checkNotNull(
helper.createSubchannel(addressGroup, subchannelAttrs.build()), "subchannel");
if (stickyRef != null) {
stickyRef.value = subchannel;
}
subchannels.put(addressGroup, subchannel);
subchannel.requestConnection();
}
@ -296,58 +308,66 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
static final int MAX_ENTRIES = 1000;
final Key<String> key;
final Map<String, Ref<Subchannel>> stickinessMap =
new LinkedHashMap<String, Ref<Subchannel>>() {
@Override
protected boolean removeEldestEntry(Map.Entry<String,Ref<Subchannel>> eldest) {
return size() > MAX_ENTRIES;
}
};
final ConcurrentMap<String, Ref<Subchannel>> stickinessMap =
new ConcurrentHashMap<String, Ref<Subchannel>>();
final Map<Subchannel, Ref<Subchannel>> subchannelRefs =
new HashMap<Subchannel, Ref<Subchannel>>();
final Queue<String> evictionQueue = new ConcurrentLinkedQueue<String>();
StickinessState(@Nonnull String stickinessKey) {
this.key = Key.of(stickinessKey, Metadata.ASCII_STRING_MARSHALLER);
}
/**
* Returns the subchannel asscoicated to the stickiness value if available in both the
* Returns the subchannel associated to the stickiness value if available in both the
* registry and the round robin list, otherwise associates the given subchannel with the
* stickiness key in the registry and returns the given subchannel.
*/
@Nonnull
synchronized Subchannel maybeRegister(
Subchannel maybeRegister(
String stickinessValue, @Nonnull Subchannel subchannel, List<Subchannel> rrList) {
Subchannel existingSubchannel = getSubchannel(stickinessValue);
if (existingSubchannel != null && rrList.contains(existingSubchannel)) {
return existingSubchannel;
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
stickinessMap.putIfAbsent(stickinessValue, newSubchannelRef);
if (existingSubchannelRef == null) {
// new entry
addToEvictionQueue(stickinessValue);
return subchannel;
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
if (existingSubchannel != null && rrList.contains(existingSubchannel)) {
return existingSubchannel;
}
}
// existingSubchannelRef is not null but no longer valid, replace it
if (stickinessMap.replace(stickinessValue, existingSubchannelRef, newSubchannelRef)) {
return subchannel;
}
// another thread concurrently removed or updated the entry, try again
}
}
Ref<Subchannel> subchannelRef = subchannelRefs.get(subchannel);
if (subchannelRef == null) {
subchannelRef = new Ref<Subchannel>(subchannel);
subchannelRefs.put(subchannel, subchannelRef);
private void addToEvictionQueue(String value) {
String oldValue;
while (stickinessMap.size() >= MAX_ENTRIES && (oldValue = evictionQueue.poll()) != null) {
stickinessMap.remove(oldValue);
}
stickinessMap.put(stickinessValue, subchannelRef);
return subchannel;
evictionQueue.add(value);
}
/**
* Unregister the subchannel from StickinessState.
*/
synchronized void remove(Subchannel subchannel) {
if (subchannelRefs.containsKey(subchannel)) {
subchannelRefs.get(subchannel).value = null;
subchannelRefs.remove(subchannel);
}
void remove(Subchannel subchannel) {
subchannel.getAttributes().get(STICKY_REF).value = null;
}
/**
* Gets the subchannel associated with the stickiness value if there is.
*/
@Nullable
synchronized Subchannel getSubchannel(String stickinessValue) {
Subchannel getSubchannel(String stickinessValue) {
Ref<Subchannel> subchannelRef = stickinessMap.get(stickinessValue);
if (subchannelRef != null) {
return subchannelRef.value;
@ -381,16 +401,18 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (list.size() > 0) {
if (stickinessState != null && args.getHeaders().containsKey(stickinessState.key)) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
Subchannel subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !list.contains(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !list.contains(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list);
}
}
return PickResult.withSubchannel(subchannel);
}
return PickResult.withSubchannel(nextSubchannel());
return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}
if (status != null) {
@ -401,10 +423,10 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
}
private Subchannel nextSubchannel() {
if (list.isEmpty()) {
int size = list.size();
if (size == 0) {
throw new NoSuchElementException();
}
int size = list.size();
int i = indexUpdater.incrementAndGet(this);
if (i >= size) {