Merge branch 'java-add-missing-dispose' into java-future-cleanup
This commit is contained in:
commit
4d734a4925
|
@ -27,10 +27,10 @@ import java.util.concurrent.Executor;
|
|||
* The {@code Cluster} represents a connection to a physical set of cooperating machines
|
||||
* running FoundationDB. A {@code Cluster} is opened with a reference to a cluster file.<br>
|
||||
* <br>
|
||||
* <b>Note:</b> {@code Cluster} objects must be disposed when no longer in use in order
|
||||
* to free associated native memory.
|
||||
* <b>Note:</b> {@code Cluster} objects must be {@link #close closed} when no longer in use
|
||||
* in order to free any associated resources.
|
||||
*/
|
||||
public class Cluster extends DefaultDisposableImpl implements Disposable {
|
||||
public class Cluster extends NativeObjectWrapper {
|
||||
private ClusterOptions options;
|
||||
private final Executor executor;
|
||||
|
||||
|
@ -60,8 +60,8 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
|
|||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
checkUndisposed("Cluster");
|
||||
dispose();
|
||||
checkUnclosed("Cluster");
|
||||
close();
|
||||
}
|
||||
finally {
|
||||
super.finalize();
|
||||
|
@ -96,7 +96,7 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void disposeInternal(long cPtr) {
|
||||
protected void closeInternal(long cPtr) {
|
||||
Cluster_dispose(cPtr);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,10 +37,10 @@ import java.util.function.Function;
|
|||
* executed. These methods will not return successfully until {@code commit()} has
|
||||
* returned successfully.<br>
|
||||
* <br>
|
||||
* <b>Note:</b> {@code Database} objects must be disposed when no longer in use in order
|
||||
* to free associated native memory.
|
||||
* <b>Note:</b> {@code Database} objects must be {@link #close closed} when no longer
|
||||
* in use in order to free any associated resources.
|
||||
*/
|
||||
public interface Database extends Disposable, TransactionContext {
|
||||
public interface Database extends AutoCloseable, TransactionContext {
|
||||
/**
|
||||
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
|
||||
* <br>
|
||||
|
@ -211,4 +211,12 @@ public interface Database extends Disposable, TransactionContext {
|
|||
*/
|
||||
<T> CompletableFuture<T> runAsync(
|
||||
Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e);
|
||||
|
||||
/**
|
||||
* Close the {@code Database} object and release any associated resources. This must be called at
|
||||
* least once after the {@code Database} object is no longer in use. This can be called multiple
|
||||
* times, but care should be taken that it is not in use in another thread at the time of the call.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Disposable.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
/**
|
||||
* An object with native FoundationDB resources that must be freed. Failure to dispose of
|
||||
* {@code Disposable} objects will result in memory leaks.
|
||||
*/
|
||||
public interface Disposable {
|
||||
/**
|
||||
* Dispose of the object. This must be called once the object is no longer in use to
|
||||
* free any native resources associated with the object. This can be called multiple times,
|
||||
* but care should be taken that an object is not in use in another thread at the time of
|
||||
* the call.
|
||||
*/
|
||||
void dispose();
|
||||
}
|
|
@ -81,7 +81,7 @@ public class FDB {
|
|||
final int apiVersion;
|
||||
private volatile boolean netStarted = false;
|
||||
private volatile boolean netStopped = false;
|
||||
volatile boolean warnOnUndisposed = true;
|
||||
volatile boolean warnOnUnclosed = true;
|
||||
final private Semaphore netRunning = new Semaphore(1);
|
||||
private final NetworkOptions options;
|
||||
|
||||
|
@ -152,14 +152,14 @@ public class FDB {
|
|||
FDB fdb = new FDB(version);
|
||||
|
||||
if(version < 510) {
|
||||
fdb.warnOnUndisposed = false;
|
||||
fdb.warnOnUnclosed = false;
|
||||
}
|
||||
|
||||
return singleton = fdb;
|
||||
}
|
||||
|
||||
public void setUndisposedWarning(boolean warnOnUndisposed) {
|
||||
this.warnOnUndisposed = warnOnUndisposed;
|
||||
public void setUnclosedWarning(boolean warnOnUnclosed) {
|
||||
this.warnOnUnclosed = warnOnUnclosed;
|
||||
}
|
||||
|
||||
// Singleton is initialized to null and only set once by a call to selectAPIVersion
|
||||
|
@ -286,7 +286,7 @@ public class FDB {
|
|||
}
|
||||
Cluster c = f.join();
|
||||
Database db = c.openDatabase(e);
|
||||
c.dispose();
|
||||
c.close();
|
||||
|
||||
return db;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.function.Function;
|
|||
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
|
||||
class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable, OptionConsumer {
|
||||
class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsumer {
|
||||
private DatabaseOptions options;
|
||||
private final Executor executor;
|
||||
|
||||
|
@ -57,7 +57,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
t.dispose();
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
|
|||
}, e);
|
||||
}, e)
|
||||
.thenApply(o -> returnValue.get())
|
||||
.whenComplete((v, t) -> trRef.get().dispose());
|
||||
.whenComplete((v, t) -> trRef.get().close());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,8 +102,8 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
|
|||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
checkUndisposed("Database");
|
||||
dispose();
|
||||
checkUnclosed("Database");
|
||||
close();
|
||||
}
|
||||
finally {
|
||||
super.finalize();
|
||||
|
@ -120,7 +120,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
|
|||
return tr;
|
||||
} catch(RuntimeException err) {
|
||||
if(tr != null) {
|
||||
tr.dispose();
|
||||
tr.close();
|
||||
}
|
||||
|
||||
throw err;
|
||||
|
@ -145,7 +145,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void disposeInternal(long cPtr) {
|
||||
protected void closeInternal(long cPtr) {
|
||||
Database_dispose(cPtr);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import java.util.function.Function;
|
|||
import com.apple.foundationdb.async.*;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
|
||||
class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transaction, OptionConsumer {
|
||||
class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionConsumer {
|
||||
private final Database database;
|
||||
private final Executor executor;
|
||||
private final TransactionOptions options;
|
||||
|
@ -300,7 +300,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
return database;
|
||||
}
|
||||
|
||||
// Users of this function must dispose of the returned FutureResults when finished
|
||||
// Users of this function must close the returned FutureResults when finished
|
||||
protected FutureResults getRange_internal(
|
||||
KeySelector begin, KeySelector end,
|
||||
int rowLimit, int targetBytes, int streamingMode,
|
||||
|
@ -496,13 +496,13 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
return f.thenApply(v -> tr)
|
||||
.whenComplete((v, t) -> {
|
||||
if(t != null) {
|
||||
tr.dispose();
|
||||
tr.close();
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
if(!transactionOwner) {
|
||||
dispose();
|
||||
close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -537,7 +537,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
}
|
||||
catch(RuntimeException err) {
|
||||
if(tr != null) {
|
||||
tr.dispose();
|
||||
tr.close();
|
||||
}
|
||||
|
||||
throw err;
|
||||
|
@ -557,8 +557,8 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
checkUndisposed("Transaction");
|
||||
dispose();
|
||||
checkUnclosed("Transaction");
|
||||
close();
|
||||
}
|
||||
finally {
|
||||
super.finalize();
|
||||
|
@ -566,7 +566,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void disposeInternal(long cPtr) {
|
||||
protected void closeInternal(long cPtr) {
|
||||
if(transactionOwner) {
|
||||
Transaction_dispose(cPtr);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
|
|||
|
||||
@Override
|
||||
protected void postMarshal() {
|
||||
// We can't dispose because this class actually marshals on-demand
|
||||
// We can't close because this class actually marshals on-demand
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -30,7 +30,7 @@ import java.util.function.BiFunction;
|
|||
import com.apple.foundationdb.async.AsyncIterable;
|
||||
import com.apple.foundationdb.async.AsyncIterator;
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
import com.apple.foundationdb.async.DisposableAsyncIterator;
|
||||
import com.apple.foundationdb.async.CloseableAsyncIterator;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
|
||||
/**
|
||||
|
@ -42,7 +42,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
|
|||
*/
|
||||
public class LocalityUtil {
|
||||
/**
|
||||
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
|
||||
* Returns a {@code CloseableAsyncIterator} of keys {@code k} such that
|
||||
* {@code begin <= k < end} and {@code k} is located at the start of a
|
||||
* contiguous range stored on a single server.<br>
|
||||
*<br>
|
||||
|
@ -55,12 +55,12 @@ public class LocalityUtil {
|
|||
*
|
||||
* @return an sequence of keys denoting the start of single-server ranges
|
||||
*/
|
||||
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
|
||||
public static CloseableAsyncIterator<byte[]> getBoundaryKeys(Database db, byte[] begin, byte[] end) {
|
||||
return getBoundaryKeys_internal(db.createTransaction(), begin, end);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code DisposableAsyncIterator} of keys {@code k} such that
|
||||
* Returns a {@code CloseableAsyncIterator} of keys {@code k} such that
|
||||
* {@code begin <= k < end} and {@code k} is located at the start of a
|
||||
* contiguous range stored on a single server.<br>
|
||||
*<br>
|
||||
|
@ -81,7 +81,7 @@ public class LocalityUtil {
|
|||
*
|
||||
* @return an sequence of keys denoting the start of single-server ranges
|
||||
*/
|
||||
public static DisposableAsyncIterator<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
|
||||
public static CloseableAsyncIterator<byte[]> getBoundaryKeys(Transaction tr, byte[] begin, byte[] end) {
|
||||
Transaction local = tr.getDatabase().createTransaction();
|
||||
CompletableFuture<Long> readVersion = tr.getReadVersion();
|
||||
if(readVersion.isDone() && !readVersion.isCompletedExceptionally()) {
|
||||
|
@ -112,11 +112,11 @@ public class LocalityUtil {
|
|||
return ((FDBTransaction)tr).getAddressesForKey(key);
|
||||
}
|
||||
|
||||
private static DisposableAsyncIterator<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
|
||||
private static CloseableAsyncIterator<byte[]> getBoundaryKeys_internal(Transaction tr, byte[] begin, byte[] end) {
|
||||
return new BoundaryIterator(tr, begin, end);
|
||||
}
|
||||
|
||||
static class BoundaryIterator implements DisposableAsyncIterator<byte[]> {
|
||||
static class BoundaryIterator implements CloseableAsyncIterator<byte[]> {
|
||||
Transaction tr;
|
||||
byte[] begin;
|
||||
byte[] lastBegin;
|
||||
|
@ -125,7 +125,7 @@ public class LocalityUtil {
|
|||
|
||||
AsyncIterator<KeyValue> block;
|
||||
private CompletableFuture<Boolean> nextFuture;
|
||||
private boolean disposed;
|
||||
private boolean closed;
|
||||
|
||||
BoundaryIterator(Transaction tr, byte[] begin, byte[] end) {
|
||||
this.tr = tr;
|
||||
|
@ -141,7 +141,7 @@ public class LocalityUtil {
|
|||
block = firstGet.iterator();
|
||||
nextFuture = AsyncUtil.composeHandleAsync(block.onHasNext(), handler, tr.getExecutor());
|
||||
|
||||
disposed = false;
|
||||
closed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,7 +177,7 @@ public class LocalityUtil {
|
|||
FDBException err = (FDBException) o;
|
||||
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
|
||||
Executor executor = BoundaryIterator.this.tr.getExecutor();
|
||||
BoundaryIterator.this.tr.dispose();
|
||||
BoundaryIterator.this.tr.close();
|
||||
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction(executor);
|
||||
return restartGet();
|
||||
}
|
||||
|
@ -213,22 +213,18 @@ public class LocalityUtil {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
BoundaryIterator.this.tr.dispose();
|
||||
disposed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
cancel();
|
||||
public void close() {
|
||||
BoundaryIterator.this.tr.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
if(FDB.getInstance().warnOnUndisposed && !disposed) {
|
||||
System.err.println("DisposableAsyncIterator not disposed (getBoundaryKeys)");
|
||||
if(FDB.getInstance().warnOnUnclosed && !closed) {
|
||||
System.err.println("CloseableAsyncIterator not closed (getBoundaryKeys)");
|
||||
}
|
||||
close();
|
||||
}
|
||||
finally {
|
||||
super.finalize();
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.concurrent.Executor;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposable {
|
||||
abstract class NativeFuture<T> extends CompletableFuture<T> implements AutoCloseable {
|
||||
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
protected final Lock pointerReadLock = rwl.readLock();
|
||||
|
||||
|
@ -42,7 +42,7 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposabl
|
|||
// lead to a race where the marshalWhenDone tries to run on an
|
||||
// unconstructed subclass.
|
||||
//
|
||||
// Since this must be called from a constructor, we assume that dispose
|
||||
// Since this must be called from a constructor, we assume that close
|
||||
// cannot be called concurrently.
|
||||
protected void registerMarshalCallback(Executor executor) {
|
||||
if(cPtr != 0) {
|
||||
|
@ -69,10 +69,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposabl
|
|||
complete(val);
|
||||
}
|
||||
} catch(FDBException t) {
|
||||
assert(t.getCode() != 2015); // future_not_set not possible
|
||||
if(t.getCode() != 1102) { // future_released
|
||||
completeExceptionally(t);
|
||||
}
|
||||
assert(t.getCode() != 1102 && t.getCode() != 2015); // future_released, future_not_set not possible
|
||||
completeExceptionally(t);
|
||||
} catch(Throwable t) {
|
||||
completeExceptionally(t);
|
||||
} finally {
|
||||
|
@ -81,13 +79,13 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposabl
|
|||
}
|
||||
|
||||
protected void postMarshal() {
|
||||
dispose();
|
||||
close();
|
||||
}
|
||||
|
||||
abstract protected T getIfDone_internal(long cPtr) throws FDBException;
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
public void close() {
|
||||
long ptr = 0;
|
||||
|
||||
rwl.writeLock().lock();
|
||||
|
@ -100,7 +98,7 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposabl
|
|||
if(ptr != 0) {
|
||||
Future_dispose(ptr);
|
||||
if(!isDone()) {
|
||||
completeExceptionally(new IllegalStateException("Future has been disposed"));
|
||||
completeExceptionally(new IllegalStateException("Future has been closed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -127,7 +125,7 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements Disposabl
|
|||
assert( rwl.getReadHoldCount() > 0 );
|
||||
|
||||
if(cPtr == 0)
|
||||
throw new IllegalStateException("Cannot access disposed object");
|
||||
throw new IllegalStateException("Cannot access closed object");
|
||||
|
||||
return cPtr;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* DefaultDisposableImpl.java
|
||||
* NativeObjectWrapper.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -23,56 +23,53 @@ package com.apple.foundationdb;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
abstract class DefaultDisposableImpl implements Disposable {
|
||||
abstract class NativeObjectWrapper implements AutoCloseable {
|
||||
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
protected final Lock pointerReadLock = rwl.readLock();
|
||||
|
||||
private boolean disposed = false;
|
||||
private boolean closed = false;
|
||||
private long cPtr;
|
||||
|
||||
public DefaultDisposableImpl() {
|
||||
}
|
||||
|
||||
public DefaultDisposableImpl(long cPtr) {
|
||||
public NativeObjectWrapper(long cPtr) {
|
||||
this.cPtr = cPtr;
|
||||
if(this.cPtr == 0)
|
||||
this.disposed = true;
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
public boolean isDisposed() {
|
||||
public boolean isClosed() {
|
||||
// we must have a read lock for this function to make sense, however it
|
||||
// does not make sense to take the lock here, since the code that uses
|
||||
// the result must inherently have the read lock itself.
|
||||
assert( rwl.getReadHoldCount() > 0 );
|
||||
|
||||
return disposed;
|
||||
return closed;
|
||||
}
|
||||
|
||||
public void checkUndisposed(String context) {
|
||||
public void checkUnclosed(String context) {
|
||||
try {
|
||||
if(FDB.getInstance().warnOnUndisposed && !disposed) {
|
||||
System.err.println(context + " not disposed");
|
||||
if(FDB.getInstance().warnOnUnclosed && !closed) {
|
||||
System.err.println(context + " not closed");
|
||||
}
|
||||
}
|
||||
catch(Exception e) {}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
public void close() {
|
||||
rwl.writeLock().lock();
|
||||
long ptr = 0;
|
||||
try {
|
||||
if(disposed)
|
||||
if(closed)
|
||||
return;
|
||||
|
||||
ptr = cPtr;
|
||||
this.cPtr = 0;
|
||||
disposed = true;
|
||||
closed = true;
|
||||
} finally {
|
||||
rwl.writeLock().unlock();
|
||||
}
|
||||
|
||||
disposeInternal(ptr);
|
||||
closeInternal(ptr);
|
||||
}
|
||||
|
||||
protected long getPtr() {
|
||||
|
@ -81,11 +78,11 @@ abstract class DefaultDisposableImpl implements Disposable {
|
|||
// the result must inherently have the read lock itself.
|
||||
assert( rwl.getReadHoldCount() > 0 );
|
||||
|
||||
if(this.disposed)
|
||||
throw new IllegalStateException("Cannot access disposed object");
|
||||
if(this.closed)
|
||||
throw new IllegalStateException("Cannot access closed object");
|
||||
|
||||
return this.cPtr;
|
||||
}
|
||||
|
||||
protected abstract void disposeInternal(long cPtr);
|
||||
protected abstract void closeInternal(long cPtr);
|
||||
}
|
|
@ -85,7 +85,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
|
|||
this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(),
|
||||
1, this.snapshot, this.reverse);
|
||||
return range.thenApply(result -> result.get().values)
|
||||
.whenComplete((result, e) -> range.dispose());
|
||||
.whenComplete((result, e) -> range.close());
|
||||
}
|
||||
|
||||
// If the streaming mode is not EXACT, simply collect the results of an iteration into a list
|
||||
|
@ -197,7 +197,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
|
|||
promise.complete(Boolean.TRUE);
|
||||
}
|
||||
finally {
|
||||
fetchingChunk.dispose();
|
||||
fetchingChunk.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,10 +70,10 @@ import com.apple.foundationdb.tuple.Tuple;
|
|||
* option. This is because the Java bindings disallow use of {@code Transaction} objects after {@link #onError}
|
||||
* is called.<br>
|
||||
* <br>
|
||||
* <b>Note:</b> {@code Transaction} objects must be disposed when no longer in use in order
|
||||
* to free associated native memory.
|
||||
* <b>Note:</b> {@code Transaction} objects must be {@link #close closed} when no longer
|
||||
* in use in order to free any associated resources.
|
||||
*/
|
||||
public interface Transaction extends Disposable, ReadTransaction, TransactionContext {
|
||||
public interface Transaction extends AutoCloseable, ReadTransaction, TransactionContext {
|
||||
|
||||
/**
|
||||
* Return special-purpose, read-only view of the database. Reads done through this interface are known as "snapshot reads".
|
||||
|
@ -369,4 +369,11 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
|
|||
<T> CompletableFuture<T> runAsync(
|
||||
Function<? super Transaction, ? extends CompletableFuture<T>> retryable);
|
||||
|
||||
/**
|
||||
* Close the {@code Transaction object and release any associated resources. This must be called at
|
||||
* least once after the {@code Transaction} object is no longer in use. This can be called multiple
|
||||
* times, but care should be taken that it is not in use in another thread at the time of the call.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
|
|
|
@ -204,16 +204,16 @@ public class AsyncUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Map a {@code DisposableAsyncIterator} into a {@code DisposableAsyncIterator} of another type or with
|
||||
* Map a {@code CloseableAsyncIterator} into a {@code CloseableAsyncIterator} of another type or with
|
||||
* each element modified in some fashion.
|
||||
*
|
||||
* @param iterator input
|
||||
* @param func mapping function applied to each element
|
||||
* @return a new iterator with each element mapped to a different value
|
||||
*/
|
||||
public static <V, T> DisposableAsyncIterator<T> mapIterator(final DisposableAsyncIterator<V> iterator,
|
||||
public static <V, T> CloseableAsyncIterator<T> mapIterator(final CloseableAsyncIterator<V> iterator,
|
||||
final Function<V, T> func) {
|
||||
return new DisposableAsyncIterator<T>() {
|
||||
return new CloseableAsyncIterator<T>() {
|
||||
@Override
|
||||
public void remove() {
|
||||
iterator.remove();
|
||||
|
@ -240,8 +240,8 @@ public class AsyncUtil {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
iterator.dispose();
|
||||
public void close() {
|
||||
iterator.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -302,7 +302,7 @@ public class AsyncUtil {
|
|||
*
|
||||
* @param body the asynchronous operation over which to loop
|
||||
*
|
||||
* @return a {@code PartialFuture} which will be set at completion of the loop.
|
||||
* @return a {@link CompletableFuture} which will be set at completion of the loop.
|
||||
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier) whileTrue} that takes a
|
||||
* {@link Supplier} instead.
|
||||
*/
|
||||
|
@ -317,7 +317,7 @@ public class AsyncUtil {
|
|||
* @param body the asynchronous operation over which to loop
|
||||
* @param executor the {@link Executor} to use for asynchronous operations
|
||||
*
|
||||
* @return a {@code PartialFuture} which will be set at completion of the loop.
|
||||
* @return a {@link CompletableFuture} which will be set at completion of the loop.
|
||||
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier, Executor) whileTrue} that takes a
|
||||
* {@link Supplier} instead.
|
||||
*/
|
||||
|
@ -331,7 +331,7 @@ public class AsyncUtil {
|
|||
*
|
||||
* @param body the asynchronous operation over which to loop
|
||||
*
|
||||
* @return a {@code PartialFuture} which will be set at completion of the loop.
|
||||
* @return a {@link CompletableFuture} which will be set at completion of the loop.
|
||||
*/
|
||||
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body) {
|
||||
return whileTrue(body, DEFAULT_EXECUTOR);
|
||||
|
@ -343,7 +343,7 @@ public class AsyncUtil {
|
|||
* @param body the asynchronous operation over which to loop
|
||||
* @param executor the {@link Executor} to use for asynchronous operations
|
||||
*
|
||||
* @return a {@code PartialFuture} which will be set at completion of the loop.
|
||||
* @return a {@link CompletableFuture} which will be set at completion of the loop.
|
||||
*/
|
||||
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body, Executor executor) {
|
||||
return new LoopPartial(body, executor).run();
|
||||
|
@ -432,10 +432,10 @@ public class AsyncUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return a {@code CompletableFuture} that will be set when any of the {@code PartialFuture}
|
||||
* Return a {@code CompletableFuture} that will be set when any of the {@link CompletableFuture}
|
||||
* inputs are done. A {@code CompletableFuture} is done both on success and failure.
|
||||
*
|
||||
* @param input the list of {@code PartialFuture}s to monitor. This list
|
||||
* @param input the list of {@link CompletableFuture}s to monitor. This list
|
||||
* <b>must not</b> be modified during the execution of this call.
|
||||
*
|
||||
* @return a signal that will be set when any of the {@code CompletableFuture}s are done
|
||||
|
@ -447,10 +447,10 @@ public class AsyncUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return a {@code CompletableFuture} that will be set when all the {@code PartialFuture}
|
||||
* Return a {@code CompletableFuture} that will be set when all the {@link CompletableFuture}
|
||||
* inputs are done. A {@code CompletableFuture} is done both on success and failure.
|
||||
*
|
||||
* @param input the list of {@code PartialFuture}s to monitor. This list
|
||||
* @param input the list of {@link CompletableFuture}s to monitor. This list
|
||||
* <b>must not</b> be modified during the execution of this call.
|
||||
*
|
||||
* @return a signal that will be set when all of the {@code CompletableFuture}s are done
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* DisposableAsyncIterator.java
|
||||
* CloseableAsyncIterator.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -20,12 +20,27 @@
|
|||
|
||||
package com.apple.foundationdb.async;
|
||||
|
||||
import com.apple.foundationdb.Disposable;
|
||||
|
||||
/**
|
||||
* A version of {@link AsyncIterator} that holds native FDB resources and
|
||||
* must be disposed once it is no longer in use.
|
||||
* A version of {@link AsyncIterator} that must be closed once no longer in use in order to free
|
||||
* any associated resources.
|
||||
*
|
||||
* @param <T> the type of object yielded by {@code next()}
|
||||
*/
|
||||
public interface DisposableAsyncIterator<T> extends Disposable, AsyncIterator<T> {}
|
||||
public interface CloseableAsyncIterator<T> extends AutoCloseable, AsyncIterator<T> {
|
||||
/**
|
||||
* Cancels any outstanding asynchronous work, closes the iterator, and frees any associated
|
||||
* resources. This must be called at least once after the object is no longer in use. This
|
||||
* can be called multiple times, but care should be taken that an object is not in use
|
||||
* in another thread at the time of the call.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Alias for {@link #close}.
|
||||
*/
|
||||
@Override
|
||||
default void cancel() {
|
||||
close();
|
||||
}
|
||||
}
|
|
@ -792,7 +792,7 @@ public class AsyncStackTester {
|
|||
Transaction tr = db.createTransaction();
|
||||
|
||||
return tr.getRange(nextKey, endKey, 1000).asList()
|
||||
.whenComplete((x, t) -> tr.dispose())
|
||||
.whenComplete((x, t) -> tr.close())
|
||||
.thenComposeAsync(new Function<List<KeyValue>, CompletableFuture<Void>>() {
|
||||
@Override
|
||||
public CompletableFuture<Void> apply(List<KeyValue> next) {
|
||||
|
@ -886,7 +886,7 @@ public class AsyncStackTester {
|
|||
byte[] bs = db.createTransaction().get(key).get();
|
||||
System.out.println("output of " + ByteArrayUtil.printable(key) + " as: " + ByteArrayUtil.printable(bs));*/
|
||||
|
||||
db.dispose();
|
||||
db.close();
|
||||
System.gc();
|
||||
|
||||
/*fdb.stopNetwork();
|
||||
|
|
|
@ -37,7 +37,7 @@ import com.apple.foundationdb.Transaction;
|
|||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
abstract class Context implements Runnable {
|
||||
abstract class Context implements Runnable, AutoCloseable {
|
||||
final Stack stack = new Stack();
|
||||
final Database db;
|
||||
final String preStr;
|
||||
|
@ -104,7 +104,7 @@ abstract class Context implements Runnable {
|
|||
if(count.decrementAndGet() == 0) {
|
||||
assert !transactionMap.containsValue(tr);
|
||||
transactionRefCounts.remove(tr);
|
||||
tr.dispose();
|
||||
tr.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ abstract class Context implements Runnable {
|
|||
public void newTransaction(Transaction oldTr) {
|
||||
Transaction newTr = db.createTransaction();
|
||||
if(!updateCurrentTransaction(oldTr, newTr)) {
|
||||
newTr.dispose();
|
||||
newTr.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -212,9 +212,10 @@ abstract class Context implements Runnable {
|
|||
return done.thenApplyAsync((x) -> params);
|
||||
}
|
||||
|
||||
void dispose() {
|
||||
@Override
|
||||
public void close() {
|
||||
for(Transaction tr : transactionMap.values()) {
|
||||
tr.dispose();
|
||||
tr.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import com.apple.foundationdb.Database;
|
|||
import com.apple.foundationdb.FDB;
|
||||
import com.apple.foundationdb.LocalityUtil;
|
||||
import com.apple.foundationdb.Transaction;
|
||||
import com.apple.foundationdb.async.DisposableAsyncIterator;
|
||||
import com.apple.foundationdb.async.CloseableAsyncIterator;
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
|
||||
|
@ -46,13 +46,13 @@ public class LocalityTests {
|
|||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
DisposableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
|
||||
CloseableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 } );
|
||||
CompletableFuture<List<byte[]>> collection = AsyncUtil.collect(keys);
|
||||
List<byte[]> list = collection.join();
|
||||
System.out.println("Took " + (System.currentTimeMillis() - start) + "ms to get " +
|
||||
list.size() + " items");
|
||||
|
||||
keys.dispose();
|
||||
keys.close();
|
||||
|
||||
int i = 0;
|
||||
for(byte[] key : collection.join()) {
|
||||
|
|
|
@ -114,8 +114,8 @@ public class RangeTest {
|
|||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
//db.dispose();
|
||||
//cluster.dispose();
|
||||
//db.close();
|
||||
//cluster.close();
|
||||
|
||||
tr = db.createTransaction();
|
||||
checkRange(tr);
|
||||
|
@ -154,8 +154,8 @@ public class RangeTest {
|
|||
System.out.println("range comparisons okay");
|
||||
}
|
||||
|
||||
db.dispose();
|
||||
//cluster.dispose();
|
||||
db.close();
|
||||
//cluster.close();
|
||||
//fdb.stopNetwork();
|
||||
System.out.println("Done with test program");
|
||||
}
|
||||
|
|
|
@ -37,15 +37,13 @@ import com.apple.foundationdb.KeyValue;
|
|||
import com.apple.foundationdb.LocalityUtil;
|
||||
import com.apple.foundationdb.MutationType;
|
||||
import com.apple.foundationdb.Range;
|
||||
import com.apple.foundationdb.ReadTransaction;
|
||||
import com.apple.foundationdb.StreamingMode;
|
||||
import com.apple.foundationdb.Transaction;
|
||||
import com.apple.foundationdb.async.AsyncIterable;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
import com.apple.foundationdb.async.DisposableAsyncIterator;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
import com.apple.foundationdb.async.CloseableAsyncIterator;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
/**
|
||||
* Implements a cross-binding test of the FoundationDB API.
|
||||
|
@ -520,7 +518,7 @@ public class StackTester {
|
|||
while(true) {
|
||||
Transaction t = db.createTransaction();
|
||||
List<KeyValue> keyValues = t.getRange(begin, endKey/*, 1000*/).asList().join();
|
||||
t.dispose();
|
||||
t.close();
|
||||
if(keyValues.size() == 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -677,7 +675,7 @@ public class StackTester {
|
|||
tr.options().setTimeout(60*1000);
|
||||
tr.options().setReadSystemKeys();
|
||||
tr.getReadVersion().join();
|
||||
DisposableAsyncIterator<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
|
||||
CloseableAsyncIterator<byte[]> boundaryKeys = LocalityUtil.getBoundaryKeys(
|
||||
tr, new byte[0], new byte[]{(byte) 255, (byte) 255});
|
||||
try {
|
||||
List<byte[]> keys = AsyncUtil.collect(boundaryKeys).join();
|
||||
|
@ -696,7 +694,7 @@ public class StackTester {
|
|||
return null;
|
||||
}
|
||||
finally {
|
||||
boundaryKeys.dispose();
|
||||
boundaryKeys.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -722,7 +720,7 @@ public class StackTester {
|
|||
//System.out.println("Starting test...");
|
||||
c.run();
|
||||
//System.out.println("Done with test.");
|
||||
db.dispose();
|
||||
db.close();
|
||||
System.gc();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue