Add CommitQuorum to PaxosConfigTransaction.actor.cpp
This commit is contained in:
parent
3a067b9cc8
commit
2b3041f205
|
@ -22,6 +22,66 @@
|
|||
#include "fdbclient/PaxosConfigTransaction.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
class CommitQuorum {
|
||||
std::vector<ConfigTransactionInterface> ctis;
|
||||
std::vector<Future<Void>> futures;
|
||||
size_t failed{ 0 };
|
||||
size_t successful{ 0 };
|
||||
size_t maybeCommitted{ 0 };
|
||||
Promise<Void> result;
|
||||
ConfigTransactionCommitRequest toCommit;
|
||||
|
||||
void updateResult() {
|
||||
if (successful >= ctis.size() / 2 + 1 && !result.isSet()) {
|
||||
result.send(Void());
|
||||
} else if (failed >= ctis.size() / 2 + 1 && !result.isError()) {
|
||||
result.sendError(not_committed());
|
||||
} else {
|
||||
// Check if it is possible to ever receive quorum agreement
|
||||
auto totalRequestsOutstanding = ctis.size() - (failed + successful + maybeCommitted);
|
||||
if ((failed + totalRequestsOutstanding < ctis.size() / 2 + 1) &&
|
||||
(successful + totalRequestsOutstanding < ctis.size() / 2 + 1) && !result.isError()) {
|
||||
result.sendError(commit_unknown_result());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> addRequestActor(CommitQuorum* self, ConfigTransactionInterface cti) {
|
||||
try {
|
||||
wait(cti.commit.getReply(self->toCommit));
|
||||
++self->successful;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_request_maybe_delivered) {
|
||||
++self->maybeCommitted;
|
||||
} else {
|
||||
++self->failed;
|
||||
}
|
||||
}
|
||||
self->updateResult();
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
CommitQuorum() = default;
|
||||
explicit CommitQuorum(std::vector<ConfigTransactionInterface> const& ctis) : ctis(ctis) {
|
||||
futures.reserve(ctis.size());
|
||||
}
|
||||
void set(KeyRef key, ValueRef value) { toCommit.set(key, value); }
|
||||
void clear(KeyRef key) { toCommit.clear(key); }
|
||||
void setGeneration(ConfigGeneration generation) { toCommit.generation = generation; }
|
||||
void setTimestamp() { toCommit.annotation.timestamp = now(); }
|
||||
size_t expectedSize() const { return toCommit.expectedSize(); }
|
||||
Future<Void> commit() {
|
||||
// Send commit message to all replicas, even those that did not return the used replica.
|
||||
// This way, slow replicas are kept up date.
|
||||
for (const auto& cti : ctis) {
|
||||
futures.push_back(addRequestActor(this, cti));
|
||||
}
|
||||
return result.getFuture();
|
||||
}
|
||||
bool committed() const { return result.isSet(); }
|
||||
};
|
||||
|
||||
class GetGenerationQuorum {
|
||||
std::vector<ConfigTransactionInterface> ctis;
|
||||
std::vector<Future<Void>> futures;
|
||||
|
@ -101,11 +161,10 @@ public:
|
|||
};
|
||||
|
||||
class PaxosConfigTransactionImpl {
|
||||
ConfigTransactionCommitRequest toCommit;
|
||||
GetGenerationQuorum getGenerationQuorum;
|
||||
std::vector<ConfigTransactionInterface> ctis;
|
||||
GetGenerationQuorum getGenerationQuorum;
|
||||
CommitQuorum commitQuorum;
|
||||
int numRetries{ 0 };
|
||||
bool committed{ false };
|
||||
Optional<UID> dID;
|
||||
Database cx;
|
||||
|
||||
|
@ -151,18 +210,9 @@ class PaxosConfigTransactionImpl {
|
|||
|
||||
ACTOR static Future<Void> commit(PaxosConfigTransactionImpl* self) {
|
||||
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
|
||||
self->toCommit.generation = generation;
|
||||
self->toCommit.annotation.timestamp = now();
|
||||
std::vector<Future<Void>> commitFutures;
|
||||
commitFutures.reserve(self->ctis.size());
|
||||
// Send commit message to all replicas, even those that did not return the used replica.
|
||||
// This way, slow replicas are kept up date.
|
||||
for (const auto& cti : self->ctis) {
|
||||
commitFutures.push_back(cti.commit.getReply(self->toCommit));
|
||||
}
|
||||
// FIXME: Must tolerate failures and disagreement
|
||||
wait(quorum(commitFutures, commitFutures.size() / 2 + 1));
|
||||
self->committed = true;
|
||||
self->commitQuorum.setGeneration(generation);
|
||||
self->commitQuorum.setTimestamp();
|
||||
wait(self->commitQuorum.commit());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -181,14 +231,15 @@ public:
|
|||
}
|
||||
|
||||
Version getCommittedVersion() const {
|
||||
return committed ? getGenerationQuorum.getCachedGeneration().get().liveVersion : ::invalidVersion;
|
||||
return commitQuorum.committed() ? getGenerationQuorum.getCachedGeneration().get().liveVersion
|
||||
: ::invalidVersion;
|
||||
}
|
||||
|
||||
int64_t getApproximateSize() const { return toCommit.expectedSize(); }
|
||||
int64_t getApproximateSize() const { return commitQuorum.expectedSize(); }
|
||||
|
||||
void set(KeyRef key, ValueRef value) { toCommit.set(key, value); }
|
||||
void set(KeyRef key, ValueRef value) { commitQuorum.set(key, value); }
|
||||
|
||||
void clear(KeyRef key) { toCommit.clear(key); }
|
||||
void clear(KeyRef key) { commitQuorum.clear(key); }
|
||||
|
||||
Future<Optional<Value>> get(Key const& key) { return get(this, key); }
|
||||
|
||||
|
@ -207,6 +258,7 @@ public:
|
|||
|
||||
Future<Void> onError(Error const& e) {
|
||||
// TODO: Improve this:
|
||||
TraceEvent("ConfigIncrementOnError").error(e).detail("NumRetries", numRetries);
|
||||
if (e.code() == error_code_transaction_too_old) {
|
||||
reset();
|
||||
return delay((1 << numRetries++) * 0.01 * deterministicRandom()->random01());
|
||||
|
@ -218,8 +270,7 @@ public:
|
|||
|
||||
void reset() {
|
||||
getGenerationQuorum = GetGenerationQuorum{ ctis };
|
||||
toCommit = {};
|
||||
committed = false;
|
||||
commitQuorum = CommitQuorum{ ctis };
|
||||
}
|
||||
|
||||
void fullReset() {
|
||||
|
@ -246,9 +297,11 @@ public:
|
|||
ctis.emplace_back(coordinator);
|
||||
}
|
||||
getGenerationQuorum = GetGenerationQuorum{ ctis };
|
||||
commitQuorum = CommitQuorum{ ctis };
|
||||
}
|
||||
|
||||
PaxosConfigTransactionImpl(std::vector<ConfigTransactionInterface> const& ctis) : ctis(ctis) {}
|
||||
PaxosConfigTransactionImpl(std::vector<ConfigTransactionInterface> const& ctis)
|
||||
: ctis(ctis), getGenerationQuorum(ctis), commitQuorum(ctis) {}
|
||||
};
|
||||
|
||||
Future<Version> PaxosConfigTransaction::getReadVersion() {
|
||||
|
|
Loading…
Reference in New Issue