Add a finalizer warning to all Disposable objects. Futures now dispose when marshalled (rather than just release memory). Range futures gets disposed as soon as their results are actually marshalled (in RangeQuery). AsyncIterator is no longer disposable, but a new interface DisposableAsyncIterator provides a disposable version. getBoundaryKeys returns a DisposableAsyncIterator rather than an AsyncIterable. Update the stack testers to dispose of their objects.

This commit is contained in:
A.J. Beamon 2017-12-06 09:58:53 -08:00
parent 3ded271153
commit bfa701ac8e
28 changed files with 628 additions and 430 deletions

View File

@ -25,7 +25,10 @@ import java.util.concurrent.Executor;
/**
* The {@code Cluster} represents a connection to a physical set of cooperating machines
* running FoundationDB. A {@code Cluster} is opened with a reference to a cluster file.
* running FoundationDB. A {@code Cluster} is opened with a reference to a cluster file.<br>
* <br>
* <b>Note:</b> {@code Cluster} objects must be disposed when no longer in use in order
* to free associated native memory.
*/
public class Cluster extends DefaultDisposableImpl implements Disposable {
private ClusterOptions options;
@ -39,12 +42,12 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
this.options = new ClusterOptions(new OptionConsumer() {
@Override
public void setOption(int code, byte[] parameter) {
pointerReadLock.lock();
try {
Cluster_setOption(getPtr(), code, parameter);
} finally {
pointerReadLock.unlock();
}
pointerReadLock.lock();
try {
Cluster_setOption(getPtr(), code, parameter);
} finally {
pointerReadLock.unlock();
}
}
});
}
@ -59,6 +62,7 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
@Override
protected void finalize() throws Throwable {
checkUndisposed("Cluster");
dispose();
super.finalize();
}

View File

@ -45,7 +45,10 @@ public interface Database extends Disposable, TransactionContext {
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
* {@link Transaction#onError} is called.<br>
* <br>
* <b>Note:</b> {@code Database} objects must be disposed when no longer in use in order
* to free associated native memory.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/

View File

@ -48,6 +48,15 @@ abstract class DefaultDisposableImpl implements Disposable {
return disposed;
}
public void checkUndisposed(String context) {
try {
if(FDB.getInstance().warnOnUndisposed && !disposed) {
System.err.println(context + " not disposed");
}
}
catch(Exception e) {}
}
@Override
public void dispose() {
rwl.writeLock().lock();

View File

@ -21,13 +21,15 @@
package com.apple.foundationdb;
/**
* A FoundationDB object with native resources that can be freed. It is not mandatory to call
* {@link Disposable#dispose()} most of the time, as disposal will happen at finalization.
* An object with native FoundationDB resources that must be freed. Failure to dispose of
* {@code Disposable} objects will result in memory leaks.
*/
public interface Disposable {
/**
* Dispose of the object. This can be called multiple times, but care should be
* taken that an object is not in use in another thread at the time of the call.
* Dispose of the object. This must be called once the object is no longer in use to
* free any native resources associated with the object. This can be called multiple times,
* but care should be taken that an object is not in use in another thread at the time of
* the call.
*/
void dispose();
}

View File

@ -81,6 +81,7 @@ public class FDB {
final int apiVersion;
private volatile boolean netStarted = false;
private volatile boolean netStopped = false;
volatile boolean warnOnUndisposed = true;
final private Semaphore netRunning = new Semaphore(1);
private final NetworkOptions options;
@ -159,8 +160,28 @@ public class FDB {
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
FDB fdb = new FDB(version);
if(version < 510) {
fdb.warnOnUndisposed = false;
}
return singleton = fdb;
}
public void setUndisposedWarning(boolean warnOnUndisposed) {
this.warnOnUndisposed = warnOnUndisposed;
}
// Singleton is initialized to null and only set once by a call to selectAPIVersion
static FDB getInstance() {
if(singleton != null) {
return singleton;
}
throw new IllegalStateException("API version has not been selected");
}
/**

View File

@ -101,6 +101,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
protected void finalize() throws Throwable {
checkUndisposed("Database");
dispose();
super.finalize();
}

View File

@ -60,7 +60,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode) {
return RangeQuery.start(FDBTransaction.this, true, begin, end, limit, reverse, mode);
return new RangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode);
}
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
@ -230,7 +230,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode) {
return RangeQuery.start(this, false, begin, end, limit, reverse, mode);
return new RangeQuery(this, false, begin, end, limit, reverse, mode);
}
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
@ -555,7 +555,9 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
protected void finalize() throws Throwable {
checkUndisposed("Transaction");
dispose();
super.finalize();
}
@Override

View File

@ -32,7 +32,7 @@ class FutureCluster extends NativeFuture<Cluster> {
}
@Override
public Cluster getIfDone_internal() throws FDBException {
protected Cluster getIfDone_internal(long cPtr) throws FDBException {
return new Cluster(FutureCluster_get(cPtr), executor);
}

View File

@ -32,7 +32,7 @@ class FutureDatabase extends NativeFuture<Database> {
}
@Override
public Database getIfDone_internal() throws FDBException {
protected Database getIfDone_internal(long cPtr) throws FDBException {
return new FDBDatabase(FutureDatabase_get(cPtr), executor);
}

View File

@ -29,7 +29,7 @@ class FutureKey extends NativeFuture<byte[]> {
}
@Override
public byte[] getIfDone_internal() throws FDBException {
protected byte[] getIfDone_internal(long cPtr) throws FDBException {
return FutureKey_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureResult extends NativeFuture<byte[]> {
}
@Override
public byte[] getIfDone_internal() throws FDBException {
protected byte[] getIfDone_internal(long cPtr) throws FDBException {
return FutureResult_get(cPtr);
}

View File

@ -34,7 +34,7 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
}
@Override
public RangeResultInfo getIfDone_internal() throws FDBException {
protected RangeResultInfo getIfDone_internal(long cPtr) throws FDBException {
FDBException err = Future_getError(cPtr);
if(!err.isSuccess()) {
@ -45,11 +45,23 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
}
public RangeResultSummary getSummary() {
return FutureResults_getSummary(cPtr);
try {
pointerReadLock.lock();
return FutureResults_getSummary(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
public RangeResult getResults() {
return FutureResults_get(cPtr);
try {
pointerReadLock.lock();
return FutureResults_get(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
private native RangeResultSummary FutureResults_getSummary(long ptr) throws FDBException;

View File

@ -29,7 +29,7 @@ class FutureStrings extends NativeFuture<String[]> {
}
@Override
public String[] getIfDone_internal() throws FDBException {
protected String[] getIfDone_internal(long cPtr) throws FDBException {
return FutureStrings_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureVersion extends NativeFuture<Long> {
}
@Override
Long getIfDone_internal() throws FDBException {
protected Long getIfDone_internal(long cPtr) throws FDBException {
return FutureVersion_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureVoid extends NativeFuture<Void> {
}
@Override
public Void getIfDone_internal() throws FDBException {
protected Void getIfDone_internal(long cPtr) throws FDBException {
// With "future-cleanup" we get rid of FutureVoid_get and replace instead
// with a get on the error and throw if the error is not success.
FDBException err = Future_getError(cPtr);

View File

@ -22,14 +22,13 @@ package com.apple.foundationdb;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.tuple.ByteArrayUtil;
/**
@ -41,7 +40,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
*/
public class LocalityUtil {
/**
* Returns a {@code AsyncIterable} of keys {@code k} such that
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
* {@code begin <= k < end} and {@code k} is located at the start of a
* contiguous range stored on a single server.<br>
*<br>
@ -54,12 +53,12 @@ public class LocalityUtil {
*
* @return an sequence of keys denoting the start of single-server ranges
*/
public static AsyncIterable<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
return getBoundaryKeys_internal(db.createTransaction(), begin, end);
}
/**
* Returns a {@code AsyncIterable} of keys {@code k} such that
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
* {@code begin <= k < end} and {@code k} is located at the start of a
* contiguous range stored on a single server.<br>
*<br>
@ -80,13 +79,13 @@ public class LocalityUtil {
*
* @return an sequence of keys denoting the start of single-server ranges
*/
public static AsyncIterable<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
Transaction local = tr.getDatabase().createTransaction();
CompletableFuture<Long> readVersion = tr.getReadVersion();
if(readVersion.isDone() && !readVersion.isCompletedExceptionally()) {
local.setReadVersion(readVersion.getNow(null));
}
return new BoundaryIterable(local, begin, end);
return new BoundaryIterator(local, begin, end);
}
/**
@ -111,125 +110,126 @@ public class LocalityUtil {
return ((FDBTransaction)tr).getAddressesForKey(key);
}
private static AsyncIterable<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
return new BoundaryIterable(tr, begin, end);
private static DisposableAsyncIterator<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
return new BoundaryIterator(tr, begin, end);
}
static class BoundaryIterable implements AsyncIterable<byte[]> {
final Transaction tr;
final byte[] begin;
static class BoundaryIterator implements DisposableAsyncIterator<byte[]> {
Transaction tr;
byte[] begin;
byte[] lastBegin;
final byte[] end;
final AsyncIterable<KeyValue> firstGet;
public BoundaryIterable(Transaction tr, byte[] begin, byte[] end) {
AsyncIterator<KeyValue> block;
private CompletableFuture<Boolean> nextFuture;
private boolean disposed;
BoundaryIterator(Transaction tr, byte[] begin, byte[] end) {
this.tr = tr;
this.begin = Arrays.copyOf(begin, begin.length);
this.end = Arrays.copyOf(end, end.length);
lastBegin = begin;
tr.options().setReadSystemKeys();
tr.options().setLockAware();
firstGet = tr.getRange(keyServersForKey(begin), keyServersForKey(end));
block = firstGet.iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
disposed = false;
}
@Override
public AsyncIterator<byte[]> iterator() {
return new BoundaryIterator();
public CompletableFuture<Boolean> onHasNext() {
return nextFuture;
}
@Override
public CompletableFuture<List<byte[]>> asList() {
return AsyncUtil.collect(this, tr.getExecutor());
}
@Override
public boolean hasNext() {
return nextFuture.join();
}
class BoundaryIterator implements AsyncIterator<byte[]> {
AsyncIterator<KeyValue> block = BoundaryIterable.this.firstGet.iterator();
Transaction tr = BoundaryIterable.this.tr;
byte[] begin = BoundaryIterable.this.begin;
byte[] lastBegin = begin;
private CompletableFuture<Boolean> nextFuture;
public BoundaryIterator() {
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
CompletableFuture<Boolean> restartGet() {
if(ByteArrayUtil.compareUnsigned(begin, end) >= 0) {
return CompletableFuture.completedFuture(false);
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return nextFuture;
}
BiFunction<Boolean, Throwable, CompletableFuture<Boolean>> handler = new BiFunction<Boolean, Throwable, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> onHasNext() {
return nextFuture;
}
@Override
public boolean hasNext() {
return nextFuture.join();
}
CompletableFuture<Boolean> restartGet() {
if(ByteArrayUtil.compareUnsigned(begin, end) >= 0) {
return CompletableFuture.completedFuture(false);
public CompletableFuture<Boolean> apply(Boolean b, Throwable o) {
if(b != null) {
return CompletableFuture.completedFuture(b);
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block.dispose();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return nextFuture;
}
BiFunction<Boolean, Throwable, CompletableFuture<Boolean>> handler = new BiFunction<Boolean, Throwable, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Boolean b, Throwable o) {
if(b != null) {
return CompletableFuture.completedFuture(b);
}
if(o instanceof FDBException) {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}
}
if(!(o instanceof RuntimeException))
throw new CompletionException(o);
CompletableFuture<Transaction> onError = BoundaryIterator.this.tr.onError((RuntimeException) o);
return onError.thenComposeAsync(tr -> {
BoundaryIterator.this.tr = tr;
if(o instanceof FDBException) {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}, tr.getExecutor());
}
}
};
@Override
public byte[] next() {
if(!nextFuture.isDone()) {
throw new IllegalStateException("Call to next without hasNext()=true");
if(!(o instanceof RuntimeException))
throw new CompletionException(o);
CompletableFuture<Transaction> onError = BoundaryIterator.this.tr.onError(o);
return onError.thenComposeAsync(tr -> {
BoundaryIterator.this.tr = tr;
return restartGet();
}, tr.getExecutor());
}
};
@Override
public byte[] next() {
if(!nextFuture.isDone()) {
throw new IllegalStateException("Call to next without hasNext()=true");
}
KeyValue o = block.next();
byte[] key = o.getKey();
byte[] suffix = Arrays.copyOfRange(key, 13, key.length);
BoundaryIterator.this.begin = ByteArrayUtil.join(suffix, new byte[] { (byte)0 });
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return suffix;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Boundary keys are read-only");
}
@Override
public void cancel() {
BoundaryIterator.this.tr.dispose();
disposed = true;
}
@Override
public void dispose() {
cancel();
}
@Override
protected void finalize() throws Throwable {
try {
if(FDB.getInstance().warnOnUndisposed && !disposed) {
System.err.println("DisposableAsyncIterator not disposed (getBoundaryKeys)");
}
KeyValue o = block.next();
byte[] key = o.getKey();
byte[] suffix = Arrays.copyOfRange(key, 13, key.length);
BoundaryIterator.this.begin = ByteArrayUtil.join(suffix, new byte[] { (byte)0 });
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return suffix;
}
catch(Exception e) {}
@Override
public void remove() {
throw new UnsupportedOperationException("Boundary keys are read-only");
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
@Override
public void dispose() {
BoundaryIterator.this.tr.dispose();
block.dispose();
}
super.finalize();
}
}

View File

@ -22,9 +22,14 @@ package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class NativeFuture<T> extends CompletableFuture<T> {
protected final long cPtr;
abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposable {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final Lock pointerReadLock = rwl.readLock();
private long cPtr;
protected NativeFuture(long cPtr) {
this.cPtr = cPtr;
@ -36,15 +41,33 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
// constructor of this class because a quickly completing future can
// lead to a race where the marshalWhenDone tries to run on an
// unconstructed subclass.
//
// Since this must be called from a constructor, we assume that dispose
// cannot be called concurrently.
protected void registerMarshalCallback(Executor executor) {
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
if(cPtr != 0) {
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
}
}
private void marshalWhenDone() {
try {
T val = getIfDone_internal();
postMarshal();
complete(val);
T val = null;
boolean shouldComplete = false;
try {
pointerReadLock.lock();
if(cPtr != 0) {
val = getIfDone_internal(cPtr);
shouldComplete = true;
}
}
finally {
pointerReadLock.unlock();
}
if(shouldComplete) {
complete(val);
}
} catch(FDBException t) {
assert(t.getCode() != 2015); // future_not_set not possible
if(t.getCode() != 1102) { // future_released
@ -52,6 +75,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
}
} catch(Throwable t) {
completeExceptionally(t);
} finally {
postMarshal();
}
}
@ -59,23 +84,53 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
dispose();
}
abstract T getIfDone_internal() throws FDBException;
abstract protected T getIfDone_internal(long cPtr) throws FDBException;
public void dispose() {
Future_releaseMemory(cPtr);
@Override
public void dispose() {
long ptr = 0;
rwl.writeLock().lock();
if(cPtr != 0) {
ptr = cPtr;
cPtr = 0;
}
rwl.writeLock().unlock();
if(ptr != 0) {
Future_dispose(ptr);
completeExceptionally(new IllegalStateException("Future has been disposed"));
}
}
@Override
protected void finalize() throws Throwable {
Future_dispose(cPtr);
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
try {
rwl.readLock().lock();
if(cPtr != 0) {
Future_cancel(cPtr);
}
return result;
}
finally {
rwl.readLock().unlock();
}
}
@Override
public T join() {
Future_blockUntilReady(cPtr);
return super.join();
protected long getPtr() {
// we must have a read lock for this function to make sense, however it
// does not make sense to take the lock here, since the code that uses
// the result must inherently have the read lock itself.
assert( rwl.getReadHoldCount() > 0 );
if(cPtr == 0)
throw new IllegalStateException("Cannot access disposed object");
return cPtr;
}
private native void Future_registerCallback(long cPtr, Runnable callback);
private native void Future_blockUntilReady(long cPtr);
private native boolean Future_isReady(long cPtr);

View File

@ -25,7 +25,6 @@ import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
@ -53,12 +52,10 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
private final int rowLimit;
private final boolean reverse;
private final StreamingMode streamingMode;
private final FutureResults firstChunk;
private RangeQuery(FDBTransaction transaction, boolean isSnapshot,
RangeQuery(FDBTransaction transaction, boolean isSnapshot,
KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode,
FutureResults firstChunk) {
boolean reverse, StreamingMode streamingMode) {
this.tr = transaction;
this.begin = begin;
this.end = end;
@ -66,17 +63,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
this.rowLimit = rowLimit;
this.reverse = reverse;
this.streamingMode = streamingMode;
this.firstChunk = firstChunk;
}
static RangeQuery start(FDBTransaction transaction, boolean isSnapshot,
KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode) {
// start the first fetch...
FutureResults firstChunk = transaction.getRange_internal(begin, end,
rowLimit, 0, streamingMode.code(), 1, isSnapshot, reverse);
return new RangeQuery(transaction, isSnapshot, begin, end, rowLimit, reverse, streamingMode, firstChunk);
}
/**
@ -95,20 +81,16 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// if the streaming mode is EXACT, try and grab things as one chunk
if(mode == StreamingMode.EXACT) {
CompletableFuture<RangeResultInfo> range = tr.getRange_internal(
FutureResults range = tr.getRange_internal(
this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(),
1, this.snapshot, this.reverse);
return range.thenApply(new Function<RangeResultInfo, List<KeyValue>>() {
@Override
public List<KeyValue> apply(RangeResultInfo o) {
return o.get().values;
}
});
return range.thenApply(result -> result.get().values)
.whenComplete((result, e) -> range.dispose());
}
// If the streaming mode is not EXACT, simply collect the results of an iteration into a list
return AsyncUtil.collect(
new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode, firstChunk), tr.getExecutor());
new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode), tr.getExecutor());
}
/**
@ -130,16 +112,16 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// There is the chance for parallelism in the two "chunks" for fetched data
private RangeResult chunk = null;
private RangeResult nextChunk = null;
private boolean fetchOutstanding = true;
private boolean fetchOutstanding = false;
private byte[] prevKey = null;
private int index = 0;
// The first request is made in the constructor for the parent Iterable, so start at 1
private int iteration = 1;
private int iteration = 0;
private KeySelector begin;
private KeySelector end;
private int rowsRemaining;
private FutureResults fetchingChunk;
private CompletableFuture<Boolean> nextFuture;
private boolean isCancelled = false;
@ -151,19 +133,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
this.reverse = reverse;
this.streamingMode = streamingMode;
// Register for completion, etc. on the first chunk. Some of the fields in
// this class were initialized with the knowledge that this fetch is active
// at creation time. This set normally happens in startNextFetch, but
// the first fetch has already been configured and started.
CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
nextFuture = promise;
// FIXME: should we propagate cancellation into the first chuck fetch?
// This would invalidate the whole iterable, not just the iterator
//promise.onCancelledCancel(firstChunk);
// FIXME: I have no idea if this will just get garbage collected away, etc.
firstChunk.whenComplete(new FetchComplete(firstChunk, promise));
startNextFetch();
}
private synchronized boolean mainChunkIsTheLast() {
@ -174,54 +144,61 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
final FutureResults fetchingChunk;
final CompletableFuture<Boolean> promise;
public FetchComplete(FutureResults fetch, CompletableFuture<Boolean> promise) {
FetchComplete(FutureResults fetch, CompletableFuture<Boolean> promise) {
this.fetchingChunk = fetch;
this.promise = promise;
}
@Override
public void accept(RangeResultInfo data, Throwable error) {
final RangeResultSummary summary;
try {
final RangeResultSummary summary;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
throw (Error)error;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
throw (Error) error;
}
return;
}
return;
}
summary = data.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
}
synchronized(AsyncRangeIterator.this) {
fetchOutstanding = false;
// adjust the total number of rows we should ever fetch
rowsRemaining -= summary.keyCount;
// set up the next fetch
if (reverse) {
end = KeySelector.firstGreaterOrEqual(summary.lastKey);
} else {
begin = KeySelector.firstGreaterThan(summary.lastKey);
summary = data.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
}
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
index = 0;
} else {
nextChunk = data.get();
}
}
synchronized(AsyncRangeIterator.this) {
fetchOutstanding = false;
promise.complete(Boolean.TRUE);
// adjust the total number of rows we should ever fetch
rowsRemaining -= summary.keyCount;
// set up the next fetch
if(reverse) {
end = KeySelector.firstGreaterOrEqual(summary.lastKey);
}
else {
begin = KeySelector.firstGreaterThan(summary.lastKey);
}
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
index = 0;
}
else {
nextChunk = data.get();
}
}
promise.complete(Boolean.TRUE);
}
finally {
fetchingChunk.dispose();
}
}
}
@ -231,24 +208,18 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
if(isCancelled)
return;
if(mainChunkIsTheLast())
if(chunk != null && mainChunkIsTheLast())
return;
fetchOutstanding = true;
nextChunk = null;
FutureResults fetchingChunk = tr.getRange_internal(begin, end,
fetchingChunk = tr.getRange_internal(begin, end,
rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
++iteration, snapshot, reverse);
CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
nextFuture = promise;
// FIXME: BOOOOOOOOOO! Maybe we don't need this?
// promise.onCancelledCancel(fetchingChunk);
// TODO: again, I have no idea if this will get out-of-scope collected right away
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk, promise));
nextFuture = new CompletableFuture<>();
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk, nextFuture));
}
@Override
@ -341,11 +312,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
public synchronized void cancel() {
isCancelled = true;
nextFuture.cancel(true);
}
@Override
public void dispose() {
cancel();
fetchingChunk.cancel(true);
}
private final Function<Boolean, KeyValue> NEXT_MAPPER = new Function<Boolean, KeyValue>() {

View File

@ -68,7 +68,10 @@ import com.apple.foundationdb.tuple.Tuple;
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after {@link #onError}
* is called.
* is called.<br>
* <br>
* <b>Note:</b> {@code Transaction} objects must be disposed when no longer in use in order
* to free associated native memory.
*/
public interface Transaction extends Disposable, ReadTransaction, TransactionContext {

View File

@ -24,8 +24,6 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import com.apple.foundationdb.Disposable;
/**
* A version of {@code Iterator} that allows for non-blocking iteration over elements.
* Calls to {@link #next()} will not block if {@link #onHasNext()} has been called
@ -34,10 +32,10 @@ import com.apple.foundationdb.Disposable;
*
* @param <T> the type of object yielded by {@code next()}
*/
public interface AsyncIterator<T> extends Iterator<T>, Disposable {
public interface AsyncIterator<T> extends Iterator<T> {
/**
* Returns a asynchronous signal for the presence of more elements in the sequence.
* Once the future returned by {@link #onHasNext()} is ready, the next call to
* Once the future returned by {@code onHasNext()} is ready, the next call to
* {@link #next} will not block.
*
* @return a {@code CompletableFuture} that will be set to {@code true} if {@code next()}
@ -78,11 +76,4 @@ public interface AsyncIterator<T> extends Iterator<T>, Disposable {
* Cancels any outstanding asynchronous work associated with this {@code AsyncIterator}.
*/
public void cancel();
/**
* Cancel this {@code AsyncIterable} and dispose of associated resources. Equivalent
* to calling {@link AsyncIterator#cancel()}.
*/
@Override
public void dispose();
}

View File

@ -68,6 +68,18 @@ public class AsyncUtil {
return collect(iterable, DEFAULT_EXECUTOR);
}
/**
* Iterates over a set of items and returns the result as a list.
*
* @param iterator the source of data over which to iterate. This function will exhaust the iterator.
*
* @return a {@code CompletableFuture} which will be set to the amalgamation of results
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterator<V> iterator) {
return collect(iterator, DEFAULT_EXECUTOR);
}
/**
* Iterates over a set of items and returns the result as a list.
*
@ -78,29 +90,32 @@ public class AsyncUtil {
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterable<V> iterable, final Executor executor) {
final AsyncIterator<V> it = iterable.iterator();
final List<V> accumulator = new LinkedList<V>();
return collect(iterable.iterator(), executor);
}
/**
* Iterates over a set of items and returns the result as a list.
*
* @param iterator the source of data over which to iterate. This function will exhaust the iterator.
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code CompletableFuture} which will be set to the amalgamation of results
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterator<V> iterator, final Executor executor) {
final List<V> accumulator = new LinkedList<>();
// The condition of the while loop is simply "onHasNext()" returning true
Supplier<CompletableFuture<Boolean>> condition = new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
return it.onHasNext().thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean o) {
if(o) {
accumulator.add(it.next());
}
return o;
}
});
}
};
Supplier<CompletableFuture<Boolean>> condition = () ->
iterator.onHasNext().thenApply(hasNext -> {
if(hasNext) {
accumulator.add(iterator.next());
}
return hasNext;
});
CompletableFuture<Void> complete = whileTrue(condition, executor);
CompletableFuture<List<V>> result = tag(complete, accumulator);
return result.whenComplete((v, t) -> it.dispose());
return tag(complete, accumulator);
}
/**
@ -116,56 +131,102 @@ public class AsyncUtil {
return new AsyncIterable<T>() {
@Override
public AsyncIterator<T> iterator() {
final AsyncIterator<V> it = iterable.iterator();
return new AsyncIterator<T>() {
@Override
public void remove() {
it.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return it.onHasNext();
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public T next() {
return func.apply(it.next());
}
@Override
public void cancel() {
it.cancel();
}
@Override
public void dispose() {
it.dispose();
}
};
return mapIterator(iterable.iterator(), func);
}
@Override
public CompletableFuture<List<T>> asList() {
return iterable.asList().thenApply(new Function<List<V>, List<T>>() {
@Override
public List<T> apply(List<V> o) {
ArrayList<T> out = new ArrayList<T>(o.size());
for(V in : o)
out.add(func.apply(in));
return out;
}
return iterable.asList().thenApply(result -> {
ArrayList<T> out = new ArrayList<>(result.size());
for(V in : result)
out.add(func.apply(in));
return out;
});
}
};
}
/**
* Map an {@code AsyncIterator} into an {@code AsyncIterator} of another type or with
* each element modified in some fashion.
*
* @param iterator input
* @param func mapping function applied to each element
* @return a new iterator with each element mapped to a different value
*/
public static <V, T> AsyncIterator<T> mapIterator(final AsyncIterator<V> iterator,
final Function<V, T> func) {
return new AsyncIterator<T>() {
@Override
public void remove() {
iterator.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return iterator.onHasNext();
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return func.apply(iterator.next());
}
@Override
public void cancel() {
iterator.cancel();
}
};
}
/**
* Map a {@code DisposableAsyncIterator} into a {@code DisposableAsyncIterator} of another type or with
* each element modified in some fashion.
*
* @param iterator input
* @param func mapping function applied to each element
* @return a new iterator with each element mapped to a different value
*/
public static <V, T> DisposableAsyncIterator<T> mapIterator(final DisposableAsyncIterator<V> iterator,
final Function<V, T> func) {
return new DisposableAsyncIterator<T>() {
@Override
public void remove() {
iterator.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return iterator.onHasNext();
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return func.apply(iterator.next());
}
@Override
public void cancel() {
iterator.cancel();
}
@Override
public void dispose() {
iterator.dispose();
}
};
}
private static class LoopPartial implements BiFunction<Boolean, Throwable, Void> {
final Supplier<? extends CompletableFuture<Boolean>> body;
final CompletableFuture<Void> done;
@ -320,13 +381,13 @@ public class AsyncUtil {
}).thenCompose(new Function<Throwable, CompletableFuture<V>>() {
@Override
public CompletableFuture<V> apply(Throwable e) {
if (e != null) {
return fn.apply(e);
} else {
return task;
}
if (e != null) {
return fn.apply(e);
} else {
return task;
}
}
});
});
}
/**
@ -342,11 +403,11 @@ public class AsyncUtil {
@Override
public List<V> apply(Void o) {
List<V> result = new ArrayList<V>();
for(CompletableFuture<V> f : tasks) {
assert(f.isDone());
result.add(f.getNow(null));
}
return result;
for(CompletableFuture<V> f : tasks) {
assert(f.isDone());
result.add(f.getNow(null));
}
return result;
}
});
}

View File

@ -0,0 +1,31 @@
/*
* DisposableAsyncIterator.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb.async;
import com.apple.foundationdb.Disposable;
/**
* A version of {@link AsyncIterator} that holds native FDB resources and
* must be disposed once it is no longer in use.
*
* @param <T> the type of object yielded by {@code next()}
*/
public interface DisposableAsyncIterator<T> extends Disposable, AsyncIterator<T> {}

View File

@ -748,8 +748,7 @@ public class DirectoryLayer implements Directory
}
}, tr.getExecutor());
}
}, tr.getExecutor())
.whenComplete((v, t) -> rangeItr.dispose());
}, tr.getExecutor());
}
private CompletableFuture<Boolean> isPrefixFree(final ReadTransaction tr, final byte[] prefix) {
@ -775,8 +774,7 @@ public class DirectoryLayer implements Directory
public Boolean apply(Boolean hasNext) {
return !hasNext;
}
})
.whenComplete((v, t) -> it.dispose());
});
}
}, tr.getExecutor());
}
@ -1182,7 +1180,6 @@ public class DirectoryLayer implements Directory
return null;
}
})
.whenComplete((v, t) -> rangeItr.dispose())
.thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void ignore) {

View File

@ -709,7 +709,9 @@ public class AsyncStackTester {
return inst.tr.commit().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void o) {
inst.tr = inst.context.newTransaction();
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return logStack(inst, prefix, saved);
}
});
@ -718,7 +720,9 @@ public class AsyncStackTester {
return inst.tr.commit().thenApplyAsync(new Function<Void, Void>() {
@Override
public Void apply(Void a) {
inst.tr = inst.context.newTransaction();
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return null;
}
});
@ -784,24 +788,21 @@ public class AsyncStackTester {
}*/
if(inst.op.startsWith(DIRECTORY_PREFIX))
return directoryExtension.processInstruction(inst);
return directoryExtension.processInstruction(inst).whenComplete((x, t) -> inst.releaseTransaction());
else {
return AsyncUtil.composeExceptionally(processInstruction(inst),
new Function<Throwable, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Throwable e) {
FDBException ex = StackUtils.getRootFDBException(e);
if(ex != null) {
StackUtils.pushError(inst, ex);
return CompletableFuture.completedFuture(null);
}
else {
CompletableFuture<Void> f = new CompletableFuture<Void>();
f.completeExceptionally(e);
return f;
}
}
});
return AsyncUtil.composeExceptionally(processInstruction(inst), (e) -> {
FDBException ex = StackUtils.getRootFDBException(e);
if(ex != null) {
StackUtils.pushError(inst, ex);
return CompletableFuture.completedFuture(null);
}
else {
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}
})
.whenComplete((x, t) -> inst.releaseTransaction());
}
}
@ -810,8 +811,6 @@ public class AsyncStackTester {
}
CompletableFuture<Void> executeRemainingOperations() {
Transaction t = db.createTransaction();
final Function<Void, CompletableFuture<Void>> processNext = new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void ignore) {
@ -821,7 +820,10 @@ public class AsyncStackTester {
};
if(operations == null || ++currentOp == operations.size()) {
return t.getRange(nextKey, endKey, 1000).asList()
Transaction tr = db.createTransaction();
return tr.getRange(nextKey, endKey, 1000).asList()
.whenComplete((x, t) -> tr.dispose())
.thenComposeAsync(new Function<List<KeyValue>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(List<KeyValue> next) {
@ -915,6 +917,9 @@ public class AsyncStackTester {
byte[] bs = db.createTransaction().get(key).get();
System.out.println("output of " + ByteArrayUtil.printable(key) + " as: " + ByteArrayUtil.printable(bs));*/
db.dispose();
System.gc();
/*fdb.stopNetwork();
executor.shutdown();*/
}

View File

@ -20,13 +20,13 @@
package com.apple.foundationdb.test;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDBException;
@ -42,12 +42,13 @@ abstract class Context implements Runnable {
final Database db;
final String preStr;
int instructionIndex = 0;
String trName;
KeySelector nextKey, endKey;
Long lastVersion = null;
List<Thread> children = new LinkedList<Thread>();
static ConcurrentHashMap<String, Transaction> transactionMap = new ConcurrentHashMap<>();
private String trName;
private List<Thread> children = new LinkedList<>();
static private Map<String, Transaction> transactionMap = new HashMap<>();
static private Map<Transaction, AtomicInteger> transactionRefCounts = new HashMap<>();
Context(Database db, byte[] prefix) {
this.db = db;
@ -83,42 +84,58 @@ abstract class Context implements Runnable {
}
}
public Transaction getCurrentTransaction() {
return Context.transactionMap.get(this.trName);
}
public void updateCurrentTransaction(Transaction tr) {
Context.transactionMap.put(this.trName, tr);
}
public boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
return Context.transactionMap.replace(this.trName, oldTr, newTr);
}
public Transaction newTransaction() {
Transaction tr = db.createTransaction();
Context.transactionMap.put(this.trName, tr);
public synchronized Transaction getCurrentTransaction() {
Transaction tr = Context.transactionMap.get(this.trName);
Context.transactionRefCounts.get(tr).incrementAndGet();
return tr;
}
public Transaction newTransaction(Transaction oldTr) {
Transaction newTr = db.createTransaction();
boolean replaced = Context.transactionMap.replace(this.trName, oldTr, newTr);
if(replaced) {
return newTr;
}
else {
newTr.cancel();
return Context.transactionMap.get(this.trName);
public synchronized void releaseTransaction(Transaction tr) {
if(tr != null) {
AtomicInteger count = Context.transactionRefCounts.get(tr);
if(count.decrementAndGet() == 0) {
Context.transactionRefCounts.remove(tr);
tr.dispose();
}
}
}
public void switchTransaction(byte[] trName) {
public synchronized void updateCurrentTransaction(Transaction tr) {
Context.transactionRefCounts.putIfAbsent(tr, new AtomicInteger(1));
releaseTransaction(Context.transactionMap.put(this.trName, tr));
}
public synchronized boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
Context.transactionRefCounts.putIfAbsent(newTr, new AtomicInteger(1));
if(Context.transactionMap.replace(this.trName, oldTr, newTr)) {
releaseTransaction(oldTr);
return true;
}
else {
Context.transactionRefCounts.remove(newTr);
return false;
}
}
public void newTransaction() {
Transaction tr = db.createTransaction();
updateCurrentTransaction(tr);
}
public void newTransaction(Transaction oldTr) {
Transaction newTr = db.createTransaction();
if(!updateCurrentTransaction(oldTr, newTr)) {
newTr.dispose();
}
}
public synchronized void switchTransaction(byte[] trName) {
this.trName = ByteArrayUtil.printable(trName);
Transaction tr = db.createTransaction();
Context.transactionRefCounts.put(tr, new AtomicInteger(1));
Transaction previousTr = Context.transactionMap.putIfAbsent(this.trName, tr);
if(previousTr != null) {
tr.cancel();
releaseTransaction(tr);
}
}
@ -140,34 +157,31 @@ abstract class Context implements Runnable {
throw new IllegalArgumentException("Invalid code: " + code);
}
void popParams(int num, final List<Object> params, final CompletableFuture<Void> done) {
private void popParams(int num, final List<Object> params, final CompletableFuture<Void> done) {
while(num-- > 0) {
Object item = stack.pop().value;
if(item instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
final CompletableFuture<Object> future = (CompletableFuture<Object>)item;
final int nextNum = num;
future.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable t) {
if(t != null) {
Throwable root = StackUtils.getRootFDBException(t);
if(root instanceof FDBException) {
params.add(StackUtils.getErrorBytes((FDBException)root));
popParams(nextNum, params, done);
}
else {
done.completeExceptionally(t);
}
}
else {
if(o == null)
params.add("RESULT_NOT_PRESENT".getBytes());
else
params.add(o);
future.whenCompleteAsync((o, t) -> {
if(t != null) {
FDBException root = StackUtils.getRootFDBException(t);
if(root != null) {
params.add(StackUtils.getErrorBytes(root));
popParams(nextNum, params, done);
}
else {
done.completeExceptionally(t);
}
}
else {
if(o == null)
params.add("RESULT_NOT_PRESENT".getBytes());
else
params.add(o);
popParams(nextNum, params, done);
}
});
@ -181,15 +195,16 @@ abstract class Context implements Runnable {
}
CompletableFuture<List<Object>> popParams(int num) {
final List<Object> params = new LinkedList<Object>();
CompletableFuture<Void> done = new CompletableFuture<Void>();
final List<Object> params = new LinkedList<>();
CompletableFuture<Void> done = new CompletableFuture<>();
popParams(num, params, done);
return done.thenApplyAsync(new Function<Void, List<Object>>() {
@Override
public List<Object> apply(Void n) {
return params;
}
});
return done.thenApplyAsync((x) -> params);
}
void dispose() {
for(Transaction tr : transactionMap.values()) {
tr.dispose();
}
}
}

View File

@ -21,7 +21,6 @@
package com.apple.foundationdb.test;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.ReadTransactionContext;
@ -54,8 +53,8 @@ class Instruction extends Stack {
isSnapshot = op.endsWith(SUFFIX_SNAPSHOT);
if(isDatabase) {
this.tr = context.db.createTransaction();
readTr = this.tr;
this.tr = null;
readTr = null;
op = op.substring(0, op.length() - SUFFIX_DATABASE.length());
}
else if(isSnapshot) {
@ -73,19 +72,24 @@ class Instruction extends Stack {
}
void setTransaction(Transaction tr) {
this.tr = tr;
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
if(!isDatabase) {
context.releaseTransaction(this.tr);
context.updateCurrentTransaction(tr);
this.tr = context.getCurrentTransaction();
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
}
}
void releaseTransaction() {
context.releaseTransaction(this.tr);
}
void push(Object o) {
context.stack.push(context.instructionIndex, o);
}
@ -120,10 +124,6 @@ class Instruction extends Stack {
CompletableFuture<Object> popParam() {
return popParams(1)
.thenApplyAsync(new Function<List<Object>, Object>() {
public Object apply(List<Object> params) {
return params.get(0);
}
});
.thenApplyAsync((params) -> params.get(0));
}
}

View File

@ -27,7 +27,7 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.LocalityUtil;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil;
@ -45,11 +45,15 @@ public class LocalityTests {
}
long start = System.currentTimeMillis();
AsyncIterable<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
DisposableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
CompletableFuture<List<byte[]>> collection = AsyncUtil.collect(keys);
List<byte[]> list = collection.join();
System.out.println("Took " + (System.currentTimeMillis() - start) + "ms to get " +
list.size() + " items");
keys.dispose();
int i = 0;
for(byte[] key : collection.join()) {
System.out.println(i++ + ": " + ByteArrayUtil.printable(key));

View File

@ -42,8 +42,11 @@ import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.async.AsyncUtil;
/**
* Implements a cross-binding test of the FoundationDB API.
*
@ -507,6 +510,8 @@ public class StackTester {
directoryExtension.processInstruction(inst);
else
processInstruction(inst);
inst.releaseTransaction();
}
@Override
@ -515,8 +520,10 @@ public class StackTester {
while(true) {
Transaction t = db.createTransaction();
List<KeyValue> keyValues = t.getRange(begin, endKey/*, 1000*/).asList().join();
if(keyValues.size() == 0)
t.dispose();
if(keyValues.size() == 0) {
break;
}
//System.out.println(" * Got " + keyValues.size() + " instructions");
for(KeyValue next : keyValues) {
@ -525,6 +532,7 @@ public class StackTester {
instructionIndex++;
}
}
//System.out.println(" * Completed " + instructionIndex + " instructions");
}
}
@ -669,22 +677,27 @@ public class StackTester {
tr.options().setTimeout(60*1000);
tr.options().setReadSystemKeys();
tr.getReadVersion().join();
AsyncIterable<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
DisposableAsyncIterator<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
tr, new byte[0], new byte[]{(byte) 255, (byte) 255});
List<byte[]> keys = boundaryKeys.asList().join();
for(int i = 0; i < keys.size() - 1; i++) {
byte[] start = keys.get(i);
byte[] end = tr.getKey(KeySelector.lastLessThan(keys.get(i + 1))).join();
List<String> startAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, start).join());
List<String> endAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, end).join());
for(String a : startAddresses) {
if(!endAddresses.contains(a)) {
throw new RuntimeException("Locality not internally consistent.");
try {
List<byte[]> keys = AsyncUtil.collect(boundaryKeys).join();
for(int i = 0; i < keys.size() - 1; i++) {
byte[] start = keys.get(i);
byte[] end = tr.getKey(KeySelector.lastLessThan(keys.get(i + 1))).join();
List<String> startAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, start).join());
List<String> endAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, end).join());
for(String a : startAddresses) {
if(!endAddresses.contains(a)) {
throw new RuntimeException("Locality not internally consistent.");
}
}
}
}
return null;
return null;
}
finally {
boundaryKeys.dispose();
}
});
}
@ -709,6 +722,8 @@ public class StackTester {
//System.out.println("Starting test...");
c.run();
//System.out.println("Done with test.");
db.dispose();
System.gc();
}
}