This commit is contained in:
Lukas Joswiak 2021-05-07 16:28:39 -07:00
parent 71ba6f4501
commit 3f6ef14384
1 changed files with 25 additions and 29 deletions

View File

@ -145,36 +145,32 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}
wait(tr->commit());
return Void();
} catch (Error& e) {
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
if (e.code() == error_code_not_committed) {
// If multiple fdbserver processes are started at once, they
// will all attempt this migration at the same time, sometimes
// resulting in aborts due to conflicts. To avoid continuous
// contention, just return if a conflict error occurs.
return Void();
}
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}
wait(tr->commit());
return Void();
} catch (Error& e) {
// If multiple fdbserver processes are started at once, they will all
// attempt this migration at the same time, sometimes resulting in
// aborts due to conflicts. Purposefully avoid retrying, making this
// migration best-effort.
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
throw;
}
}