Added SimpleConfigDatabase compaction (not yet tested)

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-23 09:22:47 -07:00
parent e3be3bd90c
commit 1a6dcd6677
4 changed files with 67 additions and 15 deletions

View File

@ -84,6 +84,10 @@ struct VersionedMutationRef {
VersionedMutationRef()=default; VersionedMutationRef()=default;
explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {} explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {}
explicit VersionedMutationRef(Arena& arena, VersionedMutationRef const& rhs)
: version(rhs.version), mutation(arena, rhs.mutation) {}
size_t expectedSize() const { return sizeof(Version) + mutation.expectedSize(); }
template<class Ar> template<class Ar>
void serialize(Ar &ar) { void serialize(Ar &ar) {
@ -125,12 +129,12 @@ struct ConfigFollowerGetChangesRequest {
struct ConfigFollowerCompactRequest { struct ConfigFollowerCompactRequest {
static constexpr FileIdentifier file_identifier = 568910; static constexpr FileIdentifier file_identifier = 568910;
Version lastTruncatedVersion; Version version;
ReplyPromise<Void> reply; ReplyPromise<Void> reply;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, lastTruncatedVersion, reply); serializer(ar, version, reply);
} }
}; };

View File

@ -22,8 +22,9 @@
class SimpleConfigBroadcasterImpl { class SimpleConfigBroadcasterImpl {
Reference<ConfigFollowerInterface> subscriber; Reference<ConfigFollowerInterface> subscriber;
std::map<Key, Value> fullDatabase; std::map<Key, Value> database;
Standalone<VectorRef<VersionedMutationRef>> versionedMutations; // TODO: Should create fewer arenas
std::deque<Standalone<VersionedMutationRef>> versionedMutations;
Version lastCompactedVersion; Version lastCompactedVersion;
Version mostRecentVersion; Version mostRecentVersion;
ActorCollection actors{ false }; ActorCollection actors{ false };
@ -35,7 +36,7 @@ class SimpleConfigBroadcasterImpl {
ConfigFollowerGetChangesReply reply = wait( ConfigFollowerGetChangesReply reply = wait(
self->subscriber->getChanges.getReply(ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} })); self->subscriber->getChanges.getReply(ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
for (const auto &versionedMutation : reply.versionedMutations) { for (const auto &versionedMutation : reply.versionedMutations) {
self->versionedMutations.push_back(self->versionedMutations.arena(), versionedMutation); self->versionedMutations.push_back(versionedMutation);
} }
self->mostRecentVersion = reply.mostRecentVersion; self->mostRecentVersion = reply.mostRecentVersion;
wait(delay(POLLING_INTERVAL)); wait(delay(POLLING_INTERVAL));
@ -61,7 +62,7 @@ class SimpleConfigBroadcasterImpl {
self->mostRecentVersion = versionReply.version; self->mostRecentVersion = versionReply.version;
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber->getFullDatabase.getReply( ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber->getFullDatabase.getReply(
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} })); ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} }));
self->fullDatabase = reply.database; self->database = reply.database;
self->actors.add(fetchUpdates(self)); self->actors.add(fetchUpdates(self));
loop { loop {
//self->traceQueuedMutations(); //self->traceQueuedMutations();
@ -71,7 +72,7 @@ class SimpleConfigBroadcasterImpl {
} }
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher->getFullDatabase.getFuture())) { when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher->getFullDatabase.getFuture())) {
ConfigFollowerGetFullDatabaseReply reply; ConfigFollowerGetFullDatabaseReply reply;
reply.database = self->fullDatabase; reply.database = self->database;
for (const auto &versionedMutation : self->versionedMutations) { for (const auto &versionedMutation : self->versionedMutations) {
const auto &version = versionedMutation.version; const auto &version = versionedMutation.version;
const auto &mutation = versionedMutation.mutation; const auto &mutation = versionedMutation.mutation;
@ -82,22 +83,41 @@ class SimpleConfigBroadcasterImpl {
reply.database[mutation.param1] = mutation.param2; reply.database[mutation.param1] = mutation.param2;
} else if (mutation.type == MutationRef::ClearRange) { } else if (mutation.type == MutationRef::ClearRange) {
reply.database.erase(reply.database.find(mutation.param1), reply.database.find(mutation.param2)); reply.database.erase(reply.database.find(mutation.param1), reply.database.find(mutation.param2));
} else {
ASSERT(false);
} }
} }
req.reply.send(ConfigFollowerGetFullDatabaseReply{self->fullDatabase}); req.reply.send(ConfigFollowerGetFullDatabaseReply{ self->database });
} }
when(ConfigFollowerGetChangesRequest req = waitNext(publisher->getChanges.getFuture())) { when(ConfigFollowerGetChangesRequest req = waitNext(publisher->getChanges.getFuture())) {
ConfigFollowerGetChangesReply reply; ConfigFollowerGetChangesReply reply;
reply.mostRecentVersion = self->mostRecentVersion; reply.mostRecentVersion = self->mostRecentVersion;
for (const auto &versionedMutation : self->versionedMutations) { for (const auto &versionedMutation : self->versionedMutations) {
if (versionedMutation.version > req.lastSeenVersion) { if (versionedMutation.version > req.lastSeenVersion) {
reply.versionedMutations.push_back(reply.versionedMutations.arena(), versionedMutation); reply.versionedMutations.push_back_deep(reply.versionedMutations.arena(),
versionedMutation);
} }
} }
req.reply.send(reply); req.reply.send(reply);
} }
when(ConfigFollowerCompactRequest req = waitNext(publisher->compact.getFuture())) { when(ConfigFollowerCompactRequest req = waitNext(publisher->compact.getFuture())) {
// TODO: Implement while (!self->versionedMutations.empty()) {
const auto& versionedMutation = self->versionedMutations.front();
const auto& version = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
if (version > req.version) {
break;
} else if (mutation.type == MutationRef::SetValue) {
self->database[mutation.param1] = mutation.param2;
} else if (mutation.type == MutationRef::ClearRange) {
self->database.erase(self->database.find(mutation.param1),
self->database.find(mutation.param2));
} else {
ASSERT(false);
}
self->lastCompactedVersion = version;
self->versionedMutations.pop_front();
}
req.reply.send(Void()); req.reply.send(Void());
} }
when(wait(self->actors.getResult())) { ASSERT(false); } when(wait(self->actors.getResult())) { ASSERT(false); }

View File

@ -35,7 +35,6 @@ const KeyRef committedVersionKey = LiteralStringRef("committedVersion");
const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0")); const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0"));
const KeyRangeRef mutationKeys = KeyRangeRef(LiteralStringRef("mutation/"), LiteralStringRef("mutation0")); const KeyRangeRef mutationKeys = KeyRangeRef(LiteralStringRef("mutation/"), LiteralStringRef("mutation0"));
// FIXME: negative versions break ordering
Key versionedMutationKey(Version version, uint32_t index) { Key versionedMutationKey(Version version, uint32_t index) {
ASSERT(version >= 0); ASSERT(version >= 0);
BinaryWriter bw(IncludeVersion()); BinaryWriter bw(IncludeVersion());
@ -84,6 +83,7 @@ class SimpleConfigDatabaseNodeImpl {
IKeyValueStore* kvStore; // FIXME: Prevent leak IKeyValueStore* kvStore; // FIXME: Prevent leak
std::map<std::string, std::string> config; std::map<std::string, std::string> config;
ActorCollection actors{ false }; ActorCollection actors{ false };
Version lastCompactedVersion{ 0 };
FlowLock globalLock; FlowLock globalLock;
ACTOR static Future<Version> getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) { ACTOR static Future<Version> getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) {
@ -253,6 +253,31 @@ class SimpleConfigDatabaseNodeImpl {
return Void(); return Void();
} }
ACTOR static Future<Void> compact(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerCompactRequest req) {
// TODO: Lock
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
self->kvStore->clear(
KeyRangeRef(mutationKeys.begin, versionedMutationKey(req.version, 100000))); // FIXME: This is a hack
for (const auto& versionedMutation : versionedMutations) {
const auto& version = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
if (version > req.version) {
req.reply.send(Void());
return Void();
} else if (mutation.type == MutationRef::SetValue) {
self->kvStore->set(KeyValueRef(mutation.param1, mutation.param2));
} else if (mutation.type == MutationRef::ClearRange) {
self->kvStore->clear(KeyRangeRef(mutation.param1, mutation.param2));
} else {
ASSERT(false);
}
}
wait(self->kvStore->commit());
req.reply.send(Void());
self->lastCompactedVersion = req.version;
return Void();
}
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerInterface* cfi) { ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerInterface* cfi) {
loop { loop {
choose { choose {
@ -266,7 +291,7 @@ class SimpleConfigDatabaseNodeImpl {
self->actors.add(getChanges(self, req)); self->actors.add(getChanges(self, req));
} }
when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) { when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) {
// TODO: Implement self->actors.add(compact(self, req));
req.reply.send(Void()); req.reply.send(Void());
} }
when(wait(self->actors.getResult())) { ASSERT(false); } when(wait(self->actors.getResult())) { ASSERT(false); }

View File

@ -109,15 +109,18 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
state Future<int> expectedTotal = self->expectedTotal.getFuture(); state Future<int> expectedTotal = self->expectedTotal.getFuture();
state int currentValue = 0; state int currentValue = 0;
loop { loop {
state ConfigFollowerGetChangesReply reply = state ConfigFollowerGetChangesReply changesReply =
wait(cfi->getChanges.getReply(ConfigFollowerGetChangesRequest{ mostRecentVersion, {} })); wait(cfi->getChanges.getReply(ConfigFollowerGetChangesRequest{ mostRecentVersion, {} }));
mostRecentVersion = reply.mostRecentVersion; mostRecentVersion = changesReply.mostRecentVersion;
for (const auto& versionedMutation : reply.versionedMutations) { // wait(cfi->compact.getReply(ConfigFollowerCompactRequest{ mostRecentVersion }));
for (const auto& versionedMutation : changesReply.versionedMutations) {
const auto& mutation = versionedMutation.mutation; const auto& mutation = versionedMutation.mutation;
if (mutation.type == MutationRef::SetValue) { if (mutation.type == MutationRef::SetValue) {
database[mutation.param1] = mutation.param2; database[mutation.param1] = mutation.param2;
} else if (mutation.type == MutationRef::ClearRange) { } else if (mutation.type == MutationRef::ClearRange) {
database.erase(database.find(mutation.param1), database.find(mutation.param2)); database.erase(database.find(mutation.param1), database.find(mutation.param2));
} else {
ASSERT(false);
} }
} }
if (database.count(self->key)) { if (database.count(self->key)) {