Push updates to workers (clang-formatted files)

This commit is contained in:
Lukas Joswiak 2021-08-05 18:50:23 -07:00
parent 0116d17dbb
commit 38d05a2f49
8 changed files with 576 additions and 560 deletions

View File

@ -3823,7 +3823,10 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self->masterProcessId = w.locality.processId();
}
if (configBroadcaster != nullptr) {
self->addActor.send(configBroadcaster->registerWorker(req.lastSeenKnobVersion, req.knobConfigClassSet, self->id_worker[w.locality.processId()].watcher, &self->id_worker[w.locality.processId()].details));
self->addActor.send(configBroadcaster->registerWorker(req.lastSeenKnobVersion,
req.knobConfigClassSet,
self->id_worker[w.locality.processId()].watcher,
&self->id_worker[w.locality.processId()].details));
}
checkOutstandingRequests(self);
} else if (info->second.details.interf.id() != w.id() || req.generation >= info->second.gen) {
@ -3844,7 +3847,8 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
info->second.watcher = workerAvailabilityWatch(w, newProcessClass, self);
}
if (configBroadcaster != nullptr) {
self->addActor.send(configBroadcaster->registerWorker(req.lastSeenKnobVersion, req.knobConfigClassSet, info->second.watcher, &info->second.details));
self->addActor.send(configBroadcaster->registerWorker(
req.lastSeenKnobVersion, req.knobConfigClassSet, info->second.watcher, &info->second.details));
}
checkOutstandingRequests(self);
} else {

View File

@ -98,8 +98,9 @@ struct ConfigBroadcastChangesRequest {
ReplyPromise<ConfigBroadcastChangesReply> reply;
ConfigBroadcastChangesRequest() = default;
explicit ConfigBroadcastChangesRequest(Version mostRecentVersion, Standalone<VectorRef<VersionedConfigMutationRef>> const& changes)
: mostRecentVersion(mostRecentVersion), changes(changes) {}
explicit ConfigBroadcastChangesRequest(Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes)
: mostRecentVersion(mostRecentVersion), changes(changes) {}
template <class Ar>
void serialize(Ar& ar) {

View File

@ -54,69 +54,69 @@ class ConfigBroadcasterImpl {
// pending requests with affected configuration classes
/*
class PendingRequestStore {
using Req = ConfigBroadcastFollowerGetChangesRequest;
std::map<Key, std::set<Endpoint::Token>> configClassToTokens;
std::map<Endpoint::Token, Req> tokenToRequest;
using Req = ConfigBroadcastFollowerGetChangesRequest;
std::map<Key, std::set<Endpoint::Token>> configClassToTokens;
std::map<Endpoint::Token, Req> tokenToRequest;
public:
void addRequest(Req const& req) {
auto token = req.reply.getEndpoint().token;
tokenToRequest[token] = req;
for (const auto& configClass : req.configClassSet.getClasses()) {
configClassToTokens[configClass].insert(token);
}
}
void addRequest(Req const& req) {
auto token = req.reply.getEndpoint().token;
tokenToRequest[token] = req;
for (const auto& configClass : req.configClassSet.getClasses()) {
configClassToTokens[configClass].insert(token);
}
}
std::vector<Req> getRequestsToNotify(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes) const {
std::set<Endpoint::Token> tokenSet;
for (const auto& change : changes) {
if (!change.mutation.getConfigClass().present()) {
// Update everything
for (const auto& [token, req] : tokenToRequest) {
if (req.lastSeenVersion < change.version) {
tokenSet.insert(token);
}
}
} else {
Key configClass = change.mutation.getConfigClass().get();
if (configClassToTokens.count(configClass)) {
auto tokens = get(configClassToTokens, Key(change.mutation.getConfigClass().get()));
for (const auto& token : tokens) {
auto req = get(tokenToRequest, token);
if (req.lastSeenVersion < change.version) {
tokenSet.insert(token);
} else {
TEST(true); // Worker is ahead of config broadcaster
}
}
}
}
}
std::vector<Req> result;
for (const auto& token : tokenSet) {
result.push_back(get(tokenToRequest, token));
}
return result;
}
std::vector<Req> getRequestsToNotify(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes) const {
std::set<Endpoint::Token> tokenSet;
for (const auto& change : changes) {
if (!change.mutation.getConfigClass().present()) {
// Update everything
for (const auto& [token, req] : tokenToRequest) {
if (req.lastSeenVersion < change.version) {
tokenSet.insert(token);
}
}
} else {
Key configClass = change.mutation.getConfigClass().get();
if (configClassToTokens.count(configClass)) {
auto tokens = get(configClassToTokens, Key(change.mutation.getConfigClass().get()));
for (const auto& token : tokens) {
auto req = get(tokenToRequest, token);
if (req.lastSeenVersion < change.version) {
tokenSet.insert(token);
} else {
TEST(true); // Worker is ahead of config broadcaster
}
}
}
}
}
std::vector<Req> result;
for (const auto& token : tokenSet) {
result.push_back(get(tokenToRequest, token));
}
return result;
}
std::vector<Req> getOutdatedRequests(Version newSnapshotVersion) {
std::vector<Req> result;
for (const auto& [token, req] : tokenToRequest) {
if (req.lastSeenVersion < newSnapshotVersion) {
result.push_back(req);
}
}
return result;
}
std::vector<Req> getOutdatedRequests(Version newSnapshotVersion) {
std::vector<Req> result;
for (const auto& [token, req] : tokenToRequest) {
if (req.lastSeenVersion < newSnapshotVersion) {
result.push_back(req);
}
}
return result;
}
void removeRequest(Req const& req) {
auto token = req.reply.getEndpoint().token;
for (const auto& configClass : req.configClassSet.getClasses()) {
remove(get(configClassToTokens, configClass), token);
// TODO: Don't leak config classes
}
remove(tokenToRequest, token);
}
void removeRequest(Req const& req) {
auto token = req.reply.getEndpoint().token;
for (const auto& configClass : req.configClassSet.getClasses()) {
remove(get(configClassToTokens, configClass), token);
// TODO: Don't leak config classes
}
remove(tokenToRequest, token);
}
} pending;
*/
std::map<ConfigKey, KnobValue> snapshot;
@ -138,7 +138,9 @@ class ConfigBroadcasterImpl {
// Push changes to the specified clients.
template <class Changes>
Future<Void> pushChanges(std::vector<std::tuple<ConfigClassSet, Version, WorkerDetails*>>::iterator begin, std::vector<std::tuple<ConfigClassSet, Version, WorkerDetails*>>::iterator end, Changes const& changes) {
Future<Void> pushChanges(std::vector<std::tuple<ConfigClassSet, Version, WorkerDetails*>>::iterator begin,
std::vector<std::tuple<ConfigClassSet, Version, WorkerDetails*>>::iterator end,
Changes const& changes) {
std::vector<Future<Void>> responses;
for (auto it = begin; it != end; ++it) {
auto& [configClassSet, lastSeenVersion, worker] = *it;
@ -213,7 +215,11 @@ class ConfigBroadcasterImpl {
}
public:
Future<Void> registerWorker(ConfigBroadcaster* self, Version lastSeenVersion, ConfigClassSet configClassSet, Future<Void>& watcher, WorkerDetails* worker) {
Future<Void> registerWorker(ConfigBroadcaster* self,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void>& watcher,
WorkerDetails* worker) {
actors.add(consumer->consume(*self));
// TODO: Use `watcher` to detect death of client
workers.push_back(std::make_tuple(std::move(configClassSet), lastSeenVersion, worker));
@ -338,7 +344,10 @@ ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
ConfigBroadcaster::~ConfigBroadcaster() = default;
Future<Void> ConfigBroadcaster::registerWorker(Version lastSeenVersion, ConfigClassSet configClassSet, Future<Void>& watcher, WorkerDetails* worker) {
Future<Void> ConfigBroadcaster::registerWorker(Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void>& watcher,
WorkerDetails* worker) {
return impl().registerWorker(this, lastSeenVersion, configClassSet, watcher, worker);
}
@ -431,7 +440,7 @@ namespace {
// ConfigBroadcasterImpl::runPendingRequestStoreTest(false, 2);
// return Void();
// }
//
//
// TEST_CASE("/fdbserver/ConfigDB/ConfigBroadcaster/Internal/PendingRequestStore/GlobalMutation") {
// ConfigBroadcasterImpl::runPendingRequestStoreTest(true, 4);
// return Void();

View File

@ -44,7 +44,10 @@ public:
ConfigBroadcaster(ConfigBroadcaster&&);
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster();
Future<Void> registerWorker(Version lastSeenVersion, ConfigClassSet configClassSet, Future<Void>& watcher, WorkerDetails* worker);
Future<Void> registerWorker(Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void>& watcher,
WorkerDetails* worker);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);

File diff suppressed because it is too large Load Diff

View File

@ -263,7 +263,6 @@ class LocalConfigurationImpl {
++self->changeRequestsFetched;
for (const auto& versionedMutation : changes) {
ASSERT_GT(versionedMutation.version, self->lastSeenVersion);
TraceEvent("LUKAS_addChanges").detail("Version", versionedMutation.version).detail("Key", versionedMutation.mutation.getKnobName()).detail("Value", versionedMutation.mutation.getValue().toString()).detail("ConfigClass", versionedMutation.mutation.getConfigClass());
++self->mutations;
const auto& mutation = versionedMutation.mutation;
auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion());
@ -316,8 +315,7 @@ public:
IsTest isTest)
: id(deterministicRandom()->randomUniqueID()), kvStore(dataFolder, id, "localconf-"),
configKnobOverrides(configPath), manualKnobOverrides(manualKnobOverrides), cc("LocalConfiguration"),
snapshots("Snapshots", cc),
changeRequestsFetched("ChangeRequestsFetched", cc), mutations("Mutations", cc) {
snapshots("Snapshots", cc), changeRequestsFetched("ChangeRequestsFetched", cc), mutations("Mutations", cc) {
if (isTest) {
testKnobCollection =
IKnobCollection::create(IKnobCollection::Type::TEST,

View File

@ -120,7 +120,7 @@ struct WorkerInterface {
workerSnapReq,
backup,
updateServerDBInfo,
configBroadcastInterface);
configBroadcastInterface);
}
};
@ -392,10 +392,11 @@ struct RegisterWorkerRequest {
Optional<DataDistributorInterface> ddInterf,
Optional<RatekeeperInterface> rkInterf,
bool degraded,
Version lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet)
Version lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {}
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), degraded(degraded),
lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {}
template <class Ar>
void serialize(Ar& ar) {
@ -411,8 +412,8 @@ struct RegisterWorkerRequest {
incompatiblePeers,
reply,
degraded,
lastSeenKnobVersion,
knobConfigClassSet);
lastSeenKnobVersion,
knobConfigClassSet);
}
};

View File

@ -511,7 +511,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>> const> issues,
LocalConfiguration* localConfig) {
LocalConfiguration* localConfig) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@ -1216,7 +1216,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
std::string _coordFolder,
std::string whitelistBinPaths,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
LocalConfiguration* localConfig) {
LocalConfiguration* localConfig) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
@ -2374,7 +2374,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
coordFolder,
whitelistBinPaths,
dbInfo,
&localConfig),
&localConfig),
"WorkerServer",
UID(),
&normalWorkerErrors()));