Merge branch 'master' into getrange-perf-improvements

This commit is contained in:
A.J. Beamon 2017-11-02 08:42:26 -07:00
commit 07c80484eb
47 changed files with 531 additions and 127 deletions

View File

@ -1506,10 +1506,37 @@ REGISTER_INSTRUCTION_FUNC(AtomicOPFunc);
struct UnitTestsFunc : InstructionFunc {
static const char* name;
static Future<Void> call(Reference<FlowTesterData> const& data, Reference<InstructionData> const& instruction) {
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
ASSERT(data->api->evaluatePredicate(FDBErrorPredicate::FDB_ERROR_PREDICATE_RETRYABLE, Error(1020)));
ASSERT(!data->api->evaluatePredicate(FDBErrorPredicate::FDB_ERROR_PREDICATE_RETRYABLE, Error(10)));
state Reference<Transaction> tr(new Transaction(data->db));
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_CAUSAL_READ_RISKY);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_CAUSAL_WRITE_RISKY);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_AHEAD_DISABLE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_DURABILITY_DEV_NULL_IS_WEB_SCALE);
const uint64_t timeout = 60*1000;
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_TIMEOUT, Optional<StringRef>(StringRef((const uint8_t*)&timeout, 8)));
const uint64_t retryLimit = 50;
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_RETRY_LIMIT, Optional<StringRef>(StringRef((const uint8_t*)&retryLimit, 8)));
const uint64_t maxRetryDelay = 100;
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_MAX_RETRY_DELAY, Optional<StringRef>(StringRef((const uint8_t*)&maxRetryDelay, 8)));
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_USED_DURING_COMMIT_PROTECTION_DISABLE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_TRANSACTION_LOGGING_ENABLE, Optional<StringRef>(LiteralStringRef("my_transaction")));
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE);
Optional<FDBStandalone<ValueRef> > _ = wait(tr->get(LiteralStringRef("\xff")));
tr->cancel();
return Void();
}
};
const char* UnitTestsFunc::name = "UNIT_TESTS";

View File

@ -745,6 +745,8 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
tr.Options().SetMaxRetryDelay(100)
tr.Options().SetUsedDuringCommitProtectionDisable()
tr.Options().SetTransactionLoggingEnable("my_transaction")
tr.Options().SetReadLockAware()
tr.Options().SetLockAware()
return tr.Get(fdb.Key("\xff")).MustGet(), nil
})

View File

@ -22,10 +22,8 @@ package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncUtil;
@ -72,7 +70,7 @@ class FDBDatabase extends DefaultDisposableImpl implements Database, Disposable,
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, CompletableFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<>();
return AsyncUtil.whileTrue(v -> {
return AsyncUtil.whileTrue(() -> {
CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
return process.thenComposeAsync(returnVal ->

View File

@ -22,18 +22,15 @@ package com.apple.foundationdb.async;
import static com.apple.foundationdb.FDB.DEFAULT_EXECUTOR;
import com.apple.foundationdb.FDBException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Provided utilities for using and manipulating {@link CompletableFuture}s.
@ -85,9 +82,9 @@ public class AsyncUtil {
final List<V> accumulator = new LinkedList<V>();
// The condition of the while loop is simply "onHasNext()" returning true
Function<Void, CompletableFuture<Boolean>> condition = new Function<Void, CompletableFuture<Boolean>>() {
Supplier<CompletableFuture<Boolean>> condition = new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void v) {
public CompletableFuture<Boolean> get() {
return it.onHasNext().thenApply(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean o) {
@ -170,11 +167,11 @@ public class AsyncUtil {
}
private static class LoopPartial implements BiFunction<Boolean, Throwable, Void> {
final Function<Void, ? extends CompletableFuture<Boolean>> body;
final Supplier<? extends CompletableFuture<Boolean>> body;
final CompletableFuture<Void> done;
final Executor executor;
public LoopPartial(Function<Void, ? extends CompletableFuture<Boolean>> body, Executor executor) {
public LoopPartial(Supplier<? extends CompletableFuture<Boolean>> body, Executor executor) {
this.body = body;
this.done = new CompletableFuture<>();
this.executor = executor;
@ -192,7 +189,7 @@ public class AsyncUtil {
}
CompletableFuture<Boolean> result;
try {
result = body.apply(null);
result = body.get();
} catch (Exception e) {
done.completeExceptionally(e);
break;
@ -226,7 +223,10 @@ public class AsyncUtil {
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
@ -238,8 +238,34 @@ public class AsyncUtil {
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
* @deprecated Since version 5.1.0. Use the version of {@link #whileTrue(Supplier, Executor) whileTrue} that takes a
* {@link Supplier} instead.
*/
@Deprecated
public static CompletableFuture<Void> whileTrue(Function<Void,? extends CompletableFuture<Boolean>> body, Executor executor) {
return whileTrue(() -> body.apply(null), executor);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body) {
return whileTrue(body, DEFAULT_EXECUTOR);
}
/**
* Executes an asynchronous operation repeatedly until it returns {@code False}.
*
* @param body the asynchronous operation over which to loop
* @param executor the {@link Executor} to use for asynchronous operations
*
* @return a {@code PartialFuture} which will be set at completion of the loop.
*/
public static CompletableFuture<Void> whileTrue(Supplier<CompletableFuture<Boolean>> body, Executor executor) {
return new LoopPartial(body, executor).run();
}

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
@ -730,9 +731,9 @@ public class DirectoryLayer implements Directory
tr.clear(Range.startsWith(nodeSubspace.unpack(node.getKey()).getBytes(0)));
tr.clear(node.range());
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
CompletableFuture<Void> subdirRemoveFuture;
if(rangeItr.onHasNext().isDone() && rangeItr.hasNext())
subdirRemoveFuture = removeRecursive(tr, nodeWithPrefix(rangeItr.next().getValue()));
@ -1039,9 +1040,9 @@ public class DirectoryLayer implements Directory
node = new Node(rootNode, currentPath, path);
currentPath = new ArrayList<String>();
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(index == path.size())
return CompletableFuture.completedFuture(false);
@ -1163,9 +1164,9 @@ public class DirectoryLayer implements Directory
}
public CompletableFuture<byte[]> find(final Transaction tr, final HighContentionAllocator allocator) {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final AsyncIterator<KeyValue> rangeItr = tr.snapshot().getRange(allocator.counters.range(), 1, true).iterator();
return rangeItr.onHasNext()
.thenApply(new Function<Boolean, Void>() {
@ -1203,9 +1204,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Void> chooseWindow(final Transaction tr, final HighContentionAllocator allocator) {
final long initialWindowStart = windowStart;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
final byte[] counterKey = allocator.counters.get(windowStart).getKey();
Range oldCounters = new Range(allocator.counters.getKey(), counterKey);
@ -1244,9 +1245,9 @@ public class DirectoryLayer implements Directory
public CompletableFuture<Boolean> choosePrefix(final Transaction tr, final HighContentionAllocator allocator) {
restart = false;
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional

View File

@ -36,10 +36,8 @@ import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -520,7 +518,7 @@ public class AsyncStackTester {
return inst.popParams(listSize).thenApply(new Function<List<Object>, Void>() {
@Override
public Void apply(List<Object> rawElements) {
List<Tuple> tuples = new ArrayList(listSize);
List<Tuple> tuples = new ArrayList<Tuple>(listSize);
for(Object o : rawElements) {
tuples.add(Tuple.fromBytes((byte[])o));
}
@ -593,6 +591,8 @@ public class AsyncStackTester {
tr.options().setMaxRetryDelay(100);
tr.options().setUsedDuringCommitProtectionDisable();
tr.options().setTransactionLoggingEnable("my_transaction");
tr.options().setReadLockAware();
tr.options().setLockAware();
if(!(new FDBException("Fake", 1020)).isRetryable() ||
(new FDBException("Fake", 10)).isRetryable())

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -40,9 +41,9 @@ class DirectoryUtil {
}
CompletableFuture<List<Tuple>> pop() {
return AsyncUtil.whileTrue(new Function<Void, CompletableFuture<Boolean>>() {
return AsyncUtil.whileTrue(new Supplier<CompletableFuture<Boolean>>() {
@Override
public CompletableFuture<Boolean> apply(Void ignore) {
public CompletableFuture<Boolean> get() {
if(num-- == 0) {
return CompletableFuture.completedFuture(false);
}

View File

@ -408,6 +408,8 @@ public class StackTester {
tr.options().setMaxRetryDelay(100);
tr.options().setUsedDuringCommitProtectionDisable();
tr.options().setTransactionLoggingEnable("my_transaction");
tr.options().setReadLockAware();
tr.options().setLockAware();
if(!(new FDBException("Fake", 1020)).isRetryable() ||
(new FDBException("Fake", 10)).isRetryable())

View File

@ -29,7 +29,7 @@ public class WhileTrueTest {
// This should cause memory issues using the old implementation but not the new one.
// Pro tip: Run with options -Xms16m -Xmx16m -XX:+HeadDumpOnOutOfMemoryError
AtomicInteger count = new AtomicInteger(1000000);
AsyncUtil.whileTrue(v -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
AsyncUtil.whileTrue(() -> CompletableFuture.completedFuture(count.decrementAndGet()).thenApplyAsync(c -> c > 0)).join();
System.out.println("Final value: " + count.get());
}
}

View File

@ -632,6 +632,8 @@ public class AsyncStackTester {
tr.options().setMaxRetryDelay(100);
tr.options().setUsedDuringCommitProtectionDisable();
tr.options().setTransactionLoggingEnable("my_transaction");
tr.options().setReadLockAware();
tr.options().setLockAware();
if(!(new FDBException("Fake", 1020)).isRetryable() ||
(new FDBException("Fake", 10)).isRetryable())

View File

@ -439,6 +439,8 @@ public class StackTester {
tr.options().setMaxRetryDelay(100);
tr.options().setUsedDuringCommitProtectionDisable();
tr.options().setTransactionLoggingEnable("my_transaction");
tr.options().setReadLockAware();
tr.options().setLockAware();
if(!(new FDBException("Fake", 1020)).isRetryable() ||
(new FDBException("Fake", 10)).isRetryable())

View File

@ -719,6 +719,8 @@ function processOperation(context, inst, cb) {
tr.options.setMaxRetryDelay(100);
tr.options.setUsedDuringCommitProtectionDisable();
tr.options.setTransactionLoggingEnable('my_transaction');
tr.options.setReadLockAware()
tr.options.setLockAware()
tr.get(fdb.buffer.fromByteLiteral('\xff'), innerCb);
})

View File

@ -135,6 +135,8 @@ def test_options(tr):
tr.options.set_max_retry_delay(100);
tr.options.set_used_during_commit_protection_disable()
tr.options.set_transaction_logging_enable('my_transaction')
tr.options.set_read_lock_aware()
tr.options.set_lock_aware()
tr.get(b'\xff').wait()

View File

@ -445,6 +445,8 @@ class Tester
tr.options.set_max_retry_delay(100)
tr.options.set_used_during_commit_protection_disable
tr.options.set_transaction_logging_enable('my_transaction')
tr.options.set_read_lock_aware()
tr.options.set_lock_aware()
tr.get("\xff").to_s
end

View File

@ -73,11 +73,11 @@ enum enumProgramExe {
};
enum enumBackupType {
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_DISABLE, BACKUP_ENABLE
};
enum enumDBType {
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_DISABLE, DB_ENABLE
};
enum enumRestoreType {
@ -277,6 +277,29 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupDisableOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgRestoreOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
@ -455,6 +478,31 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgDBDisableOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_SOURCE_CLUSTER, "-s", SO_REQ_SEP },
{ OPT_SOURCE_CLUSTER, "--source", SO_REQ_SEP },
{ OPT_DEST_CLUSTER, "-d", SO_REQ_SEP },
{ OPT_DEST_CLUSTER, "--destination", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBlobOptions[] = {
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
@ -548,7 +596,7 @@ void printBackupContainerInfo() {
static void printBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | abort | wait | discontinue) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf("Usage: %s (start | status | abort | wait | discontinue | disable | enable) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
" FoundationDB cluster. The default is first the value of the\n"
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
@ -644,7 +692,7 @@ static void printDBAgentUsage(bool devhelp) {
static void printDBBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | switch | abort) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
printf("Usage: %s (start | status | switch | abort | disable | enable) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
printf(" -d, --destination CONNFILE\n"
" The path of a file containing the connection string for the\n");
printf(" destination FoundationDB cluster.\n");
@ -806,6 +854,8 @@ enumBackupType getBackupType(std::string backupType)
values["abort"] = BACKUP_ABORT;
values["wait"] = BACKUP_WAIT;
values["discontinue"] = BACKUP_DISCONTINUE;
values["disable"] = BACKUP_DISABLE;
values["enable"] = BACKUP_ENABLE;
}
auto i = values.find(backupType);
@ -836,6 +886,8 @@ enumDBType getDBType(std::string dbType)
values["status"] = DB_STATUS;
values["switch"] = DB_SWITCH;
values["abort"] = DB_ABORT;
values["disable"] = DB_DISABLE;
values["enable"] = DB_ENABLE;
}
auto i = values.find(dbType);
@ -864,7 +916,6 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
op.create("version") = readVer + 120 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
layerRoot.create("instances_running.$sum") = 1;
layerRoot.create("total_workers.$sum") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("last_updated.$max") = now();
state JSONDoc o = layerRoot.subDoc("instances." + id);
@ -876,7 +927,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
o.create("resident_size") = (int64_t)getResidentMemoryUsage();
o.create("main_thread_cpu_seconds") = getProcessorTimeThread();
o.create("process_cpu_seconds") = getProcessorTimeProcess();
o.create("workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
o.create("configured_workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
if(exe == EXE_AGENT) {
static BlobStoreEndpoint::Stats last_stats;
@ -899,10 +950,11 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
state FileBackupAgent fba;
state std::vector<KeyBackedTag> backupTags = wait(getAllBackupTags(tr));
state std::vector<Future<Version>> tagLastRestorableVersions;
state std::vector<Future<int>> tagStates;
state std::vector<Future<EBackupState>> tagStates;
state std::vector<Future<std::string>> tagContainers;
state std::vector<Future<int64_t>> tagRangeBytes;
state std::vector<Future<int64_t>> tagLogBytes;
state Future<Optional<Value>> fBackupDisabled = tr->get(fba.taskBucket->getDisableKey());
state int i = 0;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -910,27 +962,21 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
state std::vector<KeyBackedTag>::iterator tag;
for (tag = backupTags.begin(); tag != backupTags.end(); tag++) {
UidAndAbortedFlagT uidAndAbortedFlag = wait(tag->getOrThrow(tr));
state BackupConfig config(uidAndAbortedFlag.first);
EBackupState status = wait(config.stateEnum().getOrThrow(tr));
tagStates.push_back(status);
int64_t rangeBytesWritten = wait(config.rangeBytesWritten().getD(tr, 0));
tagRangeBytes.push_back(rangeBytesWritten);
int64_t logBytesWritten = wait(config.logBytesWritten().getD(tr, 0));
tagLogBytes.push_back(logBytesWritten);
std::string backupContainer = wait(config.backupContainer().getOrThrow(tr));
tagContainers.push_back(backupContainer);
BackupConfig config(uidAndAbortedFlag.first);
tagStates.push_back(config.stateEnum().getOrThrow(tr));
tagRangeBytes.push_back(config.rangeBytesWritten().getD(tr, 0));
tagLogBytes.push_back(config.logBytesWritten().getD(tr, 0));
tagContainers.push_back(config.backupContainer().getOrThrow(tr));
tagLastRestorableVersions.push_back(fba.getLastRestorable(tr, StringRef(tag->tagName)));
}
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes));
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes) && success(fBackupDisabled));
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
layerRoot.create("tags.timestamp") = now();
layerRoot.create("total_workers.$sum") = fBackupDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("disabled.$latest") = fBackupDisabled.get().present();
int j = 0;
for (KeyBackedTag eachTag : backupTags) {
@ -963,6 +1009,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
state std::vector<Future<int>> backupStatus;
state std::vector<Future<int64_t>> tagRangeBytesDR;
state std::vector<Future<int64_t>> tagLogBytesDR;
state Future<Optional<Value>> fDRDisabled = tr->get(dba.taskBucket->getDisableKey());
for(int i = 0; i < tagNames.size(); i++) {
backupVersion.push_back(tr2->get(tagNames[i].value.withPrefix(applyMutationsBeginRange.begin)));
@ -972,10 +1019,12 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
tagLogBytesDR.push_back(dba.getLogBytesWritten(tr2, tagUID));
}
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR));
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR) && success(fDRDisabled));
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
layerRoot.create("tags.timestamp") = now();
layerRoot.create("total_workers.$sum") = fDRDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("disabled.$latest") = fDRDisabled.get().present();
for (int i = 0; i < tagNames.size(); i++) {
std::string tagName = dba.sourceTagNames.unpack(tagNames[i].key).getString(0).toString();
@ -1529,6 +1578,38 @@ ACTOR Future<Void> discontinueBackup(Database db, std::string tagName, bool wait
return Void();
}
ACTOR Future<Void> changeBackupEnabled(Database db, bool disable) {
try {
state FileBackupAgent backupAgent;
Void _ = wait(backupAgent.taskBucket->changeDisable(db, disable));
printf("All backup agents have been %s.\n", disable ? "disabled" : "enabled");
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> changeDBBackupEnabled(Database src, Database dest, bool disable) {
try {
state DatabaseBackupAgent backupAgent(src);
Void _ = wait(backupAgent.taskBucket->changeDisable(dest, disable));
printf("All DR agents have been %s.\n", disable ? "disabled" : "enabled");
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> runRestore(Database db, std::string tagName, std::string container, Standalone<VectorRef<KeyRangeRef>> ranges, Version dbVersion, bool performRestore, bool verbose, bool waitForDone, std::string addPrefix, std::string removePrefix) {
try
{
@ -1930,6 +2011,12 @@ int main(int argc, char* argv[]) {
case BACKUP_DISCONTINUE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDiscontinueOptions, SO_O_EXACT);
break;
case BACKUP_DISABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
break;
case BACKUP_ENABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
break;
case BACKUP_UNDEFINED:
default:
// Display help, if requested
@ -1973,6 +2060,12 @@ int main(int argc, char* argv[]) {
case DB_ABORT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBAbortOptions, SO_O_EXACT);
break;
case DB_DISABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
break;
case DB_ENABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
break;
case DB_UNDEFINED:
default:
// Display help, if requested
@ -2517,6 +2610,14 @@ int main(int argc, char* argv[]) {
f = stopAfter( discontinueBackup(db, tagName, waitForDone) );
break;
case BACKUP_DISABLE:
f = stopAfter( changeBackupEnabled(db, true) );
break;
case BACKUP_ENABLE:
f = stopAfter( changeBackupEnabled(db, false) );
break;
case BACKUP_UNDEFINED:
default:
fprintf(stderr, "ERROR: Unsupported backup action %s\n", argv[1]);
@ -2557,7 +2658,7 @@ int main(int argc, char* argv[]) {
case EXE_DR_AGENT:
f = stopAfter( runDBAgent(source_db, db) );
break;
case EXE_DB_BACKUP: //DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_CLEANUP
case EXE_DB_BACKUP:
switch (dbType)
{
case DB_START:
@ -2572,6 +2673,12 @@ int main(int argc, char* argv[]) {
case DB_ABORT:
f = stopAfter( abortDBBackup(source_db, db, tagName, partial) );
break;
case DB_DISABLE:
f = stopAfter( changeDBBackupEnabled(source_db, db, true) );
break;
case DB_ENABLE:
f = stopAfter( changeDBBackupEnabled(source_db, db, false) );
break;
case DB_UNDEFINED:
default:
fprintf(stderr, "ERROR: Unsupported DR action %s\n", argv[1]);

View File

@ -2576,7 +2576,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (sizeLimitFuture.get().present()) {
const int64_t sizeLimit = BinaryReader::fromStringRef<int64_t>(sizeLimitFuture.get().get(), Unversioned());
if (sizeLimit != -1) {
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimitStr);
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
}
}
printf("Client profiling rate is set to %s and size limit is set to %s.\n", sampleRateStr.c_str(), sizeLimitStr.c_str());
@ -2674,6 +2674,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
for (const auto& pair : interfaces) {
ProfilerRequest profileRequest;
profileRequest.type = ProfilerRequest::Type::FLOW;
profileRequest.action = ProfilerRequest::Action::RUN;
profileRequest.duration = duration;
profileRequest.outputFile = tokens[4];
all_profiler_addresses.push_back(pair.first);
@ -2691,6 +2692,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
for (int tokenidx = 5; tokenidx < tokens.size(); tokenidx++) {
ProfilerRequest profileRequest;
profileRequest.type = ProfilerRequest::Type::FLOW;
profileRequest.action = ProfilerRequest::Action::RUN;
profileRequest.duration = duration;
profileRequest.outputFile = tokens[4];
all_profiler_addresses.push_back(tokens[tokenidx]);

View File

@ -277,6 +277,8 @@ public:
static std::string getDataFilename(Version version, int64_t size, int blockSize);
static std::string getLogFilename(Version beginVer, Version endVer, int64_t size, int blockSize);
static const Key keyLastRestorable;
Future<int64_t> getTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->getTaskCount(tr); }
Future<int64_t> getTaskCount(Database cx) { return taskBucket->getTaskCount(cx); }
Future<Void> watchTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->watchTaskCount(tr); }

View File

@ -80,5 +80,7 @@ struct ProfilerRequest {
ar & reply & type & action & duration & outputFile;
}
};
BINARY_SERIALIZABLE( ProfilerRequest::Type );
BINARY_SERIALIZABLE( ProfilerRequest::Action );
#endif

View File

@ -115,7 +115,7 @@ struct OpenDatabaseRequest {
// info changes. Returns immediately if the current client info id is different from
// knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
Arena arena;
StringRef dbName, issues;
StringRef dbName, issues, traceLogGroup;
VectorRef<ClientVersionRef> supportedVersions;
UID knownClientInfoID;
ReplyPromise< struct ClientDBInfo > reply;
@ -123,7 +123,7 @@ struct OpenDatabaseRequest {
template <class Ar>
void serialize(Ar& ar) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A400040001LL );
ar & dbName & issues & supportedVersions & knownClientInfoID & reply & arena;
ar & dbName & issues & supportedVersions & traceLogGroup & knownClientInfoID & reply & arena;
}
};

7
fdbclient/DatabaseBackupAgent.actor.cpp Executable file → Normal file
View File

@ -1637,6 +1637,8 @@ public:
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
int backupStateInt = wait(backupAgent->getStateValue(tr, logUid));
state BackupAgentBase::enumState backupState = (BackupAgentBase::enumState)backupStateInt;
@ -1722,6 +1724,11 @@ public:
}
}
Optional<Value> disabled = wait(fDisabled);
if(disabled.present()) {
statusText += format("\nAll DR agents have been disabled.\n");
}
break;
}
catch (Error &e) {

View File

@ -101,6 +101,8 @@ public:
// Key DB-specific information
Reference<AsyncVar<ClientDBInfo>> clientInfo;
AsyncTrigger masterProxiesChangeTrigger;
Future<Void> monitorMasterProxiesInfoChange;
Reference<ProxyInfo> masterProxies;
UID masterProxiesLastChange;
LocalityData clientLocality;

View File

@ -139,4 +139,4 @@ ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct Clus
state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), fmState, trackMyStatus) : Void();
Void _ = wait( ci->onChange() );
}
}
}

View File

@ -35,6 +35,8 @@
#include <boost/algorithm/string/classification.hpp>
#include <algorithm>
const Key FileBackupAgent::keyLastRestorable = LiteralStringRef("last_restorable");
// For convenience
typedef FileBackupAgent::ERestoreState ERestoreState;
@ -304,6 +306,7 @@ FileBackupAgent::FileBackupAgent()
: subspace(Subspace(fileBackupPrefixRange.begin))
// The other subspaces have logUID -> value
, config(subspace.get(BackupAgentBase::keyConfig))
, lastRestorable(subspace.get(FileBackupAgent::keyLastRestorable))
, taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks), true, false, true))
, futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), true, true))
{
@ -3501,6 +3504,7 @@ public:
statusText = "";
tag = makeBackupTag(tagName);
state Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
if (uidAndAbortedFlag.present()) {
config = BackupConfig(uidAndAbortedFlag.get().first);
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
@ -3541,6 +3545,12 @@ public:
statusText += format("[%lld]: %s\n", errMsg.get().second, errMsg.get().first.c_str());
}
}
Optional<Value> disabled = wait(fDisabled);
if(disabled.present()) {
statusText += format("\nAll backup agents have been disabled.\n");
}
break;
}
catch (Error &e) {

View File

@ -135,20 +135,22 @@ public:
}
Future<Optional<T>> get(Database cx, bool snapshot = false) const {
auto &copy = *this;
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return get(tr, snapshot);
return copy.get(tr, snapshot);
});
}
Future<T> getOrThrow(Database cx, bool snapshot = false, Error err = key_not_found()) const {
auto &copy = *this;
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
return getOrThrow(tr, snapshot, err);
return copy.getOrThrow(tr, snapshot, err);
});
}

View File

@ -442,12 +442,25 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
}
}
ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo, AsyncTrigger *triggerVar) {
state vector< MasterProxyInterface > curProxies;
curProxies = clientDBInfo->get().proxies;
loop{
Void _ = wait(clientDBInfo->onChange());
if (clientDBInfo->get().proxies != curProxies) {
curProxies = clientDBInfo->get().proxies;
triggerVar->trigger();
}
}
}
DatabaseContext::DatabaseContext(
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
Standalone<StringRef> dbName, Standalone<StringRef> dbId,
int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware )
: clientInfo(clientInfo), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
: clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsPastVersions(0),
transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID),
outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
@ -460,6 +473,7 @@ DatabaseContext::DatabaseContext(
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
}
@ -474,6 +488,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
req.knownClientInfoID = outInfo->get().id;
req.dbName = dbName;
req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
ClusterConnectionString fileConnectionString;
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
@ -541,6 +556,7 @@ Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future
}
DatabaseContext::~DatabaseContext() {
monitorMasterProxiesInfoChange.cancel();
locationCacheLock.assertNotEntered();
SSInterfaceCacheLock.assertNotEntered();
SSInterfaceCache.clear();
@ -642,7 +658,7 @@ void DatabaseContext::invalidateCache( std::vector<UID> const& ids ) {
}
Future<Void> DatabaseContext::onMasterProxiesChanged() {
return this->clientInfo->onChange();
return this->masterProxiesChangeTrigger.onTrigger();
}
int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
@ -2492,6 +2508,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
state double startTime;
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
state CommitTransactionRequest ctReq(*req);
try {
Version v = wait( readVersion );
req->transaction.read_snapshot = v;
@ -2570,8 +2589,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
} else {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_not_committed && e.code() != error_code_database_locked)
TraceEvent(SevError, "tryCommitError").error(e);
if (e.code() != error_code_actor_cancelled && trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), req));
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), &ctReq));
throw;
}
}

View File

@ -386,7 +386,7 @@ public:
return true;
}
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
ACTOR static Future<Void> dispatch(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
state std::vector<Future<bool>> tasks(maxConcurrentTasks);
for(auto &f : tasks)
f = Never();
@ -449,6 +449,36 @@ public:
}
}
ACTOR static Future<Void> watchDisabled(Database cx, Reference<TaskBucket> taskBucket, Reference<AsyncVar<bool>> disabled) {
loop {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
try {
taskBucket->setOptions(tr);
Optional<Value> disabledVal = wait(tr->get(taskBucket->disableKey));
disabled->set(disabledVal.present());
state Future<Void> watchDisabledFuture = tr->watch(taskBucket->disableKey);
Void _ = wait(tr->commit());
Void _ = wait(watchDisabledFuture);
}
catch (Error &e) {
Void _ = wait(tr->onError(e));
}
}
}
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
state Reference<AsyncVar<bool>> disabled = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
state Future<Void> watchDisabledFuture = watchDisabled(cx, taskBucket, disabled);
loop {
while(disabled->get()) {
Void _ = wait(disabled->onChange() || watchDisabledFuture);
}
Void _ = wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || disabled->onChange() || watchDisabledFuture);
}
}
static Future<Standalone<StringRef>> addIdle(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket) {
taskBucket->setOptions(tr);
@ -693,6 +723,7 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
, available(prefix.get(LiteralStringRef("av")))
, available_prioritized(prefix.get(LiteralStringRef("avp")))
, timeouts(prefix.get(LiteralStringRef("to")))
, disableKey(prefix.pack(LiteralStringRef("disable")))
, timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS)
, system_access(sysAccess)
, priority_batch(priorityBatch)
@ -711,6 +742,18 @@ Future<Void> TaskBucket::clear(Reference<ReadYourWritesTransaction> tr){
return Void();
}
Future<Void> TaskBucket::changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable){
setOptions(tr);
if(disable) {
tr->set(disableKey, StringRef());
} else {
tr->clear(disableKey);
}
return Void();
}
Key TaskBucket::addTask(Reference<ReadYourWritesTransaction> tr, Reference<Task> task) {
setOptions(tr);
@ -775,6 +818,10 @@ Future<Void> TaskBucket::run(Database cx, Reference<FutureBucket> futureBucket,
return TaskBucketImpl::run(cx, Reference<TaskBucket>::addRef(this), futureBucket, pollDelay, maxConcurrentTasks);
}
Future<Void> TaskBucket::watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled) {
return TaskBucketImpl::watchDisabled(cx, Reference<TaskBucket>::addRef(this), disabled);
}
Future<bool> TaskBucket::isEmpty(Reference<ReadYourWritesTransaction> tr){
return TaskBucketImpl::isEmpty(tr, Reference<TaskBucket>::addRef(this));
}

View File

@ -69,6 +69,11 @@ public:
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
}
Future<Void> changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable);
Future<Void> changeDisable(Database cx, bool disable) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return changeDisable(tr, disable); });
}
Future<Void> clear(Reference<ReadYourWritesTransaction> tr);
Future<Void> clear(Database cx) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return clear(tr); });
@ -99,6 +104,7 @@ public:
Future<bool> doOne(Database cx, Reference<FutureBucket> futureBucket);
Future<Void> run(Database cx, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks);
Future<Void> watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled);
Future<bool> isEmpty(Reference<ReadYourWritesTransaction> tr);
Future<bool> isEmpty(Database cx){
@ -148,6 +154,10 @@ public:
return lock_aware;
}
Key getDisableKey() const {
return disableKey;
}
Subspace getAvailableSpace(int priority = 0) {
if(priority == 0)
return available;
@ -165,6 +175,7 @@ private:
Subspace prefix;
Subspace active;
Key disableKey;
// Available task subspaces. Priority 0, the default, will be under available which is backward
// compatible with pre-priority TaskBucket processes. Priority 1 and higher will be in

View File

@ -28,6 +28,10 @@
#include "crc32c.h"
#include "simulator.h"
#if VALGRIND
#include <memcheck.h>
#endif
static NetworkAddress g_currentDeliveryPeerAddress;
const UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
@ -532,6 +536,9 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin,
}
}
#if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(p, packetLen);
#endif
ArenaReader reader( arena, StringRef(p, packetLen), AssumeVersion(peerProtocolVersion) );
UID token; reader >> token;
@ -807,6 +814,9 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
BinaryWriter wr( AssumeVersion(currentProtocolVersion) );
what.serializeBinaryWriter(wr);
Standalone<StringRef> copy = wr.toStringRef();
#if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size());
#endif
deliver( self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false );
@ -891,6 +901,16 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
self->warnAlwaysForLargePacket = false;
}
#if VALGRIND
SendBuffer *checkbuf = pb;
while (checkbuf) {
int size = checkbuf->bytes_written;
const uint8_t* data = checkbuf->data;
VALGRIND_CHECK_MEM_IS_DEFINED(data, size);
checkbuf = checkbuf -> next;
}
#endif
peer->send(pb, rp, firstUnsent);
return (PacketID)rp;

View File

@ -48,6 +48,7 @@ public:
}
};
#pragma pack(pop)
BINARY_SERIALIZABLE( Endpoint );
class NetworkMessageReceiver {

View File

@ -75,6 +75,7 @@ public:
ProcessIssuesMap clientsWithIssues, workersWithIssues;
std::map<NetworkAddress, double> incompatibleConnections;
ClientVersionMap clientVersionMap;
std::map<NetworkAddress, std::string> traceLogGroupMap;
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
DatabaseConfiguration config; // Asynchronously updated via master registration
@ -951,6 +952,7 @@ ACTOR Future<Void> clusterOpenDatabase(
UID knownClientInfoID,
std::string issues,
Standalone<VectorRef<ClientVersionRef>> supportedVersions,
Standalone<StringRef> traceLogGroup,
ReplyPromise<ClientDBInfo> reply)
{
// NOTE: The client no longer expects this function to return errors
@ -961,6 +963,8 @@ ACTOR Future<Void> clusterOpenDatabase(
db->clientVersionMap[reply.getEndpoint().address] = supportedVersions;
}
db->traceLogGroupMap[reply.getEndpoint().address] = traceLogGroup.toString();
while (db->clientInfo->get().id == knownClientInfoID) {
choose {
when (Void _ = wait( db->clientInfo->onChange() )) {}
@ -970,6 +974,7 @@ ACTOR Future<Void> clusterOpenDatabase(
removeIssue( db->clientsWithIssues, reply.getEndpoint().address, issues, issueID );
db->clientVersionMap.erase(reply.getEndpoint().address);
db->traceLogGroupMap.erase(reply.getEndpoint().address);
reply.send( db->clientInfo->get() );
return Void();
@ -1501,7 +1506,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, coordinators, incompatibleConnections)));
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
@ -1619,19 +1624,15 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
state Optional<Value> rateVal = wait(tr.get(fdbClientInfoTxnSampleRate));
state Optional<Value> limitVal = wait(tr.get(fdbClientInfoTxnSizeLimit));
ClientDBInfo clientInfo = db->clientInfo->get();
if (rateVal.present()) {
double rate = BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned());
clientInfo.clientTxnInfoSampleRate = rate;
}
if (limitVal.present()) {
int64_t limit = BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned());
clientInfo.clientTxnInfoSizeLimit = limit;
}
if (rateVal.present() || limitVal.present()) {
double sampleRate = rateVal.present() ? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned()) : std::numeric_limits<double>::infinity();
int64_t sizeLimit = limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
if (sampleRate != clientInfo.clientTxnInfoSampleRate || sizeLimit != clientInfo.clientTxnInfoSampleRate) {
clientInfo.id = g_random->randomUniqueID();
clientInfo.clientTxnInfoSampleRate = sampleRate;
clientInfo.clientTxnInfoSizeLimit = sizeLimit;
db->clientInfo->set(clientInfo);
}
state Future<Void> watchRateFuture = tr.watch(fdbClientInfoTxnSampleRate);
state Future<Void> watchLimitFuture = tr.watch(fdbClientInfoTxnSizeLimit);
Void _ = wait(tr.commit());
@ -1677,7 +1678,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
return Void();
}
when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.reply ) );
addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
}
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
addActor.send( clusterRecruitFromConfiguration( &self, req ) );

View File

@ -100,15 +100,21 @@ void restartShardTrackers(
ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize) {
ShardSizeBounds bounds;
bounds.max.bytes = maxShardSize;
if(shard.begin >= keyServersKeys.begin) {
bounds.max.bytes = SERVER_KNOBS->KEY_SERVER_SHARD_BYTES;
} else {
bounds.max.bytes = maxShardSize;
}
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.max.iosPerKSecond = bounds.max.infinity;
//The first shard can have arbitrarily small size
if(shard.begin != allKeys.begin)
bounds.min.bytes = bounds.max.bytes / SERVER_KNOBS->SHARD_BYTES_RATIO;
else
if(shard.begin == allKeys.begin) {
bounds.min.bytes = 0;
} else {
bounds.min.bytes = maxShardSize / SERVER_KNOBS->SHARD_BYTES_RATIO;
}
bounds.min.bytesPerKSecond = 0;
bounds.min.iosPerKSecond = 0;

View File

@ -86,6 +86,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SHARD_BYTES_RATIO, 4 );
init( SHARD_BYTES_PER_SQRT_BYTES, 45 ); if( buggifySmallShards ) SHARD_BYTES_PER_SQRT_BYTES = 0;//Approximately 10000 bytes per shard
init( MAX_SHARD_BYTES, 500000000 );
init( KEY_SERVER_SHARD_BYTES, 500000000 );
bool buggifySmallBandwidthSplit = randomize && BUGGIFY;
init( SHARD_MAX_BYTES_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000;
/* 10*1MB/sec * 1000sec/ksec

View File

@ -87,7 +87,7 @@ public:
// Data distribution
double RETRY_RELOCATESHARD_DELAY;
double DATA_DISTRIBUTION_FAILURE_REACTION_TIME;
int MIN_SHARD_BYTES, SHARD_BYTES_RATIO, SHARD_BYTES_PER_SQRT_BYTES, MAX_SHARD_BYTES;
int MIN_SHARD_BYTES, SHARD_BYTES_RATIO, SHARD_BYTES_PER_SQRT_BYTES, MAX_SHARD_BYTES, KEY_SERVER_SHARD_BYTES;
int64_t SHARD_MAX_BYTES_PER_KSEC, // Shards with more than this bandwidth will be split immediately
SHARD_MIN_BYTES_PER_KSEC, // Shards with more than this bandwidth will not be merged
SHARD_SPLIT_BYTES_PER_KSEC; // When splitting a shard, it is split into pieces with less than this bandwidth

View File

@ -238,7 +238,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
//SOMEDAY: test lower memory limits, without making them too small and causing the database to stop making progress
FlowTransport::createInstance(1);
Sim2FileSystem::newFileSystem();
//simInitTLS();
simInitTLS();
NetworkAddress n(ip, port, true, useSSL);
Future<Void> listen = FlowTransport::transport().bind( n, n );
Future<Void> fd = fdbd( connFile, localities, processClass, *dataFolder, *coordFolder, 500e6, "", "");
@ -806,11 +806,10 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
// half the time, when we have more than 4 machines that are not the first in their dataCenter, assign classes
bool assignClasses = machineCount - dataCenters > 4 && g_random->random01() < 0.5;
// Use SSL half the time
//bool sslEnabled = g_random->random01() < 0.05;
//TEST( sslEnabled ); // SSL enabled
//TEST( !sslEnabled ); // SSL disabled
bool sslEnabled = false; // FIXME: Until we have a good solution for the TLS plugin, don't test SSL connections.
// Use SSL 5% of the time
bool sslEnabled = g_random->random01() < 0.05;
TEST( sslEnabled ); // SSL enabled
TEST( !sslEnabled ); // SSL disabled
vector<NetworkAddress> coordinatorAddresses;
for( int dc = 0; dc < dataCenters; dc++ ) {
@ -1012,7 +1011,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
"TestSystem", 0x01010101, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) );
Sim2FileSystem::newFileSystem();
FlowTransport::createInstance(1);
//simInitTLS();
simInitTLS();
TEST(true); // Simulation start

View File

@ -868,7 +868,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
return processMap;
}
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap, std::map<NetworkAddress, std::string> traceLogGroupMap) {
StatusObject clientStatus;
clientStatus["count"] = (int64_t)clientVersionMap.size();
@ -890,10 +890,13 @@ static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
StatusArray clients = StatusArray();
for(auto client : cv.second) {
clients.push_back(client.toString());
StatusObject cli;
cli["address"] = client.toString();
cli["log_group"] = traceLogGroupMap[client];
clients.push_back(cli);
}
ver["clients"] = clients;
ver["connected_clients"] = clients;
versionsArray.push_back(ver);
}
@ -1688,6 +1691,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
ProcessIssuesMap workerIssues,
ProcessIssuesMap clientIssues,
ClientVersionMap clientVersionMap,
std::map<NetworkAddress, std::string> traceLogGroupMap,
ServerCoordinators coordinators,
std::vector<NetworkAddress> incompatibleConnections )
{
@ -1866,7 +1870,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
StatusObject processStatus = wait(processStatusFetcher(db, workers, pMetrics, mMetrics, latestError, traceFileOpenErrors, programStarts, processIssues, storageServers, tLogs, cx, configuration, &status_incomplete_reasons));
statusObj["processes"] = processStatus;
statusObj["clients"] = clientStatusFetcher(clientVersionMap);
statusObj["clients"] = clientStatusFetcher(clientVersionMap, traceLogGroupMap);
StatusArray incompatibleConnectionsArray;
for(auto it : incompatibleConnections) {

View File

@ -32,6 +32,6 @@ typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > Clie
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract );
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
#endif

View File

@ -77,7 +77,9 @@ struct CpuProfilerWorkload : TestWorkload
for(i = 0; i < self->profilingWorkers.size(); i++)
{
ProfilerRequest req;
req.type = ProfilerRequest::Type::FLOW;
req.action = enabled ? ProfilerRequest::Action::ENABLE : ProfilerRequest::Action::DISABLE;
req.duration = 0; //unused
//The profiler output name will be the ip.port.prof
req.outputFile = StringRef(toIPString(self->profilingWorkers[i].address().ip) + "." + format("%d", self->profilingWorkers[i].address().port) + ".prof");
@ -98,7 +100,9 @@ struct CpuProfilerWorkload : TestWorkload
//Enable (or disable) the profiler on the current tester
ProfilerRequest req;
req.type = ProfilerRequest::Type::FLOW;
req.action = enabled ? ProfilerRequest::Action::ENABLE : ProfilerRequest::Action::DISABLE;
req.duration = 0; //unused
req.outputFile = StringRef(toIPString(g_network->getLocalAddress().ip) + "." + format("%d", g_network->getLocalAddress().port) + ".prof");
updateCpuProfiler(req);

View File

@ -21,13 +21,7 @@
#include "Knobs.h"
#include "flow/flow.h"
FlowKnobs const* FLOW_KNOBS = getFlowKnobs();
FlowKnobs const* getFlowKnobs() {
if (!FLOW_KNOBS)
FLOW_KNOBS = new FlowKnobs();
return FLOW_KNOBS;
}
FlowKnobs const* FLOW_KNOBS = new FlowKnobs();
#define init( knob, value ) initKnob( knob, value, #knob )
@ -121,9 +115,9 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( MIN_TRACE_SEVERITY, isSimulated ? 0 : 10 ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_FSYNC_ENABLED, 0 );
init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 100 );
init( TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY, 120 );
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 1000 );
init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 500 );
init( TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY, 1800.0 ); // 30 mins
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 20000 );
//TDMetrics
init( MAX_METRICS, 600 );

View File

@ -170,7 +170,4 @@ public:
extern FlowKnobs const* FLOW_KNOBS;
// This api should be used if FLOW_KNOBS is required during static initialization of global variables.
FlowKnobs const* getFlowKnobs();
#endif

View File

@ -92,4 +92,82 @@ private:
return metric;
}
};
template <class T>
struct TransientThresholdMetricSample : MetricSample<T> {
Deque< std::tuple<double, T, int64_t> > queue;
IndexedSet<T, int64_t> thresholdCrossedSet;
int64_t thresholdLimit;
TransientThresholdMetricSample(int64_t metricUnitsPerSample, int64_t threshold) : MetricSample<T>(metricUnitsPerSample), thresholdLimit(threshold) { }
bool roll(int64_t metric) {
return g_nondeterministic_random->random01() < (double)metric / this->metricUnitsPerSample; //< SOMEDAY: Better randomInt64?
}
template <class U>
bool isAboveThreshold(const U& key) {
auto i = thresholdCrossedSet.find(key);
if (i == thresholdCrossedSet.end())
return false;
else
return true;
}
// Returns the sampled metric value (possibly 0, possibly increased by the sampling factor)
template <class T_>
int64_t addAndExpire(T_&& key, int64_t metric, double expiration) {
int64_t x = add(std::forward<T_>(key), metric);
if (x)
queue.push_back(std::make_tuple(expiration, *this->sample.find(key), -x));
return x;
}
void poll() {
double now = ::now();
while (queue.size() &&
std::get<0>(queue.front()) <= now)
{
const T& key = std::get<1>(queue.front());
int64_t delta = std::get<2>(queue.front());
ASSERT(delta != 0);
int64_t val = this->sample.addMetric(T(key), delta);
if (val < thresholdLimit && (val + std::abs(delta)) >= thresholdLimit) {
auto iter = thresholdCrossedSet.find(key);
ASSERT(iter != thresholdCrossedSet.end())
thresholdCrossedSet.erase(iter);
}
if (val == 0)
this->sample.erase(key);
queue.pop_front();
}
}
private:
template <class T_>
int64_t add(T_&& key, int64_t metric) {
if (!metric) return 0;
int64_t mag = std::abs(metric);
if (mag < this->metricUnitsPerSample) {
if (!roll(mag))
return 0;
metric = metric<0 ? -this->metricUnitsPerSample : this->metricUnitsPerSample;
}
int64_t val = this->sample.addMetric(T(key), metric);
if (val >= thresholdLimit) {
ASSERT((val - metric) < thresholdLimit ? thresholdCrossedSet.find(key) == thresholdCrossedSet.end() : thresholdCrossedSet.find(key) != thresholdCrossedSet.end());
thresholdCrossedSet.insert(key, val);
}
if (val == 0)
this->sample.erase(key);
return metric;
}
};
#endif

View File

@ -51,7 +51,7 @@ using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A551020001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551030001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -132,9 +132,10 @@ IRandom* trace_random = NULL;
LatestEventCache latestEventCache;
SuppressionMap suppressedEvents;
static TransientMetricSample<const char *> traceEventThrottlerCache(getFlowKnobs()->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE);
static TransientThresholdMetricSample<Standalone<StringRef>> *traceEventThrottlerCache;
static const char *TRACE_EVENT_THROTTLE_STARTING_TYPE = "TraceEventThrottle_";
struct TraceLog {
Standalone< VectorRef<StringRef> > buffer;
int file_length;
@ -425,7 +426,7 @@ struct TraceLog {
}
ThreadFuture<Void> flush() {
traceEventThrottlerCache.poll();
traceEventThrottlerCache->poll();
MutexHolder hold(mutex);
bool roll = false;
@ -685,20 +686,6 @@ bool TraceEvent::init( Severity severity, const char* type ) {
} else
enabled = false;
// TRACE_EVENT_THROTTLER
if ( enabled && (severity > SevDebug) &&
(strncmp(TRACE_EVENT_THROTTLE_STARTING_TYPE, type, strlen(TRACE_EVENT_THROTTLE_STARTING_TYPE)) != 0) ) {
// Not a Trace Event Throttle Type
if (traceEventThrottlerCache.getMetric(type) >= FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT) {
// Throttle Msg
enabled = false;
TraceEvent(SevWarnAlways, std::string(TRACE_EVENT_THROTTLE_STARTING_TYPE).append(type).c_str()).suppressFor(5);
}
else {
traceEventThrottlerCache.addAndExpire(type, 1, FLOW_KNOBS->TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY);
}
}
return enabled;
}
@ -877,6 +864,18 @@ TraceEvent& TraceEvent::backtrace(std::string prefix) {
TraceEvent::~TraceEvent() {
try {
if (enabled) {
// TRACE_EVENT_THROTTLER
if (severity > SevDebug && isNetworkThread()) {
if (traceEventThrottlerCache->isAboveThreshold(StringRef((uint8_t *)type, strlen(type)))) {
TraceEvent(SevWarnAlways, std::string(TRACE_EVENT_THROTTLE_STARTING_TYPE).append(type).c_str()).suppressFor(5);
// Throttle Msg
return;
}
else {
traceEventThrottlerCache->addAndExpire(StringRef((uint8_t *)type, strlen(type)), 1, now() + FLOW_KNOBS->TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY);
}
} // End of Throttler
_detailf("logGroup", "%.*s", g_traceLog.logGroup.size(), g_traceLog.logGroup.data());
if (!trackingKey.empty()) {
if(!isNetworkThread()) {
@ -981,6 +980,7 @@ void TraceEvent::writeEscapedfv( const char* format, va_list args ) {
thread_local bool TraceEvent::networkThread = false;
void TraceEvent::setNetworkThread() {
traceEventThrottlerCache = new TransientThresholdMetricSample<Standalone<StringRef>>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT);
networkThread = true;
}

View File

@ -70,7 +70,7 @@ Future<Optional<T>> stopAfter( Future<T> what ) {
T _ = wait(what);
ret = Optional<T>(_);
} catch (Error& e) {
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
if(!ok) {
fprintf(stderr, "Fatal Error: %s\n", e.what());

View File

@ -28,6 +28,27 @@
#include "Arena.h"
#include <algorithm>
// Though similar, is_binary_serializable cannot be replaced by std::is_pod, as doing so would prefer
// memcpy over a defined serialize() method on a POD struct. As not all of our structs are packed,
// this would both inflate message sizes by transmitting padding, and mean that we're transmitting
// undefined bytes over the wire.
// A more intelligent SFINAE that does "binarySerialize if POD and no serialize() is defined" could
// replace the usage of is_binary_serializable.
template <class T>
struct is_binary_serializable { enum { value = 0 }; };
#define BINARY_SERIALIZABLE( T ) template<> struct is_binary_serializable<T> { enum { value = 1 }; };
BINARY_SERIALIZABLE( uint8_t );
BINARY_SERIALIZABLE( int16_t );
BINARY_SERIALIZABLE( uint16_t );
BINARY_SERIALIZABLE( int32_t );
BINARY_SERIALIZABLE( uint32_t );
BINARY_SERIALIZABLE( int64_t );
BINARY_SERIALIZABLE( uint64_t );
BINARY_SERIALIZABLE( bool );
BINARY_SERIALIZABLE( double );
template <class Archive, class Item>
inline typename Archive::WRITER& operator << (Archive& ar, const Item& item ) {
save(ar, const_cast<Item&>(item));
@ -88,7 +109,7 @@ inline void save( Archive& ar, const std::string& value ) {
}
template <class Archive, class T>
class Serializer< Archive, T, typename std::enable_if< std::is_pod<T>::value >::type> {
class Serializer< Archive, T, typename std::enable_if< is_binary_serializable<T>::value >::type> {
public:
static void serialize( Archive& ar, T& t ) {
ar.serializeBinaryItem(t);

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long