PaxosConfigTransaction should only send read requests to valid replicas

This commit is contained in:
sfc-gh-tclinkenbeard 2021-08-07 19:42:41 -07:00
parent 79ba9c4e3a
commit a5b916cd8d
1 changed files with 50 additions and 35 deletions

View File

@ -24,26 +24,38 @@
namespace {
// TODO: Some replicas may reply after quorum has already been achieved, and we may want to add them to the readReplicas
// list
class GetGenerationQuorum {
public:
struct Result {
ConfigGeneration generation;
std::vector<ConfigTransactionInterface> readReplicas;
Result(ConfigGeneration const& generation, std::vector<ConfigTransactionInterface> const& readReplicas)
: generation(generation), readReplicas(readReplicas) {}
Result() = default;
};
private:
std::vector<Future<Void>> futures;
std::map<ConfigGeneration, size_t> seenGenerations;
Promise<ConfigGeneration> result;
std::map<ConfigGeneration, std::vector<ConfigTransactionInterface>> seenGenerations;
Promise<Result> result;
size_t totalRepliesReceived{ 0 };
size_t maxAgreement{ 0 };
size_t size{ 0 };
Optional<Version> lastSeenLiveVersion;
ACTOR static Future<Void> handleReplyActor(GetGenerationQuorum* self,
Future<ConfigTransactionGetGenerationReply> replyFuture) {
ConfigTransactionGetGenerationReply reply = wait(replyFuture);
ACTOR static Future<Void> addRequestActor(GetGenerationQuorum* self, ConfigTransactionInterface cti) {
ConfigTransactionGetGenerationReply reply =
wait(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }));
++self->totalRepliesReceived;
auto gen = reply.generation;
self->lastSeenLiveVersion = std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion));
auto& count = self->seenGenerations[gen];
++count;
self->maxAgreement = std::max(count, self->maxAgreement);
if (count == self->size / 2 + 1) {
self->result.send(gen); // self may be destroyed here
auto& replicas = self->seenGenerations[gen];
replicas.push_back(cti);
self->maxAgreement = std::max(replicas.size(), self->maxAgreement);
if (replicas.size() == self->size / 2 + 1) {
self->result.send(Result{ gen, replicas });
} else if (self->maxAgreement + (self->size - self->totalRepliesReceived) < (self->size / 2 + 1)) {
self->result.sendError(failed_to_reach_quorum());
}
@ -55,10 +67,8 @@ public:
: size(size), lastSeenLiveVersion(lastSeenLiveVersion) {
futures.reserve(size);
}
void addReplyCallback(Future<ConfigTransactionGetGenerationReply> replyFuture) {
futures.push_back(handleReplyActor(this, replyFuture));
}
Future<ConfigGeneration> getGeneration() const { return result.getFuture(); }
void addRequest(ConfigTransactionInterface cti) { futures.push_back(addRequestActor(this, cti)); }
Future<Result> getResult() const { return result.getFuture(); }
Optional<Version> getLastSeenLiveVersion() const { return lastSeenLiveVersion; }
};
@ -66,26 +76,26 @@ public:
class PaxosConfigTransactionImpl {
ConfigTransactionCommitRequest toCommit;
Future<ConfigGeneration> getGenerationFuture;
Future<GetGenerationQuorum::Result> getGenerationFuture;
std::vector<ConfigTransactionInterface> ctis;
int numRetries{ 0 };
bool committed{ false };
Optional<Version> lastSeenLiveVersion;
Optional<UID> dID;
Database cx;
std::vector<ConfigTransactionInterface> readReplicas;
ACTOR static Future<ConfigGeneration> getGeneration(PaxosConfigTransactionImpl* self) {
ACTOR static Future<GetGenerationQuorum::Result> getGeneration(PaxosConfigTransactionImpl* self) {
state GetGenerationQuorum quorum(self->ctis.size(), self->lastSeenLiveVersion);
state int retries = 0;
loop {
for (auto const& cti : self->ctis) {
quorum.addReplyCallback(
cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }));
quorum.addRequest(cti);
}
try {
state ConfigGeneration gen = wait(quorum.getGeneration());
state GetGenerationQuorum::Result result = wait(quorum.getResult());
wait(delay(0.0)); // Let reply callback actors finish before destructing quorum
return gen;
return result;
} catch (Error& e) {
if (e.code() == error_code_failed_to_reach_quorum) {
TEST(true); // Failed to reach quorum getting generation
@ -104,10 +114,10 @@ class PaxosConfigTransactionImpl {
self->getGenerationFuture = getGeneration(self);
}
state ConfigKey configKey = ConfigKey::decodeKey(key);
ConfigGeneration generation = wait(self->getGenerationFuture);
// TODO: Load balance, and only send request to replicas that we have gotten the current generation from
ConfigTransactionGetReply reply =
wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey }));
GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetReply reply = wait(
genResult.readReplicas[0].get.getReply(ConfigTransactionGetRequest{ genResult.generation, configKey }));
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
@ -119,10 +129,10 @@ class PaxosConfigTransactionImpl {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
ConfigGeneration generation = wait(self->getGenerationFuture);
GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetConfigClassesReply reply =
wait(self->ctis[0].getClasses.getReply(ConfigTransactionGetConfigClassesRequest{ generation }));
ConfigTransactionGetConfigClassesReply reply = wait(genResult.readReplicas[0].getClasses.getReply(
ConfigTransactionGetConfigClassesRequest{ genResult.generation }));
RangeResult result;
result.reserve(result.arena(), reply.configClasses.size());
for (const auto& configClass : reply.configClasses) {
@ -135,10 +145,10 @@ class PaxosConfigTransactionImpl {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
ConfigGeneration generation = wait(self->getGenerationFuture);
GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetKnobsReply reply =
wait(self->ctis[0].getKnobs.getReply(ConfigTransactionGetKnobsRequest{ generation, configClass }));
ConfigTransactionGetKnobsReply reply = wait(genResult.readReplicas[0].getKnobs.getReply(
ConfigTransactionGetKnobsRequest{ genResult.generation, configClass }));
RangeResult result;
result.reserve(result.arena(), reply.knobNames.size());
for (const auto& knobName : reply.knobNames) {
@ -151,10 +161,13 @@ class PaxosConfigTransactionImpl {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
wait(store(self->toCommit.generation, self->getGenerationFuture));
GetGenerationQuorum::Result genResult = wait(self->getGenerationFuture);
self->toCommit.generation = genResult.generation;
self->toCommit.annotation.timestamp = now();
std::vector<Future<Void>> commitFutures;
commitFutures.reserve(self->ctis.size());
// Send commit message to all replicas, even those that did not return the used replica.
// This way, slow replicas are kept up date.
for (const auto& cti : self->ctis) {
commitFutures.push_back(cti.commit.getReply(self->toCommit));
}
@ -169,18 +182,20 @@ public:
if (!getGenerationFuture.isValid()) {
getGenerationFuture = getGeneration(this);
}
return map(getGenerationFuture, [](auto const& gen) { return gen.committedVersion; });
return map(getGenerationFuture, [](auto const& genResult) { return genResult.generation.committedVersion; });
}
Optional<Version> getCachedReadVersion() const {
if (getGenerationFuture.isValid() && getGenerationFuture.isReady() && !getGenerationFuture.isError()) {
return getGenerationFuture.get().committedVersion;
return getGenerationFuture.get().generation.committedVersion;
} else {
return {};
}
}
Version getCommittedVersion() const { return committed ? getGenerationFuture.get().liveVersion : ::invalidVersion; }
Version getCommittedVersion() const {
return committed ? getGenerationFuture.get().generation.liveVersion : ::invalidVersion;
}
int64_t getApproximateSize() const { return toCommit.expectedSize(); }
@ -215,7 +230,7 @@ public:
void debugTransaction(UID dID) { this->dID = dID; }
void reset() {
getGenerationFuture = Future<ConfigGeneration>{};
getGenerationFuture = Future<GetGenerationQuorum::Result>{};
toCommit = {};
committed = false;
}