From 703107332ad87aa7a489801f610b5f5a3beaf82d Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 19 Feb 2021 12:22:00 -0800 Subject: [PATCH] Add special keys for profile client get/set --- fdbclient/NativeAPI.actor.cpp | 5 + fdbclient/SpecialKeySpace.actor.cpp | 98 +++++++++++++++- fdbclient/SpecialKeySpace.actor.h | 7 ++ .../SpecialKeySpaceCorrectness.actor.cpp | 110 +++++++++++++++++- 4 files changed, 216 insertions(+), 4 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 604128d530..8704263214 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -955,6 +955,11 @@ DatabaseContext::DatabaseContext(Reference( singleKeyRange(LiteralStringRef("min_required_commit_version")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 5201342dee..dca1156bae 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -21,6 +21,7 @@ #include "boost/lexical_cast.hpp" #include "fdbclient/SpecialKeySpace.actor.h" +#include "flow/Arena.h" #include "flow/UnitTest.h" #include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/StatusClient.h" @@ -59,7 +60,9 @@ std::unordered_map SpecialKeySpace::managementApiCommandT { "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended")) .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "advanceversion", singleKeyRange(LiteralStringRef("min_required_commit_version")) - .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "profile", KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0")) + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } }; std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; @@ -1415,4 +1418,97 @@ Future> AdvanceVersionImpl::commit(ReadYourWritesTransacti ryw->getTransaction().clear(minRequiredCommitVersionKey); } return Optional(); +} + +ClientProfilingImpl::ClientProfilingImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +ACTOR static Future> ClientProfilingGetRangeActor(ReadYourWritesTransaction* ryw, + KeyRef prefix, KeyRangeRef kr) { + state Standalone result; + // client_txn_sample_rate + state Key sampleRateKey = LiteralStringRef("client_txn_sample_rate").withPrefix(prefix); + if (kr.contains(sampleRateKey)) { + auto entry = ryw->getSpecialKeySpaceWriteMap()[sampleRateKey]; + if (!ryw->readYourWritesDisabled() && entry.first) { + if (entry.second.present()) + result.push_back_deep(result.arena(), KeyValueRef(sampleRateKey, entry.second.get())); + // check clear is forbidden + } else { + Optional f = wait(ryw->getTransaction().get(fdbClientInfoTxnSampleRate)); + std::string sampleRateStr = "default"; + if (f.present()) { + const double sampleRateDbl = BinaryReader::fromStringRef(f.get(), Unversioned()); + if (!std::isinf(sampleRateDbl)) { + sampleRateStr = boost::lexical_cast(sampleRateDbl); + } + } + result.push_back_deep(result.arena(), KeyValueRef(sampleRateKey, Value(sampleRateStr))); + } + } + // client_txn_size_limit + state Key txnSizeLimitKey = LiteralStringRef("client_txn_size_limit").withPrefix(prefix); + if (kr.contains(txnSizeLimitKey)) { + auto entry = ryw->getSpecialKeySpaceWriteMap()[txnSizeLimitKey]; + if (!ryw->readYourWritesDisabled() && entry.first) { + if (entry.second.present()) + result.push_back_deep(result.arena(), KeyValueRef(txnSizeLimitKey, entry.second.get())); + // check clear is forbidden + } else { + Optional f = wait(ryw->getTransaction().get(fdbClientInfoTxnSizeLimit)); + std::string sizeLimitStr = "default"; + if (f.present()) { + const int64_t sizeLimit = BinaryReader::fromStringRef(f.get(), Unversioned()); + if (sizeLimit != -1) { + sizeLimitStr = boost::lexical_cast(sizeLimit); + } + } + result.push_back_deep(result.arena(), KeyValueRef(txnSizeLimitKey, Value(sizeLimitStr))); + } + } + return result; +} + +Future> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + return ClientProfilingGetRangeActor(ryw, getKeyRange().begin, kr); +} + +Future> ClientProfilingImpl::commit(ReadYourWritesTransaction* ryw) { + // client_txn_sample_rate + Key sampleRateKey = LiteralStringRef("client_txn_sample_rate").withPrefix(getKeyRange().begin); + auto rateEntry = ryw->getSpecialKeySpaceWriteMap()[sampleRateKey]; + + if (rateEntry.first && rateEntry.second.present()) { + std::string sampleRateStr = rateEntry.second.get().toString(); + double sampleRate; + if (sampleRateStr == "default") + sampleRate = std::numeric_limits::infinity(); + else { + try { + sampleRate = boost::lexical_cast(sampleRateStr); + } catch (boost::bad_lexical_cast& e) { + return Optional(ManagementAPIError::toJsonString( + false, "profile", "Invalid transaction sample rate(double): " + sampleRateStr)); + } + } + ryw->getTransaction().set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleRate, Unversioned())); + } + // client_txn_size_limit + Key txnSizeLimitKey = LiteralStringRef("client_txn_size_limit").withPrefix(getKeyRange().begin); + auto sizeLimitEntry = ryw->getSpecialKeySpaceWriteMap()[txnSizeLimitKey]; + if (sizeLimitEntry.first && sizeLimitEntry.second.present()) { + std::string sizeLimitStr = sizeLimitEntry.second.get().toString(); + int64_t sizeLimit; + if (sizeLimitStr == "default") + sizeLimit = -1; + else { + try { + sizeLimit = boost::lexical_cast(sizeLimitStr); + } catch (boost::bad_lexical_cast& e) { + return Optional(ManagementAPIError::toJsonString( + false, "profile", "Invalid transaction size limit(int64_t): " + sizeLimitStr)); + } + } + ryw->getTransaction().set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned())); + } + return Optional(); } \ No newline at end of file diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index bfbe71a118..d5504b0bc9 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -339,5 +339,12 @@ public: Future> commit(ReadYourWritesTransaction* ryw) override; }; +class ClientProfilingImpl : public SpecialKeyRangeRWImpl { +public: + explicit ClientProfilingImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index f087720706..16fb507829 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -18,6 +18,8 @@ * limitations under the License. */ +#include "boost/lexical_cast.hpp" + #include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/ReadYourWrites.h" @@ -25,6 +27,8 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" +#include "flow/Arena.h" +#include "flow/IRandom.h" #include "flow/actorcompiler.h" struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { @@ -434,9 +438,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } // test case when registered range is the same as the underlying module try { - state Standalone result = wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), - LiteralStringRef("\xff\xff/worker_interfaces0")), - CLIENT_KNOBS->TOO_MANY)); + state Standalone result = + wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), + LiteralStringRef("\xff\xff/worker_interfaces0")), + CLIENT_KNOBS->TOO_MANY)); // We should have at least 1 process in the cluster ASSERT(result.size()); state KeyValueRef entry = deterministicRandom()->randomChoice(result); @@ -902,6 +907,105 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } catch (Error& e) { wait(tx->onError(e)); } + // profile client get + loop { + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + // client_txn_sample_rate + state Optional txnSampleRate = + wait(tx->get(LiteralStringRef("client_txn_sample_rate") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")))); + ASSERT(txnSampleRate.present()); + Optional txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate)); + if (txnSampleRateKey.present()) { + const double sampleRateDbl = + BinaryReader::fromStringRef(txnSampleRateKey.get(), Unversioned()); + if (!std::isinf(sampleRateDbl)) { + ASSERT(txnSampleRate.get().toString() == boost::lexical_cast(sampleRateDbl)); + } else { + ASSERT(txnSampleRate.get().toString() == "default"); + } + } else { + ASSERT(txnSampleRate.get().toString() == "default"); + } + // client_txn_size_limit + state Optional txnSizeLimit = + wait(tx->get(LiteralStringRef("client_txn_size_limit") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")))); + ASSERT(txnSizeLimit.present()); + Optional txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit)); + if (txnSizeLimitKey.present()) { + const int64_t sizeLimit = + BinaryReader::fromStringRef(txnSizeLimitKey.get(), Unversioned()); + if (sizeLimit != -1) { + ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast(sizeLimit)); + } else { + ASSERT(txnSizeLimit.get().toString() == "default"); + } + } else { + ASSERT(txnSizeLimit.get().toString() == "default"); + } + tx->reset(); + break; + } catch (Error& e) { + TraceEvent(SevDebug, "ProfileClientGet").error(e); + wait(tx->onError(e)); + } + } + { + state double r_sample_rate = deterministicRandom()->random01(); + state int64_t r_size_limit = deterministicRandom()->randomInt64(1e3, 1e6); + // update the sample rate and size limit + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->set(LiteralStringRef("client_txn_sample_rate") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")), + Value(boost::lexical_cast(r_sample_rate))); + tx->set(LiteralStringRef("client_txn_size_limit") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")), + Value(boost::lexical_cast(r_size_limit))); + wait(tx->commit()); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + // commit successfully, verify the system key changed + loop { + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + Optional sampleRate = wait(tx->get(fdbClientInfoTxnSampleRate)); + ASSERT(sampleRate.present()); + ASSERT(r_sample_rate == BinaryReader::fromStringRef(sampleRate.get(), Unversioned())); + Optional sizeLimit = wait(tx->get(fdbClientInfoTxnSizeLimit)); + ASSERT(sizeLimit.present()); + ASSERT(r_size_limit == BinaryReader::fromStringRef(sizeLimit.get(), Unversioned())); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + // Change back to default + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->set(LiteralStringRef("client_txn_sample_rate") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")), + LiteralStringRef("default")); + tx->set(LiteralStringRef("client_txn_size_limit") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")), + LiteralStringRef("default")); + wait(tx->commit()); + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + } return Void(); } };