Merge pull request #208 from cie/alexmiller/grvtfix
Fix the GRV performance regression
This commit is contained in:
commit
f19cb3bbbd
|
@ -385,7 +385,6 @@ protected:
|
|||
LocalityEntry const& add(LocalityEntry const& entry, LocalityData const& data) {
|
||||
_entryArray.push_back(entry);
|
||||
_mutableEntryArray.push_back(entry);
|
||||
ASSERT(data._data.size() > 0);
|
||||
|
||||
// Ensure that the key value array is large enough to hold the values
|
||||
if (_keyValueArray.capacity() < _keyValueArray.size() + data._data.size()) {
|
||||
|
@ -419,7 +418,6 @@ protected:
|
|||
if (_keyValueArray.capacity() < _keyValueArray.size() + record->_dataMap->size()) {
|
||||
_keyValueArray.reserve(_keyValueArray.size() + record->_dataMap->size());
|
||||
}
|
||||
ASSERT(record->_dataMap->_keyvaluearray.size() > 0);
|
||||
|
||||
for (auto& keyValuePair : record->_dataMap->_keyvaluearray) {
|
||||
auto keyString = _localitygroup->keyText(keyValuePair.first);
|
||||
|
|
|
@ -68,6 +68,11 @@ struct IReplicationPolicy : public ReferenceCounted<IReplicationPolicy> {
|
|||
std::vector<LocalityEntry> const& solutionSet,
|
||||
std::vector<LocalityEntry> const& alsoServers,
|
||||
LocalitySetRef const& fromServers );
|
||||
|
||||
// Returns a set of the attributes that this policy uses in selection and validation.
|
||||
std::set<std::string> attributeKeys() const
|
||||
{ std::set<std::string> keys; this->attributeKeys(&keys); return keys; }
|
||||
// Each concrete policy adds the attribute keys it depends on to the supplied set.
virtual void attributeKeys(std::set<std::string>*) const = 0;
|
||||
};
|
||||
|
||||
template <class Archive>
|
||||
|
@ -108,6 +113,7 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
|
|||
// Serialization hook; nothing is read from or written to the archive here.
template <class Ar>
void serialize(Ar& ar) {}
|
||||
// Contributes no attribute keys: the supplied set is left untouched.
virtual void attributeKeys(std::set<std::string>* set) const override {}
|
||||
};
|
||||
|
||||
struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
|
||||
|
@ -135,6 +141,9 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
|
|||
// Ordering predicate over (first, second) pairs: compares by `first`, and
// falls back to `second` only when the `first` values are equal.
// Note that, despite the parameter names, `rhs` is the left operand.
static bool compareAddedResults(const std::pair<int, int>& rhs, const std::pair<int, int>& lhs) {
	if (rhs.first != lhs.first) {
		return rhs.first < lhs.first;
	}
	return rhs.second < lhs.second;
}
|
||||
|
||||
virtual void attributeKeys(std::set<std::string> *set) const override
|
||||
{ set->insert(_attribKey); _policy->attributeKeys(set); }
|
||||
|
||||
protected:
|
||||
int _count;
|
||||
std::string _attribKey;
|
||||
|
@ -207,6 +216,9 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
|
|||
}
|
||||
}
|
||||
|
||||
virtual void attributeKeys(std::set<std::string> *set) const override
|
||||
{ for (const IRepPolicyRef& r : _policies) { r->attributeKeys(set); } }
|
||||
|
||||
protected:
|
||||
std::vector<IRepPolicyRef> _policies;
|
||||
std::vector<IRepPolicyRef> _sortedPolicies;
|
||||
|
|
|
@ -807,6 +807,31 @@ int testReplication()
|
|||
return totalErrors;
|
||||
}
|
||||
|
||||
namespace {
|
||||
void filterLocalityDataForPolicy(const std::set<std::string>& keys, LocalityData* ld) {
|
||||
for (auto iter = ld->_data.begin(); iter != ld->_data.end();) {
|
||||
auto prev = iter;
|
||||
iter++;
|
||||
if (keys.find(prev->first.toString()) == keys.end()) {
|
||||
ld->_data.erase(prev);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reduces a single LocalityData to the entries whose keys `policy` reports via
// attributeKeys(). A null policy leaves `ld` unchanged.
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld) {
	if (policy) {
		const std::set<std::string> usedKeys = policy->attributeKeys();
		filterLocalityDataForPolicy(usedKeys, ld);
	}
}
|
||||
|
||||
// Reduces every LocalityData in `vld` to the entries whose keys `policy`
// reports via attributeKeys(). A null policy leaves `vld` unchanged.
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld) {
	if (!policy) return;
	// Collect the policy's attribute keys once and reuse them for each element.
	// Going through the policy overload here would rebuild the key set on
	// every iteration and leave this local unused.
	const std::set<std::string> keys = policy->attributeKeys();
	for (LocalityData& ld : *vld) {
		filterLocalityDataForPolicy(keys, &ld);
	}
}
|
||||
|
||||
TEST_CASE("fdbrpc/Replication/test") {
|
||||
printf("Running replication test\n");
|
||||
|
||||
|
|
|
@ -71,4 +71,9 @@ extern bool validateAllCombinations(
|
|||
std::vector<LocalityData> const& newItems,
|
||||
unsigned int nCombinationSize,
|
||||
bool bCheckIfValid = true);
|
||||
|
||||
/// Remove all pieces of locality information from the LocalityData that will not be used when validating the policy.
|
||||
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld);
|
||||
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "LogSystem.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "Knobs.h"
|
||||
#include "fdbrpc/ReplicationUtils.h"
|
||||
|
||||
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
|
||||
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
|
||||
|
@ -243,12 +244,14 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<O
|
|||
serverCursors.push_back( cursor );
|
||||
}
|
||||
sortedVersions.resize(serverCursors.size());
|
||||
filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
|
||||
}
|
||||
|
||||
// Constructs a merged cursor from an existing set of per-server cursors and an
// explicit starting message version.
ILogSystem::MergedPeekCursor::MergedPeekCursor(
	vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors,
	LogMessageVersion const& messageVersion,
	int bestServer,
	int readQuorum,
	Optional<LogMessageVersion> nextVersion,
	std::vector< LocalityData > const& tLogLocalities,
	IRepPolicyRef const tLogPolicy,
	int tLogReplicationFactor )
  : serverCursors(serverCursors),
	bestServer(bestServer),
	readQuorum(readQuorum),
	currentCursor(0),
	hasNextMessage(false),
	messageVersion(messageVersion),
	nextVersion(nextVersion),
	randomID(g_random->randomUniqueID()),
	tLogLocalities(tLogLocalities),
	tLogPolicy(tLogPolicy),
	tLogReplicationFactor(tLogReplicationFactor)
{
	// One sortedVersions slot per server cursor.
	sortedVersions.resize(serverCursors.size());
	calcHasMessage();
	// Trim this cursor's private copy of the localities down to the keys the policy uses.
	filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
}
|
||||
|
||||
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {
|
||||
|
|
|
@ -141,6 +141,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->tLogLocalities = lsConf.tLogLocalities;
|
||||
logSystem->logSystemType = lsConf.logSystemType;
|
||||
logSystem->UpdateLocalitySet(lsConf.tLogs);
|
||||
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
|
||||
|
||||
return logSystem;
|
||||
}
|
||||
|
@ -159,6 +160,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->tLogReplicationFactor = lsConf.oldTLogs[0].tLogReplicationFactor;
|
||||
logSystem->tLogPolicy = lsConf.oldTLogs[0].tLogPolicy;
|
||||
logSystem->tLogLocalities = lsConf.oldTLogs[0].tLogLocalities;
|
||||
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
|
||||
|
||||
logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1);
|
||||
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
|
||||
|
@ -346,21 +348,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
Void _ = wait( quorum( alive, std::min(self->tLogReplicationFactor, numPresent - self->tLogWriteAntiQuorum) ) );
|
||||
|
||||
state Reference<LocalityGroup> locked(new LocalityGroup());
|
||||
state std::vector<bool> responded(alive.size());
|
||||
for (int i = 0; i < alive.size(); i++) {
|
||||
responded[i] = false;
|
||||
}
|
||||
loop {
|
||||
LocalityGroup locked;
|
||||
std::vector<LocalityData> unlocked, unused;
|
||||
for (int i = 0; i < alive.size(); i++) {
|
||||
if (alive[i].isReady() && !alive[i].isError()) {
|
||||
locked.add(self->tLogLocalities[i]);
|
||||
} else {
|
||||
unlocked.push_back(self->tLogLocalities[i]);
|
||||
if (!responded[i] && alive[i].isReady() && !alive[i].isError()) {
|
||||
locked->add(self->tLogLocalities[i]);
|
||||
responded[i] = true;
|
||||
}
|
||||
}
|
||||
bool quorum_obtained = locked.validate(self->tLogPolicy);
|
||||
if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) {
|
||||
quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false);
|
||||
}
|
||||
if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) {
|
||||
bool quorum_obtained = locked->validate(self->tLogPolicy);
|
||||
// We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive.
|
||||
if (self->tLogReplicationFactor == 1 && locked->size() > 0) {
|
||||
ASSERT(quorum_obtained);
|
||||
}
|
||||
if (quorum_obtained) {
|
||||
|
@ -581,6 +583,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->tLogPolicy = prevState.tLogPolicy;
|
||||
logSystem->tLogLocalities = prevState.tLogLocalities;
|
||||
logSystem->logSystemType = prevState.logSystemType;
|
||||
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
|
||||
|
||||
logSystem->epochEndVersion = 0;
|
||||
logSystem->knownCommittedVersion = 0;
|
||||
|
@ -869,6 +872,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
|
||||
logSystem->tLogLocalities[i] = workers[i].locality;
|
||||
}
|
||||
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
|
||||
|
||||
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout
|
||||
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();
|
||||
|
|
Loading…
Reference in New Issue