Add support for changing coordinators to the configuration database

Configuration database data lives on the coordinators. When a change
coordinators command is issued, the data must be sent to the new
coordinators to keep the database consistent.
This commit is contained in:
Lukas Joswiak 2022-06-07 14:49:02 -07:00
parent b641bd6c04
commit 74ac617a34
36 changed files with 1047 additions and 294 deletions

View File

@ -844,7 +844,7 @@ ACTOR Future<Optional<ClusterConnectionString>> getClusterConnectionStringFromSt
// equal to one of the previously issued requests, there is a bug
// and we are breaking the promises we make with
// commit_unknown_result (the transaction must no longer be in
// progress when receiving this error).
// progress when receiving commit_unknown_result).
int n = connectionStrings.size() > 0 ? connectionStrings.size() - 1 : 0; // avoid underflow
for (int i = 0; i < n; ++i) {
ASSERT(currentKey.get() != connectionStrings.at(i));
@ -872,12 +872,58 @@ ACTOR Future<Optional<ClusterConnectionString>> getClusterConnectionStringFromSt
}
}
// Returns once a read against the Paxos configuration database succeeds.
// A coordinators_changed error is retried after a local backoff and a reset
// of the transaction; every other error is handed to the transaction's own
// onError before the next attempt.
ACTOR Future<Void> verifyConfigurationDatabaseAlive(Database cx) {
	state Backoff retryBackoff;
	loop {
		try {
			// Each attempt builds a fresh PAXOS_CONFIG transaction and reads
			// a single key from it to check that the database is online.
			state Reference<ISingleThreadTransaction> probeTr =
			    ISingleThreadTransaction::create(ISingleThreadTransaction::Type::PAXOS_CONFIG, cx);
			Tuple probeKey;
			probeKey.appendNull(); // config class
			probeKey << "test"_sr;
			// Only the success of the read matters; the value is unused.
			Optional<Value> ignoredValue = wait(probeTr->get(probeKey.pack()));
			TraceEvent("ChangeQuorumCheckerNewCoordinatorsOnline").log();
			return Void();
		} catch (Error& e) {
			TraceEvent("ChangeQuorumCheckerNewCoordinatorsError").error(e);
			if (e.code() != error_code_coordinators_changed) {
				wait(probeTr->onError(e));
			} else {
				wait(retryBackoff.onError());
				probeTr->reset();
			}
		}
	}
}
// Clears previousCoordinatorsKey in a dedicated transaction. A failed attempt
// is passed to the transaction's onError, after which the loop tries again.
ACTOR Future<Void> resetPreviousCoordinatorsKey(Database cx) {
	loop {
		// A successful change coordinators transaction reports its result to
		// the client through the special key space error message, so the
		// underlying transaction is never committed. Clearing the previous
		// coordinators key therefore has to happen in a new transaction,
		// which is created here.
		state Reference<ISingleThreadTransaction> resetTr =
		    ISingleThreadTransaction::create(ISingleThreadTransaction::Type::RYW, cx);
		try {
			resetTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			resetTr->clear(previousCoordinatorsKey);
			wait(resetTr->commit());
			return Void();
		} catch (Error& err) {
			wait(resetTr->onError(err));
		}
	}
}
} // namespace
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
ClusterConnectionString* conn,
std::string newName) {
TraceEvent("ChangeQuorumCheckerStart").detail("NewConnectionString", conn->toString());
state Optional<ClusterConnectionString> clusterConnectionStringOptional =
wait(getClusterConnectionStringFromStorageServer(tr));
@ -892,7 +938,7 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
conn->hostnames = old.hostnames;
conn->coords = old.coords;
}
std::vector<NetworkAddress> desiredCoordinators = wait(conn->tryResolveHostnames());
state std::vector<NetworkAddress> desiredCoordinators = wait(conn->tryResolveHostnames());
if (desiredCoordinators.size() != conn->hostnames.size() + conn->coords.size()) {
TraceEvent("ChangeQuorumCheckerEarlyTermination")
.detail("Reason", "One or more hostnames are unresolvable")
@ -909,6 +955,8 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
std::sort(old.coords.begin(), old.coords.end());
if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) {
connectionStrings.clear();
wait(verifyConfigurationDatabaseAlive(tr->getDatabase()));
wait(resetPreviousCoordinatorsKey(tr->getDatabase()));
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
}
@ -958,6 +1006,9 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
}
TraceEvent("ChangeQuorumCheckerSetCoordinatorsKey")
.detail("CurrentCoordinators", old.toString())
.detail("NewCoordinators", conn->toString());
tr->set(coordinatorsKey, conn->toString());
return Optional<CoordinatorsResult>();
}

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/PaxosConfigTransaction.h"
#include "flow/actorcompiler.h" // must be last include
@ -34,8 +35,8 @@ class CommitQuorum {
Standalone<VectorRef<ConfigMutationRef>> mutations;
ConfigCommitAnnotation annotation;
ConfigTransactionCommitRequest getCommitRequest(ConfigGeneration generation) const {
return ConfigTransactionCommitRequest(generation, mutations, annotation);
ConfigTransactionCommitRequest getCommitRequest(ConfigGeneration generation, size_t coordinatorsHash) const {
return ConfigTransactionCommitRequest(coordinatorsHash, generation, mutations, annotation);
}
void updateResult() {
@ -62,14 +63,16 @@ class CommitQuorum {
ACTOR static Future<Void> addRequestActor(CommitQuorum* self,
ConfigGeneration generation,
size_t coordinatorsHash,
ConfigTransactionInterface cti) {
try {
if (cti.hostname.present()) {
wait(timeoutError(retryGetReplyFromHostname(
self->getCommitRequest(generation), cti.hostname.get(), WLTOKEN_CONFIGTXN_COMMIT),
wait(timeoutError(retryGetReplyFromHostname(self->getCommitRequest(generation, coordinatorsHash),
cti.hostname.get(),
WLTOKEN_CONFIGTXN_COMMIT),
CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT));
} else {
wait(timeoutError(cti.commit.getReply(self->getCommitRequest(generation)),
wait(timeoutError(cti.commit.getReply(self->getCommitRequest(generation, coordinatorsHash)),
CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT));
}
++self->successful;
@ -109,11 +112,11 @@ public:
}
void setTimestamp() { annotation.timestamp = now(); }
size_t expectedSize() const { return annotation.expectedSize() + mutations.expectedSize(); }
Future<Void> commit(ConfigGeneration generation) {
Future<Void> commit(ConfigGeneration generation, size_t coordinatorsHash) {
// Send commit message to all replicas, even those that did not return the used replica.
// This way, slow replicas are kept up to date.
for (const auto& cti : ctis) {
actors.add(addRequestActor(this, generation, cti));
actors.add(addRequestActor(this, generation, coordinatorsHash, cti));
}
return result.getFuture();
}
@ -122,11 +125,13 @@ public:
class GetGenerationQuorum {
ActorCollection actors{ false };
size_t coordinatorsHash;
std::vector<ConfigTransactionInterface> ctis;
std::map<ConfigGeneration, std::vector<ConfigTransactionInterface>> seenGenerations;
Promise<ConfigGeneration> result;
size_t totalRepliesReceived{ 0 };
size_t maxAgreement{ 0 };
Future<Void> coordinatorsChangedFuture;
Optional<Version> lastSeenLiveVersion;
Future<ConfigGeneration> getGenerationFuture;
@ -137,14 +142,15 @@ class GetGenerationQuorum {
if (cti.hostname.present()) {
wait(timeoutError(store(reply,
retryGetReplyFromHostname(
ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion },
ConfigTransactionGetGenerationRequest{ self->coordinatorsHash,
self->lastSeenLiveVersion },
cti.hostname.get(),
WLTOKEN_CONFIGTXN_GETGENERATION)),
CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT));
} else {
wait(timeoutError(store(reply,
cti.getGeneration.getReply(
ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion })),
cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{
self->coordinatorsHash, self->lastSeenLiveVersion })),
CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT));
}
@ -155,6 +161,14 @@ class GetGenerationQuorum {
auto& replicas = self->seenGenerations[gen];
replicas.push_back(cti);
self->maxAgreement = std::max(replicas.size(), self->maxAgreement);
// TraceEvent("ConfigTransactionGotGenerationReply")
// .detail("From", cti.getGeneration.getEndpoint().getPrimaryAddress())
// .detail("TotalRepliesReceived", self->totalRepliesReceived)
// .detail("ReplyGeneration", gen.toString())
// .detail("Replicas", replicas.size())
// .detail("Coordinators", self->ctis.size())
// .detail("MaxAgreement", self->maxAgreement)
// .detail("LastSeenLiveVersion", self->lastSeenLiveVersion);
if (replicas.size() >= self->ctis.size() / 2 + 1 && !self->result.isSet()) {
self->result.send(gen);
} else if (self->maxAgreement + (self->ctis.size() - self->totalRepliesReceived) <
@ -200,8 +214,18 @@ class GetGenerationQuorum {
} catch (Error& e) {
if (e.code() == error_code_failed_to_reach_quorum) {
CODE_PROBE(true, "Failed to reach quorum getting generation");
if (self->coordinatorsChangedFuture.isReady()) {
throw coordinators_changed();
}
wait(delayJittered(
std::clamp(0.005 * (1 << retries), 0.0, CLIENT_KNOBS->TIMEOUT_RETRY_UPPER_BOUND)));
if (deterministicRandom()->random01() < 0.05) {
// Randomly inject a delay of at least the generation
// reply timeout, to try to prevent contention between
// clients.
wait(delay(CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT *
(deterministicRandom()->random01() + 1.0)));
}
++retries;
self->actors.clear(false);
self->seenGenerations.clear();
@ -217,9 +241,12 @@ class GetGenerationQuorum {
public:
GetGenerationQuorum() = default;
explicit GetGenerationQuorum(std::vector<ConfigTransactionInterface> const& ctis,
explicit GetGenerationQuorum(size_t coordinatorsHash,
std::vector<ConfigTransactionInterface> const& ctis,
Future<Void> coordinatorsChangedFuture,
Optional<Version> const& lastSeenLiveVersion = {})
: ctis(ctis), lastSeenLiveVersion(lastSeenLiveVersion) {}
: coordinatorsHash(coordinatorsHash), ctis(ctis), coordinatorsChangedFuture(coordinatorsChangedFuture),
lastSeenLiveVersion(lastSeenLiveVersion) {}
Future<ConfigGeneration> getGeneration() {
if (!getGenerationFuture.isValid()) {
getGenerationFuture = getGenerationActor(this);
@ -240,12 +267,14 @@ public:
};
class PaxosConfigTransactionImpl {
size_t coordinatorsHash;
std::vector<ConfigTransactionInterface> ctis;
GetGenerationQuorum getGenerationQuorum;
CommitQuorum commitQuorum;
int numRetries{ 0 };
Optional<UID> dID;
Database cx;
Future<Void> watchClusterFileFuture;
ACTOR static Future<Optional<Value>> get(PaxosConfigTransactionImpl* self, Key key) {
state ConfigKey configKey = ConfigKey::decodeKey(key);
@ -263,18 +292,19 @@ class PaxosConfigTransactionImpl {
}
wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetReply reply =
wait(timeoutError(basicLoadBalance(configNodes,
&ConfigTransactionInterface::get,
ConfigTransactionGetRequest{ generation, configKey }),
CLIENT_KNOBS->GET_KNOB_TIMEOUT));
ConfigTransactionGetReply reply = wait(timeoutError(
basicLoadBalance(configNodes,
&ConfigTransactionInterface::get,
ConfigTransactionGetRequest{ self->coordinatorsHash, generation, configKey }),
CLIENT_KNOBS->GET_KNOB_TIMEOUT));
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return Optional<Value>{};
}
} catch (Error& e) {
if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise &&
e.code() != error_code_coordinators_changed) {
throw;
}
self->reset();
@ -283,58 +313,87 @@ class PaxosConfigTransactionImpl {
}
ACTOR static Future<RangeResult> getConfigClasses(PaxosConfigTransactionImpl* self) {
state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
state std::vector<ConfigTransactionInterface> readReplicas = self->getGenerationQuorum.getReadReplicas();
std::vector<Future<Void>> fs;
for (ConfigTransactionInterface& readReplica : readReplicas) {
if (readReplica.hostname.present()) {
fs.push_back(tryInitializeRequestStream(
&readReplica.getClasses, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETCLASSES));
loop {
try {
state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
state std::vector<ConfigTransactionInterface> readReplicas =
self->getGenerationQuorum.getReadReplicas();
std::vector<Future<Void>> fs;
for (ConfigTransactionInterface& readReplica : readReplicas) {
if (readReplica.hostname.present()) {
fs.push_back(tryInitializeRequestStream(
&readReplica.getClasses, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETCLASSES));
}
}
wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetConfigClassesReply reply =
wait(basicLoadBalance(configNodes,
&ConfigTransactionInterface::getClasses,
ConfigTransactionGetConfigClassesRequest{ generation }));
RangeResult result;
result.reserve(result.arena(), reply.configClasses.size());
for (const auto& configClass : reply.configClasses) {
result.push_back_deep(result.arena(), KeyValueRef(configClass, ""_sr));
}
return result;
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed) {
throw;
}
self->reset();
}
}
wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetConfigClassesReply reply =
wait(basicLoadBalance(configNodes,
&ConfigTransactionInterface::getClasses,
ConfigTransactionGetConfigClassesRequest{ generation }));
RangeResult result;
result.reserve(result.arena(), reply.configClasses.size());
for (const auto& configClass : reply.configClasses) {
result.push_back_deep(result.arena(), KeyValueRef(configClass, ""_sr));
}
return result;
}
ACTOR static Future<RangeResult> getKnobs(PaxosConfigTransactionImpl* self, Optional<Key> configClass) {
state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
state std::vector<ConfigTransactionInterface> readReplicas = self->getGenerationQuorum.getReadReplicas();
std::vector<Future<Void>> fs;
for (ConfigTransactionInterface& readReplica : readReplicas) {
if (readReplica.hostname.present()) {
fs.push_back(tryInitializeRequestStream(
&readReplica.getKnobs, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETKNOBS));
loop {
try {
state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
state std::vector<ConfigTransactionInterface> readReplicas =
self->getGenerationQuorum.getReadReplicas();
std::vector<Future<Void>> fs;
for (ConfigTransactionInterface& readReplica : readReplicas) {
if (readReplica.hostname.present()) {
fs.push_back(tryInitializeRequestStream(
&readReplica.getKnobs, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETKNOBS));
}
}
wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetKnobsReply reply =
wait(basicLoadBalance(configNodes,
&ConfigTransactionInterface::getKnobs,
ConfigTransactionGetKnobsRequest{ generation, configClass }));
RangeResult result;
result.reserve(result.arena(), reply.knobNames.size());
for (const auto& knobName : reply.knobNames) {
result.push_back_deep(result.arena(), KeyValueRef(knobName, ""_sr));
}
return result;
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed) {
throw;
}
self->reset();
}
}
wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetKnobsReply reply =
wait(basicLoadBalance(configNodes,
&ConfigTransactionInterface::getKnobs,
ConfigTransactionGetKnobsRequest{ generation, configClass }));
RangeResult result;
result.reserve(result.arena(), reply.knobNames.size());
for (const auto& knobName : reply.knobNames) {
result.push_back_deep(result.arena(), KeyValueRef(knobName, ""_sr));
}
return result;
}
ACTOR static Future<Void> commit(PaxosConfigTransactionImpl* self) {
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
self->commitQuorum.setTimestamp();
wait(self->commitQuorum.commit(generation));
return Void();
loop {
try {
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
self->commitQuorum.setTimestamp();
wait(self->commitQuorum.commit(generation, self->coordinatorsHash));
return Void();
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed) {
throw;
}
self->reset();
}
}
}
ACTOR static Future<Void> onError(PaxosConfigTransactionImpl* self, Error e) {
@ -350,6 +409,24 @@ class PaxosConfigTransactionImpl {
throw e;
}
// Completes once the cluster interface has changed and the connection string
// in cx's connection record no longer matches the one captured when this
// actor started.
ACTOR static Future<Void> watchClusterFile(Database cx) {
	state Reference<AsyncVar<Optional<ClusterInterface>>> leaderInterface(
	    new AsyncVar<Optional<ClusterInterface>>);
	// Keeps the monitorLeader future referenced for the lifetime of this actor.
	state Future<Void> leaderMonitor = monitorLeader<ClusterInterface>(cx->getConnectionRecord(), leaderInterface);
	state std::string initialConnectionString = cx->getConnectionRecord()->getConnectionString().toString();
	loop {
		// Re-check the connection string after every cluster interface change.
		wait(leaderInterface->onChange());
		if (cx->getConnectionRecord()->getConnectionString().toString() != initialConnectionString) {
			return Void();
		}
	}
}
public:
Future<Version> getReadVersion() {
return map(getGenerationQuorum.getGeneration(), [](auto const& gen) { return gen.committedVersion; });
@ -395,7 +472,21 @@ public:
void debugTransaction(UID dID) { this->dID = dID; }
void reset() {
getGenerationQuorum = GetGenerationQuorum{ ctis };
ctis.clear();
// Re-read connection string. If the cluster file changed, this will
// return the updated value.
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
ctis.reserve(cs.hostnames.size() + cs.coords.size());
for (const auto& h : cs.hostnames) {
ctis.emplace_back(h);
}
for (const auto& c : cs.coords) {
ctis.emplace_back(c);
}
coordinatorsHash = std::hash<std::string>()(cx->getConnectionRecord()->getConnectionString().toString());
getGenerationQuorum = GetGenerationQuorum{
coordinatorsHash, ctis, watchClusterFile(cx), getGenerationQuorum.getLastSeenLiveVersion()
};
commitQuorum = CommitQuorum{ ctis };
}
@ -416,21 +507,10 @@ public:
Future<Void> commit() { return commit(this); }
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) {
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
ctis.reserve(cs.hostnames.size() + cs.coords.size());
for (const auto& h : cs.hostnames) {
ctis.emplace_back(h);
}
for (const auto& c : cs.coords) {
ctis.emplace_back(c);
}
getGenerationQuorum = GetGenerationQuorum{ ctis };
commitQuorum = CommitQuorum{ ctis };
}
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) { reset(); }
PaxosConfigTransactionImpl(std::vector<ConfigTransactionInterface> const& ctis)
: ctis(ctis), getGenerationQuorum(ctis), commitQuorum(ctis) {}
: ctis(ctis), getGenerationQuorum(0, ctis, Future<Void>()), commitQuorum(ctis) {}
};
Future<Version> PaxosConfigTransaction::getReadVersion() {

View File

@ -70,11 +70,12 @@ class SimpleConfigTransactionImpl {
state ConfigTransactionGetReply reply;
if (self->cti.hostname.present()) {
wait(store(reply,
retryGetReplyFromHostname(ConfigTransactionGetRequest{ generation, configKey },
retryGetReplyFromHostname(ConfigTransactionGetRequest{ 0, generation, configKey },
self->cti.hostname.get(),
WLTOKEN_CONFIGTXN_GET)));
} else {
wait(store(reply, retryBrokenPromise(self->cti.get, ConfigTransactionGetRequest{ generation, configKey })));
wait(store(reply,
retryBrokenPromise(self->cti.get, ConfigTransactionGetRequest{ 0, generation, configKey })));
}
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGotValue", self->dID.get())

View File

@ -1002,6 +1002,7 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
return ids;
}
const KeyRef previousCoordinatorsKey = LiteralStringRef("\xff/previousCoordinators");
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
const KeyRef logsKey = LiteralStringRef("\xff/logs");
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");

View File

@ -55,7 +55,7 @@ Reference<ITenant> ThreadSafeDatabase::openTenant(TenantNameRef tenantName) {
}
Reference<ITransaction> ThreadSafeDatabase::createTransaction() {
auto type = isConfigDB ? ISingleThreadTransaction::Type::SIMPLE_CONFIG : ISingleThreadTransaction::Type::RYW;
auto type = isConfigDB ? ISingleThreadTransaction::Type::PAXOS_CONFIG : ISingleThreadTransaction::Type::RYW;
return Reference<ITransaction>(new ThreadSafeTransaction(db, type, Optional<TenantName>()));
}
@ -224,7 +224,7 @@ ThreadSafeDatabase::~ThreadSafeDatabase() {
}
Reference<ITransaction> ThreadSafeTenant::createTransaction() {
auto type = db->isConfigDB ? ISingleThreadTransaction::Type::SIMPLE_CONFIG : ISingleThreadTransaction::Type::RYW;
auto type = db->isConfigDB ? ISingleThreadTransaction::Type::PAXOS_CONFIG : ISingleThreadTransaction::Type::RYW;
return Reference<ITransaction>(new ThreadSafeTransaction(db->db, type, name));
}

View File

@ -65,16 +65,18 @@ struct ConfigTransactionGetGenerationReply {
struct ConfigTransactionGetGenerationRequest {
static constexpr FileIdentifier file_identifier = 138941;
size_t coordinatorsHash;
// A hint to catch up lagging nodes:
Optional<Version> lastSeenLiveVersion;
ReplyPromise<ConfigTransactionGetGenerationReply> reply;
ConfigTransactionGetGenerationRequest() = default;
explicit ConfigTransactionGetGenerationRequest(Optional<Version> const& lastSeenLiveVersion)
: lastSeenLiveVersion(lastSeenLiveVersion) {}
explicit ConfigTransactionGetGenerationRequest(size_t coordinatorsHash,
Optional<Version> const& lastSeenLiveVersion)
: coordinatorsHash(coordinatorsHash), lastSeenLiveVersion(lastSeenLiveVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, lastSeenLiveVersion, reply);
serializer(ar, coordinatorsHash, lastSeenLiveVersion, reply);
}
};
@ -92,39 +94,43 @@ struct ConfigTransactionGetReply {
struct ConfigTransactionGetRequest {
static constexpr FileIdentifier file_identifier = 923040;
size_t coordinatorsHash;
ConfigGeneration generation;
ConfigKey key;
ReplyPromise<ConfigTransactionGetReply> reply;
ConfigTransactionGetRequest() = default;
explicit ConfigTransactionGetRequest(ConfigGeneration generation, ConfigKey key)
: generation(generation), key(key) {}
explicit ConfigTransactionGetRequest(size_t coordinatorsHash, ConfigGeneration generation, ConfigKey key)
: coordinatorsHash(coordinatorsHash), generation(generation), key(key) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, generation, key, reply);
serializer(ar, coordinatorsHash, generation, key, reply);
}
};
struct ConfigTransactionCommitRequest {
static constexpr FileIdentifier file_identifier = 103841;
Arena arena;
size_t coordinatorsHash;
ConfigGeneration generation{ ::invalidVersion, ::invalidVersion };
VectorRef<ConfigMutationRef> mutations;
ConfigCommitAnnotationRef annotation;
ReplyPromise<Void> reply;
ConfigTransactionCommitRequest() = default;
explicit ConfigTransactionCommitRequest(ConfigGeneration generation,
explicit ConfigTransactionCommitRequest(size_t coordinatorsHash,
ConfigGeneration generation,
VectorRef<ConfigMutationRef> mutations,
ConfigCommitAnnotationRef annotation)
: generation(generation), mutations(arena, mutations), annotation(arena, annotation) {}
: coordinatorsHash(coordinatorsHash), generation(generation), mutations(arena, mutations),
annotation(arena, annotation) {}
size_t expectedSize() const { return mutations.expectedSize() + annotation.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, generation, mutations, annotation, reply, arena);
serializer(ar, coordinatorsHash, generation, mutations, annotation, reply, arena);
}
};

View File

@ -377,6 +377,12 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
// 1 = Send a signal to pause/already paused.
extern const KeyRef backupPausedKey;
// "\xff/previousCoordinators" = "[[ClusterConnectionString]]"
// Set to the encoded structure of the cluster's previous set of coordinators.
// Changed when performing quorumChange.
// See "CoordinationInterface.h" struct ClusterConnectionString for more details
extern const KeyRef previousCoordinatorsKey;
// "\xff/coordinators" = "[[ClusterConnectionString]]"
// Set to the encoded structure of the cluster's current set of coordinators.
// Changed when performing quorumChange.

View File

@ -48,8 +48,9 @@ enum WellKnownEndpoints {
WLTOKEN_CONFIGFOLLOWER_COMPACT, // 20
WLTOKEN_CONFIGFOLLOWER_ROLLFORWARD, // 21
WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION, // 22
WLTOKEN_PROCESS, // 23
WLTOKEN_RESERVED_COUNT // 24
WLTOKEN_CONFIGFOLLOWER_LOCK, // 23
WLTOKEN_PROCESS, // 24
WLTOKEN_RESERVED_COUNT // 25
};
static_assert(WLTOKEN_PROTOCOL_INFO ==

View File

@ -327,7 +327,8 @@ private:
}
void checkSetConfigKeys(MutationRef m) {
if (!m.param1.startsWith(configKeysPrefix) && m.param1 != coordinatorsKey) {
if (!m.param1.startsWith(configKeysPrefix) && m.param1 != coordinatorsKey &&
m.param1 != previousCoordinatorsKey) {
return;
}
if (Optional<StringRef>(m.param2) !=
@ -343,7 +344,8 @@ private:
TraceEvent("MutationRequiresRestart", dbgid)
.detail("M", m)
.detail("PrevValue", t.orDefault("(none)"_sr))
.detail("ToCommit", toCommit != nullptr);
.detail("ToCommit", toCommit != nullptr)
.detail("InitialCommit", initialCommit);
confChange = true;
}
}
@ -1116,6 +1118,9 @@ private:
if (initialCommit) {
return;
}
if (range.contains(previousCoordinatorsKey)) {
txnStateStore->clear(singleKeyRange(previousCoordinatorsKey));
}
if (range.contains(coordinatorsKey)) {
txnStateStore->clear(singleKeyRange(coordinatorsKey));
}

View File

@ -43,7 +43,6 @@
#include "fdbserver/ClusterRecovery.actor.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/LeaderElection.h"
#include "fdbserver/LogSystem.h"
@ -196,6 +195,21 @@ struct EncryptKeyProxySingleton : Singleton<EncryptKeyProxyInterface> {
}
};
// Reads the value stored under previousCoordinatorsKey from the cluster
// controller's database handle. Errors are passed to the transaction's
// onError and the read is attempted again.
ACTOR Future<Optional<Value>> getPreviousCoordinators(ClusterControllerData* self) {
	state ReadYourWritesTransaction readTr(self->db.db);
	loop {
		try {
			// Options are set again on every attempt.
			readTr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
			readTr.setOption(FDBTransactionOptions::LOCK_AWARE);
			readTr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			Optional<Value> storedCoordinators = wait(readTr.get(previousCoordinatorsKey));
			return storedCoordinators;
		} catch (Error& err) {
			wait(readTr.onError(err));
		}
	}
}
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
ClusterControllerData::DBInfo* db,
ServerCoordinators coordinators,
@ -1209,13 +1223,14 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
}
if (configBroadcaster != nullptr && isCoordinator) {
if (configBroadcaster != nullptr) {
self->addActor.send(configBroadcaster->registerNode(
w,
req.lastSeenKnobVersion,
req.knobConfigClassSet,
self->id_worker[w.locality.processId()].watcher,
self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface));
self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface,
isCoordinator));
}
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger();
@ -1246,12 +1261,13 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger();
}
if (configBroadcaster != nullptr && isCoordinator) {
if (configBroadcaster != nullptr) {
self->addActor.send(configBroadcaster->registerNode(w,
req.lastSeenKnobVersion,
req.knobConfigClassSet,
info->second.watcher,
info->second.details.interf.configBroadcastInterface));
info->second.details.interf.configBroadcastInterface,
isCoordinator));
}
checkOutstandingRequests(self);
} else {
@ -2536,10 +2552,10 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
ConfigDBType configDBType,
Future<Void> recoveredDiskFiles) {
state ClusterControllerData self(interf, locality, coordinators);
state ConfigBroadcaster configBroadcaster(coordinators, configDBType);
state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
state uint64_t step = 0;
state Future<ErrorOr<Void>> error = errorOr(actorCollection(self.addActor.getFuture()));
state ConfigBroadcaster configBroadcaster(coordinators, configDBType, getPreviousCoordinators(&self));
// EncryptKeyProxy is necessary for TLog recovery, recruit it as the first process
if (SERVER_KNOBS->ENABLE_ENCRYPTION) {

View File

@ -521,6 +521,7 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
loop {
ChangeCoordinatorsRequest req = waitNext(self->clusterController.changeCoordinators.getFuture());
TraceEvent("ChangeCoordinators", self->dbgid).log();
++self->changeCoordinatorsRequests;
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
if (self->masterInterface.id() != changeCoordinatorsRequest.masterId) {
@ -1637,6 +1638,11 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
tr.set(
recoveryCommitRequest.arena, primaryLocalityKey, BinaryWriter::toValue(self->primaryLocality, Unversioned()));
tr.set(recoveryCommitRequest.arena, backupVersionKey, backupVersionValue);
Optional<Value> txnStateStoreCoords = self->txnStateStore->readValue(coordinatorsKey).get();
if (txnStateStoreCoords.present() &&
txnStateStoreCoords.get() != self->coordinators.ccr->getConnectionString().toString()) {
tr.set(recoveryCommitRequest.arena, previousCoordinatorsKey, txnStateStoreCoords.get());
}
tr.set(recoveryCommitRequest.arena, coordinatorsKey, self->coordinators.ccr->getConnectionString().toString());
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
tr.set(recoveryCommitRequest.arena,

View File

@ -618,7 +618,7 @@ struct CommitBatchContext {
bool isMyFirstBatch;
bool firstStateMutations;
Optional<Value> oldCoordinators;
Optional<Value> previousCoordinators;
StoreCommit_t storeCommits;
@ -1146,7 +1146,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
ASSERT(self->commitVersion);
if (!self->isMyFirstBatch &&
pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->oldCoordinators.get()) {
pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->previousCoordinators.get()) {
wait(brokenPromiseToNever(pProxyCommitData->db->get().clusterInterface.changeCoordinators.getReply(
ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get(),
self->pProxyCommitData->master.id()))));
@ -1374,7 +1374,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
}
self->isMyFirstBatch = !pProxyCommitData->version.get();
self->oldCoordinators = pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get();
self->previousCoordinators = pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get();
assertResolutionStateMutationsSizeConsistent(self->resolution);

View File

@ -20,6 +20,7 @@
#include <algorithm>
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/IConfigConsumer.h"
@ -77,13 +78,24 @@ class ConfigBroadcasterImpl {
std::deque<VersionedConfigMutation> mutationHistory;
std::deque<VersionedConfigCommitAnnotation> annotationHistory;
Version lastCompactedVersion;
Version largestLiveVersion;
Version mostRecentVersion;
size_t coordinatorsHash;
std::unique_ptr<IConfigConsumer> consumer;
Future<Void> consumerFuture;
ActorCollection actors{ false };
std::map<UID, BroadcastClientDetails> clients;
std::map<UID, Future<Void>> clientFailures;
// State related to changing coordinators
// Used to read a snapshot from the previous coordinators after a change
// coordinators command.
Version maxLastSeenVersion = ::invalidVersion;
Future<Optional<Value>> previousCoordinatorsFuture;
std::unique_ptr<IConfigConsumer> previousCoordinatorsConsumer;
Future<Void> previousCoordinatorsSnapshotFuture;
UID id;
CounterCollection cc;
Counter compactRequest;
@ -95,6 +107,7 @@ class ConfigBroadcasterImpl {
int coordinators = 0;
std::unordered_set<NetworkAddress> activeConfigNodes;
std::unordered_set<NetworkAddress> registrationResponses;
std::unordered_set<NetworkAddress> registrationResponsesUnregistered;
bool disallowUnregistered = false;
Promise<Void> newConfigNodesAllowed;
@ -155,8 +168,8 @@ class ConfigBroadcasterImpl {
}
ConfigBroadcasterImpl()
: lastCompactedVersion(0), mostRecentVersion(0), id(deterministicRandom()->randomUniqueID()),
cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
: lastCompactedVersion(0), largestLiveVersion(0), mostRecentVersion(0),
id(deterministicRandom()->randomUniqueID()), cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
snapshotRequest("SnapshotRequest", cc) {
logger = traceCounters(
@ -183,45 +196,44 @@ class ConfigBroadcasterImpl {
}
}
template <class Snapshot>
Future<Void> setSnapshot(Snapshot&& snapshot, Version snapshotVersion) {
this->snapshot = std::forward<Snapshot>(snapshot);
this->lastCompactedVersion = snapshotVersion;
ACTOR static Future<Void> pushSnapshotAndChanges(ConfigBroadcasterImpl* self, Version snapshotVersion) {
std::vector<Future<Void>> futures;
for (const auto& [id, client] : clients) {
futures.push_back(brokenPromiseToNever(pushSnapshot(snapshotVersion, client)));
for (const auto& [id, client] : self->clients) {
futures.push_back(brokenPromiseToNever(self->pushSnapshot(snapshotVersion, client)));
}
return waitForAll(futures);
}
ACTOR template <class Snapshot>
static Future<Void> pushSnapshotAndChanges(ConfigBroadcasterImpl* self,
Snapshot snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations) {
// Make sure all snapshot messages were received before sending changes.
wait(self->setSnapshot(snapshot, snapshotVersion));
self->addChanges(changes, changesVersion, annotations);
wait(waitForAll(futures));
return Void();
}
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self,
Future<Void> watcher,
UID clientUID,
NetworkAddress clientAddress) {
NetworkAddress clientAddress,
bool isCoordinator) {
wait(watcher);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id)
.detail("ClientID", clientUID)
.detail("Address", clientAddress);
.detail("Address", clientAddress)
.detail("IsUnregistered",
self->registrationResponsesUnregistered.find(clientAddress) !=
self->registrationResponsesUnregistered.end())
.detail("IsActive", self->activeConfigNodes.find(clientAddress) != self->activeConfigNodes.end());
self->clients.erase(clientUID);
self->clientFailures.erase(clientUID);
self->activeConfigNodes.erase(clientAddress);
self->registrationResponses.erase(clientAddress);
// See comment where this promise is reset below.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
if (isCoordinator) {
self->registrationResponses.erase(clientAddress);
if (self->activeConfigNodes.find(clientAddress) != self->activeConfigNodes.end()) {
self->activeConfigNodes.erase(clientAddress);
if (self->registrationResponsesUnregistered.find(clientAddress) !=
self->registrationResponsesUnregistered.end()) {
self->registrationResponsesUnregistered.erase(clientAddress);
self->disallowUnregistered = false;
// See comment where this promise is reset below.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
}
}
}
return Void();
}
@ -231,57 +243,71 @@ class ConfigBroadcasterImpl {
// ensure strict serializability, some nodes may be temporarily restricted
// from participation until the other nodes in the system are brought up to
// date.
ACTOR static Future<Void> registerNodeInternal(ConfigBroadcasterImpl* self,
WorkerInterface w,
Version lastSeenVersion) {
ACTOR static Future<Void> registerNodeInternal(ConfigBroadcaster* broadcaster,
ConfigBroadcasterImpl* self,
WorkerInterface w) {
if (self->configDBType == ConfigDBType::SIMPLE) {
wait(success(retryBrokenPromise(w.configBroadcastInterface.ready, ConfigBroadcastReadyRequest{})));
wait(success(
brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{}))));
return Void();
}
state NetworkAddress address = w.address();
// Ask the registering ConfigNode whether it has registered in the past.
ConfigBroadcastRegisteredReply reply =
wait(w.configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{}));
state ConfigBroadcastRegisteredReply reply = wait(
brokenPromiseToNever(w.configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{})));
self->maxLastSeenVersion = std::max(self->maxLastSeenVersion, reply.lastSeenVersion);
state bool registered = reply.registered;
TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id)
.detail("Address", address)
.detail("Registered", registered)
.detail("DisallowUnregistered", self->disallowUnregistered)
.detail("LastSeenVersion", reply.lastSeenVersion);
if (self->activeConfigNodes.find(address) != self->activeConfigNodes.end()) {
self->activeConfigNodes.erase(address);
// Since a node can die and re-register before the broadcaster
// receives notice that the node has died, we need to check for
// re-registration of a node here. There are two places that can
// reset the promise to allow new nodes, make sure the promise is
// actually set before resetting it. This prevents a node from
// dying, registering, waiting on the promise, then the broadcaster
// receives the notification the node has died and resets the
// promise again.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
if (self->registrationResponsesUnregistered.find(address) !=
self->registrationResponsesUnregistered.end()) {
self->registrationResponsesUnregistered.erase(address);
// If an unregistered node died which was active, reset the
// disallow unregistered flag so if it re-registers it can be
// set as active again.
self->disallowUnregistered = false;
// Since a node can die and re-register before the broadcaster
// receives notice that the node has died, we need to check for
// re-registration of a node here. There are two places that can
// reset the promise to allow new nodes, so make sure the promise
// is actually set before resetting it. This prevents a node from
// dying, registering, waiting on the promise, then the broadcaster
// receives the notification the node has died and resets the
// promise again.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
}
}
self->registrationResponses.insert(address);
if (registered) {
if (!self->disallowUnregistered) {
self->activeConfigNodes.clear();
}
self->activeConfigNodes.insert(address);
self->disallowUnregistered = true;
} else if ((self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) ||
self->coordinators - self->registrationResponses.size() <=
self->coordinators / 2 + 1 - self->activeConfigNodes.size()) {
self->registrationResponsesUnregistered.size() < self->coordinators / 2) {
// Received a registration request from an unregistered node. There
// are two cases where we want to allow unregistered nodes to
// register:
// * the cluster is just starting and no nodes are registered
// * a minority of nodes are registered and a majority are
// unregistered. This situation should only occur in rare
// circumstances where the cluster controller dies with only a
// minority of config nodes having received a
// ConfigBroadcastReadyRequest
// * there are registered and unregistered nodes, but the
// registered nodes may not represent a majority due to previous
// data loss. In this case, unregistered nodes must be allowed
// to register so they can be rolled forward and form a quorum.
// But only a minority of unregistered nodes should be allowed
// to register so they cannot override the registered nodes as
// a source of truth
self->activeConfigNodes.insert(address);
if (self->activeConfigNodes.size() >= self->coordinators / 2 + 1 &&
self->registrationResponsesUnregistered.insert(address);
if ((self->activeConfigNodes.size() >= self->coordinators / 2 + 1 ||
self->registrationResponsesUnregistered.size() >= self->coordinators / 2 + 1) &&
self->newConfigNodesAllowed.canBeSet()) {
self->newConfigNodesAllowed.send(Void());
}
@ -289,11 +315,78 @@ class ConfigBroadcasterImpl {
self->disallowUnregistered = true;
}
if (!registered) {
// Read previous coordinators and fetch snapshot from them if they
// exist. This path should only be hit once after the coordinators are
// changed.
wait(yield());
Optional<Value> previousCoordinators = wait(self->previousCoordinatorsFuture);
TraceEvent("ConfigBroadcasterRegisterNodeReadPreviousCoordinators", self->id)
.detail("PreviousCoordinators", previousCoordinators)
.detail("HasStartedConsumer", self->previousCoordinatorsSnapshotFuture.isValid());
if (previousCoordinators.present()) {
if (!self->previousCoordinatorsSnapshotFuture.isValid()) {
// Create a consumer to read a snapshot from the previous
// coordinators. The snapshot will be forwarded to the new
// coordinators to bring them up to date.
size_t previousCoordinatorsHash = std::hash<std::string>()(previousCoordinators.get().toString());
if (previousCoordinatorsHash != self->coordinatorsHash) {
ServerCoordinators previousCoordinatorsData(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(previousCoordinators.get().toString())));
TraceEvent("ConfigBroadcasterRegisterNodeStartingConsumer", self->id).log();
self->previousCoordinatorsConsumer = IConfigConsumer::createPaxos(
previousCoordinatorsData, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL, true);
self->previousCoordinatorsSnapshotFuture =
self->previousCoordinatorsConsumer->readSnapshot(*broadcaster);
} else {
// If the cluster controller restarts without a coordinator
// change having taken place, there is no need to read a
// previous snapshot.
self->previousCoordinatorsSnapshotFuture = Void();
}
}
wait(self->previousCoordinatorsSnapshotFuture);
}
state bool sendSnapshot =
self->previousCoordinatorsConsumer && reply.lastSeenVersion <= self->mostRecentVersion;
// Unregistered nodes need to wait for either:
// 1. A quorum of registered nodes to register and send their
// snapshots, so the unregistered nodes can be rolled forward, or
// 2. A quorum of unregistered nodes to contact the broadcaster (this
// means there is no previous data in the configuration database)
// The above conditions do not apply when changing coordinators, as a
// snapshot of the current state of the configuration database needs to
// be sent to all new coordinators.
TraceEvent("ConfigBroadcasterRegisterNodeDetermineEligibility", self->id)
.detail("Registered", registered)
.detail("SendSnapshot", sendSnapshot);
if (!registered && !sendSnapshot) {
wait(self->newConfigNodesAllowed.getFuture());
}
wait(success(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{})));
TraceEvent("ConfigBroadcasterRegisterNodeSendingReadyRequest", self->id)
.detail("ConfigNodeAddress", address)
.detail("SendSnapshot", sendSnapshot)
.detail("SnapshotVersion", self->mostRecentVersion)
.detail("SnapshotSize", self->snapshot.size())
.detail("LargestLiveVersion", self->largestLiveVersion);
if (sendSnapshot) {
Version liveVersion = std::max(self->largestLiveVersion, self->mostRecentVersion);
wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{
self->coordinatorsHash, self->snapshot, self->mostRecentVersion, liveVersion }))));
} else {
wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(
ConfigBroadcastReadyRequest{ self->coordinatorsHash, {}, -1, -1 }))));
}
// Start the consumer last, so at least some nodes will be registered.
if (!self->consumerFuture.isValid()) {
if (sendSnapshot) {
self->consumer->allowSpecialCaseRollforward();
}
self->consumerFuture = self->consumer->consume(*broadcaster);
}
return Void();
}
@ -303,12 +396,10 @@ class ConfigBroadcasterImpl {
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
ConfigBroadcastInterface broadcastInterface,
bool isCoordinator) {
state BroadcastClientDetails client(
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
if (!impl->consumerFuture.isValid()) {
impl->consumerFuture = impl->consumer->consume(*self);
}
if (impl->clients.count(broadcastInterface.id())) {
// Client already registered
@ -317,15 +408,18 @@ class ConfigBroadcasterImpl {
TraceEvent(SevDebug, "ConfigBroadcasterRegisteringWorker", impl->id)
.detail("ClientID", broadcastInterface.id())
.detail("MostRecentVersion", impl->mostRecentVersion);
.detail("MostRecentVersion", impl->mostRecentVersion)
.detail("IsCoordinator", isCoordinator);
impl->actors.add(registerNodeInternal(impl, w, lastSeenVersion));
if (isCoordinator) {
impl->actors.add(registerNodeInternal(self, impl, w));
}
// Push full snapshot to worker if it isn't up to date.
wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client;
impl->clientFailures[broadcastInterface.id()] =
waitForFailure(impl, watcher, broadcastInterface.id(), w.address());
waitForFailure(impl, watcher, broadcastInterface.id(), w.address(), isCoordinator);
return Void();
}
@ -335,8 +429,10 @@ public:
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface) {
return registerNode(&self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface);
ConfigBroadcastInterface const& broadcastInterface,
bool isCoordinator) {
return registerNode(
&self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator);
}
// Updates the broadcaster's knowledge of which replicas are fully up to
@ -377,17 +473,36 @@ public:
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingSnapshotAndChanges", id)
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("SnapshotSize", snapshot.size())
.detail("SnapshotVersion", snapshotVersion)
.detail("ChangesSize", changes.size())
.detail("ChangesVersion", changesVersion)
.detail("ActiveReplicas", readReplicas.size());
actors.add(pushSnapshotAndChanges(this, snapshot, snapshotVersion, changes, changesVersion, annotations));
.detail("ActiveReplicas", readReplicas.size())
.detail("LargestLiveVersion", largestLiveVersion)
.detail("FromPreviousCoordinators", fromPreviousCoordinators);
// Avoid updating state if the snapshot contains no mutations, or if it
// contains old mutations. This can happen when the set of coordinators
// is changed, and a new coordinator comes online that has not yet had
// the current configuration database pushed to it, or when a new
// coordinator contains state from an old configuration database
// generation.
if ((snapshot.size() != 0 || changes.size() != 0) &&
(snapshotVersion > this->mostRecentVersion || changesVersion > this->mostRecentVersion)) {
this->snapshot = std::forward<Snapshot>(snapshot);
this->lastCompactedVersion = snapshotVersion;
this->largestLiveVersion = std::max(this->largestLiveVersion, largestLiveVersion);
addChanges(changes, changesVersion, annotations);
actors.add(pushSnapshotAndChanges(this, snapshotVersion));
}
updateKnownReplicas(readReplicas);
if (!fromPreviousCoordinators) {
updateKnownReplicas(readReplicas);
}
}
ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {
@ -397,18 +512,27 @@ public:
TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id).detail("Consumer", consumer->getID());
}
ConfigBroadcasterImpl(ServerCoordinators const& coordinators, ConfigDBType configDBType) : ConfigBroadcasterImpl() {
ConfigBroadcasterImpl(ServerCoordinators const& coordinators,
ConfigDBType configDBType,
Future<Optional<Value>> previousCoordinatorsFuture)
: ConfigBroadcasterImpl() {
this->configDBType = configDBType;
this->coordinators = coordinators.configServers.size();
if (configDBType != ConfigDBType::DISABLED) {
if (configDBType == ConfigDBType::SIMPLE) {
consumer = IConfigConsumer::createSimple(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
} else {
this->previousCoordinatorsFuture = previousCoordinatorsFuture;
consumer = IConfigConsumer::createPaxos(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
}
coordinatorsHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());
TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id)
.detail("Consumer", consumer->getID())
.detail("UsingSimpleConsumer", configDBType == ConfigDBType::SIMPLE);
.detail("UsingSimpleConsumer", configDBType == ConfigDBType::SIMPLE)
.detail("CoordinatorsCount", this->coordinators)
.detail("CoordinatorsHash", coordinatorsHash);
}
}
@ -419,9 +543,12 @@ public:
JsonBuilderObject mutationObject;
mutationObject["version"] = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
mutationObject["type"] = mutation.isSet() ? "set" : "clear";
mutationObject["config_class"] = mutation.getConfigClass().orDefault("<global>"_sr);
mutationObject["knob_name"] = mutation.getKnobName();
mutationObject["knob_value"] = mutation.getValue().toString();
if (mutation.isSet()) {
mutationObject["knob_value"] = mutation.getValue().toString();
}
mutationsArray.push_back(std::move(mutationObject));
}
result["mutations"] = std::move(mutationsArray);
@ -477,11 +604,15 @@ public:
static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches);
};
ConfigBroadcaster::ConfigBroadcaster() {}
ConfigBroadcaster::ConfigBroadcaster(ConfigFollowerInterface const& cfi)
: impl(PImpl<ConfigBroadcasterImpl>::create(cfi)) {}
ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators, ConfigDBType configDBType)
: impl(PImpl<ConfigBroadcasterImpl>::create(coordinators, configDBType)) {}
ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators,
ConfigDBType configDBType,
Future<Optional<Value>> previousCoordinatorsFuture)
: impl(PImpl<ConfigBroadcasterImpl>::create(coordinators, configDBType, previousCoordinatorsFuture)) {}
ConfigBroadcaster::ConfigBroadcaster(ConfigBroadcaster&&) = default;
@ -493,8 +624,9 @@ Future<Void> ConfigBroadcaster::registerNode(WorkerInterface const& w,
Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface) {
return impl->registerNode(*this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface);
ConfigBroadcastInterface const& broadcastInterface,
bool isCoordinator) {
return impl->registerNode(*this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator);
}
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
@ -510,8 +642,17 @@ void ConfigBroadcaster::applySnapshotAndChanges(
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion, annotations, readReplicas);
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
impl->applySnapshotAndChanges(snapshot,
snapshotVersion,
changes,
changesVersion,
annotations,
readReplicas,
largestLiveVersion,
fromPreviousCoordinators);
}
void ConfigBroadcaster::applySnapshotAndChanges(
@ -520,9 +661,17 @@ void ConfigBroadcaster::applySnapshotAndChanges(
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applySnapshotAndChanges(
std::move(snapshot), snapshotVersion, changes, changesVersion, annotations, readReplicas);
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
impl->applySnapshotAndChanges(std::move(snapshot),
snapshotVersion,
changes,
changesVersion,
annotations,
readReplicas,
largestLiveVersion,
fromPreviousCoordinators);
}
Future<Void> ConfigBroadcaster::getError() const {
@ -544,3 +693,27 @@ JsonBuilderObject ConfigBroadcaster::getStatus() const {
// Forwards the compaction request for compactionVersion to the implementation
// object (impl).
void ConfigBroadcaster::compact(Version compactionVersion) {
impl->compact(compactionVersion);
}
// Sends a ConfigFollowerLockRequest to every config server of the given
// coordinators and completes once a majority of those requests have been
// answered.
//
// Each request carries a hash of the cluster connection string. Config servers
// that have a hostname are contacted with retryGetReplyFromHostname on the
// well-known lock token; all others are contacted through their lock endpoint
// with retryBrokenPromise.
ACTOR static Future<Void> lockConfigNodesImpl(ServerCoordinators coordinators) {
	size_t connectionStringHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());

	std::vector<Future<Void>> pendingLocks;
	pendingLocks.reserve(coordinators.configServers.size());
	for (const auto& configServer : coordinators.configServers) {
		if (!configServer.hostname.present()) {
			pendingLocks.push_back(
			    retryBrokenPromise(configServer.lock, ConfigFollowerLockRequest{ connectionStringHash }));
		} else {
			pendingLocks.push_back(retryGetReplyFromHostname(ConfigFollowerLockRequest{ connectionStringHash },
			                                                 configServer.hostname.get(),
			                                                 WLTOKEN_CONFIGFOLLOWER_LOCK));
		}
	}

	// Only a strict majority of the lock requests has to succeed.
	int majority = pendingLocks.size() / 2 + 1;
	wait(quorum(pendingLocks, majority));
	return Void();
}
// Public entry point for locking the config servers of the given coordinators.
// Returns the future produced by lockConfigNodesImpl, which becomes ready once
// a majority of the lock requests have been answered.
Future<Void> ConfigBroadcaster::lockConfigNodes(ServerCoordinators coordinators) {
return lockConfigNodesImpl(coordinators);
}

View File

@ -270,7 +270,7 @@ class BroadcasterToLocalConfigEnvironment {
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer = self->broadcaster.registerNode(
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true);
return Void();
}
@ -309,7 +309,8 @@ public:
readFrom.lastSeenVersion(),
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get());
cbi->get(),
true);
}
Future<Void> restartLocalConfig(std::string const& newConfigPath) {
@ -442,7 +443,7 @@ class TransactionToLocalConfigEnvironment {
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer = self->broadcaster.registerNode(
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true);
return Void();
}
@ -465,7 +466,8 @@ public:
readFrom.lastSeenVersion(),
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get());
cbi->get(),
true);
}
Future<Void> restartLocalConfig(std::string const& newConfigPath) {

View File

@ -29,6 +29,7 @@ void ConfigFollowerInterface::setupWellKnownEndpoints() {
compact.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_COMPACT, TaskPriority::Coordination);
rollforward.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_ROLLFORWARD, TaskPriority::Coordination);
getCommittedVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION, TaskPriority::Coordination);
lock.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_LOCK, TaskPriority::Coordination);
}
// Default constructor: the initializer list sets only _id, to a random unique
// ID drawn from deterministicRandom().
ConfigFollowerInterface::ConfigFollowerInterface() : _id(deterministicRandom()->randomUniqueID()) {}
@ -39,7 +40,8 @@ ConfigFollowerInterface::ConfigFollowerInterface(NetworkAddress const& remote)
getChanges(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCHANGES)),
compact(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_COMPACT)),
rollforward(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_ROLLFORWARD)),
getCommittedVersion(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION)) {}
getCommittedVersion(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION)),
lock(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGFOLLOWER_LOCK)) {}
// Constructs an interface addressed by hostname: assigns a random unique ID
// and records the hostname. No request-stream endpoints are set up in this
// initializer list.
ConfigFollowerInterface::ConfigFollowerInterface(Hostname const& remote)
: _id(deterministicRandom()->randomUniqueID()), hostname(remote) {}

View File

@ -32,9 +32,11 @@
namespace {
const KeyRef coordinatorsHashKey = "id"_sr;
const KeyRef lastCompactedVersionKey = "lastCompactedVersion"_sr;
const KeyRef currentGenerationKey = "currentGeneration"_sr;
const KeyRef registeredKey = "registered"_sr;
const KeyRef lockedKey = "locked"_sr;
const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr);
const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr);
const KeyRangeRef annotationKeys = KeyRangeRef("annotation/"_sr, "annotation0"_sr);
@ -122,6 +124,7 @@ class ConfigNodeImpl {
Counter failedChangeRequests;
Counter snapshotRequests;
Counter getCommittedVersionRequests;
Counter lockRequests;
// Transaction counters
Counter successfulCommits;
@ -132,6 +135,22 @@ class ConfigNodeImpl {
Counter getGenerationRequests;
Future<Void> logger;
// Reads the value persisted under coordinatorsHashKey in this node's
// key-value store.
//
// Returns the deserialized size_t when a value is stored, and an empty
// Optional when the key has never been written.
ACTOR static Future<Optional<size_t>> getCoordinatorsHash(ConfigNodeImpl* self) {
	Optional<Value> storedHash = wait(self->kvStore->readValue(coordinatorsHashKey));
	if (storedHash.present()) {
		return BinaryReader::fromStringRef<size_t>(storedHash.get(), IncludeVersion());
	}
	return Optional<size_t>();
}
// Reads the lock state persisted under lockedKey in this node's key-value
// store.
//
// Returns an empty Optional when nothing is stored under lockedKey, and
// otherwise the Optional<size_t> that was serialized under that key. Callers
// in this file treat a present result as "this ConfigNode is locked" and
// reject the request with coordinators_changed.
ACTOR static Future<Optional<size_t>> getLocked(ConfigNodeImpl* self) {
	Optional<Value> value = wait(self->kvStore->readValue(lockedKey));
	if (!value.present()) {
		// This must be an empty Optional. Returning `false` here would be
		// converted into a *present* Optional<size_t> holding 0, so every
		// caller checking locked.present() would see an unlocked node as
		// locked.
		return Optional<size_t>();
	}
	return BinaryReader::fromStringRef<Optional<size_t>>(value.get(), IncludeVersion());
}
ACTOR static Future<ConfigGeneration> getGeneration(ConfigNodeImpl* self) {
state ConfigGeneration generation;
Optional<Value> value = wait(self->kvStore->readValue(currentGenerationKey));
@ -216,6 +235,7 @@ class ConfigNodeImpl {
wait(getAnnotations(self, req.lastSeenVersion + 1, committedVersion));
TraceEvent(SevDebug, "ConfigNodeSendingChanges", self->id)
.detail("ReqLastSeenVersion", req.lastSeenVersion)
.detail("ReqMostRecentVersion", req.mostRecentVersion)
.detail("CommittedVersion", committedVersion)
.detail("NumMutations", versionedMutations.size())
.detail("NumCommits", versionedAnnotations.size());
@ -227,6 +247,12 @@ class ConfigNodeImpl {
// New transactions increment the database's current live version. This effectively serves as a lock, providing
// serializability
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present());
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed());
return Void();
}
state ConfigGeneration generation = wait(getGeneration(self));
++generation.liveVersion;
if (req.lastSeenLiveVersion.present()) {
@ -241,6 +267,18 @@ class ConfigNodeImpl {
}
ACTOR static Future<Void> get(ConfigNodeImpl* self, ConfigTransactionGetRequest req) {
state Optional<size_t> locked = wait(getLocked(self));
if (locked.present()) {
CODE_PROBE(true, "attempting to read from a locked ConfigNode");
req.reply.sendError(coordinators_changed());
return Void();
}
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present());
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
// TODO: Also send information about highest seen version
@ -273,6 +311,13 @@ class ConfigNodeImpl {
// TODO: Currently it is possible that extra configuration classes may be returned, we
// may want to fix this to clean up the contract
ACTOR static Future<Void> getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) {
state Optional<size_t> locked = wait(getLocked(self));
if (locked.present()) {
CODE_PROBE(true, "attempting to read config classes from locked ConfigNode");
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
req.reply.sendError(transaction_too_old());
@ -306,6 +351,13 @@ class ConfigNodeImpl {
// Retrieve all knobs explicitly defined for the specified configuration class
ACTOR static Future<Void> getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
state Optional<size_t> locked = wait(getLocked(self));
if (locked.present()) {
CODE_PROBE(true, "attempting to read knobs from locked ConfigNode");
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
req.reply.sendError(transaction_too_old());
@ -383,6 +435,19 @@ class ConfigNodeImpl {
}
ACTOR static Future<Void> commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) {
state Optional<size_t> locked = wait(getLocked(self));
if (locked.present()) {
CODE_PROBE(true, "attempting to write to locked ConfigNode");
req.reply.sendError(coordinators_changed());
return Void();
}
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present());
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation.committedVersion != currentGeneration.committedVersion) {
++self->failedCommits;
@ -454,7 +519,7 @@ class ConfigNodeImpl {
// However, commit annotations for compacted mutations are lost
ACTOR static Future<Void> compact(ConfigNodeImpl* self, ConfigFollowerCompactRequest req) {
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
TraceEvent(SevDebug, "ConfigNodeCompacting", self->id)
TraceEvent(SevInfo, "ConfigNodeCompacting", self->id)
.detail("Version", req.version)
.detail("LastCompacted", lastCompactedVersion);
if (req.version <= lastCompactedVersion) {
@ -506,11 +571,13 @@ class ConfigNodeImpl {
req.reply.sendError(transaction_too_old());
return Void();
}
TraceEvent("ConfigNodeRollforward")
TraceEvent("ConfigNodeRollforward", self->id)
.detail("RollbackTo", req.rollback)
.detail("Target", req.target)
.detail("LastKnownCommitted", req.lastKnownCommitted)
.detail("Committed", currentGeneration.committedVersion);
.detail("Committed", currentGeneration.committedVersion)
.detail("CurrentGeneration", currentGeneration.toString())
.detail("LastCompactedVersion", lastCompactedVersion);
// Rollback to prior known committed version to erase any commits not
// made on a quorum.
if (req.rollback.present() && req.rollback.get() < currentGeneration.committedVersion) {
@ -539,8 +606,11 @@ class ConfigNodeImpl {
}
// Now rollforward by applying all mutations between last known
// committed version and rollforward version.
ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
wait(commitMutations(self, req.mutations, req.annotations, req.target));
if (req.mutations.size() > 0) {
ASSERT_GT(req.mutations.size(), 0);
ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion);
wait(commitMutations(self, req.mutations, req.annotations, req.target));
}
req.reply.send(Void());
return Void();
@ -548,39 +618,20 @@ class ConfigNodeImpl {
// Replies to a ConfigFollowerGetCommittedVersionRequest with this node's last
// compacted version and the versions read from its current generation.
// NOTE(review): the generation read and the reply each appear twice below, in
// a shorter and a longer form; they look like the before/after variants of the
// same statements in a change listing - confirm which one is live.
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
state Version lastCompacted = wait(getLastCompactedVersion(self));
ConfigGeneration generation = wait(getGeneration(self));
req.reply.send(ConfigFollowerGetCommittedVersionReply{ lastCompacted, generation.committedVersion });
state ConfigGeneration generation = wait(getGeneration(self));
// The registration status is sent back together with the versions.
bool isRegistered = wait(registered(self));
req.reply.send(ConfigFollowerGetCommittedVersionReply{
isRegistered, lastCompacted, generation.liveVersion, generation.committedVersion });
return Void();
}
// Request loop for a ConfigFollowerInterface: waits on each of the interface's
// request streams and hands every incoming request to its handler. Each
// handler is waited on inside the loop, so the next request is not picked up
// until the current one has finished. Has no normal return path.
ACTOR static Future<Void> serve(ConfigNodeImpl* self, ConfigFollowerInterface const* cfi) {
loop {
choose {
when(ConfigFollowerGetSnapshotAndChangesRequest req =
waitNext(cfi->getSnapshotAndChanges.getFuture())) {
++self->snapshotRequests;
wait(getSnapshotAndChanges(self, req));
}
// Unlike the other request types, no counter is bumped for getChanges.
when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) {
wait(getChanges(self, req));
}
when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) {
++self->compactRequests;
wait(compact(self, req));
}
when(ConfigFollowerRollforwardRequest req = waitNext(cfi->rollforward.getFuture())) {
++self->rollforwardRequests;
wait(rollforward(self, req));
}
when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) {
++self->getCommittedVersionRequests;
wait(getCommittedVersion(self, req));
}
// If the store's error future ever completes normally, that is treated
// as a logic bug.
when(wait(self->kvStore->getError())) { ASSERT(false); }
}
}
}
// Requires ConfigNodes to register with the ConfigBroadcaster before being
// allowed to respond to most requests. The ConfigBroadcaster will first
// ask the ConfigNode whether it is registered (kickstarted by the worker
// registering with the cluster controller). Then, the ConfigBroadcaster
// will send the ConfigNode a ready message, containing a snapshot if the
// ConfigNode is a new coordinator and needs updated state, or empty
// otherwise.
ACTOR static Future<Void> serve(ConfigNodeImpl* self, ConfigBroadcastInterface const* cbi, bool infinite) {
loop {
// Normally, the ConfigBroadcaster will first send a
@ -593,10 +644,61 @@ class ConfigNodeImpl {
// ConfigNode.
choose {
when(state ConfigBroadcastRegisteredRequest req = waitNext(cbi->registered.getFuture())) {
bool isRegistered = wait(registered(self));
req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered });
state bool isRegistered = wait(registered(self));
ConfigGeneration generation = wait(getGeneration(self));
TraceEvent("ConfigNodeSendingRegisteredReply", self->id)
.detail("Generation", generation.toString());
req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered, generation.committedVersion });
}
when(ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) {
when(state ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) {
state Optional<size_t> locked = wait(getLocked(self));
// New ConfigNodes with no previous state should always
// apply snapshots from the ConfigBroadcaster. Otherwise,
// the ConfigNode must be part of a new generation to
// accept a snapshot. An existing ConfigNode that restarts
// shouldn't apply a snapshot and overwrite its state if
// the set of coordinators hasn't changed.
if ((!infinite && !locked.present()) ||
(locked.present() && locked.get() != readyReq.coordinatorsHash)) {
// Apply snapshot if necessary.
if (readyReq.snapshot.size() > 0) {
for (const auto& [configKey, knobValue] : readyReq.snapshot) {
TraceEvent("ConfigNodeSettingFromSnapshot", self->id)
.detail("ConfigClass", configKey.configClass)
.detail("KnobName", configKey.knobName)
.detail("Value", knobValue.toString())
.detail("Version", readyReq.snapshotVersion);
self->kvStore->set(KeyValueRef(
BinaryWriter::toValue(configKey, IncludeVersion()).withPrefix(kvKeys.begin),
ObjectWriter::toValue(knobValue, IncludeVersion())));
}
ConfigGeneration newGeneration = { readyReq.snapshotVersion, readyReq.liveVersion };
self->kvStore->set(KeyValueRef(currentGenerationKey,
BinaryWriter::toValue(newGeneration, IncludeVersion())));
// Clear out any mutations to the keys. If these
// aren't cleared, they will overwrite the
// snapshotted values when the knobs are read.
self->kvStore->clear(KeyRangeRef(versionedMutationKey(0, 0),
versionedMutationKey(readyReq.snapshotVersion + 1, 0)));
self->kvStore->clear(KeyRangeRef(versionedAnnotationKey(0),
versionedAnnotationKey(readyReq.snapshotVersion + 1)));
self->kvStore->set(
KeyValueRef(lastCompactedVersionKey,
BinaryWriter::toValue(readyReq.snapshotVersion, IncludeVersion())));
}
// Make sure freshly up to date ConfigNode isn't
// locked! This is possible if it was a coordinator in
// a previous generation.
self->kvStore->set(
KeyValueRef(lockedKey, BinaryWriter::toValue(Optional<size_t>(), IncludeVersion())));
}
self->kvStore->set(KeyValueRef(coordinatorsHashKey,
BinaryWriter::toValue(readyReq.coordinatorsHash, IncludeVersion())));
wait(self->kvStore->commit());
TraceEvent("ConfigNodeReady", self->id).detail("WasLocked", locked.present());
readyReq.reply.send(ConfigBroadcastReadyReply{});
if (!infinite) {
return Void();
@ -606,17 +708,73 @@ class ConfigNodeImpl {
}
}
// Handles compaction requests arriving on the ConfigFollowerInterface, one at
// a time: count the request, then wait for the compaction to finish before
// taking the next one. Has no normal return path.
ACTOR static Future<Void> serveRegistered(ConfigNodeImpl* self, ConfigFollowerInterface const* cfi) {
	loop {
		ConfigFollowerCompactRequest compactReq = waitNext(cfi->compact.getFuture());
		++self->compactRequests;
		wait(compact(self, compactReq));
	}
}
// Many of the ConfigNode interfaces need to be served before the
// ConfigNode is officially registered with the ConfigBroadcaster. This is
// necessary due to edge cases around coordinator changes. For example, a
// ConfigNode that loses its coordinator status but then restarts before
// serving its snapshot to the new coordinators needs to be able to
// continue serving its snapshot interface when it restarts, even though it
// is no longer a coordinator.
//
// Handles snapshot, changes, rollforward, committed-version and lock requests
// from the ConfigFollowerInterface. Each handler is waited on inside the loop,
// so requests are processed one at a time. Has no normal return path.
ACTOR static Future<Void> serveUnregistered(ConfigNodeImpl* self, ConfigFollowerInterface const* cfi) {
loop {
choose {
when(ConfigFollowerGetSnapshotAndChangesRequest req =
waitNext(cfi->getSnapshotAndChanges.getFuture())) {
++self->snapshotRequests;
wait(getSnapshotAndChanges(self, req));
}
// No counter is bumped for getChanges requests.
when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) {
wait(getChanges(self, req));
}
when(ConfigFollowerRollforwardRequest req = waitNext(cfi->rollforward.getFuture())) {
++self->rollforwardRequests;
wait(rollforward(self, req));
}
when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) {
++self->getCommittedVersionRequests;
wait(getCommittedVersion(self, req));
}
when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) {
++self->lockRequests;
Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self));
// Lock only when no coordinators hash has been stored yet, or when the
// stored hash equals the one carried by the request.
if (!coordinatorsHash.present() || coordinatorsHash.get() == req.coordinatorsHash) {
TraceEvent("ConfigNodeLocking", self->id).log();
// Locking clears the registered flag and records the request's
// coordinators hash under lockedKey, then commits both writes.
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion())));
self->kvStore->set(KeyValueRef(
lockedKey,
BinaryWriter::toValue(Optional<size_t>(req.coordinatorsHash), IncludeVersion())));
wait(self->kvStore->commit());
}
// The reply is sent whether or not the lock was applied.
req.reply.send(Void());
}
// If the store's error future ever completes normally, that is treated
// as a logic bug.
when(wait(self->kvStore->getError())) { ASSERT(false); }
}
}
}
// Top-level serving routine for a ConfigNode across its broadcast,
// transaction and follower interfaces.
// NOTE(review): the final wait appears twice below, once with serve(self, cfi)
// and once with serveRegistered/serveUnregisteredFuture; they look like the
// before/after variants of one statement - confirm which one is live.
ACTOR static Future<Void> serve(ConfigNodeImpl* self,
ConfigBroadcastInterface const* cbi,
ConfigTransactionInterface const* cti,
ConfigFollowerInterface const* cfi) {
// Started before registration completes and kept in a state variable so it
// stays alive across the waits below.
state Future<Void> serveUnregisteredFuture = serveUnregistered(self, cfi);
// Serve the broadcast interface in its non-infinite mode first.
wait(serve(self, cbi, false));
// Persist the registered flag before serving the remaining interfaces.
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(true, IncludeVersion())));
wait(self->kvStore->commit());
// Shouldn't return (coordinationServer will throw an error if it does).
wait(serve(self, cbi, true) || serve(self, cti) || serve(self, cfi));
wait(serve(self, cbi, true) || serve(self, cti) || serveRegistered(self, cfi) || serveUnregisteredFuture);
return Void();
}
@ -631,11 +789,12 @@ public:
compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc),
rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc),
failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
getCommittedVersionRequests("GetCommittedVersionRequests", cc), lockRequests("LockRequests", cc),
successfulCommits("SuccessfulCommits", cc), failedCommits("FailedCommits", cc),
setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
getValueRequests("GetValueRequests", cc), getGenerationRequests("GetGenerationRequests", cc) {
logger = traceCounters("ConfigNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigNode");
TraceEvent(SevDebug, "StartingConfigNode", id).detail("KVStoreAlreadyExists", kvStore.exists());
TraceEvent(SevInfo, "StartingConfigNode", id).detail("KVStoreAlreadyExists", kvStore.exists());
}
Future<Void> serve(ConfigBroadcastInterface const& cbi,
@ -646,7 +805,9 @@ public:
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
// Serves the whole ConfigFollowerInterface by running both request loops
// against it; the returned future combines the two with &&.
Future<Void> serve(ConfigFollowerInterface const& cfi) {
	Future<Void> unregisteredLoop = serveUnregistered(this, &cfi);
	Future<Void> registeredLoop = serveRegistered(this, &cfi);
	return unregisteredLoop && registeredLoop;
}
void close() { kvStore.close(); }

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/Knobs.h"
@ -343,10 +344,17 @@ struct MovableCoordinatedStateImpl {
if (BUGGIFY)
wait(delay(5));
if (BUGGIFY_WITH_PROB(0.001)) {
// Simulate random cluster controller death during coordinator
// change.
throw actor_cancelled();
}
// SOMEDAY: If we are worried about someone magically getting the new cluster ID and interfering, do a second
// cs.setExclusive( encode( ReallyTo, ... ) )
TraceEvent("ChangingQuorum").detail("ConnectionString", nc.toString());
wait(changeLeaderCoordinators(self->coordinators, StringRef(nc.toString())));
wait(ConfigBroadcaster::lockConfigNodes(self->coordinators) &&
changeLeaderCoordinators(self->coordinators, StringRef(nc.toString())));
TraceEvent("ChangedQuorum").detail("ConnectionString", nc.toString());
throw coordinators_changed();
}

View File

@ -725,9 +725,9 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
if (forward.present()) {
req.reply.send(Void());
else {
} else {
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCRMismatch")
@ -761,12 +761,14 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
state Future<Void> configDatabaseServer = Never();
TraceEvent("CoordinationServer", myID)
.detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
.detail("Folder", dataFolder);
.detail("Folder", dataFolder)
.detail("ConfigNodeValid", configNode.isValid());
if (configNode.isValid()) {
configTransactionInterface.setupWellKnownEndpoints();
configFollowerInterface.setupWellKnownEndpoints();
configDatabaseServer = configNode->serve(cbi, configTransactionInterface, configFollowerInterface);
configDatabaseServer =
brokenPromiseToNever(configNode->serve(cbi, configTransactionInterface, configFollowerInterface));
}
try {

View File

@ -36,6 +36,8 @@ std::unique_ptr<IConfigConsumer> IConfigConsumer::createSimple(ServerCoordinator
std::unique_ptr<IConfigConsumer> IConfigConsumer::createPaxos(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval) {
return std::make_unique<PaxosConfigConsumer>(coordinators, pollingInterval, compactionInterval);
Optional<double> compactionInterval,
bool readPreviousCoordinators) {
return std::make_unique<PaxosConfigConsumer>(
coordinators, pollingInterval, compactionInterval, readPreviousCoordinators);
}

View File

@ -53,10 +53,18 @@ class GetCommittedVersionQuorum {
Version largestCompactedResponse{ 0 };
// Last durably committed version.
Version lastSeenVersion;
// Largest compacted version on the existing ConfigNodes.
Version largestCompacted;
size_t totalRepliesReceived{ 0 };
size_t maxAgreement{ 0 };
// Stores the largest live version out of all the responses.
Version largestLive{ 0 };
// Stores the largest committed version out of all responses.
Version largestCommitted{ 0 };
// When true, a quorum that agrees on committed version 0 is replaced by the
// largest committed version seen in any reply (see the special-case
// explanation where replies are handled). Defaults to false: the constructor
// does not set this flag, and it is read while handling replies even when
// allowSpecialCaseRollforward() was never called.
bool allowSpecialCaseRollforward_{ false };
// True if a quorum has zero as their committed version. See explanation
// comment below.
bool specialZeroQuorum{ false };
// Sends rollback/rollforward messages to any nodes that are not up to date
// with the latest committed version as determined by the quorum. Should
@ -67,9 +75,18 @@ class GetCommittedVersionQuorum {
Version lastCompacted,
ConfigFollowerInterface cfi) {
state Version target = quorumVersion.lastCommitted;
// TraceEvent("ConsumerUpdateNodeStart")
// .detail("NodeAddress", cfi.address())
// .detail("Target", target)
// .detail("NodeVersionLastCommitted", nodeVersion.lastCommitted)
// .detail("NodeVersionSecondToLastCommitted", nodeVersion.secondToLastCommitted)
// .detail("QuorumVersionLastCommitted", quorumVersion.lastCommitted)
// .detail("QuorumVersionSecondToLastCommitted", quorumVersion.secondToLastCommitted)
// .detail("LargestCompacted", self->largestCompacted);
if (nodeVersion.lastCommitted == target) {
return Void();
}
if (nodeVersion.lastCommitted < target) {
state Optional<Version> rollback;
if (nodeVersion.lastCommitted > quorumVersion.secondToLastCommitted) {
@ -83,7 +100,7 @@ class GetCommittedVersionQuorum {
// On the other hand, if the node is on an older committed
// version, it's possible the version it is on was never made
// durable. To be safe, roll it back by one version.
rollback = std::max(nodeVersion.lastCommitted - 1, Version{ 0 });
rollback = std::max(nodeVersion.lastCommitted - 1, self->largestCompacted);
}
if (rollback.present()) {
@ -118,6 +135,15 @@ class GetCommittedVersionQuorum {
ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
// TraceEvent("ConsumerUpdateNodeSendingRollforward")
// .detail("NodeAddress", cfi.address())
// .detail("RollbackTo", rollback)
// .detail("LastKnownCommitted", nodeVersion.lastCommitted)
// .detail("Target", target)
// .detail("ChangesSize", reply.changes.size())
// .detail("AnnotationsSize", reply.annotations.size())
// .detail("LargestCompacted", self->largestCompactedResponse)
// .detail("SpecialZeroQuorum", self->specialZeroQuorum);
if (cfi.hostname.present()) {
wait(timeoutError(
retryGetReplyFromHostname(
@ -139,7 +165,7 @@ class GetCommittedVersionQuorum {
// one of these errors in response to a get changes or
// rollforward request. The retry loop should handle this
// case.
TraceEvent(SevInfo, "ConfigNodeRollforwardError").error(e);
TraceEvent(SevInfo, "ConsumerConfigNodeRollforwardError").error(e);
} else {
throw;
}
@ -163,9 +189,19 @@ class GetCommittedVersionQuorum {
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
}
if (!reply.registered) {
// ConfigNodes serve their GetCommittedVersion interface before
// being registered to allow them to be rolled forward.
// However, their responses should not count towards the
// quorum.
throw future_version();
}
++self->totalRepliesReceived;
self->largestCompactedResponse = std::max(self->largestCompactedResponse, reply.lastCompacted);
state Version lastCompacted = reply.lastCompacted;
self->committed[cfi.address()] = reply.lastCommitted;
self->largestLive = std::max(self->largestLive, reply.lastLive);
self->largestCommitted = std::max(self->largestCommitted, reply.lastCommitted);
state CommittedVersions committedVersions = CommittedVersions{ self->lastSeenVersion, reply.lastCommitted };
if (self->priorVersions.find(committedVersions.lastCommitted) == self->priorVersions.end()) {
@ -174,7 +210,59 @@ class GetCommittedVersionQuorum {
auto& nodes = self->replies[committedVersions.lastCommitted];
nodes.push_back(cfi);
self->maxAgreement = std::max(nodes.size(), self->maxAgreement);
// TraceEvent("ConsumerGetCommittedVersionReply")
// .detail("From", cfi.address())
// .detail("LastCompactedVersion", lastCompacted)
// .detail("LastCommittedVersion", reply.lastCommitted)
// .detail("LastSeenVersion", self->lastSeenVersion)
// .detail("Replies", self->totalRepliesReceived)
// .detail("RepliesMatchingVersion", nodes.size())
// .detail("Coordinators", self->cfis.size())
// .detail("AllowSpecialCaseRollforward", self->allowSpecialCaseRollforward_);
if (nodes.size() >= self->cfis.size() / 2 + 1) {
// A quorum at version 0 should use any higher committed
// version seen instead of 0. Imagine the following scenario
// with three coordinators:
//
// t0 t1 t2 t3
// A 1 1 | 1
// B 1 dies | 0
// C 0 0 | 0
//
// At t0, a value at version 1 is committed to A and B. At t1,
// B dies, and now the value only exists on A. At t2, a change
// coordinators command is executed by a client, causing a
// recovery. When the ConfigBroadcaster comes online and
// attempts to read the state of the previous coordinators (at
// time t3) so it can transfer it to the new coordinators, 2/3
// ConfigNodes are unregistered and only know about version 0.
// Quorum logic dictates the committed version is, thus,
// version 0. But we know a majority committed version 1. This
// is a special case error where a ConfigNode losing data is
// immediately followed by a coordinator change and recovery,
// and 0 is a special case. Imagine the following if C instead
// has had some values committed:
//
// t0 t1 t2 t3 t4
// A 1 2 2 | 2
// B 1 2 dies | 0
// C 1 1 1 | 1
//
// In this case, there is no quorum, and so all nodes would
// (correctly) be rolled forward to version 2. Since a node
// losing data is equivalent to saying it has a committed
// version of 0, we must treat a quorum of nodes at version 0
// as a special case, and instead use the largest committed
// version we've seen as the quorum version. This does not
// affect correctness because version 0 means nothing was
// committed, so there shouldn't be an issue rolling those
// nodes forward.
if (self->allowSpecialCaseRollforward_ && committedVersions.lastCommitted == 0 &&
self->largestCommitted > 0) {
self->specialZeroQuorum = true;
committedVersions = CommittedVersions{ 0, self->largestCommitted };
}
// A quorum of ConfigNodes agree on the latest committed version.
if (self->quorumVersion.canBeSet()) {
self->quorumVersion.send(QuorumVersion{ committedVersions, true });
@ -186,7 +274,8 @@ class GetCommittedVersionQuorum {
// but the node we just got a reply from is not one of them. We may
// need to roll it forward or back.
QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
ASSERT(committedVersions.lastCommitted != quorumVersion.versions.lastCommitted);
ASSERT(committedVersions.lastCommitted != quorumVersion.versions.lastCommitted ||
self->specialZeroQuorum);
wait(self->updateNode(self, committedVersions, quorumVersion.versions, lastCompacted, cfi));
} else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
(self->cfis.size() / 2 + 1)) {
@ -213,13 +302,18 @@ class GetCommittedVersionQuorum {
} catch (Error& e) {
// Count a timeout as a reply.
++self->totalRepliesReceived;
// TraceEvent("ConsumerGetCommittedVersionError").error(e)
// .detail("From", cfi.address())
// .detail("Replies", self->totalRepliesReceived)
// .detail("Coordinators", self->cfis.size());
if (e.code() == error_code_version_already_compacted) {
if (self->quorumVersion.canBeSet()) {
// Calling sendError could delete self
auto local = self->quorumVersion;
local.sendError(e);
}
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
} else if (e.code() != error_code_timed_out && e.code() != error_code_future_version &&
e.code() != error_code_broken_promise) {
if (self->quorumVersion.canBeSet()) {
// Calling sendError could delete self
auto local = self->quorumVersion;
@ -231,6 +325,7 @@ class GetCommittedVersionQuorum {
std::accumulate(self->replies.begin(), self->replies.end(), 0, [](int value, auto const& p) {
return value + p.second.size();
});
if (nonTimeoutReplies >= self->cfis.size() / 2 + 1) {
// Make sure to trigger the quorumVersion if a timeout
// occurred, a quorum disagree on the committed version,
@ -239,6 +334,14 @@ class GetCommittedVersionQuorum {
// back the largest committed version seen.
self->quorumVersion.send(
QuorumVersion{ CommittedVersions{ self->lastSeenVersion, self->largestCommitted }, false });
if (e.code() == error_code_future_version) {
wait(self->updateNode(self,
CommittedVersions{ self->lastSeenVersion, self->largestCommitted },
self->quorumVersion.getFuture().get().versions,
self->largestCompactedResponse,
cfi));
}
} else if (!self->quorumVersion.isSet()) {
// Otherwise, if a quorum agree on the committed version,
// some other error occurred. Notify the caller of it.
@ -253,8 +356,10 @@ class GetCommittedVersionQuorum {
}
public:
explicit GetCommittedVersionQuorum(std::vector<ConfigFollowerInterface> const& cfis, Version lastSeenVersion)
: cfis(cfis), lastSeenVersion(lastSeenVersion) {}
explicit GetCommittedVersionQuorum(std::vector<ConfigFollowerInterface> const& cfis,
Version lastSeenVersion,
Version largestCompacted)
: cfis(cfis), lastSeenVersion(lastSeenVersion), largestCompacted(largestCompacted) {}
Future<QuorumVersion> getCommittedVersion() {
ASSERT(!isReady()); // ensures this function is not accidentally called before resetting state
for (const auto& cfi : cfis) {
@ -273,6 +378,7 @@ public:
ASSERT(isReady());
return replies.at(quorumVersion.getFuture().get().versions.lastCommitted);
}
Version getLargestLive() const { return largestLive; }
Version getSmallestCommitted() const {
if (committed.size() == cfis.size()) {
Version smallest = MAX_VERSION;
@ -283,6 +389,8 @@ public:
}
return ::invalidVersion;
}
// Turns on the special-case handling of a quorum at committed version 0.
void allowSpecialCaseRollforward() { allowSpecialCaseRollforward_ = true; }
// True once the special-case substitution for a version-0 quorum was applied.
bool isSpecialZeroQuorum() const { return specialZeroQuorum; }
// Ready when all of the actors held in `actors` have finished.
Future<Void> complete() const { return waitForAll(actors); }
};
@ -293,9 +401,14 @@ class PaxosConfigConsumerImpl {
Version compactionVersion{ 0 };
double pollingInterval;
Optional<double> compactionInterval;
// Set by allowSpecialCaseRollforward(); forwarded to the committed-version
// quorum by getCommittedVersion() and cleared again once a snapshot read has
// completed. Defaults to false: the constructor does not set this flag, and
// getCommittedVersion() reads it unconditionally.
bool allowSpecialCaseRollforward_{ false };
bool readPreviousCoordinators;
UID id;
ACTOR static Future<Version> getCommittedVersion(PaxosConfigConsumerImpl* self) {
if (self->allowSpecialCaseRollforward_) {
self->getCommittedVersionQuorum.allowSpecialCaseRollforward();
}
QuorumVersion quorumVersion = wait(self->getCommittedVersionQuorum.getCommittedVersion());
if (!quorumVersion.isQuorum) {
throw failed_to_reach_quorum();
@ -357,29 +470,37 @@ class PaxosConfigConsumerImpl {
&ConfigFollowerInterface::getSnapshotAndChanges,
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesVersion", committedVersion)
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
.detail("AnnotationsSize", reply.annotations.size())
.detail("LargestLiveVersion", self->getCommittedVersionQuorum.getLargestLive())
.detail("SmallestCommitted", smallestCommitted);
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
Version smallestCommitted = self->getCommittedVersionQuorum.getSmallestCommitted();
self->compactionVersion = std::max(self->compactionVersion, smallestCommitted);
broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
reply.snapshotVersion,
reply.changes,
committedVersion,
reply.annotations,
self->getCommittedVersionQuorum.getReadReplicas());
self->getCommittedVersionQuorum.getReadReplicas(),
self->getCommittedVersionQuorum.getLargestLive(),
self->readPreviousCoordinators);
wait(self->getCommittedVersionQuorum.complete());
if (self->allowSpecialCaseRollforward_) {
self->allowSpecialCaseRollforward_ = false;
}
break;
} catch (Error& e) {
if (e.code() == error_code_failed_to_reach_quorum) {
wait(self->getCommittedVersionQuorum.complete());
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise &&
e.code() != error_code_version_already_compacted && e.code() != error_code_process_behind) {
e.code() != error_code_version_already_compacted && e.code() != error_code_process_behind &&
e.code() != error_code_future_version) {
throw;
}
wait(delayJittered(0.1));
@ -404,7 +525,8 @@ class PaxosConfigConsumerImpl {
// ConfigNodes changes to 1, 1, 2, the committed version
// returned would be 1.
if (committedVersion > self->lastSeenVersion) {
ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1);
ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1 ||
self->getCommittedVersionQuorum.isSpecialZeroQuorum());
state std::vector<ConfigFollowerInterface> readReplicas =
self->getCommittedVersionQuorum.getReadReplicas();
std::vector<Future<Void>> fs;
@ -448,8 +570,8 @@ class PaxosConfigConsumerImpl {
} catch (Error& e) {
if (e.code() == error_code_version_already_compacted || e.code() == error_code_timed_out ||
e.code() == error_code_failed_to_reach_quorum || e.code() == error_code_version_already_compacted ||
e.code() == error_code_process_behind) {
CODE_PROBE(true, "PaxosConfigConsumer get version_already_compacted error");
e.code() == error_code_process_behind || e.code() == error_code_future_version) {
CODE_PROBE(true, "PaxosConfigConsumer fetch error");
if (e.code() == error_code_failed_to_reach_quorum) {
try {
wait(self->getCommittedVersionQuorum.complete());
@ -483,39 +605,58 @@ class PaxosConfigConsumerImpl {
}
// Replaces the committed-version quorum tracker with a freshly constructed one
// seeded from this consumer's current state.
// NOTE(review): the assignment appears twice, with two and with three
// constructor arguments; they look like the before/after variants of one
// statement - confirm which one is live.
void resetCommittedVersionQuorum() {
getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis, lastSeenVersion };
getCommittedVersionQuorum = GetCommittedVersionQuorum{ cfis, lastSeenVersion, compactionVersion };
}
public:
// Runs a single snapshot-and-changes read against the given broadcaster.
Future<Void> readSnapshot(ConfigBroadcaster& broadcaster) { return getSnapshotAndChanges(this, &broadcaster); }
// Starts the change-fetching and compaction actors; the returned future
// combines the two with ||.
Future<Void> consume(ConfigBroadcaster& broadcaster) {
return fetchChanges(this, &broadcaster) || compactor(this, &broadcaster);
}
// Turns on the special-case rollforward flag of this consumer.
void allowSpecialCaseRollforward() { this->allowSpecialCaseRollforward_ = true; }
// Identifier of this consumer instance.
UID getID() const { return id; }
PaxosConfigConsumerImpl(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval)
: cfis(cfis), getCommittedVersionQuorum(cfis, 0), pollingInterval(pollingInterval),
compactionInterval(compactionInterval), id(deterministicRandom()->randomUniqueID()) {}
Optional<double> compactionInterval,
bool readPreviousCoordinators)
: cfis(cfis), getCommittedVersionQuorum(cfis, 0, 0), pollingInterval(pollingInterval),
compactionInterval(compactionInterval), readPreviousCoordinators(readPreviousCoordinators),
id(deterministicRandom()->randomUniqueID()) {}
};
PaxosConfigConsumer::PaxosConfigConsumer(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval)
: impl(PImpl<PaxosConfigConsumerImpl>::create(cfis, pollingInterval, compactionInterval)) {}
Optional<double> compactionInterval,
bool readPreviousCoordinators)
: impl(PImpl<PaxosConfigConsumerImpl>::create(cfis, pollingInterval, compactionInterval, readPreviousCoordinators)) {}
PaxosConfigConsumer::PaxosConfigConsumer(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval)
: impl(PImpl<PaxosConfigConsumerImpl>::create(coordinators.configServers, pollingInterval, compactionInterval)) {}
Optional<double> compactionInterval,
bool readPreviousCoordinators)
: impl(PImpl<PaxosConfigConsumerImpl>::create(coordinators.configServers,
pollingInterval,
compactionInterval,
readPreviousCoordinators)) {}
// The public PaxosConfigConsumer methods below forward to the implementation
// object held in `impl`.
PaxosConfigConsumer::~PaxosConfigConsumer() = default;
// Forwards to the implementation's readSnapshot().
Future<Void> PaxosConfigConsumer::readSnapshot(ConfigBroadcaster& broadcaster) {
return impl->readSnapshot(broadcaster);
}
// Forwards to the implementation's consume().
Future<Void> PaxosConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
return impl->consume(broadcaster);
}
// Forwards to the implementation's allowSpecialCaseRollforward().
void PaxosConfigConsumer::allowSpecialCaseRollforward() {
impl->allowSpecialCaseRollforward();
}
// Forwards to the implementation's getID().
UID PaxosConfigConsumer::getID() const {
return impl->getID();
}

View File

@ -145,7 +145,8 @@ class SimpleConfigConsumerImpl {
reply.changes,
committedVersion,
reply.annotations,
{ self->cfi });
{ self->cfi },
committedVersion);
return Void();
}
@ -186,10 +187,19 @@ SimpleConfigConsumer::SimpleConfigConsumer(ServerCoordinators const& coordinator
Optional<double> compactionInterval)
: impl(PImpl<SimpleConfigConsumerImpl>::create(coordinators, pollingInterval, compactionInterval)) {}
// Not supported by the simple consumer: asserts unconditionally. The return
// statement only satisfies the signature.
Future<Void> SimpleConfigConsumer::readSnapshot(ConfigBroadcaster& broadcaster) {
ASSERT(false);
return Void();
}
// Forwards to the implementation's consume().
Future<Void> SimpleConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
return impl->consume(broadcaster);
}
// Not supported by the simple consumer: asserts unconditionally.
void SimpleConfigConsumer::allowSpecialCaseRollforward() {
ASSERT(false);
}
SimpleConfigConsumer::~SimpleConfigConsumer() = default;
UID SimpleConfigConsumer::getID() const {

View File

@ -1069,7 +1069,7 @@ struct CLIOptions {
const char* blobCredsFromENV = nullptr;
std::string configPath;
ConfigDBType configDBType{ ConfigDBType::DISABLED };
ConfigDBType configDBType{ ConfigDBType::PAXOS };
Reference<IClusterConnectionRecord> connectionFile;
Standalone<StringRef> machineId;
@ -1627,6 +1627,7 @@ private:
case OPT_USE_TEST_CONFIG_DB:
configDBType = ConfigDBType::SIMPLE;
break;
// TODO: Add no_config_db option which disables the configuration database
case OPT_FLOW_PROCESS_NAME:
flowProcessName = args.OptionArg();
std::cout << flowProcessName << std::endl;

View File

@ -131,10 +131,10 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
(serverTagKeys.intersects(range)) || (serverTagHistoryKeys.intersects(range)) ||
(range.intersects(applyMutationsEndRange)) || (range.intersects(applyMutationsKeyVersionMapRange)) ||
(range.intersects(logRangesRange)) || (tssMappingKeys.intersects(range)) ||
(tssQuarantineKeys.intersects(range)) || (range.contains(coordinatorsKey)) ||
(range.contains(databaseLockedKey)) || (range.contains(metadataVersionKey)) ||
(range.contains(mustContainSystemMutationsKey)) || (range.contains(writeRecoveryKey)) ||
(range.intersects(testOnlyTxnStateStorePrefixRange))) {
(tssQuarantineKeys.intersects(range)) || (range.contains(previousCoordinatorsKey)) ||
(range.contains(coordinatorsKey)) || (range.contains(databaseLockedKey)) ||
(range.contains(metadataVersionKey)) || (range.contains(mustContainSystemMutationsKey)) ||
(range.contains(writeRecoveryKey)) || (range.intersects(testOnlyTxnStateStorePrefixRange))) {
return true;
}
}

View File

@ -93,6 +93,8 @@ public:
return previousWrite;
}
ServerCoordinators getCoordinators() { return coordinators; }
Future<Void> move(ClusterConnectionString const& nc) { return cstate.move(nc); }
private:

View File

@ -116,13 +116,15 @@ struct ConfigBroadcastChangesRequest {
// Reply to a ConfigBroadcastRegisteredRequest: whether the replying ConfigNode
// is registered, plus a version. The ConfigNode's reply path fills the version
// with the committed version of its current generation.
// NOTE(review): the one-argument constructor and the one-field serializer call
// appear next to their two-argument/two-field counterparts; they look like the
// before/after variants of the same lines - confirm which are live.
struct ConfigBroadcastRegisteredReply {
static constexpr FileIdentifier file_identifier = 12041047;
bool registered;
Version lastSeenVersion;
// NOTE(review): the defaulted constructor leaves both members uninitialized.
ConfigBroadcastRegisteredReply() = default;
explicit ConfigBroadcastRegisteredReply(bool registered) : registered(registered) {}
explicit ConfigBroadcastRegisteredReply(bool registered, Version lastSeenVersion)
: registered(registered), lastSeenVersion(lastSeenVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, registered);
serializer(ar, registered, lastSeenVersion);
}
};
@ -151,13 +153,23 @@ struct ConfigBroadcastReadyReply {
// Ready message sent to a ConfigNode. Carries a hash identifying the set of
// coordinators, a snapshot of knob values (possibly empty) with its version,
// and a live version.
// NOTE(review): serialize() contains two serializer calls, one with only
// `reply` and one with every field; they look like the before/after variants
// of the same line - confirm which one is live.
struct ConfigBroadcastReadyRequest {
static constexpr FileIdentifier file_identifier = 7402862;
// Compared by the ConfigNode against the hash it was locked with, and
// persisted by it when the ready message is handled.
size_t coordinatorsHash;
// Knob values the ConfigNode writes into its store when it decides to apply
// the snapshot; an empty map means there is nothing to apply.
std::map<ConfigKey, KnobValue> snapshot;
// Used together with liveVersion to build the ConfigNode's new generation
// when a snapshot is applied.
Version snapshotVersion;
Version liveVersion;
ReplyPromise<ConfigBroadcastReadyReply> reply;
// NOTE(review): the defaulted constructor leaves coordinatorsHash,
// snapshotVersion and liveVersion uninitialized.
ConfigBroadcastReadyRequest() = default;
ConfigBroadcastReadyRequest(size_t coordinatorsHash,
std::map<ConfigKey, KnobValue> const& snapshot,
Version snapshotVersion,
Version liveVersion)
: coordinatorsHash(coordinatorsHash), snapshot(snapshot), snapshotVersion(snapshotVersion),
liveVersion(liveVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
serializer(ar, coordinatorsHash, snapshot, snapshotVersion, liveVersion, reply);
}
};

View File

@ -39,7 +39,8 @@ class ConfigBroadcaster {
PImpl<class ConfigBroadcasterImpl> impl;
public:
explicit ConfigBroadcaster(ServerCoordinators const&, ConfigDBType);
ConfigBroadcaster();
explicit ConfigBroadcaster(ServerCoordinators const&, ConfigDBType, Future<Optional<Value>>);
ConfigBroadcaster(ConfigBroadcaster&&);
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster();
@ -47,7 +48,8 @@ public:
Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& worker);
ConfigBroadcastInterface const& worker,
bool isCoordinator);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
@ -57,18 +59,26 @@ public:
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas);
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators = false);
void applySnapshotAndChanges(std::map<ConfigKey, KnobValue>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas);
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators = false);
Future<Void> getError() const;
UID getID() const;
JsonBuilderObject getStatus() const;
void compact(Version compactionVersion);
// Locks all ConfigNodes running on the given coordinators, returning when
// a quorum have successfully locked.
static Future<Void> lockConfigNodes(ServerCoordinators coordinators);
public: // Testing
explicit ConfigBroadcaster(ConfigFollowerInterface const&);
Future<Void> getClientFailure(UID clientUID) const;

View File

@ -177,16 +177,21 @@ struct ConfigFollowerRollforwardRequest {
// Serializable reply message (file_identifier 9214735) carrying a
// `registered` flag and three versions: `lastCompacted`, `lastLive` and
// `lastCommitted`.
//
// No member has a default initializer, so a default-constructed reply holds
// indeterminate values until deserialized, and the two-argument constructor
// leaves `registered` and `lastLive` unset.
//
// NOTE(review): two constructors and two serializer(...) calls are present,
// the shorter of each covering a subset of the longer. This looks like the
// pre-change and post-change lines of a diff sitting side by side — confirm
// that only the four-field forms are meant to remain.
struct ConfigFollowerGetCommittedVersionReply {
static constexpr FileIdentifier file_identifier = 9214735;
bool registered;
Version lastCompacted;
Version lastLive;
Version lastCommitted;
ConfigFollowerGetCommittedVersionReply() = default;
// Sets only `lastCompacted` and `lastCommitted`.
explicit ConfigFollowerGetCommittedVersionReply(Version lastCompacted, Version lastCommitted)
: lastCompacted(lastCompacted), lastCommitted(lastCommitted) {}
// Sets all four members.
explicit ConfigFollowerGetCommittedVersionReply(bool registered,
Version lastCompacted,
Version lastLive,
Version lastCommitted)
: registered(registered), lastCompacted(lastCompacted), lastLive(lastLive), lastCommitted(lastCommitted) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, lastCompacted, lastCommitted);
serializer(ar, registered, lastCompacted, lastLive, lastCommitted);
}
};
@ -202,6 +207,20 @@ struct ConfigFollowerGetCommittedVersionRequest {
}
};
// Serializable request message (file_identifier 1867800) that is answered
// with an empty (Void) reply through `reply`.
//
// Payload:
//   coordinatorsHash - hash value supplied by the sender
//                      (NOTE(review): presumably identifies the coordinator
//                      set on whose behalf the lock is requested — confirm
//                      against the handler of this request)
struct ConfigFollowerLockRequest {
	static constexpr FileIdentifier file_identifier = 1867800;

	// Zero-initialized so that a default-constructed request never carries an
	// indeterminate value, e.g. if it is logged or serialized before being
	// filled in.
	size_t coordinatorsHash = 0;
	ReplyPromise<Void> reply;

	ConfigFollowerLockRequest() = default;
	explicit ConfigFollowerLockRequest(size_t coordinatorsHash) : coordinatorsHash(coordinatorsHash) {}

	// Field order defines the wire layout; keep it stable.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, coordinatorsHash, reply);
	}
};
/*
* Configuration database nodes serve a ConfigFollowerInterface which contains well known endpoints,
* used by workers to receive configuration database updates
@ -217,6 +236,7 @@ public:
RequestStream<ConfigFollowerRollforwardRequest> rollforward;
RequestStream<ConfigFollowerGetCommittedVersionRequest> getCommittedVersion;
Optional<Hostname> hostname;
RequestStream<ConfigFollowerLockRequest> lock;
ConfigFollowerInterface();
void setupWellKnownEndpoints();
@ -229,6 +249,7 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion, hostname);
serializer(
ar, _id, getSnapshotAndChanges, getChanges, compact, rollforward, getCommittedVersion, hostname, lock);
}
};

View File

@ -224,6 +224,7 @@ class ConfigNode;
class ServerCoordinators : public ClientCoordinators {
public:
ServerCoordinators() {}
explicit ServerCoordinators(Reference<IClusterConnectionRecord> ccr);
std::vector<LeaderElectionRegInterface> leaderElectionServers;

View File

@ -35,7 +35,9 @@
class IConfigConsumer {
public:
virtual ~IConfigConsumer() = default;
virtual Future<Void> readSnapshot(ConfigBroadcaster& broadcaster) = 0;
virtual Future<Void> consume(ConfigBroadcaster& broadcaster) = 0;
virtual void allowSpecialCaseRollforward() = 0;
virtual UID getID() const = 0;
static std::unique_ptr<IConfigConsumer> createTestSimple(ConfigFollowerInterface const& cfi,
@ -46,5 +48,6 @@ public:
Optional<double> compactionInterval);
static std::unique_ptr<IConfigConsumer> createPaxos(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval);
Optional<double> compactionInterval,
bool readPreviousCoordinators = false);
};

View File

@ -32,13 +32,17 @@ class PaxosConfigConsumer : public IConfigConsumer {
public:
PaxosConfigConsumer(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval);
Optional<double> compactionInterval,
bool readPreviousCoordinators);
~PaxosConfigConsumer();
Future<Void> readSnapshot(ConfigBroadcaster& broadcaster) override;
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
void allowSpecialCaseRollforward() override;
UID getID() const override;
public: // Testing
PaxosConfigConsumer(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval);
Optional<double> compactionInterval,
bool readPreviousCoordinators);
};

View File

@ -37,7 +37,9 @@ public:
double pollingInterval,
Optional<double> compactionInterval);
~SimpleConfigConsumer();
Future<Void> readSnapshot(ConfigBroadcaster& broadcaster) override;
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
void allowSpecialCaseRollforward() override;
UID getID() const override;
public: // Testing

View File

@ -1948,7 +1948,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
recoveredDiskFiles));
if (configNode.isValid()) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
errorForwarders.add(brokenPromiseToNever(localConfig->consume(interf.configBroadcastInterface)));
}
if (SERVER_KNOBS->ENABLE_WORKER_HEALTH_MONITOR) {
@ -3319,8 +3319,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
state std::vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state Reference<ConfigNode> configNode;
state Reference<LocalConfiguration> localConfig =
makeReference<LocalConfiguration>(dataFolder, configPath, manualKnobOverrides);
state Reference<LocalConfiguration> localConfig = makeReference<LocalConfiguration>(
dataFolder, configPath, manualKnobOverrides, g_network->isSimulated() ? IsTest::True : IsTest::False);
// setupStackSignal();
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker;
@ -3329,11 +3329,9 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
}
// FIXME: Initializing here causes simulation issues, these must be fixed
/*
if (configDBType != ConfigDBType::DISABLED) {
wait(localConfig->initialize());
}
*/
// if (configDBType != ConfigDBType::DISABLED) {
// wait(localConfig->initialize());
// }
actors.push_back(serveProtocolInfo());
actors.push_back(serveProcess());

View File

@ -33,6 +33,7 @@ struct ChangeConfigWorkload : TestWorkload {
double minDelayBeforeChange, maxDelayBeforeChange;
std::string configMode; //<\"single\"|\"double\"|\"triple\">
std::string networkAddresses; // comma separated list e.g. "127.0.0.1:4000,127.0.0.1:4001"
int coordinatorChanges; // number of times to change coordinators. Only applied if `coordinators` is set to `auto`
ChangeConfigWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
minDelayBeforeChange = getOption(options, LiteralStringRef("minDelayBeforeChange"), 0);
@ -40,6 +41,7 @@ struct ChangeConfigWorkload : TestWorkload {
ASSERT(maxDelayBeforeChange >= minDelayBeforeChange);
configMode = getOption(options, LiteralStringRef("configMode"), StringRef()).toString();
networkAddresses = getOption(options, LiteralStringRef("coordinators"), StringRef()).toString();
coordinatorChanges = getOption(options, LiteralStringRef("coordinatorChanges"), 1);
}
std::string description() const override { return "ChangeConfig"; }
@ -124,6 +126,15 @@ struct ChangeConfigWorkload : TestWorkload {
wait(CoordinatorsChangeActor(cx, self));
}
// Run additional coordinator changes if specified.
if (self->networkAddresses.size() && self->networkAddresses == "auto") {
state int i;
for (i = 1; i < self->coordinatorChanges; ++i) {
wait(delay(20));
wait(CoordinatorsChangeActor(cx, self, true));
}
}
if (!extraConfigureBefore) {
wait(self->configureExtraDatabases(self));
}

View File

@ -39,12 +39,14 @@ class ConfigIncrementWorkload : public TestWorkload {
static Key getConfigKey() { return Tuple::makeTuple(/* config class */ nullptr, testKnobName).pack(); }
// Reads the value stored under getConfigKey() through `tr` and returns it as
// an int.
//
// Returns 0 when the key is absent. Otherwise the stored bytes are decoded
// with BinaryReader::fromStringRef<int>(..., Unversioned()).
//
// NOTE(review): two "ConfigIncrementGet" trace events are constructed below,
// and in the else-branch the first `return` is followed by statements that
// can never execute. This looks like the pre-change and post-change lines of
// a diff sitting side by side — confirm that only the `state TraceEvent te`
// form, which attaches the decoded value as the "Value" detail, is meant to
// remain.
ACTOR static Future<int> get(Reference<ISingleThreadTransaction> tr) {
TraceEvent(SevDebug, "ConfigIncrementGet");
state TraceEvent te(SevDebug, "ConfigIncrementGet");
Optional<Value> serializedValue = wait(tr->get(getConfigKey()));
if (!serializedValue.present()) {
// Key has never been written: treat the counter as zero.
return 0;
} else {
return BinaryReader::fromStringRef<int>(serializedValue.get(), Unversioned());
int value = BinaryReader::fromStringRef<int>(serializedValue.get(), Unversioned());
te.detail("Value", value);
return value;
}
}
@ -98,9 +100,9 @@ class ConfigIncrementWorkload : public TestWorkload {
}
ACTOR static Future<bool> check(ConfigIncrementWorkload* self, Database cx) {
state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx);
loop {
try {
state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx);
state int currentValue = wait(get(tr));
auto expectedValue = self->incrementActors * self->incrementsPerActor;
TraceEvent("ConfigIncrementCheck")

View File

@ -9,7 +9,7 @@ testTitle = 'ConfigIncrement'
incrementActors = 2
incrementsPerActor = 10
meanSleepWithinTransactions = 0.01
meanSleepBetweenTransactions = 0.1
meanSleepBetweenTransactions = 10
[[test.workload]]
testName = 'Attrition'
@ -17,3 +17,9 @@ testTitle = 'ConfigIncrement'
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'ChangeConfig'
maxDelayBeforeChange = 120.0
coordinators = 'auto'
coordinatorChanges = 2

View File

@ -14,3 +14,9 @@ testTitle = 'ConfigIncrement'
[[test.workload]]
testName = 'Attrition'
reboot = false
[[test.workload]]
testName = 'ChangeConfig'
maxDelayBeforeChange = 120.0
coordinators = 'auto'
coordinatorChanges = 2