Addressing PR comments
This commit is contained in:
parent
c6731cc053
commit
75be7243e8
|
@ -112,7 +112,7 @@ class EventKeeperTest {
|
|||
|
||||
// now check the timer and see if it recorded any events
|
||||
Assertions.assertEquals(1, timer.getCount(Events.RANGE_QUERY_FETCHES), "Unexpected number of chunk fetches");
|
||||
Assertions.assertEquals(testKvs.size(), timer.getCount(Events.RANGE_QUERY_TUPLES_FETCHED),
|
||||
Assertions.assertEquals(testKvs.size(), timer.getCount(Events.RANGE_QUERY_RECORDS_FETCHED),
|
||||
"Unexpected number of tuples fetched");
|
||||
Assertions.assertEquals(expectedByteSize, timer.getCount(Events.BYTES_FETCHED),
|
||||
"Incorrect number of bytes fetched");
|
||||
|
@ -124,12 +124,7 @@ class EventKeeperTest {
|
|||
|
||||
@Override
|
||||
public void count(Event event, long amt) {
|
||||
Long currCnt = counterMap.get(event);
|
||||
if (currCnt == null) {
|
||||
counterMap.put(event, amt);
|
||||
} else {
|
||||
counterMap.put(event, currCnt + amt);
|
||||
}
|
||||
counterMap.compute(event, (e,present)->present==null? amt:amt+present);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* TransactionTimer.java
|
||||
* EventKeeper.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -160,7 +160,7 @@ public interface EventKeeper {
|
|||
/**
|
||||
* The number of tuples fetched during a range query
|
||||
*/
|
||||
RANGE_QUERY_TUPLES_FETCHED,
|
||||
RANGE_QUERY_RECORDS_FETCHED,
|
||||
/**
|
||||
* The number of times a range query chunk fetch failed
|
||||
*/
|
||||
|
|
|
@ -428,7 +428,7 @@ public class FDB {
|
|||
*
|
||||
* @return a {@code CompletableFuture} that will be set to a FoundationDB {@link Database}
|
||||
*/
|
||||
public Database open(String clusterFilePath, Executor e,EventKeeper eventKeeper) throws FDBException {
|
||||
public Database open(String clusterFilePath, Executor e, EventKeeper eventKeeper) throws FDBException {
|
||||
synchronized(this) {
|
||||
if(!isConnected()) {
|
||||
startNetwork();
|
||||
|
|
|
@ -191,7 +191,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
this(cPtr,database,executor,null);
|
||||
}
|
||||
|
||||
protected FDBTransaction(long cPtr, Database database, Executor executor,EventKeeper eventKeeper) {
|
||||
protected FDBTransaction(long cPtr, Database database, Executor executor, EventKeeper eventKeeper) {
|
||||
super(cPtr);
|
||||
this.database = database;
|
||||
this.executor = executor;
|
||||
|
@ -318,7 +318,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
return new RangeQuery(this, false, begin, end, limit, reverse, mode,eventKeeper);
|
||||
return new RangeQuery(this, false, begin, end, limit, reverse, mode, eventKeeper);
|
||||
}
|
||||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
|
@ -406,7 +406,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
|
||||
streamingMode, iteration, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor,eventKeeper);
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import com.apple.foundationdb.EventKeeper.Events;
|
|||
|
||||
class FutureResults extends NativeFuture<RangeResultInfo> {
|
||||
private final EventKeeper eventKeeper;
|
||||
FutureResults(long cPtr, boolean enableDirectBufferQueries, Executor executor,EventKeeper eventKeeper) {
|
||||
FutureResults(long cPtr, boolean enableDirectBufferQueries, Executor executor, EventKeeper eventKeeper) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback(executor);
|
||||
this.enableDirectBufferQueries = enableDirectBufferQueries;
|
||||
|
@ -54,25 +54,23 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
|
|||
}
|
||||
|
||||
public RangeResult getResults() {
|
||||
ByteBuffer buffer = enableDirectBufferQueries
|
||||
? DirectBufferPool.getInstance().poll()
|
||||
: null;
|
||||
ByteBuffer buffer = enableDirectBufferQueries ? DirectBufferPool.getInstance().poll() : null;
|
||||
if (buffer != null && eventKeeper != null) {
|
||||
eventKeeper.increment(Events.RANGE_QUERY_DIRECT_BUFFER_HIT);
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
} else if (eventKeeper != null) {
|
||||
eventKeeper.increment(Events.RANGE_QUERY_DIRECT_BUFFER_MISS);
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
}
|
||||
|
||||
try {
|
||||
pointerReadLock.lock();
|
||||
if (buffer != null) {
|
||||
if (eventKeeper != null) {
|
||||
eventKeeper.increment(Events.RANGE_QUERY_DIRECT_BUFFER_HIT);
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
}
|
||||
try (DirectBufferIterator directIterator = new DirectBufferIterator(buffer)) {
|
||||
FutureResults_getDirect(getPtr(), directIterator.getBuffer(), directIterator.getBuffer().capacity());
|
||||
return new RangeResult(directIterator);
|
||||
}
|
||||
} else {
|
||||
if (eventKeeper != null) {
|
||||
eventKeeper.increment(Events.RANGE_QUERY_DIRECT_BUFFER_MISS);
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
}
|
||||
return FutureResults_get(getPtr());
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -53,7 +53,7 @@ abstract class NativeFuture<T> extends CompletableFuture<T> implements AutoClose
|
|||
}
|
||||
}
|
||||
|
||||
void marshalWhenDone() {
|
||||
private void marshalWhenDone() {
|
||||
try {
|
||||
T val = null;
|
||||
boolean shouldComplete = false;
|
||||
|
|
|
@ -288,7 +288,7 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
|
|||
// (note: account for the length fields as well when recording the bytes
|
||||
// fetched)
|
||||
eventKeeper.count(Events.BYTES_FETCHED, result.getKey().length + result.getValue().length + 8);
|
||||
eventKeeper.increment(Events.RANGE_QUERY_TUPLES_FETCHED);
|
||||
eventKeeper.increment(Events.RANGE_QUERY_RECORDS_FETCHED);
|
||||
}
|
||||
|
||||
// If this is the first call to next() on a chunk there cannot
|
||||
|
|
Loading…
Reference in New Issue