Update the Java bindings to call add missing dispose calls.

This commit is contained in:
A.J. Beamon 2017-11-15 15:56:50 -08:00
parent 02a9978612
commit db017317ac
8 changed files with 87 additions and 23 deletions

View File

@ -88,10 +88,9 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
return true;
});
}, e).thenCompose(x -> x);
}, e).thenApply(o -> {
trRef.get().dispose();
return returnValue.get();
});
}, e)
.thenApply(o -> returnValue.get())
.whenComplete((v, t) -> trRef.get().dispose());
}
@Override
@ -109,11 +108,16 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
public Transaction createTransaction(Executor e) {
pointerReadLock.lock();
Transaction tr = null;
try {
Transaction tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr.options().setUsedDuringCommitProtectionDisable();
return tr;
} finally {
if(tr != null) {
tr.dispose();
}
pointerReadLock.unlock();
}
}

View File

@ -168,6 +168,7 @@ public class LocalityUtil {
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block.dispose();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
@ -185,8 +186,7 @@ public class LocalityUtil {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr =
BoundaryIterator.this.tr.getDatabase().createTransaction();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}
}
@ -228,6 +228,7 @@ public class LocalityUtil {
@Override
public void dispose() {
BoundaryIterator.this.tr.dispose();
block.dispose();
}
}
}

View File

@ -100,7 +100,7 @@ public class AsyncUtil {
CompletableFuture<Void> complete = whileTrue(condition, executor);
CompletableFuture<List<V>> result = tag(complete, accumulator);
return result;
return result.whenComplete((v, t) -> it.dispose());
}
/**

View File

@ -748,7 +748,8 @@ public class DirectoryLayer implements Directory
}
}, tr.getExecutor());
}
}, tr.getExecutor());
}, tr.getExecutor())
.whenComplete((v, t) -> rangeItr.dispose());
}
private CompletableFuture<Boolean> isPrefixFree(final ReadTransaction tr, final byte[] prefix) {
@ -767,14 +768,15 @@ public class DirectoryLayer implements Directory
//return new ReadyFuture<Boolean>(false);
return CompletableFuture.completedFuture(false);
AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext()
.thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean hasNext) {
return !hasNext;
}
});
})
.whenComplete((v, t) -> it.dispose());
}
}, tr.getExecutor());
}
@ -1180,6 +1182,7 @@ public class DirectoryLayer implements Directory
return null;
}
})
.whenComplete((v, t) -> rangeItr.dispose())
.thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void ignore) {

View File

@ -121,7 +121,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> Future<T> runAsync(final Function<? super Transaction, Future<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<Transaction>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<T>();
return AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
Future<T> result = AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
@Override
public Future<Boolean> apply(Void v) {
Future<T> process = AsyncUtil.applySafely(retryable, trRef.get());
@ -153,10 +153,18 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
}).map(new Function<Void, T>(){
@Override
public T apply(Void o) {
trRef.get().dispose();
return returnValue.get();
}
});
result.onReady(new Runnable() {
@Override
public void run() {
trRef.get().dispose();
}
});
return result;
}
@Override
@ -180,7 +188,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> PartialFuture<T> runAsync(final PartialFunction<? super Transaction, ? extends PartialFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<Transaction>(createTransaction());
final AtomicReference<T> returnValue = new AtomicReference<T>();
return AsyncUtil.whileTrue(new Function<Void, PartialFuture<Boolean>>() {
PartialFuture<T> result = AsyncUtil.whileTrue(new Function<Void, PartialFuture<Boolean>>() {
@Override
public PartialFuture<Boolean> apply(Void v) {
PartialFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
@ -209,13 +217,21 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
}
});
}
}).map(new Function<Void, T>(){
}).map(new Function<Void, T>() {
@Override
public T apply(Void o) {
trRef.get().dispose();
return returnValue.get();
}
});
result.onReady(new Runnable() {
@Override
public void run() {
trRef.get().dispose();
}
});
return result;
}
@Override
@ -244,11 +260,16 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
@Override
public Transaction createTransaction(Executor e) {
pointerReadLock.lock();
Transaction tr = null;
try {
Transaction tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr = new FDBTransaction(Database_createTransaction(getPtr()), this, e);
tr.options().setUsedDuringCommitProtectionDisable();
return tr;
} finally {
if(tr != null) {
tr.dispose();
}
pointerReadLock.unlock();
}
}

View File

@ -165,6 +165,7 @@ public class LocalityUtil {
}
lastBegin = begin;
tr.options().setReadSystemKeys();
block.dispose();
block = tr.getRange(
keyServersForKey(begin),
keyServersForKey(end)).iterator();
@ -179,8 +180,7 @@ public class LocalityUtil {
FDBException err = (FDBException) o;
if(err.getCode() == 1007 && !Arrays.equals(begin, lastBegin)) {
BoundaryIterator.this.tr.dispose();
BoundaryIterator.this.tr =
BoundaryIterator.this.tr.getDatabase().createTransaction();
BoundaryIterator.this.tr = BoundaryIterator.this.tr.getDatabase().createTransaction();
return restartGet();
}
}
@ -222,6 +222,7 @@ public class LocalityUtil {
@Override
public void dispose() {
BoundaryIterator.this.tr.dispose();
block.dispose();
}
}
}

View File

@ -99,6 +99,13 @@ public class AsyncUtil {
Future<Void> complete = whileTrue(condition);
Future<List<V>> result = tag(complete, accumulator);
result.onReady(new Runnable() {
@Override
public void run() {
it.dispose();
}
});
return result;
}

View File

@ -842,7 +842,7 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
Future<Void> result = AsyncUtil.whileTrue(new Function<Void, Future<Boolean>>() {
@Override
public Future<Boolean> apply(Void ignore) {
Future<Void> subdirRemoveFuture;
@ -860,6 +860,15 @@ public class DirectoryLayer implements Directory
});
}
});
result.onReady(new Runnable() {
@Override
public void run() {
rangeItr.dispose();
}
});
return result;
}
private Future<Boolean> isPrefixFree(final ReadTransaction tr, final byte[] prefix) {
@ -876,14 +885,23 @@ public class DirectoryLayer implements Directory
if(node != null)
return new ReadyFuture<Boolean>(false);
AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext()
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
Future<Boolean> result = it.onHasNext()
.map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean hasNext) {
return !hasNext;
}
});
result.onReady(new Runnable() {
@Override
public void run() {
it.dispose();
}
});
return result;
}
});
}
@ -1274,7 +1292,7 @@ public class DirectoryLayer implements Directory
@Override
public Future<Boolean> apply(Void ignore) {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
Future<Boolean> result = rangeItr.onHasNext()
.map(new Function<Boolean, Void>() {
@Override
public Void apply(Boolean hasNext) {
@ -1298,6 +1316,15 @@ public class DirectoryLayer implements Directory
return choosePrefix(tr, allocator); // false exits the loop (i.e. we have a valid prefix)
}
});
result.onReady(new Runnable() {
@Override
public void run() {
rangeItr.dispose();
}
});
return result;
}
})
.map(new Function<Void, byte[]>() {