Merge pull request #112 from cie/executor-marshall-on-callback
<rdar://problem/32413365> Java Bindings: Execute marshalling and callbacks off of network thread
This commit is contained in:
parent
16cc0821b1
commit
84d4bfe749
|
@ -183,7 +183,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
public CompletableFuture<Long> getReadVersion() {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureVersion( Transaction_getReadVersion(getPtr()));
|
||||
return new FutureVersion( Transaction_getReadVersion(getPtr()), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
private CompletableFuture<byte[]> get_internal(byte[] key, boolean isSnapshot) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureResult( Transaction_get(getPtr(), key, isSnapshot));
|
||||
return new FutureResult( Transaction_get(getPtr(), key, isSnapshot), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -218,7 +218,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureKey( Transaction_getKey(getPtr(),
|
||||
selector.getKey(), selector.orEqual(), selector.getOffset(), isSnapshot));
|
||||
selector.getKey(), selector.orEqual(), selector.getOffset(), isSnapshot), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
return new FutureResults(Transaction_getRange(
|
||||
getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
|
||||
streamingMode, iteration, isSnapshot, reverse));
|
||||
streamingMode, iteration, isSnapshot, reverse), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -442,7 +442,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
public CompletableFuture<Void> commit() {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureVoid(Transaction_commit(getPtr()));
|
||||
return new FutureVoid(Transaction_commit(getPtr()), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -462,7 +462,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
public CompletableFuture<byte[]> getVersionstamp() {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureKey(Transaction_getVersionstamp(getPtr()));
|
||||
return new FutureKey(Transaction_getVersionstamp(getPtr()), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -472,7 +472,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
public CompletableFuture<Void> watch(byte[] key) throws FDBException {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureVoid(Transaction_watch(getPtr(), key));
|
||||
return new FutureVoid(Transaction_watch(getPtr(), key), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -490,7 +490,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
}
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
CompletableFuture<Void> f = new FutureVoid(Transaction_onError(getPtr(), ((FDBException)e).getCode()));
|
||||
CompletableFuture<Void> f = new FutureVoid(Transaction_onError(getPtr(), ((FDBException)e).getCode()), executor);
|
||||
final Transaction tr = transfer();
|
||||
return f.thenApply(v -> tr)
|
||||
.whenComplete((v, t) -> {
|
||||
|
@ -533,7 +533,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
|
|||
public CompletableFuture<String[]> getAddressesForKey(byte[] key) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureStrings(Transaction_getKeyLocations(getPtr(), key));
|
||||
return new FutureStrings(Transaction_getKeyLocations(getPtr(), key), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ class FutureCluster extends NativeFuture<Cluster> {
|
|||
protected FutureCluster(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
this.executor = executor;
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,7 +28,7 @@ class FutureDatabase extends NativeFuture<Database> {
|
|||
FutureDatabase(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
this.executor = executor;
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,12 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureKey extends NativeFuture<byte[]> {
|
||||
FutureKey(long cPtr) {
|
||||
FutureKey(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureResult extends NativeFuture<byte[]> {
|
||||
FutureResult(long cPtr) {
|
||||
FutureResult(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureResults extends NativeFuture<RangeResultInfo> {
|
||||
FutureResults(long cPtr) {
|
||||
FutureResults(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureStrings extends NativeFuture<String[]> {
|
||||
FutureStrings(long cPtr) {
|
||||
FutureStrings(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureVersion extends NativeFuture<Long> {
|
||||
FutureVersion(long cPtr) {
|
||||
FutureVersion(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,11 +20,12 @@
|
|||
|
||||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureVoid extends NativeFuture<Void> {
|
||||
FutureVoid(long cPtr) {
|
||||
FutureVoid(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback();
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
package com.apple.cie.foundationdb;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
abstract class NativeFuture<T> extends CompletableFuture<T> {
|
||||
protected final long cPtr;
|
||||
|
@ -35,13 +36,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
|
|||
// constructor of this class because a quickly completing future can
|
||||
// lead to a race where the marshalWhenDone tries to run on an
|
||||
// unconstructed subclass.
|
||||
protected void registerMarshalCallback() {
|
||||
Future_registerCallback(cPtr, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
NativeFuture.this.marshalWhenDone();
|
||||
}
|
||||
});
|
||||
protected void registerMarshalCallback(Executor executor) {
|
||||
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
|
||||
}
|
||||
|
||||
private void marshalWhenDone() {
|
||||
|
|
Loading…
Reference in New Issue