add overload to whileTrue that takes supplier ; deprecated version that takes a Function

This commit is contained in:
Alec Grieser 2017-11-01 11:21:23 -07:00
parent b1e3864c0e
commit eb62b9d9a9
6 changed files with 52 additions and 28 deletions

View File

@ -22,10 +22,8 @@ package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncUtil;
@ -72,7 +70,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, CompletableFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<>();
return AsyncUtil.whileTrue(v -> {
return AsyncUtil.whileTrue(() -> {
CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
return process.thenComposeAsync(returnVal ->

View File

@ -22,18 +22,15 @@ package com.apple.foundationdb.async;
import static com.apple.foundationdb.FDB.DEFAULT_EXECUTOR;
import com.apple.foundationdb.FDBException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Provided utilities for using and manipulating {@link CompletableFuture}s.
@ -85,9 +82,9 @@ public class AsyncUtil {
final List<V> accumulator = new LinkedList<V>();
// The condition of the while loop is simply "onHasNext()" returning true
Function<Void, CompletableFuture<Boolean>> condition = new Function<Void, CompletableFuture<Boolean>>() {
Supplier<CompletableFuture<Boolean>> condition = new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void v) {
public CompletableFuture<Boolean> get() {
return it.onHasNext().thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean o) {
@ -170,11 +167,11 @@ public class AsyncUtil {
}
private static class LoopPartial implements BiFunction<Boolean, Throwable, Void> {
final Function<Void, ? extends CompletableFuture<Boolean>> body;
final Supplier<? extends CompletableFuture<Boolean>> body;
final CompletableFuture<Void> done;
final Executor executor;
public LoopPartial(Function<Void, ? extends CompletableFuture<Boolean>> body, Executor executor) {
public LoopPartial(Supplier<? extends CompletableFuture<Boolean>> body, Executor executor) {
this.body = body;
this.done = new CompletableFuture<>();
this.executor = executor;
@ -192,7 +189,7 @@ public class AsyncUtil {
}
CompletableFuture<Boolean> result;
try {
result = body.apply(null);
result = body.get();
} catch (Exception e) {
done.completeExceptionally(e);
break;
@ -226,7 +223,10 @@ public class AsyncUtil {
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
@ -238,8 +238,34 @@ public class AsyncUtil {
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier, Executor) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body, Executor executor) {
return whileTrue(() -> body.apply(null), executor);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body, Executor executor) {
return new LoopPartial(body, executor).run();
}

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
@ -730,9 +731,9 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
@ -1039,9 +1040,9 @@ public class DirectoryLayer implements Directory
node = new Node(rootNode, currentPath, path);
currentPath = new ArrayList<String>();
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(index == path.size())
return CompletableFuture.completedFuture(false);
@ -1163,9 +1164,9 @@ public class DirectoryLayer implements Directory
}
public CompletableFuture<byte[]> find(final Transaction tr, final HighContentionAllocator allocator) {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
.thenApply(new Function<Boolean, Void>() {
@ -1203,9 +1204,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Void> chooseWindow(final Transaction tr, final HighContentionAllocator allocator) {
final long initialWindowStart = windowStart;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
@ -1244,9 +1245,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Boolean> choosePrefix(final Transaction tr, final HighContentionAllocator allocator) {
restart = false;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional

View File

@ -36,10 +36,8 @@ import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -520,7 +518,7 @@ public class AsyncStackTester {
return inst.popParams(listSize).thenApply(new Function<List<Object>, Void>() {
@Override
public Void apply(List<Object> rawElements) {
List<Tuple> tuples = new ArrayList(listSize);
List<Tuple> tuples = new ArrayList<Tuple>(listSize);
for(Object o : rawElements) {
tuples.add(Tuple.fromBytes((byte[])o));
}

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -40,9 +41,9 @@ class DirectoryUtil {
}
CompletableFuture<List<Tuple>> pop() {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(num-- == 0) {
return CompletableFuture.completedFuture(false);
}

View File

@ -29,7 +29,7 @@ public class WhileTrueTest {
// This should cause memory issues using the old implementation but not the new one.
// Pro tip: Run with options -Xms16m -Xmx16m -XX:+HeadDumpOnOutOfMemoryError
AtomicInteger count = new AtomicInteger(1000000);
AsyncUtil.whileTrue(v -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
AsyncUtil.whileTrue(() -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
System.out.println("Final value: " + count.get());
}
}