Fix global config not updating on server processes
This commit is contained in:
parent
f20bfb87de
commit
ba25b95c6a
|
@ -34,16 +34,7 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf
|
||||||
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
|
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
|
||||||
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
|
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
|
||||||
|
|
||||||
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
|
GlobalConfig::GlobalConfig(Database& cx) : cx(cx), lastUpdate(0) {}
|
||||||
|
|
||||||
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
|
||||||
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
|
|
||||||
auto config = new GlobalConfig{};
|
|
||||||
config->cx = Database(cx);
|
|
||||||
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
|
||||||
config->_updater = updater(config, dbInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
GlobalConfig& GlobalConfig::globalConfig() {
|
GlobalConfig& GlobalConfig::globalConfig() {
|
||||||
void* res = g_network->global(INetwork::enGlobalConfig);
|
void* res = g_network->global(INetwork::enGlobalConfig);
|
||||||
|
@ -77,6 +68,14 @@ Future<Void> GlobalConfig::onInitialized() {
|
||||||
return initialized.getFuture();
|
return initialized.getFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<Void> GlobalConfig::onChange() {
|
||||||
|
return configChanged.onTrigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn) {
|
||||||
|
callbacks.emplace(key, std::move(fn));
|
||||||
|
}
|
||||||
|
|
||||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||||
data.erase(key);
|
data.erase(key);
|
||||||
|
|
||||||
|
@ -89,6 +88,8 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||||
any = StringRef(arena, t.getString(0).contents());
|
any = StringRef(arena, t.getString(0).contents());
|
||||||
} else if (t.getType(0) == Tuple::ElementType::INT) {
|
} else if (t.getType(0) == Tuple::ElementType::INT) {
|
||||||
any = t.getInt(0);
|
any = t.getInt(0);
|
||||||
|
} else if (t.getType(0) == Tuple::ElementType::BOOL) {
|
||||||
|
any = t.getBool(0);
|
||||||
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
|
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
|
||||||
any = t.getFloat(0);
|
any = t.getFloat(0);
|
||||||
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
|
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
|
||||||
|
@ -97,19 +98,26 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
|
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
|
||||||
|
|
||||||
|
if (callbacks.find(stableKey) != callbacks.end()) {
|
||||||
|
callbacks[stableKey](data[stableKey]->value);
|
||||||
|
}
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
|
TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GlobalConfig::erase(KeyRef key) {
|
void GlobalConfig::erase(Key key) {
|
||||||
data.erase(key);
|
erase(KeyRangeRef(key, keyAfter(key)));
|
||||||
}
|
}
|
||||||
|
|
||||||
void GlobalConfig::erase(KeyRangeRef range) {
|
void GlobalConfig::erase(KeyRangeRef range) {
|
||||||
auto it = data.begin();
|
auto it = data.begin();
|
||||||
while (it != data.end()) {
|
while (it != data.end()) {
|
||||||
if (range.contains(it->first)) {
|
if (range.contains(it->first)) {
|
||||||
|
if (callbacks.find(it->first) != callbacks.end()) {
|
||||||
|
callbacks[it->first](std::nullopt);
|
||||||
|
}
|
||||||
it = data.erase(it);
|
it = data.erase(it);
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
|
@ -134,36 +142,39 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
||||||
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
||||||
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
||||||
|
|
||||||
loop {
|
try {
|
||||||
try {
|
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
// The value doesn't matter too much, as long as the key is set.
|
||||||
// The value doesn't matter too much, as long as the key is set.
|
tr->set(migratedKey.contents(), "1"_sr);
|
||||||
tr->set(migratedKey.contents(), "1"_sr);
|
if (sampleRate.present()) {
|
||||||
if (sampleRate.present()) {
|
const double sampleRateDbl =
|
||||||
const double sampleRateDbl =
|
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
||||||
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
Tuple rate = Tuple().appendDouble(sampleRateDbl);
|
||||||
Tuple rate = Tuple().appendDouble(sampleRateDbl);
|
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
||||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
|
||||||
}
|
|
||||||
if (sizeLimit.present()) {
|
|
||||||
const int64_t sizeLimitInt =
|
|
||||||
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
|
||||||
Tuple size = Tuple().append(sizeLimitInt);
|
|
||||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
|
||||||
}
|
|
||||||
|
|
||||||
wait(tr->commit());
|
|
||||||
return Void();
|
|
||||||
} catch (Error& e) {
|
|
||||||
throw;
|
|
||||||
}
|
}
|
||||||
|
if (sizeLimit.present()) {
|
||||||
|
const int64_t sizeLimitInt =
|
||||||
|
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
||||||
|
Tuple size = Tuple().append(sizeLimitInt);
|
||||||
|
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(tr->commit());
|
||||||
|
} catch (Error& e) {
|
||||||
|
// If multiple fdbserver processes are started at once, they will all
|
||||||
|
// attempt this migration at the same time, sometimes resulting in
|
||||||
|
// aborts due to conflicts. Purposefully avoid retrying, making this
|
||||||
|
// migration best-effort.
|
||||||
|
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updates local copy of global configuration by reading the entire key-range
|
// Updates local copy of global configuration by reading the entire key-range
|
||||||
// from storage.
|
// from storage.
|
||||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||||
self->data.clear();
|
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
||||||
|
|
||||||
Transaction tr(self->cx);
|
Transaction tr(self->cx);
|
||||||
RangeResult result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
RangeResult result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
||||||
|
@ -176,7 +187,8 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||||
|
|
||||||
// Applies updates to the local copy of the global configuration when this
|
// Applies updates to the local copy of the global configuration when this
|
||||||
// process receives an updated history.
|
// process receives an updated history.
|
||||||
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
|
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) {
|
||||||
|
wait(self->cx->onConnected());
|
||||||
wait(self->migrate(self));
|
wait(self->migrate(self));
|
||||||
|
|
||||||
wait(self->refresh(self));
|
wait(self->refresh(self));
|
||||||
|
@ -184,9 +196,9 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
wait(dbInfo->onChange());
|
wait(self->dbInfoChanged.onTrigger());
|
||||||
|
|
||||||
auto& history = dbInfo->get().history;
|
auto& history = dbInfo->history;
|
||||||
if (history.size() == 0) {
|
if (history.size() == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -196,8 +208,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<
|
||||||
// history updates or the protocol version changed, so it
|
// history updates or the protocol version changed, so it
|
||||||
// must re-read the entire configuration range.
|
// must re-read the entire configuration range.
|
||||||
wait(self->refresh(self));
|
wait(self->refresh(self));
|
||||||
if (dbInfo->get().history.size() > 0) {
|
if (dbInfo->history.size() > 0) {
|
||||||
self->lastUpdate = dbInfo->get().history.back().version;
|
self->lastUpdate = dbInfo->history.back().version;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Apply history in order, from lowest version to highest
|
// Apply history in order, from lowest version to highest
|
||||||
|
@ -222,6 +234,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<
|
||||||
self->lastUpdate = vh.version;
|
self->lastUpdate = vh.version;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self->configChanged.trigger();
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,10 +62,28 @@ struct ConfigValue : ReferenceCounted<ConfigValue> {
|
||||||
|
|
||||||
class GlobalConfig : NonCopyable {
|
class GlobalConfig : NonCopyable {
|
||||||
public:
|
public:
|
||||||
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
|
// Creates a GlobalConfig singleton, accessed by calling
|
||||||
// This function should only be called once by each process (however, it is
|
// GlobalConfig::globalConfig(). This function requires a database object
|
||||||
// idempotent and calling it multiple times will have no effect).
|
// to allow global configuration to run transactions on the database, and
|
||||||
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
// an AsyncVar object to watch for changes on. The ClientDBInfo pointer
|
||||||
|
// should point to a ClientDBInfo object which will contain the updated
|
||||||
|
// global configuration history when the given AsyncVar changes. This
|
||||||
|
// function should be called whenever the database object changes, in order
|
||||||
|
// to allow global configuration to run transactions on the latest
|
||||||
|
// database.
|
||||||
|
template <class T>
|
||||||
|
static void create(Database& cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) {
|
||||||
|
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
|
||||||
|
auto config = new GlobalConfig{cx};
|
||||||
|
g_network->setGlobal(INetwork::enGlobalConfig, config);
|
||||||
|
config->_updater = updater(config, dbInfo);
|
||||||
|
// Bind changes in `db` to the `dbInfoChanged` AsyncTrigger.
|
||||||
|
forward(db, std::addressof(config->dbInfoChanged));
|
||||||
|
} else {
|
||||||
|
GlobalConfig* oldConfig = reinterpret_cast<GlobalConfig*>(g_network->global(INetwork::enGlobalConfig));
|
||||||
|
oldConfig->cx = cx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Returns a reference to the global GlobalConfig object. Clients should
|
// Returns a reference to the global GlobalConfig object. Clients should
|
||||||
// call this function whenever they need to read a value out of the global
|
// call this function whenever they need to read a value out of the global
|
||||||
|
@ -114,8 +132,18 @@ public:
|
||||||
// been created and is ready.
|
// been created and is ready.
|
||||||
Future<Void> onInitialized();
|
Future<Void> onInitialized();
|
||||||
|
|
||||||
|
// Triggers the returned future when any key-value pair in the global
|
||||||
|
// configuration changes.
|
||||||
|
Future<Void> onChange();
|
||||||
|
|
||||||
|
// Calls \ref fn when the value associated with \ref key is changed. \ref
|
||||||
|
// fn is passed the updated value for the key, or an empty optional if the
|
||||||
|
// key has been cleared. If the value is an allocated object, its memory
|
||||||
|
// remains in the control of the global configuration.
|
||||||
|
void trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
GlobalConfig();
|
GlobalConfig(Database& cx);
|
||||||
|
|
||||||
// The functions below only affect the local copy of the global
|
// The functions below only affect the local copy of the global
|
||||||
// configuration keyspace! To insert or remove values across all nodes you
|
// configuration keyspace! To insert or remove values across all nodes you
|
||||||
|
@ -127,20 +155,23 @@ private:
|
||||||
void insert(KeyRef key, ValueRef value);
|
void insert(KeyRef key, ValueRef value);
|
||||||
// Removes the given key (and associated value) from the local copy of the
|
// Removes the given key (and associated value) from the local copy of the
|
||||||
// global configuration keyspace.
|
// global configuration keyspace.
|
||||||
void erase(KeyRef key);
|
void erase(Key key);
|
||||||
// Removes the given key range (and associated values) from the local copy
|
// Removes the given key range (and associated values) from the local copy
|
||||||
// of the global configuration keyspace.
|
// of the global configuration keyspace.
|
||||||
void erase(KeyRangeRef range);
|
void erase(KeyRangeRef range);
|
||||||
|
|
||||||
ACTOR static Future<Void> migrate(GlobalConfig* self);
|
ACTOR static Future<Void> migrate(GlobalConfig* self);
|
||||||
ACTOR static Future<Void> refresh(GlobalConfig* self);
|
ACTOR static Future<Void> refresh(GlobalConfig* self);
|
||||||
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo);
|
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
||||||
|
|
||||||
Database cx;
|
Database cx;
|
||||||
|
AsyncTrigger dbInfoChanged;
|
||||||
Future<Void> _updater;
|
Future<Void> _updater;
|
||||||
Promise<Void> initialized;
|
Promise<Void> initialized;
|
||||||
|
AsyncTrigger configChanged;
|
||||||
std::unordered_map<StringRef, Reference<ConfigValue>> data;
|
std::unordered_map<StringRef, Reference<ConfigValue>> data;
|
||||||
Version lastUpdate;
|
Version lastUpdate;
|
||||||
|
std::unordered_map<KeyRef, std::function<void(std::optional<std::any>)>> callbacks;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1152,8 +1152,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
||||||
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
|
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
|
||||||
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
|
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
|
||||||
|
|
||||||
GlobalConfig::create(this, clientInfo);
|
|
||||||
|
|
||||||
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
|
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
|
||||||
monitorTssInfoChange = monitorTssChange(this);
|
monitorTssInfoChange = monitorTssChange(this);
|
||||||
tssMismatchHandler = handleTssMismatches(this);
|
tssMismatchHandler = handleTssMismatches(this);
|
||||||
|
@ -1754,7 +1752,9 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
|
||||||
/*switchable*/ true);
|
/*switchable*/ true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Database(db);
|
auto database = Database(db);
|
||||||
|
GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get()));
|
||||||
|
return database;
|
||||||
}
|
}
|
||||||
|
|
||||||
Database Database::createDatabase(std::string connFileName,
|
Database Database::createDatabase(std::string connFileName,
|
||||||
|
|
|
@ -1384,6 +1384,9 @@ Future<RangeResult> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, K
|
||||||
} else if (config->value.type() == typeid(int64_t)) {
|
} else if (config->value.type() == typeid(int64_t)) {
|
||||||
result.push_back_deep(result.arena(),
|
result.push_back_deep(result.arena(),
|
||||||
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
|
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
|
||||||
|
} else if (config->value.type() == typeid(bool)) {
|
||||||
|
result.push_back_deep(result.arena(),
|
||||||
|
KeyValueRef(prefixedKey, std::to_string(std::any_cast<bool>(config->value))));
|
||||||
} else if (config->value.type() == typeid(float)) {
|
} else if (config->value.type() == typeid(float)) {
|
||||||
result.push_back_deep(result.arena(),
|
result.push_back_deep(result.arena(),
|
||||||
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
|
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
|
||||||
|
|
|
@ -71,6 +71,8 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
|
||||||
i += sizeof(float) + 1;
|
i += sizeof(float) + 1;
|
||||||
} else if (data[i] == 0x21) {
|
} else if (data[i] == 0x21) {
|
||||||
i += sizeof(double) + 1;
|
i += sizeof(double) + 1;
|
||||||
|
} else if (data[i] == 0x26 || data[i] == 0x27) {
|
||||||
|
i += 1;
|
||||||
} else if (data[i] == '\x00') {
|
} else if (data[i] == '\x00') {
|
||||||
i += 1;
|
i += 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -144,6 +146,16 @@ Tuple& Tuple::append(int64_t value) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Tuple& Tuple::appendBool(bool value) {
|
||||||
|
offsets.push_back(data.size());
|
||||||
|
if (value) {
|
||||||
|
data.push_back(data.arena(), 0x27);
|
||||||
|
} else {
|
||||||
|
data.push_back(data.arena(), 0x26);
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
Tuple& Tuple::appendFloat(float value) {
|
Tuple& Tuple::appendFloat(float value) {
|
||||||
offsets.push_back(data.size());
|
offsets.push_back(data.size());
|
||||||
float swap = bigEndianFloat(value);
|
float swap = bigEndianFloat(value);
|
||||||
|
@ -192,6 +204,8 @@ Tuple::ElementType Tuple::getType(size_t index) const {
|
||||||
return ElementType::FLOAT;
|
return ElementType::FLOAT;
|
||||||
} else if (code == 0x21) {
|
} else if (code == 0x21) {
|
||||||
return ElementType::DOUBLE;
|
return ElementType::DOUBLE;
|
||||||
|
} else if (code == 0x26 || code == 0x27) {
|
||||||
|
return ElementType::BOOL;
|
||||||
} else {
|
} else {
|
||||||
throw invalid_tuple_data_type();
|
throw invalid_tuple_data_type();
|
||||||
}
|
}
|
||||||
|
@ -287,6 +301,21 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
|
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
|
||||||
|
bool Tuple::getBool(size_t index) const {
|
||||||
|
if (index >= offsets.size()) {
|
||||||
|
throw invalid_tuple_index();
|
||||||
|
}
|
||||||
|
ASSERT_LT(offsets[index], data.size());
|
||||||
|
uint8_t code = data[offsets[index]];
|
||||||
|
if (code == 0x26) {
|
||||||
|
return false;
|
||||||
|
} else if (code == 0x27) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
throw invalid_tuple_data_type();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
float Tuple::getFloat(size_t index) const {
|
float Tuple::getFloat(size_t index) const {
|
||||||
if (index >= offsets.size()) {
|
if (index >= offsets.size()) {
|
||||||
throw invalid_tuple_index();
|
throw invalid_tuple_index();
|
||||||
|
|
|
@ -40,6 +40,7 @@ struct Tuple {
|
||||||
Tuple& append(int64_t);
|
Tuple& append(int64_t);
|
||||||
// There are some ambiguous append calls in fdbclient, so to make it easier
|
// There are some ambiguous append calls in fdbclient, so to make it easier
|
||||||
// to add append for floats and doubles, name them differently for now.
|
// to add append for floats and doubles, name them differently for now.
|
||||||
|
Tuple& appendBool(bool);
|
||||||
Tuple& appendFloat(float);
|
Tuple& appendFloat(float);
|
||||||
Tuple& appendDouble(double);
|
Tuple& appendDouble(double);
|
||||||
Tuple& appendNull();
|
Tuple& appendNull();
|
||||||
|
@ -51,7 +52,7 @@ struct Tuple {
|
||||||
return append(t);
|
return append(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE };
|
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE };
|
||||||
|
|
||||||
// this is number of elements, not length of data
|
// this is number of elements, not length of data
|
||||||
size_t size() const { return offsets.size(); }
|
size_t size() const { return offsets.size(); }
|
||||||
|
@ -59,6 +60,7 @@ struct Tuple {
|
||||||
ElementType getType(size_t index) const;
|
ElementType getType(size_t index) const;
|
||||||
Standalone<StringRef> getString(size_t index) const;
|
Standalone<StringRef> getString(size_t index) const;
|
||||||
int64_t getInt(size_t index, bool allow_incomplete = false) const;
|
int64_t getInt(size_t index, bool allow_incomplete = false) const;
|
||||||
|
bool getBool(size_t index) const;
|
||||||
float getFloat(size_t index) const;
|
float getFloat(size_t index) const;
|
||||||
double getDouble(size_t index) const;
|
double getDouble(size_t index) const;
|
||||||
|
|
||||||
|
|
|
@ -3988,7 +3988,7 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
||||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
||||||
state ClientDBInfo clientInfo = db->clientInfo->get();
|
state ClientDBInfo clientInfo = db->serverInfo->get().client;
|
||||||
|
|
||||||
if (globalConfigVersion.present()) {
|
if (globalConfigVersion.present()) {
|
||||||
// Since the history keys end with versionstamps, they
|
// Since the history keys end with versionstamps, they
|
||||||
|
@ -4046,6 +4046,14 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||||
|
// Update ServerDBInfo so fdbserver processes receive updated history.
|
||||||
|
ServerDBInfo serverInfo = db->serverInfo->get();
|
||||||
|
serverInfo.id = deterministicRandom()->randomUniqueID();
|
||||||
|
serverInfo.infoGeneration = ++db->dbInfoCount;
|
||||||
|
serverInfo.client = clientInfo;
|
||||||
|
db->serverInfo->set(serverInfo);
|
||||||
|
|
||||||
|
// Update ClientDBInfo so client processes receive updated history.
|
||||||
db->clientInfo->set(clientInfo);
|
db->clientInfo->set(clientInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include <boost/lexical_cast.hpp>
|
#include <boost/lexical_cast.hpp>
|
||||||
|
|
||||||
#include "fdbrpc/Locality.h"
|
#include "fdbrpc/Locality.h"
|
||||||
|
#include "fdbclient/GlobalConfig.actor.h"
|
||||||
#include "fdbclient/StorageServerInterface.h"
|
#include "fdbclient/StorageServerInterface.h"
|
||||||
#include "fdbserver/Knobs.h"
|
#include "fdbserver/Knobs.h"
|
||||||
#include "flow/ActorCollection.h"
|
#include "flow/ActorCollection.h"
|
||||||
|
@ -139,12 +140,14 @@ Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
|
||||||
bool enableLocalityLoadBalance,
|
bool enableLocalityLoadBalance,
|
||||||
bool lockAware) {
|
bool lockAware) {
|
||||||
auto info = makeReference<AsyncVar<ClientDBInfo>>();
|
auto info = makeReference<AsyncVar<ClientDBInfo>>();
|
||||||
return DatabaseContext::create(info,
|
auto cx = DatabaseContext::create(info,
|
||||||
extractClientInfo(db, info),
|
extractClientInfo(db, info),
|
||||||
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
|
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
|
||||||
enableLocalityLoadBalance,
|
enableLocalityLoadBalance,
|
||||||
taskID,
|
taskID,
|
||||||
lockAware);
|
lockAware);
|
||||||
|
GlobalConfig::create(cx, db, std::addressof(db->get().client));
|
||||||
|
return cx;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ErrorInfo {
|
struct ErrorInfo {
|
||||||
|
@ -1292,7 +1295,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
||||||
notUpdated = interf.updateServerDBInfo.getEndpoint();
|
notUpdated = interf.updateServerDBInfo.getEndpoint();
|
||||||
} else if (localInfo.infoGeneration > dbInfo->get().infoGeneration ||
|
} else if (localInfo.infoGeneration > dbInfo->get().infoGeneration ||
|
||||||
dbInfo->get().clusterInterface != ccInterface->get().get()) {
|
dbInfo->get().clusterInterface != ccInterface->get().get()) {
|
||||||
|
|
||||||
TraceEvent("GotServerDBInfoChange")
|
TraceEvent("GotServerDBInfoChange")
|
||||||
.detail("ChangeID", localInfo.id)
|
.detail("ChangeID", localInfo.id)
|
||||||
.detail("MasterID", localInfo.master.id())
|
.detail("MasterID", localInfo.master.id())
|
||||||
|
|
|
@ -697,6 +697,16 @@ private:
|
||||||
AsyncVar<Void> v;
|
AsyncVar<Void> v;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
|
||||||
|
// the AsyncTrigger is triggered.
|
||||||
|
ACTOR template <class T>
|
||||||
|
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) {
|
||||||
|
loop {
|
||||||
|
wait(from->onChange());
|
||||||
|
to->trigger();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class Debouncer : NonCopyable {
|
class Debouncer : NonCopyable {
|
||||||
public:
|
public:
|
||||||
explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }
|
explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }
|
||||||
|
|
Loading…
Reference in New Issue