From a5b916cd8d6defb306cd66aebb3be91511119625 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sat, 7 Aug 2021 19:42:41 -0700 Subject: [PATCH] PaxosConfigTransaction should only send read requests to valid replicas --- fdbclient/PaxosConfigTransaction.actor.cpp | 85 +++++++++++++--------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 0143aa949e..ba47b949d1 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -24,26 +24,38 @@ namespace { +// TODO: Some replicas may reply after quorum has already been achieved, and we may want to add them to the readReplicas +// list class GetGenerationQuorum { +public: + struct Result { + ConfigGeneration generation; + std::vector readReplicas; + Result(ConfigGeneration const& generation, std::vector const& readReplicas) + : generation(generation), readReplicas(readReplicas) {} + Result() = default; + }; + +private: std::vector> futures; - std::map seenGenerations; - Promise result; + std::map> seenGenerations; + Promise result; size_t totalRepliesReceived{ 0 }; size_t maxAgreement{ 0 }; size_t size{ 0 }; Optional lastSeenLiveVersion; - ACTOR static Future handleReplyActor(GetGenerationQuorum* self, - Future replyFuture) { - ConfigTransactionGetGenerationReply reply = wait(replyFuture); + ACTOR static Future addRequestActor(GetGenerationQuorum* self, ConfigTransactionInterface cti) { + ConfigTransactionGetGenerationReply reply = + wait(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion })); ++self->totalRepliesReceived; auto gen = reply.generation; self->lastSeenLiveVersion = std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion)); - auto& count = self->seenGenerations[gen]; - ++count; - self->maxAgreement = std::max(count, self->maxAgreement); - if (count == self->size / 2 + 1) { - 
self->result.send(gen); // self may be destroyed here + auto& replicas = self->seenGenerations[gen]; + replicas.push_back(cti); + self->maxAgreement = std::max(replicas.size(), self->maxAgreement); + if (replicas.size() == self->size / 2 + 1) { + self->result.send(Result{ gen, replicas }); } else if (self->maxAgreement + (self->size - self->totalRepliesReceived) < (self->size / 2 + 1)) { self->result.sendError(failed_to_reach_quorum()); } @@ -55,10 +67,8 @@ public: : size(size), lastSeenLiveVersion(lastSeenLiveVersion) { futures.reserve(size); } - void addReplyCallback(Future replyFuture) { - futures.push_back(handleReplyActor(this, replyFuture)); - } - Future getGeneration() const { return result.getFuture(); } + void addRequest(ConfigTransactionInterface cti) { futures.push_back(addRequestActor(this, cti)); } + Future getResult() const { return result.getFuture(); } Optional getLastSeenLiveVersion() const { return lastSeenLiveVersion; } }; @@ -66,26 +76,26 @@ public: class PaxosConfigTransactionImpl { ConfigTransactionCommitRequest toCommit; - Future getGenerationFuture; + Future getGenerationFuture; std::vector ctis; int numRetries{ 0 }; bool committed{ false }; Optional lastSeenLiveVersion; Optional dID; Database cx; + std::vector readReplicas; - ACTOR static Future getGeneration(PaxosConfigTransactionImpl* self) { + ACTOR static Future getGeneration(PaxosConfigTransactionImpl* self) { state GetGenerationQuorum quorum(self->ctis.size(), self->lastSeenLiveVersion); state int retries = 0; loop { for (auto const& cti : self->ctis) { - quorum.addReplyCallback( - cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion })); + quorum.addRequest(cti); } try { - state ConfigGeneration gen = wait(quorum.getGeneration()); + state GetGenerationQuorum::Result result = wait(quorum.getResult()); wait(delay(0.0)); // Let reply callback actors finish before destructing quorum - return gen; + return result; } catch (Error& e) { if (e.code() == 
error_code_failed_to_reach_quorum) { TEST(true); // Failed to reach quorum getting generation @@ -104,10 +114,10 @@ class PaxosConfigTransactionImpl { self->getGenerationFuture = getGeneration(self); } state ConfigKey configKey = ConfigKey::decodeKey(key); - ConfigGeneration generation = wait(self->getGenerationFuture); - // TODO: Load balance, and only send request to replicas that we have gotten the current generation from - ConfigTransactionGetReply reply = - wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey })); + GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture); + // TODO: Load balance + ConfigTransactionGetReply reply = wait( + genResult.readReplicas[0].get.getReply(ConfigTransactionGetRequest{ genResult.generation, configKey })); if (reply.value.present()) { return reply.value.get().toValue(); } else { @@ -119,10 +129,10 @@ class PaxosConfigTransactionImpl { if (!self->getGenerationFuture.isValid()) { self->getGenerationFuture = getGeneration(self); } - ConfigGeneration generation = wait(self->getGenerationFuture); + GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture); // TODO: Load balance - ConfigTransactionGetConfigClassesReply reply = - wait(self->ctis[0].getClasses.getReply(ConfigTransactionGetConfigClassesRequest{ generation })); + ConfigTransactionGetConfigClassesReply reply = wait(genResult.readReplicas[0].getClasses.getReply( + ConfigTransactionGetConfigClassesRequest{ genResult.generation })); RangeResult result; result.reserve(result.arena(), reply.configClasses.size()); for (const auto& configClass : reply.configClasses) { @@ -135,10 +145,10 @@ class PaxosConfigTransactionImpl { if (!self->getGenerationFuture.isValid()) { self->getGenerationFuture = getGeneration(self); } - ConfigGeneration generation = wait(self->getGenerationFuture); + GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture); // TODO: Load balance - ConfigTransactionGetKnobsReply reply = 
- wait(self->ctis[0].getKnobs.getReply(ConfigTransactionGetKnobsRequest{ generation, configClass })); + ConfigTransactionGetKnobsReply reply = wait(genResult.readReplicas[0].getKnobs.getReply( + ConfigTransactionGetKnobsRequest{ genResult.generation, configClass })); RangeResult result; result.reserve(result.arena(), reply.knobNames.size()); for (const auto& knobName : reply.knobNames) { @@ -151,10 +161,13 @@ class PaxosConfigTransactionImpl { if (!self->getGenerationFuture.isValid()) { self->getGenerationFuture = getGeneration(self); } - wait(store(self->toCommit.generation, self->getGenerationFuture)); + GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture); + self->toCommit.generation = genResult.generation; self->toCommit.annotation.timestamp = now(); std::vector> commitFutures; commitFutures.reserve(self->ctis.size()); + // Send commit message to all replicas, even those that did not return the used generation. + // This way, slow replicas are kept up to date. for (const auto& cti : self->ctis) { commitFutures.push_back(cti.commit.getReply(self->toCommit)); } @@ -169,18 +182,20 @@ public: if (!getGenerationFuture.isValid()) { getGenerationFuture = getGeneration(this); } - return map(getGenerationFuture, [](auto const& gen) { return gen.committedVersion; }); + return map(getGenerationFuture, [](auto const& genResult) { return genResult.generation.committedVersion; }); } Optional getCachedReadVersion() const { if (getGenerationFuture.isValid() && getGenerationFuture.isReady() && !getGenerationFuture.isError()) { - return getGenerationFuture.get().committedVersion; + return getGenerationFuture.get().generation.committedVersion; } else { return {}; } } - Version getCommittedVersion() const { return committed ? getGenerationFuture.get().liveVersion : ::invalidVersion; } + Version getCommittedVersion() const { + return committed ? 
getGenerationFuture.get().generation.liveVersion : ::invalidVersion; + } int64_t getApproximateSize() const { return toCommit.expectedSize(); } @@ -215,7 +230,7 @@ public: void debugTransaction(UID dID) { this->dID = dID; } void reset() { - getGenerationFuture = Future{}; + getGenerationFuture = Future{}; toCommit = {}; committed = false; }