fix: populateSatelliteTagLocations was broken

fix: satellites do not index the upgraded locality
This commit is contained in:
Evan Tschannen 2018-06-17 13:29:17 -07:00
parent 6931a00993
commit f637c680f1
3 changed files with 32 additions and 37 deletions

View File

@ -77,16 +77,14 @@ public:
satelliteTagLocations.clear();
satelliteTagLocations.resize(std::max(logRouterTags,oldLogRouterTags) + 1);
std::vector<std::set<int>> used_servers;
used_servers.resize(satelliteTagLocations.size() + 1);
std::set<std::pair<int,int>> used_servers;
for(int i = 0; i < tLogLocalities.size(); i++) {
used_servers[0].insert(i);
used_servers.insert(std::make_pair(0,i));
}
LocalitySetRef serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int,int>>());
LocalityMap<std::pair<int,int>>* serverMap = (LocalityMap<std::pair<int,int>>*) serverSet.getPtr();
std::vector<std::pair<int,int>> serverLocations;
serverLocations.resize(tLogLocalities.size());
std::vector<std::pair<int,int>> resultPairs;
for(int loc = 0; loc < satelliteTagLocations.size(); loc++) {
int team = loc;
if(loc < logRouterTags) {
@ -95,42 +93,33 @@ public:
team = 0;
}
int used = 0;
int nextServerLocation = 0;
bool teamComplete = false;
alsoServers.resize(1);
serverMap->clear();
loop {
ASSERT(used < used_servers.size());
if(!used_servers[used].size()) {
continue;
}
for(int idx : used_servers[used]) {
serverLocations[nextServerLocation].first = used;
serverLocations[nextServerLocation].second = idx;
auto entry = serverMap->add(tLogLocalities[idx], &serverLocations[nextServerLocation]);
nextServerLocation++;
if(!satelliteTagLocations[team].size()) {
satelliteTagLocations[team].push_back(idx);
alsoServers[0] = entry;
}
resultPairs.clear();
for(auto& used_idx : used_servers) {
auto entry = serverMap->add(tLogLocalities[used_idx.second], &used_idx);
if(!resultPairs.size()) {
resultPairs.push_back(used_idx);
alsoServers[0] = entry;
}
resultEntries.clear();
if( serverSet->selectReplicas(tLogPolicy, alsoServers, resultEntries) ) {
for (auto& entry : resultEntries) {
auto obj = serverMap->getObject(entry);
satelliteTagLocations[team].push_back(obj->second);
used_servers[obj->first].erase(obj->second);
used_servers[obj->first+1].insert(obj->second);
for(auto& entry : resultEntries) {
resultPairs.push_back(*serverMap->getObject(entry));
}
used_servers[serverLocations[0].first].erase(serverLocations[0].second);
used_servers[serverLocations[0].first+1].insert(serverLocations[0].second);
for(auto& res : resultPairs) {
satelliteTagLocations[team].push_back(res.second);
used_servers.erase(res);
res.first++;
used_servers.insert(res);
}
teamComplete = true;
break;
}
used++;
}
ASSERT(teamComplete);
}
checkSatelliteTagLocations();
@ -201,7 +190,7 @@ public:
void getPushLocations( std::vector<Tag> const& tags, std::vector<int>& locations, int locationOffset ) {
if(locality == tagLocalitySatellite) {
for(auto& t : tags) {
if(t.locality < 0) {
if(t == txsTag || t.locality == tagLocalityLogRouter) {
for(int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) {
locations.push_back(locationOffset + loc);
}

View File

@ -808,7 +808,11 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
block.append(block.arena(), msg.message.begin(), msg.message.size());
for(auto tag : msg.tags) {
if(!(self->locality == tagLocalitySpecial || self->locality == tag.locality || tag.locality < 0)) {
if(self->locality == tagLocalitySatellite) {
if(!(tag == txsTag || tag.locality == tagLocalityLogRouter)) {
continue;
}
} else if(!(self->locality == tagLocalitySpecial || self->locality == tag.locality || tag.locality < 0)) {
continue;
}

View File

@ -418,7 +418,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<LogSet>> localSets;
Version lastBegin = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality ||
tag == txsTag || tag.locality == tagLocalityLogRouter || (tag.locality == tagLocalityUpgraded && log->locality != tagLocalitySatellite))) {
lastBegin = std::max(lastBegin, log->startVersion);
localSets.push_back(log);
if(log->locality != tagLocalitySatellite) {
@ -460,7 +461,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<LogSet>> localOldSets;
Version thisBegin = begin;
for(auto& log : oldLogData[i].tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality ||
tag == txsTag || tag.locality == tagLocalityLogRouter || (tag.locality == tagLocalityUpgraded && log->locality != tagLocalitySatellite))) {
thisBegin = std::max(thisBegin, log->startVersion);
localOldSets.push_back(log);
if(log->locality != tagLocalitySatellite) {
@ -593,7 +595,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int bestSet = -1;
bool foundSpecial = false;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == tag.locality)) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == tag.locality || tag.locality == tagLocalityUpgraded)) {
if( tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
@ -627,7 +629,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == tag.locality)) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalityUpgraded)) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}