Address review comments
This commit is contained in:
parent
f6c50c60f3
commit
9ef93714ac
|
@ -223,13 +223,13 @@ public:
|
|||
bool enableLocalityLoadBalance;
|
||||
|
||||
struct VersionRequest {
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Promise<GetReadVersionReply> reply;
|
||||
TagSet tags;
|
||||
Optional<UID> debugID;
|
||||
|
||||
VersionRequest(SpanID spanID, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
|
||||
: spanID(spanID), tags(tags), debugID(debugID) {}
|
||||
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
|
||||
: spanContext(spanContext), tags(tags), debugID(debugID) {}
|
||||
};
|
||||
|
||||
// Transaction start request batching
|
||||
|
|
|
@ -153,7 +153,7 @@ struct CommitTransactionRequest : TimedRequest {
|
|||
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
|
||||
|
||||
Arena arena;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
CommitTransactionRef transaction;
|
||||
ReplyPromise<CommitID> reply;
|
||||
uint32_t flags;
|
||||
|
@ -163,7 +163,7 @@ struct CommitTransactionRequest : TimedRequest {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, transaction, reply, arena, flags, debugID, spanID);
|
||||
serializer(ar, transaction, reply, arena, flags, debugID, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -210,7 +210,7 @@ struct GetReadVersionRequest : TimedRequest {
|
|||
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
|
||||
};
|
||||
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
uint32_t transactionCount;
|
||||
uint32_t flags;
|
||||
TransactionPriority priority;
|
||||
|
@ -221,10 +221,10 @@ struct GetReadVersionRequest : TimedRequest {
|
|||
ReplyPromise<GetReadVersionReply> reply;
|
||||
|
||||
GetReadVersionRequest() : transactionCount(1), flags(0) {}
|
||||
GetReadVersionRequest(SpanID spanID, uint32_t transactionCount, TransactionPriority priority,
|
||||
GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority,
|
||||
uint32_t flags = 0, TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
|
||||
Optional<UID> debugID = Optional<UID>())
|
||||
: spanID(spanID), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
|
||||
: spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
|
||||
debugID(debugID) {
|
||||
flags = flags & ~FLAG_PRIORITY_MASK;
|
||||
switch(priority) {
|
||||
|
@ -246,7 +246,7 @@ struct GetReadVersionRequest : TimedRequest {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, transactionCount, flags, tags, debugID, reply, spanID);
|
||||
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
|
||||
|
||||
if(ar.isDeserializing) {
|
||||
if((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
|
||||
|
@ -279,7 +279,7 @@ struct GetKeyServerLocationsReply {
|
|||
struct GetKeyServerLocationsRequest {
|
||||
constexpr static FileIdentifier file_identifier = 9144680;
|
||||
Arena arena;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
KeyRef begin;
|
||||
Optional<KeyRef> end;
|
||||
int limit;
|
||||
|
@ -287,28 +287,28 @@ struct GetKeyServerLocationsRequest {
|
|||
ReplyPromise<GetKeyServerLocationsReply> reply;
|
||||
|
||||
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
|
||||
GetKeyServerLocationsRequest(SpanID spanID, KeyRef const& begin, Optional<KeyRef> const& end, int limit,
|
||||
GetKeyServerLocationsRequest(SpanID spanContext, KeyRef const& begin, Optional<KeyRef> const& end, int limit,
|
||||
bool reverse, Arena const& arena)
|
||||
: spanID(spanID), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {}
|
||||
: spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse), arena(arena) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, begin, end, limit, reverse, reply, spanID, arena);
|
||||
serializer(ar, begin, end, limit, reverse, reply, spanContext, arena);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetRawCommittedVersionRequest {
|
||||
constexpr static FileIdentifier file_identifier = 12954034;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Optional<UID> debugID;
|
||||
ReplyPromise<GetReadVersionReply> reply;
|
||||
|
||||
explicit GetRawCommittedVersionRequest(SpanID spanID, Optional<UID> const& debugID = Optional<UID>()) : spanID(spanID), debugID(debugID) {}
|
||||
explicit GetRawCommittedVersionRequest() : spanID(), debugID() {}
|
||||
explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional<UID> const& debugID = Optional<UID>()) : spanContext(spanContext), debugID(debugID) {}
|
||||
explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, debugID, reply, spanID);
|
||||
serializer(ar, debugID, reply, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1858,8 +1858,8 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanID ) {
|
||||
state Span span("NA:waitForCommittedVersion"_loc, { spanID });
|
||||
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, SpanID spanContext ) {
|
||||
state Span span("NA:waitForCommittedVersion"_loc, { spanContext });
|
||||
try {
|
||||
loop {
|
||||
choose {
|
||||
|
@ -1882,14 +1882,14 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, Spa
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> getRawVersion( Database cx, SpanID spanID ) {
|
||||
state Span span("NA:getRawVersion"_loc, { spanID });
|
||||
ACTOR Future<Version> getRawVersion( Database cx, SpanID spanContext ) {
|
||||
state Span span("NA:getRawVersion"_loc, { spanContext });
|
||||
loop {
|
||||
choose {
|
||||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||
when(GetReadVersionReply v =
|
||||
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
|
||||
GetReadVersionRequest(spanID, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
|
||||
GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
|
||||
return v.version;
|
||||
}
|
||||
}
|
||||
|
@ -2002,7 +2002,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
|||
req.version = version;
|
||||
req.begin = firstGreaterOrEqual( range.begin );
|
||||
req.end = firstGreaterOrEqual( range.end );
|
||||
req.spanID = span->context;
|
||||
req.spanContext = span->context;
|
||||
|
||||
transformRangeLimits(limits, reverse, req);
|
||||
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
|
||||
|
@ -2300,7 +2300,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
|
||||
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
|
||||
req.debugID = info.debugID;
|
||||
req.spanID = span->context;
|
||||
req.spanContext = span->context;
|
||||
try {
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
|
||||
|
@ -3159,7 +3159,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
state TraceInterval interval( "TransactionCommit" );
|
||||
state double startTime = now();
|
||||
state Span span("NA:tryCommit"_loc, { info.span->context });
|
||||
req.spanID = span->context;
|
||||
req.spanContext = span->context;
|
||||
if (info.debugID.present())
|
||||
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
|
||||
try {
|
||||
|
@ -3672,7 +3672,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
|
|||
}
|
||||
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
|
||||
}
|
||||
span->parents.insert(req.spanID);
|
||||
span->parents.insert(req.spanContext);
|
||||
requests.push_back(req.reply);
|
||||
for(auto tag : req.tags) {
|
||||
++tags[tag];
|
||||
|
|
|
@ -334,7 +334,7 @@ private:
|
|||
Future<Void> committing;
|
||||
};
|
||||
|
||||
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanID);
|
||||
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
|
||||
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys,
|
||||
int shardLimit);
|
||||
|
||||
|
|
|
@ -171,7 +171,7 @@ struct GetValueReply : public LoadBalancedReply {
|
|||
|
||||
struct GetValueRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 8454530;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Key key;
|
||||
Version version;
|
||||
Optional<TagSet> tags;
|
||||
|
@ -179,12 +179,12 @@ struct GetValueRequest : TimedRequest {
|
|||
ReplyPromise<GetValueReply> reply;
|
||||
|
||||
GetValueRequest(){}
|
||||
GetValueRequest(SpanID spanID, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
|
||||
: spanID(spanID), key(key), version(ver), tags(tags), debugID(debugID) {}
|
||||
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
|
||||
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, key, version, tags, debugID, reply, spanID);
|
||||
serializer(ar, key, version, tags, debugID, reply, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -204,7 +204,7 @@ struct WatchValueReply {
|
|||
|
||||
struct WatchValueRequest {
|
||||
constexpr static FileIdentifier file_identifier = 14747733;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Key key;
|
||||
Optional<Value> value;
|
||||
Version version;
|
||||
|
@ -213,13 +213,13 @@ struct WatchValueRequest {
|
|||
ReplyPromise<WatchValueReply> reply;
|
||||
|
||||
WatchValueRequest(){}
|
||||
WatchValueRequest(SpanID spanID, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
|
||||
WatchValueRequest(SpanID spanContext, const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags,
|
||||
Optional<UID> debugID)
|
||||
: spanID(spanID), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
|
||||
: spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, key, value, version, tags, debugID, reply, spanID);
|
||||
serializer(ar, key, value, version, tags, debugID, reply, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -241,7 +241,7 @@ struct GetKeyValuesReply : public LoadBalancedReply {
|
|||
|
||||
struct GetKeyValuesRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 6795746;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Arena arena;
|
||||
KeySelectorRef begin, end;
|
||||
Version version; // or latestVersion
|
||||
|
@ -254,7 +254,7 @@ struct GetKeyValuesRequest : TimedRequest {
|
|||
GetKeyValuesRequest() : isFetchKeys(false) {}
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanID, arena);
|
||||
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -274,7 +274,7 @@ struct GetKeyReply : public LoadBalancedReply {
|
|||
|
||||
struct GetKeyRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 10457870;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Arena arena;
|
||||
KeySelectorRef sel;
|
||||
Version version; // or latestVersion
|
||||
|
@ -283,13 +283,13 @@ struct GetKeyRequest : TimedRequest {
|
|||
ReplyPromise<GetKeyReply> reply;
|
||||
|
||||
GetKeyRequest() {}
|
||||
GetKeyRequest(SpanID spanID, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
|
||||
GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional<TagSet> tags,
|
||||
Optional<UID> debugID)
|
||||
: spanID(spanID), sel(sel), version(version), debugID(debugID) {}
|
||||
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, sel, version, tags, debugID, reply, spanID, arena);
|
||||
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -155,21 +155,21 @@ struct GetCommitVersionReply {
|
|||
|
||||
struct GetCommitVersionRequest {
|
||||
constexpr static FileIdentifier file_identifier = 16683181;
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
uint64_t requestNum;
|
||||
uint64_t mostRecentProcessedRequestNum;
|
||||
UID requestingProxy;
|
||||
ReplyPromise<GetCommitVersionReply> reply;
|
||||
|
||||
GetCommitVersionRequest() { }
|
||||
GetCommitVersionRequest(SpanID spanID, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
|
||||
GetCommitVersionRequest(SpanID spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
|
||||
UID requestingProxy)
|
||||
: spanID(spanID), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
|
||||
: spanContext(spanContext), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
|
||||
requestingProxy(requestingProxy) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanID);
|
||||
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -329,11 +329,11 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
|||
if (req.priority >= TransactionPriority::IMMEDIATE) {
|
||||
stats->txnSystemPriorityStartIn += req.transactionCount;
|
||||
systemQueue->push_back(req);
|
||||
systemQueue->span->parents.insert(req.spanID);
|
||||
systemQueue->span->parents.insert(req.spanContext);
|
||||
} else if (req.priority >= TransactionPriority::DEFAULT) {
|
||||
stats->txnDefaultPriorityStartIn += req.transactionCount;
|
||||
defaultQueue->push_back(req);
|
||||
defaultQueue->span->parents.insert(req.spanID);
|
||||
defaultQueue->span->parents.insert(req.spanContext);
|
||||
} else {
|
||||
// Return error for batch_priority GRV requests
|
||||
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
|
||||
|
@ -345,7 +345,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
|||
|
||||
stats->txnBatchPriorityStartIn += req.transactionCount;
|
||||
batchQueue->push_back(req);
|
||||
batchQueue->span->parents.insert(req.spanID);
|
||||
batchQueue->span->parents.insert(req.spanContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -515,7 +515,7 @@ struct ResolutionRequestBuilder {
|
|||
Span& parentSpan)
|
||||
: self(self), requests(self->resolvers.size()) {
|
||||
for (auto& req : requests) {
|
||||
req.spanID = parentSpan->context;
|
||||
req.spanContext = parentSpan->context;
|
||||
req.prevVersion = prevVersion;
|
||||
req.version = version;
|
||||
req.lastReceivedVersion = lastReceivedVersion;
|
||||
|
@ -822,7 +822,7 @@ ACTOR Future<Void> commitBatch(
|
|||
debugID = nondeterministicRandom()->randomUniqueID();
|
||||
g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
|
||||
}
|
||||
span->parents.insert(trs[t].spanID);
|
||||
span->parents.insert(trs[t].spanContext);
|
||||
}
|
||||
|
||||
if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
|
||||
|
@ -2105,7 +2105,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
}
|
||||
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
|
||||
//TraceEvent("ProxyGetRCV", proxy.id());
|
||||
Span span("MP:getRawCommittedReadVersion"_loc, { req.spanID });
|
||||
Span span("MP:getRawCommittedReadVersion"_loc, { req.spanContext });
|
||||
if (req.debugID.present())
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
|
||||
GetReadVersionReply rep;
|
||||
|
|
|
@ -97,7 +97,7 @@ struct ResolveTransactionBatchRequest {
|
|||
constexpr static FileIdentifier file_identifier = 16462858;
|
||||
Arena arena;
|
||||
|
||||
SpanID spanID;
|
||||
SpanID spanContext;
|
||||
Version prevVersion;
|
||||
Version version; // FIXME: ?
|
||||
Version lastReceivedVersion;
|
||||
|
@ -109,7 +109,7 @@ struct ResolveTransactionBatchRequest {
|
|||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena,
|
||||
debugID, spanID);
|
||||
debugID, spanContext);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -911,7 +911,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
|
|||
}
|
||||
|
||||
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
|
||||
state Span span("M:getVersion"_loc, { req.spanID });
|
||||
state Span span("M:getVersion"_loc, { req.spanContext });
|
||||
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes
|
||||
|
||||
if (proxyItr == self->lastProxyVersionReplies.end()) {
|
||||
|
|
|
@ -849,8 +849,8 @@ updateProcessStats(StorageServer* self)
|
|||
#pragma region Queries
|
||||
#endif
|
||||
|
||||
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, UID context) {
|
||||
state Span span(context, "SS.WaitForVersion"_loc);
|
||||
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
|
||||
state Span span("SS.WaitForVersion"_loc, { spanContext });
|
||||
choose {
|
||||
when(wait(data->version.whenAtLeast(version))) {
|
||||
// FIXME: A bunch of these can block with or without the following delay 0.
|
||||
|
@ -869,7 +869,7 @@ ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version,
|
|||
}
|
||||
}
|
||||
|
||||
Future<Version> waitForVersion(StorageServer* data, Version version, UID context) {
|
||||
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
|
||||
if (version == latestVersion) {
|
||||
version = std::max(Version(1), data->version.get());
|
||||
}
|
||||
|
@ -887,7 +887,7 @@ Future<Version> waitForVersion(StorageServer* data, Version version, UID context
|
|||
if (deterministicRandom()->random01() < 0.001) {
|
||||
TraceEvent("WaitForVersion1000x");
|
||||
}
|
||||
return waitForVersionActor(data, version, context);
|
||||
return waitForVersionActor(data, version, spanContext);
|
||||
}
|
||||
|
||||
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
|
||||
|
@ -928,7 +928,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req, Span spa
|
|||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
|
||||
state Optional<Value> v;
|
||||
state Version version = wait( waitForVersion( data, req.version, req.spanID ) );
|
||||
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
|
||||
if( req.debugID.present() )
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
|
||||
|
@ -1610,12 +1610,12 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req, Span span )
|
|||
wait( delay(0, TaskPriority::DefaultEndpoint) );
|
||||
|
||||
try {
|
||||
state Version version = wait( waitForVersion( data, req.version, req.spanID ) );
|
||||
state Version version = wait( waitForVersion( data, req.version, req.spanContext ) );
|
||||
state uint64_t changeCounter = data->shardChangeCounter;
|
||||
state KeyRange shard = getShardKeyRange( data, req.sel );
|
||||
|
||||
state int offset;
|
||||
Key k = wait( findKey( data, req.sel, version, shard, &offset, req.spanID ) );
|
||||
Key k = wait( findKey( data, req.sel, version, shard, &offset, req.spanContext ) );
|
||||
|
||||
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
|
||||
|
||||
|
@ -3676,7 +3676,7 @@ ACTOR Future<Void> checkBehind( StorageServer* self ) {
|
|||
ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetValueRequest> getValue ) {
|
||||
loop {
|
||||
GetValueRequest req = waitNext(getValue);
|
||||
Span span("SS:getValue"_loc, { req.spanID });
|
||||
Span span("SS:getValue"_loc, { req.spanContext });
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
||||
if( req.debugID.present() )
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
|
@ -3691,7 +3691,7 @@ ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetV
|
|||
ACTOR Future<Void> serveGetKeyValuesRequests( StorageServer* self, FutureStream<GetKeyValuesRequest> getKeyValues ) {
|
||||
loop {
|
||||
GetKeyValuesRequest req = waitNext(getKeyValues);
|
||||
Span span("SS:getKeyValues"_loc, { req.spanID });
|
||||
Span span("SS:getKeyValues"_loc, { req.spanContext });
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
||||
self->actors.add(self->readGuard(span, req, getKeyValuesQ));
|
||||
}
|
||||
|
@ -3700,7 +3700,7 @@ ACTOR Future<Void> serveGetKeyValuesRequests( StorageServer* self, FutureStream<
|
|||
ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKeyRequest> getKey ) {
|
||||
loop {
|
||||
GetKeyRequest req = waitNext(getKey);
|
||||
Span span("SS:getKey"_loc, { req.spanID });
|
||||
Span span("SS:getKey"_loc, { req.spanContext });
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
||||
self->actors.add(self->readGuard(span, req , getKeyQ));
|
||||
}
|
||||
|
@ -3709,7 +3709,7 @@ ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKey
|
|||
ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest> watchValue ) {
|
||||
loop {
|
||||
WatchValueRequest req = waitNext(watchValue);
|
||||
Span span("SS:watchValue"_loc, { req.spanID });
|
||||
Span span("SS:watchValue"_loc, { req.spanContext });
|
||||
// TODO: fast load balancing?
|
||||
// SOMEDAY: combine watches for the same key/value into a single watch
|
||||
self->actors.add(self->readGuard(span, req, watchValueQ));
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* FBTrace.cpp
|
||||
* Tracing.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* FBTrace.h
|
||||
* Tracing.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue