mirror of https://github.com/grpc/grpc-java.git
rls: Change AdaptiveThrottler to use Ticker instead of TimeProvider (#9390)
rls: Change AdaptiveThrottler to use Ticker instead of TimeProvider * Use a slot being null to mark invalid rather than relying on the slot's endNanos value. Fixes #9048
This commit is contained in:
parent
50cdfa9f05
commit
dcac7689fa
|
@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import io.grpc.internal.TimeProvider;
|
||||
import com.google.common.base.Ticker;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
|
@ -60,7 +60,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
* is currently accepting.
|
||||
*/
|
||||
private final float ratioForAccepts;
|
||||
private final TimeProvider timeProvider;
|
||||
private final Ticker ticker;
|
||||
/**
|
||||
* The number of requests attempted by the client during the Adaptive Throttler instance's
|
||||
* history of calls. This includes requests throttled at the client. The history period defaults
|
||||
|
@ -79,10 +79,10 @@ final class AdaptiveThrottler implements Throttler {
|
|||
this.historySeconds = builder.historySeconds;
|
||||
this.requestsPadding = builder.requestsPadding;
|
||||
this.ratioForAccepts = builder.ratioForAccepts;
|
||||
this.timeProvider = builder.timeProvider;
|
||||
this.ticker = builder.ticker;
|
||||
long internalNanos = TimeUnit.SECONDS.toNanos(historySeconds);
|
||||
this.requestStat = new TimeBasedAccumulator(internalNanos, timeProvider);
|
||||
this.throttledStat = new TimeBasedAccumulator(internalNanos, timeProvider);
|
||||
this.requestStat = new TimeBasedAccumulator(internalNanos, ticker);
|
||||
this.throttledStat = new TimeBasedAccumulator(internalNanos, ticker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,7 +92,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
|
||||
@VisibleForTesting
|
||||
boolean shouldThrottle(float random) {
|
||||
long nowNanos = timeProvider.currentTimeNanos();
|
||||
long nowNanos = ticker.read();
|
||||
if (getThrottleProbability(nowNanos) <= random) {
|
||||
return false;
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
|
||||
@Override
|
||||
public void registerBackendResponse(boolean throttled) {
|
||||
long now = timeProvider.currentTimeNanos();
|
||||
long now = ticker.read();
|
||||
requestStat.increment(now);
|
||||
if (throttled) {
|
||||
throttledStat.increment(now);
|
||||
|
@ -150,7 +150,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
private float ratioForAccepts = DEFAULT_RATIO_FOR_ACCEPT;
|
||||
private int historySeconds = DEFAULT_HISTORY_SECONDS;
|
||||
private int requestsPadding = DEFAULT_REQUEST_PADDING;
|
||||
private TimeProvider timeProvider = TimeProvider.SYSTEM_TIME_PROVIDER;
|
||||
private Ticker ticker = Ticker.systemTicker();
|
||||
|
||||
public Builder setRatioForAccepts(float ratioForAccepts) {
|
||||
this.ratioForAccepts = ratioForAccepts;
|
||||
|
@ -167,8 +167,8 @@ final class AdaptiveThrottler implements Throttler {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setTimeProvider(TimeProvider timeProvider) {
|
||||
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
|
||||
public Builder setTicker(Ticker ticker) {
|
||||
this.ticker = checkNotNull(ticker, "ticker");
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -205,9 +205,6 @@ final class AdaptiveThrottler implements Throttler {
|
|||
}
|
||||
}
|
||||
|
||||
// Represents a slot which is not initialized and is unusable.
|
||||
private static final Slot NULL_SLOT = new Slot(-1);
|
||||
|
||||
/** The array of slots. */
|
||||
private final AtomicReferenceArray<Slot> slots = new AtomicReferenceArray<>(NUM_SLOTS);
|
||||
|
||||
|
@ -224,7 +221,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
*/
|
||||
private volatile int currentIndex;
|
||||
|
||||
private final TimeProvider timeProvider;
|
||||
private final Ticker ticker;
|
||||
|
||||
/**
|
||||
* Interval constructor.
|
||||
|
@ -232,7 +229,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
* @param internalNanos is the stat interval in nanoseconds
|
||||
* @throws IllegalArgumentException if the supplied interval is too small to be effective
|
||||
*/
|
||||
TimeBasedAccumulator(long internalNanos, TimeProvider timeProvider) {
|
||||
TimeBasedAccumulator(long internalNanos, Ticker ticker) {
|
||||
checkArgument(
|
||||
internalNanos >= NUM_SLOTS,
|
||||
"Interval must be greater than %s",
|
||||
|
@ -240,30 +237,27 @@ final class AdaptiveThrottler implements Throttler {
|
|||
this.interval = internalNanos;
|
||||
this.slotNanos = internalNanos / NUM_SLOTS;
|
||||
this.currentIndex = 0;
|
||||
for (int i = 0; i < NUM_SLOTS; i++) {
|
||||
slots.set(i, NULL_SLOT);
|
||||
}
|
||||
this.timeProvider = checkNotNull(timeProvider, "ticker");
|
||||
this.ticker = checkNotNull(ticker, "ticker");
|
||||
}
|
||||
|
||||
/** Gets the current slot. */
|
||||
private Slot getSlot(long now) {
|
||||
Slot currentSlot = slots.get(currentIndex);
|
||||
if (now < currentSlot.endNanos) {
|
||||
if (currentSlot != null && now - currentSlot.endNanos < 0) {
|
||||
return currentSlot;
|
||||
} else {
|
||||
long slotBoundary = getSlotEndTime(now);
|
||||
synchronized (this) {
|
||||
int index = currentIndex;
|
||||
currentSlot = slots.get(index);
|
||||
if (now < currentSlot.endNanos) {
|
||||
if (currentSlot != null && now - currentSlot.endNanos < 0) {
|
||||
return currentSlot;
|
||||
}
|
||||
int newIndex = (index == NUM_SLOTS - 1) ? 0 : index + 1;
|
||||
Slot nextSlot = new Slot(slotBoundary);
|
||||
slots.set(newIndex, nextSlot);
|
||||
// Set currentIndex only after assigning the new slot to slots, otherwise
|
||||
// racing readers will see NULL_SLOT or an old slot.
|
||||
// racing readers will see null or an old slot.
|
||||
currentIndex = newIndex;
|
||||
return nextSlot;
|
||||
}
|
||||
|
@ -294,7 +288,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
*
|
||||
* @param now is the time used to increment the count
|
||||
*/
|
||||
final void increment(long now) {
|
||||
void increment(long now) {
|
||||
getSlot(now).increment();
|
||||
}
|
||||
|
||||
|
@ -304,28 +298,33 @@ final class AdaptiveThrottler implements Throttler {
|
|||
* @param now the current time
|
||||
* @return the statistic count
|
||||
*/
|
||||
final long get(long now) {
|
||||
long get(long now) {
|
||||
long intervalEnd = getSlotEndTime(now);
|
||||
long intervalStart = intervalEnd - interval;
|
||||
// This is the point at which increments to new slots will be ignored.
|
||||
int index = currentIndex;
|
||||
|
||||
long accumulated = 0L;
|
||||
long prevSlotEnd = Long.MAX_VALUE;
|
||||
Long prevSlotEnd = null;
|
||||
for (int i = 0; i < NUM_SLOTS; i++) {
|
||||
if (index < 0) {
|
||||
index = NUM_SLOTS - 1;
|
||||
}
|
||||
Slot currentSlot = slots.get(index);
|
||||
index--;
|
||||
if (currentSlot == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
long currentSlotEnd = currentSlot.endNanos;
|
||||
|
||||
if (currentSlotEnd <= intervalStart || currentSlotEnd > prevSlotEnd) {
|
||||
if (currentSlotEnd - intervalStart <= 0
|
||||
|| (prevSlotEnd != null && currentSlotEnd - prevSlotEnd > 0)) {
|
||||
break;
|
||||
}
|
||||
prevSlotEnd = currentSlotEnd;
|
||||
|
||||
if (currentSlotEnd > intervalEnd) {
|
||||
if (currentSlotEnd - intervalEnd > 0) {
|
||||
continue;
|
||||
}
|
||||
accumulated = accumulated + currentSlot.count;
|
||||
|
@ -337,7 +336,7 @@ final class AdaptiveThrottler implements Throttler {
|
|||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("interval", interval)
|
||||
.add("current_count", get(timeProvider.currentTimeNanos()))
|
||||
.add("current_count", get(ticker.read()))
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ package io.grpc.rls;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.base.Ticker;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.TimeProvider;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -30,13 +30,13 @@ public class AdaptiveThrottlerTest {
|
|||
private static final float TOLERANCE = 0.0001f;
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock();
|
||||
private final TimeProvider fakeTimeProvider = fakeClock.getTimeProvider();
|
||||
private final Ticker fakeTicker = fakeClock.getTicker();
|
||||
private final AdaptiveThrottler throttler =
|
||||
new AdaptiveThrottler.Builder()
|
||||
.setHistorySeconds(1)
|
||||
.setRatioForAccepts(1.0f)
|
||||
.setRequestsPadding(1)
|
||||
.setTimeProvider(fakeTimeProvider)
|
||||
.setTicker(fakeTicker)
|
||||
.build();
|
||||
|
||||
@Test
|
||||
|
@ -44,9 +44,9 @@ public class AdaptiveThrottlerTest {
|
|||
long startTime = fakeClock.currentTimeMillis();
|
||||
|
||||
// initial states
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(0L);
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(0L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE).of(0.0f);
|
||||
|
||||
// Request 1, allowed by all.
|
||||
|
@ -54,10 +54,10 @@ public class AdaptiveThrottlerTest {
|
|||
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
|
||||
throttler.registerBackendResponse(false);
|
||||
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read()))
|
||||
.isEqualTo(1L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(0L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE).of(0.0f);
|
||||
|
||||
// Request 2, throttled by backend
|
||||
|
@ -65,25 +65,26 @@ public class AdaptiveThrottlerTest {
|
|||
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
|
||||
throttler.registerBackendResponse(true);
|
||||
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read()))
|
||||
.isEqualTo(2L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read()))
|
||||
.isEqualTo(1L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE)
|
||||
.of(1.0f / 3.0f);
|
||||
|
||||
// Skip to half second mark from the beginning (half the duration).
|
||||
fakeClock.forwardTime(500 - (fakeClock.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS);
|
||||
fakeClock.forwardTime(500 - (fakeClock.currentTimeMillis() - startTime),
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
// Request 3, throttled by backend
|
||||
assertThat(throttler.shouldThrottle(0.4f)).isFalse();
|
||||
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
|
||||
throttler.registerBackendResponse(true);
|
||||
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(3L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(3L);
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(2L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE)
|
||||
.of(2.0f / 4.0f);
|
||||
|
||||
|
@ -91,9 +92,9 @@ public class AdaptiveThrottlerTest {
|
|||
assertThat(throttler.shouldThrottle(0.4f)).isTrue();
|
||||
fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(4L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(3L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(4L);
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(3L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE)
|
||||
.of(3.0f / 5.0f);
|
||||
|
||||
|
@ -101,10 +102,23 @@ public class AdaptiveThrottlerTest {
|
|||
fakeClock.forwardTime(
|
||||
1250 - (fakeClock.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS);
|
||||
|
||||
assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L);
|
||||
assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos()))
|
||||
assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(2L);
|
||||
assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(2L);
|
||||
assertThat(throttler.getThrottleProbability(fakeTicker.read()))
|
||||
.isWithin(TOLERANCE)
|
||||
.of(2.0f / 3.0f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that when the ticker returns a negative value for now that the slot detection logic
|
||||
* is correctly handled and then when the value transitions from negative to positive that things
|
||||
* continue to work correctly.
|
||||
*/
|
||||
@Test
|
||||
public void negativeTickerValues() {
|
||||
long rewindAmount = TimeUnit.MILLISECONDS.toNanos(300) + fakeClock.getTicker().read();
|
||||
fakeClock.forwardTime(-1 * rewindAmount, TimeUnit.NANOSECONDS);
|
||||
assertThat(fakeClock.getTicker().read()).isEqualTo(TimeUnit.MILLISECONDS.toNanos(-300));
|
||||
shouldThrottle();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue