move most old-style closures to Java 8-style closures

This commit is contained in:
Alec Grieser 2017-12-11 08:54:35 -08:00
parent 7818beac18
commit 154000e2c3
13 changed files with 521 additions and 904 deletions

View File

@ -117,7 +117,7 @@ public interface Database extends Disposable, TransactionContext {
*/
@Override
default <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable) {
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable) {
return readAsync(retryable, getExecutor());
}
@ -133,7 +133,7 @@ public interface Database extends Disposable, TransactionContext {
* @see #readAsync(Function)
*/
<T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable, Executor e);
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable, Executor e);
/**
* Runs a transactional function against this {@code Database} with retry logic.
@ -195,7 +195,7 @@ public interface Database extends Disposable, TransactionContext {
*/
@Override
default <T> CompletableFuture<T> runAsync(
Function<? super Transaction, CompletableFuture<T>> retryable) {
Function<? super Transaction, ? extends CompletableFuture<T>> retryable) {
return runAsync(retryable, getExecutor());
}
@ -211,5 +211,5 @@ public interface Database extends Disposable, TransactionContext {
* @see #run(Function)
*/
<T> CompletableFuture<T> runAsync(
Function<? super Transaction, CompletableFuture<T>> retryable, Executor e);
Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e);
}

View File

@ -103,21 +103,8 @@ public class FDB {
private FDB(int apiVersion) {
this.apiVersion = apiVersion;
options = new NetworkOptions(new OptionConsumer() {
@Override
public void setOption(int code, byte[] parameter) {
Network_setOption(code, parameter);
}
});
Runtime.getRuntime().addShutdownHook(new Thread(
new Runnable(){
@Override
public void run() {
FDB.this.stopNetwork();
}
}
));
options = new NetworkOptions(this::Network_setOption);
Runtime.getRuntime().addShutdownHook(new Thread(this::stopNetwork));
}
/**
@ -345,39 +332,38 @@ public class FDB {
Network_setup();
netStarted = true;
e.execute(new Runnable() {
@Override
public void run() {
boolean acquired = false;
try {
while(!acquired) {
try {
// make attempt to avoid a needless deadlock
synchronized (FDB.this) {
if(netStopped) {
return;
}
}
e.execute(() -> {
boolean acquired = false;
try {
while(!acquired) {
try {
// make attempt to avoid a needless deadlock
synchronized (FDB.this) {
if(netStopped) {
return;
}
}
netRunning.acquire();
acquired = true;
} catch(InterruptedException e) {}
netRunning.acquire();
acquired = true;
} catch(InterruptedException err) {
// Swallow thread interruption
}
try {
Network_run();
} catch (Throwable t) {
System.err.println("Unhandled error in FoundationDB network thread: " + t.getMessage());
// eat this error. we have nowhere to send it.
}
} finally {
if(acquired) {
netRunning.release();
}
synchronized (FDB.this) {
netStopped = true;
}
}
}
}
try {
Network_run();
} catch (Throwable t) {
System.err.println("Unhandled error in FoundationDB network thread: " + t.getMessage());
// eat this error. we have nowhere to send it.
}
} finally {
if(acquired) {
netRunning.release();
}
synchronized (FDB.this) {
netStopped = true;
}
}
});
}

View File

@ -67,18 +67,18 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
}
@Override
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, CompletableFuture<T>> retryable, Executor e) {
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<>();
return AsyncUtil.whileTrue(() -> {
CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
return process.thenComposeAsync(returnVal ->
return AsyncUtil.composeHandleAsync(process.thenComposeAsync(returnVal ->
trRef.get().commit().thenApply(o -> {
returnValue.set(returnVal);
return false;
})
, e).handleAsync((value, t) -> {
, e), (value, t) -> {
if(t == null)
return CompletableFuture.completedFuture(value);
if(!(t instanceof RuntimeException))
@ -87,7 +87,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
trRef.set(newTr);
return true;
});
}, e).thenCompose(x -> x);
}, e);
}, e)
.thenApply(o -> returnValue.get())
.whenComplete((v, t) -> trRef.get().dispose());
@ -95,7 +95,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
public <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable, Executor e) {
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable, Executor e) {
return this.runAsync(retryable, e);
}

View File

@ -137,7 +137,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable) {
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable) {
return AsyncUtil.applySafely(retryable, this);
}
@ -356,7 +356,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public <T> CompletableFuture<T> runAsync(
Function<? super Transaction, CompletableFuture<T>> retryable) {
Function<? super Transaction, ? extends CompletableFuture<T>> retryable) {
return AsyncUtil.applySafely(retryable, this);
}
@ -367,7 +367,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable) {
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable) {
return AsyncUtil.applySafely(retryable, this);
}

View File

@ -24,10 +24,12 @@ import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.tuple.ByteArrayUtil;
@ -137,7 +139,7 @@ public class LocalityUtil {
firstGet = tr.getRange(keyServersForKey(begin), keyServersForKey(end));
block = firstGet.iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
nextFuture = AsyncUtil.composeHandleAsync(block.onHasNext(), handler, tr.getExecutor());
disposed = false;
}
@ -161,7 +163,7 @@ public class LocalityUtil {
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
nextFuture = AsyncUtil.composeHandleAsync(block.onHasNext(), handler, tr.getExecutor());
return nextFuture;
}
@ -174,8 +176,9 @@ public class LocalityUtil {
if(o instanceof FDBException) {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
Executor executor = BoundaryIterator.this.tr.getExecutor();
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction(executor);
return restartGet();
}
}
@ -200,7 +203,7 @@ public class LocalityUtil {
byte[] key = o.getKey();
byte[] suffix = Arrays.copyOfRange(key, 13, key.length);
BoundaryIterator.this.begin = ByteArrayUtil.join(suffix, new byte[] { (byte)0 });
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
nextFuture = AsyncUtil.composeHandleAsync(block.onHasNext(), handler, tr.getExecutor());
return suffix;
}

View File

@ -234,14 +234,14 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// We have a chunk and are still working though it
if(index < chunk.values.size()) {
return CompletableFuture.completedFuture(true);
return AsyncUtil.READY_TRUE;
}
// If we are at the end of the current chunk there is either:
// - no more data -or-
// - we are already fetching the next block
return mainChunkIsTheLast() ?
CompletableFuture.completedFuture(false) :
AsyncUtil.READY_FALSE :
nextFuture;
}

View File

@ -62,7 +62,7 @@ public interface ReadTransactionContext {
* to {@code retryable}
*/
<T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, CompletableFuture<T>> retryable);
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable);
/**
* Retrieves the {@link Executor} used by this {@code TransactionContext} when running

View File

@ -367,6 +367,6 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*/
@Override
<T> CompletableFuture<T> runAsync(
Function<? super Transaction, CompletableFuture<T>> retryable);
Function<? super Transaction, ? extends CompletableFuture<T>> retryable);
}

View File

@ -59,5 +59,5 @@ public interface TransactionContext extends ReadTransactionContext {
* @return a {@code CompletableFuture} that will be set to the value returned by the last call
* to {@code retryable}
*/
<T> CompletableFuture<T> runAsync(Function<? super Transaction, CompletableFuture<T>> retryable);
<T> CompletableFuture<T> runAsync(Function<? super Transaction, ? extends CompletableFuture<T>> retryable);
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -36,6 +37,10 @@ import java.util.function.Supplier;
* Provided utilities for using and manipulating {@link CompletableFuture}s.
*/
public class AsyncUtil {
public static final CompletableFuture<Void> DONE = CompletableFuture.completedFuture(null);
public static final CompletableFuture<Boolean> READY_TRUE = CompletableFuture.completedFuture(true);
public static final CompletableFuture<Boolean> READY_FALSE = CompletableFuture.completedFuture(false);
/**
* Run {@code Function} {@code func}, returning all caught exceptions as a
* {@code CompletableFuture} in an error state.
@ -46,7 +51,7 @@ public class AsyncUtil {
* @return the output of {@code func}, or a {@code CompletableFuture} carrying any exception
* caught in the process.
*/
public static <I,O> CompletableFuture<O> applySafely( Function<I, CompletableFuture<O>> func, I value ) {
public static <I,O> CompletableFuture<O> applySafely( Function<I, ? extends CompletableFuture<O>> func, I value ) {
try {
return func.apply(value);
} catch (RuntimeException e) {
@ -56,6 +61,31 @@ public class AsyncUtil {
}
}
public static <V> CompletableFuture<Void> forEach(final AsyncIterable<V> iterable, final Consumer<? super V> consumer) {
return forEach(iterable.iterator(), consumer);
}
public static <V> CompletableFuture<Void> forEach(final AsyncIterable<V> iterable, final Consumer<? super V> consumer, final Executor executor) {
return forEach(iterable.iterator(), consumer, executor);
}
public static <V> CompletableFuture<Void> forEach(final AsyncIterator<V> iterator, final Consumer<? super V> consumer) {
return forEach(iterator, consumer, DEFAULT_EXECUTOR);
}
public static <V> CompletableFuture<Void> forEach(final AsyncIterator<V> iterator, final Consumer<? super V> consumer, final Executor executor) {
return iterator.onHasNext().thenComposeAsync(hasAny -> {
if (hasAny) {
return whileTrue(() -> {
consumer.accept(iterator.next());
return iterator.onHasNext();
}, executor);
} else {
return DONE;
}
}, executor);
}
/**
* Iterates over a set of items and returns the result as a list.
*
@ -104,18 +134,11 @@ public class AsyncUtil {
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterator<V> iterator, final Executor executor) {
final List<V> accumulator = new LinkedList<>();
return forEach(iterator, accumulator::add, executor).thenApply(ignore -> accumulator);
}
// The condition of the while loop is simply "onHasNext()" returning true
Supplier<CompletableFuture<Boolean>> condition = () ->
iterator.onHasNext().thenApply(hasNext -> {
if(hasNext) {
accumulator.add(iterator.next());
}
return hasNext;
});
CompletableFuture<Void> complete = whileTrue(condition, executor);
return tag(complete, accumulator);
public static <V, T> AsyncIterable<T> mapIterable(final AsyncIterable<V> iterable, final Function<V, T> func) {
return mapIterable(iterable, func, DEFAULT_EXECUTOR);
}
/**
@ -127,7 +150,7 @@ public class AsyncUtil {
* @return a new iterable with each element mapped to a different value
*/
public static <V, T> AsyncIterable<T> mapIterable(final AsyncIterable<V> iterable,
final Function<V, T> func) {
final Function<V, T> func, final Executor executor) {
return new AsyncIterable<T>() {
@Override
public AsyncIterator<T> iterator() {
@ -136,12 +159,8 @@ public class AsyncUtil {
@Override
public CompletableFuture<List<T>> asList() {
return iterable.asList().thenApply(result -> {
ArrayList<T> out = new ArrayList<>(result.size());
for(V in : result)
out.add(func.apply(in));
return out;
});
final List<T> accumulator = new LinkedList<>();
return tag(AsyncUtil.forEach(iterable, value -> accumulator.add(func.apply(value))), accumulator);
}
};
}
@ -354,8 +373,7 @@ public class AsyncUtil {
* @return a new {@link CompletableFuture} that is set when {@code task} is ready.
*/
public static <V> CompletableFuture<Void> whenReady(CompletableFuture<V> task) {
return task.thenApply(o -> (Void)null)
.exceptionally(o -> null);
return task.handle((v, t) -> null);
}
public static <V> CompletableFuture<V> composeExceptionally(CompletableFuture<V> task, Function<Throwable, CompletableFuture<V>> fn) {
@ -369,6 +387,18 @@ public class AsyncUtil {
});
}
public static <V, T> CompletableFuture<T> composeHandle(CompletableFuture<V> future, BiFunction<V,Throwable,? extends CompletableFuture<T>> fn) {
return future.handle(fn).thenCompose(Function.identity());
}
public static <V, T> CompletableFuture<T> composeHandleAsync(CompletableFuture<V> future, BiFunction<V,Throwable,? extends CompletableFuture<T>> fn) {
return composeHandleAsync(future, fn, DEFAULT_EXECUTOR);
}
public static <V, T> CompletableFuture<T> composeHandleAsync(CompletableFuture<V> future, BiFunction<V,Throwable,? extends CompletableFuture<T>> fn, Executor executor) {
return future.handleAsync(fn, executor).thenCompose(Function.identity());
}
/**
* Collects the results of many asynchronous processes into one asynchronous output. If
* any of the tasks returns an error, the output is set to that error.
@ -379,7 +409,7 @@ public class AsyncUtil {
*/
public static <V> CompletableFuture<List<V>> getAll(final Collection<CompletableFuture<V>> tasks) {
return whenAll(tasks).thenApply(unused -> {
List<V> result = new ArrayList<>();
List<V> result = new ArrayList<>(tasks.size());
for(CompletableFuture<V> f : tasks) {
assert(f.isDone());
result.add(f.getNow(null));
@ -411,8 +441,7 @@ public class AsyncUtil {
* @return a signal that will be set when any of the {@code CompletableFuture}s are done
*/
public static <V> CompletableFuture<Void> whenAny(final Collection<? extends CompletableFuture<V>> input) {
@SuppressWarnings("unchecked")
CompletableFuture<V>[] array = (CompletableFuture<V>[]) input.toArray(new CompletableFuture<?>[input.size()]);
CompletableFuture<?>[] array = input.toArray(new CompletableFuture<?>[input.size()]);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(array);
return success(anyOf);
}
@ -427,8 +456,7 @@ public class AsyncUtil {
* @return a signal that will be set when all of the {@code CompletableFuture}s are done
*/
public static <V> CompletableFuture<Void> whenAll(final Collection<? extends CompletableFuture<V>> input) {
@SuppressWarnings("unchecked")
CompletableFuture<V>[] array = (CompletableFuture<V>[]) input.toArray(new CompletableFuture<?>[input.size()]);
CompletableFuture<?>[] array = input.toArray(new CompletableFuture<?>[input.size()]);
return CompletableFuture.allOf(array);
}

View File

@ -30,7 +30,6 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
@ -308,12 +307,7 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<DirectorySubspace> createOrOpen(TransactionContext tcx, final List<String> path, final byte[] layer) {
return tcx.runAsync(new Function<Transaction, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(Transaction tr) {
return createOrOpenInternal(tr, tr, path, layer, null, true, true);
}
});
return tcx.runAsync(tr -> createOrOpenInternal(tr, tr, path, layer, null, true, true));
}
/**
@ -334,12 +328,7 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<DirectorySubspace> open(ReadTransactionContext tcx, final List<String> path, final byte[] layer) {
return tcx.readAsync(new Function<ReadTransaction, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(ReadTransaction rtr) {
return createOrOpenInternal(rtr, null, path, layer, null, false, true);
}
});
return tcx.readAsync(rtr -> createOrOpenInternal(rtr, null, path, layer, null, false, true));
}
/**
@ -361,12 +350,7 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<DirectorySubspace> create(TransactionContext tcx, final List<String> path, final byte[] layer, final byte[] prefix) {
return tcx.runAsync(new Function<Transaction, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(Transaction tr) {
return createOrOpenInternal(tr, tr, path, layer, prefix, true, false);
}
});
return tcx.runAsync(tr -> createOrOpenInternal(tr, tr, path, layer, prefix, true, false));
}
/**
@ -384,7 +368,7 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<DirectorySubspace> moveTo(TransactionContext tcx, List<String> newAbsolutePath) {
CompletableFuture<DirectorySubspace> future = new CompletableFuture<DirectorySubspace>();
CompletableFuture<DirectorySubspace> future = new CompletableFuture<>();
future.completeExceptionally(new DirectoryMoveException("The root directory cannot be moved.", path, newAbsolutePath));
return future;
}
@ -422,71 +406,51 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<DirectorySubspace> move(final TransactionContext tcx, final List<String> oldPath, final List<String> newPath) {
final List<String> oldPathCopy = new ArrayList<String>(oldPath);
final List<String> newPathCopy = new ArrayList<String>(newPath);
final List<String> oldPathCopy = new ArrayList<>(oldPath);
final List<String> newPathCopy = new ArrayList<>(newPath);
return tcx.runAsync(new Function<Transaction, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(final Transaction tr) {
return checkOrWriteVersion(tr)
.thenComposeAsync(new Function<Void, CompletableFuture<List<Node>>>() {
@Override
public CompletableFuture<List<Node>> apply(Void ignore) {
if(oldPathCopy.size() <= newPathCopy.size() && oldPathCopy.equals(newPathCopy.subList(0, oldPathCopy.size())))
throw new DirectoryMoveException("The destination directory cannot be a subdirectory of the source directory.", toAbsolutePath(oldPathCopy), toAbsolutePath(newPathCopy));
return tcx.runAsync(tr -> checkOrWriteVersion(tr).thenComposeAsync(ignore -> {
if(oldPathCopy.size() <= newPathCopy.size() && oldPathCopy.equals(newPathCopy.subList(0, oldPathCopy.size())))
throw new DirectoryMoveException("The destination directory cannot be a subdirectory of the source directory.", toAbsolutePath(oldPathCopy), toAbsolutePath(newPathCopy));
ArrayList<CompletableFuture<Node>> futures = new ArrayList<CompletableFuture<Node>>();
futures.add(new NodeFinder(oldPathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()));
futures.add(new NodeFinder(newPathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()));
ArrayList<CompletableFuture<Node>> futures = new ArrayList<>();
futures.add(new NodeFinder(oldPathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()));
futures.add(new NodeFinder(newPathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()));
return AsyncUtil.getAll(futures);
}
}, tr.getExecutor())
.thenCompose(new Function<List<Node>, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(List<Node> nodes) {
final Node oldNode = nodes.get(0);
final Node newNode = nodes.get(1);
return AsyncUtil.getAll(futures);
}, tr.getExecutor())
.thenComposeAsync(nodes -> {
final Node oldNode = nodes.get(0);
final Node newNode = nodes.get(1);
if(!oldNode.exists())
throw new NoSuchDirectoryException(toAbsolutePath(oldPathCopy));
if(!oldNode.exists())
throw new NoSuchDirectoryException(toAbsolutePath(oldPathCopy));
if(oldNode.isInPartition(false) || newNode.isInPartition(false)) {
if(!oldNode.isInPartition(false) || !newNode.isInPartition(false) || !oldNode.path.equals(newNode.path))
throw new DirectoryMoveException("Cannot move between partitions.", toAbsolutePath(oldPathCopy), toAbsolutePath(newPathCopy));
if(oldNode.isInPartition(false) || newNode.isInPartition(false)) {
if(!oldNode.isInPartition(false) || !newNode.isInPartition(false) || !oldNode.path.equals(newNode.path))
throw new DirectoryMoveException("Cannot move between partitions.", toAbsolutePath(oldPathCopy), toAbsolutePath(newPathCopy));
return newNode.getContents().move(tr, oldNode.getPartitionSubpath(), newNode.getPartitionSubpath());
}
return newNode.getContents().move(tr, oldNode.getPartitionSubpath(), newNode.getPartitionSubpath());
}
if(newNode.exists())
throw new DirectoryAlreadyExistsException(toAbsolutePath(newPathCopy));
if(newNode.exists())
throw new DirectoryAlreadyExistsException(toAbsolutePath(newPathCopy));
final List<String> parentPath = PathUtil.popBack(newPathCopy);
return new NodeFinder(parentPath).find(tr)
.thenComposeAsync(new Function<Node, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(Node parentNode) {
if(!parentNode.exists())
throw new NoSuchDirectoryException(toAbsolutePath(parentPath));
final List<String> parentPath = PathUtil.popBack(newPathCopy);
return new NodeFinder(parentPath).find(tr).thenComposeAsync(parentNode -> {
if(!parentNode.exists())
throw new NoSuchDirectoryException(toAbsolutePath(parentPath));
tr.set(
parentNode.subspace.get(SUB_DIR_KEY).get(getLast(newPathCopy)).getKey(),
contentsOfNode(oldNode.subspace, EMPTY_PATH, EMPTY_BYTES).getKey()
);
tr.set(
parentNode.subspace.get(SUB_DIR_KEY).get(getLast(newPathCopy)).getKey(),
contentsOfNode(oldNode.subspace, EMPTY_PATH, EMPTY_BYTES).getKey()
);
return removeFromParent(tr, oldPathCopy)
.thenApply(new Function<Void, DirectorySubspace>() {
@Override
public DirectorySubspace apply(Void ignore) {
return contentsOfNode(oldNode.subspace, newPathCopy, oldNode.layer);
}
});
}
}, tr.getExecutor());
}
});
}
});
return removeFromParent(tr, oldPathCopy)
.thenApply(ignore -> contentsOfNode(oldNode.subspace, newPathCopy, oldNode.layer));
}, tr.getExecutor());
}, tr.getExecutor())
);
}
/**
@ -549,39 +513,27 @@ public class DirectoryLayer implements Directory
public CompletableFuture<List<String>> list(final ReadTransactionContext tcx, final List<String> path) {
final List<String> pathCopy = new ArrayList<String>(path);
return tcx.readAsync(new Function<ReadTransaction, CompletableFuture<List<String>>>() {
@Override
public CompletableFuture<List<String>> apply(final ReadTransaction tr) {
return checkVersion(tr)
.thenComposeAsync(new Function<Void, CompletableFuture<Node>>() {
@Override
public CompletableFuture<Node> apply(Void ignore) {
return new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor());
}
}, tr.getExecutor())
.thenComposeAsync(new Function<Node, CompletableFuture<List<String>>>() {
@Override
public CompletableFuture<List<String>> apply(Node node) {
if(!node.exists())
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
return tcx.readAsync(tr -> checkVersion(tr)
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
if(node.isInPartition(true))
return node.getContents().list(tr, node.getPartitionSubpath());
if(node.isInPartition(true))
return node.getContents().list(tr, node.getPartitionSubpath());
final Subspace subdir = node.subspace.get(SUB_DIR_KEY);
final Subspace subdir = node.subspace.get(SUB_DIR_KEY);
return AsyncUtil.collect(
AsyncUtil.mapIterable(tr.getRange(subdir.range()),
new Function<KeyValue, String>() {
@Override
public String apply(KeyValue o) {
return subdir.unpack(o.getKey()).getString(0);
}
}), tr.getExecutor());
}
}, tr.getExecutor());
}
});
return AsyncUtil.collect(
AsyncUtil.mapIterable(tr.getRange(subdir.range()),
kv -> subdir.unpack(kv.getKey()).getString(0),
tr.getExecutor()
)
);
}, tr.getExecutor())
);
}
/**
@ -592,7 +544,7 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<Boolean> exists(ReadTransactionContext tcx) {
return CompletableFuture.completedFuture(true);
return AsyncUtil.READY_TRUE;
}
/**
@ -605,33 +557,20 @@ public class DirectoryLayer implements Directory
*/
@Override
public CompletableFuture<Boolean> exists(final ReadTransactionContext tcx, final List<String> path) {
final List<String> pathCopy = new ArrayList<String>(path);
final List<String> pathCopy = new ArrayList<>(path);
return tcx.readAsync(new Function<ReadTransaction, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(final ReadTransaction tr) {
return checkVersion(tr)
.thenComposeAsync(new Function<Void, CompletableFuture<Node>>() {
@Override
public CompletableFuture<Node> apply(Void ignore) {
return new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor());
};
}, tr.getExecutor())
.thenComposeAsync(new Function<Node, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Node node) {
if(!node.exists())
//return new ReadyFuture<Boolean>(false);
return CompletableFuture.completedFuture(false);
else if(node.isInPartition(false))
return node.getContents().exists(tr, node.getPartitionSubpath());
return tcx.readAsync(tr -> checkVersion(tr)
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
return AsyncUtil.READY_FALSE;
else if(node.isInPartition(false))
return node.getContents().exists(tr, node.getPartitionSubpath());
//return new ReadyFuture<Boolean>(true);
return CompletableFuture.completedFuture(true);
}
}, tr.getExecutor());
}
});
return AsyncUtil.READY_TRUE;
}, tr.getExecutor()));
}
//
@ -655,19 +594,16 @@ public class DirectoryLayer implements Directory
return tr.getRange(nodeSubspace.range().begin, ByteArrayUtil.join(nodeSubspace.pack(key), new byte[]{0x00}), 1, true)
.asList()
.thenApply(new Function<List<KeyValue>, Subspace>() {
@Override
public Subspace apply(List<KeyValue> results) {
if(results.size() > 0) {
byte[] resultKey = results.get(0).getKey();
byte[] prevPrefix = nodeSubspace.unpack(resultKey).getBytes(0);
if(ByteArrayUtil.startsWith(key, prevPrefix)) {
return nodeWithPrefix(prevPrefix);
}
}
.thenApply(results -> {
if(results.size() > 0) {
byte[] resultKey = results.get(0).getKey();
byte[] prevPrefix = nodeSubspace.unpack(resultKey).getBytes(0);
if(ByteArrayUtil.startsWith(key, prevPrefix)) {
return nodeWithPrefix(prevPrefix);
}
}
return null;
}
return null;
});
}
@ -685,55 +621,37 @@ public class DirectoryLayer implements Directory
}
private CompletableFuture<Boolean> removeInternal(final TransactionContext tcx, final List<String> path, final boolean mustExist) {
final List<String> pathCopy = new ArrayList<String>(path);
final List<String> pathCopy = new ArrayList<>(path);
return tcx.runAsync(new Function<Transaction, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(final Transaction tr) {
return checkOrWriteVersion(tr)
.thenComposeAsync(new Function<Void, CompletableFuture<Node>>() {
@Override
public CompletableFuture<Node> apply(Void ignore) {
if(pathCopy.size() == 0)
throw new DirectoryException("The root directory cannot be removed.", toAbsolutePath(pathCopy));
return tcx.runAsync(tr -> checkOrWriteVersion(tr).thenComposeAsync(ignore -> {
if(pathCopy.size() == 0)
throw new DirectoryException("The root directory cannot be removed.", toAbsolutePath(pathCopy));
return new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor());
}
}, tr.getExecutor())
.thenComposeAsync(new Function<Node, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Node node) {
if(!node.exists()) {
if(mustExist)
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
else
return CompletableFuture.completedFuture(false);
}
return new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor());
}, tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists()) {
if(mustExist)
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
else
return AsyncUtil.READY_FALSE;
}
if(node.isInPartition(false))
return node.getContents().getDirectoryLayer().removeInternal(tr, node.getPartitionSubpath(), mustExist);
else {
ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
futures.add(removeRecursive(tr, node.subspace));
futures.add(removeFromParent(tr, pathCopy));
return AsyncUtil.tag(AsyncUtil.whenAll(futures), true);
}
}
}, tr.getExecutor());
}
});
if(node.isInPartition(false))
return node.getContents().getDirectoryLayer().removeInternal(tr, node.getPartitionSubpath(), mustExist);
else {
ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(removeRecursive(tr, node.subspace));
futures.add(removeFromParent(tr, pathCopy));
return AsyncUtil.tag(AsyncUtil.whenAll(futures), true);
}
}, tr.getExecutor())
);
}
private CompletableFuture<Void> removeFromParent(final Transaction tr, final List<String> path) {
return new NodeFinder(PathUtil.popBack(path)).find(tr)
.thenApply(new Function<Node, Void>() {
@Override
public Void apply(Node parent) {
tr.clear(parent.subspace.get(SUB_DIR_KEY).get(getLast(path)).getKey());
return null;
}
});
.thenAccept(parent -> tr.clear(parent.subspace.get(SUB_DIR_KEY).get(getLast(path)).getKey()));
}
private CompletableFuture<Void> removeRecursive(final Transaction tr, final Subspace node) {
@ -743,23 +661,14 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
else
subdirRemoveFuture = CompletableFuture.completedFuture(null);
return AsyncUtil.whileTrue(() -> {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
else
subdirRemoveFuture = AsyncUtil.DONE;
return subdirRemoveFuture
.thenComposeAsync(new Function<Void, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
return rangeItr.onHasNext();
}
}, tr.getExecutor());
}
return subdirRemoveFuture.thenCompose(ignore -> rangeItr.onHasNext());
}, tr.getExecutor());
}
@ -768,26 +677,14 @@ public class DirectoryLayer implements Directory
// allocated prefix (including the root node). This means that it neither
// contains any other prefix nor is contained by any other prefix.
if(prefix == null || prefix.length == 0)
//return new ReadyFuture<Boolean>(false);
return CompletableFuture.completedFuture(false);
return AsyncUtil.READY_FALSE;
return nodeContainingKey(tr, prefix)
.thenComposeAsync(new Function<Subspace, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Subspace node) {
if(node != null)
//return new ReadyFuture<Boolean>(false);
return CompletableFuture.completedFuture(false);
return nodeContainingKey(tr, prefix).thenComposeAsync(node -> {
if(node != null)
return AsyncUtil.READY_FALSE;
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext()
.thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean hasNext) {
return !hasNext;
}
});
}
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext().thenApply(hasNext -> !hasNext);
}, tr.getExecutor());
}
@ -811,7 +708,7 @@ public class DirectoryLayer implements Directory
final boolean allowCreate,
final boolean allowOpen)
{
final List<String> pathCopy = new ArrayList<String>(path);
final List<String> pathCopy = new ArrayList<>(path);
if(prefix != null && !allowManualPrefixes) {
String errorMessage;
@ -820,40 +717,33 @@ public class DirectoryLayer implements Directory
else
errorMessage = "Cannot specify a prefix in a partition.";
CompletableFuture<DirectorySubspace> future = new CompletableFuture<DirectorySubspace>();
CompletableFuture<DirectorySubspace> future = new CompletableFuture<>();
future.completeExceptionally(new IllegalArgumentException(errorMessage));
return future;
}
return checkVersion(rtr)
.thenComposeAsync(new Function<Void, CompletableFuture<Node>>() {
@Override
public CompletableFuture<Node> apply(Void ignore) {
// Root directory contains node metadata and so may not be opened.
if(pathCopy.size() == 0) {
throw new IllegalArgumentException("The root directory may not be opened.");
}
return checkVersion(rtr).thenComposeAsync(ignore -> {
// Root directory contains node metadata and so may not be opened.
if(pathCopy.size() == 0) {
throw new IllegalArgumentException("The root directory may not be opened.");
}
return new NodeFinder(pathCopy).find(rtr).thenComposeAsync(new NodeMetadataLoader(rtr), rtr.getExecutor());
}
return new NodeFinder(pathCopy).find(rtr).thenComposeAsync(new NodeMetadataLoader(rtr), rtr.getExecutor());
}, rtr.getExecutor())
.thenComposeAsync(new Function<Node, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(final Node existingNode) {
if(existingNode.exists()) {
if(existingNode.isInPartition(false)) {
List<String> subpath = existingNode.getPartitionSubpath();
DirectoryLayer directoryLayer = existingNode.getContents().getDirectoryLayer();
return directoryLayer.createOrOpenInternal(
rtr, tr, subpath, layer, prefix, allowCreate, allowOpen);
}
.thenComposeAsync(existingNode -> {
if(existingNode.exists()) {
if(existingNode.isInPartition(false)) {
List<String> subpath = existingNode.getPartitionSubpath();
DirectoryLayer directoryLayer = existingNode.getContents().getDirectoryLayer();
return directoryLayer.createOrOpenInternal(
rtr, tr, subpath, layer, prefix, allowCreate, allowOpen);
}
DirectorySubspace opened = openInternal(pathCopy, layer, existingNode, allowOpen);
return CompletableFuture.completedFuture(opened);
}
else
return createInternal(tr, pathCopy, layer, prefix, allowCreate);
}
DirectorySubspace opened = openInternal(pathCopy, layer, existingNode, allowOpen);
return CompletableFuture.completedFuture(opened);
}
else
return createInternal(tr, pathCopy, layer, prefix, allowCreate);
}, rtr.getExecutor());
}
@ -884,78 +774,47 @@ public class DirectoryLayer implements Directory
throw new NoSuchDirectoryException(toAbsolutePath(path));
}
return checkOrWriteVersion(tr)
.thenComposeAsync(new Function<Void, CompletableFuture<byte[]>>() {
@Override
public CompletableFuture<byte[]> apply(Void ignore) {
if(prefix == null) {
return allocator.allocate(tr)
.thenComposeAsync(new Function<byte[], CompletableFuture<byte[]>>() {
@Override
public CompletableFuture<byte[]> apply(byte[] allocated) {
final byte[] finalPrefix = ByteArrayUtil.join(contentSubspace.getKey(), allocated);
return tr.getRange(Range.startsWith(finalPrefix), 1)
.asList()
.thenApply(new Function<List<KeyValue>, byte[]>() {
@Override
public byte[] apply(List<KeyValue> results) {
if(results.size() > 0) {
throw new IllegalStateException("The database has keys stored at the prefix chosen by the automatic " +
"prefix allocator: " + ByteArrayUtil.printable(finalPrefix) + ".");
}
return finalPrefix;
}
});
}
}, tr.getExecutor());
}
else
return CompletableFuture.completedFuture(prefix);
//return new ReadyFuture<byte[]>(prefix);
}
return checkOrWriteVersion(tr).thenComposeAsync(ignore -> {
if(prefix == null) {
return allocator.allocate(tr).thenComposeAsync(allocated -> {
final byte[] finalPrefix = ByteArrayUtil.join(contentSubspace.getKey(), allocated);
return tr.getRange(Range.startsWith(finalPrefix), 1).iterator().onHasNext().thenApply(hasAny -> {
if(hasAny) {
throw new IllegalStateException("The database has keys stored at the prefix chosen by the automatic " +
"prefix allocator: " + ByteArrayUtil.printable(finalPrefix) + ".");
}
return finalPrefix;
});
}, tr.getExecutor());
}
else
return CompletableFuture.completedFuture(prefix);
}, tr.getExecutor())
.thenComposeAsync(new Function<byte[], CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(final byte[] actualPrefix) {
return isPrefixFree(prefix == null ? tr.snapshot() : tr, actualPrefix)
.thenComposeAsync(new Function<Boolean, CompletableFuture<Subspace>>() {
@Override
public CompletableFuture<Subspace> apply(Boolean prefixFree) {
if(!prefixFree) {
if(prefix == null) {
throw new IllegalStateException("The directory layer has manually allocated prefixes that conflict " +
"with the automatic prefix allocator.");
}
else
throw new IllegalArgumentException("Prefix already in use: " + ByteArrayUtil.printable(actualPrefix) + ".");
}
else if(path.size() > 1) {
return createOrOpen(tr, PathUtil.popBack(path))
.thenApply(new Function<DirectorySubspace, Subspace>() {
@Override
public Subspace apply(DirectorySubspace dir) {
return nodeWithPrefix(dir.getKey());
}
});
}
else
return CompletableFuture.completedFuture(rootNode);
}
}, tr.getExecutor())
.thenApply(new Function<Subspace, DirectorySubspace>() {
@Override
public DirectorySubspace apply(Subspace parentNode) {
if(parentNode == null)
throw new IllegalStateException("The parent directory does not exist."); //Shouldn't happen
Subspace node = nodeWithPrefix(actualPrefix);
tr.set(parentNode.get(SUB_DIR_KEY).get(getLast(path)).getKey(), actualPrefix);
tr.set(node.get(LAYER_KEY).getKey(), layer);
return contentsOfNode(node, path, layer);
}
});
}
}, tr.getExecutor());
.thenComposeAsync(actualPrefix -> isPrefixFree(prefix == null ? tr.snapshot() : tr, actualPrefix)
.thenComposeAsync(prefixFree -> {
if(!prefixFree) {
if(prefix == null) {
throw new IllegalStateException("The directory layer has manually allocated prefixes that conflict " +
"with the automatic prefix allocator.");
}
else
throw new IllegalArgumentException("Prefix already in use: " + ByteArrayUtil.printable(actualPrefix) + ".");
}
else if(path.size() > 1) {
return createOrOpen(tr, PathUtil.popBack(path)).thenApply(dir -> nodeWithPrefix(dir.getKey()));
}
else
return CompletableFuture.completedFuture(rootNode);
}, tr.getExecutor())
.thenApplyAsync(parentNode -> {
if(parentNode == null)
throw new IllegalStateException("The parent directory does not exist."); //Shouldn't happen
Subspace node = nodeWithPrefix(actualPrefix);
tr.set(parentNode.get(SUB_DIR_KEY).get(getLast(path)).getKey(), actualPrefix);
tr.set(node.get(LAYER_KEY).getKey(), layer);
return contentsOfNode(node, path, layer);
}, tr.getExecutor()),
tr.getExecutor());
}
//
@ -1050,42 +909,26 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Node> find(final ReadTransaction tr) {
index = 0;
node = new Node(rootNode, currentPath, path);
currentPath = new ArrayList<String>();
currentPath = new ArrayList<>();
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
if(index == path.size())
return CompletableFuture.completedFuture(false);
return AsyncUtil.whileTrue(() -> {
if(index == path.size())
return CompletableFuture.completedFuture(false);
return tr.get(node.subspace.get(SUB_DIR_KEY).get(path.get(index)).getKey())
.thenComposeAsync(new Function<byte[], CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(byte[] key) {
currentPath.add(path.get(index));
node = new Node(nodeWithPrefix(key), currentPath, path);
return tr.get(node.subspace.get(SUB_DIR_KEY).get(path.get(index)).getKey()).thenComposeAsync(key -> {
currentPath.add(path.get(index));
node = new Node(nodeWithPrefix(key), currentPath, path);
if(!node.exists())
return CompletableFuture.completedFuture(false);
if(!node.exists())
return CompletableFuture.completedFuture(false);
return node.loadMetadata(tr)
.thenApply(new Function<Node, Boolean>() {
@Override
public Boolean apply(Node ignore) {
++index;
return !Arrays.equals(node.layer, DirectoryLayer.PARTITION_LAYER);
}
});
}
}, tr.getExecutor());
}
return node.loadMetadata(tr).thenApply(ignore -> {
++index;
return !Arrays.equals(node.layer, DirectoryLayer.PARTITION_LAYER);
});
}, tr.getExecutor());
}, tr.getExecutor())
.thenApply(new Function<Void, Node>() {
@Override
public Node apply(Void ignore) {
return node;
}
});
.thenApply(ignore -> node);
}
}
@ -1130,13 +973,10 @@ public class DirectoryLayer implements Directory
}
return tr.get(subspace.pack(new Tuple().add(LAYER_KEY)))
.thenApply(new Function<byte[], Node>() {
@Override
public Node apply(byte[] value) {
layer = value;
loadedMetadata = true;
return Node.this;
}
.thenApply(value -> {
layer = value;
loadedMetadata = true;
return Node.this;
});
}
@ -1176,146 +1016,100 @@ public class DirectoryLayer implements Directory
}
public CompletableFuture<byte[]> find(final Transaction tr, final HighContentionAllocator allocator) {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
.thenApply(new Function<Boolean, Void>() {
@Override
public Void apply(Boolean hasNext) {
if(hasNext) {
KeyValue kv = rangeItr.next();
windowStart = allocator.counters.unpack(kv.getKey()).getLong(0);
}
return AsyncUtil.whileTrue(() -> {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext().thenApply(hasNext -> {
if(hasNext) {
KeyValue kv = rangeItr.next();
windowStart = allocator.counters.unpack(kv.getKey()).getLong(0);
}
return null;
}
})
.thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void ignore) {
return chooseWindow(tr, allocator);
}
}, tr.getExecutor())
.thenComposeAsync(new Function<Void, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
return choosePrefix(tr, allocator); // false exits the loop (i.e. we have a valid prefix)
}
}, tr.getExecutor());
}
return null;
})
.thenComposeAsync(ignore -> chooseWindow(tr, allocator), tr.getExecutor())
.thenComposeAsync(ignore -> choosePrefix(tr, allocator), tr.getExecutor());
}, tr.getExecutor())
.thenApply(new Function<Void, byte[]>() {
@Override
public byte[] apply(Void ignore) {
return Tuple.from(candidate).pack();
}
});
.thenApply(ignore -> Tuple.from(candidate).pack());
}
public CompletableFuture<Void> chooseWindow(final Transaction tr, final HighContentionAllocator allocator) {
final long initialWindowStart = windowStart;
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
return AsyncUtil.whileTrue(() -> {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
Range oldAllocations = new Range(allocator.recent.getKey(), allocator.recent.get(windowStart).getKey());
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
Range oldAllocations = new Range(allocator.recent.getKey(), allocator.recent.get(windowStart).getKey());
CompletableFuture<byte[]> newCountRead;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
if(windowStart > initialWindowStart) {
tr.clear(oldCounters);
tr.options().setNextWriteNoWriteConflictRange();
tr.clear(oldAllocations);
}
CompletableFuture<byte[]> newCountRead;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
if(windowStart > initialWindowStart) {
tr.clear(oldCounters);
tr.options().setNextWriteNoWriteConflictRange();
tr.clear(oldAllocations);
}
tr.mutate(MutationType.ADD, counterKey, LITTLE_ENDIAN_LONG_ONE);
newCountRead = tr.snapshot().get(counterKey);
}
tr.mutate(MutationType.ADD, counterKey, LITTLE_ENDIAN_LONG_ONE);
newCountRead = tr.snapshot().get(counterKey);
}
return newCountRead
.thenApply(new Function<byte[], Boolean>() {
@Override
public Boolean apply(byte[] newCountBytes) {
long newCount = newCountBytes == null ? 0 : unpackLittleEndian(newCountBytes);
windowSize = getWindowSize(windowStart);
if(newCount * 2 >= windowSize) {
windowStart += windowSize;
return true;
}
return newCountRead.thenApply(newCountBytes -> {
long newCount = newCountBytes == null ? 0 : unpackLittleEndian(newCountBytes);
windowSize = getWindowSize(windowStart);
if(newCount * 2 >= windowSize) {
windowStart += windowSize;
return true;
}
return false; // exit the loop
}
});
}
return false; // exit the loop
});
}, tr.getExecutor());
}
public CompletableFuture<Boolean> choosePrefix(final Transaction tr, final HighContentionAllocator allocator) {
restart = false;
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional
// subsequent risk of conflict for this transaction.
candidate = windowStart + random.nextInt(windowSize);
final byte[] allocationKey = allocator.recent.get(candidate).getKey();
Range countersRange = allocator.counters.range();
return AsyncUtil.whileTrue(() -> {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional
// subsequent risk of conflict for this transaction.
candidate = windowStart + random.nextInt(windowSize);
final byte[] allocationKey = allocator.recent.get(candidate).getKey();
Range countersRange = allocator.counters.range();
AsyncIterable<KeyValue> counterRange;
CompletableFuture<byte[]> allocationTemp;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
counterRange = tr.snapshot().getRange(countersRange, 1, true);
allocationTemp = tr.get(allocationKey);
tr.options().setNextWriteNoWriteConflictRange();
tr.set(allocationKey, EMPTY_BYTES);
}
AsyncIterable<KeyValue> counterRange;
CompletableFuture<byte[]> allocationTemp;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
counterRange = tr.snapshot().getRange(countersRange, 1, true);
allocationTemp = tr.get(allocationKey);
tr.options().setNextWriteNoWriteConflictRange();
tr.set(allocationKey, EMPTY_BYTES);
}
final CompletableFuture<List<KeyValue>> lastCounter = counterRange.asList();
final CompletableFuture<byte[]> allocation = allocationTemp;
final CompletableFuture<List<KeyValue>> lastCounter = counterRange.asList();
final CompletableFuture<byte[]> allocation = allocationTemp;
List<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
futures.add(AsyncUtil.success(lastCounter));
futures.add(AsyncUtil.success(allocation));
return lastCounter.thenCombineAsync(allocation, (result, allocationValue) -> {
long currentWindowStart = 0;
if(!result.isEmpty()) {
currentWindowStart = allocator.counters.unpack(result.get(0).getKey()).getLong(0);
}
return AsyncUtil.whenAll(futures)
.thenApply(new Function<Void, Boolean>() {
@Override
public Boolean apply(Void ignore) {
long currentWindowStart = 0;
List<KeyValue> result = lastCounter.join();
if(!result.isEmpty()) {
currentWindowStart = allocator.counters.unpack(result.get(0).getKey()).getLong(0);
}
if(currentWindowStart > windowStart) {
restart = true;
return false; // exit the loop and rerun the allocation from the beginning
}
if(currentWindowStart > windowStart) {
restart = true;
return false; // exit the loop and rerun the allocation from the beginning
}
if(allocationValue == null) {
tr.addWriteConflictKey(allocationKey);
return false; // exit the loop and return this candidate
}
if(allocation.join() == null) {
tr.addWriteConflictKey(allocationKey);
return false; // exit the loop and return this candidate
}
return true;
}
});
}
return true;
}, tr.getExecutor());
}, tr.getExecutor())
.thenApply(new Function<Void, Boolean>() {
@Override
public Boolean apply(Void ignore) {
return restart;
}
});
.thenApply(ignore -> restart);
}
private static int getWindowSize(long start) {

View File

@ -22,13 +22,11 @@ package com.apple.foundationdb.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.directory.Directory;
import com.apple.foundationdb.directory.DirectoryLayer;
@ -37,7 +35,7 @@ import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
class AsyncDirectoryExtension {
List<Object> dirList = new ArrayList<Object>();
List<Object> dirList = new ArrayList<>();
int dirIndex = 0;
int errorIndex = 0;
@ -54,13 +52,9 @@ class AsyncDirectoryExtension {
}
CompletableFuture<Void> processInstruction(final Instruction inst) {
return executeInstruction(inst)
.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable e) {
DirectoryUtil.pushError(inst, e, dirList);
return null;
}
return executeInstruction(inst).exceptionally(e -> {
DirectoryUtil.pushError(inst, e, dirList);
return null;
});
}
@ -69,372 +63,184 @@ class AsyncDirectoryExtension {
if(op == DirectoryOperation.DIRECTORY_CREATE_SUBSPACE) {
return DirectoryUtil.popTuple(inst)
.thenComposeAsync(new Function<Tuple, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(final Tuple prefix) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object rawPrefix) {
dirList.add(new Subspace(prefix, (byte[])rawPrefix));
return null;
}
});
}
});
.thenComposeAsync(prefix -> inst.popParam()
.thenAccept(rawPrefix -> dirList.add(new Subspace(prefix, (byte[])rawPrefix))));
}
else if(op == DirectoryOperation.DIRECTORY_CREATE_LAYER) {
return inst.popParams(3)
.thenApplyAsync(new Function<List<Object>, Void>() {
@Override
public Void apply(List<Object> params) {
Subspace nodeSubspace = (Subspace)dirList.get(StackUtils.getInt(params.get(0)));
Subspace contentSubspace = (Subspace)dirList.get(StackUtils.getInt(params.get(1)));
boolean allowManualPrefixes = StackUtils.getInt(params.get(2)) == 1;
return inst.popParams(3).thenAcceptAsync(params -> {
Subspace nodeSubspace = (Subspace)dirList.get(StackUtils.getInt(params.get(0)));
Subspace contentSubspace = (Subspace)dirList.get(StackUtils.getInt(params.get(1)));
boolean allowManualPrefixes = StackUtils.getInt(params.get(2)) == 1;
if(nodeSubspace == null || contentSubspace == null)
dirList.add(null);
else
dirList.add(new DirectoryLayer(nodeSubspace, contentSubspace, allowManualPrefixes));
return null;
}
if(nodeSubspace == null || contentSubspace == null)
dirList.add(null);
else
dirList.add(new DirectoryLayer(nodeSubspace, contentSubspace, allowManualPrefixes));
});
}
else if(op == DirectoryOperation.DIRECTORY_CHANGE) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object index) {
dirIndex = StackUtils.getInt(index);
if(dirList.get(dirIndex) == null)
dirIndex = errorIndex;
return null;
}
return inst.popParam().thenAcceptAsync(index -> {
dirIndex = StackUtils.getInt(index);
if(dirList.get(dirIndex) == null)
dirIndex = errorIndex;
});
}
else if(op == DirectoryOperation.DIRECTORY_SET_ERROR_INDEX) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object index) {
errorIndex = StackUtils.getInt(index);
return null;
}
});
return inst.popParam().thenAcceptAsync(index -> errorIndex = StackUtils.getInt(index));
}
else if(op == DirectoryOperation.DIRECTORY_CREATE_OR_OPEN || op == DirectoryOperation.DIRECTORY_OPEN) {
return DirectoryUtil.popPath(inst)
.thenComposeAsync(new Function<List<String>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(final List<String> path) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Object layer) {
CompletableFuture<DirectorySubspace> dir;
if(layer == null) {
if(op == DirectoryOperation.DIRECTORY_CREATE_OR_OPEN)
dir = directory().createOrOpen(inst.tcx, path);
else
dir = directory().open(inst.readTcx, path);
}
else {
if(op == DirectoryOperation.DIRECTORY_CREATE_OR_OPEN)
dir = directory().createOrOpen(inst.tcx, path, (byte[])layer);
else
dir = directory().open(inst.readTcx, path, (byte[])layer);
}
.thenComposeAsync(path -> inst.popParam().thenComposeAsync(layer -> {
CompletableFuture<DirectorySubspace> dir;
if(layer == null) {
if(op == DirectoryOperation.DIRECTORY_CREATE_OR_OPEN)
dir = directory().createOrOpen(inst.tcx, path);
else
dir = directory().open(inst.readTcx, path);
}
else {
if(op == DirectoryOperation.DIRECTORY_CREATE_OR_OPEN)
dir = directory().createOrOpen(inst.tcx, path, (byte[])layer);
else
dir = directory().open(inst.readTcx, path, (byte[])layer);
}
return dir.thenApplyAsync(new Function<DirectorySubspace, Void>() {
@Override
public Void apply(DirectorySubspace dirSubspace) {
dirList.add(dirSubspace);
return null;
}
});
}
});
}
});
return dir.thenAccept(dirList::add);
}));
}
else if(op == DirectoryOperation.DIRECTORY_CREATE) {
return DirectoryUtil.popPath(inst)
.thenComposeAsync(new Function<List<String>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(final List<String> path) {
return inst.popParams(2)
.thenComposeAsync(new Function<List<Object>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(List<Object> params) {
byte[] layer = (byte[])params.get(0);
byte[] prefix = (byte[])params.get(1);
return DirectoryUtil.popPath(inst).thenComposeAsync(path -> inst.popParams(2).thenComposeAsync(params -> {
byte[] layer = (byte[])params.get(0);
byte[] prefix = (byte[])params.get(1);
CompletableFuture<DirectorySubspace> dir;
if(layer == null && prefix == null)
dir = directory().create(inst.tcx, path);
else if(prefix == null)
dir = directory().create(inst.tcx, path, layer);
else {
if(layer == null)
layer = new byte[0];
CompletableFuture<DirectorySubspace> dir;
if(layer == null && prefix == null)
dir = directory().create(inst.tcx, path);
else if(prefix == null)
dir = directory().create(inst.tcx, path, layer);
else {
if(layer == null)
layer = new byte[0];
dir = directory().create(inst.tcx, path, layer, prefix);
}
dir = directory().create(inst.tcx, path, layer, prefix);
}
return dir.thenApplyAsync(new Function<DirectorySubspace, Void>() {
@Override
public Void apply(DirectorySubspace dirSubspace) {
dirList.add(dirSubspace);
return null;
}
});
}
});
}
});
return dir.thenAccept(dirList::add);
}));
}
else if(op == DirectoryOperation.DIRECTORY_MOVE) {
return DirectoryUtil.popPaths(inst, 2)
.thenComposeAsync(new Function<List<List<String>>, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(List<List<String>> paths) {
return directory().move(inst.tcx, paths.get(0), paths.get(1));
}
})
.thenApplyAsync(new Function<DirectorySubspace, Void>() {
@Override
public Void apply(DirectorySubspace dirSubspace) {
dirList.add(dirSubspace);
return null;
}
});
.thenComposeAsync(paths -> directory().move(inst.tcx, paths.get(0), paths.get(1)))
.thenAccept(dirList::add);
}
else if(op == DirectoryOperation.DIRECTORY_MOVE_TO) {
return DirectoryUtil.popPath(inst)
.thenComposeAsync(new Function<List<String>, CompletableFuture<DirectorySubspace>>() {
@Override
public CompletableFuture<DirectorySubspace> apply(List<String> newAbsolutePath) {
return directory().moveTo(inst.tcx, newAbsolutePath);
}
})
.thenApplyAsync(new Function<DirectorySubspace, Void>() {
@Override
public Void apply(DirectorySubspace dirSubspace) {
dirList.add(dirSubspace);
return null;
}
});
.thenComposeAsync(newAbsolutePath -> directory().moveTo(inst.tcx, newAbsolutePath))
.thenAccept(dirList::add);
}
else if(op == DirectoryOperation.DIRECTORY_REMOVE) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<List<List<String>>>>() {
@Override
public CompletableFuture<List<List<String>>> apply(Object count) {
return DirectoryUtil.popPaths(inst, StackUtils.getInt(count));
}
})
.thenComposeAsync(new Function<List<List<String>>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(List<List<String>> path) {
if(path.size() == 0)
return directory().remove(inst.tcx);
else
return directory().remove(inst.tcx, path.get(0));
}
.thenComposeAsync(count -> DirectoryUtil.popPaths(inst, StackUtils.getInt(count)))
.thenComposeAsync(path -> {
if(path.size() == 0)
return directory().remove(inst.tcx);
else
return directory().remove(inst.tcx, path.get(0));
});
}
else if(op == DirectoryOperation.DIRECTORY_REMOVE_IF_EXISTS) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<List<List<String>>>>() {
@Override
public CompletableFuture<List<List<String>>> apply(Object count) {
return DirectoryUtil.popPaths(inst, StackUtils.getInt(count));
}
})
.thenComposeAsync(new Function<List<List<String>>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(List<List<String>> path) {
if(path.size() == 0)
return AsyncUtil.success(directory().removeIfExists(inst.tcx));
else
return AsyncUtil.success(directory().removeIfExists(inst.tcx, path.get(0)));
}
});
.thenComposeAsync(count -> DirectoryUtil.popPaths(inst, StackUtils.getInt(count)))
.thenComposeAsync(path -> {
if(path.size() == 0)
return AsyncUtil.success(directory().removeIfExists(inst.tcx));
else
return AsyncUtil.success(directory().removeIfExists(inst.tcx, path.get(0)));
});
}
else if(op == DirectoryOperation.DIRECTORY_LIST) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<List<List<String>>>>() {
@Override
public CompletableFuture<List<List<String>>> apply(Object count) {
return DirectoryUtil.popPaths(inst, StackUtils.getInt(count));
}
})
.thenComposeAsync(new Function<List<List<String>>, CompletableFuture<List<String>>>() {
@Override
public CompletableFuture<List<String>> apply(List<List<String>> path) {
if(path.size() == 0)
return directory().list(inst.readTcx);
else
return directory().list(inst.readTcx, path.get(0));
}
})
.thenApplyAsync(new Function<List<String>, Void>() {
@Override
public Void apply(List<String> children) {
inst.push(Tuple.fromItems(children).pack());
return null;
}
});
.thenComposeAsync(count -> DirectoryUtil.popPaths(inst, StackUtils.getInt(count)))
.thenComposeAsync(path -> {
if(path.size() == 0)
return directory().list(inst.readTcx);
else
return directory().list(inst.readTcx, path.get(0));
})
.thenAccept(children -> inst.push(Tuple.fromItems(children).pack()));
}
else if(op == DirectoryOperation.DIRECTORY_EXISTS) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<List<List<String>>>>() {
@Override
public CompletableFuture<List<List<String>>> apply(Object count) {
return DirectoryUtil.popPaths(inst, StackUtils.getInt(count));
}
.thenComposeAsync(count -> DirectoryUtil.popPaths(inst, StackUtils.getInt(count)))
.thenComposeAsync(path -> {
if(path.size() == 0)
return directory().exists(inst.readTcx);
else
return directory().exists(inst.readTcx, path.get(0));
})
.thenComposeAsync(new Function<List<List<String>>, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(List<List<String>> path) {
if(path.size() == 0)
return directory().exists(inst.readTcx);
else
return directory().exists(inst.readTcx, path.get(0));
}
})
.thenApplyAsync(new Function<Boolean, Void>() {
@Override
public Void apply(Boolean exists){
inst.push(exists ? 1 : 0);
return null;
}
});
.thenAccept(exists -> inst.push(exists ? 1 : 0));
}
else if(op == DirectoryOperation.DIRECTORY_PACK_KEY) {
return DirectoryUtil.popTuple(inst)
.thenApplyAsync(new Function<Tuple, Void>() {
@Override
public Void apply(Tuple keyTuple) {
inst.push(subspace().pack(keyTuple));
return null;
}
});
return DirectoryUtil.popTuple(inst).thenAccept(keyTuple -> inst.push(subspace().pack(keyTuple)));
}
else if(op == DirectoryOperation.DIRECTORY_UNPACK_KEY) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object key) {
Tuple tup = subspace().unpack((byte[])key);
for(Object o : tup)
inst.push(o);
return null;
}
return inst.popParam().thenAcceptAsync(key -> {
Tuple tup = subspace().unpack((byte[])key);
for(Object o : tup)
inst.push(o);
});
}
else if(op == DirectoryOperation.DIRECTORY_RANGE) {
return DirectoryUtil.popTuple(inst)
.thenApplyAsync(new Function<Tuple, Void>() {
@Override
public Void apply(Tuple tup) {
Range range = subspace().range(tup);
inst.push(range.begin);
inst.push(range.end);
return null;
}
return DirectoryUtil.popTuple(inst).thenAcceptAsync(tup -> {
Range range = subspace().range(tup);
inst.push(range.begin);
inst.push(range.end);
});
}
else if(op == DirectoryOperation.DIRECTORY_CONTAINS) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object key) {
inst.push(subspace().contains((byte[])key) ? 1 : 0);
return null;
}
});
return inst.popParam().thenAccept(key -> inst.push(subspace().contains((byte[])key) ? 1 : 0));
}
else if(op == DirectoryOperation.DIRECTORY_OPEN_SUBSPACE) {
return DirectoryUtil.popTuple(inst)
.thenApplyAsync(new Function<Tuple, Void>() {
@Override
public Void apply(Tuple prefix) {
dirList.add(subspace().subspace(prefix));
return null;
}
});
return DirectoryUtil.popTuple(inst).thenAcceptAsync(prefix -> dirList.add(subspace().subspace(prefix)));
}
else if(op == DirectoryOperation.DIRECTORY_LOG_SUBSPACE) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(final Object prefix) {
return inst.tcx.runAsync(new Function<Transaction, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.set(Tuple.from(dirIndex).pack((byte[])prefix), subspace().getKey());
return CompletableFuture.completedFuture(null);
}
});
}
});
return inst.popParam().thenComposeAsync(prefix ->
inst.tcx.runAsync(tr -> {
tr.set(Tuple.from(dirIndex).pack((byte[])prefix), subspace().getKey());
return AsyncUtil.DONE;
})
);
}
else if(op == DirectoryOperation.DIRECTORY_LOG_DIRECTORY) {
return inst.popParam()
.thenComposeAsync(new Function<Object, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Object prefix) {
final Subspace logSubspace = new Subspace(new Tuple().add(dirIndex), (byte[])prefix);
return inst.tcx.runAsync(new Function<Transaction, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(final Transaction tr) {
return directory().exists(tr)
.thenComposeAsync(new Function<Boolean, CompletableFuture<List<String>>>() {
@Override
public CompletableFuture<List<String>> apply(Boolean exists) {
tr.set(logSubspace.pack("path"), Tuple.fromItems(directory().getPath()).pack());
tr.set(logSubspace.pack("layer"), new Tuple().add(directory().getLayer()).pack());
tr.set(logSubspace.pack("exists"), new Tuple().add(exists ? 1 : 0).pack());
if(exists)
return directory().list(tr);
else
return CompletableFuture.completedFuture(new ArrayList<String>());
}
})
.thenApplyAsync(new Function<List<String>, Void>() {
@Override
public Void apply(List<String> children) {
tr.set(logSubspace.pack("children"), Tuple.fromItems(children).pack());
return null;
}
});
}
});
}
return inst.popParam().thenComposeAsync(prefix -> {
final Subspace logSubspace = new Subspace(new Tuple().add(dirIndex), (byte[])prefix);
return inst.tcx.runAsync(tr -> directory().exists(tr)
.thenComposeAsync(exists -> {
tr.set(logSubspace.pack("path"), Tuple.fromItems(directory().getPath()).pack());
tr.set(logSubspace.pack("layer"), new Tuple().add(directory().getLayer()).pack());
tr.set(logSubspace.pack("exists"), new Tuple().add(exists ? 1 : 0).pack());
if(exists)
return directory().list(tr);
else
return CompletableFuture.completedFuture(Collections.emptyList());
})
.thenAcceptAsync(children -> tr.set(logSubspace.pack("children"), Tuple.fromItems(children).pack()))
);
});
}
else if(op == DirectoryOperation.DIRECTORY_STRIP_PREFIX) {
return inst.popParam()
.thenApplyAsync(new Function<Object, Void>() {
@Override
public Void apply(Object param) {
byte[] str = (byte[])param;
byte[] rawPrefix = subspace().getKey();
return inst.popParam().thenAcceptAsync(param -> {
byte[] str = (byte[])param;
byte[] rawPrefix = subspace().getKey();
if(str.length < rawPrefix.length)
throw new RuntimeException("String does not start with raw prefix");
if(str.length < rawPrefix.length)
throw new RuntimeException("String does not start with raw prefix");
for(int i = 0; i < rawPrefix.length; ++i)
if(str[i] != rawPrefix[i])
throw new RuntimeException("String does not start with raw prefix");
for(int i = 0; i < rawPrefix.length; ++i)
if(str[i] != rawPrefix[i])
throw new RuntimeException("String does not start with raw prefix");
inst.push(Arrays.copyOfRange(str, rawPrefix.length, str.length));
return null;
}
inst.push(Arrays.copyOfRange(str, rawPrefix.length, str.length));
});
}
else {

View File

@ -81,11 +81,11 @@ public class AsyncStackTester {
System.out.println(inst.context.preStr + " - " + "Pushing null");
else
System.out.println(inst.context.preStr + " - " + "Pushing item of type " + item.getClass().getName());*/
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.POP) {
inst.pop();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.DUP) {
if(inst.size() == 0)
@ -93,11 +93,11 @@ public class AsyncStackTester {
StackEntry e = inst.pop();
inst.push(e);
inst.push(e);
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.EMPTY_STACK) {
inst.clear();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.SWAP) {
return inst.popParam()
@ -152,7 +152,7 @@ public class AsyncStackTester {
}
else if(op == StackOperation.NEW_TRANSACTION) {
inst.context.newTransaction();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.USE_TRANSACTION) {
return inst.popParam()
@ -173,7 +173,7 @@ public class AsyncStackTester {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.set((byte[])params.get(0), (byte[])params.get(1));
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
});
}
@ -188,7 +188,7 @@ public class AsyncStackTester {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.clear((byte[])param);
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
});
}
@ -202,7 +202,7 @@ public class AsyncStackTester {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.clear((byte[])params.get(0), (byte[])params.get(1));
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
});
}
@ -216,7 +216,7 @@ public class AsyncStackTester {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.clear(Range.startsWith((byte[])param));
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
});
}
@ -232,7 +232,7 @@ public class AsyncStackTester {
@Override
public CompletableFuture<Void> apply(Transaction tr) {
tr.mutate(optype, (byte[])params.get(1), (byte[])params.get(2));
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
}
);
@ -241,15 +241,15 @@ public class AsyncStackTester {
}
else if(op == StackOperation.COMMIT) {
inst.push(inst.tr.commit());
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.RESET) {
inst.context.newTransaction();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.CANCEL) {
inst.tr.cancel();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.READ_CONFLICT_RANGE) {
return inst.popParams(2).thenApplyAsync(new Function<List<Object>, Void>() {
@ -293,7 +293,7 @@ public class AsyncStackTester {
}
else if(op == StackOperation.DISABLE_WRITE_CONFLICT) {
inst.tr.options().setNextWriteNoWriteConflictRange();
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.GET) {
return inst.popParam().thenApplyAsync(new Function<Object, Void>() {
@ -378,7 +378,7 @@ public class AsyncStackTester {
StackUtils.pushError(inst, e);
}
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.GET_VERSIONSTAMP) {
try {
@ -388,13 +388,13 @@ public class AsyncStackTester {
StackUtils.pushError(inst, e);
}
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.SET_READ_VERSION) {
if(inst.context.lastVersion == null)
throw new IllegalArgumentException("Read version has not been read");
inst.tr.setReadVersion(inst.context.lastVersion);
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else if(op == StackOperation.ON_ERROR) {
return inst.popParam().thenComposeAsync(new Function<Object, CompletableFuture<Void>>() {
@ -429,7 +429,7 @@ public class AsyncStackTester {
}
inst.push(f);
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
});
}
@ -693,7 +693,7 @@ public class AsyncStackTester {
tr.set(pk, pv.length < 40000 ? pv : Arrays.copyOfRange(pv, 0, 40000));
}
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
});
}
private static CompletableFuture<Void> logStack(final Instruction inst, final byte[] prefix, int i) {
@ -794,7 +794,7 @@ public class AsyncStackTester {
FDBException ex = StackUtils.getRootFDBException(e);
if(ex != null) {
StackUtils.pushError(inst, ex);
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
else {
CompletableFuture<Void> f = new CompletableFuture<>();
@ -829,7 +829,7 @@ public class AsyncStackTester {
public CompletableFuture<Void> apply(List<KeyValue> next) {
if(next.size() < 1) {
//System.out.println("No key found after: " + ByteArrayUtil.printable(nextKey.getKey()));
return CompletableFuture.completedFuture(null);
return AsyncUtil.DONE;
}
operations = next;