Cleanup, refactor, add unit test
This commit is contained in:
parent
782f13c25e
commit
83d6c319fb
|
@ -110,12 +110,12 @@ public:
|
||||||
|
|
||||||
Future<Void> rollback(Version version) { return cfi.rollback.getReply(ConfigFollowerRollbackRequest{ version }); }
|
Future<Void> rollback(Version version) { return cfi.rollback.getReply(ConfigFollowerRollbackRequest{ version }); }
|
||||||
|
|
||||||
Future<Void> rollforward(Version beginVersion,
|
Future<Void> rollforward(Version lastKnownCommitted,
|
||||||
Version endVersion,
|
Version target,
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
||||||
const std::map<Version, ConfigCommitAnnotationRef>& annotations) {
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations) {
|
||||||
return cfi.rollforward.getReply(
|
return cfi.rollforward.getReply(
|
||||||
ConfigFollowerRollforwardRequest{ beginVersion, endVersion, mutations, annotations });
|
ConfigFollowerRollforwardRequest{ lastKnownCommitted, target, mutations, annotations });
|
||||||
}
|
}
|
||||||
|
|
||||||
void restartNode() {
|
void restartNode() {
|
||||||
|
@ -415,11 +415,11 @@ public:
|
||||||
|
|
||||||
Future<Void> compact() { return writeTo.compact(); }
|
Future<Void> compact() { return writeTo.compact(); }
|
||||||
Future<Void> rollback(Version version) { return writeTo.rollback(version); }
|
Future<Void> rollback(Version version) { return writeTo.rollback(version); }
|
||||||
Future<Void> rollforward(Version beginVersion,
|
Future<Void> rollforward(Version lastKnownCommitted,
|
||||||
Version endVersion,
|
Version target,
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
||||||
const std::map<Version, ConfigCommitAnnotationRef>& annotations) {
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations) {
|
||||||
return writeTo.rollforward(beginVersion, endVersion, mutations, annotations);
|
return writeTo.rollforward(lastKnownCommitted, target, mutations, annotations);
|
||||||
}
|
}
|
||||||
Future<Void> getError() const { return writeTo.getError(); }
|
Future<Void> getError() const { return writeTo.getError(); }
|
||||||
};
|
};
|
||||||
|
@ -963,16 +963,15 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/Rollforward") {
|
||||||
state TransactionEnvironment env(params.getDataDir());
|
state TransactionEnvironment env(params.getDataDir());
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
||||||
appendVersionedMutation(
|
appendVersionedMutation(
|
||||||
mutations, 0, "class-A"_sr, "test_long_v0"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
|
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
|
||||||
appendVersionedMutation(
|
appendVersionedMutation(
|
||||||
mutations, 1, "class-B"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
mutations, 2, "class-B"_sr, "test_long_v2"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations = {
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
{ 0, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } },
|
annotations.emplace_back_deep(annotations.arena(), 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
|
annotations.emplace_back_deep(annotations.arena(), 2, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
};
|
wait(rollforward(env, 0, 2, mutations, annotations));
|
||||||
wait(rollforward(env, 0, 1, mutations, annotations));
|
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 1 }));
|
||||||
wait(check(env, "class-A"_sr, "test_long_v0"_sr, Optional<int64_t>{ 1 }));
|
wait(check(env, "class-B"_sr, "test_long_v2"_sr, Optional<int64_t>{ 2 }));
|
||||||
wait(check(env, "class-B"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -981,18 +980,16 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithExistingMutation") {
|
||||||
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
||||||
appendVersionedMutation(
|
appendVersionedMutation(
|
||||||
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
mutations, 2, "class-A"_sr, "test_long_v2"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
||||||
appendVersionedMutation(
|
appendVersionedMutation(
|
||||||
mutations, 2, "class-A"_sr, "test_long_v2"_sr, KnobValueRef::create(int64_t{ 3 }).contents());
|
mutations, 3, "class-A"_sr, "test_long_v3"_sr, KnobValueRef::create(int64_t{ 3 }).contents());
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations = {
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } },
|
annotations.emplace_back_deep(annotations.arena(), 2, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
{ 2, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
|
annotations.emplace_back_deep(annotations.arena(), 3, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
};
|
wait(rollforward(env, 1, 3, mutations, annotations));
|
||||||
wait(rollforward(env, 1, 2, mutations, annotations));
|
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 1 }));
|
||||||
// Existing mutations will be overwritten by the rollforward request.
|
wait(check(env, "class-A"_sr, "test_long_v2"_sr, Optional<int64_t>{ 2 }));
|
||||||
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{}));
|
wait(check(env, "class-A"_sr, "test_long_v3"_sr, Optional<int64_t>{ 3 }));
|
||||||
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
|
|
||||||
wait(check(env, "class-A"_sr, "test_long_v2"_sr, Optional<int64_t>{ 3 }));
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1003,15 +1000,30 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithInvalidMutation") {
|
||||||
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
|
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
|
||||||
appendVersionedMutation(
|
appendVersionedMutation(
|
||||||
mutations, 10, "class-A"_sr, "test_long_v10"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
mutations, 10, "class-A"_sr, "test_long_v10"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations = {
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
|
annotations.emplace_back_deep(annotations.arena(), 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
};
|
|
||||||
wait(rollforward(env, 0, 5, mutations, annotations));
|
wait(rollforward(env, 0, 5, mutations, annotations));
|
||||||
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 1 }));
|
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 1 }));
|
||||||
wait(check(env, "class-A"_sr, "test_long_v10"_sr, Optional<int64_t>{}));
|
wait(check(env, "class-A"_sr, "test_long_v10"_sr, Optional<int64_t>{}));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollbackThenRollforward") {
|
||||||
|
state TransactionEnvironment env(params.getDataDir());
|
||||||
|
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
||||||
|
wait(rollback(env, 0));
|
||||||
|
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{}));
|
||||||
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
||||||
|
appendVersionedMutation(
|
||||||
|
mutations, 1, "class-B"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
|
||||||
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
|
annotations.emplace_back_deep(annotations.arena(), 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() });
|
||||||
|
wait(rollforward(env, 0, 1, mutations, annotations));
|
||||||
|
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{}));
|
||||||
|
wait(check(env, "class-B"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("/fdbserver/ConfigDB/Transaction/GetConfigClasses") {
|
TEST_CASE("/fdbserver/ConfigDB/Transaction/GetConfigClasses") {
|
||||||
wait(testGetConfigClasses(params, false));
|
wait(testGetConfigClasses(params, false));
|
||||||
return Void();
|
return Void();
|
||||||
|
|
|
@ -166,22 +166,22 @@ struct ConfigFollowerRollbackRequest {
|
||||||
|
|
||||||
struct ConfigFollowerRollforwardRequest {
|
struct ConfigFollowerRollforwardRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 678894;
|
static constexpr FileIdentifier file_identifier = 678894;
|
||||||
Version beginVersion{ 0 };
|
Version lastKnownCommitted{ 0 };
|
||||||
Version endVersion{ 0 };
|
Version target{ 0 };
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations;
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
ReplyPromise<Void> reply;
|
ReplyPromise<Void> reply;
|
||||||
|
|
||||||
ConfigFollowerRollforwardRequest() = default;
|
ConfigFollowerRollforwardRequest() = default;
|
||||||
explicit ConfigFollowerRollforwardRequest(Version beginVersion,
|
explicit ConfigFollowerRollforwardRequest(Version lastKnownCommitted,
|
||||||
Version endVersion,
|
Version target,
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations)
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations)
|
||||||
: beginVersion(beginVersion), endVersion(endVersion), mutations(mutations), annotations(annotations) {}
|
: lastKnownCommitted(lastKnownCommitted), target(target), mutations(mutations), annotations(annotations) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, beginVersion, endVersion, mutations, annotations, reply);
|
serializer(ar, lastKnownCommitted, target, mutations, annotations, reply);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -318,7 +318,7 @@ class ConfigNodeImpl {
|
||||||
|
|
||||||
ACTOR static Future<Void> commitMutations(ConfigNodeImpl* self,
|
ACTOR static Future<Void> commitMutations(ConfigNodeImpl* self,
|
||||||
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
|
||||||
std::map<Version, ConfigCommitAnnotationRef> annotations,
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations,
|
||||||
Version commitVersion) {
|
Version commitVersion) {
|
||||||
Version latestVersion = 0;
|
Version latestVersion = 0;
|
||||||
int index = 0;
|
int index = 0;
|
||||||
|
@ -332,8 +332,6 @@ class ConfigNodeImpl {
|
||||||
latestVersion = mutation.version;
|
latestVersion = mutation.version;
|
||||||
index = 0;
|
index = 0;
|
||||||
}
|
}
|
||||||
// TODO: This ASSERT might be unnecessary since compaction destroys annotations
|
|
||||||
ASSERT(annotations.find(mutation.version) != annotations.end());
|
|
||||||
Key key = versionedMutationKey(mutation.version, index++);
|
Key key = versionedMutationKey(mutation.version, index++);
|
||||||
Value value = ObjectWriter::toValue(mutation.mutation, IncludeVersion());
|
Value value = ObjectWriter::toValue(mutation.mutation, IncludeVersion());
|
||||||
if (mutation.mutation.isSet()) {
|
if (mutation.mutation.isSet()) {
|
||||||
|
@ -348,9 +346,9 @@ class ConfigNodeImpl {
|
||||||
}
|
}
|
||||||
self->kvStore->set(KeyValueRef(key, value));
|
self->kvStore->set(KeyValueRef(key, value));
|
||||||
}
|
}
|
||||||
for (const auto& [version, annotation] : annotations) {
|
for (const auto& annotation : annotations) {
|
||||||
self->kvStore->set(
|
self->kvStore->set(KeyValueRef(versionedAnnotationKey(annotation.version),
|
||||||
KeyValueRef(versionedAnnotationKey(version), BinaryWriter::toValue(annotation, IncludeVersion())));
|
BinaryWriter::toValue(annotation.annotation, IncludeVersion())));
|
||||||
}
|
}
|
||||||
ConfigGeneration newGeneration = { commitVersion, commitVersion };
|
ConfigGeneration newGeneration = { commitVersion, commitVersion };
|
||||||
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
|
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
|
||||||
|
@ -374,8 +372,9 @@ class ConfigNodeImpl {
|
||||||
for (const auto& mutation : req.mutations) {
|
for (const auto& mutation : req.mutations) {
|
||||||
mutations.emplace_back_deep(mutations.arena(), req.generation.liveVersion, mutation);
|
mutations.emplace_back_deep(mutations.arena(), req.generation.liveVersion, mutation);
|
||||||
}
|
}
|
||||||
wait(commitMutations(
|
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
|
||||||
self, mutations, { { req.generation.liveVersion, req.annotation } }, req.generation.liveVersion));
|
annotations.emplace_back_deep(annotations.arena(), req.generation.liveVersion, req.annotation);
|
||||||
|
wait(commitMutations(self, mutations, annotations, req.generation.liveVersion));
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
@ -492,12 +491,13 @@ class ConfigNodeImpl {
|
||||||
|
|
||||||
ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
|
ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
|
||||||
ConfigGeneration currentGeneration = wait(getGeneration(self));
|
ConfigGeneration currentGeneration = wait(getGeneration(self));
|
||||||
if (req.beginVersion != currentGeneration.committedVersion) {
|
if (req.lastKnownCommitted != currentGeneration.committedVersion) {
|
||||||
++self->failedCommits;
|
++self->failedCommits;
|
||||||
req.reply.sendError(transaction_too_old());
|
req.reply.sendError(not_committed());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
wait(commitMutations(self, req.mutations, req.annotations, req.endVersion));
|
ASSERT(req.mutations[0].version > currentGeneration.committedVersion);
|
||||||
|
wait(commitMutations(self, req.mutations, req.annotations, req.target));
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue