Remove pseudoLocalities from LogSet, TLogSet, and CoreTLogSet
This commit is contained in:
parent
7befce6bf1
commit
010f825aff
|
@ -30,6 +30,7 @@
|
|||
#include "fdbserver/MasterInterface.h"
|
||||
|
||||
class LogSet;
|
||||
struct OldLogData;
|
||||
|
||||
// This structure is stored persistently in CoordinatedState and must be versioned carefully!
|
||||
// It records a synchronous replication topology which can be used in the absence of faults (or under a limited
|
||||
|
@ -52,19 +53,18 @@ struct CoreTLogSet {
|
|||
Version startVersion;
|
||||
std::vector<std::vector<int>> satelliteTagLocations;
|
||||
TLogVersion tLogVersion;
|
||||
std::set<int8_t> pseudoLocalitites;
|
||||
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
|
||||
explicit CoreTLogSet(const LogSet& logset);
|
||||
|
||||
bool operator == (CoreTLogSet const& rhs) const {
|
||||
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations &&
|
||||
pseudoLocalitites == rhs.pseudoLocalitites && locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
|
||||
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
|
||||
}
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites);
|
||||
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
|
||||
tLogVersion = TLogVersion::V2;
|
||||
} else {
|
||||
|
@ -80,6 +80,7 @@ struct OldTLogCoreData {
|
|||
std::set<int8_t> pseudoLocalities;
|
||||
|
||||
OldTLogCoreData() : epochEnd(0), logRouterTags(0) {}
|
||||
explicit OldTLogCoreData(const OldLogData&);
|
||||
|
||||
bool operator == (OldTLogCoreData const& rhs) const {
|
||||
return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities;
|
||||
|
|
|
@ -53,7 +53,6 @@ public:
|
|||
Version startVersion;
|
||||
std::vector<Future<TLogLockResult>> replies;
|
||||
std::vector<std::vector<int>> satelliteTagLocations;
|
||||
std::set<int8_t> pseudoLocalities;
|
||||
|
||||
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
|
||||
LogSet(const TLogSet& tlogSet);
|
||||
|
|
|
@ -56,6 +56,7 @@ protected:
|
|||
};
|
||||
|
||||
class LogSet;
|
||||
struct OldLogData;
|
||||
|
||||
struct TLogSet {
|
||||
std::vector<OptionalInterface<TLogInterface>> tLogs;
|
||||
|
@ -68,7 +69,6 @@ struct TLogSet {
|
|||
int8_t locality;
|
||||
Version startVersion;
|
||||
std::vector<std::vector<int>> satelliteTagLocations;
|
||||
std::set<int8_t> pseudoLocalitites;
|
||||
|
||||
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
|
||||
explicit TLogSet(const LogSet& rhs);
|
||||
|
@ -79,7 +79,7 @@ struct TLogSet {
|
|||
|
||||
bool operator == ( const TLogSet& rhs ) const {
|
||||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || satelliteTagLocations != rhs.satelliteTagLocations ||
|
||||
pseudoLocalitites != rhs.pseudoLocalitites || startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
|
||||
startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
|
||||
return false;
|
||||
}
|
||||
if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) {
|
||||
|
@ -100,7 +100,7 @@ struct TLogSet {
|
|||
|
||||
bool isEqualIds(TLogSet const& r) const {
|
||||
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || satelliteTagLocations != r.satelliteTagLocations ||
|
||||
pseudoLocalitites != r.pseudoLocalitites || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
|
||||
startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
|
||||
return false;
|
||||
}
|
||||
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
|
||||
|
@ -116,7 +116,7 @@ struct TLogSet {
|
|||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites);
|
||||
serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
|
||||
tLogVersion = TLogVersion::V2;
|
||||
} else {
|
||||
|
@ -133,6 +133,7 @@ struct OldTLogConf {
|
|||
std::set<int8_t> pseudoLocalities;
|
||||
|
||||
OldTLogConf() : epochEnd(0), logRouterTags(0) {}
|
||||
explicit OldTLogConf(const OldLogData&);
|
||||
|
||||
std::string toString() const {
|
||||
return format("end: %d tags: %d %s", epochEnd, logRouterTags, describe(tLogs).c_str());
|
||||
|
|
|
@ -77,8 +77,7 @@ LogSet::LogSet(const TLogSet& tLogSet) :
|
|||
tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion),
|
||||
tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal),
|
||||
locality(tLogSet.locality), startVersion(tLogSet.startVersion),
|
||||
satelliteTagLocations(tLogSet.satelliteTagLocations),
|
||||
pseudoLocalities(tLogSet.pseudoLocalitites)
|
||||
satelliteTagLocations(tLogSet.satelliteTagLocations)
|
||||
{
|
||||
for(const auto& log : tLogSet.tLogs) {
|
||||
logServers.push_back(Reference<AsyncVar<OptionalInterface<TLogInterface>>>(new AsyncVar<OptionalInterface<TLogInterface>>(log)));
|
||||
|
@ -94,8 +93,7 @@ LogSet::LogSet(const CoreTLogSet& coreSet) :
|
|||
tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion),
|
||||
tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal),
|
||||
locality(coreSet.locality), startVersion(coreSet.startVersion),
|
||||
satelliteTagLocations(coreSet.satelliteTagLocations),
|
||||
pseudoLocalities(coreSet.pseudoLocalitites)
|
||||
satelliteTagLocations(coreSet.satelliteTagLocations)
|
||||
{
|
||||
for(const auto& log : coreSet.tLogs) {
|
||||
logServers.push_back(Reference<AsyncVar<OptionalInterface<TLogInterface>>>(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(log))));
|
||||
|
@ -108,8 +106,7 @@ TLogSet::TLogSet(const LogSet& rhs) :
|
|||
tLogLocalities(rhs.tLogLocalities), tLogVersion(rhs.tLogVersion),
|
||||
tLogPolicy(rhs.tLogPolicy), isLocal(rhs.isLocal), locality(rhs.locality),
|
||||
startVersion(rhs.startVersion),
|
||||
satelliteTagLocations(rhs.satelliteTagLocations),
|
||||
pseudoLocalitites(rhs.pseudoLocalities)
|
||||
satelliteTagLocations(rhs.satelliteTagLocations)
|
||||
{
|
||||
for (const auto& tlog : rhs.logServers) {
|
||||
tLogs.push_back(tlog->get());
|
||||
|
@ -120,6 +117,15 @@ TLogSet::TLogSet(const LogSet& rhs) :
|
|||
}
|
||||
}
|
||||
|
||||
OldTLogConf::OldTLogConf(const OldLogData& oldLogData) :
|
||||
logRouterTags(oldLogData.logRouterTags), epochEnd(oldLogData.epochEnd),
|
||||
pseudoLocalities(oldLogData.pseudoLocalities)
|
||||
{
|
||||
for (const Reference<LogSet>& logSet : oldLogData.tLogs) {
|
||||
tLogs.emplace_back(*logSet);
|
||||
}
|
||||
}
|
||||
|
||||
CoreTLogSet::CoreTLogSet(const LogSet& logset) :
|
||||
tLogWriteAntiQuorum(logset.tLogWriteAntiQuorum),
|
||||
tLogReplicationFactor(logset.tLogReplicationFactor),
|
||||
|
@ -127,14 +133,24 @@ CoreTLogSet::CoreTLogSet(const LogSet& logset) :
|
|||
tLogPolicy(logset.tLogPolicy), isLocal(logset.isLocal),
|
||||
locality(logset.locality), startVersion(logset.startVersion),
|
||||
satelliteTagLocations(logset.satelliteTagLocations),
|
||||
tLogVersion(logset.tLogVersion),
|
||||
pseudoLocalitites(logset.pseudoLocalities)
|
||||
tLogVersion(logset.tLogVersion)
|
||||
{
|
||||
for (const auto &log : logset.logServers) {
|
||||
tLogs.push_back(log->get().id());
|
||||
}
|
||||
}
|
||||
|
||||
OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData) :
|
||||
logRouterTags(oldData.logRouterTags), epochEnd(oldData.epochEnd),
|
||||
pseudoLocalities(oldData.pseudoLocalities)
|
||||
{
|
||||
for (const Reference<LogSet>& logSet : oldData.tLogs) {
|
||||
if (logSet->logServers.size()) {
|
||||
tLogs.emplace_back(*logSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
|
||||
UID dbgid;
|
||||
LogSystemType logSystemType;
|
||||
|
@ -227,7 +243,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
|
||||
logSystem->logSystemType = lsConf.logSystemType;
|
||||
logSystem->pseudoLocalities = lsConf.pseudoLocalities;
|
||||
return logSystem;
|
||||
}
|
||||
|
||||
|
@ -277,16 +292,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
newState.oldTLogData.clear();
|
||||
if(!recoveryComplete.isValid() || !recoveryComplete.isReady() || (repopulateRegionAntiQuorum == 0 && (!remoteRecoveryComplete.isValid() || !remoteRecoveryComplete.isReady()))) {
|
||||
newState.oldTLogData.resize(oldLogData.size());
|
||||
for (int i = 0; i < oldLogData.size(); i++) {
|
||||
for (const auto& t : oldLogData[i].tLogs) {
|
||||
if (t->logServers.size()) {
|
||||
newState.oldTLogData[i].tLogs.emplace_back(*t);
|
||||
}
|
||||
}
|
||||
newState.oldTLogData[i].logRouterTags = oldLogData[i].logRouterTags;
|
||||
newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd;
|
||||
newState.oldTLogData[i].pseudoLocalities = oldLogData[i].pseudoLocalities;
|
||||
for (const auto& oldData : oldLogData) {
|
||||
newState.oldTLogData.emplace_back(oldData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1001,14 +1008,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
if(!recoveryCompleteWrittenToCoreState.get()) {
|
||||
for (const auto& oldData : oldLogData) {
|
||||
logSystemConfig.oldTLogs.push_back(OldTLogConf());
|
||||
|
||||
for (const Reference<LogSet>& logSet : oldData.tLogs) {
|
||||
logSystemConfig.oldTLogs.back().tLogs.emplace_back(*logSet);
|
||||
}
|
||||
logSystemConfig.oldTLogs.back().logRouterTags = oldData.logRouterTags;
|
||||
logSystemConfig.oldTLogs.back().epochEnd = oldData.epochEnd;
|
||||
logSystemConfig.oldTLogs.back().pseudoLocalities = oldData.pseudoLocalities;
|
||||
logSystemConfig.oldTLogs.emplace_back(oldData);
|
||||
}
|
||||
}
|
||||
return logSystemConfig;
|
||||
|
|
Loading…
Reference in New Issue