Merge remote-tracking branch 'origin/java-add-missing-dispose' into java-future-cleanup

This commit is contained in:
Alec Grieser 2017-12-06 16:44:17 -08:00
commit 7818beac18
34 changed files with 762 additions and 448 deletions

View File

@ -25,7 +25,10 @@ import java.util.concurrent.Executor;
/**
* The {@code Cluster} represents a connection to a physical set of cooperating machines
* running FoundationDB. A {@code Cluster} is opened with a reference to a cluster file.
* running FoundationDB. A {@code Cluster} is opened with a reference to a cluster file.<br>
* <br>
* <b>Note:</b> {@code Cluster} objects must be disposed when no longer in use in order
* to free associated native memory.
*/
public class Cluster extends DefaultDisposableImpl implements Disposable {
private ClusterOptions options;
@ -56,8 +59,13 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
@Override
protected void finalize() throws Throwable {
dispose();
super.finalize();
try {
checkUndisposed("Cluster");
dispose();
}
finally {
super.finalize();
}
}
/**

View File

@ -44,7 +44,10 @@ public interface Database extends Disposable, TransactionContext {
* <br>
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
* {@link Transaction#onError} is called.<br>
* <br>
* <b>Note:</b> {@code Database} objects must be disposed when no longer in use in order
* to free associated native memory.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/

View File

@ -48,6 +48,15 @@ abstract class DefaultDisposableImpl implements Disposable {
return disposed;
}
public void checkUndisposed(String context) {
try {
if(FDB.getInstance().warnOnUndisposed && !disposed) {
System.err.println(context + " not disposed");
}
}
catch(Exception e) {}
}
@Override
public void dispose() {
rwl.writeLock().lock();

View File

@ -21,13 +21,15 @@
package com.apple.foundationdb;
/**
* A FoundationDB object with native resources that can be freed. It is not mandatory to call
* {@link Disposable#dispose()} most of the time, as disposal will happen at finalization.
* An object with native FoundationDB resources that must be freed. Failure to dispose of
* {@code Disposable} objects will result in memory leaks.
*/
public interface Disposable {
/**
* Dispose of the object. This can be called multiple times, but care should be
* taken that an object is not in use in another thread at the time of the call.
* Dispose of the object. This must be called once the object is no longer in use to
* free any native resources associated with the object. This can be called multiple times,
* but care should be taken that an object is not in use in another thread at the time of
* the call.
*/
void dispose();
}

View File

@ -81,6 +81,7 @@ public class FDB {
final int apiVersion;
private volatile boolean netStarted = false;
private volatile boolean netStopped = false;
volatile boolean warnOnUndisposed = true;
final private Semaphore netRunning = new Semaphore(1);
private final NetworkOptions options;
@ -159,8 +160,28 @@ public class FDB {
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
FDB fdb = new FDB(version);
if(version < 510) {
fdb.warnOnUndisposed = false;
}
return singleton = fdb;
}
public void setUndisposedWarning(boolean warnOnUndisposed) {
this.warnOnUndisposed = warnOnUndisposed;
}
// Singleton is initialized to null and only set once by a call to selectAPIVersion
static FDB getInstance() {
if(singleton != null) {
return singleton;
}
throw new IllegalStateException("API version has not been selected");
}
/**
@ -277,7 +298,10 @@ public class FDB {
f = new FutureCluster(Cluster_create(clusterFilePath), e);
}
Cluster c = f.join();
return c.openDatabase(e);
Database db = c.openDatabase(e);
c.dispose();
return db;
}
/**

View File

@ -88,10 +88,9 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
return true;
});
}, e).thenCompose(x -> x);
}, e).thenApply(o -> {
trRef.get().dispose();
return returnValue.get();
});
}, e)
.thenApply(o -> returnValue.get())
.whenComplete((v, t) -> trRef.get().dispose());
}
@Override
@ -102,17 +101,29 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
protected void finalize() throws Throwable {
dispose();
super.finalize();
try {
checkUndisposed("Database");
dispose();
}
finally {
super.finalize();
}
}
@Override
public Transaction createTransaction(Executor e) {
pointerReadLock.lock();
Transaction tr = null;
try {
Transaction tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr.options().setUsedDuringCommitProtectionDisable();
return tr;
} catch(RuntimeException err) {
if(tr != null) {
tr.dispose();
}
throw err;
} finally {
pointerReadLock.unlock();
}

View File

@ -60,7 +60,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode) {
return RangeQuery.start(FDBTransaction.this, true, begin, end, limit, reverse, mode);
return new RangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode);
}
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
@ -230,7 +230,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode) {
return RangeQuery.start(this, false, begin, end, limit, reverse, mode);
return new RangeQuery(this, false, begin, end, limit, reverse, mode);
}
@Override
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
@ -527,10 +527,20 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
// Must hold pointerReadLock when calling
private FDBTransaction transfer() {
FDBTransaction tr = new FDBTransaction(getPtr(), database, executor);
tr.options().setUsedDuringCommitProtectionDisable();
transactionOwner = false;
return tr;
FDBTransaction tr = null;
try {
tr = new FDBTransaction(getPtr(), database, executor);
tr.options().setUsedDuringCommitProtectionDisable();
transactionOwner = false;
return tr;
}
catch(RuntimeException err) {
if(tr != null) {
tr.dispose();
}
throw err;
}
}
@Override
@ -545,7 +555,13 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
@Override
protected void finalize() throws Throwable {
dispose();
try {
checkUndisposed("Transaction");
dispose();
}
finally {
super.finalize();
}
}
@Override

View File

@ -32,7 +32,7 @@ class FutureCluster extends NativeFuture<Cluster> {
}
@Override
public Cluster getIfDone_internal() throws FDBException {
protected Cluster getIfDone_internal(long cPtr) throws FDBException {
return new Cluster(FutureCluster_get(cPtr), executor);
}

View File

@ -32,7 +32,7 @@ class FutureDatabase extends NativeFuture<Database> {
}
@Override
public Database getIfDone_internal() throws FDBException {
protected Database getIfDone_internal(long cPtr) throws FDBException {
return new FDBDatabase(FutureDatabase_get(cPtr), executor);
}

View File

@ -29,7 +29,7 @@ class FutureKey extends NativeFuture<byte[]> {
}
@Override
public byte[] getIfDone_internal() throws FDBException {
protected byte[] getIfDone_internal(long cPtr) throws FDBException {
return FutureKey_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureResult extends NativeFuture<byte[]> {
}
@Override
public byte[] getIfDone_internal() throws FDBException {
protected byte[] getIfDone_internal(long cPtr) throws FDBException {
return FutureResult_get(cPtr);
}

View File

@ -34,7 +34,7 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
}
@Override
public RangeResultInfo getIfDone_internal() throws FDBException {
protected RangeResultInfo getIfDone_internal(long cPtr) throws FDBException {
FDBException err = Future_getError(cPtr);
if(!err.isSuccess()) {
@ -45,11 +45,23 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
}
public RangeResultSummary getSummary() {
return FutureResults_getSummary(cPtr);
try {
pointerReadLock.lock();
return FutureResults_getSummary(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
public RangeResult getResults() {
return FutureResults_get(cPtr);
try {
pointerReadLock.lock();
return FutureResults_get(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
private native RangeResultSummary FutureResults_getSummary(long ptr) throws FDBException;

View File

@ -29,7 +29,7 @@ class FutureStrings extends NativeFuture<String[]> {
}
@Override
public String[] getIfDone_internal() throws FDBException {
protected String[] getIfDone_internal(long cPtr) throws FDBException {
return FutureStrings_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureVersion extends NativeFuture<Long> {
}
@Override
Long getIfDone_internal() throws FDBException {
protected Long getIfDone_internal(long cPtr) throws FDBException {
return FutureVersion_get(cPtr);
}

View File

@ -29,7 +29,7 @@ class FutureVoid extends NativeFuture<Void> {
}
@Override
public Void getIfDone_internal() throws FDBException {
protected Void getIfDone_internal(long cPtr) throws FDBException {
// With "future-cleanup" we get rid of FutureVoid_get and replace instead
// with a get on the error and throw if the error is not success.
FDBException err = Future_getError(cPtr);

View File

@ -22,14 +22,13 @@ package com.apple.foundationdb;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.tuple.ByteArrayUtil;
/**
@ -41,7 +40,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
*/
public class LocalityUtil {
/**
* Returns a {@code AsyncIterable} of keys {@code k} such that
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
* {@code begin <= k < end} and {@code k} is located at the start of a
* contiguous range stored on a single server.<br>
*<br>
@ -54,12 +53,12 @@ public class LocalityUtil {
*
* @return an sequence of keys denoting the start of single-server ranges
*/
public static AsyncIterable<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
return getBoundaryKeys_internal(db.createTransaction(), begin, end);
}
/**
* Returns a {@code AsyncIterable} of keys {@code k} such that
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
* {@code begin <= k < end} and {@code k} is located at the start of a
* contiguous range stored on a single server.<br>
*<br>
@ -80,13 +79,13 @@ public class LocalityUtil {
*
* @return an sequence of keys denoting the start of single-server ranges
*/
public static AsyncIterable<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
Transaction local = tr.getDatabase().createTransaction();
CompletableFuture<Long> readVersion = tr.getReadVersion();
if(readVersion.isDone() && !readVersion.isCompletedExceptionally()) {
local.setReadVersion(readVersion.getNow(null));
}
return new BoundaryIterable(local, begin, end);
return new BoundaryIterator(local, begin, end);
}
/**
@ -111,123 +110,125 @@ public class LocalityUtil {
return ((FDBTransaction)tr).getAddressesForKey(key);
}
private static AsyncIterable<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
return new BoundaryIterable(tr, begin, end);
private static DisposableAsyncIterator<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
return new BoundaryIterator(tr, begin, end);
}
static class BoundaryIterable implements AsyncIterable<byte[]> {
final Transaction tr;
final byte[] begin;
static class BoundaryIterator implements DisposableAsyncIterator<byte[]> {
Transaction tr;
byte[] begin;
byte[] lastBegin;
final byte[] end;
final AsyncIterable<KeyValue> firstGet;
public BoundaryIterable(Transaction tr, byte[] begin, byte[] end) {
AsyncIterator<KeyValue> block;
private CompletableFuture<Boolean> nextFuture;
private boolean disposed;
BoundaryIterator(Transaction tr, byte[] begin, byte[] end) {
this.tr = tr;
this.begin = Arrays.copyOf(begin, begin.length);
this.end = Arrays.copyOf(end, end.length);
lastBegin = begin;
tr.options().setReadSystemKeys();
tr.options().setLockAware();
firstGet = tr.getRange(keyServersForKey(begin), keyServersForKey(end));
block = firstGet.iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
disposed = false;
}
@Override
public AsyncIterator<byte[]> iterator() {
return new BoundaryIterator();
public CompletableFuture<Boolean> onHasNext() {
return nextFuture;
}
@Override
public CompletableFuture<List<byte[]>> asList() {
return AsyncUtil.collect(this, tr.getExecutor());
}
@Override
public boolean hasNext() {
return nextFuture.join();
}
class BoundaryIterator implements AsyncIterator<byte[]> {
AsyncIterator<KeyValue> block = BoundaryIterable.this.firstGet.iterator();
Transaction tr = BoundaryIterable.this.tr;
byte[] begin = BoundaryIterable.this.begin;
byte[] lastBegin = begin;
private CompletableFuture<Boolean> nextFuture;
public BoundaryIterator() {
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
CompletableFuture<Boolean> restartGet() {
if(ByteArrayUtil.compareUnsigned(begin, end) >= 0) {
return CompletableFuture.completedFuture(false);
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return nextFuture;
}
BiFunction<Boolean, Throwable, CompletableFuture<Boolean>> handler = new BiFunction<Boolean, Throwable, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> onHasNext() {
return nextFuture;
}
@Override
public boolean hasNext() {
return nextFuture.join();
}
CompletableFuture<Boolean> restartGet() {
if(ByteArrayUtil.compareUnsigned(begin, end) >= 0) {
return CompletableFuture.completedFuture(false);
public CompletableFuture<Boolean> apply(Boolean b, Throwable o) {
if(b != null) {
return CompletableFuture.completedFuture(b);
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return nextFuture;
}
BiFunction<Boolean, Throwable, CompletableFuture<Boolean>> handler = new BiFunction<Boolean, Throwable, CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Boolean b, Throwable o) {
if(b != null) {
return CompletableFuture.completedFuture(b);
}
if(o instanceof FDBException) {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr =
BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}
}
if(!(o instanceof RuntimeException))
throw new CompletionException(o);
CompletableFuture<Transaction> onError = BoundaryIterator.this.tr.onError((RuntimeException) o);
return onError.thenComposeAsync(tr -> {
BoundaryIterator.this.tr = tr;
if(o instanceof FDBException) {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}, tr.getExecutor());
}
}
};
@Override
public byte[] next() {
if(!nextFuture.isDone()) {
throw new IllegalStateException("Call to next without hasNext()=true");
if(!(o instanceof RuntimeException))
throw new CompletionException(o);
CompletableFuture<Transaction> onError = BoundaryIterator.this.tr.onError(o);
return onError.thenComposeAsync(tr -> {
BoundaryIterator.this.tr = tr;
return restartGet();
}, tr.getExecutor());
}
};
@Override
public byte[] next() {
if(!nextFuture.isDone()) {
throw new IllegalStateException("Call to next without hasNext()=true");
}
KeyValue o = block.next();
byte[] key = o.getKey();
byte[] suffix = Arrays.copyOfRange(key, 13, key.length);
BoundaryIterator.this.begin = ByteArrayUtil.join(suffix, new byte[] { (byte)0 });
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return suffix;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Boundary keys are read-only");
}
@Override
public void cancel() {
BoundaryIterator.this.tr.dispose();
disposed = true;
}
@Override
public void dispose() {
cancel();
}
@Override
protected void finalize() throws Throwable {
try {
if(FDB.getInstance().warnOnUndisposed && !disposed) {
System.err.println("DisposableAsyncIterator not disposed (getBoundaryKeys)");
}
KeyValue o = block.next();
byte[] key = o.getKey();
byte[] suffix = Arrays.copyOfRange(key, 13, key.length);
BoundaryIterator.this.begin = ByteArrayUtil.join(suffix, new byte[] { (byte)0 });
nextFuture = block.onHasNext().handleAsync(handler, tr.getExecutor()).thenCompose(x -> x);
return suffix;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Boundary keys are read-only");
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
@Override
public void dispose() {
BoundaryIterator.this.tr.dispose();
finally {
super.finalize();
}
}
}

View File

@ -22,9 +22,14 @@ package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class NativeFuture<T> extends CompletableFuture<T> {
protected final long cPtr;
abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposable {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final Lock pointerReadLock = rwl.readLock();
private long cPtr;
protected NativeFuture(long cPtr) {
this.cPtr = cPtr;
@ -36,15 +41,33 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
// constructor of this class because a quickly completing future can
// lead to a race where the marshalWhenDone tries to run on an
// unconstructed subclass.
//
// Since this must be called from a constructor, we assume that dispose
// cannot be called concurrently.
protected void registerMarshalCallback(Executor executor) {
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
if(cPtr != 0) {
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
}
}
private void marshalWhenDone() {
try {
T val = getIfDone_internal();
postMarshal();
complete(val);
T val = null;
boolean shouldComplete = false;
try {
pointerReadLock.lock();
if(cPtr != 0) {
val = getIfDone_internal(cPtr);
shouldComplete = true;
}
}
finally {
pointerReadLock.unlock();
}
if(shouldComplete) {
complete(val);
}
} catch(FDBException t) {
assert(t.getCode() != 2015); // future_not_set not possible
if(t.getCode() != 1102) { // future_released
@ -52,6 +75,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
}
} catch(Throwable t) {
completeExceptionally(t);
} finally {
postMarshal();
}
}
@ -59,21 +84,52 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
dispose();
}
abstract T getIfDone_internal() throws FDBException;
abstract protected T getIfDone_internal(long cPtr) throws FDBException;
public void dispose() {
Future_releaseMemory(cPtr);
@Override
public void dispose() {
long ptr = 0;
rwl.writeLock().lock();
if(cPtr != 0) {
ptr = cPtr;
cPtr = 0;
}
rwl.writeLock().unlock();
if(ptr != 0) {
Future_dispose(ptr);
if(!isDone()) {
completeExceptionally(new IllegalStateException("Future has been disposed"));
}
}
}
@Override
protected void finalize() throws Throwable {
Future_dispose(cPtr);
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
try {
rwl.readLock().lock();
if(cPtr != 0) {
Future_cancel(cPtr);
}
return result;
}
finally {
rwl.readLock().unlock();
}
}
@Override
public T join() {
Future_blockUntilReady(cPtr);
return super.join();
protected long getPtr() {
// we must have a read lock for this function to make sense, however it
// does not make sense to take the lock here, since the code that uses
// the result must inherently have the read lock itself.
assert( rwl.getReadHoldCount() > 0 );
if(cPtr == 0)
throw new IllegalStateException("Cannot access disposed object");
return cPtr;
}
private native void Future_registerCallback(long cPtr, Runnable callback);

View File

@ -25,7 +25,6 @@ import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
@ -53,12 +52,10 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
private final int rowLimit;
private final boolean reverse;
private final StreamingMode streamingMode;
private final FutureResults firstChunk;
private RangeQuery(FDBTransaction transaction, boolean isSnapshot,
RangeQuery(FDBTransaction transaction, boolean isSnapshot,
KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode,
FutureResults firstChunk) {
boolean reverse, StreamingMode streamingMode) {
this.tr = transaction;
this.begin = begin;
this.end = end;
@ -66,17 +63,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
this.rowLimit = rowLimit;
this.reverse = reverse;
this.streamingMode = streamingMode;
this.firstChunk = firstChunk;
}
static RangeQuery start(FDBTransaction transaction, boolean isSnapshot,
KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode) {
// start the first fetch...
FutureResults firstChunk = transaction.getRange_internal(begin, end,
rowLimit, 0, streamingMode.code(), 1, isSnapshot, reverse);
return new RangeQuery(transaction, isSnapshot, begin, end, rowLimit, reverse, streamingMode, firstChunk);
}
/**
@ -95,20 +81,16 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// if the streaming mode is EXACT, try and grab things as one chunk
if(mode == StreamingMode.EXACT) {
CompletableFuture<RangeResultInfo> range = tr.getRange_internal(
FutureResults range = tr.getRange_internal(
this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(),
1, this.snapshot, this.reverse);
return range.thenApply(new Function<RangeResultInfo, List<KeyValue>>() {
@Override
public List<KeyValue> apply(RangeResultInfo o) {
return o.get().values;
}
});
return range.thenApply(result -> result.get().values)
.whenComplete((result, e) -> range.dispose());
}
// If the streaming mode is not EXACT, simply collect the results of an iteration into a list
return AsyncUtil.collect(
new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode, firstChunk), tr.getExecutor());
new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode), tr.getExecutor());
}
/**
@ -130,16 +112,16 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// There is the chance for parallelism in the two "chunks" for fetched data
private RangeResult chunk = null;
private RangeResult nextChunk = null;
private boolean fetchOutstanding = true;
private boolean fetchOutstanding = false;
private byte[] prevKey = null;
private int index = 0;
// The first request is made in the constructor for the parent Iterable, so start at 1
private int iteration = 1;
private int iteration = 0;
private KeySelector begin;
private KeySelector end;
private int rowsRemaining;
private FutureResults fetchingChunk;
private CompletableFuture<Boolean> nextFuture;
private boolean isCancelled = false;
@ -151,19 +133,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
this.reverse = reverse;
this.streamingMode = streamingMode;
// Register for completion, etc. on the first chunk. Some of the fields in
// this class were initialized with the knowledge that this fetch is active
// at creation time. This set normally happens in startNextFetch, but
// the first fetch has already been configured and started.
CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
nextFuture = promise;
// FIXME: should we propagate cancellation into the first chuck fetch?
// This would invalidate the whole iterable, not just the iterator
//promise.onCancelledCancel(firstChunk);
// FIXME: I have no idea if this will just get garbage collected away, etc.
firstChunk.whenComplete(new FetchComplete(firstChunk, promise));
startNextFetch();
}
private synchronized boolean mainChunkIsTheLast() {
@ -174,54 +144,61 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
final FutureResults fetchingChunk;
final CompletableFuture<Boolean> promise;
public FetchComplete(FutureResults fetch, CompletableFuture<Boolean> promise) {
FetchComplete(FutureResults fetch, CompletableFuture<Boolean> promise) {
this.fetchingChunk = fetch;
this.promise = promise;
}
@Override
public void accept(RangeResultInfo data, Throwable error) {
final RangeResultSummary summary;
try {
final RangeResultSummary summary;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
throw (Error)error;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
throw (Error) error;
}
return;
}
return;
}
summary = data.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
}
synchronized(AsyncRangeIterator.this) {
fetchOutstanding = false;
// adjust the total number of rows we should ever fetch
rowsRemaining -= summary.keyCount;
// set up the next fetch
if (reverse) {
end = KeySelector.firstGreaterOrEqual(summary.lastKey);
} else {
begin = KeySelector.firstGreaterThan(summary.lastKey);
summary = data.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
}
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
index = 0;
} else {
nextChunk = data.get();
}
}
synchronized(AsyncRangeIterator.this) {
fetchOutstanding = false;
promise.complete(Boolean.TRUE);
// adjust the total number of rows we should ever fetch
rowsRemaining -= summary.keyCount;
// set up the next fetch
if(reverse) {
end = KeySelector.firstGreaterOrEqual(summary.lastKey);
}
else {
begin = KeySelector.firstGreaterThan(summary.lastKey);
}
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
index = 0;
}
else {
nextChunk = data.get();
}
}
promise.complete(Boolean.TRUE);
}
finally {
fetchingChunk.dispose();
}
}
}
@ -231,24 +208,18 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
if(isCancelled)
return;
if(mainChunkIsTheLast())
if(chunk != null && mainChunkIsTheLast())
return;
fetchOutstanding = true;
nextChunk = null;
FutureResults fetchingChunk = tr.getRange_internal(begin, end,
fetchingChunk = tr.getRange_internal(begin, end,
rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
++iteration, snapshot, reverse);
CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
nextFuture = promise;
// FIXME: BOOOOOOOOOO! Maybe we don't need this?
// promise.onCancelledCancel(fetchingChunk);
// TODO: again, I have no idea if this will get out-of-scope collected right away
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk, promise));
nextFuture = new CompletableFuture<>();
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk, nextFuture));
}
@Override
@ -340,11 +311,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
public synchronized void cancel() {
isCancelled = true;
nextFuture.cancel(true);
}
@Override
public void dispose() {
cancel();
fetchingChunk.cancel(true);
}
}
}

View File

@ -68,7 +68,10 @@ import com.apple.foundationdb.tuple.Tuple;
* <br>
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after {@link #onError}
* is called.
* is called.<br>
* <br>
* <b>Note:</b> {@code Transaction} objects must be disposed when no longer in use in order
* to free associated native memory.
*/
public interface Transaction extends Disposable, ReadTransaction, TransactionContext {

View File

@ -24,8 +24,6 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import com.apple.foundationdb.Disposable;
/**
* A version of {@code Iterator} that allows for non-blocking iteration over elements.
* Calls to {@link #next()} will not block if {@link #onHasNext()} has been called
@ -34,10 +32,10 @@ import com.apple.foundationdb.Disposable;
*
* @param <T> the type of object yielded by {@code next()}
*/
public interface AsyncIterator<T> extends Iterator<T>, Disposable {
public interface AsyncIterator<T> extends Iterator<T> {
/**
* Returns a asynchronous signal for the presence of more elements in the sequence.
* Once the future returned by {@link #onHasNext()} is ready, the next call to
* Once the future returned by {@code onHasNext()} is ready, the next call to
* {@link #next} will not block.
*
* @return a {@code CompletableFuture} that will be set to {@code true} if {@code next()}
@ -78,11 +76,4 @@ public interface AsyncIterator<T> extends Iterator<T>, Disposable {
* Cancels any outstanding asynchronous work associated with this {@code AsyncIterator}.
*/
void cancel();
/**
* Cancel this {@code AsyncIterable} and dispose of associated resources. Equivalent
* to calling {@link AsyncIterator#cancel()}.
*/
@Override
void dispose();
}

View File

@ -68,6 +68,18 @@ public class AsyncUtil {
return collect(iterable, DEFAULT_EXECUTOR);
}
/**
* Iterates over a set of items and returns the result as a list.
*
* @param iterator the source of data over which to iterate. This function will exhaust the iterator.
*
* @return a {@code CompletableFuture} which will be set to the amalgamation of results
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterator<V> iterator) {
return collect(iterator, DEFAULT_EXECUTOR);
}
/**
* Iterates over a set of items and returns the result as a list.
*
@ -78,29 +90,32 @@ public class AsyncUtil {
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterable<V> iterable, final Executor executor) {
final AsyncIterator<V> it = iterable.iterator();
final List<V> accumulator = new LinkedList<V>();
return collect(iterable.iterator(), executor);
}
/**
* Iterates over a set of items and returns the result as a list.
*
* @param iterator the source of data over which to iterate. This function will exhaust the iterator.
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code CompletableFuture} which will be set to the amalgamation of results
* from iteration.
*/
public static <V> CompletableFuture<List<V>> collect(final AsyncIterator<V> iterator, final Executor executor) {
final List<V> accumulator = new LinkedList<>();
// The condition of the while loop is simply "onHasNext()" returning true
Supplier<CompletableFuture<Boolean>> condition = new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> get() {
return it.onHasNext().thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean o) {
if(o) {
accumulator.add(it.next());
}
return o;
}
});
}
};
Supplier<CompletableFuture<Boolean>> condition = () ->
iterator.onHasNext().thenApply(hasNext -> {
if(hasNext) {
accumulator.add(iterator.next());
}
return hasNext;
});
CompletableFuture<Void> complete = whileTrue(condition, executor);
CompletableFuture<List<V>> result = tag(complete, accumulator);
return result;
return tag(complete, accumulator);
}
/**
@ -116,56 +131,102 @@ public class AsyncUtil {
return new AsyncIterable<T>() {
@Override
public AsyncIterator<T> iterator() {
final AsyncIterator<V> it = iterable.iterator();
return new AsyncIterator<T>() {
@Override
public void remove() {
it.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return it.onHasNext();
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public T next() {
return func.apply(it.next());
}
@Override
public void cancel() {
it.cancel();
}
@Override
public void dispose() {
it.dispose();
}
};
return mapIterator(iterable.iterator(), func);
}
@Override
public CompletableFuture<List<T>> asList() {
return iterable.asList().thenApply(new Function<List<V>, List<T>>() {
@Override
public List<T> apply(List<V> o) {
ArrayList<T> out = new ArrayList<T>(o.size());
for(V in : o)
out.add(func.apply(in));
return out;
}
return iterable.asList().thenApply(result -> {
ArrayList<T> out = new ArrayList<>(result.size());
for(V in : result)
out.add(func.apply(in));
return out;
});
}
};
}
/**
* Map an {@code AsyncIterator} into an {@code AsyncIterator} of another type or with
* each element modified in some fashion.
*
* @param iterator input
* @param func mapping function applied to each element
* @return a new iterator with each element mapped to a different value
*/
public static <V, T> AsyncIterator<T> mapIterator(final AsyncIterator<V> iterator,
final Function<V, T> func) {
return new AsyncIterator<T>() {
@Override
public void remove() {
iterator.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return iterator.onHasNext();
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return func.apply(iterator.next());
}
@Override
public void cancel() {
iterator.cancel();
}
};
}
/**
* Map a {@code DisposableAsyncIterator} into a {@code DisposableAsyncIterator} of another type or with
* each element modified in some fashion.
*
* @param iterator input
* @param func mapping function applied to each element
* @return a new iterator with each element mapped to a different value
*/
public static <V, T> DisposableAsyncIterator<T> mapIterator(final DisposableAsyncIterator<V> iterator,
final Function<V, T> func) {
return new DisposableAsyncIterator<T>() {
@Override
public void remove() {
iterator.remove();
}
@Override
public CompletableFuture<Boolean> onHasNext() {
return iterator.onHasNext();
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return func.apply(iterator.next());
}
@Override
public void cancel() {
iterator.cancel();
}
@Override
public void dispose() {
iterator.dispose();
}
};
}
private static class LoopPartial implements BiFunction<Boolean, Throwable, Void> {
final Supplier<? extends CompletableFuture<Boolean>> body;
final CompletableFuture<Void> done;
@ -305,7 +366,7 @@ public class AsyncUtil {
} else {
return task;
}
});
});
}
/**

View File

@ -0,0 +1,31 @@
/*
* DisposableAsyncIterator.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb.async;
import com.apple.foundationdb.Disposable;
/**
* A version of {@link AsyncIterator} that holds native FDB resources and
* must be disposed once it is no longer in use.
*
* @param <T> the type of object yielded by {@code next()}
*/
public interface DisposableAsyncIterator<T> extends Disposable, AsyncIterator<T> {}

View File

@ -779,7 +779,7 @@ public class DirectoryLayer implements Directory
//return new ReadyFuture<Boolean>(false);
return CompletableFuture.completedFuture(false);
AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext()
.thenApply(new Function<Boolean, Boolean>() {
@Override

View File

@ -418,7 +418,7 @@ public class AsyncStackTester {
inst.context.newTransaction(oldTr); // Other bindings allow reuse of non-retryable transactions, so we need to emulate that behavior.
}
else {
inst.context.updateCurrentTransaction(oldTr, tr);
inst.setTransaction(oldTr, tr);
}
})
.thenApply(v -> null);
@ -709,7 +709,9 @@ public class AsyncStackTester {
return inst.tr.commit().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void o) {
inst.tr = inst.context.newTransaction();
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return logStack(inst, prefix, saved);
}
});
@ -718,7 +720,9 @@ public class AsyncStackTester {
return inst.tr.commit().thenApplyAsync(new Function<Void, Void>() {
@Override
public Void apply(Void a) {
inst.tr = inst.context.newTransaction();
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return null;
}
});
@ -784,24 +788,21 @@ public class AsyncStackTester {
}*/
if(inst.op.startsWith(DIRECTORY_PREFIX))
return directoryExtension.processInstruction(inst);
return directoryExtension.processInstruction(inst).whenComplete((x, t) -> inst.releaseTransaction());
else {
return AsyncUtil.composeExceptionally(processInstruction(inst),
new Function<Throwable, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Throwable e) {
FDBException ex = StackUtils.getRootFDBException(e);
if(ex != null) {
StackUtils.pushError(inst, ex);
return CompletableFuture.completedFuture(null);
}
else {
CompletableFuture<Void> f = new CompletableFuture<Void>();
f.completeExceptionally(e);
return f;
}
}
});
return AsyncUtil.composeExceptionally(processInstruction(inst), (e) -> {
FDBException ex = StackUtils.getRootFDBException(e);
if(ex != null) {
StackUtils.pushError(inst, ex);
return CompletableFuture.completedFuture(null);
}
else {
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}
})
.whenComplete((x, t) -> inst.releaseTransaction());
}
}
@ -810,8 +811,6 @@ public class AsyncStackTester {
}
CompletableFuture<Void> executeRemainingOperations() {
Transaction t = db.createTransaction();
final Function<Void, CompletableFuture<Void>> processNext = new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void ignore) {
@ -821,7 +820,10 @@ public class AsyncStackTester {
};
if(operations == null || ++currentOp == operations.size()) {
return t.getRange(nextKey, endKey, 1000).asList()
Transaction tr = db.createTransaction();
return tr.getRange(nextKey, endKey, 1000).asList()
.whenComplete((x, t) -> tr.dispose())
.thenComposeAsync(new Function<List<KeyValue>, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(List<KeyValue> next) {
@ -915,6 +917,9 @@ public class AsyncStackTester {
byte[] bs = db.createTransaction().get(key).get();
System.out.println("output of " + ByteArrayUtil.printable(key) + " as: " + ByteArrayUtil.printable(bs));*/
db.dispose();
System.gc();
/*fdb.stopNetwork();
executor.shutdown();*/
}

View File

@ -20,13 +20,13 @@
package com.apple.foundationdb.test;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDBException;
@ -42,12 +42,13 @@ abstract class Context implements Runnable {
final Database db;
final String preStr;
int instructionIndex = 0;
String trName;
KeySelector nextKey, endKey;
Long lastVersion = null;
List<Thread> children = new LinkedList<Thread>();
static ConcurrentHashMap<String, Transaction> transactionMap = new ConcurrentHashMap<>();
private String trName;
private List<Thread> children = new LinkedList<>();
static private Map<String, Transaction> transactionMap = new HashMap<>();
static private Map<Transaction, AtomicInteger> transactionRefCounts = new HashMap<>();
Context(Database db, byte[] prefix) {
this.db = db;
@ -83,43 +84,54 @@ abstract class Context implements Runnable {
}
}
public Transaction getCurrentTransaction() {
return Context.transactionMap.get(this.trName);
}
public void updateCurrentTransaction(Transaction tr) {
Context.transactionMap.put(this.trName, tr);
}
public boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
return Context.transactionMap.replace(this.trName, oldTr, newTr);
}
public Transaction newTransaction() {
Transaction tr = db.createTransaction();
Context.transactionMap.put(this.trName, tr);
public synchronized Transaction getCurrentTransaction() {
Transaction tr = Context.transactionMap.get(this.trName);
Context.transactionRefCounts.get(tr).incrementAndGet();
return tr;
}
public Transaction newTransaction(Transaction oldTr) {
Transaction newTr = db.createTransaction();
boolean replaced = Context.transactionMap.replace(this.trName, oldTr, newTr);
if(replaced) {
return newTr;
}
else {
newTr.cancel();
return Context.transactionMap.get(this.trName);
public synchronized void releaseTransaction(Transaction tr) {
if(tr != null) {
AtomicInteger count = Context.transactionRefCounts.get(tr);
if(count.decrementAndGet() == 0) {
Context.transactionRefCounts.remove(tr);
tr.dispose();
}
}
}
public void switchTransaction(byte[] trName) {
this.trName = ByteArrayUtil.printable(trName);
Transaction tr = db.createTransaction();
Transaction previousTr = Context.transactionMap.putIfAbsent(this.trName, tr);
if(previousTr != null) {
tr.cancel();
public synchronized void updateCurrentTransaction(Transaction tr) {
Context.transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(1));
releaseTransaction(Context.transactionMap.put(this.trName, tr));
}
public synchronized boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
if(Context.transactionMap.replace(this.trName, oldTr, newTr)) {
AtomicInteger count = Context.transactionRefCounts.computeIfAbsent(newTr, x -> new AtomicInteger(0));
count.incrementAndGet();
releaseTransaction(oldTr);
return true;
}
return false;
}
public void newTransaction() {
Transaction tr = db.createTransaction();
updateCurrentTransaction(tr);
}
public void newTransaction(Transaction oldTr) {
Transaction newTr = db.createTransaction();
if(!updateCurrentTransaction(oldTr, newTr)) {
newTr.dispose();
}
}
public synchronized void switchTransaction(byte[] trName) {
this.trName = ByteArrayUtil.printable(trName);
Transaction tr = Context.transactionMap.computeIfAbsent(this.trName, x -> db.createTransaction());
Context.transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(1));
}
abstract void executeOperations() throws Throwable;
@ -140,34 +152,31 @@ abstract class Context implements Runnable {
throw new IllegalArgumentException("Invalid code: " + code);
}
void popParams(int num, final List<Object> params, final CompletableFuture<Void> done) {
private void popParams(int num, final List<Object> params, final CompletableFuture<Void> done) {
while(num-- > 0) {
Object item = stack.pop().value;
if(item instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
final CompletableFuture<Object> future = (CompletableFuture<Object>)item;
final int nextNum = num;
future.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable t) {
if(t != null) {
Throwable root = StackUtils.getRootFDBException(t);
if(root instanceof FDBException) {
params.add(StackUtils.getErrorBytes((FDBException)root));
popParams(nextNum, params, done);
}
else {
done.completeExceptionally(t);
}
}
else {
if(o == null)
params.add("RESULT_NOT_PRESENT".getBytes());
else
params.add(o);
future.whenCompleteAsync((o, t) -> {
if(t != null) {
FDBException root = StackUtils.getRootFDBException(t);
if(root != null) {
params.add(StackUtils.getErrorBytes(root));
popParams(nextNum, params, done);
}
else {
done.completeExceptionally(t);
}
}
else {
if(o == null)
params.add("RESULT_NOT_PRESENT".getBytes());
else
params.add(o);
popParams(nextNum, params, done);
}
});
@ -181,15 +190,16 @@ abstract class Context implements Runnable {
}
CompletableFuture<List<Object>> popParams(int num) {
final List<Object> params = new LinkedList<Object>();
CompletableFuture<Void> done = new CompletableFuture<Void>();
final List<Object> params = new LinkedList<>();
CompletableFuture<Void> done = new CompletableFuture<>();
popParams(num, params, done);
return done.thenApplyAsync(new Function<Void, List<Object>>() {
@Override
public List<Object> apply(Void n) {
return params;
}
});
return done.thenApplyAsync((x) -> params);
}
void dispose() {
for(Transaction tr : transactionMap.values()) {
tr.dispose();
}
}
}

View File

@ -21,7 +21,6 @@
package com.apple.foundationdb.test;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.ReadTransactionContext;
@ -54,8 +53,8 @@ class Instruction extends Stack {
isSnapshot = op.endsWith(SUFFIX_SNAPSHOT);
if(isDatabase) {
this.tr = context.db.createTransaction();
readTr = this.tr;
this.tr = null;
readTr = null;
op = op.substring(0, op.length() - SUFFIX_DATABASE.length());
}
else if(isSnapshot) {
@ -73,19 +72,38 @@ class Instruction extends Stack {
}
void setTransaction(Transaction tr) {
this.tr = tr;
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
if(!isDatabase) {
context.releaseTransaction(this.tr);
context.updateCurrentTransaction(tr);
this.tr = context.getCurrentTransaction();
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
}
}
void setTransaction(Transaction oldTr, Transaction newTr) {
if(!isDatabase) {
context.updateCurrentTransaction(oldTr, newTr);
this.tr = context.getCurrentTransaction();
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
}
}
void releaseTransaction() {
context.releaseTransaction(this.tr);
}
void push(Object o) {
context.stack.push(context.instructionIndex, o);
}
@ -120,10 +138,6 @@ class Instruction extends Stack {
CompletableFuture<Object> popParam() {
return popParams(1)
.thenApplyAsync(new Function<List<Object>, Object>() {
public Object apply(List<Object> params) {
return params.get(0);
}
});
.thenApplyAsync((params) -> params.get(0));
}
}

View File

@ -27,7 +27,7 @@ import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.LocalityUtil;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil;
@ -45,11 +45,15 @@ public class LocalityTests {
}
long start = System.currentTimeMillis();
AsyncIterable<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
DisposableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
CompletableFuture<List<byte[]>> collection = AsyncUtil.collect(keys);
List<byte[]> list = collection.join();
System.out.println("Took " + (System.currentTimeMillis() - start) + "ms to get " +
list.size() + " items");
keys.dispose();
int i = 0;
for(byte[] key : collection.join()) {
System.out.println(i++ + ": " + ByteArrayUtil.printable(key));

View File

@ -42,8 +42,11 @@ import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.async.DisposableAsyncIterator;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.async.AsyncUtil;
/**
* Implements a cross-binding test of the FoundationDB API.
*
@ -507,6 +510,8 @@ public class StackTester {
directoryExtension.processInstruction(inst);
else
processInstruction(inst);
inst.releaseTransaction();
}
@Override
@ -515,8 +520,10 @@ public class StackTester {
while(true) {
Transaction t = db.createTransaction();
List<KeyValue> keyValues = t.getRange(begin, endKey/*, 1000*/).asList().join();
if(keyValues.size() == 0)
t.dispose();
if(keyValues.size() == 0) {
break;
}
//System.out.println(" * Got " + keyValues.size() + " instructions");
for(KeyValue next : keyValues) {
@ -525,6 +532,7 @@ public class StackTester {
instructionIndex++;
}
}
//System.out.println(" * Completed " + instructionIndex + " instructions");
}
}
@ -669,22 +677,27 @@ public class StackTester {
tr.options().setTimeout(60*1000);
tr.options().setReadSystemKeys();
tr.getReadVersion().join();
AsyncIterable<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
DisposableAsyncIterator<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
tr, new byte[0], new byte[]{(byte) 255, (byte) 255});
List<byte[]> keys = boundaryKeys.asList().join();
for(int i = 0; i < keys.size() - 1; i++) {
byte[] start = keys.get(i);
byte[] end = tr.getKey(KeySelector.lastLessThan(keys.get(i + 1))).join();
List<String> startAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, start).join());
List<String> endAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, end).join());
for(String a : startAddresses) {
if(!endAddresses.contains(a)) {
throw new RuntimeException("Locality not internally consistent.");
try {
List<byte[]> keys = AsyncUtil.collect(boundaryKeys).join();
for(int i = 0; i < keys.size() - 1; i++) {
byte[] start = keys.get(i);
byte[] end = tr.getKey(KeySelector.lastLessThan(keys.get(i + 1))).join();
List<String> startAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, start).join());
List<String> endAddresses = Arrays.asList(LocalityUtil.getAddressesForKey(tr, end).join());
for(String a : startAddresses) {
if(!endAddresses.contains(a)) {
throw new RuntimeException("Locality not internally consistent.");
}
}
}
}
return null;
return null;
}
finally {
boundaryKeys.dispose();
}
});
}
@ -709,6 +722,8 @@ public class StackTester {
//System.out.println("Starting test...");
c.run();
//System.out.println("Done with test.");
db.dispose();
System.gc();
}
}

View File

@ -277,7 +277,10 @@ public class FDB {
f = new FutureCluster(Cluster_create(clusterFilePath), e);
}
Cluster c = f.get();
return c.openDatabase(e);
Database db = c.openDatabase(e);
c.dispose();
return db;
}
/**

View File

@ -121,7 +121,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> Future<T> runAsync(final Function<? super Transaction, Future<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<Transaction>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<T>();
return AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
Future<T> result = AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
@Override
public Future<Boolean> apply(Void v) {
Future<T> process = AsyncUtil.applySafely(retryable, trRef.get());
@ -150,13 +150,21 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
}
});
}
}).map(new Function<Void, T>(){
}).map(new Function<Void, T>() {
@Override
public T apply(Void o) {
trRef.get().dispose();
return returnValue.get();
}
});
result.onReady(new Runnable() {
@Override
public void run() {
trRef.get().dispose();
}
});
return result;
}
@Override
@ -180,7 +188,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> PartialFuture<T> runAsync(final PartialFunction<? super Transaction, ? extends PartialFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<Transaction>(createTransaction());
final AtomicReference<T> returnValue = new AtomicReference<T>();
return AsyncUtil.whileTrue(new Function<Void, PartialFuture<Boolean>>() {
PartialFuture<T> result = AsyncUtil.whileTrue(new Function<Void, PartialFuture<Boolean>>() {
@Override
public PartialFuture<Boolean> apply(Void v) {
PartialFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
@ -209,13 +217,21 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
}
});
}
}).map(new Function<Void, T>(){
}).map(new Function<Void, T>() {
@Override
public T apply(Void o) {
trRef.get().dispose();
return returnValue.get();
}
});
result.onReady(new Runnable() {
@Override
public void run() {
trRef.get().dispose();
}
});
return result;
}
@Override
@ -244,10 +260,17 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
public Transaction createTransaction(Executor e) {
pointerReadLock.lock();
Transaction tr = null;
try {
Transaction tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr.options().setUsedDuringCommitProtectionDisable();
return tr;
} catch(RuntimeException err) {
if(tr != null) {
tr.dispose();
}
throw err;
} finally {
pointerReadLock.unlock();
}
@ -276,4 +299,4 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
private native long Database_createTransaction(long cPtr);
private native void Database_dispose(long cPtr);
private native void Database_setOption(long cPtr, int code, byte[] value) throws FDBException;
}
}

View File

@ -579,10 +579,20 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
// Must hold pointerReadLock when calling
private FDBTransaction transfer() {
FDBTransaction tr = new FDBTransaction(getPtr(), database, executor);
tr.options().setUsedDuringCommitProtectionDisable();
transactionOwner = false;
return tr;
FDBTransaction tr = null;
try {
tr = new FDBTransaction(getPtr(), database, executor);
tr.options().setUsedDuringCommitProtectionDisable();
transactionOwner = false;
return tr;
}
catch(RuntimeException err) {
if(tr != null) {
tr.dispose();
}
throw err;
}
}
@Override

View File

@ -165,6 +165,7 @@ public class LocalityUtil {
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block.dispose();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
@ -179,8 +180,7 @@ public class LocalityUtil {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr =
BoundaryIterator.this.tr.getDatabase().createTransaction();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}
}
@ -222,6 +222,7 @@ public class LocalityUtil {
@Override
public void dispose() {
BoundaryIterator.this.tr.dispose();
block.dispose();
}
}
}

View File

@ -99,6 +99,13 @@ public class AsyncUtil {
Future<Void> complete = whileTrue(condition);
Future<List<V>> result = tag(complete, accumulator);
result.onReady(new Runnable() {
@Override
public void run() {
it.dispose();
}
});
return result;
}

View File

@ -854,7 +854,7 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
Future<Void> result = AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
@Override
public Future<Boolean> apply(Void ignore) {
Future<Void> subdirRemoveFuture;
@ -872,6 +872,15 @@ public class DirectoryLayer implements Directory
});
}
});
result.onReady(new Runnable() {
@Override
public void run() {
rangeItr.dispose();
}
});
return result;
}
private Future<Boolean> isPrefixFree(final ReadTransaction tr, final byte[] prefix) {
@ -888,14 +897,23 @@ public class DirectoryLayer implements Directory
if(node != null)
return new ReadyFuture<Boolean>(false);
AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext()
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
Future<Boolean> result = it.onHasNext()
.map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean hasNext) {
return !hasNext;
}
});
result.onReady(new Runnable() {
@Override
public void run() {
it.dispose();
}
});
return result;
}
});
}
@ -1286,7 +1304,7 @@ public class DirectoryLayer implements Directory
@Override
public Future<Boolean> apply(Void ignore) {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
Future<Boolean> result = rangeItr.onHasNext()
.map(new Function<Boolean, Void>() {
@Override
public Void apply(Boolean hasNext) {
@ -1310,6 +1328,15 @@ public class DirectoryLayer implements Directory
return choosePrefix(tr, allocator); // false exits the loop (i.e. we have a valid prefix)
}
});
result.onReady(new Runnable() {
@Override
public void run() {
rangeItr.dispose();
}
});
return result;
}
})
.map(new Function<Void, byte[]>() {