Add comments, fix erase bug, make optimizations
This commit is contained in:
parent
c38ddf5eb7
commit
7de23918c0
|
@ -98,12 +98,12 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
|||
}
|
||||
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GlobalConfigTupleError").detail("What", e.what());
|
||||
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRef key) {
|
||||
erase(KeyRangeRef(key, keyAfter(key)));
|
||||
data.erase(key);
|
||||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRangeRef range) {
|
||||
|
@ -120,6 +120,8 @@ void GlobalConfig::erase(KeyRangeRef range) {
|
|||
// Updates local copy of global configuration by reading the entire key-range
|
||||
// from storage.
|
||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||
self->data.clear();
|
||||
|
||||
Transaction tr(self->cx);
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
for (const auto& kv : result) {
|
||||
|
|
|
@ -39,7 +39,8 @@
|
|||
|
||||
// The global configuration is a series of typed key-value pairs synced to all
|
||||
// nodes (server and client) in an FDB cluster in an eventually consistent
|
||||
// manner.
|
||||
// manner. Only small key-value pairs should be stored in global configuration;
|
||||
// an excessive amount of data can cause synchronization slowness.
|
||||
|
||||
// Keys
|
||||
extern const KeyRef fdbClientInfoTxnSampleRate;
|
||||
|
@ -59,11 +60,8 @@ struct ConfigValue : ReferenceCounted<ConfigValue> {
|
|||
ConfigValue(Arena&& a, std::any&& v) : arena(a), value(v) {}
|
||||
};
|
||||
|
||||
class GlobalConfig {
|
||||
class GlobalConfig : NonCopyable {
|
||||
public:
|
||||
GlobalConfig(const GlobalConfig&) = delete;
|
||||
GlobalConfig& operator=(const GlobalConfig&) = delete;
|
||||
|
||||
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
|
||||
// This function should only be called once by each process (however, it is
|
||||
// idempotent and calling it multiple times will have no effect).
|
||||
|
@ -106,10 +104,11 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
// To write into the global configuration, submit a transaction to
|
||||
// \xff\xff/global_config/<your-key> with <your-value> encoded using the
|
||||
// FDB tuple typecodes. Use the helper function `prefixedKey` to correctly
|
||||
// prefix your global configuration key.
|
||||
// Trying to write into the global configuration keyspace? To write data,
|
||||
// submit a transaction to \xff\xff/global_config/<your-key> with
|
||||
// <your-value> encoded using the FDB tuple typecodes. Use the helper
|
||||
// function `prefixedKey` to correctly prefix your global configuration
|
||||
// key.
|
||||
|
||||
// Triggers the returned future when the global configuration singleton has
|
||||
// been created and is ready.
|
||||
|
|
|
@ -5380,9 +5380,8 @@ void Transaction::checkDeferredError() {
|
|||
|
||||
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
|
||||
if (!cx->isError()) {
|
||||
double sampleRate = GlobalConfig::globalConfig().get<double>(fdbClientInfoTxnSampleRate,
|
||||
std::numeric_limits<double>::infinity());
|
||||
double clientSamplingProbability = std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
|
||||
double clientSamplingProbability = GlobalConfig::globalConfig().get<double>(
|
||||
fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
|
||||
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
|
||||
deterministicRandom()->random01() < clientSamplingProbability &&
|
||||
(!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {
|
||||
|
|
|
@ -1441,14 +1441,15 @@ ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* gl
|
|||
while (iter != ranges.end()) {
|
||||
std::pair<bool, Optional<Value>> entry = iter->value();
|
||||
if (entry.first) {
|
||||
if (entry.second.present()) {
|
||||
if (entry.second.present() && iter->begin().startsWith(globalConfig->getKeyRange().begin)) {
|
||||
Key bareKey = iter->begin().removePrefix(globalConfig->getKeyRange().begin);
|
||||
vh.mutations.emplace_back_deep(vh.mutations.arena(),
|
||||
MutationRef(MutationRef::SetValue, bareKey, entry.second.get()));
|
||||
|
||||
Key systemKey = bareKey.withPrefix(globalConfigKeysPrefix);
|
||||
tr.set(systemKey, entry.second.get());
|
||||
} else {
|
||||
} else if (!entry.second.present() && iter->range().begin.startsWith(globalConfig->getKeyRange().begin) &&
|
||||
iter->range().end.startsWith(globalConfig->getKeyRange().begin)) {
|
||||
KeyRef bareRangeBegin = iter->range().begin.removePrefix(globalConfig->getKeyRange().begin);
|
||||
KeyRef bareRangeEnd = iter->range().end.removePrefix(globalConfig->getKeyRange().begin);
|
||||
vh.mutations.emplace_back_deep(vh.mutations.arena(),
|
||||
|
|
|
@ -249,9 +249,9 @@ extern const KeyRef globalConfigKeysPrefix;
|
|||
extern const KeyRangeRef globalConfigHistoryKeys;
|
||||
extern const KeyRef globalConfigHistoryPrefix;
|
||||
|
||||
// "\xff/globalConfig/v" := "version,protocol"
|
||||
// Read-only key which returns the version and protocol of the most recent
|
||||
// data written to the global configuration keyspace.
|
||||
// "\xff/globalConfig/v" := "version"
|
||||
// Read-only key which returns the commit version of the most recent mutation
|
||||
// made to the global configuration keyspace.
|
||||
extern const KeyRef globalConfigVersionKey;
|
||||
|
||||
// "\xff/workers/[[processID]]" := ""
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "fdbclient/Tuple.h"
|
||||
|
||||
// TODO: Many functions copied from bindings/flow/Tuple.cpp. Merge at some point.
|
||||
static float bigEndianFloat(float orig) {
|
||||
int32_t big = *(int32_t*)&orig;
|
||||
big = bigEndian32(big);
|
||||
|
@ -32,7 +33,7 @@ static double bigEndianDouble(double orig) {
|
|||
return *(double*)&big;
|
||||
}
|
||||
|
||||
static size_t find_string_terminator(const StringRef data, size_t offset) {
|
||||
static size_t findStringTerminator(const StringRef data, size_t offset) {
|
||||
size_t i = offset;
|
||||
while (i < data.size() - 1 && !(data[i] == '\x00' && data[i + 1] != (uint8_t)'\xff')) {
|
||||
i += (data[i] == '\x00' ? 2 : 1);
|
||||
|
@ -44,7 +45,7 @@ static size_t find_string_terminator(const StringRef data, size_t offset) {
|
|||
// If encoding and the sign bit is 1 (the number is negative), flip all the bits.
|
||||
// If decoding and the sign bit is 0 (the number is negative), flip all the bits.
|
||||
// Otherwise, the number is positive, so flip the sign bit.
|
||||
static void adjust_floating_point(uint8_t* bytes, size_t size, bool encode) {
|
||||
static void adjustFloatingPoint(uint8_t* bytes, size_t size, bool encode) {
|
||||
if ((encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x00)) ||
|
||||
(!encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x80))) {
|
||||
for (size_t i = 0; i < size; i++) {
|
||||
|
@ -63,7 +64,7 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
|
|||
offsets.push_back(i);
|
||||
|
||||
if (data[i] == '\x01' || data[i] == '\x02') {
|
||||
i = find_string_terminator(str, i + 1) + 1;
|
||||
i = findStringTerminator(str, i + 1) + 1;
|
||||
} else if (data[i] >= '\x0c' && data[i] <= '\x1c') {
|
||||
i += abs(data[i] - '\x14') + 1;
|
||||
} else if (data[i] == 0x20) {
|
||||
|
@ -147,7 +148,7 @@ Tuple& Tuple::appendFloat(float value) {
|
|||
offsets.push_back(data.size());
|
||||
float swap = bigEndianFloat(value);
|
||||
uint8_t* bytes = (uint8_t*)&swap;
|
||||
adjust_floating_point(bytes, sizeof(float), true);
|
||||
adjustFloatingPoint(bytes, sizeof(float), true);
|
||||
|
||||
data.push_back(data.arena(), 0x20);
|
||||
data.append(data.arena(), bytes, sizeof(float));
|
||||
|
@ -159,7 +160,7 @@ Tuple& Tuple::appendDouble(double value) {
|
|||
double swap = value;
|
||||
swap = bigEndianDouble(swap);
|
||||
uint8_t* bytes = (uint8_t*)&swap;
|
||||
adjust_floating_point(bytes, sizeof(double), true);
|
||||
adjustFloatingPoint(bytes, sizeof(double), true);
|
||||
|
||||
data.push_back(data.arena(), 0x21);
|
||||
data.append(data.arena(), bytes, sizeof(double));
|
||||
|
@ -300,7 +301,7 @@ float Tuple::getFloat(size_t index) const {
|
|||
uint8_t* bytes = (uint8_t*)&swap;
|
||||
ASSERT_LE(offsets[index] + 1 + sizeof(float), data.size());
|
||||
swap = *(float*)(data.begin() + offsets[index] + 1);
|
||||
adjust_floating_point(bytes, sizeof(float), false);
|
||||
adjustFloatingPoint(bytes, sizeof(float), false);
|
||||
|
||||
return bigEndianFloat(swap);
|
||||
}
|
||||
|
@ -319,7 +320,7 @@ double Tuple::getDouble(size_t index) const {
|
|||
uint8_t* bytes = (uint8_t*)&swap;
|
||||
ASSERT_LE(offsets[index] + 1 + sizeof(double), data.size());
|
||||
swap = *(double*)(data.begin() + offsets[index] + 1);
|
||||
adjust_floating_point(bytes, sizeof(double), false);
|
||||
adjustFloatingPoint(bytes, sizeof(double), false);
|
||||
|
||||
return bigEndianDouble(swap);
|
||||
}
|
||||
|
|
|
@ -3245,15 +3245,29 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
|||
clientInfo.history.push_back(std::move(vh));
|
||||
}
|
||||
|
||||
if (clientInfo.history.size() > 0) {
|
||||
// The first item in the historical list of mutations
|
||||
// is only used to:
|
||||
// a) Recognize that some historical changes may have
|
||||
// been missed, and the entire global
|
||||
// configuration keyspace needs to be read, or..
|
||||
// b) Check which historical updates have already
|
||||
// been applied. If this is the case, the first
|
||||
// history item must have a version greater than
|
||||
// or equal to whatever version the global
|
||||
// configuration was last updated at, and
|
||||
// therefore won't need to be applied again.
|
||||
clientInfo.history[0].mutations = Standalone<VectorRef<MutationRef>>();
|
||||
}
|
||||
|
||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||
db->clientInfo->set(clientInfo);
|
||||
}
|
||||
|
||||
state Future<Void> globalConfigFuture = tr.watch(globalConfigVersionKey);
|
||||
wait(tr.commit());
|
||||
choose {
|
||||
when (wait(globalConfigFuture)) { break; }
|
||||
}
|
||||
wait(globalConfigFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue