Refactor and clean up
This commit is contained in:
parent
1f166bc183
commit
3669615e4b
|
@ -319,6 +319,8 @@ public:
|
|||
|
||||
int snapshotRywEnabled;
|
||||
|
||||
bool tracingEnabled;
|
||||
|
||||
Future<Void> logger;
|
||||
Future<Void> throttleExpirer;
|
||||
|
||||
|
|
|
@ -873,7 +873,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc),
|
||||
transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000),
|
||||
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal), tracingEnabled(true),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)) {
|
||||
|
@ -949,8 +949,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<TracingOptionsImpl>(
|
||||
KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("a0")) // TODO: Remove prefix
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING).begin)));
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
|
||||
// Uncomment this and communt out the above instantiation of TracingOptionsImpl successfully read from \xff\xff/tracing/test/
|
||||
// std::make_unique<TracingOptionsImpl>(
|
||||
// KeyRangeRef(LiteralStringRef("test/"), LiteralStringRef("test0"))
|
||||
// .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
@ -1049,7 +1052,7 @@ DatabaseContext::DatabaseContext(const Error& err)
|
|||
transactionsProcessBehind("ProcessBehind", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false) {}
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false), tracingEnabled(true) {}
|
||||
|
||||
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, TaskPriority taskID, bool lockAware, int apiVersion, bool switchable) {
|
||||
return Database( new DatabaseContext( Reference<AsyncVar<Reference<ClusterConnectionFile>>>(), clientInfo, clientInfoMonitor, taskID, clientLocality, enableLocalityLoadBalance, lockAware, true, apiVersion, switchable ) );
|
||||
|
@ -1212,6 +1215,10 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
|
|||
validateOptionValue(value, false);
|
||||
snapshotRywEnabled--;
|
||||
break;
|
||||
case FDBDatabaseOptions::DISABLE_TRACING:
|
||||
validateOptionValue(value, false);
|
||||
tracingEnabled = false;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2697,8 +2704,22 @@ void debugAddTags(Transaction *tr) {
|
|||
|
||||
}
|
||||
|
||||
SpanID generateSpanID(bool tracingEnabled) {
|
||||
UID uid = deterministicRandom()->randomUniqueID();
|
||||
if (tracingEnabled) {
|
||||
return SpanID(uid.first(), uid.second());
|
||||
} else {
|
||||
return SpanID(uid.first(), 0);
|
||||
}
|
||||
return uid;
|
||||
}
|
||||
|
||||
Transaction::Transaction()
|
||||
: info(TaskPriority::DefaultEndpoint, generateSpanID(true)),
|
||||
span(info.spanID, "Transaction"_loc) {}
|
||||
|
||||
Transaction::Transaction(Database const& cx)
|
||||
: cx(cx), info(cx->taskID, deterministicRandom()->randomUniqueID()), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF),
|
||||
: cx(cx), info(cx->taskID, generateSpanID(cx->tracingEnabled)), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF),
|
||||
committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0),
|
||||
trLogInfo(createTrLogInfoProbabilistically(cx)), tr(info.spanID), span(info.spanID, "Transaction"_loc) {
|
||||
if (DatabaseContext::debugUseTags) {
|
||||
|
@ -4610,6 +4631,16 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
|
|||
return Reference<TransactionLogInfo>();
|
||||
}
|
||||
|
||||
void Transaction::setTransactionID(uint64_t id) {
|
||||
ASSERT(getSize() == 0);
|
||||
info.spanID = SpanID(id, info.spanID.second());
|
||||
}
|
||||
|
||||
void Transaction::setToken(uint64_t token) {
|
||||
ASSERT(getSize() == 0);
|
||||
info.spanID = SpanID(info.spanID.first(), token);
|
||||
}
|
||||
|
||||
void enableClientInfoLogging() {
|
||||
ASSERT(networkOptions.logClientInfo.present() == false);
|
||||
networkOptions.logClientInfo = true;
|
||||
|
|
|
@ -291,9 +291,7 @@ public:
|
|||
void flushTrLogsIfEnabled();
|
||||
|
||||
// These are to permit use as state variables in actors:
|
||||
Transaction()
|
||||
: info(TaskPriority::DefaultEndpoint, deterministicRandom()->randomUniqueID()),
|
||||
span(info.spanID, "Transaction"_loc) {}
|
||||
Transaction();
|
||||
void operator=(Transaction&& r) noexcept;
|
||||
|
||||
void reset();
|
||||
|
@ -323,6 +321,9 @@ public:
|
|||
double startTime;
|
||||
Reference<TransactionLogInfo> trLogInfo;
|
||||
|
||||
void setTransactionID(uint64_t id);
|
||||
void setToken(uint64_t token);
|
||||
|
||||
const vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
|
||||
Standalone<VectorRef<KeyRangeRef>> readConflictRanges() const {
|
||||
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.read_conflict_ranges, tr.arena);
|
||||
|
|
|
@ -1593,6 +1593,14 @@ void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
|
|||
}
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::setTransactionID(uint64_t id) {
|
||||
tr.setTransactionID(id);
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::setToken(uint64_t token) {
|
||||
tr.setToken(token);
|
||||
}
|
||||
|
||||
Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeIntersecting(KeyRangeRef kr) {
|
||||
TEST(true); // Special keys read conflict range
|
||||
ASSERT(readConflictRangeKeysRange.contains(kr));
|
||||
|
@ -2069,12 +2077,6 @@ void ReadYourWritesTransaction::setOptionImpl( FDBTransactionOptions::Option opt
|
|||
validateOptionValue(value, false);
|
||||
options.specialKeySpaceChangeConfiguration = true;
|
||||
break;
|
||||
case FDBTransactionOptions::CUSTOM_TRANSACTION_ID:
|
||||
options.transactionId = value.present() ? value.get().toString() : Optional<std::string>();
|
||||
break;
|
||||
case FDBTransactionOptions::DISABLE_TRACING:
|
||||
options.disableTracing = value.present();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -39,8 +39,6 @@ struct ReadYourWritesTransactionOptions {
|
|||
bool disableUsedDuringCommitProtection : 1;
|
||||
bool specialKeySpaceRelaxed : 1;
|
||||
bool specialKeySpaceChangeConfiguration : 1;
|
||||
Optional<std::string> transactionId;
|
||||
bool disableTracing : 1;
|
||||
double timeoutInSeconds;
|
||||
int maxRetries;
|
||||
int snapshotRywEnabled;
|
||||
|
@ -146,6 +144,9 @@ public:
|
|||
return tr.info;
|
||||
}
|
||||
|
||||
void setTransactionID(uint64_t id);
|
||||
void setToken(uint64_t token);
|
||||
|
||||
// Read from the special key space readConflictRangeKeysRange
|
||||
Standalone<RangeResultRef> getReadConflictRangeIntersecting(KeyRangeRef kr);
|
||||
// Read from the special key space writeConflictRangeKeysRange
|
||||
|
@ -154,9 +155,6 @@ public:
|
|||
bool specialKeySpaceRelaxed() const { return options.specialKeySpaceRelaxed; }
|
||||
bool specialKeySpaceChangeConfiguration() const { return options.specialKeySpaceChangeConfiguration; }
|
||||
|
||||
Optional<std::string> transactionId() const { return options.transactionId; }
|
||||
bool disableTracing() const { return options.disableTracing; }
|
||||
|
||||
KeyRangeMap<std::pair<bool, Optional<Value>>>& getSpecialKeySpaceWriteMap() { return specialKeySpaceWriteMap; }
|
||||
bool readYourWritesDisabled() const { return options.readYourWritesDisabled; }
|
||||
const Optional<std::string>& getSpecialKeySpaceErrorMsg() { return specialKeySpaceErrorMsg; }
|
||||
|
|
|
@ -24,6 +24,11 @@
|
|||
#include "fdbclient/StatusClient.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
const std::string kTracingTransactionIdKey = "transaction_id";
|
||||
const std::string kTracingTokenKey = "token";
|
||||
}
|
||||
|
||||
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
|
||||
{ SpecialKeySpace::MODULE::TRANSACTION,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
|
||||
|
@ -1266,37 +1271,39 @@ Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransac
|
|||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {
|
||||
TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr);
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
|
||||
if (kr.contains(std::string("\xff\xff/tracing/a/transaction_id"))) {
|
||||
if (ryw->transactionId().present()) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, ryw->transactionId().get()));
|
||||
} else {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, KeyRef(deterministicRandom()->randomUniqueID().toString())));
|
||||
}
|
||||
if (kr.contains(getKeyRange().begin.withSuffix(kTracingTransactionIdKey))) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, std::to_string(ryw->getTransactionInfo().spanID.first())));
|
||||
}
|
||||
|
||||
if (kr.contains(std::string("\xff\xff/tracing/a/token"))) {
|
||||
StringRef token;
|
||||
if (ryw->disableTracing()) {
|
||||
token = StringRef("0");
|
||||
} else {
|
||||
token = StringRef(deterministicRandom()->randomUniqueID().toString());
|
||||
}
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, token));
|
||||
if (kr.contains(getKeyRange().begin.withSuffix(kTracingTokenKey))) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, std::to_string(ryw->getTransactionInfo().spanID.second())));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void TracingOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
|
||||
if (key == std::string("\xff\xff/tracing/a/transaction_id")) {
|
||||
ryw->setOption(FDBTransactionOptions::CUSTOM_TRANSACTION_ID, value);
|
||||
} else if (key == std::string("\xff\xff/tracing/a/token")) {
|
||||
ryw->setOption(FDBTransactionOptions::DISABLE_TRACING, value);
|
||||
if (ryw->getApproximateSize() > 0) {
|
||||
ryw->setSpecialKeySpaceErrorMsg("tracing options must be set first");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
if (key.endsWith(kTracingTransactionIdKey)) {
|
||||
ryw->setTransactionID(std::stoul(value.toString()));
|
||||
} else if (key.endsWith(kTracingTokenKey)) {
|
||||
if (value.toString() == "true") {
|
||||
ryw->setToken(deterministicRandom()->randomUInt64());
|
||||
} else if (value.toString() == "false") {
|
||||
ryw->setToken(0);
|
||||
} else {
|
||||
ryw->setSpecialKeySpaceErrorMsg("token must be set to true/false");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1305,19 +1312,11 @@ Future<Optional<std::string>> TracingOptionsImpl::commit(ReadYourWritesTransacti
|
|||
}
|
||||
|
||||
void TracingOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
|
||||
if (range.contains(std::string("\xff\xff/tracing/a/transaction_id"))) {
|
||||
ryw->setOption(FDBTransactionOptions::CUSTOM_TRANSACTION_ID);
|
||||
}
|
||||
|
||||
if (range.contains(std::string("\xff\xff/tracing/a/token"))) {
|
||||
ryw->setOption(FDBTransactionOptions::DISABLE_TRACING);
|
||||
}
|
||||
ryw->setSpecialKeySpaceErrorMsg("clear range disabled");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
void TracingOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
|
||||
if (key == std::string("\xff\xff/tracing/a/transaction_id")) {
|
||||
ryw->setOption(FDBTransactionOptions::CUSTOM_TRANSACTION_ID);
|
||||
} else if (key == std::string("\xff\xff/tracing/a/token")) {
|
||||
ryw->setOption(FDBTransactionOptions::DISABLE_TRACING);
|
||||
}
|
||||
ryw->setSpecialKeySpaceErrorMsg("clear disabled");
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
|
|
@ -182,6 +182,8 @@ description is not currently required but encouraged.
|
|||
<Option name="transaction_include_port_in_address" code="505"
|
||||
description="Addresses returned by get_addresses_for_key include the port when enabled. As of api version 630, this option is enabled by default and setting this has no effect."
|
||||
defaultFor="23"/>
|
||||
<Option name="disable_tracing" code="600"
|
||||
description="Disable tracing for all transactions." />
|
||||
</Scope>
|
||||
|
||||
<Scope name="TransactionOption">
|
||||
|
@ -266,10 +268,6 @@ description is not currently required but encouraged.
|
|||
description="By default, the special key space will only allow users to read from exactly one module (a subspace in the special key space). Use this option to allow reading from zero or more modules. Users who set this option should be prepared for new modules, which may have different behaviors than the modules they're currently reading. For example, a new module might block or return an error." />
|
||||
<Option name="special_key_space_enable_writes" code="714"
|
||||
description="By default, users are not allowed to write to special keys. Enable this option will implicitly enable all options required to achieve the configuration change." />
|
||||
<Option name="custom_transaction_id" code="715" paramType="String" paramDescription="String identifier used to identify the transaction."
|
||||
description="Use a custom identifier for the transaction. If this option is not set, a random identifier will be generated for the transaction. The identifier is used to trace a transaction throughout the system." />
|
||||
<Option name="disable_tracing" code="716"
|
||||
description="Set this option to disable tracing for this transaction." />
|
||||
<Option name="tag" code="800" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
|
||||
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
|
||||
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
|
||||
|
|
|
@ -80,6 +80,10 @@ uint32_t DeterministicRandom::randomUInt32() {
|
|||
return gen64();
|
||||
}
|
||||
|
||||
uint64_t DeterministicRandom::randomUInt64() {
|
||||
return gen64();
|
||||
}
|
||||
|
||||
uint32_t DeterministicRandom::randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) {
|
||||
std::uniform_real_distribution<double> distribution(std::log(min), std::log(maxPlusOne - 1));
|
||||
double logpower = distribution(random);
|
||||
|
|
|
@ -44,6 +44,7 @@ public:
|
|||
int randomInt(int min, int maxPlusOne) override;
|
||||
int64_t randomInt64(int64_t min, int64_t maxPlusOne) override;
|
||||
uint32_t randomUInt32() override;
|
||||
uint64_t randomUInt64() override;
|
||||
uint32_t randomSkewedUInt32(uint32_t min, uint32_t maxPlusOne) override;
|
||||
UID randomUniqueID() override;
|
||||
char randomAlphaNumeric() override;
|
||||
|
|
|
@ -130,6 +130,7 @@ public:
|
|||
virtual int randomInt(int min, int maxPlusOne) = 0;
|
||||
virtual int64_t randomInt64(int64_t min, int64_t maxPlusOne) = 0;
|
||||
virtual uint32_t randomUInt32() = 0;
|
||||
virtual uint64_t randomUInt64() = 0;
|
||||
virtual UID randomUniqueID() = 0;
|
||||
virtual char randomAlphaNumeric() = 0;
|
||||
virtual std::string randomAlphaNumeric( int length ) = 0;
|
||||
|
|
Loading…
Reference in New Issue