Merge pull request #3329 from sfc-gh-mpilman/features/flatbuffers-debugtx

Request tracing
This commit is contained in:
Meng Xu 2020-06-12 14:50:19 -07:00 committed by GitHub
commit 80334351c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 661 additions and 214 deletions

View File

@ -223,11 +223,13 @@ public:
bool enableLocalityLoadBalance;
struct VersionRequest {
SpanID spanContext;
Promise<GetReadVersionReply> reply;
TagSet tags;
Optional<UID> debugID;
VersionRequest(TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>()) : tags(tags), debugID(debugID) {}
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), tags(tags), debugID(debugID) {}
};
// Transaction start request batching

View File

@ -36,6 +36,7 @@ typedef uint64_t Sequence;
typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
typedef UID SpanID;
enum {
tagLocalitySpecial = -1,

View File

@ -153,6 +153,7 @@ struct CommitTransactionRequest : TimedRequest {
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
Arena arena;
SpanID spanContext;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
uint32_t flags;
@ -162,7 +163,7 @@ struct CommitTransactionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID);
serializer(ar, transaction, reply, arena, flags, debugID, spanContext);
}
};
@ -209,6 +210,7 @@ struct GetReadVersionRequest : TimedRequest {
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};
SpanID spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
@ -219,9 +221,11 @@ struct GetReadVersionRequest : TimedRequest {
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount(1), flags(0) {}
GetReadVersionRequest(uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(), Optional<UID> debugID = Optional<UID>())
: transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), debugID(debugID)
{
GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority,
uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
debugID(debugID) {
flags = flags & ~FLAG_PRIORITY_MASK;
switch(priority) {
case TransactionPriority::BATCH:
@ -237,12 +241,12 @@ struct GetReadVersionRequest : TimedRequest {
ASSERT(false);
}
}
bool operator < (GetReadVersionRequest const& rhs) const { return priority < rhs.priority; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply);
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
if(ar.isDeserializing) {
if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
@ -275,6 +279,7 @@ struct GetKeyServerLocationsReply {
struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanID spanContext;
KeyRef begin;
Optional<KeyRef> end;
int limit;
@ -282,24 +287,28 @@ struct GetKeyServerLocationsRequest {
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
GetKeyServerLocationsRequest( KeyRef const& begin, Optional<KeyRef> const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {}
template <class Ar>
GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional<KeyRef> const& end, int limit,
bool reverse, Arena const& arena)
: spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, limit, reverse, reply, arena);
serializer(ar, begin, end, limit, reverse, reply, spanContext, arena);
}
};
struct GetRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 12954034;
SpanID spanContext;
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
explicit GetRawCommittedVersionRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional<UID> const& debugID = Optional<UID>()) : spanContext(spanContext), debugID(debugID) {}
explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, debugID, reply);
serializer(ar, debugID, reply, spanContext);
}
};

View File

@ -36,6 +36,7 @@
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -46,6 +47,7 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/versions.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/simulator.h"
@ -59,12 +61,10 @@
#include "flow/Platform.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"
#include "flow/Trace.h"
#include "flow/Tracing.h"
#include "flow/UnitTest.h"
#include "flow/serialize.h"
#include "fdbclient/versions.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
@ -1539,6 +1539,7 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
state Span span("NAPI:getKeyLocation"_loc, { info.span->context });
if (isBackward) {
ASSERT( key != allKeys.begin && key <= allKeys.end );
} else {
@ -1552,7 +1553,10 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span->context, key, Optional<KeyRef>(), 100, isBackward, key.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
@ -1588,6 +1592,7 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation(Database const& c
}
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
state Span span("NAPI:getKeyRangeLocations"_loc, { info.span->context });
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -1595,7 +1600,10 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span->context, keys.begin, keys.end, limit, reverse, keys.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
state GetKeyServerLocationsReply rep = _rep;
if( info.debugID.present() )
@ -1686,6 +1694,7 @@ Future<Void> Transaction::warmRange(Database cx, KeyRange keys) {
ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Database cx, TransactionInfo info, Reference<TransactionLogInfo> trLogInfo, TagSet tags )
{
state Version ver = wait( version );
state Span span("NAPI:getValue"_loc, { info.span->context });
cx->validateVersion(ver);
loop {
@ -1718,10 +1727,12 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
}
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
when(GetValueReply _reply = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(span->context, key, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1779,6 +1790,7 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Span span("NAPI:getKey"_loc, { info.span->context });
if( info.debugID.present() ) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -1807,9 +1819,11 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey,
GetKeyRequest(span->context, k, version.get(),
cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1842,12 +1856,15 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
}
}
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) {
state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
try {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when(GetReadVersionReply v = wait(basicLoadBalance(
cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(span->context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.version >= version)
@ -1863,11 +1880,14 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
}
}
ACTOR Future<Version> getRawVersion( Database cx ) {
ACTOR Future<Version> getRawVersion( Database cx, SpanID spanContext ) {
state Span span("NAPI:getRawVersion"_loc, { spanContext });
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
return v.version;
}
}
@ -1881,6 +1901,7 @@ ACTOR Future<Void> readVersionBatcher(
ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Version ver = wait( version );
state Span span(deterministicRandom()->randomUniqueID(), "NAPI:watchValue"_loc, { info.span->context });
cx->validateVersion(ver);
ASSERT(ver != latestVersion);
@ -1897,9 +1918,11 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
}
state WatchValueReply resp;
choose {
when(WatchValueReply r = wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
when(WatchValueReply r = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(span->context, key, value, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); }
@ -1910,7 +1933,7 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
//FIXME: wait for known committed version on the storage server before replying,
//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version));
Version v = wait(waitForCommittedVersion(cx, resp.version, span->context));
//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
@ -1963,6 +1986,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info, TagSet tags )
{
state Standalone<RangeResultRef> output;
state Span span("NAPI:getExactRange"_loc, { info.span->context });
//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
loop {
@ -1976,6 +2000,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
req.version = version;
req.begin = firstGreaterOrEqual( range.begin );
req.end = firstGreaterOrEqual( range.end );
req.spanContext = span->context;
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
@ -2220,6 +2245,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state Standalone<RangeResultRef> output;
state Span span("NAPI:getRange"_loc, info.span);
try {
state Version version = wait( fVersion );
@ -2272,6 +2298,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
req.debugID = info.debugID;
req.spanContext = span->context;
try {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
@ -2610,7 +2637,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch, Database cx, TagSet tags, Trans
}
Future<Version> Transaction::getRawReadVersion() {
return ::getRawVersion(cx);
return ::getRawVersion(cx, info.span->context);
}
Future< Void > Transaction::watch( Reference<Watch> watch ) {
@ -2964,6 +2991,7 @@ void Transaction::reset() {
void Transaction::fullReset() {
reset();
info.span = Span(info.span->location);
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}
@ -3080,6 +3108,8 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
state Transaction tr(cx);
state int retries = 0;
state Span span("NAPI:dummyTransaction"_loc, info.span);
tr.info.span->parents.insert(span->context);
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@ -3126,6 +3156,8 @@ void Transaction::setupWatches() {
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
state TraceInterval interval( "TransactionCommit" );
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, { info.span->context });
req.spanContext = span->context;
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
try {
@ -3549,6 +3581,14 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
options.readTags.addTag(value.get());
break;
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValue(value, true);
if (value.get().size() != 16) {
throw invalid_option_value();
}
info.span->parents.emplace(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
@ -3559,13 +3599,16 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, TransactionPriority priority, uint32_t flags, TransactionTagMap<uint32_t> tags, Optional<UID> debugID ) {
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(Span parentSpan, DatabaseContext* cx, uint32_t transactionCount,
TransactionPriority priority, uint32_t flags,
TransactionTagMap<uint32_t> tags, Optional<UID> debugID) {
state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan);
try {
++cx->transactionReadVersionBatches;
if( debugID.present() )
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
loop {
state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
state GetReadVersionRequest req( span->context, transactionCount, priority, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@ -3616,6 +3659,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
state PromiseStream<double> replyTimes;
state PromiseStream<Error> _errorStream;
state double batchTime = 0;
state Span span("NAPI:readVersionBatcher"_loc);
loop {
send_batch = false;
choose {
@ -3626,6 +3670,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
span->parents.insert(req.spanContext);
requests.push_back(req.reply);
for(auto tag : req.tags) {
++tags[tag];
@ -3653,9 +3698,10 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
Future<Void> batch = incrementalBroadcastWithError(
getConsistentReadVersion(cx, count, priority, flags, std::move(tags), std::move(debugID)),
getConsistentReadVersion(span, cx, count, priority, flags, std::move(tags), std::move(debugID)),
std::move(requests), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
span = Span("NAPI:readVersionBatcher"_loc);
tags.clear();
debugID = Optional<UID>();
requests.clear();
@ -3665,7 +3711,11 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, TransactionPriority priority, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags) {
ACTOR Future<Version> extractReadVersion(Span parentSpan, DatabaseContext* cx, TransactionPriority priority,
Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f,
bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion,
TagSet tags) {
// parentSpan here is only used to keep the parent alive until the request completes
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -3787,10 +3837,12 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags );
}
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
Span span("NAPI:getReadVersion"_loc, info.span);
auto const req = DatabaseContext::VersionRequest(span->context, options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion( cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
readVersion = extractReadVersion(span, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(),
options.lockAware, startTime, metadataVersion, options.tags);
}
return readVersion;
}

View File

@ -19,6 +19,8 @@
*/
#pragma once
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
#include "fdbclient/NativeAPI.actor.g.h"
@ -147,13 +149,16 @@ class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
Span span;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
explicit TransactionInfo(TaskPriority taskID)
: taskID(taskID), span(deterministicRandom()->randomUniqueID(), "Transaction"_loc), useProvisionalProxies(false) {
}
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
@ -329,7 +334,7 @@ private:
Future<Void> committing;
};
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys,
int shardLimit);

View File

@ -1163,6 +1163,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
std::back_inserter(persistentOptions));
debugTransaction( deterministicRandom()->randomUniqueID() );
applyPersistentOptions();
}

View File

@ -169,6 +169,7 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanID spanContext;
Key key;
Version version;
Optional<TagSet> tags;
@ -176,11 +177,12 @@ struct GetValueRequest : TimedRequest {
ReplyPromise<GetValueReply> reply;
GetValueRequest(){}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, version, tags, debugID, reply);
serializer(ar, key, version, tags, debugID, reply, spanContext);
}
};
@ -200,6 +202,7 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanID spanContext;
Key key;
Optional<Value> value;
Version version;
@ -208,11 +211,13 @@ struct WatchValueRequest {
ReplyPromise<WatchValueReply> reply;
WatchValueRequest(){}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
WatchValueRequest(SpanID spanContext, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, value, version, tags, debugID, reply);
serializer(ar, key, value, version, tags, debugID, reply, spanContext);
}
};
@ -234,6 +239,7 @@ struct GetKeyValuesReply : public LoadBalancedReply {
struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
KeySelectorRef begin, end;
Version version; // or latestVersion
@ -246,7 +252,7 @@ struct GetKeyValuesRequest : TimedRequest {
GetKeyValuesRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena);
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
}
};
@ -266,6 +272,7 @@ struct GetKeyReply : public LoadBalancedReply {
struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanID spanContext;
Arena arena;
KeySelectorRef sel;
Version version; // or latestVersion
@ -274,11 +281,13 @@ struct GetKeyRequest : TimedRequest {
ReplyPromise<GetKeyReply> reply;
GetKeyRequest() {}
GetKeyRequest(KeySelectorRef const& sel, Version version, Optional<TagSet> tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, sel, version, tags, debugID, reply, arena);
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
}
};

View File

@ -268,6 +268,8 @@ description is not currently required but encouraged.
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
description="Adds a tag to the transaction that can be used to apply manual or automatic targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="span_parent" code="900" paramType="Bytes" paramDescription="A byte string of length 16 used to associate the span of this transaction with a parent"
description="Adds a parent to the Span of this transaction. Used for transaction tracing. A span can be identified with any 16 bytes"/>
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -32,6 +32,8 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define SevDebugMemory SevVerbose
@ -429,8 +431,9 @@ struct BackupData {
}
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span(deterministicRandom()->randomUniqueID(), "BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(span->context, 1, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onMasterProxiesChanged())) {}

View File

@ -155,18 +155,21 @@ struct GetCommitVersionReply {
struct GetCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683181;
SpanID spanContext;
uint64_t requestNum;
uint64_t mostRecentProcessedRequestNum;
UID requestingProxy;
ReplyPromise<GetCommitVersionReply> reply;
GetCommitVersionRequest() { }
GetCommitVersionRequest(uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy)
: requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum), requestingProxy(requestingProxy) {}
GetCommitVersionRequest(SpanID spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
UID requestingProxy)
: spanContext(spanContext), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
requestingProxy(requestingProxy) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply);
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanContext);
}
};

View File

@ -44,10 +44,13 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Stats.h"
#include "flow/TDMetric.actor.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <tuple>
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
@ -287,9 +290,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
ACTOR Future<Void> queueTransactionStartRequests(
Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
SpannedDeque<GetReadVersionRequest> *systemQueue,
SpannedDeque<GetReadVersionRequest> *defaultQueue,
SpannedDeque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
@ -326,9 +329,11 @@ ACTOR Future<Void> queueTransactionStartRequests(
if (req.priority >= TransactionPriority::IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
systemQueue->span->parents.insert(req.spanContext);
} else if (req.priority >= TransactionPriority::DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
defaultQueue->span->parents.insert(req.spanContext);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
@ -340,6 +345,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span->parents.insert(req.spanContext);
}
}
}
@ -505,8 +511,11 @@ struct ResolutionRequestBuilder {
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
// -> read_conflict_range's original index in the commitTransactionRef
ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
for(auto& req : requests) {
ResolutionRequestBuilder(ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion,
Span& parentSpan)
: self(self), requests(self->resolvers.size()) {
for (auto& req : requests) {
req.spanContext = parentSpan->context;
req.prevVersion = prevVersion;
req.version = version;
req.lastReceivedVersion = lastReceivedVersion;
@ -790,6 +799,7 @@ ACTOR Future<Void> commitBatch(
state Optional<UID> debugID;
state bool forceRecovery = false;
state int batchOperations = 0;
state Span span("MP:commitBatch"_loc);
int64_t batchBytes = 0;
for (int t = 0; t<trs.size(); t++) {
batchOperations += trs[t].transaction.mutations.size();
@ -812,6 +822,7 @@ ACTOR Future<Void> commitBatch(
debugID = nondeterministicRandom()->randomUniqueID();
g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
}
span->parents.insert(trs[t].spanContext);
}
if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
@ -832,7 +843,7 @@ ACTOR Future<Void> commitBatch(
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionRequest req(span->context, self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
self->mostRecentProcessedRequestNumber = versionReply.requestNum;
@ -853,7 +864,7 @@ ACTOR Future<Void> commitBatch(
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version, span );
int conflictRangeCount = 0;
state int64_t maxTransactionBytes = 0;
for (int t = 0; t<trs.size(); t++) {
@ -1166,27 +1177,32 @@ ACTOR Future<Void> commitBatch(
// We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
if(self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
computeDuration += g_network->timer() - computeStart;
state Span waitVersionSpan;
while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {span->context});
choose{
when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
break;
}
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, TransactionPriority::IMMEDIATE, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if(v.version > self->committedVersion.get()) {
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(
GetReadVersionRequest(waitVersionSpan->context, 0, TransactionPriority::IMMEDIATE,
GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if (v.version > self->committedVersion.get()) {
self->locked = v.locked;
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
}
if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
}
}
}
waitVersionSpan = Span{};
computeStart = g_network->timer();
}
@ -1381,18 +1397,19 @@ ACTOR Future<Void> updateLastCommit(ProxyCommitData* self, Optional<UID> debugID
return Void();
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(Span parentSpan, ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("MP:getLiveCommittedVersion"_loc, parentSpan);
++commitData->stats.txnStartBatch;
state vector<Future<GetReadVersionReply>> proxyVersions;
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(span->context, debugID), TaskPriority::TLogConfirmRunningReply)));
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(commitData, debugID));
@ -1492,15 +1509,16 @@ ACTOR static Future<Void> transactionStarter(
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;
state Deque<GetReadVersionRequest> batchQueue;
state SpannedDeque<GetReadVersionRequest> systemQueue("MP:transactionStarterSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("MP:transactionStarterDefaultQueue"_loc);
state SpannedDeque<GetReadVersionRequest> batchQueue("MP:transactionStarterBatchQueue"_loc);
state vector<MasterProxyInterface> otherProxies;
state TransactionTagMap<uint64_t> transactionTagCounter;
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
state Span span;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
@ -1542,13 +1560,16 @@ ACTOR static Future<Void> transactionStarter(
int requestsToStart = 0;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
SpannedDeque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
span = systemQueue.resetSpan();
} else if(!defaultQueue.empty()) {
transactionQueue = &defaultQueue;
span = defaultQueue.resetSpan();
} else if(!batchQueue.empty()) {
transactionQueue = &batchQueue;
span = batchQueue.resetSpan();
} else {
break;
}
@ -1613,7 +1634,9 @@ ACTOR static Future<Void> transactionStarter(
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
span, commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i],
defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats,
commitData->minKnownCommittedVersion, throttledTags));
@ -1623,6 +1646,7 @@ ACTOR static Future<Void> transactionStarter(
}
}
}
span.reset();
}
}
@ -2081,6 +2105,7 @@ ACTOR Future<Void> masterProxyServerCore(
}
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
//TraceEvent("ProxyGetRCV", proxy.id());
Span span("MP:getRawCommittedReadVersion"_loc, { req.spanContext });
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
GetReadVersionReply rep;

View File

@ -20,6 +20,9 @@
#ifndef FDBSERVER_RESOLVERINTERFACE_H
#define FDBSERVER_RESOLVERINTERFACE_H
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/fdbrpc.h"
#pragma once
#include "fdbrpc/Locality.h"
@ -94,17 +97,19 @@ struct ResolveTransactionBatchRequest {
constexpr static FileIdentifier file_identifier = 16462858;
Arena arena;
SpanID spanContext;
Version prevVersion;
Version version; // FIXME: ?
Version lastReceivedVersion;
VectorRef<CommitTransactionRef> transactions;
VectorRef<struct CommitTransactionRef> transactions;
VectorRef<int> txnStateTransactions; // Offsets of elements of transactions that have (transaction subsystem state) mutations
ReplyPromise<ResolveTransactionBatchReply> reply;
Optional<UID> debugID;
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena, debugID);
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena,
debugID, spanContext);
}
};

View File

@ -447,7 +447,6 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageCacheData* data, Version ve
ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
state int64_t resultSize = 0;
try {
++data->counters.getValueQueries;
++data->counters.allQueries;
@ -457,12 +456,13 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
//TODO what's this?
wait( delay(0, TaskPriority::DefaultEndpoint) );
if( req.debugID.present() )
if( req.debugID.present() ) {
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
//FIXME
}
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );

View File

@ -20,6 +20,9 @@
// There's something in one of the files below that defines a macros
// a macro that makes boost interprocess break on Windows.
#include "flow/Tracing.h"
#include <cctype>
#include <iterator>
#define BOOST_DATE_TIME_NO_LIB
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/algorithm/string.hpp>
@ -78,7 +81,7 @@
// clang-format off
enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_NEWCONSOLE,
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE,
OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID,
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK,
OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
@ -111,6 +114,7 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_MAXLOGSSIZE, "--maxlogssize", SO_REQ_SEP },
{ OPT_LOGGROUP, "--loggroup", SO_REQ_SEP },
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
{ OPT_TRACER, "--tracer", SO_REQ_SEP },
#ifdef _WIN32
{ OPT_NEWCONSOLE, "-n", SO_NONE },
{ OPT_NEWCONSOLE, "--newconsole", SO_NONE },
@ -514,6 +518,9 @@ static void printUsage( const char *name, bool devhelp ) {
printf(" --trace_format FORMAT\n"
" Select the format of the log files. xml (the default) and json\n"
" are supported.\n");
printf(" --tracer TRACER\n"
" Select a tracer for transaction tracing. Currently disabled\n"
" (the default) and log_file are supported.\n");
printf(" -i ID, --machine_id ID\n"
" Machine and zone identifier key (up to 16 hex characters).\n"
" Defaults to a random value shared by all fdbserver processes\n"
@ -1169,6 +1176,22 @@ private:
break;
}
#endif
case OPT_TRACER:
{
std::string arg = args.OptionArg();
std::string tracer;
std::transform(arg.begin(), arg.end(), std::back_inserter(tracer), [](char c) { return tolower(c); });
if (tracer == "none" || tracer == "disabled") {
openTracer(TracerType::DISABLED);
} else if (tracer == "logfile" || tracer == "file" || tracer == "log_file") {
openTracer(TracerType::LOG_FILE);
} else {
fprintf(stderr, "ERROR: Unknown or unsupported tracer: `%s'", args.OptionArg());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
break;
}
case OPT_TESTFILE:
testFile = args.OptionArg();
break;

View File

@ -915,6 +915,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
}
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, { req.spanContext });
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes
if (proxyItr == self->lastProxyVersionReplies.end()) {

View File

@ -21,6 +21,9 @@
#include <cinttypes>
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
@ -712,7 +715,7 @@ public:
}
template<class Request, class HandleFunction>
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
Future<Void> readGuard(const Span& parentSpan, const Request& request, const HandleFunction& fun) {
auto rate = currentRate();
if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD && deterministicRandom()->random01() > std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE, rate/SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
//request.error = future_version();
@ -720,7 +723,7 @@ public:
++counters.readsRejected;
return Void();
}
return fun(this, request);
return fun(this, request, parentSpan);
}
};
@ -846,7 +849,8 @@ updateProcessStats(StorageServer* self)
#pragma region Queries
#endif
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version) {
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
state Span span("SS.WaitForVersion"_loc, { spanContext });
choose {
when(wait(data->version.whenAtLeast(version))) {
// FIXME: A bunch of these can block with or without the following delay 0.
@ -865,7 +869,7 @@ ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version)
}
}
Future<Version> waitForVersion(StorageServer* data, Version version) {
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
}
@ -883,7 +887,7 @@ Future<Version> waitForVersion(StorageServer* data, Version version) {
if (deterministicRandom()->random01() < 0.001) {
TraceEvent("WaitForVersion1000x");
}
return waitForVersionActor(data, version);
return waitForVersionActor(data, version, spanContext);
}
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
@ -907,7 +911,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version versi
}
}
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req, Span span ) {
state int64_t resultSize = 0;
try {
@ -924,7 +928,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
@ -1008,7 +1012,8 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
return Void();
};
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req, SpanID parent ) {
state Span span("SS:WatchValueImpl"_loc, { parent });
try {
++data->counters.watchQueries;
@ -1023,9 +1028,11 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
try {
state Version latest = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
GetValueRequest getReq( req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
state Span getValueSpan(deterministicRandom()->randomUniqueID(), "SS:GetValue"_loc, { span->context });
GetValueRequest getReq( getValueSpan->context, req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq, span ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
getValueSpan.reset();
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
if(reply.error.present()) {
@ -1072,8 +1079,8 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
return Void();
}
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
state Future<Void> watch = watchValue_impl( data, req );
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req, Span span ) {
state Future<Void> watch = watchValue_impl( data, req, span->context );
state double startTime = now();
loop {
@ -1178,7 +1185,7 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes, Span parentSpan ) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vCurrent = view.end();
@ -1186,6 +1193,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount = 0;
state Span span("SS:readRange"_loc, parentSpan);
// for caching the storage queue results during the first PTree traversal
state VectorRef<KeyValueRef> resultCache;
@ -1357,7 +1365,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
//}
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset, SpanID parentSpan)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -1374,6 +1382,7 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1-sel.offset;
state Span span("SS.findKey"_loc, { parentSpan });
//Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
state int maxBytes;
@ -1382,14 +1391,18 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
else
maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;
state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
state GetKeyValuesReply rep = wait(
readRange(data, version,
forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())),
(distance + skipEqualKey) * sign, &maxBytes, span));
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
//If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
if(more && !forward && rep.data.size() == 1) {
TEST(true); //Reverse key selector returned only one result in range read
maxBytes = std::numeric_limits<int>::max();
GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
GetKeyValuesReply rep2 =
wait(readRange(data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span));
rep = rep2;
more = rep.more && rep.data.size() != distance + skipEqualKey;
ASSERT(rep.data.size() == 2 || !more);
@ -1444,7 +1457,7 @@ KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
return i->range();
}
ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req, Span span )
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
// all data from being read in one range read
{
@ -1469,7 +1482,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
try {
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, span->context ) );
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -1487,8 +1500,8 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
state int offset1;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1, span->context );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2, span->context );
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if( req.debugID.present() )
@ -1522,7 +1535,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
} else {
state int remainingLimitBytes = req.limitBytes;
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span) );
GetKeyValuesReply r = _r;
if( req.debugID.present() )
@ -1584,7 +1597,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
return Void();
}
ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req, Span span ) {
state int64_t resultSize = 0;
++data->counters.getKeyQueries;
@ -1597,12 +1610,12 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
wait( delay(0, TaskPriority::DefaultEndpoint) );
try {
state Version version = wait( waitForVersion( data, req.version ) );
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange( data, req.sel );
state int offset;
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
Key k = wait( findKey( data, req.sel, version, shard, &offset, req.spanContext ) );
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
@ -3663,6 +3676,7 @@ ACTOR Future<Void> checkBehind( StorageServer* self ) {
ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetValueRequest> getValue ) {
loop {
GetValueRequest req = waitNext(getValue);
Span span("SS:getValue"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
@ -3670,32 +3684,35 @@ ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetV
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
req.reply.send(GetValueReply());
else
self->actors.add(self->readGuard(req , getValueQ));
self->actors.add(self->readGuard(span, req , getValueQ));
}
}
ACTOR Future<Void> serveGetKeyValuesRequests( StorageServer* self, FutureStream<GetKeyValuesRequest> getKeyValues ) {
loop {
GetKeyValuesRequest req = waitNext(getKeyValues);
Span span("SS:getKeyValues"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
self->actors.add(self->readGuard(req, getKeyValuesQ));
self->actors.add(self->readGuard(span, req, getKeyValuesQ));
}
}
ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKeyRequest> getKey ) {
loop {
GetKeyRequest req = waitNext(getKey);
Span span("SS:getKey"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
self->actors.add(self->readGuard(req , getKeyQ));
self->actors.add(self->readGuard(span, req , getKeyQ));
}
}
ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest> watchValue ) {
loop {
WatchValueRequest req = waitNext(watchValue);
Span span("SS:watchValue"_loc, { req.spanContext });
// TODO: fast load balancing?
// SOMEDAY: combine watches for the same key/value into a single watch
self->actors.add(self->readGuard(req, watchValueQ));
self->actors.add(self->readGuard(span, req, watchValueQ));
}
}

View File

@ -21,6 +21,7 @@
#include <math.h>
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -376,12 +377,16 @@ struct ConsistencyCheckWorkload : TestWorkload
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
while (begin < end) {
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture(false));
keyServerLocationFutures.clear();
for (int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0));
keyServerLocationFutures.push_back(
proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations)
.getReplyUnlessFailedFor(
GetKeyServerLocationsRequest(span->context, begin, end, limitKeyServers, false, Arena()), 2, 0));
state bool keyServersInsertedForThisIteration = false;
choose {

View File

@ -18,15 +18,21 @@
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/serialize.h"
#include <cstring>
struct CycleWorkload : TestWorkload {
int actorCount, nodeCount;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond, traceParentProbability;
Key keyPrefix;
vector<Future<Void>> clients;
@ -38,12 +44,13 @@ struct CycleWorkload : TestWorkload {
transactions("Transactions"), retries("Retries"), totalLatency("Latency"),
tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed")
{
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount;
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
nodeCount = getOption(options, LiteralStringRef("nodeCount"), transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("")).toString() );
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, LiteralStringRef("expectedRate"), 0.7);
testDuration = getOption( options, "testDuration"_sr, 10.0 );
transactionsPerSecond = getOption( options, "transactionsPerSecond"_sr, 5000.0 ) / clientCount;
actorCount = getOption( options, "actorsPerClient"_sr, transactionsPerSecond / 5 );
nodeCount = getOption(options, "nodeCount"_sr, transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, "keyPrefix"_sr, LiteralStringRef("")).toString() );
traceParentProbability = getOption(options, "traceParentProbability "_sr, 0.01);
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, "expectedRate"_sr, 0.7);
}
virtual std::string description() { return "CycleWorkload"; }
@ -98,6 +105,12 @@ struct CycleWorkload : TestWorkload {
state double tstart = now();
state int r = deterministicRandom()->randomInt(0, self->nodeCount);
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("CycleClient"_loc);
TraceEvent("CycleTracingTransaction", span->context);
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span->context, Unversioned()));
}
while (true) {
try {
// Reverse next and next^2 node

View File

@ -631,6 +631,9 @@ struct Traceable<Standalone<T>> : std::conditional<Traceable<T>::value, std::tru
};
#define LiteralStringRef( str ) StringRef( (const uint8_t*)(str), sizeof((str))-1 )
inline StringRef operator "" _sr(const char* str, size_t size) {
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
}
// makeString is used to allocate a Standalone<StringRef> of a known length for later
// mutation (via mutateString). If you need to append to a string of unknown length,

View File

@ -28,6 +28,7 @@ set(FLOW_SRCS
IRandom.h
IThreadPool.cpp
IThreadPool.h
ITrace.h
IndexedSet.actor.h
IndexedSet.cpp
IndexedSet.h
@ -61,6 +62,8 @@ set(FLOW_SRCS
ThreadSafeQueue.h
Trace.cpp
Trace.h
Tracing.h
Tracing.cpp
TreeBenchmark.h
UnitTest.cpp
UnitTest.h

View File

@ -48,6 +48,36 @@
#include <fcntl.h>
#include <cmath>
struct IssuesListImpl {
IssuesListImpl(){}
void addIssue(std::string issue) {
MutexHolder h(mutex);
issues.insert(issue);
}
void retrieveIssues(std::set<std::string>& out) {
MutexHolder h(mutex);
for (auto const& i : issues) {
out.insert(i);
}
}
void resolveIssue(std::string issue) {
MutexHolder h(mutex);
issues.erase(issue);
}
private:
Mutex mutex;
std::set<std::string> issues;
};
IssuesList::IssuesList() : impl(new IssuesListImpl{}) {}
IssuesList::~IssuesList() { delete impl; }
void IssuesList::addIssue(std::string issue) { impl->addIssue(issue); }
void IssuesList::retrieveIssues(std::set<std::string> &out) { impl->retrieveIssues(out); }
void IssuesList::resolveIssue(std::string issue) { impl->resolveIssue(issue); }
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename,
std::string extension, uint64_t maxLogsSize, std::function<void()> onError,
Reference<ITraceLogIssuesReporter> issues)
@ -72,8 +102,16 @@ void FileTraceLogWriter::lastError(int err) {
}
void FileTraceLogWriter::write(const std::string& str) {
auto ptr = str.c_str();
int remaining = str.size();
write(str.data(), str.size());
}
void FileTraceLogWriter::write(const StringRef& str) {
write(reinterpret_cast<const char*>(str.begin()), str.size());
}
void FileTraceLogWriter::write(const char* str, size_t len) {
auto ptr = str;
int remaining = len;
bool needsResolve = false;
while ( remaining ) {

View File

@ -23,11 +23,29 @@
#define FLOW_FILE_TRACE_LOG_WRITER_H
#pragma once
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include <functional>
struct IssuesListImpl;
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList();
virtual ~IssuesList();
void addIssue(std::string issue) override;
void retrieveIssues(std::set<std::string>& out) override;
void resolveIssue(std::string issue) override;
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }
private:
IssuesListImpl* impl;
};
class FileTraceLogWriter : public ITraceLogWriter, ReferenceCounted<FileTraceLogWriter> {
private:
std::string directory;
@ -42,6 +60,8 @@ private:
std::function<void()> onError;
void write(const char* str, size_t size);
public:
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension,
uint64_t maxLogsSize, std::function<void()> onError, Reference<ITraceLogIssuesReporter> issues);
@ -51,11 +71,12 @@ public:
void lastError(int err);
void write(const std::string& str);
void open();
void close();
void roll();
void sync();
void write(const std::string& str) override;
void write(StringRef const& str) override;
void open() override;
void close() override;
void roll() override;
void sync() override;
void cleanupTraceFiles();
};

View File

@ -109,5 +109,41 @@ private:
Reference<IThreadPool> createGenericThreadPool();
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
ASSERT( !thread );
thread = userData;
}
void post( PThreadAction action ) {
try {
(*action)( thread );
} catch (Error& e) {
errors.sendError( e );
} catch (...) {
errors.sendError( unknown_error() );
}
}
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {
ReferenceCounted<DummyThreadPool>::addref();
}
void delref() {
ReferenceCounted<DummyThreadPool>::delref();
}
private:
IThreadPoolReceiver* thread;
Promise<Void> errors;
};
#endif

61
flow/ITrace.h Normal file
View File

@ -0,0 +1,61 @@
/*
* ITrace.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <set>
class StringRef;
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void write(const StringRef&) = 0;
virtual void sync() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class TraceEventFields;
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogIssuesReporter {
virtual ~ITraceLogIssuesReporter();
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};

View File

@ -21,6 +21,7 @@
#include "flow/Trace.h"
#include "flow/FileTraceLogWriter.h"
#include "flow/Knobs.h"
#include "flow/XmlTraceLogFormatter.h"
#include "flow/JsonTraceLogFormatter.h"
#include "flow/flow.h"
@ -54,40 +55,7 @@
// during an open trace event
thread_local int g_allocation_tracing_disabled = 1;
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
ASSERT( !thread );
thread = userData;
}
void post( PThreadAction action ) {
try {
(*action)( thread );
} catch (Error& e) {
errors.sendError( e );
} catch (...) {
errors.sendError( unknown_error() );
}
}
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {
ReferenceCounted<DummyThreadPool>::addref();
}
void delref() {
ReferenceCounted<DummyThreadPool>::delref();
}
private:
IThreadPoolReceiver* thread;
Promise<Void> errors;
};
ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {}
struct SuppressionMap {
struct SuppressionInfo {
@ -229,33 +197,6 @@ public:
}
};
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList(){};
void addIssue(std::string issue) override {
MutexHolder h(mutex);
issues.insert(issue);
}
void retrieveIssues(std::set<std::string>& out) override {
MutexHolder h(mutex);
for (auto const& i : issues) {
out.insert(i);
}
}
void resolveIssue(std::string issue) override {
MutexHolder h(mutex);
issues.erase(issue);
}
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }
private:
Mutex mutex;
std::set<std::string> issues;
};
Reference<IssuesList> issues;
Reference<BarrierList> barriers;

View File

@ -31,6 +31,7 @@
#include <type_traits>
#include "flow/IRandom.h"
#include "flow/Error.h"
#include "flow/ITrace.h"
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
@ -516,36 +517,7 @@ private:
bool init( struct TraceInterval& );
};
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void sync() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogIssuesReporter {
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class StringRef;
struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}

82
flow/Tracing.cpp Normal file
View File

@ -0,0 +1,82 @@
/*
* Tracing.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Tracing.h"
namespace {
struct NoopTracer : ITracer {
TracerType type() const { return TracerType::DISABLED; }
void trace(SpanImpl* span) override { delete span; }
};
struct LogfileTracer : ITracer {
TracerType type() const { return TracerType::LOG_FILE; }
void trace(SpanImpl* span) override {
TraceEvent te(SevInfo, "TracingSpan", span->context);
te.detail("Location", span->location.name).detail("Begin", span->begin).detail("End", span->end);
if (span->parents.size() == 1) {
te.detail("Parent", *span->parents.begin());
} else {
for (auto parent : span->parents) {
TraceEvent(SevInfo, "TracingSpanAddParent", span->context).detail("AddParent", parent);
}
}
}
};
ITracer* g_tracer = new NoopTracer();
} // namespace
void openTracer(TracerType type) {
if (g_tracer->type() == type) {
return;
}
delete g_tracer;
switch (type) {
case TracerType::DISABLED:
g_tracer = new NoopTracer{};
break;
case TracerType::LOG_FILE:
g_tracer = new LogfileTracer{};
break;
}
}
ITracer::~ITracer() {}
SpanImpl::SpanImpl(UID context, Location location, std::unordered_set<UID> const& parents)
: context(context), location(location), parents(parents) {
begin = g_network->now();
}
SpanImpl::~SpanImpl() {}
void SpanImpl::addref() {
++refCount;
}
void SpanImpl::delref() {
if (--refCount == 0) {
end = g_network->now();
g_tracer->trace(this);
}
}

114
flow/Tracing.h Normal file
View File

@ -0,0 +1,114 @@
/*
* Tracing.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/FDBTypes.h"
#include "flow/IRandom.h"
#include <unordered_set>
#include <atomic>
struct Location {
StringRef name;
};
inline Location operator"" _loc(const char* str, size_t size) {
return Location{ StringRef(reinterpret_cast<const uint8_t*>(str), size) };
}
struct SpanImpl {
explicit SpanImpl(UID contex, Location location,
std::unordered_set<UID> const& parents = std::unordered_set<UID>());
SpanImpl(const SpanImpl&) = delete;
SpanImpl(SpanImpl&&) = delete;
SpanImpl& operator=(const SpanImpl&) = delete;
SpanImpl& operator=(SpanImpl&&) = delete;
void addref();
void delref();
~SpanImpl();
UID context;
double begin, end;
Location location;
std::unordered_set<UID> parents;
private:
std::atomic<unsigned> refCount = 1;
};
class Span {
Reference<SpanImpl> impl;
public:
Span(UID context, Location location, std::unordered_set<UID> const& parents = std::unordered_set<UID>())
: impl(new SpanImpl(context, location, parents)) {}
Span(Location location, std::unordered_set<UID> const& parents = std::unordered_set<UID>())
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location, parents)) {}
Span(Location location, Span const& parent)
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location, { parent->context })) {}
Span(Location location, std::initializer_list<Span> const& parents)
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location)) {
for (const auto& parent : parents) {
impl->parents.insert(parent->context);
}
}
Span(const Span&) = default;
Span(Span&&) = default;
Span() {}
Span& operator=(Span&&) = default;
Span& operator=(const Span&) = default;
SpanImpl* operator->() const { return impl.getPtr(); }
SpanImpl& operator*() const { return *impl; }
void reset() { impl.clear(); }
};
enum class TracerType { DISABLED, LOG_FILE };
struct ITracer {
virtual ~ITracer();
virtual TracerType type() const = 0;
// passed ownership to the tracer
virtual void trace(SpanImpl* span) = 0;
};
void openTracer(TracerType type);
template <class T>
struct SpannedDeque : Deque<T> {
Span span;
explicit SpannedDeque(Location loc) : span(deterministicRandom()->randomUniqueID(), loc) {}
explicit SpannedDeque(Span span) : span(span) {}
SpannedDeque(SpannedDeque&& other) : Deque<T>(std::move(other)), span(std::move(other.span)) {}
SpannedDeque(SpannedDeque const& other) : Deque<T>(other), span(other.span) {}
SpannedDeque& operator=(SpannedDeque const& other) {
*static_cast<Deque<T>*>(this) = other;
span = other.span;
return *this;
}
SpannedDeque& operator=(SpannedDeque&& other) {
*static_cast<Deque<T>*>(this) = std::move(other);
span = std::move(other.span);
}
Span resetSpan() {
auto res = span;
span = Span(span->location);
return res;
}
};