Several improvements to SimpleConfig pipeline and test workload
This commit is contained in:
parent
8ba59d8aa9
commit
8a30ebf27f
|
@ -78,19 +78,32 @@ struct ConfigFollowerGetFullDatabaseRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct VersionedMutationRef {
|
||||
Version version;
|
||||
MutationRef mutation;
|
||||
|
||||
VersionedMutationRef()=default;
|
||||
explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {}
|
||||
|
||||
template<class Ar>
|
||||
void serialize(Ar &ar) {
|
||||
serializer(ar, version, mutation);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetChangesReply {
|
||||
static constexpr FileIdentifier file_identifier = 234859;
|
||||
Version mostRecentVersion;
|
||||
Standalone<VectorRef<MutationRef>> mutations;
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations;
|
||||
|
||||
ConfigFollowerGetChangesReply() : mostRecentVersion(-1) {}
|
||||
explicit ConfigFollowerGetChangesReply(Version mostRecentVersion,
|
||||
Standalone<VectorRef<MutationRef>> const& mutations)
|
||||
: mostRecentVersion(mostRecentVersion), mutations(mutations) {}
|
||||
Standalone<VectorRef<VersionedMutationRef>> const& versionedMutations)
|
||||
: mostRecentVersion(mostRecentVersion), versionedMutations(versionedMutations) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, mostRecentVersion, mutations);
|
||||
serializer(ar, mostRecentVersion, versionedMutations);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -22,21 +22,100 @@
|
|||
|
||||
class SimpleConfigBroadcasterImpl {
|
||||
ConfigFollowerInterface subscriber;
|
||||
std::map<Key, Value> fullDatabase;
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations;
|
||||
Version lastCompactedVersion;
|
||||
Version mostRecentVersion;
|
||||
ActorCollection actors{ false };
|
||||
|
||||
static const double POLLING_INTERVAL; // TODO: Make knob?
|
||||
|
||||
ACTOR static Future<Void> fetchUpdates(SimpleConfigBroadcasterImpl *self) {
|
||||
loop {
|
||||
ConfigFollowerGetChangesReply reply = wait(self->subscriber.getChanges.getReply(ConfigFollowerGetChangesRequest{self->mostRecentVersion, {}}));
|
||||
for (const auto &versionedMutation : reply.versionedMutations) {
|
||||
self->versionedMutations.push_back(self->versionedMutations.arena(), versionedMutation);
|
||||
}
|
||||
self->mostRecentVersion = reply.mostRecentVersion;
|
||||
wait(delay(POLLING_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
void traceQueuedMutations() {
|
||||
TraceEvent te("SimpleConfigBroadcasterQueuedMutations");
|
||||
te.detail("Size", versionedMutations.size());
|
||||
int index = 0;
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
te.detail(format("Version%d", index), versionedMutation.version);
|
||||
te.detail(format("Mutation%d", index), versionedMutation.mutation.type);
|
||||
te.detail(format("FirstParam%d", index), versionedMutation.mutation.param1);
|
||||
te.detail(format("SecondParam%d", index), versionedMutation.mutation.param2);
|
||||
++index;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> serve(SimpleConfigBroadcasterImpl *self, ConfigFollowerInterface *publisher) {
|
||||
ConfigFollowerGetVersionReply versionReply = wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
self->mostRecentVersion = versionReply.version;
|
||||
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{self->mostRecentVersion, Optional<Value>{}}));
|
||||
self->fullDatabase = reply.database;
|
||||
self->actors.add(fetchUpdates(self));
|
||||
loop {
|
||||
self->traceQueuedMutations();
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(publisher->getVersion.getFuture())) {
|
||||
req.reply.send(self->mostRecentVersion);
|
||||
}
|
||||
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher->getFullDatabase.getFuture())) {
|
||||
ConfigFollowerGetFullDatabaseReply reply;
|
||||
reply.database = self->fullDatabase;
|
||||
for (const auto &versionedMutation : self->versionedMutations) {
|
||||
const auto &version = versionedMutation.version;
|
||||
const auto &mutation = versionedMutation.mutation;
|
||||
if (version > req.version) {
|
||||
break;
|
||||
}
|
||||
if (mutation.type == MutationRef::SetValue) {
|
||||
reply.database[mutation.param1] = mutation.param2;
|
||||
} else if (mutation.type == MutationRef::ClearRange) {
|
||||
reply.database.erase(reply.database.find(mutation.param1), reply.database.find(mutation.param2));
|
||||
}
|
||||
}
|
||||
req.reply.send(ConfigFollowerGetFullDatabaseReply{self->fullDatabase});
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(publisher->getChanges.getFuture())) {
|
||||
ConfigFollowerGetChangesReply reply;
|
||||
reply.mostRecentVersion = self->mostRecentVersion;
|
||||
for (const auto &versionedMutation : self->versionedMutations) {
|
||||
if (versionedMutation.version > req.lastSeenVersion) {
|
||||
reply.versionedMutations.push_back(reply.versionedMutations.arena(), versionedMutation);
|
||||
}
|
||||
}
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ConfigFollowerCompactRequest req = waitNext(publisher->compact.getFuture())) {
|
||||
// TODO: Implement
|
||||
req.reply.send(Void());
|
||||
}
|
||||
when(wait(self->actors.getResult())) { ASSERT(false); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
SimpleConfigBroadcasterImpl(ClusterConnectionString const& ccs) {
|
||||
SimpleConfigBroadcasterImpl(ClusterConnectionString const& ccs) : lastCompactedVersion(0), mostRecentVersion(0) {
|
||||
auto coordinators = ccs.coordinators();
|
||||
std::sort(coordinators.begin(), coordinators.end());
|
||||
subscriber = ConfigFollowerInterface(coordinators[0]);
|
||||
}
|
||||
|
||||
Future<Void> serve(ConfigFollowerInterface& publisher) {
|
||||
// TODO: Implement
|
||||
wait(Never());
|
||||
return Void();
|
||||
return serve(this, &publisher);
|
||||
}
|
||||
};
|
||||
|
||||
const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5;
|
||||
|
||||
SimpleConfigBroadcaster::SimpleConfigBroadcaster(ClusterConnectionString const& ccs)
|
||||
: impl(std::make_unique<SimpleConfigBroadcasterImpl>(ccs)) {}
|
||||
|
||||
|
|
|
@ -29,18 +29,12 @@
|
|||
|
||||
namespace {
|
||||
|
||||
const KeyRef versionKey = LiteralStringRef("version");
|
||||
const KeyRef liveTransactionVersionKey = LiteralStringRef("liveTransactionVersion");
|
||||
const KeyRef committedVersionKey = LiteralStringRef("committedVersion");
|
||||
const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0"));
|
||||
const KeyRangeRef mutationKeys = KeyRangeRef(LiteralStringRef("mutation/"), LiteralStringRef("mutation0"));
|
||||
|
||||
struct VersionedMutationRef {
|
||||
Version version;
|
||||
MutationRef mutation;
|
||||
|
||||
VersionedMutationRef()=default; // FIXME Undefined memory?
|
||||
explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {}
|
||||
};
|
||||
|
||||
// FIXME: negative versions break ordering
|
||||
Key versionedMutationKey(Version version, int index) {
|
||||
BinaryWriter bw(IncludeVersion());
|
||||
bw << version;
|
||||
|
@ -63,24 +57,33 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
ActorCollection actors{ false };
|
||||
FlowLock globalLock;
|
||||
|
||||
ACTOR static Future<Version> getCurrentVersion(SimpleConfigDatabaseNodeImpl *self) {
|
||||
Optional<Value> value = wait(self->kvStore->readValue(versionKey));
|
||||
state Version version = 0;
|
||||
ACTOR static Future<Version> getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) {
|
||||
Optional<Value> value = wait(self->kvStore->readValue(liveTransactionVersionKey));
|
||||
state Version liveTransactionVersion = 0;
|
||||
if (value.present()) {
|
||||
BinaryReader br(value.get(), IncludeVersion());
|
||||
br >> version;
|
||||
liveTransactionVersion = BinaryReader::fromStringRef<Version>(value.get(), IncludeVersion());
|
||||
} else {
|
||||
BinaryWriter bw(IncludeVersion());
|
||||
bw << version;
|
||||
self->kvStore->set(KeyValueRef(versionKey, bw.toValue()));
|
||||
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(liveTransactionVersion, IncludeVersion())));
|
||||
wait(self->kvStore->commit());
|
||||
}
|
||||
return version;
|
||||
return liveTransactionVersion;
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<VectorRef<VersionedMutationRef>>> getMutations(SimpleConfigDatabaseNodeImpl *self, int startVersion, int endVersion) {
|
||||
Value serializedStartVersion = BinaryWriter::toValue(startVersion, IncludeVersion());
|
||||
KeyRangeRef keys(serializedStartVersion.withPrefix(mutationKeys.begin), mutationKeys.end);
|
||||
ACTOR static Future<Version> getCommittedVersion(SimpleConfigDatabaseNodeImpl *self) {
|
||||
Optional<Value> value = wait(self->kvStore->readValue(committedVersionKey));
|
||||
state Version committedVersion = 0;
|
||||
if (value.present()) {
|
||||
committedVersion = BinaryReader::fromStringRef<Version>(value.get(), IncludeVersion());
|
||||
} else {
|
||||
self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(committedVersion, IncludeVersion())));
|
||||
wait(self->kvStore->commit());
|
||||
}
|
||||
return committedVersion;
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<VectorRef<VersionedMutationRef>>> getMutations(SimpleConfigDatabaseNodeImpl *self, Version startVersion, Version endVersion) {
|
||||
Key startVersionKey = versionedMutationKey(startVersion, 0);
|
||||
state KeyRangeRef keys(startVersionKey, mutationKeys.end);
|
||||
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(keys));
|
||||
Standalone<VectorRef<VersionedMutationRef>> result;
|
||||
for (const auto &kv : range) {
|
||||
|
@ -94,17 +97,24 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
return result;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getCurrentVersion(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetVersionRequest req) {
|
||||
Version version = wait(getCurrentVersion(self));
|
||||
req.reply.send(ConfigFollowerGetVersionReply(version));
|
||||
ACTOR static Future<Void> getChanges(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetChangesRequest req) {
|
||||
state Version committedVersion = wait(getCommittedVersion(self));
|
||||
Standalone<VectorRef<VersionedMutationRef>> mutations = wait(getMutations(self, req.lastSeenVersion+1, committedVersion));
|
||||
req.reply.send(ConfigFollowerGetChangesReply{committedVersion, mutations});
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getCommittedVersion(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetVersionRequest req) {
|
||||
Version committedVersion = wait(getCommittedVersion(self));
|
||||
req.reply.send(ConfigFollowerGetVersionReply(committedVersion));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getNewVersion(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetVersionRequest req) {
|
||||
wait(self->globalLock.take());
|
||||
state FlowLock::Releaser releaser(self->globalLock);
|
||||
state Version currentVersion = wait(getCurrentVersion(self));
|
||||
self->kvStore->set(KeyValueRef(versionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion())));
|
||||
state Version currentVersion = wait(getLiveTransactionVersion(self));
|
||||
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion())));
|
||||
wait(self->kvStore->commit());
|
||||
req.reply.send(ConfigTransactionGetVersionReply(currentVersion));
|
||||
return Void();
|
||||
|
@ -113,13 +123,13 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
ACTOR static Future<Void> get(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetRequest req) {
|
||||
wait(self->globalLock.take());
|
||||
state FlowLock::Releaser releaser(self->globalLock);
|
||||
Version currentVersion = wait(getCurrentVersion(self));
|
||||
Version currentVersion = wait(getLiveTransactionVersion(self));
|
||||
if (req.version != currentVersion) {
|
||||
req.reply.sendError(transaction_too_old());
|
||||
return Void();
|
||||
}
|
||||
state Optional<Value> value = wait(self->kvStore->readValue(req.key.withPrefix(kvKeys.begin)));
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, -1, req.version));
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
const auto &mutation = versionedMutation.mutation;
|
||||
if (mutation.type == MutationRef::Type::SetValue) {
|
||||
|
@ -141,7 +151,7 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
ACTOR static Future<Void> commit(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionCommitRequest req) {
|
||||
wait(self->globalLock.take());
|
||||
state FlowLock::Releaser releaser(self->globalLock);
|
||||
Version currentVersion = wait(getCurrentVersion(self));
|
||||
Version currentVersion = wait(getLiveTransactionVersion(self));
|
||||
if (req.version != currentVersion) {
|
||||
req.reply.sendError(transaction_too_old());
|
||||
return Void();
|
||||
|
@ -152,13 +162,32 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
Value value = BinaryWriter::toValue(mutation, IncludeVersion());
|
||||
self->kvStore->set(KeyValueRef(key, value));
|
||||
}
|
||||
self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(req.version, IncludeVersion())));
|
||||
wait(self->kvStore->commit());
|
||||
req.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> traceQueuedMutations(SimpleConfigDatabaseNodeImpl *self) {
|
||||
state Version currentVersion = wait(getCommittedVersion(self));
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, currentVersion));
|
||||
TraceEvent te("SimpleConfigNodeQueuedMutations");
|
||||
te.detail("Size", versionedMutations.size());
|
||||
te.detail("CommittedVersion", currentVersion);
|
||||
int index = 0;
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
te.detail(format("Version%d", index), versionedMutation.version);
|
||||
te.detail(format("Mutation%d", index), versionedMutation.mutation.type);
|
||||
te.detail(format("FirstParam%d", index), versionedMutation.mutation.param1);
|
||||
te.detail(format("SecondParam%d", index), versionedMutation.mutation.param2);
|
||||
++index;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface* cti) {
|
||||
loop {
|
||||
wait(traceQueuedMutations(self));
|
||||
choose {
|
||||
when(ConfigTransactionGetVersionRequest req = waitNext(cti->getVersion.getFuture())) {
|
||||
self->actors.add(getNewVersion(self, req));
|
||||
|
@ -182,7 +211,7 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
reply.database[kv.key] = kv.value;
|
||||
}
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations =
|
||||
wait(getMutations(self, ::invalidVersion, req.version));
|
||||
wait(getMutations(self, 0, req.version));
|
||||
for (const auto& versionedMutation : versionedMutations) {
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (mutation.type == MutationRef::SetValue) {
|
||||
|
@ -199,15 +228,13 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
loop {
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(cfi->getVersion.getFuture())) {
|
||||
self->actors.add(getCurrentVersion(self, req));
|
||||
self->actors.add(getCommittedVersion(self, req));
|
||||
}
|
||||
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(cfi->getFullDatabase.getFuture())) {
|
||||
self->actors.add(getFullDatabase(self, req));
|
||||
continue;
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) {
|
||||
// TODO: Implement
|
||||
continue;
|
||||
self->actors.add(getChanges(self, req));
|
||||
}
|
||||
when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) {
|
||||
// TODO: Implement
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
|
||||
#include "fdbclient/IConfigTransaction.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/ConfigFollowerInterface.h"
|
||||
#include "fdbserver/IConfigBroadcaster.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
||||
|
@ -52,7 +54,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> start(ConfigurationDatabaseWorkload* self, Database cx) {
|
||||
ACTOR static Future<Void> runClient(Database cx) {
|
||||
state Key key = LiteralStringRef("key");
|
||||
state Key value = LiteralStringRef("value");
|
||||
Optional<Value> currentValue = wait(get(cx, key));
|
||||
|
@ -65,6 +67,41 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> runBroadcaster(Database cx, ConfigFollowerInterface cfi) {
|
||||
state SimpleConfigBroadcaster broadcaster(cx->getConnectionFile()->getConnectionString());
|
||||
wait(success(timeout(broadcaster.serve(cfi), 60.0)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Version> getCurrentVersion(ConfigFollowerInterface cfi) {
|
||||
ConfigFollowerGetVersionReply versionReply = wait(cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
return versionReply.version;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> runConsumer(ConfigFollowerInterface cfi) {
|
||||
state Version mostRecentVersion = wait(getCurrentVersion(cfi));
|
||||
ConfigFollowerGetFullDatabaseReply fullDBReply = wait(cfi.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{mostRecentVersion, {}}));
|
||||
state std::map<Key, Value> database = fullDBReply.database;
|
||||
state int runs = 0;
|
||||
loop {
|
||||
state ConfigFollowerGetChangesReply changesReply = wait(cfi.getChanges.getReply(ConfigFollowerGetChangesRequest{mostRecentVersion, {}}));
|
||||
wait(delay(1.0));
|
||||
if (++runs > 5) {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> start(ConfigurationDatabaseWorkload *self, Database cx) {
|
||||
state std::vector<Future<Void>> futures;
|
||||
state ConfigFollowerInterface cfi;
|
||||
futures.push_back(runClient(cx));
|
||||
futures.push_back(runBroadcaster(cx, cfi));
|
||||
futures.push_back(runConsumer(cfi));
|
||||
wait(waitForAll(futures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
|
|
Loading…
Reference in New Issue