I have had it with these monkey-fighting whitespaces changes in this Monday-to-Friday repo

This commit is contained in:
Alec Grieser 2017-12-14 11:45:08 -08:00
parent 311bb89258
commit c7a730006c
1 changed files with 170 additions and 170 deletions

View File

@ -242,7 +242,7 @@ public class DirectoryLayer implements Directory {
*/
@Override
public int hashCode() {
return path.hashCode() ^ (nodeSubspace.hashCode() * 179) ^ (contentSubspace.hashCode() * 937);
return path.hashCode() ^ (nodeSubspace.hashCode() * 179) ^ (contentSubspace.hashCode() * 937);
}
/**
@ -327,7 +327,7 @@ public class DirectoryLayer implements Directory {
*/
@Override
public CompletableFuture<DirectorySubspace> open(ReadTransactionContext tcx, final List<String> path, final byte[] layer) {
return tcx.readAsync(rtr -> createOrOpenInternal(rtr, null, path, layer, null, false, true));
return tcx.readAsync(rtr -> createOrOpenInternal(rtr, null, path, layer, null, false, true));
}
/**
@ -349,7 +349,7 @@ public class DirectoryLayer implements Directory {
*/
@Override
public CompletableFuture<DirectorySubspace> create(TransactionContext tcx, final List<String> path, final byte[] layer, final byte[] prefix) {
return tcx.runAsync(tr -> createOrOpenInternal(tr, tr, path, layer, prefix, true, false));
return tcx.runAsync(tr -> createOrOpenInternal(tr, tr, path, layer, prefix, true, false));
}
/**
@ -512,25 +512,25 @@ public class DirectoryLayer implements Directory {
final List<String> pathCopy = new ArrayList<String>(path);
return tcx.readAsync(tr -> checkVersion(tr)
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
throw new NoSuchDirectoryException(toAbsolutePath(pathCopy));
if(node.isInPartition(true))
return node.getContents().list(tr, node.getPartitionSubpath());
if(node.isInPartition(true))
return node.getContents().list(tr, node.getPartitionSubpath());
final Subspace subdir = node.subspace.get(SUB_DIR_KEY);
final Subspace subdir = node.subspace.get(SUB_DIR_KEY);
return AsyncUtil.collect(
AsyncUtil.mapIterable(tr.getRange(subdir.range()),
return AsyncUtil.collect(
AsyncUtil.mapIterable(tr.getRange(subdir.range()),
kv -> subdir.unpack(kv.getKey()).getString(0),
tr.getExecutor()
)
);
}, tr.getExecutor())
)
);
}, tr.getExecutor())
);
}
@ -558,17 +558,17 @@ public class DirectoryLayer implements Directory {
final List<String> pathCopy = new ArrayList<>(path);
return tcx.readAsync(tr -> checkVersion(tr)
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
return AsyncUtil.READY_FALSE;
else if(node.isInPartition(false))
return node.getContents().exists(tr, node.getPartitionSubpath());
.thenComposeAsync(ignore ->
new NodeFinder(pathCopy).find(tr).thenComposeAsync(new NodeMetadataLoader(tr), tr.getExecutor()),
tr.getExecutor())
.thenComposeAsync(node -> {
if(!node.exists())
return AsyncUtil.READY_FALSE;
else if(node.isInPartition(false))
return node.getContents().exists(tr, node.getPartitionSubpath());
return AsyncUtil.READY_TRUE;
}, tr.getExecutor()));
return AsyncUtil.READY_TRUE;
}, tr.getExecutor()));
}
//
@ -593,15 +593,15 @@ public class DirectoryLayer implements Directory {
return tr.getRange(nodeSubspace.range().begin, ByteArrayUtil.join(nodeSubspace.pack(key), new byte[]{0x00}), 1, true)
.asList()
.thenApply(results -> {
if(results.size() > 0) {
byte[] resultKey = results.get(0).getKey();
byte[] prevPrefix = nodeSubspace.unpack(resultKey).getBytes(0);
if(ByteArrayUtil.startsWith(key, prevPrefix)) {
return nodeWithPrefix(prevPrefix);
}
}
if(results.size() > 0) {
byte[] resultKey = results.get(0).getKey();
byte[] prevPrefix = nodeSubspace.unpack(resultKey).getBytes(0);
if(ByteArrayUtil.startsWith(key, prevPrefix)) {
return nodeWithPrefix(prevPrefix);
}
}
return null;
return null;
});
}
@ -659,13 +659,13 @@ public class DirectoryLayer implements Directory {
tr.clear(node.range());
return AsyncUtil.whileTrue(() -> {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
else
subdirRemoveFuture = AsyncUtil.DONE;
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
else
subdirRemoveFuture = AsyncUtil.DONE;
return subdirRemoveFuture.thenCompose(ignore -> rangeItr.onHasNext());
return subdirRemoveFuture.thenCompose(ignore -> rangeItr.onHasNext());
}, tr.getExecutor());
}
@ -674,14 +674,14 @@ public class DirectoryLayer implements Directory {
// allocated prefix (including the root node). This means that it neither
// contains any other prefix nor is contained by any other prefix.
if(prefix == null || prefix.length == 0)
return AsyncUtil.READY_FALSE;
return AsyncUtil.READY_FALSE;
return nodeContainingKey(tr, prefix).thenComposeAsync(node -> {
if(node != null)
return AsyncUtil.READY_FALSE;
if(node != null)
return AsyncUtil.READY_FALSE;
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext().thenApply(hasNext -> !hasNext);
final AsyncIterator<KeyValue> it = tr.getRange(nodeSubspace.pack(prefix), nodeSubspace.pack(ByteArrayUtil.strinc(prefix)), 1).iterator();
return it.onHasNext().thenApply(hasNext -> !hasNext);
}, tr.getExecutor());
}
@ -769,46 +769,46 @@ public class DirectoryLayer implements Directory {
}
return checkOrWriteVersion(tr).thenComposeAsync(ignore -> {
if(prefix == null) {
return allocator.allocate(tr).thenComposeAsync(allocated -> {
final byte[] finalPrefix = ByteArrayUtil.join(contentSubspace.getKey(), allocated);
return tr.getRange(Range.startsWith(finalPrefix), 1).iterator().onHasNext().thenApply(hasAny -> {
if(hasAny) {
throw new IllegalStateException("The database has keys stored at the prefix chosen by the automatic " +
"prefix allocator: " + ByteArrayUtil.printable(finalPrefix) + ".");
}
return finalPrefix;
});
}, tr.getExecutor());
}
else
return CompletableFuture.completedFuture(prefix);
if(prefix == null) {
return allocator.allocate(tr).thenComposeAsync(allocated -> {
final byte[] finalPrefix = ByteArrayUtil.join(contentSubspace.getKey(), allocated);
return tr.getRange(Range.startsWith(finalPrefix), 1).iterator().onHasNext().thenApply(hasAny -> {
if(hasAny) {
throw new IllegalStateException("The database has keys stored at the prefix chosen by the automatic " +
"prefix allocator: " + ByteArrayUtil.printable(finalPrefix) + ".");
}
return finalPrefix;
});
}, tr.getExecutor());
}
else
return CompletableFuture.completedFuture(prefix);
}, tr.getExecutor())
.thenComposeAsync(actualPrefix -> isPrefixFree(prefix == null ? tr.snapshot() : tr, actualPrefix)
.thenComposeAsync(prefixFree -> {
if(!prefixFree) {
if(prefix == null) {
throw new IllegalStateException("The directory layer has manually allocated prefixes that conflict " +
"with the automatic prefix allocator.");
}
else
throw new IllegalArgumentException("Prefix already in use: " + ByteArrayUtil.printable(actualPrefix) + ".");
}
else if(path.size() > 1) {
return createOrOpen(tr, PathUtil.popBack(path)).thenApply(dir -> nodeWithPrefix(dir.getKey()));
}
else
return CompletableFuture.completedFuture(rootNode);
}, tr.getExecutor())
.thenApplyAsync(parentNode -> {
if(parentNode == null)
throw new IllegalStateException("The parent directory does not exist."); //Shouldn't happen
Subspace node = nodeWithPrefix(actualPrefix);
tr.set(parentNode.get(SUB_DIR_KEY).get(getLast(path)).getKey(), actualPrefix);
tr.set(node.get(LAYER_KEY).getKey(), layer);
return contentsOfNode(node, path, layer);
}, tr.getExecutor()),
tr.getExecutor());
.thenComposeAsync(prefixFree -> {
if(!prefixFree) {
if(prefix == null) {
throw new IllegalStateException("The directory layer has manually allocated prefixes that conflict " +
"with the automatic prefix allocator.");
}
else
throw new IllegalArgumentException("Prefix already in use: " + ByteArrayUtil.printable(actualPrefix) + ".");
}
else if(path.size() > 1) {
return createOrOpen(tr, PathUtil.popBack(path)).thenApply(dir -> nodeWithPrefix(dir.getKey()));
}
else
return CompletableFuture.completedFuture(rootNode);
}, tr.getExecutor())
.thenApplyAsync(parentNode -> {
if(parentNode == null)
throw new IllegalStateException("The parent directory does not exist."); //Shouldn't happen
Subspace node = nodeWithPrefix(actualPrefix);
tr.set(parentNode.get(SUB_DIR_KEY).get(getLast(path)).getKey(), actualPrefix);
tr.set(node.get(LAYER_KEY).getKey(), layer);
return contentsOfNode(node, path, layer);
}, tr.getExecutor()),
tr.getExecutor());
}
//
@ -906,23 +906,23 @@ public class DirectoryLayer implements Directory {
currentPath = new ArrayList<>();
return AsyncUtil.whileTrue(() -> {
if(index == path.size())
return AsyncUtil.READY_FALSE;
if(index == path.size())
return AsyncUtil.READY_FALSE;
return tr.get(node.subspace.get(SUB_DIR_KEY).get(path.get(index)).getKey()).thenComposeAsync(key -> {
currentPath.add(path.get(index));
node = new Node(nodeWithPrefix(key), currentPath, path);
return tr.get(node.subspace.get(SUB_DIR_KEY).get(path.get(index)).getKey()).thenComposeAsync(key -> {
currentPath.add(path.get(index));
node = new Node(nodeWithPrefix(key), currentPath, path);
if(!node.exists())
return AsyncUtil.READY_FALSE;
if(!node.exists())
return AsyncUtil.READY_FALSE;
return node.loadMetadata(tr).thenApply(ignore -> {
++index;
return !Arrays.equals(node.layer, DirectoryLayer.PARTITION_LAYER);
});
}, tr.getExecutor());
return node.loadMetadata(tr).thenApply(ignore -> {
++index;
return !Arrays.equals(node.layer, DirectoryLayer.PARTITION_LAYER);
});
}, tr.getExecutor());
}, tr.getExecutor())
.thenApply(ignore -> node);
.thenApply(ignore -> node);
}
}
@ -968,9 +968,9 @@ public class DirectoryLayer implements Directory {
return tr.get(subspace.pack(new Tuple().add(LAYER_KEY)))
.thenApply(value -> {
layer = value;
loadedMetadata = true;
return Node.this;
layer = value;
loadedMetadata = true;
return Node.this;
});
}
@ -1011,99 +1011,99 @@ public class DirectoryLayer implements Directory {
public CompletableFuture<byte[]> find(final Transaction tr, final HighContentionAllocator allocator) {
return AsyncUtil.whileTrue(() -> {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext().thenApply(hasNext -> {
if(hasNext) {
KeyValue kv = rangeItr.next();
windowStart = allocator.counters.unpack(kv.getKey()).getLong(0);
}
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext().thenApply(hasNext -> {
if(hasNext) {
KeyValue kv = rangeItr.next();
windowStart = allocator.counters.unpack(kv.getKey()).getLong(0);
}
return null;
})
.thenComposeAsync(ignore -> chooseWindow(tr, allocator), tr.getExecutor())
.thenComposeAsync(ignore -> choosePrefix(tr, allocator), tr.getExecutor());
return null;
})
.thenComposeAsync(ignore -> chooseWindow(tr, allocator), tr.getExecutor())
.thenComposeAsync(ignore -> choosePrefix(tr, allocator), tr.getExecutor());
}, tr.getExecutor())
.thenApply(ignore -> Tuple.from(candidate).pack());
.thenApply(ignore -> Tuple.from(candidate).pack());
}
public CompletableFuture<Void> chooseWindow(final Transaction tr, final HighContentionAllocator allocator) {
final long initialWindowStart = windowStart;
return AsyncUtil.whileTrue(() -> {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
Range oldAllocations = new Range(allocator.recent.getKey(), allocator.recent.get(windowStart).getKey());
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
Range oldAllocations = new Range(allocator.recent.getKey(), allocator.recent.get(windowStart).getKey());
CompletableFuture<byte[]> newCountRead;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
if(windowStart > initialWindowStart) {
tr.clear(oldCounters);
tr.options().setNextWriteNoWriteConflictRange();
tr.clear(oldAllocations);
}
CompletableFuture<byte[]> newCountRead;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
if(windowStart > initialWindowStart) {
tr.clear(oldCounters);
tr.options().setNextWriteNoWriteConflictRange();
tr.clear(oldAllocations);
}
tr.mutate(MutationType.ADD, counterKey, LITTLE_ENDIAN_LONG_ONE);
newCountRead = tr.snapshot().get(counterKey);
}
tr.mutate(MutationType.ADD, counterKey, LITTLE_ENDIAN_LONG_ONE);
newCountRead = tr.snapshot().get(counterKey);
}
return newCountRead.thenApply(newCountBytes -> {
long newCount = newCountBytes == null ? 0 : unpackLittleEndian(newCountBytes);
windowSize = getWindowSize(windowStart);
if(newCount * 2 >= windowSize) {
windowStart += windowSize;
return true;
}
return newCountRead.thenApply(newCountBytes -> {
long newCount = newCountBytes == null ? 0 : unpackLittleEndian(newCountBytes);
windowSize = getWindowSize(windowStart);
if(newCount * 2 >= windowSize) {
windowStart += windowSize;
return true;
}
return false; // exit the loop
});
return false; // exit the loop
});
}, tr.getExecutor());
}
public CompletableFuture<Boolean> choosePrefix(final Transaction tr, final HighContentionAllocator allocator) {
restart = false;
return AsyncUtil.whileTrue(() -> {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional
// subsequent risk of conflict for this transaction.
candidate = windowStart + random.nextInt(windowSize);
final byte[] allocationKey = allocator.recent.get(candidate).getKey();
Range countersRange = allocator.counters.range();
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional
// subsequent risk of conflict for this transaction.
candidate = windowStart + random.nextInt(windowSize);
final byte[] allocationKey = allocator.recent.get(candidate).getKey();
Range countersRange = allocator.counters.range();
AsyncIterable<KeyValue> counterRange;
CompletableFuture<byte[]> allocationTemp;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
counterRange = tr.snapshot().getRange(countersRange, 1, true);
allocationTemp = tr.get(allocationKey);
tr.options().setNextWriteNoWriteConflictRange();
tr.set(allocationKey, EMPTY_BYTES);
}
AsyncIterable<KeyValue> counterRange;
CompletableFuture<byte[]> allocationTemp;
// SOMEDAY: synchronize on something transaction local
synchronized(HighContentionAllocator.class) {
counterRange = tr.snapshot().getRange(countersRange, 1, true);
allocationTemp = tr.get(allocationKey);
tr.options().setNextWriteNoWriteConflictRange();
tr.set(allocationKey, EMPTY_BYTES);
}
final CompletableFuture<List<KeyValue>> lastCounter = counterRange.asList();
final CompletableFuture<byte[]> allocation = allocationTemp;
final CompletableFuture<List<KeyValue>> lastCounter = counterRange.asList();
final CompletableFuture<byte[]> allocation = allocationTemp;
return lastCounter.thenCombineAsync(allocation, (result, allocationValue) -> {
long currentWindowStart = 0;
if(!result.isEmpty()) {
currentWindowStart = allocator.counters.unpack(result.get(0).getKey()).getLong(0);
}
return lastCounter.thenCombineAsync(allocation, (result, allocationValue) -> {
long currentWindowStart = 0;
if(!result.isEmpty()) {
currentWindowStart = allocator.counters.unpack(result.get(0).getKey()).getLong(0);
}
if(currentWindowStart > windowStart) {
restart = true;
return false; // exit the loop and rerun the allocation from the beginning
}
if(currentWindowStart > windowStart) {
restart = true;
return false; // exit the loop and rerun the allocation from the beginning
}
if(allocationValue == null) {
tr.addWriteConflictKey(allocationKey);
return false; // exit the loop and return this candidate
}
if(allocationValue == null) {
tr.addWriteConflictKey(allocationKey);
return false; // exit the loop and return this candidate
}
return true;
}, tr.getExecutor());
return true;
}, tr.getExecutor());
}, tr.getExecutor())
.thenApply(ignore -> restart);
.thenApply(ignore -> restart);
}
private static int getWindowSize(long start) {
@ -1134,8 +1134,8 @@ public class DirectoryLayer implements Directory {
/**
* Returns a byte string that:
* <ol>
* <li>has never and will never be returned by another call to this method on the same subspace</li>
* <li>is nearly as short as possible given the above</li>
* <li>has never and will never be returned by another call to this method on the same subspace</li>
* <li>is nearly as short as possible given the above</li>
* </ol>
*/
public CompletableFuture<byte[]> allocate(final Transaction tr) {