Added *BOOLEAN_PARAM macros to enforce documentation of boolean parameters

This commit is contained in:
sfc-gh-tclinkenbeard 2021-07-02 15:04:42 -07:00
parent 49d3b0c853
commit 79ff07a071
30 changed files with 250 additions and 163 deletions

View File

@ -598,7 +598,7 @@ int main(int argc, char** argv) {
Error::init();
StringRef url(param.container_url);
setupNetwork(0, true);
setupNetwork(0, UseMetrics::TRUE);
TraceEvent::setNetworkThread();
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "convert", param.trace_log_group);

View File

@ -579,7 +579,7 @@ int main(int argc, char** argv) {
Error::init();
StringRef url(param.container_url);
setupNetwork(0, true);
setupNetwork(0, UseMetrics::TRUE);
TraceEvent::setNetworkThread();
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "decode", param.trace_log_group);

View File

@ -3769,7 +3769,7 @@ int main(int argc, char* argv[]) {
Reference<IBackupContainer> c;
try {
setupNetwork(0, true);
setupNetwork(0, UseMetrics::TRUE);
} catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
return FDB_EXIT_ERROR;
@ -3813,7 +3813,7 @@ int main(int argc, char* argv[]) {
}
try {
db = Database::createDatabase(ccf, -1, true, localities);
db = Database::createDatabase(ccf, -1, IsInternal::TRUE, localities);
} catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
@ -3833,7 +3833,7 @@ int main(int argc, char* argv[]) {
}
try {
sourceDb = Database::createDatabase(sourceCcf, -1, true, localities);
sourceDb = Database::createDatabase(sourceCcf, -1, IsInternal::TRUE, localities);
} catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", sourceCcf->getFilename().c_str());

View File

@ -3322,7 +3322,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
TraceEvent::setNetworkThread();
try {
db = Database::createDatabase(ccf, -1, false);
db = Database::createDatabase(ccf, -1, IsInternal::FALSE);
if (!opt.exec.present()) {
printf("Using cluster file `%s'.\n", ccf->getFilename().c_str());
}

View File

@ -146,6 +146,13 @@ public:
WatchMetadata(Key key, Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
};
#ifndef __DATABASE_BOOLEAN_PARAMS__
#define __DATABASE_BOOLEAN_PARAMS__
DECLARE_BOOLEAN_PARAM(LockAware);
DECLARE_BOOLEAN_PARAM(EnableLocalityLoadBalance);
#endif
DECLARE_BOOLEAN_PARAM(IsSwitchable);
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -157,11 +164,11 @@ public:
static Database create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
LocalityData clientLocality,
bool enableLocalityLoadBalance,
EnableLocalityLoadBalance,
TaskPriority taskID = TaskPriority::DefaultEndpoint,
bool lockAware = false,
LockAware lockAware = LockAware::FALSE,
int apiVersion = Database::API_VERSION_LATEST,
bool switchable = false);
IsSwitchable switchable = IsSwitchable::FALSE);
~DatabaseContext();
@ -217,7 +224,7 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
Error deferredError;
bool lockAware;
LockAware lockAware{ LockAware::FALSE };
bool isError() const { return deferredError.code() != invalid_error_code; }
@ -242,7 +249,7 @@ public:
// new cluster.
Future<Void> switchConnectionFile(Reference<ClusterConnectionFile> standby);
Future<Void> connectionFileChanged();
bool switchable = false;
IsSwitchable switchable{ IsSwitchable::FALSE };
// Management API, Attempt to kill or suspend a process, return 1 for request sent out, 0 for failure
Future<int64_t> rebootWorker(StringRef address, bool check = false, int duration = 0);
@ -259,11 +266,11 @@ public:
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
bool enableLocalityLoadBalance,
bool lockAware,
bool internal = true,
EnableLocalityLoadBalance,
LockAware,
IsInternal internal = IsInternal::TRUE,
int apiVersion = Database::API_VERSION_LATEST,
bool switchable = false);
IsSwitchable switchable = IsSwitchable::FALSE);
explicit DatabaseContext(const Error& err);
@ -282,7 +289,7 @@ public:
UID proxiesLastChange;
LocalityData clientLocality;
QueueModel queueModel;
bool enableLocalityLoadBalance;
EnableLocalityLoadBalance enableLocalityLoadBalance{ EnableLocalityLoadBalance::FALSE };
struct VersionRequest {
SpanID spanContext;
@ -329,7 +336,7 @@ public:
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
IsInternal internal; // Only contexts created through the C client and fdbcli are non-internal
PrioritizedTransactionTagMap<ClientTagThrottleData> throttledTags;

View File

@ -85,6 +85,12 @@ using std::max;
using std::min;
using std::pair;
DEFINE_BOOLEAN_PARAM(LockAware);
DEFINE_BOOLEAN_PARAM(EnableLocalityLoadBalance);
DEFINE_BOOLEAN_PARAM(IsSwitchable);
DEFINE_BOOLEAN_PARAM(UseMetrics);
DEFINE_BOOLEAN_PARAM(IsInternal);
namespace {
template <class Interface, class Request>
@ -94,7 +100,8 @@ Future<REPLY_TYPE(Request)> loadBalance(
RequestStream<Request> Interface::*channel,
const Request& request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
AtMostOnce atMostOnce =
AtMostOnce::FALSE, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = nullptr) {
if (alternatives->hasCaches) {
return loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model);
@ -296,11 +303,16 @@ void DatabaseContext::validateVersion(Version version) {
ASSERT(version > 0 || version == latestVersion);
}
void validateOptionValue(Optional<StringRef> value, bool shouldBePresent) {
if (shouldBePresent && !value.present())
void validateOptionValuePresent(Optional<StringRef> value) {
if (!value.present()) {
throw invalid_option_value();
if (!shouldBePresent && value.present() && value.get().size() > 0)
}
}
void validateOptionValueNotPresent(Optional<StringRef> value) {
if (value.present() && value.get().size() > 0) {
throw invalid_option_value();
}
}
void dumpMutations(const MutationListRef& mutations) {
@ -1080,11 +1092,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
bool enableLocalityLoadBalance,
bool lockAware,
bool internal,
EnableLocalityLoadBalance enableLocalityLoadBalance,
LockAware lockAware,
IsInternal internal,
int apiVersion,
bool switchable)
IsSwitchable switchable)
: connectionFile(connectionFile), clientInfo(clientInfo), coordinator(coordinator),
clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality),
enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), apiVersion(apiVersion),
@ -1360,7 +1372,7 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), latencies(1000),
readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(IsInternal::FALSE),
transactionTracingEnabled(true) {}
// Static constructor used by server processes to create a DatabaseContext
@ -1368,11 +1380,11 @@ DatabaseContext::DatabaseContext(const Error& err)
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
LocalityData clientLocality,
bool enableLocalityLoadBalance,
EnableLocalityLoadBalance enableLocalityLoadBalance,
TaskPriority taskID,
bool lockAware,
LockAware lockAware,
int apiVersion,
bool switchable) {
IsSwitchable switchable) {
return Database(new DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>>(),
clientInfo,
makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>(),
@ -1381,7 +1393,7 @@ Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
clientLocality,
enableLocalityLoadBalance,
lockAware,
true,
IsInternal::TRUE,
apiVersion,
switchable));
}
@ -1491,7 +1503,7 @@ bool DatabaseContext::sampleOnCost(uint64_t cost) const {
}
int64_t extractIntOption(Optional<StringRef> value, int64_t minValue, int64_t maxValue) {
validateOptionValue(value, true);
validateOptionValuePresent(value);
if (value.get().size() != 8) {
throw invalid_option_value();
}
@ -1553,23 +1565,23 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
locationCache.insert(allKeys, Reference<LocationInfo>());
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
snapshotRywEnabled++;
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
snapshotRywEnabled--;
break;
case FDBDatabaseOptions::DISTRIBUTED_TRANSACTION_TRACE_ENABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
transactionTracingEnabled++;
break;
case FDBDatabaseOptions::DISTRIBUTED_TRANSACTION_TRACE_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
transactionTracingEnabled--;
break;
case FDBDatabaseOptions::USE_CONFIG_DATABASE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
useConfigDatabase = true;
break;
default:
@ -1669,7 +1681,7 @@ extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& c
// on another thread
Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal,
IsInternal internal,
LocalityData const& clientLocality,
DatabaseContext* preallocatedDb) {
if (!g_network)
@ -1730,11 +1742,11 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
true,
false,
EnableLocalityLoadBalance::TRUE,
LockAware::FALSE,
internal,
apiVersion,
/*switchable*/ true);
IsSwitchable::TRUE);
} else {
db = new DatabaseContext(connectionFile,
clientInfo,
@ -1742,11 +1754,11 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
true,
false,
EnableLocalityLoadBalance::TRUE,
LockAware::FALSE,
internal,
apiVersion,
/*switchable*/ true);
IsSwitchable::TRUE);
}
auto database = Database(db);
@ -1756,7 +1768,7 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
Database Database::createDatabase(std::string connFileName,
int apiVersion,
bool internal,
IsInternal internal,
LocalityData const& clientLocality) {
Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
@ -1803,15 +1815,15 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
networkOptions.traceDirectory = value.present() ? value.get().toString() : "";
break;
case FDBNetworkOptions::TRACE_ROLL_SIZE:
validateOptionValue(value, true);
validateOptionValuePresent(value);
networkOptions.traceRollSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
break;
case FDBNetworkOptions::TRACE_MAX_LOGS_SIZE:
validateOptionValue(value, true);
validateOptionValuePresent(value);
networkOptions.traceMaxLogsSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
break;
case FDBNetworkOptions::TRACE_FORMAT:
validateOptionValue(value, true);
validateOptionValuePresent(value);
networkOptions.traceFormat = value.get().toString();
if (!validateTraceFormat(networkOptions.traceFormat)) {
fprintf(stderr, "Unrecognized trace format: `%s'\n", networkOptions.traceFormat.c_str());
@ -1819,7 +1831,7 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
break;
case FDBNetworkOptions::TRACE_FILE_IDENTIFIER:
validateOptionValue(value, true);
validateOptionValuePresent(value);
networkOptions.traceFileIdentifier = value.get().toString();
if (networkOptions.traceFileIdentifier.length() > CLIENT_KNOBS->TRACE_LOG_FILE_IDENTIFIER_MAX_LENGTH) {
fprintf(stderr, "Trace file identifier provided is too long.\n");
@ -1840,7 +1852,7 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
break;
case FDBNetworkOptions::TRACE_CLOCK_SOURCE:
validateOptionValue(value, true);
validateOptionValuePresent(value);
networkOptions.traceClockSource = value.get().toString();
if (!validateTraceClockSource(networkOptions.traceClockSource)) {
fprintf(stderr, "Unrecognized trace clock source: `%s'\n", networkOptions.traceClockSource.c_str());
@ -1848,7 +1860,7 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
break;
case FDBNetworkOptions::KNOB: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
std::string optionValue = value.get().toString();
TraceEvent("SetKnob").detail("KnobString", optionValue);
@ -1872,42 +1884,42 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
break;
}
case FDBNetworkOptions::TLS_PLUGIN:
validateOptionValue(value, true);
validateOptionValuePresent(value);
break;
case FDBNetworkOptions::TLS_CERT_PATH:
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setCertificatePath(value.get().toString());
break;
case FDBNetworkOptions::TLS_CERT_BYTES: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setCertificateBytes(value.get().toString());
break;
}
case FDBNetworkOptions::TLS_CA_PATH: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setCAPath(value.get().toString());
break;
}
case FDBNetworkOptions::TLS_CA_BYTES: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setCABytes(value.get().toString());
break;
}
case FDBNetworkOptions::TLS_PASSWORD:
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setPassword(value.get().toString());
break;
case FDBNetworkOptions::TLS_KEY_PATH:
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setKeyPath(value.get().toString());
break;
case FDBNetworkOptions::TLS_KEY_BYTES: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.setKeyBytes(value.get().toString());
break;
}
case FDBNetworkOptions::TLS_VERIFY_PEERS:
validateOptionValue(value, true);
validateOptionValuePresent(value);
tlsConfig.clearVerifyPeers();
tlsConfig.addVerifyPeers(value.get().toString());
break;
@ -1918,16 +1930,16 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
enableBuggify(false, BuggifyType::Client);
break;
case FDBNetworkOptions::CLIENT_BUGGIFY_SECTION_ACTIVATED_PROBABILITY:
validateOptionValue(value, true);
validateOptionValuePresent(value);
clearBuggifySections(BuggifyType::Client);
P_BUGGIFIED_SECTION_ACTIVATED[int(BuggifyType::Client)] = double(extractIntOption(value, 0, 100)) / 100.0;
break;
case FDBNetworkOptions::CLIENT_BUGGIFY_SECTION_FIRED_PROBABILITY:
validateOptionValue(value, true);
validateOptionValuePresent(value);
P_BUGGIFIED_SECTION_FIRES[int(BuggifyType::Client)] = double(extractIntOption(value, 0, 100)) / 100.0;
break;
case FDBNetworkOptions::DISABLE_CLIENT_STATISTICS_LOGGING:
validateOptionValue(value, false);
validateOptionValuePresent(value);
networkOptions.logClientInfo = false;
break;
case FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS: {
@ -1947,11 +1959,11 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
break;
}
case FDBNetworkOptions::ENABLE_RUN_LOOP_PROFILING: // Same as ENABLE_SLOW_TASK_PROFILING
validateOptionValue(value, false);
validateOptionValuePresent(value);
networkOptions.runLoopProfilingEnabled = true;
break;
case FDBNetworkOptions::DISTRIBUTED_CLIENT_TRACER: {
validateOptionValue(value, true);
validateOptionValuePresent(value);
std::string tracer = value.get().toString();
if (tracer == "none" || tracer == "disabled") {
openTracer(TracerType::DISABLED);
@ -1994,7 +2006,7 @@ ACTOR Future<Void> monitorNetworkBusyness() {
}
// Setup g_network and start monitoring for network busyness
void setupNetwork(uint64_t transportId, bool useMetrics) {
void setupNetwork(uint64_t transportId, UseMetrics useMetrics) {
if (g_network)
throw network_already_setup();
@ -2469,7 +2481,7 @@ ACTOR Future<Optional<Value>> getValue(Future<Version> version,
GetValueRequest(
span.context, key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
TaskPriority::DefaultPromiseEndpoint,
false,
AtMostOnce::FALSE,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
@ -2581,7 +2593,7 @@ ACTOR Future<Key> getKey(Database cx, KeySelector k, Future<Version> version, Tr
&StorageServerInterface::getKey,
req,
TaskPriority::DefaultPromiseEndpoint,
false,
AtMostOnce::FALSE,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
@ -2974,7 +2986,7 @@ ACTOR Future<RangeResult> getExactRange(Database cx,
&StorageServerInterface::getKeyValues,
req,
TaskPriority::DefaultPromiseEndpoint,
false,
AtMostOnce::FALSE,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
rep = _rep;
}
@ -3325,7 +3337,7 @@ ACTOR Future<RangeResult> getRange(Database cx,
&StorageServerInterface::getKeyValues,
req,
TaskPriority::DefaultPromiseEndpoint,
false,
AtMostOnce::FALSE,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
rep = _rep;
++cx->transactionPhysicalReadsCompleted;
@ -4797,7 +4809,7 @@ ACTOR static Future<Void> tryCommit(Database cx,
&CommitProxyInterface::commit,
req,
TaskPriority::DefaultPromiseEndpoint,
true);
AtMostOnce::TRUE);
}
choose {
@ -5047,7 +5059,7 @@ Future<Void> Transaction::commit() {
void Transaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
switch (option) {
case FDBTransactionOptions::INITIALIZE_NEW_DATABASE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
if (readVersion.isValid())
throw read_version_already_set();
readVersion = Version(0);
@ -5055,37 +5067,37 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::CAUSAL_READ_RISKY:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.getReadVersionFlags |= GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY;
break;
case FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.priority = TransactionPriority::IMMEDIATE;
break;
case FDBTransactionOptions::PRIORITY_BATCH:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.priority = TransactionPriority::BATCH;
break;
case FDBTransactionOptions::CAUSAL_WRITE_RISKY:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.causalWriteRisky = true;
break;
case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.commitOnFirstProxy = true;
break;
case FDBTransactionOptions::CHECK_WRITES_ENABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.checkWritesEnabled = true;
break;
case FDBTransactionOptions::DEBUG_DUMP:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.debugDump = true;
break;
@ -5095,7 +5107,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER:
validateOptionValue(value, true);
validateOptionValuePresent(value);
if (value.get().size() > 100 || value.get().size() == 0) {
throw invalid_option_value();
@ -5122,7 +5134,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::LOG_TRANSACTION:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
if (trLogInfo && !trLogInfo->identifier.empty()) {
trLogInfo->logTo(TransactionLogInfo::TRACE_LOG);
} else {
@ -5133,7 +5145,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::TRANSACTION_LOGGING_MAX_FIELD_LENGTH:
validateOptionValue(value, true);
validateOptionValuePresent(value);
{
int maxFieldLength = extractIntOption(value, -1, std::numeric_limits<int32_t>::max());
if (maxFieldLength == 0) {
@ -5147,7 +5159,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::SERVER_REQUEST_TRACING:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
debugTransaction(deterministicRandom()->randomUniqueID());
if (trLogInfo && !trLogInfo->identifier.empty()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
@ -5157,23 +5169,23 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::MAX_RETRY_DELAY:
validateOptionValue(value, true);
validateOptionValuePresent(value);
options.maxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
break;
case FDBTransactionOptions::SIZE_LIMIT:
validateOptionValue(value, true);
validateOptionValuePresent(value);
options.sizeLimit = extractIntOption(value, 32, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT);
break;
case FDBTransactionOptions::LOCK_AWARE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.lockAware = true;
options.readOnly = false;
break;
case FDBTransactionOptions::READ_LOCK_AWARE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
if (!options.lockAware) {
options.lockAware = true;
options.readOnly = true;
@ -5181,34 +5193,34 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::FIRST_IN_BATCH:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.firstInBatch = true;
break;
case FDBTransactionOptions::USE_PROVISIONAL_PROXIES:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES;
info.useProvisionalProxies = true;
break;
case FDBTransactionOptions::INCLUDE_PORT_IN_ADDRESS:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.includePort = true;
break;
case FDBTransactionOptions::TAG:
validateOptionValue(value, true);
validateOptionValuePresent(value);
options.tags.addTag(value.get());
break;
case FDBTransactionOptions::AUTO_THROTTLE_TAG:
validateOptionValue(value, true);
validateOptionValuePresent(value);
options.tags.addTag(value.get());
options.readTags.addTag(value.get());
break;
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValue(value, true);
validateOptionValuePresent(value);
if (value.get().size() != 16) {
throw invalid_option_value();
}
@ -5216,12 +5228,12 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.reportConflictingKeys = true;
break;
case FDBTransactionOptions::EXPENSIVE_CLEAR_COST_ESTIMATION_ENABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.expensiveClearCostEstimation = true;
break;
@ -6117,7 +6129,7 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
&CommitProxyInterface::proxySnapReq,
ProxySnapRequest(snapCmd, snapUID, snapUID),
cx->taskID,
true /*atmostOnce*/))) {
AtMostOnce::TRUE))) {
TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID);
return Void();
}
@ -6297,4 +6309,4 @@ ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, bool lock
}
}
return Void();
}
}

View File

@ -27,6 +27,7 @@
#elif !defined(FDBCLIENT_NATIVEAPI_ACTOR_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_H
#include "flow/BooleanParam.h"
#include "flow/flow.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/FDBTypes.h"
@ -51,7 +52,8 @@ void addref(DatabaseContext* ptr);
template <>
void delref(DatabaseContext* ptr);
void validateOptionValue(Optional<StringRef> value, bool shouldBePresent);
void validateOptionValuePresent(Optional<StringRef> value);
void validateOptionValueNotPresent(Optional<StringRef> value);
void enableClientInfoLogging();
@ -72,6 +74,9 @@ struct NetworkOptions {
NetworkOptions();
};
// TODO: Reduce scope?
DECLARE_BOOLEAN_PARAM(IsInternal);
class Database {
public:
enum { API_VERSION_LATEST = -1 };
@ -81,13 +86,13 @@ public:
// on another thread
static Database createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal = true,
IsInternal internal = IsInternal::TRUE,
LocalityData const& clientLocality = LocalityData(),
DatabaseContext* preallocatedDb = nullptr);
static Database createDatabase(std::string connFileName,
int apiVersion,
bool internal = true,
IsInternal internal = IsInternal::TRUE,
LocalityData const& clientLocality = LocalityData());
Database() {} // an uninitialized database can be destructed or reassigned safely; that's it
@ -111,8 +116,10 @@ private:
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
DECLARE_BOOLEAN_PARAM(UseMetrics);
// Configures the global networking machinery
void setupNetwork(uint64_t transportId = 0, bool useMetrics = false);
void setupNetwork(uint64_t transportId = 0, UseMetrics useMetrics = UseMetrics::FALSE);
// This call blocks while the network is running. To use the API in a single-threaded
// environment, the calling program must have ACTORs already launched that are waiting

View File

@ -2165,7 +2165,7 @@ void ReadYourWritesTransaction::setOption(FDBTransactionOptions::Option option,
void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option option, Optional<StringRef> value) {
switch (option) {
case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
if (!reading.isReady() || !cache.empty() || !writes.empty())
throw client_invalid_operation();
@ -2174,26 +2174,26 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti
break;
case FDBTransactionOptions::READ_AHEAD_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.readAheadDisabled = true;
break;
case FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.nextWriteDisableConflictRange = true;
break;
case FDBTransactionOptions::ACCESS_SYSTEM_KEYS:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.readSystemKeys = true;
options.writeSystemKeys = true;
break;
case FDBTransactionOptions::READ_SYSTEM_KEYS:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.readSystemKeys = true;
break;
@ -2217,30 +2217,30 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti
transactionDebugInfo->transactionName = value.present() ? value.get().toString() : "";
break;
case FDBTransactionOptions::SNAPSHOT_RYW_ENABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.snapshotRywEnabled++;
break;
case FDBTransactionOptions::SNAPSHOT_RYW_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.snapshotRywEnabled--;
break;
case FDBTransactionOptions::USED_DURING_COMMIT_PROTECTION_DISABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.disableUsedDuringCommitProtection = true;
break;
case FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.specialKeySpaceRelaxed = true;
break;
case FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.specialKeySpaceChangeConfiguration = true;
break;
case FDBTransactionOptions::BYPASS_UNREADABLE:
validateOptionValue(value, false);
validateOptionValueNotPresent(value);
options.bypassUnreadable = true;
break;
default:

View File

@ -122,7 +122,7 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
[db, connFile, apiVersion]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, false, LocalityData(), db)
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::FALSE, LocalityData(), db)
.extractPtr();
} catch (Error& e) {
new (db) DatabaseContext(e);

View File

@ -18,9 +18,13 @@
* limitations under the License.
*/
#include "fdbrpc/LoadBalance.actor.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
DEFINE_BOOLEAN_PARAM(AtMostOnce);
DEFINE_BOOLEAN_PARAM(TriedAllOptions);
// Throwing all_alternatives_failed will cause the client to issue a GetKeyLocationRequest to the proxy, so this actor
// attempts to limit the number of these errors thrown by a single client to prevent it from saturating the proxies with
// these requests
@ -49,4 +53,4 @@ ACTOR Future<Void> allAlternativesFailedDelay(Future<Void> okFuture) {
when(wait(::delayJittered(delay))) { throw all_alternatives_failed(); }
}
return Void();
}
}

View File

@ -28,6 +28,7 @@
#elif !defined(FLOW_LOADBALANCE_ACTOR_H)
#define FLOW_LOADBALANCE_ACTOR_H
#include "flow/BooleanParam.h"
#include "flow/flow.h"
#include "flow/Knobs.h"
@ -238,6 +239,9 @@ Future<Void> tssComparison(Req req,
return Void();
}
DECLARE_BOOLEAN_PARAM(AtMostOnce);
DECLARE_BOOLEAN_PARAM(TriedAllOptions);
// Stores state for a request made by the load balancer
template <class Request, class Interface, class Multi>
struct RequestData : NonCopyable {
@ -245,7 +249,7 @@ struct RequestData : NonCopyable {
Future<Reply> response;
Reference<ModelHolder> modelHolder;
bool triedAllOptions = false;
TriedAllOptions triedAllOptions{ TriedAllOptions::FALSE };
bool requestStarted = false; // true once the request has been sent to an alternative
bool requestProcessed = false; // true once a response has been received and handled by checkAndProcessResult
@ -284,7 +288,7 @@ struct RequestData : NonCopyable {
// Initializes the request state and starts it, possibly after a backoff delay
void startRequest(
double backoff,
bool triedAllOptions,
TriedAllOptions triedAllOptions,
RequestStream<Request> const* stream,
Request& request,
QueueModel* model,
@ -320,8 +324,8 @@ struct RequestData : NonCopyable {
// A return value with an error means that the error should be thrown back to original caller
static ErrorOr<bool> checkAndProcessResultImpl(Reply const& result,
Reference<ModelHolder> modelHolder,
bool atMostOnce,
bool triedAllOptions) {
AtMostOnce atMostOnce,
TriedAllOptions triedAllOptions) {
ASSERT(modelHolder);
Optional<LoadBalancedReply> loadBalancedReply;
@ -377,7 +381,7 @@ struct RequestData : NonCopyable {
// A return value of true means that the request completed successfully
// A return value of false means that the request failed but should be retried
// In the event of a non-retryable failure, an error is thrown indicating the failure
bool checkAndProcessResult(bool atMostOnce) {
bool checkAndProcessResult(AtMostOnce atMostOnce) {
ASSERT(response.isReady());
requestProcessed = true;
@ -412,9 +416,9 @@ struct RequestData : NonCopyable {
// We need to process the lagging request in order to update the queue model
Reference<ModelHolder> holderCapture = std::move(modelHolder);
bool triedAllOptionsCapture = triedAllOptions;
auto triedAllOptionsCapture = triedAllOptions;
Future<Void> updateModel = map(response, [holderCapture, triedAllOptionsCapture](Reply result) {
checkAndProcessResultImpl(result, holderCapture, false, triedAllOptionsCapture);
checkAndProcessResultImpl(result, holderCapture, AtMostOnce::FALSE, triedAllOptionsCapture);
return Void();
});
model->addActor.send(updateModel);
@ -441,7 +445,8 @@ Future<REPLY_TYPE(Request)> loadBalance(
RequestStream<Request> Interface::*channel,
Request request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
AtMostOnce atMostOnce =
AtMostOnce::FALSE, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = nullptr) {
state RequestData<Request, Interface, Multi> firstRequestData;
@ -453,6 +458,8 @@ Future<REPLY_TYPE(Request)> loadBalance(
state Promise<Void> requestFinished;
state double startTime = now();
state TriedAllOptions triedAllOptions = TriedAllOptions::FALSE;
setReplyPriority(request, taskID);
if (!alternatives)
return Never();
@ -556,7 +563,6 @@ Future<REPLY_TYPE(Request)> loadBalance(
state int numAttempts = 0;
state double backoff = 0;
state bool triedAllOptions = false;
// Issue requests to selected servers.
loop {
if (now() - startTime > (g_network->isSimulated() ? 30.0 : 600.0)) {
@ -595,7 +601,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
break;
nextAlt = (nextAlt + 1) % alternatives->size();
if (nextAlt == startAlt)
triedAllOptions = true;
triedAllOptions = TriedAllOptions::TRUE;
stream = nullptr;
}
@ -702,7 +708,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
nextAlt = (nextAlt + 1) % alternatives->size();
if (nextAlt == startAlt)
triedAllOptions = true;
triedAllOptions = TriedAllOptions::TRUE;
resetReply(request, taskID);
secondDelay = Never();
}
@ -724,7 +730,7 @@ Future<REPLY_TYPE(Request)> basicLoadBalance(Reference<ModelInterface<Multi>> al
RequestStream<Request> Interface::*channel,
Request request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false) {
AtMostOnce atMostOnce = AtMostOnce::FALSE) {
setReplyPriority(request, taskID);
if (!alternatives)
return Never();

View File

@ -243,7 +243,7 @@ struct BackupData {
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false),
lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE);
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });

View File

@ -133,9 +133,9 @@ public:
serverInfo(new AsyncVar<ServerDBInfo>()), db(DatabaseContext::create(clientInfo,
Future<Void>(),
LocalityData(),
true,
EnableLocalityLoadBalance::TRUE,
TaskPriority::DefaultEndpoint,
true)) // SOMEDAY: Locality!
LockAware::TRUE)) // SOMEDAY: Locality!
{}
void setDistributor(const DataDistributorInterface& interf) {
@ -2860,7 +2860,7 @@ public:
serverInfo.clusterInterface = ccInterface;
serverInfo.myLocality = locality;
db.serverInfo->set(serverInfo);
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
~ClusterControllerData() {

View File

@ -5765,7 +5765,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state double lastLimited = 0;
self->addActor.send(monitorBatchLimitedTime(self->dbInfo, &lastLimited));
state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DataDistributionLaunch, true, true);
state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DataDistributionLaunch, LockAware::TRUE);
cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
// cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*)
@ -6106,7 +6106,7 @@ static std::set<int> const& normalDataDistributorErrors() {
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::TRUE);
state ReadYourWritesTransaction tr(cx);
loop {
try {
@ -6447,7 +6447,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
state Future<Void> collection = actorCollection(self->addActor.getFuture());
state PromiseStream<GetMetricsListRequest> getShardMetricsList;
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::TRUE);
state ActorCollection actors(false);
state DDEnabledState ddEnabledState;
self->addActor.send(actors.getResult());
@ -6498,8 +6498,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
Reference<IReplicationPolicy> policy,
int processCount) {
Database database =
DatabaseContext::create(makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), false);
Database database = DatabaseContext::create(
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::FALSE);
DatabaseConfiguration conf;
conf.storageTeamSize = teamSize;
@ -6541,8 +6541,8 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
Reference<IReplicationPolicy> policy,
int processCount) {
Database database =
DatabaseContext::create(makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), false);
Database database = DatabaseContext::create(
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::FALSE);
DatabaseConfiguration conf;
conf.storageTeamSize = teamSize;

View File

@ -253,7 +253,7 @@ struct GrvProxyData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Reference<AsyncVar<ServerDBInfo>> db)
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db), lastStartCommit(0),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE)), db(db), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
minKnownCommittedVersion(invalidVersion) {}
};

View File

@ -311,7 +311,7 @@ struct TLogData : NonCopyable {
targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false),
ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped() {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
};

View File

@ -375,7 +375,7 @@ struct TLogData : NonCopyable {
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false),
ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped() {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
};

View File

@ -247,7 +247,7 @@ struct ProxyCommitData {
mostRecentProcessedRequestNumber(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit),
lastCoalesceTime(0), localCommitBatchesStarted(0), locked(false),
commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), firstProxy(firstProxy),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0),
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0),
lastMasterReset(now()), lastResolverReset(now()) {

View File

@ -1409,7 +1409,7 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true));
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;

View File

@ -410,7 +410,7 @@ ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> connFile,
LocalityData locality,
std::string coordFolder) {
try {
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, true, locality);
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, IsInternal::TRUE, locality);
wait(reportErrors(_restoreWorker(cx, locality), "RestoreWorker"));
} catch (Error& e) {
TraceEvent("FastRestoreWorker").detail("Error", e.what());

View File

@ -2605,10 +2605,9 @@ ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<ServerDBI
std::set<std::string>* incomplete_reasons) {
state JsonBuilderObject statusObj;
state Database cx = openDBOnServer(db,
TaskPriority::DefaultEndpoint,
true,
false); // Open a new database connection that isn't lock-aware
state Database cx =
openDBOnServer(db,
TaskPriority::DefaultEndpoint); // Open a new database connection that isn't lock-aware
state Transaction tr(cx);
state int timeoutSeconds = 5;
state Future<Void> getTimeout = delay(timeoutSeconds);

View File

@ -251,7 +251,7 @@ public:
newestAvailableVersion.insert(allKeys, invalidVersion);
newestDirtyVersion.insert(allKeys, invalidVersion);
addCacheRange(CacheRangeInfo::newNotAssigned(allKeys));
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
// Puts the given cacheRange into cachedRangeMap. The caller is responsible for adding cacheRanges
@ -1392,7 +1392,7 @@ ACTOR Future<Void> fetchKeys(StorageCacheData* data, AddingCacheRange* cacheRang
// TODO: NEELAM: what's this for?
// FIXME: remove when we no longer support upgrades from 5.X
if (debug_getRangeRetries >= 100) {
data->cx->enableLocalityLoadBalance = false;
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::FALSE;
}
debug_getRangeRetries++;

View File

@ -386,7 +386,7 @@ struct TLogData : NonCopyable {
commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"),
LiteralStringRef("commit"),
Histogram::Unit::microseconds)) {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
};

View File

@ -830,10 +830,16 @@ ACTOR Future<Void> traceRole(Role role, UID roleId);
struct ServerDBInfo;
#ifndef __DATABASE_BOOLEAN_PARAMS__
#define __DATABASE_BOOLEAN_PARAMS__
DECLARE_BOOLEAN_PARAM(EnableLocalityLoadBalance);
DECLARE_BOOLEAN_PARAM(LockAware);
#endif
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
TaskPriority taskID = TaskPriority::DefaultEndpoint,
bool enableLocalityLoadBalance = true,
bool lockAware = false);
LockAware lockAware = LockAware::FALSE,
EnableLocalityLoadBalance enableLocalityLoadBalance = EnableLocalityLoadBalance::TRUE);
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> a,
Reference<AsyncVar<Optional<struct ClusterInterface>>> b);

View File

@ -577,7 +577,7 @@ Future<Void> sendMasterRegistration(MasterData* self,
}
ACTOR Future<Void> updateRegistration(Reference<MasterData> self, Reference<ILogSystem> logSystem) {
state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
state Future<Void> trigger = self->registrationTrigger.onTrigger();
state Future<Void> updateLogsKey;
@ -1965,7 +1965,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
self->addActor.send(resolutionBalancing(self));
self->addActor.send(changeCoordinators(self));
Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, LockAware::TRUE);
self->addActor.send(configurationMonitor(self, cx));
if (self->configuration.backupWorkerEnabled) {
self->addActor.send(recruitBackupWorkers(self, cx));

View File

@ -852,7 +852,7 @@ public:
newestDirtyVersion.insert(allKeys, invalidVersion);
addShard(ShardInfo::newNotAssigned(allKeys));
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE);
}
//~StorageServer() { fclose(log); }
@ -2970,7 +2970,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// FIXME: remove when we no longer support upgrades from 5.X
if (debug_getRangeRetries >= 100) {
data->cx->enableLocalityLoadBalance = false;
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::FALSE;
TraceEvent(SevWarnAlways, "FKDisableLB").detail("FKID", fetchKeysID);
}
@ -3018,7 +3018,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
// FIXME: remove when we no longer support upgrades from 5.X
data->cx->enableLocalityLoadBalance = true;
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::TRUE;
TraceEvent(SevWarnAlways, "FKReenableLB").detail("FKID", fetchKeysID);
// We have completed the fetch and write of the data, now we wait for MVCC window to pass.

View File

@ -607,7 +607,7 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
startRole(Role::TESTER, workIface.id(), UID(), details);
if (work.useDatabase) {
cx = Database::createDatabase(ccf, -1, true, locality);
cx = Database::createDatabase(ccf, -1, IsInternal::TRUE, locality);
wait(delay(1.0));
}

View File

@ -138,8 +138,8 @@ ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
TaskPriority taskID,
bool enableLocalityLoadBalance,
bool lockAware) {
LockAware lockAware,
EnableLocalityLoadBalance enableLocalityLoadBalance) {
auto info = makeReference<AsyncVar<ClientDBInfo>>();
auto cx = DatabaseContext::create(info,
extractClientInfo(db, info),
@ -1215,15 +1215,15 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
if (metricsConnFile.size() > 0) {
try {
state Database db =
Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, true, locality);
Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, IsInternal::TRUE, locality);
metricsLogger = runMetrics(db, KeyRef(metricsPrefix));
} catch (Error& e) {
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
}
} else {
bool lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff';
metricsLogger = runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, lockAware),
KeyRef(metricsPrefix));
auto lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff' ? LockAware::TRUE : LockAware::FALSE;
metricsLogger =
runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, lockAware), KeyRef(metricsPrefix));
}
}

45
flow/BooleanParam.h Normal file
View File

@ -0,0 +1,45 @@
/*
* Arena.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/Trace.h"
#define DECLARE_BOOLEAN_PARAM(ParamName) \
class ParamName { \
bool value; \
explicit constexpr ParamName(bool value) : value(value) {} \
\
public: \
operator bool() const { return value; } \
static ParamName const TRUE, FALSE; \
}; \
template <> \
struct Traceable<ParamName> : std::true_type { \
static std::string toString(ParamName const& value) { return Traceable<bool>::toString(value); } \
};
#define DEFINE_BOOLEAN_PARAM(ParamName) \
ParamName const ParamName::TRUE = ParamName(true); \
ParamName const ParamName::FALSE = ParamName(false);
#define BOOLEAN_PARAM(ParamName) \
DECLARE_BOOLEAN_PARAM(ParamName) \
DEFINE_BOOLEAN_PARAM(ParamName)\

View File

@ -6,6 +6,7 @@ set(FLOW_SRCS
Arena.cpp
Arena.h
AsioReactor.h
BooleanParam.h
CompressedInt.actor.cpp
CompressedInt.h
Deque.cpp