Add special keys for profile client get/set
This commit is contained in:
parent
ac860b3c22
commit
703107332a
|
@ -955,6 +955,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
std::make_unique<AdvanceVersionImpl>(
|
||||
singleKeyRange(LiteralStringRef("min_required_commit_version"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<ClientProfilingImpl>(
|
||||
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "boost/lexical_cast.hpp"
|
||||
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
|
@ -59,6 +60,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
{ "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "advanceversion", singleKeyRange(LiteralStringRef("min_required_commit_version"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "profile", KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
|
@ -1416,3 +1419,96 @@ Future<Optional<std::string>> AdvanceVersionImpl::commit(ReadYourWritesTransacti
|
|||
}
|
||||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
ClientProfilingImpl::ClientProfilingImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> ClientProfilingGetRangeActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRef prefix, KeyRangeRef kr) {
|
||||
state Standalone<RangeResultRef> result;
|
||||
// client_txn_sample_rate
|
||||
state Key sampleRateKey = LiteralStringRef("client_txn_sample_rate").withPrefix(prefix);
|
||||
if (kr.contains(sampleRateKey)) {
|
||||
auto entry = ryw->getSpecialKeySpaceWriteMap()[sampleRateKey];
|
||||
if (!ryw->readYourWritesDisabled() && entry.first) {
|
||||
if (entry.second.present())
|
||||
result.push_back_deep(result.arena(), KeyValueRef(sampleRateKey, entry.second.get()));
|
||||
// check clear is forbidden
|
||||
} else {
|
||||
Optional<Value> f = wait(ryw->getTransaction().get(fdbClientInfoTxnSampleRate));
|
||||
std::string sampleRateStr = "default";
|
||||
if (f.present()) {
|
||||
const double sampleRateDbl = BinaryReader::fromStringRef<double>(f.get(), Unversioned());
|
||||
if (!std::isinf(sampleRateDbl)) {
|
||||
sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
|
||||
}
|
||||
}
|
||||
result.push_back_deep(result.arena(), KeyValueRef(sampleRateKey, Value(sampleRateStr)));
|
||||
}
|
||||
}
|
||||
// client_txn_size_limit
|
||||
state Key txnSizeLimitKey = LiteralStringRef("client_txn_size_limit").withPrefix(prefix);
|
||||
if (kr.contains(txnSizeLimitKey)) {
|
||||
auto entry = ryw->getSpecialKeySpaceWriteMap()[txnSizeLimitKey];
|
||||
if (!ryw->readYourWritesDisabled() && entry.first) {
|
||||
if (entry.second.present())
|
||||
result.push_back_deep(result.arena(), KeyValueRef(txnSizeLimitKey, entry.second.get()));
|
||||
// check clear is forbidden
|
||||
} else {
|
||||
Optional<Value> f = wait(ryw->getTransaction().get(fdbClientInfoTxnSizeLimit));
|
||||
std::string sizeLimitStr = "default";
|
||||
if (f.present()) {
|
||||
const int64_t sizeLimit = BinaryReader::fromStringRef<int64_t>(f.get(), Unversioned());
|
||||
if (sizeLimit != -1) {
|
||||
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
|
||||
}
|
||||
}
|
||||
result.push_back_deep(result.arena(), KeyValueRef(txnSizeLimitKey, Value(sizeLimitStr)));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
return ClientProfilingGetRangeActor(ryw, getKeyRange().begin, kr);
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> ClientProfilingImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
// client_txn_sample_rate
|
||||
Key sampleRateKey = LiteralStringRef("client_txn_sample_rate").withPrefix(getKeyRange().begin);
|
||||
auto rateEntry = ryw->getSpecialKeySpaceWriteMap()[sampleRateKey];
|
||||
|
||||
if (rateEntry.first && rateEntry.second.present()) {
|
||||
std::string sampleRateStr = rateEntry.second.get().toString();
|
||||
double sampleRate;
|
||||
if (sampleRateStr == "default")
|
||||
sampleRate = std::numeric_limits<double>::infinity();
|
||||
else {
|
||||
try {
|
||||
sampleRate = boost::lexical_cast<double>(sampleRateStr);
|
||||
} catch (boost::bad_lexical_cast& e) {
|
||||
return Optional<std::string>(ManagementAPIError::toJsonString(
|
||||
false, "profile", "Invalid transaction sample rate(double): " + sampleRateStr));
|
||||
}
|
||||
}
|
||||
ryw->getTransaction().set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleRate, Unversioned()));
|
||||
}
|
||||
// client_txn_size_limit
|
||||
Key txnSizeLimitKey = LiteralStringRef("client_txn_size_limit").withPrefix(getKeyRange().begin);
|
||||
auto sizeLimitEntry = ryw->getSpecialKeySpaceWriteMap()[txnSizeLimitKey];
|
||||
if (sizeLimitEntry.first && sizeLimitEntry.second.present()) {
|
||||
std::string sizeLimitStr = sizeLimitEntry.second.get().toString();
|
||||
int64_t sizeLimit;
|
||||
if (sizeLimitStr == "default")
|
||||
sizeLimit = -1;
|
||||
else {
|
||||
try {
|
||||
sizeLimit = boost::lexical_cast<int64_t>(sizeLimitStr);
|
||||
} catch (boost::bad_lexical_cast& e) {
|
||||
return Optional<std::string>(ManagementAPIError::toJsonString(
|
||||
false, "profile", "Invalid transaction size limit(int64_t): " + sizeLimitStr));
|
||||
}
|
||||
}
|
||||
ryw->getTransaction().set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned()));
|
||||
}
|
||||
return Optional<std::string>();
|
||||
}
|
|
@ -339,5 +339,12 @@ public:
|
|||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class ClientProfilingImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit ClientProfilingImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "boost/lexical_cast.hpp"
|
||||
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
|
@ -25,6 +27,8 @@
|
|||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
||||
|
@ -434,7 +438,8 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
// test case when registered range is the same as the underlying module
|
||||
try {
|
||||
state Standalone<RangeResultRef> result = wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
|
||||
state Standalone<RangeResultRef> result =
|
||||
wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
|
||||
LiteralStringRef("\xff\xff/worker_interfaces0")),
|
||||
CLIENT_KNOBS->TOO_MANY));
|
||||
// We should have at least 1 process in the cluster
|
||||
|
@ -902,6 +907,105 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
// profile client get
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
// client_txn_sample_rate
|
||||
state Optional<Value> txnSampleRate =
|
||||
wait(tx->get(LiteralStringRef("client_txn_sample_rate")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
|
||||
ASSERT(txnSampleRate.present());
|
||||
Optional<Value> txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate));
|
||||
if (txnSampleRateKey.present()) {
|
||||
const double sampleRateDbl =
|
||||
BinaryReader::fromStringRef<double>(txnSampleRateKey.get(), Unversioned());
|
||||
if (!std::isinf(sampleRateDbl)) {
|
||||
ASSERT(txnSampleRate.get().toString() == boost::lexical_cast<std::string>(sampleRateDbl));
|
||||
} else {
|
||||
ASSERT(txnSampleRate.get().toString() == "default");
|
||||
}
|
||||
} else {
|
||||
ASSERT(txnSampleRate.get().toString() == "default");
|
||||
}
|
||||
// client_txn_size_limit
|
||||
state Optional<Value> txnSizeLimit =
|
||||
wait(tx->get(LiteralStringRef("client_txn_size_limit")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
|
||||
ASSERT(txnSizeLimit.present());
|
||||
Optional<Value> txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit));
|
||||
if (txnSizeLimitKey.present()) {
|
||||
const int64_t sizeLimit =
|
||||
BinaryReader::fromStringRef<int64_t>(txnSizeLimitKey.get(), Unversioned());
|
||||
if (sizeLimit != -1) {
|
||||
ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast<std::string>(sizeLimit));
|
||||
} else {
|
||||
ASSERT(txnSizeLimit.get().toString() == "default");
|
||||
}
|
||||
} else {
|
||||
ASSERT(txnSizeLimit.get().toString() == "default");
|
||||
}
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "ProfileClientGet").error(e);
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
{
|
||||
state double r_sample_rate = deterministicRandom()->random01();
|
||||
state int64_t r_size_limit = deterministicRandom()->randomInt64(1e3, 1e6);
|
||||
// update the sample rate and size limit
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->set(LiteralStringRef("client_txn_sample_rate")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
|
||||
Value(boost::lexical_cast<std::string>(r_sample_rate)));
|
||||
tx->set(LiteralStringRef("client_txn_size_limit")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
|
||||
Value(boost::lexical_cast<std::string>(r_size_limit)));
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// commit successfully, verify the system key changed
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
Optional<Value> sampleRate = wait(tx->get(fdbClientInfoTxnSampleRate));
|
||||
ASSERT(sampleRate.present());
|
||||
ASSERT(r_sample_rate == BinaryReader::fromStringRef<double>(sampleRate.get(), Unversioned()));
|
||||
Optional<Value> sizeLimit = wait(tx->get(fdbClientInfoTxnSizeLimit));
|
||||
ASSERT(sizeLimit.present());
|
||||
ASSERT(r_size_limit == BinaryReader::fromStringRef<int64_t>(sizeLimit.get(), Unversioned()));
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
// Change back to default
|
||||
loop {
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tx->set(LiteralStringRef("client_txn_sample_rate")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
|
||||
LiteralStringRef("default"));
|
||||
tx->set(LiteralStringRef("client_txn_size_limit")
|
||||
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
|
||||
LiteralStringRef("default"));
|
||||
wait(tx->commit());
|
||||
tx->reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tx->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue