diff --git a/fdbrpc/Replication.h b/fdbrpc/Replication.h index fffa4e5ae3..915be8daa9 100644 --- a/fdbrpc/Replication.h +++ b/fdbrpc/Replication.h @@ -385,7 +385,6 @@ protected: LocalityEntry const& add(LocalityEntry const& entry, LocalityData const& data) { _entryArray.push_back(entry); _mutableEntryArray.push_back(entry); - ASSERT(data._data.size() > 0); // Ensure that the key value array is large enough to hold the values if (_keyValueArray.capacity() < _keyValueArray.size() + data._data.size()) { @@ -419,7 +418,6 @@ protected: if (_keyValueArray.capacity() < _keyValueArray.size() + record->_dataMap->size()) { _keyValueArray.reserve(_keyValueArray.size() + record->_dataMap->size()); } - ASSERT(record->_dataMap->_keyvaluearray.size() > 0); for (auto& keyValuePair : record->_dataMap->_keyvaluearray) { auto keyString = _localitygroup->keyText(keyValuePair.first); diff --git a/fdbrpc/ReplicationPolicy.h b/fdbrpc/ReplicationPolicy.h index 43f8a34831..dd890b6048 100644 --- a/fdbrpc/ReplicationPolicy.h +++ b/fdbrpc/ReplicationPolicy.h @@ -68,6 +68,11 @@ struct IReplicationPolicy : public ReferenceCounted { std::vector const& solutionSet, std::vector const& alsoServers, LocalitySetRef const& fromServers ); + + // Returns a set of the attributes that this policy uses in selection and validation. + std::set attributeKeys() const + { std::set keys; this->attributeKeys(&keys); return keys; } + virtual void attributeKeys(std::set*) const = 0; }; template @@ -108,6 +113,7 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted { template void serialize(Ar& ar) { } + virtual void attributeKeys(std::set* set) const override { return; } }; struct PolicyAcross : IReplicationPolicy, public ReferenceCounted { @@ -135,6 +141,9 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted static bool compareAddedResults(const std::pair& rhs, const std::pair& lhs) { return (rhs.first < lhs.first) || (!(lhs.first < rhs.first) && (rhs.second < lhs.second)); } + virtual void attributeKeys(std::set *set) const override + { set->insert(_attribKey); _policy->attributeKeys(set); } + protected: int _count; std::string _attribKey; @@ -207,6 +216,9 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted { } } + virtual void attributeKeys(std::set *set) const override + { for (const IRepPolicyRef& r : _policies) { r->attributeKeys(set); } } + protected: std::vector _policies; std::vector _sortedPolicies; diff --git a/fdbrpc/ReplicationUtils.cpp b/fdbrpc/ReplicationUtils.cpp index 22226a2580..6b1e0efcb3 100644 --- a/fdbrpc/ReplicationUtils.cpp +++ b/fdbrpc/ReplicationUtils.cpp @@ -807,6 +807,31 @@ int testReplication() return totalErrors; } +namespace { +void filterLocalityDataForPolicy(const std::set& keys, LocalityData* ld) { + for (auto iter = ld->_data.begin(); iter != ld->_data.end();) { + auto prev = iter; + iter++; + if (keys.find(prev->first.toString()) == keys.end()) { + ld->_data.erase(prev); + } + } +} +} + +void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld) { + if (!policy) return; + filterLocalityDataForPolicy(policy->attributeKeys(), ld); +} + +void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector* vld) { + if (!policy) return; + std::set keys = policy->attributeKeys(); + for (LocalityData& ld : *vld) { + filterLocalityDataForPolicy(policy, &ld); + } +} + TEST_CASE("fdbrpc/Replication/test") { printf("Running replication test\n"); diff --git a/fdbrpc/ReplicationUtils.h b/fdbrpc/ReplicationUtils.h index 27d7e284ac..4bb7d4edcf 100644 --- a/fdbrpc/ReplicationUtils.h +++ b/fdbrpc/ReplicationUtils.h @@ -71,4 +71,9 @@ extern bool validateAllCombinations( std::vector const& newItems, unsigned int nCombinationSize, bool bCheckIfValid = true); + +/// Remove all pieces of locality information from the LocalityData that will not be used when validating the policy. +void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld); +void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector* vld); + #endif diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index a2dbb32772..ad6ab2a299 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -21,6 +21,7 @@ #include "LogSystem.h" #include "fdbrpc/FailureMonitor.h" #include "Knobs.h" +#include "fdbrpc/ReplicationUtils.h" ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore ) : interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) { @@ -243,12 +244,14 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vectortLogPolicy, &this->tLogLocalities); } ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional nextVersion, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor ) : serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor) { sortedVersions.resize(serverCursors.size()); calcHasMessage(); + filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities); } Reference ILogSystem::MergedPeekCursor::cloneNoMore() { diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index a756ea5ac5..d25035c3ef 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -141,6 +141,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = lsConf.tLogLocalities; logSystem->logSystemType = lsConf.logSystemType; logSystem->UpdateLocalitySet(lsConf.tLogs); + filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities); return logSystem; } @@ -159,6 +160,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogReplicationFactor = lsConf.oldTLogs[0].tLogReplicationFactor; logSystem->tLogPolicy = lsConf.oldTLogs[0].tLogPolicy; logSystem->tLogLocalities = lsConf.oldTLogs[0].tLogLocalities; + filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities); logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1); for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) { @@ -346,21 +348,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogReplicationFactor, numPresent - self->tLogWriteAntiQuorum) ) ); + state Reference locked(new LocalityGroup()); + state std::vector responded(alive.size()); + for (int i = 0; i < alive.size(); i++) { + responded[i] = false; + } loop { - LocalityGroup locked; - std::vector unlocked, unused; for (int i = 0; i < alive.size(); i++) { - if (alive[i].isReady() && !alive[i].isError()) { - locked.add(self->tLogLocalities[i]); - } else { - unlocked.push_back(self->tLogLocalities[i]); + if (!responded[i] && alive[i].isReady() && !alive[i].isError()) { + locked->add(self->tLogLocalities[i]); + responded[i] = true; } } - bool quorum_obtained = locked.validate(self->tLogPolicy); - if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) { - quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false); - } - if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) { + bool quorum_obtained = locked->validate(self->tLogPolicy); + // We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive. + if (self->tLogReplicationFactor == 1 && locked->size() > 0) { ASSERT(quorum_obtained); } if (quorum_obtained) { @@ -581,6 +583,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogPolicy = prevState.tLogPolicy; logSystem->tLogLocalities = prevState.tLogLocalities; logSystem->logSystemType = prevState.logSystemType; + filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities); logSystem->epochEndVersion = 0; logSystem->knownCommittedVersion = 0; @@ -869,6 +872,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers[i] = Reference>>( new AsyncVar>( OptionalInterface(initializationReplies[i].get()) ) ); logSystem->tLogLocalities[i] = workers[i].locality; } + filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities); //Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();