diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index 52355f2c19..5a06a31c40 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -39,10 +39,10 @@ inline bool isMetadataMutation(MutationRef const& m) { Reference getStorageInfo(UID id, std::map>* storageCache, IKeyValueStore* txnStateStore); -void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, - const VectorRef& mutations, LogPushData* pToCommit, bool& confChange, - Version popVersion, bool initialCommit); -void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef& mutations, - IKeyValueStore* txnStateStore); +void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, + Reference logSystem, const VectorRef& mutations, + LogPushData* pToCommit, bool& confChange, Version popVersion, bool initialCommit); +void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, + const VectorRef& mutations, IKeyValueStore* txnStateStore); #endif diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index c50b1ea5a3..b683646d81 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -679,7 +679,7 @@ struct ILogSystem { // Never returns normally, but throws an error if the subsystem stops working //Future push( UID bundle, int64_t seq, VectorRef messages ); - virtual Future push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, Optional debugID = Optional() ) = 0; + virtual Future push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, SpanID const& spanContext, Optional debugID = Optional() ) = 0; // Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier) // Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N) // Changes the version number of the bundle to be version (unblocking the next push) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7d8a36a66a..7a2ffea45d 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -296,6 +296,8 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapaddTransactionInfo(SpanID()); + // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation) { @@ -357,7 +359,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::maptagsForKey(backupMutation.param1); toCommit->addTags(tags); - toCommit->addTypedMessage(backupMutation); + toCommit->writeTypedMessage(backupMutation); // if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) { // TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString()) @@ -396,7 +398,7 @@ struct CommitBatchContext { int batchOperations = 0; - Span span = Span("MP:commitBatch"_loc); + Span span; int64_t batchBytes = 0; @@ -476,7 +478,9 @@ CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_, localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem), - committed(trs.size()) { + committed(trs.size()), + + span("MP:commitBatch"_loc) { evaluateBatchSize(); @@ -671,7 +675,7 @@ void applyMetadataEffect(CommitBatchContext* self) { for (int resolver = 0; resolver < self->resolution.size(); resolver++) committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed; if (committed) { - applyMetadataMutations(*self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, + applyMetadataMutations(SpanID(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations, /* pToCommit= */ nullptr, self->forceRecovery, /* popVersion= */ 0, /* initialCommit */ false); @@ -754,7 +758,7 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self for (t = 0; t < trs.size() && !self->forceRecovery; t++) { if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) { self->commitCount++; - applyMetadataMutations(*pProxyCommitData, self->arena, pProxyCommitData->logSystem, + applyMetadataMutations(trs[t].spanContext, *pProxyCommitData, self->arena, pProxyCommitData->logSystem, trs[t].transaction.mutations, &self->toCommit, self->forceRecovery, self->commitVersion + 1, /* initialCommit= */ false); } @@ -803,6 +807,9 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { state Optional* trCost = &trs[self->transactionNum].commitCostEstimation; state int mutationNum = 0; state VectorRef* pMutations = &trs[self->transactionNum].transaction.mutations; + + self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext); + for (; mutationNum < pMutations->size(); mutationNum++) { if(self->yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) { self->yieldBytes = 0; @@ -857,7 +864,7 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if(pProxyCommitData->cacheInfo[m.param1]) { self->toCommit.addTag(cacheTag); } - self->toCommit.addTypedMessage(m); + self->toCommit.writeTypedMessage(m); } else if (m.type == MutationRef::ClearRange) { KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2)); @@ -908,7 +915,7 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if(pProxyCommitData->needsCacheTag(clearRange)) { self->toCommit.addTag(cacheTag); } - self->toCommit.addTypedMessage(m); + self->toCommit.writeTypedMessage(m); } else { UNREACHABLE(); } @@ -1049,7 +1056,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { if(firstMessage) { self->toCommit.addTxsTag(); } - self->toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage); + self->toCommit.writeMessage(StringRef(m.begin(), m.size()), !firstMessage); firstMessage = false; } @@ -1064,7 +1071,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { self->commitStartTime = now(); pProxyCommitData->lastStartCommit = self->commitStartTime; - self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->debugID ); + self->loggingComplete = pProxyCommitData->logSystem->push( self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, self->span.context, self->debugID ); if (!self->forceRecovery) { ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber-1); @@ -1806,7 +1813,7 @@ ACTOR Future masterProxyServerCore( Arena arena; bool confChanges; - applyMetadataMutations(commitData, arena, Reference(), mutations, + applyMetadataMutations(SpanID(), commitData, arena, Reference(), mutations, /* pToCommit= */ nullptr, confChanges, /* popVersion= */ 0, /* initialCommit= */ true); } diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index f5afa5df87..6fff6fb1f5 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -240,6 +240,7 @@ struct TLogCommitReply { struct TLogCommitRequest { constexpr static FileIdentifier file_identifier = 4022206; + SpanID spanContext; Arena arena; Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion; @@ -249,11 +250,11 @@ struct TLogCommitRequest { Optional debugID; TLogCommitRequest() {} - TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional debugID ) - : arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {} + TLogCommitRequest( const SpanID& context, const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional debugID ) + : spanContext(context), arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {} template void serialize( Ar& ar ) { - serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID); + serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, spanContext, debugID); } }; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 529cf58ac9..09e90d6eb1 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -527,7 +527,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted push(Version prevVersion, Version version, Version knownCommittedVersion, - Version minKnownCommittedVersion, LogPushData& data, Optional debugID) final { + Version minKnownCommittedVersion, LogPushData& data, + SpanID const& spanContext, Optional debugID) final { // FIXME: Randomize request order as in LegacyLogSystem? vector> quorumResults; vector> allReplies; @@ -542,7 +543,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> tLogCommitResults; for(int loc=0; loc< it->logServers.size(); loc++) { Standalone msg = data.getMessages(location); - allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) ); + allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( spanContext, msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) ); Future commitSuccess = success(allReplies.back()); addActor.get().send(commitSuccess); tLogCommitResults.push_back(commitSuccess); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index ce5c993d77..bc1168c58a 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1613,7 +1613,7 @@ ACTOR Future masterCore( Reference self ) { } } - applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), + applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore); mmApplied = tr.mutations.size();