Add arena per object in global config
This commit is contained in:
parent
e5e48da5ce
commit
b7cd8175be
|
@ -26,19 +26,21 @@
|
|||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("fdbClientInfo/client_txn_sample_rate");
|
||||
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("fdbClientInfo/client_txn_size_limit");
|
||||
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("config/fdbClientInfo/client_txn_sample_rate");
|
||||
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdbClientInfo/client_txn_size_limit");
|
||||
|
||||
const KeyRef transactionTagSampleRate = LiteralStringRef("transactionTagSampleRate");
|
||||
const KeyRef transactionTagSampleCost = LiteralStringRef("transactionTagSampleCost");
|
||||
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transactionTagSampleRate");
|
||||
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transactionTagSampleCost");
|
||||
|
||||
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
|
||||
|
||||
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
||||
auto config = new GlobalConfig{};
|
||||
config->cx = Database(cx);
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||
config->_updater = updater(config, dbInfo);
|
||||
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
|
||||
auto config = new GlobalConfig{};
|
||||
config->cx = Database(cx);
|
||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||
config->_updater = updater(config, dbInfo);
|
||||
}
|
||||
}
|
||||
|
||||
GlobalConfig& GlobalConfig::globalConfig() {
|
||||
|
@ -47,16 +49,16 @@ GlobalConfig& GlobalConfig::globalConfig() {
|
|||
return *reinterpret_cast<GlobalConfig*>(res);
|
||||
}
|
||||
|
||||
const std::any GlobalConfig::get(KeyRef name) {
|
||||
const ConfigValue GlobalConfig::get(KeyRef name) {
|
||||
auto it = data.find(name);
|
||||
if (it == data.end()) {
|
||||
return std::any{};
|
||||
return ConfigValue{ Arena(), std::any{} };
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
const std::map<KeyRef, std::any> GlobalConfig::get(KeyRangeRef range) {
|
||||
std::map<KeyRef, std::any> results;
|
||||
const std::map<KeyRef, ConfigValue> GlobalConfig::get(KeyRangeRef range) {
|
||||
std::map<KeyRef, ConfigValue> results;
|
||||
for (const auto& [key, value] : data) {
|
||||
if (range.contains(key)) {
|
||||
results[key] = value;
|
||||
|
@ -70,17 +72,18 @@ Future<Void> GlobalConfig::onInitialized() {
|
|||
}
|
||||
|
||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||
Arena arena(1);
|
||||
KeyRef stableKey = KeyRef(arena, key);
|
||||
try {
|
||||
Tuple t = Tuple::unpack(value);
|
||||
if (t.getType(0) == Tuple::ElementType::UTF8) {
|
||||
data[stableKey] = t.getString(0);
|
||||
data[stableKey] = ConfigValue{ arena, StringRef(arena, t.getString(0).contents()) };
|
||||
} else if (t.getType(0) == Tuple::ElementType::INT) {
|
||||
data[stableKey] = t.getInt(0);
|
||||
data[stableKey] = ConfigValue{ arena, t.getInt(0) };
|
||||
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
|
||||
data[stableKey] = t.getFloat(0);
|
||||
data[stableKey] = ConfigValue{ arena, t.getFloat(0) };
|
||||
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
|
||||
data[stableKey] = t.getDouble(0);
|
||||
data[stableKey] = ConfigValue{ arena, t.getDouble(0) };
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
@ -94,7 +97,6 @@ void GlobalConfig::erase(KeyRef key) {
|
|||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRangeRef range) {
|
||||
// TODO: Memory leak -- memory for key remains allocated in arena
|
||||
auto it = data.begin();
|
||||
while (it != data.end()) {
|
||||
if (range.contains(it->first)) {
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
#include <any>
|
||||
#include <map>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
|
@ -46,6 +47,11 @@ extern const KeyRef fdbClientInfoTxnSizeLimit;
|
|||
extern const KeyRef transactionTagSampleRate;
|
||||
extern const KeyRef transactionTagSampleCost;
|
||||
|
||||
struct ConfigValue {
|
||||
Arena arena;
|
||||
std::any value;
|
||||
};
|
||||
|
||||
class GlobalConfig {
|
||||
public:
|
||||
GlobalConfig(const GlobalConfig&) = delete;
|
||||
|
@ -54,27 +60,31 @@ public:
|
|||
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
||||
static GlobalConfig& globalConfig();
|
||||
|
||||
const std::any get(KeyRef name);
|
||||
const std::map<KeyRef, std::any> get(KeyRangeRef range);
|
||||
const ConfigValue get(KeyRef name);
|
||||
const std::map<KeyRef, ConfigValue> get(KeyRangeRef range);
|
||||
|
||||
template <typename T>
|
||||
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
|
||||
const T get(KeyRef name) {
|
||||
try {
|
||||
auto any = get(name);
|
||||
auto any = get(name).value;
|
||||
return std::any_cast<T>(any);
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
|
||||
const T get(KeyRef name, T defaultVal) {
|
||||
auto any = get(name);
|
||||
if (any.has_value()) {
|
||||
return std::any_cast<T>(any);
|
||||
}
|
||||
try {
|
||||
auto any = get(name).value;
|
||||
if (any.has_value()) {
|
||||
return std::any_cast<T>(any);
|
||||
}
|
||||
|
||||
return defaultVal;
|
||||
return defaultVal;
|
||||
} catch (Error& e) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// To write into the global configuration, submit a transaction to
|
||||
|
@ -96,8 +106,7 @@ private:
|
|||
Database cx;
|
||||
Future<Void> _updater;
|
||||
Promise<Void> initialized;
|
||||
Arena arena;
|
||||
std::unordered_map<StringRef, std::any> data;
|
||||
std::unordered_map<StringRef, ConfigValue> data;
|
||||
Version lastUpdate;
|
||||
};
|
||||
|
||||
|
|
|
@ -1379,18 +1379,18 @@ Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTran
|
|||
|
||||
auto& globalConfig = GlobalConfig::globalConfig();
|
||||
KeyRangeRef modified = KeyRangeRef(kr.begin.removePrefix(getKeyRange().begin), kr.end.removePrefix(getKeyRange().begin));
|
||||
std::map<KeyRef, std::any> values = globalConfig.get(modified);
|
||||
for (const auto& [key, any] : values) {
|
||||
std::map<KeyRef, ConfigValue> values = globalConfig.get(modified);
|
||||
for (const auto& [key, config] : values) {
|
||||
Key prefixedKey = key.withPrefix(getKeyRange().begin);
|
||||
if (any.has_value()) {
|
||||
if (any.type() == typeid(Standalone<StringRef>)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::any_cast<Standalone<StringRef>>(any).contents()));
|
||||
} else if (any.type() == typeid(int64_t)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(any))));
|
||||
} else if (any.type() == typeid(float)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(any))));
|
||||
} else if (any.type() == typeid(double)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<double>(any))));
|
||||
if (config.value.has_value()) {
|
||||
if (config.value.type() == typeid(StringRef)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::any_cast<StringRef>(config.value).toString()));
|
||||
} else if (config.value.type() == typeid(int64_t)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config.value))));
|
||||
} else if (config.value.type() == typeid(float)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config.value))));
|
||||
} else if (config.value.type() == typeid(double)) {
|
||||
result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast<double>(config.value))));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue