java-bindings: Use DirectBuffer with standard Async call
This commit is contained in:
parent
be184a9dc2
commit
9123ffb1bf
|
@ -22,14 +22,13 @@ set(JAVA_BINDING_SRCS
|
|||
src/main/com/apple/foundationdb/directory/NoSuchDirectoryException.java
|
||||
src/main/com/apple/foundationdb/directory/package-info.java
|
||||
src/main/com/apple/foundationdb/directory/PathUtil.java
|
||||
src/main/com/apple/foundationdb/DirectRangeQuery.java
|
||||
src/main/com/apple/foundationdb/DirectRangeIterator.java
|
||||
src/main/com/apple/foundationdb/DirectBufferIterator.java
|
||||
src/main/com/apple/foundationdb/FDB.java
|
||||
src/main/com/apple/foundationdb/FDBDatabase.java
|
||||
src/main/com/apple/foundationdb/FDBTransaction.java
|
||||
src/main/com/apple/foundationdb/FutureInt64.java
|
||||
src/main/com/apple/foundationdb/FutureKey.java
|
||||
src/main/com/apple/foundationdb/FutureDirectResults.java
|
||||
src/main/com/apple/foundationdb/FutureResult.java
|
||||
src/main/com/apple/foundationdb/FutureResults.java
|
||||
src/main/com/apple/foundationdb/FutureStrings.java
|
||||
|
|
|
@ -604,7 +604,22 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
return (jlong)f;
|
||||
}
|
||||
|
||||
void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int bufferCapacity) {
|
||||
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(
|
||||
JNIEnv* jenv, jobject, jlong future, jobject jbuffer, jint bufferCapacity) {
|
||||
|
||||
if( !future ) {
|
||||
throwParamNotNull(jenv);
|
||||
return;
|
||||
}
|
||||
|
||||
uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer);
|
||||
if (!buffer) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return;
|
||||
}
|
||||
|
||||
FDBFuture* f = (FDBFuture*)future;
|
||||
const FDBKeyValue *kvs;
|
||||
int count;
|
||||
fdb_bool_t more;
|
||||
|
@ -624,11 +639,10 @@ void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int
|
|||
|
||||
for(int i = 0; i < count; i++) {
|
||||
totalCapacityNeeded += kvs[i].key_length + kvs[i].value_length;
|
||||
}
|
||||
|
||||
if (bufferCapacity < totalCapacityNeeded) {
|
||||
throwRuntimeEx( jenv, "Error getting handle to native resources" );
|
||||
return;
|
||||
if (bufferCapacity < totalCapacityNeeded) {
|
||||
count = i; /* Only fit first `i` K/V pairs */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int offset = 0;
|
||||
|
@ -665,89 +679,6 @@ void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int
|
|||
}
|
||||
}
|
||||
|
||||
int readKeySelector(uint8_t* buffer, jint* numBytes, jint* orEqual, jint* offset, uint8_t** bytes) {
|
||||
memcpy(numBytes, buffer, sizeof(jint));
|
||||
memcpy(orEqual, buffer + sizeof(jint), sizeof(jint));
|
||||
memcpy(offset, buffer + 2*sizeof(jint), sizeof(jint));
|
||||
*bytes = buffer + sizeof(jint) * 3;
|
||||
return *numBytes + sizeof(jint) * 3;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getDirectRange(
|
||||
JNIEnv* jenv, jobject, jlong tPtr, jobject jbuffer, jint bufferCapacity) {
|
||||
|
||||
if (!tPtr || !jbuffer) {
|
||||
throwParamNotNull(jenv);
|
||||
return;
|
||||
}
|
||||
|
||||
uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer);
|
||||
if (!buffer) {
|
||||
if (!jenv->ExceptionOccurred()) throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return;
|
||||
}
|
||||
|
||||
int readOffset = 0;
|
||||
|
||||
jint lengthBegin;
|
||||
jint orEqualBegin;
|
||||
jint offsetBegin;
|
||||
uint8_t* barrBegin;
|
||||
readOffset += readKeySelector(buffer + readOffset, &lengthBegin, &orEqualBegin, &offsetBegin, &barrBegin);
|
||||
|
||||
jint lengthEnd;
|
||||
jint orEqualEnd;
|
||||
jint offsetEnd;
|
||||
uint8_t* barrEnd;
|
||||
readOffset += readKeySelector(buffer + readOffset, &lengthEnd, &orEqualEnd, &offsetEnd, &barrEnd);
|
||||
|
||||
jint rowLimit;
|
||||
memcpy(&rowLimit, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
jint targetBytes;
|
||||
memcpy(&targetBytes, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
jint streamingMode;
|
||||
memcpy(&streamingMode, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
jint iteration;
|
||||
memcpy(&iteration, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
jint snapshot;
|
||||
memcpy(&snapshot, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
jint reverse;
|
||||
memcpy(&reverse, buffer + readOffset, sizeof(jint));
|
||||
readOffset += sizeof(jint);
|
||||
|
||||
|
||||
// Update targetBytes based on our buffer capacity
|
||||
if (targetBytes < bufferCapacity - readOffset*2) {
|
||||
// Rough slack to consider return metadata
|
||||
targetBytes = bufferCapacity - readOffset*2;
|
||||
}
|
||||
|
||||
FDBTransaction* tr = (FDBTransaction*)tPtr;
|
||||
|
||||
FDBFuture* f = fdb_transaction_get_range(tr, barrBegin, lengthBegin, orEqualBegin, offsetBegin, barrEnd, lengthEnd,
|
||||
orEqualEnd, offsetEnd, rowLimit, targetBytes,
|
||||
(FDBStreamingMode)streamingMode, iteration, snapshot, reverse);
|
||||
|
||||
if (fdb_future_block_until_ready(f) != 0) {
|
||||
if (!jenv->ExceptionOccurred()) throwRuntimeEx(jenv, "Error executing get_range() query");
|
||||
fdb_future_destroy(f);
|
||||
return;
|
||||
}
|
||||
|
||||
getDirectRangeQueryResults(jenv, f, buffer + readOffset, bufferCapacity - readOffset) ;
|
||||
fdb_future_destroy(f);
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr,
|
||||
jbyteArray beginKeyBytes, jbyteArray endKeyBytes) {
|
||||
if( !tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
|
|
|
@ -34,12 +34,14 @@ import java.util.concurrent.CompletableFuture;
|
|||
*/
|
||||
class BufferPool {
|
||||
static final BufferPool __instance = new BufferPool();
|
||||
static private final int NUM_BUFFERS = 32;
|
||||
static private final int NUM_BUFFERS = 128;
|
||||
private ArrayBlockingQueue<ByteBuffer> buffers = new ArrayBlockingQueue<>(NUM_BUFFERS);
|
||||
|
||||
public BufferPool() {
|
||||
while (buffers.size() < NUM_BUFFERS) {
|
||||
buffers.add(ByteBuffer.allocateDirect(1024 * 1024 * 4));
|
||||
ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 512);
|
||||
assert (buffer != null);
|
||||
buffers.add(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,8 +52,8 @@ class BufferPool {
|
|||
/**
|
||||
* Requests a {@link #DirectByteBuffer} from our pool, and block if needed.
|
||||
*/
|
||||
public synchronized ByteBuffer take() throws InterruptedException {
|
||||
return buffers.take();
|
||||
public synchronized ByteBuffer poll() {
|
||||
return buffers.poll();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -85,10 +87,8 @@ class DirectBufferIterator implements Iterator<KeyValue>, Closeable {
|
|||
private RangeResultSummary summary;
|
||||
private final CompletableFuture<Boolean> promise = new CompletableFuture<>();
|
||||
|
||||
public DirectBufferIterator() {}
|
||||
|
||||
public void init() throws InterruptedException {
|
||||
byteBuffer = BufferPool.getInstance().take();
|
||||
public DirectBufferIterator() {
|
||||
byteBuffer = BufferPool.getInstance().poll();
|
||||
byteBuffer.order(ByteOrder.nativeOrder());
|
||||
}
|
||||
|
||||
|
@ -140,10 +140,10 @@ class DirectBufferIterator implements Iterator<KeyValue>, Closeable {
|
|||
return summary;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return String.format("DirectBufferIterator{KeyCount=%d, Current=%d, More=%b, LastKey=\"%s\", Ref=%s}\n",
|
||||
summary.keyCount, current, summary.more, summary.lastKey, super.toString());
|
||||
}
|
||||
// public String toString() {
|
||||
// return String.format("DirectBufferIterator{KeyCount=%d, Current=%d, More=%b, LastKey=\"%s\", Ref=%s}\n",
|
||||
// summary.keyCount, current, summary.more, summary.lastKey, super.toString());
|
||||
// }
|
||||
|
||||
public int currentIndex() {
|
||||
return current;
|
||||
|
@ -151,7 +151,7 @@ class DirectBufferIterator implements Iterator<KeyValue>, Closeable {
|
|||
|
||||
public void readSummary() {
|
||||
byteBuffer.rewind();
|
||||
byteBuffer.position(resultOffset);
|
||||
byteBuffer.position(0);
|
||||
|
||||
final int keyCount = byteBuffer.getInt();
|
||||
final boolean more = byteBuffer.getInt() > 0;
|
||||
|
@ -165,29 +165,4 @@ class DirectBufferIterator implements Iterator<KeyValue>, Closeable {
|
|||
|
||||
summary = new RangeResultSummary(lastKey, keyCount, more);
|
||||
}
|
||||
|
||||
public void prepareRequest(byte[] begin, boolean beginOrEqual, int beginOffset, byte[] end, boolean endOrEqual,
|
||||
int endOffset, int rowLimit, int targetBytes, int streamingMode, int iteration, boolean isSnapshot,
|
||||
boolean reverse) {
|
||||
|
||||
// IMPORTANT!! Make sure the order is same when read in fdbJNI.cpp.
|
||||
byteBuffer.rewind();
|
||||
byteBuffer.putInt(begin.length);
|
||||
byteBuffer.putInt(beginOrEqual ? 1 : 0);
|
||||
byteBuffer.putInt(beginOffset);
|
||||
byteBuffer.put(begin);
|
||||
|
||||
byteBuffer.putInt(end.length);
|
||||
byteBuffer.putInt(endOrEqual ? 1 : 0);
|
||||
byteBuffer.putInt(endOffset);
|
||||
byteBuffer.put(end);
|
||||
|
||||
byteBuffer.putInt(rowLimit);
|
||||
byteBuffer.putInt(targetBytes);
|
||||
byteBuffer.putInt(streamingMode);
|
||||
byteBuffer.putInt(iteration);
|
||||
byteBuffer.putInt(isSnapshot ? 1 : 0);
|
||||
byteBuffer.putInt(reverse ? 1 : 0);
|
||||
resultOffset = byteBuffer.position();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,266 +0,0 @@
|
|||
/*
|
||||
* DirectRangeIterator.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2015-2021 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import com.apple.foundationdb.async.AsyncIterator;
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* Represents the {@code Iterator} used by {@link DirectRangeQuery}. This class is
|
||||
* responsible to actually make the requests to FDB server as we stream along. Make sure
|
||||
* {@code Close()} is called to avoid leaks of {@link DirectBufferIterator} from {@link BufferPool}.
|
||||
*/
|
||||
// TODO (Vishesh): DRY. Shares lot of code with RangeQuery::Iterator.
|
||||
class DirectRangeIterator implements AsyncIterator<KeyValue> {
|
||||
private final FDBTransaction tr;
|
||||
|
||||
// immutable aspects of this iterator
|
||||
private final boolean rowsLimited;
|
||||
private final boolean reverse;
|
||||
private final StreamingMode streamingMode;
|
||||
private final boolean snapshot;
|
||||
|
||||
// There is the chance for parallelism in the two "chunks" for fetched data
|
||||
private Queue<DirectBufferIterator> resultChunks = new ArrayDeque<DirectBufferIterator>();
|
||||
private CompletableFuture<DirectBufferIterator> fetchingChunk;
|
||||
private CompletableFuture<Boolean> nextFuture;
|
||||
|
||||
private boolean isCancelled = false;
|
||||
private boolean fetchOutstanding = false;
|
||||
|
||||
private byte[] prevKey = null;
|
||||
private int iteration = 0;
|
||||
private KeySelector begin;
|
||||
private KeySelector end;
|
||||
|
||||
private int rowsRemaining;
|
||||
|
||||
DirectRangeIterator(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end,
|
||||
int rowLimit, boolean reverse, StreamingMode streamingMode) {
|
||||
this.tr = transaction;
|
||||
this.begin = begin;
|
||||
this.end = end;
|
||||
this.snapshot = isSnapshot;
|
||||
this.rowsLimited = rowLimit != 0;
|
||||
this.rowsRemaining = rowLimit;
|
||||
this.reverse = reverse;
|
||||
this.streamingMode = streamingMode;
|
||||
|
||||
startNextFetch();
|
||||
}
|
||||
|
||||
private DirectBufferIterator currentChunk() {
|
||||
return resultChunks.peek();
|
||||
}
|
||||
|
||||
private synchronized void startNextFetch() {
|
||||
if (fetchOutstanding)
|
||||
throw new IllegalStateException("Reentrant call not allowed"); // This can not be called reentrantly
|
||||
|
||||
if (isCancelled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (currentChunk() != null && mainChunkIsTheLast()) {
|
||||
return;
|
||||
}
|
||||
|
||||
fetchOutstanding = true;
|
||||
|
||||
DirectBufferIterator iter = new DirectBufferIterator();
|
||||
nextFuture = iter.onResultReady();
|
||||
|
||||
fetchingChunk = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
iter.init();
|
||||
tr.getDirectRange_internal(iter, begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
|
||||
++iteration, snapshot, reverse);
|
||||
return iter;
|
||||
} catch (InterruptedException e) {
|
||||
synchronized (DirectRangeIterator.this) {
|
||||
iter.close();
|
||||
}
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
}, tr.getExecutor());
|
||||
|
||||
fetchingChunk.whenComplete(new FetchComplete(fetchingChunk));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompletableFuture<Boolean> onHasNext() {
|
||||
if (isCancelled)
|
||||
throw new CancellationException();
|
||||
|
||||
// This will only happen before the first fetch has completed
|
||||
if (currentChunk() == null) {
|
||||
return nextFuture;
|
||||
}
|
||||
|
||||
// We have a chunk and are still working though it
|
||||
if (currentChunk().hasNext()) {
|
||||
return AsyncUtil.READY_TRUE;
|
||||
}
|
||||
|
||||
// If we are at the end of the current chunk there is either:
|
||||
// - no more data -or-
|
||||
// - we are already fetching the next block
|
||||
return mainChunkIsTheLast() ? AsyncUtil.READY_FALSE : nextFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return onHasNext().join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue next() {
|
||||
CompletableFuture<Boolean> nextFuture;
|
||||
synchronized (this) {
|
||||
if (isCancelled)
|
||||
throw new CancellationException();
|
||||
|
||||
freeChunks();
|
||||
|
||||
if (currentChunk() != null && currentChunk().hasNext()) {
|
||||
// If this is the first call to next() on a chunk, then we will want to
|
||||
// start fetching the data for the next block
|
||||
boolean fetchNextChunk = (currentChunk().currentIndex() == 0);
|
||||
|
||||
KeyValue result = currentChunk().next();
|
||||
prevKey = result.getKey();
|
||||
|
||||
freeChunks();
|
||||
|
||||
if (fetchNextChunk) {
|
||||
startNextFetch();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
nextFuture = onHasNext();
|
||||
}
|
||||
|
||||
// If there was no result ready then we need to wait on the future
|
||||
// and return the proper result, throwing if there are no more elements
|
||||
return nextFuture.thenApply(hasNext -> {
|
||||
if (hasNext) {
|
||||
return next();
|
||||
}
|
||||
throw new NoSuchElementException();
|
||||
}).join();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void remove() {
|
||||
if (prevKey == null)
|
||||
throw new IllegalStateException("No value has been fetched from database");
|
||||
|
||||
tr.clear(prevKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void cancel() {
|
||||
isCancelled = true;
|
||||
fetchingChunk.cancel(true);
|
||||
nextFuture.cancel(true);
|
||||
while (!resultChunks.isEmpty()) {
|
||||
DirectBufferIterator it = resultChunks.poll();
|
||||
it.onResultReady().cancel(true);
|
||||
it.close();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized boolean mainChunkIsTheLast() {
|
||||
return !currentChunk().getSummary().more || (rowsLimited && rowsRemaining < 1);
|
||||
}
|
||||
|
||||
private boolean isNextChunkReady() {
|
||||
return resultChunks.size() > 1;
|
||||
}
|
||||
|
||||
private void freeChunks() {
|
||||
if (currentChunk() != null && !currentChunk().hasNext() && isNextChunkReady()) {
|
||||
resultChunks.poll().close();
|
||||
}
|
||||
}
|
||||
|
||||
class FetchComplete implements BiConsumer<DirectBufferIterator, Throwable> {
|
||||
final CompletableFuture<DirectBufferIterator> fetchingChunk;
|
||||
|
||||
FetchComplete(CompletableFuture<DirectBufferIterator> fetch) {
|
||||
this.fetchingChunk = fetch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(DirectBufferIterator iter, Throwable error) {
|
||||
CompletableFuture<Boolean> promise = iter.onResultReady();
|
||||
if (error != null) {
|
||||
promise.completeExceptionally(error);
|
||||
if (error instanceof Error) {
|
||||
throw (Error) error;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (DirectRangeIterator.this) {
|
||||
iter.readSummary();
|
||||
}
|
||||
|
||||
final RangeResultSummary summary = iter.getSummary();
|
||||
if (summary.lastKey == null) {
|
||||
promise.complete(Boolean.FALSE);
|
||||
iter.close();
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (DirectRangeIterator.this) {
|
||||
// adjust the total number of rows we should ever fetch
|
||||
rowsRemaining -= summary.keyCount;
|
||||
|
||||
// set up the next fetch
|
||||
if (reverse) {
|
||||
end = KeySelector.firstGreaterOrEqual(summary.lastKey);
|
||||
} else {
|
||||
begin = KeySelector.firstGreaterThan(summary.lastKey);
|
||||
}
|
||||
|
||||
fetchOutstanding = false;
|
||||
freeChunks();
|
||||
resultChunks.offer(iter);
|
||||
}
|
||||
|
||||
promise.complete(Boolean.TRUE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,91 +0,0 @@
|
|||
/*
|
||||
* DirectRangeQuery.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2015-2020 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import com.apple.foundationdb.async.AsyncIterable;
|
||||
import com.apple.foundationdb.async.AsyncIterator;
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Represents a query against FoundationDB for a range of keys. The
|
||||
* result of this query can be iterated over in a blocking fashion with a call to
|
||||
* {@link #iterator()} (as specified by {@link Iterable}).
|
||||
* If the calling program uses an asynchronous paradigm, a non-blocking
|
||||
* {@link AsyncIterator} is returned from {@link #iterator()}. Both of these
|
||||
* constructions will not begin to query the database until the first call to
|
||||
* {@code hasNext()}. As the query uses its {@link Transaction} of origin to fetch
|
||||
* all the data, the use of this query object must not span more than a few seconds.
|
||||
*
|
||||
* <br><br><b>NOTE:</b> although resulting {@code Iterator}s do support the {@code remove()}
|
||||
* operation, the remove is not durable until {@code commit()} on the {@code Transaction}
|
||||
* that yielded this query returns <code>true</code>.
|
||||
*
|
||||
* NOTE: This differs from {@link RangeQuery}, by the fact that it uses much higher performance
|
||||
* {@link DirectByteBuffer} and {@link getDirectRange_internal} instead of {@link getRange_internal}
|
||||
* which avoid several JNI calls, and memory allocations.
|
||||
*/
|
||||
public class DirectRangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
|
||||
private final FDBTransaction tr;
|
||||
private KeySelector begin;
|
||||
private KeySelector end;
|
||||
private final boolean snapshot;
|
||||
private final int rowLimit;
|
||||
private final boolean reverse;
|
||||
private final StreamingMode streamingMode;
|
||||
|
||||
DirectRangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit,
|
||||
boolean reverse, StreamingMode streamingMode) {
|
||||
this.tr = transaction;
|
||||
this.begin = begin;
|
||||
this.end = end;
|
||||
this.snapshot = isSnapshot;
|
||||
this.rowLimit = rowLimit;
|
||||
this.reverse = reverse;
|
||||
this.streamingMode = streamingMode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncIterator<KeyValue> iterator() {
|
||||
return new DirectRangeIterator(tr, snapshot, begin, end, rowLimit, reverse, streamingMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<KeyValue>> asList() {
|
||||
StreamingMode mode = this.streamingMode;
|
||||
if (mode == StreamingMode.ITERATOR)
|
||||
mode = (this.rowLimit == 0) ? StreamingMode.WANT_ALL : StreamingMode.EXACT;
|
||||
|
||||
// TODO (Vishesh) Update for DirectRangeQuery? We may not have big enough `DirectByteBuffer`.
|
||||
// If the streaming mode is EXACT, try and grab things as one chunk
|
||||
if (mode == StreamingMode.EXACT) {
|
||||
FutureResults range = tr.getRange_internal(this.begin, this.end, this.rowLimit, 0,
|
||||
StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse);
|
||||
return range.thenApply(result -> result.get().values).whenComplete((result, e) -> range.close());
|
||||
}
|
||||
|
||||
// If the streaming mode is not EXACT, simply collect the results of an
|
||||
// iteration into a list
|
||||
return AsyncUtil.collect(new DirectRangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode),
|
||||
tr.getExecutor());
|
||||
}
|
||||
}
|
|
@ -87,7 +87,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
return new DirectRangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode);
|
||||
return new RangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode);
|
||||
}
|
||||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
|
@ -289,7 +289,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
return new DirectRangeQuery(this, false, begin, end, limit, reverse, mode);
|
||||
return new RangeQuery(this, false, begin, end, limit, reverse, mode);
|
||||
}
|
||||
@Override
|
||||
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
|
||||
|
@ -379,25 +379,6 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
}
|
||||
}
|
||||
|
||||
// Users of this function must close the returned FutureResults when finished
|
||||
protected void getDirectRange_internal(
|
||||
DirectBufferIterator iterator,
|
||||
KeySelector begin, KeySelector end,
|
||||
int rowLimit, int targetBytes, int streamingMode,
|
||||
int iteration, boolean isSnapshot, boolean reverse) {
|
||||
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
iterator.prepareRequest(begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
|
||||
streamingMode, iteration, isSnapshot, reverse);
|
||||
Transaction_getDirectRange(getPtr(), iterator.getBuffer(), iterator.getBuffer().capacity());
|
||||
iterator.readSummary();
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addReadConflictRangeIfNotSnapshot(byte[] keyBegin, byte[] keyEnd) {
|
||||
addReadConflictRange(keyBegin, keyEnd);
|
||||
|
@ -687,7 +668,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native void Transaction_getDirectRange(long cPtr, ByteBuffer jbuffer, int bufferCapacity);
|
||||
private native long Transaction_getDirectRange(long cPtr, ByteBuffer jbuffer, int bufferCapacity);
|
||||
private native void Transaction_addConflictRange(long cPtr,
|
||||
byte[] keyBegin, byte[] keyEnd, int conflictRangeType);
|
||||
private native void Transaction_set(long cPtr, byte[] key, byte[] value);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureResults extends NativeFuture<RangeResultInfo> {
|
||||
|
@ -45,13 +46,22 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
|
|||
}
|
||||
|
||||
public RangeResult getResults() {
|
||||
DirectBufferIterator directIterator = new DirectBufferIterator();
|
||||
try {
|
||||
pointerReadLock.lock();
|
||||
return FutureResults_get(getPtr());
|
||||
if (directIterator.getBuffer() != null) {
|
||||
FutureResults_getDirect(getPtr(), directIterator.getBuffer(), directIterator.getBuffer().capacity());
|
||||
return new RangeResult(directIterator);
|
||||
} else {
|
||||
return FutureResults_get(getPtr());
|
||||
}
|
||||
} finally {
|
||||
directIterator.close();
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private native RangeResult FutureResults_get(long cPtr) throws FDBException;
|
||||
private native void FutureResults_getDirect(long cPtr, ByteBuffer buffer, int capacity)
|
||||
throws FDBException;
|
||||
}
|
||||
|
|
|
@ -52,6 +52,18 @@ class RangeResult {
|
|||
this.more = more;
|
||||
}
|
||||
|
||||
RangeResult(DirectBufferIterator iterator) {
|
||||
iterator.readSummary();
|
||||
more = iterator.getSummary().more;
|
||||
|
||||
int count = iterator.getSummary().keyCount;
|
||||
values = new ArrayList<KeyValue>(count);
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
values.add(iterator.next());
|
||||
}
|
||||
}
|
||||
|
||||
public RangeResultSummary getSummary() {
|
||||
final int keyCount = values.size();
|
||||
final byte[] lastKey = keyCount > 0 ? values.get(keyCount -1).getKey() : null;
|
||||
|
|
Loading…
Reference in New Issue