Remove tagLocalityUpgraded usage at various places
Since we have removed old tlog implementation, so the code path using this tag can be deleted to simplify the code.
This commit is contained in:
parent
960db554a5
commit
7b76561bb9
|
@ -88,6 +88,7 @@ const Value keyServersValue(RangeResult result, const std::vector<UID>& src, con
|
|||
if (std::find(src.begin(), src.end(), uid) != src.end()) {
|
||||
srcTag.push_back(decodeServerTagValue(kv.value));
|
||||
if (srcTag.back().locality == tagLocalityUpgraded) {
|
||||
ASSERT(false);
|
||||
foundOldLocality = true;
|
||||
break;
|
||||
}
|
||||
|
@ -95,6 +96,7 @@ const Value keyServersValue(RangeResult result, const std::vector<UID>& src, con
|
|||
if (std::find(dest.begin(), dest.end(), uid) != dest.end()) {
|
||||
destTag.push_back(decodeServerTagValue(kv.value));
|
||||
if (destTag.back().locality == tagLocalityUpgraded) {
|
||||
ASSERT(false);
|
||||
foundOldLocality = true;
|
||||
break;
|
||||
}
|
||||
|
@ -832,6 +834,7 @@ Tag decodeServerTagValue(ValueRef const& value) {
|
|||
Tag s;
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
if (!reader.protocolVersion().hasTagLocality()) {
|
||||
ASSERT(false);
|
||||
int16_t id;
|
||||
reader >> id;
|
||||
if (id == invalidTagOld) {
|
||||
|
|
|
@ -3209,7 +3209,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
|
|||
rep.newLocality = false;
|
||||
if (localityKey.present()) {
|
||||
int8_t locality = decodeTagLocalityListValue(localityKey.get());
|
||||
if (rep.tag.locality != tagLocalityUpgraded && locality != rep.tag.locality) {
|
||||
if (locality != rep.tag.locality) {
|
||||
TraceEvent(SevWarnAlways, "SSRejoinedWithChangedLocality")
|
||||
.detail("Tag", rep.tag.toString())
|
||||
.detail("DcId", req.dcId)
|
||||
|
@ -3248,13 +3248,8 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
|
|||
.detail("Tag", rep.tag.toString())
|
||||
.detail("DcId", req.dcId);
|
||||
} else {
|
||||
rep.newLocality = true;
|
||||
int8_t maxTagLocality = -1;
|
||||
auto localityKeys = commitData->txnStateStore->readRange(tagLocalityListKeys).get();
|
||||
for (auto& kv : localityKeys) {
|
||||
maxTagLocality = std::max(maxTagLocality, decodeTagLocalityListValue(kv.value));
|
||||
}
|
||||
rep.newTag = Tag(maxTagLocality + 1, 0);
|
||||
// TODO: remove this
|
||||
ASSERT(false); // tagLocalityUpgraded is no longer used after removing old tlog format
|
||||
}
|
||||
rep.encryptMode = commitData->encryptMode;
|
||||
req.reply.send(rep);
|
||||
|
|
|
@ -683,14 +683,13 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
|
|||
Version lastBegin = 0;
|
||||
bool foundSpecial = false;
|
||||
for (auto& log : tLogs) {
|
||||
if (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded) {
|
||||
if (log->locality == tagLocalitySpecial) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
if (log->isLocal && log->logServers.size() &&
|
||||
(log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded ||
|
||||
log->locality == tag.locality || tag == txsTag || tag.locality == tagLocalityTxs ||
|
||||
tag.locality == tagLocalityLogRouter ||
|
||||
((tag.locality == tagLocalityUpgraded || tag == cacheTag) && log->locality != tagLocalitySatellite))) {
|
||||
(log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag ||
|
||||
tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter ||
|
||||
(tag == cacheTag && log->locality != tagLocalitySatellite))) {
|
||||
lastBegin = std::max(lastBegin, log->startVersion);
|
||||
localSets.push_back(log);
|
||||
if (log->locality != tagLocalitySatellite) {
|
||||
|
@ -744,15 +743,13 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
|
|||
Version thisBegin = begin;
|
||||
bool thisSpecial = false;
|
||||
for (auto& log : oldLogData[i].tLogs) {
|
||||
if (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded) {
|
||||
if (log->locality == tagLocalitySpecial) {
|
||||
thisSpecial = true;
|
||||
}
|
||||
if (log->isLocal && log->logServers.size() &&
|
||||
(log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded ||
|
||||
log->locality == tag.locality || tag == txsTag || tag.locality == tagLocalityTxs ||
|
||||
tag.locality == tagLocalityLogRouter ||
|
||||
((tag.locality == tagLocalityUpgraded || tag == cacheTag) &&
|
||||
log->locality != tagLocalitySatellite))) {
|
||||
(log->locality == tagLocalitySpecial || log->locality == tag.locality || tag == txsTag ||
|
||||
tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter ||
|
||||
(tag == cacheTag && log->locality != tagLocalitySatellite))) {
|
||||
thisBegin = std::max(thisBegin, log->startVersion);
|
||||
localOldSets.push_back(log);
|
||||
if (log->locality != tagLocalitySatellite) {
|
||||
|
@ -969,10 +966,10 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
|
|||
Version end,
|
||||
bool useMergePeekCursors,
|
||||
int8_t peekLocality) {
|
||||
if (tag.locality >= 0 || tag.locality == tagLocalityUpgraded || tag.locality == tagLocalitySpecial) {
|
||||
if (tag.locality >= 0 || tag.locality == tagLocalitySpecial) {
|
||||
peekLocality = tag.locality;
|
||||
}
|
||||
ASSERT(peekLocality >= 0 || peekLocality == tagLocalityUpgraded || tag.locality == tagLocalitySpecial);
|
||||
ASSERT(peekLocality >= 0 || tag.locality == tagLocalitySpecial);
|
||||
|
||||
int bestSet = -1;
|
||||
bool foundSpecial = false;
|
||||
|
@ -981,11 +978,9 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
|
|||
if (tLogs[t]->logServers.size() && tLogs[t]->locality != tagLocalitySatellite) {
|
||||
logCount++;
|
||||
}
|
||||
if (tLogs[t]->logServers.size() &&
|
||||
(tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded ||
|
||||
tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded ||
|
||||
peekLocality == tagLocalitySpecial)) {
|
||||
if (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded) {
|
||||
if (tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial ||
|
||||
tLogs[t]->locality == peekLocality || peekLocality == tagLocalitySpecial)) {
|
||||
if (tLogs[t]->locality == tagLocalitySpecial) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
bestSet = t;
|
||||
|
@ -1089,11 +1084,8 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
|
|||
}
|
||||
if (oldLogData[i].tLogs[t]->logServers.size() &&
|
||||
(oldLogData[i].tLogs[t]->locality == tagLocalitySpecial ||
|
||||
oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ||
|
||||
oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded ||
|
||||
peekLocality == tagLocalitySpecial)) {
|
||||
if (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial ||
|
||||
oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded) {
|
||||
oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalitySpecial)) {
|
||||
if (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial) {
|
||||
nextFoundSpecial = true;
|
||||
}
|
||||
if (foundSpecial && !oldLogData[i].tLogs[t]->isLocal) {
|
||||
|
@ -1523,7 +1515,7 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom
|
|||
return;
|
||||
}
|
||||
for (auto& t : tLogs) {
|
||||
if (t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality == tagLocalityUpgraded ||
|
||||
if (t->locality == tagLocalitySpecial || t->locality == tag.locality ||
|
||||
(tag.locality < 0 && ((popLocality == tagLocalityInvalid) == t->isLocal))) {
|
||||
for (auto& log : t->logServers) {
|
||||
Version prev = outstandingPops[std::make_pair(log->get().id(), tag)].first;
|
||||
|
@ -2394,7 +2386,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
|
|||
std::set<int8_t> lockedLocalities;
|
||||
bool foundSpecial = false;
|
||||
for (int i = 0; i < logServers.size(); i++) {
|
||||
if (logServers[i]->locality == tagLocalitySpecial || logServers[i]->locality == tagLocalityUpgraded) {
|
||||
if (logServers[i]->locality == tagLocalitySpecial) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
lockedLocalities.insert(logServers[i]->locality);
|
||||
|
@ -2411,7 +2403,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
|
|||
break;
|
||||
}
|
||||
for (auto& log : old.tLogs) {
|
||||
if (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded) {
|
||||
if (log->locality == tagLocalitySpecial) {
|
||||
foundSpecial = true;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -120,6 +120,7 @@ struct OldTLogCoreData {
|
|||
if (ar.protocolVersion().hasTagLocality()) {
|
||||
serializer(ar, tLogs, logRouterTags, epochEnd);
|
||||
} else if (ar.isDeserializing) {
|
||||
ASSERT(false);
|
||||
tLogs.push_back(CoreTLogSet());
|
||||
serializer(ar,
|
||||
tLogs[0].tLogs,
|
||||
|
@ -206,6 +207,7 @@ struct DBCoreState {
|
|||
serializer(ar, encryptionAtRestMode);
|
||||
}
|
||||
} else if (ar.isDeserializing) {
|
||||
ASSERT(false);
|
||||
tLogs.push_back(CoreTLogSet());
|
||||
serializer(ar,
|
||||
tLogs[0].tLogs,
|
||||
|
|
Loading…
Reference in New Issue