Fix dynamic knobs correctness issues

This commit is contained in:
Lukas Joswiak 2022-02-01 22:27:12 -08:00
parent 96dd86ebf8
commit d5a562e6b8
23 changed files with 562 additions and 196 deletions

View File

@ -223,6 +223,11 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BLOBSTORE_READ_REQUESTS_PER_SECOND, 100 );
init( BLOBSTORE_DELETE_REQUESTS_PER_SECOND, 200 );
// Dynamic Knobs
init( COMMIT_QUORUM_TIMEOUT, 3.0 );
init( GET_GENERATION_QUORUM_TIMEOUT, 3.0 );
init( GET_KNOB_TIMEOUT, 3.0 );
// Client Status Info
init(CSI_SAMPLING_PROBABILITY, -1.0);
init(CSI_SIZE_LIMIT, std::numeric_limits<int64_t>::max());

View File

@ -189,6 +189,11 @@ public:
int32_t DEFAULT_AUTO_RESOLVERS;
int32_t DEFAULT_AUTO_LOGS;
// Dynamic Knobs
double COMMIT_QUORUM_TIMEOUT;
double GET_GENERATION_QUORUM_TIMEOUT;
double GET_KNOB_TIMEOUT;
// Client Status Info
double CSI_SAMPLING_PROBABILITY;
int64_t CSI_SIZE_LIMIT;

View File

@ -39,6 +39,10 @@ struct ConfigGeneration {
bool operator<(ConfigGeneration const&) const;
bool operator>(ConfigGeneration const&) const;
std::string toString() const {
return format("liveVersion: %d, committedVersion: %d", liveVersion, committedVersion);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, committedVersion, liveVersion);

View File

@ -57,15 +57,16 @@ class CommitQuorum {
ConfigGeneration generation,
ConfigTransactionInterface cti) {
try {
wait(retryBrokenPromise(cti.commit, self->getCommitRequest(generation)));
wait(timeoutError(cti.commit.getReply(self->getCommitRequest(generation)),
CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT));
++self->successful;
} catch (Error& e) {
// self might be destroyed if this actor is canceled
// self might be destroyed if this actor is cancelled
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() == error_code_not_committed) {
if (e.code() == error_code_not_committed || e.code() == error_code_timed_out) {
++self->failed;
} else {
++self->maybeCommitted;
@ -117,21 +118,41 @@ class GetGenerationQuorum {
Future<ConfigGeneration> getGenerationFuture;
ACTOR static Future<Void> addRequestActor(GetGenerationQuorum* self, ConfigTransactionInterface cti) {
ConfigTransactionGetGenerationReply reply = wait(
retryBrokenPromise(cti.getGeneration, ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }));
loop {
try {
ConfigTransactionGetGenerationReply reply = wait(timeoutError(
cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }),
CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT));
++self->totalRepliesReceived;
auto gen = reply.generation;
self->lastSeenLiveVersion = std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion));
auto& replicas = self->seenGenerations[gen];
replicas.push_back(cti);
self->maxAgreement = std::max(replicas.size(), self->maxAgreement);
if (replicas.size() >= self->ctis.size() / 2 + 1 && !self->result.isSet()) {
self->result.send(gen);
} else if (self->maxAgreement + (self->ctis.size() - self->totalRepliesReceived) <
(self->ctis.size() / 2 + 1)) {
if (!self->result.isError()) {
self->result.sendError(failed_to_reach_quorum());
++self->totalRepliesReceived;
auto gen = reply.generation;
self->lastSeenLiveVersion =
std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion));
auto& replicas = self->seenGenerations[gen];
replicas.push_back(cti);
self->maxAgreement = std::max(replicas.size(), self->maxAgreement);
if (replicas.size() >= self->ctis.size() / 2 + 1 && !self->result.isSet()) {
self->result.send(gen);
} else if (self->maxAgreement + (self->ctis.size() - self->totalRepliesReceived) <
(self->ctis.size() / 2 + 1)) {
if (!self->result.isError()) {
self->result.sendError(failed_to_reach_quorum());
}
}
break;
} catch (Error& e) {
if (e.code() == error_code_broken_promise) {
continue;
} else if (e.code() == error_code_timed_out) {
++self->totalRepliesReceived;
if (self->totalRepliesReceived == self->ctis.size() && self->result.canBeSet() &&
!self->result.isError()) {
self->result.sendError(failed_to_reach_quorum());
}
break;
} else {
throw;
}
}
}
return Void();
@ -151,9 +172,10 @@ class GetGenerationQuorum {
} catch (Error& e) {
if (e.code() == error_code_failed_to_reach_quorum) {
TEST(true); // Failed to reach quorum getting generation
wait(delayJittered(0.01 * (1 << retries)));
wait(delayJittered(0.005 * (1 << retries)));
++retries;
self->actors.clear(false);
self->seenGenerations.clear();
self->result.reset();
self->totalRepliesReceived = 0;
self->maxAgreement = 0;
@ -197,15 +219,25 @@ class PaxosConfigTransactionImpl {
Database cx;
ACTOR static Future<Optional<Value>> get(PaxosConfigTransactionImpl* self, Key key) {
state ConfigKey configKey = ConfigKey::decodeKey(key);
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
// TODO: Load balance
ConfigTransactionGetReply reply = wait(retryBrokenPromise(
self->getGenerationQuorum.getReadReplicas()[0].get, ConfigTransactionGetRequest{ generation, configKey }));
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return Optional<Value>{};
loop {
try {
state ConfigKey configKey = ConfigKey::decodeKey(key);
ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration());
// TODO: Load balance
ConfigTransactionGetReply reply =
wait(timeoutError(self->getGenerationQuorum.getReadReplicas()[0].get.getReply(
ConfigTransactionGetRequest{ generation, configKey }),
CLIENT_KNOBS->GET_KNOB_TIMEOUT));
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return Optional<Value>{};
}
} catch (Error& e) {
if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
throw;
}
}
}
}

View File

@ -696,7 +696,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( COORDINATOR_LEADER_CONNECTION_TIMEOUT, 20.0 );
// Dynamic Knobs (implementation)
init( GET_COMMITTED_VERSION_TIMEOUT, 3.0 ); // Maximum time the consumer should wait for a response from a ConfigNode when asking for the latest committed version.
init( UPDATE_NODE_TIMEOUT, 3.0 );
init( GET_COMMITTED_VERSION_TIMEOUT, 3.0 );
init( GET_SNAPSHOT_AND_CHANGES_TIMEOUT, 3.0 );
init( FETCH_CHANGES_TIMEOUT , 3.0 );
// Buggification
init( BUGGIFIED_EVENTUAL_CONSISTENCY, 1.0 );

View File

@ -637,7 +637,10 @@ public:
double COORDINATOR_LEADER_CONNECTION_TIMEOUT;
// Dynamic Knobs (implementation)
double UPDATE_NODE_TIMEOUT;
double GET_COMMITTED_VERSION_TIMEOUT;
double GET_SNAPSHOT_AND_CHANGES_TIMEOUT;
double FETCH_CHANGES_TIMEOUT;
// Buggification
double BUGGIFIED_EVENTUAL_CONSISTENCY;

View File

@ -1067,12 +1067,23 @@ void haltRegisteringOrCurrentSingleton(ClusterControllerData* self,
}
}
void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, ConfigBroadcaster* configBroadcaster) {
void registerWorker(RegisterWorkerRequest req,
ClusterControllerData* self,
ServerCoordinators coordinators,
ConfigBroadcaster* configBroadcaster) {
const WorkerInterface& w = req.wi;
ProcessClass newProcessClass = req.processClass;
auto info = self->id_worker.find(w.locality.processId());
ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo;
newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
Optional<ConfigFollowerInterface> cfi;
bool isCoordinator =
std::find_if(coordinators.configServers.begin(),
coordinators.configServers.end(),
[&req](const ConfigFollowerInterface& cfi) {
return cfi.address() == req.wi.address() || (req.wi.secondaryAddress().present() &&
cfi.address() == req.wi.secondaryAddress().get());
}) != coordinators.configServers.end();
for (auto it : req.incompatiblePeers) {
self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
@ -1156,8 +1167,9 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
}
if (configBroadcaster != nullptr) {
self->addActor.send(configBroadcaster->registerWorker(
if (configBroadcaster != nullptr && isCoordinator) {
self->addActor.send(configBroadcaster->registerNode(
w,
req.lastSeenKnobVersion,
req.knobConfigClassSet,
self->id_worker[w.locality.processId()].watcher,
@ -1187,12 +1199,12 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger();
}
if (configBroadcaster != nullptr) {
self->addActor.send(
configBroadcaster->registerWorker(req.lastSeenKnobVersion,
req.knobConfigClassSet,
info->second.watcher,
info->second.details.interf.configBroadcastInterface));
if (configBroadcaster != nullptr && isCoordinator) {
self->addActor.send(configBroadcaster->registerNode(w,
req.lastSeenKnobVersion,
req.knobConfigClassSet,
info->second.watcher,
info->second.details.interf.configBroadcastInterface));
}
checkOutstandingRequests(self);
} else {
@ -2503,7 +2515,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
}
when(RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) {
++self.registerWorkerRequests;
registerWorker(req, &self, (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster);
registerWorker(
req, &self, coordinators, (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster);
}
when(GetWorkersRequest req = waitNext(interf.getWorkers.getFuture())) {
++self.getWorkersRequests;

View File

@ -1885,4 +1885,4 @@ std::string& getRecoveryEventName(ClusterRecoveryEventType type) {
auto iter = recoveryEventNameMap.find(type);
ASSERT(iter != recoveryEventNameMap.end());
return iter->second;
}
}

View File

@ -113,6 +113,54 @@ struct ConfigBroadcastChangesRequest {
}
};
struct ConfigBroadcastRegisteredReply {
static constexpr FileIdentifier file_identifier = 12041047;
bool registered;
ConfigBroadcastRegisteredReply() = default;
explicit ConfigBroadcastRegisteredReply(bool registered) : registered(registered) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, registered);
}
};
struct ConfigBroadcastRegisteredRequest {
static constexpr FileIdentifier file_identifier = 6921417;
ReplyPromise<ConfigBroadcastRegisteredReply> reply;
ConfigBroadcastRegisteredRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct ConfigBroadcastReadyReply {
static constexpr FileIdentifier file_identifier = 7032251;
ConfigBroadcastReadyReply() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar);
}
};
struct ConfigBroadcastReadyRequest {
static constexpr FileIdentifier file_identifier = 7402862;
ReplyPromise<ConfigBroadcastReadyReply> reply;
ConfigBroadcastReadyRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
/*
* The ConfigBroadcaster uses a ConfigBroadcastInterface from each worker to
* push updates made to the configuration database to the worker.
@ -124,6 +172,8 @@ public:
static constexpr FileIdentifier file_identifier = 1676543;
RequestStream<ConfigBroadcastSnapshotRequest> snapshot;
RequestStream<ConfigBroadcastChangesRequest> changes;
RequestStream<ConfigBroadcastRegisteredRequest> registered;
RequestStream<ConfigBroadcastReadyRequest> ready;
ConfigBroadcastInterface() : _id(deterministicRandom()->randomUniqueID()) {}
@ -133,6 +183,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _id, snapshot, changes);
serializer(ar, _id, snapshot, changes, registered, ready);
}
};

View File

@ -91,6 +91,11 @@ class ConfigBroadcasterImpl {
Counter snapshotRequest;
Future<Void> logger;
int coordinators = 0;
std::unordered_set<NetworkAddress> activeConfigNodes;
bool disallowUnregistered = false;
Promise<Void> newConfigNodesAllowed;
Future<Void> pushSnapshot(Version snapshotVersion, BroadcastClientDetails const& client) {
if (client.lastSeenVersion >= snapshotVersion) {
return Void();
@ -200,20 +205,87 @@ class ConfigBroadcasterImpl {
return Void();
}
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self, Future<Void> watcher, UID clientUID) {
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self,
Future<Void> watcher,
UID clientUID,
NetworkAddress clientAddress) {
wait(watcher);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id).detail("ClientID", clientUID);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id)
.detail("ClientID", clientUID)
.detail("Address", clientAddress);
self->clients.erase(clientUID);
self->clientFailures.erase(clientUID);
self->activeConfigNodes.erase(clientAddress);
// See comment where this promise is reset below.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
return Void();
}
ACTOR static Future<Void> registerWorker(ConfigBroadcaster* self,
ConfigBroadcasterImpl* impl,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
// Determines whether the registering ConfigNode is allowed to start
// serving configuration database requests and snapshot data. In order to
// ensure strict serializability, some nodes may be temporarily restricted
// from participation until the other nodes in the system are brought up to
// date.
ACTOR static Future<Void> registerNode_internal(ConfigBroadcasterImpl* self,
WorkerInterface w,
Version lastSeenVersion) {
state NetworkAddress address = w.address();
// Ask the registering ConfigNode whether it has registered in the past.
ConfigBroadcastRegisteredReply reply =
wait(w.configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{}));
state bool registered = reply.registered;
if (self->activeConfigNodes.find(address) != self->activeConfigNodes.end()) {
self->activeConfigNodes.erase(address);
// Since a node can die and re-register before the broadcaster
// receives notice that the node has died, we need to check for
// re-registration of a node here. There are two places that can
// reset the promise to allow new nodes, make sure the promise is
// actually set before resetting it. This prevents a node from
// dying, registering, waiting on the promise, then the broadcaster
// receives the notification the node has died and resets the
// promise again.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
}
if (registered) {
if (!self->disallowUnregistered) {
self->activeConfigNodes.clear();
}
self->activeConfigNodes.insert(address);
self->disallowUnregistered = true;
} else if (self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) {
// Need to allow registration of previously unregistered nodes when
// the cluster first starts up.
self->activeConfigNodes.insert(address);
if (self->activeConfigNodes.size() >= self->coordinators / 2 + 1 &&
self->newConfigNodesAllowed.canBeSet()) {
self->newConfigNodesAllowed.send(Void());
}
} else {
self->disallowUnregistered = true;
}
if (!registered) {
wait(self->newConfigNodesAllowed.getFuture());
}
wait(success(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{})));
return Void();
}
ACTOR static Future<Void> registerNode(ConfigBroadcaster* self,
ConfigBroadcasterImpl* impl,
WorkerInterface w,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
state BroadcastClientDetails client(
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
if (!impl->consumerFuture.isValid()) {
@ -227,33 +299,50 @@ class ConfigBroadcasterImpl {
TraceEvent(SevDebug, "ConfigBroadcasterRegisteringWorker", impl->id)
.detail("ClientID", broadcastInterface.id())
.detail("MostRecentVersion", impl->mostRecentVersion)
.detail("ClientLastSeenVersion", lastSeenVersion);
.detail("MostRecentVersion", impl->mostRecentVersion);
impl->actors.add(registerNode_internal(impl, w, lastSeenVersion));
// Push full snapshot to worker if it isn't up to date.
wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client;
impl->clientFailures[broadcastInterface.id()] = waitForFailure(impl, watcher, broadcastInterface.id());
impl->clientFailures[broadcastInterface.id()] =
waitForFailure(impl, watcher, broadcastInterface.id(), w.address());
return Void();
}
public:
Future<Void> registerWorker(ConfigBroadcaster& self,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
return registerWorker(&self, this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
Future<Void> registerNode(ConfigBroadcaster& self,
WorkerInterface const& w,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface) {
return registerNode(&self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface);
}
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingChanges", id)
.detail("ChangesSize", changes.size())
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("NewMostRecentVersion", mostRecentVersion)
.detail("AnnotationsSize", annotations.size());
addChanges(changes, mostRecentVersion, annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
if (mostRecentVersion >= 0) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingChanges", id)
.detail("ChangesSize", changes.size())
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("NewMostRecentVersion", mostRecentVersion)
.detail("ActiveReplicas", readReplicas.size());
addChanges(changes, mostRecentVersion, annotations);
}
if (newConfigNodesAllowed.canBeSet()) {
for (const auto& cfi : readReplicas) {
this->activeConfigNodes.insert(cfi.address());
}
if (activeConfigNodes.size() >= coordinators / 2 + 1) {
disallowUnregistered = true;
newConfigNodesAllowed.send(Void());
}
}
}
template <class Snapshot>
@ -261,23 +350,34 @@ public:
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingSnapshotAndChanges", id)
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("SnapshotSize", snapshot.size())
.detail("SnapshotVersion", snapshotVersion)
.detail("ChangesSize", changes.size())
.detail("ChangesVersion", changesVersion)
.detail("AnnotationsSize", annotations.size());
.detail("ActiveReplicas", readReplicas.size());
actors.add(pushSnapshotAndChanges(this, snapshot, snapshotVersion, changes, changesVersion, annotations));
for (const auto& cfi : readReplicas) {
this->activeConfigNodes.insert(cfi.address());
}
if (activeConfigNodes.size() >= coordinators / 2 + 1 && newConfigNodesAllowed.canBeSet()) {
disallowUnregistered = true;
newConfigNodesAllowed.send(Void());
}
}
ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {
coordinators = 1;
consumer = IConfigConsumer::createTestSimple(cfi, 0.5, Optional<double>{});
TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id).detail("Consumer", consumer->getID());
}
ConfigBroadcasterImpl(ServerCoordinators const& coordinators, ConfigDBType configDBType) : ConfigBroadcasterImpl() {
this->coordinators = coordinators.configServers.size();
if (configDBType != ConfigDBType::DISABLED) {
if (configDBType == ConfigDBType::SIMPLE) {
consumer = IConfigConsumer::createSimple(coordinators, 0.5, Optional<double>{});
@ -366,17 +466,19 @@ ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
ConfigBroadcaster::~ConfigBroadcaster() = default;
Future<Void> ConfigBroadcaster::registerWorker(Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
return impl->registerWorker(*this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
Future<Void> ConfigBroadcaster::registerNode(WorkerInterface const& w,
Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface) {
return impl->registerNode(*this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface);
}
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applyChanges(changes, mostRecentVersion, annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applyChanges(changes, mostRecentVersion, annotations, readReplicas);
}
void ConfigBroadcaster::applySnapshotAndChanges(
@ -384,8 +486,9 @@ void ConfigBroadcaster::applySnapshotAndChanges(
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion, annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion, annotations, readReplicas);
}
void ConfigBroadcaster::applySnapshotAndChanges(
@ -393,8 +496,10 @@ void ConfigBroadcaster::applySnapshotAndChanges(
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applySnapshotAndChanges(std::move(snapshot), snapshotVersion, changes, changesVersion, annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applySnapshotAndChanges(
std::move(snapshot), snapshotVersion, changes, changesVersion, annotations, readReplicas);
}
Future<Void> ConfigBroadcaster::getError() const {

View File

@ -43,23 +43,27 @@ public:
ConfigBroadcaster(ConfigBroadcaster&&);
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster();
Future<Void> registerWorker(Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface worker);
Future<Void> registerNode(WorkerInterface const& w,
Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface const& worker);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas);
void applySnapshotAndChanges(std::map<ConfigKey, KnobValue> const& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas);
void applySnapshotAndChanges(std::map<ConfigKey, KnobValue>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas);
Future<Void> getError() const;
UID getID() const;
JsonBuilderObject getStatus() const;

View File

@ -269,15 +269,15 @@ class BroadcasterToLocalConfigEnvironment {
wait(self->readFrom.setup());
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer =
self->broadcaster.registerWorker(0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
self->broadcastServer = self->broadcaster.registerNode(
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
return Void();
}
void addMutation(Optional<KeyRef> configClass, KeyRef knobName, KnobValueRef value) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
appendVersionedMutation(versionedMutations, ++lastWrittenVersion, configClass, knobName, value);
broadcaster.applyChanges(versionedMutations, lastWrittenVersion, {});
broadcaster.applyChanges(versionedMutations, lastWrittenVersion, {}, {});
}
public:
@ -303,8 +303,11 @@ public:
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
readFrom.connectToBroadcaster(cbi);
broadcastServer = broadcaster.registerWorker(
readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), cbi->get());
broadcastServer = broadcaster.registerNode(WorkerInterface(),
readFrom.lastSeenVersion(),
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get());
}
Future<Void> restartLocalConfig(std::string const& newConfigPath) {
@ -436,8 +439,8 @@ class TransactionToLocalConfigEnvironment {
wait(self->readFrom.setup());
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer =
self->broadcaster.registerWorker(0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
self->broadcastServer = self->broadcaster.registerNode(
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get());
return Void();
}
@ -454,8 +457,11 @@ public:
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
readFrom.connectToBroadcaster(cbi);
broadcastServer = broadcaster.registerWorker(
readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), cbi->get());
broadcastServer = broadcaster.registerNode(WorkerInterface(),
readFrom.lastSeenVersion(),
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get());
}
Future<Void> restartLocalConfig(std::string const& newConfigPath) {

View File

@ -191,6 +191,8 @@ struct ConfigFollowerGetCommittedVersionRequest {
static constexpr FileIdentifier file_identifier = 1093472;
ReplyPromise<ConfigFollowerGetCommittedVersionReply> reply;
ConfigFollowerGetCommittedVersionRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
@ -218,6 +220,7 @@ public:
bool operator==(ConfigFollowerInterface const& rhs) const;
bool operator!=(ConfigFollowerInterface const& rhs) const;
UID id() const { return _id; }
NetworkAddress address() const { return getSnapshotAndChanges.getEndpoint().getPrimaryAddress(); }
template <class Ar>
void serialize(Ar& ar) {

View File

@ -34,6 +34,7 @@ namespace {
const KeyRef lastCompactedVersionKey = "lastCompactedVersion"_sr;
const KeyRef currentGenerationKey = "currentGeneration"_sr;
const KeyRef registeredKey = "registered"_sr;
const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr);
const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr);
const KeyRangeRef annotationKeys = KeyRangeRef("annotation/"_sr, "annotation0"_sr);
@ -202,14 +203,14 @@ class ConfigNodeImpl {
state Version committedVersion =
wait(map(getGeneration(self), [](auto const& gen) { return gen.committedVersion; }));
// TODO: Reenable this when running the ConfigIncrement workload with reboot=false
// if (committedVersion < req.mostRecentVersion) {
// // Handle a very rare case where a ConfigNode loses data between
// // responding with a committed version and responding to the
// // subsequent get changes request.
// TEST(true); // ConfigNode data loss occurred on a minority of coordinators
// req.reply.sendError(process_behind()); // Reuse the process_behind error
// return Void();
// }
if (committedVersion < req.mostRecentVersion) {
// Handle a very rare case where a ConfigNode loses data between
// responding with a committed version and responding to the
// subsequent get changes request.
TEST(true); // ConfigNode data loss occurred on a minority of coordinators
req.reply.sendError(process_behind()); // Reuse the process_behind error
return Void();
}
state Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
state Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> versionedAnnotations =
@ -443,8 +444,7 @@ class ConfigNodeImpl {
TraceEvent(SevDebug, "ConfigNodeGettingSnapshot", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
.detail("ChangesSize", reply.changes.size());
req.reply.send(reply);
return Void();
}
@ -506,6 +506,11 @@ class ConfigNodeImpl {
req.reply.sendError(transaction_too_old());
return Void();
}
TraceEvent("ConfigNodeRollforward")
.detail("RollbackTo", req.rollback)
.detail("Target", req.target)
.detail("LastKnownCommitted", req.lastKnownCommitted)
.detail("Committed", currentGeneration.committedVersion);
// Rollback to prior known committed version to erase any commits not
// made on a quorum.
if (req.rollback.present() && req.rollback.get() < currentGeneration.committedVersion) {
@ -570,6 +575,42 @@ class ConfigNodeImpl {
}
}
ACTOR static Future<Void> serve(ConfigNodeImpl* self, ConfigBroadcastInterface const* cbi, bool infinite) {
loop {
choose {
when(state ConfigBroadcastRegisteredRequest req = waitNext(cbi->registered.getFuture())) {
bool isRegistered = wait(registered(self));
req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered });
}
when(ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) {
readyReq.reply.send(ConfigBroadcastReadyReply{});
if (!infinite) {
return Void();
}
}
}
}
}
ACTOR static Future<Void> serve(ConfigNodeImpl* self,
ConfigBroadcastInterface const* cbi,
ConfigTransactionInterface const* cti,
ConfigFollowerInterface const* cfi) {
wait(serve(self, cbi, false));
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(true, IncludeVersion())));
wait(self->kvStore->commit());
// Shouldn't return (coordinationServer will throw an error if it does).
wait(serve(self, cbi, true) || serve(self, cti) || serve(self, cfi));
return Void();
}
ACTOR static Future<bool> registered(ConfigNodeImpl* self) {
Optional<Value> value = wait(self->kvStore->readValue(registeredKey));
return value.present();
}
public:
ConfigNodeImpl(std::string const& folder)
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigNode"),
@ -587,6 +628,12 @@ public:
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
Future<Void> serve(ConfigBroadcastInterface const& cbi,
ConfigTransactionInterface const& cti,
ConfigFollowerInterface const& cfi) {
return serve(this, &cbi, &cti, &cfi);
}
void close() { kvStore.close(); }
Future<Void> onClosed() { return kvStore.onClosed(); }
@ -604,6 +651,12 @@ Future<Void> ConfigNode::serve(ConfigFollowerInterface const& cfi) {
return impl->serve(cfi);
}
Future<Void> ConfigNode::serve(ConfigBroadcastInterface const& cbi,
ConfigTransactionInterface const& cti,
ConfigFollowerInterface const& cfi) {
return impl->serve(cbi, cti, cfi);
}
void ConfigNode::close() {
impl->close();
}

View File

@ -25,6 +25,7 @@
#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbclient/PImpl.h"
#include "fdbserver/ConfigFollowerInterface.h"
#include "fdbserver/ConfigBroadcastInterface.h"
class ConfigNode : public ReferenceCounted<ConfigNode> {
PImpl<class ConfigNodeImpl> impl;
@ -34,6 +35,9 @@ public:
~ConfigNode();
Future<Void> serve(ConfigTransactionInterface const&);
Future<Void> serve(ConfigFollowerInterface const&);
Future<Void> serve(ConfigBroadcastInterface const&,
ConfigTransactionInterface const&,
ConfigFollowerInterface const&);
public: // Testing
void close();

View File

@ -732,25 +732,23 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
ACTOR Future<Void> coordinationServer(std::string dataFolder,
Reference<IClusterConnectionRecord> ccr,
ConfigDBType configDBType) {
Reference<ConfigNode> configNode,
ConfigBroadcastInterface cbi) {
state UID myID = deterministicRandom()->randomUniqueID();
state LeaderElectionRegInterface myLeaderInterface(g_network);
state GenerationRegInterface myInterface(g_network);
state OnDemandStore store(dataFolder, myID, "coordination-");
state ConfigTransactionInterface configTransactionInterface;
state ConfigFollowerInterface configFollowerInterface;
state Reference<ConfigNode> configNode;
state Future<Void> configDatabaseServer = Never();
TraceEvent("CoordinationServer", myID)
.detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
.detail("Folder", dataFolder);
if (configDBType != ConfigDBType::DISABLED) {
if (configNode.isValid()) {
configTransactionInterface.setupWellKnownEndpoints();
configFollowerInterface.setupWellKnownEndpoints();
configNode = makeReference<ConfigNode>(dataFolder);
configDatabaseServer =
configNode->serve(configTransactionInterface) || configNode->serve(configFollowerInterface);
configDatabaseServer = configNode->serve(cbi, configTransactionInterface, configFollowerInterface);
}
try {

View File

@ -25,6 +25,7 @@
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbserver/ConfigFollowerInterface.h"
#include "fdbserver/ConfigBroadcastInterface.h"
struct GenerationRegInterface {
constexpr static FileIdentifier file_identifier = 16726744;
@ -212,6 +213,8 @@ struct ForwardRequest {
}
};
class ConfigNode;
class ServerCoordinators : public ClientCoordinators {
public:
explicit ServerCoordinators(Reference<IClusterConnectionRecord>);
@ -223,6 +226,7 @@ public:
Future<Void> coordinationServer(std::string const& dataFolder,
Reference<IClusterConnectionRecord> const& ccf,
ConfigDBType const&);
Reference<ConfigNode> const&,
ConfigBroadcastInterface const&);
#endif

View File

@ -31,6 +31,11 @@ struct CommittedVersions {
Version lastCommitted;
};
struct QuorumVersion {
CommittedVersions versions;
bool isQuorum;
};
class GetCommittedVersionQuorum {
std::vector<Future<Void>> actors;
std::vector<ConfigFollowerInterface> cfis;
@ -42,7 +47,7 @@ class GetCommittedVersionQuorum {
size_t maxAgreement{ 0 };
// Set to the <secondToLastCommitted, lastCommitted> versions a quorum of
// ConfigNodes agree on, otherwise unset.
Promise<CommittedVersions> quorumVersion;
Promise<QuorumVersion> quorumVersion;
// Stores the largest committed version out of all responses.
Version largestCommitted{ 0 };
@ -53,11 +58,11 @@ class GetCommittedVersionQuorum {
CommittedVersions nodeVersion,
CommittedVersions quorumVersion,
ConfigFollowerInterface cfi) {
ASSERT(nodeVersion.lastCommitted <= self->largestCommitted);
if (nodeVersion.lastCommitted == self->largestCommitted) {
state Version target = quorumVersion.lastCommitted;
if (nodeVersion.lastCommitted == target) {
return Void();
}
if (nodeVersion.lastCommitted < self->largestCommitted) {
if (nodeVersion.lastCommitted < target) {
state Optional<Version> rollback;
if (nodeVersion.lastCommitted > quorumVersion.secondToLastCommitted) {
// If a non-quorum node has a last committed version less than
@ -78,29 +83,24 @@ class GetCommittedVersionQuorum {
// TODO: Load balance over quorum. Also need to catch
// error_code_process_behind and retry with the next ConfigNode in
// the quorum.
state ConfigFollowerInterface quorumCfi = self->replies[self->largestCommitted][0];
state ConfigFollowerInterface quorumCfi = self->replies[target][0];
try {
auto lastSeenVersion = rollback.present() ? rollback.get() : nodeVersion.lastCommitted;
ConfigFollowerGetChangesReply reply = wait(retryBrokenPromise(
quorumCfi.getChanges, ConfigFollowerGetChangesRequest{ lastSeenVersion, self->largestCommitted }));
wait(retryBrokenPromise(cfi.rollforward,
ConfigFollowerRollforwardRequest{ rollback,
nodeVersion.lastCommitted,
self->largestCommitted,
reply.changes,
reply.annotations }));
state Version lastSeenVersion = rollback.present() ? rollback.get() : nodeVersion.lastCommitted;
ConfigFollowerGetChangesReply reply = wait(timeoutError(
quorumCfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ lastSeenVersion, target }),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
wait(timeoutError(cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{
rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }),
SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT));
} catch (Error& e) {
if (e.code() == error_code_version_already_compacted) {
TEST(true); // PaxosConfigConsumer rollforward compacted ConfigNode
ConfigFollowerGetSnapshotAndChangesReply reply =
wait(retryBrokenPromise(quorumCfi.getSnapshotAndChanges,
ConfigFollowerGetSnapshotAndChangesRequest{ self->largestCommitted }));
wait(retryBrokenPromise(cfi.rollforward,
ConfigFollowerRollforwardRequest{ rollback,
nodeVersion.lastCommitted,
self->largestCommitted,
reply.changes,
reply.annotations }));
ConfigFollowerGetSnapshotAndChangesReply reply = wait(retryBrokenPromise(
quorumCfi.getSnapshotAndChanges, ConfigFollowerGetSnapshotAndChangesRequest{ target }));
wait(retryBrokenPromise(
cfi.rollforward,
ConfigFollowerRollforwardRequest{
rollback, nodeVersion.lastCommitted, target, reply.changes, reply.annotations }));
} else if (e.code() == error_code_transaction_too_old) {
// Seeing this trace is not necessarily a problem. There
// are legitimate scenarios where a ConfigNode could return
@ -133,23 +133,16 @@ class GetCommittedVersionQuorum {
if (nodes.size() >= self->cfis.size() / 2 + 1) {
// A quorum of ConfigNodes agree on the latest committed version.
if (self->quorumVersion.canBeSet()) {
self->quorumVersion.send(committedVersions);
self->quorumVersion.send(QuorumVersion{ committedVersions, true });
}
// TODO: We need to wait for all the responses to come in
// before calling updateNode here. For example, imagine a
// scenario with ConfigNodes at versions 1, 1, 2. If responses
// were received from the two ConfigNodes at version 1 first,
// the quorum version would be set at 1 and updateNode would be
// called using version 1. However, in this scenario, these two
// ConfigNodes should actually be rolled forward to version 2.
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get(), cfi));
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
} else if (self->maxAgreement >= self->cfis.size() / 2 + 1) {
// A quorum of ConfigNodes agree on the latest committed version,
// but the node we just got a reply from is not one of them. We may
// need to roll it forward or back.
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
ASSERT(committedVersions.lastCommitted != quorumVersion.lastCommitted);
wait(self->updateNode(self, committedVersions, quorumVersion, cfi));
QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
ASSERT(committedVersions.lastCommitted != quorumVersion.versions.lastCommitted);
wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
} else if (self->maxAgreement + (self->cfis.size() - self->totalRepliesReceived) <
(self->cfis.size() / 2 + 1)) {
// It is impossible to reach a quorum of ConfigNodes that agree
@ -161,30 +154,40 @@ class GetCommittedVersionQuorum {
Version largestCommitted = self->replies.rbegin()->first;
Version largestCommittedPrior = self->priorVersions[largestCommitted];
if (self->quorumVersion.canBeSet()) {
self->quorumVersion.send(CommittedVersions{ largestCommittedPrior, largestCommitted });
self->quorumVersion.send(
QuorumVersion{ CommittedVersions{ largestCommittedPrior, largestCommitted }, false });
}
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get(), cfi));
wait(self->updateNode(self, committedVersions, self->quorumVersion.getFuture().get().versions, cfi));
} else {
// Still building up responses; don't have enough data to act on
// yet, so wait until we do.
CommittedVersions quorumVersion = wait(self->quorumVersion.getFuture());
wait(self->updateNode(self, committedVersions, quorumVersion, cfi));
QuorumVersion quorumVersion = wait(self->quorumVersion.getFuture());
wait(self->updateNode(self, committedVersions, quorumVersion.versions, cfi));
}
} catch (Error& e) {
// Count a timeout as a reply.
++self->totalRepliesReceived;
if (e.code() != error_code_timed_out) {
throw;
} else if (self->totalRepliesReceived == self->cfis.size() && !self->quorumVersion.isSet()) {
// Make sure to trigger the quorumVersion if a timeout
// occurred, the quorum version hasn't been set, and there are
// no more incoming responses. Note that this means that it is
// impossible to reach a quorum, so send back the largest
// committed version seen. We also need to store the interface
// for the timed out server for future communication attempts.
auto& nodes = self->replies[self->largestCommitted];
nodes.push_back(cfi);
self->quorumVersion.send(CommittedVersions{ self->lastSeenVersion, self->largestCommitted });
} else if (self->totalRepliesReceived == self->cfis.size() && self->quorumVersion.canBeSet() &&
!self->quorumVersion.isError()) {
if (self->replies.size() >= self->cfis.size() / 2 + 1) {
// Make sure to trigger the quorumVersion if a timeout
// occurred, a quorum disagree on the committed version, and
// there are no more incoming responses. Note that this means
// that it is impossible to reach a quorum, so send back the
// largest committed version seen. We also need to store the
// interface for the timed out server for future communication
// attempts.
auto& nodes = self->replies[self->largestCommitted];
nodes.push_back(cfi);
self->quorumVersion.send(
QuorumVersion{ CommittedVersions{ self->lastSeenVersion, self->largestCommitted }, false });
} else if (!self->quorumVersion.isSet()) {
// Otherwise, if a quorum agree on the committed version,
// some other occurred. Notify the caller of it.
self->quorumVersion.sendError(e);
}
}
}
return Void();
@ -193,7 +196,7 @@ class GetCommittedVersionQuorum {
public:
explicit GetCommittedVersionQuorum(std::vector<ConfigFollowerInterface> const& cfis, Version lastSeenVersion)
: cfis(cfis), lastSeenVersion(lastSeenVersion) {}
Future<CommittedVersions> getCommittedVersion() {
Future<QuorumVersion> getCommittedVersion() {
ASSERT(!isReady()); // ensures this function is not accidentally called before resetting state
for (const auto& cfi : cfis) {
actors.push_back(getCommittedVersionActor(this, cfi));
@ -205,8 +208,11 @@ public:
!quorumVersion.getFuture().isError();
}
std::vector<ConfigFollowerInterface> getReadReplicas() const {
if (quorumVersion.getFuture().isError()) {
throw quorumVersion.getFuture().getError();
}
ASSERT(isReady());
return replies.at(quorumVersion.getFuture().get().lastCommitted);
return replies.at(quorumVersion.getFuture().get().versions.lastCommitted);
}
Future<Void> complete() const { return waitForAll(actors); }
};
@ -220,8 +226,11 @@ class PaxosConfigConsumerImpl {
UID id;
ACTOR static Future<Version> getCommittedVersion(PaxosConfigConsumerImpl* self) {
CommittedVersions versions = wait(self->getCommittedVersionQuorum.getCommittedVersion());
return versions.lastCommitted;
QuorumVersion quorumVersion = wait(self->getCommittedVersionQuorum.getCommittedVersion());
if (!quorumVersion.isQuorum) {
throw config_node_no_quorum();
}
return quorumVersion.versions.lastCommitted;
}
ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
@ -246,21 +255,41 @@ class PaxosConfigConsumerImpl {
}
ACTOR static Future<Void> getSnapshotAndChanges(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
state Version committedVersion = wait(getCommittedVersion(self));
// TODO: Load balance
ConfigFollowerGetSnapshotAndChangesReply reply =
wait(retryBrokenPromise(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges,
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }));
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesVersion", committedVersion)
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
broadcaster->applySnapshotAndChanges(
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, committedVersion, reply.annotations);
loop {
self->resetCommittedVersionQuorum(); // TODO: This seems to fix a segfault, investigate more
try {
// TODO: Load balance
state Version committedVersion = wait(getCommittedVersion(self));
ConfigFollowerGetSnapshotAndChangesReply reply = wait(
timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges.getReply(
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }),
SERVER_KNOBS->GET_SNAPSHOT_AND_CHANGES_TIMEOUT));
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesVersion", committedVersion)
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
reply.snapshotVersion,
reply.changes,
committedVersion,
reply.annotations,
self->getCommittedVersionQuorum.getReadReplicas());
wait(self->getCommittedVersionQuorum.complete());
break;
} catch (Error& e) {
if (e.code() == error_code_config_node_no_quorum) {
wait(self->getCommittedVersionQuorum.complete());
} else if (e.code() != error_code_timed_out && e.code() != error_code_broken_promise) {
throw;
}
wait(delayJittered(0.1));
self->resetCommittedVersionQuorum();
}
}
return Void();
}
@ -281,9 +310,11 @@ class PaxosConfigConsumerImpl {
if (committedVersion > self->lastSeenVersion) {
// TODO: Load balance to avoid always hitting the
// node at index 0 first
ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1);
ConfigFollowerGetChangesReply reply = wait(
retryBrokenPromise(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges,
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }));
timeoutError(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }),
SERVER_KNOBS->FETCH_CHANGES_TIMEOUT));
for (const auto& versionedMutation : reply.changes) {
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
te.detail("Version", versionedMutation.version)
@ -297,21 +328,48 @@ class PaxosConfigConsumerImpl {
}
}
self->lastSeenVersion = committedVersion;
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations);
broadcaster->applyChanges(reply.changes,
committedVersion,
reply.annotations,
self->getCommittedVersionQuorum.getReadReplicas());
// TODO: Catch error_code_process_behind and retry with
// the next ConfigNode in the quorum.
} else if (committedVersion == self->lastSeenVersion) {
broadcaster->applyChanges({}, -1, {}, self->getCommittedVersionQuorum.getReadReplicas());
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {
if (e.code() == error_code_version_already_compacted) {
if (e.code() == error_code_version_already_compacted || e.code() == error_code_timed_out ||
e.code() == error_code_config_node_no_quorum) {
TEST(true); // PaxosConfigConsumer get version_already_compacted error
if (e.code() == error_code_config_node_no_quorum) {
try {
wait(self->getCommittedVersionQuorum.complete());
} catch (Error& e) {
if (e.code() == error_code_broken_promise) {
self->resetCommittedVersionQuorum();
continue;
} else {
throw;
}
}
}
self->resetCommittedVersionQuorum();
wait(getSnapshotAndChanges(self, broadcaster));
} else if (e.code() == error_code_broken_promise) {
self->resetCommittedVersionQuorum();
continue;
} else {
throw e;
}
}
wait(self->getCommittedVersionQuorum.complete());
try {
wait(self->getCommittedVersionQuorum.complete());
} catch (Error& e) {
if (e.code() != error_code_broken_promise) {
throw;
}
}
self->resetCommittedVersionQuorum();
}
}

View File

@ -79,7 +79,7 @@ class SimpleConfigConsumerImpl {
}
}
self->lastSeenVersion = committedVersion;
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations);
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations, { self->cfi });
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {
@ -107,8 +107,12 @@ class SimpleConfigConsumerImpl {
.detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
broadcaster->applySnapshotAndChanges(
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, committedVersion, reply.annotations);
broadcaster->applySnapshotAndChanges(std::move(reply.snapshot),
reply.snapshotVersion,
reply.changes,
committedVersion,
reply.annotations,
{ self->cfi });
return Void();
}

View File

@ -47,6 +47,7 @@
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/ConfigNode.h"
#include "fdbserver/LocalConfiguration.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/ClientWorkerInterface.h"
@ -534,6 +535,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
Reference<AsyncVar<bool> const> degraded,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<std::set<std::string>> const> issues,
Reference<ConfigNode> configNode,
Reference<LocalConfiguration> localConfig,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
@ -1387,7 +1389,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
std::string _coordFolder,
std::string whitelistBinPaths,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
ConfigDBType configDBType,
ConfigBroadcastInterface configBroadcastInterface,
Reference<ConfigNode> configNode,
Reference<LocalConfiguration> localConfig) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
@ -1418,6 +1421,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
interf.configBroadcastInterface = configBroadcastInterface;
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
interf.initEndpoints();
@ -1669,10 +1673,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
degraded,
connRecord,
issues,
configNode,
localConfig,
dbInfo));
if (configDBType != ConfigDBType::DISABLED) {
if (configNode.isValid()) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
}
@ -2631,11 +2636,16 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
ConfigDBType configDBType) {
state std::vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state Reference<ConfigNode> configNode;
state Reference<LocalConfiguration> localConfig =
makeReference<LocalConfiguration>(dataFolder, configPath, manualKnobOverrides);
// setupStackSignal();
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker;
if (configDBType != ConfigDBType::DISABLED) {
configNode = makeReference<ConfigNode>(dataFolder);
}
// FIXME: Initializing here causes simulation issues, these must be fixed
/*
if (configDBType != ConfigDBType::DISABLED) {
@ -2658,13 +2668,15 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
.detail("CoordPath", coordFolder)
.detail("WhiteListBinPath", whitelistBinPaths);
state ConfigBroadcastInterface configBroadcastInterface;
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
// Endpoints should be registered first before any process trying to connect to it.
// So coordinationServer actor should be the first one executed before any other.
if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
// their files
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccr, configDBType)));
actors.push_back(fileNotFoundToNever(
coordinationServer(coordFolder, coordinators.ccr, configNode, configBroadcastInterface)));
}
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
@ -2713,7 +2725,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
coordFolder,
whitelistBinPaths,
dbInfo,
configDBType,
configBroadcastInterface,
configNode,
localConfig),
"WorkerServer",
UID(),

View File

@ -148,7 +148,8 @@ public:
// TODO: The timeout is a hack to get the test to pass before rollforward and
// rollback are supported. Eventually, this timeout should be removed so
// we test that all clients make progress.
actors.push_back(timeout(incrementActor(this, cx), 60.0, Void()));
// TODO(ljoswiak): REMEMBER TO REMOVE THIS TIMEOUT ALTOGETHER
actors.push_back(timeout(incrementActor(this, cx), 600.0, Void()));
}
return waitForAll(actors);
}

View File

@ -84,6 +84,7 @@ ERROR( change_feed_not_registered, 1060, "Change feed not registered" )
ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob granules" )
ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during granule materialization" )
ERROR( config_node_no_quorum, 1064, "Failed to reach a quorum of ConfigNode committed versions" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -1,5 +1,6 @@
[configuration]
configDB = 'random'
configDB = 'paxos'
coordinators = 3
[[test]]
testTitle = 'ConfigIncrement'
@ -13,7 +14,3 @@ testTitle = 'ConfigIncrement'
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0