to support more complicated policies in the future for determining the best location for a tag within a set of tlogs, use an integer instead of a bool
This commit is contained in:
parent
2e3b1d7ab8
commit
af97a512f5
|
@ -41,19 +41,19 @@ struct CoreTLogSet {
|
|||
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
|
||||
IRepPolicyRef tLogPolicy;
|
||||
bool isLocal;
|
||||
bool hasBest;
|
||||
int32_t hasBestPolicy;
|
||||
int8_t locality;
|
||||
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(-99) {}
|
||||
|
||||
bool operator == (CoreTLogSet const& rhs) const {
|
||||
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
|
||||
hasBest == rhs.hasBest && locality == rhs.locality && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
|
||||
hasBestPolicy == rhs.hasBestPolicy && locality == rhs.locality && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
|
||||
}
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality;
|
||||
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -44,14 +44,23 @@ public:
|
|||
std::vector<int> logIndexArray;
|
||||
std::map<int,LocalityEntry> logEntryMap;
|
||||
bool isLocal;
|
||||
bool hasBest;
|
||||
int32_t hasBestPolicy;
|
||||
int8_t locality;
|
||||
|
||||
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
|
||||
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(-99) {}
|
||||
|
||||
int bestLocationFor( Tag tag ) {
|
||||
if(tag == txsTag) return hasBest ? txsTagOld % logServers.size() : -1;
|
||||
return hasBest ? tag.id % logServers.size() : -1;
|
||||
if(hasBestPolicy == HasBestPolicyNone) {
|
||||
return -1;
|
||||
} else if(hasBestPolicy == HasBestPolicyId) {
|
||||
//This policy supports upgrades from 5.X
|
||||
if(tag == txsTag) return txsTagOld % logServers.size();
|
||||
return tag.id % logServers.size();
|
||||
} else {
|
||||
//Unsupported policy
|
||||
ASSERT(false);
|
||||
throw internal_error();
|
||||
}
|
||||
}
|
||||
|
||||
void updateLocalitySet() {
|
||||
|
@ -94,7 +103,7 @@ public:
|
|||
alsoServers.clear();
|
||||
resultEntries.clear();
|
||||
|
||||
if(hasBest) {
|
||||
if(hasBestPolicy) {
|
||||
for(auto& t : tags) {
|
||||
if(t.locality == locality || t.locality == tagLocalitySpecial || locality == tagLocalitySpecial) {
|
||||
newLocations.push_back(bestLocationFor(t));
|
||||
|
|
|
@ -55,6 +55,8 @@ protected:
|
|||
Optional<Interface> iface;
|
||||
};
|
||||
|
||||
enum { HasBestPolicyNone = 0, HasBestPolicyId = 1 };
|
||||
|
||||
struct TLogSet {
|
||||
std::vector<OptionalInterface<TLogInterface>> tLogs;
|
||||
std::vector<OptionalInterface<TLogInterface>> logRouters;
|
||||
|
@ -63,16 +65,16 @@ struct TLogSet {
|
|||
IRepPolicyRef tLogPolicy;
|
||||
int8_t locality;
|
||||
bool isLocal;
|
||||
bool hasBest;
|
||||
int32_t hasBestPolicy;
|
||||
|
||||
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
|
||||
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(-99) {}
|
||||
|
||||
std::string toString() const {
|
||||
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBest, logRouters.size(), describe(tLogs).c_str(), locality);
|
||||
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBestPolicy, logRouters.size(), describe(tLogs).c_str(), locality);
|
||||
}
|
||||
|
||||
bool operator == ( const TLogSet& rhs ) const {
|
||||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBest != rhs.hasBest || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality) {
|
||||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBestPolicy != rhs.hasBestPolicy || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality) {
|
||||
return false;
|
||||
}
|
||||
if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) {
|
||||
|
@ -87,7 +89,7 @@ struct TLogSet {
|
|||
}
|
||||
|
||||
bool isEqualIds(TLogSet const& r) const {
|
||||
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBest != r.hasBest || tLogs.size() != r.tLogs.size() || locality != r.locality) {
|
||||
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBestPolicy != r.hasBestPolicy || tLogs.size() != r.tLogs.size() || locality != r.locality) {
|
||||
return false;
|
||||
}
|
||||
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
|
||||
|
@ -103,7 +105,7 @@ struct TLogSet {
|
|||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality;
|
||||
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
logSet->tLogLocalities = tLogSet.tLogLocalities;
|
||||
logSet->isLocal = tLogSet.isLocal;
|
||||
logSet->hasBest = tLogSet.hasBest;
|
||||
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
|
||||
logSet->locality = tLogSet.locality;
|
||||
logSet->updateLocalitySet();
|
||||
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
|
||||
|
@ -153,7 +153,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = tLogData.tLogPolicy;
|
||||
logSet->tLogLocalities = tLogData.tLogLocalities;
|
||||
logSet->isLocal = tLogData.isLocal;
|
||||
logSet->hasBest = tLogData.hasBest;
|
||||
logSet->hasBestPolicy = tLogData.hasBestPolicy;
|
||||
logSet->locality = tLogData.locality;
|
||||
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
logSet->tLogLocalities = tLogSet.tLogLocalities;
|
||||
logSet->isLocal = tLogSet.isLocal;
|
||||
logSet->hasBest = tLogSet.hasBest;
|
||||
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
|
||||
logSet->locality = tLogSet.locality;
|
||||
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
|
||||
}
|
||||
|
@ -210,7 +210,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
logSet->tLogLocalities = tLogSet.tLogLocalities;
|
||||
logSet->isLocal = tLogSet.isLocal;
|
||||
logSet->hasBest = tLogSet.hasBest;
|
||||
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
|
||||
logSet->locality = tLogSet.locality;
|
||||
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
|
||||
coreSet.tLogPolicy = t->tLogPolicy;
|
||||
coreSet.isLocal = t->isLocal;
|
||||
coreSet.hasBest = t->hasBest;
|
||||
coreSet.hasBestPolicy = t->hasBestPolicy;
|
||||
coreSet.locality = t->locality;
|
||||
newState.tLogs.push_back(coreSet);
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
|
||||
coreSet.tLogPolicy = t->tLogPolicy;
|
||||
coreSet.isLocal = t->isLocal;
|
||||
coreSet.hasBest = t->hasBest;
|
||||
coreSet.hasBestPolicy = t->hasBestPolicy;
|
||||
coreSet.locality = t->locality;
|
||||
newState.oldTLogData[i].tLogs.push_back(coreSet);
|
||||
}
|
||||
|
@ -352,7 +352,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -368,7 +368,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
|
||||
int bestOldSet = -1;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->hasBest && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
if(oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
} else {
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for(int i = 0; i < history.size(); i++) {
|
||||
bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == history[i].second.locality || history[i].second.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == history[i].second.locality || history[i].second.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -575,7 +575,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
log.tLogPolicy = logSet->tLogPolicy;
|
||||
log.tLogLocalities = logSet->tLogLocalities;
|
||||
log.isLocal = logSet->isLocal;
|
||||
log.hasBest = logSet->hasBest;
|
||||
log.hasBestPolicy = logSet->hasBestPolicy;
|
||||
log.locality = logSet->locality;
|
||||
|
||||
for( int i = 0; i < logSet->logServers.size(); i++ ) {
|
||||
|
@ -601,7 +601,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
log.tLogPolicy = logSet->tLogPolicy;
|
||||
log.tLogLocalities = logSet->tLogLocalities;
|
||||
log.isLocal = logSet->isLocal;
|
||||
log.hasBest = logSet->hasBest;
|
||||
log.hasBestPolicy = logSet->hasBestPolicy;
|
||||
log.locality = logSet->locality;
|
||||
|
||||
for( int i = 0; i < logSet->logServers.size(); i++ ) {
|
||||
|
@ -752,7 +752,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = coreSet.tLogPolicy;
|
||||
logSet->tLogLocalities = coreSet.tLogLocalities;
|
||||
logSet->isLocal = coreSet.isLocal;
|
||||
logSet->hasBest = coreSet.hasBest;
|
||||
logSet->hasBestPolicy = coreSet.hasBestPolicy;
|
||||
logSet->locality = coreSet.locality;
|
||||
logFailed.push_back(failed);
|
||||
}
|
||||
|
@ -775,7 +775,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogPolicy = log.tLogPolicy;
|
||||
logSet->tLogLocalities = log.tLogLocalities;
|
||||
logSet->isLocal = log.isLocal;
|
||||
logSet->hasBest = log.hasBest;
|
||||
logSet->hasBestPolicy = log.hasBestPolicy;
|
||||
logSet->locality = log.locality;
|
||||
}
|
||||
oldData.epochEnd = old.epochEnd;
|
||||
|
@ -994,7 +994,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
|
||||
logSet->tLogPolicy = configuration.remoteTLogPolicy;
|
||||
logSet->isLocal = false;
|
||||
logSet->hasBest = true;
|
||||
logSet->hasBestPolicy = HasBestPolicyId;
|
||||
logSet->locality = remoteLocality;
|
||||
|
||||
//recruit temporary log routers and update registration with them
|
||||
|
@ -1082,7 +1082,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
|
||||
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
|
||||
logSystem->tLogs[0]->isLocal = true;
|
||||
logSystem->tLogs[0]->hasBest = true;
|
||||
logSystem->tLogs[0]->hasBestPolicy = HasBestPolicyId;
|
||||
logSystem->tLogs[0]->locality = primaryLocality;
|
||||
|
||||
if(configuration.satelliteTLogReplicationFactor > 0) {
|
||||
|
@ -1091,7 +1091,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->tLogs[1]->tLogReplicationFactor = configuration.satelliteTLogReplicationFactor;
|
||||
logSystem->tLogs[1]->tLogPolicy = configuration.satelliteTLogPolicy;
|
||||
logSystem->tLogs[1]->isLocal = true;
|
||||
logSystem->tLogs[1]->hasBest = false;
|
||||
logSystem->tLogs[1]->hasBestPolicy = HasBestPolicyNone;
|
||||
logSystem->tLogs[1]->locality = -99;
|
||||
logSystem->expectedLogSets++;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue