Retry with well known endpoints, move last committed check to consumer
This commit is contained in:
parent
1631a1b352
commit
7357d7714c
|
@ -40,7 +40,9 @@ class CommitQuorum {
|
||||||
if (successful >= ctis.size() / 2 + 1 && result.canBeSet()) {
|
if (successful >= ctis.size() / 2 + 1 && result.canBeSet()) {
|
||||||
result.send(Void());
|
result.send(Void());
|
||||||
} else if (failed >= ctis.size() / 2 + 1 && result.canBeSet()) {
|
} else if (failed >= ctis.size() / 2 + 1 && result.canBeSet()) {
|
||||||
result.sendError(not_committed());
|
// Rollforwards could cause a version that didn't have quorum to
|
||||||
|
// commit.
|
||||||
|
result.sendError(commit_unknown_result());
|
||||||
} else {
|
} else {
|
||||||
// Check if it is possible to ever receive quorum agreement
|
// Check if it is possible to ever receive quorum agreement
|
||||||
auto totalRequestsOutstanding = ctis.size() - (failed + successful + maybeCommitted);
|
auto totalRequestsOutstanding = ctis.size() - (failed + successful + maybeCommitted);
|
||||||
|
|
|
@ -187,16 +187,14 @@ struct ConfigFollowerRollforwardRequest {
|
||||||
|
|
||||||
struct ConfigFollowerGetCommittedVersionReply {
|
struct ConfigFollowerGetCommittedVersionReply {
|
||||||
static constexpr FileIdentifier file_identifier = 9214735;
|
static constexpr FileIdentifier file_identifier = 9214735;
|
||||||
Version secondToLastCommitted;
|
|
||||||
Version lastCommitted;
|
Version lastCommitted;
|
||||||
|
|
||||||
ConfigFollowerGetCommittedVersionReply() = default;
|
ConfigFollowerGetCommittedVersionReply() = default;
|
||||||
explicit ConfigFollowerGetCommittedVersionReply(Version secondToLastCommitted, Version lastCommitted)
|
explicit ConfigFollowerGetCommittedVersionReply(Version lastCommitted) : lastCommitted(lastCommitted) {}
|
||||||
: secondToLastCommitted(secondToLastCommitted), lastCommitted(lastCommitted) {}
|
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, secondToLastCommitted, lastCommitted);
|
serializer(ar, lastCommitted);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,6 @@ TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeyOrdering"
|
||||||
class ConfigNodeImpl {
|
class ConfigNodeImpl {
|
||||||
UID id;
|
UID id;
|
||||||
OnDemandStore kvStore;
|
OnDemandStore kvStore;
|
||||||
Version priorCommitted;
|
|
||||||
CounterCollection cc;
|
CounterCollection cc;
|
||||||
|
|
||||||
// Follower counters
|
// Follower counters
|
||||||
|
@ -376,7 +375,6 @@ class ConfigNodeImpl {
|
||||||
}
|
}
|
||||||
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
annotations.emplace_back_deep(annotations.arena(), req.generation.liveVersion, req.annotation);
|
annotations.emplace_back_deep(annotations.arena(), req.generation.liveVersion, req.annotation);
|
||||||
self->priorCommitted = currentGeneration.committedVersion;
|
|
||||||
wait(commitMutations(self, mutations, annotations, req.generation.liveVersion));
|
wait(commitMutations(self, mutations, annotations, req.generation.liveVersion));
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
return Void();
|
return Void();
|
||||||
|
@ -485,8 +483,6 @@ class ConfigNodeImpl {
|
||||||
versionedAnnotationKey(generation.committedVersion + 1)));
|
versionedAnnotationKey(generation.committedVersion + 1)));
|
||||||
|
|
||||||
generation.committedVersion = req.version;
|
generation.committedVersion = req.version;
|
||||||
// TODO: Set prior generation to a non-zero value?
|
|
||||||
self->priorCommitted = 0;
|
|
||||||
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
|
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
|
||||||
wait(self->kvStore->commit());
|
wait(self->kvStore->commit());
|
||||||
}
|
}
|
||||||
|
@ -502,7 +498,6 @@ class ConfigNodeImpl {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
|
ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
|
||||||
self->priorCommitted = currentGeneration.committedVersion;
|
|
||||||
wait(commitMutations(self, req.mutations, req.annotations, req.target));
|
wait(commitMutations(self, req.mutations, req.annotations, req.target));
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
return Void();
|
return Void();
|
||||||
|
@ -510,7 +505,7 @@ class ConfigNodeImpl {
|
||||||
|
|
||||||
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
|
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
|
||||||
ConfigGeneration generation = wait(getGeneration(self));
|
ConfigGeneration generation = wait(getGeneration(self));
|
||||||
req.reply.send(ConfigFollowerGetCommittedVersionReply{ self->priorCommitted, generation.committedVersion });
|
req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -548,8 +543,8 @@ class ConfigNodeImpl {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ConfigNodeImpl(std::string const& folder)
|
ConfigNodeImpl(std::string const& folder)
|
||||||
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), priorCommitted(0),
|
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigNode"),
|
||||||
cc("ConfigNode"), compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc),
|
compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc),
|
||||||
rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
|
rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
|
||||||
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
|
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
|
||||||
getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc),
|
getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc),
|
||||||
|
|
|
@ -34,6 +34,8 @@ class GetCommittedVersionQuorum {
|
||||||
std::vector<ConfigFollowerInterface> cfis;
|
std::vector<ConfigFollowerInterface> cfis;
|
||||||
std::map<Version, std::vector<ConfigFollowerInterface>> replies;
|
std::map<Version, std::vector<ConfigFollowerInterface>> replies;
|
||||||
std::map<Version, Version> priorVersions; // TODO: Would be nice to combine this with `replies`
|
std::map<Version, Version> priorVersions; // TODO: Would be nice to combine this with `replies`
|
||||||
|
// Last durably committed version.
|
||||||
|
Version lastSeenVersion;
|
||||||
size_t totalRepliesReceived{ 0 };
|
size_t totalRepliesReceived{ 0 };
|
||||||
size_t maxAgreement{ 0 };
|
size_t maxAgreement{ 0 };
|
||||||
// Set to the <secondToLastCommitted, lastCommitted> versions a quorum of
|
// Set to the <secondToLastCommitted, lastCommitted> versions a quorum of
|
||||||
|
@ -44,22 +46,22 @@ class GetCommittedVersionQuorum {
|
||||||
// with the latest committed version as determined by the quorum. Should
|
// with the latest committed version as determined by the quorum. Should
|
||||||
// only be called after a committed version has been determined.
|
// only be called after a committed version has been determined.
|
||||||
ACTOR static Future<Void> updateNode(GetCommittedVersionQuorum* self,
|
ACTOR static Future<Void> updateNode(GetCommittedVersionQuorum* self,
|
||||||
Version secondToLastCommitted,
|
CommittedVersions nodeVersion,
|
||||||
Version lastCommitted,
|
|
||||||
CommittedVersions quorumVersion,
|
CommittedVersions quorumVersion,
|
||||||
ConfigFollowerInterface cfi) {
|
ConfigFollowerInterface cfi) {
|
||||||
if (lastCommitted == quorumVersion.lastCommitted) {
|
if (nodeVersion.lastCommitted == quorumVersion.lastCommitted) {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if (lastCommitted > quorumVersion.lastCommitted) {
|
if (nodeVersion.lastCommitted > quorumVersion.lastCommitted) {
|
||||||
wait(cfi.rollback.getReply(ConfigFollowerRollbackRequest{ quorumVersion.lastCommitted }));
|
wait(retryBrokenPromise(cfi.rollback, ConfigFollowerRollbackRequest{ quorumVersion.lastCommitted }));
|
||||||
} else {
|
} else {
|
||||||
if (secondToLastCommitted > quorumVersion.secondToLastCommitted) {
|
if (nodeVersion.secondToLastCommitted > quorumVersion.secondToLastCommitted) {
|
||||||
// If the non-quorum node has a last committed version less
|
// If the non-quorum node has a last committed version less
|
||||||
// than the last committed version on the quorum, but greater
|
// than the last committed version on the quorum, but greater
|
||||||
// than the second to last committed version on the quorum, it
|
// than the second to last committed version on the quorum, it
|
||||||
// needs to be rolled back before being rolled forward.
|
// needs to be rolled back before being rolled forward.
|
||||||
wait(cfi.rollback.getReply(ConfigFollowerRollbackRequest{ quorumVersion.secondToLastCommitted }));
|
wait(retryBrokenPromise(cfi.rollback,
|
||||||
|
ConfigFollowerRollbackRequest{ quorumVersion.secondToLastCommitted }));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now roll node forward to match the last committed version of the
|
// Now roll node forward to match the last committed version of the
|
||||||
|
@ -67,18 +69,22 @@ class GetCommittedVersionQuorum {
|
||||||
// TODO: Load balance over quorum
|
// TODO: Load balance over quorum
|
||||||
state ConfigFollowerInterface quorumCfi = self->replies[quorumVersion.lastCommitted][0];
|
state ConfigFollowerInterface quorumCfi = self->replies[quorumVersion.lastCommitted][0];
|
||||||
try {
|
try {
|
||||||
ConfigFollowerGetChangesReply reply = wait(quorumCfi.getChanges.getReply(
|
ConfigFollowerGetChangesReply reply = wait(retryBrokenPromise(
|
||||||
ConfigFollowerGetChangesRequest{ lastCommitted, quorumVersion.lastCommitted }));
|
quorumCfi.getChanges,
|
||||||
wait(cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{
|
ConfigFollowerGetChangesRequest{ nodeVersion.lastCommitted, quorumVersion.lastCommitted }));
|
||||||
lastCommitted, quorumVersion.lastCommitted, reply.changes, reply.annotations }));
|
wait(retryBrokenPromise(
|
||||||
|
cfi.rollforward,
|
||||||
|
ConfigFollowerRollforwardRequest{
|
||||||
|
nodeVersion.lastCommitted, quorumVersion.lastCommitted, reply.changes, reply.annotations }));
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
if (e.code() == error_code_version_already_compacted) {
|
if (e.code() == error_code_version_already_compacted) {
|
||||||
TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode
|
TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode
|
||||||
ConfigFollowerGetSnapshotAndChangesReply reply = wait(quorumCfi.getSnapshotAndChanges.getReply(
|
ConfigFollowerGetSnapshotAndChangesReply reply = wait(
|
||||||
ConfigFollowerGetSnapshotAndChangesRequest{ quorumVersion.lastCommitted }));
|
retryBrokenPromise(quorumCfi.getSnapshotAndChanges,
|
||||||
|
ConfigFollowerGetSnapshotAndChangesRequest{ quorumVersion.lastCommitted }));
|
||||||
// TODO: Send the whole snapshot to `cfi`
|
// TODO: Send the whole snapshot to `cfi`
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
// return cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{ lastCommitted,
|
// return retryBrokenPromise(cfi.rollforward, ConfigFollowerRollforwardRequest{ lastCommitted,
|
||||||
// quorumVersion.second, reply.changes, reply.annotations });
|
// quorumVersion.second, reply.changes, reply.annotations });
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -95,27 +101,25 @@ class GetCommittedVersionQuorum {
|
||||||
wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}), 3));
|
wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}), 3));
|
||||||
|
|
||||||
++self->totalRepliesReceived;
|
++self->totalRepliesReceived;
|
||||||
state Version priorVersion = reply.secondToLastCommitted;
|
state CommittedVersions committedVersions = CommittedVersions{ self->lastSeenVersion, reply.lastCommitted };
|
||||||
state Version version = reply.lastCommitted;
|
if (self->replies.find(committedVersions.lastCommitted) == self->replies.end()) {
|
||||||
if (self->replies.find(version) == self->replies.end()) {
|
self->priorVersions[committedVersions.lastCommitted] = self->lastSeenVersion;
|
||||||
self->replies[version] = {};
|
|
||||||
self->priorVersions[version] = priorVersion;
|
|
||||||
}
|
}
|
||||||
auto& nodes = self->replies[version];
|
auto& nodes = self->replies[committedVersions.lastCommitted];
|
||||||
nodes.push_back(cfi);
|
nodes.push_back(cfi);
|
||||||
self->maxAgreement = std::max(nodes.size(), self->maxAgreement);
|
self->maxAgreement = std::max(nodes.size(), self->maxAgreement);
|
||||||
if (nodes.size() >= self->cfis.size() / 2 + 1) {
|
if (nodes.size() >= self->cfis.size() / 2 + 1) {
|
||||||
// A quorum of ConfigNodes agree on the latest committed version.
|
// A quorum of ConfigNodes agree on the latest committed version.
|
||||||
if (self->quorumVersion.canBeSet()) {
|
if (self->quorumVersion.canBeSet()) {
|
||||||
self->quorumVersion.send(CommittedVersions{ priorVersion, version });
|
self->quorumVersion.send(committedVersions);
|
||||||
}
|
}
|
||||||
} else if (self->maxAgreement >= self->cfis.size() / 2 + 1) {
|
} else if (self->maxAgreement >= self->cfis.size() / 2 + 1) {
|
||||||
// A quorum of ConfigNodes agree on the latest committed version,
|
// A quorum of ConfigNodes agree on the latest committed version,
|
||||||
// but the node we just got a reply from is not one of them. We may
|
// but the node we just got a reply from is not one of them. We may
|
||||||
// need to roll it forward or back.
|
// need to roll it forward or back.
|
||||||
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
|
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
|
||||||
ASSERT(version != quorumVersion.lastCommitted);
|
ASSERT(committedVersions.lastCommitted != quorumVersion.lastCommitted);
|
||||||
wait(self->updateNode(self, priorVersion, version, quorumVersion, cfi));
|
wait(self->updateNode(self, committedVersions, quorumVersion, cfi));
|
||||||
} else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
|
} else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
|
||||||
(self->cfis.size() / 2 + 1)) {
|
(self->cfis.size() / 2 + 1)) {
|
||||||
// It is impossible to reach a quorum of ConfigNodes that agree
|
// It is impossible to reach a quorum of ConfigNodes that agree
|
||||||
|
@ -129,12 +133,12 @@ class GetCommittedVersionQuorum {
|
||||||
if (self->quorumVersion.canBeSet()) {
|
if (self->quorumVersion.canBeSet()) {
|
||||||
self->quorumVersion.send(CommittedVersions{ largestCommittedPrior, largestCommitted });
|
self->quorumVersion.send(CommittedVersions{ largestCommittedPrior, largestCommitted });
|
||||||
}
|
}
|
||||||
wait(self->updateNode(self, priorVersion, version, self->quorumVersion.getFuture().get(), cfi));
|
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get(), cfi));
|
||||||
} else {
|
} else {
|
||||||
// Still building up responses; don't have enough data to act on
|
// Still building up responses; don't have enough data to act on
|
||||||
// yet, so wait until we do.
|
// yet, so wait until we do.
|
||||||
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
|
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
|
||||||
wait(self->updateNode(self, priorVersion, version, quorumVersion, cfi));
|
wait(self->updateNode(self, committedVersions, quorumVersion, cfi));
|
||||||
}
|
}
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
if (e.code() != error_code_timed_out) {
|
if (e.code() != error_code_timed_out) {
|
||||||
|
@ -145,7 +149,8 @@ class GetCommittedVersionQuorum {
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit GetCommittedVersionQuorum(std::vector<ConfigFollowerInterface> const& cfis) : cfis(cfis) {}
|
explicit GetCommittedVersionQuorum(std::vector<ConfigFollowerInterface> const& cfis, Version lastSeenVersion)
|
||||||
|
: cfis(cfis), lastSeenVersion(lastSeenVersion) {}
|
||||||
Future<CommittedVersions> getCommittedVersion() {
|
Future<CommittedVersions> getCommittedVersion() {
|
||||||
ASSERT(!isReady()); // ensures this function is not accidentally called before resetting state
|
ASSERT(!isReady()); // ensures this function is not accidentally called before resetting state
|
||||||
for (const auto& cfi : cfis) {
|
for (const auto& cfi : cfis) {
|
||||||
|
@ -259,7 +264,7 @@ class PaxosConfigConsumerImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void reset() { getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis }; }
|
void reset() { getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis, lastSeenVersion }; }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Future<Void> consume(ConfigBroadcaster& broadcaster) {
|
Future<Void> consume(ConfigBroadcaster& broadcaster) {
|
||||||
|
@ -271,7 +276,7 @@ public:
|
||||||
PaxosConfigConsumerImpl(std::vector<ConfigFollowerInterface> const& cfis,
|
PaxosConfigConsumerImpl(std::vector<ConfigFollowerInterface> const& cfis,
|
||||||
double pollingInterval,
|
double pollingInterval,
|
||||||
Optional<double> compactionInterval)
|
Optional<double> compactionInterval)
|
||||||
: cfis(cfis), getCommittedVersionQuorum(cfis), pollingInterval(pollingInterval),
|
: cfis(cfis), getCommittedVersionQuorum(cfis, 0), pollingInterval(pollingInterval),
|
||||||
compactionInterval(compactionInterval), id(deterministicRandom()->randomUniqueID()) {}
|
compactionInterval(compactionInterval), id(deterministicRandom()->randomUniqueID()) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue