From d19b0cf1c19dcd9828fdfdf5926ec73cbce470b7 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 10 Apr 2019 11:21:27 -0700 Subject: [PATCH] Refactor LogSet with two new constructors --- fdbrpc/ReplicationUtils.cpp | 1 - fdbserver/LogSystem.h | 4 + fdbserver/TagPartitionedLogSystem.actor.cpp | 157 +++++++------------- 3 files changed, 54 insertions(+), 108 deletions(-) diff --git a/fdbrpc/ReplicationUtils.cpp b/fdbrpc/ReplicationUtils.cpp index d6be5507f8..356194d5da 100644 --- a/fdbrpc/ReplicationUtils.cpp +++ b/fdbrpc/ReplicationUtils.cpp @@ -835,7 +835,6 @@ void filterLocalityDataForPolicy(Reference policy, LocalityD void filterLocalityDataForPolicy(Reference policy, std::vector* vld) { if (!policy) return; - std::set keys = policy->attributeKeys(); for (LocalityData& ld : *vld) { filterLocalityDataForPolicy(policy, &ld); } diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 88e9448f2a..29a3e65ebb 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -30,6 +30,8 @@ #include "fdbrpc/Replication.h" struct DBCoreState; +struct TLogSet; +struct CoreTLogSet; class LogSet : NonCopyable, public ReferenceCounted { public: @@ -50,6 +52,8 @@ public: std::vector> satelliteTagLocations; LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} + LogSet(const TLogSet& tlogSet); + LogSet(const CoreTLogSet& coreSet); std::string logRouterString() { std::string result; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 38488a4efc..c9d928e898 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -58,6 +58,35 @@ struct LogLockInfo { LogLockInfo() : epochEnd(std::numeric_limits::max()), isCurrent(false) {} }; +LogSet::LogSet(const TLogSet& tLogSet) : + tLogWriteAntiQuorum(tLogSet.tLogWriteAntiQuorum), + tLogReplicationFactor(tLogSet.tLogReplicationFactor), + tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion), + tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal), + locality(tLogSet.locality), startVersion(tLogSet.startVersion), + satelliteTagLocations(tLogSet.satelliteTagLocations) +{ + for(const auto& log : tLogSet.tLogs) { + logServers.push_back(Reference>>(new AsyncVar>(log))); + } + for(const auto& log : tLogSet.logRouters) { + logRouters.push_back(Reference>>(new AsyncVar>(log))); + } +} + +LogSet::LogSet(const CoreTLogSet& coreSet) : + tLogWriteAntiQuorum(coreSet.tLogWriteAntiQuorum), + tLogReplicationFactor(coreSet.tLogReplicationFactor), + tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion), + tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal), + locality(coreSet.locality), startVersion(coreSet.startVersion), + satelliteTagLocations(coreSet.satelliteTagLocations) +{ + for(const auto& log : coreSet.tLogs) { + logServers.push_back(Reference>>(new AsyncVar>(OptionalInterface(log)))); + } +} + struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; LogSystemType logSystemType; @@ -134,26 +163,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedrecoveredAt = lsConf.recoveredAt; } - for( int i = 0; i < lsConf.tLogs.size(); i++ ) { - TLogSet const& tLogSet = lsConf.tLogs[i]; - if(!excludeRemote || tLogSet.isLocal) { - Reference logSet = Reference( new LogSet() ); - logSystem->tLogs.push_back( logSet ); - for( auto& log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto& log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + for (const TLogSet& tLogSet : lsConf.tLogs) { + if (!excludeRemote || tLogSet.isLocal) { + Reference logSet(new LogSet(tLogSet)); + logSystem->tLogs.push_back(logSet); filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -162,25 +175,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData.resize(lsConf.oldTLogs.size()); for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) { logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); - for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->oldLogData[i].tLogs[j] = logSet; + for (int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++) { TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j]; - for( auto & log : tLogData.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogData.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogData.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor; - logSet->tLogPolicy = tLogData.tLogPolicy; - logSet->tLogLocalities = tLogData.tLogLocalities; - logSet->isLocal = tLogData.isLocal; - logSet->locality = tLogData.locality; - logSet->startVersion = tLogData.startVersion; - logSet->satelliteTagLocations = tLogData.satelliteTagLocations; + Reference logSet(new LogSet(tLogData)); + logSystem->oldLogData[i].tLogs[j] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -200,24 +198,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs.resize( lsConf.oldTLogs[0].tLogs.size()); for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->tLogs[i] = logSet; TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i]; - for( auto & log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + Reference logSet(new LogSet(tLogSet)); + logSystem->tLogs[i] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -228,24 +211,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->oldLogData[i-1].tLogs[j] = logSet; TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j]; - for( auto & log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + Reference logSet(new LogSet(tLogSet)); + logSystem->oldLogData[i-1].tLogs[j] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -1374,27 +1342,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> failureTrackers; logServers.resize(prevState.tLogs.size()); - for( int i = 0; i < prevState.tLogs.size(); i++ ) { - Reference logSet = Reference( new LogSet() ); - logServers[i] = logSet; + for (int i = 0; i < prevState.tLogs.size(); i++) { CoreTLogSet const& coreSet = prevState.tLogs[i]; + Reference logSet(new LogSet(coreSet)); + logServers[i] = logSet; std::vector>> failed; - for(int j = 0; j < coreSet.tLogs.size(); j++) { - Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(coreSet.tLogs[j]) ) ); - logSet->logServers.push_back( logVar ); - allLogServers.push_back( logVar ); - failed.push_back( Reference>( new AsyncVar() ) ); - failureTrackers.push_back( monitorLog(logVar, failed[j] ) ); + for (const auto& logVar : logSet->logServers) { + allLogServers.push_back(logVar); + failed.push_back(Reference>(new AsyncVar())); + failureTrackers.push_back(monitorLog(logVar, failed.back())); } - logSet->tLogVersion = coreSet.tLogVersion; - logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor; - logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum; - logSet->tLogPolicy = coreSet.tLogPolicy; - logSet->tLogLocalities = coreSet.tLogLocalities; - logSet->isLocal = coreSet.isLocal; - logSet->locality = coreSet.locality; - logSet->startVersion = coreSet.startVersion; - logSet->satelliteTagLocations = coreSet.satelliteTagLocations; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); logFailed.push_back(failed); @@ -1404,24 +1361,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = Reference( new LogSet() ); + for (int j = 0; j < old.tLogs.size(); j++) { + Reference logSet(new LogSet(old.tLogs[j])); oldData.tLogs[j] = logSet; - CoreTLogSet const& log = old.tLogs[j]; - for(int k = 0; k < log.tLogs.size(); k++) { - Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(log.tLogs[k]) ) ); - logSet->logServers.push_back( logVar ); - allLogServers.push_back( logVar ); - } - logSet->tLogVersion = log.tLogVersion; - logSet->tLogReplicationFactor = log.tLogReplicationFactor; - logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; - logSet->tLogPolicy = log.tLogPolicy; - logSet->tLogLocalities = log.tLogLocalities; - logSet->isLocal = log.isLocal; - logSet->locality = log.locality; - logSet->startVersion = log.startVersion; - logSet->satelliteTagLocations = log.satelliteTagLocations; + allLogServers.insert(allLogServers.end(), logSet->logServers.begin(), logSet->logServers.end()); filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); }