Merge branch 'master' of github.com:apple/foundationdb

This commit is contained in:
Yichi Chiang 2017-11-01 16:20:38 -07:00
commit eeaea60f94
17 changed files with 85 additions and 46 deletions

View File

@ -22,10 +22,8 @@ package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncUtil;
@ -72,7 +70,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, CompletableFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<>();
return AsyncUtil.whileTrue(v -> {
return AsyncUtil.whileTrue(() -> {
CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
return process.thenComposeAsync(returnVal ->

View File

@ -22,18 +22,15 @@ package com.apple.foundationdb.async;
import static com.apple.foundationdb.FDB.DEFAULT_EXECUTOR;
import com.apple.foundationdb.FDBException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Provided utilities for using and manipulating {@link CompletableFuture}s.
@ -85,9 +82,9 @@ public class AsyncUtil {
final List<V> accumulator = new LinkedList<V>();
// The condition of the while loop is simply "onHasNext()" returning true
Function<Void, CompletableFuture<Boolean>> condition = new Function<Void, CompletableFuture<Boolean>>() {
Supplier<CompletableFuture<Boolean>> condition = new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void v) {
public CompletableFuture<Boolean> get() {
return it.onHasNext().thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean o) {
@ -170,11 +167,11 @@ public class AsyncUtil {
}
private static class LoopPartial implements BiFunction<Boolean, Throwable, Void> {
final Function<Void, ? extends CompletableFuture<Boolean>> body;
final Supplier<? extends CompletableFuture<Boolean>> body;
final CompletableFuture<Void> done;
final Executor executor;
public LoopPartial(Function<Void, ? extends CompletableFuture<Boolean>> body, Executor executor) {
public LoopPartial(Supplier<? extends CompletableFuture<Boolean>> body, Executor executor) {
this.body = body;
this.done = new CompletableFuture<>();
this.executor = executor;
@ -192,7 +189,7 @@ public class AsyncUtil {
}
CompletableFuture<Boolean> result;
try {
result = body.apply(null);
result = body.get();
} catch (Exception e) {
done.completeExceptionally(e);
break;
@ -226,7 +223,10 @@ public class AsyncUtil {
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
@ -238,8 +238,34 @@ public class AsyncUtil {
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier, Executor) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body, Executor executor) {
return whileTrue(() -> body.apply(null), executor);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body, Executor executor) {
return new LoopPartial(body, executor).run();
}

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
@ -730,9 +731,9 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
@ -1039,9 +1040,9 @@ public class DirectoryLayer implements Directory
node = new Node(rootNode, currentPath, path);
currentPath = new ArrayList<String>();
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(index == path.size())
return CompletableFuture.completedFuture(false);
@ -1163,9 +1164,9 @@ public class DirectoryLayer implements Directory
}
public CompletableFuture<byte[]> find(final Transaction tr, final HighContentionAllocator allocator) {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
.thenApply(new Function<Boolean, Void>() {
@ -1203,9 +1204,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Void> chooseWindow(final Transaction tr, final HighContentionAllocator allocator) {
final long initialWindowStart = windowStart;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
@ -1244,9 +1245,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Boolean> choosePrefix(final Transaction tr, final HighContentionAllocator allocator) {
restart = false;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional

View File

@ -36,10 +36,8 @@ import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -520,7 +518,7 @@ public class AsyncStackTester {
return inst.popParams(listSize).thenApply(new Function<List<Object>, Void>() {
@Override
public Void apply(List<Object> rawElements) {
List<Tuple> tuples = new ArrayList(listSize);
List<Tuple> tuples = new ArrayList<Tuple>(listSize);
for(Object o : rawElements) {
tuples.add(Tuple.fromBytes((byte[])o));
}

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -40,9 +41,9 @@ class DirectoryUtil {
}
CompletableFuture<List<Tuple>> pop() {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(num-- == 0) {
return CompletableFuture.completedFuture(false);
}

View File

@ -29,7 +29,7 @@ public class WhileTrueTest {
// This should cause memory issues using the old implementation but not the new one.
// Pro tip: Run with options -Xms16m -Xmx16m -XX:+HeadDumpOnOutOfMemoryError
AtomicInteger count = new AtomicInteger(1000000);
AsyncUtil.whileTrue(v -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
AsyncUtil.whileTrue(() -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
System.out.println("Final value: " + count.get());
}
}

View File

@ -115,7 +115,7 @@ struct OpenDatabaseRequest {
// info changes. Returns immediately if the current client info id is different from
// knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
Arena arena;
StringRef dbName, issues;
StringRef dbName, issues, traceLogGroup;
VectorRef<ClientVersionRef> supportedVersions;
UID knownClientInfoID;
ReplyPromise< struct ClientDBInfo > reply;
@ -123,7 +123,7 @@ struct OpenDatabaseRequest {
template <class Ar>
void serialize(Ar& ar) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A400040001LL );
ar & dbName & issues & supportedVersions & knownClientInfoID & reply & arena;
ar & dbName & issues & supportedVersions & traceLogGroup & knownClientInfoID & reply & arena;
}
};

View File

@ -135,20 +135,22 @@ public:
}
Future<Optional<T>> get(Database cx, bool snapshot = false) const {
auto &copy = *this;
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return get(tr, snapshot);
return copy.get(tr, snapshot);
});
}
Future<T> getOrThrow(Database cx, bool snapshot = false, Error err = key_not_found()) const {
auto &copy = *this;
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return getOrThrow(tr, snapshot, err);
return copy.getOrThrow(tr, snapshot, err);
});
}

View File

@ -474,6 +474,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
req.knownClientInfoID = outInfo->get().id;
req.dbName = dbName;
req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
ClusterConnectionString fileConnectionString;
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
@ -2484,6 +2485,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
state double startTime;
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
state CommitTransactionRequest ctReq(*req);
try {
Version v = wait( readVersion );
req->transaction.read_snapshot = v;
@ -2562,8 +2566,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
} else {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_not_committed && e.code() != error_code_database_locked)
TraceEvent(SevError, "tryCommitError").error(e);
if (e.code() != error_code_actor_cancelled && trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), req));
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), &ctReq));
throw;
}
}

View File

@ -75,6 +75,7 @@ public:
ProcessIssuesMap clientsWithIssues, workersWithIssues;
std::map<NetworkAddress, double> incompatibleConnections;
ClientVersionMap clientVersionMap;
std::map<NetworkAddress, std::string> traceLogGroupMap;
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
DatabaseConfiguration config; // Asynchronously updated via master registration
@ -951,6 +952,7 @@ ACTOR Future<Void> clusterOpenDatabase(
UID knownClientInfoID,
std::string issues,
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
Standalone<StringRef> traceLogGroup,
ReplyPromise<ClientDBInfo> reply)
{
// NOTE: The client no longer expects this function to return errors
@ -961,6 +963,8 @@ ACTOR Future<Void> clusterOpenDatabase(
db->clientVersionMap[reply.getEndpoint().address] = supportedVersions;
}
db->traceLogGroupMap[reply.getEndpoint().address] = traceLogGroup.toString();
while (db->clientInfo->get().id == knownClientInfoID) {
choose {
when (Void _ = wait( db->clientInfo->onChange() )) {}
@ -970,6 +974,7 @@ ACTOR Future<Void> clusterOpenDatabase(
removeIssue( db->clientsWithIssues, reply.getEndpoint().address, issues, issueID );
db->clientVersionMap.erase(reply.getEndpoint().address);
db->traceLogGroupMap.erase(reply.getEndpoint().address);
reply.send( db->clientInfo->get() );
return Void();
@ -1501,7 +1506,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, coordinators, incompatibleConnections)));
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
@ -1669,7 +1674,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
return Void();
}
when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.reply ) );
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
}
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
addActor.send( clusterRecruitFromConfiguration( &self, req ) );

View File

@ -868,7 +868,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
return processMap;
}
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap, std::map<NetworkAddress, std::string> traceLogGroupMap) {
StatusObject clientStatus;
clientStatus["count"] = (int64_t)clientVersionMap.size();
@ -890,10 +890,13 @@ static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
StatusArray clients = StatusArray();
for(auto client : cv.second) {
clients.push_back(client.toString());
StatusObject cli;
cli["address"] = client.toString();
cli["log_group"] = traceLogGroupMap[client];
clients.push_back(cli);
}
ver["clients"] = clients;
ver["connected_clients"] = clients;
versionsArray.push_back(ver);
}
@ -1688,6 +1691,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
ProcessIssuesMap workerIssues,
ProcessIssuesMap clientIssues,
ClientVersionMap clientVersionMap,
std::map<NetworkAddress, std::string> traceLogGroupMap,
ServerCoordinators coordinators,
std::vector<NetworkAddress> incompatibleConnections )
{
@ -1866,7 +1870,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
StatusObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, cx, configuration, &status_incomplete_reasons));
statusObj["processes"] = processStatus;
statusObj["clients"] = clientStatusFetcher(clientVersionMap);
statusObj["clients"] = clientStatusFetcher(clientVersionMap, traceLogGroupMap);
StatusArray incompatibleConnectionsArray;
for(auto it : incompatibleConnections) {

View File

@ -32,6 +32,6 @@ typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > Clie
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract );
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
#endif

View File

@ -51,7 +51,7 @@ using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A551020001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551030001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -70,7 +70,7 @@ Future<Optional<T>> stopAfter( Future<T> what ) {
T _ = wait(what);
ret = Optional<T>(_);
} catch (Error& e) {
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
if(!ok) {
fprintf(stderr, "Fatal Error: %s\n", e.what());

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long