Add migration for client profiling keys
This commit is contained in:
parent
2594d91f11
commit
51e4c19675
|
@ -26,7 +26,7 @@
|
|||
#include "flow/flow.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("config/fdb_client_info/client_txn_sample_rate");
|
||||
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_info/client_txn_size_limit");
|
||||
|
@ -117,6 +117,49 @@ void GlobalConfig::erase(KeyRangeRef range) {
|
|||
}
|
||||
}
|
||||
|
||||
// Older FDB versions used different keys for client profiling data. This
|
||||
// function performs a one-time migration of data in these keys to the new
|
||||
// global configuration key space.
|
||||
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
|
||||
state Optional<Value> migrated = wait(tr->get(migratedKey));
|
||||
if (migrated.present()) {
|
||||
// Already performed migration.
|
||||
return Void();
|
||||
}
|
||||
|
||||
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
||||
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// The value doesn't matter too much, as long as the key is set.
|
||||
tr->set(migratedKey.contents(), "1"_sr);
|
||||
if (sampleRate.present()) {
|
||||
const double sampleRateDbl =
|
||||
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
||||
Tuple rate = Tuple().appendDouble(sampleRateDbl);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
||||
}
|
||||
if (sizeLimit.present()) {
|
||||
const int64_t sizeLimitInt =
|
||||
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
||||
Tuple size = Tuple().append(sizeLimitInt);
|
||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
||||
}
|
||||
|
||||
wait(tr->commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Updates local copy of global configuration by reading the entire key-range
|
||||
// from storage.
|
||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||
|
@ -134,6 +177,8 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
|||
// Applies updates to the local copy of the global configuration when this
|
||||
// process receives an updated history.
|
||||
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
wait(self->migrate(self));
|
||||
|
||||
wait(self->refresh(self));
|
||||
self->initialized.send(Void());
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ private:
|
|||
// of the global configuration keyspace.
|
||||
void erase(KeyRangeRef range);
|
||||
|
||||
ACTOR static Future<Void> migrate(GlobalConfig* self);
|
||||
ACTOR static Future<Void> refresh(GlobalConfig* self);
|
||||
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
|
||||
|
|
|
@ -1377,8 +1377,7 @@ GlobalConfigImpl::GlobalConfigImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {
|
|||
// framework within the range specified. The special-key-space getrange
|
||||
// function should only be used for informational purposes. All values are
|
||||
// returned as strings regardless of their true type.
|
||||
Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
|
||||
auto& globalConfig = GlobalConfig::globalConfig();
|
||||
|
@ -1417,7 +1416,8 @@ void GlobalConfigImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, co
|
|||
// Writes global configuration changes to durable memory. Also writes the
|
||||
// changes made in the transaction to a recent history set, and updates the
|
||||
// latest version which the global configuration was updated at.
|
||||
ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* globalConfig, ReadYourWritesTransaction* ryw) {
|
||||
ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* globalConfig,
|
||||
ReadYourWritesTransaction* ryw) {
|
||||
state Transaction& tr = ryw->getTransaction();
|
||||
|
||||
// History should only contain three most recent updates. If it currently
|
||||
|
@ -1494,8 +1494,7 @@ void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key)
|
|||
|
||||
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
for (const auto& option : SpecialKeySpace::getTracingOptions()) {
|
||||
auto key = getKeyRange().begin.withSuffix(option);
|
||||
|
|
Loading…
Reference in New Issue