satellite log push locations are static per tag, which will reduce the number of tags each satellite log has to index, and reduce the proxy cpu when calculating push locations

This commit is contained in:
Evan Tschannen 2018-06-16 17:39:02 -07:00
parent f694f7c9ca
commit 6931a00993
4 changed files with 131 additions and 23 deletions

View File

@ -43,17 +43,18 @@ struct CoreTLogSet {
bool isLocal;
int8_t locality;
Version startVersion;
std::vector<std::vector<int>> satelliteTagLocations;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations &&
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}
template <class Archive>
void serialize(Archive& ar) {
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion & satelliteTagLocations;
}
};

View File

@ -47,6 +47,7 @@ public:
int8_t locality;
Version startVersion;
std::vector<Future<TLogLockResult>> replies;
std::vector<std::vector<int>> satelliteTagLocations;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
@ -72,7 +73,92 @@ public:
return result;
}
void populateSatelliteTagLocations(int logRouterTags, int oldLogRouterTags) {
satelliteTagLocations.clear();
satelliteTagLocations.resize(std::max(logRouterTags,oldLogRouterTags) + 1);
std::vector<std::set<int>> used_servers;
used_servers.resize(satelliteTagLocations.size() + 1);
for(int i = 0; i < tLogLocalities.size(); i++) {
used_servers[0].insert(i);
}
LocalitySetRef serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int,int>>());
LocalityMap<std::pair<int,int>>* serverMap = (LocalityMap<std::pair<int,int>>*) serverSet.getPtr();
std::vector<std::pair<int,int>> serverLocations;
serverLocations.resize(tLogLocalities.size());
for(int loc = 0; loc < satelliteTagLocations.size(); loc++) {
int team = loc;
if(loc < logRouterTags) {
team = loc + 1;
} else if(loc == logRouterTags) {
team = 0;
}
int used = 0;
int nextServerLocation = 0;
alsoServers.resize(1);
serverMap->clear();
loop {
ASSERT(used < used_servers.size());
if(!used_servers[used].size()) {
continue;
}
for(int idx : used_servers[used]) {
serverLocations[nextServerLocation].first = used;
serverLocations[nextServerLocation].second = idx;
auto entry = serverMap->add(tLogLocalities[idx], &serverLocations[nextServerLocation]);
nextServerLocation++;
if(!satelliteTagLocations[team].size()) {
satelliteTagLocations[team].push_back(idx);
alsoServers[0] = entry;
}
}
resultEntries.clear();
if( serverSet->selectReplicas(tLogPolicy, alsoServers, resultEntries) ) {
for (auto& entry : resultEntries) {
auto obj = serverMap->getObject(entry);
satelliteTagLocations[team].push_back(obj->second);
used_servers[obj->first].erase(obj->second);
used_servers[obj->first+1].insert(obj->second);
}
used_servers[serverLocations[0].first].erase(serverLocations[0].second);
used_servers[serverLocations[0].first+1].insert(serverLocations[0].second);
break;
}
used++;
}
}
checkSatelliteTagLocations();
}
void checkSatelliteTagLocations() {
std::vector<int> used;
used.resize(tLogLocalities.size());
for(auto team : satelliteTagLocations) {
for(auto loc : team) {
used[loc]++;
}
}
int minUsed = satelliteTagLocations.size();
int maxUsed = 0;
for(auto i : used) {
minUsed = std::min(minUsed, i);
maxUsed = std::max(maxUsed, i);
}
TraceEvent(maxUsed - minUsed > 1 ? (g_network->isSimulated() ? SevError : SevWarnAlways) : SevInfo, "CheckSatelliteTagLocations").detail("MinUsed", minUsed).detail("MaxUsed", maxUsed);
}
int bestLocationFor( Tag tag ) {
if(locality == tagLocalitySatellite) {
return satelliteTagLocations[tag == txsTag ? 0 : tag.id + 1][0];
}
//the following logic supports upgrades from 5.X
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
}
@ -113,6 +199,17 @@ public:
}
void getPushLocations( std::vector<Tag> const& tags, std::vector<int>& locations, int locationOffset ) {
if(locality == tagLocalitySatellite) {
for(auto& t : tags) {
if(t.locality < 0) {
for(int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) {
locations.push_back(locationOffset + loc);
}
}
}
return;
}
newLocations.clear();
alsoServers.clear();
resultEntries.clear();

View File

@ -64,6 +64,7 @@ struct TLogSet {
bool isLocal;
int8_t locality;
Version startVersion;
std::vector<std::vector<int>> satelliteTagLocations;
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
@ -72,7 +73,7 @@ struct TLogSet {
}
bool operator == ( const TLogSet& rhs ) const {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal ||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || satelliteTagLocations != rhs.satelliteTagLocations ||
startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
return false;
}
@ -93,7 +94,8 @@ struct TLogSet {
}
bool isEqualIds(TLogSet const& r) const {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || satelliteTagLocations != r.satelliteTagLocations ||
startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
return false;
}
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
@ -109,7 +111,7 @@ struct TLogSet {
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion & satelliteTagLocations;
}
};

View File

@ -152,6 +152,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
logSet->updateLocalitySet();
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
}
@ -177,6 +178,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = tLogData.isLocal;
logSet->locality = tLogData.locality;
logSet->startVersion = tLogData.startVersion;
logSet->satelliteTagLocations = tLogData.satelliteTagLocations;
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->oldLogData[i].logRouterTags = lsConf.oldTLogs[i].logRouterTags;
@ -211,6 +213,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->logRouterTags = lsConf.oldTLogs[0].logRouterTags;
@ -236,6 +239,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->oldLogData[i-1].logRouterTags = lsConf.oldTLogs[i].logRouterTags;
@ -270,6 +274,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.isLocal = t->isLocal;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
coreSet.satelliteTagLocations = t->satelliteTagLocations;
newState.tLogs.push_back(coreSet);
}
}
@ -291,6 +296,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.isLocal = t->isLocal;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
coreSet.satelliteTagLocations = t->satelliteTagLocations;
newState.oldTLogData[i].tLogs.push_back(coreSet);
}
}
@ -934,6 +940,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.isLocal = logSet->isLocal;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
log.satelliteTagLocations = logSet->satelliteTagLocations;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
@ -960,6 +967,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.isLocal = logSet->isLocal;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
log.satelliteTagLocations = logSet->satelliteTagLocations;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
@ -1195,6 +1203,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = coreSet.isLocal;
logSet->locality = coreSet.locality;
logSet->startVersion = coreSet.startVersion;
logSet->satelliteTagLocations = coreSet.satelliteTagLocations;
logFailed.push_back(failed);
}
oldLogData.resize(prevState.oldTLogData.size());
@ -1218,6 +1227,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->isLocal = log.isLocal;
logSet->locality = log.locality;
logSet->startVersion = log.startVersion;
logSet->satelliteTagLocations = log.satelliteTagLocations;
}
oldData.epochEnd = old.epochEnd;
oldData.logRouterTags = old.logRouterTags;
@ -1587,6 +1597,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->previousEpochEndVersion = oldLogSystem->epochEndVersion;
logSystem->recruitmentID = g_random->randomUniqueID();
oldLogSystem->recruitmentID = logSystem->recruitmentID;
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->logRouterTags = recr.tLogs.size();
logSystem->expectedLogSets++;
} else {
logSystem->logRouterTags = 0;
}
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
@ -1605,14 +1622,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[1]->isLocal = true;
logSystem->tLogs[1]->locality = tagLocalitySatellite;
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
logSystem->expectedLogSets++;
}
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->logRouterTags = recr.tLogs.size();
logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() );
for(int i = 0; i < recr.satelliteTLogs.size(); i++) {
logSystem->tLogs[1]->tLogLocalities[i] = recr.satelliteTLogs[i].locality;
}
filterLocalityDataForPolicy(logSystem->tLogs[1]->tLogPolicy, &logSystem->tLogs[1]->tLogLocalities);
logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[1]->updateLocalitySet(logSystem->tLogs[1]->tLogLocalities);
logSystem->tLogs[1]->populateSatelliteTagLocations(logSystem->logRouterTags,oldLogSystem->logRouterTags);
logSystem->expectedLogSets++;
} else {
logSystem->logRouterTags = 0;
}
if(oldLogSystem->tLogs.size()) {
@ -1740,16 +1760,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.logRouterTags = logSystem->logRouterTags;
}
vector<LocalityData> satelliteLocalities;
satelliteLocalities.resize(recr.satelliteTLogs.size());
for(int i = 0; i < recr.satelliteTLogs.size(); i++) {
satelliteLocalities[i] = recr.satelliteTLogs[i].locality;
}
logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() );
logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[1]->updateLocalitySet(satelliteLocalities);
for(int i = -1; i < oldLogSystem->logRouterTags; i++) {
Tag tag = i == -1 ? txsTag : Tag(tagLocalityLogRouter, i);
locations.clear();
@ -1765,9 +1775,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < satelliteInitializationReplies.size(); i++ ) {
logSystem->tLogs[1]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(satelliteInitializationReplies[i].get()) ) );
logSystem->tLogs[1]->tLogLocalities[i] = recr.satelliteTLogs[i].locality;
}
filterLocalityDataForPolicy(logSystem->tLogs[1]->tLogPolicy, &logSystem->tLogs[1]->tLogLocalities);
for( int i = 0; i < logSystem->tLogs[1]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[1]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );