From c367b267c66d773433a158833f7b928004fc5c87 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 2 Mar 2023 15:17:54 -0800 Subject: [PATCH] xds, wrr: randomize the initial deadline in the scheduler (#9922) --- .../xds/WeightedRoundRobinLoadBalancer.java | 29 ++++++++++++------- .../WeightedRoundRobinLoadBalancerTest.java | 15 ++++++++-- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 60804fec7b..8a0d97c57b 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -64,10 +64,10 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { private final Ticker ticker; public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) { - this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker); + this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random()); } - public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker) { + public WeightedRoundRobinLoadBalancer(WrrHelper helper, Ticker ticker, Random random) { super(helper); helper.setLoadBalancer(this); this.ticker = checkNotNull(ticker, "ticker"); @@ -75,7 +75,12 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); this.updateWeightTask = new UpdateWeightTask(); - this.random = new Random(); + this.random = random; + } + + @VisibleForTesting + WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker, Random random) { + this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, random); } @Override @@ -100,8 +105,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { @Override public RoundRobinPicker createReadyPicker(List activeList) { - int startIndex = random.nextInt(activeList.size()); - return new WeightedRoundRobinPicker(activeList, startIndex); + return new WeightedRoundRobinPicker(activeList); } private final class UpdateWeightTask implements Runnable { @@ -228,8 +232,8 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { private volatile EdfScheduler scheduler; private volatile boolean rrMode; - WeightedRoundRobinPicker(List list, int startIndex) { - super(checkNotNull(list, "list"), startIndex); + WeightedRoundRobinPicker(List list) { + super(checkNotNull(list, "list"), random.nextInt(list.size())); Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; updateWeight(); @@ -266,7 +270,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { rrMode = true; return; } - EdfScheduler scheduler = new EdfScheduler(list.size()); + EdfScheduler scheduler = new EdfScheduler(list.size(), random); avgWeight /= 1.0 * weightedChannelCount; for (int i = 0; i < list.size(); i++) { WrrSubchannel subchannel = (WrrSubchannel) list.get(i); @@ -348,11 +352,13 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { private final Object lock = new Object(); + private final Random random; + /** * Use the item's deadline as the order in the priority queue. If the deadlines are the same, * use the index. Index should be unique. */ - EdfScheduler(int initialCapacity) { + EdfScheduler(int initialCapacity, Random random) { this.prioQueue = new PriorityQueue(initialCapacity, (o1, o2) -> { if (o1.deadline == o2.deadline) { return Integer.compare(o1.index, o2.index); @@ -360,6 +366,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { return Double.compare(o1.deadline, o2.deadline); } }); + this.random = random; } /** @@ -371,8 +378,8 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { void add(int index, double weight) { checkArgument(weight > 0.0, "Weights need to be positive."); ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index); - state.deadline = 1 / state.weight; - // TODO(zivy): randomize the initial deadline. + // Randomize the initial deadline. + state.deadline = random.nextDouble() * (1 / state.weight); prioQueue.add(state); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index ed8540ff13..eedcea935c 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -171,7 +171,8 @@ public class WeightedRoundRobinLoadBalancerTest { return subchannel; } }); - wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker()); + wrr = new WeightedRoundRobinLoadBalancer(helper, fakeClock.getDeadlineTicker(), + new FakeRandom()); } @Test @@ -625,7 +626,7 @@ public class WeightedRoundRobinLoadBalancerTest { double totalWeight = 0; int capacity = random.nextInt(10) + 1; double[] weights = new double[capacity]; - EdfScheduler scheduler = new EdfScheduler(capacity); + EdfScheduler scheduler = new EdfScheduler(capacity, random); for (int i = 0; i < capacity; i++) { weights[i] = random.nextDouble(); scheduler.add(i, weights[i]); @@ -643,7 +644,7 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void edsScheduler_sameWeight() { - EdfScheduler scheduler = new EdfScheduler(2); + EdfScheduler scheduler = new EdfScheduler(2, new FakeRandom()); scheduler.add(0, 0.5); scheduler.add(1, 0.5); assertThat(scheduler.pick()).isEqualTo(0); @@ -670,4 +671,12 @@ public class WeightedRoundRobinLoadBalancerTest { return "FakeSocketAddress-" + name; } } + + private static class FakeRandom extends Random { + @Override + public double nextDouble() { + // return constant value to disable init deadline randomization in the scheduler + return 0.322023; + } + } }