diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 1d290fb95d..d70b66ce7a 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -40,7 +40,9 @@ class CommitQuorum { if (successful >= ctis.size() / 2 + 1 && result.canBeSet()) { result.send(Void()); } else if (failed >= ctis.size() / 2 + 1 && result.canBeSet()) { - result.sendError(not_committed()); + // Rollforwards could cause a version that didn't have quorum to + // commit. + result.sendError(commit_unknown_result()); } else { // Check if it is possible to ever receive quorum agreement auto totalRequestsOutstanding = ctis.size() - (failed + successful + maybeCommitted); diff --git a/fdbserver/ConfigFollowerInterface.h b/fdbserver/ConfigFollowerInterface.h index 85789c4754..fbb4873469 100644 --- a/fdbserver/ConfigFollowerInterface.h +++ b/fdbserver/ConfigFollowerInterface.h @@ -187,16 +187,14 @@ struct ConfigFollowerRollforwardRequest { struct ConfigFollowerGetCommittedVersionReply { static constexpr FileIdentifier file_identifier = 9214735; - Version secondToLastCommitted; Version lastCommitted; ConfigFollowerGetCommittedVersionReply() = default; - explicit ConfigFollowerGetCommittedVersionReply(Version secondToLastCommitted, Version lastCommitted) - : secondToLastCommitted(secondToLastCommitted), lastCommitted(lastCommitted) {} + explicit ConfigFollowerGetCommittedVersionReply(Version lastCommitted) : lastCommitted(lastCommitted) {} template void serialize(Ar& ar) { - serializer(ar, secondToLastCommitted, lastCommitted); + serializer(ar, lastCommitted); } }; diff --git a/fdbserver/ConfigNode.actor.cpp b/fdbserver/ConfigNode.actor.cpp index 11f5133017..1b4c9eaa3e 100644 --- a/fdbserver/ConfigNode.actor.cpp +++ b/fdbserver/ConfigNode.actor.cpp @@ -96,7 +96,6 @@ TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeyOrdering" class ConfigNodeImpl { UID id; OnDemandStore kvStore; - Version priorCommitted; CounterCollection cc; // Follower counters @@ -376,7 +375,6 @@ class ConfigNodeImpl { } Standalone> annotations; annotations.emplace_back_deep(annotations.arena(), req.generation.liveVersion, req.annotation); - self->priorCommitted = currentGeneration.committedVersion; wait(commitMutations(self, mutations, annotations, req.generation.liveVersion)); req.reply.send(Void()); return Void(); @@ -485,8 +483,6 @@ class ConfigNodeImpl { versionedAnnotationKey(generation.committedVersion + 1))); generation.committedVersion = req.version; - // TODO: Set prior generation to a non-zero value? - self->priorCommitted = 0; self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion()))); wait(self->kvStore->commit()); } @@ -502,7 +498,6 @@ class ConfigNodeImpl { return Void(); } ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion); - self->priorCommitted = currentGeneration.committedVersion; wait(commitMutations(self, req.mutations, req.annotations, req.target)); req.reply.send(Void()); return Void(); @@ -510,7 +505,7 @@ class ConfigNodeImpl { ACTOR static Future getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) { ConfigGeneration generation = wait(getGeneration(self)); - req.reply.send(ConfigFollowerGetCommittedVersionReply{ self->priorCommitted, generation.committedVersion }); + req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion }); return Void(); } @@ -548,8 +543,8 @@ class ConfigNodeImpl { public: ConfigNodeImpl(std::string const& folder) - : id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), priorCommitted(0), - cc("ConfigNode"), compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc), + : id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigNode"), + compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc), rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc), failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc), getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc), diff --git a/fdbserver/PaxosConfigConsumer.actor.cpp b/fdbserver/PaxosConfigConsumer.actor.cpp index 3949e33dbf..e092ad9b1b 100644 --- a/fdbserver/PaxosConfigConsumer.actor.cpp +++ b/fdbserver/PaxosConfigConsumer.actor.cpp @@ -34,6 +34,8 @@ class GetCommittedVersionQuorum { std::vector cfis; std::map> replies; std::map priorVersions; // TODO: Would be nice to combine this with `replies` + // Last durably committed version. + Version lastSeenVersion; size_t totalRepliesReceived{ 0 }; size_t maxAgreement{ 0 }; // Set to the versions a quorum of @@ -44,22 +46,22 @@ class GetCommittedVersionQuorum { // with the latest committed version as determined by the quorum. Should // only be called after a committed version has been determined. ACTOR static Future updateNode(GetCommittedVersionQuorum* self, - Version secondToLastCommitted, - Version lastCommitted, + CommittedVersions nodeVersion, CommittedVersions quorumVersion, ConfigFollowerInterface cfi) { - if (lastCommitted == quorumVersion.lastCommitted) { + if (nodeVersion.lastCommitted == quorumVersion.lastCommitted) { return Void(); } - if (lastCommitted > quorumVersion.lastCommitted) { - wait(cfi.rollback.getReply(ConfigFollowerRollbackRequest{ quorumVersion.lastCommitted })); + if (nodeVersion.lastCommitted > quorumVersion.lastCommitted) { + wait(retryBrokenPromise(cfi.rollback, ConfigFollowerRollbackRequest{ quorumVersion.lastCommitted })); } else { - if (secondToLastCommitted > quorumVersion.secondToLastCommitted) { + if (nodeVersion.secondToLastCommitted > quorumVersion.secondToLastCommitted) { // If the non-quorum node has a last committed version less // than the last committed version on the quorum, but greater // than the second to last committed version on the quorum, it // needs to be rolled back before being rolled forward. - wait(cfi.rollback.getReply(ConfigFollowerRollbackRequest{ quorumVersion.secondToLastCommitted })); + wait(retryBrokenPromise(cfi.rollback, + ConfigFollowerRollbackRequest{ quorumVersion.secondToLastCommitted })); } // Now roll node forward to match the last committed version of the @@ -67,18 +69,22 @@ class GetCommittedVersionQuorum { // TODO: Load balance over quorum state ConfigFollowerInterface quorumCfi = self->replies[quorumVersion.lastCommitted][0]; try { - ConfigFollowerGetChangesReply reply = wait(quorumCfi.getChanges.getReply( - ConfigFollowerGetChangesRequest{ lastCommitted, quorumVersion.lastCommitted })); - wait(cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{ - lastCommitted, quorumVersion.lastCommitted, reply.changes, reply.annotations })); + ConfigFollowerGetChangesReply reply = wait(retryBrokenPromise( + quorumCfi.getChanges, + ConfigFollowerGetChangesRequest{ nodeVersion.lastCommitted, quorumVersion.lastCommitted })); + wait(retryBrokenPromise( + cfi.rollforward, + ConfigFollowerRollforwardRequest{ + nodeVersion.lastCommitted, quorumVersion.lastCommitted, reply.changes, reply.annotations })); } catch (Error& e) { if (e.code() == error_code_version_already_compacted) { TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode - ConfigFollowerGetSnapshotAndChangesReply reply = wait(quorumCfi.getSnapshotAndChanges.getReply( - ConfigFollowerGetSnapshotAndChangesRequest{ quorumVersion.lastCommitted })); + ConfigFollowerGetSnapshotAndChangesReply reply = wait( + retryBrokenPromise(quorumCfi.getSnapshotAndChanges, + ConfigFollowerGetSnapshotAndChangesRequest{ quorumVersion.lastCommitted })); // TODO: Send the whole snapshot to `cfi` ASSERT(false); - // return cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{ lastCommitted, + // return retryBrokenPromise(cfi.rollforward, ConfigFollowerRollforwardRequest{ lastCommitted, // quorumVersion.second, reply.changes, reply.annotations }); } else { throw e; @@ -95,27 +101,25 @@ class GetCommittedVersionQuorum { wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}), 3)); ++self->totalRepliesReceived; - state Version priorVersion = reply.secondToLastCommitted; - state Version version = reply.lastCommitted; - if (self->replies.find(version) == self->replies.end()) { - self->replies[version] = {}; - self->priorVersions[version] = priorVersion; + state CommittedVersions committedVersions = CommittedVersions{ self->lastSeenVersion, reply.lastCommitted }; + if (self->replies.find(committedVersions.lastCommitted) == self->replies.end()) { + self->priorVersions[committedVersions.lastCommitted] = self->lastSeenVersion; } - auto& nodes = self->replies[version]; + auto& nodes = self->replies[committedVersions.lastCommitted]; nodes.push_back(cfi); self->maxAgreement = std::max(nodes.size(), self->maxAgreement); if (nodes.size() >= self->cfis.size() / 2 + 1) { // A quorum of ConfigNodes agree on the latest committed version. if (self->quorumVersion.canBeSet()) { - self->quorumVersion.send(CommittedVersions{ priorVersion, version }); + self->quorumVersion.send(committedVersions); } } else if (self->maxAgreement >= self->cfis.size() / 2 + 1) { // A quorum of ConfigNodes agree on the latest committed version, // but the node we just got a reply from is not one of them. We may // need to roll it forward or back. CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture()); - ASSERT(version != quorumVersion.lastCommitted); - wait(self->updateNode(self, priorVersion, version, quorumVersion, cfi)); + ASSERT(committedVersions.lastCommitted != quorumVersion.lastCommitted); + wait(self->updateNode(self, committedVersions, quorumVersion, cfi)); } else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) < (self->cfis.size() / 2 + 1)) { // It is impossible to reach a quorum of ConfigNodes that agree @@ -129,12 +133,12 @@ class GetCommittedVersionQuorum { if (self->quorumVersion.canBeSet()) { self->quorumVersion.send(CommittedVersions{ largestCommittedPrior, largestCommitted }); } - wait(self->updateNode(self, priorVersion, version, self->quorumVersion.getFuture().get(), cfi)); + wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get(), cfi)); } else { // Still building up responses; don't have enough data to act on // yet, so wait until we do. CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture()); - wait(self->updateNode(self, priorVersion, version, quorumVersion, cfi)); + wait(self->updateNode(self, committedVersions, quorumVersion, cfi)); } } catch (Error& e) { if (e.code() != error_code_timed_out) { @@ -145,7 +149,8 @@ class GetCommittedVersionQuorum { } public: - explicit GetCommittedVersionQuorum(std::vector const& cfis) : cfis(cfis) {} + explicit GetCommittedVersionQuorum(std::vector const& cfis, Version lastSeenVersion) + : cfis(cfis), lastSeenVersion(lastSeenVersion) {} Future getCommittedVersion() { ASSERT(!isReady()); // ensures this function is not accidentally called before resetting state for (const auto& cfi : cfis) { @@ -259,7 +264,7 @@ class PaxosConfigConsumerImpl { } } - void reset() { getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis }; } + void reset() { getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis, lastSeenVersion }; } public: Future consume(ConfigBroadcaster& broadcaster) { @@ -271,7 +276,7 @@ public: PaxosConfigConsumerImpl(std::vector const& cfis, double pollingInterval, Optional compactionInterval) - : cfis(cfis), getCommittedVersionQuorum(cfis), pollingInterval(pollingInterval), + : cfis(cfis), getCommittedVersionQuorum(cfis, 0), pollingInterval(pollingInterval), compactionInterval(compactionInterval), id(deterministicRandom()->randomUniqueID()) {} };