From f86058fba6d1ca44f9d1e091a3eadd2246f0b213 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 30 Sep 2024 04:53:37 -0700 Subject: [PATCH] Remove the usage of txsTag (#11688) * Add assertions to code paths with txsTag txsTag should be obsolete by now, since it's used in 6.1, which is no longer supported for upgrade. * Actually remove txsTag usage 20240926-225930-jzhou-7ed3304c415ae65e * Remove more code 20240926-235242-jzhou-7ed3304c415ae65e * Disable two verbose trace events They can cause TraceTooManyLines errors. --- design/transaction-state-store.md | 2 +- fdbclient/ReadYourWrites.actor.cpp | 2 +- fdbclient/include/fdbclient/FDBTypes.h | 4 +- fdbserver/LogSystem.cpp | 15 +--- fdbserver/MutationTracking.cpp | 4 +- fdbserver/StorageCache.actor.cpp | 1 + fdbserver/TagPartitionedLogSystem.actor.cpp | 96 ++++++--------------- fdbserver/storageserver.actor.cpp | 14 +-- 8 files changed, 38 insertions(+), 100 deletions(-) diff --git a/design/transaction-state-store.md b/design/transaction-state-store.md index 5ad04a9ef7..f7d6425c5a 100644 --- a/design/transaction-state-store.md +++ b/design/transaction-state-store.md @@ -67,7 +67,7 @@ backup data and is *NOT* metadata mutations. When a commit proxy writes metadata mutations to the log system, the proxy assigns a "txs" tag to the mutation. Depending on FDB versions, the "txs" tag can be one special -tag `txsTag{ tagLocalitySpecial, 1 }` for `TLogVersion::V3` (FDB 6.1) or a randomized +tag `txsTag{ tagLocalitySpecial, 1 }` for `TLogVersion::V3` (FDB 6.1, obsolete now) or a randomized "txs" tag for `TLogVersion::V4` (FDB 6.2 and later) and larger. The idea of randomized "txs" tag is to spread metadata mutations to all TLogs for faster parallel recovery of `txnStateStore`. diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 3ad53380be..3664ddc6ca 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -2532,7 +2532,7 @@ void ReadYourWritesTransaction::setOption(FDBTransactionOptions::Option option, } void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option option, Optional value) { - TraceEvent(SevDebug, "TransactionSetOption").detail("Option", option).detail("Value", value); + TraceEvent(SevVerbose, "TransactionSetOption").detail("Option", option).detail("Value", value); switch (option) { case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE: validateOptionValueNotPresent(value); diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 8e93409692..2e276dccdb 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -155,11 +155,9 @@ struct hash { } // namespace std static const Tag invalidTag{ tagLocalitySpecial, 0 }; -static const Tag txsTag{ tagLocalitySpecial, 1 }; +static const Tag txsTag{ tagLocalitySpecial, 1 }; // obsolete now static const Tag cacheTag{ tagLocalitySpecial, 2 }; -enum { txsTagOld = -1, invalidTagOld = -100 }; - struct TagsAndMessage { StringRef message; VectorRef tags; diff --git a/fdbserver/LogSystem.cpp b/fdbserver/LogSystem.cpp index c6facbdcdf..d201a404d8 100644 --- a/fdbserver/LogSystem.cpp +++ b/fdbserver/LogSystem.cpp @@ -180,12 +180,9 @@ void LogSet::checkSatelliteTagLocations() { int LogSet::bestLocationFor(Tag tag) { if (locality == tagLocalitySatellite) { - return satelliteTagLocations[tag == txsTag ? 0 : tag.id + 1][0]; + return satelliteTagLocations[tag.id + 1][0]; } - // the following logic supports upgrades from 5.X - if (tag == txsTag) - return txsTagOld % logServers.size(); return tag.id % logServers.size(); } @@ -219,8 +216,8 @@ bool LogSet::satisfiesPolicy(const std::vector& locations) { void LogSet::getPushLocations(VectorRef tags, std::vector& locations, int locationOffset, bool allLocations) { if (locality == tagLocalitySatellite) { for (auto& t : tags) { - if (t == txsTag || t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) { - for (int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) { + if (t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) { + for (int loc : satelliteTagLocations[t.id + 1]) { locations.push_back(locationOffset + loc); } } @@ -282,11 +279,7 @@ LogPushData::LogPushData(Reference logSystem, int tlogCount) : logSy } void LogPushData::addTxsTag() { - if (logSystem->getTLogVersion() >= TLogVersion::V4) { - next_message_tags.push_back(logSystem->getRandomTxsTag()); - } else { - next_message_tags.push_back(txsTag); - } + next_message_tags.push_back(logSystem->getRandomTxsTag()); } void LogPushData::addTransactionInfo(SpanContext const& context) { diff --git a/fdbserver/MutationTracking.cpp b/fdbserver/MutationTracking.cpp index 291d31a5c8..ce9dfac36c 100644 --- a/fdbserver/MutationTracking.cpp +++ b/fdbserver/MutationTracking.cpp @@ -90,8 +90,8 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri } TagsAndMessage msg; msg.loadFromArena(&rdr, nullptr); - bool logAdapterMessage = std::any_of( - msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t == txsTag || t.locality == tagLocalityTxs; }); + bool logAdapterMessage = + std::any_of(msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t.locality == tagLocalityTxs; }); StringRef mutationData = msg.getMessageWithoutTags(); uint8_t mutationType = *mutationData.begin(); if (logAdapterMessage) { diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 1863423bfa..255637b43c 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1402,6 +1402,7 @@ ACTOR Future fetchKeys(StorageCacheData* data, AddingCacheRange* cacheRang // TODO: NEELAM: what's this for? // FIXME: remove when we no longer support upgrades from 5.X if (debug_getRangeRetries >= 100) { + ASSERT(false); data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::False; } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index fbf42e0cf8..145a6a09be 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -687,9 +687,8 @@ Reference TagPartitionedLogSystem::peekAll(UID dbgid, foundSpecial = true; } if (log->isLocal && log->logServers.size() && - (log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag || - tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter || - (tag == cacheTag && log->locality != tagLocalitySatellite))) { + (log->locality == tagLocalitySpecial || log->locality == tag.locality || tag.locality == tagLocalityTxs || + tag.locality == tagLocalityLogRouter || (tag == cacheTag && log->locality != tagLocalitySatellite))) { lastBegin = std::max(lastBegin, log->startVersion); localSets.push_back(log); if (log->locality != tagLocalitySatellite) { @@ -725,7 +724,7 @@ Reference TagPartitionedLogSystem::peekAll(UID dbgid, } for (int i = 0; begin < lastBegin; i++) { if (i == oldLogData.size()) { - if (tag == txsTag || tag.locality == tagLocalityTxs || tag == cacheTag) { + if (tag.locality == tagLocalityTxs || tag == cacheTag) { break; } TraceEvent("TLogPeekAllDead", dbgid) @@ -747,7 +746,7 @@ Reference TagPartitionedLogSystem::peekAll(UID dbgid, thisSpecial = true; } if (log->isLocal && log->logServers.size() && - (log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag || + (log->locality == tagLocalitySpecial || log->locality == tag.locality || tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter || (tag == cacheTag && log->locality != tagLocalitySatellite))) { thisBegin = std::max(thisBegin, log->startVersion); @@ -1058,7 +1057,7 @@ Reference TagPartitionedLogSystem::peekLocal(UID dbgid, Version lastBegin = tLogs[bestSet]->startVersion; for (int i = 0; begin < lastBegin; i++) { if (i == oldLogData.size()) { - if ((tag == txsTag || tag.locality == tagLocalityTxs) && cursors.size()) { + if (tag.locality == tagLocalityTxs && cursors.size()) { break; } TraceEvent("TLogPeekLocalDead", dbgid) @@ -1163,7 +1162,7 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, if (!tLogs.size()) { TraceEvent("TLogPeekTxsNoLogs", dbgid).log(); return makeReference( - Reference>>(), txsTag, begin, end, false, false); + Reference>>(), invalidTag, begin, end, false, false); } TraceEvent("TLogPeekTxs", dbgid) .detail("Begin", begin) @@ -1173,10 +1172,8 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, .detail("CanDiscardPopped", canDiscardPopped); int maxTxsTags = txsTags; - bool needsOldTxs = tLogs[0]->tLogVersion < TLogVersion::V4; for (auto& it : oldLogData) { maxTxsTags = std::max(maxTxsTags, it.txsTags); - needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4; } if (peekLocality < 0 || localEnd == invalidVersion || localEnd <= begin) { @@ -1185,10 +1182,6 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, for (int i = 0; i < maxTxsTags; i++) { cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true)); } - // SOMEDAY: remove once upgrades from 6.2 are no longer supported - if (needsOldTxs) { - cursors.push_back(peekAll(dbgid, begin, end, txsTag, true)); - } return makeReference(cursors, begin, end, false, canDiscardPopped); } @@ -1200,10 +1193,6 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, for (int i = 0; i < maxTxsTags; i++) { cursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, end, true, peekLocality)); } - // SOMEDAY: remove once upgrades from 6.2 are no longer supported - if (needsOldTxs) { - cursors.push_back(peekLocal(dbgid, txsTag, begin, end, true, peekLocality)); - } return makeReference(cursors, begin, end, false, canDiscardPopped); } @@ -1219,11 +1208,6 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, localCursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, localEnd, true, peekLocality)); allCursors.push_back(peekAll(dbgid, localEnd, end, Tag(tagLocalityTxs, i), true)); } - // SOMEDAY: remove once upgrades from 6.2 are no longer supported - if (needsOldTxs) { - localCursors.push_back(peekLocal(dbgid, txsTag, begin, localEnd, true, peekLocality)); - allCursors.push_back(peekAll(dbgid, localEnd, end, txsTag, true)); - } cursors[1] = makeReference(localCursors, begin, localEnd, false, canDiscardPopped); cursors[0] = makeReference(allCursors, localEnd, end, false, false); @@ -1237,10 +1221,6 @@ Reference TagPartitionedLogSystem::peekTxs(UID dbgid, for (int i = 0; i < maxTxsTags; i++) { cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true)); } - // SOMEDAY: remove once upgrades from 6.2 are no longer supported - if (needsOldTxs) { - cursors.push_back(peekAll(dbgid, begin, end, txsTag, true)); - } return makeReference(cursors, begin, end, false, canDiscardPopped); } @@ -1493,12 +1473,8 @@ void TagPartitionedLogSystem::popLogRouter(Version upTo, } void TagPartitionedLogSystem::popTxs(Version upTo, int8_t popLocality) { - if (getTLogVersion() < TLogVersion::V4) { - pop(upTo, txsTag, 0, popLocality); - } else { - for (int i = 0; i < txsTags; i++) { - pop(upTo, Tag(tagLocalityTxs, i), 0, popLocality); - } + for (int i = 0; i < txsTags; i++) { + pop(upTo, Tag(tagLocalityTxs, i), 0, popLocality); } } @@ -1593,8 +1569,7 @@ ACTOR Future TagPartitionedLogSystem::getPoppedTxs(TagPartitionedLogSys poppedFutures.push_back(std::vector>()); for (auto& it : self->tLogs) { for (auto& log : it->logServers) { - poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog( - log, self->tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0))); + poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0))); } } poppedReady.push_back(waitForAny(poppedFutures.back())); @@ -1605,8 +1580,8 @@ ACTOR Future TagPartitionedLogSystem::getPoppedTxs(TagPartitionedLogSys poppedFutures.push_back(std::vector>()); for (auto& it : old.tLogs) { for (auto& log : it->logServers) { - poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog( - log, old.tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0))); + poppedFutures.back().push_back( + TagPartitionedLogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0))); } } poppedReady.push_back(waitForAny(poppedFutures.back())); @@ -2840,7 +2815,6 @@ ACTOR Future TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst std::vector remoteTLogReqs(remoteWorkers.remoteTLogs.size()); state std::vector>>> allRemoteTLogServers; - bool nonShardedTxs = self->getTLogVersion() < TLogVersion::V4; if (oldLogSystem->logRouterTags == 0) { std::vector locations; for (Tag tag : localTags) { @@ -2852,14 +2826,12 @@ ACTOR Future TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst if (oldLogSystem->tLogs.size()) { int maxTxsTags = oldLogSystem->txsTags; - bool needsOldTxs = oldLogSystem->tLogs[0]->tLogVersion < TLogVersion::V4; for (auto& it : oldLogSystem->oldLogData) { maxTxsTags = std::max(maxTxsTags, it.txsTags); - needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4; } - for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) { - Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i); - Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % self->txsTags); + for (int i = 0; i < maxTxsTags; i++) { + Tag tag = Tag(tagLocalityTxs, i); + Tag pushTag = Tag(tagLocalityTxs, i % self->txsTags); locations.clear(); logSet->getPushLocations(VectorRef(&pushTag, 1), locations, 0); for (int loc : locations) @@ -2869,12 +2841,8 @@ ACTOR Future TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst } if (oldLogSystem->tLogs.size()) { - if (nonShardedTxs) { - localTags.push_back(txsTag); - } else { - for (int i = 0; i < self->txsTags; i++) { - localTags.push_back(Tag(tagLocalityTxs, i)); - } + for (int i = 0; i < self->txsTags; i++) { + localTags.push_back(Tag(tagLocalityTxs, i)); } } @@ -3001,10 +2969,8 @@ ACTOR Future> TagPartitionedLogSystem::newEpoch( state RegionInfo region = configuration.getRegion(recr.dcId); state int maxTxsTags = oldLogSystem->txsTags; - state bool needsOldTxs = oldLogSystem->tLogs.size() && oldLogSystem->getTLogVersion() < TLogVersion::V4; for (auto& it : oldLogSystem->oldLogData) { maxTxsTags = std::max(maxTxsTags, it.txsTags); - needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4; } if (region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) { @@ -3138,22 +3104,18 @@ ACTOR Future> TagPartitionedLogSystem::newEpoch( Tag tag = Tag(tagLocalityLogRouter, i); reqs[logSystem->tLogs[0]->bestLocationFor(tag)].recoverTags.push_back(tag); } - bool nonShardedTxs = logSystem->getTLogVersion() < TLogVersion::V4; + if (oldLogSystem->tLogs.size()) { - for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) { - Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i); - Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % logSystem->txsTags); + for (int i = 0; i < maxTxsTags; i++) { + Tag tag = Tag(tagLocalityTxs, i); + Tag pushTag = Tag(tagLocalityTxs, i % logSystem->txsTags); locations.clear(); logSystem->tLogs[0]->getPushLocations(VectorRef(&pushTag, 1), locations, 0); for (int loc : locations) reqs[loc].recoverTags.push_back(tag); } - if (nonShardedTxs) { - localTags.push_back(txsTag); - } else { - for (int i = 0; i < logSystem->txsTags; i++) { - localTags.push_back(Tag(tagLocalityTxs, i)); - } + for (int i = 0; i < logSystem->txsTags; i++) { + localTags.push_back(Tag(tagLocalityTxs, i)); } } @@ -3224,20 +3186,16 @@ ACTOR Future> TagPartitionedLogSystem::newEpoch( } } if (oldLogSystem->tLogs.size()) { - for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) { - Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i); - Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % logSystem->txsTags); + for (int i = 0; i < maxTxsTags; i++) { + Tag tag = Tag(tagLocalityTxs, i); + Tag pushTag = Tag(tagLocalityTxs, i % logSystem->txsTags); locations.clear(); logSystem->tLogs[1]->getPushLocations(VectorRef(&pushTag, 1), locations, 0); for (int loc : locations) sreqs[loc].recoverTags.push_back(tag); } - if (nonShardedTxs) { - satelliteTags.push_back(txsTag); - } else { - for (int i = 0; i < logSystem->txsTags; i++) { - satelliteTags.push_back(Tag(tagLocalityTxs, i)); - } + for (int i = 0; i < logSystem->txsTags; i++) { + satelliteTags.push_back(Tag(tagLocalityTxs, i)); } } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c6b9daa2d9..4e22c88eb5 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -8622,7 +8622,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { } ASSERT(fetchVersion >= shard->fetchVersion); // at this point, shard->fetchVersion is the last fetchVersion shard->fetchVersion = fetchVersion; - TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID) + TraceEvent(SevVerbose, "FetchKeysUnblocked", data->thisServerID) .detail("FKID", interval.pairID) .detail("Version", fetchVersion); @@ -8747,12 +8747,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .suppressFor(1.0) .detail("FKID", interval.pairID); - // FIXME: remove when we no longer support upgrades from 5.X - if (debug_getRangeRetries >= 100) { - data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::False; - TraceEvent(SevWarnAlways, "FKDisableLB").detail("FKID", fetchKeysID); - } - debug_getRangeRetries++; if (debug_nextRetryToLog == debug_getRangeRetries) { debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024); @@ -8806,12 +8800,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { } } - // FIXME: remove when we no longer support upgrades from 5.X - if (!data->cx->enableLocalityLoadBalance) { - data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::True; - TraceEvent(SevWarnAlways, "FKReenableLB").detail("FKID", fetchKeysID); - } - // We have completed the fetch and write of the data, now we wait for MVCC window to pass. // As we have finished this work, we will allow more work to start... shard->fetchComplete.send(Void());