diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp
index 20c68a6d0c..ee5b0829ad 100644
--- a/fdbclient/PaxosConfigTransaction.actor.cpp
+++ b/fdbclient/PaxosConfigTransaction.actor.cpp
@@ -22,6 +22,8 @@
 #include "fdbclient/PaxosConfigTransaction.h"
 #include "flow/actorcompiler.h" // must be last include
 
+using ConfigTransactionInfo = ModelInterface<ConfigTransactionInterface>;
+
 class CommitQuorum {
     ActorCollection actors{ false };
     std::vector<ConfigTransactionInterface> ctis;
@@ -224,10 +226,12 @@ class PaxosConfigTransactionImpl {
         loop {
             try {
                 ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
-                // TODO: Load balance
+                state Reference<ConfigTransactionInfo> configNodes(
+                    new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
                 ConfigTransactionGetReply reply =
-                    wait(timeoutError(self->getGenerationQuorum.getReadReplicas()[0].get.getReply(
-                                          ConfigTransactionGetRequest{ generation, configKey }),
+                    wait(timeoutError(basicLoadBalance(configNodes,
+                                                       &ConfigTransactionInterface::get,
+                                                       ConfigTransactionGetRequest{ generation, configKey }),
                                       CLIENT_KNOBS->GET_KNOB_TIMEOUT));
                 if (reply.value.present()) {
                     return reply.value.get().toValue();
@@ -245,10 +249,12 @@ class PaxosConfigTransactionImpl {
 
     ACTOR static Future<RangeResult> getConfigClasses(PaxosConfigTransactionImpl* self) {
         ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
-        // TODO: Load balance
+        state Reference<ConfigTransactionInfo> configNodes(
+            new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
         ConfigTransactionGetConfigClassesReply reply =
-            wait(retryBrokenPromise(self->getGenerationQuorum.getReadReplicas()[0].getClasses,
-                                    ConfigTransactionGetConfigClassesRequest{ generation }));
+            wait(basicLoadBalance(configNodes,
+                                  &ConfigTransactionInterface::getClasses,
+                                  ConfigTransactionGetConfigClassesRequest{ generation }));
         RangeResult result;
         result.reserve(result.arena(), reply.configClasses.size());
         for (const auto& configClass : reply.configClasses) {
@@ -259,10 +265,12 @@ class PaxosConfigTransactionImpl {
 
     ACTOR static Future<RangeResult> getKnobs(PaxosConfigTransactionImpl* self, Optional<Key> configClass) {
         ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
-        // TODO: Load balance
+        state Reference<ConfigTransactionInfo> configNodes(
+            new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
         ConfigTransactionGetKnobsReply reply =
-            wait(retryBrokenPromise(self->getGenerationQuorum.getReadReplicas()[0].getKnobs,
-                                    ConfigTransactionGetKnobsRequest{ generation, configClass }));
+            wait(basicLoadBalance(configNodes,
+                                  &ConfigTransactionInterface::getKnobs,
+                                  ConfigTransactionGetKnobsRequest{ generation, configClass }));
         RangeResult result;
         result.reserve(result.arena(), reply.knobNames.size());
         for (const auto& knobName : reply.knobNames) {
diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index c87d9f5ce6..0f0f0de89e 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -719,6 +719,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( COORDINATOR_LEADER_CONNECTION_TIMEOUT,      20.0 );
 
     // Dynamic Knobs (implementation)
+    init( COMPACTION_INTERVAL,    isSimulated ? 5.0 : 300.0 );
     init( UPDATE_NODE_TIMEOUT,                         3.0 );
     init( GET_COMMITTED_VERSION_TIMEOUT,               3.0 );
     init( GET_SNAPSHOT_AND_CHANGES_TIMEOUT,            3.0 );
diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h
index 257746a0c5..4c2d708274 100644
--- a/fdbclient/ServerKnobs.h
+++ b/fdbclient/ServerKnobs.h
@@ -657,6 +657,7 @@ public:
     double COORDINATOR_LEADER_CONNECTION_TIMEOUT;
 
     // Dynamic Knobs (implementation)
+    double COMPACTION_INTERVAL;
     double UPDATE_NODE_TIMEOUT;
     double GET_COMMITTED_VERSION_TIMEOUT;
     double GET_SNAPSHOT_AND_CHANGES_TIMEOUT;
diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp
index f7467a41a0..0ba9e4d6cf 100644
--- a/fdbserver/ConfigBroadcaster.actor.cpp
+++ b/fdbserver/ConfigBroadcaster.actor.cpp
@@ -94,6 +94,7 @@ class ConfigBroadcasterImpl {
     int coordinators = 0;
     std::unordered_set<NetworkAddress> activeConfigNodes;
+    std::unordered_set<NetworkAddress> registrationResponses;
     bool disallowUnregistered = false;
     Promise<Void> newConfigNodesAllowed;
@@ -217,6 +218,7 @@ class ConfigBroadcasterImpl {
         self->clients.erase(clientUID);
         self->clientFailures.erase(clientUID);
         self->activeConfigNodes.erase(clientAddress);
+        self->registrationResponses.erase(clientAddress);
         // See comment where this promise is reset below.
         if (self->newConfigNodesAllowed.isSet()) {
             self->newConfigNodesAllowed.reset();
@@ -258,6 +260,7 @@ class ConfigBroadcasterImpl {
                 self->newConfigNodesAllowed.reset();
             }
         }
+        self->registrationResponses.insert(address);
 
         if (registered) {
             if (!self->disallowUnregistered) {
@@ -265,9 +268,18 @@ class ConfigBroadcasterImpl {
             }
             self->activeConfigNodes.insert(address);
             self->disallowUnregistered = true;
-        } else if (self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) {
-            // Need to allow registration of previously unregistered nodes when
-            // the cluster first starts up.
+        } else if ((self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) ||
+                   self->coordinators - self->registrationResponses.size() <=
+                       self->coordinators / 2 + 1 - self->activeConfigNodes.size()) {
+            // Received a registration request from an unregistered node. There
+            // are two cases where we want to allow unregistered nodes to
+            // register:
+            //   * the cluster is just starting and no nodes are registered
+            //   * a minority of nodes are registered and a majority are
+            //     unregistered. This situation should only occur in rare
+            //     circumstances where the cluster controller dies with only a
+            //     minority of config nodes having received a
+            //     ConfigBroadcastReadyRequest
             self->activeConfigNodes.insert(address);
             if (self->activeConfigNodes.size() >= self->coordinators / 2 + 1 &&
                 self->newConfigNodesAllowed.canBeSet()) {
@@ -390,9 +402,9 @@ public:
         this->coordinators = coordinators.configServers.size();
         if (configDBType != ConfigDBType::DISABLED) {
             if (configDBType == ConfigDBType::SIMPLE) {
-                consumer = IConfigConsumer::createSimple(coordinators, 0.5, Optional<double>{});
+                consumer = IConfigConsumer::createSimple(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
             } else {
-                consumer = IConfigConsumer::createPaxos(coordinators, 0.5, Optional<double>{});
+                consumer = IConfigConsumer::createPaxos(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
             }
             TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id)
                 .detail("Consumer", consumer->getID())
diff --git a/fdbserver/ConfigFollowerInterface.h b/fdbserver/ConfigFollowerInterface.h
index da841ecbb3..5b0d264c56 100644
--- a/fdbserver/ConfigFollowerInterface.h
+++ b/fdbserver/ConfigFollowerInterface.h
@@ -176,14 +176,16 @@ struct ConfigFollowerRollforwardRequest {
 
 struct ConfigFollowerGetCommittedVersionReply {
     static constexpr FileIdentifier file_identifier = 9214735;
+    Version lastCompacted;
     Version lastCommitted;
 
     ConfigFollowerGetCommittedVersionReply() = default;
-    explicit ConfigFollowerGetCommittedVersionReply(Version lastCommitted) : lastCommitted(lastCommitted) {}
+    explicit ConfigFollowerGetCommittedVersionReply(Version lastCompacted, Version lastCommitted)
+      : lastCompacted(lastCompacted), lastCommitted(lastCommitted) {}
 
     template <class Ar>
     void serialize(Ar& ar) {
-        serializer(ar, lastCommitted);
+        serializer(ar, lastCompacted, lastCommitted);
     }
 };
diff --git a/fdbserver/ConfigNode.actor.cpp b/fdbserver/ConfigNode.actor.cpp
index be7d9a867c..7152c8ce9b 100644
--- a/fdbserver/ConfigNode.actor.cpp
+++ b/fdbserver/ConfigNode.actor.cpp
@@ -495,7 +495,7 @@ class ConfigNodeImpl {
 
     ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
-        Version lastCompactedVersion = wait(getLastCompactedVersion(self));
+        state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
         if (req.lastKnownCommitted < lastCompactedVersion) {
             req.reply.sendError(version_already_compacted());
             return Void();
         }
@@ -529,6 +529,10 @@ class ConfigNodeImpl {
                                              versionedAnnotationKey(currentGeneration.committedVersion + 1)));
                 currentGeneration.committedVersion = req.rollback.get();
+                if (req.rollback.get() < lastCompactedVersion) {
+                    self->kvStore->set(
+                        KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(req.rollback.get(), IncludeVersion())));
+                }
                 // The mutation commit loop below should persist the new generation
                 // to disk, so we don't need to do it here.
             }
         }
@@ -536,13 +540,15 @@ class ConfigNodeImpl {
         // committed version and rollforward version.
         ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
         wait(commitMutations(self, req.mutations, req.annotations, req.target));
+        req.reply.send(Void());
         return Void();
     }
 
     ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
+        state Version lastCompacted = wait(getLastCompactedVersion(self));
         ConfigGeneration generation = wait(getGeneration(self));
-        req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
+        req.reply.send(ConfigFollowerGetCommittedVersionReply{ lastCompacted, generation.committedVersion });
         return Void();
     }
diff --git a/fdbserver/PaxosConfigConsumer.actor.cpp b/fdbserver/PaxosConfigConsumer.actor.cpp
index ddae968d48..1ebb73be95 100644
--- a/fdbserver/PaxosConfigConsumer.actor.cpp
+++ b/fdbserver/PaxosConfigConsumer.actor.cpp
@@ -27,6 +27,8 @@
 #include "fdbserver/Knobs.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
+using ConfigFollowerInfo = ModelInterface<ConfigFollowerInterface>;
+
 struct CommittedVersions {
     Version secondToLastCommitted;
     Version lastCommitted;
@@ -42,6 +44,10 @@ class GetCommittedVersionQuorum {
     std::vector<ConfigFollowerInterface> cfis;
     std::map<Version, std::vector<ConfigFollowerInterface>> replies;
     std::map<Version, Version> priorVersions;
+    std::map<NetworkAddress, Version> committed;
+    // Need to know the largest compacted version on any node to avoid asking
+    // for changes that have already been compacted.
+    Version largestCompactedResponse{ 0 };
     // Last durably committed version.
     Version lastSeenVersion;
     size_t totalRepliesReceived{ 0 };
@@ -58,6 +64,7 @@ class GetCommittedVersionQuorum {
     ACTOR static Future<Void> updateNode(GetCommittedVersionQuorum* self,
                                          CommittedVersions nodeVersion,
                                          CommittedVersions quorumVersion,
+                                         Version lastCompacted,
                                          ConfigFollowerInterface cfi) {
         state Version target = quorumVersion.lastCommitted;
         if (nodeVersion.lastCommitted == target) {
@@ -79,37 +86,41 @@ class GetCommittedVersionQuorum {
             rollback = std::max(nodeVersion.lastCommitted - 1, Version{ 0 });
         }
 
+        if (rollback.present()) {
+            // When a new ConfigBroadcaster is created, it may not know
+            // about the last committed version on the ConfigNodes. If
+            // compaction has occurred, this can cause change requests to
+            // be sent to nodes asking for version 0 when the node has
+            // already compacted that version, causing an error. Make sure
+            // the rollback version is at least set to the last compacted
+            // version to prevent this issue.
+            rollback = std::max(rollback.get(), lastCompacted);
+        }
+
         // Now roll node forward to match the largest committed version of
         // the replies.
-        // TODO: Load balance over quorum. Also need to catch
-        // error_code_process_behind and retry with the next ConfigNode in
-        // the quorum.
-        state ConfigFollowerInterface quorumCfi = self->replies[target][0];
+        state Reference<ConfigFollowerInfo> quorumCfi(new ConfigFollowerInfo(self->replies[target], false));
         try {
-            state Version lastSeenVersion = rollback.present() ? rollback.get() : nodeVersion.lastCommitted;
-            ConfigFollowerGetChangesReply reply = wait(timeoutError(
-                quorumCfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
-                SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
+            state Version lastSeenVersion = std::max(
+                rollback.present() ? rollback.get() : nodeVersion.lastCommitted, self->largestCompactedResponse);
+            ConfigFollowerGetChangesReply reply =
+                wait(timeoutError(basicLoadBalance(quorumCfi,
+                                                   &ConfigFollowerInterface::getChanges,
+                                                   ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
+                                  SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
             wait(timeoutError(cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{
                                   rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }),
                               SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
         } catch (Error& e) {
-            if (e.code() == error_code_version_already_compacted) {
-                TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode
-                ConfigFollowerGetSnapshotAndChangesReply reply = wait(retryBrokenPromise(
-                    quorumCfi.getSnapshotAndChanges, ConfigFollowerGetSnapshotAndChangesRequest{ target }));
-                wait(retryBrokenPromise(
-                    cfi.rollforward,
-                    ConfigFollowerRollforwardRequest{
-                        rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }));
-            } else if (e.code() == error_code_transaction_too_old) {
+            if (e.code() == error_code_transaction_too_old) {
                 // Seeing this trace is not necessarily a problem. There
                 // are legitimate scenarios where a ConfigNode could return
-                // transaction_too_old in response to a rollforward
-                // request.
+                // one of these errors in response to a get changes or
+                // rollforward request. The retry loop should handle this
+                // case.
                 TraceEvent(SevInfo, "ConfigNodeRollforwardError").error(e);
             } else {
-                throw e;
+                throw;
             }
         }
     }
@@ -123,6 +134,8 @@ class GetCommittedVersionQuorum {
                               SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
 
             ++self->totalRepliesReceived;
+            self->largestCompactedResponse = std::max(self->largestCompactedResponse, reply.lastCompacted);
+            state Version lastCompacted = reply.lastCompacted;
             self->largestCommitted = std::max(self->largestCommitted, reply.lastCommitted);
             state CommittedVersions committedVersions = CommittedVersions{ self->lastSeenVersion, reply.lastCommitted };
             if (self->priorVersions.find(committedVersions.lastCommitted) == self->priorVersions.end()) {
@@ -136,14 +149,15 @@ class GetCommittedVersionQuorum {
                 if (self->quorumVersion.canBeSet()) {
                     self->quorumVersion.send(QuorumVersion{ committedVersions, true });
                 }
-                wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
+                wait(self->updateNode(
+                    self, committedVersions, self->quorumVersion.getFuture().get().versions, lastCompacted, cfi));
             } else if (self->maxAgreement >= self->cfis.size() / 2 + 1) {
                 // A quorum of ConfigNodes agree on the latest committed version,
                 // but the node we just got a reply from is not one of them. We may
                 // need to roll it forward or back.
                 QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
                 ASSERT(committedVersions.lastCommitted != quorumVersion.versions.lastCommitted);
-                wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
+                wait(self->updateNode(self, committedVersions, quorumVersion.versions, lastCompacted, cfi));
             } else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
                        (self->cfis.size() / 2 + 1)) {
                 // It is impossible to reach a quorum of ConfigNodes that agree
@@ -158,18 +172,25 @@ class GetCommittedVersionQuorum {
                     self->quorumVersion.send(
                         QuorumVersion{ CommittedVersions{ largestCommittedPrior, largestCommitted }, false });
                 }
-                wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
+                wait(self->updateNode(
+                    self, committedVersions, self->quorumVersion.getFuture().get().versions, lastCompacted, cfi));
             } else {
                 // Still building up responses; don't have enough data to act on
                 // yet, so wait until we do.
                 QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
-                wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
+                wait(self->updateNode(self, committedVersions, quorumVersion.versions, lastCompacted, cfi));
             }
         } catch (Error& e) {
             // Count a timeout as a reply.
             ++self->totalRepliesReceived;
-            if (e.code() != error_code_timed_out) {
-                throw;
+            if (e.code() == error_code_version_already_compacted) {
+                if (self->quorumVersion.canBeSet()) {
+                    self->quorumVersion.sendError(e);
+                }
+            } else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
+                if (self->quorumVersion.canBeSet()) {
+                    self->quorumVersion.sendError(e);
+                }
             } else if (self->totalRepliesReceived == self->cfis.size() && self->quorumVersion.canBeSet() &&
                        !self->quorumVersion.isError()) {
                 size_t nonTimeoutReplies =
@@ -178,14 +199,10 @@ class GetCommittedVersionQuorum {
                     });
                 if (nonTimeoutReplies >= self->cfis.size() / 2 + 1) {
                     // Make sure to trigger the quorumVersion if a timeout
-                    // occurred, a quorum disagree on the committed version, and
-                    // there are no more incoming responses. Note that this means
-                    // that it is impossible to reach a quorum, so send back the
-                    // largest committed version seen. We also need to store the
-                    // interface for the timed out server for future communication
-                    // attempts.
-                    auto& nodes = self->replies[self->largestCommitted];
-                    nodes.push_back(cfi);
+                    // occurred, a quorum disagree on the committed version,
+                    // and there are no more incoming responses. Note that this
+                    // means that it is impossible to reach a quorum, so send
+                    // back the largest committed version seen.
                     self->quorumVersion.send(
                         QuorumVersion{ CommittedVersions{ self->lastSeenVersion, self->largestCommitted }, false });
                 } else if (!self->quorumVersion.isSet()) {
@@ -219,6 +236,16 @@ public:
         ASSERT(isReady());
         return replies.at(quorumVersion.getFuture().get().versions.lastCommitted);
     }
+    Version getSmallestCommitted() const {
+        if (committed.size() == cfis.size()) {
+            Version smallest = MAX_VERSION;
+            for (const auto& [key, value] : committed) {
+                smallest = std::min(smallest, value);
+            }
+            return smallest;
+        }
+        return ::invalidVersion;
+    }
     Future<Void> complete() const { return waitForAll(actors); }
 };
@@ -226,6 +253,7 @@ class PaxosConfigConsumerImpl {
     std::vector<ConfigFollowerInterface> cfis;
     GetCommittedVersionQuorum getCommittedVersionQuorum;
     Version lastSeenVersion{ 0 };
+    Version compactionVersion{ 0 };
     double pollingInterval;
     Optional<double> compactionInterval;
     UID id;
@@ -238,13 +266,15 @@ class PaxosConfigConsumerImpl {
         return quorumVersion.versions.lastCommitted;
     }
 
+    // Periodically compact knob changes on the configuration nodes. All nodes
+    // must have received a version before it can be compacted.
     ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
         if (!self->compactionInterval.present()) {
             wait(Never());
             return Void();
         }
         loop {
-            state Version compactionVersion = self->lastSeenVersion;
+            state Version compactionVersion = self->compactionVersion;
             wait(delayJittered(self->compactionInterval.get()));
             std::vector<Future<Void>> compactionRequests;
             compactionRequests.reserve(compactionRequests.size());
@@ -263,12 +293,14 @@ class PaxosConfigConsumerImpl {
         loop {
             self->resetCommittedVersionQuorum(); // TODO: This seems to fix a segfault, investigate more
             try {
-                // TODO: Load balance
                 state Version committedVersion = wait(getCommittedVersion(self));
-                ConfigFollowerGetSnapshotAndChangesReply reply = wait(
-                    timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges.getReply(
-                                     ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
-                                 SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
+                state Reference<ConfigFollowerInfo> configNodes(
+                    new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas(), false));
+                ConfigFollowerGetSnapshotAndChangesReply reply =
+                    wait(timeoutError(basicLoadBalance(configNodes,
+                                                       &ConfigFollowerInterface::getSnapshotAndChanges,
+                                                       ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
+                                      SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
                 TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
                     .detail("SnapshotVersion", reply.snapshotVersion)
                     .detail("SnapshotSize", reply.snapshot.size())
@@ -277,6 +309,8 @@ class PaxosConfigConsumerImpl {
                     .detail("ChangesSize", reply.changes.size())
                     .detail("AnnotationsSize", reply.annotations.size());
                 ASSERT_GE(committedVersion, self->lastSeenVersion);
                 self->lastSeenVersion = committedVersion;
+                Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
+                self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
                 broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
                                                      reply.snapshotVersion,
                                                      reply.changes,
@@ -288,7 +322,8 @@ class PaxosConfigConsumerImpl {
             } catch (Error& e) {
                 if (e.code() == error_code_failed_to_reach_quorum) {
                     wait(self->getCommittedVersionQuorum.complete());
-                } else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
+                } else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise &&
+                           e.code() != error_code_version_already_compacted && e.code() != error_code_process_behind) {
                     throw;
                 }
                 wait(delayJittered(0.1));
@@ -313,13 +348,14 @@ class PaxosConfigConsumerImpl {
                     // ConfigNodes changes to 1, 1, 2, the committed version
                     // returned would be 1.
                     if (committedVersion > self->lastSeenVersion) {
-                        // TODO: Load balance to avoid always hitting the
-                        // node at index 0 first
                         ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1);
-                        ConfigFollowerGetChangesReply reply = wait(
-                            timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges.getReply(
+                        state Reference<ConfigFollowerInfo> configNodes(
+                            new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas(), false));
+                        ConfigFollowerGetChangesReply reply = wait(timeoutError(
+                            basicLoadBalance(configNodes,
+                                             &ConfigFollowerInterface::getChanges,
                                              ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }),
-                                         SERVER_KNOBS->FETCH_CHANGES_TIMEOUT));
+                            SERVER_KNOBS->FETCH_CHANGES_TIMEOUT));
                         for (const auto& versionedMutation : reply.changes) {
                             TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
                             te.detail("Version", versionedMutation.version)
@@ -333,19 +369,20 @@ class PaxosConfigConsumerImpl {
                         }
                     }
                     self->lastSeenVersion = committedVersion;
+                    Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
+                    self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
                     broadcaster->applyChanges(reply.changes,
                                               committedVersion,
                                               reply.annotations,
                                               self->getCommittedVersionQuorum.getReadReplicas());
-                    // TODO: Catch error_code_process_behind and retry with
-                    // the next ConfigNode in the quorum.
                 } else if (committedVersion == self->lastSeenVersion) {
                     broadcaster->applyChanges({}, -1, {}, self->getCommittedVersionQuorum.getReadReplicas());
                 }
                 wait(delayJittered(self->pollingInterval));
             } catch (Error& e) {
                 if (e.code() == error_code_version_already_compacted || e.code() == error_code_timed_out ||
-                    e.code() == error_code_failed_to_reach_quorum) {
+                    e.code() == error_code_failed_to_reach_quorum ||
+                    e.code() == error_code_process_behind) {
                     TEST(true); // PaxosConfigConsumer get version_already_compacted error
                     if (e.code() == error_code_failed_to_reach_quorum) {
                         try {
@@ -365,7 +402,7 @@ class PaxosConfigConsumerImpl {
                     self->resetCommittedVersionQuorum();
                     continue;
                 } else {
-                    throw e;
+                    throw;
                 }
             }
             try {