Add GetGenerationQuorum to get generation from a quorum of config nodes
This commit is contained in:
parent
bd5a87e0e3
commit
79ba9c4e3a
|
@ -55,6 +55,22 @@ bool ConfigGeneration::operator!=(ConfigGeneration const& rhs) const {
|
|||
return !(*this == rhs);
|
||||
}
|
||||
|
||||
bool ConfigGeneration::operator<(ConfigGeneration const& rhs) const {
|
||||
if (committedVersion != rhs.committedVersion) {
|
||||
return committedVersion < rhs.committedVersion;
|
||||
} else {
|
||||
return liveVersion < rhs.liveVersion;
|
||||
}
|
||||
}
|
||||
|
||||
bool ConfigGeneration::operator>(ConfigGeneration const& rhs) const {
|
||||
if (committedVersion != rhs.committedVersion) {
|
||||
return committedVersion > rhs.committedVersion;
|
||||
} else {
|
||||
return liveVersion > rhs.liveVersion;
|
||||
}
|
||||
}
|
||||
|
||||
void ConfigTransactionCommitRequest::set(KeyRef key, ValueRef value) {
|
||||
if (key == configTransactionDescriptionKey) {
|
||||
annotation.description = KeyRef(arena, value);
|
||||
|
|
|
@ -28,18 +28,20 @@
|
|||
#include "flow/flow.h"
|
||||
|
||||
struct ConfigGeneration {
|
||||
// The live version of each node is monotonically increasing
|
||||
Version liveVersion{ 0 };
|
||||
// The committedVersion of each node is the version of the last commit made durable.
|
||||
// Each committedVersion was previously given to clients as a liveVersion, prior to commit.
|
||||
Version committedVersion{ 0 };
|
||||
// The live version of each node is monotonically increasing
|
||||
Version liveVersion{ 0 };
|
||||
|
||||
bool operator==(ConfigGeneration const&) const;
|
||||
bool operator!=(ConfigGeneration const&) const;
|
||||
bool operator<(ConfigGeneration const&) const;
|
||||
bool operator>(ConfigGeneration const&) const;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, liveVersion, committedVersion);
|
||||
serializer(ar, committedVersion, liveVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -57,12 +59,16 @@ struct ConfigTransactionGetGenerationReply {
|
|||
|
||||
struct ConfigTransactionGetGenerationRequest {
|
||||
static constexpr FileIdentifier file_identifier = 138941;
|
||||
// A hint to catch up lagging nodes:
|
||||
Optional<Version> lastSeenLiveVersion;
|
||||
ReplyPromise<ConfigTransactionGetGenerationReply> reply;
|
||||
ConfigTransactionGetGenerationRequest() = default;
|
||||
explicit ConfigTransactionGetGenerationRequest(Optional<Version> const& lastSeenLiveVersion)
|
||||
: lastSeenLiveVersion(lastSeenLiveVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
serializer(ar, lastSeenLiveVersion, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -22,24 +22,81 @@
|
|||
#include "fdbclient/PaxosConfigTransaction.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
namespace {
|
||||
|
||||
class GetGenerationQuorum {
|
||||
std::vector<Future<Void>> futures;
|
||||
std::map<ConfigGeneration, size_t> seenGenerations;
|
||||
Promise<ConfigGeneration> result;
|
||||
size_t totalRepliesReceived{ 0 };
|
||||
size_t maxAgreement{ 0 };
|
||||
size_t size{ 0 };
|
||||
Optional<Version> lastSeenLiveVersion;
|
||||
|
||||
ACTOR static Future<Void> handleReplyActor(GetGenerationQuorum* self,
|
||||
Future<ConfigTransactionGetGenerationReply> replyFuture) {
|
||||
ConfigTransactionGetGenerationReply reply = wait(replyFuture);
|
||||
++self->totalRepliesReceived;
|
||||
auto gen = reply.generation;
|
||||
self->lastSeenLiveVersion = std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion));
|
||||
auto& count = self->seenGenerations[gen];
|
||||
++count;
|
||||
self->maxAgreement = std::max(count, self->maxAgreement);
|
||||
if (count == self->size / 2 + 1) {
|
||||
self->result.send(gen); // self may be destroyed here
|
||||
} else if (self->maxAgreement + (self->size - self->totalRepliesReceived) < (self->size / 2 + 1)) {
|
||||
self->result.sendError(failed_to_reach_quorum());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
GetGenerationQuorum(size_t size, Optional<Version> const& lastSeenLiveVersion)
|
||||
: size(size), lastSeenLiveVersion(lastSeenLiveVersion) {
|
||||
futures.reserve(size);
|
||||
}
|
||||
void addReplyCallback(Future<ConfigTransactionGetGenerationReply> replyFuture) {
|
||||
futures.push_back(handleReplyActor(this, replyFuture));
|
||||
}
|
||||
Future<ConfigGeneration> getGeneration() const { return result.getFuture(); }
|
||||
Optional<Version> getLastSeenLiveVersion() const { return lastSeenLiveVersion; }
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
class PaxosConfigTransactionImpl {
|
||||
ConfigTransactionCommitRequest toCommit;
|
||||
Future<ConfigGeneration> getGenerationFuture;
|
||||
std::vector<ConfigTransactionInterface> ctis;
|
||||
int numRetries{ 0 };
|
||||
bool committed{ false };
|
||||
Optional<Version> lastSeenLiveVersion;
|
||||
Optional<UID> dID;
|
||||
Database cx;
|
||||
|
||||
ACTOR static Future<ConfigGeneration> getGeneration(PaxosConfigTransactionImpl* self) {
|
||||
state std::vector<Future<ConfigTransactionGetGenerationReply>> getGenerationFutures;
|
||||
getGenerationFutures.reserve(self->ctis.size());
|
||||
for (auto const& cti : self->ctis) {
|
||||
getGenerationFutures.push_back(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{}));
|
||||
state GetGenerationQuorum quorum(self->ctis.size(), self->lastSeenLiveVersion);
|
||||
state int retries = 0;
|
||||
loop {
|
||||
for (auto const& cti : self->ctis) {
|
||||
quorum.addReplyCallback(
|
||||
cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }));
|
||||
}
|
||||
try {
|
||||
state ConfigGeneration gen = wait(quorum.getGeneration());
|
||||
wait(delay(0.0)); // Let reply callback actors finish before destructing quorum
|
||||
return gen;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_failed_to_reach_quorum) {
|
||||
TEST(true); // Failed to reach quorum getting generation
|
||||
wait(delayJittered(0.01 * (1 << retries)));
|
||||
++retries;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
self->lastSeenLiveVersion = quorum.getLastSeenLiveVersion();
|
||||
}
|
||||
// FIXME: Must tolerate failures and disagreement
|
||||
wait(waitForAll(getGenerationFutures));
|
||||
return getGenerationFutures[0].get().generation;
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<Value>> get(PaxosConfigTransactionImpl* self, Key key) {
|
||||
|
@ -48,7 +105,7 @@ class PaxosConfigTransactionImpl {
|
|||
}
|
||||
state ConfigKey configKey = ConfigKey::decodeKey(key);
|
||||
ConfigGeneration generation = wait(self->getGenerationFuture);
|
||||
// TODO: Load balance
|
||||
// TODO: Load balance, and only send request to replicas that we have gotten the current generation from
|
||||
ConfigTransactionGetReply reply =
|
||||
wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey }));
|
||||
if (reply.value.present()) {
|
||||
|
|
|
@ -203,6 +203,10 @@ class ConfigNodeImpl {
|
|||
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
|
||||
state ConfigGeneration generation = wait(getGeneration(self));
|
||||
++generation.liveVersion;
|
||||
if (req.lastSeenLiveVersion.present()) {
|
||||
TEST(req.lastSeenLiveVersion.get() >= generation.liveVersion); // Node is lagging behind some other node
|
||||
generation.liveVersion = std::max(generation.liveVersion, req.lastSeenLiveVersion.get() + 1);
|
||||
}
|
||||
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
|
||||
wait(self->kvStore->commit());
|
||||
req.reply.send(ConfigTransactionGetGenerationReply{ generation });
|
||||
|
|
|
@ -66,13 +66,16 @@ class ConfigIncrementWorkload : public TestWorkload {
|
|||
try {
|
||||
state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx);
|
||||
state int currentValue = wait(get(tr));
|
||||
ASSERT_GE(currentValue, self->lastKnownValue);
|
||||
set(tr, currentValue + 1);
|
||||
wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepWithinTransactions));
|
||||
wait(tr->commit());
|
||||
ASSERT_GT(tr->getCommittedVersion(), self->lastKnownCommittedVersion);
|
||||
ASSERT_GE(currentValue, self->lastKnownValue);
|
||||
self->lastKnownCommittedVersion = tr->getCommittedVersion();
|
||||
self->lastKnownValue = currentValue + 1;
|
||||
TraceEvent("ConfigIncrementSucceeded")
|
||||
.detail("CommittedVersion", self->lastKnownCommittedVersion)
|
||||
.detail("CommittedValue", self->lastKnownValue);
|
||||
++self->transactions;
|
||||
++trsComplete;
|
||||
wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepBetweenTransactions));
|
||||
|
|
|
@ -77,6 +77,7 @@ ERROR( dd_not_found, 1053, "Data distributor not found")
|
|||
ERROR( wrong_connection_file, 1054, "Connection file mismatch")
|
||||
ERROR( version_already_compacted, 1055, "The requested changes have been compacted away")
|
||||
ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" )
|
||||
ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" )
|
||||
|
||||
ERROR( broken_promise, 1100, "Broken promise" )
|
||||
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
|
||||
|
|
Loading…
Reference in New Issue