mirror of https://github.com/grpc/grpc-java.git
xds, wrr: randomize the initial deadline in the scheduler (#9922)
This commit is contained in:
parent
7e41d82b5a
commit
c367b267c6
|
@ -64,10 +64,10 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
private final Ticker ticker;
|
||||
|
||||
public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
|
||||
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker);
|
||||
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random());
|
||||
}
|
||||
|
||||
public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker) {
|
||||
public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) {
|
||||
super(helper);
|
||||
helper.setLoadBalancer(this);
|
||||
this.ticker = checkNotNull(ticker, "ticker");
|
||||
|
@ -75,7 +75,12 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
||||
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
|
||||
this.updateWeightTask = new UpdateWeightTask();
|
||||
this.random = new Random();
|
||||
this.random = random;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) {
|
||||
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, random);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -100,8 +105,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
|
||||
@Override
|
||||
public RoundRobinPicker createReadyPicker(List<Subchannel> activeList) {
|
||||
int startIndex = random.nextInt(activeList.size());
|
||||
return new WeightedRoundRobinPicker(activeList, startIndex);
|
||||
return new WeightedRoundRobinPicker(activeList);
|
||||
}
|
||||
|
||||
private final class UpdateWeightTask implements Runnable {
|
||||
|
@ -228,8 +232,8 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
private volatile EdfScheduler scheduler;
|
||||
private volatile boolean rrMode;
|
||||
|
||||
WeightedRoundRobinPicker(List<Subchannel> list, int startIndex) {
|
||||
super(checkNotNull(list, "list"), startIndex);
|
||||
WeightedRoundRobinPicker(List<Subchannel> list) {
|
||||
super(checkNotNull(list, "list"), random.nextInt(list.size()));
|
||||
Preconditions.checkArgument(!list.isEmpty(), "empty list");
|
||||
this.list = list;
|
||||
updateWeight();
|
||||
|
@ -266,7 +270,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
rrMode = true;
|
||||
return;
|
||||
}
|
||||
EdfScheduler scheduler = new EdfScheduler(list.size());
|
||||
EdfScheduler scheduler = new EdfScheduler(list.size(), random);
|
||||
avgWeight /= 1.0 * weightedChannelCount;
|
||||
for (int i = 0; i < list.size(); i++) {
|
||||
WrrSubchannel subchannel = (WrrSubchannel) list.get(i);
|
||||
|
@ -348,11 +352,13 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final Random random;
|
||||
|
||||
/**
|
||||
* Use the item's deadline as the order in the priority queue. If the deadlines are the same,
|
||||
* use the index. Index should be unique.
|
||||
*/
|
||||
EdfScheduler(int initialCapacity) {
|
||||
EdfScheduler(int initialCapacity, Random random) {
|
||||
this.prioQueue = new PriorityQueue<ObjectState>(initialCapacity, (o1, o2) -> {
|
||||
if (o1.deadline == o2.deadline) {
|
||||
return Integer.compare(o1.index, o2.index);
|
||||
|
@ -360,6 +366,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
return Double.compare(o1.deadline, o2.deadline);
|
||||
}
|
||||
});
|
||||
this.random = random;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -371,8 +378,8 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
void add(int index, double weight) {
|
||||
checkArgument(weight > 0.0, "Weights need to be positive.");
|
||||
ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index);
|
||||
state.deadline = 1 / state.weight;
|
||||
// TODO(zivy): randomize the initial deadline.
|
||||
// Randomize the initial deadline.
|
||||
state.deadline = random.nextDouble() * (1 / state.weight);
|
||||
prioQueue.add(state);
|
||||
}
|
||||
|
||||
|
|
|
@ -171,7 +171,8 @@ public class WeightedRoundRobinLoadBalancerTest {
|
|||
return subchannel;
|
||||
}
|
||||
});
|
||||
wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker());
|
||||
wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker(),
|
||||
new FakeRandom());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -625,7 +626,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
|||
double totalWeight = 0;
|
||||
int capacity = random.nextInt(10) + 1;
|
||||
double[] weights = new double[capacity];
|
||||
EdfScheduler scheduler = new EdfScheduler(capacity);
|
||||
EdfScheduler scheduler = new EdfScheduler(capacity, random);
|
||||
for (int i = 0; i < capacity; i++) {
|
||||
weights[i] = random.nextDouble();
|
||||
scheduler.add(i, weights[i]);
|
||||
|
@ -643,7 +644,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
|||
|
||||
@Test
|
||||
public void edsScheduler_sameWeight() {
|
||||
EdfScheduler scheduler = new EdfScheduler(2);
|
||||
EdfScheduler scheduler = new EdfScheduler(2, new FakeRandom());
|
||||
scheduler.add(0, 0.5);
|
||||
scheduler.add(1, 0.5);
|
||||
assertThat(scheduler.pick()).isEqualTo(0);
|
||||
|
@ -670,4 +671,12 @@ public class WeightedRoundRobinLoadBalancerTest {
|
|||
return "FakeSocketAddress-" + name;
|
||||
}
|
||||
}
|
||||
|
||||
private static class FakeRandom extends Random {
|
||||
@Override
|
||||
public double nextDouble() {
|
||||
// return constant value to disable init deadline randomization in the scheduler
|
||||
return 0.322023;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue