From 90a27bedf5bccb69447eb7e798e7f53df1055807 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sat, 2 Oct 2021 20:25:47 -0700 Subject: [PATCH] fix conflict fdbserver/ApplyMetadataMutation.cpp --- fdbclient/CommitTransaction.h | 6 +++++- fdbserver/ApplyMetadataMutation.cpp | 15 +++++++++++++++ fdbserver/ApplyMetadataMutation.h | 7 +++++-- fdbserver/CommitProxyServer.actor.cpp | 8 ++++++-- fdbserver/Resolver.actor.cpp | 6 ++++-- 5 files changed, 35 insertions(+), 7 deletions(-) diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 34eb1a916d..d89dc90715 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -187,17 +187,21 @@ struct CommitTransactionRef { VectorRef mutations; Version read_snapshot; bool report_conflicting_keys; + SpanID spanContext; template force_inline void serialize(Ar& ar) { if constexpr (is_fb_function) { serializer( - ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys); + ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys, spanContext); } else { serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot); if (ar.protocolVersion().hasReportConflictingKeys()) { serializer(ar, report_conflicting_keys); } + if (ar.protocolVersion().hasSpanContext()) { + serializer(ar, spanContext); + } } } diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 674a23ce93..85e5dea20d 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -28,6 +28,7 @@ #include "fdbserver/LogProtocolMessage.h" #include "fdbserver/LogSystem.h" #include "flow/Error.h" +#include "flow/Trace.h" Reference getStorageInfo(UID id, std::map>* storageCache, @@ -231,6 +232,8 @@ private: privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString()); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", "LogProtocolMessage"); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(tag); toCommit->writeTypedMessage(LogProtocolMessage(), true); toCommit->addTag(tag); @@ -293,6 +296,7 @@ private: // This is done to make the cache servers aware of the cached key-ranges MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(cacheTag); toCommit->writeTypedMessage(privatized); } @@ -371,6 +375,7 @@ private: Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); if (tagV.present()) { + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(decodeServerTagValue(tagV.get())); toCommit->writeTypedMessage(privatized); } @@ -400,6 +405,7 @@ private: if (tagV.present()) { MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(decodeServerTagValue(tagV.get())); toCommit->writeTypedMessage(privatized); } @@ -518,10 +524,12 @@ private: if (m.param1 == lastEpochEndKey) { toCommit->addTags(allTags); toCommit->writeTypedMessage(LogProtocolMessage(), true); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", "LogProtocolMessage"); } MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", m.toString()); toCommit->addTags(allTags); toCommit->writeTypedMessage(privatized); } @@ -648,6 +656,8 @@ private: privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena); privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); + toCommit->addTag(decodeServerTagValue(kv.value)); toCommit->writeTypedMessage(privatized); } @@ -670,6 +680,7 @@ private: privatized.param2 = keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(decodeServerTagValue(tagV.get())); toCommit->writeTypedMessage(privatized); } @@ -853,6 +864,7 @@ private: if (Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) { MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(decodeServerTagValue(tagV.get())); toCommit->writeTypedMessage(privatized); } @@ -878,6 +890,7 @@ private: MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", privatized.toString()); toCommit->addTag(decodeServerTagValue(tagV.get())); toCommit->writeTypedMessage(privatized); } @@ -971,6 +984,8 @@ private: } // Add the tags to both begin and end mutations + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", mutationBegin.toString()); + TraceEvent(SevDebug, "SendingPrivatized", dbgid).detail("M", mutationEnd.toString()); toCommit->addTags(allTags); toCommit->writeTypedMessage(mutationBegin); toCommit->addTags(allTags); diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index dc199a390b..4b2b078276 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -20,6 +20,7 @@ #ifndef FDBSERVER_APPLYMETADATAMUTATION_H #define FDBSERVER_APPLYMETADATAMUTATION_H +#include #pragma once #include "fdbclient/BackupAgent.actor.h" @@ -45,6 +46,7 @@ struct ResolverData { LogPushData* toCommit = nullptr; Version popVersion = 0; // exclusive, usually set to commitVersion + 1 std::map>* storageCache = nullptr; + std::unordered_map* tssMapping = nullptr; // For initial broadcast ResolverData(UID debugId, IKeyValueStore* store, KeyRangeMap* info) @@ -57,9 +59,10 @@ struct ResolverData { KeyRangeMap* info, LogPushData* toCommit, Version popVersion, - std::map>* storageCache) + std::map>* storageCache, + std::unordered_map* tssMapping) : dbgid(debugId), txnStateStore(store), keyInfo(info), logSystem(logSystem), toCommit(toCommit), - popVersion(popVersion), storageCache(storageCache) {} + popVersion(popVersion), storageCache(storageCache), tssMapping(tssMapping) {} }; inline bool isMetadataMutation(MutationRef const& m) { diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 98b24c0e3f..7ad2a84ee0 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -180,12 +180,16 @@ struct ResolutionRequestBuilder { getOutTransaction(resolver, trIn.read_snapshot) .write_conflict_ranges.push_back(requests[resolver].arena, r); } - if (isTXNStateTransaction) + if (isTXNStateTransaction) { for (int r = 0; r < requests.size(); r++) { int transactionNumberInRequest = &getOutTransaction(r, trIn.read_snapshot) - requests[r].transactions.begin(); requests[r].txnStateTransactions.push_back(requests[r].arena, transactionNumberInRequest); } + // Note only Resolver 0 got the correct spanContext, which means + // the reply from Resolver 0 has the right one back. + getOutTransaction(0, trIn.read_snapshot).spanContext = trRequest.spanContext; + } std::vector resolversUsed; for (int r = 0; r < outTr.size(); r++) @@ -941,7 +945,7 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self ResolveTransactionBatchReply& reply = self->resolution[0]; ASSERT_WE_THINK(privateMutations.size() == reply.privateMutations.size()); for (int i = 0; i < privateMutations.size(); i++) { - std::cout << i << "\n" << printable(privateMutations[i]) << "\n" << printable(reply.privateMutations[i]) << "\n\n"; + // std::cout << i << "\n" << printable(privateMutations[i]) << "\n" << printable(reply.privateMutations[i]) << "\n\n"; ASSERT_WE_THINK(privateMutations[i] == reply.privateMutations[i]); } if (self->forceRecovery) { diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index f3741a0f9a..929f231d54 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -72,6 +72,7 @@ struct Resolver : ReferenceCounted { std::map> storageCache; KeyRangeMap keyInfo; // keyrange -> all storage servers in all DCs for the keyrange + std::unordered_map tssMapping; Version debugMinRecentStateVersion; @@ -243,7 +244,8 @@ ACTOR Future resolveBatch(Reference self, ResolveTransactionBatc &self->keyInfo, &toCommit, req.version + 1, - &self->storageCache); + &self->storageCache, + &self->tssMapping); for (int t : req.txnStateTransactions) { stateMutations += req.transactions[t].mutations.size(); stateBytes += req.transactions[t].mutations.expectedSize(); @@ -254,7 +256,7 @@ ACTOR Future resolveBatch(Reference self, ResolveTransactionBatc // Generate private mutations for metadata mutations if (reply.committed[t] == ConflictBatch::TransactionCommitted) { - applyMetadataMutations(SpanID(), resolverData, req.transactions[t].mutations); + applyMetadataMutations(req.transactions[t].spanContext, resolverData, req.transactions[t].mutations); } }