Refactor LogSet with two new constructors

This commit is contained in:
Jingyu Zhou 2019-04-10 11:21:27 -07:00
parent 0b1984978a
commit d19b0cf1c1
3 changed files with 54 additions and 108 deletions

View File

@ -835,7 +835,6 @@ void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, LocalityD
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, std::vector<LocalityData>* vld) {
if (!policy) return;
std::set<std::string> keys = policy->attributeKeys();
for (LocalityData& ld : *vld) {
filterLocalityDataForPolicy(policy, &ld);
}

View File

@ -30,6 +30,8 @@
#include "fdbrpc/Replication.h"
struct DBCoreState;
struct TLogSet;
struct CoreTLogSet;
class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
@ -50,6 +52,8 @@ public:
std::vector<std::vector<int>> satelliteTagLocations;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
LogSet(const TLogSet& tlogSet);
LogSet(const CoreTLogSet& coreSet);
std::string logRouterString() {
std::string result;

View File

@ -58,6 +58,35 @@ struct LogLockInfo {
LogLockInfo() : epochEnd(std::numeric_limits<Version>::max()), isCurrent(false) {}
};
LogSet::LogSet(const TLogSet& tLogSet) :
tLogWriteAntiQuorum(tLogSet.tLogWriteAntiQuorum),
tLogReplicationFactor(tLogSet.tLogReplicationFactor),
tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion),
tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal),
locality(tLogSet.locality), startVersion(tLogSet.startVersion),
satelliteTagLocations(tLogSet.satelliteTagLocations)
{
for(const auto& log : tLogSet.tLogs) {
logServers.push_back(Reference<AsyncVar<OptionalInterface<TLogInterface>>>(new AsyncVar<OptionalInterface<TLogInterface>>(log)));
}
for(const auto& log : tLogSet.logRouters) {
logRouters.push_back(Reference<AsyncVar<OptionalInterface<TLogInterface>>>(new AsyncVar<OptionalInterface<TLogInterface>>(log)));
}
}
LogSet::LogSet(const CoreTLogSet& coreSet) :
tLogWriteAntiQuorum(coreSet.tLogWriteAntiQuorum),
tLogReplicationFactor(coreSet.tLogReplicationFactor),
tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion),
tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal),
locality(coreSet.locality), startVersion(coreSet.startVersion),
satelliteTagLocations(coreSet.satelliteTagLocations)
{
for(const auto& log : coreSet.tLogs) {
logServers.push_back(Reference<AsyncVar<OptionalInterface<TLogInterface>>>(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(log))));
}
}
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
UID dbgid;
LogSystemType logSystemType;
@ -134,26 +163,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(useRecoveredAt) {
logSystem->recoveredAt = lsConf.recoveredAt;
}
for( int i = 0; i < lsConf.tLogs.size(); i++ ) {
TLogSet const& tLogSet = lsConf.tLogs[i];
if(!excludeRemote || tLogSet.isLocal) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs.push_back( logSet );
for( auto& log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto& log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
for (const TLogSet& tLogSet : lsConf.tLogs) {
if (!excludeRemote || tLogSet.isLocal) {
Reference<LogSet> logSet(new LogSet(tLogSet));
logSystem->tLogs.push_back(logSet);
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
@ -162,25 +175,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->oldLogData.resize(lsConf.oldTLogs.size());
for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) {
logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i].tLogs[j] = logSet;
for (int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++) {
TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogData.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogData.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogData.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor;
logSet->tLogPolicy = tLogData.tLogPolicy;
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->locality = tLogData.locality;
logSet->startVersion = tLogData.startVersion;
logSet->satelliteTagLocations = tLogData.satelliteTagLocations;
Reference<LogSet> logSet(new LogSet(tLogData));
logSystem->oldLogData[i].tLogs[j] = logSet;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
@ -200,24 +198,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(lsConf.oldTLogs.size()) {
logSystem->tLogs.resize( lsConf.oldTLogs[0].tLogs.size());
for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs[i] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
Reference<LogSet> logSet(new LogSet(tLogSet));
logSystem->tLogs[i] = logSet;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
@ -228,24 +211,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
logSystem->oldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i-1].tLogs[j] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
Reference<LogSet> logSet(new LogSet(tLogSet));
logSystem->oldLogData[i-1].tLogs[j] = logSet;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
@ -1374,27 +1342,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state std::vector<Future<Void>> failureTrackers;
logServers.resize(prevState.tLogs.size());
for( int i = 0; i < prevState.tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logServers[i] = logSet;
for (int i = 0; i < prevState.tLogs.size(); i++) {
CoreTLogSet const& coreSet = prevState.tLogs[i];
Reference<LogSet> logSet(new LogSet(coreSet));
logServers[i] = logSet;
std::vector<Reference<AsyncVar<bool>>> failed;
for(int j = 0; j < coreSet.tLogs.size(); j++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(coreSet.tLogs[j]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
failureTrackers.push_back( monitorLog(logVar, failed[j] ) );
for (const auto& logVar : logSet->logServers) {
allLogServers.push_back(logVar);
failed.push_back(Reference<AsyncVar<bool>>(new AsyncVar<bool>()));
failureTrackers.push_back(monitorLog(logVar, failed.back()));
}
logSet->tLogVersion = coreSet.tLogVersion;
logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum;
logSet->tLogPolicy = coreSet.tLogPolicy;
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->locality = coreSet.locality;
logSet->startVersion = coreSet.startVersion;
logSet->satelliteTagLocations = coreSet.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
logFailed.push_back(failed);
@ -1404,24 +1361,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
OldLogData& oldData = oldLogData[i];
OldTLogCoreData const& old = prevState.oldTLogData[i];
oldData.tLogs.resize(old.tLogs.size());
for( int j = 0; j < old.tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
for (int j = 0; j < old.tLogs.size(); j++) {
Reference<LogSet> logSet(new LogSet(old.tLogs[j]));
oldData.tLogs[j] = logSet;
CoreTLogSet const& log = old.tLogs[j];
for(int k = 0; k < log.tLogs.size(); k++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(log.tLogs[k]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
}
logSet->tLogVersion = log.tLogVersion;
logSet->tLogReplicationFactor = log.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet->tLogPolicy = log.tLogPolicy;
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->locality = log.locality;
logSet->startVersion = log.startVersion;
logSet->satelliteTagLocations = log.satelliteTagLocations;
allLogServers.insert(allLogServers.end(), logSet->logServers.begin(), logSet->logServers.end());
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}