Added IDependentAsyncVar generic class
This commit is contained in:
parent
77f8b18456
commit
6836bec63b
|
@ -232,17 +232,10 @@ class LocalConfigurationImpl : public NonCopyable {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorBroadcaster(Reference<AsyncVar<ServerDBInfo> const> serverDBInfo,
|
||||
Reference<AsyncVar<ConfigFollowerInterface>> broadcaster) {
|
||||
loop {
|
||||
wait(serverDBInfo->onChange());
|
||||
broadcaster->set(serverDBInfo->get().configBroadcaster);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> consumeLoopIteration(LocalConfiguration* self,
|
||||
LocalConfigurationImpl* impl,
|
||||
Reference<AsyncVar<ConfigFollowerInterface>> broadcaster) {
|
||||
ACTOR static Future<Void> consumeLoopIteration(
|
||||
LocalConfiguration* self,
|
||||
LocalConfigurationImpl* impl,
|
||||
Reference<IDependentAsyncVar<ConfigFollowerInterface> const> broadcaster) {
|
||||
// TODO: Cache lastSeenVersion in memory
|
||||
// state Version lastSeenVersion = wait(impl->getLastSeenVersion());
|
||||
state SimpleConfigConsumer consumer(broadcaster->get());
|
||||
|
@ -256,11 +249,8 @@ class LocalConfigurationImpl : public NonCopyable {
|
|||
|
||||
ACTOR static Future<Void> consume(LocalConfiguration* self,
|
||||
LocalConfigurationImpl* impl,
|
||||
Reference<AsyncVar<ServerDBInfo> const> serverDBInfo) {
|
||||
Reference<IDependentAsyncVar<ConfigFollowerInterface> const> broadcaster) {
|
||||
wait(impl->initFuture);
|
||||
state Reference<AsyncVar<ConfigFollowerInterface>> broadcaster =
|
||||
makeReference<AsyncVar<ConfigFollowerInterface>>(serverDBInfo->get().configBroadcaster);
|
||||
impl->actors.add(monitorBroadcaster(serverDBInfo, broadcaster));
|
||||
loop { wait(consumeLoopIteration(self, impl, broadcaster)); }
|
||||
}
|
||||
|
||||
|
@ -282,9 +272,8 @@ public:
|
|||
return initFuture;
|
||||
}
|
||||
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
|
||||
// TODO: Remove unnecessary copy
|
||||
auto f = setSnapshot(this, std::move(snapshot), lastCompactedVersion);
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value> const& snapshot, Version lastCompactedVersion) {
|
||||
auto f = setSnapshot(this, snapshot, lastCompactedVersion);
|
||||
actors.add(f);
|
||||
return f;
|
||||
}
|
||||
|
@ -316,8 +305,9 @@ public:
|
|||
return testKnobs;
|
||||
}
|
||||
|
||||
Future<Void> consume(LocalConfiguration& self, Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
|
||||
return consume(&self, this, serverDBInfo);
|
||||
Future<Void> consume(LocalConfiguration& self,
|
||||
Reference<IDependentAsyncVar<ConfigFollowerInterface> const> const& broadcaster) {
|
||||
return consume(&self, this, broadcaster);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -349,12 +339,13 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const {
|
|||
return impl->getTestKnobs();
|
||||
}
|
||||
|
||||
Future<Void> LocalConfiguration::consume(Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
|
||||
return impl->consume(*this, serverDBInfo);
|
||||
Future<Void> LocalConfiguration::consume(
|
||||
Reference<IDependentAsyncVar<ConfigFollowerInterface> const> const& broadcaster) {
|
||||
return impl->consume(*this, broadcaster);
|
||||
}
|
||||
|
||||
Future<Void> LocalConfiguration::setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
|
||||
return impl->setSnapshot(std::move(snapshot), lastCompactedVersion);
|
||||
Future<Void> LocalConfiguration::setSnapshot(std::map<ConfigKey, Value> const& snapshot, Version lastCompactedVersion) {
|
||||
return impl->setSnapshot(snapshot, lastCompactedVersion);
|
||||
}
|
||||
|
||||
Future<Void> LocalConfiguration::addVersionedMutations(
|
||||
|
@ -474,7 +465,7 @@ ACTOR Future<Void> setTestSnapshot(LocalConfiguration* localConfiguration, Versi
|
|||
{ ConfigKeyRef("class-C"_sr, "test_int"_sr), "3"_sr },
|
||||
{ ConfigKeyRef("class-A"_sr, "test_string"_sr), "x"_sr },
|
||||
};
|
||||
wait(localConfiguration->setSnapshot(std::move(snapshot), ++(*version)));
|
||||
wait(localConfiguration->setSnapshot(snapshot, ++(*version)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
|
||||
#include "fdbclient/ConfigKnobs.h"
|
||||
#include "fdbserver/ConfigFollowerInterface.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/Knobs.h"
|
||||
|
||||
|
@ -55,8 +54,8 @@ public:
|
|||
ServerKnobs const& getServerKnobs() const;
|
||||
TestKnobs const& getTestKnobs() const;
|
||||
// TODO: Only one field of serverDBInfo is required, so improve encapsulation
|
||||
Future<Void> consume(Reference<AsyncVar<ServerDBInfo> const> const&);
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion);
|
||||
Future<Void> consume(Reference<IDependentAsyncVar<ConfigFollowerInterface> const> const& broadcaster);
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value> const& snapshot, Version lastCompactedVersion);
|
||||
Future<Void> addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations,
|
||||
Version mostRecentVersion);
|
||||
};
|
||||
|
|
|
@ -2060,7 +2060,9 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
|
|||
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
|
||||
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
|
||||
|
||||
actors.push_back(reportErrors(localConfig.consume(dbInfo), "LocalConfiguration"));
|
||||
actors.push_back(reportErrors(localConfig.consume(IDependentAsyncVar<ConfigFollowerInterface>::create(
|
||||
dbInfo, [](auto const& info) { return info.configBroadcaster; })),
|
||||
"LocalConfiguration"));
|
||||
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
|
||||
"MonitorAndWriteCCPriorityInfo"));
|
||||
if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::NeverAssign) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
ACTOR Future<bool> allTrue(std::vector<Future<bool>> all) {
|
||||
|
@ -136,3 +137,45 @@ ACTOR Future<Void> lowPriorityDelay(double waitTime) {
|
|||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct DummyState {
|
||||
int changed{ 0 };
|
||||
int unchanged{ 0 };
|
||||
bool operator==(DummyState const& rhs) const { return changed == rhs.changed && unchanged == rhs.unchanged; }
|
||||
bool operator!=(DummyState const& rhs) const { return !(*this == rhs); }
|
||||
};
|
||||
|
||||
ACTOR Future<Void> testPublisher(Reference<AsyncVar<DummyState>> input) {
|
||||
state int i = 0;
|
||||
for (; i < 100; ++i) {
|
||||
wait(delay(deterministicRandom()->random01()));
|
||||
auto var = input->get();
|
||||
++var.changed;
|
||||
input->set(var);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Optional<int> expected) {
|
||||
loop {
|
||||
wait(output->onChange());
|
||||
ASSERT(expected.present());
|
||||
if (output->get() == expected.get()) {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("/flow/genericactors/DependentAsyncVar") {
|
||||
auto input = makeReference<AsyncVar<DummyState>>();
|
||||
state Future<Void> subscriber1 =
|
||||
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.changed; }), 100);
|
||||
state Future<Void> subscriber2 =
|
||||
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
|
||||
wait(subscriber1 && testPublisher(input));
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -1891,6 +1891,41 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
|
|||
return runAfter(lhs, rhs);
|
||||
}
|
||||
|
||||
template <class Output>
|
||||
class IDependentAsyncVar : public ReferenceCounted<IDependentAsyncVar<Output>> {
|
||||
public:
|
||||
virtual ~IDependentAsyncVar() = default;
|
||||
virtual Output const& get() const = 0;
|
||||
virtual Future<Void> onChange() const = 0;
|
||||
template <class Input, class F>
|
||||
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Input>> const& input, F const& f);
|
||||
};
|
||||
|
||||
template <class Input, class Output, class F>
|
||||
class DependentAsyncVar final : public IDependentAsyncVar<Output> {
|
||||
Reference<AsyncVar<Output>> output;
|
||||
Future<Void> monitorActor;
|
||||
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input>> input, Reference<AsyncVar<Output>> output, F f) {
|
||||
loop {
|
||||
wait(input->onChange());
|
||||
output->set(f(input->get()));
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
DependentAsyncVar(Reference<AsyncVar<Input>> const& input, F const& f)
|
||||
: output(makeReference<AsyncVar<Output>>(f(input->get()))), monitorActor(monitor(input, output, f)) {}
|
||||
Output const& get() const override { return output->get(); }
|
||||
Future<Void> onChange() const override { return output->onChange(); }
|
||||
};
|
||||
|
||||
template <class Output>
|
||||
template <class Input, class F>
|
||||
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Input>> const& input,
|
||||
F const& f) {
|
||||
return makeReference<DependentAsyncVar<Input, Output, F>>(input, f);
|
||||
}
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
||||
#endif
|
||||
|
|
|
@ -4,4 +4,4 @@ useDB=false
|
|||
|
||||
testName=UnitTests
|
||||
maxTestCases=100
|
||||
testsMatching=/fdbserver/ConfigDB/LocalConfiguration/Restart
|
||||
testsMatching=/fdbserver/ConfigDB/
|
||||
|
|
Loading…
Reference in New Issue