Update a bunch of tests and some documentation to use dispose.
This commit is contained in:
parent
34bdd8de28
commit
11dba3e8ef
|
@ -35,31 +35,29 @@ for your platform and a running server.<br>
|
|||
<br>
|
||||
<pre>
|
||||
{@code
|
||||
import com.apple.foundationdb.*;
|
||||
import Function;
|
||||
import Tuple;
|
||||
import com.apple.foundationdb.Database;
|
||||
import com.apple.foundationdb.FDB;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
public class Example {
|
||||
public static void main(String[] args) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
Database db = fdb.open();
|
||||
|
||||
// Run an operation on the database
|
||||
db.run(new Function<Transaction, Void>() {
|
||||
public Void apply(Transaction tr) {
|
||||
try(Database db = fdb.open()) {
|
||||
// Run an operation on the database
|
||||
db.run(tr -> {
|
||||
tr.set(Tuple.from("hello").pack(), Tuple.from("world").pack());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Get the value of 'hello' from the database
|
||||
String hello = db.run(new Function<Transaction, String>() {
|
||||
public String apply(Transaction tr) {
|
||||
byte[] result = tr.get(Tuple.from("hello").pack()).get();
|
||||
// Get the value of 'hello' from the database
|
||||
String hello = db.run(tr -> {
|
||||
byte[] result = tr.get(Tuple.from("hello").pack()).join();
|
||||
return Tuple.fromBytes(result).getString(0);
|
||||
}
|
||||
});
|
||||
System.out.println("Hello " + hello);
|
||||
});
|
||||
|
||||
System.out.println("Hello " + hello);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,20 +43,18 @@ public abstract class AbstractTester {
|
|||
}
|
||||
|
||||
public void runTest() {
|
||||
Database db;
|
||||
|
||||
try {
|
||||
db = fdb.open();
|
||||
} catch (Exception e) {
|
||||
try(Database db = fdb.open()) {
|
||||
try {
|
||||
testPerformance(db);
|
||||
}
|
||||
catch (Exception e) {
|
||||
result.addError(wrapAndPrintError(e, "Failed to complete all tests"));
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
result.addError(wrapAndPrintError(e, "fdb.open failed"));
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
testPerformance(db);
|
||||
} catch (Exception e) {
|
||||
result.addError(wrapAndPrintError(e, "Failed to complete all tests"));
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void testPerformance(Database db);
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* AsyncPerformanceTester.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb.test;
|
||||
|
||||
public class AsyncPerformanceTester {
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Running Java async performance test on Java version " + System.getProperty("java.version"));
|
||||
}
|
||||
}
|
|
@ -37,68 +37,70 @@ public class BlockingBenchmark {
|
|||
|
||||
// The cluster file DOES NOT need to be valid, although it must exist.
|
||||
// This is because the database is never really contacted in this test.
|
||||
Database database = fdb.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
|
||||
try(Database database = fdb.open()) {
|
||||
try(Transaction tr = database.createTransaction()) {
|
||||
tr.setReadVersion(100000);
|
||||
|
||||
byte[] key = {0x1, 0x1, 0x1, 0x1, 0x1};
|
||||
byte[] val = {0x2, 0x2, 0x2, 0x2, 0x2};
|
||||
System.out.println("readVersion().join():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.join();
|
||||
}
|
||||
catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
Transaction tr = database.createTransaction();
|
||||
tr.setReadVersion(100000);
|
||||
System.out.println("readVersion().get():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.get();
|
||||
}
|
||||
catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().join():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.join();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
System.out.println("readVersion().thenApplyAsync(identity).get():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.thenApplyAsync(Function.identity(), FDB.DEFAULT_EXECUTOR).get();
|
||||
}
|
||||
catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().thenApplyAsync^10(identity).get():");
|
||||
runTests(tr, o -> {
|
||||
for(int i = 0; i < 10; i++)
|
||||
o = o.thenApplyAsync(Function.identity(), FDB.DEFAULT_EXECUTOR);
|
||||
try {
|
||||
o.get();
|
||||
}
|
||||
catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().get^100():");
|
||||
runTests(tr, o -> {
|
||||
for(int i = 0; i < 100; i++) {
|
||||
try {
|
||||
o.get();
|
||||
}
|
||||
catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().get():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.get();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().thenApplyAsync(identity).get():");
|
||||
runTests(tr, o -> {
|
||||
try {
|
||||
o.thenApplyAsync(Function.identity(), FDB.DEFAULT_EXECUTOR).get();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().thenApplyAsync^10(identity).get():");
|
||||
runTests(tr, o -> {
|
||||
for(int i=0; i<10; i++)
|
||||
o = o.thenApplyAsync(Function.identity(), FDB.DEFAULT_EXECUTOR);
|
||||
try {
|
||||
o.get();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
System.out.println("readVersion().get^100():");
|
||||
runTests(tr, o -> {
|
||||
for(int i=0; i<100; i++) {
|
||||
try {
|
||||
o.get();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private static void runTests(Transaction tr, Function<CompletableFuture<Long>, Void> blockMethod) {
|
||||
|
|
|
@ -48,8 +48,9 @@ public class ConcurrentGetSetGet {
|
|||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Database database = FDB.selectAPIVersion(510).open();
|
||||
new ConcurrentGetSetGet().apply(database);
|
||||
try(Database database = FDB.selectAPIVersion(510).open()) {
|
||||
new ConcurrentGetSetGet().apply(database);
|
||||
}
|
||||
}
|
||||
|
||||
public void apply(Database db) {
|
||||
|
|
|
@ -32,14 +32,14 @@ import com.apple.foundationdb.directory.DirectoryLayer;
|
|||
import com.apple.foundationdb.directory.DirectorySubspace;
|
||||
|
||||
public class DirectoryTest {
|
||||
private static final String CLUSTER_FILE = "/home/ajb/fdb.cluster";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
try {
|
||||
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
|
||||
Database db = c.openDatabase();
|
||||
runTests(db);
|
||||
} catch(Throwable t) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
try(Database db = fdb.open()) {
|
||||
runTests(db);
|
||||
}
|
||||
}
|
||||
catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,31 +20,28 @@
|
|||
|
||||
package com.apple.foundationdb.test;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.apple.foundationdb.Database;
|
||||
import com.apple.foundationdb.FDB;
|
||||
import com.apple.foundationdb.Transaction;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
public class Example {
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
public static void main(String[] args) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
Database db = fdb.open();
|
||||
|
||||
// Run an operation on the database
|
||||
db.run((Function<Transaction, Void>) tr -> {
|
||||
tr.set(Tuple.from("hello").pack(), Tuple.from("world").pack());
|
||||
return null;
|
||||
});
|
||||
try(Database db = fdb.open()) {
|
||||
// Run an operation on the database
|
||||
db.run(tr -> {
|
||||
tr.set(Tuple.from("hello").pack(), Tuple.from("world").pack());
|
||||
return null;
|
||||
});
|
||||
|
||||
// Get the value of 'hello' from the database
|
||||
String hello = db.run(tr -> {
|
||||
byte[] result = tr.get(Tuple.from("hello").pack()).join();
|
||||
return Tuple.fromBytes(result).getString(0);
|
||||
});
|
||||
System.out.println("Hello " + hello);
|
||||
// Get the value of 'hello' from the database
|
||||
String hello = db.run(tr -> {
|
||||
byte[] result = tr.get(Tuple.from("hello").pack()).join();
|
||||
return Tuple.fromBytes(result).getString(0);
|
||||
});
|
||||
System.out.println("Hello " + hello);
|
||||
}
|
||||
}
|
||||
|
||||
private Example() {}
|
||||
|
|
|
@ -29,15 +29,15 @@ import com.apple.foundationdb.KeyValue;
|
|||
import com.apple.foundationdb.TransactionContext;
|
||||
|
||||
public class IterableTest {
|
||||
private static final String CLUSTER_FILE = "C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster";
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
final int reps = 1000;
|
||||
try {
|
||||
Cluster cluster = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
|
||||
Database db = cluster.openDatabase();
|
||||
runTests(reps, db);
|
||||
} catch(Throwable t) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
try(Database db = fdb.open()) {
|
||||
runTests(reps, db);
|
||||
}
|
||||
}
|
||||
catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,8 @@ public class IterableTest {
|
|||
}
|
||||
return null;
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
|
|
|
@ -35,27 +35,28 @@ public class LocalityTests {
|
|||
|
||||
public static void main(String[] args) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
Database database = fdb.open(args[0]);
|
||||
try(Database database = fdb.open(args[0])) {
|
||||
try(Transaction tr = database.createTransaction()) {
|
||||
String[] keyAddresses = LocalityUtil.getAddressesForKey(tr, "a".getBytes()).join();
|
||||
for(String s : keyAddresses) {
|
||||
System.out.println(" @ " + s);
|
||||
}
|
||||
}
|
||||
|
||||
Transaction tr = database.createTransaction();
|
||||
String[] keyAddresses = LocalityUtil.getAddressesForKey(tr, "a".getBytes()).join();
|
||||
for(String s : keyAddresses) {
|
||||
System.out.println(" @ " + s);
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
CloseableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[]{(byte) 255});
|
||||
CompletableFuture<List<byte[]>> collection = AsyncUtil.collectRemaining(keys);
|
||||
List<byte[]> list = collection.join();
|
||||
System.out.println("Took " + (System.currentTimeMillis() - start) + "ms to get " +
|
||||
list.size() + " items");
|
||||
|
||||
CloseableAsyncIterator<byte[]> keys = LocalityUtil.getBoundaryKeys(database, new byte[0], new byte[] { (byte)255 });
|
||||
CompletableFuture<List<byte[]>> collection = AsyncUtil.collectRemaining(keys);
|
||||
List<byte[]> list = collection.join();
|
||||
System.out.println("Took " + (System.currentTimeMillis() - start) + "ms to get " +
|
||||
list.size() + " items");
|
||||
keys.close();
|
||||
|
||||
keys.close();
|
||||
|
||||
int i = 0;
|
||||
for(byte[] key : collection.join()) {
|
||||
System.out.println(i++ + ": " + ByteArrayUtil.printable(key));
|
||||
int i = 0;
|
||||
for(byte[] key : collection.join()) {
|
||||
System.out.println(i++ + ": " + ByteArrayUtil.printable(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -44,84 +44,86 @@ public class ParallelRandomScan {
|
|||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
FDB api = FDB.selectAPIVersion(510);
|
||||
Database database = api.open(args[0]);
|
||||
|
||||
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {
|
||||
runTest(database, i, ROWS, DURATION_MS);
|
||||
Thread.sleep(1000);
|
||||
try(Database database = api.open(args[0])) {
|
||||
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {
|
||||
runTest(database, i, ROWS, DURATION_MS);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void runTest(Database database,
|
||||
int parallelism, int rows, int duration) throws InterruptedException {
|
||||
int parallelism, int rows, int duration) throws InterruptedException
|
||||
{
|
||||
final Random r = new Random();
|
||||
final AtomicInteger readsCompleted = new AtomicInteger(0);
|
||||
final AtomicInteger errors = new AtomicInteger(0);
|
||||
final Transaction tr = database.createTransaction();
|
||||
final Semaphore coordinator = new Semaphore(parallelism);
|
||||
final ContinuousSample<Long> latencies = new ContinuousSample<>(1000);
|
||||
|
||||
tr.options().setReadYourWritesDisable();
|
||||
try(final Transaction tr = database.createTransaction()) {
|
||||
tr.options().setReadYourWritesDisable();
|
||||
|
||||
// Clearing the whole database before starting means all reads are local
|
||||
/*ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
buf.putInt(0, Integer.MAX_VALUE);
|
||||
tr.clear(new byte[0], buf.array());*/
|
||||
// Clearing the whole database before starting means all reads are local
|
||||
/*ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
buf.putInt(0, Integer.MAX_VALUE);
|
||||
tr.clear(new byte[0], buf.array());*/
|
||||
|
||||
// We use this for the key generation
|
||||
ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
// We use this for the key generation
|
||||
ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
|
||||
// Eat the cost of the read version up-front
|
||||
tr.getReadVersion().join();
|
||||
// Eat the cost of the read version up-front
|
||||
tr.getReadVersion().join();
|
||||
|
||||
final long start = System.currentTimeMillis();
|
||||
while(true) {
|
||||
coordinator.acquire();
|
||||
if(System.currentTimeMillis() - start > duration) {
|
||||
coordinator.release();
|
||||
break;
|
||||
final long start = System.currentTimeMillis();
|
||||
while(true) {
|
||||
coordinator.acquire();
|
||||
if(System.currentTimeMillis() - start > duration) {
|
||||
coordinator.release();
|
||||
break;
|
||||
}
|
||||
|
||||
int row = r.nextInt(rows - 1);
|
||||
buf.putInt(0, row);
|
||||
AsyncIterable<KeyValue> range = tr.getRange(
|
||||
buf.array(), ByteArrayUtil.strinc(buf.array()), 1, false, StreamingMode.SMALL);
|
||||
|
||||
final long launch = System.nanoTime();
|
||||
|
||||
final AsyncIterator<KeyValue> it = range.iterator();
|
||||
final CompletableFuture<KeyValue> f = it.onHasNext().thenApplyAsync(hasFirst -> {
|
||||
if(!hasFirst) {
|
||||
return null;
|
||||
}
|
||||
return it.next();
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
f.whenCompleteAsync((kv, t) -> {
|
||||
if(kv != null) {
|
||||
readsCompleted.incrementAndGet();
|
||||
long timeTaken = System.nanoTime() - launch;
|
||||
synchronized(latencies) {
|
||||
latencies.addSample(timeTaken);
|
||||
}
|
||||
}
|
||||
else if(t != null) {
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
|
||||
coordinator.release();
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
}
|
||||
|
||||
int row = r.nextInt(rows - 1);
|
||||
buf.putInt(0, row);
|
||||
AsyncIterable<KeyValue> range = tr.getRange(
|
||||
buf.array(), ByteArrayUtil.strinc(buf.array()), 1, false, StreamingMode.SMALL);
|
||||
// Block for ALL tasks to end!
|
||||
coordinator.acquire(parallelism);
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
final long launch = System.nanoTime();
|
||||
|
||||
final AsyncIterator<KeyValue> it = range.iterator();
|
||||
final CompletableFuture<KeyValue> f = it.onHasNext().thenApplyAsync(hasFirst -> {
|
||||
if(!hasFirst) {
|
||||
return null;
|
||||
}
|
||||
return it.next();
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
f.whenCompleteAsync((kv, t) -> {
|
||||
if(kv != null) {
|
||||
readsCompleted.incrementAndGet();
|
||||
long timeTaken = System.nanoTime() - launch;
|
||||
synchronized(latencies) {
|
||||
latencies.addSample(timeTaken);
|
||||
}
|
||||
}
|
||||
else if(t != null) {
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
|
||||
coordinator.release();
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
double rowsPerSecond = readsCompleted.get() / ((end - start) / 1000.0);
|
||||
System.out.println(parallelism + " ->\t" + rowsPerSecond);
|
||||
System.out.println(String.format(" Reads: %d, errors: %d, time: %dms",
|
||||
readsCompleted.get(), errors.get(), (int) (end - start)));
|
||||
System.out.println(String.format(" Mean: %.2f, Median: %d, 98%%: %d",
|
||||
latencies.mean(), latencies.median(), latencies.percentile(0.98)));
|
||||
}
|
||||
|
||||
// Block for ALL tasks to end!
|
||||
coordinator.acquire(parallelism);
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
double rowsPerSecond = readsCompleted.get() / ((end - start) / 1000.0);
|
||||
System.out.println(parallelism + " ->\t" + rowsPerSecond);
|
||||
System.out.println(String.format(" Reads: %d, errors: %d, time: %dms",
|
||||
readsCompleted.get(), errors.get(), (int)(end - start)));
|
||||
System.out.println(String.format(" Mean: %.2f, Median: %d, 98%%: %d",
|
||||
latencies.mean(), latencies.median(), latencies.percentile(0.98)));
|
||||
}
|
||||
|
||||
private ParallelRandomScan() {}
|
||||
|
|
|
@ -220,42 +220,44 @@ public class PerformanceTester extends AbstractTester {
|
|||
}
|
||||
|
||||
public Double clear(Database db, int count) {
|
||||
Transaction tr = db.createTransaction();
|
||||
long start = System.nanoTime();
|
||||
for (int i = 0; i < count; i++) {
|
||||
tr.clear(randomKey());
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
long start = System.nanoTime();
|
||||
for(int i = 0; i < count; i++) {
|
||||
tr.clear(randomKey());
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
|
||||
return count*1_000_000_000.0/(end - start);
|
||||
return count * 1_000_000_000.0 / (end - start);
|
||||
}
|
||||
}
|
||||
|
||||
public Double clearRange(Database db, int count) {
|
||||
Transaction tr = db.createTransaction();
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
long start = System.nanoTime();
|
||||
for(int i = 0; i < count; i++) {
|
||||
int keyIndex = randomKeyIndex();
|
||||
tr.clear(key(keyIndex), key(keyIndex + 1));
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
|
||||
long start = System.nanoTime();
|
||||
for (int i = 0; i < count; i++) {
|
||||
int keyIndex = randomKeyIndex();
|
||||
tr.clear(key(keyIndex), key(keyIndex+1));
|
||||
return count * 1_000_000_000.0 / (end - start);
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
|
||||
return count*1_000_000_000.0/(end - start);
|
||||
}
|
||||
|
||||
public Double set(Database db, int count) {
|
||||
Transaction tr = db.createTransaction();
|
||||
long start = System.nanoTime();
|
||||
for (int i = 0; i < count; i++) {
|
||||
int keyIndex = randomKeyIndex();
|
||||
tr.set(key(keyIndex), value(keyIndex));
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
long start = System.nanoTime();
|
||||
for(int i = 0; i < count; i++) {
|
||||
int keyIndex = randomKeyIndex();
|
||||
tr.set(key(keyIndex), value(keyIndex));
|
||||
}
|
||||
long end = System.nanoTime();
|
||||
tr.cancel();
|
||||
|
||||
return count*1_000_000_000.0/(end - start);
|
||||
return count * 1_000_000_000.0 / (end - start);
|
||||
}
|
||||
}
|
||||
|
||||
public Double parallelGet(TransactionContext tcx, int count) {
|
||||
|
|
|
@ -87,53 +87,58 @@ public class RYWBenchmark extends AbstractTester {
|
|||
|
||||
@Override
|
||||
public void testPerformance(Database db) {
|
||||
Transaction tr = db.createTransaction();
|
||||
insertData(tr);
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
insertData(tr);
|
||||
|
||||
List<String> testsToRun;
|
||||
if (args.getTestsToRun().isEmpty()) {
|
||||
testsToRun = Arrays.stream(Tests.values()).map(Tests::name).map(String::toLowerCase).sorted().collect(Collectors.toList());
|
||||
} else {
|
||||
testsToRun = args.getTestsToRun();
|
||||
}
|
||||
List<String> testsToRun;
|
||||
if(args.getTestsToRun().isEmpty()) {
|
||||
testsToRun = Arrays.stream(Tests.values()).map(Tests::name).map(String::toLowerCase).sorted().collect(Collectors.toList());
|
||||
}
|
||||
else {
|
||||
testsToRun = args.getTestsToRun();
|
||||
}
|
||||
|
||||
for (String test : testsToRun) {
|
||||
Tests testObj;
|
||||
try {
|
||||
testObj = Tests.valueOf(test.toUpperCase());
|
||||
} catch (IllegalArgumentException e) {
|
||||
result.addError(new IllegalArgumentException("Test " + test + " not implemented"));
|
||||
continue;
|
||||
}
|
||||
for(String test : testsToRun) {
|
||||
Tests testObj;
|
||||
try {
|
||||
testObj = Tests.valueOf(test.toUpperCase());
|
||||
}
|
||||
catch(IllegalArgumentException e) {
|
||||
result.addError(new IllegalArgumentException("Test " + test + " not implemented"));
|
||||
continue;
|
||||
}
|
||||
|
||||
Function<? super Transaction, ? extends Double> function = testObj.getFunction();
|
||||
Function<? super Transaction, ? extends Double> function = testObj.getFunction();
|
||||
|
||||
try {
|
||||
Thread.sleep(5_000);
|
||||
} catch (InterruptedException e) {
|
||||
result.addError(wrapAndPrintError(e, "Interrupted while sleeping"));
|
||||
}
|
||||
try {
|
||||
Thread.sleep(5_000);
|
||||
}
|
||||
catch(InterruptedException e) {
|
||||
result.addError(wrapAndPrintError(e, "Interrupted while sleeping"));
|
||||
}
|
||||
|
||||
System.out.println("Running test " + test);
|
||||
System.out.println("Running test " + test);
|
||||
|
||||
List<Double> results = new ArrayList<>(NUM_RUNS);
|
||||
List<Double> results = new ArrayList<>(NUM_RUNS);
|
||||
|
||||
for (int i = 0; i < NUM_RUNS; i++) {
|
||||
try {
|
||||
results.add(function.apply(tr));
|
||||
} catch (Exception e) {
|
||||
result.addError(wrapAndPrintError(e, "Performance test failed: " + test));
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(int i = 0; i < NUM_RUNS; i++) {
|
||||
try {
|
||||
results.add(function.apply(tr));
|
||||
}
|
||||
catch(Exception e) {
|
||||
result.addError(wrapAndPrintError(e, "Performance test failed: " + test));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (results.size() == NUM_RUNS) {
|
||||
Collections.sort(results);
|
||||
result.addKpi(String.format("%s", testObj.getKpi()), results.get(results.size() / 2).intValue(), "keys/s");
|
||||
}
|
||||
}
|
||||
if(results.size() == NUM_RUNS) {
|
||||
Collections.sort(results);
|
||||
result.addKpi(String.format("%s", testObj.getKpi()), results.get(results.size() / 2).intValue(), "keys/s");
|
||||
}
|
||||
}
|
||||
|
||||
tr.cancel();
|
||||
tr.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
public Double getSingle(Transaction tr, int count) {
|
||||
|
|
|
@ -40,121 +40,123 @@ public class RangeTest {
|
|||
System.out.println("About to use version " + API_VERSION);
|
||||
FDB fdb = FDB.selectAPIVersion(API_VERSION);
|
||||
|
||||
/*
|
||||
final String CLUSTER_FILE = "T:\\Ben\\cluster";
|
||||
String clusterFile = CLUSTER_FILE;
|
||||
if(args.length > 0) {
|
||||
clusterFile = args[0];
|
||||
}
|
||||
try(Database db = fdb.open()) {
|
||||
try {
|
||||
db.run((Function<Transaction, Void>) tr -> {
|
||||
long version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
tr.get("apple1".getBytes()).join();
|
||||
tr.set("apple1".getBytes(), "crunchy1".getBytes());
|
||||
tr.set("apple2".getBytes(), "crunchy2".getBytes());
|
||||
tr.set("apple3".getBytes(), "crunchy3".getBytes());
|
||||
tr.set("apple4".getBytes(), "crunchy4".getBytes());
|
||||
tr.set("apple5".getBytes(), "crunchy5".getBytes());
|
||||
tr.set("apple6".getBytes(), "crunchy6".getBytes());
|
||||
System.out.println("Attempting to commit apple/crunchy pairs...");
|
||||
|
||||
System.out.println("Using cluster file: " + clusterFile);
|
||||
Cluster cluster = fdb.createCluster(clusterFile).get();
|
||||
Database db = cluster.openDatabase().get();
|
||||
*/
|
||||
|
||||
Database db = fdb.open();
|
||||
|
||||
try {
|
||||
db.run((Function<Transaction, Void>) tr -> {
|
||||
long version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
tr.get("apple1".getBytes()).join();
|
||||
tr.set("apple1".getBytes(), "crunchy1".getBytes());
|
||||
tr.set("apple2".getBytes(), "crunchy2".getBytes());
|
||||
tr.set("apple3".getBytes(), "crunchy3".getBytes());
|
||||
tr.set("apple4".getBytes(), "crunchy4".getBytes());
|
||||
tr.set("apple5".getBytes(), "crunchy5".getBytes());
|
||||
tr.set("apple6".getBytes(), "crunchy6".getBytes());
|
||||
System.out.println("Attempting to commit apple/crunchy pairs...");
|
||||
|
||||
return null;
|
||||
});
|
||||
} catch (Throwable e){
|
||||
e.printStackTrace();
|
||||
System.out.println("Non retryable exception caught...");
|
||||
}
|
||||
|
||||
System.out.println("First transaction was successful");
|
||||
|
||||
checkRange(db.createTransaction());
|
||||
|
||||
Transaction tr = db.createTransaction();
|
||||
long version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
|
||||
byte[] bs = tr.get("apple3".getBytes()).join();
|
||||
System.out.println("Got apple3: " + new String(bs));
|
||||
|
||||
tr.cancel();
|
||||
try {
|
||||
tr.get("apple3".getBytes()).join();
|
||||
throw new RuntimeException("The get() should have thrown an error!");
|
||||
} catch(CompletionException ex) {
|
||||
FDBException e = (FDBException)ex.getCause();
|
||||
if(e.getCode() != 1025) {
|
||||
System.err.println("Transaction was not cancelled correctly (" + e.getCode() + ")");
|
||||
throw e;
|
||||
return null;
|
||||
});
|
||||
}
|
||||
System.out.println("Transaction was cancelled correctly");
|
||||
catch(Throwable e) {
|
||||
e.printStackTrace();
|
||||
System.out.println("Non retryable exception caught...");
|
||||
}
|
||||
|
||||
System.out.println("First transaction was successful");
|
||||
|
||||
checkRange(db.createTransaction());
|
||||
|
||||
long version;
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
|
||||
byte[] bs = tr.get("apple3".getBytes()).join();
|
||||
System.out.println("Got apple3: " + new String(bs));
|
||||
|
||||
tr.cancel();
|
||||
try {
|
||||
tr.get("apple3".getBytes()).join();
|
||||
throw new RuntimeException("The get() should have thrown an error!");
|
||||
}
|
||||
catch(CompletionException ex) {
|
||||
FDBException e = (FDBException) ex.getCause();
|
||||
if(e.getCode() != 1025) {
|
||||
System.err.println("Transaction was not cancelled correctly (" + e.getCode() + ")");
|
||||
throw e;
|
||||
}
|
||||
System.out.println("Transaction was cancelled correctly");
|
||||
}
|
||||
}
|
||||
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
|
||||
tr.clear("apple3".getBytes(), "apple6".getBytes());
|
||||
try {
|
||||
tr.commit().join();
|
||||
System.out.println("Clear range transaction was successful");
|
||||
}
|
||||
catch(FDBException e) {
|
||||
System.err.println("Error in the clear of a single value");
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
checkRange(tr);
|
||||
}
|
||||
|
||||
Range r1 = new Range("apple".getBytes(), "banana".getBytes());
|
||||
Range r2 = new Range("apple".getBytes(), "banana".getBytes());
|
||||
Range r3 = new Range("apple".getBytes(), "crepe".getBytes());
|
||||
Range r4 = new Range(null, "banana".getBytes());
|
||||
Range r5 = new Range(new byte[]{0x15, 0x01}, null);
|
||||
|
||||
System.out.println("ranges: " + r1 + ", " + r2 + ", " + r3 + ", " + r4 + ", " + r5);
|
||||
|
||||
if(r1.equals(null)) {
|
||||
System.err.println("range " + r1 + " equals null");
|
||||
}
|
||||
else if(!r1.equals(r1)) {
|
||||
System.err.println("range equality not reflexive");
|
||||
}
|
||||
else if(r1.hashCode() != r1.hashCode()) {
|
||||
System.err.println("range hashcode not reflexive");
|
||||
}
|
||||
else if(!r1.equals(r2)) {
|
||||
System.err.println("range " + r1 + " and " + r2 + " not equal");
|
||||
}
|
||||
else if(r1.hashCode() != r2.hashCode()) {
|
||||
System.err.println("ranges " + r1 + " and " + r2 + " do not have same hash codes");
|
||||
}
|
||||
else if(r1.equals(r3)) {
|
||||
System.err.println("ranges " + r1 + " and " + r3 + " are equal");
|
||||
}
|
||||
else if(r1.hashCode() == r3.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r3 + " have same hash code");
|
||||
}
|
||||
else if(r1.equals(r4)) {
|
||||
System.err.println("ranges " + r1 + " and " + r4 + " are equal");
|
||||
}
|
||||
else if(r1.hashCode() == r4.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r4 + " have same hash code");
|
||||
}
|
||||
else if(r1.equals(r5)) {
|
||||
System.err.println("ranges " + r1 + " and " + r5 + " are equal");
|
||||
}
|
||||
else if(r1.hashCode() == r5.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r5 + " have same hash code");
|
||||
}
|
||||
else {
|
||||
System.out.println("range comparisons okay");
|
||||
}
|
||||
|
||||
//fdb.stopNetwork();
|
||||
System.out.println("Done with test program");
|
||||
}
|
||||
|
||||
tr = db.createTransaction();
|
||||
version = tr.getReadVersion().join();
|
||||
System.out.println("DB version: " + version);
|
||||
|
||||
tr.clear("apple3".getBytes(), "apple6".getBytes());
|
||||
try {
|
||||
tr.commit().join();
|
||||
System.out.println("Clear range transaction was successful");
|
||||
} catch(FDBException e) {
|
||||
System.err.println("Error in the clear of a single value");
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
//db.close();
|
||||
//cluster.close();
|
||||
|
||||
tr = db.createTransaction();
|
||||
checkRange(tr);
|
||||
|
||||
Range r1 = new Range("apple".getBytes(), "banana".getBytes());
|
||||
Range r2 = new Range("apple".getBytes(), "banana".getBytes());
|
||||
Range r3 = new Range("apple".getBytes(), "crepe".getBytes());
|
||||
Range r4 = new Range(null, "banana".getBytes());
|
||||
Range r5 = new Range(new byte[]{0x15, 0x01}, null);
|
||||
|
||||
System.out.println("ranges: " + r1 + ", " + r2 + ", " + r3 + ", " + r4 + ", " + r5);
|
||||
|
||||
if(r1.equals(null)) {
|
||||
System.err.println("range " + r1 + " equals null");
|
||||
} else if(!r1.equals(r1)) {
|
||||
System.err.println("range equality not reflexive");
|
||||
} else if(r1.hashCode() != r1.hashCode()) {
|
||||
System.err.println("range hashcode not reflexive");
|
||||
} else if(!r1.equals(r2)) {
|
||||
System.err.println("range " + r1 + " and " + r2 + " not equal");
|
||||
} else if(r1.hashCode() != r2.hashCode()) {
|
||||
System.err.println("ranges " + r1 + " and " + r2 + " do not have same hash codes");
|
||||
} else if(r1.equals(r3)) {
|
||||
System.err.println("ranges " + r1 + " and " + r3 + " are equal");
|
||||
} else if (r1.hashCode() == r3.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r3 + " have same hash code");
|
||||
} else if(r1.equals(r4)) {
|
||||
System.err.println("ranges " + r1 + " and " + r4 + " are equal");
|
||||
} else if(r1.hashCode() == r4.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r4 + " have same hash code");
|
||||
} else if(r1.equals(r5)) {
|
||||
System.err.println("ranges " + r1 + " and " + r5 + " are equal");
|
||||
} else if(r1.hashCode() == r5.hashCode()) {
|
||||
System.err.println("range " + r1 + " and " + r5 + " have same hash code");
|
||||
} else {
|
||||
System.out.println("range comparisons okay");
|
||||
}
|
||||
|
||||
db.close();
|
||||
//cluster.close();
|
||||
//fdb.stopNetwork();
|
||||
System.out.println("Done with test program");
|
||||
}
|
||||
|
||||
private static void checkRange(Transaction tr) {
|
||||
|
|
|
@ -35,29 +35,31 @@ public class SerialInsertion {
|
|||
|
||||
public static void main(String[] args) {
|
||||
FDB api = FDB.selectAPIVersion(510);
|
||||
Database database = api.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
|
||||
long start = System.currentTimeMillis();
|
||||
try(Database database = api.open()) {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
List<InsertionThread> threads = new ArrayList<InsertionThread>(THREAD_COUNT);
|
||||
int nodesPerThread = NODES / THREAD_COUNT;
|
||||
for(int i = 0; i < THREAD_COUNT; i++) {
|
||||
// deal with non even division by adding remainder onto last thread's work
|
||||
if(i == THREAD_COUNT - 1) {
|
||||
nodesPerThread += (NODES % THREAD_COUNT);
|
||||
List<InsertionThread> threads = new ArrayList<InsertionThread>(THREAD_COUNT);
|
||||
int nodesPerThread = NODES / THREAD_COUNT;
|
||||
for(int i = 0; i < THREAD_COUNT; i++) {
|
||||
// deal with non even division by adding remainder onto last thread's work
|
||||
if(i == THREAD_COUNT - 1) {
|
||||
nodesPerThread += (NODES % THREAD_COUNT);
|
||||
}
|
||||
InsertionThread t = new InsertionThread(database, nodesPerThread * i, nodesPerThread);
|
||||
t.start();
|
||||
threads.add(t);
|
||||
}
|
||||
InsertionThread t = new InsertionThread(database, nodesPerThread * i, nodesPerThread);
|
||||
t.start();
|
||||
threads.add(t);
|
||||
}
|
||||
for(InsertionThread t : threads) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
for(InsertionThread t : threads) {
|
||||
try {
|
||||
t.join();
|
||||
}
|
||||
catch(InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Time taken: " + (System.currentTimeMillis() - start) + "ms");
|
||||
System.out.println("Time taken: " + (System.currentTimeMillis() - start) + "ms");
|
||||
}
|
||||
}
|
||||
|
||||
static class InsertionThread extends Thread {
|
||||
|
@ -77,20 +79,26 @@ public class SerialInsertion {
|
|||
int done = 0;
|
||||
ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
Transaction tr = db.createTransaction();
|
||||
while(done < insertionCount) {
|
||||
try {
|
||||
int i = 0;
|
||||
for(; i < BATCH_SIZE && done + i < insertionCount; i++) {
|
||||
buf.putInt(0, insertionStart + done + i);
|
||||
tr.set(buf.array(), value);
|
||||
try {
|
||||
while(done < insertionCount) {
|
||||
try {
|
||||
int i = 0;
|
||||
for(; i < BATCH_SIZE && done + i < insertionCount; i++) {
|
||||
buf.putInt(0, insertionStart + done + i);
|
||||
tr.set(buf.array(), value);
|
||||
}
|
||||
tr.commit().join();
|
||||
tr = db.createTransaction();
|
||||
done += i;
|
||||
}
|
||||
catch(RuntimeException e) {
|
||||
tr = tr.onError(e).join();
|
||||
}
|
||||
tr.commit().join();
|
||||
tr = db.createTransaction();
|
||||
done += i;
|
||||
} catch(RuntimeException e) {
|
||||
tr = tr.onError(e).join();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
tr.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,16 +40,16 @@ public class SerialIteration {
|
|||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
FDB api = FDB.selectAPIVersion(510);
|
||||
Database database = api.open(args[0]);
|
||||
|
||||
for(int i = 1; i <= THREAD_COUNT; i++) {
|
||||
runThreadedTest(database, i);
|
||||
Thread.sleep(1000);
|
||||
try(Database database = api.open(args[0])) {
|
||||
for(int i = 1; i <= THREAD_COUNT; i++) {
|
||||
runThreadedTest(database, i);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static double runThreadedTest(Database database, int threadCount) {
|
||||
List<IterationThread> threads = new ArrayList<IterationThread>(threadCount);
|
||||
List<IterationThread> threads = new ArrayList<>(threadCount);
|
||||
for(int i = 0; i < threadCount; i++) {
|
||||
IterationThread thread = new IterationThread(database);
|
||||
thread.start();
|
||||
|
@ -115,23 +115,25 @@ public class SerialIteration {
|
|||
}
|
||||
|
||||
private static int scanDatabase(Database database, int rows) {
|
||||
Transaction tr = database.createTransaction();
|
||||
tr.options().setReadYourWritesDisable();
|
||||
try(Transaction tr = database.createTransaction()) {
|
||||
tr.options().setReadYourWritesDisable();
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
buf.putInt(0, Integer.MAX_VALUE);
|
||||
AsyncIterable<KeyValue> range = tr.getRange(new byte[0], buf.array(),
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL);
|
||||
ByteBuffer buf = ByteBuffer.allocate(4);
|
||||
buf.putInt(0, Integer.MAX_VALUE);
|
||||
AsyncIterable<KeyValue> range = tr.getRange(new byte[0], buf.array(),
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL);
|
||||
|
||||
int counter = 0;
|
||||
try {
|
||||
for(@SuppressWarnings("unused") KeyValue keys : range) {
|
||||
counter++;
|
||||
int counter = 0;
|
||||
try {
|
||||
for(@SuppressWarnings("unused") KeyValue keys : range) {
|
||||
counter++;
|
||||
}
|
||||
}
|
||||
} catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
catch(Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
|
||||
private SerialIteration() {}
|
||||
|
|
|
@ -28,21 +28,13 @@ import com.apple.foundationdb.FDB;
|
|||
import com.apple.foundationdb.TransactionContext;
|
||||
|
||||
public class SerialTest {
|
||||
|
||||
private static final String CLUSTER_FILE = "T:\\Ben\\cluster";
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
final int reps = 1000;
|
||||
try {
|
||||
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
|
||||
Database db = c.openDatabase();
|
||||
runTests(reps, db);
|
||||
|
||||
/*Cluster fCluster = Cluster.create("C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster").get();
|
||||
System.out.println("I now have the cluster");
|
||||
Database db = cluster.openDatabase().get();
|
||||
|
||||
runTests(reps, db);*/
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
try(Database db = fdb.open()) {
|
||||
runTests(reps, db);
|
||||
}
|
||||
} catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
|
|
|
@ -27,14 +27,13 @@ import com.apple.foundationdb.TransactionContext;
|
|||
import com.apple.foundationdb.tuple.Tuple;
|
||||
|
||||
public class TupleTest {
|
||||
private static final String CLUSTER_FILE = "C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster";
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
final int reps = 1000;
|
||||
try {
|
||||
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
|
||||
Database db = c.openDatabase();
|
||||
runTests(reps, db);
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
try(Database db = fdb.open()) {
|
||||
runTests(reps, db);
|
||||
}
|
||||
} catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
|
|
|
@ -33,32 +33,32 @@ import com.apple.foundationdb.tuple.Versionstamp;
|
|||
public class VersionstampSmokeTest {
|
||||
public static void main(String[] args) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
Database db = fdb.open();
|
||||
try(Database db = fdb.open()) {
|
||||
db.run(tr -> {
|
||||
tr.clear(Tuple.from("prefix").range());
|
||||
return null;
|
||||
});
|
||||
|
||||
db.run(tr -> {
|
||||
tr.clear(Tuple.from("prefix").range());
|
||||
return null;
|
||||
});
|
||||
CompletableFuture<byte[]> trVersionFuture = db.run((Transaction tr) -> {
|
||||
// The incomplete Versionstamp will have tr's version information when committed.
|
||||
Tuple t = Tuple.from("prefix", Versionstamp.incomplete());
|
||||
tr.mutate(MutationType.SET_VERSIONSTAMPED_KEY, t.packWithVersionstamp(), new byte[0]);
|
||||
return tr.getVersionstamp();
|
||||
});
|
||||
|
||||
CompletableFuture<byte[]> trVersionFuture = db.run((Transaction tr) -> {
|
||||
// The incomplete Versionstamp will have tr's version information when committed.
|
||||
Tuple t = Tuple.from("prefix", Versionstamp.incomplete());
|
||||
tr.mutate(MutationType.SET_VERSIONSTAMPED_KEY, t.packWithVersionstamp(), new byte[0]);
|
||||
return tr.getVersionstamp();
|
||||
});
|
||||
byte[] trVersion = trVersionFuture.join();
|
||||
|
||||
byte[] trVersion = trVersionFuture.join();
|
||||
Versionstamp v = db.run((Transaction tr) -> {
|
||||
Subspace subspace = new Subspace(Tuple.from("prefix"));
|
||||
byte[] serialized = tr.getRange(subspace.range(), 1).iterator().next().getKey();
|
||||
Tuple t = subspace.unpack(serialized);
|
||||
return t.getVersionstamp(0);
|
||||
});
|
||||
|
||||
Versionstamp v = db.run((Transaction tr) -> {
|
||||
Subspace subspace = new Subspace(Tuple.from("prefix"));
|
||||
byte[] serialized = tr.getRange(subspace.range(), 1).iterator().next().getKey();
|
||||
Tuple t = subspace.unpack(serialized);
|
||||
return t.getVersionstamp(0);
|
||||
});
|
||||
|
||||
System.out.println(v);
|
||||
System.out.println(Versionstamp.complete(trVersion));
|
||||
assert v.equals(Versionstamp.complete(trVersion));
|
||||
System.out.println(v);
|
||||
System.out.println(Versionstamp.complete(trVersion));
|
||||
assert v.equals(Versionstamp.complete(trVersion));
|
||||
}
|
||||
}
|
||||
|
||||
private VersionstampSmokeTest() {}
|
||||
|
|
|
@ -35,79 +35,84 @@ public class WatchTest {
|
|||
|
||||
public static void main(String[] args) {
|
||||
FDB fdb = FDB.selectAPIVersion(510);
|
||||
Database database = fdb.open(args[0]);
|
||||
database.options().setLocationCacheSize(42);
|
||||
Transaction tr = database.createTransaction();
|
||||
byte[] bs = tr.get("a".getBytes()).join();
|
||||
System.out.println("`a' -> " + (bs == null ? "<null>" : new String(bs)));
|
||||
final CompletableFuture<Void> watch = tr.watch("a".getBytes());
|
||||
System.err.println("Watch started...");
|
||||
//System.exit(0);
|
||||
tr.commit().join();
|
||||
watch.cancel(true);
|
||||
try {
|
||||
watch.join();
|
||||
System.out.println("`a' changed");
|
||||
} catch(FDBException e) {
|
||||
System.out.println("`a' watch error -> " + e.getMessage());
|
||||
if(e.getCode() != 1101)
|
||||
throw e;
|
||||
}
|
||||
try(Database database = fdb.open(args[0])) {
|
||||
database.options().setLocationCacheSize(42);
|
||||
try(Transaction tr = database.createTransaction()) {
|
||||
byte[] bs = tr.get("a".getBytes()).join();
|
||||
System.out.println("`a' -> " + (bs == null ? "<null>" : new String(bs)));
|
||||
final CompletableFuture<Void> watch = tr.watch("a".getBytes());
|
||||
System.err.println("Watch started...");
|
||||
//System.exit(0);
|
||||
tr.commit().join();
|
||||
watch.cancel(true);
|
||||
try {
|
||||
watch.join();
|
||||
System.out.println("`a' changed");
|
||||
}
|
||||
catch(FDBException e) {
|
||||
System.out.println("`a' watch error -> " + e.getMessage());
|
||||
if(e.getCode() != 1101)
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
raceTest(database);
|
||||
raceTest(database);
|
||||
}
|
||||
}
|
||||
|
||||
public static void raceTest(Database db) {
|
||||
ExecutorService e = Executors.newCachedThreadPool(); // Executors.newFixedThreadPool(2);
|
||||
Random r = new Random();
|
||||
|
||||
Transaction tr = db.createTransaction();
|
||||
byte[] key = "hello".getBytes();
|
||||
try(Transaction tr = db.createTransaction()) {
|
||||
byte[] key = "hello".getBytes();
|
||||
|
||||
for(int i = 0; i < 10000; i++) {
|
||||
final CompletableFuture<Void> f = tr.watch(key);
|
||||
final AtomicInteger a = new AtomicInteger();
|
||||
Runnable cancel = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for(int i = 0; i < 10000; i++) {
|
||||
final CompletableFuture<Void> f = tr.watch(key);
|
||||
final AtomicInteger a = new AtomicInteger();
|
||||
Runnable cancel = () -> {
|
||||
System.err.println("`f' cancel()...");
|
||||
f.cancel(true);
|
||||
a.incrementAndGet();
|
||||
};
|
||||
Runnable get = () -> {
|
||||
try {
|
||||
System.err.println("`f' get()...");
|
||||
f.join();
|
||||
System.err.println("`f' changed");
|
||||
}
|
||||
catch(FDBException e12) {
|
||||
System.err.println("`f' watch error -> " + e12.getMessage());
|
||||
if(e12.getCode() != 1101)
|
||||
throw e12;
|
||||
}
|
||||
finally {
|
||||
a.incrementAndGet();
|
||||
}
|
||||
};
|
||||
if(r.nextBoolean()) {
|
||||
e.execute(cancel);
|
||||
e.execute(get);
|
||||
}
|
||||
};
|
||||
Runnable get = () -> {
|
||||
try {
|
||||
System.err.println("`f' get()...");
|
||||
f.join();
|
||||
System.err.println("`f' changed");
|
||||
} catch(FDBException e12) {
|
||||
System.err.println("`f' watch error -> " + e12.getMessage());
|
||||
if(e12.getCode() != 1101)
|
||||
throw e12;
|
||||
} finally {
|
||||
a.incrementAndGet();
|
||||
else {
|
||||
e.execute(get);
|
||||
e.execute(cancel);
|
||||
}
|
||||
};
|
||||
if(r.nextBoolean()) {
|
||||
e.execute(cancel);
|
||||
e.execute(get);
|
||||
} else {
|
||||
e.execute(get);
|
||||
e.execute(cancel);
|
||||
}
|
||||
|
||||
while(a.get() != 2) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e1) {
|
||||
// TODO Auto-generated catch block
|
||||
e1.printStackTrace();
|
||||
while(a.get() != 2) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
catch(InterruptedException e1) {
|
||||
// TODO Auto-generated catch block
|
||||
e1.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//if(i % 1000 == 0) {
|
||||
System.out.println("Done with " + i);
|
||||
//}
|
||||
//if(i % 1000 == 0) {
|
||||
System.out.println("Done with " + i);
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue