From 2b3041f205ed845721911fa73d44200cfaa0a2a3 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 20 Aug 2021 15:53:13 -0700 Subject: [PATCH] Add CommitQuorum to PaxosConfigTransaction.actor.cpp --- fdbclient/PaxosConfigTransaction.actor.cpp | 97 +++++++++++++++++----- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 1460a4241b..494c863306 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -22,6 +22,66 @@ #include "fdbclient/PaxosConfigTransaction.h" #include "flow/actorcompiler.h" // must be last include +class CommitQuorum { + std::vector ctis; + std::vector> futures; + size_t failed{ 0 }; + size_t successful{ 0 }; + size_t maybeCommitted{ 0 }; + Promise result; + ConfigTransactionCommitRequest toCommit; + + void updateResult() { + if (successful >= ctis.size() / 2 + 1 && !result.isSet()) { + result.send(Void()); + } else if (failed >= ctis.size() / 2 + 1 && !result.isError()) { + result.sendError(not_committed()); + } else { + // Check if it is possible to ever receive quorum agreement + auto totalRequestsOutstanding = ctis.size() - (failed + successful + maybeCommitted); + if ((failed + totalRequestsOutstanding < ctis.size() / 2 + 1) && + (successful + totalRequestsOutstanding < ctis.size() / 2 + 1) && !result.isError()) { + result.sendError(commit_unknown_result()); + } + } + } + + ACTOR static Future addRequestActor(CommitQuorum* self, ConfigTransactionInterface cti) { + try { + wait(cti.commit.getReply(self->toCommit)); + ++self->successful; + } catch (Error& e) { + if (e.code() == error_code_request_maybe_delivered) { + ++self->maybeCommitted; + } else { + ++self->failed; + } + } + self->updateResult(); + return Void(); + } + +public: + CommitQuorum() = default; + explicit CommitQuorum(std::vector const& ctis) : ctis(ctis) { + futures.reserve(ctis.size()); + } + void set(KeyRef key, ValueRef value) { toCommit.set(key, value); } + void clear(KeyRef key) { toCommit.clear(key); } + void setGeneration(ConfigGeneration generation) { toCommit.generation = generation; } + void setTimestamp() { toCommit.annotation.timestamp = now(); } + size_t expectedSize() const { return toCommit.expectedSize(); } + Future commit() { + // Send commit message to all replicas, even those that did not return the used replica. + // This way, slow replicas are kept up date. + for (const auto& cti : ctis) { + futures.push_back(addRequestActor(this, cti)); + } + return result.getFuture(); + } + bool committed() const { return result.isSet(); } +}; + class GetGenerationQuorum { std::vector ctis; std::vector> futures; @@ -101,11 +161,10 @@ public: }; class PaxosConfigTransactionImpl { - ConfigTransactionCommitRequest toCommit; - GetGenerationQuorum getGenerationQuorum; std::vector ctis; + GetGenerationQuorum getGenerationQuorum; + CommitQuorum commitQuorum; int numRetries{ 0 }; - bool committed{ false }; Optional dID; Database cx; @@ -151,18 +210,9 @@ class PaxosConfigTransactionImpl { ACTOR static Future commit(PaxosConfigTransactionImpl* self) { ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); - self->toCommit.generation = generation; - self->toCommit.annotation.timestamp = now(); - std::vector> commitFutures; - commitFutures.reserve(self->ctis.size()); - // Send commit message to all replicas, even those that did not return the used replica. - // This way, slow replicas are kept up date. - for (const auto& cti : self->ctis) { - commitFutures.push_back(cti.commit.getReply(self->toCommit)); - } - // FIXME: Must tolerate failures and disagreement - wait(quorum(commitFutures, commitFutures.size() / 2 + 1)); - self->committed = true; + self->commitQuorum.setGeneration(generation); + self->commitQuorum.setTimestamp(); + wait(self->commitQuorum.commit()); return Void(); } @@ -181,14 +231,15 @@ public: } Version getCommittedVersion() const { - return committed ? getGenerationQuorum.getCachedGeneration().get().liveVersion : ::invalidVersion; + return commitQuorum.committed() ? getGenerationQuorum.getCachedGeneration().get().liveVersion + : ::invalidVersion; } - int64_t getApproximateSize() const { return toCommit.expectedSize(); } + int64_t getApproximateSize() const { return commitQuorum.expectedSize(); } - void set(KeyRef key, ValueRef value) { toCommit.set(key, value); } + void set(KeyRef key, ValueRef value) { commitQuorum.set(key, value); } - void clear(KeyRef key) { toCommit.clear(key); } + void clear(KeyRef key) { commitQuorum.clear(key); } Future> get(Key const& key) { return get(this, key); } @@ -207,6 +258,7 @@ public: Future onError(Error const& e) { // TODO: Improve this: + TraceEvent("ConfigIncrementOnError").error(e).detail("NumRetries", numRetries); if (e.code() == error_code_transaction_too_old) { reset(); return delay((1 << numRetries++) * 0.01 * deterministicRandom()->random01()); @@ -218,8 +270,7 @@ public: void reset() { getGenerationQuorum = GetGenerationQuorum{ ctis }; - toCommit = {}; - committed = false; + commitQuorum = CommitQuorum{ ctis }; } void fullReset() { @@ -246,9 +297,11 @@ public: ctis.emplace_back(coordinator); } getGenerationQuorum = GetGenerationQuorum{ ctis }; + commitQuorum = CommitQuorum{ ctis }; } - PaxosConfigTransactionImpl(std::vector const& ctis) : ctis(ctis) {} + PaxosConfigTransactionImpl(std::vector const& ctis) + : ctis(ctis), getGenerationQuorum(ctis), commitQuorum(ctis) {} }; Future PaxosConfigTransaction::getReadVersion() {