Merge remote-tracking branch 'origin/master' into features/actor-lineage

This commit is contained in:
Markus Pilman 2021-04-19 11:29:52 -06:00
commit 7307750e5e
57 changed files with 1922 additions and 622 deletions

View File

@ -36,7 +36,7 @@ Members of the Apple FoundationDB team are part of the core committers helping r
## Contributing ## Contributing
### Opening a Pull Request ### Opening a Pull Request
We love pull requests! For minor changes, feel free to open up a PR directly. For larger feature development and any changes that may require community discussion, we ask that you discuss your ideas on the [community forums](https://forums.foundationdb.org) prior to opening a PR, and then reference that thread within your PR comment. We love pull requests! For minor changes, feel free to open up a PR directly. For larger feature development and any changes that may require community discussion, we ask that you discuss your ideas on the [community forums](https://forums.foundationdb.org) prior to opening a PR, and then reference that thread within your PR comment. Please refer to [FoundationDB Commit Process](https://github.com/apple/foundationdb/wiki/FoundationDB-Commit-Process) for more detailed guidelines.
CI will be run automatically for core committers, and for community PRs it will be initiated by the request of a core committer. Tests can also be run locally via `ctest`, and core committers can run additional validation on pull requests prior to merging them. CI will be run automatically for core committers, and for community PRs it will be initiated by the request of a core committer. Tests can also be run locally via `ctest`, and core committers can run additional validation on pull requests prior to merging them.

View File

@ -364,6 +364,17 @@ extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d
return DB(d)->getMainThreadBusyness(); return DB(d)->getMainThreadBusyness();
} }
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is non-zero, the future won't return until the protocol version is different than expected
extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version) {
Optional<ProtocolVersion> expected;
if (expected_version > 0) {
expected = ProtocolVersion(expected_version);
}
return (FDBFuture*)(DB(db)->getServerProtocol(expected).extractPtr());
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) { extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try { try {
TXN(tr)->delref(); TXN(tr)->delref();
@ -583,10 +594,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_approximate_size(FDBTransact
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr(); return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
} }
extern "C" DLLEXPORT FDBFuture* fdb_get_server_protocol(const char* clusterFilePath) {
return (FDBFuture*)(API->getServerProtocol(clusterFilePath ? clusterFilePath : "").extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr) { extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr) {
return (FDBFuture*)(TXN(tr)->getVersionstamp().extractPtr()); return (FDBFuture*)(TXN(tr)->getVersionstamp().extractPtr());
} }

View File

@ -189,6 +189,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db); DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr); DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr); DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);
@ -281,8 +283,6 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_transaction_get_committed_version(F
*/ */
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr); DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_get_server_protocol(const char* clusterFilePath);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr); DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_on_error(FDBTransaction* tr, fdb_error_t error); DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_on_error(FDBTransaction* tr, fdb_error_t error);

View File

@ -1513,17 +1513,17 @@ TEST_CASE("fdb_transaction_get_approximate_size") {
} }
} }
TEST_CASE("fdb_get_server_protocol") { TEST_CASE("fdb_database_get_server_protocol") {
// We don't really have any expectations other than "don't crash" here // We don't really have any expectations other than "don't crash" here
FDBFuture* protocolFuture = fdb_get_server_protocol(clusterFilePath.c_str()); FDBFuture* protocolFuture = fdb_database_get_server_protocol(db, 0);
uint64_t out; uint64_t out;
fdb_check(fdb_future_block_until_ready(protocolFuture)); fdb_check(fdb_future_block_until_ready(protocolFuture));
fdb_check(fdb_future_get_uint64(protocolFuture, &out)); fdb_check(fdb_future_get_uint64(protocolFuture, &out));
fdb_future_destroy(protocolFuture); fdb_future_destroy(protocolFuture);
// "Default" cluster file version // Passing in an expected version that's different than the cluster version
protocolFuture = fdb_get_server_protocol(nullptr); protocolFuture = fdb_database_get_server_protocol(db, 0x0FDB00A200090000LL);
fdb_check(fdb_future_block_until_ready(protocolFuture)); fdb_check(fdb_future_block_until_ready(protocolFuture));
fdb_check(fdb_future_get_uint64(protocolFuture, &out)); fdb_check(fdb_future_get_uint64(protocolFuture, &out));
fdb_future_destroy(protocolFuture); fdb_future_destroy(protocolFuture);

View File

@ -95,7 +95,6 @@ def api_version(ver):
'transactional', 'transactional',
'options', 'options',
'StreamingMode', 'StreamingMode',
'get_server_protocol'
) )
_add_symbols(fdb.impl, list) _add_symbols(fdb.impl, list)

View File

@ -1531,9 +1531,6 @@ def init_c_api():
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p] _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
_capi.fdb_get_server_protocol.argtypes = [ctypes.c_char_p]
_capi.fdb_get_server_protocol.restype = ctypes.c_void_p
_capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p] _capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p _capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p
@ -1733,13 +1730,6 @@ open_databases = {}
cacheLock = threading.Lock() cacheLock = threading.Lock()
def get_server_protocol(clusterFilePath=None):
with _network_thread_reentrant_lock:
if not _network_thread:
init()
return FutureUInt64(_capi.fdb_get_server_protocol(optionalParamToBytes(clusterFilePath)[0]))
def open(cluster_file=None, event_model=None): def open(cluster_file=None, event_model=None):
"""Opens the given database (or the default database of the cluster indicated """Opens the given database (or the default database of the cluster indicated
by the fdb.cluster file in a platform-specific location, if no cluster_file by the fdb.cluster file in a platform-specific location, if no cluster_file

View File

@ -76,4 +76,9 @@ RUN rm -f /root/anaconda-ks.cfg && \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \ ' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \ '}' \
'' \ '' \
>> .bashrc 'USER_BASHRC="$HOME/src/.bashrc.local"' \
'if test -f "$USER_BASHRC"; then' \
' source $USER_BASHRC' \
'fi' \
'' \
>> .bashrc

View File

@ -104,5 +104,10 @@ RUN rm -f /root/anaconda-ks.cfg && \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \ ' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \ '}' \
'' \ '' \
'USER_BASHRC="$HOME/src/.bashrc.local"' \
'if test -f "$USER_BASHRC"; then' \
' source $USER_BASHRC' \
'fi' \
'' \
'bash ${HOME}/docker_proxy.sh' \ 'bash ${HOME}/docker_proxy.sh' \
>> .bashrc >> .bashrc

View File

@ -24,6 +24,7 @@
#include "fdbclient/Status.h" #include "fdbclient/Status.h"
#include "fdbclient/StatusClient.h" #include "fdbclient/StatusClient.h"
#include "fdbclient/DatabaseContext.h" #include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/NativeAPI.actor.h" #include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h" #include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ClusterInterface.h" #include "fdbclient/ClusterInterface.h"
@ -3841,25 +3842,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true; is_error = true;
continue; continue;
} }
state Future<Optional<Standalone<StringRef>>> sampleRateFuture = const double sampleRateDbl = GlobalConfig::globalConfig().get<double>(
tr->get(fdbClientInfoTxnSampleRate); fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
state Future<Optional<Standalone<StringRef>>> sizeLimitFuture = const int64_t sizeLimit =
tr->get(fdbClientInfoTxnSizeLimit); GlobalConfig::globalConfig().get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
wait(makeInterruptable(success(sampleRateFuture) && success(sizeLimitFuture)));
std::string sampleRateStr = "default", sizeLimitStr = "default"; std::string sampleRateStr = "default", sizeLimitStr = "default";
if (sampleRateFuture.get().present()) { if (!std::isinf(sampleRateDbl)) {
const double sampleRateDbl = sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
BinaryReader::fromStringRef<double>(sampleRateFuture.get().get(), Unversioned());
if (!std::isinf(sampleRateDbl)) {
sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
}
} }
if (sizeLimitFuture.get().present()) { if (sizeLimit != -1) {
const int64_t sizeLimit = sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
BinaryReader::fromStringRef<int64_t>(sizeLimitFuture.get().get(), Unversioned());
if (sizeLimit != -1) {
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
}
} }
printf("Client profiling rate is set to %s and size limit is set to %s.\n", printf("Client profiling rate is set to %s and size limit is set to %s.\n",
sampleRateStr.c_str(), sampleRateStr.c_str(),
@ -3897,8 +3889,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue; continue;
} }
} }
tr->set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleRate, Unversioned()));
tr->set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned())); Tuple rate = Tuple().appendDouble(sampleRate);
Tuple size = Tuple().append(sizeLimit);
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
if (!intrans) { if (!intrans) {
wait(commitTransaction(tr)); wait(commitTransaction(tr));
} }

View File

@ -31,6 +31,9 @@ set(FDBCLIENT_SRCS
FDBOptions.h FDBOptions.h
FDBTypes.h FDBTypes.h
FileBackupAgent.actor.cpp FileBackupAgent.actor.cpp
GlobalConfig.h
GlobalConfig.actor.h
GlobalConfig.actor.cpp
GrvProxyInterface.h GrvProxyInterface.h
HTTP.actor.cpp HTTP.actor.cpp
IClientApi.h IClientApi.h

View File

@ -31,6 +31,7 @@
#include "fdbclient/CommitTransaction.h" #include "fdbclient/CommitTransaction.h"
#include "fdbserver/RatekeeperInterface.h" #include "fdbserver/RatekeeperInterface.h"
#include "fdbclient/TagThrottle.h" #include "fdbclient/TagThrottle.h"
#include "fdbclient/GlobalConfig.h"
#include "fdbrpc/Stats.h" #include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h" #include "fdbrpc/TimedRequest.h"
@ -113,16 +114,10 @@ struct ClientDBInfo {
vector<CommitProxyInterface> commitProxies; vector<CommitProxyInterface> commitProxies;
Optional<CommitProxyInterface> Optional<CommitProxyInterface>
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
double clientTxnInfoSampleRate;
int64_t clientTxnInfoSizeLimit;
Optional<Value> forward; Optional<Value> forward;
double transactionTagSampleRate; vector<VersionHistory> history;
double transactionTagSampleCost;
ClientDBInfo() ClientDBInfo() {}
: clientTxnInfoSampleRate(std::numeric_limits<double>::infinity()), clientTxnInfoSizeLimit(-1),
transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE),
transactionTagSampleCost(CLIENT_KNOBS->COMMIT_SAMPLE_COST) {}
bool operator==(ClientDBInfo const& r) const { return id == r.id; } bool operator==(ClientDBInfo const& r) const { return id == r.id; }
bool operator!=(ClientDBInfo const& r) const { return id != r.id; } bool operator!=(ClientDBInfo const& r) const { return id != r.id; }
@ -132,15 +127,7 @@ struct ClientDBInfo {
if constexpr (!is_fb_function<Archive>) { if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid()); ASSERT(ar.protocolVersion().isValid());
} }
serializer(ar, serializer(ar, grvProxies, commitProxies, id, forward, history);
grvProxies,
commitProxies,
id,
clientTxnInfoSampleRate,
clientTxnInfoSizeLimit,
forward,
transactionTagSampleRate,
transactionTagSampleCost);
} }
}; };

View File

@ -0,0 +1,229 @@
/*
* GlobalConfig.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("config/fdb_client_info/client_txn_sample_rate");
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_info/client_txn_size_limit");
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{};
config->cx = Database(cx);
g_network->setGlobal(INetwork::enGlobalConfig, config);
config->_updater = updater(config, dbInfo);
}
}
GlobalConfig& GlobalConfig::globalConfig() {
void* res = g_network->global(INetwork::enGlobalConfig);
ASSERT(res);
return *reinterpret_cast<GlobalConfig*>(res);
}
Key GlobalConfig::prefixedKey(KeyRef key) {
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
}
const Reference<ConfigValue> GlobalConfig::get(KeyRef name) {
auto it = data.find(name);
if (it == data.end()) {
return Reference<ConfigValue>();
}
return it->second;
}
const std::map<KeyRef, Reference<ConfigValue>> GlobalConfig::get(KeyRangeRef range) {
std::map<KeyRef, Reference<ConfigValue>> results;
for (const auto& [key, value] : data) {
if (range.contains(key)) {
results[key] = value;
}
}
return results;
}
Future<Void> GlobalConfig::onInitialized() {
return initialized.getFuture();
}
void GlobalConfig::insert(KeyRef key, ValueRef value) {
data.erase(key);
Arena arena(key.expectedSize() + value.expectedSize());
KeyRef stableKey = KeyRef(arena, key);
try {
std::any any;
Tuple t = Tuple::unpack(value);
if (t.getType(0) == Tuple::ElementType::UTF8) {
any = StringRef(arena, t.getString(0).contents());
} else if (t.getType(0) == Tuple::ElementType::INT) {
any = t.getInt(0);
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
any = t.getFloat(0);
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
any = t.getDouble(0);
} else {
ASSERT(false);
}
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
} catch (Error& e) {
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
}
}
void GlobalConfig::erase(KeyRef key) {
data.erase(key);
}
void GlobalConfig::erase(KeyRangeRef range) {
auto it = data.begin();
while (it != data.end()) {
if (range.contains(it->first)) {
it = data.erase(it);
} else {
++it;
}
}
}
// Older FDB versions used different keys for client profiling data. This
// function performs a one-time migration of data in these keys to the new
// global configuration key space.
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
state Optional<Value> migrated = wait(tr->get(migratedKey));
if (migrated.present()) {
// Already performed migration.
return Void();
}
state Optional<Value> sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
state Optional<Value> sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// The value doesn't matter too much, as long as the key is set.
tr->set(migratedKey.contents(), "1"_sr);
if (sampleRate.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
Tuple rate = Tuple().appendDouble(sampleRateDbl);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
}
if (sizeLimit.present()) {
const int64_t sizeLimitInt =
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
Tuple size = Tuple().append(sizeLimitInt);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
}
wait(tr->commit());
return Void();
} catch (Error& e) {
throw;
}
}
}
// Updates local copy of global configuration by reading the entire key-range
// from storage.
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
self->data.clear();
Transaction tr(self->cx);
Standalone<RangeResultRef> result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
for (const auto& kv : result) {
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
self->insert(systemKey, kv.value);
}
return Void();
}
// Applies updates to the local copy of the global configuration when this
// process receives an updated history.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
wait(self->migrate(self));
wait(self->refresh(self));
self->initialized.send(Void());
loop {
try {
wait(dbInfo->onChange());
auto& history = dbInfo->get().history;
if (history.size() == 0) {
continue;
}
if (self->lastUpdate < history[0].version) {
// This process missed too many global configuration
// history updates or the protocol version changed, so it
// must re-read the entire configuration range.
wait(self->refresh(self));
if (dbInfo->get().history.size() > 0) {
self->lastUpdate = dbInfo->get().history.back().version;
}
} else {
// Apply history in order, from lowest version to highest
// version. Mutation history should already be stored in
// ascending version order.
for (const auto& vh : history) {
if (vh.version <= self->lastUpdate) {
continue; // already applied this mutation
}
for (const auto& mutation : vh.mutations.contents()) {
if (mutation.type == MutationRef::SetValue) {
self->insert(mutation.param1, mutation.param2);
} else if (mutation.type == MutationRef::ClearRange) {
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
} else {
ASSERT(false);
}
}
ASSERT(vh.version > self->lastUpdate);
self->lastUpdate = vh.version;
}
}
} catch (Error& e) {
throw;
}
}
}

View File

@ -0,0 +1,146 @@
/*
* GlobalConfig.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_G_H)
#define FDBCLIENT_GLOBALCONFIG_ACTOR_G_H
#include "fdbclient/GlobalConfig.actor.g.h"
#elif !defined(FDBCLIENT_GLOBALCONFIG_ACTOR_H)
#define FDBCLIENT_GLOBALCONFIG_ACTOR_H
#include <any>
#include <map>
#include <type_traits>
#include <unordered_map>
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/GlobalConfig.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/actorcompiler.h" // has to be last include
// The global configuration is a series of typed key-value pairs synced to all
// nodes (server and client) in an FDB cluster in an eventually consistent
// manner. Only small key-value pairs should be stored in global configuration;
// an excessive amount of data can cause synchronization slowness.
// Keys
extern const KeyRef fdbClientInfoTxnSampleRate;
extern const KeyRef fdbClientInfoTxnSizeLimit;
extern const KeyRef transactionTagSampleRate;
extern const KeyRef transactionTagSampleCost;
// Structure used to hold the values stored by global configuration. The arena
// is used as memory to store both the key and the value (the value is only
// stored in the arena if it is an object; primitives are just copied).
struct ConfigValue : ReferenceCounted<ConfigValue> {
Arena arena;
std::any value;
ConfigValue() {}
ConfigValue(Arena&& a, std::any&& v) : arena(a), value(v) {}
};
class GlobalConfig : NonCopyable {
public:
// Creates a GlobalConfig singleton, accessed by calling GlobalConfig().
// This function should only be called once by each process (however, it is
// idempotent and calling it multiple times will have no effect).
static void create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo);
// Returns a reference to the global GlobalConfig object. Clients should
// call this function whenever they need to read a value out of the global
// configuration.
static GlobalConfig& globalConfig();
// Use this function to turn a global configuration key defined above into
// the full path needed to set the value in the database.
//
// For example, given "config/a", returns "\xff\xff/global_config/config/a".
static Key prefixedKey(KeyRef key);
// Get a value from the framework. Values are returned as a ConfigValue
// reference which also contains the arena holding the object. As long as
// the caller keeps the ConfigValue reference, the value is guaranteed to
// be readable. An empty reference is returned if the value does not exist.
const Reference<ConfigValue> get(KeyRef name);
const std::map<KeyRef, Reference<ConfigValue>> get(KeyRangeRef range);
// For arithmetic value types, returns a copy of the value for the given
// key, or the supplied default value if the framework does not know about
// the key.
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
const T get(KeyRef name, T defaultVal) {
try {
auto configValue = get(name);
if (configValue.isValid()) {
if (configValue->value.has_value()) {
return std::any_cast<T>(configValue->value);
}
}
return defaultVal;
} catch (Error& e) {
throw;
}
}
// Trying to write into the global configuration keyspace? To write data,
// submit a transaction to \xff\xff/global_config/<your-key> with
// <your-value> encoded using the FDB tuple typecodes. Use the helper
// function `prefixedKey` to correctly prefix your global configuration
// key.
// Triggers the returned future when the global configuration singleton has
// been created and is ready.
Future<Void> onInitialized();
private:
GlobalConfig();
// The functions below only affect the local copy of the global
// configuration keyspace! To insert or remove values across all nodes you
// must use a transaction (see the note above).
// Inserts the given key-value pair into the local copy of the global
// configuration keyspace, overwriting the old key-value pair if it exists.
// `value` must be encoded using the FDB tuple typecodes.
void insert(KeyRef key, ValueRef value);
// Removes the given key (and associated value) from the local copy of the
// global configuration keyspace.
void erase(KeyRef key);
// Removes the given key range (and associated values) from the local copy
// of the global configuration keyspace.
void erase(KeyRangeRef range);
ACTOR static Future<Void> migrate(GlobalConfig* self);
ACTOR static Future<Void> refresh(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo);
Database cx;
Future<Void> _updater;
Promise<Void> initialized;
std::unordered_map<StringRef, Reference<ConfigValue>> data;
Version lastUpdate;
};
#endif

45
fdbclient/GlobalConfig.h Normal file
View File

@ -0,0 +1,45 @@
/*
* GlobalConfig.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
// Used to store a list of mutations made to the global configuration at a
// specific version.
struct VersionHistory {
constexpr static FileIdentifier file_identifier = 5863456;
VersionHistory() {}
VersionHistory(Version v) : version(v) {}
Version version;
Standalone<VectorRef<MutationRef>> mutations;
bool operator<(const VersionHistory& other) const { return version < other.version; }
int expectedSize() const { return sizeof(version) + mutations.expectedSize(); }
template <typename Ar>
void serialize(Ar& ar) {
serializer(ar, mutations, version);
}
};

View File

@ -28,6 +28,7 @@
#include "flow/ThreadHelper.actor.h" #include "flow/ThreadHelper.actor.h"
// An interface that represents a transaction created by a client
class ITransaction { class ITransaction {
public: public:
virtual ~ITransaction() {} virtual ~ITransaction() {}
@ -90,6 +91,7 @@ public:
virtual void delref() = 0; virtual void delref() = 0;
}; };
// An interface that represents a connection to a cluster made by a client
class IDatabase { class IDatabase {
public: public:
virtual ~IDatabase() {} virtual ~IDatabase() {}
@ -98,6 +100,11 @@ public:
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0; virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0; virtual double getMainThreadBusyness() = 0;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
virtual ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) = 0;
virtual void addref() = 0; virtual void addref() = 0;
virtual void delref() = 0; virtual void delref() = 0;
@ -110,13 +117,16 @@ public:
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0; virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
}; };
// An interface that presents the top-level FDB client API as exposed through the C bindings
//
// This interface and its associated objects are intended to live outside the network thread, so its asynchronous
// operations use ThreadFutures and implementations should be thread safe.
class IClientApi { class IClientApi {
public: public:
virtual ~IClientApi() {} virtual ~IClientApi() {}
virtual void selectApiVersion(int apiVersion) = 0; virtual void selectApiVersion(int apiVersion) = 0;
virtual const char* getClientVersion() = 0; virtual const char* getClientVersion() = 0;
virtual ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) = 0;
virtual void setNetworkOption(FDBNetworkOptions::Option option, virtual void setNetworkOption(FDBNetworkOptions::Option option,
Optional<StringRef> value = Optional<StringRef>()) = 0; Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -356,7 +356,32 @@ double DLDatabase::getMainThreadBusyness() {
return 0; return 0;
} }
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
ASSERT(api->databaseGetServerProtocol != nullptr);
uint64_t expected =
expectedVersion.map<uint64_t>([](const ProtocolVersion& v) { return v.version(); }).orDefault(0);
FdbCApi::FDBFuture* f = api->databaseGetServerProtocol(db, expected);
return toThreadFuture<ProtocolVersion>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
uint64_t pv;
FdbCApi::fdb_error_t error = api->futureGetUInt64(f, &pv);
ASSERT(!error);
return ProtocolVersion(pv);
});
}
// DLApi // DLApi
// Loads the specified function from a dynamic library
//
// fp - The function pointer where the loaded function will be stored
// lib - The dynamic library where the function is loaded from
// libPath - The path of the dynamic library (used for logging)
// functionName - The function to load
// requireFunction - Determines the behavior if the function is not present. If true, an error is thrown. If false,
// the function pointer will be set to nullptr.
template <class T> template <class T>
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) { void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
*(void**)(fp) = loadFunction(lib, functionName); *(void**)(fp) = loadFunction(lib, functionName);
@ -403,6 +428,8 @@ void DLApi::init() {
fdbCPath, fdbCPath,
"fdb_database_get_main_thread_busyness", "fdb_database_get_main_thread_busyness",
headerVersion >= 700); headerVersion >= 700);
loadClientFunction(
&api->databaseGetServerProtocol, lib, fdbCPath, "fdb_database_get_server_protocol", headerVersion >= 700);
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy"); loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700); loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
loadClientFunction(&api->databaseForceRecoveryWithDataLoss, loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@ -452,7 +479,7 @@ void DLApi::init() {
loadClientFunction( loadClientFunction(
&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version"); &api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64"); loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64", headerVersion >= 700);
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error"); loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key"); loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value"); loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
@ -488,11 +515,6 @@ const char* DLApi::getClientVersion() {
return api->getClientVersion(); return api->getClientVersion();
} }
ThreadFuture<uint64_t> DLApi::getServerProtocol(const char* clusterFilePath) {
ASSERT(false);
return ThreadFuture<uint64_t>();
}
void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) { void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
throwIfError(api->setNetworkOption( throwIfError(api->setNetworkOption(
option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0)); option, value.present() ? value.get().begin() : nullptr, value.present() ? value.get().size() : 0));
@ -856,7 +878,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
std::string clusterFilePath, std::string clusterFilePath,
Reference<IDatabase> db, Reference<IDatabase> db,
bool openConnectors) bool openConnectors)
: dbState(new DatabaseState()) { : dbState(new DatabaseState()), clusterFilePath(clusterFilePath) {
dbState->db = db; dbState->db = db;
dbState->dbVar->set(db); dbState->dbVar->set(db);
@ -941,6 +963,15 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return 0; return 0;
} }
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
// TODO: send this out through the active database
return MultiVersionApi::api->getLocalClient()
->api->createDatabase(clusterFilePath.c_str())
->getServerProtocol(expectedVersion);
}
void MultiVersionDatabase::Connector::connect() { void MultiVersionDatabase::Connector::connect() {
addref(); addref();
onMainThreadVoid( onMainThreadVoid(
@ -1181,10 +1212,6 @@ const char* MultiVersionApi::getClientVersion() {
return localClient->api->getClientVersion(); return localClient->api->getClientVersion();
} }
ThreadFuture<uint64_t> MultiVersionApi::getServerProtocol(const char* clusterFilePath) {
return api->localClient->api->getServerProtocol(clusterFilePath);
}
void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty = true) { void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty = true) {
ASSERT(canBePresent || canBeAbsent); ASSERT(canBePresent || canBeAbsent);

View File

@ -28,6 +28,8 @@
#include "flow/ThreadHelper.actor.h" #include "flow/ThreadHelper.actor.h"
// FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library.
// All of the required functions loaded from that external library are stored in function pointers in this struct.
struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> { struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct future FDBFuture; typedef struct future FDBFuture;
typedef struct cluster FDBCluster; typedef struct cluster FDBCluster;
@ -55,7 +57,6 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// Network // Network
fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion); fdb_error_t (*selectApiVersion)(int runtimeVersion, int headerVersion);
const char* (*getClientVersion)(); const char* (*getClientVersion)();
FDBFuture* (*getServerProtocol)(const char* clusterFilePath);
fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const* value, int valueLength); fdb_error_t (*setNetworkOption)(FDBNetworkOptions::Option option, uint8_t const* value, int valueLength);
fdb_error_t (*setupNetwork)(); fdb_error_t (*setupNetwork)();
fdb_error_t (*runNetwork)(); fdb_error_t (*runNetwork)();
@ -81,6 +82,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
uint8_t const* snapshotCommmand, uint8_t const* snapshotCommmand,
int snapshotCommandLength); int snapshotCommandLength);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database); double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
// Transaction // Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr, fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@ -185,6 +187,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*futureGetCluster)(FDBFuture* f, FDBCluster** outCluster); fdb_error_t (*futureGetCluster)(FDBFuture* f, FDBCluster** outCluster);
}; };
// An implementation of ITransaction that wraps a transaction object created on an externally loaded client library.
// All API calls to that transaction are routed through the external library.
class DLTransaction : public ITransaction, ThreadSafeReferenceCounted<DLTransaction> { class DLTransaction : public ITransaction, ThreadSafeReferenceCounted<DLTransaction> {
public: public:
DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction* tr) : api(api), tr(tr) {} DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction* tr) : api(api), tr(tr) {}
@ -249,6 +253,8 @@ private:
FdbCApi::FDBTransaction* const tr; FdbCApi::FDBTransaction* const tr;
}; };
// An implementation of IDatabase that wraps a database object created on an externally loaded client library.
// All API calls to that database are routed through the external library.
class DLDatabase : public IDatabase, ThreadSafeReferenceCounted<DLDatabase> { class DLDatabase : public IDatabase, ThreadSafeReferenceCounted<DLDatabase> {
public: public:
DLDatabase(Reference<FdbCApi> api, FdbCApi::FDBDatabase* db) : api(api), db(db), ready(Void()) {} DLDatabase(Reference<FdbCApi> api, FdbCApi::FDBDatabase* db) : api(api), db(db), ready(Void()) {}
@ -265,6 +271,11 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override; double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); } void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); } void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -279,13 +290,14 @@ private:
ThreadFuture<Void> ready; ThreadFuture<Void> ready;
}; };
// An implementation of IClientApi that re-issues API calls to the C API of an externally loaded client library.
// The DL prefix stands for "dynamic library".
class DLApi : public IClientApi { class DLApi : public IClientApi {
public: public:
DLApi(std::string fdbCPath, bool unlinkOnLoad = false); DLApi(std::string fdbCPath, bool unlinkOnLoad = false);
void selectApiVersion(int apiVersion) override; void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override; const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override; void setupNetwork() override;
@ -312,6 +324,9 @@ private:
class MultiVersionDatabase; class MultiVersionDatabase;
// An implementation of ITransaction that wraps a transaction created either locally or through a dynamically loaded
// external client. When needed (e.g on cluster version change), the MultiVersionTransaction can automatically replace
// its wrapped transaction with one from another client.
class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<MultiVersionTransaction> { class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<MultiVersionTransaction> {
public: public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db, MultiVersionTransaction(Reference<MultiVersionDatabase> db,
@ -413,6 +428,9 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
class MultiVersionApi; class MultiVersionApi;
// An implementation of IDatabase that wraps a database created either locally or through a dynamically loaded
// external client. The MultiVersionDatabase monitors the protocol version of the cluster and automatically
// replaces the wrapped database when the protocol version changes.
class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> { class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
public: public:
MultiVersionDatabase(MultiVersionApi* api, MultiVersionDatabase(MultiVersionApi* api,
@ -426,6 +444,11 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override; double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); } void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); } void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
@ -487,15 +510,19 @@ private:
Mutex optionLock; Mutex optionLock;
}; };
std::string clusterFilePath;
const Reference<DatabaseState> dbState; const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction; friend class MultiVersionTransaction;
}; };
// An implementation of IClientApi that can choose between multiple different client implementations either provided
// locally within the primary loaded fdb_c client or through any number of dynamically loaded clients.
//
// This functionality is used to provide support for multiple protocol versions simultaneously.
class MultiVersionApi : public IClientApi { class MultiVersionApi : public IClientApi {
public: public:
void selectApiVersion(int apiVersion) override; void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override; const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override; void setupNetwork() override;

View File

@ -37,6 +37,7 @@
#include "fdbclient/ClusterInterface.h" #include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h" #include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h" #include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/JsonBuilder.h" #include "fdbclient/JsonBuilder.h"
#include "fdbclient/KeyRangeMap.h" #include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h" #include "fdbclient/Knobs.h"
@ -506,12 +507,13 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
} }
} }
cx->clientStatusUpdater.outStatusQ.clear(); cx->clientStatusUpdater.outStatusQ.clear();
double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) wait(GlobalConfig::globalConfig().onInitialized());
? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY double sampleRate = GlobalConfig::globalConfig().get<double>(fdbClientInfoTxnSampleRate,
: cx->clientInfo->get().clientTxnInfoSampleRate; std::numeric_limits<double>::infinity());
int64_t clientTxnInfoSizeLimit = cx->clientInfo->get().clientTxnInfoSizeLimit == -1 double clientSamplingProbability =
? CLIENT_KNOBS->CSI_SIZE_LIMIT std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
: cx->clientInfo->get().clientTxnInfoSizeLimit; int64_t sizeLimit = GlobalConfig::globalConfig().get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
int64_t clientTxnInfoSizeLimit = sizeLimit == -1 ? CLIENT_KNOBS->CSI_SIZE_LIMIT : sizeLimit;
if (!trChunksQ.empty() && deterministicRandom()->random01() < clientSamplingProbability) if (!trChunksQ.empty() && deterministicRandom()->random01() < clientSamplingProbability)
wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit)); wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));
@ -957,6 +959,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted")); getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted")); getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
GlobalConfig::create(this, clientInfo);
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger); monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this); clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this); cacheListMonitor = monitorCacheList(this);
@ -1019,9 +1023,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
singleKeyRange(LiteralStringRef("consistency_check_suspended")) singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule( registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::MODULE::GLOBALCONFIG, SpecialKeySpace::IMPLTYPE::READWRITE,
SpecialKeySpace::IMPLTYPE::READWRITE, std::make_unique<GlobalConfigImpl>(
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING))); SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule( registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONFIGURATION, SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE, SpecialKeySpace::IMPLTYPE::READWRITE,
@ -1271,14 +1279,16 @@ Future<Void> DatabaseContext::onProxiesChanged() {
} }
bool DatabaseContext::sampleReadTags() const { bool DatabaseContext::sampleReadTags() const {
return clientInfo->get().transactionTagSampleRate > 0 && double sampleRate = GlobalConfig::globalConfig().get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
deterministicRandom()->random01() <= clientInfo->get().transactionTagSampleRate; return sampleRate > 0 && deterministicRandom()->random01() <= sampleRate;
} }
bool DatabaseContext::sampleOnCost(uint64_t cost) const { bool DatabaseContext::sampleOnCost(uint64_t cost) const {
if (clientInfo->get().transactionTagSampleCost <= 0) double sampleCost =
GlobalConfig::globalConfig().get<double>(transactionTagSampleCost, CLIENT_KNOBS->COMMIT_SAMPLE_COST);
if (sampleCost <= 0)
return false; return false;
return deterministicRandom()->random01() <= (double)cost / clientInfo->get().transactionTagSampleCost; return deterministicRandom()->random01() <= (double)cost / sampleCost;
} }
int64_t extractIntOption(Optional<StringRef> value, int64_t minValue, int64_t maxValue) { int64_t extractIntOption(Optional<StringRef> value, int64_t minValue, int64_t maxValue) {
@ -2483,7 +2493,6 @@ ACTOR Future<Version> watchValue(Future<Version> version,
cx->invalidateCache(key); cx->invalidateCache(key);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) {
TEST(e.code() == error_code_watch_cancelled); // Too many watches on the storage server, poll for changes instead
TEST(e.code() == error_code_watch_cancelled); // Too many watches on storage server, poll for changes TEST(e.code() == error_code_watch_cancelled); // Too many watches on storage server, poll for changes
TEST(e.code() == error_code_process_behind); // The storage servers are all behind TEST(e.code() == error_code_process_behind); // The storage servers are all behind
wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID)); wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID));
@ -4908,9 +4917,18 @@ ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConne
return ProtocolVersion(majorityProtocol); return ProtocolVersion(majorityProtocol);
} }
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f) { // Returns the protocol version reported by a quorum of coordinators
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f)); // If an expected version is given, the future won't return until the protocol version is different than expected
return protocolVersion.version(); ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion) {
loop {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
if (!expectedVersion.present() || protocolVersion != expectedVersion.get()) {
return protocolVersion;
} else {
wait(delay(2.0)); // TODO: this is temporary, so not making into a knob yet
}
}
} }
uint32_t Transaction::getSize() { uint32_t Transaction::getSize() {
@ -5378,9 +5396,8 @@ void Transaction::checkDeferredError() {
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) { Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
if (!cx->isError()) { if (!cx->isError()) {
double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) double clientSamplingProbability = GlobalConfig::globalConfig().get<double>(
? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
: cx->clientInfo->get().clientTxnInfoSampleRate;
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
deterministicRandom()->random01() < clientSamplingProbability && deterministicRandom()->random01() < clientSamplingProbability &&
(!g_network->isSimulated() || !g_simulator.speedUpSimulation)) { (!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {

View File

@ -400,7 +400,10 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed // Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions); ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f); // Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion);
inline uint64_t getWriteOperationCost(uint64_t bytes) { inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1; return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;

View File

@ -23,6 +23,7 @@
#include "fdbclient/Knobs.h" #include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h" #include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h" #include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/Arena.h" #include "flow/Arena.h"
#include "flow/UnitTest.h" #include "flow/UnitTest.h"
@ -65,6 +66,8 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::ERRORMSG, singleKeyRange(LiteralStringRef("\xff\xff/error_message")) }, { SpecialKeySpace::MODULE::ERRORMSG, singleKeyRange(LiteralStringRef("\xff\xff/error_message")) },
{ SpecialKeySpace::MODULE::CONFIGURATION, { SpecialKeySpace::MODULE::CONFIGURATION,
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/"), LiteralStringRef("\xff\xff/configuration0")) }, KeyRangeRef(LiteralStringRef("\xff\xff/configuration/"), LiteralStringRef("\xff\xff/configuration0")) },
{ SpecialKeySpace::MODULE::GLOBALCONFIG,
KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) },
{ SpecialKeySpace::MODULE::TRACING, { SpecialKeySpace::MODULE::TRACING,
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }, KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) },
{ SpecialKeySpace::MODULE::ACTORLINEAGE, { SpecialKeySpace::MODULE::ACTORLINEAGE,
@ -1372,10 +1375,129 @@ Future<Optional<std::string>> ConsistencyCheckImpl::commit(ReadYourWritesTransac
return Optional<std::string>(); return Optional<std::string>();
} }
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) { GlobalConfigImpl::GlobalConfigImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr);
// Returns key-value pairs for each value stored in the global configuration
// framework within the range specified. The special-key-space getrange
// function should only be used for informational purposes. All values are
// returned as strings regardless of their true type.
Future<Standalone<RangeResultRef>> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
auto& globalConfig = GlobalConfig::globalConfig();
KeyRangeRef modified =
KeyRangeRef(kr.begin.removePrefix(getKeyRange().begin), kr.end.removePrefix(getKeyRange().begin));
std::map<KeyRef, Reference<ConfigValue>> values = globalConfig.get(modified);
for (const auto& [key, config] : values) {
Key prefixedKey = key.withPrefix(getKeyRange().begin);
if (config.isValid() && config->value.has_value()) {
if (config->value.type() == typeid(StringRef)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::any_cast<StringRef>(config->value).toString()));
} else if (config->value.type() == typeid(int64_t)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
} else if (config->value.type() == typeid(float)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
} else if (config->value.type() == typeid(double)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<double>(config->value))));
} else {
ASSERT(false);
}
}
}
return result;
} }
// Marks the key for insertion into global configuration.
void GlobalConfigImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
}
// Writes global configuration changes to durable memory. Also writes the
// changes made in the transaction to a recent history set, and updates the
// latest version which the global configuration was updated at.
ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* globalConfig,
ReadYourWritesTransaction* ryw) {
state Transaction& tr = ryw->getTransaction();
// History should only contain three most recent updates. If it currently
// has three items, remove the oldest to make room for a new item.
Standalone<RangeResultRef> history = wait(tr.getRange(globalConfigHistoryKeys, CLIENT_KNOBS->TOO_MANY));
constexpr int kGlobalConfigMaxHistorySize = 3;
if (history.size() > kGlobalConfigMaxHistorySize - 1) {
for (int i = 0; i < history.size() - (kGlobalConfigMaxHistorySize - 1); ++i) {
tr.clear(history[i].key);
}
}
VersionHistory vh{ 0 };
// Transform writes from the special-key-space (\xff\xff/global_config/) to
// the system key space (\xff/globalConfig/), and writes mutations to
// latest version history.
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
while (iter != ranges.end()) {
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
if (entry.second.present() && iter->begin().startsWith(globalConfig->getKeyRange().begin)) {
Key bareKey = iter->begin().removePrefix(globalConfig->getKeyRange().begin);
vh.mutations.emplace_back_deep(vh.mutations.arena(),
MutationRef(MutationRef::SetValue, bareKey, entry.second.get()));
Key systemKey = bareKey.withPrefix(globalConfigKeysPrefix);
tr.set(systemKey, entry.second.get());
} else if (!entry.second.present() && iter->range().begin.startsWith(globalConfig->getKeyRange().begin) &&
iter->range().end.startsWith(globalConfig->getKeyRange().begin)) {
KeyRef bareRangeBegin = iter->range().begin.removePrefix(globalConfig->getKeyRange().begin);
KeyRef bareRangeEnd = iter->range().end.removePrefix(globalConfig->getKeyRange().begin);
vh.mutations.emplace_back_deep(vh.mutations.arena(),
MutationRef(MutationRef::ClearRange, bareRangeBegin, bareRangeEnd));
Key systemRangeBegin = bareRangeBegin.withPrefix(globalConfigKeysPrefix);
Key systemRangeEnd = bareRangeEnd.withPrefix(globalConfigKeysPrefix);
tr.clear(KeyRangeRef(systemRangeBegin, systemRangeEnd));
}
}
++iter;
}
// Record the mutations in this commit into the global configuration history.
Key historyKey = addVersionStampAtEnd(globalConfigHistoryPrefix);
ObjectWriter historyWriter(IncludeVersion());
historyWriter.serialize(vh);
tr.atomicOp(historyKey, historyWriter.toStringRef(), MutationRef::SetVersionstampedKey);
// Write version key to trigger update in cluster controller.
tr.atomicOp(globalConfigVersionKey,
LiteralStringRef("0123456789\x00\x00\x00\x00"), // versionstamp
MutationRef::SetVersionstampedValue);
return Optional<std::string>();
}
// Called when a transaction includes keys in the global configuration special-key-space range.
Future<Optional<std::string>> GlobalConfigImpl::commit(ReadYourWritesTransaction* ryw) {
return globalConfigCommitActor(this, ryw);
}
// Marks the range for deletion from global configuration.
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
}
// Marks the key for deletion from global configuration.
void GlobalConfigImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
}
TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result; Standalone<RangeResultRef> result;
for (const auto& option : SpecialKeySpace::getTracingOptions()) { for (const auto& option : SpecialKeySpace::getTracingOptions()) {

View File

@ -147,6 +147,7 @@ public:
CONFIGURATION, // Configuration of the cluster CONFIGURATION, // Configuration of the cluster
CONNECTIONSTRING, CONNECTIONSTRING,
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
GLOBALCONFIG, // Global configuration options synchronized to all nodes
MANAGEMENT, // Management-API MANAGEMENT, // Management-API
METRICS, // data-distribution metrics METRICS, // data-distribution metrics
TESTONLY, // only used by correctness tests TESTONLY, // only used by correctness tests
@ -337,6 +338,16 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override; Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
}; };
class GlobalConfigImpl : public SpecialKeyRangeRWImpl {
public:
explicit GlobalConfigImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class TracingOptionsImpl : public SpecialKeyRangeRWImpl { class TracingOptionsImpl : public SpecialKeyRangeRWImpl {
public: public:
explicit TracingOptionsImpl(KeyRangeRef kr); explicit TracingOptionsImpl(KeyRangeRef kr);

View File

@ -632,7 +632,18 @@ std::string encodeFailedServersKey(AddressExclusion const& addr) {
return failedServersPrefix.toString() + addr.toString(); return failedServersPrefix.toString() + addr.toString();
} }
const KeyRangeRef workerListKeys(LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0")); // const KeyRangeRef globalConfigKeys( LiteralStringRef("\xff/globalConfig/"), LiteralStringRef("\xff/globalConfig0") );
// const KeyRef globalConfigPrefix = globalConfigKeys.begin;
const KeyRangeRef globalConfigDataKeys( LiteralStringRef("\xff/globalConfig/k/"), LiteralStringRef("\xff/globalConfig/k0") );
const KeyRef globalConfigKeysPrefix = globalConfigDataKeys.begin;
const KeyRangeRef globalConfigHistoryKeys( LiteralStringRef("\xff/globalConfig/h/"), LiteralStringRef("\xff/globalConfig/h0") );
const KeyRef globalConfigHistoryPrefix = globalConfigHistoryKeys.begin;
const KeyRef globalConfigVersionKey = LiteralStringRef("\xff/globalConfig/v");
const KeyRangeRef workerListKeys( LiteralStringRef("\xff/worker/"), LiteralStringRef("\xff/worker0") );
const KeyRef workerListPrefix = workerListKeys.begin; const KeyRef workerListPrefix = workerListKeys.begin;
const Key workerListKeyFor(StringRef processID) { const Key workerListKeyFor(StringRef processID) {
@ -748,8 +759,7 @@ const KeyRef tagThrottleCountKey = LiteralStringRef("\xff\x02/throttledTags/manu
// Client status info prefix // Client status info prefix
const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"), const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"),
LiteralStringRef("\xff\x02/fdbClientInfo0")); LiteralStringRef("\xff\x02/fdbClientInfo0"));
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_sample_rate/"); // See remaining fields in GlobalConfig.actor.h
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_size_limit/");
// ConsistencyCheck settings // ConsistencyCheck settings
const KeyRef fdbShouldConsistencyCheckBeSuspended = LiteralStringRef("\xff\x02/ConsistencyCheck/Suspend"); const KeyRef fdbShouldConsistencyCheckBeSuspended = LiteralStringRef("\xff\x02/ConsistencyCheck/Suspend");

View File

@ -230,6 +230,30 @@ extern const KeyRef failedServersVersionKey; // The value of this key shall be c
const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix) const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix)
std::string encodeFailedServersKey(AddressExclusion const&); std::string encodeFailedServersKey(AddressExclusion const&);
// "\xff/globalConfig/[[option]]" := "value"
// An umbrella prefix for global configuration data synchronized to all nodes.
// extern const KeyRangeRef globalConfigData;
// extern const KeyRef globalConfigDataPrefix;
// "\xff/globalConfig/k/[[key]]" := "value"
// Key-value pairs that have been set. The range this keyspace represents
// contains all globally configured options.
extern const KeyRangeRef globalConfigDataKeys;
extern const KeyRef globalConfigKeysPrefix;
// "\xff/globalConfig/h/[[version]]" := "value"
// Maps a commit version to a list of mutations made to the global
// configuration at that commit. Shipped to nodes periodically. In general,
// clients should not write to keys in this keyspace; it will be written
// automatically when updating global configuration keys.
extern const KeyRangeRef globalConfigHistoryKeys;
extern const KeyRef globalConfigHistoryPrefix;
// "\xff/globalConfig/v" := "version"
// Read-only key which returns the commit version of the most recent mutation
// made to the global configuration keyspace.
extern const KeyRef globalConfigVersionKey;
// "\xff/workers/[[processID]]" := "" // "\xff/workers/[[processID]]" := ""
// Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster // Asynchronously updated by the cluster controller, this is a list of fdbserver processes that have joined the cluster
// and are currently (recently) available // and are currently (recently) available
@ -355,8 +379,6 @@ extern const KeyRangeRef applyMutationsKeyVersionCountRange;
// FdbClient Info prefix // FdbClient Info prefix
extern const KeyRangeRef fdbClientInfoPrefixRange; extern const KeyRangeRef fdbClientInfoPrefixRange;
extern const KeyRef fdbClientInfoTxnSampleRate;
extern const KeyRef fdbClientInfoTxnSizeLimit;
// Consistency Check settings // Consistency Check settings
extern const KeyRef fdbShouldConsistencyCheckBeSuspended; extern const KeyRef fdbShouldConsistencyCheckBeSuspended;

View File

@ -97,6 +97,15 @@ double ThreadSafeDatabase::getMainThreadBusyness() {
return g_network->networkInfo.metrics.networkBusyness; return g_network->networkInfo.metrics.networkBusyness;
} }
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
DatabaseContext* db = this->db;
return onMainThread([db, expectedVersion]() -> Future<ProtocolVersion> {
return getClusterProtocol(db->getConnectionFile(), expectedVersion);
});
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) { ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile = ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first); new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
@ -407,16 +416,6 @@ const char* ThreadSafeApi::getClientVersion() {
return clientVersion.c_str(); return clientVersion.c_str();
} }
// Wait until a quorum of coordinators with the same protocol version are available, and then return that protocol
// version.
ThreadFuture<uint64_t> ThreadSafeApi::getServerProtocol(const char* clusterFilePath) {
return onMainThread([clusterFilePath = std::string(clusterFilePath)]() -> Future<uint64_t> {
auto [clusterFile, isDefault] = ClusterConnectionFile::lookupClusterFileName(clusterFilePath);
Reference<ClusterConnectionFile> f = Reference<ClusterConnectionFile>(new ClusterConnectionFile(clusterFile));
return getCoordinatorProtocols(f);
});
}
void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) { void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
if (option == FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) { if (option == FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) {
if (value.present()) { if (value.present()) {

View File

@ -27,6 +27,8 @@
#include "fdbclient/ClusterInterface.h" #include "fdbclient/ClusterInterface.h"
#include "fdbclient/IClientApi.h" #include "fdbclient/IClientApi.h"
// An implementation of IDatabase that serializes operations onto the network thread and interacts with the lower-level
// client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeDatabase : public IDatabase, public ThreadSafeReferenceCounted<ThreadSafeDatabase> { class ThreadSafeDatabase : public IDatabase, public ThreadSafeReferenceCounted<ThreadSafeDatabase> {
public: public:
~ThreadSafeDatabase() override; ~ThreadSafeDatabase() override;
@ -37,9 +39,14 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override; double getMainThreadBusyness() override;
ThreadFuture<Void> // Returns the protocol version reported by a quorum of coordinators
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The // If an expected version is given, the future won't return until the protocol version is different than expected
// cluster file therefore is valid, but the database might be unavailable. ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
// Returns after a majority of coordination servers are available and have reported a leader. The
// cluster file therefore is valid, but the database might be unavailable.
ThreadFuture<Void> onConnected();
void addref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::addref(); } void addref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::delref(); } void delref() override { ThreadSafeReferenceCounted<ThreadSafeDatabase>::delref(); }
@ -58,6 +65,8 @@ public: // Internal use only
DatabaseContext* unsafeGetPtr() const { return db; } DatabaseContext* unsafeGetPtr() const { return db; }
}; };
// An implementation of ITransaction that serializes operations onto the network thread and interacts with the
// lower-level client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted<ThreadSafeTransaction>, NonCopyable { class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted<ThreadSafeTransaction>, NonCopyable {
public: public:
explicit ThreadSafeTransaction(DatabaseContext* cx); explicit ThreadSafeTransaction(DatabaseContext* cx);
@ -135,11 +144,12 @@ private:
ReadYourWritesTransaction* tr; ReadYourWritesTransaction* tr;
}; };
// An implementation of IClientApi that serializes operations onto the network thread and interacts with the lower-level
// client APIs exposed by NativeAPI and ReadYourWrites.
class ThreadSafeApi : public IClientApi, ThreadSafeReferenceCounted<ThreadSafeApi> { class ThreadSafeApi : public IClientApi, ThreadSafeReferenceCounted<ThreadSafeApi> {
public: public:
void selectApiVersion(int apiVersion) override; void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override; const char* getClientVersion() override;
ThreadFuture<uint64_t> getServerProtocol(const char* clusterFilePath) override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override; void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override; void setupNetwork() override;

View File

@ -20,7 +20,20 @@
#include "fdbclient/Tuple.h" #include "fdbclient/Tuple.h"
static size_t find_string_terminator(const StringRef data, size_t offset) { // TODO: Many functions copied from bindings/flow/Tuple.cpp. Merge at some point.
static float bigEndianFloat(float orig) {
int32_t big = *(int32_t*)&orig;
big = bigEndian32(big);
return *(float*)&big;
}
static double bigEndianDouble(double orig) {
int64_t big = *(int64_t*)&orig;
big = bigEndian64(big);
return *(double*)&big;
}
static size_t findStringTerminator(const StringRef data, size_t offset) {
size_t i = offset; size_t i = offset;
while (i < data.size() - 1 && !(data[i] == '\x00' && data[i + 1] != (uint8_t)'\xff')) { while (i < data.size() - 1 && !(data[i] == '\x00' && data[i + 1] != (uint8_t)'\xff')) {
i += (data[i] == '\x00' ? 2 : 1); i += (data[i] == '\x00' ? 2 : 1);
@ -29,6 +42,20 @@ static size_t find_string_terminator(const StringRef data, size_t offset) {
return i; return i;
} }
// If encoding and the sign bit is 1 (the number is negative), flip all the bits.
// If decoding and the sign bit is 0 (the number is negative), flip all the bits.
// Otherwise, the number is positive, so flip the sign bit.
static void adjustFloatingPoint(uint8_t* bytes, size_t size, bool encode) {
if ((encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x00)) ||
(!encode && ((uint8_t)(bytes[0] & 0x80) != (uint8_t)0x80))) {
for (size_t i = 0; i < size; i++) {
bytes[i] ^= (uint8_t)0xff;
}
} else {
bytes[0] ^= (uint8_t)0x80;
}
}
Tuple::Tuple(StringRef const& str, bool exclude_incomplete) { Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
data.append(data.arena(), str.begin(), str.size()); data.append(data.arena(), str.begin(), str.size());
@ -37,9 +64,13 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
offsets.push_back(i); offsets.push_back(i);
if (data[i] == '\x01' || data[i] == '\x02') { if (data[i] == '\x01' || data[i] == '\x02') {
i = find_string_terminator(str, i + 1) + 1; i = findStringTerminator(str, i + 1) + 1;
} else if (data[i] >= '\x0c' && data[i] <= '\x1c') { } else if (data[i] >= '\x0c' && data[i] <= '\x1c') {
i += abs(data[i] - '\x14') + 1; i += abs(data[i] - '\x14') + 1;
} else if (data[i] == 0x20) {
i += sizeof(float) + 1;
} else if (data[i] == 0x21) {
i += sizeof(double) + 1;
} else if (data[i] == '\x00') { } else if (data[i] == '\x00') {
i += 1; i += 1;
} else { } else {
@ -113,6 +144,29 @@ Tuple& Tuple::append(int64_t value) {
return *this; return *this;
} }
Tuple& Tuple::appendFloat(float value) {
offsets.push_back(data.size());
float swap = bigEndianFloat(value);
uint8_t* bytes = (uint8_t*)&swap;
adjustFloatingPoint(bytes, sizeof(float), true);
data.push_back(data.arena(), 0x20);
data.append(data.arena(), bytes, sizeof(float));
return *this;
}
Tuple& Tuple::appendDouble(double value) {
offsets.push_back(data.size());
double swap = value;
swap = bigEndianDouble(swap);
uint8_t* bytes = (uint8_t*)&swap;
adjustFloatingPoint(bytes, sizeof(double), true);
data.push_back(data.arena(), 0x21);
data.append(data.arena(), bytes, sizeof(double));
return *this;
}
Tuple& Tuple::appendNull() { Tuple& Tuple::appendNull() {
offsets.push_back(data.size()); offsets.push_back(data.size());
data.push_back(data.arena(), (uint8_t)'\x00'); data.push_back(data.arena(), (uint8_t)'\x00');
@ -134,6 +188,10 @@ Tuple::ElementType Tuple::getType(size_t index) const {
return ElementType::UTF8; return ElementType::UTF8;
} else if (code >= '\x0c' && code <= '\x1c') { } else if (code >= '\x0c' && code <= '\x1c') {
return ElementType::INT; return ElementType::INT;
} else if (code == 0x20) {
return ElementType::FLOAT;
} else if (code == 0x21) {
return ElementType::DOUBLE;
} else { } else {
throw invalid_tuple_data_type(); throw invalid_tuple_data_type();
} }
@ -228,6 +286,45 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
return swap; return swap;
} }
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
float Tuple::getFloat(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (code != 0x20) {
throw invalid_tuple_data_type();
}
float swap;
uint8_t* bytes = (uint8_t*)&swap;
ASSERT_LE(offsets[index] + 1 + sizeof(float), data.size());
swap = *(float*)(data.begin() + offsets[index] + 1);
adjustFloatingPoint(bytes, sizeof(float), false);
return bigEndianFloat(swap);
}
double Tuple::getDouble(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (code != 0x21) {
throw invalid_tuple_data_type();
}
double swap;
uint8_t* bytes = (uint8_t*)&swap;
ASSERT_LE(offsets[index] + 1 + sizeof(double), data.size());
swap = *(double*)(data.begin() + offsets[index] + 1);
adjustFloatingPoint(bytes, sizeof(double), false);
return bigEndianDouble(swap);
}
KeyRange Tuple::range(Tuple const& tuple) const { KeyRange Tuple::range(Tuple const& tuple) const {
VectorRef<uint8_t> begin; VectorRef<uint8_t> begin;
VectorRef<uint8_t> end; VectorRef<uint8_t> end;

View File

@ -38,6 +38,10 @@ struct Tuple {
Tuple& append(Tuple const& tuple); Tuple& append(Tuple const& tuple);
Tuple& append(StringRef const& str, bool utf8 = false); Tuple& append(StringRef const& str, bool utf8 = false);
Tuple& append(int64_t); Tuple& append(int64_t);
// There are some ambiguous append calls in fdbclient, so to make it easier
// to add append for floats and doubles, name them differently for now.
Tuple& appendFloat(float);
Tuple& appendDouble(double);
Tuple& appendNull(); Tuple& appendNull();
StringRef pack() const { return StringRef(data.begin(), data.size()); } StringRef pack() const { return StringRef(data.begin(), data.size()); }
@ -47,7 +51,7 @@ struct Tuple {
return append(t); return append(t);
} }
enum ElementType { NULL_TYPE, INT, BYTES, UTF8 }; enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE };
// this is number of elements, not length of data // this is number of elements, not length of data
size_t size() const { return offsets.size(); } size_t size() const { return offsets.size(); }
@ -55,6 +59,8 @@ struct Tuple {
ElementType getType(size_t index) const; ElementType getType(size_t index) const;
Standalone<StringRef> getString(size_t index) const; Standalone<StringRef> getString(size_t index) const;
int64_t getInt(size_t index, bool allow_incomplete = false) const; int64_t getInt(size_t index, bool allow_incomplete = false) const;
float getFloat(size_t index) const;
double getDouble(size_t index) const;
KeyRange range(Tuple const& tuple = Tuple()) const; KeyRange range(Tuple const& tuple = Tuple()) const;

View File

@ -197,7 +197,7 @@ private:
this->file = file; this->file = file;
this->filename = filename; this->filename = filename;
this->diskParameters = diskParameters; this->diskParameters = diskParameters;
maxWriteDelay = 5.0; maxWriteDelay = FLOW_KNOBS->NON_DURABLE_MAX_WRITE_DELAY;
hasBeenSynced = false; hasBeenSynced = false;
killMode = (KillMode)deterministicRandom()->randomInt(1, 3); killMode = (KillMode)deterministicRandom()->randomInt(1, 3);
@ -434,7 +434,8 @@ private:
state TaskPriority currentTaskID = g_network->getCurrentTask(); state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess)); wait(g_simulator.onMachine(currentProcess));
state double delayDuration = deterministicRandom()->random01() * self->maxWriteDelay; state double delayDuration =
g_simulator.speedUpSimulation ? 0.0001 : (deterministicRandom()->random01() * self->maxWriteDelay);
state Standalone<StringRef> dataCopy(StringRef((uint8_t*)data, length)); state Standalone<StringRef> dataCopy(StringRef((uint8_t*)data, length));
state Future<bool> startSyncFuture = self->startSyncPromise.getFuture(); state Future<bool> startSyncFuture = self->startSyncPromise.getFuture();
@ -606,7 +607,8 @@ private:
state TaskPriority currentTaskID = g_network->getCurrentTask(); state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess)); wait(g_simulator.onMachine(currentProcess));
state double delayDuration = deterministicRandom()->random01() * self->maxWriteDelay; state double delayDuration =
g_simulator.speedUpSimulation ? 0.0001 : (deterministicRandom()->random01() * self->maxWriteDelay);
state Future<bool> startSyncFuture = self->startSyncPromise.getFuture(); state Future<bool> startSyncFuture = self->startSyncPromise.getFuture();
try { try {

View File

@ -63,7 +63,7 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default: default:
return ProcessClass::NeverAssign; return ProcessClass::NeverAssign;
} }
case ProcessClass::CommitProxy: case ProcessClass::CommitProxy: // Resolver, Master, CommitProxy, and GrvProxy need to be the same besides best fit
switch (_class) { switch (_class) {
case ProcessClass::CommitProxyClass: case ProcessClass::CommitProxyClass:
return ProcessClass::BestFit; return ProcessClass::BestFit;
@ -71,10 +71,6 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit; return ProcessClass::GoodFit;
case ProcessClass::UnsetClass: case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit; return ProcessClass::UnsetFit;
case ProcessClass::GrvProxyClass:
return ProcessClass::OkayFit;
case ProcessClass::ResolutionClass:
return ProcessClass::OkayFit;
case ProcessClass::TransactionClass: case ProcessClass::TransactionClass:
return ProcessClass::OkayFit; return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass: case ProcessClass::CoordinatorClass:
@ -84,7 +80,7 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default: default:
return ProcessClass::WorstFit; return ProcessClass::WorstFit;
} }
case ProcessClass::GrvProxy: case ProcessClass::GrvProxy: // Resolver, Master, CommitProxy, and GrvProxy need to be the same besides best fit
switch (_class) { switch (_class) {
case ProcessClass::GrvProxyClass: case ProcessClass::GrvProxyClass:
return ProcessClass::BestFit; return ProcessClass::BestFit;
@ -92,10 +88,6 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit; return ProcessClass::GoodFit;
case ProcessClass::UnsetClass: case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit; return ProcessClass::UnsetFit;
case ProcessClass::CommitProxyClass:
return ProcessClass::OkayFit;
case ProcessClass::ResolutionClass:
return ProcessClass::OkayFit;
case ProcessClass::TransactionClass: case ProcessClass::TransactionClass:
return ProcessClass::OkayFit; return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass: case ProcessClass::CoordinatorClass:
@ -105,7 +97,7 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default: default:
return ProcessClass::WorstFit; return ProcessClass::WorstFit;
} }
case ProcessClass::Master: case ProcessClass::Master: // Resolver, Master, CommitProxy, and GrvProxy need to be the same besides best fit
switch (_class) { switch (_class) {
case ProcessClass::MasterClass: case ProcessClass::MasterClass:
return ProcessClass::BestFit; return ProcessClass::BestFit;
@ -113,7 +105,7 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit; return ProcessClass::GoodFit;
case ProcessClass::UnsetClass: case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit; return ProcessClass::UnsetFit;
case ProcessClass::ResolutionClass: case ProcessClass::TransactionClass:
return ProcessClass::OkayFit; return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass: case ProcessClass::CoordinatorClass:
case ProcessClass::TesterClass: case ProcessClass::TesterClass:
@ -122,7 +114,7 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default: default:
return ProcessClass::WorstFit; return ProcessClass::WorstFit;
} }
case ProcessClass::Resolver: case ProcessClass::Resolver: // Resolver, Master, CommitProxy, and GrvProxy need to be the same besides best fit
switch (_class) { switch (_class) {
case ProcessClass::ResolutionClass: case ProcessClass::ResolutionClass:
return ProcessClass::BestFit; return ProcessClass::BestFit;
@ -147,8 +139,6 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit; return ProcessClass::GoodFit;
case ProcessClass::UnsetClass: case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit; return ProcessClass::UnsetFit;
case ProcessClass::ResolutionClass:
return ProcessClass::OkayFit;
case ProcessClass::TransactionClass: case ProcessClass::TransactionClass:
return ProcessClass::OkayFit; return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass: case ProcessClass::CoordinatorClass:
@ -167,8 +157,6 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit; return ProcessClass::GoodFit;
case ProcessClass::UnsetClass: case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit; return ProcessClass::UnsetFit;
case ProcessClass::ResolutionClass:
return ProcessClass::OkayFit;
case ProcessClass::TransactionClass: case ProcessClass::TransactionClass:
return ProcessClass::OkayFit; return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass: case ProcessClass::CoordinatorClass:

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,7 @@
#include <cstdint> #include <cstdint>
#include <fstream> #include <fstream>
#include <ostream> #include <ostream>
#include <sstream>
#include "fdbrpc/Locality.h" #include "fdbrpc/Locality.h"
#include "fdbrpc/simulator.h" #include "fdbrpc/simulator.h"
#include "fdbclient/DatabaseContext.h" #include "fdbclient/DatabaseContext.h"
@ -874,7 +875,9 @@ void SimulationConfig::set_config(std::string config) {
StringRef StringRefOf(const char* s) { StringRef StringRefOf(const char* s) {
return StringRef((uint8_t*)s, strlen(s)); return StringRef((uint8_t*)s, strlen(s));
} }
// Generates and sets an appropriate configuration for the database according to
// the provided testConfig. Some attributes are randomly generated for more coverage
// of different combinations
void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) { void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
set_config("new"); set_config("new");
const bool simple = false; // Set true to simplify simulation configs for easier debugging const bool simple = false; // Set true to simplify simulation configs for easier debugging
@ -897,7 +900,9 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
db.resolverCount = deterministicRandom()->randomInt(1, 7); db.resolverCount = deterministicRandom()->randomInt(1, 7);
int storage_engine_type = deterministicRandom()->randomInt(0, 4); int storage_engine_type = deterministicRandom()->randomInt(0, 4);
// Continuously re-pick the storage engine type if it's the one we want to exclude // Continuously re-pick the storage engine type if it's the one we want to exclude
while (storage_engine_type == testConfig.storageEngineExcludeType) { while (std::find(testConfig.storageEngineExcludeTypes.begin(),
testConfig.storageEngineExcludeTypes.end(),
storage_engine_type) != testConfig.storageEngineExcludeTypes.end()) {
storage_engine_type = deterministicRandom()->randomInt(0, 4); storage_engine_type = deterministicRandom()->randomInt(0, 4);
} }
switch (storage_engine_type) { switch (storage_engine_type) {
@ -989,11 +994,11 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
if (deterministicRandom()->random01() < 0.5) { if (deterministicRandom()->random01() < 0.5) {
int logSpill = deterministicRandom()->randomInt(TLogSpillType::VALUE, TLogSpillType::END); int logSpill = deterministicRandom()->randomInt(TLogSpillType::VALUE, TLogSpillType::END);
set_config(format("log_spill:=%d", logSpill)); set_config(format("log_spill:=%d", logSpill));
int logVersion = deterministicRandom()->randomInt(TLogVersion::MIN_RECRUITABLE, TLogVersion::MAX_SUPPORTED + 1); int logVersion = deterministicRandom()->randomInt(TLogVersion::MIN_RECRUITABLE, testConfig.maxTLogVersion + 1);
set_config(format("log_version:=%d", logVersion)); set_config(format("log_version:=%d", logVersion));
} else { } else {
if (deterministicRandom()->random01() < 0.7) if (deterministicRandom()->random01() < 0.7)
set_config(format("log_version:=%d", TLogVersion::MAX_SUPPORTED)); set_config(format("log_version:=%d", testConfig.maxTLogVersion));
if (deterministicRandom()->random01() < 0.5) if (deterministicRandom()->random01() < 0.5)
set_config(format("log_spill:=%d", TLogSpillType::DEFAULT)); set_config(format("log_spill:=%d", TLogSpillType::DEFAULT));
} }
@ -1663,8 +1668,17 @@ void checkTestConf(const char* testFile, TestConfig* testConfig) {
sscanf(value.c_str(), "%d", &testConfig->logAntiQuorum); sscanf(value.c_str(), "%d", &testConfig->logAntiQuorum);
} }
if (attrib == "storageEngineExcludeType") { if (attrib == "storageEngineExcludeTypes") {
sscanf(value.c_str(), "%d", &testConfig->storageEngineExcludeType); std::stringstream ss(value);
for (int i; ss >> i;) {
testConfig->storageEngineExcludeTypes.push_back(i);
if (ss.peek() == ',') {
ss.ignore();
}
}
}
if (attrib == "maxTLogVersion") {
sscanf(value.c_str(), "%d", &testConfig->maxTLogVersion);
} }
} }

View File

@ -29,6 +29,7 @@
#include "fdbrpc/fdbrpc.h" #include "fdbrpc/fdbrpc.h"
#include "fdbrpc/PerfMetric.h" #include "fdbrpc/PerfMetric.h"
#include "fdbclient/NativeAPI.actor.h" #include "fdbclient/NativeAPI.actor.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
struct CheckReply { struct CheckReply {
constexpr static FileIdentifier file_identifier = 11; constexpr static FileIdentifier file_identifier = 11;
@ -109,12 +110,15 @@ struct TestConfig {
bool startIncompatibleProcess = false; bool startIncompatibleProcess = false;
int logAntiQuorum = -1; int logAntiQuorum = -1;
// Storage Engine Types: Verify match with SimulationConfig::generateNormalConfig // Storage Engine Types: Verify match with SimulationConfig::generateNormalConfig
// -1 = None
// 0 = "ssd" // 0 = "ssd"
// 1 = "memory" // 1 = "memory"
// 2 = "memory-radixtree-beta" // 2 = "memory-radixtree-beta"
// 3 = "ssd-redwood-experimental" // 3 = "ssd-redwood-experimental"
int storageEngineExcludeType = -1; // Requires a comma-separated list of numbers WITHOUT whitespaces
std::vector<int> storageEngineExcludeTypes;
// Set the maximum TLog version that can be selected for a test
// Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version.
int maxTLogVersion = TLogVersion::MAX_SUPPORTED;
}; };
struct TesterInterface { struct TesterInterface {
@ -135,7 +139,7 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
LocalityData locality); LocalityData locality);
enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS }; enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS };
enum test_type_t { TEST_TYPE_FROM_FILE, TEST_TYPE_CONSISTENCY_CHECK }; enum test_type_t { TEST_TYPE_FROM_FILE, TEST_TYPE_CONSISTENCY_CHECK, TEST_TYPE_UNIT_TESTS };
ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile, ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
test_type_t whatToRun, test_type_t whatToRun,
@ -143,7 +147,8 @@ ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
int minTestersExpected, int minTestersExpected,
std::string fileName = std::string(), std::string fileName = std::string(),
StringRef startingConfiguration = StringRef(), StringRef startingConfiguration = StringRef(),
LocalityData locality = LocalityData()); LocalityData locality = LocalityData(),
UnitTestParameters testOptions = UnitTestParameters());
#include "flow/unactorcompiler.h" #include "flow/unactorcompiler.h"
#endif #endif

View File

@ -6956,7 +6956,7 @@ RedwoodRecordRef randomRedwoodRecordRef(const std::string& keyBuffer, const std:
return rec; return rec;
} }
TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") { TEST_CASE("/redwood/correctness/unit/RedwoodRecordRef") {
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[0] == 3); ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[0] == 3);
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[1] == 4); ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[1] == 4);
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[2] == 6); ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[2] == 6);
@ -7029,7 +7029,7 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
bytes += deltaTest(a, b); bytes += deltaTest(a, b);
} }
double elapsed = timer() - start; double elapsed = timer() - start;
printf("DeltaTest() on random large records %g M/s %g MB/s\n", count / elapsed / 1e6, bytes / elapsed / 1e6); printf("DeltaTest() on random large records %f M/s %f MB/s\n", count / elapsed / 1e6, bytes / elapsed / 1e6);
keyBuffer.resize(30); keyBuffer.resize(30);
valueBuffer.resize(100); valueBuffer.resize(100);
@ -7041,7 +7041,7 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
RedwoodRecordRef b = randomRedwoodRecordRef(keyBuffer, valueBuffer); RedwoodRecordRef b = randomRedwoodRecordRef(keyBuffer, valueBuffer);
bytes += deltaTest(a, b); bytes += deltaTest(a, b);
} }
printf("DeltaTest() on random small records %g M/s %g MB/s\n", count / elapsed / 1e6, bytes / elapsed / 1e6); printf("DeltaTest() on random small records %f M/s %f MB/s\n", count / elapsed / 1e6, bytes / elapsed / 1e6);
RedwoodRecordRef rec1; RedwoodRecordRef rec1;
RedwoodRecordRef rec2; RedwoodRecordRef rec2;
@ -7058,7 +7058,7 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
for (i = 0; i < count; ++i) { for (i = 0; i < count; ++i) {
total += rec1.getCommonPrefixLen(rec2, 50); total += rec1.getCommonPrefixLen(rec2, 50);
} }
printf("%" PRId64 " getCommonPrefixLen(skip=50) %g M/s\n", total, count / (timer() - start) / 1e6); printf("%" PRId64 " getCommonPrefixLen(skip=50) %f M/s\n", total, count / (timer() - start) / 1e6);
start = timer(); start = timer();
total = 0; total = 0;
@ -7066,7 +7066,7 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
for (i = 0; i < count; ++i) { for (i = 0; i < count; ++i) {
total += rec1.getCommonPrefixLen(rec2, 0); total += rec1.getCommonPrefixLen(rec2, 0);
} }
printf("%" PRId64 " getCommonPrefixLen(skip=0) %g M/s\n", total, count / (timer() - start) / 1e6); printf("%" PRId64 " getCommonPrefixLen(skip=0) %f M/s\n", total, count / (timer() - start) / 1e6);
char buf[1000]; char buf[1000];
RedwoodRecordRef::Delta& d = *(RedwoodRecordRef::Delta*)buf; RedwoodRecordRef::Delta& d = *(RedwoodRecordRef::Delta*)buf;
@ -7079,7 +7079,7 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
for (i = 0; i < count; ++i) { for (i = 0; i < count; ++i) {
total += rec1.writeDelta(d, rec2, commonPrefix); total += rec1.writeDelta(d, rec2, commonPrefix);
} }
printf("%" PRId64 " writeDelta(commonPrefix=%d) %g M/s\n", total, commonPrefix, count / (timer() - start) / 1e6); printf("%" PRId64 " writeDelta(commonPrefix=%d) %f M/s\n", total, commonPrefix, count / (timer() - start) / 1e6);
start = timer(); start = timer();
total = 0; total = 0;
@ -7087,12 +7087,12 @@ TEST_CASE("!/redwood/correctness/unit/RedwoodRecordRef") {
for (i = 0; i < count; ++i) { for (i = 0; i < count; ++i) {
total += rec1.writeDelta(d, rec2); total += rec1.writeDelta(d, rec2);
} }
printf("%" PRId64 " writeDelta() %g M/s\n", total, count / (timer() - start) / 1e6); printf("%" PRId64 " writeDelta() %f M/s\n", total, count / (timer() - start) / 1e6);
return Void(); return Void();
} }
TEST_CASE("!/redwood/correctness/unit/deltaTree/RedwoodRecordRef") { TEST_CASE("/redwood/correctness/unit/deltaTree/RedwoodRecordRef") {
// Sanity check on delta tree node format // Sanity check on delta tree node format
ASSERT(DeltaTree<RedwoodRecordRef>::Node::headerSize(false) == 4); ASSERT(DeltaTree<RedwoodRecordRef>::Node::headerSize(false) == 4);
ASSERT(DeltaTree<RedwoodRecordRef>::Node::headerSize(true) == 8); ASSERT(DeltaTree<RedwoodRecordRef>::Node::headerSize(true) == 8);
@ -7271,7 +7271,7 @@ TEST_CASE("!/redwood/correctness/unit/deltaTree/RedwoodRecordRef") {
return Void(); return Void();
} }
TEST_CASE("!/redwood/correctness/unit/deltaTree/IntIntPair") { TEST_CASE("/redwood/correctness/unit/deltaTree/IntIntPair") {
const int N = 200; const int N = 200;
IntIntPair prev = { 1, 0 }; IntIntPair prev = { 1, 0 };
IntIntPair next = { 10000, 10000 }; IntIntPair next = { 10000, 10000 };
@ -7615,7 +7615,7 @@ struct SimpleCounter {
std::string toString() { return format("%" PRId64 "/%.2f/%.2f", x, rate() / 1e6, avgRate() / 1e6); } std::string toString() { return format("%" PRId64 "/%.2f/%.2f", x, rate() / 1e6, avgRate() / 1e6); }
}; };
TEST_CASE("!/redwood/performance/mutationBuffer") { TEST_CASE(":/redwood/performance/mutationBuffer") {
// This test uses pregenerated short random keys // This test uses pregenerated short random keys
int count = 10e6; int count = 10e6;
@ -7643,34 +7643,47 @@ TEST_CASE("!/redwood/performance/mutationBuffer") {
return Void(); return Void();
} }
TEST_CASE("!/redwood/correctness/btree") { TEST_CASE("/redwood/correctness/btree") {
g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting
g_redwoodMetrics.clear(); g_redwoodMetrics.clear();
state std::string pagerFile = "unittest_pageFile.redwood"; state std::string fileName = params.get("fileName").orDefault("unittest_pageFile.redwood");
IPager2* pager; IPager2* pager;
state bool serialTest = deterministicRandom()->coinflip(); state bool serialTest = params.getInt("serialTest").orDefault(deterministicRandom()->coinflip());
state bool shortTest = deterministicRandom()->coinflip(); state bool shortTest = params.getInt("shortTest").orDefault(deterministicRandom()->coinflip());
state int pageSize = state int pageSize =
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400)); shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
state int64_t targetPageOps = shortTest ? 50000 : 1000000; state int64_t targetPageOps = params.getInt("targetPageOps").orDefault(shortTest ? 50000 : 1000000);
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .001); state bool pagerMemoryOnly =
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2); params.getInt("pagerMemoryOnly").orDefault(shortTest && (deterministicRandom()->random01() < .001));
state int maxValueSize = randomSize(pageSize * 25); state int maxKeySize = params.getInt("maxKeySize").orDefault(deterministicRandom()->randomInt(1, pageSize * 2));
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6)); state int maxValueSize = params.getInt("maxValueSize").orDefault(randomSize(pageSize * 25));
state double clearProbability = deterministicRandom()->random01() * .1; state int maxCommitSize =
state double clearSingleKeyProbability = deterministicRandom()->random01(); params.getInt("maxCommitSize")
state double clearPostSetProbability = deterministicRandom()->random01() * .1; .orDefault(shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6)));
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3); state double clearProbability =
state double advanceOldVersionProbability = deterministicRandom()->random01(); params.getDouble("clearProbability").orDefault(deterministicRandom()->random01() * .1);
state double clearSingleKeyProbability =
params.getDouble("clearSingleKeyProbability").orDefault(deterministicRandom()->random01());
state double clearPostSetProbability =
params.getDouble("clearPostSetProbability").orDefault(deterministicRandom()->random01() * .1);
state double coldStartProbability = params.getDouble("coldStartProbability")
.orDefault(pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3));
state double advanceOldVersionProbability =
params.getDouble("advanceOldVersionProbability").orDefault(deterministicRandom()->random01());
state int64_t cacheSizeBytes = state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (pageSize * deterministicRandom()->randomInt(1, (BUGGIFY ? 2 : 10000) + 1)); params.getInt("cacheSizeBytes")
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8); .orDefault(pagerMemoryOnly ? 2e9
state Version remapCleanupWindow = BUGGIFY ? 0 : deterministicRandom()->randomInt64(1, versionIncrement * 50); : (pageSize * deterministicRandom()->randomInt(1, (BUGGIFY ? 2 : 10000) + 1)));
state int maxVerificationMapEntries = 300e3; state Version versionIncrement =
params.getInt("versionIncrement").orDefault(deterministicRandom()->randomInt64(1, 1e8));
state Version remapCleanupWindow =
params.getInt("remapCleanupWindow")
.orDefault(BUGGIFY ? 0 : deterministicRandom()->randomInt64(1, versionIncrement * 50));
state int maxVerificationMapEntries = params.getInt("maxVerificationMapEntries").orDefault(300e3);
printf("\n"); printf("\n");
printf("targetPageOps: %" PRId64 "\n", targetPageOps); printf("targetPageOps: %" PRId64 "\n", targetPageOps);
@ -7693,11 +7706,11 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("\n"); printf("\n");
printf("Deleting existing test data...\n"); printf("Deleting existing test data...\n");
deleteFile(pagerFile); deleteFile(fileName);
printf("Initializing...\n"); printf("Initializing...\n");
pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly); pager = new DWALPager(pageSize, fileName, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly);
state VersionedBTree* btree = new VersionedBTree(pager, pagerFile); state VersionedBTree* btree = new VersionedBTree(pager, fileName);
wait(btree->init()); wait(btree->init());
state std::map<std::pair<std::string, Version>, Optional<std::string>> written; state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
@ -7900,8 +7913,8 @@ TEST_CASE("!/redwood/correctness/btree") {
wait(closedFuture); wait(closedFuture);
printf("Reopening btree from disk.\n"); printf("Reopening btree from disk.\n");
IPager2* pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow); IPager2* pager = new DWALPager(pageSize, fileName, cacheSizeBytes, remapCleanupWindow);
btree = new VersionedBTree(pager, pagerFile); btree = new VersionedBTree(pager, fileName);
wait(btree->init()); wait(btree->init());
Version v = btree->getLatestVersion(); Version v = btree->getLatestVersion();
@ -7937,7 +7950,7 @@ TEST_CASE("!/redwood/correctness/btree") {
state Future<Void> closedFuture = btree->onClosed(); state Future<Void> closedFuture = btree->onClosed();
btree->close(); btree->close();
wait(closedFuture); wait(closedFuture);
btree = new VersionedBTree(new DWALPager(pageSize, pagerFile, cacheSizeBytes, 0), pagerFile); btree = new VersionedBTree(new DWALPager(pageSize, fileName, cacheSizeBytes, 0), fileName);
wait(btree->init()); wait(btree->init());
wait(btree->clearAllAndCheckSanity()); wait(btree->clearAllAndCheckSanity());
@ -8003,7 +8016,7 @@ ACTOR Future<Void> randomScans(VersionedBTree* btree,
return Void(); return Void();
} }
TEST_CASE("!/redwood/correctness/pager/cow") { TEST_CASE(":/redwood/correctness/pager/cow") {
state std::string pagerFile = "unittest_pageFile.redwood"; state std::string pagerFile = "unittest_pageFile.redwood";
printf("Deleting old test data\n"); printf("Deleting old test data\n");
deleteFile(pagerFile); deleteFile(pagerFile);
@ -8030,7 +8043,7 @@ TEST_CASE("!/redwood/correctness/pager/cow") {
return Void(); return Void();
} }
TEST_CASE("!/redwood/performance/set") { TEST_CASE(":/redwood/performance/set") {
state SignalableActorCollection actors; state SignalableActorCollection actors;
g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting
@ -8045,21 +8058,22 @@ TEST_CASE("!/redwood/performance/set") {
deleteFile(pagerFile); deleteFile(pagerFile);
} }
state int pageSize = SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE; state int pageSize = params.getInt("pageSize").orDefault(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE);
state int64_t pageCacheBytes = FLOW_KNOBS->PAGE_CACHE_4K; state int64_t pageCacheBytes = params.getInt("pageCacheBytes").orDefault(FLOW_KNOBS->PAGE_CACHE_4K);
state int nodeCount = 1e9; state int nodeCount = params.getInt("nodeCount").orDefault(1e9);
state int maxRecordsPerCommit = 20000; state int maxRecordsPerCommit = params.getInt("maxRecordsPerCommit").orDefault(20000);
state int maxKVBytesPerCommit = 20e6; state int maxKVBytesPerCommit = params.getInt("maxKVBytesPerCommit").orDefault(20e6);
state int64_t kvBytesTarget = 4e9; state int64_t kvBytesTarget = params.getInt("kvBytesTarget").orDefault(4e9);
state int minKeyPrefixBytes = 25; state int minKeyPrefixBytes = params.getInt("minKeyPrefixBytes").orDefault(25);
state int maxKeyPrefixBytes = 25; state int maxKeyPrefixBytes = params.getInt("maxKeyPrefixBytes").orDefault(25);
state int minValueSize = 100; state int minValueSize = params.getInt("minValueSize").orDefault(100);
state int maxValueSize = 500; state int maxValueSize = params.getInt("maxValueSize").orDefault(500);
state int minConsecutiveRun = 1; state int minConsecutiveRun = params.getInt("minConsecutiveRun").orDefault(1);
state int maxConsecutiveRun = 100000; state int maxConsecutiveRun = params.getInt("maxConsecutiveRun").orDefault(100);
state char firstKeyChar = 'a'; state char firstKeyChar = params.get("firstKeyChar").orDefault("a")[0];
state char lastKeyChar = 'm'; state char lastKeyChar = params.get("lastKeyChar").orDefault("m")[0];
state Version remapCleanupWindow = SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW; state Version remapCleanupWindow =
params.getInt("remapCleanupWindow").orDefault(SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW);
printf("pageSize: %d\n", pageSize); printf("pageSize: %d\n", pageSize);
printf("pageCacheBytes: %" PRId64 "\n", pageCacheBytes); printf("pageCacheBytes: %" PRId64 "\n", pageCacheBytes);
@ -8541,11 +8555,11 @@ ACTOR Future<Void> doPrefixInsertComparison(int suffixSize,
return Void(); return Void();
} }
TEST_CASE("!/redwood/performance/prefixSizeComparison") { TEST_CASE(":/redwood/performance/prefixSizeComparison") {
state int suffixSize = 12; state int suffixSize = params.getInt("suffixSize").orDefault(12);
state int valueSize = 100; state int valueSize = params.getInt("valueSize").orDefault(100);
state int recordCountTarget = 100e6; state int recordCountTarget = params.getInt("recordCountTarget").orDefault(100e6);
state int usePrefixesInOrder = false; state bool usePrefixesInOrder = params.getInt("usePrefixesInOrder").orDefault(0);
wait(doPrefixInsertComparison( wait(doPrefixInsertComparison(
suffixSize, valueSize, recordCountTarget, usePrefixesInOrder, KVSource({ { 10, 100000 } }))); suffixSize, valueSize, recordCountTarget, usePrefixesInOrder, KVSource({ { 10, 100000 } })));
@ -8562,10 +8576,10 @@ TEST_CASE("!/redwood/performance/prefixSizeComparison") {
return Void(); return Void();
} }
TEST_CASE("!/redwood/performance/sequentialInsert") { TEST_CASE(":/redwood/performance/sequentialInsert") {
state int prefixLen = 30; state int prefixLen = params.getInt("prefixLen").orDefault(30);
state int valueSize = 100; state int valueSize = params.getInt("valueSize").orDefault(100);
state int recordCountTarget = 100e6; state int recordCountTarget = params.getInt("recordCountTarget").orDefault(100e6);
deleteFile("test.redwood"); deleteFile("test.redwood");
wait(delay(5)); wait(delay(5));

View File

@ -67,6 +67,7 @@
#include "flow/TLSConfig.actor.h" #include "flow/TLSConfig.actor.h"
#include "flow/Tracing.h" #include "flow/Tracing.h"
#include "flow/WriteOnlySet.h" #include "flow/WriteOnlySet.h"
#include "flow/UnitTest.h"
#if defined(__linux__) || defined(__FreeBSD__) #if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h> #include <execinfo.h>
@ -89,7 +90,7 @@ enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE, OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE,
OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID, OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID,
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR,
OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE, OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE, OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE
}; };
@ -163,6 +164,7 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_HELP, "--help", SO_NONE }, { OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE }, { OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP }, { OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_UNITTESTPARAM, "--test_", SO_REQ_SEP },
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP }, { OPT_LOCALITY, "--locality_", SO_REQ_SEP },
{ OPT_TESTSERVERS, "--testservers", SO_REQ_SEP }, { OPT_TESTSERVERS, "--testservers", SO_REQ_SEP },
{ OPT_TEST_ON_SERVERS, "--testonservers", SO_NONE }, { OPT_TEST_ON_SERVERS, "--testonservers", SO_NONE },
@ -623,16 +625,19 @@ static void printUsage(const char* name, bool devhelp) {
printOptionUsage("-h, -?, --help", "Display this help and exit."); printOptionUsage("-h, -?, --help", "Display this help and exit.");
if (devhelp) { if (devhelp) {
printf(" --build_flags Print build information and exit.\n"); printf(" --build_flags Print build information and exit.\n");
printOptionUsage("-r ROLE, --role ROLE", printOptionUsage(
" Server role (valid options are fdbd, test, multitest," "-r ROLE, --role ROLE",
" simulation, networktestclient, networktestserver, restore" " Server role (valid options are fdbd, test, multitest,"
" consistencycheck, kvfileintegritycheck, kvfilegeneratesums). The default is `fdbd'."); " simulation, networktestclient, networktestserver, restore"
" consistencycheck, kvfileintegritycheck, kvfilegeneratesums, unittests). The default is `fdbd'.");
#ifdef _WIN32 #ifdef _WIN32
printOptionUsage("-n, --newconsole", " Create a new console."); printOptionUsage("-n, --newconsole", " Create a new console.");
printOptionUsage("-q, --no_dialog", " Disable error dialog on crash."); printOptionUsage("-q, --no_dialog", " Disable error dialog on crash.");
printOptionUsage("--parentpid PID", " Specify a process after whose termination to exit."); printOptionUsage("--parentpid PID", " Specify a process after whose termination to exit.");
#endif #endif
printOptionUsage("-f TESTFILE, --testfile", " Testfile to run, defaults to `tests/default.txt'."); printOptionUsage("-f TESTFILE, --testfile",
" Testfile to run, defaults to `tests/default.txt'. If role is `unittests', specifies which "
"unit tests to run as a search prefix.");
printOptionUsage("-R, --restarting", " Restart a previous simulation that was cleanly shut down."); printOptionUsage("-R, --restarting", " Restart a previous simulation that was cleanly shut down.");
printOptionUsage("-s SEED, --seed SEED", " Random seed."); printOptionUsage("-s SEED, --seed SEED", " Random seed.");
printOptionUsage("-k KEY, --key KEY", "Target key for search role."); printOptionUsage("-k KEY, --key KEY", "Target key for search role.");
@ -652,6 +657,8 @@ static void printUsage(const char* name, bool devhelp) {
printOptionUsage("--num_testers NUM", printOptionUsage("--num_testers NUM",
" A multitester will wait for NUM testers before starting" " A multitester will wait for NUM testers before starting"
" (defaults to 1)."); " (defaults to 1).");
printOptionUsage("--test_PARAMNAME PARAMVALUE",
" Set a UnitTest named parameter to the given value. Names are case sensitive.");
#ifdef __linux__ #ifdef __linux__
printOptionUsage("--rsssize SIZE", printOptionUsage("--rsssize SIZE",
" Turns on automatic heap profiling when RSS memory size exceeds" " Turns on automatic heap profiling when RSS memory size exceeds"
@ -923,6 +930,7 @@ enum class ServerRole {
SkipListTest, SkipListTest,
Test, Test,
VersionedMapTest, VersionedMapTest,
UnitTests
}; };
struct CLIOptions { struct CLIOptions {
std::string commandLine; std::string commandLine;
@ -971,6 +979,7 @@ struct CLIOptions {
Reference<ClusterConnectionFile> connectionFile; Reference<ClusterConnectionFile> connectionFile;
Standalone<StringRef> machineId; Standalone<StringRef> machineId;
UnitTestParameters testParams;
static CLIOptions parseArgs(int argc, char* argv[]) { static CLIOptions parseArgs(int argc, char* argv[]) {
CLIOptions opts; CLIOptions opts;
@ -1045,6 +1054,15 @@ private:
knobs.push_back(std::make_pair(syn, args.OptionArg())); knobs.push_back(std::make_pair(syn, args.OptionArg()));
break; break;
} }
case OPT_UNITTESTPARAM: {
std::string syn = args.OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--test_"))) {
fprintf(stderr, "ERROR: unable to parse knob option '%s'\n", syn.c_str());
flushAndExit(FDB_EXIT_ERROR);
}
testParams.set(syn.substr(7), args.OptionArg());
break;
}
case OPT_LOCALITY: { case OPT_LOCALITY: {
std::string syn = args.OptionSyntax(); std::string syn = args.OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--locality_"))) { if (!StringRef(syn).startsWith(LiteralStringRef("--locality_"))) {
@ -1103,6 +1121,8 @@ private:
role = ServerRole::KVFileGenerateIOLogChecksums; role = ServerRole::KVFileGenerateIOLogChecksums;
else if (!strcmp(sRole, "consistencycheck")) else if (!strcmp(sRole, "consistencycheck"))
role = ServerRole::ConsistencyCheck; role = ServerRole::ConsistencyCheck;
else if (!strcmp(sRole, "unittests"))
role = ServerRole::UnitTests;
else { else {
fprintf(stderr, "ERROR: Unknown role `%s'\n", sRole); fprintf(stderr, "ERROR: Unknown role `%s'\n", sRole);
printHelpTeaser(argv[0]); printHelpTeaser(argv[0]);
@ -1462,7 +1482,8 @@ private:
return StringRef(addr).startsWith(LiteralStringRef("auto:")); return StringRef(addr).startsWith(LiteralStringRef("auto:"));
}); });
if ((role != ServerRole::Simulation && role != ServerRole::CreateTemplateDatabase && if ((role != ServerRole::Simulation && role != ServerRole::CreateTemplateDatabase &&
role != ServerRole::KVFileIntegrityCheck && role != ServerRole::KVFileGenerateIOLogChecksums) || role != ServerRole::KVFileIntegrityCheck && role != ServerRole::KVFileGenerateIOLogChecksums &&
role != ServerRole::UnitTests) ||
autoPublicAddress) { autoPublicAddress) {
if (seedSpecified && !fileExists(connFile)) { if (seedSpecified && !fileExists(connFile)) {
@ -1999,6 +2020,18 @@ int main(int argc, char* argv[]) {
StringRef(), StringRef(),
opts.localities)); opts.localities));
g_network->run(); g_network->run();
} else if (role == ServerRole::UnitTests) {
setupRunLoopProfiler();
auto m = startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId);
f = stopAfter(runTests(opts.connectionFile,
TEST_TYPE_UNIT_TESTS,
TEST_HERE,
1,
opts.testFile,
StringRef(),
opts.localities,
opts.testParams));
g_network->run();
} else if (role == ServerRole::CreateTemplateDatabase) { } else if (role == ServerRole::CreateTemplateDatabase) {
createTemplateDatabase(); createTemplateDatabase();
} else if (role == ServerRole::NetworkTestClient) { } else if (role == ServerRole::NetworkTestClient) {

View File

@ -517,13 +517,6 @@ struct P2PNetworkTest {
self->listeners.size(), self->listeners.size(),
self->remotes.size(), self->remotes.size(),
self->connectionsOut); self->connectionsOut);
printf("Request size: %s\n", self->requestBytes.toString().c_str());
printf("Response size: %s\n", self->replyBytes.toString().c_str());
printf("Requests per outgoing session: %d\n", self->requests.toString().c_str());
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
for (auto n : self->remotes) { for (auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str()); printf("Remote: %s\n", n.toString().c_str());
@ -534,6 +527,19 @@ struct P2PNetworkTest {
actors.add(incoming(self, el)); actors.add(incoming(self, el));
} }
printf("Request size: %s\n", self->requestBytes.toString().c_str());
printf("Response size: %s\n", self->replyBytes.toString().c_str());
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
}
if (!self->remotes.empty()) { if (!self->remotes.empty()) {
for (int i = 0; i < self->connectionsOut; ++i) { for (int i = 0; i < self->connectionsOut; ++i) {
actors.add(outgoing(self)); actors.add(outgoing(self));
@ -549,27 +555,30 @@ struct P2PNetworkTest {
Future<Void> run() { return run_impl(this); } Future<Void> run() { return run_impl(this); }
}; };
int getEnvInt(const char* name, int defaultValue = 0) { // Peer-to-Peer network test.
const char* val = getenv(name); // One or more instances can be run and set to talk to each other.
return val != nullptr ? atol(val) : defaultValue; // Each instance
} // - listens on 0 or more listenerAddresses
// - maintains 0 or more connectionsOut at a time, each to a random choice from remoteAddresses
std::string getEnvStr(const char* name, std::string defaultValue = "") { // Address lists are a string of comma-separated IP:port[:tls] strings.
const char* val = getenv(name); //
return val != nullptr ? val : defaultValue; // The other arguments can be specified as "fixedValue" or "minValue:maxValue".
} // Each outgoing connection will live for a random requests count.
// Each request will
// TODO: Remove this hacky thing and make a "networkp2ptest" role in fdbserver // - send a random requestBytes sized message
TEST_CASE("!p2ptest") { // - wait for a random replyBytes sized response.
state P2PNetworkTest p2p(getEnvStr("listenerAddresses", ""), // The client will close the connection after a random idleMilliseconds.
getEnvStr("remoteAddresses", ""), // Reads and writes can optionally preceded by random delays, waitReadMilliseconds and waitWriteMilliseconds.
getEnvInt("connectionsOut", 0), TEST_CASE(":/network/p2ptest") {
getEnvStr("requestBytes", "0"), state P2PNetworkTest p2p(params.get("listenerAddresses").orDefault(""),
getEnvStr("replyBytes", "0"), params.get("remoteAddresses").orDefault(""),
getEnvStr("requests", "0"), params.getInt("connectionsOut").orDefault(1),
getEnvStr("idleMilliseconds", "0"), params.get("requestBytes").orDefault("50:100"),
getEnvStr("waitReadMilliseconds", "0"), params.get("replyBytes").orDefault("500:1000"),
getEnvStr("waitWriteMilliseconds", "0")); params.get("requests").orDefault("10:10000"),
params.get("idleMilliseconds").orDefault("0"),
params.get("waitReadMilliseconds").orDefault("0"),
params.get("waitWriteMilliseconds").orDefault("0"));
wait(p2p.run()); wait(p2p.run());
return Void(); return Void();

View File

@ -763,7 +763,7 @@ ACTOR Future<DistributedTestResults> runWorkload(Database cx, std::vector<Tester
req.title = spec.title; req.title = spec.title;
req.useDatabase = spec.useDB; req.useDatabase = spec.useDB;
req.timeout = spec.timeout; req.timeout = spec.timeout;
req.databasePingDelay = spec.databasePingDelay; req.databasePingDelay = spec.useDB ? spec.databasePingDelay : 0.0;
req.options = spec.options; req.options = spec.options;
req.clientId = i; req.clientId = i;
req.clientCount = testers.size(); req.clientCount = testers.size();
@ -1036,8 +1036,10 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
} }, } },
{ "startIncompatibleProcess", { "startIncompatibleProcess",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } }, [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } },
{ "storageEngineExcludeType", { "storageEngineExcludeTypes",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeType", ""); } } [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeTypes", ""); } },
{ "maxTLogVersion",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } }
}; };
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = { std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {
@ -1572,13 +1574,16 @@ ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
int minTestersExpected, int minTestersExpected,
std::string fileName, std::string fileName,
StringRef startingConfiguration, StringRef startingConfiguration,
LocalityData locality) { LocalityData locality,
UnitTestParameters testOptions) {
state vector<TestSpec> testSpecs; state vector<TestSpec> testSpecs;
auto cc = makeReference<AsyncVar<Optional<ClusterControllerFullInterface>>>(); auto cc = makeReference<AsyncVar<Optional<ClusterControllerFullInterface>>>();
auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>(); auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>();
vector<Future<Void>> actors; vector<Future<Void>> actors;
actors.push_back(reportErrors(monitorLeader(connFile, cc), "MonitorLeader")); if (connFile) {
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface")); actors.push_back(reportErrors(monitorLeader(connFile, cc), "MonitorLeader"));
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
}
if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK) { if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK) {
TestSpec spec; TestSpec spec;
@ -1603,6 +1608,22 @@ ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
KeyValueRef(LiteralStringRef("shuffleShards"), LiteralStringRef("true"))); KeyValueRef(LiteralStringRef("shuffleShards"), LiteralStringRef("true")));
spec.options.push_back_deep(spec.options.arena(), options); spec.options.push_back_deep(spec.options.arena(), options);
testSpecs.push_back(spec); testSpecs.push_back(spec);
} else if (whatToRun == TEST_TYPE_UNIT_TESTS) {
TestSpec spec;
Standalone<VectorRef<KeyValueRef>> options;
spec.title = LiteralStringRef("UnitTests");
spec.startDelay = 0;
spec.useDB = false;
spec.timeout = 0;
options.push_back_deep(options.arena(),
KeyValueRef(LiteralStringRef("testName"), LiteralStringRef("UnitTests")));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("testsMatching"), fileName));
// Add unit test options as test spec options
for (auto& kv : testOptions.params) {
options.push_back_deep(options.arena(), KeyValueRef(kv.first, kv.second));
}
spec.options.push_back_deep(spec.options.arena(), options);
testSpecs.push_back(spec);
} else { } else {
ifstream ifs; ifstream ifs;
ifs.open(fileName.c_str(), ifstream::in); ifs.open(fileName.c_str(), ifstream::in);

View File

@ -1,5 +1,6 @@
#include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h" #include "fdbserver/ServerDBInfo.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h" #include "fdbclient/RunTransaction.actor.h"
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
@ -269,10 +270,12 @@ struct ClientTransactionProfileCorrectnessWorkload : TestWorkload {
ACTOR Future<Void> changeProfilingParameters(Database cx, int64_t sizeLimit, double sampleProbability) { ACTOR Future<Void> changeProfilingParameters(Database cx, int64_t sizeLimit, double sampleProbability) {
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->set(fdbClientInfoTxnSampleRate, BinaryWriter::toValue(sampleProbability, Unversioned())); Tuple rate = Tuple().appendDouble(sampleProbability);
tr->set(fdbClientInfoTxnSizeLimit, BinaryWriter::toValue(sizeLimit, Unversioned())); Tuple size = Tuple().append(sizeLimit);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
return Void(); return Void();
})); }));
return Void(); return Void();

View File

@ -21,6 +21,7 @@
#include "boost/lexical_cast.hpp" #include "boost/lexical_cast.hpp"
#include "boost/algorithm/string.hpp" #include "boost/algorithm/string.hpp"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h" #include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h" #include "fdbclient/ReadYourWrites.h"

View File

@ -34,6 +34,7 @@ struct UnitTestWorkload : TestWorkload {
bool enabled; bool enabled;
std::string testPattern; std::string testPattern;
int testRunLimit; int testRunLimit;
UnitTestParameters testParams;
PerfIntCounter testsAvailable, testsExecuted, testsFailed; PerfIntCounter testsAvailable, testsExecuted, testsFailed;
PerfDoubleCounter totalWallTime, totalSimTime; PerfDoubleCounter totalWallTime, totalSimTime;
@ -45,6 +46,14 @@ struct UnitTestWorkload : TestWorkload {
enabled = !clientId; // only do this on the "first" client enabled = !clientId; // only do this on the "first" client
testPattern = getOption(options, LiteralStringRef("testsMatching"), Value()).toString(); testPattern = getOption(options, LiteralStringRef("testsMatching"), Value()).toString();
testRunLimit = getOption(options, LiteralStringRef("maxTestCases"), -1); testRunLimit = getOption(options, LiteralStringRef("maxTestCases"), -1);
// Consume all remaining options as testParams which the unit test can access
for (auto& kv : options) {
if (kv.value.size() != 0) {
testParams.set(kv.key.toString(), getOption(options, kv.key, StringRef()).toString());
}
}
forceLinkIndexedSetTests(); forceLinkIndexedSetTests();
forceLinkDequeTests(); forceLinkDequeTests();
forceLinkFlowTests(); forceLinkFlowTests();
@ -94,7 +103,7 @@ struct UnitTestWorkload : TestWorkload {
state double start_timer = timer(); state double start_timer = timer();
try { try {
wait(test->func()); wait(test->func(self->testParams));
} catch (Error& e) { } catch (Error& e) {
++self->testsFailed; ++self->testsFailed;
result = e; result = e;

View File

@ -135,6 +135,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( DISABLE_POSIX_KERNEL_AIO, 0 ); init( DISABLE_POSIX_KERNEL_AIO, 0 );
//AsyncFileNonDurable //AsyncFileNonDurable
init( NON_DURABLE_MAX_WRITE_DELAY, 5.0 );
init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0; init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0;
//GenericActors //GenericActors

View File

@ -149,6 +149,7 @@ public:
int DISABLE_POSIX_KERNEL_AIO; int DISABLE_POSIX_KERNEL_AIO;
// AsyncFileNonDurable // AsyncFileNonDurable
double NON_DURABLE_MAX_WRITE_DELAY;
double MAX_PRIOR_MODIFICATION_DELAY; double MAX_PRIOR_MODIFICATION_DELAY;
// GenericActors // GenericActors

View File

@ -26,3 +26,40 @@ UnitTest::UnitTest(const char* name, const char* file, int line, TestFunction fu
: name(name), file(file), line(line), func(func), next(g_unittests.tests) { : name(name), file(file), line(line), func(func), next(g_unittests.tests) {
g_unittests.tests = this; g_unittests.tests = this;
} }
void UnitTestParameters::set(const std::string& name, const std::string& value) {
printf("setting %s = %s\n", name.c_str(), value.c_str());
params[name] = value;
}
Optional<std::string> UnitTestParameters::get(const std::string& name) const {
auto it = params.find(name);
if (it != params.end()) {
return it->second;
}
return {};
}
void UnitTestParameters::set(const std::string& name, int64_t value) {
set(name, format("%" PRId64, value));
};
void UnitTestParameters::set(const std::string& name, double value) {
set(name, format("%g", value));
};
Optional<int64_t> UnitTestParameters::getInt(const std::string& name) const {
auto opt = get(name);
if (opt.present()) {
return atoll(opt.get().c_str());
}
return {};
}
Optional<double> UnitTestParameters::getDouble(const std::string& name) const {
auto opt = get(name);
if (opt.present()) {
return atof(opt.get().c_str());
}
return {};
}

View File

@ -45,8 +45,34 @@
#include "flow/flow.h" #include "flow/flow.h"
#include <cinttypes>
struct UnitTestParameters {
// Map of named case-sensitive parameters
std::map<std::string, std::string> params;
// Set a named parameter to a string value, replacing any existing value
void set(const std::string& name, const std::string& value);
// Set a named parameter to an integer converted to a string value, replacing any existing value
void set(const std::string& name, int64_t value);
// Set a named parameter to a double converted to a string value, replacing any existing value
void set(const std::string& name, double value);
// Get a parameter's value, will return !present() if parameter was not set
Optional<std::string> get(const std::string& name) const;
// Get a parameter's value as an integer, will return !present() if parameter was not set
Optional<int64_t> getInt(const std::string& name) const;
// Get a parameter's value parsed as a double, will return !present() if parameter was not set
Optional<double> getDouble(const std::string& name) const;
};
// Unit test definition structured as a linked list item
struct UnitTest { struct UnitTest {
typedef Future<Void> (*TestFunction)(); typedef Future<Void> (*TestFunction)(const UnitTestParameters& params);
const char* name; const char* name;
const char* file; const char* file;
@ -57,6 +83,7 @@ struct UnitTest {
UnitTest(const char* name, const char* file, int line, TestFunction func); UnitTest(const char* name, const char* file, int line, TestFunction func);
}; };
// Collection of unit tests in the form of a linked list
struct UnitTestCollection { struct UnitTestCollection {
UnitTest* tests; UnitTest* tests;
}; };
@ -71,17 +98,17 @@ extern UnitTestCollection g_unittests;
#ifdef FLOW_DISABLE_UNIT_TESTS #ifdef FLOW_DISABLE_UNIT_TESTS
#define TEST_CASE(name) static Future<Void> FILE_UNIQUE_NAME(disabled_testcase_func)() #define TEST_CASE(name) static Future<Void> FILE_UNIQUE_NAME(disabled_testcase_func)(const UnitTestParameters& params)
#define ACTOR_TEST_CASE(actorname, name) #define ACTOR_TEST_CASE(actorname, name)
#else #else
#define TEST_CASE(name) \ #define TEST_CASE(name) \
static Future<Void> FILE_UNIQUE_NAME(testcase_func)(); \ static Future<Void> FILE_UNIQUE_NAME(testcase_func)(const UnitTestParameters& params); \
namespace { \ namespace { \
static UnitTest FILE_UNIQUE_NAME(testcase)(name, __FILE__, __LINE__, &FILE_UNIQUE_NAME(testcase_func)); \ static UnitTest FILE_UNIQUE_NAME(testcase)(name, __FILE__, __LINE__, &FILE_UNIQUE_NAME(testcase_func)); \
} \ } \
static Future<Void> FILE_UNIQUE_NAME(testcase_func)() static Future<Void> FILE_UNIQUE_NAME(testcase_func)(const UnitTestParameters& params)
// ACTOR_TEST_CASE generated by actorcompiler; don't use directly // ACTOR_TEST_CASE generated by actorcompiler; don't use directly
#define ACTOR_TEST_CASE(actorname, name) \ #define ACTOR_TEST_CASE(actorname, name) \

View File

@ -535,7 +535,13 @@ namespace actorcompiler
actor.testCaseParameters = str(paramRange); actor.testCaseParameters = str(paramRange);
actor.name = "flowTestCase" + toks.First().SourceLine; actor.name = "flowTestCase" + toks.First().SourceLine;
actor.parameters = new VarDeclaration[] { }; actor.parameters = new VarDeclaration[] { new VarDeclaration {
name = "params",
type = "UnitTestParameters",
initializer = "",
initializerConstructorSyntax = false
}
};
actor.returnType = "Void"; actor.returnType = "Void";
} }

View File

@ -482,7 +482,8 @@ public:
enBlobCredentialFiles = 10, enBlobCredentialFiles = 10,
enNetworkAddressesFunc = 11, enNetworkAddressesFunc = 11,
enClientFailureMonitor = 12, enClientFailureMonitor = 12,
enSQLiteInjectedError = 13 enSQLiteInjectedError = 13,
enGlobalConfig = 14
}; };
virtual void longTaskCheck(const char* name) {} virtual void longTaskCheck(const char* name) {}

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/correctness/ testsMatching=/redwood/correctness/

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/correctness/btree testsMatching=/redwood/correctness/btree

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/correctness/pager testsMatching=:/redwood/correctness/pager

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/correctness/unit/ testsMatching=/redwood/correctness/unit/

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/performance/prefixSizeComparison testsMatching=:/redwood/performance/prefixSizeComparison

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/performance/sequentialInsert testsMatching=:/redwood/performance/sequentialInsert

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/performance/set testsMatching=:/redwood/performance/set

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests testName=UnitTests
maxTestCases=0 maxTestCases=0
testsMatching=!/redwood/performance/ testsMatching=:/redwood/performance/

View File

@ -264,6 +264,40 @@ def process_traces(basedir, testname, path, out, aggregationPolicy, symbolicateB
parser.writeObject({'CMakeSEED': str(cmake_seed)}) parser.writeObject({'CMakeSEED': str(cmake_seed)})
return res return res
class RestartTestPolicy:
def __init__(self, name, old_binary, new_binary):
# Default is to use the same binary for the restart test, unless constraints are satisfied.
self._first_binary = new_binary
self._second_binary = new_binary
if old_binary is None:
_logger.info("No old binary provided")
old_binary_version_raw = subprocess.check_output([old_binary, '--version']).decode('utf-8')
match = re.match('FoundationDB.*\(v([0-9]+\.[0-9]+\.[0-9]+)\)', old_binary_version_raw)
assert match, old_binary_version_raw
old_binary_version = tuple(map(int, match.group(1).split('.')))
match = re.match('.*/restarting/from_([0-9]+\.[0-9]+\.[0-9]+)/', name)
if match: # upgrading _from_
lower_bound = tuple(map(int, match.group(1).split('.')))
if old_binary_version >= lower_bound:
self._first_binary = old_binary
_logger.info("Using old binary as first binary: {} >= {}".format(old_binary_version, lower_bound))
else:
_logger.info("Using new binary as first binary: {} < {}".format(old_binary_version, lower_bound))
match = re.match('.*/restarting/to_([0-9]+\.[0-9]+\.[0-9]+)/', name)
if match: # downgrading _to_
lower_bound = tuple(map(int, match.group(1).split('.')))
if old_binary_version >= lower_bound:
self._second_binary = old_binary
_logger.info("Using old binary as second binary: {} >= {}".format(old_binary_version, lower_bound))
else:
_logger.info("Using new binary as second binary: {} < {}".format(old_binary_version, lower_bound))
def first_binary(self):
return self._first_binary
def second_binary(self):
return self._second_binary
def run_simulation_test(basedir, options): def run_simulation_test(basedir, options):
fdbserver = os.path.join(basedir, 'bin', 'fdbserver') fdbserver = os.path.join(basedir, 'bin', 'fdbserver')
pargs = [fdbserver, pargs = [fdbserver,
@ -298,14 +332,19 @@ def run_simulation_test(basedir, options):
os.mkdir(wd) os.mkdir(wd)
return_codes = {} # {command: return_code} return_codes = {} # {command: return_code}
first = True first = True
restart_test_policy = None
if len(options.testfile) > 1:
restart_test_policy = RestartTestPolicy(options.testfile[0], options.old_binary, fdbserver)
for testfile in options.testfile: for testfile in options.testfile:
tmp = list(pargs) tmp = list(pargs)
# old_binary is not under test, so don't run under valgrind
valgrind_args = [] valgrind_args = []
if first and options.old_binary is not None and len(options.testfile) > 1: if restart_test_policy is not None:
_logger.info("Run old binary at {}".format(options.old_binary)) if first:
tmp[0] = options.old_binary tmp[0] = restart_test_policy.first_binary()
elif options.use_valgrind: else:
tmp[0] = restart_test_policy.second_binary()
# old_binary is not under test, so don't run under valgrind
if options.use_valgrind and tmp[0] == fdbserver:
valgrind_args = ['valgrind', '--error-exitcode=99', '--'] valgrind_args = ['valgrind', '--error-exitcode=99', '--']
if not first: if not first:
tmp.append('-R') tmp.append('-R')

View File

@ -6,4 +6,4 @@ startDelay = 0
[[test.workload]] [[test.workload]]
testName = 'UnitTests' testName = 'UnitTests'
maxTestCases = 0 maxTestCases = 0
testsMatching = '!/redwood/correctness/btree' testsMatching = '/redwood/correctness/btree'

View File

@ -1,4 +1,5 @@
storageEngineExcludeType=-1 storageEngineExcludeTypes=-1,-2
maxTLogVersion=6
testTitle=Clogged testTitle=Clogged
clearAfterTest=false clearAfterTest=false
testName=Cycle testName=Cycle

View File

@ -1,4 +1,5 @@
storageEngineExcludeType=-1 storageEngineExcludeTypes=-1,-2
maxTLogVersion=6
testTitle=Clogged testTitle=Clogged
runSetup=false runSetup=false
testName=Cycle testName=Cycle