Merge pull request #6435 from sfc-gh-ljoswiak/fixes/dynamic-knobs-release-readiness
Dynamic knobs improvements
This commit is contained in:
commit
10c536c700
|
@ -22,6 +22,8 @@
|
|||
#include "fdbclient/PaxosConfigTransaction.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
using ConfigTransactionInfo = ModelInterface<ConfigTransactionInterface>;
|
||||
|
||||
class CommitQuorum {
|
||||
ActorCollection actors{ false };
|
||||
std::vector<ConfigTransactionInterface> ctis;
|
||||
|
@ -224,10 +226,12 @@ class PaxosConfigTransactionImpl {
|
|||
loop {
|
||||
try {
|
||||
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
|
||||
// TODO: Load balance
|
||||
state Reference<ConfigTransactionInfo> configNodes(
|
||||
new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
|
||||
ConfigTransactionGetReply reply =
|
||||
wait(timeoutError(self->getGenerationQuorum.getReadReplicas()[0].get.getReply(
|
||||
ConfigTransactionGetRequest{ generation, configKey }),
|
||||
wait(timeoutError(basicLoadBalance(configNodes,
|
||||
&ConfigTransactionInterface::get,
|
||||
ConfigTransactionGetRequest{ generation, configKey }),
|
||||
CLIENT_KNOBS->GET_KNOB_TIMEOUT));
|
||||
if (reply.value.present()) {
|
||||
return reply.value.get().toValue();
|
||||
|
@ -245,10 +249,12 @@ class PaxosConfigTransactionImpl {
|
|||
|
||||
ACTOR static Future<RangeResult> getConfigClasses(PaxosConfigTransactionImpl* self) {
|
||||
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
|
||||
// TODO: Load balance
|
||||
state Reference<ConfigTransactionInfo> configNodes(
|
||||
new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
|
||||
ConfigTransactionGetConfigClassesReply reply =
|
||||
wait(retryBrokenPromise(self->getGenerationQuorum.getReadReplicas()[0].getClasses,
|
||||
ConfigTransactionGetConfigClassesRequest{ generation }));
|
||||
wait(basicLoadBalance(configNodes,
|
||||
&ConfigTransactionInterface::getClasses,
|
||||
ConfigTransactionGetConfigClassesRequest{ generation }));
|
||||
RangeResult result;
|
||||
result.reserve(result.arena(), reply.configClasses.size());
|
||||
for (const auto& configClass : reply.configClasses) {
|
||||
|
@ -259,10 +265,12 @@ class PaxosConfigTransactionImpl {
|
|||
|
||||
ACTOR static Future<RangeResult> getKnobs(PaxosConfigTransactionImpl* self, Optional<Key> configClass) {
|
||||
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
|
||||
// TODO: Load balance
|
||||
state Reference<ConfigTransactionInfo> configNodes(
|
||||
new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas(), false));
|
||||
ConfigTransactionGetKnobsReply reply =
|
||||
wait(retryBrokenPromise(self->getGenerationQuorum.getReadReplicas()[0].getKnobs,
|
||||
ConfigTransactionGetKnobsRequest{ generation, configClass }));
|
||||
wait(basicLoadBalance(configNodes,
|
||||
&ConfigTransactionInterface::getKnobs,
|
||||
ConfigTransactionGetKnobsRequest{ generation, configClass }));
|
||||
RangeResult result;
|
||||
result.reserve(result.arena(), reply.knobNames.size());
|
||||
for (const auto& knobName : reply.knobNames) {
|
||||
|
|
|
@ -719,6 +719,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( COORDINATOR_LEADER_CONNECTION_TIMEOUT, 20.0 );
|
||||
|
||||
// Dynamic Knobs (implementation)
|
||||
init( COMPACTION_INTERVAL, isSimulated ? 5.0 : 300.0 );
|
||||
init( UPDATE_NODE_TIMEOUT, 3.0 );
|
||||
init( GET_COMMITTED_VERSION_TIMEOUT, 3.0 );
|
||||
init( GET_SNAPSHOT_AND_CHANGES_TIMEOUT, 3.0 );
|
||||
|
|
|
@ -657,6 +657,7 @@ public:
|
|||
double COORDINATOR_LEADER_CONNECTION_TIMEOUT;
|
||||
|
||||
// Dynamic Knobs (implementation)
|
||||
double COMPACTION_INTERVAL;
|
||||
double UPDATE_NODE_TIMEOUT;
|
||||
double GET_COMMITTED_VERSION_TIMEOUT;
|
||||
double GET_SNAPSHOT_AND_CHANGES_TIMEOUT;
|
||||
|
|
|
@ -94,6 +94,7 @@ class ConfigBroadcasterImpl {
|
|||
|
||||
int coordinators = 0;
|
||||
std::unordered_set<NetworkAddress> activeConfigNodes;
|
||||
std::unordered_set<NetworkAddress> registrationResponses;
|
||||
bool disallowUnregistered = false;
|
||||
Promise<Void> newConfigNodesAllowed;
|
||||
|
||||
|
@ -217,6 +218,7 @@ class ConfigBroadcasterImpl {
|
|||
self->clients.erase(clientUID);
|
||||
self->clientFailures.erase(clientUID);
|
||||
self->activeConfigNodes.erase(clientAddress);
|
||||
self->registrationResponses.erase(clientAddress);
|
||||
// See comment where this promise is reset below.
|
||||
if (self->newConfigNodesAllowed.isSet()) {
|
||||
self->newConfigNodesAllowed.reset();
|
||||
|
@ -258,6 +260,7 @@ class ConfigBroadcasterImpl {
|
|||
self->newConfigNodesAllowed.reset();
|
||||
}
|
||||
}
|
||||
self->registrationResponses.insert(address);
|
||||
|
||||
if (registered) {
|
||||
if (!self->disallowUnregistered) {
|
||||
|
@ -265,9 +268,18 @@ class ConfigBroadcasterImpl {
|
|||
}
|
||||
self->activeConfigNodes.insert(address);
|
||||
self->disallowUnregistered = true;
|
||||
} else if (self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) {
|
||||
// Need to allow registration of previously unregistered nodes when
|
||||
// the cluster first starts up.
|
||||
} else if ((self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) ||
|
||||
self->coordinators - self->registrationResponses.size() <=
|
||||
self->coordinators / 2 + 1 - self->activeConfigNodes.size()) {
|
||||
// Received a registration request from an unregistered node. There
|
||||
// are two cases where we want to allow unregistered nodes to
|
||||
// register:
|
||||
// * the cluster is just starting and no nodes are registered
|
||||
// * a minority of nodes are registered and a majority are
|
||||
// unregistered. This situation should only occur in rare
|
||||
// circumstances where the cluster controller dies with only a
|
||||
// minority of config nodes having received a
|
||||
// ConfigBroadcastReadyRequest
|
||||
self->activeConfigNodes.insert(address);
|
||||
if (self->activeConfigNodes.size() >= self->coordinators / 2 + 1 &&
|
||||
self->newConfigNodesAllowed.canBeSet()) {
|
||||
|
@ -390,9 +402,9 @@ public:
|
|||
this->coordinators = coordinators.configServers.size();
|
||||
if (configDBType != ConfigDBType::DISABLED) {
|
||||
if (configDBType == ConfigDBType::SIMPLE) {
|
||||
consumer = IConfigConsumer::createSimple(coordinators, 0.5, Optional<double>{});
|
||||
consumer = IConfigConsumer::createSimple(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
|
||||
} else {
|
||||
consumer = IConfigConsumer::createPaxos(coordinators, 0.5, Optional<double>{});
|
||||
consumer = IConfigConsumer::createPaxos(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
|
||||
}
|
||||
TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id)
|
||||
.detail("Consumer", consumer->getID())
|
||||
|
|
|
@ -176,14 +176,16 @@ struct ConfigFollowerRollforwardRequest {
|
|||
|
||||
struct ConfigFollowerGetCommittedVersionReply {
|
||||
static constexpr FileIdentifier file_identifier = 9214735;
|
||||
Version lastCompacted;
|
||||
Version lastCommitted;
|
||||
|
||||
ConfigFollowerGetCommittedVersionReply() = default;
|
||||
explicit ConfigFollowerGetCommittedVersionReply(Version lastCommitted) : lastCommitted(lastCommitted) {}
|
||||
explicit ConfigFollowerGetCommittedVersionReply(Version lastCompacted, Version lastCommitted)
|
||||
: lastCompacted(lastCompacted), lastCommitted(lastCommitted) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, lastCommitted);
|
||||
serializer(ar, lastCompacted, lastCommitted);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -495,7 +495,7 @@ class ConfigNodeImpl {
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
|
||||
Version lastCompactedVersion = wait(getLastCompactedVersion(self));
|
||||
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
|
||||
if (req.lastKnownCommitted < lastCompactedVersion) {
|
||||
req.reply.sendError(version_already_compacted());
|
||||
return Void();
|
||||
|
@ -529,6 +529,10 @@ class ConfigNodeImpl {
|
|||
versionedAnnotationKey(currentGeneration.committedVersion + 1)));
|
||||
|
||||
currentGeneration.committedVersion = req.rollback.get();
|
||||
if (req.rollback.get() < lastCompactedVersion) {
|
||||
self->kvStore->set(
|
||||
KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(req.rollback.get(), IncludeVersion())));
|
||||
}
|
||||
// The mutation commit loop below should persist the new generation
|
||||
// to disk, so we don't need to do it here.
|
||||
}
|
||||
|
@ -536,13 +540,15 @@ class ConfigNodeImpl {
|
|||
// committed version and rollforward version.
|
||||
ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
|
||||
wait(commitMutations(self, req.mutations, req.annotations, req.target));
|
||||
|
||||
req.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
|
||||
state Version lastCompacted = wait(getLastCompactedVersion(self));
|
||||
ConfigGeneration generation = wait(getGeneration(self));
|
||||
req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
|
||||
req.reply.send(ConfigFollowerGetCommittedVersionReply{ lastCompacted, generation.committedVersion });
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
using ConfigFollowerInfo = ModelInterface<ConfigFollowerInterface>;
|
||||
|
||||
struct CommittedVersions {
|
||||
Version secondToLastCommitted;
|
||||
Version lastCommitted;
|
||||
|
@ -42,6 +44,10 @@ class GetCommittedVersionQuorum {
|
|||
std::vector<ConfigFollowerInterface> cfis;
|
||||
std::map<Version, std::vector<ConfigFollowerInterface>> replies;
|
||||
std::map<Version, Version> priorVersions;
|
||||
std::map<NetworkAddress, Version> committed;
|
||||
// Need to know the largest compacted version on any node to avoid asking
|
||||
// for changes that have already been compacted.
|
||||
Version largestCompactedResponse{ 0 };
|
||||
// Last durably committed version.
|
||||
Version lastSeenVersion;
|
||||
size_t totalRepliesReceived{ 0 };
|
||||
|
@ -58,6 +64,7 @@ class GetCommittedVersionQuorum {
|
|||
ACTOR static Future<Void> updateNode(GetCommittedVersionQuorum* self,
|
||||
CommittedVersions nodeVersion,
|
||||
CommittedVersions quorumVersion,
|
||||
Version lastCompacted,
|
||||
ConfigFollowerInterface cfi) {
|
||||
state Version target = quorumVersion.lastCommitted;
|
||||
if (nodeVersion.lastCommitted == target) {
|
||||
|
@ -79,37 +86,41 @@ class GetCommittedVersionQuorum {
|
|||
rollback = std::max(nodeVersion.lastCommitted - 1, Version{ 0 });
|
||||
}
|
||||
|
||||
if (rollback.present()) {
|
||||
// When a new ConfigBroadcaster is created, it may not know
|
||||
// about the last committed version on the ConfigNodes. If
|
||||
// compaction has occurred, this can cause change requests to
|
||||
// be sent to nodes asking for version 0 when the node has
|
||||
// already compacted that version, causing an error. Make sure
|
||||
// the rollback version is at least set to the last compacted
|
||||
// version to prevent this issue.
|
||||
rollback = std::max(rollback.get(), lastCompacted);
|
||||
}
|
||||
|
||||
// Now roll node forward to match the largest committed version of
|
||||
// the replies.
|
||||
// TODO: Load balance over quorum. Also need to catch
|
||||
// error_code_process_behind and retry with the next ConfigNode in
|
||||
// the quorum.
|
||||
state ConfigFollowerInterface quorumCfi = self->replies[target][0];
|
||||
state Reference<ConfigFollowerInfo> quorumCfi(new ConfigFollowerInfo(self->replies[target], false));
|
||||
try {
|
||||
state Version lastSeenVersion = rollback.present() ? rollback.get() : nodeVersion.lastCommitted;
|
||||
ConfigFollowerGetChangesReply reply = wait(timeoutError(
|
||||
quorumCfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
|
||||
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
|
||||
state Version lastSeenVersion = std::max(
|
||||
rollback.present() ? rollback.get() : nodeVersion.lastCommitted, self->largestCompactedResponse);
|
||||
ConfigFollowerGetChangesReply reply =
|
||||
wait(timeoutError(basicLoadBalance(quorumCfi,
|
||||
&ConfigFollowerInterface::getChanges,
|
||||
ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
|
||||
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
|
||||
wait(timeoutError(cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{
|
||||
rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }),
|
||||
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_version_already_compacted) {
|
||||
TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply = wait(retryBrokenPromise(
|
||||
quorumCfi.getSnapshotAndChanges, ConfigFollowerGetSnapshotAndChangesRequest{ target }));
|
||||
wait(retryBrokenPromise(
|
||||
cfi.rollforward,
|
||||
ConfigFollowerRollforwardRequest{
|
||||
rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }));
|
||||
} else if (e.code() == error_code_transaction_too_old) {
|
||||
if (e.code() == error_code_transaction_too_old) {
|
||||
// Seeing this trace is not necessarily a problem. There
|
||||
// are legitimate scenarios where a ConfigNode could return
|
||||
// transaction_too_old in response to a rollforward
|
||||
// request.
|
||||
// one of these errors in response to a get changes or
|
||||
// rollforward request. The retry loop should handle this
|
||||
// case.
|
||||
TraceEvent(SevInfo, "ConfigNodeRollforwardError").error(e);
|
||||
} else {
|
||||
throw e;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -123,6 +134,8 @@ class GetCommittedVersionQuorum {
|
|||
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
|
||||
|
||||
++self->totalRepliesReceived;
|
||||
self->largestCompactedResponse = std::max(self->largestCompactedResponse, reply.lastCompacted);
|
||||
state Version lastCompacted = reply.lastCompacted;
|
||||
self->largestCommitted = std::max(self->largestCommitted, reply.lastCommitted);
|
||||
state CommittedVersions committedVersions = CommittedVersions{ self->lastSeenVersion, reply.lastCommitted };
|
||||
if (self->priorVersions.find(committedVersions.lastCommitted) == self->priorVersions.end()) {
|
||||
|
@ -136,14 +149,15 @@ class GetCommittedVersionQuorum {
|
|||
if (self->quorumVersion.canBeSet()) {
|
||||
self->quorumVersion.send(QuorumVersion{ committedVersions, true });
|
||||
}
|
||||
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
|
||||
wait(self->updateNode(
|
||||
self, committedVersions, self->quorumVersion.getFuture().get().versions, lastCompacted, cfi));
|
||||
} else if (self->maxAgreement >= self->cfis.size() / 2 + 1) {
|
||||
// A quorum of ConfigNodes agree on the latest committed version,
|
||||
// but the node we just got a reply from is not one of them. We may
|
||||
// need to roll it forward or back.
|
||||
QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
|
||||
ASSERT(committedVersions.lastCommitted != quorumVersion.versions.lastCommitted);
|
||||
wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
|
||||
wait(self->updateNode(self, committedVersions, quorumVersion.versions, lastCompacted, cfi));
|
||||
} else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
|
||||
(self->cfis.size() / 2 + 1)) {
|
||||
// It is impossible to reach a quorum of ConfigNodes that agree
|
||||
|
@ -158,18 +172,25 @@ class GetCommittedVersionQuorum {
|
|||
self->quorumVersion.send(
|
||||
QuorumVersion{ CommittedVersions{ largestCommittedPrior, largestCommitted }, false });
|
||||
}
|
||||
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
|
||||
wait(self->updateNode(
|
||||
self, committedVersions, self->quorumVersion.getFuture().get().versions, lastCompacted, cfi));
|
||||
} else {
|
||||
// Still building up responses; don't have enough data to act on
|
||||
// yet, so wait until we do.
|
||||
QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
|
||||
wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
|
||||
wait(self->updateNode(self, committedVersions, quorumVersion.versions, lastCompacted, cfi));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// Count a timeout as a reply.
|
||||
++self->totalRepliesReceived;
|
||||
if (e.code() != error_code_timed_out) {
|
||||
throw;
|
||||
if (e.code() == error_code_version_already_compacted) {
|
||||
if (self->quorumVersion.canBeSet()) {
|
||||
self->quorumVersion.sendError(e);
|
||||
}
|
||||
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
|
||||
if (self->quorumVersion.canBeSet()) {
|
||||
self->quorumVersion.sendError(e);
|
||||
}
|
||||
} else if (self->totalRepliesReceived == self->cfis.size() && self->quorumVersion.canBeSet() &&
|
||||
!self->quorumVersion.isError()) {
|
||||
size_t nonTimeoutReplies =
|
||||
|
@ -178,14 +199,10 @@ class GetCommittedVersionQuorum {
|
|||
});
|
||||
if (nonTimeoutReplies >= self->cfis.size() / 2 + 1) {
|
||||
// Make sure to trigger the quorumVersion if a timeout
|
||||
// occurred, a quorum disagree on the committed version, and
|
||||
// there are no more incoming responses. Note that this means
|
||||
// that it is impossible to reach a quorum, so send back the
|
||||
// largest committed version seen. We also need to store the
|
||||
// interface for the timed out server for future communication
|
||||
// attempts.
|
||||
auto& nodes = self->replies[self->largestCommitted];
|
||||
nodes.push_back(cfi);
|
||||
// occurred, a quorum disagree on the committed version,
|
||||
// and there are no more incoming responses. Note that this
|
||||
// means that it is impossible to reach a quorum, so send
|
||||
// back the largest committed version seen.
|
||||
self->quorumVersion.send(
|
||||
QuorumVersion{ CommittedVersions{ self->lastSeenVersion, self->largestCommitted }, false });
|
||||
} else if (!self->quorumVersion.isSet()) {
|
||||
|
@ -219,6 +236,16 @@ public:
|
|||
ASSERT(isReady());
|
||||
return replies.at(quorumVersion.getFuture().get().versions.lastCommitted);
|
||||
}
|
||||
// Returns the smallest Version stored in `committed`, but only when
// `committed` holds exactly as many entries as `cfis`; otherwise returns
// ::invalidVersion.
// NOTE(review): presumably `committed` receives one entry per ConfigNode
// reply, so the size check means "every node has answered" -- confirm where
// `committed` is populated (not visible in this view).
Version getSmallestCommitted() const {
|
||||
if (committed.size() == cfis.size()) {
|
||||
Version smallest = MAX_VERSION; // seed value; returned unchanged if `committed` is empty (both sizes 0)
|
||||
for (const auto& [key, value] : committed) {
|
||||
smallest = std::min(smallest, value);
|
||||
|
|
||||
}
|
||||
return smallest;
|
||||
}
|
||||
return ::invalidVersion; // sizes differ, so no minimum is reported
|
||||
}
|
||||
// Returns the future produced by waitForAll over `actors`.
// NOTE(review): presumably becomes ready once every outstanding quorum actor
// has finished -- confirm against Flow's waitForAll semantics.
Future<Void> complete() const { return waitForAll(actors); }
|
||||
};
|
||||
|
||||
|
@ -226,6 +253,7 @@ class PaxosConfigConsumerImpl {
|
|||
std::vector<ConfigFollowerInterface> cfis;
|
||||
GetCommittedVersionQuorum getCommittedVersionQuorum;
|
||||
Version lastSeenVersion{ 0 };
|
||||
Version compactionVersion{ 0 };
|
||||
double pollingInterval;
|
||||
Optional<double> compactionInterval;
|
||||
UID id;
|
||||
|
@ -238,13 +266,15 @@ class PaxosConfigConsumerImpl {
|
|||
return quorumVersion.versions.lastCommitted;
|
||||
}
|
||||
|
||||
// Periodically compact knob changes on the configuration nodes. All nodes
|
||||
// must have received a version before it can be compacted.
|
||||
ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
|
||||
if (!self->compactionInterval.present()) {
|
||||
wait(Never());
|
||||
return Void();
|
||||
}
|
||||
loop {
|
||||
state Version compactionVersion = self->lastSeenVersion;
|
||||
state Version compactionVersion = self->compactionVersion;
|
||||
wait(delayJittered(self->compactionInterval.get()));
|
||||
std::vector<Future<Void>> compactionRequests;
|
||||
compactionRequests.reserve(compactionRequests.size());
|
||||
|
@ -263,12 +293,14 @@ class PaxosConfigConsumerImpl {
|
|||
loop {
|
||||
self->resetCommittedVersionQuorum(); // TODO: This seems to fix a segfault, investigate more
|
||||
try {
|
||||
// TODO: Load balance
|
||||
state Version committedVersion = wait(getCommittedVersion(self));
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply = wait(
|
||||
timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges.getReply(
|
||||
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
|
||||
SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
|
||||
state Reference<ConfigFollowerInfo> configNodes(
|
||||
new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas(), false));
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply =
|
||||
wait(timeoutError(basicLoadBalance(configNodes,
|
||||
&ConfigFollowerInterface::getSnapshotAndChanges,
|
||||
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
|
||||
SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
|
||||
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
|
||||
.detail("SnapshotVersion", reply.snapshotVersion)
|
||||
.detail("SnapshotSize", reply.snapshot.size())
|
||||
|
@ -277,6 +309,8 @@ class PaxosConfigConsumerImpl {
|
|||
.detail("AnnotationsSize", reply.annotations.size());
|
||||
ASSERT_GE(committedVersion, self->lastSeenVersion);
|
||||
self->lastSeenVersion = committedVersion;
|
||||
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
|
||||
self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
|
||||
broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
|
||||
reply.snapshotVersion,
|
||||
reply.changes,
|
||||
|
@ -288,7 +322,8 @@ class PaxosConfigConsumerImpl {
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_failed_to_reach_quorum) {
|
||||
wait(self->getCommittedVersionQuorum.complete());
|
||||
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
|
||||
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise &&
|
||||
e.code() != error_code_version_already_compacted && e.code() != error_code_process_behind) {
|
||||
throw;
|
||||
}
|
||||
wait(delayJittered(0.1));
|
||||
|
@ -313,13 +348,14 @@ class PaxosConfigConsumerImpl {
|
|||
// ConfigNodes changes to 1, 1, 2, the committed version
|
||||
// returned would be 1.
|
||||
if (committedVersion > self->lastSeenVersion) {
|
||||
// TODO: Load balance to avoid always hitting the
|
||||
// node at index 0 first
|
||||
ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1);
|
||||
ConfigFollowerGetChangesReply reply = wait(
|
||||
timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges.getReply(
|
||||
state Reference<ConfigFollowerInfo> configNodes(
|
||||
new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas(), false));
|
||||
ConfigFollowerGetChangesReply reply = wait(timeoutError(
|
||||
basicLoadBalance(configNodes,
|
||||
&ConfigFollowerInterface::getChanges,
|
||||
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }),
|
||||
SERVER_KNOBS->FETCH_CHANGES_TIMEOUT));
|
||||
SERVER_KNOBS->FETCH_CHANGES_TIMEOUT));
|
||||
for (const auto& versionedMutation : reply.changes) {
|
||||
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
|
||||
te.detail("Version", versionedMutation.version)
|
||||
|
@ -333,19 +369,20 @@ class PaxosConfigConsumerImpl {
|
|||
}
|
||||
}
|
||||
self->lastSeenVersion = committedVersion;
|
||||
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
|
||||
self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
|
||||
broadcaster->applyChanges(reply.changes,
|
||||
committedVersion,
|
||||
reply.annotations,
|
||||
self->getCommittedVersionQuorum.getReadReplicas());
|
||||
// TODO: Catch error_code_process_behind and retry with
|
||||
// the next ConfigNode in the quorum.
|
||||
} else if (committedVersion == self->lastSeenVersion) {
|
||||
broadcaster->applyChanges({}, -1, {}, self->getCommittedVersionQuorum.getReadReplicas());
|
||||
}
|
||||
wait(delayJittered(self->pollingInterval));
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_version_already_compacted || e.code() == error_code_timed_out ||
|
||||
e.code() == error_code_failed_to_reach_quorum) {
|
||||
e.code() == error_code_failed_to_reach_quorum || e.code() == error_code_version_already_compacted ||
|
||||
e.code() == error_code_process_behind) {
|
||||
TEST(true); // PaxosConfigConsumer get version_already_compacted error
|
||||
if (e.code() == error_code_failed_to_reach_quorum) {
|
||||
try {
|
||||
|
@ -365,7 +402,7 @@ class PaxosConfigConsumerImpl {
|
|||
self->resetCommittedVersionQuorum();
|
||||
continue;
|
||||
} else {
|
||||
throw e;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue