Normalize configuration commit annotations

This commit is contained in:
sfc-gh-tclinkenbeard 2021-05-24 01:25:50 -07:00
parent 1cb37a6216
commit 3d733deec9
12 changed files with 210 additions and 104 deletions

View File

@ -62,8 +62,6 @@ inline bool operator<(ConfigKeyRef const& lhs, ConfigKeyRef const& rhs) {
class ConfigMutationRef {
ConfigKeyRef key;
Optional<ValueRef> value;
StringRef description{ ""_sr };
double timestamp{ 0.0 };
public:
ConfigMutationRef() = default;
@ -77,13 +75,8 @@ public:
Optional<ValueRef> getValue() const { return value; }
ConfigMutationRef(Arena& arena, ConfigMutationRef const& rhs)
: key(arena, rhs.key), value(arena, rhs.value), description(arena, rhs.description), timestamp(rhs.timestamp) {}
ConfigMutationRef(Arena& arena, ConfigMutationRef const& rhs) : key(arena, rhs.key), value(arena, rhs.value) {}
StringRef getDescription() const { return description; }
void setDescription(StringRef description) { this->description = description; }
double getTimestamp() const { return timestamp; }
void setTimestamp(double timestamp) { this->timestamp = timestamp; }
bool isSet() const { return value.present(); }
static Standalone<ConfigMutationRef> createConfigMutation(KeyRef encodedKey, Optional<ValueRef> value) {
@ -93,11 +86,28 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, value, description, timestamp);
serializer(ar, key, value);
}
size_t expectedSize() const {
return key.expectedSize() + value.expectedSize() + description.expectedSize() + sizeof(decltype(timestamp));
}
size_t expectedSize() const { return key.expectedSize() + value.expectedSize(); }
};
using ConfigMutation = Standalone<ConfigMutationRef>;
struct ConfigCommitAnnotationRef {
KeyRef description;
double timestamp{ 0.0 };
ConfigCommitAnnotationRef() = default;
explicit ConfigCommitAnnotationRef(KeyRef description, double timestamp)
: description(description), timestamp(timestamp) {}
explicit ConfigCommitAnnotationRef(Arena& arena, ConfigCommitAnnotationRef& rhs)
: description(arena, rhs.description), timestamp(rhs.timestamp) {}
size_t expectedSize() const { return description.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, timestamp, description);
}
};
using ConfigCommitAnnotation = Standalone<ConfigCommitAnnotationRef>;

View File

@ -79,17 +79,17 @@ struct ConfigTransactionGetRequest {
struct ConfigTransactionCommitRequest {
static constexpr FileIdentifier file_identifier = 103841;
Version version;
Standalone<VectorRef<ConfigMutationRef>> mutations;
Arena arena;
Version version{ ::invalidVersion };
VectorRef<ConfigMutationRef> mutations;
ConfigCommitAnnotationRef annotation;
ReplyPromise<Void> reply;
ConfigTransactionCommitRequest() = default;
ConfigTransactionCommitRequest(Version version, Standalone<VectorRef<ConfigMutationRef>> mutations)
: version(version), mutations(mutations) {}
size_t expectedSize() const { return mutations.expectedSize() + annotation.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, mutations, reply);
serializer(ar, arena, version, mutations, annotation, reply);
}
};

View File

@ -32,7 +32,6 @@ protected:
public:
virtual ~IConfigTransaction() = default;
virtual void fullReset() = 0;
static Reference<IConfigTransaction> createSimple(ConfigTransactionInterface const&);
static Reference<IConfigTransaction> createSimple(ClusterConnectionString const&);

View File

@ -56,5 +56,6 @@ public:
void reset() override;
void debugTransaction(UID dID) override;
void checkDeferredError() const override;
void fullReset() override;
void fullReset();
};

View File

@ -26,9 +26,8 @@
#include "flow/actorcompiler.h" // This must be the last #include.
class SimpleConfigTransactionImpl {
Standalone<VectorRef<ConfigMutationRef>> mutations;
Future<Version> version;
Key description;
ConfigTransactionCommitRequest toCommit;
Future<Version> getVersionFuture;
ConfigTransactionInterface cti;
int numRetries{ 0 };
bool committed{ false };
@ -49,10 +48,10 @@ class SimpleConfigTransactionImpl {
}
ACTOR static Future<Optional<Value>> get(SimpleConfigTransactionImpl* self, KeyRef key) {
if (!self->version.isValid()) {
self->version = getReadVersion(self);
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
}
Version version = wait(self->version);
Version version = wait(self->getVersionFuture);
auto configKey = ConfigKey::decodeKey(key);
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGettingValue", self->dID.get())
@ -83,16 +82,12 @@ class SimpleConfigTransactionImpl {
}
ACTOR static Future<Void> commit(SimpleConfigTransactionImpl* self) {
if (!self->version.isValid()) {
self->version = getReadVersion(self);
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
}
Version version = wait(self->version);
auto commitTime = now();
for (auto& mutation : self->mutations) {
mutation.setTimestamp(commitTime);
mutation.setDescription(self->description);
}
wait(self->cti.commit.getReply(ConfigTransactionCommitRequest(version, self->mutations)));
wait(store(self->toCommit.version, self->getVersionFuture));
self->toCommit.annotation.timestamp = now();
wait(self->cti.commit.getReply(self->toCommit));
self->committed = true;
return Void();
}
@ -108,17 +103,17 @@ public:
void set(KeyRef key, ValueRef value) {
if (key == "\xff\xff/description"_sr) {
description = value;
toCommit.annotation.description = KeyRef(toCommit.arena, value);
} else {
mutations.push_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutation(key, value));
toCommit.mutations.push_back_deep(toCommit.arena, ConfigMutationRef::createConfigMutation(key, value));
}
}
void clear(KeyRef key) {
if (key == "\xff\xff/description"_sr) {
description = ""_sr;
toCommit.annotation.description = ""_sr;
} else {
mutations.emplace_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutation(key, {}));
toCommit.mutations.push_back_deep(toCommit.arena, ConfigMutationRef::createConfigMutation(key, {}));
}
}
@ -138,24 +133,24 @@ public:
}
Future<Version> getReadVersion() {
if (!version.isValid())
version = getReadVersion(this);
return version;
if (!getVersionFuture.isValid())
getVersionFuture = getReadVersion(this);
return getVersionFuture;
}
Optional<Version> getCachedReadVersion() {
if (version.isValid() && version.isReady() && !version.isError()) {
return version.get();
Optional<Version> getCachedReadVersion() const {
if (getVersionFuture.isValid() && getVersionFuture.isReady() && !getVersionFuture.isError()) {
return getVersionFuture.get();
} else {
return {};
}
}
Version getCommittedVersion() const { return committed ? version.get() : ::invalidVersion; }
Version getCommittedVersion() const { return committed ? getVersionFuture.get() : ::invalidVersion; }
void reset() {
version = Future<Version>{};
mutations = Standalone<VectorRef<ConfigMutationRef>>{};
getVersionFuture = Future<Version>{};
toCommit = {};
committed = false;
}
@ -165,7 +160,7 @@ public:
reset();
}
size_t getApproximateSize() const { return mutations.expectedSize(); }
size_t getApproximateSize() const { return toCommit.expectedSize(); }
void debugTransaction(UID dID) {
this->dID = dID;

View File

@ -59,9 +59,10 @@ public:
void reset() override;
void debugTransaction(UID dID) override;
void checkDeferredError() const override;
void fullReset() override;
int64_t getApproximateSize() const override;
void set(KeyRef const&, ValueRef const&) override;
void clear(KeyRangeRef const&) override;
void clear(KeyRef const&) override;
void fullReset();
};

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <algorithm>
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/IConfigConsumer.h"
#include "flow/UnitTest.h"
@ -113,7 +115,8 @@ public:
class ConfigBroadcasterImpl {
PendingRequestStore pending;
std::map<ConfigKey, Value> snapshot;
std::deque<Standalone<VersionedConfigMutationRef>> mutationHistory;
std::deque<VersionedConfigMutation> mutationHistory;
std::deque<VersionedConfigCommitAnnotation> annotationHistory;
Version lastCompactedVersion;
Version mostRecentVersion;
std::unique_ptr<IConfigConsumer> consumer;
@ -214,9 +217,12 @@ class ConfigBroadcasterImpl {
}
}
void addChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, Version mostRecentVersion) {
void addChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
this->mostRecentVersion = mostRecentVersion;
mutationHistory.insert(mutationHistory.end(), changes.begin(), changes.end());
annotationHistory.insert(annotationHistory.end(), annotations.begin(), annotations.end());
for (const auto& change : changes) {
const auto& mutation = change.mutation;
if (mutation.isSet()) {
@ -239,12 +245,15 @@ public:
return serve(self, this, cbfi);
}
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, Version mostRecentVersion) {
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingChanges", id)
.detail("ChangesSize", changes.size())
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("NewMostRecentVersion", mostRecentVersion);
addChanges(changes, mostRecentVersion);
.detail("NewMostRecentVersion", mostRecentVersion)
.detail("AnnotationsSize", annotations.size());
addChanges(changes, mostRecentVersion, annotations);
notifyFollowers(changes);
}
@ -252,15 +261,17 @@ public:
void applySnapshotAndChanges(Snapshot&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion) {
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
TraceEvent(SevDebug, "ConfigBroadcasterApplyingSnapshotAndChanges", id)
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("SnapshotSize", snapshot.size())
.detail("SnapshotVersion", snapshotVersion)
.detail("ChangesSize", changes.size())
.detail("ChangesVersion", changesVersion);
.detail("ChangesVersion", changesVersion)
.detail("AnnotationsSize", annotations.size());
setSnapshot(std::forward<Snapshot>(snapshot), snapshotVersion);
addChanges(changes, changesVersion);
addChanges(changes, changesVersion, annotations);
notifyOutdatedRequests();
}
@ -290,14 +301,21 @@ public:
JsonBuilderObject mutationObject;
mutationObject["version"] = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
mutationObject["description"] = mutation.getDescription();
mutationObject["config_class"] = mutation.getConfigClass().orDefault("<global>"_sr);
mutationObject["knob_name"] = mutation.getKnobName();
mutationObject["knob_value"] = mutation.getValue().orDefault("<cleared>"_sr);
mutationObject["timestamp"] = mutation.getTimestamp();
mutationsArray.push_back(std::move(mutationObject));
}
result["mutations"] = std::move(mutationsArray);
JsonBuilderArray commitsArray;
for (const auto& versionedAnnotation : annotationHistory) {
JsonBuilderObject commitObject;
commitObject["version"] = versionedAnnotation.version;
commitObject["description"] = versionedAnnotation.annotation.description;
commitObject["timestamp"] = versionedAnnotation.annotation.timestamp;
commitsArray.push_back(std::move(commitObject));
}
result["commits"] = std::move(commitsArray);
JsonBuilderObject snapshotObject;
std::map<Optional<Key>, std::vector<std::pair<Key, Value>>> snapshotMap;
for (const auto& [configKey, value] : snapshot) {
@ -316,8 +334,19 @@ public:
return result;
}
void compact() {
// TODO: Implement this
void compact(Version compactionVersion) {
{
auto it = std::find_if(mutationHistory.begin(), mutationHistory.end(), [compactionVersion](const auto& vm) {
return vm.version > compactionVersion;
});
mutationHistory.erase(mutationHistory.begin(), it);
}
{
auto it = std::find_if(annotationHistory.begin(),
annotationHistory.end(),
[compactionVersion](const auto& va) { return va.version > compactionVersion; });
annotationHistory.erase(annotationHistory.begin(), it);
}
}
UID getID() const { return id; }
@ -340,22 +369,27 @@ Future<Void> ConfigBroadcaster::serve(ConfigBroadcastFollowerInterface const& cb
}
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion) {
impl->applyChanges(changes, mostRecentVersion);
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applyChanges(changes, mostRecentVersion, annotations);
}
void ConfigBroadcaster::applySnapshotAndChanges(std::map<ConfigKey, Value> const& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion) {
impl->applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion);
void ConfigBroadcaster::applySnapshotAndChanges(
std::map<ConfigKey, Value> const& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion, annotations);
}
void ConfigBroadcaster::applySnapshotAndChanges(std::map<ConfigKey, Value>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion) {
impl->applySnapshotAndChanges(std::move(snapshot), snapshotVersion, changes, changesVersion);
void ConfigBroadcaster::applySnapshotAndChanges(
std::map<ConfigKey, Value>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
impl->applySnapshotAndChanges(std::move(snapshot), snapshotVersion, changes, changesVersion, annotations);
}
UID ConfigBroadcaster::getID() const {
@ -366,8 +400,8 @@ JsonBuilderObject ConfigBroadcaster::getStatus() const {
return impl->getStatus();
}
void ConfigBroadcaster::compact() {
impl->compact();
void ConfigBroadcaster::compact(Version compactionVersion) {
impl->compact(compactionVersion);
}
namespace {

View File

@ -37,18 +37,22 @@ public:
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster();
Future<Void> serve(ConfigBroadcastFollowerInterface const&);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, Version mostRecentVersion);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
void applySnapshotAndChanges(std::map<ConfigKey, Value> const& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion);
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
void applySnapshotAndChanges(std::map<ConfigKey, Value>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion);
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
UID getID() const;
JsonBuilderObject getStatus() const;
void compact();
void compact(Version compactionVersion);
public: // Testing
explicit ConfigBroadcaster(ConfigFollowerInterface const&);

View File

@ -216,7 +216,7 @@ class BroadcasterToLocalConfigEnvironment {
void addMutation(Optional<KeyRef> configClass, Optional<ValueRef> value) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
appendVersionedMutation(versionedMutations, ++lastWrittenVersion, configClass, "test_long"_sr, value);
broadcaster.applyChanges(versionedMutations, lastWrittenVersion);
broadcaster.applyChanges(versionedMutations, lastWrittenVersion, {});
}
public:
@ -242,7 +242,7 @@ public:
return readFrom.restartLocalConfig(newConfigPath);
}
void compact() { broadcaster.compact(); }
void compact() { broadcaster.compact(lastWrittenVersion); }
Future<Void> getError() const { return readFrom.getError() || broadcastServer; }
};

View File

@ -42,23 +42,47 @@ struct VersionedConfigMutationRef {
serializer(ar, version, mutation);
}
};
using VersionedConfigMutation = Standalone<VersionedConfigMutationRef>;
struct VersionedConfigCommitAnnotationRef {
Version version;
ConfigCommitAnnotationRef annotation;
VersionedConfigCommitAnnotationRef() = default;
explicit VersionedConfigCommitAnnotationRef(Arena& arena, Version version, ConfigCommitAnnotationRef annotation)
: version(version), annotation(arena, annotation) {}
explicit VersionedConfigCommitAnnotationRef(Arena& arena, VersionedConfigCommitAnnotationRef rhs)
: version(rhs.version), annotation(arena, rhs.annotation) {}
size_t expectedSize() const { return annotation.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, annotation);
}
};
using VersionedConfigCommitAnnotation = Standalone<VersionedConfigCommitAnnotationRef>;
struct ConfigFollowerGetSnapshotAndChangesReply {
static constexpr FileIdentifier file_identifier = 1734095;
Version snapshotVersion;
Version changesVersion;
std::map<ConfigKey, Value> snapshot;
// TODO: Share arena
Standalone<VectorRef<VersionedConfigMutationRef>> changes;
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
ConfigFollowerGetSnapshotAndChangesReply() = default;
template <class Snapshot>
explicit ConfigFollowerGetSnapshotAndChangesReply(Version snapshotVersion,
Version changesVersion,
Snapshot&& snapshot,
Standalone<VectorRef<VersionedConfigMutationRef>> changes)
explicit ConfigFollowerGetSnapshotAndChangesReply(
Version snapshotVersion,
Version changesVersion,
Snapshot&& snapshot,
Standalone<VectorRef<VersionedConfigMutationRef>> changes,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations)
: snapshotVersion(snapshotVersion), changesVersion(changesVersion), snapshot(std::forward<Snapshot>(snapshot)),
changes(changes) {
ASSERT(changesVersion >= snapshotVersion);
changes(changes), annotations(annotations) {
ASSERT_GE(changesVersion, snapshotVersion);
}
template <class Ar>
@ -80,16 +104,19 @@ struct ConfigFollowerGetSnapshotAndChangesRequest {
struct ConfigFollowerGetChangesReply {
static constexpr FileIdentifier file_identifier = 234859;
Version mostRecentVersion;
// TODO: Share arena
Standalone<VectorRef<VersionedConfigMutationRef>> changes;
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
ConfigFollowerGetChangesReply() : mostRecentVersion(0) {}
explicit ConfigFollowerGetChangesReply(Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes)
: mostRecentVersion(mostRecentVersion), changes(changes) {}
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations)
: mostRecentVersion(mostRecentVersion), changes(changes), annotations(annotations) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mostRecentVersion, changes);
serializer(ar, mostRecentVersion, changes, annotations);
}
};

View File

@ -64,7 +64,7 @@ class SimpleConfigConsumerImpl {
}
if (reply.mostRecentVersion > self->lastSeenVersion) {
self->lastSeenVersion = reply.mostRecentVersion;
broadcaster->applyChanges(reply.changes, reply.mostRecentVersion);
broadcaster->applyChanges(reply.changes, reply.mostRecentVersion, reply.annotations);
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {
@ -88,7 +88,7 @@ class SimpleConfigConsumerImpl {
.detail("ChangesVersion", reply.changesVersion)
.detail("ChangesSize", reply.changes.size());
broadcaster->applySnapshotAndChanges(
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, reply.changesVersion);
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, reply.changesVersion, reply.annotations);
self->lastSeenVersion = reply.changesVersion;
return Void();
}

View File

@ -36,9 +36,20 @@ const KeyRef liveTransactionVersionKey = "liveTransactionVersion"_sr;
const KeyRef committedVersionKey = "committedVersion"_sr;
const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr);
const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr);
const KeyRangeRef annotationKeys = KeyRangeRef("annotation/"_sr, "annotation0"_sr);
Key versionedAnnotationKey(Version version) {
ASSERT_GE(version, 0);
return BinaryWriter::toValue(bigEndian64(version), IncludeVersion()).withPrefix(annotationKeys.begin);
}
Version getVersionFromVersionedAnnotationKey(KeyRef versionedAnnotationKey) {
return fromBigEndian64(BinaryReader::fromStringRef<uint64_t>(
versionedAnnotationKey.removePrefix(annotationKeys.begin), IncludeVersion()));
}
Key versionedMutationKey(Version version, uint32_t index) {
ASSERT(version >= 0);
ASSERT_GE(version, 0);
BinaryWriter bw(IncludeVersion());
bw << bigEndian64(version);
bw << bigEndian32(index);
@ -142,17 +153,32 @@ class SimpleConfigDatabaseNodeImpl {
return lastCompactedVersion;
}
ACTOR static Future<Standalone<VectorRef<VersionedConfigCommitAnnotationRef>>>
getAnnotations(SimpleConfigDatabaseNodeImpl* self, Version startVersion, Version endVersion) {
Key startKey = versionedAnnotationKey(startVersion);
Key endKey = versionedAnnotationKey(endVersion + 1);
state KeyRangeRef keys(startKey, endKey);
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(keys));
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> result;
for (const auto& kv : range) {
auto version = getVersionFromVersionedAnnotationKey(kv.key);
ASSERT_LE(version, endVersion);
auto annotation = BinaryReader::fromStringRef<ConfigCommitAnnotation>(kv.value, IncludeVersion());
result.emplace_back_deep(result.arena(), version, annotation);
}
return result;
}
ACTOR static Future<Standalone<VectorRef<VersionedConfigMutationRef>>>
getMutations(SimpleConfigDatabaseNodeImpl* self, Version startVersion, Version endVersion) {
Key startVersionKey = versionedMutationKey(startVersion, 0);
state KeyRangeRef keys(startVersionKey, mutationKeys.end);
Key startKey = versionedMutationKey(startVersion, 0);
Key endKey = versionedMutationKey(endVersion + 1, 0);
state KeyRangeRef keys(startKey, endKey);
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(keys));
Standalone<VectorRef<VersionedConfigMutationRef>> result;
for (const auto &kv : range) {
auto version = getVersionFromVersionedMutationKey(kv.key);
if (version > endVersion) {
break;
}
ASSERT_LE(version, endVersion);
auto mutation = BinaryReader::fromStringRef<Standalone<ConfigMutationRef>>(kv.value, IncludeVersion());
result.emplace_back_deep(result.arena(), version, mutation);
}
@ -167,14 +193,17 @@ class SimpleConfigDatabaseNodeImpl {
return Void();
}
state Version committedVersion = wait(getCommittedVersion(self));
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
state Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
state Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> versionedAnnotations =
wait(getAnnotations(self, req.lastSeenVersion + 1, committedVersion));
TraceEvent(SevDebug, "ConfigDatabaseNodeSendingChanges")
.detail("ReqLastSeenVersion", req.lastSeenVersion)
.detail("CommittedVersion", committedVersion)
.detail("NumMutations", versionedMutations.size());
.detail("NumMutations", versionedMutations.size())
.detail("NumCommits", versionedAnnotations.size());
++self->successfulChangeRequests;
req.reply.send(ConfigFollowerGetChangesReply{ committedVersion, versionedMutations });
req.reply.send(ConfigFollowerGetChangesReply{ committedVersion, versionedMutations, versionedAnnotations });
return Void();
}
@ -275,6 +304,8 @@ class SimpleConfigDatabaseNodeImpl {
}
self->kvStore->set(KeyValueRef(key, value));
}
self->kvStore->set(
KeyValueRef(versionedAnnotationKey(req.version), BinaryWriter::toValue(req.annotation, IncludeVersion())));
self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(req.version, IncludeVersion())));
wait(self->kvStore->commit());
++self->successfulCommits;
@ -317,11 +348,13 @@ class SimpleConfigDatabaseNodeImpl {
wait(store(reply.snapshotVersion, getLastCompactedVersion(self)));
wait(store(reply.changesVersion, getCommittedVersion(self)));
wait(store(reply.changes, getMutations(self, reply.snapshotVersion + 1, reply.changesVersion)));
wait(store(reply.annotations, getAnnotations(self, reply.snapshotVersion + 1, reply.changesVersion)));
TraceEvent(SevDebug, "ConfigDatabaseNodeGettingSnapshot")
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("ChangesVersion", reply.changesVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesSize", reply.changes.size());
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
req.reply.send(reply);
return Void();
}
@ -339,13 +372,15 @@ class SimpleConfigDatabaseNodeImpl {
wait(getMutations(self, lastCompactedVersion + 1, req.version));
self->kvStore->clear(
KeyRangeRef(versionedMutationKey(lastCompactedVersion + 1, 0), versionedMutationKey(req.version + 1, 0)));
self->kvStore->clear(
KeyRangeRef(versionedAnnotationKey(lastCompactedVersion + 1), versionedAnnotationKey(req.version + 1)));
for (const auto& versionedMutation : versionedMutations) {
const auto& version = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
if (version > req.version) {
break;
} else {
TraceEvent(SevDebug, "ConfigDatabaseNodeCompactionApplyingMutation")
TraceEvent(SevDebug, "ConfigDatabaseNodeCompactionApplyingMutation", self->id)
.detail("IsSet", mutation.isSet())
.detail("MutationVersion", version)
.detail("LastCompactedVersion", lastCompactedVersion)