Very simple partial implementation of LocalConfiguration::consume

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-25 19:01:17 -07:00
parent 732fde2253
commit ae69608375
3 changed files with 34 additions and 23 deletions

View File

@ -35,7 +35,6 @@ class LocalConfigurationImpl {
ConfigClassSet configClasses;
Version lastSeenVersion { 0 };
Future<Void> initFuture;
Reference<AsyncVar<ServerDBInfo> const> serverDBInfo;
TestKnobs testKnobs;
ACTOR static Future<Void> saveConfigClasses(LocalConfigurationImpl* self) {
@ -87,20 +86,33 @@ class LocalConfigurationImpl {
return Void();
}
ACTOR static Future<Void> consume(LocalConfigurationImpl* self) {
Future<ConfigFollowerGetChangesReply> getChanges(ServerDBInfo serverDBInfo) {
if (!serverDBInfo.configFollowerInterface.present()) {
return Never();
} else {
return serverDBInfo.configFollowerInterface.get().getChanges.getReply(
ConfigFollowerGetChangesRequest{ lastSeenVersion, {} });
}
}
ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
Reference<AsyncVar<ServerDBInfo> const> serverDBInfo) {
wait(self->initFuture);
state Future<Void> timeout = Void();
state Future<ConfigFollowerGetChangesReply> getChangesReply;
state Future<ConfigFollowerGetChangesReply> getChangesReply = Never();
loop {
choose {
when(wait(self->serverDBInfo->onChange())) { timeout = Void(); }
when(wait(serverDBInfo->onChange())) {
getChangesReply = self->getChanges(serverDBInfo->get());
timeout = Never();
}
when(wait(timeout)) {
getChangesReply = self->serverDBInfo->get().configFollowerInterface.get().getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, {} });
timeout = Future<Void>{};
getChangesReply = self->getChanges(serverDBInfo->get());
timeout = Never();
}
when(ConfigFollowerGetChangesReply reply = wait(getChangesReply)) {
// TODO: Handle reply
getChangesReply = Never();
timeout = delay(0.5); // TODO: Make knob?
}
}
@ -108,10 +120,8 @@ class LocalConfigurationImpl {
}
public:
LocalConfigurationImpl(ConfigClassSet const& configClasses,
std::string const& dataFolder,
Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo)
: configClasses(configClasses), serverDBInfo(serverDBInfo) {
LocalConfigurationImpl(ConfigClassSet const& configClasses, std::string const& dataFolder)
: configClasses(configClasses) {
platform::createDirectory(dataFolder);
kvStore = keyValueStoreMemory(joinPath(dataFolder, "localconf-"), UID{}, 500e6);
}
@ -127,13 +137,13 @@ public:
return testKnobs;
}
Future<Void> consume() { return consume(this); }
Future<Void> consume(Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
return consume(this, serverDBInfo);
}
};
LocalConfiguration::LocalConfiguration(ConfigClassSet const& configClasses,
std::string const& dataFolder,
Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo)
: impl(std::make_unique<LocalConfigurationImpl>(configClasses, dataFolder, serverDBInfo)) {}
LocalConfiguration::LocalConfiguration(ConfigClassSet const& configClasses, std::string const& dataFolder)
: impl(std::make_unique<LocalConfigurationImpl>(configClasses, dataFolder)) {}
LocalConfiguration::~LocalConfiguration() = default;
@ -145,8 +155,8 @@ TestKnobs const &LocalConfiguration::getKnobs() const {
return impl->getKnobs();
}
Future<Void> LocalConfiguration::consume() {
return impl->consume();
Future<Void> LocalConfiguration::consume(Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
return impl->consume(serverDBInfo);
}
#define init(knob, value) initKnob(knob, value, #knob)

View File

@ -38,11 +38,10 @@ class LocalConfiguration {
std::unique_ptr<class LocalConfigurationImpl> impl;
public:
LocalConfiguration(ConfigClassSet const& configClasses,
std::string const& dataFolder,
Reference<AsyncVar<ServerDBInfo> const> const&);
LocalConfiguration(ConfigClassSet const& configClasses, std::string const& dataFolder);
~LocalConfiguration();
Future<Void> init();
TestKnobs const &getKnobs() const;
Future<Void> consume();
// TODO: Only one field of serverDBInfo is required, so improve encapsulation
Future<Void> consume(Reference<AsyncVar<ServerDBInfo> const> const&);
};

View File

@ -2022,6 +2022,8 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
std::string whitelistBinPaths) {
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state LocalConfiguration localConfig(ConfigClassSet{}, dataFolder);
wait(localConfig.init());
actors.push_back(serveProtocolInfo());
@ -2056,8 +2058,8 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
auto asyncPriorityInfo =
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
LocalConfiguration localConfig(ConfigClassSet{}, dataFolder, dbInfo);
actors.push_back(reportErrors(localConfig.consume(dbInfo), "LocalConfiguration"));
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo"));
if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::NeverAssign) {