From de871da75f453cfb6c2ba8061cf1f7e6c7f5a57b Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Sun, 18 Jul 2021 14:02:45 -0700 Subject: [PATCH] Add some simple implementations to PaxosConfigTransaction methods --- fdbclient/ConfigTransactionInterface.cpp | 20 +++ fdbclient/ConfigTransactionInterface.h | 3 + fdbclient/PaxosConfigTransaction.actor.cpp | 165 +++++++++++++++----- fdbclient/PaxosConfigTransaction.h | 1 + fdbclient/SimpleConfigTransaction.actor.cpp | 20 +-- 5 files changed, 152 insertions(+), 57 deletions(-) diff --git a/fdbclient/ConfigTransactionInterface.cpp b/fdbclient/ConfigTransactionInterface.cpp index 838e69e091..66618e01d7 100644 --- a/fdbclient/ConfigTransactionInterface.cpp +++ b/fdbclient/ConfigTransactionInterface.cpp @@ -20,6 +20,7 @@ #include "fdbclient/ConfigTransactionInterface.h" #include "fdbclient/CoordinationInterface.h" +#include "fdbclient/SystemData.h" #include "flow/IRandom.h" ConfigTransactionInterface::ConfigTransactionInterface() : _id(deterministicRandom()->randomUniqueID()) {} @@ -53,3 +54,22 @@ bool ConfigGeneration::operator==(ConfigGeneration const& rhs) const { bool ConfigGeneration::operator!=(ConfigGeneration const& rhs) const { return !(*this == rhs); } + +void ConfigTransactionCommitRequest::set(KeyRef key, ValueRef value) { + if (key == configTransactionDescriptionKey) { + annotation.description = KeyRef(arena, value); + } else { + ConfigKey configKey = ConfigKeyRef::decodeKey(key); + auto knobValue = IKnobCollection::parseKnobValue( + configKey.knobName.toString(), value.toString(), IKnobCollection::Type::TEST); + mutations.emplace_back_deep(arena, configKey, knobValue.contents()); + } +} + +void ConfigTransactionCommitRequest::clear(KeyRef key) { + if (key == configTransactionDescriptionKey) { + annotation.description = ""_sr; + } else { + mutations.emplace_back_deep(arena, ConfigKeyRef::decodeKey(key), Optional{}); + } +} diff --git a/fdbclient/ConfigTransactionInterface.h b/fdbclient/ConfigTransactionInterface.h index b5a6437378..d2e19ad0ab 100644 --- a/fdbclient/ConfigTransactionInterface.h +++ b/fdbclient/ConfigTransactionInterface.h @@ -101,6 +101,9 @@ struct ConfigTransactionCommitRequest { size_t expectedSize() const { return mutations.expectedSize() + annotation.expectedSize(); } + void set(KeyRef key, ValueRef value); + void clear(KeyRef key); + template void serialize(Ar& ar) { serializer(ar, arena, generation, mutations, annotation, reply); diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index f6ac8b69e9..ac94b70b1b 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -18,24 +18,125 @@ * limitations under the License. */ +#include "fdbclient/DatabaseContext.h" #include "fdbclient/PaxosConfigTransaction.h" #include "flow/actorcompiler.h" // must be last include -class PaxosConfigTransactionImpl {}; +class PaxosConfigTransactionImpl { + ConfigTransactionCommitRequest toCommit; + Future getGenerationFuture; + std::vector ctis; + int numRetries{ 0 }; + bool committed{ false }; + Optional dID; + Database cx; + + ACTOR static Future getGeneration(PaxosConfigTransactionImpl* self) { + state std::vector> getGenerationFutures; + getGenerationFutures.reserve(self->ctis.size()); + for (auto const& cti : self->ctis) { + getGenerationFutures.push_back(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{})); + } + // FIXME: Must tolerate failures and disagreement + wait(waitForAll(getGenerationFutures)); + return getGenerationFutures[0].get().generation; + } + + ACTOR static Future> get(PaxosConfigTransactionImpl* self, Key key) { + if (!self->getGenerationFuture.isValid()) { + self->getGenerationFuture = getGeneration(self); + } + state ConfigKey configKey = ConfigKey::decodeKey(key); + ConfigGeneration generation = wait(self->getGenerationFuture); + // TODO: Load balance + ConfigTransactionGetReply reply = + wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey })); + if (reply.value.present()) { + return reply.value.get().toValue(); + } else { + return Optional{}; + } + } + +public: + Future getReadVersion() { + if (!getGenerationFuture.isValid()) { + getGenerationFuture = getGeneration(this); + } + return map(getGenerationFuture, [](auto const& gen) { return gen.committedVersion; }); + } + + Optional getCachedReadVersion() const { + if (getGenerationFuture.isValid() && getGenerationFuture.isReady() && !getGenerationFuture.isError()) { + return getGenerationFuture.get().liveVersion; + } else { + return {}; + } + } + + Version getCommittedVersion() const { return committed ? getGenerationFuture.get().liveVersion : ::invalidVersion; } + + int64_t getApproximateSize() const { return toCommit.expectedSize(); } + + void set(KeyRef key, ValueRef value) { toCommit.set(key, value); } + + void clear(KeyRef key) { toCommit.clear(key); } + + Future> get(Key const& key) { return get(this, key); } + + Future onError(Error const& e) { + // TODO: Improve this: + if (e.code() == error_code_transaction_too_old) { + reset(); + return delay((1 << numRetries++) * 0.01 * deterministicRandom()->random01()); + } + throw e; + } + + void debugTransaction(UID dID) { this->dID = dID; } + + void reset() { + getGenerationFuture = Future{}; + toCommit = {}; + committed = false; + } + + void fullReset() { + numRetries = 0; + dID = {}; + reset(); + } + + void checkDeferredError(Error const& deferredError) const { + if (deferredError.code() != invalid_error_code) { + throw deferredError; + } + if (cx.getPtr()) { + cx->checkDeferredError(); + } + } + + PaxosConfigTransactionImpl(Database const& cx) : cx(cx) { + auto coordinators = cx->getConnectionFile()->getConnectionString().coordinators(); + ctis.reserve(coordinators.size()); + for (const auto& coordinator : coordinators) { + ctis.emplace_back(coordinator); + } + } + + PaxosConfigTransactionImpl(std::vector const& ctis) : ctis(ctis) {} +}; Future PaxosConfigTransaction::getReadVersion() { - // TODO: Implement - return ::invalidVersion; + return impl().getReadVersion(); } Optional PaxosConfigTransaction::getCachedReadVersion() const { - // TODO: Implement - return ::invalidVersion; + return impl().getCachedReadVersion(); } -Future> PaxosConfigTransaction::get(Key const& key, Snapshot snapshot) { - // TODO: Implement - return Optional{}; +Future> PaxosConfigTransaction::get(Key const& key, Snapshot) { + return impl().get(key); } Future> PaxosConfigTransaction::getRange(KeySelector const& begin, @@ -59,13 +160,11 @@ Future> PaxosConfigTransaction::getRange(KeySelector } void PaxosConfigTransaction::set(KeyRef const& key, ValueRef const& value) { - // TODO: Implememnt - ASSERT(false); + return impl().set(key, value); } void PaxosConfigTransaction::clear(KeyRef const& key) { - // TODO: Implememnt - ASSERT(false); + return impl().clear(key); } Future PaxosConfigTransaction::commit() { @@ -75,61 +174,49 @@ Future PaxosConfigTransaction::commit() { } Version PaxosConfigTransaction::getCommittedVersion() const { - // TODO: Implement - ASSERT(false); - return ::invalidVersion; + return impl().getCommittedVersion(); } int64_t PaxosConfigTransaction::getApproximateSize() const { - // TODO: Implement - ASSERT(false); - return 0; + return impl().getApproximateSize(); } void PaxosConfigTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { - // TODO: Implement - ASSERT(false); + // TODO: Support using this option to determine atomicity } Future PaxosConfigTransaction::onError(Error const& e) { - // TODO: Implement - ASSERT(false); - return Void(); + return impl().onError(e); } void PaxosConfigTransaction::cancel() { - // TODO: Implement - ASSERT(false); + // TODO: Implement someday + throw client_invalid_operation(); } void PaxosConfigTransaction::reset() { - // TODO: Implement - ASSERT(false); + impl().reset(); } void PaxosConfigTransaction::fullReset() { - // TODO: Implement - ASSERT(false); + impl().fullReset(); } void PaxosConfigTransaction::debugTransaction(UID dID) { - // TODO: Implement - ASSERT(false); + impl().debugTransaction(dID); } void PaxosConfigTransaction::checkDeferredError() const { - // TODO: Implement - ASSERT(false); + impl().checkDeferredError(deferredError); } -PaxosConfigTransaction::PaxosConfigTransaction() { - // TODO: Implement - ASSERT(false); -} +PaxosConfigTransaction::PaxosConfigTransaction(std::vector const& ctis) + : _impl(std::make_unique(ctis)) {} + +PaxosConfigTransaction::PaxosConfigTransaction() = default; PaxosConfigTransaction::~PaxosConfigTransaction() = default; void PaxosConfigTransaction::setDatabase(Database const& cx) { - // TODO: Implement - ASSERT(false); + _impl = std::make_unique(cx); } diff --git a/fdbclient/PaxosConfigTransaction.h b/fdbclient/PaxosConfigTransaction.h index 7c68fcba05..8a9bdd7ebe 100644 --- a/fdbclient/PaxosConfigTransaction.h +++ b/fdbclient/PaxosConfigTransaction.h @@ -33,6 +33,7 @@ class PaxosConfigTransaction final : public IConfigTransaction, public FastAlloc PaxosConfigTransactionImpl& impl() { return *_impl; } public: + PaxosConfigTransaction(std::vector const&); PaxosConfigTransaction(); ~PaxosConfigTransaction(); void setDatabase(Database const&) override; diff --git a/fdbclient/SimpleConfigTransaction.actor.cpp b/fdbclient/SimpleConfigTransaction.actor.cpp index 70e8ec7b0d..8511716e04 100644 --- a/fdbclient/SimpleConfigTransaction.actor.cpp +++ b/fdbclient/SimpleConfigTransaction.actor.cpp @@ -124,25 +124,9 @@ public: SimpleConfigTransactionImpl(ConfigTransactionInterface const& cti) : cti(cti) {} - void set(KeyRef key, ValueRef value) { - if (key == configTransactionDescriptionKey) { - toCommit.annotation.description = KeyRef(toCommit.arena, value); - } else { - ConfigKey configKey = ConfigKeyRef::decodeKey(key); - auto knobValue = IKnobCollection::parseKnobValue( - configKey.knobName.toString(), value.toString(), IKnobCollection::Type::TEST); - toCommit.mutations.emplace_back_deep(toCommit.arena, configKey, knobValue.contents()); - } - } + void set(KeyRef key, ValueRef value) { toCommit.set(key, value); } - void clear(KeyRef key) { - if (key == configTransactionDescriptionKey) { - toCommit.annotation.description = ""_sr; - } else { - toCommit.mutations.emplace_back_deep( - toCommit.arena, ConfigKeyRef::decodeKey(key), Optional{}); - } - } + void clear(KeyRef key) { toCommit.clear(key); } Future> get(KeyRef key) { return get(this, key); }