Remove the usage of txsTag (#11688)

* Add assertions to code paths with txsTag

txsTag should be obsolete by now, since it's used in 6.1, which is no longer
supported for upgrade.

* Actually remove txsTag usage

20240926-225930-jzhou-7ed3304c415ae65e

* Remove more code

20240926-235242-jzhou-7ed3304c415ae65e

* Disable two verbose trace events

They can cause TraceTooManyLines errors.
This commit is contained in:
Jingyu Zhou 2024-09-30 04:53:37 -07:00 committed by GitHub
parent 35a1c51e3f
commit f86058fba6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 38 additions and 100 deletions

View File

@ -67,7 +67,7 @@ backup data and is *NOT* metadata mutations.
When a commit proxy writes metadata mutations to the log system, the proxy assigns a
"txs" tag to the mutation. Depending on FDB versions, the "txs" tag can be one special
tag `txsTag{ tagLocalitySpecial, 1 }` for `TLogVersion::V3` (FDB 6.1) or a randomized
tag `txsTag{ tagLocalitySpecial, 1 }` for `TLogVersion::V3` (FDB 6.1, obsolete now) or a randomized
"txs" tag for `TLogVersion::V4` (FDB 6.2 and later) and larger. The idea of randomized
"txs" tag is to spread metadata mutations to all TLogs for faster parallel recovery of
`txnStateStore`.

View File

@ -2532,7 +2532,7 @@ void ReadYourWritesTransaction::setOption(FDBTransactionOptions::Option option,
}
void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option option, Optional<StringRef> value) {
TraceEvent(SevDebug, "TransactionSetOption").detail("Option", option).detail("Value", value);
TraceEvent(SevVerbose, "TransactionSetOption").detail("Option", option).detail("Value", value);
switch (option) {
case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
validateOptionValueNotPresent(value);

View File

@ -155,11 +155,9 @@ struct hash<Tag> {
} // namespace std
static const Tag invalidTag{ tagLocalitySpecial, 0 };
static const Tag txsTag{ tagLocalitySpecial, 1 };
static const Tag txsTag{ tagLocalitySpecial, 1 }; // obsolete now
static const Tag cacheTag{ tagLocalitySpecial, 2 };
enum { txsTagOld = -1, invalidTagOld = -100 };
struct TagsAndMessage {
StringRef message;
VectorRef<Tag> tags;

View File

@ -180,12 +180,9 @@ void LogSet::checkSatelliteTagLocations() {
int LogSet::bestLocationFor(Tag tag) {
if (locality == tagLocalitySatellite) {
return satelliteTagLocations[tag == txsTag ? 0 : tag.id + 1][0];
return satelliteTagLocations[tag.id + 1][0];
}
// the following logic supports upgrades from 5.X
if (tag == txsTag)
return txsTagOld % logServers.size();
return tag.id % logServers.size();
}
@ -219,8 +216,8 @@ bool LogSet::satisfiesPolicy(const std::vector<LocalityEntry>& locations) {
void LogSet::getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, int locationOffset, bool allLocations) {
if (locality == tagLocalitySatellite) {
for (auto& t : tags) {
if (t == txsTag || t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) {
for (int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) {
if (t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) {
for (int loc : satelliteTagLocations[t.id + 1]) {
locations.push_back(locationOffset + loc);
}
}
@ -282,11 +279,7 @@ LogPushData::LogPushData(Reference<ILogSystem> logSystem, int tlogCount) : logSy
}
void LogPushData::addTxsTag() {
if (logSystem->getTLogVersion() >= TLogVersion::V4) {
next_message_tags.push_back(logSystem->getRandomTxsTag());
} else {
next_message_tags.push_back(txsTag);
}
}
void LogPushData::addTransactionInfo(SpanContext const& context) {

View File

@ -90,8 +90,8 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
}
TagsAndMessage msg;
msg.loadFromArena(&rdr, nullptr);
bool logAdapterMessage = std::any_of(
msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t == txsTag || t.locality == tagLocalityTxs; });
bool logAdapterMessage =
std::any_of(msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t.locality == tagLocalityTxs; });
StringRef mutationData = msg.getMessageWithoutTags();
uint8_t mutationType = *mutationData.begin();
if (logAdapterMessage) {

View File

@ -1402,6 +1402,7 @@ ACTOR Future<Void> fetchKeys(StorageCacheData* data, AddingCacheRange* cacheRang
// TODO: NEELAM: what's this for?
// FIXME: remove when we no longer support upgrades from 5.X
if (debug_getRangeRetries >= 100) {
ASSERT(false);
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::False;
}

View File

@ -687,9 +687,8 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
foundSpecial = true;
}
if (log->isLocal && log->logServers.size() &&
(log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag ||
tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter ||
(tag == cacheTag && log->locality != tagLocalitySatellite))) {
(log->locality == tagLocalitySpecial || log->locality == tag.locality || tag.locality == tagLocalityTxs ||
tag.locality == tagLocalityLogRouter || (tag == cacheTag && log->locality != tagLocalitySatellite))) {
lastBegin = std::max(lastBegin, log->startVersion);
localSets.push_back(log);
if (log->locality != tagLocalitySatellite) {
@ -725,7 +724,7 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
}
for (int i = 0; begin < lastBegin; i++) {
if (i == oldLogData.size()) {
if (tag == txsTag || tag.locality == tagLocalityTxs || tag == cacheTag) {
if (tag.locality == tagLocalityTxs || tag == cacheTag) {
break;
}
TraceEvent("TLogPeekAllDead", dbgid)
@ -747,7 +746,7 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
thisSpecial = true;
}
if (log->isLocal && log->logServers.size() &&
(log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag ||
(log->locality == tagLocalitySpecial || log->locality == tag.locality ||
tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter ||
(tag == cacheTag && log->locality != tagLocalitySatellite))) {
thisBegin = std::max(thisBegin, log->startVersion);
@ -1058,7 +1057,7 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
Version lastBegin = tLogs[bestSet]->startVersion;
for (int i = 0; begin < lastBegin; i++) {
if (i == oldLogData.size()) {
if ((tag == txsTag || tag.locality == tagLocalityTxs) && cursors.size()) {
if (tag.locality == tagLocalityTxs && cursors.size()) {
break;
}
TraceEvent("TLogPeekLocalDead", dbgid)
@ -1163,7 +1162,7 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
if (!tLogs.size()) {
TraceEvent("TLogPeekTxsNoLogs", dbgid).log();
return makeReference<ILogSystem::ServerPeekCursor>(
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), txsTag, begin, end, false, false);
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), invalidTag, begin, end, false, false);
}
TraceEvent("TLogPeekTxs", dbgid)
.detail("Begin", begin)
@ -1173,10 +1172,8 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
.detail("CanDiscardPopped", canDiscardPopped);
int maxTxsTags = txsTags;
bool needsOldTxs = tLogs[0]->tLogVersion < TLogVersion::V4;
for (auto& it : oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
if (peekLocality < 0 || localEnd == invalidVersion || localEnd <= begin) {
@ -1185,10 +1182,6 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
for (int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true));
}
// SOMEDAY: remove once upgrades from 6.2 are no longer supported
if (needsOldTxs) {
cursors.push_back(peekAll(dbgid, begin, end, txsTag, true));
}
return makeReference<ILogSystem::BufferedCursor>(cursors, begin, end, false, canDiscardPopped);
}
@ -1200,10 +1193,6 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
for (int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, end, true, peekLocality));
}
// SOMEDAY: remove once upgrades from 6.2 are no longer supported
if (needsOldTxs) {
cursors.push_back(peekLocal(dbgid, txsTag, begin, end, true, peekLocality));
}
return makeReference<ILogSystem::BufferedCursor>(cursors, begin, end, false, canDiscardPopped);
}
@ -1219,11 +1208,6 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
localCursors.push_back(peekLocal(dbgid, Tag(tagLocalityTxs, i), begin, localEnd, true, peekLocality));
allCursors.push_back(peekAll(dbgid, localEnd, end, Tag(tagLocalityTxs, i), true));
}
// SOMEDAY: remove once upgrades from 6.2 are no longer supported
if (needsOldTxs) {
localCursors.push_back(peekLocal(dbgid, txsTag, begin, localEnd, true, peekLocality));
allCursors.push_back(peekAll(dbgid, localEnd, end, txsTag, true));
}
cursors[1] = makeReference<ILogSystem::BufferedCursor>(localCursors, begin, localEnd, false, canDiscardPopped);
cursors[0] = makeReference<ILogSystem::BufferedCursor>(allCursors, localEnd, end, false, false);
@ -1237,10 +1221,6 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekTxs(UID dbgid,
for (int i = 0; i < maxTxsTags; i++) {
cursors.push_back(peekAll(dbgid, begin, end, Tag(tagLocalityTxs, i), true));
}
// SOMEDAY: remove once upgrades from 6.2 are no longer supported
if (needsOldTxs) {
cursors.push_back(peekAll(dbgid, begin, end, txsTag, true));
}
return makeReference<ILogSystem::BufferedCursor>(cursors, begin, end, false, canDiscardPopped);
}
@ -1493,14 +1473,10 @@ void TagPartitionedLogSystem::popLogRouter(Version upTo,
}
void TagPartitionedLogSystem::popTxs(Version upTo, int8_t popLocality) {
if (getTLogVersion() < TLogVersion::V4) {
pop(upTo, txsTag, 0, popLocality);
} else {
for (int i = 0; i < txsTags; i++) {
pop(upTo, Tag(tagLocalityTxs, i), 0, popLocality);
}
}
}
void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) {
if (upTo <= 0)
@ -1593,8 +1569,7 @@ ACTOR Future<Version> TagPartitionedLogSystem::getPoppedTxs(TagPartitionedLogSys
poppedFutures.push_back(std::vector<Future<Version>>());
for (auto& it : self->tLogs) {
for (auto& log : it->logServers) {
poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog(
log, self->tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0)));
poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0)));
}
}
poppedReady.push_back(waitForAny(poppedFutures.back()));
@ -1605,8 +1580,8 @@ ACTOR Future<Version> TagPartitionedLogSystem::getPoppedTxs(TagPartitionedLogSys
poppedFutures.push_back(std::vector<Future<Version>>());
for (auto& it : old.tLogs) {
for (auto& log : it->logServers) {
poppedFutures.back().push_back(TagPartitionedLogSystem::getPoppedFromTLog(
log, old.tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0)));
poppedFutures.back().push_back(
TagPartitionedLogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0)));
}
}
poppedReady.push_back(waitForAny(poppedFutures.back()));
@ -2840,7 +2815,6 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
std::vector<InitializeTLogRequest> remoteTLogReqs(remoteWorkers.remoteTLogs.size());
state std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> allRemoteTLogServers;
bool nonShardedTxs = self->getTLogVersion() < TLogVersion::V4;
if (oldLogSystem->logRouterTags == 0) {
std::vector<int> locations;
for (Tag tag : localTags) {
@ -2852,14 +2826,12 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
if (oldLogSystem->tLogs.size()) {
int maxTxsTags = oldLogSystem->txsTags;
bool needsOldTxs = oldLogSystem->tLogs[0]->tLogVersion < TLogVersion::V4;
for (auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) {
Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % self->txsTags);
for (int i = 0; i < maxTxsTags; i++) {
Tag tag = Tag(tagLocalityTxs, i);
Tag pushTag = Tag(tagLocalityTxs, i % self->txsTags);
locations.clear();
logSet->getPushLocations(VectorRef<Tag>(&pushTag, 1), locations, 0);
for (int loc : locations)
@ -2869,14 +2841,10 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
}
if (oldLogSystem->tLogs.size()) {
if (nonShardedTxs) {
localTags.push_back(txsTag);
} else {
for (int i = 0; i < self->txsTags; i++) {
localTags.push_back(Tag(tagLocalityTxs, i));
}
}
}
for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
InitializeTLogRequest& req = remoteTLogReqs[i];
@ -3001,10 +2969,8 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
state RegionInfo region = configuration.getRegion(recr.dcId);
state int maxTxsTags = oldLogSystem->txsTags;
state bool needsOldTxs = oldLogSystem->tLogs.size() && oldLogSystem->getTLogVersion() < TLogVersion::V4;
for (auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
if (region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) {
@ -3138,24 +3104,20 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
Tag tag = Tag(tagLocalityLogRouter, i);
reqs[logSystem->tLogs[0]->bestLocationFor(tag)].recoverTags.push_back(tag);
}
bool nonShardedTxs = logSystem->getTLogVersion() < TLogVersion::V4;
if (oldLogSystem->tLogs.size()) {
for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) {
Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % logSystem->txsTags);
for (int i = 0; i < maxTxsTags; i++) {
Tag tag = Tag(tagLocalityTxs, i);
Tag pushTag = Tag(tagLocalityTxs, i % logSystem->txsTags);
locations.clear();
logSystem->tLogs[0]->getPushLocations(VectorRef<Tag>(&pushTag, 1), locations, 0);
for (int loc : locations)
reqs[loc].recoverTags.push_back(tag);
}
if (nonShardedTxs) {
localTags.push_back(txsTag);
} else {
for (int i = 0; i < logSystem->txsTags; i++) {
localTags.push_back(Tag(tagLocalityTxs, i));
}
}
}
// Should be sorted by descending orders of versions.
state std::vector<Version> oldGenerationRecoverAtVersions;
@ -3224,22 +3186,18 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
}
}
if (oldLogSystem->tLogs.size()) {
for (int i = needsOldTxs ? -1 : 0; i < maxTxsTags; i++) {
Tag tag = i == -1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i == -1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i % logSystem->txsTags);
for (int i = 0; i < maxTxsTags; i++) {
Tag tag = Tag(tagLocalityTxs, i);
Tag pushTag = Tag(tagLocalityTxs, i % logSystem->txsTags);
locations.clear();
logSystem->tLogs[1]->getPushLocations(VectorRef<Tag>(&pushTag, 1), locations, 0);
for (int loc : locations)
sreqs[loc].recoverTags.push_back(tag);
}
if (nonShardedTxs) {
satelliteTags.push_back(txsTag);
} else {
for (int i = 0; i < logSystem->txsTags; i++) {
satelliteTags.push_back(Tag(tagLocalityTxs, i));
}
}
}
for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
InitializeTLogRequest& req = sreqs[i];

View File

@ -8622,7 +8622,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
ASSERT(fetchVersion >= shard->fetchVersion); // at this point, shard->fetchVersion is the last fetchVersion
shard->fetchVersion = fetchVersion;
TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID)
TraceEvent(SevVerbose, "FetchKeysUnblocked", data->thisServerID)
.detail("FKID", interval.pairID)
.detail("Version", fetchVersion);
@ -8747,12 +8747,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.suppressFor(1.0)
.detail("FKID", interval.pairID);
// FIXME: remove when we no longer support upgrades from 5.X
if (debug_getRangeRetries >= 100) {
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::False;
TraceEvent(SevWarnAlways, "FKDisableLB").detail("FKID", fetchKeysID);
}
debug_getRangeRetries++;
if (debug_nextRetryToLog == debug_getRangeRetries) {
debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024);
@ -8806,12 +8800,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
}
// FIXME: remove when we no longer support upgrades from 5.X
if (!data->cx->enableLocalityLoadBalance) {
data->cx->enableLocalityLoadBalance = EnableLocalityLoadBalance::True;
TraceEvent(SevWarnAlways, "FKReenableLB").detail("FKID", fetchKeysID);
}
// We have completed the fetch and write of the data, now we wait for MVCC window to pass.
// As we have finished this work, we will allow more work to start...
shard->fetchComplete.send(Void());