From ec1bc5cfca3d02ae0cc2539c582ef625b3cc0399 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 8 Apr 2019 14:45:16 -0700 Subject: [PATCH 01/17] Add LogSystemType enum --- fdbserver/DBCoreState.h | 6 +++--- fdbserver/LogSystem.h | 1 - fdbserver/LogSystemConfig.h | 10 ++++++++-- fdbserver/OldTLogServer_6_0.actor.cpp | 2 +- fdbserver/TLogServer.actor.cpp | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 18 +++++++++--------- 6 files changed, 22 insertions(+), 17 deletions(-) diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index ebaaff8c73..5622a9c861 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -20,10 +20,10 @@ #ifndef FDBSERVER_DBCORESTATE_H #define FDBSERVER_DBCORESTATE_H -#pragma once #include "fdbclient/FDBTypes.h" #include "fdbrpc/ReplicationPolicy.h" +#include "fdbserver/LogSystemConfig.h" #include "fdbserver/MasterInterface.h" // This structure is stored persistently in CoordinatedState and must be versioned carefully! @@ -95,9 +95,9 @@ struct DBCoreState { int32_t logRouterTags; std::vector oldTLogData; DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries. 
- int logSystemType; + LogSystemType logSystemType; - DBCoreState() : logRouterTags(0), recoveryCount(0), logSystemType(0) {} + DBCoreState() : logRouterTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {} vector getPriorCommittedLogServers() { vector priorCommittedLogServers; diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 390daeb095..d7b2fc9361 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -20,7 +20,6 @@ #ifndef FDBSERVER_LOGSYSTEM_H #define FDBSERVER_LOGSYSTEM_H -#pragma once #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 6890726579..fdcaf0d80e 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -155,8 +155,14 @@ struct OldTLogConf { } }; +enum class LogSystemType { + empty = 0, + tagPartitioned = 2, +}; +BINARY_SERIALIZABLE(LogSystemType); + struct LogSystemConfig { - int32_t logSystemType; + LogSystemType logSystemType; std::vector tLogs; int32_t logRouterTags; std::vector oldTLogs; @@ -165,7 +171,7 @@ struct LogSystemConfig { bool stopped; Optional recoveredAt; - LogSystemConfig() : logSystemType(0), logRouterTags(0), expectedLogSets(0), stopped(false) {} + LogSystemConfig() : logSystemType(LogSystemType::empty), logRouterTags(0), expectedLogSets(0), stopped(false) {} std::string toString() const { return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str()); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 5a0cc3ed37..c114145402 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1980,7 +1980,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit throw logData->removed.getError(); } - if (req.recoverFrom.logSystemType == 2) { + if (req.recoverFrom.logSystemType == 
LogSystemType::tagPartitioned) { logData->unrecoveredBefore = req.startVersion; logData->recoveredAt = req.recoverAt; logData->knownCommittedVersion = req.startVersion - 1; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 6021f7410b..2ff87e4245 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -2367,7 +2367,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit throw logData->removed.getError(); } - if (req.recoverFrom.logSystemType == 2) { + if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) { logData->unrecoveredBefore = req.startVersion; logData->recoveredAt = req.recoverAt; logData->knownCommittedVersion = req.startVersion - 1; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 362d70a980..38488a4efc 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -60,7 +60,7 @@ struct LogLockInfo { struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; - int logSystemType; + LogSystemType logSystemType; std::vector> tLogs; int expectedLogSets; int logRouterTags; @@ -88,7 +88,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted oldLogData; AsyncTrigger logSystemConfigChanged; - TagPartitionedLogSystem( UID dbgid, LocalityData locality, Optional>> addActor = Optional>>() ) : dbgid(dbgid), locality(locality), addActor(addActor), popActors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false), repopulateRegionAntiQuorum(0) {} + TagPartitionedLogSystem( UID dbgid, LocalityData locality, Optional>> addActor = Optional>>() ) : dbgid(dbgid), locality(locality), addActor(addActor), popActors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), 
logSystemType(LogSystemType::empty), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false), repopulateRegionAntiQuorum(0) {} virtual void stopRejoins() { rejoins = Future(); @@ -122,7 +122,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote, bool useRecoveredAt, Optional>> addActor ) { - ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) ); + ASSERT(lsConf.logSystemType == LogSystemType::tagPartitioned || (lsConf.logSystemType == LogSystemType::empty && !lsConf.tLogs.size())); //ASSERT(lsConf.epoch == epoch); //< FIXME Reference logSystem( new TagPartitionedLogSystem(dbgid, locality, addActor) ); @@ -193,7 +193,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted fromOldLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf ) { - ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) ); + ASSERT( lsConf.logSystemType == LogSystemType::tagPartitioned || (lsConf.logSystemType == LogSystemType::empty && !lsConf.tLogs.size()) ); //ASSERT(lsConf.epoch == epoch); //< FIXME Reference logSystem( new TagPartitionedLogSystem(dbgid, locality) ); @@ -1850,7 +1850,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted allTags, Reference> recruitmentStalled ) { state double startTime = now(); state Reference logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) ); - logSystem->logSystemType = 2; + logSystem->logSystemType = LogSystemType::tagPartitioned; logSystem->expectedLogSets = 1; logSystem->recoveredAt = oldLogSystem->recoverAt; logSystem->repopulateRegionAntiQuorum = configuration.repopulateRegionAntiQuorum; @@ -2318,18 +2318,18 @@ Future ILogSystem::recoverAndEndEpoch(Reference ILogSystem::fromLogSystemConfig( UID const& dbgid, struct 
LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote, bool useRecoveredAt, Optional>> addActor ) { - if (conf.logSystemType == 0) + if (conf.logSystemType == LogSystemType::empty) return Reference(); - else if (conf.logSystemType == 2) + else if (conf.logSystemType == LogSystemType::tagPartitioned) return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote, useRecoveredAt, addActor ); else throw internal_error(); } Reference ILogSystem::fromOldLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf ) { - if (conf.logSystemType == 0) + if (conf.logSystemType == LogSystemType::empty) return Reference(); - else if (conf.logSystemType == 2) + else if (conf.logSystemType == LogSystemType::tagPartitioned) return TagPartitionedLogSystem::fromOldLogSystemConfig( dbgid, locality, conf ); else throw internal_error(); From 0b1984978afb44dfbb32dd64713d5c4b4c2205e2 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 1 Apr 2019 13:56:45 -0700 Subject: [PATCH 02/17] Small code refactoring. --- fdbclient/FDBTypes.h | 4 ++++ fdbserver/LogSystem.h | 2 +- fdbserver/LogSystemConfig.h | 2 +- fdbserver/OldTLogServer_6_0.actor.cpp | 5 ++--- fdbserver/TLogServer.actor.cpp | 5 ++--- 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index a8b29df913..e263eadbe4 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -47,6 +47,10 @@ struct Tag { bool operator != ( const Tag& r ) const { return locality!=r.locality || id!=r.id; } bool operator < ( const Tag& r ) const { return locality < r.locality || (locality == r.locality && id < r.id); } + int toTagDataIndex() { + return locality >= 0 ? 
2 * locality : 1 - (2 * locality); + } + std::string toString() const { return format("%d:%d", locality, id); } diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index d7b2fc9361..88e9448f2a 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -283,7 +283,7 @@ struct ILogSystem { virtual bool hasMessage() = 0; //pre: only callable if hasMessage() returns true - //return the tags associated with the message for teh current sequence + //return the tags associated with the message for the current sequence virtual const std::vector& getTags() = 0; //pre: only callable if hasMessage() returns true diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index fdcaf0d80e..8dbf332aa8 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -312,7 +312,7 @@ struct LogSystemConfig { } for( auto& i : r.tLogs ) { - for( auto& j : oldTLogs[0].tLogs ) { + for( auto& j : oldTLogs[0].tLogs ) { if( i.isEqualIds(j) ) { return true; } diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index c114145402..35de35251c 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -373,7 +373,7 @@ struct LogData : NonCopyable, public ReferenceCounted { int unpoppedRecoveredTags; Reference getTagData(Tag tag) { - int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality); + int idx = tag.toTagDataIndex(); if(idx >= tag_data.size()) { tag_data.resize(idx+1); } @@ -389,8 +389,7 @@ struct LogData : NonCopyable, public ReferenceCounted { popped = recoveredAt + 1; } Reference newTagData = Reference( new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered) ); - int idx = tag.locality >= 0 ? 
2*tag.locality : 1-(2*tag.locality); - tag_data[idx][tag.id] = newTagData; + tag_data[tag.toTagDataIndex()][tag.id] = newTagData; return newTagData; } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 2ff87e4245..67cd0ee5fd 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -429,7 +429,7 @@ struct LogData : NonCopyable, public ReferenceCounted { int unpoppedRecoveredTags; Reference getTagData(Tag tag) { - int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality); + int idx = tag.toTagDataIndex(); if(idx >= tag_data.size()) { tag_data.resize(idx+1); } @@ -445,8 +445,7 @@ struct LogData : NonCopyable, public ReferenceCounted { popped = recoveredAt + 1; } Reference newTagData = Reference( new TagData(tag, popped, 0, nothingPersistent, poppedRecently, unpoppedRecovered) ); - int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality); - tag_data[idx][tag.id] = newTagData; + tag_data[tag.toTagDataIndex()][tag.id] = newTagData; return newTagData; } From d19b0cf1c19dcd9828fdfdf5926ec73cbce470b7 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 10 Apr 2019 11:21:27 -0700 Subject: [PATCH 03/17] Refactor LogSet with two new constructors --- fdbrpc/ReplicationUtils.cpp | 1 - fdbserver/LogSystem.h | 4 + fdbserver/TagPartitionedLogSystem.actor.cpp | 157 +++++++------------- 3 files changed, 54 insertions(+), 108 deletions(-) diff --git a/fdbrpc/ReplicationUtils.cpp b/fdbrpc/ReplicationUtils.cpp index d6be5507f8..356194d5da 100644 --- a/fdbrpc/ReplicationUtils.cpp +++ b/fdbrpc/ReplicationUtils.cpp @@ -835,7 +835,6 @@ void filterLocalityDataForPolicy(Reference policy, LocalityD void filterLocalityDataForPolicy(Reference policy, std::vector* vld) { if (!policy) return; - std::set keys = policy->attributeKeys(); for (LocalityData& ld : *vld) { filterLocalityDataForPolicy(policy, &ld); } diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 88e9448f2a..29a3e65ebb 100644 --- 
a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -30,6 +30,8 @@ #include "fdbrpc/Replication.h" struct DBCoreState; +struct TLogSet; +struct CoreTLogSet; class LogSet : NonCopyable, public ReferenceCounted { public: @@ -50,6 +52,8 @@ public: std::vector> satelliteTagLocations; LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} + LogSet(const TLogSet& tlogSet); + LogSet(const CoreTLogSet& coreSet); std::string logRouterString() { std::string result; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 38488a4efc..c9d928e898 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -58,6 +58,35 @@ struct LogLockInfo { LogLockInfo() : epochEnd(std::numeric_limits::max()), isCurrent(false) {} }; +LogSet::LogSet(const TLogSet& tLogSet) : + tLogWriteAntiQuorum(tLogSet.tLogWriteAntiQuorum), + tLogReplicationFactor(tLogSet.tLogReplicationFactor), + tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion), + tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal), + locality(tLogSet.locality), startVersion(tLogSet.startVersion), + satelliteTagLocations(tLogSet.satelliteTagLocations) +{ + for(const auto& log : tLogSet.tLogs) { + logServers.push_back(Reference>>(new AsyncVar>(log))); + } + for(const auto& log : tLogSet.logRouters) { + logRouters.push_back(Reference>>(new AsyncVar>(log))); + } +} + +LogSet::LogSet(const CoreTLogSet& coreSet) : + tLogWriteAntiQuorum(coreSet.tLogWriteAntiQuorum), + tLogReplicationFactor(coreSet.tLogReplicationFactor), + tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion), + tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal), + locality(coreSet.locality), startVersion(coreSet.startVersion), + satelliteTagLocations(coreSet.satelliteTagLocations) +{ + for(const auto& log : coreSet.tLogs) { + 
logServers.push_back(Reference>>(new AsyncVar>(OptionalInterface(log)))); + } +} + struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; LogSystemType logSystemType; @@ -134,26 +163,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedrecoveredAt = lsConf.recoveredAt; } - for( int i = 0; i < lsConf.tLogs.size(); i++ ) { - TLogSet const& tLogSet = lsConf.tLogs[i]; - if(!excludeRemote || tLogSet.isLocal) { - Reference logSet = Reference( new LogSet() ); - logSystem->tLogs.push_back( logSet ); - for( auto& log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto& log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + for (const TLogSet& tLogSet : lsConf.tLogs) { + if (!excludeRemote || tLogSet.isLocal) { + Reference logSet(new LogSet(tLogSet)); + logSystem->tLogs.push_back(logSet); filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -162,25 +175,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData.resize(lsConf.oldTLogs.size()); for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) { logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); - for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->oldLogData[i].tLogs[j] = logSet; + for (int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++) { TLogSet const& 
tLogData = lsConf.oldTLogs[i].tLogs[j]; - for( auto & log : tLogData.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogData.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogData.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor; - logSet->tLogPolicy = tLogData.tLogPolicy; - logSet->tLogLocalities = tLogData.tLogLocalities; - logSet->isLocal = tLogData.isLocal; - logSet->locality = tLogData.locality; - logSet->startVersion = tLogData.startVersion; - logSet->satelliteTagLocations = tLogData.satelliteTagLocations; + Reference logSet(new LogSet(tLogData)); + logSystem->oldLogData[i].tLogs[j] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -200,24 +198,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs.resize( lsConf.oldTLogs[0].tLogs.size()); for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->tLogs[i] = logSet; TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i]; - for( auto & log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + Reference logSet(new LogSet(tLogSet)); + 
logSystem->tLogs[i] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -228,24 +211,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { - Reference logSet = Reference( new LogSet() ); - logSystem->oldLogData[i-1].tLogs[j] = logSet; TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j]; - for( auto & log : tLogSet.tLogs) { - logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - for( auto & log : tLogSet.logRouters) { - logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); - } - logSet->tLogVersion = tLogSet.tLogVersion; - logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; - logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; - logSet->tLogPolicy = tLogSet.tLogPolicy; - logSet->tLogLocalities = tLogSet.tLogLocalities; - logSet->isLocal = tLogSet.isLocal; - logSet->locality = tLogSet.locality; - logSet->startVersion = tLogSet.startVersion; - logSet->satelliteTagLocations = tLogSet.satelliteTagLocations; + Reference logSet(new LogSet(tLogSet)); + logSystem->oldLogData[i-1].tLogs[j] = logSet; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } @@ -1374,27 +1342,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> failureTrackers; logServers.resize(prevState.tLogs.size()); - for( int i = 0; i < prevState.tLogs.size(); i++ ) { - Reference logSet = Reference( new LogSet() ); - logServers[i] = logSet; + for (int i = 0; i < prevState.tLogs.size(); i++) { CoreTLogSet const& coreSet = prevState.tLogs[i]; + Reference logSet(new LogSet(coreSet)); + logServers[i] = logSet; std::vector>> failed; - for(int j = 0; j < coreSet.tLogs.size(); j++) { - Reference>> logVar = Reference>>( new AsyncVar>( 
OptionalInterface(coreSet.tLogs[j]) ) ); - logSet->logServers.push_back( logVar ); - allLogServers.push_back( logVar ); - failed.push_back( Reference>( new AsyncVar() ) ); - failureTrackers.push_back( monitorLog(logVar, failed[j] ) ); + for (const auto& logVar : logSet->logServers) { + allLogServers.push_back(logVar); + failed.push_back(Reference>(new AsyncVar())); + failureTrackers.push_back(monitorLog(logVar, failed.back())); } - logSet->tLogVersion = coreSet.tLogVersion; - logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor; - logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum; - logSet->tLogPolicy = coreSet.tLogPolicy; - logSet->tLogLocalities = coreSet.tLogLocalities; - logSet->isLocal = coreSet.isLocal; - logSet->locality = coreSet.locality; - logSet->startVersion = coreSet.startVersion; - logSet->satelliteTagLocations = coreSet.satelliteTagLocations; filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); logFailed.push_back(failed); @@ -1404,24 +1361,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = Reference( new LogSet() ); + for (int j = 0; j < old.tLogs.size(); j++) { + Reference logSet(new LogSet(old.tLogs[j])); oldData.tLogs[j] = logSet; - CoreTLogSet const& log = old.tLogs[j]; - for(int k = 0; k < log.tLogs.size(); k++) { - Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(log.tLogs[k]) ) ); - logSet->logServers.push_back( logVar ); - allLogServers.push_back( logVar ); - } - logSet->tLogVersion = log.tLogVersion; - logSet->tLogReplicationFactor = log.tLogReplicationFactor; - logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; - logSet->tLogPolicy = log.tLogPolicy; - logSet->tLogLocalities = log.tLogLocalities; - logSet->isLocal = log.isLocal; - logSet->locality = log.locality; - logSet->startVersion = log.startVersion; - logSet->satelliteTagLocations = log.satelliteTagLocations; + 
allLogServers.insert(allLogServers.end(), logSet->logServers.begin(), logSet->logServers.end()); filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); logSet->updateLocalitySet(logSet->tLogLocalities); } From 82ec80c42f1e35c2e6aac9ce32a7284ab3fd0efd Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 11 Apr 2019 11:36:24 -0700 Subject: [PATCH 04/17] Refactor TLogSet ctor --- fdbserver/LogSystemConfig.h | 3 + fdbserver/TagPartitionedLogSystem.actor.cpp | 71 ++++++++------------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 8dbf332aa8..8da102023a 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -55,6 +55,8 @@ protected: Optional iface; }; +class LogSet; + struct TLogSet { std::vector> tLogs; std::vector> logRouters; @@ -68,6 +70,7 @@ struct TLogSet { std::vector> satelliteTagLocations; TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} + explicit TLogSet(const LogSet& rhs); std::string toString() const { return format("anti: %d replication: %d local: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), locality); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index c9d928e898..8a0441dc13 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -87,6 +87,23 @@ LogSet::LogSet(const CoreTLogSet& coreSet) : } } +TLogSet::TLogSet(const LogSet& rhs) : + tLogWriteAntiQuorum(rhs.tLogWriteAntiQuorum), + tLogReplicationFactor(rhs.tLogReplicationFactor), + tLogLocalities(rhs.tLogLocalities), tLogVersion(rhs.tLogVersion), + tLogPolicy(rhs.tLogPolicy), isLocal(rhs.isLocal), locality(rhs.locality), + startVersion(rhs.startVersion), + 
satelliteTagLocations(rhs.satelliteTagLocations) +{ + for (const auto& tlog : rhs.logServers) { + tLogs.push_back(tlog->get()); + } + + for (const auto& logRouter : rhs.logRouters) { + logRouters.push_back(logRouter->get()); + } +} + struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; LogSystemType logSystemType; @@ -995,59 +1012,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = tLogs[i]; - if(logSet->isLocal || remoteLogsWrittenToCoreState) { - logSystemConfig.tLogs.push_back(TLogSet()); - TLogSet& log = logSystemConfig.tLogs.back(); - log.tLogVersion = logSet->tLogVersion; - log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; - log.tLogReplicationFactor = logSet->tLogReplicationFactor; - log.tLogPolicy = logSet->tLogPolicy; - log.tLogLocalities = logSet->tLogLocalities; - log.isLocal = logSet->isLocal; - log.locality = logSet->locality; - log.startVersion = logSet->startVersion; - log.satelliteTagLocations = logSet->satelliteTagLocations; - - for( int i = 0; i < logSet->logServers.size(); i++ ) { - log.tLogs.push_back(logSet->logServers[i]->get()); - } - - for( int i = 0; i < logSet->logRouters.size(); i++ ) { - log.logRouters.push_back(logSet->logRouters[i]->get()); - } + for (const Reference& logSet : tLogs) { + if (logSet->isLocal || remoteLogsWrittenToCoreState) { + logSystemConfig.tLogs.push_back(TLogSet(*logSet)); } } if(!recoveryCompleteWrittenToCoreState.get()) { - for( int i = 0; i < oldLogData.size(); i++ ) { + for (const auto& oldData : oldLogData) { logSystemConfig.oldTLogs.push_back(OldTLogConf()); - logSystemConfig.oldTLogs[i].tLogs.resize(oldLogData[i].tLogs.size()); - for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) { - TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j]; - Reference logSet = oldLogData[i].tLogs[j]; - log.tLogVersion = logSet->tLogVersion; - log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; - log.tLogReplicationFactor = logSet->tLogReplicationFactor; - log.tLogPolicy = 
logSet->tLogPolicy; - log.tLogLocalities = logSet->tLogLocalities; - log.isLocal = logSet->isLocal; - log.locality = logSet->locality; - log.startVersion = logSet->startVersion; - log.satelliteTagLocations = logSet->satelliteTagLocations; - - for( int i = 0; i < logSet->logServers.size(); i++ ) { - log.tLogs.push_back(logSet->logServers[i]->get()); - } - - for( int i = 0; i < logSet->logRouters.size(); i++ ) { - log.logRouters.push_back(logSet->logRouters[i]->get()); - } + for (const Reference& logSet : oldData.tLogs) { + logSystemConfig.oldTLogs.back().tLogs.push_back(TLogSet(*logSet)); } - logSystemConfig.oldTLogs[i].logRouterTags = oldLogData[i].logRouterTags; - logSystemConfig.oldTLogs[i].epochEnd = oldLogData[i].epochEnd; + logSystemConfig.oldTLogs.back().logRouterTags = oldData.logRouterTags; + logSystemConfig.oldTLogs.back().epochEnd = oldData.epochEnd; } } return logSystemConfig; From b4e7e7a85bef8e667f36babda10b39d656b72ed7 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 12 Apr 2019 11:11:11 -0700 Subject: [PATCH 05/17] Refactor StorageCache updates --- fdbserver/ApplyMetadataMutation.h | 39 +++++++++++++-------------- fdbserver/MasterProxyServer.actor.cpp | 25 +++-------------- 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index 752435f06c..ed273cd2c7 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -42,6 +42,20 @@ struct applyMutationsData { Reference> keyVersion; }; +static Reference getStorageInfo(UID id, std::map>* storageCache, IKeyValueStore* txnStateStore) { + Reference storageInfo; + auto cacheItr = storageCache->find(id); + if(cacheItr == storageCache->end()) { + storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); + storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) 
).get().get() ); + (*storageCache)[id] = storageInfo; + } else { + storageInfo = cacheItr->second; + } + return storageInfo; +} + // It is incredibly important that any modifications to txnStateStore are done in such a way that // the same operations will be done on all proxies at the same time. Otherwise, the data stored in // txnStateStore will become corrupted. @@ -62,36 +76,19 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef storageInfo; ServerCacheInfo info; info.tags.reserve(src.size() + dest.size()); info.src_info.reserve(src.size()); info.dest_info.reserve(dest.size()); - for(auto id : src) { - auto cacheItr = storageCache->find(id); - if(cacheItr == storageCache->end()) { - storageInfo = Reference( new StorageInfo() ); - storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); - (*storageCache)[id] = storageInfo; - } else { - storageInfo = cacheItr->second; - } + for (const auto& id : src) { + auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); ASSERT(storageInfo->tag != invalidTag); info.tags.push_back( storageInfo->tag ); info.src_info.push_back( storageInfo ); } - for(auto id : dest) { - auto cacheItr = storageCache->find(id); - if(cacheItr == storageCache->end()) { - storageInfo = Reference( new StorageInfo() ); - storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); - (*storageCache)[id] = storageInfo; - } else { - storageInfo = cacheItr->second; - } + for (const auto& id : dest) { + auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); ASSERT(storageInfo->tag != invalidTag); info.tags.push_back( storageInfo->tag ); info.dest_info.push_back( storageInfo ); diff --git 
a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index fe0301260a..2f75f0380c 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1561,7 +1561,6 @@ ACTOR Future masterProxyServerCore( Standalone> mutations; std::vector,int>> keyInfoData; vector src, dest; - Reference storageInfo; ServerCacheInfo info; for(auto &kv : data) { if( kv.key.startsWith(keyServersPrefix) ) { @@ -1571,30 +1570,14 @@ ACTOR Future masterProxyServerCore( info.tags.clear(); info.src_info.clear(); info.dest_info.clear(); - for(auto& id : src) { - auto cacheItr = commitData.storageCache.find(id); - if(cacheItr == commitData.storageCache.end()) { - storageInfo = Reference( new StorageInfo() ); - storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); - commitData.storageCache[id] = storageInfo; - } else { - storageInfo = cacheItr->second; - } + for (const auto& id : src) { + auto storageInfo = getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore); ASSERT(storageInfo->tag != invalidTag); info.tags.push_back( storageInfo->tag ); info.src_info.push_back( storageInfo ); } - for(auto& id : dest) { - auto cacheItr = commitData.storageCache.find(id); - if(cacheItr == commitData.storageCache.end()) { - storageInfo = Reference( new StorageInfo() ); - storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); - commitData.storageCache[id] = storageInfo; - } else { - storageInfo = cacheItr->second; - } + for (const auto& id : dest) { + auto storageInfo = getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore); ASSERT(storageInfo->tag != 
invalidTag); info.tags.push_back( storageInfo->tag ); info.dest_info.push_back( storageInfo ); From 966ec30fccbc4283378d3f0d09480da703e47de3 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 10 Apr 2019 10:30:34 -0700 Subject: [PATCH 06/17] Add pseudoLocalities for special tag consumers --- fdbserver/ApplyMetadataMutation.h | 6 ++---- fdbserver/DBCoreState.h | 17 +++++++++++------ fdbserver/LogRouter.actor.cpp | 2 +- fdbserver/LogSystem.h | 9 +++++++++ fdbserver/LogSystemConfig.h | 5 +++-- fdbserver/MasterProxyServer.actor.cpp | 12 ++++-------- fdbserver/TagPartitionedLogSystem.actor.cpp | 16 ++++++++-------- fdbserver/masterserver.actor.cpp | 2 -- 8 files changed, 38 insertions(+), 31 deletions(-) diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index ed273cd2c7..1c5ffda43e 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -255,15 +255,13 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRefaddTag(t); + toCommit->addTags(allTags); toCommit->addTypedMessage(LogProtocolMessage()); } MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - for (auto t : allTags) - toCommit->addTag(t); + toCommit->addTags(allTags); toCommit->addTypedMessage(privatized); } } diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index 5622a9c861..ec6d1375d4 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -21,6 +21,9 @@ #ifndef FDBSERVER_DBCORESTATE_H #define FDBSERVER_DBCORESTATE_H +#include +#include + #include "fdbclient/FDBTypes.h" #include "fdbrpc/ReplicationPolicy.h" #include "fdbserver/LogSystemConfig.h" @@ -52,12 +55,12 @@ struct CoreTLogSet { bool operator == (CoreTLogSet const& rhs) const { return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations && - 
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); + locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); } template void serialize(Archive& ar) { - serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { tLogVersion = TLogVersion::V2; } else { @@ -70,17 +73,18 @@ struct OldTLogCoreData { std::vector tLogs; int32_t logRouterTags; Version epochEnd; + std::set pseudoLocalities; OldTLogCoreData() : epochEnd(0), logRouterTags(0) {} bool operator == (OldTLogCoreData const& rhs) const { - return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd; + return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities; } template void serialize(Archive& ar) { if( ar.protocolVersion() >= 0x0FDB00A560010001LL) { - serializer(ar, tLogs, logRouterTags, epochEnd); + serializer(ar, tLogs, logRouterTags, epochEnd, pseudoLocalities); } else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); @@ -96,6 +100,7 @@ struct DBCoreState { std::vector oldTLogData; DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries. 
LogSystemType logSystemType; + std::set pseudoLocalities; DBCoreState() : logRouterTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {} @@ -117,7 +122,7 @@ struct DBCoreState { } bool isEqual(DBCoreState const& r) const { - return logSystemType == r.logSystemType && recoveryCount == r.recoveryCount && tLogs == r.tLogs && oldTLogData == r.oldTLogData && logRouterTags == r.logRouterTags; + return logSystemType == r.logSystemType && recoveryCount == r.recoveryCount && tLogs == r.tLogs && oldTLogData == r.oldTLogData && logRouterTags == r.logRouterTags && pseudoLocalities == r.pseudoLocalities; } bool operator == ( const DBCoreState& rhs ) const { return isEqual(rhs); } @@ -131,7 +136,7 @@ struct DBCoreState { ASSERT(ar.protocolVersion() >= 0x0FDB00A460010001LL); if(ar.protocolVersion() >= 0x0FDB00A560010001LL) { - serializer(ar, tLogs, logRouterTags, oldTLogData, recoveryCount, logSystemType); + serializer(ar, tLogs, logRouterTags, oldTLogData, recoveryCount, logSystemType, pseudoLocalities); } else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType); diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 0e537ec98d..c6da8537b9 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -103,7 +103,7 @@ struct LogRouterData { return newTagData; } - LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) { + LogRouterData(UID dbgid, const InitializeLogRouterRequest& req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), 
minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) { //setup just enough of a logSet to be able to call getPushLocations logSet.logServers.resize(req.tLogLocalities.size()); logSet.tLogPolicy = req.tLogPolicy; diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 29a3e65ebb..425941322f 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -21,6 +21,9 @@ #ifndef FDBSERVER_LOGSYSTEM_H #define FDBSERVER_LOGSYSTEM_H +#include +#include + #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbclient/DatabaseConfiguration.h" @@ -50,6 +53,7 @@ public: Version startVersion; std::vector> replies; std::vector> satelliteTagLocations; + std::set pseudoLocalities; LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} LogSet(const TLogSet& tlogSet); @@ -728,6 +732,11 @@ struct LogPushData : NonCopyable { next_message_tags.push_back( tag ); } + template + void addTags(T tags) { + next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end()); + } + void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations = false ) { if( !usePreviousLocations ) { prev_tags.clear(); diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 8da102023a..464bc4b2ba 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -173,6 +173,7 @@ struct LogSystemConfig { UID recruitmentID; bool stopped; Optional recoveredAt; + std::set pseudoLocalities; LogSystemConfig() : logSystemType(LogSystemType::empty), logRouterTags(0), expectedLogSets(0), stopped(false) {} @@ -295,7 +296,7 @@ struct LogSystemConfig { bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); } bool isEqual(LogSystemConfig const& r) const { - return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == 
r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped && recoveredAt == r.recoveredAt; + return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped && recoveredAt == r.recoveredAt && pseudoLocalities == r.pseudoLocalities; } bool isEqualIds(LogSystemConfig const& r) const { @@ -326,7 +327,7 @@ struct LogSystemConfig { template void serialize( Ar& ar ) { - serializer(ar, logSystemType, tLogs, logRouterTags, oldTLogs, expectedLogSets, recruitmentID, stopped, recoveredAt); + serializer(ar, logSystemType, tLogs, logRouterTags, oldTLogs, expectedLogSets, recruitmentID, stopped, recoveredAt, pseudoLocalities); } }; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 2f75f0380c..e5a383ff16 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -699,8 +699,7 @@ ACTOR Future commitBatch( if (debugMutation("ProxyCommit", commitVersion, m)) TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion); - for (auto& tag : tags) - toCommit.addTag(tag); + toCommit.addTags(tags); toCommit.addTypedMessage(m); } else if (m.type == MutationRef::ClearRange) { @@ -723,8 +722,7 @@ ACTOR Future commitBatch( uniquify(tags); } - for (auto& tag : tags) - toCommit.addTag(tag); + toCommit.addTags(tags); } else { TEST(true); //A clear range extends past a shard boundary @@ -744,8 +742,7 @@ ACTOR Future commitBatch( } if (debugMutation("ProxyCommit", commitVersion, m)) TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion); - for (auto& tag : allSources) - toCommit.addTag(tag); + toCommit.addTags(allSources); } toCommit.addTypedMessage(m); } @@ -836,8 +833,7 @@ ACTOR 
Future commitBatch( ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination auto& tags = self->tagsForKey(backupMutation.param1); - for (auto& tag : tags) - toCommit.addTag(tag); + toCommit.addTags(tags); toCommit.addTypedMessage(backupMutation); // if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) { diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 8a0441dc13..51292a2645 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -45,6 +45,7 @@ struct OldLogData { std::vector> tLogs; int32_t logRouterTags; Version epochEnd; + std::set pseudoLocalities; OldLogData() : epochEnd(0), logRouterTags(0) {} }; @@ -113,6 +114,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted pseudoLocalities; // new members Future rejoins; @@ -180,6 +182,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedrecoveredAt = lsConf.recoveredAt; } + logSystem->pseudoLocalities = lsConf.pseudoLocalities; for (const TLogSet& tLogSet : lsConf.tLogs) { if (!excludeRemote || tLogSet.isLocal) { Reference logSet(new LogSet(tLogSet)); @@ -1473,6 +1476,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedknownCommittedVersion = knownCommittedVersion; logSystem->remoteLogsWrittenToCoreState = true; logSystem->stopped = true; + logSystem->pseudoLocalities = prevState.pseudoLocalities; outLogSystem->set(logSystem); } @@ -1782,8 +1786,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 1) { logSystem->logRouterTags = recr.tLogs.size() * std::max(1, configuration.desiredLogRouterCount / std::max(1,recr.tLogs.size())); logSystem->expectedLogSets++; - } else { - logSystem->logRouterTags = 0; + logSystem->pseudoLocalities.insert(tagLocalityLogRouter); } logSystem->tLogs.push_back( Reference( new LogSet() ) ); @@ -1829,11 +1832,9 @@ struct TagPartitionedLogSystem : 
ILogSystem, ReferenceCountedoldLogData[0].tLogs = oldLogSystem->tLogs; logSystem->oldLogData[0].epochEnd = oldLogSystem->knownCommittedVersion + 1; logSystem->oldLogData[0].logRouterTags = oldLogSystem->logRouterTags; + logSystem->oldLogData[0].pseudoLocalities = oldLogSystem->pseudoLocalities; } - - for(int i = 0; i < oldLogSystem->oldLogData.size(); i++) { - logSystem->oldLogData.push_back(oldLogSystem->oldLogData[i]); - } + logSystem->oldLogData.insert(logSystem->oldLogData.end(), oldLogSystem->oldLogData.begin(), oldLogSystem->oldLogData.end()); logSystem->tLogs[0]->startVersion = oldLogSystem->knownCommittedVersion + 1; state int lockNum = 0; @@ -1930,8 +1931,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> recoveryComplete; if(region.satelliteTLogReplicationFactor > 0) { - std::vector satelliteTags; - satelliteTags.push_back(txsTag); + std::vector satelliteTags(1, txsTag); state vector> satelliteInitializationReplies; vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() ); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 191ddced1e..9f6630d3d5 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -326,12 +326,10 @@ ACTOR Future newTLogServers( Reference self, RecruitFromConfig Future fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max(1, self->configuration.desiredLogRouterCount / std::max(1, recr.tLogs.size())), exclusionWorkerIds) ) ); self->primaryLocality = self->dcId_locality[recr.dcId]; - self->logSystem = Reference(); Reference newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) ); self->logSystem = newLogSystem; } else { 
self->primaryLocality = tagLocalitySpecial; - self->logSystem = Reference(); Reference newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) ); self->logSystem = newLogSystem; } From 66000a07a5d105a415f70725e32b3b85c93e00bd Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 16 Apr 2019 22:34:56 -0700 Subject: [PATCH 07/17] Use emplace_back instead of push_back --- fdbserver/LogRouter.actor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index c6da8537b9..a069046078 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -145,7 +145,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector block.capacity() - block.size()) { - self->messageBlocks.push_back( std::make_pair(version, block) ); + self->messageBlocks.emplace_back(version, block); block = Standalone>(); block.reserve(block.arena(), std::max(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize)); } @@ -158,7 +158,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector= tagData->popped) { - tagData->version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size())))); + tagData->version_messages.emplace_back(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))); if(tagData->version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) { TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->version_messages.back().second.expectedSize()); } @@ -167,7 +167,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vectormessageBlocks.push_back( std::make_pair(version, block) ); + self->messageBlocks.emplace_back(version, block); } ACTOR Future waitForVersion( 
LogRouterData *self, Version ver ) { @@ -259,8 +259,8 @@ ACTOR Future pullAsyncData( LogRouterData *self ) { tagAndMsg.message = r->getMessageWithTags(); tags.clear(); self->logSet.getPushLocations(r->getTags(), tags, 0); - for(auto t : tags) { - tagAndMsg.tags.push_back(Tag(tagLocalityRemoteLog, t)); + for (const auto& t : tags) { + tagAndMsg.tags.emplace_back(tagLocalityRemoteLog, t); } messages.push_back(std::move(tagAndMsg)); From 7befce6bf19b08743f411ad32b42c044d1857505 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 18 Apr 2019 10:18:11 -0700 Subject: [PATCH 08/17] More pseudoLocalities and refactors. --- fdbserver/DBCoreState.h | 8 +- fdbserver/LogSystem.h | 9 ++ fdbserver/LogSystemConfig.h | 12 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 159 +++++++++----------- 4 files changed, 92 insertions(+), 96 deletions(-) diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index ec6d1375d4..c6b3eff7e8 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -29,6 +29,8 @@ #include "fdbserver/LogSystemConfig.h" #include "fdbserver/MasterInterface.h" +class LogSet; + // This structure is stored persistently in CoordinatedState and must be versioned carefully! // It records a synchronous replication topology which can be used in the absence of faults (or under a limited // number of failures, in the case of less than full write quorums) to durably commit transactions. 
When faults or @@ -50,17 +52,19 @@ struct CoreTLogSet { Version startVersion; std::vector> satelliteTagLocations; TLogVersion tLogVersion; + std::set pseudoLocalitites; CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {} + explicit CoreTLogSet(const LogSet& logset); bool operator == (CoreTLogSet const& rhs) const { return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations && - locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); + pseudoLocalitites == rhs.pseudoLocalitites && locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); } template void serialize(Archive& ar) { - serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites); if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { tLogVersion = TLogVersion::V2; } else { diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 425941322f..4a2e44564d 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -70,6 +70,15 @@ public: return result; } + bool hasLogRouter(UID id) { + for (const auto& router : logRouters) { + if (router->get().id() == id) { + return true; + } + } + return false; + } + std::string logServerString() { std::string result; for(int i = 0; i < logServers.size(); i++) { diff --git 
a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 464bc4b2ba..dadb807484 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -68,6 +68,7 @@ struct TLogSet { int8_t locality; Version startVersion; std::vector> satelliteTagLocations; + std::set pseudoLocalitites; TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} explicit TLogSet(const LogSet& rhs); @@ -78,7 +79,7 @@ struct TLogSet { bool operator == ( const TLogSet& rhs ) const { if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || satelliteTagLocations != rhs.satelliteTagLocations || - startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) { + pseudoLocalitites != rhs.pseudoLocalitites || startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) { return false; } if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) { @@ -99,7 +100,7 @@ struct TLogSet { bool isEqualIds(TLogSet const& r) const { if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || satelliteTagLocations != r.satelliteTagLocations || - startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) { + pseudoLocalitites != r.pseudoLocalitites || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) { return false; } if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) { @@ -115,7 +116,7 @@ struct TLogSet { template void serialize( Ar& ar ) { - serializer(ar, tLogs, 
logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites); if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { tLogVersion = TLogVersion::V2; } else { @@ -129,6 +130,7 @@ struct OldTLogConf { std::vector tLogs; Version epochEnd; int32_t logRouterTags; + std::set pseudoLocalities; OldTLogConf() : epochEnd(0), logRouterTags(0) {} @@ -137,7 +139,7 @@ struct OldTLogConf { } bool operator == ( const OldTLogConf& rhs ) const { - return tLogs == rhs.tLogs && epochEnd == rhs.epochEnd && logRouterTags == rhs.logRouterTags; + return tLogs == rhs.tLogs && epochEnd == rhs.epochEnd && logRouterTags == rhs.logRouterTags && pseudoLocalities == rhs.pseudoLocalities; } bool isEqualIds(OldTLogConf const& r) const { @@ -154,7 +156,7 @@ struct OldTLogConf { template void serialize( Ar& ar ) { - serializer(ar, tLogs, epochEnd, logRouterTags); + serializer(ar, tLogs, epochEnd, logRouterTags, pseudoLocalities); } }; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 51292a2645..9aeea4d2f7 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -48,6 +48,18 @@ struct OldLogData { std::set pseudoLocalities; OldLogData() : epochEnd(0), logRouterTags(0) {} + explicit OldLogData(const OldTLogConf& conf) + : logRouterTags(conf.logRouterTags), epochEnd(conf.epochEnd), + pseudoLocalities(conf.pseudoLocalities) + { + tLogs.resize(conf.tLogs.size()); + for (int j = 0; j < conf.tLogs.size(); j++) { + Reference logSet(new LogSet(conf.tLogs[j])); + tLogs[j] = logSet; + filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); + logSet->updateLocalitySet(logSet->tLogLocalities); + } + } }; 
struct LogLockInfo { @@ -65,7 +77,8 @@ LogSet::LogSet(const TLogSet& tLogSet) : tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion), tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal), locality(tLogSet.locality), startVersion(tLogSet.startVersion), - satelliteTagLocations(tLogSet.satelliteTagLocations) + satelliteTagLocations(tLogSet.satelliteTagLocations), + pseudoLocalities(tLogSet.pseudoLocalitites) { for(const auto& log : tLogSet.tLogs) { logServers.push_back(Reference>>(new AsyncVar>(log))); @@ -81,7 +94,8 @@ LogSet::LogSet(const CoreTLogSet& coreSet) : tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion), tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal), locality(coreSet.locality), startVersion(coreSet.startVersion), - satelliteTagLocations(coreSet.satelliteTagLocations) + satelliteTagLocations(coreSet.satelliteTagLocations), + pseudoLocalities(coreSet.pseudoLocalitites) { for(const auto& log : coreSet.tLogs) { logServers.push_back(Reference>>(new AsyncVar>(OptionalInterface(log)))); @@ -94,7 +108,8 @@ TLogSet::TLogSet(const LogSet& rhs) : tLogLocalities(rhs.tLogLocalities), tLogVersion(rhs.tLogVersion), tLogPolicy(rhs.tLogPolicy), isLocal(rhs.isLocal), locality(rhs.locality), startVersion(rhs.startVersion), - satelliteTagLocations(rhs.satelliteTagLocations) + satelliteTagLocations(rhs.satelliteTagLocations), + pseudoLocalitites(rhs.pseudoLocalities) { for (const auto& tlog : rhs.logServers) { tLogs.push_back(tlog->get()); @@ -105,6 +120,21 @@ TLogSet::TLogSet(const LogSet& rhs) : } } +CoreTLogSet::CoreTLogSet(const LogSet& logset) : + tLogWriteAntiQuorum(logset.tLogWriteAntiQuorum), + tLogReplicationFactor(logset.tLogReplicationFactor), + tLogLocalities(logset.tLogLocalities), + tLogPolicy(logset.tLogPolicy), isLocal(logset.isLocal), + locality(logset.locality), startVersion(logset.startVersion), + satelliteTagLocations(logset.satelliteTagLocations), + tLogVersion(logset.tLogVersion), + 
pseudoLocalitites(logset.pseudoLocalities) +{ + for (const auto &log : logset.logServers) { + tLogs.push_back(log->get().id()); + } +} + struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; LogSystemType logSystemType; @@ -192,21 +222,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData.resize(lsConf.oldTLogs.size()); - for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) { - logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); - for (int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++) { - TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j]; - Reference logSet(new LogSet(tLogData)); - logSystem->oldLogData[i].tLogs[j] = logSet; - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); - } - logSystem->oldLogData[i].logRouterTags = lsConf.oldTLogs[i].logRouterTags; - logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd; + for (const auto& oldTlogConf : lsConf.oldTLogs) { + logSystem->oldLogData.emplace_back(oldTlogConf); } logSystem->logSystemType = lsConf.logSystemType; + logSystem->pseudoLocalities = lsConf.pseudoLocalities; return logSystem; } @@ -227,22 +248,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouterTags = lsConf.oldTLogs[0].logRouterTags; //logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd; - logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1); - for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) { - logSystem->oldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); - for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { - TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j]; - Reference logSet(new LogSet(tLogSet)); - logSystem->oldLogData[i-1].tLogs[j] = logSet; - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); - } - logSystem->oldLogData[i-1].logRouterTags = 
lsConf.oldTLogs[i].logRouterTags; - logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd; + for (int i = 1; i < lsConf.oldTLogs.size(); i++ ) { + logSystem->oldLogData.emplace_back(lsConf.oldTLogs[i]); } } logSystem->logSystemType = lsConf.logSystemType; logSystem->stopped = true; + logSystem->pseudoLocalities = lsConf.pseudoLocalities; return logSystem; } @@ -256,49 +268,25 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) { - CoreTLogSet coreSet; - for(auto &log : t->logServers) { - coreSet.tLogs.push_back(log->get().id()); - coreSet.tLogLocalities.push_back(log->get().interf().locality); - } - coreSet.tLogVersion = t->tLogVersion; - coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; - coreSet.tLogReplicationFactor = t->tLogReplicationFactor; - coreSet.tLogPolicy = t->tLogPolicy; - coreSet.isLocal = t->isLocal; - coreSet.locality = t->locality; - coreSet.startVersion = t->startVersion; - coreSet.satelliteTagLocations = t->satelliteTagLocations; - newState.tLogs.push_back(coreSet); + newState.pseudoLocalities = pseudoLocalities; + for (const auto &t : tLogs) { + if (t->logServers.size()) { + newState.tLogs.emplace_back(*t); } } newState.oldTLogData.clear(); if(!recoveryComplete.isValid() || !recoveryComplete.isReady() || (repopulateRegionAntiQuorum == 0 && (!remoteRecoveryComplete.isValid() || !remoteRecoveryComplete.isReady()))) { newState.oldTLogData.resize(oldLogData.size()); - for(int i = 0; i < oldLogData.size(); i++) { - for(auto &t : oldLogData[i].tLogs) { - if(t->logServers.size()) { - CoreTLogSet coreSet; - for(auto &log : t->logServers) { - coreSet.tLogs.push_back(log->get().id()); - } - coreSet.tLogLocalities = t->tLogLocalities; - coreSet.tLogVersion = t->tLogVersion; - coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; - coreSet.tLogReplicationFactor = t->tLogReplicationFactor; - coreSet.tLogPolicy = t->tLogPolicy; - coreSet.isLocal = t->isLocal; - coreSet.locality = t->locality; - 
coreSet.startVersion = t->startVersion; - coreSet.satelliteTagLocations = t->satelliteTagLocations; - newState.oldTLogData[i].tLogs.push_back(coreSet); + for (int i = 0; i < oldLogData.size(); i++) { + for (const auto& t : oldLogData[i].tLogs) { + if (t->logServers.size()) { + newState.oldTLogData[i].tLogs.emplace_back(*t); } } newState.oldTLogData[i].logRouterTags = oldLogData[i].logRouterTags; newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd; + newState.oldTLogData[i].pseudoLocalities = oldLogData[i].pseudoLocalities; } } @@ -763,12 +751,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted peekLogRouter( UID dbgid, Version begin, Tag tag ) { bool found = false; for( auto& log : tLogs ) { - for( auto& router : log->logRouters ) { - if(router->get().id() == dbgid) { - found = true; - break; - } - } + found = log->hasLogRouter(dbgid); if(found) { break; } @@ -803,12 +786,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters ) { - if(router->get().id() == dbgid) { - found = true; - break; - } - } + found = log->hasLogRouter(dbgid); if(found) { break; } @@ -1017,7 +995,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted& logSet : tLogs) { if (logSet->isLocal || remoteLogsWrittenToCoreState) { - logSystemConfig.tLogs.push_back(TLogSet(*logSet)); + logSystemConfig.tLogs.emplace_back(*logSet); } } @@ -1026,10 +1004,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted& logSet : oldData.tLogs) { - logSystemConfig.oldTLogs.back().tLogs.push_back(TLogSet(*logSet)); + logSystemConfig.oldTLogs.back().tLogs.emplace_back(*logSet); } logSystemConfig.oldTLogs.back().logRouterTags = oldData.logRouterTags; logSystemConfig.oldTLogs.back().epochEnd = oldData.epochEnd; + logSystemConfig.oldTLogs.back().pseudoLocalities = oldData.pseudoLocalities; } } return logSystemConfig; @@ -1653,6 +1632,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted getLocalTags(int8_t locality, const std::vector& 
allTags) { + std::vector localTags; + for (const auto& tag : allTags) { + if (locality == tagLocalitySpecial || locality == tag.locality || tag.locality < 0) { + localTags.push_back(tag); + } + } + return localTags; + } + ACTOR static Future newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector allTags ) { TraceEvent("RemoteLogRecruitment_WaitingForWorkers"); state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers ); @@ -1705,12 +1694,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); } - std::vector localTags; - for(auto& tag : allTags) { - if(remoteLocality == tagLocalitySpecial || remoteLocality == tag.locality || tag.locality < 0) { - localTags.push_back(tag); - } - } + std::vector localTags = getLocalTags(remoteLocality, allTags); + LogSystemConfig oldLogSystemConfig = oldLogSystem->getLogSystemConfig(); state vector> remoteTLogInitializationReplies; vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() ); @@ -1720,7 +1705,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedgetLogSystemConfig(); + req.recoverFrom = oldLogSystemConfig; req.recoverAt = oldLogSystem->recoverAt.get(); req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; req.epoch = recoveryCount; @@ -1882,12 +1867,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogSystemConfigChanged.trigger(); } - std::vector localTags; - for(auto& tag : allTags) { - if(primaryLocality == tagLocalitySpecial || primaryLocality == tag.locality || tag.locality < 0) { - localTags.push_back(tag); - } - } + std::vector localTags = getLocalTags(primaryLocality, allTags); + state LogSystemConfig oldLogSystemConfig = oldLogSystem->getLogSystemConfig(); state vector> 
initializationReplies; vector< InitializeTLogRequest > reqs( recr.tLogs.size() ); @@ -1897,7 +1878,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedgetLogSystemConfig(); + req.recoverFrom = oldLogSystemConfig; req.recoverAt = oldLogSystem->recoverAt.get(); req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; req.epoch = recoveryCount; @@ -1941,7 +1922,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedgetLogSystemConfig(); + req.recoverFrom = oldLogSystemConfig; req.recoverAt = oldLogSystem->recoverAt.get(); req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; req.epoch = recoveryCount; From 010f825afffaca6ee58c55f9c02deff938f75f3d Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 18 Apr 2019 13:41:37 -0700 Subject: [PATCH 09/17] Remove pseudoLocalities from LogSet, TLogSet, and CoreTLogSet --- fdbserver/DBCoreState.h | 7 +-- fdbserver/LogSystem.h | 1 - fdbserver/LogSystemConfig.h | 9 ++-- fdbserver/TagPartitionedLogSystem.actor.cpp | 54 ++++++++++----------- 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index c6b3eff7e8..c69226b18c 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -30,6 +30,7 @@ #include "fdbserver/MasterInterface.h" class LogSet; +struct OldLogData; // This structure is stored persistently in CoordinatedState and must be versioned carefully! 
// It records a synchronous replication topology which can be used in the absence of faults (or under a limited @@ -52,19 +53,18 @@ struct CoreTLogSet { Version startVersion; std::vector> satelliteTagLocations; TLogVersion tLogVersion; - std::set pseudoLocalitites; CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {} explicit CoreTLogSet(const LogSet& logset); bool operator == (CoreTLogSet const& rhs) const { return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations && - pseudoLocalitites == rhs.pseudoLocalitites && locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); + locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); } template void serialize(Archive& ar) { - serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites); + serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { tLogVersion = TLogVersion::V2; } else { @@ -80,6 +80,7 @@ struct OldTLogCoreData { std::set pseudoLocalities; OldTLogCoreData() : epochEnd(0), logRouterTags(0) {} + explicit OldTLogCoreData(const OldLogData&); bool operator == (OldTLogCoreData const& rhs) const { return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities; diff --git 
a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 4a2e44564d..98cf20f139 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -53,7 +53,6 @@ public: Version startVersion; std::vector> replies; std::vector> satelliteTagLocations; - std::set pseudoLocalities; LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} LogSet(const TLogSet& tlogSet); diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index dadb807484..bd06329384 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -56,6 +56,7 @@ protected: }; class LogSet; +struct OldLogData; struct TLogSet { std::vector> tLogs; @@ -68,7 +69,6 @@ struct TLogSet { int8_t locality; Version startVersion; std::vector> satelliteTagLocations; - std::set pseudoLocalitites; TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {} explicit TLogSet(const LogSet& rhs); @@ -79,7 +79,7 @@ struct TLogSet { bool operator == ( const TLogSet& rhs ) const { if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || satelliteTagLocations != rhs.satelliteTagLocations || - pseudoLocalitites != rhs.pseudoLocalitites || startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) { + startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) { return false; } if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) { @@ -100,7 +100,7 @@ struct TLogSet { bool isEqualIds(TLogSet const& r) const { if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal 
!= r.isLocal || satelliteTagLocations != r.satelliteTagLocations || - pseudoLocalitites != r.pseudoLocalitites || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) { + startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) { return false; } if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) { @@ -116,7 +116,7 @@ struct TLogSet { template void serialize( Ar& ar ) { - serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, pseudoLocalitites); + serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { tLogVersion = TLogVersion::V2; } else { @@ -133,6 +133,7 @@ struct OldTLogConf { std::set pseudoLocalities; OldTLogConf() : epochEnd(0), logRouterTags(0) {} + explicit OldTLogConf(const OldLogData&); std::string toString() const { return format("end: %d tags: %d %s", epochEnd, logRouterTags, describe(tLogs).c_str()); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 9aeea4d2f7..32b66c82f0 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -77,8 +77,7 @@ LogSet::LogSet(const TLogSet& tLogSet) : tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion), tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal), locality(tLogSet.locality), startVersion(tLogSet.startVersion), - satelliteTagLocations(tLogSet.satelliteTagLocations), - pseudoLocalities(tLogSet.pseudoLocalitites) + satelliteTagLocations(tLogSet.satelliteTagLocations) { for(const auto& log : tLogSet.tLogs) { logServers.push_back(Reference>>(new 
AsyncVar>(log))); @@ -94,8 +93,7 @@ LogSet::LogSet(const CoreTLogSet& coreSet) : tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion), tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal), locality(coreSet.locality), startVersion(coreSet.startVersion), - satelliteTagLocations(coreSet.satelliteTagLocations), - pseudoLocalities(coreSet.pseudoLocalitites) + satelliteTagLocations(coreSet.satelliteTagLocations) { for(const auto& log : coreSet.tLogs) { logServers.push_back(Reference>>(new AsyncVar>(OptionalInterface(log)))); @@ -108,8 +106,7 @@ TLogSet::TLogSet(const LogSet& rhs) : tLogLocalities(rhs.tLogLocalities), tLogVersion(rhs.tLogVersion), tLogPolicy(rhs.tLogPolicy), isLocal(rhs.isLocal), locality(rhs.locality), startVersion(rhs.startVersion), - satelliteTagLocations(rhs.satelliteTagLocations), - pseudoLocalitites(rhs.pseudoLocalities) + satelliteTagLocations(rhs.satelliteTagLocations) { for (const auto& tlog : rhs.logServers) { tLogs.push_back(tlog->get()); @@ -120,6 +117,15 @@ TLogSet::TLogSet(const LogSet& rhs) : } } +OldTLogConf::OldTLogConf(const OldLogData& oldLogData) : + logRouterTags(oldLogData.logRouterTags), epochEnd(oldLogData.epochEnd), + pseudoLocalities(oldLogData.pseudoLocalities) +{ + for (const Reference& logSet : oldLogData.tLogs) { + tLogs.emplace_back(*logSet); + } +} + CoreTLogSet::CoreTLogSet(const LogSet& logset) : tLogWriteAntiQuorum(logset.tLogWriteAntiQuorum), tLogReplicationFactor(logset.tLogReplicationFactor), @@ -127,14 +133,24 @@ CoreTLogSet::CoreTLogSet(const LogSet& logset) : tLogPolicy(logset.tLogPolicy), isLocal(logset.isLocal), locality(logset.locality), startVersion(logset.startVersion), satelliteTagLocations(logset.satelliteTagLocations), - tLogVersion(logset.tLogVersion), - pseudoLocalitites(logset.pseudoLocalities) + tLogVersion(logset.tLogVersion) { for (const auto &log : logset.logServers) { tLogs.push_back(log->get().id()); } } +OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData) : + 
logRouterTags(oldData.logRouterTags), epochEnd(oldData.epochEnd), + pseudoLocalities(oldData.pseudoLocalities) +{ + for (const Reference& logSet : oldData.tLogs) { + if (logSet->logServers.size()) { + tLogs.emplace_back(*logSet); + } + } +} + struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; LogSystemType logSystemType; @@ -227,7 +243,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogSystemType = lsConf.logSystemType; - logSystem->pseudoLocalities = lsConf.pseudoLocalities; return logSystem; } @@ -277,16 +292,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) { - newState.oldTLogData[i].tLogs.emplace_back(*t); - } - } - newState.oldTLogData[i].logRouterTags = oldLogData[i].logRouterTags; - newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd; - newState.oldTLogData[i].pseudoLocalities = oldLogData[i].pseudoLocalities; + for (const auto& oldData : oldLogData) { + newState.oldTLogData.emplace_back(oldData); } } @@ -1001,14 +1008,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted& logSet : oldData.tLogs) { - logSystemConfig.oldTLogs.back().tLogs.emplace_back(*logSet); - } - logSystemConfig.oldTLogs.back().logRouterTags = oldData.logRouterTags; - logSystemConfig.oldTLogs.back().epochEnd = oldData.epochEnd; - logSystemConfig.oldTLogs.back().pseudoLocalities = oldData.pseudoLocalities; + logSystemConfig.oldTLogs.emplace_back(oldData); } } return logSystemConfig; From 97986a28b73d6b65133c49bba271a785a153f3e0 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 18 Apr 2019 14:53:51 -0700 Subject: [PATCH 10/17] Replace push_back with emplace_back for efficiency And better code readability. 
--- fdbserver/TagPartitionedLogSystem.actor.cpp | 58 ++++++++++----------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 32b66c82f0..cc4f07b2c8 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -80,10 +80,10 @@ LogSet::LogSet(const TLogSet& tLogSet) : satelliteTagLocations(tLogSet.satelliteTagLocations) { for(const auto& log : tLogSet.tLogs) { - logServers.push_back(Reference>>(new AsyncVar>(log))); + logServers.emplace_back(new AsyncVar>(log)); } for(const auto& log : tLogSet.logRouters) { - logRouters.push_back(Reference>>(new AsyncVar>(log))); + logRouters.emplace_back(new AsyncVar>(log)); } } @@ -96,7 +96,7 @@ LogSet::LogSet(const CoreTLogSet& coreSet) : satelliteTagLocations(coreSet.satelliteTagLocations) { for(const auto& log : coreSet.tLogs) { - logServers.push_back(Reference>>(new AsyncVar>(OptionalInterface(log)))); + logServers.emplace_back(new AsyncVar>(OptionalInterface(log))); } } @@ -439,7 +439,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServerString()); - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore)) ); + cursors.emplace_back(new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore)); } int i = 0; while(begin < lastBegin) { @@ -484,7 +484,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServerString()).detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin); - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet, localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) ); + cursors.emplace_back(new ILogSystem::SetPeekCursor(localOldSets, bestOldSet, 
localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore)); epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end))); } lastBegin = thisBegin; @@ -520,7 +520,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted > cursors; std::vector< LogMessageVersion > epochEnds; TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString()); - cursors.push_back( Reference( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector(), Reference(), 0 ) ) ); + cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector(), Reference(), 0 ) ); int i = 0; while(begin < lastBegin) { if(i == oldLogData.size()) { @@ -548,9 +548,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouterString()) .detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion); - cursors.push_back( Reference( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag, - thisBegin, lastBegin, false, std::vector(), Reference(), 0 ) ) ); - epochEnds.push_back(LogMessageVersion(lastBegin)); + cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag, + thisBegin, lastBegin, false, std::vector(), Reference(), 0)); + epochEnds.emplace_back(lastBegin); lastBegin = thisBegin; } i++; @@ -635,10 +635,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedstartVersion < end) { 
TraceEvent("TLogPeekLocalAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestSet", bestSet).detail("BestSetStart", tLogs[bestSet]->startVersion).detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )]->get().id()); if(useMergePeekCursors) { - cursors.push_back( Reference( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag, - tLogs[bestSet]->startVersion, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor) ) ); + cursors.emplace_back(new ILogSystem::MergedPeekCursor(tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag, + tLogs[bestSet]->startVersion, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor)); } else { - cursors.push_back( Reference( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false ) ) ); + cursors.emplace_back(new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false)); } } Version lastBegin = tLogs[bestSet]->startVersion; @@ -688,9 +688,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServerString()).detail("ThisBegin", thisBegin).detail("LastBegin", lastBegin); - cursors.push_back( Reference( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logServers, oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ), oldLogData[i].tLogs[bestOldSet]->logServers.size() + 1 - oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor, tag, - thisBegin, std::min(lastBegin, end), useMergePeekCursors, 
oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor))); - epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end))); + cursors.emplace_back(new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logServers, oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ), oldLogData[i].tLogs[bestOldSet]->logServers.size() + 1 - oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor, tag, + thisBegin, std::min(lastBegin, end), useMergePeekCursors, oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor)); + epochEnds.emplace_back(std::min(lastBegin, end)); } lastBegin = thisBegin; } @@ -719,7 +719,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::MultiCursor(cursors, epochEnds) ); } catch( Error& e ) { @@ -748,7 +748,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::MultiCursor(cursors, epochEnds) ); @@ -1020,7 +1020,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedisLocal || remoteLogsWrittenToCoreState) { for( int i = 0; i < t->logServers.size(); i++ ) { - logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress())); + logs.emplace_back(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress()); } } } @@ -1028,7 +1028,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size(); j++ ) { - oldLogs.push_back(std::make_pair(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? t->logServers[j]->get().interf().address() : NetworkAddress())); + oldLogs.emplace_back(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? 
t->logServers[j]->get().interf().address() : NetworkAddress()); } } } @@ -1310,7 +1310,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>> failed; for (const auto& logVar : logSet->logServers) { allLogServers.push_back(logVar); - failed.push_back(Reference>(new AsyncVar())); + failed.emplace_back(new AsyncVar()); failureTrackers.push_back(monitorLog(logVar, failed.back())); } filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); @@ -1411,7 +1411,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>> failed; for(auto& log : logServers[0]->logServers) { - failed.push_back( Reference>( new AsyncVar() ) ); + failed.emplace_back(new AsyncVar()); failureTrackers.push_back( monitorLog(log, failed.back() ) ); } ASSERT(logFailed.size() == 1); @@ -1507,7 +1507,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs) { //Recruit log routers for old generations of the primary locality if(tLogs->locality == locality) { - logRouterInitializationReplies.push_back(vector>()); + logRouterInitializationReplies.emplace_back(); for( int i = 0; i < self->logRouterTags; i++) { InitializeLogRouterRequest req; req.recoveryCount = recoveryCount; @@ -1556,7 +1556,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlocality == locality) { - logRouterInitializationReplies.push_back(vector>()); + logRouterInitializationReplies.emplace_back(); for( int i = 0; i < old.logRouterTags; i++) { InitializeLogRouterRequest req; req.recoveryCount = recoveryCount; @@ -1592,7 +1592,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs) { if(tLogs->locality == locality) { for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) { - tLogs->logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[nextReplies][i].get()) ) ) ); + tLogs->logRouters.emplace_back(new AsyncVar>(OptionalInterface(logRouterInitializationReplies[nextReplies][i].get()))); 
failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) ); } nextReplies++; @@ -1614,7 +1614,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlocality == locality) { for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) { - tLogs->logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[nextReplies][i].get()) ) ) ); + tLogs->logRouters.emplace_back(new AsyncVar>(OptionalInterface(logRouterInitializationReplies[nextReplies][i].get()))); if(!forRemote) { failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) ); } @@ -1738,7 +1738,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[i].get()) ) ) ); + logSet->logRouters.emplace_back(new AsyncVar>(OptionalInterface(logRouterInitializationReplies[i].get()))); } for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) { @@ -1774,7 +1774,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedpseudoLocalities.insert(tagLocalityLogRouter); } - logSystem->tLogs.push_back( Reference( new LogSet() ) ); + logSystem->tLogs.emplace_back(new LogSet()); logSystem->tLogs[0]->tLogVersion = configuration.tLogVersion; logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum; logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor; @@ -1785,7 +1785,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0) { - logSystem->tLogs.push_back( Reference( new LogSet() ) ); + logSystem->tLogs.emplace_back(new LogSet()); if(recr.satelliteFallback) { 
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorumFallback; logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactorFallback; @@ -1813,7 +1813,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs.size()) { - logSystem->oldLogData.push_back(OldLogData()); + logSystem->oldLogData.emplace_back(); logSystem->oldLogData[0].tLogs = oldLogSystem->tLogs; logSystem->oldLogData[0].epochEnd = oldLogSystem->knownCommittedVersion + 1; logSystem->oldLogData[0].logRouterTags = oldLogSystem->logRouterTags; From 9e8ffd2ff764c8a745f10e9440e974c23c14509a Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 19 Apr 2019 09:41:09 -0700 Subject: [PATCH 11/17] Refactor OldLogData ctor --- fdbserver/TagPartitionedLogSystem.actor.cpp | 95 ++++++++------------- 1 file changed, 35 insertions(+), 60 deletions(-) diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index cc4f07b2c8..4d79dead1c 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -48,7 +48,10 @@ struct OldLogData { std::set pseudoLocalities; OldLogData() : epochEnd(0), logRouterTags(0) {} - explicit OldLogData(const OldTLogConf& conf) + + // Constructor for T of OldTLogConf and OldTLogCoreData + template + explicit OldLogData(const T& conf) : logRouterTags(conf.logRouterTags), epochEnd(conf.epochEnd), pseudoLocalities(conf.pseudoLocalities) { @@ -56,8 +59,6 @@ struct OldLogData { for (int j = 0; j < conf.tLogs.size(); j++) { Reference logSet(new LogSet(conf.tLogs[j])); tLogs[j] = logSet; - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); } } }; @@ -85,6 +86,8 @@ LogSet::LogSet(const TLogSet& tLogSet) : for(const auto& log : tLogSet.logRouters) { logRouters.emplace_back(new AsyncVar>(log)); } + filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities); + 
updateLocalitySet(tLogLocalities); } LogSet::LogSet(const CoreTLogSet& coreSet) : @@ -98,6 +101,8 @@ LogSet::LogSet(const CoreTLogSet& coreSet) : for(const auto& log : coreSet.tLogs) { logServers.emplace_back(new AsyncVar>(OptionalInterface(log))); } + filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities); + updateLocalitySet(tLogLocalities); } TLogSet::TLogSet(const LogSet& rhs) : @@ -231,10 +236,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedpseudoLocalities = lsConf.pseudoLocalities; for (const TLogSet& tLogSet : lsConf.tLogs) { if (!excludeRemote || tLogSet.isLocal) { - Reference logSet(new LogSet(tLogSet)); - logSystem->tLogs.push_back(logSet); - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); + logSystem->tLogs.emplace_back(new LogSet(tLogSet)); } } @@ -251,14 +253,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSystem( new TagPartitionedLogSystem(dbgid, locality) ); - if(lsConf.oldTLogs.size()) { - logSystem->tLogs.resize( lsConf.oldTLogs[0].tLogs.size()); - for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) { - TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i]; - Reference logSet(new LogSet(tLogSet)); - logSystem->tLogs[i] = logSet; - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); + if (lsConf.oldTLogs.size()) { + for (const TLogSet& tLogSet : lsConf.oldTLogs[0].tLogs) { + logSystem->tLogs.emplace_back(new LogSet(tLogSet)); } logSystem->logRouterTags = lsConf.oldTLogs[0].logRouterTags; //logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd; @@ -1302,35 +1299,23 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>>> logFailed; state std::vector> failureTrackers; - logServers.resize(prevState.tLogs.size()); - for (int i = 0; i < prevState.tLogs.size(); i++) { - CoreTLogSet const& coreSet = prevState.tLogs[i]; - Reference logSet(new 
LogSet(coreSet)); - logServers[i] = logSet; + for (const CoreTLogSet& coreSet : prevState.tLogs) { + logServers.emplace_back(new LogSet(coreSet)); std::vector>> failed; - for (const auto& logVar : logSet->logServers) { + for (const auto& logVar : logServers.back()->logServers) { allLogServers.push_back(logVar); failed.emplace_back(new AsyncVar()); failureTrackers.push_back(monitorLog(logVar, failed.back())); } - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); logFailed.push_back(failed); } - oldLogData.resize(prevState.oldTLogData.size()); - for( int i = 0; i < prevState.oldTLogData.size(); i++ ) { - OldLogData& oldData = oldLogData[i]; - OldTLogCoreData const& old = prevState.oldTLogData[i]; - oldData.tLogs.resize(old.tLogs.size()); - for (int j = 0; j < old.tLogs.size(); j++) { - Reference logSet(new LogSet(old.tLogs[j])); - oldData.tLogs[j] = logSet; + + for (const auto& oldTlogData : prevState.oldTLogData) { + oldLogData.emplace_back(oldTlogData); + + for (const auto& logSet : oldLogData.back().tLogs) { allLogServers.insert(allLogServers.end(), logSet->logServers.begin(), logSet->logServers.end()); - filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities); - logSet->updateLocalitySet(logSet->tLogLocalities); } - oldData.epochEnd = old.epochEnd; - oldData.logRouterTags = old.logRouterTags; } state Future rejoins = trackRejoins( dbgid, allLogServers, rejoinRequests ); @@ -1472,12 +1457,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted::max(); if(!forRemote) { - Version maxStart = 0; - for(auto& logSet : self->tLogs) { - if(logSet->isLocal) { - maxStart = std::max(maxStart, logSet->startVersion); - } - } + Version maxStart = getMaxLocalStartVersion(self->tLogs); lastStart = std::max(startVersion, maxStart); if( self->logRouterTags == 0 ) { @@ -1526,12 +1506,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData) { - Version 
maxStart = 0; - for(auto& logSet : old.tLogs) { - if(logSet->isLocal) { - maxStart = std::max(maxStart, logSet->startVersion); - } - } + Version maxStart = getMaxLocalStartVersion(old.tLogs); if(old.logRouterTags == 0 || maxStart >= lastStart) { break; @@ -1581,12 +1556,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> failed; if(!forRemote) { - Version maxStart = 0; - for(auto& logSet : self->tLogs) { - if(logSet->isLocal) { - maxStart = std::max(maxStart, logSet->startVersion); - } - } + Version maxStart = getMaxLocalStartVersion(self->tLogs); lastStart = std::max(startVersion, maxStart); for(auto& tLogs : self->tLogs) { @@ -1601,12 +1571,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData) { - Version maxStart = 0; - for(auto& logSet : old.tLogs) { - if(logSet->isLocal) { - maxStart = std::max(maxStart, logSet->startVersion); - } - } + Version maxStart = getMaxLocalStartVersion(old.tLogs); if(old.logRouterTags == 0 || maxStart >= lastStart) { break; } @@ -1632,6 +1597,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>& tLogs) { + Version maxStart = 0; + for (const auto& logSet : tLogs) { + if(logSet->isLocal) { + maxStart = std::max(maxStart, logSet->startVersion); + } + } + return maxStart; + } + static std::vector getLocalTags(int8_t locality, const std::vector& allTags) { std::vector localTags; for (const auto& tag : allTags) { @@ -1646,7 +1621,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = Reference( new LogSet() ); + state Reference logSet(new LogSet()); logSet->tLogReplicationFactor = configuration.getRemoteTLogReplicationFactor(); logSet->tLogVersion = configuration.tLogVersion; logSet->tLogPolicy = configuration.getRemoteTLogPolicy(); From 8b67da57bba2e88f04ec4a033bbe55e676f7a82b Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 19 Apr 2019 14:29:32 -0700 Subject: [PATCH 12/17] Fix upgrade test failure Serialize pseudoLocalities if protocol version is larger 
than 0x0FDB00B061060001LL. Note this version may need to be changed to "currentProtocolVersion" when merging into the master, and "currentProtocolVersion" should be incremented. --- fdbserver/DBCoreState.h | 10 ++++++++-- flow/Net2.actor.cpp | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index c69226b18c..c1f5d244ff 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -89,13 +89,16 @@ struct OldTLogCoreData { template void serialize(Archive& ar) { if( ar.protocolVersion() >= 0x0FDB00A560010001LL) { - serializer(ar, tLogs, logRouterTags, epochEnd, pseudoLocalities); + serializer(ar, tLogs, logRouterTags, epochEnd); } else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, tLogs[0].tLogReplicationFactor, tLogs[0].tLogPolicy, epochEnd, tLogs[0].tLogLocalities); tLogs[0].tLogVersion = TLogVersion::V2; } + if (ar.protocolVersion() > 0x0FDB00B061060001LL) { + serializer(ar, pseudoLocalities); + } } }; @@ -141,7 +144,10 @@ struct DBCoreState { ASSERT(ar.protocolVersion() >= 0x0FDB00A460010001LL); if(ar.protocolVersion() >= 0x0FDB00A560010001LL) { - serializer(ar, tLogs, logRouterTags, oldTLogData, recoveryCount, logSystemType, pseudoLocalities); + serializer(ar, tLogs, logRouterTags, oldTLogData, recoveryCount, logSystemType); + if (ar.protocolVersion() > 0x0FDB00B061060001LL) { + serializer(ar, pseudoLocalities); + } } else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType); diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 39a6fa6951..6b5bd1ac42 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -55,7 +55,7 @@ using namespace boost::asio::ip; // // xyzdev // vvvv -const uint64_t currentProtocolVersion = 0x0FDB00B061060001LL; +const uint64_t currentProtocolVersion = 
0x0FDB00B061070001LL; const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL; const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL; From 7cb61c766bf72fe2ee0bc0dfdccb67b7d6a870f9 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 19 Apr 2019 22:01:08 -0700 Subject: [PATCH 13/17] Fix tLogLocalities for current LogSet In toCoreState(), the serialization of current LogSet is different from old TLog sets. The locality data should be generated, not copied over. Found by: -r simulation --crash -f tests/fast/KillRegionCycle.txt -s 254666356 -b on --- fdbserver/TagPartitionedLogSystem.actor.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 4d79dead1c..e6128515bc 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -284,6 +284,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) { newState.tLogs.emplace_back(*t); + newState.tLogs.back().tLogLocalities.clear(); + for (const auto& log : t->logServers) { + newState.tLogs.back().tLogLocalities.push_back(log->get().interf().locality); + } } } From d2b215b92664ae4ed49b6d37209cdd8db10fc5fa Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 22 Apr 2019 11:55:04 -0700 Subject: [PATCH 14/17] Refactor tag population of ServerCacheInfo --- fdbclient/StorageServerInterface.h | 12 +++++++++++ fdbserver/MasterProxyServer.actor.cpp | 29 +++++---------------------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 1d048f6b53..9ea511e8dd 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -94,6 +94,18 @@ struct ServerCacheInfo { std::vector tags; std::vector> src_info; std::vector> dest_info; + + void populateTags() { + if (tags.size()) return; + + for (const auto& info : src_info) { + 
tags.push_back(info->tag); + } + for (const auto& info : dest_info) { + tags.push_back(info->tag); + } + uniquify(tags); + } }; struct GetValueReply : public LoadBalancedReply { diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index e5a383ff16..eaa78b5f60 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -710,35 +710,16 @@ ACTOR Future commitBatch( // Fast path if (debugMutation("ProxyCommit", commitVersion, m)) TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion); - - auto& tags = ranges.begin().value().tags; - if(!tags.size()) { - for( auto info : ranges.begin().value().src_info ) { - tags.push_back( info->tag ); - } - for( auto info : ranges.begin().value().dest_info ) { - tags.push_back( info->tag ); - } - uniquify(tags); - } - - toCommit.addTags(tags); + + ranges.begin().value().populateTags(); + toCommit.addTags(ranges.begin().value().tags); } else { TEST(true); //A clear range extends past a shard boundary std::set allSources; for (auto r : ranges) { - auto& tags = r.value().tags; - if(!tags.size()) { - for( auto info : r.value().src_info ) { - tags.push_back(info->tag); - } - for( auto info : r.value().dest_info ) { - tags.push_back(info->tag); - } - uniquify(tags); - } - allSources.insert(tags.begin(), tags.end()); + r.value().populateTags(); + allSources.insert(r.value().tags.begin(), r.value().tags.end()); } if (debugMutation("ProxyCommit", commitVersion, m)) TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion); From 439d5a3843e97cffa282d269ef59e413e9e8d13b Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 22 Apr 2019 14:03:48 -0700 Subject: [PATCH 15/17] Use emplace_back instead of push_back in Proxy --- fdbserver/MasterProxyServer.actor.cpp | 20 ++++++++++---------- 1 
file changed, 10 insertions(+), 10 deletions(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index eaa78b5f60..352fadf868 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -469,7 +469,7 @@ ACTOR Future commitBatch( for(auto it : versionReply.resolverChanges) { auto rs = self->keyResolvers.modify(it.range); for(auto r = rs.begin(); r != rs.end(); ++r) - r->value().push_back(std::make_pair(versionReply.resolverChangesVersion,it.dest)); + r->value().emplace_back(versionReply.resolverChangesVersion,it.dest); } //TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion); @@ -560,7 +560,7 @@ ACTOR Future commitBatch( // These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message auto fcm = self->logAdapter->getCommitMessage(); - storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit())); + storeCommits.emplace_back(fcm, self->txnStateStore->commit()); //discardCommit( dbgid, fcm, txnStateStore->commit() ); if (initialState) { @@ -647,7 +647,7 @@ ACTOR Future commitBatch( state Optional metadataVersionAfter = self->txnStateStore->readValue(metadataVersionKey).get(); auto fcm = self->logAdapter->getCommitMessage(); - storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit())); + storeCommits.emplace_back(fcm, self->txnStateStore->commit()); self->version = commitVersion; if (!self->validState.isSet()) self->validState.send(Void()); ASSERT(commitVersion); @@ -907,7 +907,7 @@ ACTOR Future commitBatch( self->txsPopVersions.pop_front(); } - self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo)); + self->txsPopVersions.emplace_back(commitVersion, msg.popTo); } self->logSystem->pop(msg.popTo, txsTag); @@ -1220,7 +1220,7 @@ ACTOR static Future readRequestServer( for(auto& it : r.value().src_info) { ssis.push_back(it->interf); } - 
rep.results.push_back(std::make_pair(r.range(), ssis)); + rep.results.emplace_back(r.range(), ssis); } else if(!req.reverse) { int count = 0; for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) { @@ -1229,7 +1229,7 @@ ACTOR static Future readRequestServer( for(auto& it : r.value().src_info) { ssis.push_back(it->interf); } - rep.results.push_back(std::make_pair(r.range(), ssis)); + rep.results.emplace_back(r.range(), ssis); count++; } } else { @@ -1241,7 +1241,7 @@ ACTOR static Future readRequestServer( for(auto& it : r.value().src_info) { ssis.push_back(it->interf); } - rep.results.push_back(std::make_pair(r.range(), ssis)); + rep.results.emplace_back(r.range(), ssis); if(r == commitData->keyInfo.ranges().begin()) { break; } @@ -1258,7 +1258,7 @@ ACTOR static Future readRequestServer( rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() ); Standalone> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get(); for(int i = history.size()-1; i >= 0; i-- ) { - rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value))); + rep.history.emplace_back(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)); } auto localityKey = commitData->txnStateStore->readValue(tagLocalityListKeyFor(req.dcId)).get(); if( localityKey.present() ) { @@ -1427,7 +1427,7 @@ ACTOR Future masterProxyServerCore( auto rs = commitData.keyResolvers.modify(allKeys); for(auto r = rs.begin(); r != rs.end(); ++r) - r->value().push_back(std::make_pair(0,0)); + r->value().emplace_back(0,0); commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor); commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference>(), false); @@ -1560,7 +1560,7 @@ ACTOR Future 
masterProxyServerCore( info.dest_info.push_back( storageInfo ); } uniquify(info.tags); - keyInfoData.push_back( std::make_pair(MapPair(k, info), 1) ); + keyInfoData.emplace_back(MapPair(k, info), 1); } } else { mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value)); From 5462f560e7f5fecd9cc8b3addd0402d2221ecb3d Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 23 Apr 2019 15:39:26 -0700 Subject: [PATCH 16/17] Add pseudo locality for log routers and tlogs This changes the logic of pop operations from log routers (LG): - LG pops tagLocalityLogRouterMapped from TLogs; - TLog converts tagLocalityLogRouterMapped back to tagLocalityLogRouter before popping. Later when we add more pseudo localities, the same pattern can be used. --- fdbclient/FDBTypes.h | 10 +++++- fdbserver/LogRouter.actor.cpp | 3 +- fdbserver/LogSystem.h | 10 ++++++ fdbserver/OldTLogServer_6_0.actor.cpp | 25 ++++++++----- fdbserver/TLogServer.actor.cpp | 25 ++++++++----- fdbserver/TagPartitionedLogSystem.actor.cpp | 40 +++++++++++++++++++-- 6 files changed, 91 insertions(+), 22 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index e263eadbe4..790c70c6b6 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -33,7 +33,15 @@ typedef StringRef KeyRef; typedef StringRef ValueRef; typedef int64_t Generation; -enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalitySatellite = -5, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these number to be as compact as possible +enum { + tagLocalitySpecial = -1, + tagLocalityLogRouter = -2, + tagLocalityRemoteLog = -3, + tagLocalityUpgraded = -4, + tagLocalitySatellite = -5, + tagLocalityLogRouterMapped = -6, + tagLocalityInvalid = -99 +}; //The TLog and LogRouter require these number to be as compact as possible #pragma pack(push, 1) struct Tag { diff --git a/fdbserver/LogRouter.actor.cpp
b/fdbserver/LogRouter.actor.cpp index a069046078..b76784b5a3 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -378,7 +378,8 @@ ACTOR Future logRouterPop( LogRouterData* self, TLogPopRequest req ) { self->poppedVersion = std::min(minKnownCommittedVersion, self->minKnownCommittedVersion); if(self->logSystem->get() && self->allowPops) { - self->logSystem->get()->pop(self->poppedVersion, self->routerTag); + const Tag popTag = self->logSystem->get()->getPseudoPopTag(self->routerTag, ProcessClass::LogRouterClass); + self->logSystem->get()->pop(self->poppedVersion, popTag); } req.reply.send(Void()); self->minPopped.set(std::max(minPopped, self->minPopped.get())); diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 98cf20f139..df7ef6ecc7 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -697,6 +697,16 @@ struct ILogSystem { virtual Tag getRandomRouterTag() = 0; virtual void stopRejoins() = 0; + + virtual void addPseudoLocality(int8_t locality) = 0; + + // Returns the pseudo tag to be popped for the given process class. If the + // process class doesn't use pseudo tag, return the same tag. 
+ virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0; + + virtual bool isPseudoLocality(int8_t locality) = 0; + + virtual Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) = 0; }; struct LengthPrefixedStringRef { diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 35de35251c..5cd0895a66 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -882,25 +882,32 @@ std::deque> & getVersionMessages( Re }; ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, Reference logData ) { - auto tagData = logData->getTagData(req.tag); + state Version upTo = req.to; + int8_t tagLocality = req.tag.locality; + if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) { + upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to); + tagLocality = tagLocalityLogRouter; + } + state Tag tag(tagLocality, req.tag.id); + auto tagData = logData->getTagData(tag); if (!tagData) { - tagData = logData->createTagData(req.tag, req.to, true, true, false); - } else if (req.to > tagData->popped) { - tagData->popped = req.to; + tagData = logData->createTagData(tag, upTo, true, true, false); + } else if (upTo > tagData->popped) { + tagData->popped = upTo; tagData->poppedRecently = true; - if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) { + if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) { tagData->unpoppedRecovered = false; logData->unpoppedRecoveredTags--; - TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt); + TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", 
logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt); if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) { logData->recoveryComplete.send(Void()); } } - if ( req.to > logData->persistentDataDurableVersion ) - wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop )); - //TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to); + if (upTo > logData->persistentDataDurableVersion) + wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop)); + //TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo); } req.reply.send(Void()); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 67cd0ee5fd..710668fc78 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1122,26 +1122,33 @@ std::deque> & getVersionMessages( Re }; ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, Reference logData ) { - auto tagData = logData->getTagData(req.tag); + state Version upTo = req.to; + int8_t tagLocality = req.tag.locality; + if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) { + upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to); + tagLocality = tagLocalityLogRouter; + } + state Tag tag(tagLocality, req.tag.id); + auto tagData = logData->getTagData(tag); if (!tagData) { - tagData = logData->createTagData(req.tag, req.to, true, true, false); - } else if (req.to > tagData->popped) { - tagData->popped = req.to; + tagData = logData->createTagData(tag, upTo, true, true, false); + } else if (upTo > tagData->popped) { + tagData->popped = upTo; tagData->poppedRecently = true; tagData->requiresPoppedLocationUpdate = true; - if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) { + if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) { 
tagData->unpoppedRecovered = false; logData->unpoppedRecoveredTags--; - TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt); + TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt); if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) { logData->recoveryComplete.send(Void()); } } - if ( req.to > logData->persistentDataDurableVersion ) - wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop )); - //TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to); + if (upTo > logData->persistentDataDurableVersion) + wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop)); + //TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo); } req.reply.send(Void()); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index e6128515bc..63f81f1967 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -166,6 +166,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted pseudoLocalities; + std::map pseudoLocalityPopVersion; // new members Future rejoins; @@ -216,6 +217,41 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0) { + tag.locality = tagLocalityLogRouterMapped; + } + break; + + default: + break; + } + return tag; + } + + bool isPseudoLocality(int8_t locality) override { + return pseudoLocalities.count(locality) > 0; + } + + Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) override { + 
ASSERT(isPseudoLocality(locality)); + auto& localityVersion = pseudoLocalityPopVersion[locality]; + localityVersion = std::max(localityVersion, upTo); + Version minVersion = localityVersion; + for (const auto& it : pseudoLocalityPopVersion) { + minVersion = std::min(minVersion, it.second); + } + return minVersion; + } + static Future recoverAndEndEpoch(Reference>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream const& rejoins, LocalityData const& locality, bool* forceRecovery) { return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality, forceRecovery ); } @@ -1083,7 +1119,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0; + return logRouterTags > 0 || pseudoLocalities.size() > 0; } virtual Tag getRandomRouterTag() { @@ -1750,7 +1786,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 1) { logSystem->logRouterTags = recr.tLogs.size() * std::max(1, configuration.desiredLogRouterCount / std::max(1,recr.tLogs.size())); logSystem->expectedLogSets++; - logSystem->pseudoLocalities.insert(tagLocalityLogRouter); + logSystem->addPseudoLocality(tagLocalityLogRouterMapped); } logSystem->tLogs.emplace_back(new LogSet()); From 8b5449e608bdb033071dab528c7aa730134b2149 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 29 Apr 2019 16:44:11 -0700 Subject: [PATCH 17/17] Fix review comments for PR #1473 --- fdbserver/LogSystem.h | 4 +--- fdbserver/OldTLogServer_6_0.actor.cpp | 2 +- fdbserver/TLogServer.actor.cpp | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 4 ++-- fdbserver/masterserver.actor.cpp | 2 ++ 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index df7ef6ecc7..fb9c5506fa 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -698,15 +698,13 @@ struct ILogSystem { virtual void stopRejoins() = 0; - virtual void addPseudoLocality(int8_t locality) = 0; - // Returns the pseudo tag to be popped for the given process class. 
If the // process class doesn't use pseudo tag, return the same tag. virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0; virtual bool isPseudoLocality(int8_t locality) = 0; - virtual Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) = 0; + virtual Version popPseudoLocalityTag(int8_t locality, Version upTo) = 0; }; struct LengthPrefixedStringRef { diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 5cd0895a66..88a8638ec8 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -885,7 +885,7 @@ ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, ReferencelogSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) { - upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to); + upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to); tagLocality = tagLocalityLogRouter; } state Tag tag(tagLocality, req.tag.id); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 710668fc78..7d4fef1b44 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1125,7 +1125,7 @@ ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, ReferencelogSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) { - upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to); + upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to); tagLocality = tagLocalityLogRouter; } state Tag tag(tagLocality, req.tag.id); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 63f81f1967..2a3beea47a 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -217,7 +217,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0; } - Version 
getPseudoLocalityPopVersion(int8_t locality, Version upTo) override { + Version popPseudoLocalityTag(int8_t locality, Version upTo) override { ASSERT(isPseudoLocality(locality)); auto& localityVersion = pseudoLocalityPopVersion[locality]; localityVersion = std::max(localityVersion, upTo); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 9f6630d3d5..92d9ce2fe0 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -326,10 +326,12 @@ ACTOR Future newTLogServers( Reference self, RecruitFromConfig Future fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max(1, self->configuration.desiredLogRouterCount / std::max(1, recr.tLogs.size())), exclusionWorkerIds) ) ); self->primaryLocality = self->dcId_locality[recr.dcId]; + self->logSystem = Reference(); // Cancels the actors in the previous log system. Reference newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) ); self->logSystem = newLogSystem; } else { self->primaryLocality = tagLocalitySpecial; + self->logSystem = Reference(); // Cancels the actors in the previous log system. Reference newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) ); self->logSystem = newLogSystem; }