Revert "Request tracing"

This commit is contained in:
Jingyu Zhou 2020-06-16 12:32:42 -07:00 committed by GitHub
parent 4d0a76d146
commit 327cc31e35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 214 additions and 660 deletions

View File

@ -223,13 +223,11 @@ public:
bool enableLocalityLoadBalance;
struct VersionRequest {
SpanID spanContext;
Promise<GetReadVersionReply> reply;
TagSet tags;
Optional<UID> debugID;
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), tags(tags), debugID(debugID) {}
VersionRequest(TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>()) : tags(tags), debugID(debugID) {}
};
// Transaction start request batching

View File

@ -36,7 +36,6 @@ typedef uint64_t Sequence;
typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
typedef UID SpanID;
enum {
tagLocalitySpecial = -1,

View File

@ -153,7 +153,6 @@ struct CommitTransactionRequest : TimedRequest {
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
Arena arena;
SpanID spanContext;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
uint32_t flags;
@ -163,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID, spanContext);
serializer(ar, transaction, reply, arena, flags, debugID);
}
};
@ -210,7 +209,6 @@ struct GetReadVersionRequest : TimedRequest {
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};
SpanID spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
@ -221,11 +219,9 @@ struct GetReadVersionRequest : TimedRequest {
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount(1), flags(0) {}
GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority,
uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
debugID(debugID) {
GetReadVersionRequest(uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(), Optional<UID> debugID = Optional<UID>())
: transactionCount(transactionCount), priority(priority), flags(flags), tags(tags), debugID(debugID)
{
flags = flags & ~FLAG_PRIORITY_MASK;
switch(priority) {
case TransactionPriority::BATCH:
@ -246,7 +242,7 @@ struct GetReadVersionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
serializer(ar, transactionCount, flags, tags, debugID, reply);
if(ar.isDeserializing) {
if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
@ -279,7 +275,6 @@ struct GetKeyServerLocationsReply {
struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanID spanContext;
KeyRef begin;
Optional<KeyRef> end;
int limit;
@ -287,28 +282,24 @@ struct GetKeyServerLocationsRequest {
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional<KeyRef> const& end, int limit,
bool reverse, Arena const& arena)
: spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {}
GetKeyServerLocationsRequest( KeyRef const& begin, Optional<KeyRef> const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, limit, reverse, reply, spanContext, arena);
serializer(ar, begin, end, limit, reverse, reply, arena);
}
};
struct GetRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 12954034;
SpanID spanContext;
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional<UID> const& debugID = Optional<UID>()) : spanContext(spanContext), debugID(debugID) {}
explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {}
explicit GetRawCommittedVersionRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, debugID, reply, spanContext);
serializer(ar, debugID, reply);
}
};

View File

@ -36,7 +36,6 @@
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -47,7 +46,6 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/versions.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/simulator.h"
@ -61,10 +59,12 @@
#include "flow/Platform.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"
#include "flow/Tracing.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/serialize.h"
#include "fdbclient/versions.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
@ -1539,7 +1539,6 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
state Span span("NAPI:getKeyLocation"_loc, { info.span->context });
if (isBackward) {
ASSERT( key != allKeys.begin && key <= allKeys.end );
} else {
@ -1553,10 +1552,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span->context, key, Optional<KeyRef>(), 100, isBackward, key.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
++cx->transactionKeyServerLocationRequestsCompleted;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
@ -1592,7 +1588,6 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation(Database const& c
}
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
state Span span("NAPI:getKeyRangeLocations"_loc, { info.span->context });
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -1600,10 +1595,7 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span->context, keys.begin, keys.end, limit, reverse, keys.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
++cx->transactionKeyServerLocationRequestsCompleted;
state GetKeyServerLocationsReply rep = _rep;
if( info.debugID.present() )
@ -1694,7 +1686,6 @@ Future<Void> Transaction::warmRange(Database cx, KeyRange keys) {
ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Database cx, TransactionInfo info, Reference<TransactionLogInfo> trLogInfo, TagSet tags )
{
state Version ver = wait( version );
state Span span("NAPI:getValue"_loc, { info.span->context });
cx->validateVersion(ver);
loop {
@ -1727,12 +1718,10 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
}
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(span->context, key, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
when(GetValueReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1790,7 +1779,6 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Span span("NAPI:getKey"_loc, { info.span->context });
if( info.debugID.present() ) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -1819,11 +1807,9 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyReply _reply =
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey,
GetKeyRequest(span->context, k, version.get(),
cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -1856,15 +1842,12 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
}
}
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) {
state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
try {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when(GetReadVersionReply v = wait(basicLoadBalance(
cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(span->context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.version >= version)
@ -1880,14 +1863,11 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, Spa
}
}
ACTOR Future<Version> getRawVersion( Database cx, SpanID spanContext ) {
state Span span("NAPI:getRawVersion"_loc, { spanContext });
ACTOR Future<Version> getRawVersion( Database cx ) {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
return v.version;
}
}
@ -1901,7 +1881,6 @@ ACTOR Future<Void> readVersionBatcher(
ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Version ver = wait( version );
state Span span(deterministicRandom()->randomUniqueID(), "NAPI:watchValue"_loc, { info.span->context });
cx->validateVersion(ver);
ASSERT(ver != latestVersion);
@ -1918,11 +1897,9 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
}
state WatchValueReply resp;
choose {
when(WatchValueReply r = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(span->context, key, value, ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
when(WatchValueReply r = wait(loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); }
@ -1933,7 +1910,7 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
//FIXME: wait for known committed version on the storage server before replying,
//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version, span->context));
Version v = wait(waitForCommittedVersion(cx, resp.version));
//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
@ -1986,7 +1963,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info, TagSet tags )
{
state Standalone<RangeResultRef> output;
state Span span("NAPI:getExactRange"_loc, { info.span->context });
//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
loop {
@ -2000,7 +1976,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
req.version = version;
req.begin = firstGreaterOrEqual( range.begin );
req.end = firstGreaterOrEqual( range.end );
req.spanContext = span->context;
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
@ -2245,7 +2220,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state Standalone<RangeResultRef> output;
state Span span("NAPI:getRange"_loc, info.span);
try {
state Version version = wait( fVersion );
@ -2298,7 +2272,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
req.debugID = info.debugID;
req.spanContext = span->context;
try {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
@ -2636,7 +2609,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch, Database cx, TagSet tags, Trans
}
Future<Version> Transaction::getRawReadVersion() {
return ::getRawVersion(cx, info.span->context);
return ::getRawVersion(cx);
}
Future< Void > Transaction::watch( Reference<Watch> watch ) {
@ -2999,7 +2972,6 @@ void Transaction::reset() {
void Transaction::fullReset() {
reset();
info.span = Span(info.span->location);
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}
@ -3116,8 +3088,6 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
state Transaction tr(cx);
state int retries = 0;
state Span span("NAPI:dummyTransaction"_loc, info.span);
tr.info.span->parents.insert(span->context);
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@ -3164,8 +3134,6 @@ void Transaction::setupWatches() {
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
state TraceInterval interval( "TransactionCommit" );
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, { info.span->context });
req.spanContext = span->context;
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
try {
@ -3589,14 +3557,6 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
options.readTags.addTag(value.get());
break;
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValue(value, true);
if (value.get().size() != 16) {
throw invalid_option_value();
}
info.span->parents.emplace(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
@ -3607,16 +3567,13 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(Span parentSpan, DatabaseContext* cx, uint32_t transactionCount,
TransactionPriority priority, uint32_t flags,
TransactionTagMap<uint32_t> tags, Optional<UID> debugID) {
state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan);
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, TransactionPriority priority, uint32_t flags, TransactionTagMap<uint32_t> tags, Optional<UID> debugID ) {
try {
++cx->transactionReadVersionBatches;
if( debugID.present() )
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
loop {
state GetReadVersionRequest req( span->context, transactionCount, priority, flags, tags, debugID );
state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@ -3667,7 +3624,6 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
state PromiseStream<double> replyTimes;
state PromiseStream<Error> _errorStream;
state double batchTime = 0;
state Span span("NAPI:readVersionBatcher"_loc);
loop {
send_batch = false;
choose {
@ -3678,7 +3634,6 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
span->parents.insert(req.spanContext);
requests.push_back(req.reply);
for(auto tag : req.tags) {
++tags[tag];
@ -3706,10 +3661,9 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
Future<Void> batch = incrementalBroadcastWithError(
getConsistentReadVersion(span, cx, count, priority, flags, std::move(tags), std::move(debugID)),
getConsistentReadVersion(cx, count, priority, flags, std::move(tags), std::move(debugID)),
std::move(requests), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
span = Span("NAPI:readVersionBatcher"_loc);
tags.clear();
debugID = Optional<UID>();
requests.clear();
@ -3719,11 +3673,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(Span parentSpan, DatabaseContext* cx, TransactionPriority priority,
Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f,
bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion,
TagSet tags) {
// parentSpan here is only used to keep the parent alive until the request completes
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, TransactionPriority priority, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags) {
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -3845,12 +3795,10 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags );
}
Span span("NAPI:getReadVersion"_loc, info.span);
auto const req = DatabaseContext::VersionRequest(span->context, options.tags, info.debugID);
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion(span, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(),
options.lockAware, startTime, metadataVersion, options.tags);
readVersion = extractReadVersion( cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
}
return readVersion;
}

View File

@ -19,8 +19,6 @@
*/
#pragma once
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
#include "fdbclient/NativeAPI.actor.g.h"
@ -154,16 +152,13 @@ class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
Span span;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
explicit TransactionInfo(TaskPriority taskID)
: taskID(taskID), span(deterministicRandom()->randomUniqueID(), "Transaction"_loc), useProvisionalProxies(false) {
}
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
@ -339,7 +334,7 @@ private:
Future<Void> committing;
};
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys,
int shardLimit);

View File

@ -169,7 +169,6 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanID spanContext;
Key key;
Version version;
Optional<TagSet> tags;
@ -177,12 +176,11 @@ struct GetValueRequest : TimedRequest {
ReplyPromise<GetValueReply> reply;
GetValueRequest(){}
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, version, tags, debugID, reply, spanContext);
serializer(ar, key, version, tags, debugID, reply);
}
};
@ -202,7 +200,6 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanID spanContext;
Key key;
Optional<Value> value;
Version version;
@ -211,13 +208,11 @@ struct WatchValueRequest {
ReplyPromise<WatchValueReply> reply;
WatchValueRequest(){}
WatchValueRequest(SpanID spanContext, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, value, version, tags, debugID, reply, spanContext);
serializer(ar, key, value, version, tags, debugID, reply);
}
};
@ -239,7 +234,6 @@ struct GetKeyValuesReply : public LoadBalancedReply {
struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
KeySelectorRef begin, end;
Version version; // or latestVersion
@ -252,7 +246,7 @@ struct GetKeyValuesRequest : TimedRequest {
GetKeyValuesRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena);
}
};
@ -272,7 +266,6 @@ struct GetKeyReply : public LoadBalancedReply {
struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanID spanContext;
Arena arena;
KeySelectorRef sel;
Version version; // or latestVersion
@ -281,13 +274,11 @@ struct GetKeyRequest : TimedRequest {
ReplyPromise<GetKeyReply> reply;
GetKeyRequest() {}
GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
GetKeyRequest(KeySelectorRef const& sel, Version version, Optional<TagSet> tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
serializer(ar, sel, version, tags, debugID, reply, arena);
}
};

View File

@ -268,8 +268,6 @@ description is not currently required but encouraged.
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
description="Adds a tag to the transaction that can be used to apply manual or automatic targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="span_parent" code="900" paramType="Bytes" paramDescription="A byte string of length 16 used to associate the span of this transaction with a parent"
description="Adds a parent to the Span of this transaction. Used for transaction tracing. A span can be identified with any 16 bytes"/>
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -32,8 +32,6 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define SevDebugMemory SevVerbose
@ -431,9 +429,8 @@ struct BackupData {
}
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span(deterministicRandom()->randomUniqueID(), "BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(span->context, 1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(1, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onMasterProxiesChanged())) {}

View File

@ -155,21 +155,18 @@ struct GetCommitVersionReply {
struct GetCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683181;
SpanID spanContext;
uint64_t requestNum;
uint64_t mostRecentProcessedRequestNum;
UID requestingProxy;
ReplyPromise<GetCommitVersionReply> reply;
GetCommitVersionRequest() { }
GetCommitVersionRequest(SpanID spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
UID requestingProxy)
: spanContext(spanContext), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
requestingProxy(requestingProxy) {}
GetCommitVersionRequest(uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy)
: requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum), requestingProxy(requestingProxy) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanContext);
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply);
}
};

View File

@ -44,13 +44,10 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Stats.h"
#include "flow/TDMetric.actor.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <tuple>
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
@ -290,9 +287,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
ACTOR Future<Void> queueTransactionStartRequests(
Reference<AsyncVar<ServerDBInfo>> db,
SpannedDeque<GetReadVersionRequest> *systemQueue,
SpannedDeque<GetReadVersionRequest> *defaultQueue,
SpannedDeque<GetReadVersionRequest> *batchQueue,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
@ -329,11 +326,9 @@ ACTOR Future<Void> queueTransactionStartRequests(
if (req.priority >= TransactionPriority::IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
systemQueue->span->parents.insert(req.spanContext);
} else if (req.priority >= TransactionPriority::DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
defaultQueue->span->parents.insert(req.spanContext);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
@ -345,7 +340,6 @@ ACTOR Future<Void> queueTransactionStartRequests(
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span->parents.insert(req.spanContext);
}
}
}
@ -511,11 +505,8 @@ struct ResolutionRequestBuilder {
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
// -> read_conflict_range's original index in the commitTransactionRef
ResolutionRequestBuilder(ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion,
Span& parentSpan)
: self(self), requests(self->resolvers.size()) {
for (auto& req : requests) {
req.spanContext = parentSpan->context;
ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
for(auto& req : requests) {
req.prevVersion = prevVersion;
req.version = version;
req.lastReceivedVersion = lastReceivedVersion;
@ -799,7 +790,6 @@ ACTOR Future<Void> commitBatch(
state Optional<UID> debugID;
state bool forceRecovery = false;
state int batchOperations = 0;
state Span span("MP:commitBatch"_loc);
int64_t batchBytes = 0;
for (int t = 0; t<trs.size(); t++) {
batchOperations += trs[t].transaction.mutations.size();
@ -822,7 +812,6 @@ ACTOR Future<Void> commitBatch(
debugID = nondeterministicRandom()->randomUniqueID();
g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
}
span->parents.insert(trs[t].spanContext);
}
if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
@ -843,7 +832,7 @@ ACTOR Future<Void> commitBatch(
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
GetCommitVersionRequest req(span->context, self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
self->mostRecentProcessedRequestNumber = versionReply.requestNum;
@ -864,7 +853,7 @@ ACTOR Future<Void> commitBatch(
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version, span );
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
int conflictRangeCount = 0;
state int64_t maxTransactionBytes = 0;
for (int t = 0; t<trs.size(); t++) {
@ -1177,21 +1166,17 @@ ACTOR Future<Void> commitBatch(
// We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
if(self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
computeDuration += g_network->timer() - computeStart;
state Span waitVersionSpan;
while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span(deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, {span->context});
choose{
when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
break;
}
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(
GetReadVersionRequest(waitVersionSpan->context, 0, TransactionPriority::IMMEDIATE,
GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if (v.version > self->committedVersion.get()) {
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, TransactionPriority::IMMEDIATE, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if(v.version > self->committedVersion.get()) {
self->locked = v.locked;
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
@ -1202,7 +1187,6 @@ ACTOR Future<Void> commitBatch(
}
}
}
waitVersionSpan = Span{};
computeStart = g_network->timer();
}
@ -1397,19 +1381,18 @@ ACTOR Future<Void> updateLastCommit(ProxyCommitData* self, Optional<UID> debugID
return Void();
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(Span parentSpan, ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("MP:getLiveCommittedVersion"_loc, parentSpan);
++commitData->stats.txnStartBatch;
state vector<Future<GetReadVersionReply>> proxyVersions;
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(span->context, debugID), TaskPriority::TLogConfirmRunningReply)));
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(commitData, debugID));
@ -1509,16 +1492,15 @@ ACTOR static Future<Void> transactionStarter(
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state SpannedDeque<GetReadVersionRequest> systemQueue("MP:transactionStarterSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("MP:transactionStarterDefaultQueue"_loc);
state SpannedDeque<GetReadVersionRequest> batchQueue("MP:transactionStarterBatchQueue"_loc);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;
state Deque<GetReadVersionRequest> batchQueue;
state vector<MasterProxyInterface> otherProxies;
state TransactionTagMap<uint64_t> transactionTagCounter;
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
state Span span;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
@ -1560,16 +1542,13 @@ ACTOR static Future<Void> transactionStarter(
int requestsToStart = 0;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
SpannedDeque<GetReadVersionRequest>* transactionQueue;
Deque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
span = systemQueue.resetSpan();
} else if(!defaultQueue.empty()) {
transactionQueue = &defaultQueue;
span = defaultQueue.resetSpan();
} else if(!batchQueue.empty()) {
transactionQueue = &batchQueue;
span = batchQueue.resetSpan();
} else {
break;
}
@ -1634,9 +1613,7 @@ ACTOR static Future<Void> transactionStarter(
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
span, commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i],
defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats,
commitData->minKnownCommittedVersion, throttledTags));
@ -1646,7 +1623,6 @@ ACTOR static Future<Void> transactionStarter(
}
}
}
span.reset();
}
}
@ -2105,7 +2081,6 @@ ACTOR Future<Void> masterProxyServerCore(
}
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
//TraceEvent("ProxyGetRCV", proxy.id());
Span span("MP:getRawCommittedReadVersion"_loc, { req.spanContext });
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
GetReadVersionReply rep;

View File

@ -20,9 +20,6 @@
#ifndef FDBSERVER_RESOLVERINTERFACE_H
#define FDBSERVER_RESOLVERINTERFACE_H
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/fdbrpc.h"
#pragma once
#include "fdbrpc/Locality.h"
@ -97,19 +94,17 @@ struct ResolveTransactionBatchRequest {
constexpr static FileIdentifier file_identifier = 16462858;
Arena arena;
SpanID spanContext;
Version prevVersion;
Version version; // FIXME: ?
Version lastReceivedVersion;
VectorRef<struct CommitTransactionRef> transactions;
VectorRef<CommitTransactionRef> transactions;
VectorRef<int> txnStateTransactions; // Offsets of elements of transactions that have (transaction subsystem state) mutations
ReplyPromise<ResolveTransactionBatchReply> reply;
Optional<UID> debugID;
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena,
debugID, spanContext);
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena, debugID);
}
};

View File

@ -447,6 +447,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageCacheData* data, Version ve
ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
state int64_t resultSize = 0;
try {
++data->counters.getValueQueries;
++data->counters.allQueries;
@ -456,13 +457,12 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
//TODO what's this?
wait( delay(0, TaskPriority::DefaultEndpoint) );
if( req.debugID.present() ) {
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
//FIXME
}
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );

View File

@ -20,9 +20,6 @@
// There's something in one of the files below that defines a macros
// a macro that makes boost interprocess break on Windows.
#include "flow/Tracing.h"
#include <cctype>
#include <iterator>
#define BOOST_DATE_TIME_NO_LIB
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/algorithm/string.hpp>
@ -81,7 +78,7 @@
// clang-format off
enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE,
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_NEWCONSOLE,
OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID,
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK,
OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
@ -114,7 +111,6 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_MAXLOGSSIZE, "--maxlogssize", SO_REQ_SEP },
{ OPT_LOGGROUP, "--loggroup", SO_REQ_SEP },
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
{ OPT_TRACER, "--tracer", SO_REQ_SEP },
#ifdef _WIN32
{ OPT_NEWCONSOLE, "-n", SO_NONE },
{ OPT_NEWCONSOLE, "--newconsole", SO_NONE },
@ -518,9 +514,6 @@ static void printUsage( const char *name, bool devhelp ) {
printf(" --trace_format FORMAT\n"
" Select the format of the log files. xml (the default) and json\n"
" are supported.\n");
printf(" --tracer TRACER\n"
" Select a tracer for transaction tracing. Currently disabled\n"
" (the default) and log_file are supported.\n");
printf(" -i ID, --machine_id ID\n"
" Machine and zone identifier key (up to 16 hex characters).\n"
" Defaults to a random value shared by all fdbserver processes\n"
@ -1176,22 +1169,6 @@ private:
break;
}
#endif
case OPT_TRACER:
{
std::string arg = args.OptionArg();
std::string tracer;
std::transform(arg.begin(), arg.end(), std::back_inserter(tracer), [](char c) { return tolower(c); });
if (tracer == "none" || tracer == "disabled") {
openTracer(TracerType::DISABLED);
} else if (tracer == "logfile" || tracer == "file" || tracer == "log_file") {
openTracer(TracerType::LOG_FILE);
} else {
fprintf(stderr, "ERROR: Unknown or unsupported tracer: `%s'", args.OptionArg());
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
break;
}
case OPT_TESTFILE:
testFile = args.OptionArg();
break;

View File

@ -915,7 +915,6 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
}
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, { req.spanContext });
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes
if (proxyItr == self->lastProxyVersionReplies.end()) {

View File

@ -21,9 +21,6 @@
#include <cinttypes>
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
@ -715,7 +712,7 @@ public:
}
template<class Request, class HandleFunction>
Future<Void> readGuard(const Span& parentSpan, const Request& request, const HandleFunction& fun) {
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
auto rate = currentRate();
if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD && deterministicRandom()->random01() > std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE, rate/SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
//request.error = future_version();
@ -723,7 +720,7 @@ public:
++counters.readsRejected;
return Void();
}
return fun(this, request, parentSpan);
return fun(this, request);
}
};
@ -849,8 +846,7 @@ updateProcessStats(StorageServer* self)
#pragma region Queries
#endif
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
state Span span("SS.WaitForVersion"_loc, { spanContext });
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version) {
choose {
when(wait(data->version.whenAtLeast(version))) {
// FIXME: A bunch of these can block with or without the following delay 0.
@ -869,7 +865,7 @@ ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version,
}
}
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
Future<Version> waitForVersion(StorageServer* data, Version version) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
}
@ -887,7 +883,7 @@ Future<Version> waitForVersion(StorageServer* data, Version version, SpanID span
if (deterministicRandom()->random01() < 0.001) {
TraceEvent("WaitForVersion1000x");
}
return waitForVersionActor(data, version, spanContext);
return waitForVersionActor(data, version);
}
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
@ -911,7 +907,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version versi
}
}
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req, Span span ) {
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
state int64_t resultSize = 0;
try {
@ -928,7 +924,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req, Span spa
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
state Version version = wait( waitForVersion( data, req.version ) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
@ -1012,8 +1008,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req, Span spa
return Void();
};
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req, SpanID parent ) {
state Span span("SS:WatchValueImpl"_loc, { parent });
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
try {
++data->counters.watchQueries;
@ -1028,11 +1023,9 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req,
try {
state Version latest = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
state Span getValueSpan(deterministicRandom()->randomUniqueID(), "SS:GetValue"_loc, { span->context });
GetValueRequest getReq( getValueSpan->context, req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq, span ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueRequest getReq( req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
getValueSpan.reset();
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
if(reply.error.present()) {
@ -1079,8 +1072,8 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req,
return Void();
}
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req, Span span ) {
state Future<Void> watch = watchValue_impl( data, req, span->context );
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
state Future<Void> watch = watchValue_impl( data, req );
state double startTime = now();
loop {
@ -1185,7 +1178,7 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes, Span parentSpan ) {
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vCurrent = view.end();
@ -1193,7 +1186,6 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount = 0;
state Span span("SS:readRange"_loc, parentSpan);
// for caching the storage queue results during the first PTree traversal
state VectorRef<KeyValueRef> resultCache;
@ -1365,7 +1357,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
//}
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset, SpanID parentSpan)
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -1382,7 +1374,6 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1-sel.offset;
state Span span("SS.findKey"_loc, { parentSpan });
//Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
state int maxBytes;
@ -1391,18 +1382,14 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
else
maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;
state GetKeyValuesReply rep = wait(
readRange(data, version,
forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())),
(distance + skipEqualKey) * sign, &maxBytes, span));
state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
//If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
if(more && !forward && rep.data.size() == 1) {
TEST(true); //Reverse key selector returned only one result in range read
maxBytes = std::numeric_limits<int>::max();
GetKeyValuesReply rep2 =
wait(readRange(data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span));
GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
rep = rep2;
more = rep.more && rep.data.size() != distance + skipEqualKey;
ASSERT(rep.data.size() == 2 || !more);
@ -1457,7 +1444,7 @@ KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
return i->range();
}
ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req, Span span )
ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
// all data from being read in one range read
{
@ -1482,7 +1469,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req,
try {
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
state Version version = wait( waitForVersion( data, req.version, span->context ) );
state Version version = wait( waitForVersion( data, req.version ) );
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -1500,8 +1487,8 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req,
state int offset1;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1, span->context );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2, span->context );
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if( req.debugID.present() )
@ -1535,7 +1522,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req,
} else {
state int remainingLimitBytes = req.limitBytes;
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span) );
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
GetKeyValuesReply r = _r;
if( req.debugID.present() )
@ -1597,7 +1584,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req,
return Void();
}
ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req, Span span ) {
ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
state int64_t resultSize = 0;
++data->counters.getKeyQueries;
@ -1610,12 +1597,12 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req, Span span )
wait( delay(0, TaskPriority::DefaultEndpoint) );
try {
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
state Version version = wait( waitForVersion( data, req.version ) );
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange( data, req.sel );
state int offset;
Key k = wait( findKey( data, req.sel, version, shard, &offset, req.spanContext ) );
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
@ -3676,7 +3663,6 @@ ACTOR Future<Void> checkBehind( StorageServer* self ) {
ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetValueRequest> getValue ) {
loop {
GetValueRequest req = waitNext(getValue);
Span span("SS:getValue"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
@ -3684,35 +3670,32 @@ ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetV
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
req.reply.send(GetValueReply());
else
self->actors.add(self->readGuard(span, req , getValueQ));
self->actors.add(self->readGuard(req , getValueQ));
}
}
ACTOR Future<Void> serveGetKeyValuesRequests( StorageServer* self, FutureStream<GetKeyValuesRequest> getKeyValues ) {
loop {
GetKeyValuesRequest req = waitNext(getKeyValues);
Span span("SS:getKeyValues"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
self->actors.add(self->readGuard(span, req, getKeyValuesQ));
self->actors.add(self->readGuard(req, getKeyValuesQ));
}
}
ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKeyRequest> getKey ) {
loop {
GetKeyRequest req = waitNext(getKey);
Span span("SS:getKey"_loc, { req.spanContext });
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
self->actors.add(self->readGuard(span, req , getKeyQ));
self->actors.add(self->readGuard(req , getKeyQ));
}
}
ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest> watchValue ) {
loop {
WatchValueRequest req = waitNext(watchValue);
Span span("SS:watchValue"_loc, { req.spanContext });
// TODO: fast load balancing?
// SOMEDAY: combine watches for the same key/value into a single watch
self->actors.add(self->readGuard(span, req, watchValueQ));
self->actors.add(self->readGuard(req, watchValueQ));
}
}

View File

@ -21,7 +21,6 @@
#include <math.h>
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -377,16 +376,12 @@ struct ConsistencyCheckWorkload : TestWorkload
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
while (begin < end) {
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture(false));
keyServerLocationFutures.clear();
for (int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(
proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations)
.getReplyUnlessFailedFor(
GetKeyServerLocationsRequest(span->context, begin, end, limitKeyServers, false, Arena()), 2, 0));
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0));
state bool keyServersInsertedForThisIteration = false;
choose {

View File

@ -18,21 +18,15 @@
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/serialize.h"
#include <cstring>
struct CycleWorkload : TestWorkload {
int actorCount, nodeCount;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond, traceParentProbability;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond;
Key keyPrefix;
vector<Future<Void>> clients;
@ -44,13 +38,12 @@ struct CycleWorkload : TestWorkload {
transactions("Transactions"), retries("Retries"), totalLatency("Latency"),
tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed")
{
testDuration = getOption( options, "testDuration"_sr, 10.0 );
transactionsPerSecond = getOption( options, "transactionsPerSecond"_sr, 5000.0 ) / clientCount;
actorCount = getOption( options, "actorsPerClient"_sr, transactionsPerSecond / 5 );
nodeCount = getOption(options, "nodeCount"_sr, transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, "keyPrefix"_sr, LiteralStringRef("")).toString() );
traceParentProbability = getOption(options, "traceParentProbability "_sr, 0.01);
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, "expectedRate"_sr, 0.7);
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount;
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
nodeCount = getOption(options, LiteralStringRef("nodeCount"), transactionsPerSecond * clientCount);
keyPrefix = unprintable( getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("")).toString() );
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, LiteralStringRef("expectedRate"), 0.7);
}
virtual std::string description() { return "CycleWorkload"; }
@ -105,12 +98,6 @@ struct CycleWorkload : TestWorkload {
state double tstart = now();
state int r = deterministicRandom()->randomInt(0, self->nodeCount);
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("CycleClient"_loc);
TraceEvent("CycleTracingTransaction", span->context);
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span->context, Unversioned()));
}
while (true) {
try {
// Reverse next and next^2 node

View File

@ -631,9 +631,6 @@ struct Traceable<Standalone<T>> : std::conditional<Traceable<T>::value, std::tru
};
#define LiteralStringRef( str ) StringRef( (const uint8_t*)(str), sizeof((str))-1 )
inline StringRef operator "" _sr(const char* str, size_t size) {
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
}
// makeString is used to allocate a Standalone<StringRef> of a known length for later
// mutation (via mutateString). If you need to append to a string of unknown length,

View File

@ -28,7 +28,6 @@ set(FLOW_SRCS
IRandom.h
IThreadPool.cpp
IThreadPool.h
ITrace.h
IndexedSet.actor.h
IndexedSet.cpp
IndexedSet.h
@ -62,8 +61,6 @@ set(FLOW_SRCS
ThreadSafeQueue.h
Trace.cpp
Trace.h
Tracing.h
Tracing.cpp
TreeBenchmark.h
UnitTest.cpp
UnitTest.h

View File

@ -48,36 +48,6 @@
#include <fcntl.h>
#include <cmath>
struct IssuesListImpl {
IssuesListImpl(){}
void addIssue(std::string issue) {
MutexHolder h(mutex);
issues.insert(issue);
}
void retrieveIssues(std::set<std::string>& out) {
MutexHolder h(mutex);
for (auto const& i : issues) {
out.insert(i);
}
}
void resolveIssue(std::string issue) {
MutexHolder h(mutex);
issues.erase(issue);
}
private:
Mutex mutex;
std::set<std::string> issues;
};
IssuesList::IssuesList() : impl(new IssuesListImpl{}) {}
IssuesList::~IssuesList() { delete impl; }
void IssuesList::addIssue(std::string issue) { impl->addIssue(issue); }
void IssuesList::retrieveIssues(std::set<std::string> &out) { impl->retrieveIssues(out); }
void IssuesList::resolveIssue(std::string issue) { impl->resolveIssue(issue); }
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename,
std::string extension, uint64_t maxLogsSize, std::function<void()> onError,
Reference<ITraceLogIssuesReporter> issues)
@ -102,16 +72,8 @@ void FileTraceLogWriter::lastError(int err) {
}
void FileTraceLogWriter::write(const std::string& str) {
write(str.data(), str.size());
}
void FileTraceLogWriter::write(const StringRef& str) {
write(reinterpret_cast<const char*>(str.begin()), str.size());
}
void FileTraceLogWriter::write(const char* str, size_t len) {
auto ptr = str;
int remaining = len;
auto ptr = str.c_str();
int remaining = str.size();
bool needsResolve = false;
while ( remaining ) {

View File

@ -23,29 +23,11 @@
#define FLOW_FILE_TRACE_LOG_WRITER_H
#pragma once
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include <functional>
struct IssuesListImpl;
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList();
virtual ~IssuesList();
void addIssue(std::string issue) override;
void retrieveIssues(std::set<std::string>& out) override;
void resolveIssue(std::string issue) override;
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }
private:
IssuesListImpl* impl;
};
class FileTraceLogWriter : public ITraceLogWriter, ReferenceCounted<FileTraceLogWriter> {
private:
std::string directory;
@ -60,8 +42,6 @@ private:
std::function<void()> onError;
void write(const char* str, size_t size);
public:
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension,
uint64_t maxLogsSize, std::function<void()> onError, Reference<ITraceLogIssuesReporter> issues);
@ -71,12 +51,11 @@ public:
void lastError(int err);
void write(const std::string& str) override;
void write(StringRef const& str) override;
void open() override;
void close() override;
void roll() override;
void sync() override;
void write(const std::string& str);
void open();
void close();
void roll();
void sync();
void cleanupTraceFiles();
};

View File

@ -109,41 +109,5 @@ private:
Reference<IThreadPool> createGenericThreadPool();
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
ASSERT( !thread );
thread = userData;
}
void post( PThreadAction action ) {
try {
(*action)( thread );
} catch (Error& e) {
errors.sendError( e );
} catch (...) {
errors.sendError( unknown_error() );
}
}
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {
ReferenceCounted<DummyThreadPool>::addref();
}
void delref() {
ReferenceCounted<DummyThreadPool>::delref();
}
private:
IThreadPoolReceiver* thread;
Promise<Void> errors;
};
#endif

View File

@ -1,61 +0,0 @@
/*
* ITrace.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <set>
class StringRef;
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void write(const StringRef&) = 0;
virtual void sync() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class TraceEventFields;
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogIssuesReporter {
virtual ~ITraceLogIssuesReporter();
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};

View File

@ -21,7 +21,6 @@
#include "flow/Trace.h"
#include "flow/FileTraceLogWriter.h"
#include "flow/Knobs.h"
#include "flow/XmlTraceLogFormatter.h"
#include "flow/JsonTraceLogFormatter.h"
#include "flow/flow.h"
@ -55,7 +54,40 @@
// during an open trace event
thread_local int g_allocation_tracing_disabled = 1;
ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {}
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
ASSERT( !thread );
thread = userData;
}
void post( PThreadAction action ) {
try {
(*action)( thread );
} catch (Error& e) {
errors.sendError( e );
} catch (...) {
errors.sendError( unknown_error() );
}
}
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {
ReferenceCounted<DummyThreadPool>::addref();
}
void delref() {
ReferenceCounted<DummyThreadPool>::delref();
}
private:
IThreadPoolReceiver* thread;
Promise<Void> errors;
};
struct SuppressionMap {
struct SuppressionInfo {
@ -197,6 +229,33 @@ public:
}
};
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList(){};
void addIssue(std::string issue) override {
MutexHolder h(mutex);
issues.insert(issue);
}
void retrieveIssues(std::set<std::string>& out) override {
MutexHolder h(mutex);
for (auto const& i : issues) {
out.insert(i);
}
}
void resolveIssue(std::string issue) override {
MutexHolder h(mutex);
issues.erase(issue);
}
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }
private:
Mutex mutex;
std::set<std::string> issues;
};
Reference<IssuesList> issues;
Reference<BarrierList> barriers;

View File

@ -31,7 +31,6 @@
#include <type_traits>
#include "flow/IRandom.h"
#include "flow/Error.h"
#include "flow/ITrace.h"
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
@ -517,7 +516,36 @@ private:
bool init( struct TraceInterval& );
};
class StringRef;
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void sync() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogFormatter {
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
virtual void addref() = 0;
virtual void delref() = 0;
};
struct ITraceLogIssuesReporter {
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}

View File

@ -1,82 +0,0 @@
/*
* Tracing.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Tracing.h"
namespace {
struct NoopTracer : ITracer {
TracerType type() const { return TracerType::DISABLED; }
void trace(SpanImpl* span) override { delete span; }
};
struct LogfileTracer : ITracer {
TracerType type() const { return TracerType::LOG_FILE; }
void trace(SpanImpl* span) override {
TraceEvent te(SevInfo, "TracingSpan", span->context);
te.detail("Location", span->location.name).detail("Begin", span->begin).detail("End", span->end);
if (span->parents.size() == 1) {
te.detail("Parent", *span->parents.begin());
} else {
for (auto parent : span->parents) {
TraceEvent(SevInfo, "TracingSpanAddParent", span->context).detail("AddParent", parent);
}
}
}
};
ITracer* g_tracer = new NoopTracer();
} // namespace
void openTracer(TracerType type) {
if (g_tracer->type() == type) {
return;
}
delete g_tracer;
switch (type) {
case TracerType::DISABLED:
g_tracer = new NoopTracer{};
break;
case TracerType::LOG_FILE:
g_tracer = new LogfileTracer{};
break;
}
}
ITracer::~ITracer() {}
SpanImpl::SpanImpl(UID context, Location location, std::unordered_set<UID> const& parents)
: context(context), location(location), parents(parents) {
begin = g_network->now();
}
SpanImpl::~SpanImpl() {}
void SpanImpl::addref() {
++refCount;
}
void SpanImpl::delref() {
if (--refCount == 0) {
end = g_network->now();
g_tracer->trace(this);
}
}

View File

@ -1,114 +0,0 @@
/*
* Tracing.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/FDBTypes.h"
#include "flow/IRandom.h"
#include <unordered_set>
#include <atomic>
struct Location {
StringRef name;
};
inline Location operator"" _loc(const char* str, size_t size) {
return Location{ StringRef(reinterpret_cast<const uint8_t*>(str), size) };
}
struct SpanImpl {
explicit SpanImpl(UID contex, Location location,
std::unordered_set<UID> const& parents = std::unordered_set<UID>());
SpanImpl(const SpanImpl&) = delete;
SpanImpl(SpanImpl&&) = delete;
SpanImpl& operator=(const SpanImpl&) = delete;
SpanImpl& operator=(SpanImpl&&) = delete;
void addref();
void delref();
~SpanImpl();
UID context;
double begin, end;
Location location;
std::unordered_set<UID> parents;
private:
std::atomic<unsigned> refCount = 1;
};
class Span {
Reference<SpanImpl> impl;
public:
Span(UID context, Location location, std::unordered_set<UID> const& parents = std::unordered_set<UID>())
: impl(new SpanImpl(context, location, parents)) {}
Span(Location location, std::unordered_set<UID> const& parents = std::unordered_set<UID>())
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location, parents)) {}
Span(Location location, Span const& parent)
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location, { parent->context })) {}
Span(Location location, std::initializer_list<Span> const& parents)
: impl(new SpanImpl(deterministicRandom()->randomUniqueID(), location)) {
for (const auto& parent : parents) {
impl->parents.insert(parent->context);
}
}
Span(const Span&) = default;
Span(Span&&) = default;
Span() {}
Span& operator=(Span&&) = default;
Span& operator=(const Span&) = default;
SpanImpl* operator->() const { return impl.getPtr(); }
SpanImpl& operator*() const { return *impl; }
void reset() { impl.clear(); }
};
enum class TracerType { DISABLED, LOG_FILE };
struct ITracer {
virtual ~ITracer();
virtual TracerType type() const = 0;
// passed ownership to the tracer
virtual void trace(SpanImpl* span) = 0;
};
void openTracer(TracerType type);
template <class T>
struct SpannedDeque : Deque<T> {
Span span;
explicit SpannedDeque(Location loc) : span(deterministicRandom()->randomUniqueID(), loc) {}
explicit SpannedDeque(Span span) : span(span) {}
SpannedDeque(SpannedDeque&& other) : Deque<T>(std::move(other)), span(std::move(other.span)) {}
SpannedDeque(SpannedDeque const& other) : Deque<T>(other), span(other.span) {}
SpannedDeque& operator=(SpannedDeque const& other) {
*static_cast<Deque<T>*>(this) = other;
span = other.span;
return *this;
}
SpannedDeque& operator=(SpannedDeque&& other) {
*static_cast<Deque<T>*>(this) = std::move(other);
span = std::move(other.span);
}
Span resetSpan() {
auto res = span;
span = Span(span->location);
return res;
}
};