Merge branch 'master' into java-add-missing-dispose

# Conflicts:
#	bindings/java/src-completable/main/com/apple/foundationdb/Cluster.java
#	bindings/java/src-completable/main/com/apple/foundationdb/async/AsyncIterator.java
#	bindings/java/src-completable/main/com/apple/foundationdb/async/AsyncUtil.java
This commit is contained in:
A.J. Beamon 2017-12-06 10:22:39 -08:00
commit f456c67bda
64 changed files with 648 additions and 444 deletions

View File

@ -35,6 +35,13 @@ ifeq ($(PLATFORM),Linux)
CXX ?= g++
CXXFLAGS += -std=c++0x
HARDENING_CFLAGS := -fstack-protector-all -Wstack-protector --param ssp-buffer-size=4 -fPIC
CFLAGS += ${HARDENING_CFLAGS}
# TODO(alexmiller): boost 1.52.0 prevents us from using most of these with -Werror.
# Reassess after boost has been upgraded to >1.52.0.
# CFLAGS += -Wall -Wextra -Wformat-security -Wconversion -Wsign-conversion -Werror
HARDENING_LDFLAGS := -Wl,-z,noexecstack -Wl,-z,relro -Wl,-z,now
LDFLAGS := ${HARDENING_CFLAGS} ${HARDENING_LDFLAGS}
BOOSTDIR ?= /opt/boost_1_52_0
DLEXT := so
@ -84,7 +91,6 @@ CFLAGS += -g
# valgrind-compatible builds are enabled by uncommenting lines in valgrind.mk
CXXFLAGS += -Wno-deprecated
LDFLAGS :=
LIBS :=
STATIC_LIBS :=

View File

@ -39,16 +39,13 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
protected Cluster(long cPtr, Executor executor) {
super(cPtr);
this.executor = executor;
this.options = new ClusterOptions(new OptionConsumer() {
@Override
public void setOption(int code, byte[] parameter) {
this.options = new ClusterOptions((code, parameter) -> {
pointerReadLock.lock();
try {
Cluster_setOption(getPtr(), code, parameter);
} finally {
pointerReadLock.unlock();
}
}
});
}
@ -84,7 +81,7 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
* successful connection.
*/
public Database openDatabase(Executor e) throws FDBException {
FutureDatabase futureDatabase = null;
FutureDatabase futureDatabase;
pointerReadLock.lock();
try {
futureDatabase = new FutureDatabase(Cluster_createDatabase(getPtr(), "DB".getBytes(UTF8)), e);

View File

@ -21,7 +21,6 @@
package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
@ -43,7 +42,7 @@ public interface Database extends Disposable, TransactionContext {
/**
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.<br>
* <br>

View File

@ -103,7 +103,7 @@ public class LocalityUtil {
*/
public static CompletableFuture<String[]> getAddressesForKey(Transaction tr, byte[] key) {
if (!(tr instanceof FDBTransaction)) {
CompletableFuture<String[]> future = new CompletableFuture<String[]>();
CompletableFuture<String[]> future = new CompletableFuture<>();
future.completeExceptionally(new FDBException("locality_information_unavailable", 1033));
return future;
}
@ -233,7 +233,7 @@ public class LocalityUtil {
}
}
static Charset ASCII = Charset.forName("US-ASCII");
private static Charset ASCII = Charset.forName("US-ASCII");
static byte[] keyServersForKey(byte[] key) {
return ByteArrayUtil.join(new byte[] { (byte)255 },
"/keyServers/".getBytes(ASCII),

View File

@ -33,5 +33,5 @@ public interface OptionConsumer {
* @param code the encoded parameter to set
* @param parameter the value, the range of which is dependent on the parameter {@code code}
*/
public void setOption(int code, byte[] parameter);
void setOption(int code, byte[] parameter);
}

View File

@ -241,7 +241,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// - no more data -or-
// - we are already fetching the next block
return mainChunkIsTheLast() ?
//new ReadyFuture<Boolean>(false, tr.getExecutor()) :
CompletableFuture.completedFuture(false) :
nextFuture;
}
@ -251,11 +250,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
return onHasNext().join();
}
// moves to the last position in the current chunk
/*public synchronized void consumeAll() {
index = chunk.values.size() - 1;
}*/
@Override
public KeyValue next() {
CompletableFuture<Boolean> nextFuture;
@ -297,7 +291,12 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// If there was no result ready then we need to wait on the future
// and return the proper result, throwing if there are no more elements
return nextFuture.thenApply(NEXT_MAPPER).join();
return nextFuture.thenApply(hasNext -> {
if(hasNext) {
return next();
}
throw new NoSuchElementException();
}).join();
}
@Override
@ -314,14 +313,5 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
nextFuture.cancel(true);
fetchingChunk.cancel(true);
}
private final Function<Boolean, KeyValue> NEXT_MAPPER = new Function<Boolean, KeyValue>() {
@Override
public KeyValue apply(Boolean o) {
if(o)
return next();
throw new NoSuchElementException();
}
};
}
}

View File

@ -28,10 +28,9 @@ import com.apple.foundationdb.tuple.Tuple;
/**
* A read-only subset of a FoundationDB {@link Transaction}. This is the interface that
* {@code Transaction}'s {@link Transaction#snapshot snapshot} presents.
*
* {@code Transaction}'s {@link Transaction#snapshot snapshot} presents.<br>
* <br>
* Note: Client must call {@link Transaction#commit()} and wait on the result on all transactions,
* <b>Note:</b> Client must call {@link Transaction#commit()} and wait on the result on all transactions,
* even ones that only read. This is done automatically when using the retry loops from
* {@link Database#run(Function)}. This is explained more in the intro to {@link Transaction}.
*
@ -42,13 +41,13 @@ public interface ReadTransaction extends ReadTransactionContext {
* When passed to a {@code getRange()} call that takes a {@code limit} parameter,
* indicates that the query should return unlimited rows.
*/
public static final int ROW_LIMIT_UNLIMITED = 0;
int ROW_LIMIT_UNLIMITED = 0;
/**
* Gets the version at which the reads for this {@code Transaction} will access the database.
* @return the version for database reads
*/
public CompletableFuture<Long> getReadVersion();
CompletableFuture<Long> getReadVersion();
/**
* Gets a value from the database. The call will return {@code null} if the key is not
@ -59,7 +58,7 @@ public interface ReadTransaction extends ReadTransactionContext {
* @return a {@code CompletableFuture} which will be set to the value corresponding to
* the key or to null if the key does not exist.
*/
public CompletableFuture<byte[]> get(byte[] key);
CompletableFuture<byte[]> get(byte[] key);
/**
* Returns the key referenced by the specified {@code KeySelector}.
@ -74,7 +73,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a {@code CompletableFuture} which will be set to an absolute database key
*/
public CompletableFuture<byte[]> getKey(KeySelector selector);
CompletableFuture<byte[]> getKey(KeySelector selector);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -89,7 +88,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end);
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -107,7 +106,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit);
/**
@ -128,7 +127,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse);
/**
@ -152,7 +151,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode);
/**
@ -168,7 +167,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end);
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -186,7 +185,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit);
/**
@ -207,7 +206,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit, boolean reverse);
/**
@ -231,7 +230,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit, boolean reverse, StreamingMode mode);
/**
@ -250,7 +249,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range);
AsyncIterable<KeyValue> getRange(Range range);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -271,7 +270,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit);
/**
@ -295,7 +294,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit, boolean reverse);
/**
@ -322,7 +321,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit, boolean reverse, StreamingMode mode);
/**
@ -330,6 +329,5 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a set of transaction-specific options affecting this {@code Transaction}
*/
public TransactionOptions options();
TransactionOptions options();
}

View File

@ -54,7 +54,7 @@ import com.apple.foundationdb.tuple.Tuple;
* {@code runAsync()} on a {@code Transaction} will simply attempt the operations
* without any retry loop.<br>
* <br>
* Note: Client must call {@link #commit()} and wait on the result on all transactions, even
* <b>Note:</b> Client must call {@link #commit()} and wait on the result on all transactions, even
* ones that only read. This is done automatically when using the retry loops from
* {@link Database#run(Function)}. This is because outstanding reads originating from a
* {@code Transaction} will be cancelled when a {@code Transaction} is garbage collected.
@ -64,9 +64,9 @@ import com.apple.foundationdb.tuple.Tuple;
* all reads are complete, thereby saving the calling code from this potentially confusing
* situation.<br>
* <br>
* Note: All keys with a first byte of {@code 0xff} are reserved for internal use.<br>
* <b>Note:</b> All keys with a first byte of {@code 0xff} are reserved for internal use.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after {@link #onError}
* is called.<br>
* <br>
@ -84,7 +84,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* For more information about how to use snapshot reads correctly, see
* <a href="/documentation/developer-guide.html#using-snapshot-reads" target="_blank">Using snapshot reads</a>.
*/
public ReadTransaction snapshot();
ReadTransaction snapshot();
/**
* Directly sets the version of the database at which to execute reads. The
@ -95,7 +95,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @param version the version at which to read from the database
*/
public void setReadVersion(long version);
void setReadVersion(long version);
/**
@ -106,7 +106,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @param keyBegin the first key in the range (inclusive)
* @param keyEnd the ending key for the range (exclusive)
*/
public void addReadConflictRange(byte[] keyBegin, byte[] keyEnd);
void addReadConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Adds a key to the transaction's read conflict ranges as if you had read
@ -115,7 +115,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @param key the key to be added to the range
*/
public void addReadConflictKey(byte[] key);
void addReadConflictKey(byte[] key);
/**
* Adds a range of keys to the transaction's write conflict ranges as if you
@ -125,7 +125,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @param keyBegin the first key in the range (inclusive)
* @param keyEnd the ending key for the range (exclusive)
*/
public void addWriteConflictRange(byte[] keyBegin, byte[] keyEnd);
void addWriteConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Adds a key to the transaction's write conflict ranges as if you had
@ -134,7 +134,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @param key the key to be added to the range
*/
public void addWriteConflictKey(byte[] key);
void addWriteConflictKey(byte[] key);
/**
* Sets the value for a given key. This will not affect the
@ -145,7 +145,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @throws IllegalArgumentException
* @throws FDBException
*/
public void set(byte[] key, byte[] value);
void set(byte[] key, byte[] value);
/**
* Clears a given key from the database. This will not affect the
@ -155,7 +155,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @throws IllegalArgumentException
* @throws FDBException
*/
public void clear(byte[] key);
void clear(byte[] key);
/**
* Clears a range of keys in the database. The upper bound of the range is
@ -169,7 +169,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @throws IllegalArgumentException
* @throws FDBException
*/
public void clear(byte[] beginKey, byte[] endKey);
void clear(byte[] beginKey, byte[] endKey);
/**
* Clears a range of keys in the database. The upper bound of the range is
@ -182,7 +182,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @throws FDBException
*/
public void clear(Range range);
void clear(Range range);
/**
* Replace with calls to {@link #clear(Range)} with a parameter from a call to
@ -193,7 +193,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @throws FDBException
*/
@Deprecated
public void clearRangeStartsWith(byte[] prefix);
void clearRangeStartsWith(byte[] prefix);
/**
* An atomic operation is a single database command that carries out several
@ -214,7 +214,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* that are frequently modified. A common example is the use of a key-value
* pair as a counter.<br>
* <br>
* Note: If a transaction uses both an atomic operation and a serializable
* <b>Note:</b> If a transaction uses both an atomic operation and a serializable
* read on the same key, the benefits of using the atomic operation (for both
* conflict checking and performance) are lost.
*
@ -224,7 +224,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @param key the target of the operation
* @param param the value with which to modify the key
*/
public void mutate(MutationType optype, byte[] key, byte[] param);
void mutate(MutationType optype, byte[] key, byte[] param);
/**
* Commit this {@code Transaction}. See notes in class description. Consider using
@ -248,7 +248,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* throw an error code {@code used_during_commit}(2017). In this case, all
* subsequent operations on this transaction will throw this error.
*/
public CompletableFuture<Void> commit();
CompletableFuture<Void> commit();
/**
* Gets the version number at which a successful commit modified the database.
@ -260,7 +260,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @return the database version at which the commit succeeded
*/
public Long getCommittedVersion();
Long getCommittedVersion();
/**
* Returns a future which will contain the versionstamp which was used by any versionstamp
@ -273,7 +273,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @return a future containing the versionstamp which was used for any versionstamp operations
* in this transaction
*/
public CompletableFuture<byte[]> getVersionstamp();
CompletableFuture<byte[]> getVersionstamp();
/**
* Resets a transaction and returns a delayed signal for error recovery. If the error
@ -290,13 +290,13 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @param e the error caught while executing get()s and set()s on this {@code Transaction}
* @return a {@code CompletableFuture} to be set with a reset {@code Transaction} object to retry the transaction
*/
public CompletableFuture<Transaction> onError(Throwable e);
CompletableFuture<Transaction> onError(Throwable e);
/**
* Cancels the {@code Transaction}. All pending and any future uses of the
* {@code Transaction} will throw a {@link RuntimeException}.
*/
public void cancel();
void cancel();
/**
* Creates a watch that will become ready when it reports a change to
@ -340,7 +340,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* limit defaults to 10,000 and can be modified with a call to
* {@link DatabaseOptions#setMaxWatches(long)}.
*/
public CompletableFuture<Void> watch(byte[] key) throws FDBException;
CompletableFuture<Void> watch(byte[] key) throws FDBException;
/**
* Returns the {@link Database} that this {@code Transaction} is interacting
@ -348,7 +348,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
*
* @return the {@link Database} object
*/
public Database getDatabase();
Database getDatabase();
/**
* Run a function once against this {@code Transaction}. This call blocks while
@ -357,7 +357,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @return the return value of {@code retryable}
*/
@Override
public <T> T run(Function<? super Transaction, T> retryable);
<T> T run(Function<? super Transaction, T> retryable);
/**
* Run a function once against this {@code Transaction}. This call returns
@ -366,7 +366,7 @@ public interface Transaction extends Disposable, ReadTransaction, TransactionCon
* @return a {@code CompletableFuture} that will be set to the return value of {@code retryable}
*/
@Override
public <T> CompletableFuture<T> runAsync(
<T> CompletableFuture<T> runAsync(
Function<? super Transaction, CompletableFuture<T>> retryable);
}

View File

@ -36,7 +36,7 @@ public interface AsyncIterable<T> extends Iterable<T> {
* @return a handle to be used for non-blocking iteration
*/
@Override
public AsyncIterator<T> iterator();
AsyncIterator<T> iterator();
/**
* Asynchronously return the results of this operation as a {@code List}. This is
@ -47,5 +47,5 @@ public interface AsyncIterable<T> extends Iterable<T> {
*
* @return a {@code CompletableFuture} that will be set to contents of this operation
*/
public CompletableFuture<List<T>> asList();
CompletableFuture<List<T>> asList();
}

View File

@ -42,7 +42,7 @@ public interface AsyncIterator<T> extends Iterator<T> {
* would return another element without blocking or to {@code false} if there are
* no more elements in the sequence.
*/
public CompletableFuture<Boolean> onHasNext();
CompletableFuture<Boolean> onHasNext();
/**
* Blocking call to determine if the sequence contains more elements. This call
@ -54,7 +54,7 @@ public interface AsyncIterator<T> extends Iterator<T> {
* otherwise.
*/
@Override
public boolean hasNext();
boolean hasNext();
/**
* Returns the next element in the sequence. This will not block if, since the
@ -70,10 +70,10 @@ public interface AsyncIterator<T> extends Iterator<T> {
* @throws NoSuchElementException if the sequence has been exhausted.
*/
@Override
public T next();
T next();
/**
* Cancels any outstanding asynchronous work associated with this {@code AsyncIterator}.
*/
public void cancel();
void cancel();
}

View File

@ -50,7 +50,7 @@ public class AsyncUtil {
try {
return func.apply(value);
} catch (RuntimeException e) {
CompletableFuture<O> future = new CompletableFuture<O>();
CompletableFuture<O> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
@ -232,7 +232,7 @@ public class AsyncUtil {
final CompletableFuture<Void> done;
final Executor executor;
public LoopPartial(Supplier<? extends CompletableFuture<Boolean>> body, Executor executor) {
LoopPartial(Supplier<? extends CompletableFuture<Boolean>> body, Executor executor) {
this.body = body;
this.done = new CompletableFuture<>();
this.executor = executor;
@ -340,12 +340,7 @@ public class AsyncUtil {
* @return a newly created {@code CompletableFuture} that is set when {@code task} completes
*/
public static <V> CompletableFuture<Void> success(CompletableFuture<V> task) {
return task.thenApply(new Function<V, Void>() {
@Override
public Void apply(V o) {
return null;
}
});
return task.thenApply(o -> null);
}
/**
@ -359,34 +354,18 @@ public class AsyncUtil {
* @return a new {@link CompletableFuture} that is set when {@code task} is ready.
*/
public static <V> CompletableFuture<Void> whenReady(CompletableFuture<V> task) {
return task.thenApply(new Function<V, Void>() {
@Override
public Void apply(V o) {
return null;
}
}).exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable o) {
return null;
}
});
return task.thenApply(o -> (Void)null)
.exceptionally(o -> null);
}
public static <V> CompletableFuture<V> composeExceptionally(CompletableFuture<V> task, Function<Throwable, CompletableFuture<V>> fn) {
return task.handle(new BiFunction<V, Throwable, Throwable>() {
@Override
public Throwable apply(V v, Throwable e) {
return e;
}
}).thenCompose(new Function<Throwable, CompletableFuture<V>>() {
@Override
public CompletableFuture<V> apply(Throwable e) {
return task.handle((v,e) -> e)
.thenCompose(e -> {
if (e != null) {
return fn.apply(e);
} else {
return task;
}
}
});
}
@ -399,16 +378,13 @@ public class AsyncUtil {
* @return a {@code CompletableFuture} that will be set to the collective result of the tasks
*/
public static <V> CompletableFuture<List<V>> getAll(final Collection<CompletableFuture<V>> tasks) {
return whenAll(tasks).thenApply(new Function<Void, List<V>>() {
@Override
public List<V> apply(Void o) {
List<V> result = new ArrayList<V>();
return whenAll(tasks).thenApply(unused -> {
List<V> result = new ArrayList<>();
for(CompletableFuture<V> f : tasks) {
assert(f.isDone());
result.add(f.getNow(null));
}
return result;
}
});
}
@ -422,12 +398,7 @@ public class AsyncUtil {
* @return a {@code CompletableFuture} that will be set to {@code value} on completion of {@code task}
*/
public static <V, T> CompletableFuture<V> tag(CompletableFuture<T> task, final V value) {
return task.thenApply(new Function<T, V>() {
@Override
public V apply(T o) {
return value;
}
});
return task.thenApply(o -> value);
}
/**

View File

@ -235,6 +235,18 @@ public class DirectoryLayer implements Directory
&& contentSubspace.equals(other.contentSubspace);
}
/**
 * Computes the hash code of this {@code DirectoryLayer} from its path, its node
 * subspace and its content subspace. The two subspace hashes are scaled by
 * distinct constants before all three terms are XOR-ed together.
 *
 * @return a hash combining the path and both subspaces of this {@code DirectoryLayer}
 */
@Override
public int hashCode() {
    final int pathTerm = path.hashCode();
    final int nodeTerm = nodeSubspace.hashCode() * 179;
    final int contentTerm = contentSubspace.hashCode() * 937;
    return pathTerm ^ nodeTerm ^ contentTerm;
}
/**
* Sets path of directory to {@code path}
*

View File

@ -138,11 +138,8 @@ public class TestResult {
outputBuilder.append('}');
BufferedWriter writer = null;
try {
writer = new BufferedWriter(new FileWriter(file));
try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
writer.write(outputBuilder.toString());
writer.close();
} catch (IOException e) {
System.out.println("Could not write results to file " + file);
throw new RuntimeException("Could not save results: " + e.getMessage(), e);

View File

@ -90,7 +90,7 @@ public class TesterArgs {
if (i + 1 < args.length) {
subspace = new Subspace(Tuple.from(args[++i]));
} else {
System.out.println("No subspace specified for argument " + args + "\n");
System.out.println("No subspace specified for argument " + arg + "\n");
printUsage();
throw new IllegalArgumentException("Not subspace specified for argument " + arg);
}

View File

@ -99,7 +99,7 @@ public class WatchTest {
e.execute(cancel);
}
while(a.get() != 2); {
while(a.get() != 2) {
try {
Thread.sleep(1);
} catch (InterruptedException e1) {

View File

@ -81,7 +81,7 @@ public class Cluster extends DefaultDisposableImpl implements Disposable {
* successful connection.
*/
public Database openDatabase(Executor e) throws FDBException {
FutureDatabase futureDatabase = null;
FutureDatabase futureDatabase;
pointerReadLock.lock();
try {
futureDatabase = new FutureDatabase(Cluster_createDatabase(getPtr(), "DB".getBytes(UTF8)), e);

View File

@ -45,13 +45,13 @@ public interface Database extends Disposable, TransactionContext {
/**
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/
public Transaction createTransaction();
Transaction createTransaction();
/**
* Creates a {@link Transaction} that operates on this {@code Database} with the given {@link Executor}
@ -60,14 +60,14 @@ public interface Database extends Disposable, TransactionContext {
* @param e the {@link Executor} to use when executing asynchronous callbacks for the database
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/
public Transaction createTransaction(Executor e);
Transaction createTransaction(Executor e);
/**
* Returns a set of options that can be set on a {@code Database}
*
* @return a set of database-specific options affecting this {@code Database}
*/
public DatabaseOptions options();
DatabaseOptions options();
/**
* Runs a read-only transactional function against this {@code Database} with retry logic.
@ -81,7 +81,7 @@ public interface Database extends Disposable, TransactionContext {
* this database
*/
@Override
public <T> T read(Function<? super ReadTransaction, T> retryable);
<T> T read(Function<? super ReadTransaction, T> retryable);
/**
* Runs a read-only transactional function against this {@code Database} with retry logic. Use
@ -94,7 +94,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #read(Function)
*/
public <T> T read(Function<? super ReadTransaction, T> retryable, Executor e);
<T> T read(Function<? super ReadTransaction, T> retryable, Executor e);
/**
* Runs a read-only transactional function against this {@code Database} with retry logic. Use
@ -109,7 +109,7 @@ public interface Database extends Disposable, TransactionContext {
* @see #read(Function)
*/
@Override
public abstract <T> T read(PartialFunction<? super ReadTransaction, T> retryable) throws Exception;
<T> T read(PartialFunction<? super ReadTransaction, T> retryable) throws Exception;
/**
* Runs a read-only transactional function against this {@code Database} with retry logic. Use
@ -125,7 +125,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #read(Function)
*/
public abstract <T> T read(PartialFunction<? super ReadTransaction, T> retryable, Executor e) throws Exception;
<T> T read(PartialFunction<? super ReadTransaction, T> retryable, Executor e) throws Exception;
/**
* Runs a read-only transactional function against this {@code Database} with retry logic.
@ -143,7 +143,7 @@ public interface Database extends Disposable, TransactionContext {
* this database
*/
@Override
public abstract <T> Future<T> readAsync(
<T> Future<T> readAsync(
Function<? super ReadTransaction, Future<T>> retryable);
/**
@ -157,7 +157,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #readAsync(Function)
*/
public abstract <T> Future<T> readAsync(
<T> Future<T> readAsync(
Function<? super ReadTransaction, Future<T>> retryable, Executor e);
/**
@ -171,7 +171,7 @@ public interface Database extends Disposable, TransactionContext {
* @see #read(Function)
*/
@Override
public abstract <T> PartialFuture<T> readAsync(
<T> PartialFuture<T> readAsync(
PartialFunction<? super ReadTransaction, ? extends PartialFuture<T>> retryable);
/**
@ -186,7 +186,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #read(Function)
*/
public abstract <T> PartialFuture<T> readAsync(
<T> PartialFuture<T> readAsync(
PartialFunction<? super ReadTransaction, ? extends PartialFuture<T>> retryable, Executor e);
@ -210,7 +210,7 @@ public interface Database extends Disposable, TransactionContext {
* this database
*/
@Override
public <T> T run(Function<? super Transaction, T> retryable);
<T> T run(Function<? super Transaction, T> retryable);
/**
* Runs a transactional function against this {@code Database} with retry logic.
@ -222,7 +222,7 @@ public interface Database extends Disposable, TransactionContext {
* this database
* @param e the {@link Executor} to use for asynchronous callbacks
*/
public <T> T run(Function<? super Transaction, T> retryable, Executor e);
<T> T run(Function<? super Transaction, T> retryable, Executor e);
/**
* Runs a transactional function against this {@code Database} with retry logic. Use
@ -237,7 +237,7 @@ public interface Database extends Disposable, TransactionContext {
* @see #run(Function)
*/
@Override
public abstract <T> T run(PartialFunction<? super Transaction, T> retryable) throws Exception;
<T> T run(PartialFunction<? super Transaction, T> retryable) throws Exception;
/**
* Runs a transactional function against this {@code Database} with retry logic. Use
@ -253,7 +253,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #run(Function)
*/
public abstract <T> T run(PartialFunction<? super Transaction, T> retryable, Executor e) throws Exception;
<T> T run(PartialFunction<? super Transaction, T> retryable, Executor e) throws Exception;
/**
* Runs a transactional function against this {@code Database} with retry logic.
@ -279,7 +279,7 @@ public interface Database extends Disposable, TransactionContext {
* this database
*/
@Override
public abstract <T> Future<T> runAsync(
<T> Future<T> runAsync(
Function<? super Transaction, Future<T>> retryable);
/**
@ -293,7 +293,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #run(Function)
*/
public abstract <T> Future<T> runAsync(
<T> Future<T> runAsync(
Function<? super Transaction, Future<T>> retryable, Executor e);
/**
@ -307,7 +307,7 @@ public interface Database extends Disposable, TransactionContext {
* @see #run(Function)
*/
@Override
public abstract <T> PartialFuture<T> runAsync(
<T> PartialFuture<T> runAsync(
PartialFunction<? super Transaction, ? extends PartialFuture<T>> retryable);
/**
@ -322,7 +322,7 @@ public interface Database extends Disposable, TransactionContext {
*
* @see #run(Function)
*/
public abstract <T> PartialFuture<T> runAsync(
<T> PartialFuture<T> runAsync(
PartialFunction<? super Transaction, ? extends PartialFuture<T>> retryable, Executor e);
}

View File

@ -227,7 +227,7 @@ public class LocalityUtil {
}
}
static Charset ASCII = Charset.forName("US-ASCII");
private static Charset ASCII = Charset.forName("US-ASCII");
static byte[] keyServersForKey(byte[] key) {
return ByteArrayUtil.join(new byte[] { (byte)255 },
"/keyServers/".getBytes(ASCII),

View File

@ -33,5 +33,5 @@ public interface OptionConsumer {
* @param code the encoded parameter to set
* @param parameter the value, the range of which is dependent on the parameter {@code code}
*/
public void setOption(int code, byte[] parameter);
void setOption(int code, byte[] parameter);
}

View File

@ -283,11 +283,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
return onHasNext().get();
}
// moves to the last position in the current chunk
/*public synchronized void consumeAll() {
index = chunk.values.size() - 1;
}*/
@Override
public KeyValue next() {
Future<Boolean> nextFuture;

View File

@ -28,10 +28,9 @@ import com.apple.foundationdb.tuple.Tuple;
/**
* A read-only subset of a FoundationDB {@link Transaction}. This is the interface that
* {@code Transaction}'s {@link Transaction#snapshot snapshot} presents.
*
* {@code Transaction}'s {@link Transaction#snapshot snapshot} presents.<br>
* <br>
* Note: Client must call {@link Transaction#commit()} and wait on the result on all transactions,
* <b>Note:</b> Client must call {@link Transaction#commit()} and wait on the result on all transactions,
* even ones that only read. This is done automatically when using the retry loops from
* {@link Database#run(Function)}. This is explained more in the intro to {@link Transaction}.
*
@ -42,13 +41,13 @@ public interface ReadTransaction extends ReadTransactionContext {
* When passed to a {@code getRange()} call that takes a {@code limit} parameter,
* indicates that the query should return unlimited rows.
*/
public static final int ROW_LIMIT_UNLIMITED = 0;
int ROW_LIMIT_UNLIMITED = 0;
/**
* Gets the version at which the reads for this {@code Transaction} will access the database.
* @return the version for database reads
*/
public Future<Long> getReadVersion();
Future<Long> getReadVersion();
/**
* Gets a value from the database. The call will return {@code null} if the key is not
@ -59,7 +58,7 @@ public interface ReadTransaction extends ReadTransactionContext {
* @return a {@code Future} which will be set to the value corresponding to
* the key or to null if the key does not exist.
*/
public Future<byte[]> get(byte[] key);
Future<byte[]> get(byte[] key);
/**
* Returns the key referenced by the specified {@code KeySelector}.
@ -74,7 +73,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a {@code Future} which will be set to an absolute database key
*/
public Future<byte[]> getKey(KeySelector selector);
Future<byte[]> getKey(KeySelector selector);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -89,7 +88,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end);
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -107,7 +106,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit);
/**
@ -128,7 +127,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse);
/**
@ -152,7 +151,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
AsyncIterable<KeyValue> getRange(KeySelector begin, KeySelector end,
int limit, boolean reverse, StreamingMode mode);
/**
@ -168,7 +167,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end);
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -186,7 +185,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit);
/**
@ -207,7 +206,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit, boolean reverse);
/**
@ -231,7 +230,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
AsyncIterable<KeyValue> getRange(byte[] begin, byte[] end,
int limit, boolean reverse, StreamingMode mode);
/**
@ -250,7 +249,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range);
AsyncIterable<KeyValue> getRange(Range range);
/**
* Gets an ordered range of keys and values from the database. The begin
@ -271,7 +270,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit);
/**
@ -295,7 +294,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit, boolean reverse);
/**
@ -322,7 +321,7 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a handle to access the results of the asynchronous call
*/
public AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit, boolean reverse, StreamingMode mode);
/**
@ -330,5 +329,5 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @return a set of transaction-specific options affecting this {@code Transaction}
*/
public TransactionOptions options();
TransactionOptions options();
}

View File

@ -57,7 +57,7 @@ import com.apple.foundationdb.tuple.Tuple;
* {@code runAsync()} on a {@code Transaction} will simply attempt the operations
* without any retry loop.<br>
* <br>
* Note: Client must call {@link #commit()} and wait on the result on all transactions, even
* <b>Note:</b> Client must call {@link #commit()} and wait on the result on all transactions, even
* ones that only read. This is done automatically when using the retry loops from
* {@link Database#run(Function)}. This is because outstanding reads originating from a
* {@code Transaction} will be cancelled when a {@code Transaction} is garbage collected.
@ -67,9 +67,9 @@ import com.apple.foundationdb.tuple.Tuple;
* all reads are complete, thereby saving the calling code from this potentially confusing
* situation.<br>
* <br>
* Note: All keys with a first byte of {@code 0xff} are reserved for internal use.<br>
* <b>Note:</b> All keys with a first byte of {@code 0xff} are reserved for internal use.<br>
* <br>
* Note: Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after {@link #onError}
* is called.
*/
@ -84,7 +84,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* For more information about how to use snapshot reads correctly, see
* <a href="/documentation/developer-guide.html#using-snapshot-reads" target="_blank">Using snapshot reads</a>.
*/
public ReadTransaction snapshot();
ReadTransaction snapshot();
/**
* Directly sets the version of the database at which to execute reads. The
@ -95,7 +95,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @param version the version at which to read from the database
*/
public void setReadVersion(long version);
void setReadVersion(long version);
/**
@ -106,7 +106,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @param keyBegin the first key in the range (inclusive)
* @param keyEnd the ending key for the range (exclusive)
*/
public void addReadConflictRange(byte[] keyBegin, byte[] keyEnd);
void addReadConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Adds a key to the transaction's read conflict ranges as if you had read
@ -115,7 +115,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @param key the key to be added to the range
*/
public void addReadConflictKey(byte[] key);
void addReadConflictKey(byte[] key);
/**
* Adds a range of keys to the transaction's write conflict ranges as if you
@ -125,7 +125,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @param keyBegin the first key in the range (inclusive)
* @param keyEnd the ending key for the range (exclusive)
*/
public void addWriteConflictRange(byte[] keyBegin, byte[] keyEnd);
void addWriteConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Adds a key to the transaction's write conflict ranges as if you had
@ -134,7 +134,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @param key the key to be added to the range
*/
public void addWriteConflictKey(byte[] key);
void addWriteConflictKey(byte[] key);
/**
* Sets the value for a given key. This will not affect the
@ -145,7 +145,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @throws IllegalArgumentException
* @throws FDBException
*/
public void set(byte[] key, byte[] value);
void set(byte[] key, byte[] value);
/**
* Clears a given key from the database. This will not affect the
@ -155,7 +155,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @throws IllegalArgumentException
* @throws FDBException
*/
public void clear(byte[] key);
void clear(byte[] key);
/**
* Clears a range of keys in the database. The upper bound of the range is
@ -169,7 +169,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @throws IllegalArgumentException
* @throws FDBException
*/
public void clear(byte[] beginKey, byte[] endKey);
void clear(byte[] beginKey, byte[] endKey);
/**
* Clears a range of keys in the database. The upper bound of the range is
@ -182,7 +182,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @throws FDBException
*/
public void clear(Range range);
void clear(Range range);
/**
* Replace with calls to {@link #clear(Range)} with a parameter from a call to
@ -193,7 +193,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @throws FDBException
*/
@Deprecated
public void clearRangeStartsWith(byte[] prefix);
void clearRangeStartsWith(byte[] prefix);
/**
* An atomic operation is a single database command that carries out several
@ -214,7 +214,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* that are frequently modified. A common example is the use of a key-value
* pair as a counter.<br>
* <br>
* Note: If a transaction uses both an atomic operation and a serializable
* <b>Note:</b> If a transaction uses both an atomic operation and a serializable
* read on the same key, the benefits of using the atomic operation (for both
* conflict checking and performance) are lost.
*
@ -224,7 +224,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @param key the target of the operation
* @param param the value with which to modify the key
*/
public void mutate(MutationType optype, byte[] key, byte[] param);
void mutate(MutationType optype, byte[] key, byte[] param);
/**
* Commit this {@code Transaction}. See notes in class description. Consider using
@ -248,7 +248,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* throw an error code {@code used_during_commit}(2017). In this case, all
* subsequent operations on this transaction will throw this error.
*/
public Future<Void> commit();
Future<Void> commit();
/**
* Gets the version number at which a successful commit modified the database.
@ -260,7 +260,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @return the database version at which the commit succeeded
*/
public Long getCommittedVersion();
Long getCommittedVersion();
/**
* Returns a future which will contain the versionstamp which was used by any versionstamp
@ -273,7 +273,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @return a future containing the versionstamp which was used for any versionstamp operations
* in this transaction
*/
public Future<byte[]> getVersionstamp();
Future<byte[]> getVersionstamp();
/**
* Resets a transaction and returns a delayed signal for error recovery. If the error
@ -290,7 +290,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @param e the error caught while executing get()s and set()s on this {@code Transaction}
* @return a {@code Future} to be set with a reset {@code Transaction} object to retry the transaction
*/
public Future<Transaction> onError(RuntimeException e);
Future<Transaction> onError(RuntimeException e);
/**
* Resets a transaction and returns a delayed signal for error recovery. If the error
@ -304,14 +304,14 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @param e the error caught while executing get()s and set()s on this {@code Transaction}
* @return a {@code PartialFuture} to be set with a reset {@code Transaction} object to retry the transaction
*/
public PartialFuture<Transaction> onError(Exception e);
PartialFuture<Transaction> onError(Exception e);
/**
* Cancels the {@code Transaction}. All pending and any future uses of the
* {@code Transaction} will throw a {@link RuntimeException}.
*/
@Override
public void cancel();
void cancel();
/**
* Creates a watch that will become ready when it reports a change to
@ -355,7 +355,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* limit defaults to 10,000 and can be modified with a call to
* {@link DatabaseOptions#setMaxWatches(long)}.
*/
public Future<Void> watch(byte[] key) throws FDBException;
Future<Void> watch(byte[] key) throws FDBException;
/**
* Returns the {@link Database} that this {@code Transaction} is interacting
@ -363,7 +363,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
*
* @return the {@link Database} object
*/
public Database getDatabase();
Database getDatabase();
/**
* Run a function once against this {@code Transaction}. This call blocks while
@ -372,7 +372,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @return the return value of {@code retryable}
*/
@Override
public <T> T run(Function<? super Transaction, T> retryable);
<T> T run(Function<? super Transaction, T> retryable);
/**
* Run a function once against this {@code Transaction}. This call blocks while
@ -383,7 +383,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @throws Exception if an error is encountered during execution
*/
@Override
public <T> T run(PartialFunction<? super Transaction, T> retryable)
<T> T run(PartialFunction<? super Transaction, T> retryable)
throws Exception;
/**
@ -393,7 +393,7 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @return a {@code Future} that will be set to the return value of {@code retryable}
*/
@Override
public <T> Future<T> runAsync(
<T> Future<T> runAsync(
Function<? super Transaction, Future<T>> retryable);
/**
@ -404,6 +404,6 @@ public interface Transaction extends Cancellable, Disposable, ReadTransaction, T
* @return a {@code PartialFuture} that will be set to the return value of {@code retryable}
*/
@Override
public <T> PartialFuture<T> runAsync(
<T> PartialFuture<T> runAsync(
PartialFunction<? super Transaction, ? extends PartialFuture<T>> retryable);
}

View File

@ -35,7 +35,7 @@ public interface AsyncIterable<T> extends Iterable<T> {
* @return a handle to be used for non-blocking iteration
*/
@Override
public AsyncIterator<T> iterator();
AsyncIterator<T> iterator();
/**
* Asynchronously return the results of this operation as a {@code List}. This is
@ -46,5 +46,5 @@ public interface AsyncIterable<T> extends Iterable<T> {
*
* @return a {@code Future} that will be set to the contents of this operation
*/
public Future<List<T>> asList();
Future<List<T>> asList();
}

View File

@ -43,7 +43,7 @@ public interface AsyncIterator<T> extends Iterator<T>, Cancellable, Disposable {
* would return another element without blocking or to {@code false} if there are
* no more elements in the sequence.
*/
public Future<Boolean> onHasNext();
Future<Boolean> onHasNext();
/**
* Blocking call to determine if the sequence contains more elements. This call
@ -55,7 +55,7 @@ public interface AsyncIterator<T> extends Iterator<T>, Cancellable, Disposable {
* otherwise.
*/
@Override
public boolean hasNext();
boolean hasNext();
/**
* Returns the next element in the sequence. This will not block if, since the
@ -71,18 +71,18 @@ public interface AsyncIterator<T> extends Iterator<T>, Cancellable, Disposable {
* @throws NoSuchElementException if the sequence has been exhausted.
*/
@Override
public T next();
T next();
/**
* Cancels any outstanding asynchronous work associated with this {@code AsyncIterator}.
*/
@Override
public void cancel();
void cancel();
/**
* Cancel this {@code AsyncIterator} and dispose of associated resources. Equivalent
* to calling {@link AsyncIterator#cancel()}.
*/
@Override
public void dispose();
void dispose();
}

View File

@ -189,7 +189,7 @@ public class AsyncUtil {
PartialFuture<Boolean> process;
boolean m_cancelled = false;
public LoopPartial(PartialFunction<Void, ? extends PartialFuture<Boolean>> body) {
LoopPartial(PartialFunction<Void, ? extends PartialFuture<Boolean>> body) {
this.body = body;
this.done = new SettablePartialFuture<Void>();
this.done.onCancelled(new Runnable() {

View File

@ -237,6 +237,18 @@ public class DirectoryLayer implements Directory
&& contentSubspace.equals(other.contentSubspace);
}
/**
 * Computes a hash code for this {@code DirectoryLayer} from its path, its node
 * subspace and its content subspace, so that instances can serve as keys in
 * hash tables or as members of hash-based sets.
 *
 * @return a hash derived from the path and subspaces of this {@code DirectoryLayer}
 */
@Override
public int hashCode() {
	// Each component is scaled by a distinct odd multiplier before the XOR fold.
	final int pathHash = path.hashCode();
	final int nodeHash = nodeSubspace.hashCode() * 179;
	final int contentHash = contentSubspace.hashCode() * 937;
	return pathHash ^ nodeHash ^ contentHash;
}
/**
* Sets path of directory to {@code path}
*

View File

@ -574,7 +574,7 @@ public class AsyncStackTester {
return inst.popParams(listSize).flatMap(new Function<List<Object>, Future<Void>>() {
@Override
public Future<Void> apply(List<Object> rawElements) {
List<Tuple> tuples = new ArrayList(listSize);
List<Tuple> tuples = new ArrayList<Tuple>(listSize);
for(Object o : rawElements) {
tuples.add(Tuple.fromBytes((byte[])o));
}

View File

@ -106,7 +106,7 @@ public class WatchTest {
e.execute(cancel);
}
while(a.get() != 2); {
while(a.get() != 2) {
try {
Thread.sleep(1);
} catch (InterruptedException e1) {

View File

@ -15,7 +15,9 @@ case $1 in
OPTIONS="$OPTIONS -Wl,-dylib_install_name -Wl,$( basename $3 )"
fi
else
OPTIONS=
if [ "$PLATFORM" = "linux" ]; then
OPTIONS="$OPTIONS -pie -fPIE"
fi
fi
OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL -o $3" )

View File

@ -82,19 +82,8 @@ ifdef JAVAC
ifneq ($(JAVAVERMAJOR),1)
$(warning Unable to compile source using Java version: $(JAVAVER) with compiler: $(JAVAC) on $(PLATFORM) $(ARCH))
else
# Set specific flags for Java 1.8
ifeq ($(JAVAVERMAJOR).$(JAVAVERMINOR),1.8)
JAVAFLAGS := -source 1.6 -target 1.6
JAVA-completableFLAGS :=
else
ifeq ($(JAVAVERMAJOR).$(JAVAVERMINOR),1.7)
JAVAFLAGS := -source 1.6 -target 1.6
JAVA-completableFLAGS :=
else
JAVAFLAGS :=
JAVA-completableFLAGS :=
endif
endif
JAVAFLAGS := -Xlint -source 1.6 -target 1.6
JAVA-completableFLAGS := -Xlint -source 1.8 -target 1.8
endif
endif

View File

@ -229,9 +229,7 @@ public:
// Tries to abort the restore for a tag. Returns the final (stable) state of the tag.
Future<ERestoreState> abortRestore(Reference<ReadYourWritesTransaction> tr, Key tagName);
Future<ERestoreState> abortRestore(Database cx, Key tagName) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return abortRestore(tr, tagName); });
}
Future<ERestoreState> abortRestore(Database cx, Key tagName);
// Waits for a restore tag to reach a final (stable) state.
Future<ERestoreState> waitRestore(Database cx, Key tagName, bool verbose);

View File

@ -24,10 +24,11 @@
#include "FDBTypes.h"
struct MutationRef {
static const char * typeString[] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
// This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;

View File

@ -93,6 +93,7 @@ private:
struct LeaderInfo {
UID changeID;
uint64_t mask = ~(15ll << 60);
Value serializedInfo;
bool forward; // If true, serializedInfo is a connection string instead!
@ -103,32 +104,24 @@ struct LeaderInfo {
bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; }
// The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better
bool updateChangeID(uint64_t processClassFitness) {
uint64_t mask = 15ll << 60;
processClassFitness <<= 60;
if ((changeID.first() & mask) == processClassFitness) {
return false;
}
changeID = UID((changeID.first() & ~mask) | processClassFitness, changeID.second());
return true;
void updateChangeID(uint64_t processClassFitness, bool isExcluded) {
changeID = UID( ( (uint64_t)isExcluded << 63) | (processClassFitness << 60) | (changeID.first() & mask ), changeID.second() );
}
// Change leader only if the candidate has better process class fitness
bool leaderChangeRequired(LeaderInfo const& candidate) const {
uint64_t mask = 15ll << 60;
if ((changeID.first() & mask) > (candidate.changeID.first() & mask)) {
// All but the first 4 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) {
return true;
} else {
return false;
}
}
// All but the first 4 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
uint64_t mask = ~(15ll << 60);
if ((changeID.first() & mask) == (leaderInfo.changeID.first() & mask)) {
// Change leader only if
// 1. the candidate has better process class fitness and the candidate is not the leader
// 2. the leader process class fitness becomes worse
bool leaderChangeRequired(LeaderInfo const& candidate) const {
if ( ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate)) ) {
return true;
} else {
return false;

View File

@ -1528,7 +1528,6 @@ public:
throw backup_unneeded();
}
state Version current = wait(tr->getReadVersion());
Optional<Value> _backupUid = wait(tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId)));
backupUid = _backupUid.get();
@ -1539,17 +1538,6 @@ public:
const auto& log_uid = BinaryWriter::toValue(logUid, Unversioned());
tr->clear(log_uid.withPrefix(applyMutationsEndRange.begin));
// Ensure that we're at a version higher than the data that we've written.
Optional<Value> lastApplied = wait(tr->get(log_uid.withPrefix(applyMutationsBeginRange.begin)));
if (lastApplied.present()) {
Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
if (current <= applied) {
TraceEvent("DBA_abort_version_upgrade").detail("src", applied).detail("dest", current);
TEST(true); // Upgrading version of local database.
tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(applied+1, Unversioned()));
}
}
Key logsPath = uidPrefixKey(applyLogKeys.begin, logUid);
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
@ -1564,6 +1552,30 @@ public:
}
}
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
const auto& log_uid = BinaryWriter::toValue(logUid, Unversioned());
// Ensure that we're at a version higher than the data that we've written.
Optional<Value> lastApplied = wait(tr->get(log_uid.withPrefix(applyMutationsBeginRange.begin)));
if (lastApplied.present()) {
Version current = tr->getReadVersion().get();
Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
if (current <= applied) {
TraceEvent("DBA_abort_version_upgrade").detail("src", applied).detail("dest", current);
TEST(true); // Upgrading version of local database.
tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(applied+1, Unversioned()));
}
}
Void _ = wait(tr->commit());
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
}
if(partial)
return Void();

View File

@ -850,6 +850,9 @@ namespace fileBackup {
static TaskParam<int64_t> fileSize() {
return LiteralStringRef(__FUNCTION__);
}
// Accessor for the int64 task parameter "nrKeys". The parameter's key is taken
// from this function's own name via __FUNCTION__, matching the sibling
// fileSize() accessor above.
// NOTE(review): appears to hold the number of key/value pairs written by the
// backup range task — confirm against the code that sets it.
static TaskParam<int64_t> nrKeys() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
StringRef getName() const { return name; };
@ -910,6 +913,7 @@ namespace fileBackup {
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Key begin, Key end, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), int priority = 0) {
TraceEvent(SevInfo, "FBA_schedBackupRangeTask").detail("begin", printable(begin)).detail("end", printable(end));
Key key = wait(addBackupTask(BackupRangeTaskFunc::name,
BackupRangeTaskFunc::version,
tr, taskBucket, completionKey,
@ -944,6 +948,7 @@ namespace fileBackup {
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
state int64_t nrKeys = 0;
Void _ = wait(checkTaskVersion(cx, task, BackupRangeTaskFunc::name, BackupRangeTaskFunc::version));
@ -1022,6 +1027,7 @@ namespace fileBackup {
TEST(true); // Backup range task did not finish before timeout
Params.backupRangeBeginKey().set(task, beginKey);
Params.fileSize().set(task, fileSize);
Params.nrKeys().set(task, nrKeys);
return Void();
}
@ -1041,6 +1047,7 @@ namespace fileBackup {
for (; i < values.first.size(); ++i) {
lastKey = values.first[i].key;
Void _ = wait(rangeFile.writeKV(lastKey, values.first[i].value));
nrKeys++;
}
}
catch (Error &e) {
@ -1075,6 +1082,7 @@ namespace fileBackup {
}
}
Params.fileSize().set(task, fileSize);
Params.nrKeys().set(task, nrKeys);
return Void();
}
@ -1127,6 +1135,9 @@ namespace fileBackup {
Void _ = wait(taskFuture->set(tr, taskBucket));
}
TraceEvent(SevInfo, "FBA_endBackupRangeTask").detail("begin", printable(Params.beginKey().get(task))).detail("end", printable(Params.endKey().get(task)))
.detail("size", Params.fileSize().get(task)).detail("nrKeys", Params.nrKeys().get(task));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
@ -3010,10 +3021,42 @@ namespace fileBackup {
// Cancel the backup tasks on this tag
Void _ = wait(tag.cancel(tr));
Void _ = wait(unlockDatabase(tr, current.get().first));
return ERestoreState::ABORTED;
}
// Aborts the restore identified by tagName using transactions created on cx.
//
// Phase 1: run the transaction-level abortRestore(tr, tagName) and commit it,
// retrying via tr->onError() on failure. If that call reports any state other
// than ERestoreState::ABORTED, that state is returned immediately and the
// transaction is not committed.
//
// Phase 2: commit a second, otherwise empty transaction before reporting
// ERestoreState::ABORTED (see the comment ahead of the second loop).
ACTOR Future<ERestoreState> abortRestore(Database cx, Key tagName) {
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>( new ReadYourWritesTransaction(cx) );
loop {
try {
ERestoreState estate = wait( abortRestore(tr, tagName) );
// Nothing was aborted: hand the reported state back without committing.
if(estate != ERestoreState::ABORTED) {
return estate;
}
Void _ = wait(tr->commit());
break;
} catch( Error &e ) {
// Wait on onError() and then go around the loop again.
// NOTE(review): presumably onError() rethrows non-retryable errors — confirm.
Void _ = wait( tr->onError(e) );
}
}
// Start from a fresh transaction object for the follow-up commit.
tr = Reference<ReadYourWritesTransaction>( new ReadYourWritesTransaction(cx) );
//Commit a dummy transaction before returning success, to ensure the mutation applier has stopped submitting mutations
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// The transaction performs no reads or writes; it only registers read and
// write conflict ranges on the single empty key.
// NOTE(review): presumably this forces a real commit rather than a
// read-only no-op — confirm.
tr->addReadConflictRange(singleKeyRange(KeyRef()));
tr->addWriteConflictRange(singleKeyRange(KeyRef()));
Void _ = wait(tr->commit());
return ERestoreState::ABORTED;
} catch( Error &e ) {
Void _ = wait( tr->onError(e) );
}
}
}
struct StartFullRestoreTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
@ -3760,6 +3803,10 @@ Future<ERestoreState> FileBackupAgent::abortRestore(Reference<ReadYourWritesTran
return fileBackup::abortRestore(tr, tagName);
}
// Database-level overload: forwards unchanged to fileBackup::abortRestore(cx, tagName).
Future<ERestoreState> FileBackupAgent::abortRestore(Database cx, Key tagName) {
return fileBackup::abortRestore(cx, tagName);
}
// Forwards unchanged to fileBackup::restoreStatus(tr, tagName) and returns its future.
Future<std::string> FileBackupAgent::restoreStatus(Reference<ReadYourWritesTransaction> tr, Key tagName) {
return fileBackup::restoreStatus(tr, tagName);
}

View File

@ -38,6 +38,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( FAILURE_MAX_DELAY, 10.0 ); if( randomize && BUGGIFY ) FAILURE_MAX_DELAY = 5.0;
init( FAILURE_MIN_DELAY, 5.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 2.0;
init( FAILURE_TIMEOUT_DELAY, FAILURE_MIN_DELAY );
init( CLIENT_FAILURE_TIMEOUT_DELAY, FAILURE_MIN_DELAY );
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin

View File

@ -37,6 +37,7 @@ public:
double FAILURE_MAX_DELAY;
double FAILURE_MIN_DELAY;
double FAILURE_TIMEOUT_DELAY;
double CLIENT_FAILURE_TIMEOUT_DELAY;
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)

View File

@ -331,16 +331,39 @@ Optional<LeaderInfo> getLeader( vector<Optional<LeaderInfo>> nominees ) {
if (nominees[i].present() && nominees[i].get().forward)
return nominees[i].get();
if(!nominees.size())
return Optional<LeaderInfo>();
// There is a leader if a majority of the nominees are the same.
// If there is a majority, the median item is in it.
int quorum = nominees.size()/2 + 1;
int q = nominees.size() - quorum;
std::nth_element( nominees.begin(), nominees.begin() + q, nominees.end() );
auto median = nominees[q];
if (std::count( nominees.begin(), nominees.end(), median ) >= quorum)
return median;
else
int bestCount = 0;
Optional<LeaderInfo> currentNominee;
for(int i=0; i<nominees.size(); i++) {
if( (nominees[i].present() != currentNominee.present()) || (currentNominee.present() && !currentNominee.get().equalInternalId(nominees[i].get()) ) ) {
if(bestCount > 0) {
bestCount--;
} else {
bestCount = 1;
currentNominee = nominees[i];
}
} else {
bestCount++;
}
}
if(!currentNominee.present())
return Optional<LeaderInfo>();
int amountBest = 0;
for(int i=0; i<nominees.size(); i++) {
if( nominees[i].present() && currentNominee.get().equalInternalId(nominees[i].get()) ) {
amountBest++;
}
}
if(amountBest >= nominees.size()/2 + 1) {
return currentNominee;
}
return Optional<LeaderInfo>();
}
struct MonitorLeaderInfo {

View File

@ -2507,6 +2507,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
req.transaction.read_snapshot = 0;
try {
Version v = wait( readVersion );
req.transaction.read_snapshot = v;

View File

@ -39,7 +39,9 @@ ACTOR Future<Void> waitForContinuousFailure( IFailureMonitor* monitor, Endpoint
// X == sustainedFailureDuration + slope * (now()-startT+X)
double waitDelay = (sustainedFailureDuration + slope * (now()-startT)) / (1-slope);
if(waitDelay < FLOW_KNOBS->CLIENT_REQUEST_INTERVAL) //We will not get a failure monitoring update in this amount of time, so there is no point in waiting for changes
//SOMEDAY: if we know that this process is a server or client we can tune this optimization better
if(waitDelay < std::min(FLOW_KNOBS->CLIENT_REQUEST_INTERVAL, FLOW_KNOBS->SERVER_REQUEST_INTERVAL)) //We will not get a failure monitoring update in this amount of time, so there is no point in waiting for changes
waitDelay = 0;
choose {
when (Void _ = wait( monitor->onStateEqual( endpoint, FailureStatus(false) ) )) {} // SOMEDAY: Use onStateChanged() for efficiency

View File

@ -385,7 +385,6 @@ protected:
LocalityEntry const& add(LocalityEntry const& entry, LocalityData const& data) {
_entryArray.push_back(entry);
_mutableEntryArray.push_back(entry);
ASSERT(data._data.size() > 0);
// Ensure that the key value array is large enough to hold the values
if (_keyValueArray.capacity() < _keyValueArray.size() + data._data.size()) {
@ -419,7 +418,6 @@ protected:
if (_keyValueArray.capacity() < _keyValueArray.size() + record->_dataMap->size()) {
_keyValueArray.reserve(_keyValueArray.size() + record->_dataMap->size());
}
ASSERT(record->_dataMap->_keyvaluearray.size() > 0);
for (auto& keyValuePair : record->_dataMap->_keyvaluearray) {
auto keyString = _localitygroup->keyText(keyValuePair.first);

View File

@ -68,6 +68,11 @@ struct IReplicationPolicy : public ReferenceCounted<IReplicationPolicy> {
std::vector<LocalityEntry> const& solutionSet,
std::vector<LocalityEntry> const& alsoServers,
LocalitySetRef const& fromServers );
// Convenience overload: collects the attribute keys this policy uses in selection and
// validation into a fresh set and returns it by value. The collection itself is delegated
// to the pointer-taking overload.
std::set<std::string> attributeKeys() const
{
	std::set<std::string> collected;
	this->attributeKeys(&collected);
	return collected;
}
// Pure virtual: each concrete policy supplies its own implementation operating on the caller-provided set.
virtual void attributeKeys(std::set<std::string>*) const = 0;
};
template <class Archive>
@ -108,6 +113,7 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
// Intentionally empty: nothing is read from or written to the archive here.
template <class Ar>
void serialize(Ar& ar) {
}
// Leaves the caller's set untouched: this implementation contributes no attribute keys.
virtual void attributeKeys(std::set<std::string>* set) const override {}
};
struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
@ -135,6 +141,9 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
// Strict lexicographic "less than" on (first, second): true when rhs orders before lhs.
static bool compareAddedResults(const std::pair<int, int>& rhs, const std::pair<int, int>& lhs)
{
	if (rhs.first < lhs.first) {
		return true;
	}
	if (lhs.first < rhs.first) {
		return false;
	}
	// First components are equivalent; the second component decides.
	return rhs.second < lhs.second;
}
// Adds this policy's own attribute key, then lets the nested policy add whatever it uses.
virtual void attributeKeys(std::set<std::string> *set) const override
{
	set->insert(_attribKey);
	_policy->attributeKeys(set);
}
protected:
int _count;
std::string _attribKey;
@ -207,6 +216,9 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
}
// Accumulates the attribute keys of every sub-policy, in _policies order, into the caller's set.
virtual void attributeKeys(std::set<std::string> *set) const override
{
	for (const IRepPolicyRef& subPolicy : _policies) {
		subPolicy->attributeKeys(set);
	}
}
protected:
std::vector<IRepPolicyRef> _policies;
std::vector<IRepPolicyRef> _sortedPolicies;

View File

@ -807,6 +807,31 @@ int testReplication()
return totalErrors;
}
namespace {
	// Erases every entry of ld->_data whose key, converted with toString(), is not in `keys`.
	// Entries whose key is present in `keys` are left as they are.
	void filterLocalityDataForPolicy(const std::set<std::string>& keys, LocalityData* ld) {
		auto entry = ld->_data.begin();
		while (entry != ld->_data.end()) {
			const bool usedByPolicy = keys.count(entry->first.toString()) != 0;
			if (usedByPolicy) {
				++entry;
			} else {
				// Post-increment: the iterator has already moved on when its old position is erased.
				ld->_data.erase(entry++);
			}
		}
	}
}
// Strips from *ld every locality entry whose key is not among policy->attributeKeys().
// A null policy leaves *ld unchanged.
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld) {
	if (!policy) {
		return;
	}
	const std::set<std::string> policyKeys = policy->attributeKeys();
	filterLocalityDataForPolicy(policyKeys, ld);
}
// Strips from every LocalityData in *vld the entries whose key is not among
// policy->attributeKeys(). A null policy leaves *vld unchanged.
//
// The key set is computed once and reused for every element. Previously the set was computed
// and then ignored, while each element went back through the policy overload and rebuilt it.
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld) {
	if (!policy) return;
	const std::set<std::string> keys = policy->attributeKeys();
	for (LocalityData& ld : *vld) {
		// Resolves to the key-set overload defined earlier in this file.
		filterLocalityDataForPolicy(keys, &ld);
	}
}
TEST_CASE("fdbrpc/Replication/test") {
printf("Running replication test\n");

View File

@ -71,4 +71,9 @@ extern bool validateAllCombinations(
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid = true);
/// Remove all pieces of locality information from the LocalityData that will not be used when validating the policy.
/// Only entries whose key appears in policy->attributeKeys() are kept; a null policy is a no-op.
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld);
/// Vector form: applies the same filtering to every LocalityData in *vld; a null policy is a no-op.
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld);
#endif

View File

@ -126,17 +126,8 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().cast_to<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change
auto t = txnStateStore->readValue(m.param1).get();
if (logSystem && m.param1.startsWith( excludedServersPrefix )) {
// If one of our existing tLogs is now excluded, we have to die and recover
auto addr = decodeExcludedServersKey(m.param1);
for( auto tl : logSystem->getLogSystemConfig().tLogs ) {
if(!tl.present() || addr.excludes(tl.interf().commit.getEndpoint().address)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("toCommit", toCommit!=NULL).detail("addr", addr.toString());
if(confChange) *confChange = true;
}
}
} else if(m.param1 != excludedServersVersionKey) {
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) {
auto t = txnStateStore->readValue(m.param1).get();
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("toCommit", toCommit!=NULL);
if(confChange) *confChange = true;
}

View File

@ -43,19 +43,20 @@ void failAfter( Future<Void> trigger, Endpoint e );
struct WorkerInfo : NonCopyable {
Future<Void> watcher;
ReplyPromise<ProcessClass> reply;
ReplyPromise<RegisterWorkerReply> reply;
Generation gen;
int reboots;
WorkerInterface interf;
ProcessClass initialClass;
ProcessClass processClass;
bool isExcluded;
WorkerInfo() : gen(-1), reboots(0) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<ProcessClass> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, bool isExcluded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded) {}
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass) {}
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), isExcluded(r.isExcluded) {}
void operator=( WorkerInfo&& r ) noexcept(true) {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -64,6 +65,7 @@ struct WorkerInfo : NonCopyable {
interf = std::move(r.interf);
initialClass = r.initialClass;
processClass = r.processClass;
isExcluded = r.isExcluded;
}
};
@ -79,6 +81,7 @@ public:
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
DatabaseConfiguration config; // Asynchronously updated via master registration
DatabaseConfiguration fullyRecoveredConfig;
Database db;
DBInfo() : masterRegistrationCount(0),
@ -259,7 +262,7 @@ public:
return results;
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDatacenters( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogs( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
{
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
@ -565,41 +568,29 @@ public:
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
struct AcrossDatacenterFitness {
struct TLogFitness {
ProcessClass::Fitness tlogFit;
int tlogCount;
AcrossDatacenterFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
TLogFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
AcrossDatacenterFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
TLogFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
AcrossDatacenterFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
std::set<Optional<Standalone<StringRef>>> dcs;
TLogFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
tlogFit = ProcessClass::BestFit;
for(auto it : tlogs) {
dcs.insert(it.first.locality.dcId());
tlogFit = std::max(tlogFit, it.second.machineClassFitness( ProcessClass::TLog ));
}
tlogCount = tlogs.size();
}
AcrossDatacenterFitness( vector<OptionalInterface<TLogInterface>> tlogs, std::vector<ProcessClass> processClasses ) {
std::set<Optional<Standalone<StringRef>>> dcs;
tlogFit = ProcessClass::BestFit;
for(int i = 0; i < tlogs.size(); i++) {
ASSERT(tlogs[i].present());
dcs.insert(tlogs[i].interf().locality.dcId());
tlogFit = std::max(tlogFit, processClasses[i].machineClassFitness( ProcessClass::TLog ));
}
tlogCount = tlogs.size();
}
bool operator < (AcrossDatacenterFitness const& r) const {
if(tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
bool operator < (TLogFitness const& r) const {
if (tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
return tlogCount > r.tlogCount;
}
bool operator == (AcrossDatacenterFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
bool operator == (TLogFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -622,7 +613,7 @@ public:
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
auto tlogs = getWorkersForTlogs( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++)
result.tLogs.push_back(tlogs[i].first);
@ -670,7 +661,7 @@ public:
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -685,15 +676,46 @@ public:
return false;
}
std::map< Optional<Standalone<StringRef>>, int> id_used;
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
if(masterWorker == id_worker.end())
if(masterWorker == id_worker.end()) {
return false;
}
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
// Get tlog processes
std::vector<std::pair<WorkerInterface, ProcessClass>> tlogs;
for( auto& it : dbi.logSystemConfig.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.isExcluded )
return true;
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
// Get proxy classes
std::vector<ProcessClass> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
if ( proxyWorker->second.isExcluded )
return true;
proxyClasses.push_back(proxyWorker->second.processClass);
}
// Get resolver classes
std::vector<ProcessClass> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
if ( resolverWorker->second.isExcluded )
return true;
resolverClasses.push_back(resolverWorker->second.processClass);
}
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we still need master for recovery.
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
@ -705,42 +727,26 @@ public:
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
if(oldMasterFit < newMasterFit) return false;
if ( oldMasterFit < newMasterFit )
return false;
if ( oldMasterFit > newMasterFit && oldMasterFit == ProcessClass::ExcludeFit )
return true;
std::vector<ProcessClass> tlogProcessClasses;
for(auto& it : dbi.logSystemConfig.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
tlogProcessClasses.push_back(tlogWorker->second.processClass);
}
AcrossDatacenterFitness oldAcrossFit(dbi.logSystemConfig.tLogs, tlogProcessClasses);
AcrossDatacenterFitness newAcrossFit(getWorkersForTlogsAcrossDatacenters(db.config, id_used, true));
// Check tLog fitness
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
if(oldAcrossFit < newAcrossFit) return false;
TLogFitness oldTLogFit(tlogs);
TLogFitness newTLotFit(getWorkersForTlogs(db.config, id_used, true));
if(oldTLogFit < newTLotFit) return false;
std::vector<ProcessClass> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
proxyClasses.push_back(proxyWorker->second.processClass);
}
std::vector<ProcessClass> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
resolverClasses.push_back(resolverWorker->second.processClass);
}
// Check proxy/resolver fitness
InDatacenterFitness oldInFit(dbi.client.proxies, dbi.resolvers, proxyClasses, resolverClasses);
auto datacenters = getDatacenters( db.config, true );
InDatacenterFitness newInFit;
for(auto dcId : datacenters) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used );
@ -758,16 +764,17 @@ public:
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldMasterFit > newMasterFit || oldAcrossFit > newAcrossFit || oldInFit > newInFit) {
if(oldMasterFit > newMasterFit || oldTLogFit > newTLotFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldAcrossFitC", oldAcrossFit.tlogCount).detail("newAcrossFitC", newAcrossFit.tlogCount)
.detail("oldAcrossFitT", oldAcrossFit.tlogFit).detail("newAcrossFitT", newAcrossFit.tlogFit)
.detail("oldTLogFitC", oldTLogFit.tlogCount).detail("newTLotFitC", newTLotFit.tlogCount)
.detail("oldTLogFitT", oldTLogFit.tlogFit).detail("newTLotFitT", newTLotFit.tlogFit)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)
.detail("oldInFitRC", oldInFit.resolverCount).detail("newInFitRC", newInFit.resolverCount);
return true;
}
return false;
}
@ -775,6 +782,7 @@ public:
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses;
bool gotFullyRecoveredConfig;
Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> clusterControllerProcessId;
UID id;
@ -789,7 +797,7 @@ public:
double startTime;
explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), startTime(now())
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now())
{
auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID();
@ -1067,7 +1075,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
when( Void _ = wait( failed ) ) { // remove workers that have failed
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( failedWorkerInfo.processClass );
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.isExcluded) );
}
cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
@ -1101,7 +1109,6 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
state std::deque<SystemFailureStatus> statusHistory; // The last change in statusHistory is from currentVersion-1 to currentVersion
state Future<Void> periodically = Void();
state double lastT = 0;
state double clientRequestInterval = FLOW_KNOBS->CLIENT_REQUEST_INTERVAL;
loop choose {
when ( FailureMonitoringRequest req = waitNext( requests ) ) {
@ -1142,8 +1149,13 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
TEST(true); // failureDetectionServer sending failure data to requester
FailureMonitoringReply reply;
reply.failureInformationVersion = currentVersion;
reply.clientRequestIntervalMS = clientRequestInterval * 1000;
reply.considerServerFailedTimeoutMS = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY * 1000;
if( req.senderStatus.present() ) {
reply.clientRequestIntervalMS = FLOW_KNOBS->SERVER_REQUEST_INTERVAL * 1000;
reply.considerServerFailedTimeoutMS = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY * 1000;
} else {
reply.clientRequestIntervalMS = FLOW_KNOBS->CLIENT_REQUEST_INTERVAL * 1000;
reply.considerServerFailedTimeoutMS = CLIENT_KNOBS->CLIENT_FAILURE_TIMEOUT_DELAY * 1000;
}
ASSERT( currentVersion >= (int64_t)statusHistory.size());
@ -1165,7 +1177,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
}
}
when ( Void _ = wait( periodically ) ) {
periodically = delay( FLOW_KNOBS->CLIENT_REQUEST_INTERVAL );
periodically = delay( FLOW_KNOBS->SERVER_REQUEST_INTERVAL );
double t = now();
if (lastT != 0 && t - lastT > 1)
TraceEvent("LongDelayOnClusterController").detail("Duration", t - lastT);
@ -1184,7 +1196,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
std::nth_element(delays.begin(), delays.begin()+pivot, delays.end());
pivotDelay = *(delays.begin()+pivot);
}
pivotDelay = std::max(0.0, pivotDelay - clientRequestInterval);
pivotDelay = std::max(0.0, pivotDelay - FLOW_KNOBS->SERVER_REQUEST_INTERVAL);
TraceEvent("FailureDetectionPoll", uniqueID).detail("PivotDelay", pivotDelay).detail("Clients", currentStatus.size());
//TraceEvent("FailureDetectionAcceptableDelay").detail("ms", acceptableDelay*1000);
@ -1192,7 +1204,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
for(auto it = currentStatus.begin(); it != currentStatus.end(); ) {
double delay = t - it->second.lastRequestTime;
if ( it->first != g_network->getLocalAddress() && ( delay > pivotDelay * 2 + clientRequestInterval + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) {
if ( it->first != g_network->getLocalAddress() && ( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) {
//printf("Failure Detection Server: Status of '%s' is now '%s' after %f sec\n", it->first.toString().c_str(), "Failed", now() - it->second.lastRequestTime);
TraceEvent("FailureDetectionStatus", uniqueID).detail("System", it->first).detail("Status","Failed").detail("Why", "Timeout").detail("LastRequestAge", delay)
.detail("PivotDelay", pivotDelay);
@ -1278,7 +1290,20 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
db->masterRegistrationCount = req.registrationCount;
if(req.configuration.present()) db->config = req.configuration.get();
if ( req.configuration.present() ) {
db->config = req.configuration.get();
if ( req.recoveryState >= RecoveryState::FULLY_RECOVERED ) {
self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.interf.address());
if ( it.second.isExcluded != isExcludedFromConfig && !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, isExcludedFromConfig) );
}
}
}
}
bool isChanged = false;
auto dbInfo = self->db.serverInfo->get();
@ -1331,6 +1356,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
WorkerInterface w = req.wi;
ProcessClass newProcessClass = req.processClass;
bool newIsExcluded = req.isExcluded;
auto info = self->id_worker.find( w.locality.processId() );
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();
@ -1339,39 +1365,43 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->clusterControllerProcessId = w.locality.processId();
}
// Check process class if needed
if (self->gotProcessClasses && (info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen)) {
auto classIter = self->id_class.find(w.locality.processId());
if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) {
newProcessClass = classIter->second;
} else {
newProcessClass = req.initialClass;
// Check process class and excluded property
if ( info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if ( self->gotProcessClasses ) {
auto classIter = self->id_class.find(w.locality.processId());
if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) {
newProcessClass = classIter->second;
} else {
newProcessClass = req.initialClass;
}
}
// Notify the worker to register again with new process class
if (newProcessClass != req.processClass && !req.reply.isSet()) {
req.reply.send( newProcessClass );
if ( self->gotFullyRecoveredConfig ) {
newIsExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.address());
}
// Notify the worker to register again with new process class/excluded property
if ( !req.reply.isSet() && ( newProcessClass != req.processClass || newIsExcluded != req.isExcluded ) ) {
req.reply.send( RegisterWorkerReply(newProcessClass, newIsExcluded) );
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass );
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded );
checkOutstandingRequests( self );
return;
}
if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (info->second.processClass != newProcessClass) {
info->second.processClass = newProcessClass;
}
info->second.initialClass = req.initialClass;
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.processClass = newProcessClass;
info->second.isExcluded = req.isExcluded;
info->second.initialClass = req.initialClass;
info->second.gen = req.generation;
if(info->second.interf.id() != w.id()) {
@ -1575,7 +1605,7 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass;
if (!w.second.reply.isSet()) {
w.second.reply.send( newProcessClass );
w.second.reply.send( RegisterWorkerReply(newProcessClass, w.second.isExcluded) );
}
}
}
@ -1724,14 +1754,14 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
}
}
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) {
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
cci.initEndpoints();
try {
//Register as a possible leader; wait to be elected
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass );
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded );
while (!currentCC->get().present() || currentCC->get().get() != cci) {
choose {
@ -1755,12 +1785,12 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
}
}
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass) {
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators( connFile );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass ) );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ) );
} catch( Error &e ) {
if( e.code() != error_code_coordinators_changed )
throw; // Expected to terminate fdbserver

View File

@ -112,20 +112,34 @@ struct RecruitStorageRequest {
}
};
// Reply carried by the ReplyPromise of a RegisterWorkerRequest: the process class and the
// excluded flag that the cluster controller reports back to the registering worker.
struct RegisterWorkerReply {
	ProcessClass processClass;
	bool isExcluded;

	// isExcluded is set explicitly so a default-constructed reply never carries an
	// indeterminate bool into serialize().
	RegisterWorkerReply() : isExcluded(false) {}
	RegisterWorkerReply(ProcessClass processClass, bool isExcluded) : processClass(processClass), isExcluded(isExcluded) {}

	// Field order defines the serialized layout; it is unchanged.
	template <class Ar>
	void serialize( Ar& ar ) {
		ar & processClass & isExcluded;
	}
};
struct RegisterWorkerRequest {
WorkerInterface wi;
ProcessClass processClass;
ProcessClass initialClass;
bool isExcluded;
Generation generation;
ReplyPromise<ProcessClass> reply;
ReplyPromise<RegisterWorkerReply> reply;
RegisterWorkerRequest() {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), generation(generation) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, bool isExcluded, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded), generation(generation) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & wi & initialClass & processClass & generation & reply;
ar & wi & initialClass & processClass & isExcluded & generation & reply;
}
};

View File

@ -250,7 +250,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
Optional<LeaderInfo> nextNominee;
if (availableLeaders.size() && availableCandidates.size()) {
nextNominee = (*availableLeaders.begin()).leaderChangeRequired(*availableCandidates.begin()) ? *availableCandidates.begin() : *availableLeaders.begin();
nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
} else if (availableLeaders.size()) {
nextNominee = *availableLeaders.begin();
} else if (availableCandidates.size()) {
@ -259,13 +259,16 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
nextNominee = Optional<LeaderInfo>();
}
if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && !currentNominee.get().equalInternalId(nextNominee.get())) || !availableLeaders.size() ) {
if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && currentNominee.get().leaderChangeRequired(nextNominee.get())) || !availableLeaders.size() ) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("Changed", nextNominee != currentNominee).detail("Key", printable(key));
for(int i=0; i<notify.size(); i++)
notify[i].send( nextNominee );
notify.clear();
currentNominee = nextNominee;
} else if (currentNominee.present() && nextNominee.present() && currentNominee.get().equalInternalId(nextNominee.get())) {
// leader becomes better
currentNominee = nextNominee;
}
if( availableLeaders.size() ) {

View File

@ -324,7 +324,7 @@ ACTOR Future<Void> shardSplitter(
StorageMetrics splitMetrics;
splitMetrics.bytes = shardBounds.max.bytes / 2;
splitMetrics.bytesPerKSecond = SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
splitMetrics.bytesPerKSecond = keys.begin >= keyServersKeys.begin ? splitMetrics.infinity : SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
splitMetrics.iosPerKSecond = splitMetrics.infinity;
state Standalone<VectorRef<KeyRef>> splitKeys = wait( getSplitKeys(self, keys, splitMetrics, metrics ) );
@ -482,7 +482,7 @@ ACTOR Future<Void> shardEvaluator(
StorageMetrics const& stats = shardSize->get().get();
bool shouldSplit = stats.bytes > shardBounds.max.bytes ||
getBandwidthStatus( stats ) == BandwidthStatusHigh;
( getBandwidthStatus( stats ) == BandwidthStatusHigh && keys.begin < keyServersKeys.begin );
bool shouldMerge = stats.bytes < shardBounds.min.bytes &&
getBandwidthStatus( stats ) == BandwidthStatusLow;

View File

@ -111,11 +111,11 @@ private:
class RawDiskQueue_TwoFiles {
public:
RawDiskQueue_TwoFiles( std::string basename, UID dbgid )
RawDiskQueue_TwoFiles( std::string basename, UID dbgid, int64_t fileSizeWarningLimit )
: basename(basename), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
dbg_file0BeginSeq(0), fileExtensionBytes(10<<20), readingBuffer( dbgid ),
readyToPush(Void())
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit)
{
if(BUGGIFY)
fileExtensionBytes = 8<<10;
@ -184,6 +184,7 @@ public:
UID dbgid;
int64_t dbg_file0BeginSeq;
int64_t fileSizeWarningLimit;
Promise<Void> error, stopped;
Future<Void> onError, onStopped;
@ -249,6 +250,10 @@ public:
int64_t minExtension = pageData.size() + writingPos - files[1].size;
files[1].size += std::min(std::max(fileExtensionBytes, minExtension), files[0].size+files[1].size+minExtension);
waitfor.push_back( files[1].f->truncate( files[1].size ) );
if(fileSizeWarningLimit > 0 && files[1].size > fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", dbgid).detail("filename", filename(1)).detail("size", files[1].size).suppressFor(1.0);
}
}
}
@ -636,8 +641,8 @@ public:
class DiskQueue : public IDiskQueue {
public:
DiskQueue( std::string basename, UID dbgid )
: rawQueue( new RawDiskQueue_TwoFiles(basename, dbgid) ), dbgid(dbgid), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0),
DiskQueue( std::string basename, UID dbgid, int64_t fileSizeWarningLimit )
: rawQueue( new RawDiskQueue_TwoFiles(basename, dbgid,fileSizeWarningLimit) ), dbgid(dbgid), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0),
nextReadLocation(-1), readBufPage(NULL), readBufPos(0), pushed_page_buffer(NULL), recovered(false), lastCommittedSeq(0), warnAlwaysForMemory(true)
{
}
@ -1029,7 +1034,7 @@ private:
class DiskQueue_PopUncommitted : public IDiskQueue {
public:
DiskQueue_PopUncommitted( std::string basename, UID dbgid ) : queue(new DiskQueue(basename, dbgid)), pushed(0), popped(0), committed(0) { };
DiskQueue_PopUncommitted( std::string basename, UID dbgid, int64_t fileSizeWarningLimit ) : queue(new DiskQueue(basename, dbgid, fileSizeWarningLimit)), pushed(0), popped(0), committed(0) { };
//IClosable
Future<Void> getError() { return queue->getError(); }
@ -1097,6 +1102,6 @@ private:
}
};
IDiskQueue* openDiskQueue( std::string basename, UID dbgid ) {
return new DiskQueue_PopUncommitted( basename, dbgid );
IDiskQueue* openDiskQueue( std::string basename, UID dbgid, int64_t fileSizeWarningLimit ) {
return new DiskQueue_PopUncommitted( basename, dbgid, fileSizeWarningLimit );
}

View File

@ -55,6 +55,6 @@ public:
virtual StorageBytes getStorageBytes() = 0;
};
IDiskQueue* openDiskQueue( std::string basename, UID dbgid ); // opens basename+"0.fdq" and basename+"1.fdq"
IDiskQueue* openDiskQueue( std::string basename, UID dbgid, int64_t fileSizeWarningLimit = -1 ); // opens basename+"0.fdq" and basename+"1.fdq"
#endif

View File

@ -75,7 +75,7 @@ ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Va
return Void();
}
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) {
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
state LeaderInfo myInfo;
state Future<Void> candidacies;
@ -94,7 +94,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID;
myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController));
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
@ -153,7 +153,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
break;
}
when (Void _ = wait(candidacies)) { ASSERT(false); }
when (Void _ = wait( asyncProcessClass->onChange() )) {
when (Void _ = wait( asyncProcessClass->onChange() || asyncIsExcluded->onChange() )) {
break;
}
}
@ -166,7 +166,8 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
loop {
prevChangeID = myInfo.changeID;
if (myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController))) {
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
if (myInfo.changeID != prevChangeID) {
TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
}

View File

@ -32,7 +32,8 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass);
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded);
// Participates in the given coordination group's leader election process, nominating the given
// LeaderInterface (presumed to be a local interface) as leader. The leader election process is
@ -48,17 +49,18 @@ Future<Void> changeLeaderCoordinators( ServerCoordinators const& coordinators, V
#pragma region Implementation
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass)
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded)
{
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass, asyncIsExcluded );
return m || asyncDeserialize( serializedInfo, outKnownLeader );
}

View File

@ -21,6 +21,7 @@
#include "LogSystem.h"
#include "fdbrpc/FailureMonitor.h"
#include "Knobs.h"
#include "fdbrpc/ReplicationUtils.h"
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
@ -243,12 +244,14 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<O
serverCursors.push_back( cursor );
}
sortedVersions.resize(serverCursors.size());
filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
}
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor) {
sortedVersions.resize(serverCursors.size());
calcHasMessage();
filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
}
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {

View File

@ -757,6 +757,10 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
}
machine_count = g_random->randomInt( std::max( 2+datacenters, db.minMachinesRequired() ), extraDB ? 6 : 10 );
if(minimumReplication > 1 && datacenters == 3) {
//low latency tests in 3 data hall mode need 2 other data centers with 2 machines each to avoid waiting for logs to recover.
machine_count = std::max( machine_count, 6);
}
processes_per_machine = g_random->randomInt(1, (extraDB ? 14 : 28)/machine_count + 2 );
coordinators = BUGGIFY ? g_random->randomInt(1, machine_count+1) : std::min( machine_count, db.maxMachineFailuresTolerated()*2 + 1 );
}

View File

@ -1411,6 +1411,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
state UID lastId = UID(1,1); //initialized so it will not compare equal to a default UID
state double recoverMemoryLimit = SERVER_KNOBS->TARGET_BYTES_PER_TLOG + SERVER_KNOBS->SPRING_BYTES_TLOG;
if (BUGGIFY) recoverMemoryLimit = std::max<double>(SERVER_KNOBS->BUGGIFY_RECOVER_MEMORY_LIMIT, SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
try {
loop {
if(allRemoved.isReady()) {

View File

@ -141,6 +141,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogLocalities = lsConf.tLogLocalities;
logSystem->logSystemType = lsConf.logSystemType;
logSystem->UpdateLocalitySet(lsConf.tLogs);
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
return logSystem;
}
@ -159,6 +160,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogReplicationFactor = lsConf.oldTLogs[0].tLogReplicationFactor;
logSystem->tLogPolicy = lsConf.oldTLogs[0].tLogPolicy;
logSystem->tLogLocalities = lsConf.oldTLogs[0].tLogLocalities;
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1);
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
@ -346,21 +348,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Void _ = wait( quorum( alive, std::min(self->tLogReplicationFactor, numPresent - self->tLogWriteAntiQuorum) ) );
state Reference<LocalityGroup> locked(new LocalityGroup());
state std::vector<bool> responded(alive.size());
for (int i = 0; i < alive.size(); i++) {
responded[i] = false;
}
loop {
LocalityGroup locked;
std::vector<LocalityData> unlocked, unused;
for (int i = 0; i < alive.size(); i++) {
if (alive[i].isReady() && !alive[i].isError()) {
locked.add(self->tLogLocalities[i]);
} else {
unlocked.push_back(self->tLogLocalities[i]);
if (!responded[i] && alive[i].isReady() && !alive[i].isError()) {
locked->add(self->tLogLocalities[i]);
responded[i] = true;
}
}
bool quorum_obtained = locked.validate(self->tLogPolicy);
if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) {
quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false);
}
if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) {
bool quorum_obtained = locked->validate(self->tLogPolicy);
// We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive.
if (self->tLogReplicationFactor == 1 && locked->size() > 0) {
ASSERT(quorum_obtained);
}
if (quorum_obtained) {
@ -581,6 +583,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogPolicy = prevState.tLogPolicy;
logSystem->tLogLocalities = prevState.tLogLocalities;
logSystem->logSystemType = prevState.logSystemType;
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
logSystem->epochEndVersion = 0;
logSystem->knownCommittedVersion = 0;
@ -869,6 +872,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
logSystem->tLogLocalities[i] = workers[i].locality;
}
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();

View File

@ -256,8 +256,8 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, Reference<AsyncVar<bool>> const& asyncIsExcluded, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
// These servers are started by workerServer
Future<Void> storageServer(

View File

@ -954,6 +954,37 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
}
}
// Keeps self->configuration in sync with the configuration stored under configKeys.
// Each outer iteration reads the whole configKeys range, updates self->configuration
// (and fires self->registrationTrigger) if it differs, then blocks on a watch before
// reading again. The outer loop has no exit, so the actor only ends by throwing.
ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true);
loop {
// A fresh transaction is created for every read-then-watch cycle.
state ReadYourWritesTransaction tr(cx);
// Inner loop: retry the same transaction until the read, commit and watch all succeed.
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<Standalone<RangeResultRef>> fresults = tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY );
Void _ = wait( success(fresults) );
Standalone<RangeResultRef> results = fresults.get();
// The entire configuration is expected to fit in a single range read.
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration conf;
conf.fromKeyValues((VectorRef<KeyValueRef>) results);
// Only publish (and trigger re-registration) when something actually changed.
if(conf != self->configuration) {
self->configuration = conf;
self->registrationTrigger.trigger();
}
// NOTE(review): the watch is on excludedServersVersionKey while the data read is
// configKeys - presumably writers of the relevant configuration also touch this
// key; confirm that all configuration changes of interest wake this watch.
state Future<Void> watchFuture = tr.watch(excludedServersVersionKey);
Void _ = wait(tr.commit());
Void _ = wait(watchFuture);
// Watch fired: leave the retry loop and re-read with a new transaction.
break;
} catch (Error& e) {
// tr.onError decides whether the error is retryable; otherwise it rethrows.
Void _ = wait( tr.onError(e) );
}
}
}
}
ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
{
state TraceInterval recoveryInterval("MasterRecovery");
@ -1157,6 +1188,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
if( self->resolvers.size() > 1 )
addActor.send( resolutionBalancing(self) );
state Future<Void> configMonitor = configurationMonitor( self );
addActor.send( changeCoordinators(self, skipTransition) );
addActor.send( trackTlogRecovery(self, oldLogSystems, skipTransition) );
@ -1164,7 +1196,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
when( Void _ = wait( tlogFailure ) ) { throw internal_error(); }
when( Void _ = wait( proxyFailure ) ) { throw internal_error(); }
when( Void _ = wait( resolverFailure ) ) { throw internal_error(); }
when (Void _ = wait(providingVersions)) { throw internal_error(); }
when( Void _ = wait( providingVersions ) ) { throw internal_error(); }
when( Void _ = wait( configMonitor ) ) { throw internal_error(); }
}
}

View File

@ -313,7 +313,7 @@ public:
CoalescedKeyRangeMap< Version > newestDirtyVersion; // Similar to newestAvailableVersion, but includes (only) keys that were only partly available (due to cancelled fetchKeys)
// The following are in rough order from newest to oldest
Version lastTLogVersion, lastVersionWithData;
Version lastTLogVersion, lastVersionWithData, restoredVersion;
NotifiedVersion version;
NotifiedVersion desiredOldestVersion; // We can increase oldestVersion (and then durableVersion) to this version when the disk permits
NotifiedVersion oldestVersion; // See also storageVersion()
@ -416,7 +416,7 @@ public:
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
: instanceID(g_random->randomUniqueID().first()),
storage(this, storage), db(db),
lastTLogVersion(0), lastVersionWithData(0),
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
@ -458,6 +458,7 @@ public:
oldestVersion = ver;
durableVersion = ver;
lastVersionWithData = ver;
restoredVersion = ver;
mutableData().createNewVersion(ver);
mutableData().forgetVersionsBefore(ver);
@ -2186,7 +2187,7 @@ bool containsRollback( VersionUpdateRef const& changes, Version& rollbackVersion
class StorageUpdater {
public:
StorageUpdater(Version fromVersion, Version newOldestVersion) : fromVersion(fromVersion), newOldestVersion(newOldestVersion), currentVersion(fromVersion), processedStartKey(false) {}
StorageUpdater(Version fromVersion, Version newOldestVersion, Version restoredVersion) : fromVersion(fromVersion), newOldestVersion(newOldestVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {}
void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
//TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
@ -2216,6 +2217,7 @@ public:
Version currentVersion;
private:
Version fromVersion;
Version restoredVersion;
KeyRef startKey;
bool nowAssigned;
@ -2252,7 +2254,7 @@ private:
BinaryReader br(m.param2, Unversioned());
br >> rollbackVersion;
if ( rollbackVersion < fromVersion ) {
if ( rollbackVersion < fromVersion && rollbackVersion > restoredVersion) {
TEST( true ); // ShardApplyPrivateData shard rollback
TraceEvent(SevWarn, "Rollback", data->thisServerID)
.detail("FromVersion", fromVersion)
@ -2389,7 +2391,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
data->updateEagerReads = &eager;
data->debug_inApplyUpdate = true;
StorageUpdater updater(data->lastVersionWithData, std::max( std::max(data->desiredOldestVersion.get(), data->oldestVersion.get()), minNewOldestVersion ));
StorageUpdater updater(data->lastVersionWithData, std::max( std::max(data->desiredOldestVersion.get(), data->oldestVersion.get()), minNewOldestVersion ), data->restoredVersion);
if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
validate(data);

View File

@ -250,16 +250,17 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
return result;
}
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass ) {
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0;
loop {
Future<ProcessClass> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), requestGeneration++) ) ) : Never();
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), asyncIsExcluded->get(), requestGeneration++) ) ) : Never();
choose {
when ( ProcessClass newProcessClass = wait( registrationReply )) {
asyncProcessClass->set(newProcessClass);
when ( RegisterWorkerReply reply = wait( registrationReply )) {
asyncProcessClass->set( reply.processClass );
asyncIsExcluded->set( reply.isExcluded );
}
when ( Void _ = wait( ccInterface->onChange() )) { }
}
@ -479,7 +480,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors;
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
@ -614,7 +615,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
} else if( s.storedComponent == DiskStore::TLogData ) {
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
IDiskQueue* queue = openDiskQueue(
joinPath( folder, fileLogQueuePrefix.toString() + s.storeID.toString() + "-" ), s.storeID );
joinPath( folder, fileLogQueuePrefix.toString() + s.storeID.toString() + "-" ), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG);
filesClosed.add( kv->onClosed() );
filesClosed.add( queue->onClosed() );
@ -642,7 +643,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( interf.id(), interf.id(), "Worker", details );
Void _ = wait(waitForAll(recoveries));
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass, asyncIsExcluded ) );
TraceEvent("RecoveriesComplete", interf.id());
@ -957,13 +958,14 @@ ACTOR Future<Void> fdbd(
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)));
Reference<AsyncVar<bool>> asyncIsExcluded(new AsyncVar<bool>(false));
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") );
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass, asyncIsExcluded), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, asyncIsExcluded, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) );

View File

@ -1187,44 +1187,65 @@ struct ConsistencyCheckWorkload : TestWorkload
//Returns true if all machines in the cluster that specified a desired class are operating in that class
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self)
{
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) );
state vector<std::pair<WorkerInterface, ProcessClass>> allWorkers = wait( getWorkers( self->dbInfo ) );
state vector<std::pair<WorkerInterface, ProcessClass>> nonExcludedWorkers = wait( getWorkers( self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) );
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
auto& db = self->dbInfo->get();
std::set<ProcessClass::ClassType> availableClassTypes;
std::map<NetworkAddress, ProcessClass> workerProcessMap;
std::set<ProcessClass::ClassType> allClassTypes;
std::map<NetworkAddress, ProcessClass> allWorkerProcessMap;
for (auto worker : allWorkers) {
allClassTypes.insert(worker.second.classType());
allWorkerProcessMap[worker.first.address()] = worker.second;
}
for (auto worker : workers) {
availableClassTypes.insert(worker.second.classType());
workerProcessMap[worker.first.address()] = worker.second;
std::set<ProcessClass::ClassType> nonExcludedClassTypes;
std::map<NetworkAddress, ProcessClass> nonExcludedWorkerProcessMap;
for (auto worker : nonExcludedWorkers) {
nonExcludedClassTypes.insert(worker.second.classType());
nonExcludedWorkerProcessMap[worker.first.address()] = worker.second;
}
// Check cluster controller
ProcessClass::Fitness bestClusterControllerFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::ClusterController);
if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) {
TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("bestClusterControllerFitness", bestClusterControllerFitness).detail("existingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].machineClassFitness(ProcessClass::ClusterController) : -1);
return false;
}
// Check master
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Master);
if (!workerProcessMap.count(db.master.address()) || workerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
TraceEvent("ConsistencyCheck_MasterNotBest").detail("bestMasterFitness", bestMasterFitness).detail("existingMasterFit", workerProcessMap.count(db.master.address()) ? workerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) : -1);
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Master);
if (bestMasterFitness == ProcessClass::NeverAssign) {
bestMasterFitness = getBestAvailableFitness(allClassTypes, ProcessClass::Master);
if (bestMasterFitness != ProcessClass::NeverAssign) {
bestMasterFitness = ProcessClass::ExcludeFit;
}
}
if (!allWorkerProcessMap.count(db.master.address()) || (!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
TraceEvent("ConsistencyCheck_MasterNotBest").detail("bestMasterFitness", bestMasterFitness).detail("existingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) : -1);
return false;
}
// Check master proxy
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Proxy);
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Proxy);
for (auto masterProxy : db.client.proxies) {
if (!workerProcessMap.count(masterProxy.address()) || workerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("bestMasterProxyFitness", bestMasterProxyFitness).detail("existingMasterProxyFitness", workerProcessMap.count(masterProxy.address()) ? workerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) : -1);
if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("bestMasterProxyFitness", bestMasterProxyFitness).detail("existingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) : -1);
return false;
}
}
// Check master resolver
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Resolver);
// Check resolver
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Resolver);
for (auto resolver : db.resolvers) {
if (!workerProcessMap.count(resolver.address()) || workerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("bestResolverFitness", bestResolverFitness).detail("existingResolverFitness", workerProcessMap.count(resolver.address()) ? workerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) : -1);
if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("bestResolverFitness", bestResolverFitness).detail("existingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) : -1);
return false;
}
}
// TODO: Check Tlog and cluster controller
// TODO: Check Tlog
return true;
}

View File

@ -34,6 +34,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( DELAY_JITTER_RANGE, 0.2 );
init( BUSY_WAIT_THRESHOLD, 0 ); // 1e100 == never sleep
init( CLIENT_REQUEST_INTERVAL, 0.1 ); if( randomize && BUGGIFY ) CLIENT_REQUEST_INTERVAL = 1.0;
init( SERVER_REQUEST_INTERVAL, 0.1 ); if( randomize && BUGGIFY ) SERVER_REQUEST_INTERVAL = 1.0;
init( REACTOR_FLAGS, 0 );

View File

@ -55,6 +55,7 @@ public:
double DELAY_JITTER_RANGE;
double BUSY_WAIT_THRESHOLD;
double CLIENT_REQUEST_INTERVAL;
double SERVER_REQUEST_INTERVAL;
int DISABLE_ASSERTS;
double QUEUE_MODEL_SMOOTHING_AMOUNT;