Various fixes to stack tester Transaction reference counting.

This commit is contained in:
A.J. Beamon 2017-12-08 14:52:01 -08:00
parent 570b60fe53
commit 0c0069a02e
4 changed files with 63 additions and 89 deletions

View File

@ -300,6 +300,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
return database;
}
// Users of this function must dispose of the returned FutureResults when finished
protected FutureResults getRange_internal(
KeySelector begin, KeySelector end,
int rowLimit, int targetBytes, int streamingMode,

View File

@ -696,37 +696,6 @@ public class AsyncStackTester {
return CompletableFuture.completedFuture(null);
});
}
private static CompletableFuture<Void> logStack(final Instruction inst, final byte[] prefix, int i) {
//System.out.println("Logging stack at " + i);
while(inst.size() > 0) {
StackEntry e = inst.pop();
byte[] pk = Tuple.from(i, e.idx).pack(prefix);
byte[] pv = Tuple.from(StackUtils.serializeFuture(e.value)).pack();
inst.tr.set(pk, pv.length < 40000 ? pv : Arrays.copyOfRange(pv, 0, 40000));
i--;
if(i % 100 == 0) {
final int saved = i;
return inst.tr.commit().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void o) {
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return logStack(inst, prefix, saved);
}
});
}
}
return inst.tr.commit().thenApplyAsync(new Function<Void, Void>() {
@Override
public Void apply(Void a) {
inst.releaseTransaction();
inst.context.newTransaction();
inst.tr = inst.context.getCurrentTransaction();
return null;
}
});
}
private static CompletableFuture<Void> pushRange(Instruction inst, CompletableFuture<List<KeyValue>> range) {
return pushRange(inst, range, null);

View File

@ -84,31 +84,39 @@ abstract class Context implements Runnable {
}
}
public synchronized Transaction getCurrentTransaction() {
Transaction tr = Context.transactionMap.get(this.trName);
Context.transactionRefCounts.get(tr).incrementAndGet();
public static synchronized void addTransactionReference(Transaction tr) {
transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(0)).incrementAndGet();
}
private static synchronized Transaction getTransaction(String trName) {
Transaction tr = transactionMap.get(trName);
addTransactionReference(tr);
return tr;
}
public synchronized void releaseTransaction(Transaction tr) {
public Transaction getCurrentTransaction() {
return getTransaction(trName);
}
public static synchronized void releaseTransaction(Transaction tr) {
if(tr != null) {
AtomicInteger count = Context.transactionRefCounts.get(tr);
AtomicInteger count = transactionRefCounts.get(tr);
if(count.decrementAndGet() == 0) {
Context.transactionRefCounts.remove(tr);
assert !transactionMap.containsValue(tr);
transactionRefCounts.remove(tr);
tr.dispose();
}
}
}
public synchronized void updateCurrentTransaction(Transaction tr) {
Context.transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(1));
releaseTransaction(Context.transactionMap.put(this.trName, tr));
private static synchronized void updateTransaction(String trName, Transaction tr) {
releaseTransaction(transactionMap.put(trName, tr));
addTransactionReference(tr);
}
public synchronized boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
if(Context.transactionMap.replace(this.trName, oldTr, newTr)) {
AtomicInteger count = Context.transactionRefCounts.computeIfAbsent(newTr, x -> new AtomicInteger(0));
count.incrementAndGet();
private static synchronized boolean updateTransaction(String trName, Transaction oldTr, Transaction newTr) {
if(transactionMap.replace(trName, oldTr, newTr)) {
addTransactionReference(newTr);
releaseTransaction(oldTr);
return true;
}
@ -116,6 +124,14 @@ abstract class Context implements Runnable {
return false;
}
public void updateCurrentTransaction(Transaction tr) {
updateTransaction(trName, tr);
}
public boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
return updateTransaction(trName, oldTr, newTr);
}
public void newTransaction() {
Transaction tr = db.createTransaction();
updateCurrentTransaction(tr);
@ -128,10 +144,9 @@ abstract class Context implements Runnable {
}
}
public synchronized void switchTransaction(byte[] trName) {
this.trName = ByteArrayUtil.printable(trName);
Transaction tr = Context.transactionMap.computeIfAbsent(this.trName, x -> db.createTransaction());
Context.transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(1));
public void switchTransaction(byte[] rawTrName) {
trName = ByteArrayUtil.printable(rawTrName);
newTransaction(null);
}
abstract void executeOperations() throws Throwable;

View File

@ -34,77 +34,66 @@ class Instruction extends Stack {
private final static String SUFFIX_SNAPSHOT = "_SNAPSHOT";
private final static String SUFFIX_DATABASE = "_DATABASE";
String op;
Tuple tokens;
Context context;
boolean isDatabase;
boolean isSnapshot;
Transaction tr;
ReadTransaction readTr;
TransactionContext tcx;
ReadTransactionContext readTcx;
final String op;
final Tuple tokens;
final Context context;
final boolean isDatabase;
final boolean isSnapshot;
final Transaction tr;
final ReadTransaction readTr;
final TransactionContext tcx;
final ReadTransactionContext readTcx;
public Instruction(Context context, Tuple tokens) {
this.context = context;
this.tokens = tokens;
op = tokens.getString(0);
isDatabase = op.endsWith(SUFFIX_DATABASE);
isSnapshot = op.endsWith(SUFFIX_SNAPSHOT);
String fullOp = tokens.getString(0);
isDatabase = fullOp.endsWith(SUFFIX_DATABASE);
isSnapshot = fullOp.endsWith(SUFFIX_SNAPSHOT);
if(isDatabase) {
this.tr = null;
tr = null;
readTr = null;
op = op.substring(0, op.length() - SUFFIX_DATABASE.length());
op = fullOp.substring(0, fullOp.length() - SUFFIX_DATABASE.length());
}
else if(isSnapshot) {
this.tr = context.getCurrentTransaction();
readTr = this.tr.snapshot();
op = op.substring(0, op.length() - SUFFIX_SNAPSHOT.length());
tr = context.getCurrentTransaction();
readTr = tr.snapshot();
op = fullOp.substring(0, fullOp.length() - SUFFIX_SNAPSHOT.length());
}
else {
this.tr = context.getCurrentTransaction();
readTr = this.tr;
tr = context.getCurrentTransaction();
readTr = tr;
op = fullOp;
}
tcx = isDatabase ? context.db : this.tr;
readTcx = isDatabase ? context.db : this.readTr;
tcx = isDatabase ? context.db : tr;
readTcx = isDatabase ? context.db : readTr;
}
void setTransaction(Transaction tr) {
void setTransaction(Transaction newTr) {
if(!isDatabase) {
context.releaseTransaction(this.tr);
context.updateCurrentTransaction(tr);
this.tr = context.getCurrentTransaction();
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
context.updateCurrentTransaction(newTr);
}
}
void setTransaction(Transaction oldTr, Transaction newTr) {
if(!isDatabase) {
context.updateCurrentTransaction(oldTr, newTr);
this.tr = context.getCurrentTransaction();
if(isSnapshot) {
readTr = this.tr.snapshot();
}
else {
readTr = tr;
}
}
}
void releaseTransaction() {
context.releaseTransaction(this.tr);
Context.releaseTransaction(tr);
}
void push(Object o) {
if(o instanceof CompletableFuture && tr != null) {
CompletableFuture<?> future = (CompletableFuture<?>)o;
Context.addTransactionReference(tr);
future.whenComplete((x, t) -> Context.releaseTransaction(tr));
}
context.stack.push(context.instructionIndex, o);
}