Fix a rare configuration database data loss bug
See the comment contained in this commit. This bug could only manifest under a specific set of circumstances:

1. A coordinator change is started.
2. The coordinator change succeeds, but its action of clearing `previousCoordinatorsKey` is delayed.
3. A minority of `ConfigNode`s have an old state of the configuration database, compared to the majority.
4. A `ConfigNode` in the majority dies and permanently loses data.
5. A long delay occurs on the `PaxosConfigConsumer` when it tries to read the latest changes from the `ConfigNode`s.

In the above circumstances, the `ConfigBroadcaster` could incorrectly send a snapshot of an old state of the configuration database to a majority of `ConfigNode`s. This would cause new, durable, and acknowledged commit data to be overwritten.

Note that this bug only affects the configuration database (used for knob storage). It does not affect the normal keyspace.
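To make the failure mode concrete, below is a minimal, self-contained C++ sketch of the guard this commit adds to the `ConfigBroadcaster` registration path. The `NodeState` struct, the `shouldSendSnapshot` helper, and the hard-coded versions are illustrative stand-ins, not FoundationDB code: while a coordinator change is in flight, the broadcaster now withholds its snapshot whenever a quorum of `ConfigNode`s has already registered and at least one of them reports a version newer than the broadcaster's own, leaving reconciliation to the rollback/rollforward path.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Minimal stand-ins; not FoundationDB types.
using Version = int64_t;

// What each ConfigNode reports back when the broadcaster asks it to register.
struct NodeState {
    Version lastSeenVersion; // latest configuration database version the node knows about
    bool registered;         // whether the node has registered with a broadcaster before
};

// Sketch of the decision the broadcaster makes for one registering node while a
// coordinator change is still in flight.
bool shouldSendSnapshot(const std::vector<NodeState>& nodes, Version broadcasterVersion, Version nodeLastSeen) {
    // Old behaviour: send a snapshot whenever the node is not ahead of the broadcaster.
    bool sendSnapshot = nodeLastSeen <= broadcasterVersion;

    Version largestConfigNodeVersion = -1;
    int registeredCount = 0;
    for (const NodeState& n : nodes) {
        largestConfigNodeVersion = std::max(largestConfigNodeVersion, n.lastSeenVersion);
        if (n.registered) {
            ++registeredCount;
        }
    }

    const int quorum = static_cast<int>(nodes.size()) / 2 + 1;
    // Guard added by this commit: if a quorum of ConfigNodes has already registered and
    // at least one of them is ahead of the broadcaster's snapshot, withhold the snapshot
    // and let rollback/rollforward reconcile the stale nodes instead.
    if (largestConfigNodeVersion > broadcasterVersion && registeredCount >= quorum) {
        sendSnapshot = false;
    }
    return sendSnapshot;
}

int main() {
    // The T=5 state from the scenario in this commit: B at v5 (registered),
    // C at v10 (registered), D restarted empty at v0 (unregistered).
    std::vector<NodeState> nodes = { { 5, true }, { 10, true }, { 0, false } };
    Version broadcasterVersion = 5; // the broadcaster only has an old v5 snapshot

    // Without the guard this is true and the stale v5 snapshot would reach a
    // majority (B and D), rolling back C's committed v6..v10 data; with the
    // guard it prints "false" and no snapshot is sent.
    std::cout << std::boolalpha << shouldSendSnapshot(nodes, broadcasterVersion, /*nodeLastSeen=*/5) << "\n";
    return 0;
}
```

Run against that T=5 state, the guard trips and the stale v5 snapshot is never pushed to a majority of the new coordinators.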
parent 0b7baf4437
commit 795b666e23
@@ -962,6 +962,14 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
         if (!disableConfigDB) {
             wait(verifyConfigurationDatabaseAlive(tr->getDatabase()));
         }
+        if (BUGGIFY_WITH_PROB(0.1)) {
+            // Introduce a random delay in simulation to allow processes to be
+            // killed before previousCoordinatorKeys has been reset. This will
+            // help test scenarios where the previous configuration database
+            // state has been transferred to the new coordinators but the
+            // broadcaster thinks it has not been transferred.
+            wait(delay(deterministicRandom()->random01() * 10));
+        }
         wait(resetPreviousCoordinatorsKey(tr->getDatabase()));
         return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
     }
@@ -218,8 +218,12 @@ class GetGenerationQuorum {
     if (self->coordinatorsChangedFuture.isReady()) {
         throw coordinators_changed();
     }
-    wait(delayJittered(std::clamp(
-        0.005 * (1 << std::min(retries, 30)), 0.0, CLIENT_KNOBS->TIMEOUT_RETRY_UPPER_BOUND)));
+    if (deterministicRandom()->random01() < 0.95) {
+        // Add some random jitter to prevent clients from
+        // contending.
+        wait(delayJittered(std::clamp(
+            0.006 * (1 << std::min(retries, 30)), 0.0, CLIENT_KNOBS->TIMEOUT_RETRY_UPPER_BOUND)));
+    }
     if (deterministicRandom()->random01() < 0.05) {
         // Randomly inject a delay of at least the generation
         // reply timeout, to try to prevent contention between
@@ -92,10 +92,10 @@ class ConfigBroadcasterImpl {
 
     // Used to read a snapshot from the previous coordinators after a change
     // coordinators command.
-    Version maxLastSeenVersion = ::invalidVersion;
     Future<Optional<Value>> previousCoordinatorsFuture;
     std::unique_ptr<IConfigConsumer> previousCoordinatorsConsumer;
     Future<Void> previousCoordinatorsSnapshotFuture;
+    Version largestConfigNodeVersion{ ::invalidVersion };
 
     UID id;
     CounterCollection cc;
@@ -106,6 +106,7 @@ class ConfigBroadcasterImpl {
     Future<Void> logger;
 
     int coordinators = 0;
+    std::unordered_set<NetworkAddress> registeredConfigNodes;
     std::unordered_set<NetworkAddress> activeConfigNodes;
     std::unordered_set<NetworkAddress> registrationResponses;
     std::unordered_set<NetworkAddress> registrationResponsesUnregistered;
@@ -268,7 +269,7 @@ class ConfigBroadcasterImpl {
     // Ask the registering ConfigNode whether it has registered in the past.
     state ConfigBroadcastRegisteredReply reply = wait(
         brokenPromiseToNever(configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{})));
-    self->maxLastSeenVersion = std::max(self->maxLastSeenVersion, reply.lastSeenVersion);
+    self->largestConfigNodeVersion = std::max(self->largestConfigNodeVersion, reply.lastSeenVersion);
     state bool registered = reply.registered;
     TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id)
         .detail("Address", address)
@@ -302,6 +303,7 @@ class ConfigBroadcasterImpl {
     int nodesTillQuorum = self->coordinators / 2 + 1 - (int)self->activeConfigNodes.size();
 
     if (registered) {
+        self->registeredConfigNodes.insert(address);
         self->activeConfigNodes.insert(address);
         self->disallowUnregistered = true;
     } else if ((self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) ||
@@ -365,6 +367,52 @@ class ConfigBroadcasterImpl {
 
     state bool sendSnapshot =
         self->previousCoordinatorsConsumer && reply.lastSeenVersion <= self->mostRecentVersion;
+
+    // If a coordinator change is ongoing, a quorum of ConfigNodes are
+    // already registered and the largest version at least one of those
+    // ConfigNodes knows about is greater than the version of the latest
+    // snapshot the broadcaster has, don't send a snapshot to any
+    // ConfigNodes. This could end up overwriting committed data. Consider
+    // the following scenario, with three ConfigNodes:
+    //
+    //   T=0:
+    //     A: v5
+    //   T=1:
+    //     change coordinators, new coordinators are B, C, D
+    //   T=2:
+    //     B: v5, C: v5, D: v5
+    //   T=3:
+    //     B: v5, C: v10, D: v10
+    //     (some commits happen on only C and D)
+    //     (previousCoordinatorsKey has not been cleared yet)
+    //   T=4:
+    //     D dies and loses its data
+    //   T=5:
+    //     D starts
+    //     B: v5 (registered=yes), C: v10 (registered=yes), D: v0 (registered=no)
+    //     Broadcaster: has an old snapshot, only knows about v5
+    //     self->mostRecentVersion=5
+    //   T=6:
+    //     B, C, D (re-)register with broadcaster
+    //
+    // At T=5, the broadcaster would send snapshots to B and D because the
+    // largest version they know about (5) is less than or equal to
+    // self->mostRecentVersion (5). But this would cause a majority of
+    // nodes to think v5 is the latest committed version, causing C to be
+    // rolled back, and losing commit data between versions 5 and 10.
+    //
+    // This is a special case where the coordinators are being changed.
+    // During a coordinator change, a majority of ConfigNodes being
+    // registered means the coordinator change already took place, and it
+    // is being retried due to some failure. In that case, we don't want to
+    // resend snapshots if a majority of the new ConfigNodes are
+    // registered, because they could have been accepting commits. Instead,
+    // let the rollback/rollforward algorithm update the out of date nodes.
+    if (self->previousCoordinatorsConsumer && self->largestConfigNodeVersion > self->mostRecentVersion &&
+        self->registeredConfigNodes.size() >= self->coordinators / 2 + 1) {
+        sendSnapshot = false;
+    }
+
     // Unregistered nodes need to wait for either:
     // 1. A quorum of registered nodes to register and send their
     //    snapshots, so the unregistered nodes can be rolled forward, or
@@ -234,10 +234,13 @@ class ConfigNodeImpl {
         req.reply.sendError(process_behind()); // Reuse the process_behind error
         return Void();
     }
+    if (BUGGIFY) {
+        wait(delay(deterministicRandom()->random01() * 2));
+    }
     state Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
-        wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
+        wait(getMutations(self, req.lastSeenVersion + 1, req.mostRecentVersion));
     state Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> versionedAnnotations =
-        wait(getAnnotations(self, req.lastSeenVersion + 1, committedVersion));
+        wait(getAnnotations(self, req.lastSeenVersion + 1, req.mostRecentVersion));
     TraceEvent(SevInfo, "ConfigNodeSendingChanges", self->id)
         .detail("ReqLastSeenVersion", req.lastSeenVersion)
         .detail("ReqMostRecentVersion", req.mostRecentVersion)
@@ -245,7 +248,7 @@ class ConfigNodeImpl {
         .detail("NumMutations", versionedMutations.size())
         .detail("NumCommits", versionedAnnotations.size());
     ++self->successfulChangeRequests;
-    req.reply.send(ConfigFollowerGetChangesReply{ committedVersion, versionedMutations, versionedAnnotations });
+    req.reply.send(ConfigFollowerGetChangesReply{ versionedMutations, versionedAnnotations });
     return Void();
 }
 
@@ -487,12 +487,12 @@ class PaxosConfigConsumerImpl {
         .detail("LargestLiveVersion", self->getCommittedVersionQuorum.getLargestLive())
         .detail("SmallestCommitted", smallestCommitted);
     ASSERT_GE(committedVersion, self->lastSeenVersion);
-    self->lastSeenVersion = committedVersion;
+    self->lastSeenVersion = std::max(self->lastSeenVersion, committedVersion);
     self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
     broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
                                          reply.snapshotVersion,
                                          reply.changes,
-                                         committedVersion,
+                                         self->lastSeenVersion,
                                          reply.annotations,
                                          self->getCommittedVersionQuorum.getReadReplicas(),
                                          self->getCommittedVersionQuorum.getLargestLive(),
@@ -534,6 +534,13 @@ class PaxosConfigConsumerImpl {
     if (committedVersion > self->lastSeenVersion) {
         ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1 ||
                self->getCommittedVersionQuorum.isSpecialZeroQuorum());
+        if (BUGGIFY) {
+            // Inject a random delay between getting the committed
+            // version and reading any changes. The goal is to
+            // allow attrition to occasionally kill ConfigNodes in
+            // this in-between state.
+            wait(delay(deterministicRandom()->random01() * 5));
+        }
         state std::vector<ConfigFollowerInterface> readReplicas =
             self->getCommittedVersionQuorum.getReadReplicas();
         std::vector<Future<Void>> fs;
@@ -567,7 +574,7 @@ class PaxosConfigConsumerImpl {
         Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
         self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
         broadcaster->applyChanges(reply.changes,
-                                  committedVersion,
+                                  self->lastSeenVersion,
                                   reply.annotations,
                                   self->getCommittedVersionQuorum.getReadReplicas());
     } else if (committedVersion == self->lastSeenVersion) {
@@ -110,8 +110,7 @@ struct ConfigFollowerGetChangesReply {
     Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
 
     ConfigFollowerGetChangesReply() = default;
-    explicit ConfigFollowerGetChangesReply(Version mostRecentVersion,
-                                           Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
+    explicit ConfigFollowerGetChangesReply(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
                                            Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations)
         : changes(changes), annotations(annotations) {}
 