Spans for read and commit path
This commit is contained in:
parent
c829b12bbf
commit
caabbec466
|
@ -155,18 +155,21 @@ struct GetCommitVersionReply {
|
|||
|
||||
struct GetCommitVersionRequest {
|
||||
constexpr static FileIdentifier file_identifier = 16683181;
|
||||
SpanID spanID;
|
||||
uint64_t requestNum;
|
||||
uint64_t mostRecentProcessedRequestNum;
|
||||
UID requestingProxy;
|
||||
ReplyPromise<GetCommitVersionReply> reply;
|
||||
|
||||
GetCommitVersionRequest() { }
|
||||
GetCommitVersionRequest(uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy)
|
||||
: requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum), requestingProxy(requestingProxy) {}
|
||||
GetCommitVersionRequest(SpanID spanID, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum,
|
||||
UID requestingProxy)
|
||||
: spanID(spanID), requestNum(requestNum), mostRecentProcessedRequestNum(mostRecentProcessedRequestNum),
|
||||
requestingProxy(requestingProxy) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply);
|
||||
serializer(ar, requestNum, mostRecentProcessedRequestNum, requestingProxy, reply, spanID);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -511,8 +511,11 @@ struct ResolutionRequestBuilder {
|
|||
// [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver]
|
||||
// -> read_conflict_range's original index in the commitTransactionRef
|
||||
|
||||
ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
|
||||
for(auto& req : requests) {
|
||||
ResolutionRequestBuilder(ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion,
|
||||
Span& parentSpan)
|
||||
: self(self), requests(self->resolvers.size()) {
|
||||
for (auto& req : requests) {
|
||||
req.spanID = parentSpan->context;
|
||||
req.prevVersion = prevVersion;
|
||||
req.version = version;
|
||||
req.lastReceivedVersion = lastReceivedVersion;
|
||||
|
@ -840,7 +843,7 @@ ACTOR Future<Void> commitBatch(
|
|||
if (debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
||||
|
||||
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
|
||||
GetCommitVersionRequest req(span->context, self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
|
||||
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
|
||||
self->mostRecentProcessedRequestNumber = versionReply.requestNum;
|
||||
|
||||
|
@ -861,7 +864,7 @@ ACTOR Future<Void> commitBatch(
|
|||
if (debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
|
||||
|
||||
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
|
||||
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version, span );
|
||||
int conflictRangeCount = 0;
|
||||
state int64_t maxTransactionBytes = 0;
|
||||
for (int t = 0; t<trs.size(); t++) {
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
|
||||
#ifndef FDBSERVER_RESOLVERINTERFACE_H
|
||||
#define FDBSERVER_RESOLVERINTERFACE_H
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
@ -91,17 +94,19 @@ struct ResolveTransactionBatchRequest {
|
|||
constexpr static FileIdentifier file_identifier = 16462858;
|
||||
Arena arena;
|
||||
|
||||
SpanID spanID;
|
||||
Version prevVersion;
|
||||
Version version; // FIXME: ?
|
||||
Version lastReceivedVersion;
|
||||
VectorRef<CommitTransactionRef> transactions;
|
||||
VectorRef<struct CommitTransactionRef> transactions;
|
||||
VectorRef<int> txnStateTransactions; // Offsets of elements of transactions that have (transaction subsystem state) mutations
|
||||
ReplyPromise<ResolveTransactionBatchReply> reply;
|
||||
Optional<UID> debugID;
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena, debugID);
|
||||
serializer(ar, prevVersion, version, lastReceivedVersion, transactions, txnStateTransactions, reply, arena,
|
||||
debugID, spanID);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -911,6 +911,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
|
|||
}
|
||||
|
||||
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
|
||||
state Span span("M:getVersion"_loc, { req.spanID });
|
||||
state std::map<UID, ProxyVersionReplies>::iterator proxyItr = self->lastProxyVersionReplies.find(req.requestingProxy); // lastProxyVersionReplies never changes
|
||||
|
||||
if (proxyItr == self->lastProxyVersionReplies.end()) {
|
||||
|
|
Loading…
Reference in New Issue