removed hasBestPolicy

This commit is contained in:
Evan Tschannen 2018-06-15 12:36:19 -07:00
parent 0d87186821
commit f694f7c9ca
8 changed files with 57 additions and 102 deletions

View File

@ -794,10 +794,11 @@ public:
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;
if(logSet.isLocal && logSet.hasBestPolicy > HasBestPolicyNone) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else if(logSet.isLocal) {
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
satellite_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
else if(logSet.isLocal) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else {
remote_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
@ -1952,7 +1953,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState == RecoveryState::REMOTE_RECOVERED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.hasBestPolicy) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
primaryLog = tLog.interf();

View File

@ -41,20 +41,19 @@ struct CoreTLogSet {
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && hasBestPolicy == rhs.hasBestPolicy &&
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}
template <class Archive>
void serialize(Archive& ar) {
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};

View File

@ -105,7 +105,6 @@ struct LogRouterData {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
logSet.hasBestPolicy = req.hasBestPolicy;
logSet.locality = req.locality;
logSet.updateLocalitySet(req.tLogLocalities);
@ -413,7 +412,7 @@ ACTOR Future<Void> logRouter(
Reference<AsyncVar<ServerDBInfo>> db)
{
try {
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("HasBestPolicy", req.hasBestPolicy).detail("Locality", req.locality);
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("Locality", req.locality);
state Future<Void> core = logRouterCore(interf, req, db);
loop choose{
when(Void _ = wait(core)) { return Void(); }

View File

@ -44,12 +44,11 @@ public:
std::vector<int> logIndexArray;
std::map<int,LocalityEntry> logEntryMap;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
std::vector<Future<TLogLockResult>> replies;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
std::string logRouterString() {
std::string result;
@ -74,17 +73,8 @@ public:
}
int bestLocationFor( Tag tag ) {
if(hasBestPolicy == HasBestPolicyNone) {
return -1;
} else if(hasBestPolicy == HasBestPolicyId) {
//This policy supports upgrades from 5.X
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
} else {
//Unsupported policy
ASSERT(false);
throw internal_error();
}
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
}
void updateLocalitySet() {
@ -127,11 +117,9 @@ public:
alsoServers.clear();
resultEntries.clear();
if(hasBestPolicy) {
for(auto& t : tags) {
if(t.locality == locality || t.locality == tagLocalitySpecial || locality == tagLocalitySpecial || (isLocal && t.locality == tagLocalityLogRouter)) {
newLocations.push_back(bestLocationFor(t));
}
for(auto& t : tags) {
if(locality == tagLocalitySpecial || t.locality == locality || t.locality < 0) {
newLocations.push_back(bestLocationFor(t));
}
}

View File

@ -55,8 +55,6 @@ protected:
Optional<Interface> iface;
};
enum { HasBestPolicyNone = 0, HasBestPolicyId = 1 };
struct TLogSet {
std::vector<OptionalInterface<TLogInterface>> tLogs;
std::vector<OptionalInterface<TLogInterface>> logRouters;
@ -64,18 +62,17 @@ struct TLogSet {
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
std::string toString() const {
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBestPolicy, logRouters.size(), describe(tLogs).c_str(), locality);
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), locality);
}
bool operator == ( const TLogSet& rhs ) const {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBestPolicy != rhs.hasBestPolicy ||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal ||
startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
return false;
}
@ -96,7 +93,7 @@ struct TLogSet {
}
bool isEqualIds(TLogSet const& r) const {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBestPolicy != r.hasBestPolicy || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
return false;
}
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
@ -112,7 +109,7 @@ struct TLogSet {
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};

View File

@ -1511,20 +1511,20 @@ static StatusArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<
}
}
maxFaultTolerance = std::max(maxFaultTolerance, it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs);
if(it.tLogs[i].isLocal && it.tLogs[i].hasBestPolicy) {
if(it.tLogs[i].isLocal && it.tLogs[i].locality == tagLocalitySatellite) {
statusObj["satellite_log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["satellite_log_write_anti_quorum"] = it.tLogs[i].tLogWriteAntiQuorum;
statusObj["satellite_log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs;
}
else if(it.tLogs[i].isLocal) {
statusObj["log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["log_write_anti_quorum"] = it.tLogs[i].tLogWriteAntiQuorum;
statusObj["log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs;
}
else if(!it.tLogs[i].isLocal) {
else {
statusObj["remote_log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["remote_log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - failedLogs;
}
else if(it.tLogs[i].isLocal && !it.tLogs[i].hasBestPolicy) {
statusObj["satellite_log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["satellite_log_write_anti_quorum"] = it.tLogs[i].tLogWriteAntiQuorum;
statusObj["satellite_log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs;
}
}
*oldLogFaultTolerance = std::min(*oldLogFaultTolerance, maxFaultTolerance);
statusObj["logs"] = logsObj;

View File

@ -150,7 +150,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->updateLocalitySet();
@ -176,7 +175,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogData.tLogPolicy;
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->hasBestPolicy = tLogData.hasBestPolicy;
logSet->locality = tLogData.locality;
logSet->startVersion = tLogData.startVersion;
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -211,7 +209,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -237,7 +234,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -272,7 +268,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBestPolicy = t->hasBestPolicy;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
newState.tLogs.push_back(coreSet);
@ -294,7 +289,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBestPolicy = t->hasBestPolicy;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
newState.oldTLogData[i].tLogs.push_back(coreSet);
@ -414,17 +408,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {
int bestSet = -1;
int nextBestSet = -1;
int bestSet = 0;
std::vector<Reference<LogSet>> localSets;
Version lastBegin = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
lastBegin = std::max(lastBegin, log->startVersion);
localSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
}
}
@ -434,17 +426,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekAllCurrentOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", bestSet >= 0 ? localSets[bestSet]->logServerString() : "no best set");
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, end, parallelGetMore ) );
TraceEvent("TLogPeekAllCurrentOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localSets[bestSet]->logServerString());
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, end, parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
if(lastBegin < end) {
TraceEvent("TLogPeekAllAddingCurrent", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", bestSet >= 0 ? localSets[bestSet]->logServerString() : "no best set");
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, lastBegin, end, parallelGetMore)) );
TraceEvent("TLogPeekAllAddingCurrent", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localSets[bestSet]->logServerString());
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore)) );
}
int i = 0;
while(begin < lastBegin) {
@ -460,17 +450,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
int bestOldSet = -1;
int nextBestOldSet = -1;
int bestOldSet = 0;
std::vector<Reference<LogSet>> localOldSets;
Version thisBegin = begin;
for(auto& log : oldLogData[i].tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
thisBegin = std::max(thisBegin, log->startVersion);
localOldSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestOldSet = localOldSets.size()-1;
nextBestOldSet = bestOldSet;
}
}
}
@ -486,10 +474,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(thisBegin < lastBegin) {
if(thisBegin < end) {
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end)
.detail("BestLogs", bestOldSet >= 0 ? localOldSets[bestOldSet]->logServerString() : "no best set").detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin);
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet == -1 ? nextBestOldSet : bestOldSet,
bestOldSet >= 0 ? localOldSets[bestOldSet]->bestLocationFor( tag ) : -1, tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localOldSets[bestOldSet]->logServerString()).detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin);
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet, localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
}
lastBegin = thisBegin;
@ -596,11 +582,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Reference<IPeekCursor> peekLocal( UID dbgid, Tag tag, Version begin, Version end ) {
ASSERT(tag.locality >= 0 || tag.locality == tagLocalityUpgraded);
int bestSet = -1;
bool foundSpecial = false;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->logServers.size() && tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
if( tLogs[t]->locality == tagLocalitySpecial ) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == tag.locality)) {
if( tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
bestSet = t;
@ -633,8 +621,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial ) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == tag.locality)) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
bestOldSet = t;
@ -709,27 +697,24 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if( found ) {
if(stopped) {
int bestSet = -1;
int nextBestSet = -1;
std::vector<Reference<LogSet>> localSets;
int bestSet = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterLocalSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size() - 1;
}
}
}
TraceEvent("TLogPeekLogRouterSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), false ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, getPeekEnd(), false ) );
} else {
for( auto& log : tLogs ) {
if( log->logServers.size() && log->isLocal && log->hasBestPolicy ) {
if(log->logServers.size() && log->isLocal && log->locality != tagLocalitySatellite) {
TraceEvent("TLogPeekLogRouterBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogId", log->logServers[log->bestLocationFor( tag )]->get().id());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( log->logServers[log->bestLocationFor( tag )], tag, begin, getPeekEnd(), false, false ) );
}
@ -751,24 +736,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if( found ) {
int bestSet = -1;
int nextBestSet = -1;
int bestSet = 0;
std::vector<Reference<LogSet>> localSets;
for(auto& log : old.tLogs) {
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterOldLocalSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
}
}
TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("OldEpoch", old.epochEnd).detail("PreviousEpochEndVersion", previousEpochEndVersion.present() ? previousEpochEndVersion.get() : -1).detail("FirstOld", firstOld);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, firstOld && previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : old.epochEnd, false ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, firstOld && previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : old.epochEnd, false ) );
}
firstOld = false;
}
@ -950,7 +932,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBestPolicy = logSet->hasBestPolicy;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
@ -977,7 +958,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBestPolicy = logSet->hasBestPolicy;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
@ -1213,7 +1193,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = coreSet.tLogPolicy;
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->hasBestPolicy = coreSet.hasBestPolicy;
logSet->locality = coreSet.locality;
logSet->startVersion = coreSet.startVersion;
logFailed.push_back(failed);
@ -1237,7 +1216,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = log.tLogPolicy;
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->hasBestPolicy = log.hasBestPolicy;
logSet->locality = log.locality;
logSet->startVersion = log.startVersion;
}
@ -1329,7 +1307,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
ACTOR static Future<Void> recruitOldLogRouters( TagPartitionedLogSystem* self, vector<WorkerInterface> workers, LogEpoch recoveryCount, int8_t locality, Version startVersion,
std::vector<LocalityData> tLogLocalities, IRepPolicyRef tLogPolicy, int32_t hasBestPolicy, bool forRemote ) {
std::vector<LocalityData> tLogLocalities, IRepPolicyRef tLogPolicy, bool forRemote ) {
state vector<vector<Future<TLogInterface>>> logRouterInitializationReplies;
state vector<Future<TLogInterface>> allReplies;
int nextRouter = 0;
@ -1379,7 +1357,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = lastStart;
req.tLogLocalities = tLogLocalities;
req.tLogPolicy = tLogPolicy;
req.hasBestPolicy = hasBestPolicy;
req.locality = locality;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
@ -1429,7 +1406,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = lastStart;
req.tLogLocalities = tLogLocalities;
req.tLogPolicy = tLogPolicy;
req.hasBestPolicy = hasBestPolicy;
req.locality = locality;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
@ -1506,7 +1482,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSet->tLogPolicy = configuration.remoteTLogPolicy;
logSet->isLocal = false;
logSet->hasBestPolicy = HasBestPolicyId;
logSet->locality = remoteLocality;
logSet->startVersion = oldLogSystem->knownCommittedVersion + 1;
@ -1534,7 +1509,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Future<Void> oldRouterRecruitment = Void();
if(logSet->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(self, remoteWorkers.logRouters, recoveryCount, remoteLocality, logSet->startVersion, localities, logSet->tLogPolicy, logSet->hasBestPolicy, true);
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(self, remoteWorkers.logRouters, recoveryCount, remoteLocality, logSet->startVersion, localities, logSet->tLogPolicy, true);
}
state vector<Future<TLogInterface>> logRouterInitializationReplies;
@ -1545,7 +1520,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = std::max(self->tLogs[0]->startVersion, logSet->startVersion);
req.tLogLocalities = localities;
req.tLogPolicy = logSet->tLogPolicy;
req.hasBestPolicy = logSet->hasBestPolicy;
req.locality = remoteLocality;
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
@ -1619,7 +1593,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
logSystem->tLogs[0]->isLocal = true;
logSystem->tLogs[0]->hasBestPolicy = HasBestPolicyId;
logSystem->tLogs[0]->locality = primaryLocality;
state RegionInfo region = configuration.getRegion(recr.dcId);
@ -1630,7 +1603,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactor;
logSystem->tLogs[1]->tLogPolicy = region.satelliteTLogPolicy;
logSystem->tLogs[1]->isLocal = true;
logSystem->tLogs[1]->hasBestPolicy = HasBestPolicyNone;
logSystem->tLogs[1]->locality = tagLocalitySatellite;
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
logSystem->expectedLogSets++;
@ -1685,7 +1657,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Future<Void> oldRouterRecruitment = Never();
TraceEvent("NewEpochStartVersion", oldLogSystem->getDebugID()).detail("StartVersion", logSystem->tLogs[0]->startVersion).detail("EpochEnd", oldLogSystem->knownCommittedVersion + 1).detail("Locality", primaryLocality).detail("OldLogRouterTags", oldLogSystem->logRouterTags);
if(oldLogSystem->logRouterTags > 0 || logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, logSystem->tLogs[0]->hasBestPolicy, false);
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, false);
if(oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion > SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
//make sure we can recover in the other DC.
for(auto& lockResult : oldLogSystem->lockResults) {

View File

@ -98,13 +98,12 @@ struct InitializeLogRouterRequest {
Version startVersion;
std::vector<LocalityData> tLogLocalities;
IRepPolicyRef tLogPolicy;
int32_t hasBestPolicy;
int8_t locality;
ReplyPromise<struct TLogInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
ar & recoveryCount & routerTag & startVersion & tLogLocalities & tLogPolicy & hasBestPolicy & locality & reply;
ar & recoveryCount & routerTag & startVersion & tLogLocalities & tLogPolicy & locality & reply;
}
};