Merge branch 'master' of github.com:apple/foundationdb into jfu-test-conf-options

This commit is contained in:
Jon Fu 2021-04-15 12:27:31 -04:00
commit ccc65c7924
26 changed files with 957 additions and 131 deletions

View File

@ -364,6 +364,17 @@ extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d
return DB(d)->getMainThreadBusyness();
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is non-zero, the future won't return until the protocol version is different than expected
extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version) {
Optional<ProtocolVersion> expected;
if (expected_version > 0) {
expected = ProtocolVersion(expected_version);
}
return (FDBFuture*)(DB(db)->getServerProtocol(expected).extractPtr());
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try {
TXN(tr)->delref();
@ -583,10 +594,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_approximate_size(FDBTransact
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
}
extern "C" DLLEXPORT FDBFuture* fdb_get_server_protocol(const char* clusterFilePath) {
return (FDBFuture*)(API->getServerProtocol(clusterFilePath ? clusterFilePath : "").extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr) {
return (FDBFuture*)(TXN(tr)->getVersionstamp().extractPtr());
}

View File

@ -189,6 +189,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);
@ -281,8 +283,6 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_transaction_get_committed_version(F
*/
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_get_server_protocol(const char* clusterFilePath);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_on_error(FDBTransaction* tr, fdb_error_t error);

View File

@ -1515,15 +1515,15 @@ TEST_CASE("fdb_transaction_get_approximate_size") {
TEST_CASE("fdb_get_server_protocol") {
// We don't really have any expectations other than "don't crash" here
FDBFuture* protocolFuture = fdb_get_server_protocol(clusterFilePath.c_str());
FDBFuture* protocolFuture = fdb_database_get_server_protocol(db, 0);
uint64_t out;
fdb_check(fdb_future_block_until_ready(protocolFuture));
fdb_check(fdb_future_get_uint64(protocolFuture, &out));
fdb_future_destroy(protocolFuture);
// "Default" cluster file version
protocolFuture = fdb_get_server_protocol(nullptr);
// Passing in an expected version that's different than the cluster version
protocolFuture = fdb_database_get_server_protocol(db, 0x0FDB00A200090000LL);
fdb_check(fdb_future_block_until_ready(protocolFuture));
fdb_check(fdb_future_get_uint64(protocolFuture, &out));
fdb_future_destroy(protocolFuture);

View File

@ -24,6 +24,7 @@
#include "fdbclient/Status.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ClusterInterface.h"
@ -3841,25 +3842,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
continue;
}
state Future<Optional<Standalone<StringRef>>> sampleRateFuture =
tr->get(fdbClientInfoTxnSampleRate);
state Future<Optional<Standalone<StringRef>>> sizeLimitFuture =
tr->get(fdbClientInfoTxnSizeLimit);
wait(makeInterruptable(success(sampleRateFuture) && success(sizeLimitFuture)));
const double sampleRateDbl = GlobalConfig::globalConfig().get<double>(
fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
const int64_t sizeLimit =
GlobalConfig::globalConfig().get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
std::string sampleRateStr = "default", sizeLimitStr = "default";
if (sampleRateFuture.get().present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRateFuture.get().get(), Unversioned());
if (!std::isinf(sampleRateDbl)) {
sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
}
if (!std::isinf(sampleRateDbl)) {
sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
}
if (sizeLimitFuture.get().present()) {
const int64_t sizeLimit =
BinaryReader::fromStringRef<int64_t>(sizeLimitFuture.get().get(), Unversioned());
if (sizeLimit != -1) {
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
}
if (sizeLimit != -1) {
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
}
printf("Client profiling rate is set to %s and size limit is set to %s.\n",
sampleRateStr.c_str(),
@ -3897,8 +3889,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
}
tr->set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleRate, Unversioned()));
tr->set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned()));
Tuple rate = Tuple().appendDouble(sampleRate);
Tuple size = Tuple().append(sizeLimit);
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
if (!intrans) {
wait(commitTransaction(tr));
}

View File

@ -28,6 +28,9 @@ set(FDBCLIENT_SRCS
FDBOptions.h
FDBTypes.h
FileBackupAgent.actor.cpp
GlobalConfig.h
GlobalConfig.actor.h
GlobalConfig.actor.cpp
GrvProxyInterface.h
HTTP.actor.cpp
IClientApi.h

View File

@ -31,6 +31,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/GlobalConfig.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
@ -113,16 +114,10 @@ struct ClientDBInfo {
vector<CommitProxyInterface> commitProxies;
Optional<CommitProxyInterface>
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
double clientTxnInfoSampleRate;
int64_t clientTxnInfoSizeLimit;
Optional<Value> forward;
double transactionTagSampleRate;
double transactionTagSampleCost;
vector<VersionHistory> history;
ClientDBInfo()
: clientTxnInfoSampleRate(std::numeric_limits<double>::infinity()), clientTxnInfoSizeLimit(-1),
transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE),
transactionTagSampleCost(CLIENT_KNOBS->COMMIT_SAMPLE_COST) {}
ClientDBInfo() {}
bool operator==(ClientDBInfo const& r) const { return id == r.id; }
bool operator!=(ClientDBInfo const& r) const { return id != r.id; }
@ -132,15 +127,7 @@ struct ClientDBInfo {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar,
grvProxies,
commitProxies,
id,
clientTxnInfoSampleRate,
clientTxnInfoSizeLimit,
forward,
transactionTagSampleRate,
transactionTagSampleCost);
serializer(ar, grvProxies, commitProxies, id, forward, history);
}
};

View File

@ -0,0 +1,229 @@
/*
* GlobalConfig.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("config/fdb_client_info/client_txn_sample_rate");
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_info/client_txn_size_limit");
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{};
config->cx = Database(cx);
g_network->setGlobal(INetwork::enGlobalConfig, config);
config->_updater = updater(config, dbInfo);
}
}
GlobalConfig& GlobalConfig::globalConfig() {
void* res = g_network->global(INetwork::enGlobalConfig);
ASSERT(res);
return *reinterpret_cast<GlobalConfig*>(res);
}
Key GlobalConfig::prefixedKey(KeyRef key) {
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
}
const Reference<ConfigValue> GlobalConfig::get(KeyRef name) {
auto it = data.find(name);
if (it == data.end()) {
return Reference<ConfigValue>();
}
return it->second;
}
const std::map<KeyRef, Reference<ConfigValue>> GlobalConfig::get(KeyRangeRef range) {
std::map<KeyRef, Reference<ConfigValue>> results;
for (const auto& [key, value] : data) {
if (range.contains(key)) {
results[key] = value;
}
}
return results;
}
Future<Void> GlobalConfig::onInitialized() {
return initialized.getFuture();
}
void GlobalConfig::insert(KeyRef key, ValueRef value) {
data.erase(key);
Arena arena(key.expectedSize() + value.expectedSize());
KeyRef stableKey = KeyRef(arena, key);
try {
std::any any;
Tuple t = Tuple::unpack(value);
if (t.getType(0) == Tuple::ElementType::UTF8) {
any = StringRef(arena, t.getString(0).contents());
} else if (t.getType(0) == Tuple::ElementType::INT) {
any = t.getInt(0);
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
any = t.getFloat(0);
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
any = t.getDouble(0);
} else {
ASSERT(false);
}
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
} catch (Error& e) {
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
}
}
void GlobalConfig::erase(KeyRef key) {
data.erase(key);
}
void GlobalConfig::erase(KeyRangeRef range) {
auto it = data.begin();
while (it != data.end()) {
if (range.contains(it->first)) {
it = data.erase(it);
} else {
++it;
}
}
}
// Older FDB versions used different keys for client profiling data. This
// function performs a one-time migration of data in these keys to the new
// global configuration key space.
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
state Optional<Value> migrated = wait(tr->get(migratedKey));
if (migrated.present()) {
// Already performed migration.
return Void();
}
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}
wait(tr->commit());
return Void();
} catch (Error& e) {
throw;
}
}
}
// Updates local copy of global configuration by reading the entire key-range
// from storage.
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
self->data.clear();
Transaction tr(self->cx);
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
for (const auto& kv : result) {
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
self->insert(systemKey, kv.value);
}
return Void();
}
// Applies updates to the local copy of the global configuration when this
// process receives an updated history.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
wait(self->migrate(self));
wait(self->refresh(self));
self->initialized.send(Void());
loop {
try {
wait(dbInfo->onChange());
auto& history = dbInfo->get().history;
if (history.size() == 0) {
continue;
}
if (self->lastUpdate < history[0].version) {
// This process missed too many global configuration
// history updates or the protocol version changed, so it
// must re-read the entire configuration range.
wait(self->refresh(self));
if (dbInfo->get().history.size() > 0) {
self->lastUpdate = dbInfo->get().history.back().version;
}
} else {
// Apply history in order, from lowest version to highest
// version. Mutation history should already be stored in
// ascending version order.
for (const auto& vh : history) {
if (vh.version <= self->lastUpdate) {
continue; // already applied this mutation
}
for (const auto& mutation : vh.mutations.contents()) {
if (mutation.type == MutationRef::SetValue) {
self->insert(mutation.param1, mutation.param2);
} else if (mutation.type == MutationRef::ClearRange) {
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
} else {
ASSERT(false);
}
}
ASSERT(vh.version > self->lastUpdate);
self->lastUpdate = vh.version;
}
}
} catch (Error& e) {
throw;
}
}
}

View File

@ -0,0 +1,146 @@
/*
* GlobalConfig.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_G_H)
#define FDBCLIENT_GLOBALCONFIG_ACTOR_G_H
#include "fdbclient/GlobalConfig.actor.g.h"
#elif !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_H)
#define FDBCLIENT_GLOBALCONFIG_ACTOR_H
#include <any>
#include <map>
#include <type_traits>
#include <unordered_map>
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/GlobalConfig.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/actorcompiler.h" // has to be last include
// The global configuration is a series of typed key-value pairs synced to all
// nodes (server and client) in an FDB cluster in an eventually consistent
// manner. Only small key-value pairs should be stored in global configuration;
// an excessive amount of data can cause synchronization slowness.
// Keys
extern const KeyRef fdbClientInfoTxnSampleRate;
extern const KeyRef fdbClientInfoTxnSizeLimit;
extern const KeyRef transactionTagSampleRate;
extern const KeyRef transactionTagSampleCost;
// Structure used to hold the values stored by global configuration. The arena
// is used as memory to store both the key and the value (the value is only
// stored in the arena if it is an object; primitives are just copied).
struct ConfigValue : ReferenceCounted<ConfigValue> {
Arena arena;
std::any value;
ConfigValue() {}
ConfigValue(Arena&& a, std::any&& v) : arena(a), value(v) {}
};
class GlobalConfig : NonCopyable {
public:
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
// This function should only be called once by each process (however, it is
// idempotent and calling it multiple times will have no effect).
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
// Returns a reference to the global GlobalConfig object. Clients should
// call this function whenever they need to read a value out of the global
// configuration.
static GlobalConfig& globalConfig();
// Use this function to turn a global configuration key defined above into
// the full path needed to set the value in the database.
//
// For example, given "config/a", returns "\xff\xff/global_config/config/a".
static Key prefixedKey(KeyRef key);
// Get a value from the framework. Values are returned as a ConfigValue
// reference which also contains the arena holding the object. As long as
// the caller keeps the ConfigValue reference, the value is guaranteed to
// be readable. An empty reference is returned if the value does not exist.
const Reference<ConfigValue> get(KeyRef name);
const std::map<KeyRef, Reference<ConfigValue>> get(KeyRangeRef range);
// For arithmetic value types, returns a copy of the value for the given
// key, or the supplied default value if the framework does not know about
// the key.
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
const T get(KeyRef name, T defaultVal) {
try {
auto configValue = get(name);
if (configValue.isValid()) {
if (configValue->value.has_value()) {
return std::any_cast<T>(configValue->value);
}
}
return defaultVal;
} catch (Error& e) {
throw;
}
}
// Trying to write into the global configuration keyspace? To write data,
// submit a transaction to \xff\xff/global_config/<your-key> with
// <your-value> encoded using the FDB tuple typecodes. Use the helper
// function `prefixedKey` to correctly prefix your global configuration
// key.
// Triggers the returned future when the global configuration singleton has
// been created and is ready.
Future<Void> onInitialized();
private:
GlobalConfig();
// The functions below only affect the local copy of the global
// configuration keyspace! To insert or remove values across all nodes you
// must use a transaction (see the note above).
// Inserts the given key-value pair into the local copy of the global
// configuration keyspace, overwriting the old key-value pair if it exists.
// `value` must be encoded using the FDB tuple typecodes.
void insert(KeyRef key, ValueRef value);
// Removes the given key (and associated value) from the local copy of the
// global configuration keyspace.
void erase(KeyRef key);
// Removes the given key range (and associated values) from the local copy
// of the global configuration keyspace.
void erase(KeyRangeRef range);
ACTOR static Future<Void> migrate(GlobalConfig* self);
ACTOR static Future<Void> refresh(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo);
Database cx;
Future<Void> _updater;
Promise<Void> initialized;
std::unordered_map<StringRef, Reference<ConfigValue>> data;
Version lastUpdate;
};
#endif

45
fdbclient/GlobalConfig.h Normal file
View File

@ -0,0 +1,45 @@
/*
* GlobalConfig.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
// Used to store a list of mutations made to the global configuration at a
// specific version.
struct VersionHistory {
constexpr static FileIdentifier file_identifier = 5863456;
VersionHistory() {}
VersionHistory(Version v) : version(v) {}
Version version;
Standalone<VectorRef<MutationRef>> mutations;
bool operator<(const VersionHistory& other) const { return version < other.version; }
int expectedSize() const { return sizeof(version) + mutations.expectedSize(); }
template <typename Ar>
void serialize(Ar& ar) {
serializer(ar, mutations, version);
}
};

View File

@ -28,6 +28,7 @@
#include "flow/ThreadHelper.actor.h"
// An interface that represents a transaction created by a client
class ITransaction {
public:
virtual ~ITransaction() {}
@ -90,6 +91,7 @@ public:
virtual void delref() = 0;
};
// An interface that represents a connection to a cluster made by a client
class IDatabase {
public:
virtual ~IDatabase() {}
@ -98,6 +100,11 @@ public:
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
virtual ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
@ -110,13 +117,16 @@ public:
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
};
// An interface that presents the top-level FDB client API as exposed through the C bindings
//
// This interface and its associated objects are intended to live outside the network thread, so its asynchronous
// operations use ThreadFutures and implementations should be thread safe.
class IClientApi {
public:
virtual ~IClientApi() {}
virtual void selectApiVersion(int apiVersion) = 0;
virtual const char* getClientVersion() = 0;
virtual ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) = 0;
virtual void setNetworkOption(FDBNetworkOptions::Option option,
Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -356,7 +356,32 @@ double DLDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
ASSERT(api->databaseGetServerProtocol != nullptr);
uint64_t expected =
expectedVersion.map<uint64_t>([](const ProtocolVersion& v) { return v.version(); }).orDefault(0);
FdbCApi::FDBFuture* f = api->databaseGetServerProtocol(db, expected);
return toThreadFuture<ProtocolVersion>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
uint64_t pv;
FdbCApi::fdb_error_t error = api->futureGetUInt64(f, &pv);
ASSERT(!error);
return ProtocolVersion(pv);
});
}
// DLApi
// Loads the specified function from a dynamic library
//
// fp - The function pointer where the loaded function will be stored
// lib - The dynamic library where the function is loaded from
// libPath - The path of the dynamic library (used for logging)
// functionName - The function to load
// requireFunction - Determines the behavior if the function is not present. If true, an error is thrown. If false,
// the function pointer will be set to nullptr.
template <class T>
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
*(void**)(fp) = loadFunction(lib, functionName);
@ -403,6 +428,8 @@ void DLApi::init() {
fdbCPath,
"fdb_database_get_main_thread_busyness",
headerVersion >= 700);
loadClientFunction(
&api->databaseGetServerProtocol, lib, fdbCPath, "fdb_database_get_server_protocol", headerVersion >= 700);
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@ -452,7 +479,7 @@ void DLApi::init() {
loadClientFunction(
&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64");
loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64", headerVersion >= 700);
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
@ -488,11 +515,6 @@ const char* DLApi::getClientVersion() {
return api->getClientVersion();
}
ThreadFuture<uint64_t> DLApi::getServerProtocol(const char* clusterFilePath) {
ASSERT(false);
return ThreadFuture<uint64_t>();
}
void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
throwIfError(api->setNetworkOption(
option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
@ -856,7 +878,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
std::string clusterFilePath,
Reference<IDatabase> db,
bool openConnectors)
: dbState(new DatabaseState()) {
: dbState(new DatabaseState()), clusterFilePath(clusterFilePath) {
dbState->db = db;
dbState->dbVar->set(db);
@ -941,6 +963,15 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
// TODO: send this out through the active database
return MultiVersionApi::api->getLocalClient()
->api->createDatabase(clusterFilePath.c_str())
->getServerProtocol(expectedVersion);
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(
@ -1181,10 +1212,6 @@ const char* MultiVersionApi::getClientVersion() {
return localClient->api->getClientVersion();
}
ThreadFuture<uint64_t> MultiVersionApi::getServerProtocol(const char* clusterFilePath) {
return api->localClient->api->getServerProtocol(clusterFilePath);
}
void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty = true) {
ASSERT(canBePresent || canBeAbsent);

View File

@ -28,6 +28,8 @@
#include "flow/ThreadHelper.actor.h"
// FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library.
// All of the required functions loaded from that external library are stored in function pointers in this struct.
struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct future FDBFuture;
typedef struct cluster FDBCluster;
@ -55,7 +57,6 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// Network
fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion);
const char* (*getClientVersion)();
FDBFuture* (*getServerProtocol)(const char* clusterFilePath);
fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const* value, int valueLength);
fdb_error_t (*setupNetwork)();
fdb_error_t (*runNetwork)();
@ -81,6 +82,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
uint8_t const* snapshotCommmand,
int snapshotCommandLength);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@ -185,6 +187,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*futureGetCluster)(FDBFuture* f, FDBCluster** outCluster);
};
// An implementation of ITransaction that wraps a transaction object created on an externally loaded client library.
// All API calls to that transaction are routed through the external library.
class DLTransaction : public ITransaction, ThreadSafeReferenceCounted<DLTransaction> {
public:
DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction* tr) : api(api), tr(tr) {}
@ -249,6 +253,8 @@ private:
FdbCApi::FDBTransaction* const tr;
};
// An implementation of IDatabase that wraps a database object created on an externally loaded client library.
// All API calls to that database are routed through the external library.
class DLDatabase : public IDatabase, ThreadSafeReferenceCounted<DLDatabase> {
public:
DLDatabase(Reference<FdbCApi> api, FdbCApi::FDBDatabase* db) : api(api), db(db), ready(Void()) {}
@ -265,6 +271,11 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -279,13 +290,14 @@ private:
ThreadFuture<Void> ready;
};
// An implementation of IClientApi that re-issues API calls to the C API of an externally loaded client library.
// The DL prefix stands for "dynamic library".
class DLApi : public IClientApi {
public:
DLApi(std::string fdbCPath, bool unlinkOnLoad = false);
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
@ -312,6 +324,9 @@ private:
class MultiVersionDatabase;
// An implementation of ITransaction that wraps a transaction created either locally or through a dynamically loaded
// external client. When needed (e.g on cluster version change), the MultiVersionTransaction can automatically replace
// its wrapped transaction with one from another client.
class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<MultiVersionTransaction> {
public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db,
@ -413,6 +428,9 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
class MultiVersionApi;
// An implementation of IDatabase that wraps a database created either locally or through a dynamically loaded
// external client. The MultiVersionDatabase monitors the protocol version of the cluster and automatically
// replaces the wrapped database when the protocol version changes.
class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
public:
MultiVersionDatabase(MultiVersionApi* api,
@ -426,6 +444,11 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
@ -487,15 +510,19 @@ private:
Mutex optionLock;
};
std::string clusterFilePath;
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
};
// An implementation of IClientApi that can choose between multiple different client implementations either provided
// locally within the primary loaded fdb_c client or through any number of dynamically loaded clients.
//
// This functionality is used to provide support for multiple protocol versions simultaneously.
class MultiVersionApi : public IClientApi {
public:
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;

View File

@ -36,6 +36,7 @@
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/JsonBuilder.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
@ -505,12 +506,13 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
}
}
cx->clientStatusUpdater.outStatusQ.clear();
double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate)
? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY
: cx->clientInfo->get().clientTxnInfoSampleRate;
int64_t clientTxnInfoSizeLimit = cx->clientInfo->get().clientTxnInfoSizeLimit == -1
? CLIENT_KNOBS->CSI_SIZE_LIMIT
: cx->clientInfo->get().clientTxnInfoSizeLimit;
wait(GlobalConfig::globalConfig().onInitialized());
double sampleRate = GlobalConfig::globalConfig().get<double>(fdbClientInfoTxnSampleRate,
std::numeric_limits<double>::infinity());
double clientSamplingProbability =
std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
int64_t sizeLimit = GlobalConfig::globalConfig().get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
int64_t clientTxnInfoSizeLimit = sizeLimit == -1 ? CLIENT_KNOBS->CSI_SIZE_LIMIT : sizeLimit;
if (!trChunksQ.empty() && deterministicRandom()->random01() < clientSamplingProbability)
wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));
@ -956,6 +958,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
GlobalConfig::create(this, clientInfo);
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
@ -1018,9 +1022,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::TRACING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
SpecialKeySpace::MODULE::GLOBALCONFIG, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<GlobalConfigImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
@ -1266,14 +1274,16 @@ Future<Void> DatabaseContext::onProxiesChanged() {
}
bool DatabaseContext::sampleReadTags() const {
return clientInfo->get().transactionTagSampleRate > 0 &&
deterministicRandom()->random01() <= clientInfo->get().transactionTagSampleRate;
double sampleRate = GlobalConfig::globalConfig().get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
return sampleRate > 0 && deterministicRandom()->random01() <= sampleRate;
}
bool DatabaseContext::sampleOnCost(uint64_t cost) const {
if (clientInfo->get().transactionTagSampleCost <= 0)
double sampleCost =
GlobalConfig::globalConfig().get<double>(transactionTagSampleCost, CLIENT_KNOBS->COMMIT_SAMPLE_COST);
if (sampleCost <= 0)
return false;
return deterministicRandom()->random01() <= (double)cost / clientInfo->get().transactionTagSampleCost;
return deterministicRandom()->random01() <= (double)cost / sampleCost;
}
int64_t extractIntOption(Optional<StringRef> value, int64_t minValue, int64_t maxValue) {
@ -4900,9 +4910,18 @@ ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConne
return ProtocolVersion(majorityProtocol);
}
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f) {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
return protocolVersion.version();
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion) {
loop {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
if (!expectedVersion.present() || protocolVersion != expectedVersion.get()) {
return protocolVersion;
} else {
wait(delay(2.0)); // TODO: this is temporary, so not making into a knob yet
}
}
}
uint32_t Transaction::getSize() {
@ -5370,9 +5389,8 @@ void Transaction::checkDeferredError() {
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
if (!cx->isError()) {
double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate)
? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY
: cx->clientInfo->get().clientTxnInfoSampleRate;
double clientSamplingProbability = GlobalConfig::globalConfig().get<double>(
fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
deterministicRandom()->random01() < clientSamplingProbability &&
(!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {

View File

@ -400,7 +400,10 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f);
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion);
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;

View File

@ -21,7 +21,7 @@
#include "boost/lexical_cast.hpp"
#include "boost/algorithm/string.hpp"
#include "fdbclient/Knobs.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
@ -64,6 +64,8 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::ERRORMSG, singleKeyRange(LiteralStringRef("\xff\xff/error_message")) },
{ SpecialKeySpace::MODULE::CONFIGURATION,
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/"), LiteralStringRef("\xff\xff/configuration0")) },
{ SpecialKeySpace::MODULE::GLOBALCONFIG,
KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) },
{ SpecialKeySpace::MODULE::TRACING,
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }
};
@ -1369,10 +1371,129 @@ Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransac
return Optional<std::string>();
}
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {
TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr);
GlobalConfigImpl::GlobalConfigImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Returns key-value pairs for each value stored in the global configuration
// framework within the range specified. The special-key-space getrange
// function should only be used for informational purposes. All values are
// returned as strings regardless of their true type.
Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
auto& globalConfig = GlobalConfig::globalConfig();
KeyRangeRef modified =
KeyRangeRef(kr.begin.removePrefix(getKeyRange().begin), kr.end.removePrefix(getKeyRange().begin));
std::map<KeyRef, Reference<ConfigValue>> values = globalConfig.get(modified);
for (const auto& [key, config] : values) {
Key prefixedKey = key.withPrefix(getKeyRange().begin);
if (config.isValid() && config->value.has_value()) {
if (config->value.type() == typeid(StringRef)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::any_cast<StringRef>(config->value).toString()));
} else if (config->value.type() == typeid(int64_t)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
} else if (config->value.type() == typeid(float)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
} else if (config->value.type() == typeid(double)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<double>(config->value))));
} else {
ASSERT(false);
}
}
}
return result;
}
// Marks the key for insertion into global configuration.
void GlobalConfigImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
}
// Writes global configuration changes to durable memory. Also writes the
// changes made in the transaction to a recent history set, and updates the
// latest version which the global configuration was updated at.
ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* globalConfig,
ReadYourWritesTransaction* ryw) {
state Transaction& tr = ryw->getTransaction();
// History should only contain three most recent updates. If it currently
// has three items, remove the oldest to make room for a new item.
Standalone<RangeResultRef> history = wait(tr.getRange(globalConfigHistoryKeys, CLIENT_KNOBS->TOO_MANY));
constexpr int kGlobalConfigMaxHistorySize = 3;
if (history.size() > kGlobalConfigMaxHistorySize - 1) {
for (int i = 0; i < history.size() - (kGlobalConfigMaxHistorySize - 1); ++i) {
tr.clear(history[i].key);
}
}
VersionHistory vh;
// Transform writes from the special-key-space (\xff\xff/global_config/) to
// the system key space (\xff/globalConfig/), and writes mutations to
// latest version history.
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
while (iter != ranges.end()) {
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
if (entry.second.present() && iter->begin().startsWith(globalConfig->getKeyRange().begin)) {
Key bareKey = iter->begin().removePrefix(globalConfig->getKeyRange().begin);
vh.mutations.emplace_back_deep(vh.mutations.arena(),
MutationRef(MutationRef::SetValue, bareKey, entry.second.get()));
Key systemKey = bareKey.withPrefix(globalConfigKeysPrefix);
tr.set(systemKey, entry.second.get());
} else if (!entry.second.present() && iter->range().begin.startsWith(globalConfig->getKeyRange().begin) &&
iter->range().end.startsWith(globalConfig->getKeyRange().begin)) {
KeyRef bareRangeBegin = iter->range().begin.removePrefix(globalConfig->getKeyRange().begin);
KeyRef bareRangeEnd = iter->range().end.removePrefix(globalConfig->getKeyRange().begin);
vh.mutations.emplace_back_deep(vh.mutations.arena(),
MutationRef(MutationRef::ClearRange, bareRangeBegin, bareRangeEnd));
Key systemRangeBegin = bareRangeBegin.withPrefix(globalConfigKeysPrefix);
Key systemRangeEnd = bareRangeEnd.withPrefix(globalConfigKeysPrefix);
tr.clear(KeyRangeRef(systemRangeBegin, systemRangeEnd));
}
}
++iter;
}
// Record the mutations in this commit into the global configuration history.
Key historyKey = addVersionStampAtEnd(globalConfigHistoryPrefix);
ObjectWriter historyWriter(IncludeVersion());
historyWriter.serialize(vh);
tr.atomicOp(historyKey, historyWriter.toStringRef(), MutationRef::SetVersionstampedKey);
// Write version key to trigger update in cluster controller.
tr.atomicOp(globalConfigVersionKey,
LiteralStringRef("0123456789\x00\x00\x00\x00"), // versionstamp
MutationRef::SetVersionstampedValue);
return Optional<std::string>();
}
// Called when a transaction includes keys in the global configuration special-key-space range.
Future<Optional<std::string>> GlobalConfigImpl::commit(ReadYourWritesTransaction* ryw) {
return globalConfigCommitActor(this, ryw);
}
// Marks the range for deletion from global configuration.
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
}
// Marks the key for deletion from global configuration.
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
}
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
for (const auto& option : SpecialKeySpace::getTracingOptions()) {

View File

@ -146,6 +146,7 @@ public:
CONFIGURATION, // Configuration of the cluster
CONNECTIONSTRING,
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
GLOBALCONFIG, // Global configuration options synchronized to all nodes
MANAGEMENT, // Management-API
METRICS, // data-distribution metrics
TESTONLY, // only used by correctness tests
@ -336,6 +337,16 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class GlobalConfigImpl : public SpecialKeyRangeRWImpl {
public:
explicit GlobalConfigImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class TracingOptionsImpl : public SpecialKeyRangeRWImpl {
public:
explicit TracingOptionsImpl(KeyRangeRef kr);

View File

@ -632,7 +632,18 @@ std::string encodeFailedServersKey(AddressExclusion const& addr) {
return failedServersPrefix.toString() + addr.toString();
}
const KeyRangeRef workerListKeys(LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0"));
// const KeyRangeRef globalConfigKeys( LiteralStringRef("\xff/globalConfig/"), LiteralStringRef("\xff/globalConfig0") );
// const KeyRef globalConfigPrefix = globalConfigKeys.begin;
const KeyRangeRef globalConfigDataKeys( LiteralStringRef("\xff/globalConfig/k/"), LiteralStringRef("\xff/globalConfig/k0") );
const KeyRef globalConfigKeysPrefix = globalConfigDataKeys.begin;
const KeyRangeRef globalConfigHistoryKeys( LiteralStringRef("\xff/globalConfig/h/"), LiteralStringRef("\xff/globalConfig/h0") );
const KeyRef globalConfigHistoryPrefix = globalConfigHistoryKeys.begin;
const KeyRef globalConfigVersionKey = LiteralStringRef("\xff/globalConfig/v");
const KeyRangeRef workerListKeys( LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0") );
const KeyRef workerListPrefix = workerListKeys.begin;
const Key workerListKeyFor(StringRef processID) {
@ -748,8 +759,7 @@ const KeyRef tagThrottleCountKey = LiteralStringRef("\xff\x02/throttledTags/manu
// Client status info prefix
const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"),
LiteralStringRef("\xff\x02/fdbClientInfo0"));
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_sample_rate/");
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_size_limit/");
// See remaining fields in GlobalConfig.actor.h
// ConsistencyCheck settings
const KeyRef fdbShouldConsistencyCheckBeSuspended = LiteralStringRef("\xff\x02/ConsistencyCheck/Suspend");

View File

@ -230,6 +230,30 @@ extern const KeyRef failedServersVersionKey; // The value of this key shall be c
const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix)
std::string encodeFailedServersKey(AddressExclusion const&);
// "\xff/globalConfig/[[option]]" := "value"
// An umbrella prefix for global configuration data synchronized to all nodes.
// extern const KeyRangeRef globalConfigData;
// extern const KeyRef globalConfigDataPrefix;
// "\xff/globalConfig/k/[[key]]" := "value"
// Key-value pairs that have been set. The range this keyspace represents
// contains all globally configured options.
extern const KeyRangeRef globalConfigDataKeys;
extern const KeyRef globalConfigKeysPrefix;
// "\xff/globalConfig/h/[[version]]" := "value"
// Maps a commit version to a list of mutations made to the global
// configuration at that commit. Shipped to nodes periodically. In general,
// clients should not write to keys in this keyspace; it will be written
// automatically when updating global configuration keys.
extern const KeyRangeRef globalConfigHistoryKeys;
extern const KeyRef globalConfigHistoryPrefix;
// "\xff/globalConfig/v" := "version"
// Read-only key which returns the commit version of the most recent mutation
// made to the global configuration keyspace.
extern const KeyRef globalConfigVersionKey;
// "\xff/workers/[[processID]]" := ""
// Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster
// and are currently (recently) available
@ -355,8 +379,6 @@ extern const KeyRangeRef applyMutationsKeyVersionCountRange;
// FdbClient Info prefix
extern const KeyRangeRef fdbClientInfoPrefixRange;
extern const KeyRef fdbClientInfoTxnSampleRate;
extern const KeyRef fdbClientInfoTxnSizeLimit;
// Consistency Check settings
extern const KeyRef fdbShouldConsistencyCheckBeSuspended;

View File

@ -97,6 +97,15 @@ double ThreadSafeDatabase::getMainThreadBusyness() {
return g_network->networkInfo.metrics.networkBusyness;
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
DatabaseContext* db = this->db;
return onMainThread([db, expectedVersion]() -> Future<ProtocolVersion> {
return getClusterProtocol(db->getConnectionFile(), expectedVersion);
});
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
@ -407,16 +416,6 @@ const char* ThreadSafeApi::getClientVersion() {
return clientVersion.c_str();
}
// Wait until a quorum of coordinators with the same protocol version are available, and then return that protocol
// version.
ThreadFuture<uint64_t> ThreadSafeApi::getServerProtocol(const char* clusterFilePath) {
return onMainThread([clusterFilePath = std::string(clusterFilePath)]() -> Future<uint64_t> {
auto [clusterFile, isDefault] = ClusterConnectionFile::lookupClusterFileName(clusterFilePath);
Reference<ClusterConnectionFile> f = Reference<ClusterConnectionFile>(new ClusterConnectionFile(clusterFile));
return getCoordinatorProtocols(f);
});
}
void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
if (option == FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) {
if (value.present()) {

View File

@ -27,6 +27,8 @@
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/IClientApi.h"
// An implementation of IDatabase that serializes operations onto the network thread and interacts with the lower-level
// client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeDatabase : public IDatabase, public ThreadSafeReferenceCounted<ThreadSafeDatabase> {
public:
~ThreadSafeDatabase() override;
@ -37,9 +39,14 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
ThreadFuture<Void>
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The
// cluster file therefore is valid, but the database might be unavailable.
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
// Returns after a majority of coordination servers are available and have reported a leader. The
// cluster file therefore is valid, but the database might be unavailable.
ThreadFuture<Void> onConnected();
void addref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::delref(); }
@ -58,6 +65,8 @@ public: // Internal use only
DatabaseContext* unsafeGetPtr() const { return db; }
};
// An implementation of ITransaction that serializes operations onto the network thread and interacts with the
// lower-level client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted<ThreadSafeTransaction>, NonCopyable {
public:
explicit ThreadSafeTransaction(DatabaseContext* cx);
@ -135,11 +144,12 @@ private:
ReadYourWritesTransaction* tr;
};
// An implementation of IClientApi that serializes operations onto the network thread and interacts with the lower-level
// client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeApi : public IClientApi, ThreadSafeReferenceCounted<ThreadSafeApi> {
public:
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;

View File

@ -20,7 +20,20 @@
#include "fdbclient/Tuple.h"
static size_t find_string_terminator(const StringRef data, size_t offset) {
// TODO: Many functions copied from bindings/flow/Tuple.cpp. Merge at some point.
static float bigEndianFloat(float orig) {
int32_t big = *(int32_t*)&orig;
big = bigEndian32(big);
return *(float*)&big;
}
static double bigEndianDouble(double orig) {
int64_t big = *(int64_t*)&orig;
big = bigEndian64(big);
return *(double*)&big;
}
static size_t findStringTerminator(const StringRef data, size_t offset) {
size_t i = offset;
while (i < data.size() - 1 && !(data[i] == '\x00' && data[i + 1] != (uint8_t)'\xff')) {
i += (data[i] == '\x00' ? 2 : 1);
@ -29,6 +42,20 @@ static size_t find_string_terminator(const StringRef data, size_t offset) {
return i;
}
// If encoding and the sign bit is 1 (the number is negative), flip all the bits.
// If decoding and the sign bit is 0 (the number is negative), flip all the bits.
// Otherwise, the number is positive, so flip the sign bit.
static void adjustFloatingPoint(uint8_t* bytes, size_t size, bool encode) {
if ((encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x00)) ||
(!encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x80))) {
for (size_t i = 0; i < size; i++) {
bytes[i] ^= (uint8_t)0xff;
}
} else {
bytes[0] ^= (uint8_t)0x80;
}
}
Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
data.append(data.arena(), str.begin(), str.size());
@ -37,9 +64,13 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
offsets.push_back(i);
if (data[i] == '\x01' || data[i] == '\x02') {
i = find_string_terminator(str, i + 1) + 1;
i = findStringTerminator(str, i + 1) + 1;
} else if (data[i] >= '\x0c' && data[i] <= '\x1c') {
i += abs(data[i] - '\x14') + 1;
} else if (data[i] == 0x20) {
i += sizeof(float) + 1;
} else if (data[i] == 0x21) {
i += sizeof(double) + 1;
} else if (data[i] == '\x00') {
i += 1;
} else {
@ -113,6 +144,29 @@ Tuple& Tuple::append(int64_t value) {
return *this;
}
Tuple& Tuple::appendFloat(float value) {
offsets.push_back(data.size());
float swap = bigEndianFloat(value);
uint8_t* bytes = (uint8_t*)&swap;
adjustFloatingPoint(bytes, sizeof(float), true);
data.push_back(data.arena(), 0x20);
data.append(data.arena(), bytes, sizeof(float));
return *this;
}
Tuple& Tuple::appendDouble(double value) {
offsets.push_back(data.size());
double swap = value;
swap = bigEndianDouble(swap);
uint8_t* bytes = (uint8_t*)&swap;
adjustFloatingPoint(bytes, sizeof(double), true);
data.push_back(data.arena(), 0x21);
data.append(data.arena(), bytes, sizeof(double));
return *this;
}
Tuple& Tuple::appendNull() {
offsets.push_back(data.size());
data.push_back(data.arena(), (uint8_t)'\x00');
@ -134,6 +188,10 @@ Tuple::ElementType Tuple::getType(size_t index) const {
return ElementType::UTF8;
} else if (code >= '\x0c' && code <= '\x1c') {
return ElementType::INT;
} else if (code == 0x20) {
return ElementType::FLOAT;
} else if (code == 0x21) {
return ElementType::DOUBLE;
} else {
throw invalid_tuple_data_type();
}
@ -228,6 +286,45 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
return swap;
}
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
float Tuple::getFloat(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (code != 0x20) {
throw invalid_tuple_data_type();
}
float swap;
uint8_t* bytes = (uint8_t*)&swap;
ASSERT_LE(offsets[index] + 1 + sizeof(float), data.size());
swap = *(float*)(data.begin() + offsets[index] + 1);
adjustFloatingPoint(bytes, sizeof(float), false);
return bigEndianFloat(swap);
}
double Tuple::getDouble(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (code != 0x21) {
throw invalid_tuple_data_type();
}
double swap;
uint8_t* bytes = (uint8_t*)&swap;
ASSERT_LE(offsets[index] + 1 + sizeof(double), data.size());
swap = *(double*)(data.begin() + offsets[index] + 1);
adjustFloatingPoint(bytes, sizeof(double), false);
return bigEndianDouble(swap);
}
KeyRange Tuple::range(Tuple const& tuple) const {
VectorRef<uint8_t> begin;
VectorRef<uint8_t> end;

View File

@ -38,6 +38,10 @@ struct Tuple {
Tuple& append(Tuple const& tuple);
Tuple& append(StringRef const& str, bool utf8 = false);
Tuple& append(int64_t);
// There are some ambiguous append calls in fdbclient, so to make it easier
// to add append for floats and doubles, name them differently for now.
Tuple& appendFloat(float);
Tuple& appendDouble(double);
Tuple& appendNull();
StringRef pack() const { return StringRef(data.begin(), data.size()); }
@ -47,7 +51,7 @@ struct Tuple {
return append(t);
}
enum ElementType { NULL_TYPE, INT, BYTES, UTF8 };
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE };
// this is number of elements, not length of data
size_t size() const { return offsets.size(); }
@ -55,6 +59,8 @@ struct Tuple {
ElementType getType(size_t index) const;
Standalone<StringRef> getString(size_t index) const;
int64_t getInt(size_t index, bool allow_incomplete = false) const;
float getFloat(size_t index) const;
double getDouble(size_t index) const;
KeyRange range(Tuple const& tuple = Tuple()) const;

View File

@ -41,6 +41,7 @@
#include "fdbserver/Status.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbrpc/Replication.h"
@ -2715,8 +2716,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.commitProxies = req.commitProxies;
clientInfo.grvProxies = req.grvProxies;
clientInfo.clientTxnInfoSampleRate = db->clientInfo->get().clientTxnInfoSampleRate;
clientInfo.clientTxnInfoSizeLimit = db->clientInfo->get().clientTxnInfoSizeLimit;
db->clientInfo->set(clientInfo);
dbInfo.client = db->clientInfo->get();
}
@ -3191,36 +3190,84 @@ ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
}
}
ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db) {
// Monitors the global configuration version key for changes. When changes are
// made, the global configuration history is read and any updates are sent to
// all processes in the system by updating the ClientDBInfo object. The
// GlobalConfig actor class contains the functionality to read the latest
// history and update the processes local view.
ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
loop {
state ReadYourWritesTransaction tr(db->db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Optional<Value> rateVal = wait(tr.get(fdbClientInfoTxnSampleRate));
state Optional<Value> limitVal = wait(tr.get(fdbClientInfoTxnSizeLimit));
ClientDBInfo clientInfo = db->clientInfo->get();
double sampleRate = rateVal.present()
? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned())
: std::numeric_limits<double>::infinity();
int64_t sizeLimit =
limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
if (sampleRate != clientInfo.clientTxnInfoSampleRate ||
sizeLimit != clientInfo.clientTxnInfoSampleRate) {
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
state ClientDBInfo clientInfo = db->clientInfo->get();
if (globalConfigVersion.present()) {
// Since the history keys end with versionstamps, they
// should be sorted correctly (versionstamps are stored in
// big-endian order).
Standalone<RangeResultRef> globalConfigHistory =
wait(tr.getRange(globalConfigHistoryKeys, CLIENT_KNOBS->TOO_MANY));
// If the global configuration version key has been set,
// the history should contain at least one item.
ASSERT(globalConfigHistory.size() > 0);
clientInfo.history.clear();
for (const auto& kv : globalConfigHistory) {
ObjectReader reader(kv.value.begin(), IncludeVersion());
if (reader.protocolVersion() != g_network->protocolVersion()) {
// If the protocol version has changed, the
// GlobalConfig actor should refresh its view by
// reading the entire global configuration key
// range. Setting the version to the max int64_t
// will always cause the global configuration
// updater to refresh its view of the configuration
// keyspace.
clientInfo.history.clear();
clientInfo.history.emplace_back(std::numeric_limits<Version>::max());
break;
}
VersionHistory vh;
reader.deserialize(vh);
// Read commit version out of versionstamp at end of key.
BinaryReader versionReader =
BinaryReader(kv.key.removePrefix(globalConfigHistoryPrefix), Unversioned());
Version historyCommitVersion;
versionReader >> historyCommitVersion;
historyCommitVersion = bigEndian64(historyCommitVersion);
vh.version = historyCommitVersion;
clientInfo.history.push_back(std::move(vh));
}
if (clientInfo.history.size() > 0) {
// The first item in the historical list of mutations
// is only used to:
// a) Recognize that some historical changes may have
// been missed, and the entire global
// configuration keyspace needs to be read, or..
// b) Check which historical updates have already
// been applied. If this is the case, the first
// history item must have a version greater than
// or equal to whatever version the global
// configuration was last updated at, and
// therefore won't need to be applied again.
clientInfo.history[0].mutations = Standalone<VectorRef<MutationRef>>();
}
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.clientTxnInfoSampleRate = sampleRate;
clientInfo.clientTxnInfoSizeLimit = sizeLimit;
db->clientInfo->set(clientInfo);
}
state Future<Void> watchRateFuture = tr.watch(fdbClientInfoTxnSampleRate);
state Future<Void> watchLimitFuture = tr.watch(fdbClientInfoTxnSizeLimit);
state Future<Void> globalConfigFuture = tr.watch(globalConfigVersionKey);
wait(tr.commit());
choose {
when(wait(watchRateFuture)) { break; }
when(wait(watchLimitFuture)) { break; }
}
wait(globalConfigFuture);
break;
} catch (Error& e) {
wait(tr.onError(e));
}
@ -3686,7 +3733,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(timeKeeper(&self));
self.addActor.send(monitorProcessClasses(&self));
self.addActor.send(monitorServerInfoConfig(&self.db));
self.addActor.send(monitorClientTxnInfoConfigs(&self.db));
self.addActor.send(monitorGlobalConfig(&self.db));
self.addActor.send(updatedChangingDatacenters(&self));
self.addActor.send(updatedChangedDatacenters(&self));
self.addActor.send(updateDatacenterVersionDifference(&self));

View File

@ -1,5 +1,6 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "flow/actorcompiler.h" // has to be last include
@ -269,10 +270,12 @@ struct ClientTransactionProfileCorrectnessWorkload : TestWorkload {
ACTOR Future<Void> changeProfilingParameters(Database cx, int64_t sizeLimit, double sampleProbability) {
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleProbability, Unversioned()));
tr->set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned()));
Tuple rate = Tuple().appendDouble(sampleProbability);
Tuple size = Tuple().append(sizeLimit);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
return Void();
}));
return Void();

View File

@ -21,6 +21,7 @@
#include "boost/lexical_cast.hpp"
#include "boost/algorithm/string.hpp"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"

View File

@ -481,7 +481,8 @@ public:
enBlobCredentialFiles = 10,
enNetworkAddressesFunc = 11,
enClientFailureMonitor = 12,
enSQLiteInjectedError = 13
enSQLiteInjectedError = 13,
enGlobalConfig = 14
};
virtual void longTaskCheck(const char* name) {}