Merge pull request #1290 from etschannen/feature-cheap-policy

Optimized a few uses of the replication policy engine
This commit is contained in:
Steve Atherton 2019-03-13 17:01:19 -07:00 committed by GitHub
commit be0da73938
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 231 additions and 227 deletions

View File

@ -38,7 +38,7 @@ void DatabaseConfiguration::resetInternal() {
autoDesiredTLogCount = CLIENT_KNOBS->DEFAULT_AUTO_LOGS;
usableRegions = 1;
regions.clear();
tLogPolicy = storagePolicy = remoteTLogPolicy = IRepPolicyRef();
tLogPolicy = storagePolicy = remoteTLogPolicy = Reference<IReplicationPolicy>();
remoteDesiredTLogCount = -1;
remoteTLogReplicationFactor = repopulateRegionAntiQuorum = 0;
}
@ -48,7 +48,7 @@ void parse( int* i, ValueRef const& v ) {
*i = atoi(v.toString().c_str());
}
void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) {
void parseReplicationPolicy(Reference<IReplicationPolicy>* policy, ValueRef const& v) {
BinaryReader reader(v, IncludeVersion());
serializeReplicationPolicy(reader, *policy);
}
@ -91,35 +91,35 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
info.satelliteTLogReplicationFactor = 1;
info.satelliteTLogUsableDcs = 1;
info.satelliteTLogWriteAntiQuorum = 0;
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyOne());
info.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyOne());
} else if(satelliteReplication == "one_satellite_double") {
info.satelliteTLogReplicationFactor = 2;
info.satelliteTLogUsableDcs = 1;
info.satelliteTLogWriteAntiQuorum = 0;
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
info.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(satelliteReplication == "one_satellite_triple") {
info.satelliteTLogReplicationFactor = 3;
info.satelliteTLogUsableDcs = 1;
info.satelliteTLogWriteAntiQuorum = 0;
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
info.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(satelliteReplication == "two_satellite_safe") {
info.satelliteTLogReplicationFactor = 4;
info.satelliteTLogUsableDcs = 2;
info.satelliteTLogWriteAntiQuorum = 0;
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))));
info.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "dcid", Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
info.satelliteTLogReplicationFactorFallback = 2;
info.satelliteTLogUsableDcsFallback = 1;
info.satelliteTLogWriteAntiQuorumFallback = 0;
info.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
info.satelliteTLogPolicyFallback = Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(satelliteReplication == "two_satellite_fast") {
info.satelliteTLogReplicationFactor = 4;
info.satelliteTLogUsableDcs = 2;
info.satelliteTLogWriteAntiQuorum = 2;
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))));
info.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "dcid", Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
info.satelliteTLogReplicationFactorFallback = 2;
info.satelliteTLogUsableDcsFallback = 1;
info.satelliteTLogWriteAntiQuorumFallback = 0;
info.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
info.satelliteTLogPolicyFallback = Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else {
throw invalid_option();
}
@ -141,20 +141,20 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
void DatabaseConfiguration::setDefaultReplicationPolicy() {
if(!storagePolicy) {
storagePolicy = IRepPolicyRef(new PolicyAcross(storageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(storageTeamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
}
if(!tLogPolicy) {
tLogPolicy = IRepPolicyRef(new PolicyAcross(tLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(tLogReplicationFactor, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
}
if(remoteTLogReplicationFactor > 0 && !remoteTLogPolicy) {
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(remoteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(remoteTLogReplicationFactor, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
}
for(auto& r : regions) {
if(r.satelliteTLogReplicationFactor > 0 && !r.satelliteTLogPolicy) {
r.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(r.satelliteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
r.satelliteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(r.satelliteTLogReplicationFactor, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
}
if(r.satelliteTLogReplicationFactorFallback > 0 && !r.satelliteTLogPolicyFallback) {
r.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(r.satelliteTLogReplicationFactorFallback, "zoneid", IRepPolicyRef(new PolicyOne())));
r.satelliteTLogPolicyFallback = Reference<IReplicationPolicy>(new PolicyAcross(r.satelliteTLogReplicationFactorFallback, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
}
}
}

View File

@ -49,13 +49,13 @@ struct RegionInfo {
Key dcId;
int32_t priority;
IRepPolicyRef satelliteTLogPolicy;
Reference<IReplicationPolicy> satelliteTLogPolicy;
int32_t satelliteDesiredTLogCount;
int32_t satelliteTLogReplicationFactor;
int32_t satelliteTLogWriteAntiQuorum;
int32_t satelliteTLogUsableDcs;
IRepPolicyRef satelliteTLogPolicyFallback;
Reference<IReplicationPolicy> satelliteTLogPolicyFallback;
int32_t satelliteTLogReplicationFactorFallback;
int32_t satelliteTLogWriteAntiQuorumFallback;
int32_t satelliteTLogUsableDcsFallback;
@ -157,7 +157,7 @@ struct DatabaseConfiguration {
int32_t autoResolverCount;
// TLogs
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> tLogPolicy;
int32_t desiredTLogCount;
int32_t autoDesiredTLogCount;
int32_t tLogWriteAntiQuorum;
@ -167,7 +167,7 @@ struct DatabaseConfiguration {
TLogSpillType tLogSpillType;
// Storage Servers
IRepPolicyRef storagePolicy;
Reference<IReplicationPolicy> storagePolicy;
int32_t storageTeamSize;
KeyValueStoreType storageServerStoreType;
@ -175,7 +175,7 @@ struct DatabaseConfiguration {
int32_t desiredLogRouterCount;
int32_t remoteDesiredTLogCount;
int32_t remoteTLogReplicationFactor;
IRepPolicyRef remoteTLogPolicy;
Reference<IReplicationPolicy> remoteTLogPolicy;
//Data centers
int32_t usableRegions;
@ -195,7 +195,7 @@ struct DatabaseConfiguration {
if(desired == -1) return autoDesiredTLogCount; return desired;
}
int32_t getRemoteTLogReplicationFactor() const { if(remoteTLogReplicationFactor == 0) return tLogReplicationFactor; return remoteTLogReplicationFactor; }
IRepPolicyRef getRemoteTLogPolicy() const { if(remoteTLogReplicationFactor == 0) return tLogPolicy; return remoteTLogPolicy; }
Reference<IReplicationPolicy> getRemoteTLogPolicy() const { if(remoteTLogReplicationFactor == 0) return tLogPolicy; return remoteTLogPolicy; }
bool operator == ( DatabaseConfiguration const& rhs ) const {
const_cast<DatabaseConfiguration*>(this)->makeConfigurationImmutable();

View File

@ -99,42 +99,42 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
}
std::string redundancy, log_replicas;
IRepPolicyRef storagePolicy;
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> storagePolicy;
Reference<IReplicationPolicy> tLogPolicy;
bool redundancySpecified = true;
if (mode == "single") {
redundancy="1";
log_replicas="1";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyOne());
storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyOne());
} else if(mode == "double" || mode == "fast_recovery_double") {
redundancy="2";
log_replicas="2";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(mode == "triple" || mode == "fast_recovery_triple") {
redundancy="3";
log_replicas="3";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(mode == "three_datacenter" || mode == "multi_dc") {
redundancy="6";
log_replicas="4";
storagePolicy = IRepPolicyRef(new PolicyAcross(3, "dcid",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(3, "dcid",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "dcid",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
} else if(mode == "three_datacenter_fallback") {
redundancy="4";
log_replicas="4";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))));
storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "dcid", Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
} else if(mode == "three_data_hall") {
redundancy="3";
log_replicas="4";
storagePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne())));
tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(3, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())));
tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
} else
redundancySpecified = false;
@ -154,29 +154,29 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
}
std::string remote_redundancy, remote_log_replicas;
IRepPolicyRef remoteTLogPolicy;
Reference<IReplicationPolicy> remoteTLogPolicy;
bool remoteRedundancySpecified = true;
if (mode == "remote_default") {
remote_redundancy="0";
remote_log_replicas="0";
remoteTLogPolicy = IRepPolicyRef();
remoteTLogPolicy = Reference<IReplicationPolicy>();
} else if (mode == "remote_single") {
remote_redundancy="1";
remote_log_replicas="1";
remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyOne());
} else if(mode == "remote_double") {
remote_redundancy="2";
remote_log_replicas="2";
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(mode == "remote_triple") {
remote_redundancy="3";
remote_log_replicas="3";
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
} else if(mode == "remote_three_data_hall") { //FIXME: not tested in simulation
remote_redundancy="3";
remote_log_replicas="4";
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
} else
remoteRedundancySpecified = false;
@ -212,7 +212,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
auto p = configKeysPrefix.toString();
if(!outConf.count(p + "storage_replication_policy") && outConf.count(p + "storage_replicas")) {
int storageCount = stoi(outConf[p + "storage_replicas"]);
IRepPolicyRef storagePolicy = IRepPolicyRef(new PolicyAcross(storageCount, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(storageCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, storagePolicy);
outConf[p+"storage_replication_policy"] = policyWriter.toStringRef().toString();
@ -220,7 +220,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
if(!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) {
int logCount = stoi(outConf[p + "log_replicas"]);
IRepPolicyRef logPolicy = IRepPolicyRef(new PolicyAcross(logCount, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> logPolicy = Reference<IReplicationPolicy>(new PolicyAcross(logCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, logPolicy);
outConf[p+"log_replication_policy"] = policyWriter.toStringRef().toString();

View File

@ -36,23 +36,23 @@ public:
virtual void delref() { ReferenceCounted<LocalitySet>::delref(); }
bool selectReplicas(
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results)
{
LocalitySetRef fromServers = LocalitySetRef::addRef(this);
Reference<LocalitySet> fromServers = Reference<LocalitySet>::addRef(this);
return policy->selectReplicas(fromServers, alsoServers, results);
}
bool selectReplicas(
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityEntry> & results)
{ return selectReplicas(policy, std::vector<LocalityEntry>(), results); }
bool validate(
IRepPolicyRef const& policy) const
Reference<IReplicationPolicy> const& policy) const
{
LocalitySetRef const solutionSet = LocalitySetRef::addRef((LocalitySet*) this);
Reference<LocalitySet> const solutionSet = Reference<LocalitySet>::addRef((LocalitySet*) this);
return policy->validate(solutionSet);
}
@ -159,7 +159,7 @@ public:
}
static void staticDisplayEntries(
LocalitySetRef const& fromServers,
Reference<LocalitySet> const& fromServers,
std::vector<LocalityEntry> const& entryArray,
const char* name = "zone")
{
@ -174,8 +174,8 @@ public:
// the specified value for the given key
// The returned LocalitySet contains the LocalityRecords that have the same value as
// the indexValue under the same indexKey (e.g., zoneid)
LocalitySetRef restrict(AttribKey indexKey, AttribValue indexValue ) {
LocalitySetRef localitySet;
Reference<LocalitySet> restrict(AttribKey indexKey, AttribValue indexValue ) {
Reference<LocalitySet> localitySet;
LocalityCacheRecord searchRecord(AttribRecord(indexKey, indexValue), localitySet);
auto itKeyValue = std::lower_bound(_cacheArray.begin(), _cacheArray.end(), searchRecord, LocalityCacheRecord::compareKeyValue);
@ -185,7 +185,7 @@ public:
localitySet = itKeyValue->_resultset;
}
else {
localitySet = LocalitySetRef(new LocalitySet(*_localitygroup));
localitySet = Reference<LocalitySet>(new LocalitySet(*_localitygroup));
_cachemisses ++;
// If the key is not within the current key set, skip it because no items within
// the current entry array has the key
@ -213,8 +213,8 @@ public:
}
// This function is used to create an subset containing the specified entries
LocalitySetRef restrict(std::vector<LocalityEntry> const& entryArray) {
LocalitySetRef localitySet(new LocalitySet(*_localitygroup));
Reference<LocalitySet> restrict(std::vector<LocalityEntry> const& entryArray) {
Reference<LocalitySet> localitySet(new LocalitySet(*_localitygroup));
for (auto& entry : entryArray) {
localitySet->add(getRecordViaEntry(entry), *this);
}
@ -453,8 +453,8 @@ protected:
// This class stores the cache record for each entry within the locality set
struct LocalityCacheRecord {
AttribRecord _attribute;
LocalitySetRef _resultset;
LocalityCacheRecord(AttribRecord const& attribute, LocalitySetRef resultset):_attribute(attribute),_resultset(resultset){}
Reference<LocalitySet> _resultset;
LocalityCacheRecord(AttribRecord const& attribute, Reference<LocalitySet> resultset):_attribute(attribute),_resultset(resultset){}
LocalityCacheRecord(LocalityCacheRecord const& source):_attribute(source._attribute),_resultset(source._resultset){}
virtual ~LocalityCacheRecord(){}
LocalityCacheRecord& operator=(LocalityCacheRecord const& source) {
@ -584,7 +584,7 @@ struct LocalityMap : public LocalityGroup {
virtual ~LocalityMap() {}
bool selectReplicas(
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& entryResults,
std::vector<V*> & results)
@ -601,7 +601,7 @@ struct LocalityMap : public LocalityGroup {
}
bool selectReplicas(
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityEntry> const& alsoServers,
std::vector<V*> & results)
{
@ -610,7 +610,7 @@ struct LocalityMap : public LocalityGroup {
}
bool selectReplicas(
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<V*> & results)
{ return selectReplicas(policy, std::vector<LocalityEntry>(), results); }

View File

@ -24,14 +24,14 @@
bool IReplicationPolicy::selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> & results )
{
return selectReplicas(fromServers, std::vector<LocalityEntry>(), results);
}
bool IReplicationPolicy::validate(
LocalitySetRef const& solutionSet ) const
Reference<LocalitySet> const& solutionSet ) const
{
return validate(solutionSet->getEntries(), solutionSet);
}
@ -40,7 +40,7 @@ bool IReplicationPolicy::validateFull(
bool solved,
std::vector<LocalityEntry> const& solutionSet,
std::vector<LocalityEntry> const& alsoServers,
LocalitySetRef const& fromServers )
Reference<LocalitySet> const& fromServers )
{
bool valid = true;
std::vector<LocalityEntry> totalSolution(solutionSet);
@ -105,7 +105,7 @@ bool IReplicationPolicy::validateFull(
}
bool PolicyOne::selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results )
{
@ -131,12 +131,12 @@ bool PolicyOne::selectReplicas(
bool PolicyOne::validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const
Reference<LocalitySet> const& fromServers ) const
{
return ((solutionSet.size() > 0) && (fromServers->size() > 0));
}
PolicyAcross::PolicyAcross(int count, std::string const& attribKey, IRepPolicyRef const policy):
PolicyAcross::PolicyAcross(int count, std::string const& attribKey, Reference<IReplicationPolicy> const policy):
_count(count),_attribKey(attribKey),_policy(policy)
{
return;
@ -150,7 +150,7 @@ PolicyAcross::~PolicyAcross()
// Debug purpose only
// Trace all record entries to help debug
// fromServers is the servers locality to be printed out.
void IReplicationPolicy::traceLocalityRecords(LocalitySetRef const& fromServers) {
void IReplicationPolicy::traceLocalityRecords(Reference<LocalitySet> const& fromServers) {
std::vector<Reference<LocalityRecord>> const& recordArray = fromServers->getRecordArray();
TraceEvent("LocalityRecordArray").detail("Size", recordArray.size());
for (auto& record : recordArray) {
@ -158,7 +158,7 @@ void IReplicationPolicy::traceLocalityRecords(LocalitySetRef const& fromServers)
}
}
void IReplicationPolicy::traceOneLocalityRecord(Reference<LocalityRecord> record, LocalitySetRef const& fromServers) {
void IReplicationPolicy::traceOneLocalityRecord(Reference<LocalityRecord> record, Reference<LocalitySet> const& fromServers) {
int localityEntryIndex = record->_entryIndex._id;
Reference<KeyValueMap> const& dataMap = record->_dataMap;
std::vector<AttribRecord> const& keyValueArray = dataMap->_keyvaluearray;
@ -185,7 +185,7 @@ void IReplicationPolicy::traceOneLocalityRecord(Reference<LocalityRecord> record
// return true if the team satisfies the policy; false otherwise
bool PolicyAcross::validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const
Reference<LocalitySet> const& fromServers ) const
{
bool valid = true;
int count = 0;
@ -262,7 +262,7 @@ bool PolicyAcross::validate(
// that should be excluded from being selected as replicas.
// FIXME: Simplify this function, such as removing unnecessary printf
bool PolicyAcross::selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results )
{
@ -437,7 +437,7 @@ bool PolicyAcross::selectReplicas(
bool PolicyAnd::validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const
Reference<LocalitySet> const& fromServers ) const
{
bool valid = true;
for (auto& policy : _policies) {
@ -450,7 +450,7 @@ bool PolicyAnd::validate(
}
bool PolicyAnd::selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results )
{
@ -486,26 +486,26 @@ bool PolicyAnd::selectReplicas(
return passed;
}
void testPolicySerialization(IRepPolicyRef& policy) {
void testPolicySerialization(Reference<IReplicationPolicy>& policy) {
std::string policyInfo = policy->info();
BinaryWriter writer(IncludeVersion());
serializeReplicationPolicy(writer, policy);
BinaryReader reader(writer.getData(), writer.getLength(), IncludeVersion());
IRepPolicyRef copy;
Reference<IReplicationPolicy> copy;
serializeReplicationPolicy(reader, copy);
ASSERT(policy->info() == copy->info());
}
void testReplicationPolicy(int nTests) {
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(1, "data_hall", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(1, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())));
testPolicySerialization(policy);
policy = IRepPolicyRef(new PolicyAnd({
IRepPolicyRef(new PolicyAcross(2, "data_center", IRepPolicyRef(new PolicyAcross(3, "rack", IRepPolicyRef(new PolicyOne()))))),
IRepPolicyRef(new PolicyAcross(2, "data_center", IRepPolicyRef(new PolicyAcross(2, "data_hall", IRepPolicyRef(new PolicyOne())))))
policy = Reference<IReplicationPolicy>(new PolicyAnd({
Reference<IReplicationPolicy>(new PolicyAcross(2, "data_center", Reference<IReplicationPolicy>(new PolicyAcross(3, "rack", Reference<IReplicationPolicy>(new PolicyOne()))))),
Reference<IReplicationPolicy>(new PolicyAcross(2, "data_center", Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())))))
}));
testPolicySerialization(policy);

View File

@ -26,7 +26,7 @@
#include "fdbrpc/ReplicationTypes.h"
template <class Ar>
void serializeReplicationPolicy(Ar& ar, IRepPolicyRef& policy);
void serializeReplicationPolicy(Ar& ar, Reference<IReplicationPolicy>& policy);
extern void testReplicationPolicy(int nTests);
@ -40,36 +40,36 @@ struct IReplicationPolicy : public ReferenceCounted<IReplicationPolicy> {
virtual int maxResults() const = 0;
virtual int depth() const = 0;
virtual bool selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results ) = 0;
virtual void traceLocalityRecords(LocalitySetRef const& fromServers);
virtual void traceOneLocalityRecord(Reference<LocalityRecord> record, LocalitySetRef const& fromServers);
virtual void traceLocalityRecords(Reference<LocalitySet> const& fromServers);
virtual void traceOneLocalityRecord(Reference<LocalityRecord> record, Reference<LocalitySet> const& fromServers);
virtual bool validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const = 0;
Reference<LocalitySet> const& fromServers ) const = 0;
bool operator == ( const IReplicationPolicy& r ) const { return info() == r.info(); }
bool operator != ( const IReplicationPolicy& r ) const { return info() != r.info(); }
template <class Ar>
void serialize(Ar& ar) {
IRepPolicyRef refThis(this);
Reference<IReplicationPolicy> refThis(this);
serializeReplicationPolicy(ar, refThis);
refThis->delref_no_destroy();
}
// Utility functions
bool selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> & results );
bool validate(
LocalitySetRef const& solutionSet ) const;
Reference<LocalitySet> const& solutionSet ) const;
bool validateFull(
bool solved,
std::vector<LocalityEntry> const& solutionSet,
std::vector<LocalityEntry> const& alsoServers,
LocalitySetRef const& fromServers );
Reference<LocalitySet> const& fromServers );
// Returns a set of the attributes that this policy uses in selection and validation.
std::set<std::string> attributeKeys() const
@ -78,7 +78,7 @@ struct IReplicationPolicy : public ReferenceCounted<IReplicationPolicy> {
};
template <class Archive>
inline void load( Archive& ar, IRepPolicyRef& value ) {
inline void load( Archive& ar, Reference<IReplicationPolicy>& value ) {
bool present = (value.getPtr());
ar >> present;
if (present) {
@ -90,11 +90,11 @@ inline void load( Archive& ar, IRepPolicyRef& value ) {
}
template <class Archive>
inline void save( Archive& ar, const IRepPolicyRef& value ) {
inline void save( Archive& ar, const Reference<IReplicationPolicy>& value ) {
bool present = (value.getPtr());
ar << present;
if (present) {
serializeReplicationPolicy(ar, (IRepPolicyRef&) value);
serializeReplicationPolicy(ar, (Reference<IReplicationPolicy>&) value);
}
}
@ -107,9 +107,9 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
virtual int depth() const { return 1; }
virtual bool validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const;
Reference<LocalitySet> const& fromServers ) const;
virtual bool selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results );
template <class Ar>
@ -119,7 +119,7 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
};
struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
PolicyAcross(int count, std::string const& attribKey, IRepPolicyRef const policy);
PolicyAcross(int count, std::string const& attribKey, Reference<IReplicationPolicy> const policy);
virtual ~PolicyAcross();
virtual std::string name() const { return "Across"; }
virtual std::string info() const
@ -128,9 +128,9 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
virtual int depth() const { return 1 + _policy->depth(); }
virtual bool validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const;
Reference<LocalitySet> const& fromServers ) const;
virtual bool selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results );
@ -149,18 +149,18 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
protected:
int _count;
std::string _attribKey;
IRepPolicyRef _policy;
Reference<IReplicationPolicy> _policy;
// Cache temporary members
std::vector<AttribValue> _usedValues;
std::vector<LocalityEntry> _newResults;
LocalitySetRef _selected;
Reference<LocalitySet> _selected;
VectorRef<std::pair<int,int>> _addedResults;
Arena _arena;
};
struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
PolicyAnd(std::vector<IRepPolicyRef> policies): _policies(policies), _sortedPolicies(policies)
PolicyAnd(std::vector<Reference<IReplicationPolicy>> policies): _policies(policies), _sortedPolicies(policies)
{
// Sort the policy array
std::sort(_sortedPolicies.begin(), _sortedPolicies.end(), PolicyAnd::comparePolicy);
@ -194,14 +194,14 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
virtual bool validate(
std::vector<LocalityEntry> const& solutionSet,
LocalitySetRef const& fromServers ) const;
Reference<LocalitySet> const& fromServers ) const;
virtual bool selectReplicas(
LocalitySetRef & fromServers,
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry> & results );
static bool comparePolicy(const IRepPolicyRef& rhs, const IRepPolicyRef& lhs)
static bool comparePolicy(const Reference<IReplicationPolicy>& rhs, const Reference<IReplicationPolicy>& lhs)
{ return (lhs->maxResults() < rhs->maxResults()) || (!(rhs->maxResults() < lhs->maxResults()) && (lhs->depth() < rhs->depth())); }
template <class Ar>
@ -219,18 +219,18 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
virtual void attributeKeys(std::set<std::string> *set) const override
{ for (const IRepPolicyRef& r : _policies) { r->attributeKeys(set); } }
{ for (const Reference<IReplicationPolicy>& r : _policies) { r->attributeKeys(set); } }
protected:
std::vector<IRepPolicyRef> _policies;
std::vector<IRepPolicyRef> _sortedPolicies;
std::vector<Reference<IReplicationPolicy>> _policies;
std::vector<Reference<IReplicationPolicy>> _sortedPolicies;
};
extern int testReplication();
template <class Ar>
void serializeReplicationPolicy(Ar& ar, IRepPolicyRef& policy) {
void serializeReplicationPolicy(Ar& ar, Reference<IReplicationPolicy>& policy) {
if(Ar::isDeserializing) {
StringRef name;
serializer(ar, name);
@ -238,20 +238,20 @@ void serializeReplicationPolicy(Ar& ar, IRepPolicyRef& policy) {
if(name == LiteralStringRef("One")) {
PolicyOne* pointer = new PolicyOne();
pointer->serialize(ar);
policy = IRepPolicyRef(pointer);
policy = Reference<IReplicationPolicy>(pointer);
}
else if(name == LiteralStringRef("Across")) {
PolicyAcross* pointer = new PolicyAcross(0, "", IRepPolicyRef());
PolicyAcross* pointer = new PolicyAcross(0, "", Reference<IReplicationPolicy>());
pointer->serialize(ar);
policy = IRepPolicyRef(pointer);
policy = Reference<IReplicationPolicy>(pointer);
}
else if(name == LiteralStringRef("And")) {
PolicyAnd* pointer = new PolicyAnd({});
pointer->serialize(ar);
policy = IRepPolicyRef(pointer);
policy = Reference<IReplicationPolicy>(pointer);
}
else if(name == LiteralStringRef("None")) {
policy = IRepPolicyRef();
policy = Reference<IReplicationPolicy>();
}
else {
TraceEvent(SevError, "SerializingInvalidPolicyType")

View File

@ -34,9 +34,6 @@ struct LocalityRecord;
struct StringToIntMap;
struct IReplicationPolicy;
typedef Reference<LocalitySet> LocalitySetRef;
typedef Reference<IReplicationPolicy> IRepPolicyRef;
extern int g_replicationdebug;
struct AttribKey {

View File

@ -27,8 +27,8 @@
double ratePolicy(
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
unsigned int nTestTotal)
{
double rating = -1.0;
@ -85,14 +85,14 @@ double ratePolicy(
bool findBestPolicySet(
std::vector<LocalityEntry>& bestResults,
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
unsigned int nMinItems,
unsigned int nSelectTests,
unsigned int nPolicyTests)
{
bool bSucceeded = true;
LocalitySetRef bestLocalitySet, testLocalitySet;
Reference<LocalitySet> bestLocalitySet, testLocalitySet;
std::vector<LocalityEntry> results;
double testRate, bestRate = -1.0;
@ -162,15 +162,15 @@ bool findBestPolicySet(
bool findBestUniquePolicySet(
std::vector<LocalityEntry>& bestResults,
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
StringRef localityUniquenessKey,
unsigned int nMinItems,
unsigned int nSelectTests,
unsigned int nPolicyTests)
{
bool bSucceeded = true;
LocalitySetRef bestLocalitySet, testLocalitySet;
Reference<LocalitySet> bestLocalitySet, testLocalitySet;
std::vector<LocalityEntry> results;
double testRate, bestRate = -1.0;
@ -262,7 +262,7 @@ bool findBestUniquePolicySet(
bool validateAllCombinations(
std::vector<LocalityData> & offendingCombo,
LocalityGroup const& localitySet,
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid)
@ -286,25 +286,39 @@ bool validateAllCombinations(
}
else
{
bool bIsValidGroup;
LocalityGroup localityGroup;
bool bIsValidGroup;
Reference<LocalitySet> localSet = Reference<LocalitySet>( new LocalityGroup() );
LocalityGroup* localGroup = (LocalityGroup*) localSet.getPtr();
localGroup->deep_copy(localitySet);
std::vector<LocalityEntry> localityGroupEntries = localGroup->getEntries();
int originalSize = localityGroupEntries.size();
for (int i = 0; i < newItems.size(); ++i) {
localGroup->add(newItems[i]);
}
std::string bitmask(nCombinationSize, 1); // K leading 1's
bitmask.resize(newItems.size(), 0); // N-K trailing 0's
std::vector<LocalityEntry> resultEntries;
do
{
localityGroup.deep_copy(localitySet);
localityGroupEntries.resize(originalSize);
// [0..N-1] integers
for (int i = 0; i < newItems.size(); ++i) {
for (int i = 0; i < bitmask.size(); ++i) {
if (bitmask[i]) {
localityGroup.add(newItems[i]);
localityGroupEntries.push_back(localGroup->getEntry(originalSize + i));
}
}
// Check if the group combination passes validation
bIsValidGroup = localityGroup.validate(policy);
resultEntries.clear();
// Run the policy, assert if unable to satisfy
bool result = localSet->selectReplicas(policy, localityGroupEntries, resultEntries);
ASSERT(result);
bIsValidGroup = resultEntries.size() == 0;
if (((bCheckIfValid) &&
(!bIsValidGroup) ) ||
@ -319,7 +333,7 @@ bool validateAllCombinations(
}
if (g_replicationdebug > 2) {
printf("Invalid group\n");
localityGroup.DisplayEntries();
localGroup->DisplayEntries();
}
if (g_replicationdebug > 3) {
printf("Full set\n");
@ -337,7 +351,7 @@ bool validateAllCombinations(
bool validateAllCombinations(
LocalityGroup const& localitySet,
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid)
@ -358,10 +372,10 @@ repTestType convertToTestType(int iValue) {
return sValue;
}
LocalitySetRef createTestLocalityMap(std::vector<repTestType>& indexes, int dcTotal,
Reference<LocalitySet> createTestLocalityMap(std::vector<repTestType>& indexes, int dcTotal,
int szTotal, int rackTotal, int slotTotal, int independentItems, int independentTotal)
{
LocalitySetRef buildServer(new LocalityMap<repTestType>());
Reference<LocalitySet> buildServer(new LocalityMap<repTestType>());
LocalityMap<repTestType>* serverMap = (LocalityMap<repTestType>*) buildServer.getPtr();
int serverValue, dcLoop, szLoop, rackLoop, slotLoop;
std::string dcText, szText, rackText, slotText, independentName, independentText;
@ -442,8 +456,8 @@ LocalitySetRef createTestLocalityMap(std::vector<repTestType>& indexes, int dcTo
}
bool testPolicy(
LocalitySetRef servers,
IRepPolicyRef const& policy,
Reference<LocalitySet> servers,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityEntry> const& including,
bool validate)
{
@ -506,109 +520,109 @@ bool testPolicy(
}
bool testPolicy(
LocalitySetRef servers,
IRepPolicyRef const& policy,
Reference<LocalitySet> servers,
Reference<IReplicationPolicy> const& policy,
bool validate)
{
return testPolicy(servers, policy, emptyEntryArray, validate);
}
std::vector<IRepPolicyRef> const& getStaticPolicies()
std::vector<Reference<IReplicationPolicy>> const& getStaticPolicies()
{
static std::vector<IRepPolicyRef> staticPolicies;
static std::vector<Reference<IReplicationPolicy>> staticPolicies;
if (staticPolicies.empty())
{
staticPolicies = {
IRepPolicyRef( new PolicyOne() ),
Reference<IReplicationPolicy>( new PolicyOne() ),
// 1 'dc^2 x 1'
IRepPolicyRef( new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyOne() ) ) ),
Reference<IReplicationPolicy>( new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyOne() ) ) ),
// 2 'dc^3 x 1'
IRepPolicyRef( new PolicyAcross(3, "dc", IRepPolicyRef( new PolicyOne() ) ) ),
Reference<IReplicationPolicy>( new PolicyAcross(3, "dc", Reference<IReplicationPolicy>( new PolicyOne() ) ) ),
// 3 'sz^3 x 1'
IRepPolicyRef( new PolicyAcross(3, "sz", IRepPolicyRef( new PolicyOne() ) ) ),
Reference<IReplicationPolicy>( new PolicyAcross(3, "sz", Reference<IReplicationPolicy>( new PolicyOne() ) ) ),
// 4 'dc^1 x az^3 x 1'
IRepPolicyRef( new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(3, "az", IRepPolicyRef( new PolicyOne() ))) ) ),
Reference<IReplicationPolicy>( new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(3, "az", Reference<IReplicationPolicy>( new PolicyOne() ))) ) ),
// 5 '(sz^3 x rack^2 x 1) + (dc^2 x az^3 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyAcross(2, "rack", IRepPolicyRef(new PolicyOne() ))))), IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef(new PolicyAcross(3, "az", IRepPolicyRef(new PolicyOne()) ))) )} ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyAcross(2, "rack", Reference<IReplicationPolicy>(new PolicyOne() ))))), Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>(new PolicyAcross(3, "az", Reference<IReplicationPolicy>(new PolicyOne()) ))) )} ) ),
// 6 '(sz^1 x 1)'
IRepPolicyRef( new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne())) ),
Reference<IReplicationPolicy>( new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne())) ),
// 7 '(sz^1 x 1) + (sz^1 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
// 8 '(sz^2 x 1) + (sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
// 9 '(dc^1 x sz^2 x 1)'
IRepPolicyRef( new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))),
Reference<IReplicationPolicy>( new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))),
//10 '(dc^2 x sz^2 x 1)'
IRepPolicyRef( new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))),
Reference<IReplicationPolicy>( new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))),
//11 '(dc^1 x sz^2 x 1) + (dc^2 x sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))), IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))), Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))) } ) ),
//12 '(dc^2 x sz^2 x 1) + (dc^1 x sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))), IRepPolicyRef(new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))), Reference<IReplicationPolicy>(new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))) } ) ),
//13 '(sz^2 x 1) + (dc^1 x sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))) } ) ),
//14 '(sz^2 x 1) + (dc^2 x sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))) } ) ),
//15 '(sz^3 x 1) + (dc^2 x sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))))) } ) ),
//16 '(sz^1 x 1) + (sz^2 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
//17 '(sz^2 x 1) + (sz^3 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
//18 '(sz^1 x 1) + (sz^2 x 1) + (sz^3 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
//19 '(sz^1 x 1) + (machine^1 x 1)'
IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(1, "zoneid", IRepPolicyRef(new PolicyOne()))) } ) ),
Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(1, "zoneid", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
// '(dc^1 x 1) + (sz^1 x 1) + (machine^1 x 1)'
// IRepPolicyRef( new PolicyAnd( { IRepPolicyRef(new PolicyAcross(1, "dc", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(1, "zoneid", IRepPolicyRef(new PolicyOne()))) } ) ),
// Reference<IReplicationPolicy>( new PolicyAnd( { Reference<IReplicationPolicy>(new PolicyAcross(1, "dc", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(1, "zoneid", Reference<IReplicationPolicy>(new PolicyOne()))) } ) ),
// '(dc^1 x sz^3 x 1)'
IRepPolicyRef( new PolicyAcross(1, "dc", IRepPolicyRef( new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyOne())))) ),
Reference<IReplicationPolicy>( new PolicyAcross(1, "dc", Reference<IReplicationPolicy>( new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyOne())))) ),
// '(dc^2 x sz^3 x 1)'
IRepPolicyRef( new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(3, "sz", IRepPolicyRef(new PolicyOne())))) ),
Reference<IReplicationPolicy>( new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(3, "sz", Reference<IReplicationPolicy>(new PolicyOne())))) ),
// '(dc^2 x az^3 x 1)'
IRepPolicyRef( new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(3, "az", IRepPolicyRef(new PolicyOne())))) ),
Reference<IReplicationPolicy>( new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(3, "az", Reference<IReplicationPolicy>(new PolicyOne())))) ),
// '(sz^1 x 1) + (dc^2 x az^3 x 1)'
IRepPolicyRef( new PolicyAnd({IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "dc", IRepPolicyRef( new PolicyAcross(3, "az", IRepPolicyRef(new PolicyOne())))))}) ),
Reference<IReplicationPolicy>( new PolicyAnd({Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "dc", Reference<IReplicationPolicy>( new PolicyAcross(3, "az", Reference<IReplicationPolicy>(new PolicyOne())))))}) ),
// 'dc^1 x (az^2 x 1) + (sz^2 x 1)'
// IRepPolicyRef( new PolicyAcross(1, "dc", IRepPolicyRef(new PolicyAnd({IRepPolicyRef(new PolicyAcross(2, "az", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(2, "sz", IRepPolicyRef(new PolicyOne())))}))) ),
// Reference<IReplicationPolicy>( new PolicyAcross(1, "dc", Reference<IReplicationPolicy>(new PolicyAnd({Reference<IReplicationPolicy>(new PolicyAcross(2, "az", Reference<IReplicationPolicy>(new PolicyOne()))), Reference<IReplicationPolicy>(new PolicyAcross(2, "sz", Reference<IReplicationPolicy>(new PolicyOne())))}))) ),
// Require backtracking
IRepPolicyRef( new PolicyAcross(8, "zoneid", IRepPolicyRef(new PolicyAcross(1, "az", IRepPolicyRef(new PolicyOne()))) ) ),
IRepPolicyRef( new PolicyAcross(8, "zoneid", IRepPolicyRef(new PolicyAcross(1, "sz", IRepPolicyRef(new PolicyOne()))) ) )
Reference<IReplicationPolicy>( new PolicyAcross(8, "zoneid", Reference<IReplicationPolicy>(new PolicyAcross(1, "az", Reference<IReplicationPolicy>(new PolicyOne()))) ) ),
Reference<IReplicationPolicy>( new PolicyAcross(8, "zoneid", Reference<IReplicationPolicy>(new PolicyAcross(1, "sz", Reference<IReplicationPolicy>(new PolicyOne()))) ) )
};
}
return staticPolicies;
}
IRepPolicyRef const randomAcrossPolicy(LocalitySet const& serverSet)
Reference<IReplicationPolicy> const randomAcrossPolicy(LocalitySet const& serverSet)
{
int usedKeyTotal, keysUsed, keyIndex, valueTotal, maxValueTotal, maxKeyTotal, skips, lastKeyIndex;
std::vector<std::string> keyArray(serverSet.getGroupKeyMap()->_lookuparray);
@ -616,7 +630,7 @@ IRepPolicyRef const randomAcrossPolicy(LocalitySet const& serverSet)
AttribKey indexKey;
Optional<AttribValue> keyValue;
std::string keyText;
IRepPolicyRef policy(new PolicyOne());
Reference<IReplicationPolicy> policy(new PolicyOne());
// Determine the number of keys to used within the policy
usedKeyTotal = g_random->randomInt(1, keyArray.size()+1);
@ -669,7 +683,7 @@ IRepPolicyRef const randomAcrossPolicy(LocalitySet const& serverSet)
}
valueTotal = g_random->randomInt(1, valueSet.size()+2);
if ((valueTotal > maxValueTotal) && (g_random->random01() > .25)) valueTotal = maxValueTotal;
policy = IRepPolicyRef( new PolicyAcross(valueTotal, keyText, policy) );
policy = Reference<IReplicationPolicy>( new PolicyAcross(valueTotal, keyText, policy) );
if (g_replicationdebug > 1) {
printf(" item%3d: (%3d =>%3d) %-10s =>%4d\n", keysUsed+1, keyIndex, indexKey._id, keyText.c_str(), valueTotal);
}
@ -725,8 +739,8 @@ int testReplication()
int policyMin = policyMinEnv ? atoi(policyMinEnv) : 2;
int policyIndex, testCounter, alsoSize, debugBackup, maxAlsoSize;
std::vector<repTestType> serverIndexes;
LocalitySetRef testServers;
std::vector<IRepPolicyRef> policies;
Reference<LocalitySet> testServers;
std::vector<Reference<IReplicationPolicy>> policies;
std::vector<LocalityEntry> alsoServers, bestSet;
int totalErrors = 0;
@ -819,12 +833,12 @@ void filterLocalityDataForPolicy(const std::set<std::string>& keys, LocalityData
}
}
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld) {
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, LocalityData* ld) {
if (!policy) return;
filterLocalityDataForPolicy(policy->attributeKeys(), ld);
}
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld) {
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, std::vector<LocalityData>* vld) {
if (!policy) return;
std::set<std::string> keys = policy->attributeKeys();
for (LocalityData& ld : *vld) {

View File

@ -34,22 +34,22 @@ extern repTestType convertToTestType(int iValue);
extern int testReplication();
extern double ratePolicy(
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
unsigned int nSelectTests);
extern bool findBestPolicySet(
std::vector<LocalityEntry>& bestResults,
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
unsigned int nMinItems,
unsigned int nSelectTests,
unsigned int nPolicyTests);
extern bool findBestUniquePolicySet(
std::vector<LocalityEntry>& bestResults,
LocalitySetRef & localitySet,
IRepPolicyRef const& policy,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
StringRef localityUniquenessKey,
unsigned int nMinItems,
unsigned int nSelectTests,
@ -60,20 +60,20 @@ extern bool findBestUniquePolicySet(
extern bool validateAllCombinations(
std::vector<LocalityData> & offendingCombo,
LocalityGroup const& localitySet,
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid = true);
extern bool validateAllCombinations(
LocalityGroup const& localitySet,
IRepPolicyRef const& policy,
Reference<IReplicationPolicy> const& policy,
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid = true);
/// Remove all pieces of locality information from the LocalityData that will not be used when validating the policy.
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld);
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld);
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, LocalityData* ld);
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, std::vector<LocalityData>* vld);
#endif

View File

@ -280,11 +280,11 @@ public:
std::set<NetworkAddress> protectedAddresses;
std::map<NetworkAddress, ProcessInfo*> currentlyRebootingProcesses;
class ClusterConnectionString* extraDB;
IRepPolicyRef storagePolicy;
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> storagePolicy;
Reference<IReplicationPolicy> tLogPolicy;
int32_t tLogWriteAntiQuorum;
Optional<Standalone<StringRef>> primaryDcId;
IRepPolicyRef remoteTLogPolicy;
Reference<IReplicationPolicy> remoteTLogPolicy;
int32_t usableRegions;
std::string disablePrimary;
std::string disableRemote;
@ -292,8 +292,8 @@ public:
bool allowLogSetKills;
Optional<Standalone<StringRef>> remoteDcId;
bool hasSatelliteReplication;
IRepPolicyRef satelliteTLogPolicy;
IRepPolicyRef satelliteTLogPolicyFallback;
Reference<IReplicationPolicy> satelliteTLogPolicy;
Reference<IReplicationPolicy> satelliteTLogPolicyFallback;
int32_t satelliteTLogWriteAntiQuorum;
int32_t satelliteTLogWriteAntiQuorumFallback;
std::vector<Optional<Standalone<StringRef>>> primarySatelliteDcIds;

View File

@ -234,10 +234,10 @@ public:
throw no_more_servers();
}
std::vector<WorkerDetails> getWorkersForSeedServers( DatabaseConfiguration const& conf, IRepPolicyRef const& policy, Optional<Optional<Standalone<StringRef>>> const& dcId = Optional<Optional<Standalone<StringRef>>>() ) {
std::vector<WorkerDetails> getWorkersForSeedServers( DatabaseConfiguration const& conf, Reference<IReplicationPolicy> const& policy, Optional<Optional<Standalone<StringRef>>> const& dcId = Optional<Optional<Standalone<StringRef>>>() ) {
std::map<ProcessClass::Fitness, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
LocalitySetRef logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
Reference<LocalitySet> logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
LocalityMap<WorkerDetails>* logServerMap = (LocalityMap<WorkerDetails>*) logServerSet.getPtr();
bool bCompleted = false;
@ -275,11 +275,11 @@ public:
return results;
}
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, IRepPolicyRef const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, Reference<IReplicationPolicy> const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
std::map<std::pair<ProcessClass::Fitness,bool>, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
std::vector<LocalityData> unavailableLocals;
LocalitySetRef logServerSet;
Reference<LocalitySet> logServerSet;
LocalityMap<WorkerDetails>* logServerMap;
bool bCompleted = false;

View File

@ -41,7 +41,7 @@ struct CoreTLogSet {
int32_t tLogWriteAntiQuorum; // The write anti quorum previously used to write to tLogs, which might be different from the anti quorum suggested by the current configuration going forward!
int32_t tLogReplicationFactor; // The replication factor previously used to write to tLogs, which might be different from the current configuration
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> tLogPolicy;
bool isLocal;
int8_t locality;
Version startVersion;

View File

@ -3698,7 +3698,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
return Void();
}
DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int processCount) {
DDTeamCollection* testTeamCollection(int teamSize, Reference<IReplicationPolicy> policy, int processCount) {
Database database = DatabaseContext::create(
Reference<AsyncVar<ClientDBInfo>>(new AsyncVar<ClientDBInfo>()),
Never(),
@ -3740,7 +3740,7 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
return collection;
}
DDTeamCollection* testMachineTeamCollection(int teamSize, IRepPolicyRef policy, int processCount) {
DDTeamCollection* testMachineTeamCollection(int teamSize, Reference<IReplicationPolicy> policy, int processCount) {
Database database = DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>>(new AsyncVar<ClientDBInfo>()),
Never(), LocalityData(), false);
@ -3792,7 +3792,7 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(teamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state DDTeamCollection* collection = testMachineTeamCollection(teamSize, policy, processSize);
int result = collection->addTeamsBestOf(30, desiredTeams, maxTeams);
@ -3812,7 +3812,7 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(teamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state DDTeamCollection* collection = testMachineTeamCollection(teamSize, policy, processSize);
if (collection == NULL) {
@ -3830,7 +3830,7 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
}
TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -3849,7 +3849,7 @@ TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
}
TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -3867,7 +3867,7 @@ TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
wait(Future<Void>(Void()));
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 10;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
@ -3897,7 +3897,7 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
wait(Future<Void>(Void()));
IRepPolicyRef policy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state int processSize = 5;
state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

View File

@ -40,8 +40,8 @@ public:
int32_t tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
TLogVersion tLogVersion;
IRepPolicyRef tLogPolicy;
LocalitySetRef logServerSet;
Reference<IReplicationPolicy> tLogPolicy;
Reference<LocalitySet> logServerSet;
std::vector<int> logIndexArray;
std::vector<LocalityEntry> logEntryArray;
bool isLocal;
@ -84,7 +84,7 @@ public:
used_servers.insert(std::make_pair(0,i));
}
LocalitySetRef serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int,int>>());
Reference<LocalitySet> serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int,int>>());
LocalityMap<std::pair<int,int>>* serverMap = (LocalityMap<std::pair<int,int>>*) serverSet.getPtr();
std::vector<std::pair<int,int>> resultPairs;
for(int loc = 0; loc < satelliteTagLocations.size(); loc++) {
@ -189,7 +189,7 @@ public:
void updateLocalitySet( vector<LocalityData> const& localities ) {
LocalityMap<int>* logServerMap;
logServerSet = LocalitySetRef(new LocalityMap<int>());
logServerSet = Reference<LocalitySet>(new LocalityMap<int>());
logServerMap = (LocalityMap<int>*) logServerSet.getPtr();
logEntryArray.clear();
@ -412,7 +412,7 @@ struct ILogSystem {
int tLogReplicationFactor;
MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor );
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor );
MergedPeekCursor( vector< Reference<IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, Reference<LogSet> logSet, int tLogReplicationFactor );
virtual Reference<IPeekCursor> cloneNoMore();

View File

@ -61,7 +61,7 @@ struct TLogSet {
int32_t tLogWriteAntiQuorum, tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
TLogVersion tLogVersion;
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> tLogPolicy;
bool isLocal;
int8_t locality;
Version startVersion;

View File

@ -273,7 +273,7 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IP
}
ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end,
bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor )
: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(g_random->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
if(tLogPolicy) {
logSet = Reference<LogSet>( new LogSet() );

View File

@ -531,12 +531,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) );
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) ) );
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
@ -565,7 +565,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
thisBegin, lastBegin, false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) ) );
epochEnds.push_back(LogMessageVersion(lastBegin));
lastBegin = thisBegin;
}
@ -959,24 +959,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
wait( quorum( alive, std::min(logSet->tLogReplicationFactor, numPresent - logSet->tLogWriteAntiQuorum) ) );
state Reference<LocalityGroup> locked(new LocalityGroup());
state std::vector<bool> responded(alive.size());
for (int i = 0; i < alive.size(); i++) {
responded[i] = false;
}
state std::vector<LocalityEntry> aliveEntries;
state std::vector<bool> responded(alive.size(), false);
loop {
for (int i = 0; i < alive.size(); i++) {
if (!responded[i] && alive[i].isReady() && !alive[i].isError()) {
locked->add(logSet->tLogLocalities[i]);
aliveEntries.push_back(logSet->logEntryArray[i]);
responded[i] = true;
}
}
bool quorum_obtained = locked->validate(logSet->tLogPolicy);
// We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive.
if (logSet->tLogReplicationFactor == 1 && locked->size() > 0) {
ASSERT(quorum_obtained);
}
if (quorum_obtained) {
if (logSet->satisfiesPolicy(aliveEntries)) {
return Void();
}
@ -1566,7 +1559,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
ACTOR static Future<Void> recruitOldLogRouters( TagPartitionedLogSystem* self, vector<WorkerInterface> workers, LogEpoch recoveryCount, int8_t locality, Version startVersion,
std::vector<LocalityData> tLogLocalities, IRepPolicyRef tLogPolicy, bool forRemote ) {
std::vector<LocalityData> tLogLocalities, Reference<IReplicationPolicy> tLogPolicy, bool forRemote ) {
state vector<vector<Future<TLogInterface>>> logRouterInitializationReplies;
state vector<Future<TLogInterface>> allReplies;
int nextRouter = 0;

View File

@ -120,7 +120,7 @@ struct InitializeLogRouterRequest {
Tag routerTag;
Version startVersion;
std::vector<LocalityData> tLogLocalities;
IRepPolicyRef tLogPolicy;
Reference<IReplicationPolicy> tLogPolicy;
int8_t locality;
ReplyPromise<struct TLogInterface> reply;