Better approach to non-locked stats gathering (including knowing what thread will operate)

This commit is contained in:
Scott Fines 2021-03-08 09:45:25 -06:00
parent 6c4ce1769d
commit e080e989ac
6 changed files with 43 additions and 42 deletions

View File

@ -267,18 +267,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
}
pointerReadLock.lock();
try {
CompletableFuture<byte[]> future =
new FutureResult(Transaction_get(getPtr(), key, isSnapshot), executor);
if (eventKeeper != null) {
future = future.thenApply((bytes) -> {
if (bytes != null) {
eventKeeper.count(Events.BYTES_FETCHED, bytes.length);
}
return bytes;
});
}
return future;
return new FutureResult(Transaction_get(getPtr(), key, isSnapshot), executor,eventKeeper);
} finally {
pointerReadLock.unlock();
}
@ -298,18 +287,9 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
}
pointerReadLock.lock();
try {
CompletableFuture<byte[]> future = new FutureKey(
return new FutureKey(
Transaction_getKey(getPtr(), selector.getKey(), selector.orEqual(), selector.getOffset(), isSnapshot),
executor);
if (eventKeeper != null) {
future = future.thenApply((bytes) -> {
if (bytes != null) {
eventKeeper.count(Events.BYTES_FETCHED, bytes.length);
}
return bytes;
});
}
return future;
executor,eventKeeper);
} finally {
pointerReadLock.unlock();
}

View File

@ -22,16 +22,26 @@ package com.apple.foundationdb;
import java.util.concurrent.Executor;
import com.apple.foundationdb.EventKeeper.Events;
class FutureKey extends NativeFuture<byte[]> {
FutureKey(long cPtr, Executor executor) {
private final EventKeeper eventKeeper;
FutureKey(long cPtr, Executor executor,EventKeeper eventKeeper) {
super(cPtr);
this.eventKeeper = eventKeeper;
registerMarshalCallback(executor);
}
@Override
protected byte[] getIfDone_internal(long cPtr) throws FDBException {
byte[] bytes = FutureKey_get(cPtr);
return bytes;
return FutureKey_get(cPtr);
}
@Override
protected void postMarshal(byte[] value) {
if(value!=null && eventKeeper!=null){
eventKeeper.count(Events.BYTES_FETCHED,value.length);
}
}
private native byte[] FutureKey_get(long cPtr) throws FDBException;

View File

@ -22,10 +22,14 @@ package com.apple.foundationdb;
import java.util.concurrent.Executor;
class FutureResult extends NativeFuture<byte[]> {
import com.apple.foundationdb.EventKeeper.Events;
FutureResult(long cPtr, Executor executor) {
class FutureResult extends NativeFuture<byte[]> {
private final EventKeeper eventKeeper;
FutureResult(long cPtr, Executor executor,EventKeeper eventKeeper) {
super(cPtr);
this.eventKeeper = eventKeeper;
registerMarshalCallback(executor);
}
@ -34,5 +38,12 @@ class FutureResult extends NativeFuture<byte[]> {
return FutureResult_get(cPtr);
}
@Override
protected void postMarshal(byte[] value){
if(value!=null && eventKeeper!=null){
eventKeeper.count(Events.BYTES_FETCHED, value.length);
}
}
private native byte[] FutureResult_get(long cPtr) throws FDBException;
}

View File

@ -35,7 +35,7 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
}
@Override
protected void postMarshal() {
protected void postMarshal(RangeResultInfo rri) {
// We can't close because this class actually marshals on-demand
}

View File

@ -54,8 +54,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements AutoClose
}
private void marshalWhenDone() {
T val = null;
try {
T val = null;
boolean shouldComplete = false;
try {
pointerReadLock.lock();
@ -77,11 +77,11 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements AutoClose
} catch(Throwable t) {
completeExceptionally(t);
} finally {
postMarshal();
postMarshal(val);
}
}
protected void postMarshal() {
protected void postMarshal(T value) {
close();
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import com.apple.foundationdb.EventKeeper.Events;
@ -220,19 +221,18 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
nextChunk = null;
nextFuture = new CompletableFuture<>();
if (eventKeeper != null) {
eventKeeper.increment(Events.RANGE_QUERY_FETCHES);
final long sTime = System.nanoTime();
nextFuture = nextFuture.thenApply((bool) -> {
eventKeeper.timeNanos(Events.RANGE_QUERY_FETCH_TIME_NANOS, System.nanoTime() - sTime);
return bool;
});
}
final long sTime = System.nanoTime();
fetchingChunk = tr.getRange_internal(begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
++iteration, snapshot, reverse);
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk, nextFuture));
BiConsumer<RangeResultInfo,Throwable> cons = new FetchComplete(fetchingChunk,nextFuture);
if(eventKeeper!=null){
cons = cons.andThen((r,t)->{
eventKeeper.timeNanos(Events.RANGE_QUERY_FETCH_TIME_NANOS, System.nanoTime()-sTime);
});
}
fetchingChunk.whenComplete(cons);
}
@Override