diff --git a/bindings/java/CMakeLists.txt b/bindings/java/CMakeLists.txt index ae64a97e07..b4d5234e87 100644 --- a/bindings/java/CMakeLists.txt +++ b/bindings/java/CMakeLists.txt @@ -22,14 +22,13 @@ set(JAVA_BINDING_SRCS src/main/com/apple/foundationdb/directory/NoSuchDirectoryException.java src/main/com/apple/foundationdb/directory/package-info.java src/main/com/apple/foundationdb/directory/PathUtil.java - src/main/com/apple/foundationdb/DirectRangeQuery.java - src/main/com/apple/foundationdb/DirectRangeIterator.java src/main/com/apple/foundationdb/DirectBufferIterator.java src/main/com/apple/foundationdb/FDB.java src/main/com/apple/foundationdb/FDBDatabase.java src/main/com/apple/foundationdb/FDBTransaction.java src/main/com/apple/foundationdb/FutureInt64.java src/main/com/apple/foundationdb/FutureKey.java + src/main/com/apple/foundationdb/FutureDirectResults.java src/main/com/apple/foundationdb/FutureResult.java src/main/com/apple/foundationdb/FutureResults.java src/main/com/apple/foundationdb/FutureStrings.java diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index d6954873a0..37079599c0 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -604,7 +604,22 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1 return (jlong)f; } -void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int bufferCapacity) { +JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect( + JNIEnv* jenv, jobject, jlong future, jobject jbuffer, jint bufferCapacity) { + + if( !future ) { + throwParamNotNull(jenv); + return; + } + + uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer); + if (!buffer) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return; + } + + FDBFuture* f = (FDBFuture*)future; const FDBKeyValue *kvs; int count; fdb_bool_t more; @@ -624,11 +639,10 @@ void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int for(int i = 0; i < count; i++) { totalCapacityNeeded += kvs[i].key_length + kvs[i].value_length; - } - - if (bufferCapacity < totalCapacityNeeded) { - throwRuntimeEx( jenv, "Error getting handle to native resources" ); - return; + if (bufferCapacity < totalCapacityNeeded) { + count = i; /* Only fit first `i` K/V pairs */ + break; + } } int offset = 0; @@ -665,89 +679,6 @@ void getDirectRangeQueryResults(JNIEnv *jenv, FDBFuture* f, uint8_t* buffer, int } } -int readKeySelector(uint8_t* buffer, jint* numBytes, jint* orEqual, jint* offset, uint8_t** bytes) { - memcpy(numBytes, buffer, sizeof(jint)); - memcpy(orEqual, buffer + sizeof(jint), sizeof(jint)); - memcpy(offset, buffer + 2*sizeof(jint), sizeof(jint)); - *bytes = buffer + sizeof(jint) * 3; - return *numBytes + sizeof(jint) * 3; -} - -JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getDirectRange( - JNIEnv* jenv, jobject, jlong tPtr, jobject jbuffer, jint bufferCapacity) { - - if (!tPtr || !jbuffer) { - throwParamNotNull(jenv); - return; - } - - uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer); - if (!buffer) { - if (!jenv->ExceptionOccurred()) throwRuntimeEx(jenv, "Error getting handle to native resources"); - return; - } - - int readOffset = 0; - - jint lengthBegin; - jint orEqualBegin; - jint offsetBegin; - uint8_t* barrBegin; - readOffset += readKeySelector(buffer + readOffset, &lengthBegin, &orEqualBegin, &offsetBegin, &barrBegin); - - jint lengthEnd; - jint orEqualEnd; - jint offsetEnd; - uint8_t* barrEnd; - readOffset += readKeySelector(buffer + readOffset, &lengthEnd, &orEqualEnd, &offsetEnd, &barrEnd); - - jint rowLimit; - memcpy(&rowLimit, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - jint targetBytes; - memcpy(&targetBytes, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - jint streamingMode; - memcpy(&streamingMode, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - jint iteration; - memcpy(&iteration, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - jint snapshot; - memcpy(&snapshot, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - jint reverse; - memcpy(&reverse, buffer + readOffset, sizeof(jint)); - readOffset += sizeof(jint); - - - // Update targetBytes based on our buffer capacity - if (targetBytes < bufferCapacity - readOffset*2) { - // Rough slack to consider return metadata - targetBytes = bufferCapacity - readOffset*2; - } - - FDBTransaction* tr = (FDBTransaction*)tPtr; - - FDBFuture* f = fdb_transaction_get_range(tr, barrBegin, lengthBegin, orEqualBegin, offsetBegin, barrEnd, lengthEnd, - orEqualEnd, offsetEnd, rowLimit, targetBytes, - (FDBStreamingMode)streamingMode, iteration, snapshot, reverse); - - if (fdb_future_block_until_ready(f) != 0) { - if (!jenv->ExceptionOccurred()) throwRuntimeEx(jenv, "Error executing get_range() query"); - fdb_future_destroy(f); - return; - } - - getDirectRangeQueryResults(jenv, f, buffer + readOffset, bufferCapacity - readOffset) ; - fdb_future_destroy(f); -} - JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr, jbyteArray beginKeyBytes, jbyteArray endKeyBytes) { if( !tPtr || !beginKeyBytes || !endKeyBytes) { diff --git a/bindings/java/src/main/com/apple/foundationdb/DirectBufferIterator.java b/bindings/java/src/main/com/apple/foundationdb/DirectBufferIterator.java index f751b832c5..0948da7bba 100644 --- a/bindings/java/src/main/com/apple/foundationdb/DirectBufferIterator.java +++ b/bindings/java/src/main/com/apple/foundationdb/DirectBufferIterator.java @@ -34,12 +34,14 @@ import java.util.concurrent.CompletableFuture; */ class BufferPool { static final BufferPool __instance = new BufferPool(); - static private final int NUM_BUFFERS = 32; + static private final int NUM_BUFFERS = 128; private ArrayBlockingQueue buffers = new ArrayBlockingQueue<>(NUM_BUFFERS); public BufferPool() { while (buffers.size() < NUM_BUFFERS) { - buffers.add(ByteBuffer.allocateDirect(1024 * 1024 * 4)); + ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 512); + assert (buffer != null); + buffers.add(buffer); } } @@ -50,8 +52,8 @@ class BufferPool { /** * Requests a {@link #DirectByteBuffer} from our pool, and block if needed. */ - public synchronized ByteBuffer take() throws InterruptedException { - return buffers.take(); + public synchronized ByteBuffer poll() { + return buffers.poll(); } /** @@ -85,10 +87,8 @@ class DirectBufferIterator implements Iterator, Closeable { private RangeResultSummary summary; private final CompletableFuture promise = new CompletableFuture<>(); - public DirectBufferIterator() {} - - public void init() throws InterruptedException { - byteBuffer = BufferPool.getInstance().take(); + public DirectBufferIterator() { + byteBuffer = BufferPool.getInstance().poll(); byteBuffer.order(ByteOrder.nativeOrder()); } @@ -140,10 +140,10 @@ class DirectBufferIterator implements Iterator, Closeable { return summary; } - public String toString() { - return String.format("DirectBufferIterator{KeyCount=%d, Current=%d, More=%b, LastKey=\"%s\", Ref=%s}\n", - summary.keyCount, current, summary.more, summary.lastKey, super.toString()); - } + // public String toString() { + // return String.format("DirectBufferIterator{KeyCount=%d, Current=%d, More=%b, LastKey=\"%s\", Ref=%s}\n", + // summary.keyCount, current, summary.more, summary.lastKey, super.toString()); + // } public int currentIndex() { return current; @@ -151,7 +151,7 @@ class DirectBufferIterator implements Iterator, Closeable { public void readSummary() { byteBuffer.rewind(); - byteBuffer.position(resultOffset); + byteBuffer.position(0); final int keyCount = byteBuffer.getInt(); final boolean more = byteBuffer.getInt() > 0; @@ -165,29 +165,4 @@ class DirectBufferIterator implements Iterator, Closeable { summary = new RangeResultSummary(lastKey, keyCount, more); } - - public void prepareRequest(byte[] begin, boolean beginOrEqual, int beginOffset, byte[] end, boolean endOrEqual, - int endOffset, int rowLimit, int targetBytes, int streamingMode, int iteration, boolean isSnapshot, - boolean reverse) { - - // IMPORTANT!! Make sure the order is same when read in fdbJNI.cpp. - byteBuffer.rewind(); - byteBuffer.putInt(begin.length); - byteBuffer.putInt(beginOrEqual ? 1 : 0); - byteBuffer.putInt(beginOffset); - byteBuffer.put(begin); - - byteBuffer.putInt(end.length); - byteBuffer.putInt(endOrEqual ? 1 : 0); - byteBuffer.putInt(endOffset); - byteBuffer.put(end); - - byteBuffer.putInt(rowLimit); - byteBuffer.putInt(targetBytes); - byteBuffer.putInt(streamingMode); - byteBuffer.putInt(iteration); - byteBuffer.putInt(isSnapshot ? 1 : 0); - byteBuffer.putInt(reverse ? 1 : 0); - resultOffset = byteBuffer.position(); - } } diff --git a/bindings/java/src/main/com/apple/foundationdb/DirectRangeIterator.java b/bindings/java/src/main/com/apple/foundationdb/DirectRangeIterator.java deleted file mode 100644 index 279fe45f00..0000000000 --- a/bindings/java/src/main/com/apple/foundationdb/DirectRangeIterator.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * DirectRangeIterator.java - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2015-2021 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.apple.foundationdb; - -import com.apple.foundationdb.async.AsyncIterator; -import com.apple.foundationdb.async.AsyncUtil; - -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.BiConsumer; - -/** - * Represents the {@code Iterator} used by {@link DirectRangeQuery}. This class is - * responsible to actually make the requests to FDB server as we stream along. Make sure - * {@code Close()} is called to avoid leaks of {@link DirectBufferIterator} from {@link BufferPool}. - */ -// TODO (Vishesh): DRY. Shares lot of code with RangeQuery::Iterator. -class DirectRangeIterator implements AsyncIterator { - private final FDBTransaction tr; - - // immutable aspects of this iterator - private final boolean rowsLimited; - private final boolean reverse; - private final StreamingMode streamingMode; - private final boolean snapshot; - - // There is the chance for parallelism in the two "chunks" for fetched data - private Queue resultChunks = new ArrayDeque(); - private CompletableFuture fetchingChunk; - private CompletableFuture nextFuture; - - private boolean isCancelled = false; - private boolean fetchOutstanding = false; - - private byte[] prevKey = null; - private int iteration = 0; - private KeySelector begin; - private KeySelector end; - - private int rowsRemaining; - - DirectRangeIterator(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, - int rowLimit, boolean reverse, StreamingMode streamingMode) { - this.tr = transaction; - this.begin = begin; - this.end = end; - this.snapshot = isSnapshot; - this.rowsLimited = rowLimit != 0; - this.rowsRemaining = rowLimit; - this.reverse = reverse; - this.streamingMode = streamingMode; - - startNextFetch(); - } - - private DirectBufferIterator currentChunk() { - return resultChunks.peek(); - } - - private synchronized void startNextFetch() { - if (fetchOutstanding) - throw new IllegalStateException("Reentrant call not allowed"); // This can not be called reentrantly - - if (isCancelled) { - return; - } - - if (currentChunk() != null && mainChunkIsTheLast()) { - return; - } - - fetchOutstanding = true; - - DirectBufferIterator iter = new DirectBufferIterator(); - nextFuture = iter.onResultReady(); - - fetchingChunk = CompletableFuture.supplyAsync(() -> { - try { - iter.init(); - tr.getDirectRange_internal(iter, begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(), - ++iteration, snapshot, reverse); - return iter; - } catch (InterruptedException e) { - synchronized (DirectRangeIterator.this) { - iter.close(); - } - throw new CompletionException(e); - } - }, tr.getExecutor()); - - fetchingChunk.whenComplete(new FetchComplete(fetchingChunk)); - } - - @Override - public synchronized CompletableFuture onHasNext() { - if (isCancelled) - throw new CancellationException(); - - // This will only happen before the first fetch has completed - if (currentChunk() == null) { - return nextFuture; - } - - // We have a chunk and are still working though it - if (currentChunk().hasNext()) { - return AsyncUtil.READY_TRUE; - } - - // If we are at the end of the current chunk there is either: - // - no more data -or- - // - we are already fetching the next block - return mainChunkIsTheLast() ? AsyncUtil.READY_FALSE : nextFuture; - } - - @Override - public boolean hasNext() { - return onHasNext().join(); - } - - @Override - public KeyValue next() { - CompletableFuture nextFuture; - synchronized (this) { - if (isCancelled) - throw new CancellationException(); - - freeChunks(); - - if (currentChunk() != null && currentChunk().hasNext()) { - // If this is the first call to next() on a chunk, then we will want to - // start fetching the data for the next block - boolean fetchNextChunk = (currentChunk().currentIndex() == 0); - - KeyValue result = currentChunk().next(); - prevKey = result.getKey(); - - freeChunks(); - - if (fetchNextChunk) { - startNextFetch(); - } - - return result; - } - - nextFuture = onHasNext(); - } - - // If there was no result ready then we need to wait on the future - // and return the proper result, throwing if there are no more elements - return nextFuture.thenApply(hasNext -> { - if (hasNext) { - return next(); - } - throw new NoSuchElementException(); - }).join(); - } - - @Override - public synchronized void remove() { - if (prevKey == null) - throw new IllegalStateException("No value has been fetched from database"); - - tr.clear(prevKey); - } - - @Override - public synchronized void cancel() { - isCancelled = true; - fetchingChunk.cancel(true); - nextFuture.cancel(true); - while (!resultChunks.isEmpty()) { - DirectBufferIterator it = resultChunks.poll(); - it.onResultReady().cancel(true); - it.close(); - } - } - - private synchronized boolean mainChunkIsTheLast() { - return !currentChunk().getSummary().more || (rowsLimited && rowsRemaining < 1); - } - - private boolean isNextChunkReady() { - return resultChunks.size() > 1; - } - - private void freeChunks() { - if (currentChunk() != null && !currentChunk().hasNext() && isNextChunkReady()) { - resultChunks.poll().close(); - } - } - - class FetchComplete implements BiConsumer { - final CompletableFuture fetchingChunk; - - FetchComplete(CompletableFuture fetch) { - this.fetchingChunk = fetch; - } - - @Override - public void accept(DirectBufferIterator iter, Throwable error) { - CompletableFuture promise = iter.onResultReady(); - if (error != null) { - promise.completeExceptionally(error); - if (error instanceof Error) { - throw (Error) error; - } - - return; - } - - synchronized (DirectRangeIterator.this) { - iter.readSummary(); - } - - final RangeResultSummary summary = iter.getSummary(); - if (summary.lastKey == null) { - promise.complete(Boolean.FALSE); - iter.close(); - return; - } - - synchronized (DirectRangeIterator.this) { - // adjust the total number of rows we should ever fetch - rowsRemaining -= summary.keyCount; - - // set up the next fetch - if (reverse) { - end = KeySelector.firstGreaterOrEqual(summary.lastKey); - } else { - begin = KeySelector.firstGreaterThan(summary.lastKey); - } - - fetchOutstanding = false; - freeChunks(); - resultChunks.offer(iter); - } - - promise.complete(Boolean.TRUE); - } - } -} diff --git a/bindings/java/src/main/com/apple/foundationdb/DirectRangeQuery.java b/bindings/java/src/main/com/apple/foundationdb/DirectRangeQuery.java deleted file mode 100644 index 2f956dbc22..0000000000 --- a/bindings/java/src/main/com/apple/foundationdb/DirectRangeQuery.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * DirectRangeQuery.java - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2015-2020 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.apple.foundationdb; - -import com.apple.foundationdb.async.AsyncIterable; -import com.apple.foundationdb.async.AsyncIterator; -import com.apple.foundationdb.async.AsyncUtil; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -/** - * Represents a query against FoundationDB for a range of keys. The - * result of this query can be iterated over in a blocking fashion with a call to - * {@link #iterator()} (as specified by {@link Iterable}). - * If the calling program uses an asynchronous paradigm, a non-blocking - * {@link AsyncIterator} is returned from {@link #iterator()}. Both of these - * constructions will not begin to query the database until the first call to - * {@code hasNext()}. As the query uses its {@link Transaction} of origin to fetch - * all the data, the use of this query object must not span more than a few seconds. - * - *

NOTE: although resulting {@code Iterator}s do support the {@code remove()} - * operation, the remove is not durable until {@code commit()} on the {@code Transaction} - * that yielded this query returns true. - * - * NOTE: This differs from {@link RangeQuery}, by the fact that it uses much higher performance - * {@link DirectByteBuffer} and {@link getDirectRange_internal} instead of {@link getRange_internal} - * which avoid several JNI calls, and memory allocations. - */ -public class DirectRangeQuery implements AsyncIterable, Iterable { - private final FDBTransaction tr; - private KeySelector begin; - private KeySelector end; - private final boolean snapshot; - private final int rowLimit; - private final boolean reverse; - private final StreamingMode streamingMode; - - DirectRangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit, - boolean reverse, StreamingMode streamingMode) { - this.tr = transaction; - this.begin = begin; - this.end = end; - this.snapshot = isSnapshot; - this.rowLimit = rowLimit; - this.reverse = reverse; - this.streamingMode = streamingMode; - } - - @Override - public AsyncIterator iterator() { - return new DirectRangeIterator(tr, snapshot, begin, end, rowLimit, reverse, streamingMode); - } - - @Override - public CompletableFuture> asList() { - StreamingMode mode = this.streamingMode; - if (mode == StreamingMode.ITERATOR) - mode = (this.rowLimit == 0) ? StreamingMode.WANT_ALL : StreamingMode.EXACT; - - // TODO (Vishesh) Update for DirectRangeQuery? We may not have big enough `DirectByteBuffer`. - // If the streaming mode is EXACT, try and grab things as one chunk - if (mode == StreamingMode.EXACT) { - FutureResults range = tr.getRange_internal(this.begin, this.end, this.rowLimit, 0, - StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse); - return range.thenApply(result -> result.get().values).whenComplete((result, e) -> range.close()); - } - - // If the streaming mode is not EXACT, simply collect the results of an - // iteration into a list - return AsyncUtil.collect(new DirectRangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode), - tr.getExecutor()); - } -} diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java index 983291bff4..967c9e16ec 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java @@ -87,7 +87,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC @Override public AsyncIterable getRange(KeySelector begin, KeySelector end, int limit, boolean reverse, StreamingMode mode) { - return new DirectRangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode); + return new RangeQuery(FDBTransaction.this, true, begin, end, limit, reverse, mode); } @Override public AsyncIterable getRange(KeySelector begin, KeySelector end, @@ -289,7 +289,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC @Override public AsyncIterable getRange(KeySelector begin, KeySelector end, int limit, boolean reverse, StreamingMode mode) { - return new DirectRangeQuery(this, false, begin, end, limit, reverse, mode); + return new RangeQuery(this, false, begin, end, limit, reverse, mode); } @Override public AsyncIterable getRange(KeySelector begin, KeySelector end, @@ -379,25 +379,6 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC } } - // Users of this function must close the returned FutureResults when finished - protected void getDirectRange_internal( - DirectBufferIterator iterator, - KeySelector begin, KeySelector end, - int rowLimit, int targetBytes, int streamingMode, - int iteration, boolean isSnapshot, boolean reverse) { - - pointerReadLock.lock(); - try { - iterator.prepareRequest(begin.getKey(), begin.orEqual(), begin.getOffset(), - end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes, - streamingMode, iteration, isSnapshot, reverse); - Transaction_getDirectRange(getPtr(), iterator.getBuffer(), iterator.getBuffer().capacity()); - iterator.readSummary(); - } finally { - pointerReadLock.unlock(); - } - } - @Override public boolean addReadConflictRangeIfNotSnapshot(byte[] keyBegin, byte[] keyEnd) { addReadConflictRange(keyBegin, keyEnd); @@ -687,7 +668,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC byte[] keyEnd, boolean orEqualEnd, int offsetEnd, int rowLimit, int targetBytes, int streamingMode, int iteration, boolean isSnapshot, boolean reverse); - private native void Transaction_getDirectRange(long cPtr, ByteBuffer jbuffer, int bufferCapacity); + private native long Transaction_getDirectRange(long cPtr, ByteBuffer jbuffer, int bufferCapacity); private native void Transaction_addConflictRange(long cPtr, byte[] keyBegin, byte[] keyEnd, int conflictRangeType); private native void Transaction_set(long cPtr, byte[] key, byte[] value); diff --git a/bindings/java/src/main/com/apple/foundationdb/FutureResults.java b/bindings/java/src/main/com/apple/foundationdb/FutureResults.java index 47fec76d6c..731866bb64 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FutureResults.java +++ b/bindings/java/src/main/com/apple/foundationdb/FutureResults.java @@ -20,6 +20,7 @@ package com.apple.foundationdb; +import java.nio.ByteBuffer; import java.util.concurrent.Executor; class FutureResults extends NativeFuture { @@ -45,13 +46,22 @@ class FutureResults extends NativeFuture { } public RangeResult getResults() { + DirectBufferIterator directIterator = new DirectBufferIterator(); try { pointerReadLock.lock(); - return FutureResults_get(getPtr()); + if (directIterator.getBuffer() != null) { + FutureResults_getDirect(getPtr(), directIterator.getBuffer(), directIterator.getBuffer().capacity()); + return new RangeResult(directIterator); + } else { + return FutureResults_get(getPtr()); + } } finally { + directIterator.close(); pointerReadLock.unlock(); } } private native RangeResult FutureResults_get(long cPtr) throws FDBException; + private native void FutureResults_getDirect(long cPtr, ByteBuffer buffer, int capacity) + throws FDBException; } diff --git a/bindings/java/src/main/com/apple/foundationdb/RangeResult.java b/bindings/java/src/main/com/apple/foundationdb/RangeResult.java index a35ff13dc2..efbafe0fa6 100644 --- a/bindings/java/src/main/com/apple/foundationdb/RangeResult.java +++ b/bindings/java/src/main/com/apple/foundationdb/RangeResult.java @@ -52,6 +52,18 @@ class RangeResult { this.more = more; } + RangeResult(DirectBufferIterator iterator) { + iterator.readSummary(); + more = iterator.getSummary().more; + + int count = iterator.getSummary().keyCount; + values = new ArrayList(count); + + for (int i = 0; i < count; ++i) { + values.add(iterator.next()); + } + } + public RangeResultSummary getSummary() { final int keyCount = values.size(); final byte[] lastKey = keyCount > 0 ? values.get(keyCount -1).getKey() : null;