Rollforward out of date nodes, compaction fixes

This commit is contained in:
Lukas Joswiak 2022-03-13 22:44:41 -07:00
parent 803fc86e61
commit d0da6c63c1
1 changed files with 55 additions and 8 deletions

View File

@ -44,6 +44,7 @@ class GetCommittedVersionQuorum {
std::vector<ConfigFollowerInterface> cfis; std::vector<ConfigFollowerInterface> cfis;
std::map<Version, std::vector<ConfigFollowerInterface>> replies; std::map<Version, std::vector<ConfigFollowerInterface>> replies;
std::map<Version, Version> priorVersions; std::map<Version, Version> priorVersions;
std::map<NetworkAddress, Version> committed;
// Last durably committed version. // Last durably committed version.
Version lastSeenVersion; Version lastSeenVersion;
size_t totalRepliesReceived{ 0 }; size_t totalRepliesReceived{ 0 };
@ -95,18 +96,42 @@ class GetCommittedVersionQuorum {
rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }), rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT)); SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
} catch (Error& e) { } catch (Error& e) {
if (e.code() == error_code_transaction_too_old) { if (e.code() == error_code_version_already_compacted || e.code() == error_code_process_behind) {
// In the case of an already_compacted or process_behind
// error, fetch the latest snapshot and attempt to roll the
// node forward.
TEST(true); // PaxosConfigConsumer rollforward compacted or behind ConfigNode
try {
ConfigFollowerGetSnapshotAndChangesReply reply =
wait(timeoutError(basicLoadBalance(quorumCfi,
&ConfigFollowerInterface::getSnapshotAndChanges,
ConfigFollowerGetSnapshotAndChangesRequest{ target }),
SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
if (reply.changes.size() == 0 || reply.changes[reply.changes.size() - 1].version < target) {
return Void();
}
int64_t rollbackTo = reply.changes[0].version - 1;
if (rollback.present()) {
rollbackTo = std::min(rollbackTo, rollback.get());
}
wait(timeoutError(
cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{
rollbackTo, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
} catch (Error& e2) {
if (e2.code() != error_code_transaction_too_old) {
throw;
}
}
} else if (e.code() == error_code_transaction_too_old) {
// Seeing this trace is not necessarily a problem. There // Seeing this trace is not necessarily a problem. There
// are legitimate scenarios where a ConfigNode could return // are legitimate scenarios where a ConfigNode could return
// transaction_too_old in response to a rollforward // transaction_too_old in response to a rollforward
// request. // request.
TraceEvent(SevInfo, "ConfigNodeRollforwardError").error(e); TraceEvent(SevInfo, "ConfigNodeRollforwardError").error(e);
} else { } else {
// In the case of an already_compacted error, the retry
// loop will fetch the latest snapshot and a rollforward
// request will eventually be resent.
TEST(e.code() ==
error_code_version_already_compacted); // PaxosConfigConsumer rollforward compacted ConfigNode
throw; throw;
} }
} }
@ -119,6 +144,7 @@ class GetCommittedVersionQuorum {
ConfigFollowerGetCommittedVersionReply reply = ConfigFollowerGetCommittedVersionReply reply =
wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}), wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT)); SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
self->committed[cfi.address()] = reply.lastCommitted;
++self->totalRepliesReceived; ++self->totalRepliesReceived;
self->largestCommitted = std::max(self->largestCommitted, reply.lastCommitted); self->largestCommitted = std::max(self->largestCommitted, reply.lastCommitted);
@ -166,7 +192,11 @@ class GetCommittedVersionQuorum {
} catch (Error& e) { } catch (Error& e) {
// Count a timeout as a reply. // Count a timeout as a reply.
++self->totalRepliesReceived; ++self->totalRepliesReceived;
if (e.code() != error_code_timed_out) { if (e.code() == error_code_version_already_compacted) {
if (self->quorumVersion.canBeSet()) {
self->quorumVersion.sendError(e);
}
} else if (e.code() != error_code_timed_out) {
throw; throw;
} else if (self->totalRepliesReceived == self->cfis.size() && self->quorumVersion.canBeSet() && } else if (self->totalRepliesReceived == self->cfis.size() && self->quorumVersion.canBeSet() &&
!self->quorumVersion.isError()) { !self->quorumVersion.isError()) {
@ -217,6 +247,16 @@ public:
ASSERT(isReady()); ASSERT(isReady());
return replies.at(quorumVersion.getFuture().get().versions.lastCommitted); return replies.at(quorumVersion.getFuture().get().versions.lastCommitted);
} }
Version getSmallestCommitted() const {
if (committed.size() == cfis.size()) {
Version smallest = MAX_VERSION;
for (const auto& [key, value] : committed) {
smallest = std::min(smallest, value);
}
return smallest;
}
return invalidVersion;
}
Future<Void> complete() const { return waitForAll(actors); } Future<Void> complete() const { return waitForAll(actors); }
}; };
@ -224,6 +264,7 @@ class PaxosConfigConsumerImpl {
std::vector<ConfigFollowerInterface> cfis; std::vector<ConfigFollowerInterface> cfis;
GetCommittedVersionQuorum getCommittedVersionQuorum; GetCommittedVersionQuorum getCommittedVersionQuorum;
Version lastSeenVersion{ 0 }; Version lastSeenVersion{ 0 };
Version compactionVersion{ 0 };
double pollingInterval; double pollingInterval;
Optional<double> compactionInterval; Optional<double> compactionInterval;
UID id; UID id;
@ -236,13 +277,15 @@ class PaxosConfigConsumerImpl {
return quorumVersion.versions.lastCommitted; return quorumVersion.versions.lastCommitted;
} }
// Periodically compact knob changes on the configuration nodes. All nodes
// must have received a version before it can be compacted.
ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) { ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
if (!self->compactionInterval.present()) { if (!self->compactionInterval.present()) {
wait(Never()); wait(Never());
return Void(); return Void();
} }
loop { loop {
state Version compactionVersion = self->lastSeenVersion; state Version compactionVersion = self->compactionVersion;
wait(delayJittered(self->compactionInterval.get())); wait(delayJittered(self->compactionInterval.get()));
std::vector<Future<Void>> compactionRequests; std::vector<Future<Void>> compactionRequests;
compactionRequests.reserve(compactionRequests.size()); compactionRequests.reserve(compactionRequests.size());
@ -277,6 +320,8 @@ class PaxosConfigConsumerImpl {
.detail("AnnotationsSize", reply.annotations.size()); .detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion); ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion; self->lastSeenVersion = committedVersion;
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
broadcaster->applySnapshotAndChanges(std::move(reply.snapshot), broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
reply.snapshotVersion, reply.snapshotVersion,
reply.changes, reply.changes,
@ -334,6 +379,8 @@ class PaxosConfigConsumerImpl {
} }
} }
self->lastSeenVersion = committedVersion; self->lastSeenVersion = committedVersion;
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
broadcaster->applyChanges(reply.changes, broadcaster->applyChanges(reply.changes,
committedVersion, committedVersion,
reply.annotations, reply.annotations,