Merge branch 'master' into features/actor-lineage

This commit is contained in:
Lukas Joswiak 2021-05-02 14:11:42 -07:00 committed by GitHub
commit 8dcd779fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1980 additions and 596 deletions

View File

@ -23,6 +23,7 @@
#define FDB_INCLUDE_LEGACY_TYPES
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "foundationdb/fdb_c.h"
int g_api_version = 0;
@ -364,15 +365,20 @@ extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d
return DB(d)->getMainThreadBusyness();
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is non-zero, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version) {
Optional<ProtocolVersion> expected;
if (expected_version > 0) {
expected = ProtocolVersion(expected_version);
}
return (FDBFuture*)(DB(db)->getServerProtocol(expected).extractPtr());
return (
FDBFuture*)(mapThreadFuture<ProtocolVersion,
uint64_t>(DB(db)->getServerProtocol(expected), [](ErrorOr<ProtocolVersion> result) {
return result.map<uint64_t>([](ProtocolVersion pv) { return pv.versionWithFlags(); });
}).extractPtr());
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {

View File

@ -103,54 +103,10 @@ function(symlink_files)
endif()
endfunction()
# 'map' from (destination, package) to path
# format vars like install_destination_for_${destination}_${package}
set(install_destination_for_bin_tgz "bin")
set(install_destination_for_bin_deb "usr/bin")
set(install_destination_for_bin_el6 "usr/bin")
set(install_destination_for_bin_el7 "usr/bin")
set(install_destination_for_bin_pm "usr/local/bin")
set(install_destination_for_sbin_tgz "sbin")
set(install_destination_for_sbin_deb "usr/sbin")
set(install_destination_for_sbin_el6 "usr/sbin")
set(install_destination_for_sbin_el7 "usr/sbin")
set(install_destination_for_sbin_pm "usr/local/libexec")
set(install_destination_for_lib_tgz "lib")
set(install_destination_for_lib_deb "usr/lib")
set(install_destination_for_lib_el6 "usr/lib64")
set(install_destination_for_lib_el7 "usr/lib64")
set(install_destination_for_lib_pm "usr/local/lib")
set(install_destination_for_fdbmonitor_tgz "sbin")
set(install_destination_for_fdbmonitor_deb "usr/lib/foundationdb")
set(install_destination_for_fdbmonitor_el6 "usr/lib/foundationdb")
set(install_destination_for_fdbmonitor_el7 "usr/lib/foundationdb")
set(install_destination_for_fdbmonitor_pm "usr/local/libexec")
set(install_destination_for_include_tgz "include")
set(install_destination_for_include_deb "usr/include")
set(install_destination_for_include_el6 "usr/include")
set(install_destination_for_include_el7 "usr/include")
set(install_destination_for_include_pm "usr/local/include")
set(install_destination_for_etc_tgz "etc/foundationdb")
set(install_destination_for_etc_deb "etc/foundationdb")
set(install_destination_for_etc_el6 "etc/foundationdb")
set(install_destination_for_etc_el7 "etc/foundationdb")
set(install_destination_for_etc_pm "usr/local/etc/foundationdb")
set(install_destination_for_log_tgz "log/foundationdb")
set(install_destination_for_log_deb "var/log/foundationdb")
set(install_destination_for_log_el6 "var/log/foundationdb")
set(install_destination_for_log_el7 "var/log/foundationdb")
set(install_destination_for_log_pm "usr/local/foundationdb/logs")
set(install_destination_for_data_tgz "lib/foundationdb")
set(install_destination_for_data_deb "var/lib/foundationdb/data")
set(install_destination_for_data_el6 "var/lib/foundationdb/data")
set(install_destination_for_data_el7 "var/lib/foundationdb/data")
set(install_destination_for_data_pm "usr/local/foundationdb/data")
fdb_install_packages(TGZ DEB EL7 PM VERSIONED)
fdb_install_dirs(BIN SBIN LIB FDBMONITOR INCLUDE ETC LOG DATA)
message(STATUS "FDB_INSTALL_DIRS -> ${FDB_INSTALL_DIRS}")
# 'map' from (destination, package) to path
# format vars like install_destination_for_${destination}_${package}
install_destinations(TGZ
BIN bin
SBIN sbin
@ -169,7 +125,7 @@ install_destinations(DEB
INCLUDE usr/include
ETC etc/foundationdb
LOG var/log/foundationdb
DATA var/lib/foundationdb)
DATA var/lib/foundationdb/data)
copy_install_destinations(DEB EL7)
install_destinations(EL7 LIB usr/lib64)
install_destinations(PM
@ -227,6 +183,13 @@ set(LIB_DIR lib64)
configure_file("${PROJECT_SOURCE_DIR}/packaging/multiversion/clients/postinst" "${script_dir}/clients/postinst-el7" @ONLY)
configure_file("${PROJECT_SOURCE_DIR}/packaging/multiversion/clients/prerm" "${script_dir}/clients" @ONLY)
################################################################################
# Move Docker Setup
################################################################################
file(COPY "${PROJECT_SOURCE_DIR}/packaging/docker" DESTINATION "${PROJECT_BINARY_DIR}/packages/")
################################################################################
# General CPack configuration
################################################################################

View File

@ -2,6 +2,10 @@
Release Notes
#############
6.3.13
======
* The multi-version client now requires at most two client connections with version 6.2 or larger, regardless of how many external clients are configured. Clients older than 6.2 will continue to create an additional connection each. `(PR #4667) <https://github.com/apple/foundationdb/pull/4667>`_
6.3.12
======
* Change the default for --knob_tls_server_handshake_threads to 64. The previous default was 1000. This avoids starting 1000 threads by default, but may adversely affect recovery time for large clusters using TLS. Users with large TLS clusters should consider explicitly setting this knob in their foundationdb.conf file. `(PR #4421) <https://github.com/apple/foundationdb/pull/4421>`_

View File

@ -15,7 +15,8 @@ Features
Performance
-----------
* Increased performance of dr_agent when copying the mutation log. The ``COPY_LOG_BLOCK_SIZE``, ``COPY_LOG_BLOCKS_PER_TASK``, ``COPY_LOG_PREFETCH_BLOCKS``, ``COPY_LOG_READ_AHEAD_BYTES`` and ``COPY_LOG_TASK_DURATION_NANOS`` knobs can be set. `(PR 3436) <https://github.com/apple/foundationdb/pull/3436>`_
* Increased performance of dr_agent when copying the mutation log. The ``COPY_LOG_BLOCK_SIZE``, ``COPY_LOG_BLOCKS_PER_TASK``, ``COPY_LOG_PREFETCH_BLOCKS``, ``COPY_LOG_READ_AHEAD_BYTES`` and ``COPY_LOG_TASK_DURATION_NANOS`` knobs can be set. `(PR #3436) <https://github.com/apple/foundationdb/pull/3436>`_
* Reduced the number of connections required by the multi-version client when loading external clients. When connecting to 7.0 clusters, only one connection with version 6.2 or larger will be used. With older clusters, at most two connections with version 6.2 or larger will be used. Clients older than version 6.2 will continue to create an additional connection each. `(PR #4667) <https://github.com/apple/foundationdb/pull/4667>`_
Reliability
-----------

View File

@ -35,6 +35,7 @@ constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
// The coordinator interface as exposed to clients
struct ClientLeaderRegInterface {
RequestStream<struct GetLeaderRequest> getLeader;
RequestStream<struct OpenDatabaseCoordRequest> openDatabase;
@ -42,6 +43,10 @@ struct ClientLeaderRegInterface {
ClientLeaderRegInterface() {}
ClientLeaderRegInterface(NetworkAddress remote);
ClientLeaderRegInterface(INetwork* local);
// Equality: two coordinator client interfaces are considered the same iff both
// of their request streams (getLeader and openDatabase) compare equal.
bool operator==(const ClientLeaderRegInterface& rhs) const {
return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase;
}
};
class ClusterConnectionString {

View File

@ -152,6 +152,7 @@ public:
return (DatabaseContext*)DatabaseContext::operator new(sizeof(DatabaseContext));
}
// Static constructor used by server processes to create a DatabaseContext
// For internal (fdbserver) use only
static Database create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
@ -164,9 +165,11 @@ public:
~DatabaseContext();
// Constructs a new copy of this DatabaseContext from the parameters of this DatabaseContext
Database clone() const {
return Database(new DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
taskID,
clientLocality,
@ -196,6 +199,11 @@ public:
Future<Void> onProxiesChanged();
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
// Update the watch counter for the database
void addWatch();
void removeWatch();
@ -247,6 +255,7 @@ public:
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -380,6 +389,9 @@ public:
Future<Void> clientInfoMonitor;
Future<Void> connected;
// An AsyncVar that reports the coordinator this DatabaseContext is interacting with
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon;
double lastStatusFetch;

View File

@ -4043,6 +4043,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
state Version beginVersion;
state Reference<IBackupContainer> bc;
state std::vector<KeyRange> ranges;
state bool logsOnly;
state bool inconsistentSnapshotOnly;
loop {
try {
@ -4050,11 +4052,12 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
Optional<Version> _beginVersion = wait(restore.beginVersion().get(tr));
beginVersion = _beginVersion.present() ? _beginVersion.get() : invalidVersion;
wait(store(beginVersion, restore.beginVersion().getD(tr, false, invalidVersion)));
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)));
wait(store(ranges, restore.getRestoreRangesOrDefault(tr)));
wait(store(logsOnly, restore.onlyAppyMutationLogs().getD(tr, false, false)));
wait(store(inconsistentSnapshotOnly, restore.inconsistentSnapshotOnly().getD(tr, false, false)));
wait(taskBucket->keepRunning(tr, task));
@ -4101,7 +4104,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
}
}
state bool logsOnly = wait(restore.onlyAppyMutationLogs().getD(tr, false, false));
state Version firstConsistentVersion = invalidVersion;
if (beginVersion == invalidVersion) {
beginVersion = 0;
}
@ -4111,25 +4114,46 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
}
state Optional<RestorableFileSet> restorable =
wait(bc->getRestoreSet(restoreVersion, keyRangesFilter, logsOnly, beginVersion));
if (!logsOnly) {
beginVersion = restorable.get().snapshot.beginVersion;
}
if (!restorable.present())
throw restore_missing_data();
// First version for which log data should be applied
Params.firstVersion().set(task, beginVersion);
// Convert the two lists in restorable (logs and ranges) to a single list of RestoreFiles.
// Order does not matter, they will be put in order when written to the restoreFileMap below.
state std::vector<RestoreConfig::RestoreFile> files;
state Version firstConsistentVersion = beginVersion;
for (const RangeFile& f : restorable.get().ranges) {
files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize });
firstConsistentVersion = std::max(firstConsistentVersion, f.version);
if (!logsOnly) {
beginVersion = restorable.get().snapshot.beginVersion;
if (!inconsistentSnapshotOnly) {
for (const RangeFile& f : restorable.get().ranges) {
files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize });
// In a restore with both snapshots and logs, the firstConsistentVersion is the highest version of
// any range file.
firstConsistentVersion = std::max(firstConsistentVersion, f.version);
}
} else {
for (int i = 0; i < restorable.get().ranges.size(); ++i) {
const RangeFile& f = restorable.get().ranges[i];
files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize });
// In inconsistentSnapshotOnly mode, if all range files have the same version, then it is the
// firstConsistentVersion, otherwise unknown (use -1).
if (i != 0 && f.version != firstConsistentVersion) {
firstConsistentVersion = invalidVersion;
} else {
firstConsistentVersion = f.version;
}
}
}
} else {
// In logs-only (incremental) mode, the firstConsistentVersion should just be restore.beginVersion().
firstConsistentVersion = beginVersion;
}
if (!inconsistentSnapshotOnly) {
for (const LogFile& f : restorable.get().logs) {
files.push_back({ f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion });
}
}
// First version for which log data should be applied
Params.firstVersion().set(task, beginVersion);
tr->reset();
loop {
try {
@ -4143,16 +4167,6 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
}
}
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
bool inconsistentSnapshotOnly = wait(restore.inconsistentSnapshotOnly().getD(tr, false, false));
if (!inconsistentSnapshotOnly) {
for (const LogFile& f : restorable.get().logs) {
files.push_back({ f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion });
}
}
state std::vector<RestoreConfig::RestoreFile>::iterator start = files.begin();
state std::vector<RestoreConfig::RestoreFile>::iterator end = files.end();

View File

@ -100,8 +100,9 @@ public:
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
virtual ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) = 0;

View File

@ -496,7 +496,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<ClusterConn
if (leader.get().first.forward) {
TraceEvent("MonitorLeaderForwarding")
.detail("NewConnStr", leader.get().first.serializedInfo.toString())
.detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
.detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString()).trackLatest("MonitorLeaderForwarding");
info.intermediateConnFile = makeReference<ClusterConnectionFile>(
connFile->getFilename(), ClusterConnectionString(leader.get().first.serializedInfo.toString()));
return info;
@ -758,6 +758,7 @@ void shrinkProxyList(ClientDBInfo& ni,
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
MonitorLeaderInfo info,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
@ -775,6 +776,9 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
loop {
state ClientLeaderRegInterface clientLeaderServer(addrs[idx]);
state OpenDatabaseCoordRequest req;
coordinator->set(clientLeaderServer);
req.clusterKey = cs.clusterKey();
req.coordinators = cs.coordinators();
req.knownClientInfoID = clientInfo->get().id;
@ -841,13 +845,14 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
ACTOR Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state MonitorLeaderInfo info(connFile->get());
loop {
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connFile->get(), clientInfo, info, supportedVersions, traceLogGroup))) {
connFile->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
info = _info;
}
when(wait(connFile->onChange())) {

View File

@ -76,6 +76,7 @@ Future<Void> monitorLeaderForProxies(Value const& key,
Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile,
Reference<AsyncVar<ClientDBInfo>> const& clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> const& coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> const& supportedVersions,
Key const& traceLogGroup);

View File

@ -289,12 +289,15 @@ void DLTransaction::reset() {
// DLDatabase
// Wraps a database handle produced asynchronously by a dynamically-loaded
// (external) client library. A self-reference is taken (addref) before the
// mapped future is created so this object stays alive while dbFuture is
// outstanding; the reference is released (delref) on both the error and
// success paths once the future resolves. On success the raw FDBDatabase*
// is stored in this->db and `ready` completes with Void; on failure `ready`
// carries the error instead.
DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
addref();
ready = mapThreadFuture<FdbCApi::FDBDatabase*, Void>(dbFuture, [this](ErrorOr<FdbCApi::FDBDatabase*> db) {
if (db.isError()) {
delref();
return ErrorOr<Void>(db.getError());
}
this->db = db.get();
delref();
return ErrorOr<Void>(Void());
});
}
@ -356,8 +359,9 @@ double DLDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
ASSERT(api->databaseGetServerProtocol != nullptr);
@ -877,35 +881,52 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
int threadIdx,
std::string clusterFilePath,
Reference<IDatabase> db,
Reference<IDatabase> versionMonitorDb,
bool openConnectors)
: dbState(new DatabaseState()), clusterFilePath(clusterFilePath) {
: dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) {
dbState->db = db;
dbState->dbVar->set(db);
if (!openConnectors) {
dbState->currentClientIndex = 0;
} else {
if (openConnectors) {
if (!api->localClientDisabled) {
dbState->currentClientIndex = 0;
dbState->addConnection(api->getLocalClient(), clusterFilePath);
} else {
dbState->currentClientIndex = -1;
dbState->addClient(api->getLocalClient());
}
api->runOnExternalClients(threadIdx, [this, clusterFilePath](Reference<ClientInfo> client) {
dbState->addConnection(client, clusterFilePath);
api->runOnExternalClients(threadIdx, [this](Reference<ClientInfo> client) { dbState->addClient(client); });
if (!externalClientsInitialized.test_and_set()) {
api->runOnExternalClientsAllThreads([&clusterFilePath](Reference<ClientInfo> client) {
// This creates a database to initialize some client state on the external library
// We only do this on 6.2+ clients to avoid some bugs associated with older versions
// This deletes the new database immediately to discard its connections
if (client->protocolVersion.hasCloseUnusedConnection()) {
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
}
});
}
// For clients older than 6.2 we create and maintain our database connection
api->runOnExternalClients(threadIdx, [this, &clusterFilePath](Reference<ClientInfo> client) {
if (!client->protocolVersion.hasCloseUnusedConnection()) {
dbState->legacyDatabaseConnections[client->protocolVersion] =
client->api->createDatabase(clusterFilePath.c_str());
}
});
dbState->startConnections();
Reference<DatabaseState> dbStateRef = dbState;
onMainThreadVoid([dbStateRef]() { dbStateRef->protocolVersionMonitor = dbStateRef->monitorProtocolVersion(); },
nullptr);
}
}
MultiVersionDatabase::~MultiVersionDatabase() {
dbState->cancelConnections();
dbState->close();
}
// Create a MultiVersionDatabase that wraps an already created IDatabase object
// For internal use in testing
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, false));
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, db, false));
}
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
@ -963,189 +984,279 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
// TODO: send this out through the active database
return MultiVersionApi::api->getLocalClient()
->api->createDatabase(clusterFilePath.c_str())
->getServerProtocol(expectedVersion);
return dbState->versionMonitorDb->getServerProtocol(expectedVersion);
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(
[this]() {
if (!cancelled) {
connected = false;
if (connectionFuture.isValid()) {
connectionFuture.cancel();
}
// Constructs the shared database state: records the cluster file path and the
// database object used for protocol-version monitoring, and starts with no
// active database connection (dbVar is initialized to a null Reference until
// a client matching the cluster's protocol version is selected).
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
: clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb),
dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))) {}
candidateDatabase = client->api->createDatabase(clusterFilePath.c_str());
if (client->external) {
connectionFuture = candidateDatabase.castTo<DLDatabase>()->onReady();
} else {
connectionFuture = ThreadFuture<Void>(Void());
}
// Adds a client (local or externally loaded) that can be used to connect to the cluster
void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) {
ProtocolVersion baseVersion = client->protocolVersion.normalizedVersion();
auto [itr, inserted] = clients.insert({ baseVersion, client });
if (!inserted) {
// SOMEDAY: prefer client with higher release version if protocol versions are compatible
Reference<ClientInfo> keptClient = itr->second;
Reference<ClientInfo> discardedClient = client;
if (client->canReplace(itr->second)) {
std::swap(keptClient, discardedClient);
clients[baseVersion] = client;
}
connectionFuture = flatMapThreadFuture<Void, Void>(connectionFuture, [this](ErrorOr<Void> ready) {
if (ready.isError()) {
return ErrorOr<ThreadFuture<Void>>(ready.getError());
}
discardedClient->failed = true;
TraceEvent(SevWarn, "DuplicateClientVersion")
.detail("Keeping", keptClient->libPath)
.detail("KeptProtocolVersion", keptClient->protocolVersion)
.detail("Disabling", discardedClient->libPath)
.detail("DisabledProtocolVersion", discardedClient->protocolVersion);
tr = candidateDatabase->createTransaction();
return ErrorOr<ThreadFuture<Void>>(
mapThreadFuture<Version, Void>(tr->getReadVersion(), [](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except
// operation_cancelled)
if (v.isError() && v.getError().code() == error_code_operation_cancelled) {
return ErrorOr<Void>(v.getError());
} else {
return ErrorOr<Void>(Void());
}
}));
});
int userParam;
connectionFuture.callOrSetAsCallback(this, userParam, 0);
} else {
delref();
}
},
nullptr);
}
// Only called from main thread
void MultiVersionDatabase::Connector::cancel() {
connected = false;
cancelled = true;
if (connectionFuture.isValid()) {
connectionFuture.cancel();
}
}
void MultiVersionDatabase::Connector::fire(const Void& unused, int& userParam) {
onMainThreadVoid(
[this]() {
if (!cancelled) {
connected = true;
dbState->stateChanged();
}
delref();
},
nullptr);
}
void MultiVersionDatabase::Connector::error(const Error& e, int& userParam) {
if (e.code() != error_code_operation_cancelled) {
// TODO: is it right to abandon this connection attempt?
client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
TraceEvent(SevError, "DatabaseConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
}
delref();
if (!client->protocolVersion.hasInexpensiveMultiVersionClient() && !client->failed) {
TraceEvent("AddingLegacyVersionMonitor")
.detail("LibPath", client->libPath)
.detail("ProtocolVersion", client->protocolVersion);
legacyVersionMonitors.emplace_back(new LegacyVersionMonitor(client));
}
}
MultiVersionDatabase::DatabaseState::DatabaseState()
: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))), currentClientIndex(-1) {}
// Watch the cluster protocol version for changes and update the database state when it does.
// Must be called from the main thread
ThreadFuture<Void> MultiVersionDatabase::DatabaseState::monitorProtocolVersion() {
startLegacyVersionMonitors();
// Only called from main thread
void MultiVersionDatabase::DatabaseState::stateChanged() {
int newIndex = -1;
for (int i = 0; i < clients.size(); ++i) {
if (i != currentClientIndex && connectionAttempts[i]->connected) {
if (currentClientIndex >= 0 && !clients[i]->canReplace(clients[currentClientIndex])) {
TraceEvent(SevWarn, "DuplicateClientVersion")
.detail("Keeping", clients[currentClientIndex]->libPath)
.detail("KeptClientProtocolVersion", clients[currentClientIndex]->protocolVersion.version())
.detail("Disabling", clients[i]->libPath)
.detail("DisabledClientProtocolVersion", clients[i]->protocolVersion.version());
connectionAttempts[i]->connected = false; // Permanently disable this client in favor of the current one
clients[i]->failed = true;
MultiVersionApi::api->updateSupportedVersions();
return;
Optional<ProtocolVersion> expected = dbProtocolVersion;
ThreadFuture<ProtocolVersion> f = versionMonitorDb->getServerProtocol(dbProtocolVersion);
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
return mapThreadFuture<ProtocolVersion, Void>(f, [self, expected](ErrorOr<ProtocolVersion> cv) {
if (cv.isError()) {
if (cv.getError().code() == error_code_operation_cancelled) {
return ErrorOr<Void>(cv.getError());
}
newIndex = i;
break;
TraceEvent("ErrorGettingClusterProtocolVersion")
.detail("ExpectedProtocolVersion", expected)
.error(cv.getError());
}
ProtocolVersion clusterVersion =
!cv.isError() ? cv.get() : self->dbProtocolVersion.orDefault(currentProtocolVersion);
onMainThreadVoid([self, clusterVersion]() { self->protocolVersionChanged(clusterVersion); }, nullptr);
return ErrorOr<Void>(Void());
});
}
// Called when a change to the protocol version of the cluster has been detected.
// Must be called from the main thread
void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion protocolVersion) {
// If the protocol version changed but is still compatible, update our local version but keep the same connection
if (dbProtocolVersion.present() &&
protocolVersion.normalizedVersion() == dbProtocolVersion.get().normalizedVersion()) {
dbProtocolVersion = protocolVersion;
ASSERT(protocolVersionMonitor.isValid());
protocolVersionMonitor.cancel();
protocolVersionMonitor = monitorProtocolVersion();
}
// The protocol version has changed to a different, incompatible version
else {
TraceEvent("ProtocolVersionChanged")
.detail("NewProtocolVersion", protocolVersion)
.detail("OldProtocolVersion", dbProtocolVersion);
dbProtocolVersion = protocolVersion;
auto itr = clients.find(protocolVersion.normalizedVersion());
if (itr != clients.end()) {
auto& client = itr->second;
TraceEvent("CreatingDatabaseOnClient")
.detail("LibraryPath", client->libPath)
.detail("Failed", client->failed)
.detail("External", client->external);
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
if (client->external && !MultiVersionApi::apiVersionAtLeast(610)) {
// Old API versions return a future when creating the database, so we need to wait for it
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
dbReady = mapThreadFuture<Void, Void>(
newDb.castTo<DLDatabase>()->onReady(), [self, newDb, client](ErrorOr<Void> ready) {
if (!ready.isError()) {
onMainThreadVoid([self, newDb, client]() { self->updateDatabase(newDb, client); }, nullptr);
} else {
onMainThreadVoid([self, client]() { self->updateDatabase(Reference<IDatabase>(), client); },
nullptr);
}
return ready;
});
} else {
updateDatabase(newDb, client);
}
} else {
// We don't have a client matching the current protocol
updateDatabase(Reference<IDatabase>(), Reference<ClientInfo>());
}
}
}
if (newIndex == -1) {
ASSERT_EQ(currentClientIndex, 0); // This can only happen for the local client, which we set as the current
// connection before we know it's connected
return;
}
// Replaces the active database connection with a new one. Must be called from the main thread.
void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> newDb, Reference<ClientInfo> client) {
if (newDb) {
optionLock.enter();
for (auto option : options) {
try {
// In practice, this will set a deferred error instead of throwing. If that happens, the database
// will be unusable (attempts to use it will throw errors).
newDb->setOption(option.first, option.second.castTo<StringRef>());
} catch (Error& e) {
optionLock.leave();
// Restart connection for replaced client
auto newDb = connectionAttempts[newIndex]->candidateDatabase;
optionLock.enter();
for (auto option : options) {
try {
newDb->setOption(option.first,
option.second.castTo<StringRef>()); // In practice, this will set a deferred error instead
// of throwing. If that happens, the database will be
// unusable (attempts to use it will throw errors).
} catch (Error& e) {
optionLock.leave();
TraceEvent(SevError, "ClusterVersionChangeOptionError")
.error(e)
.detail("Option", option.first)
.detail("OptionValue", option.second)
.detail("LibPath", clients[newIndex]->libPath);
connectionAttempts[newIndex]->connected = false;
clients[newIndex]->failed = true;
MultiVersionApi::api->updateSupportedVersions();
return; // If we can't set all of the options on a cluster, we abandon the client
// If we can't set all of the options on a cluster, we abandon the client
TraceEvent(SevError, "ClusterVersionChangeOptionError")
.error(e)
.detail("Option", option.first)
.detail("OptionValue", option.second)
.detail("LibPath", client->libPath);
client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
newDb = Reference<IDatabase>();
break;
}
}
}
db = newDb;
optionLock.leave();
db = newDb;
optionLock.leave();
if (dbProtocolVersion.get().hasStableInterfaces() && db) {
versionMonitorDb = db;
} else {
// For older clients that don't have an API to get the protocol version, we have to monitor it locally
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
}
} else {
// We don't have a database connection, so use the local client to monitor the protocol version
db = Reference<IDatabase>();
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
}
dbVar->set(db);
if (currentClientIndex >= 0 && connectionAttempts[currentClientIndex]->connected) {
connectionAttempts[currentClientIndex]->connected = false;
connectionAttempts[currentClientIndex]->connect();
}
ASSERT(newIndex >= 0 && newIndex < clients.size());
currentClientIndex = newIndex;
ASSERT(protocolVersionMonitor.isValid());
protocolVersionMonitor.cancel();
protocolVersionMonitor = monitorProtocolVersion();
}
void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
clients.push_back(client);
connectionAttempts.push_back(
makeReference<Connector>(Reference<DatabaseState>::addRef(this), client, clusterFilePath));
}
void MultiVersionDatabase::DatabaseState::startConnections() {
for (auto c : connectionAttempts) {
c->connect();
// Starts version monitors for old client versions that don't support connect packet monitoring (<= 5.0).
// Must be called from the main thread
void MultiVersionDatabase::DatabaseState::startLegacyVersionMonitors() {
for (auto itr = legacyVersionMonitors.begin(); itr != legacyVersionMonitors.end(); ++itr) {
while (itr != legacyVersionMonitors.end() && (*itr)->client->failed) {
(*itr)->close();
itr = legacyVersionMonitors.erase(itr);
}
if (itr != legacyVersionMonitors.end() &&
(!dbProtocolVersion.present() || (*itr)->client->protocolVersion != dbProtocolVersion.get())) {
(*itr)->startConnectionMonitor(Reference<DatabaseState>::addRef(this));
}
}
}
void MultiVersionDatabase::DatabaseState::cancelConnections() {
addref();
// Cleans up state for the legacy version monitors to break reference cycles
void MultiVersionDatabase::DatabaseState::close() {
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
onMainThreadVoid(
[this]() {
for (auto c : connectionAttempts) {
c->cancel();
[self]() {
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
for (auto monitor : self->legacyVersionMonitors) {
monitor->close();
}
connectionAttempts.clear();
clients.clear();
delref();
self->legacyVersionMonitors.clear();
},
nullptr);
}
// MultiVersionApi
// Starts the connection monitor by creating a database object at an old version.
// Must be called from the main thread
void MultiVersionDatabase::LegacyVersionMonitor::startConnectionMonitor(
Reference<MultiVersionDatabase::DatabaseState> dbState) {
if (!monitorRunning) {
monitorRunning = true;
auto itr = dbState->legacyDatabaseConnections.find(client->protocolVersion);
ASSERT(itr != dbState->legacyDatabaseConnections.end());
db = itr->second;
tr = Reference<ITransaction>();
TraceEvent("StartingLegacyVersionMonitor").detail("ProtocolVersion", client->protocolVersion);
Reference<LegacyVersionMonitor> self = Reference<LegacyVersionMonitor>::addRef(this);
versionMonitor =
mapThreadFuture<Void, Void>(db.castTo<DLDatabase>()->onReady(), [self, dbState](ErrorOr<Void> ready) {
onMainThreadVoid(
[self, ready, dbState]() {
if (ready.isError()) {
if (ready.getError().code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FailedToOpenDatabaseOnClient")
.error(ready.getError())
.detail("LibPath", self->client->libPath);
self->client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
}
} else {
self->runGrvProbe(dbState);
}
},
nullptr);
return ready;
});
}
}
// Runs a GRV probe on the cluster to determine if the client version is compatible with the cluster.
// Must be called from main thread
void MultiVersionDatabase::LegacyVersionMonitor::runGrvProbe(Reference<MultiVersionDatabase::DatabaseState> dbState) {
tr = db->createTransaction();
Reference<LegacyVersionMonitor> self = Reference<LegacyVersionMonitor>::addRef(this);
versionMonitor = mapThreadFuture<Version, Void>(tr->getReadVersion(), [self, dbState](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
if (!v.isError() || v.getError().code() != error_code_operation_cancelled) {
onMainThreadVoid(
[self, dbState]() {
self->monitorRunning = false;
dbState->protocolVersionChanged(self->client->protocolVersion);
},
nullptr);
}
return v.map<Void>([](Version v) { return Void(); });
});
}
void MultiVersionDatabase::LegacyVersionMonitor::close() {
if (versionMonitor.isValid()) {
versionMonitor.cancel();
}
}
std::atomic_flag MultiVersionDatabase::externalClientsInitialized = ATOMIC_FLAG_INIT;
// MultiVersionApi
bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
ASSERT_NE(MultiVersionApi::api->apiVersion, 0);
return MultiVersionApi::api->apiVersion >= minVersion || MultiVersionApi::api->apiVersion < 0;
@ -1608,6 +1719,7 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void*
}
}
// Creates an IDatabase object that represents a connection to the cluster
Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath) {
lock.enter();
if (!networkSetup) {
@ -1622,28 +1734,21 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
int threadIdx = nextThread;
nextThread = (nextThread + 1) % threadCount;
lock.leave();
for (auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient")
.detail("LibraryPath", it.first)
.detail("Failed", it.second[threadIdx]->failed);
}
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>()));
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
return Reference<IDatabase>(
new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>(), localDb));
}
lock.leave();
ASSERT_LE(threadCount, 1);
auto db = localClient->api->createDatabase(clusterFilePath);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
if (bypassMultiClientApi) {
return db;
return localDb;
} else {
for (auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient")
.detail("LibraryPath", it.first)
.detail("Failed", it.second[0]->failed);
}
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, db));
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, Reference<IDatabase>(), localDb));
}
}
@ -1975,6 +2080,12 @@ ACTOR Future<Void> checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar
return Void();
}
// Common code for tests of single assignment vars. Tests both correctness and thread safety.
// T should be a class that has a static method with the following signature:
//
// static FutureInfo createThreadFuture(FutureInfo f);
//
// See AbortableTest for an example T type
template <class T>
THREAD_FUNC runSingleAssignmentVarTest(void* arg) {
noUnseed = true;
@ -1987,6 +2098,9 @@ THREAD_FUNC runSingleAssignmentVarTest(void* arg) {
tf.validate();
tf.future.extractPtr(); // leaks
for (auto t : tf.threads) {
waitThread(t);
}
}
for (int numRuns = 0; numRuns < 25; ++numRuns) {
@ -2057,12 +2171,14 @@ struct AbortableTest {
TEST_CASE("/fdbclient/multiversionclient/AbortableSingleAssignmentVar") {
state volatile bool done = false;
g_network->startThread(runSingleAssignmentVarTest<AbortableTest>, (void*)&done);
state THREAD_HANDLE thread = g_network->startThread(runSingleAssignmentVarTest<AbortableTest>, (void*)&done);
while (!done) {
wait(delay(1.0));
}
waitThread(thread);
return Void();
}
@ -2134,20 +2250,24 @@ TEST_CASE("/fdbclient/multiversionclient/DLSingleAssignmentVar") {
state volatile bool done = false;
MultiVersionApi::api->callbackOnMainThread = true;
g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
state THREAD_HANDLE thread = g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
while (!done) {
wait(delay(1.0));
}
waitThread(thread);
done = false;
MultiVersionApi::api->callbackOnMainThread = false;
g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
thread = g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
while (!done) {
wait(delay(1.0));
}
waitThread(thread);
return Void();
}
@ -2172,12 +2292,14 @@ struct MapTest {
TEST_CASE("/fdbclient/multiversionclient/MapSingleAssignmentVar") {
state volatile bool done = false;
g_network->startThread(runSingleAssignmentVarTest<MapTest>, (void*)&done);
state THREAD_HANDLE thread = g_network->startThread(runSingleAssignmentVarTest<MapTest>, (void*)&done);
while (!done) {
wait(delay(1.0));
}
waitThread(thread);
return Void();
}
@ -2209,11 +2331,13 @@ struct FlatMapTest {
TEST_CASE("/fdbclient/multiversionclient/FlatMapSingleAssignmentVar") {
state volatile bool done = false;
g_network->startThread(runSingleAssignmentVarTest<FlatMapTest>, (void*)&done);
state THREAD_HANDLE thread = g_network->startThread(runSingleAssignmentVarTest<FlatMapTest>, (void*)&done);
while (!done) {
wait(delay(1.0));
}
waitThread(thread);
return Void();
}

View File

@ -271,8 +271,9 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
@ -437,82 +438,128 @@ public:
int threadIdx,
std::string clusterFilePath,
Reference<IDatabase> db,
Reference<IDatabase> versionMonitorDb,
bool openConnectors = true);
~MultiVersionDatabase() override;
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
// Create a MultiVersionDatabase that wraps an already created IDatabase object
// For internal use in testing
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
private:
struct DatabaseState;
// private:
struct Connector : ThreadCallback, ThreadSafeReferenceCounted<Connector> {
Connector(Reference<DatabaseState> dbState, Reference<ClientInfo> client, std::string clusterFilePath)
: dbState(dbState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
void connect();
void cancel();
bool canFire(int notMadeActive) const override { return true; }
void fire(const Void& unused, int& userParam) override;
void error(const Error& e, int& userParam) override;
const Reference<ClientInfo> client;
const std::string clusterFilePath;
const Reference<DatabaseState> dbState;
ThreadFuture<Void> connectionFuture;
Reference<IDatabase> candidateDatabase;
Reference<ITransaction> tr;
bool connected;
bool cancelled;
};
struct LegacyVersionMonitor;
// A struct that manages the current connection state of the MultiVersionDatabase. This wraps the underlying
// IDatabase object that is currently interacting with the cluster.
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
DatabaseState();
DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb);
void stateChanged();
void addConnection(Reference<ClientInfo> client, std::string clusterFilePath);
void startConnections();
void cancelConnections();
// Replaces the active database connection with a new one. Must be called from the main thread.
void updateDatabase(Reference<IDatabase> newDb, Reference<ClientInfo> client);
// Called when a change to the protocol version of the cluster has been detected.
// Must be called from the main thread
void protocolVersionChanged(ProtocolVersion protocolVersion);
// Adds a client (local or externally loaded) that can be used to connect to the cluster
void addClient(Reference<ClientInfo> client);
// Watch the cluster protocol version for changes and update the database state when it does.
// Must be called from the main thread
ThreadFuture<Void> monitorProtocolVersion();
// Starts version monitors for old client versions that don't support connect packet monitoring (<= 5.0).
// Must be called from the main thread
void startLegacyVersionMonitors();
// Cleans up state for the legacy version monitors to break reference cycles
void close();
Reference<IDatabase> db;
const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar;
std::string clusterFilePath;
// Used to monitor the cluster protocol version. Will be the same as db unless we have either not connected
// yet or if the client version associated with db does not support protocol monitoring. In those cases,
// this will be a specially created local db.
Reference<IDatabase> versionMonitorDb;
ThreadFuture<Void> changed;
bool cancelled;
int currentClientIndex;
std::vector<Reference<ClientInfo>> clients;
std::vector<Reference<Connector>> connectionAttempts;
ThreadFuture<Void> dbReady;
ThreadFuture<Void> protocolVersionMonitor;
// Versions older than 6.1 do not benefit from having their database connections closed. Additionally,
// there are various issues that result in negative behavior in some cases if the connections are closed.
// Therefore, we leave them open.
std::map<ProtocolVersion, Reference<IDatabase>> legacyDatabaseConnections;
// Versions 5.0 and older do not support connection packet monitoring and require alternate techniques to
// determine the cluster version.
std::list<Reference<LegacyVersionMonitor>> legacyVersionMonitors;
Optional<ProtocolVersion> dbProtocolVersion;
// This maps a normalized protocol version to the client associated with it. This prevents compatible
// differences in protocol version not matching each other.
std::map<ProtocolVersion, Reference<ClientInfo>> clients;
std::vector<std::pair<FDBDatabaseOptions::Option, Optional<Standalone<StringRef>>>> options;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaultOptions;
Mutex optionLock;
};
std::string clusterFilePath;
// A struct that enables monitoring whether the cluster is running an old version (<= 5.0) that doesn't support
// connect packet monitoring.
struct LegacyVersionMonitor : ThreadSafeReferenceCounted<LegacyVersionMonitor> {
LegacyVersionMonitor(Reference<ClientInfo> const& client) : client(client), monitorRunning(false) {}
// Terminates the version monitor to break reference cycles
void close();
// Starts the connection monitor by creating a database object at an old version.
// Must be called from the main thread
void startConnectionMonitor(Reference<DatabaseState> dbState);
// Runs a GRV probe on the cluster to determine if the client version is compatible with the cluster.
// Must be called from main thread
void runGrvProbe(Reference<DatabaseState> dbState);
Reference<ClientInfo> client;
Reference<IDatabase> db;
Reference<ITransaction> tr;
ThreadFuture<Void> versionMonitor;
bool monitorRunning;
};
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
// Clients must create a database object in order to initialize some of their state.
// This needs to be done only once, and this flag tracks whether that has happened.
static std::atomic_flag externalClientsInitialized;
};
// An implementation of IClientApi that can choose between multiple different client implementations either provided
@ -530,6 +577,7 @@ public:
void stopNetwork() override;
void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override;
// Creates an IDatabase object that represents a connection to the cluster
Reference<IDatabase> createDatabase(const char* clusterFilePath) override;
static MultiVersionApi* api;

View File

@ -905,6 +905,7 @@ Future<Standalone<RangeResultRef>> HealthMetricsRangeImpl::getRange(ReadYourWrit
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -913,9 +914,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
bool internal,
int apiVersion,
bool switchable)
: connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
apiVersion(apiVersion), switchable(switchable), proxyProvisional(false), cc("TransactionMetrics"),
: connectionFile(connectionFile), clientInfo(clientInfo), coordinator(coordinator),
clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality),
enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), apiVersion(apiVersion),
switchable(switchable), proxyProvisional(false), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
@ -1029,13 +1031,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::GLOBALCONFIG, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<GlobalConfigImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
SpecialKeySpace::MODULE::GLOBALCONFIG,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<GlobalConfigImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
SpecialKeySpace::MODULE::TRACING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
@ -1189,6 +1191,8 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false),
transactionTracingEnabled(true) {}
// Static constructor used by server processes to create a DatabaseContext
// For internal (fdbserver) use only
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
LocalityData clientLocality,
@ -1199,6 +1203,7 @@ Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
bool switchable) {
return Database(new DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>>(),
clientInfo,
makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>(),
clientInfoMonitor,
taskID,
clientLocality,
@ -1481,6 +1486,9 @@ void DatabaseContext::expireThrottles() {
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal,
@ -1527,15 +1535,20 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
g_network->initTLS();
auto clientInfo = makeReference<AsyncVar<ClientDBInfo>>();
auto coordinator = makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>();
auto connectionFile = makeReference<AsyncVar<Reference<ClusterConnectionFile>>>();
connectionFile->set(connFile);
Future<Void> clientInfoMonitor = monitorProxies(
connectionFile, clientInfo, networkOptions.supportedVersions, StringRef(networkOptions.traceLogGroup));
Future<Void> clientInfoMonitor = monitorProxies(connectionFile,
clientInfo,
coordinator,
networkOptions.supportedVersions,
StringRef(networkOptions.traceLogGroup));
DatabaseContext* db;
if (preallocatedDb) {
db = new (preallocatedDb) DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
@ -1547,6 +1560,7 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
} else {
db = new DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
@ -4910,48 +4924,97 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConnectionFile> f) {
state ClientCoordinators coord(f);
// Gets the protocol version reported by a coordinator via the protocol info interface
ACTOR Future<ProtocolVersion> getCoordinatorProtocol(NetworkAddressList coordinatorAddresses) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { coordinatorAddresses }, WLTOKEN_PROTOCOL_INFO } };
ProtocolInfoReply reply = wait(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5));
std::unordered_map<uint64_t, int> protocolCount;
for (int i = 0; i < coordProtocols.size(); i++) {
if (coordProtocols[i].isReady()) {
protocolCount[coordProtocols[i].get().version.version()]++;
}
}
uint64_t majorityProtocol = std::max_element(protocolCount.begin(),
protocolCount.end(),
[](const std::pair<uint64_t, int>& l,
const std::pair<uint64_t, int>& r) { return l.second < r.second; })
->first;
return ProtocolVersion(majorityProtocol);
return reply.version;
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion) {
// Gets the protocol version reported by a coordinator in its connect packet
// If we are unable to get a version from the connect packet (e.g. because we lost connection with the peer), then this
// function will return with an unset result.
// If an expected version is given, this future won't return if the actual protocol version matches the expected version
ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
NetworkAddress coordinatorAddress,
Optional<ProtocolVersion> expectedVersion) {
state Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion =
FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress);
loop {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
if (!expectedVersion.present() || protocolVersion != expectedVersion.get()) {
return protocolVersion;
} else {
wait(delay(2.0)); // TODO: this is temporary, so not making into a knob yet
if (protocolVersion->get().present() && protocolVersion->get() != expectedVersion) {
return protocolVersion->get();
}
Future<Void> change = protocolVersion->onChange();
if (!protocolVersion->get().present()) {
// If we still don't have any connection info after a timeout, retry sending the protocol version request
change = timeout(change, FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Void());
}
wait(change);
if (!protocolVersion->get().present()) {
return protocolVersion->get();
}
}
}
// Returns the protocol version reported by the given coordinator
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Optional<ProtocolVersion> expectedVersion) {
state bool needToConnect = true;
state Future<ProtocolVersion> protocolVersion = Never();
loop {
if (!coordinator->get().present()) {
wait(coordinator->onChange());
} else {
Endpoint coordinatorEndpoint = coordinator->get().get().getLeader.getEndpoint();
if (needToConnect) {
// Even though we typically rely on the connect packet to get the protocol version, we need to send some
// request in order to start a connection. This protocol version request serves that purpose.
protocolVersion = getCoordinatorProtocol(coordinatorEndpoint.addresses);
needToConnect = false;
}
choose {
when(wait(coordinator->onChange())) { needToConnect = true; }
when(ProtocolVersion pv = wait(protocolVersion)) {
if (!expectedVersion.present() || expectedVersion.get() != pv) {
return pv;
}
protocolVersion = Never();
}
// Older versions of FDB don't have an endpoint to return the protocol version, so we get this info from
// the connect packet
when(Optional<ProtocolVersion> pv = wait(getCoordinatorProtocolFromConnectPacket(
coordinatorEndpoint.getPrimaryAddress(), expectedVersion))) {
if (pv.present()) {
return pv.get();
} else {
needToConnect = true;
}
}
}
}
}
}
// Returns the protocol version reported by the coordinator this client is currently connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
Future<ProtocolVersion> DatabaseContext::getClusterProtocol(Optional<ProtocolVersion> expectedVersion) {
return getClusterProtocolImpl(coordinator, expectedVersion);
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();

View File

@ -76,11 +76,15 @@ class Database {
public:
enum { API_VERSION_LATEST = -1 };
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
static Database createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal = true,
LocalityData const& clientLocality = LocalityData(),
DatabaseContext* preallocatedDb = nullptr);
static Database createDatabase(std::string connFileName,
int apiVersion,
bool internal = true,
@ -400,11 +404,6 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion);
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;
}

View File

@ -194,6 +194,18 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"p99":0.0,
"p99.9":0.0
},
"commit_batching_window_size":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"grv_latency_bands":{
"$map": 1
},

View File

@ -97,13 +97,13 @@ double ThreadSafeDatabase::getMainThreadBusyness() {
return g_network->networkInfo.metrics.networkBusyness;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
DatabaseContext* db = this->db;
return onMainThread([db, expectedVersion]() -> Future<ProtocolVersion> {
return getClusterProtocol(db->getConnectionFile(), expectedVersion);
});
return onMainThread(
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {

View File

@ -39,8 +39,9 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;

View File

@ -760,6 +760,13 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
conn->close();
conn = Reference<IConnection>();
// Old versions will throw this error, and we don't want to forget their protocol versions.
// This means we can't tell the difference between an old protocol version and one we
// can no longer connect to.
if (e.code() != error_code_incompatible_protocol_version) {
self->protocolVersion->set(Optional<ProtocolVersion>());
}
}
// Clients might send more packets in response, which needs to go out on the next connection
@ -787,7 +794,8 @@ Peer::Peer(TransportData* transport, NetworkAddress const& destination)
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
bytesSent(0), lastLoggedBytesSent(0), lastLoggedTime(0.0), connectOutgoingCount(0), connectIncomingCount(0),
connectFailedCount(0), connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1) {
connectFailedCount(0), connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1),
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
@ -1103,12 +1111,12 @@ static int getNewBufferSize(const uint8_t* begin,
packetLen + sizeof(uint32_t) * (peerAddress.isTLS() ? 2 : 3));
}
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
ACTOR static Future<Void> connectionReader(TransportData* transport,
Reference<IConnection> conn,
Reference<Peer> peer,
Promise<Reference<Peer>> onConnected) {
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
state Arena arena;
state uint8_t* unprocessed_begin = nullptr;
@ -1206,7 +1214,11 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
compatible = false;
if (!protocolVersion.hasMultiVersionClient()) {
if (!protocolVersion.hasInexpensiveMultiVersionClient()) {
if(peer) {
peer->protocolVersion->set(protocolVersion);
}
// Older versions expected us to hang up. It may work even if we don't hang up here, but
// it's safer to keep the old behavior.
throw incompatible_protocol_version();
@ -1256,6 +1268,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
onConnected.send(peer);
wait(delay(0)); // Check for cancellation
}
peer->protocolVersion->set(peerProtocolVersion);
}
}
@ -1669,6 +1682,16 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
return self->degraded;
}
// Returns the protocol version of the peer at the specified address. The result is returned as an AsyncVar that
// can be used to monitor for changes of a peer's protocol. The protocol version will be unset in the event that
// there is no connection established to the peer.
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
return self->peers.at(addr)->protocolVersion;
}
void FlowTransport::resetConnection(NetworkAddress address) {
auto peer = self->getPeer(address);
if (peer) {

View File

@ -152,6 +152,9 @@ struct Peer : public ReferenceCounted<Peer> {
double lastLoggedTime;
int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion;
// Cleared every time stats are logged for this peer.
int connectOutgoingCount;
int connectIncomingCount;
@ -174,64 +177,64 @@ public:
FlowTransport(uint64_t transportId);
~FlowTransport();
static void createInstance(bool isClient, uint64_t transportId);
// Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global()
// variables, so it will be private to a simulation.
static void createInstance(bool isClient, uint64_t transportId);
static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; }
void initMetrics();
// Metrics must be initialized after FlowTransport::createInstance has been called
void initMetrics();
Future<Void> bind(NetworkAddress publicAddress, NetworkAddress listenAddress);
// Starts a server listening on the given listenAddress, and sets publicAddress to be the public
// address of this server. Returns only errors.
Future<Void> bind(NetworkAddress publicAddress, NetworkAddress listenAddress);
NetworkAddress getLocalAddress() const;
// Returns first local NetworkAddress.
NetworkAddress getLocalAddress() const;
NetworkAddressList getLocalAddresses() const;
// Returns all local NetworkAddress.
NetworkAddressList getLocalAddresses() const;
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the set of all peers that have attempted to connect, but have incompatible protocol versions
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
Future<Void> onIncompatibleChanged();
// Returns when getIncompatiblePeers has at least one peer which is incompatible.
Future<Void> onIncompatibleChanged();
void addPeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is being used, even if no messages are currently being sent to the peer
void addPeerReference(const Endpoint&, bool isStream);
void removePeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is no longer being used
void removePeerReference(const Endpoint&, bool isStream);
void addEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
void addEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
void addEndpoints(std::vector<std::pair<struct FlowReceiver*, TaskPriority>> const& streams);
void removeEndpoint(const Endpoint&, NetworkMessageReceiver*);
// The given local endpoint no longer delivers messages to the given receiver or uses resources
void removeEndpoint(const Endpoint&, NetworkMessageReceiver*);
void addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// Sets endpoint to a new local endpoint (without changing its token) which delivers messages to the given receiver
// Implementations may have limitations on when this function is called and what endpoint.token may be!
void addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is called. It will
// retry sending if the connection is closed or the failure manager reports the destination become available (edge
// triggered).
ReliablePacket* sendReliable(ISerializeSource const& what, const Endpoint& destination);
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is
// called. It will retry sending if the connection is closed or the failure manager reports
// the destination become available (edge triggered).
// Makes Packet "unreliable" (either the data or a connection close event will be delivered eventually). It can
// still be used safely to send a reply to a "reliable" request.
void cancelReliable(ReliablePacket*);
// Makes Packet "unreliable" (either the data or a connection close event will be delivered
// eventually). It can still be used safely to send a reply to a "reliable" request.
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure
// monitor thinks is healthy.
Reference<AsyncVar<bool>> getDegraded();
void resetConnection(NetworkAddress address);
// Forces the connection with this address to be reset
void resetConnection(NetworkAddress address);
Reference<Peer> sendUnreliable(ISerializeSource const& what,
const Endpoint& destination,
@ -239,6 +242,14 @@ public:
bool incompatibleOutgoingConnectionsPresent();
// Returns the protocol version of the peer at the specified address. The result is returned as an AsyncVar that
// can be used to monitor for changes of a peer's protocol. The protocol version will be unset in the event that
// there is no connection established to the peer.
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> getPeerProtocolAsyncVar(NetworkAddress addr);
static FlowTransport& transport() {
return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport));
}

View File

@ -75,109 +75,170 @@ struct LoadBalancedReply {
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply* reply);
Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
// Returns true if we got a value for our request
// Throws an error if the request returned an error that should bubble out
// Returns false if we got an error that should result in reissuing the request
template <class T>
bool checkAndProcessResult(ErrorOr<T> result, Reference<ModelHolder> holder, bool atMostOnce, bool triedAllOptions) {
Optional<LoadBalancedReply> loadBalancedReply;
if (!result.isError()) {
loadBalancedReply = getLoadBalancedReply(&result.get());
// Stores state for a request made by the load balancer
template <class Request>
struct RequestData : NonCopyable {
typedef ErrorOr<REPLY_TYPE(Request)> Reply;
Future<Reply> response;
Reference<ModelHolder> modelHolder;
bool triedAllOptions = false;
bool requestStarted = false; // true once the request has been sent to an alternative
bool requestProcessed = false; // true once a response has been received and handled by checkAndProcessResult
// Whether or not the response future is valid
// This is true once setupRequest is called, even though at that point the response is Never().
bool isValid() { return response.isValid(); }
// Initializes the request state and starts it, possibly after a backoff delay
void startRequest(double backoff,
bool triedAllOptions,
RequestStream<Request> const* stream,
Request const& request,
QueueModel* model) {
modelHolder = Reference<ModelHolder>();
requestStarted = false;
if (backoff > 0) {
response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
delay(backoff), [this, stream, &request, model](Void _) {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
return stream->tryGetReply(request);
});
} else {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
response = stream->tryGetReply(request);
}
requestProcessed = false;
this->triedAllOptions = triedAllOptions;
}
int errCode;
if (loadBalancedReply.present()) {
errCode =
loadBalancedReply.get().error.present() ? loadBalancedReply.get().error.get().code() : error_code_success;
} else {
errCode = result.isError() ? result.getError().code() : error_code_success;
}
// Implementation of the logic to handle a response.
// Checks the state of the response, updates the queue model, and returns one of the following outcomes:
// A return value of true means that the request completed successfully
// A return value of false means that the request failed but should be retried
// A return value with an error means that the error should be thrown back to the original caller
static ErrorOr<bool> checkAndProcessResultImpl(Reply const& result,
Reference<ModelHolder> modelHolder,
bool atMostOnce,
bool triedAllOptions) {
ASSERT(modelHolder);
bool maybeDelivered = errCode == error_code_broken_promise || errCode == error_code_request_maybe_delivered;
bool receivedResponse = loadBalancedReply.present() ? !loadBalancedReply.get().error.present() : result.present();
receivedResponse = receivedResponse || (!maybeDelivered && errCode != error_code_process_behind);
bool futureVersion = errCode == error_code_future_version || errCode == error_code_process_behind;
Optional<LoadBalancedReply> loadBalancedReply;
if (!result.isError()) {
loadBalancedReply = getLoadBalancedReply(&result.get());
}
holder->release(
receivedResponse, futureVersion, loadBalancedReply.present() ? loadBalancedReply.get().penalty : -1.0);
int errCode;
if (loadBalancedReply.present()) {
errCode = loadBalancedReply.get().error.present() ? loadBalancedReply.get().error.get().code()
: error_code_success;
} else {
errCode = result.isError() ? result.getError().code() : error_code_success;
}
bool maybeDelivered = errCode == error_code_broken_promise || errCode == error_code_request_maybe_delivered;
bool receivedResponse =
loadBalancedReply.present() ? !loadBalancedReply.get().error.present() : result.present();
receivedResponse = receivedResponse || (!maybeDelivered && errCode != error_code_process_behind);
bool futureVersion = errCode == error_code_future_version || errCode == error_code_process_behind;
modelHolder->release(
receivedResponse, futureVersion, loadBalancedReply.present() ? loadBalancedReply.get().penalty : -1.0);
if (errCode == error_code_server_overloaded) {
return false;
}
if (loadBalancedReply.present() && !loadBalancedReply.get().error.present()) {
return true;
}
if (!loadBalancedReply.present() && result.present()) {
return true;
}
if (receivedResponse) {
return loadBalancedReply.present() ? loadBalancedReply.get().error.get() : result.getError();
}
if (atMostOnce && maybeDelivered) {
return request_maybe_delivered();
}
if (triedAllOptions && errCode == error_code_process_behind) {
return process_behind();
}
if (errCode == error_code_server_overloaded) {
return false;
}
if (loadBalancedReply.present() && !loadBalancedReply.get().error.present()) {
return true;
// Checks the state of the response, updates the queue model, and returns one of the following outcomes:
// A return value of true means that the request completed successfully
// A return value of false means that the request failed but should be retried
// In the event of a non-retryable failure, an error is thrown indicating the failure
bool checkAndProcessResult(bool atMostOnce) {
ASSERT(response.isReady());
requestProcessed = true;
ErrorOr<bool> outcome =
checkAndProcessResultImpl(response.get(), std::move(modelHolder), atMostOnce, triedAllOptions);
if (outcome.isError()) {
throw outcome.getError();
} else if (!outcome.get()) {
response = Future<Reply>();
}
return outcome.get();
}
if (!loadBalancedReply.present() && result.present()) {
return true;
// Convert this request to a lagging request. Such a request is no longer being waited on, but it still needs to be
// processed so we can update the queue model.
void makeLaggingRequest() {
ASSERT(response.isValid());
ASSERT(!response.isReady());
ASSERT(modelHolder);
ASSERT(modelHolder->model);
QueueModel* model = modelHolder->model;
if (model->laggingRequestCount > FLOW_KNOBS->MAX_LAGGING_REQUESTS_OUTSTANDING ||
model->laggingRequests.isReady()) {
model->laggingRequests.cancel();
model->laggingRequestCount = 0;
model->addActor = PromiseStream<Future<Void>>();
model->laggingRequests = actorCollection(model->addActor.getFuture(), &model->laggingRequestCount);
}
// We need to process the lagging request in order to update the queue model
Reference<ModelHolder> holderCapture = std::move(modelHolder);
bool triedAllOptionsCapture = triedAllOptions;
Future<Void> updateModel = map(response, [holderCapture, triedAllOptionsCapture](Reply result) {
checkAndProcessResultImpl(result, holderCapture, false, triedAllOptionsCapture);
return Void();
});
model->addActor.send(updateModel);
}
if (receivedResponse) {
throw loadBalancedReply.present() ? loadBalancedReply.get().error.get() : result.getError();
}
if (atMostOnce && maybeDelivered) {
throw request_maybe_delivered();
}
if (triedAllOptions && errCode == error_code_process_behind) {
throw process_behind();
}
return false;
}
ACTOR template <class Request>
Future<Optional<REPLY_TYPE(Request)>> makeRequest(RequestStream<Request> const* stream,
Request request,
double backoff,
Future<Void> requestUnneeded,
QueueModel* model,
bool isFirstRequest,
bool atMostOnce,
bool triedAllOptions) {
if (backoff > 0.0) {
wait(delay(backoff) || requestUnneeded);
}
if (requestUnneeded.isReady()) {
return Optional<REPLY_TYPE(Request)>();
}
state Reference<ModelHolder> holder(new ModelHolder(model, stream->getEndpoint().token.first()));
ErrorOr<REPLY_TYPE(Request)> result = wait(stream->tryGetReply(request));
if (checkAndProcessResult(result, holder, atMostOnce, triedAllOptions)) {
return result.get();
} else {
return Optional<REPLY_TYPE(Request)>();
}
}
template <class Reply>
void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinished, QueueModel* model) {
requestFinished.send(Void());
if (!reply.isReady()) {
if (model) {
if (model->laggingRequestCount > FLOW_KNOBS->MAX_LAGGING_REQUESTS_OUTSTANDING ||
model->laggingRequests.isReady()) {
model->laggingRequests.cancel();
model->laggingRequestCount = 0;
model->addActor = PromiseStream<Future<Void>>();
model->laggingRequests = actorCollection(model->addActor.getFuture(), &model->laggingRequestCount);
}
model->addActor.send(success(errorOr(reply)));
~RequestData() {
// If the request has been started but hasn't completed, mark it as a lagging request
if (requestStarted && !requestProcessed && modelHolder && modelHolder->model) {
makeLaggingRequest();
}
}
}
};
// Keep trying to get a reply from any of servers until success or cancellation; tries to take into account
// failMon's information for load balancing and avoiding failed servers
// Try to get a reply from one of the alternatives until success, cancellation, or certain errors.
// Load balancing has a budget to race requests to a second alternative if the first request is slow.
// Tries to take into account failMon's information for load balancing and avoiding failed servers.
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the
// list of servers. When model is set, load balance among alternatives in the same DC, aiming to balance request queue
// length on these interfaces. If too many interfaces in the same DC are bad, try remote interfaces.
// list of servers.
// When model is set, load balance among alternatives in the same DC aims to balance request queue length on these
// interfaces. If too many interfaces in the same DC are bad, try remote interfaces.
ACTOR template <class Interface, class Request, class Multi>
Future<REPLY_TYPE(Request)> loadBalance(
Reference<MultiInterface<Multi>> alternatives,
@ -186,9 +247,11 @@ Future<REPLY_TYPE(Request)> loadBalance(
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = nullptr) {
state Future<Optional<REPLY_TYPE(Request)>> firstRequest;
state RequestData<Request> firstRequestData;
state RequestData<Request> secondRequestData;
state Optional<uint64_t> firstRequestEndpoint;
state Future<Optional<REPLY_TYPE(Request)>> secondRequest;
state Future<Void> secondDelay = Never();
state Promise<Void> requestFinished;
@ -320,7 +383,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
}
// Find an alternative, if any, that is not failed, starting with
// nextAlt. This logic matters only if model == NULL. Otherwise, the
// nextAlt. This logic matters only if model == nullptr. Otherwise, the
// bestAlt and nextAlt have been decided.
state RequestStream<Request> const* stream = nullptr;
for (int alternativeNum = 0; alternativeNum < alternatives->size(); alternativeNum++) {
@ -340,7 +403,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
stream = nullptr;
}
if (!stream && !firstRequest.isValid()) {
if (!stream && !firstRequestData.isValid()) {
// Everything is down! Wait for someone to be up.
vector<Future<Void>> ok(alternatives->size());
@ -391,50 +454,33 @@ Future<REPLY_TYPE(Request)> loadBalance(
numAttempts = 0; // now that we've got a server back, reset the backoff
} else if (!stream) {
// Only the first location is available.
Optional<REPLY_TYPE(Request)> result = wait(firstRequest);
if (result.present()) {
ErrorOr<REPLY_TYPE(Request)> result = wait(firstRequestData.response);
if (firstRequestData.checkAndProcessResult(atMostOnce)) {
return result.get();
}
firstRequest = Future<Optional<REPLY_TYPE(Request)>>();
firstRequestEndpoint = Optional<uint64_t>();
} else if (firstRequest.isValid()) {
} else if (firstRequestData.isValid()) {
// Issue a second request, the first one is taking a long time.
secondRequest = makeRequest(
stream, request, backoff, requestFinished.getFuture(), model, false, atMostOnce, triedAllOptions);
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
state bool firstFinished = false;
loop {
choose {
when(ErrorOr<Optional<REPLY_TYPE(Request)>> result =
wait(firstRequest.isValid() ? errorOr(firstRequest) : Never())) {
if (result.isError() || result.get().present()) {
addLaggingRequest(secondRequest, requestFinished, model);
if (result.isError()) {
throw result.getError();
} else {
return result.get().get();
}
}
firstRequest = Future<Optional<REPLY_TYPE(Request)>>();
firstRequestEndpoint = Optional<uint64_t>();
firstFinished = true;
loop choose {
when(ErrorOr<REPLY_TYPE(Request)> result =
wait(firstRequestData.response.isValid() ? firstRequestData.response : Never())) {
if (firstRequestData.checkAndProcessResult(atMostOnce)) {
return result.get();
}
when(ErrorOr<Optional<REPLY_TYPE(Request)>> result = wait(errorOr(secondRequest))) {
if (result.isError() || result.get().present()) {
if (!firstFinished) {
addLaggingRequest(firstRequest, requestFinished, model);
}
if (result.isError()) {
throw result.getError();
} else {
return result.get().get();
}
}
break;
firstRequestEndpoint = Optional<uint64_t>();
firstFinished = true;
}
when(ErrorOr<REPLY_TYPE(Request)> result = wait(secondRequestData.response)) {
if (secondRequestData.checkAndProcessResult(atMostOnce)) {
return result.get();
}
break;
}
}
@ -445,13 +491,12 @@ Future<REPLY_TYPE(Request)> loadBalance(
}
} else {
// Issue a request, if it takes too long to get a reply, go around the loop
firstRequest = makeRequest(
stream, request, backoff, requestFinished.getFuture(), model, true, atMostOnce, triedAllOptions);
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
firstRequestEndpoint = stream->getEndpoint().token.first();
loop {
choose {
when(ErrorOr<Optional<REPLY_TYPE(Request)>> result = wait(errorOr(firstRequest))) {
when(ErrorOr<REPLY_TYPE(Request)> result = wait(firstRequestData.response)) {
if (model) {
model->secondMultiplier =
std::max(model->secondMultiplier - FLOW_KNOBS->SECOND_REQUEST_MULTIPLIER_DECAY, 1.0);
@ -460,15 +505,10 @@ Future<REPLY_TYPE(Request)> loadBalance(
FLOW_KNOBS->SECOND_REQUEST_MAX_BUDGET);
}
if (result.isError()) {
throw result.getError();
if (firstRequestData.checkAndProcessResult(atMostOnce)) {
return result.get();
}
if (result.get().present()) {
return result.get().get();
}
firstRequest = Future<Optional<REPLY_TYPE(Request)>>();
firstRequestEndpoint = Optional<uint64_t>();
break;
}

View File

@ -616,6 +616,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
//Worker
init( WORKER_LOGGING_INTERVAL, 5.0 );
init( HEAP_PROFILER_INTERVAL, 30.0 );
init( REGISTER_WORKER_REQUEST_TIMEOUT, 300.0 );
init( DEGRADED_RESET_INTERVAL, 24*60*60 ); if ( randomize && BUGGIFY ) DEGRADED_RESET_INTERVAL = 10;
init( DEGRADED_WARNING_LIMIT, 1 );
init( DEGRADED_WARNING_RESET_DELAY, 7*24*60*60 );

View File

@ -543,6 +543,7 @@ public:
// Worker
double WORKER_LOGGING_INTERVAL;
double HEAP_PROFILER_INTERVAL;
double REGISTER_WORKER_REQUEST_TIMEOUT;
double DEGRADED_RESET_INTERVAL;
double DEGRADED_WARNING_LIMIT;
double DEGRADED_WARNING_RESET_DELAY;

View File

@ -143,7 +143,9 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
}
coordinators.ccf->setConnectionString(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
TraceEvent("LeaderForwarding").detail("ConnStr", coordinators.ccf->getConnectionString().toString());
TraceEvent("LeaderForwarding")
.detail("ConnStr", coordinators.ccf->getConnectionString().toString())
.trackLatest("LeaderForwarding");
throw coordinators_changed();
}

View File

@ -615,6 +615,11 @@ struct RolesInfo {
TraceEventFields const& commitLatencyBands = metrics.at("CommitLatencyBands");
if (commitLatencyBands.size()) {
obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyBands);
}
TraceEventFields const& commitBatchingWindowSize = metrics.at("CommitBatchingWindowSize");
if (commitBatchingWindowSize.size()) {
obj["commit_batching_window_size"] = addLatencyStatistics(commitBatchingWindowSize);
}
} catch (Error& e) {
if (e.code() != error_code_attribute_not_found) {
@ -1853,7 +1858,7 @@ ACTOR static Future<vector<std::pair<CommitProxyInterface, EventMap>>> getCommit
vector<std::pair<CommitProxyInterface, EventMap>> results =
wait(getServerMetrics(db->get().client.commitProxies,
address_workers,
std::vector<std::string>{ "CommitLatencyMetrics", "CommitLatencyBands" }));
std::vector<std::string>{ "CommitLatencyMetrics", "CommitLatencyBands", "CommitBatchingWindowSize"}));
return results;
}

View File

@ -562,21 +562,27 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
}
}
Future<RegisterWorkerReply> registrationReply =
state Future<RegisterWorkerReply> registrationReply =
ccInterface->get().present()
? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
: Never();
choose {
state double startTime = now();
loop choose {
when(RegisterWorkerReply reply = wait(registrationReply)) {
processClass = reply.processClass;
asyncPriorityInfo->set(reply.priorityInfo);
TraceEvent("WorkerRegisterReply").detail("CCID", ccInterface->get().get().id());
break;
}
when(wait(ccInterface->onChange())) {}
when(wait(ddInterf->onChange())) {}
when(wait(rkInterf->onChange())) {}
when(wait(degraded->onChange())) {}
when(wait(FlowTransport::transport().onIncompatibleChanged())) {}
when(wait(issues->onChange())) {}
when(wait(delay(SERVER_KNOBS->REGISTER_WORKER_REQUEST_TIMEOUT))) {
TraceEvent(SevWarn, "WorkerRegisterTimeout").detail("WaitTime", now() - startTime);
}
when(wait(ccInterface->onChange())) { break; }
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
}
}
}

View File

@ -22,40 +22,35 @@
#define FLOW_FASTREF_H
#pragma once
#include <atomic>
#include <cstdint>
#include "flow/Platform.h"
#if VALGRIND
#include <drd.h>
#endif
// The thread safety this class provides is that it's safe to call addref and
// delref on the same object concurrently in different threads. Subclass does
// not get deleted until after all calls to delref complete.
//
// Importantly, this class does _not_ make accessing Subclass automatically
// thread safe. Clients will need to provide their own external synchronization
// for that.
template <class Subclass>
class ThreadSafeReferenceCounted {
public:
ThreadSafeReferenceCounted() : referenceCount(1) {}
// NO virtual destructor! Subclass should have a virtual destructor if it is not sealed.
void addref() const { ++referenceCount; }
void addref() const { referenceCount.fetch_add(1); }
// If return value is true, caller is responsible for destruction of object
bool delref_no_destroy() const {
if (--referenceCount != 0) {
#ifdef VALGRIND
ANNOTATE_HAPPENS_BEFORE(&referenceCount);
#endif
return false;
}
#ifdef VALGRIND
ANNOTATE_HAPPENS_AFTER(&referenceCount);
#endif
return true;
// The performance of this seems comparable to a version with less strict memory ordering (see e.g.
// https://www.boost.org/doc/libs/1_57_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_reference_counters),
// on both x86 and ARM, with gcc8.
return referenceCount.fetch_sub(1) == 1;
}
void delref() const {
if (delref_no_destroy())
delete (Subclass*)this;
}
void setrefCountUnsafe(int32_t count) const { referenceCount = count; }
int32_t debugGetReferenceCount() const { return referenceCount; } // Never use in production code, only for tracing
bool isSoleOwnerUnsafe() const { return referenceCount == 1; }
void setrefCountUnsafe(int32_t count) const { referenceCount.store(count); }
int32_t debugGetReferenceCount() const { return referenceCount.load(); }
private:
ThreadSafeReferenceCounted(const ThreadSafeReferenceCounted&) /* = delete*/;

View File

@ -20,6 +20,7 @@
#pragma once
#include <cstdint>
#include "flow/Trace.h"
#define PROTOCOL_VERSION_FEATURE(v, x) \
struct x { \
@ -50,6 +51,10 @@ public:
return (other.version() & compatibleProtocolVersionMask) == (version() & compatibleProtocolVersionMask);
}
// Returns a normalized protocol version that will be the same for all compatible versions
constexpr ProtocolVersion normalizedVersion() const {
return ProtocolVersion(_version & compatibleProtocolVersionMask);
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
@ -86,7 +91,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00A446020000LL, Locality);
PROTOCOL_VERSION_FEATURE(0x0FDB00A460010000LL, MultiGenerationTLog);
PROTOCOL_VERSION_FEATURE(0x0FDB00A460010000LL, SharedMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00A551000000LL, MultiVersionClient);
PROTOCOL_VERSION_FEATURE(0x0FDB00A551000000LL, InexpensiveMultiVersionClient);
PROTOCOL_VERSION_FEATURE(0x0FDB00A560010000LL, TagLocality);
PROTOCOL_VERSION_FEATURE(0x0FDB00B060000000LL, Fearless);
PROTOCOL_VERSION_FEATURE(0x0FDB00B061020000LL, EndpointAddrList);
@ -113,6 +118,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, BackupMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ClusterControllerPriorityInfo);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ProcessIDFile);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, CloseUnusedConnection);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, DBCoreState);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, TagThrottleValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ServerListValue);
@ -134,6 +140,13 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
};
template <>
struct Traceable<ProtocolVersion> : std::true_type {
static std::string toString(const ProtocolVersion& protocolVersion) {
return format("0x%016lX", protocolVersion.version());
}
};
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
//
// The convention is that 'x' and 'y' should match the major and minor version of the software, and 'z' should be 0.

View File

@ -253,21 +253,36 @@ LoadedTLSConfig TLSConfig::loadSync() const {
const std::string certPath = getCertificatePathSync();
if (certPath.size()) {
loaded.tlsCertBytes = readFileBytes(certPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
try {
loaded.tlsCertBytes = readFileBytes(certPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
throw;
}
} else {
loaded.tlsCertBytes = tlsCertBytes;
}
const std::string keyPath = getKeyPathSync();
if (keyPath.size()) {
loaded.tlsKeyBytes = readFileBytes(keyPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
try {
loaded.tlsKeyBytes = readFileBytes(keyPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
throw;
}
} else {
loaded.tlsKeyBytes = tlsKeyBytes;
}
const std::string CAPath = getCAPathSync();
if (CAPath.size()) {
loaded.tlsCABytes = readFileBytes(CAPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
try {
loaded.tlsCABytes = readFileBytes(CAPath, FLOW_KNOBS->CERT_FILE_MAX_SIZE);
} catch (Error& e) {
fprintf(stderr, "Error reading TLS CA [%s]: %s\n", CAPath.c_str(), e.what());
throw;
}
} else {
loaded.tlsCABytes = tlsCABytes;
}
@ -297,28 +312,49 @@ ACTOR Future<LoadedTLSConfig> TLSConfig::loadAsync(const TLSConfig* self) {
state LoadedTLSConfig loaded;
state std::vector<Future<Void>> reads;
const std::string& certPath = self->getCertificatePathSync();
state int32_t certIdx = -1;
state int32_t keyIdx = -1;
state int32_t caIdx = -1;
state std::string certPath = self->getCertificatePathSync();
if (certPath.size()) {
reads.push_back(readEntireFile(certPath, &loaded.tlsCertBytes));
certIdx = reads.size() - 1;
} else {
loaded.tlsCertBytes = self->tlsCertBytes;
}
const std::string& keyPath = self->getKeyPathSync();
state std::string keyPath = self->getKeyPathSync();
if (keyPath.size()) {
reads.push_back(readEntireFile(keyPath, &loaded.tlsKeyBytes));
keyIdx = reads.size() - 1;
} else {
loaded.tlsKeyBytes = self->tlsKeyBytes;
}
const std::string& CAPath = self->getCAPathSync();
state std::string CAPath = self->getCAPathSync();
if (CAPath.size()) {
reads.push_back(readEntireFile(CAPath, &loaded.tlsCABytes));
caIdx = reads.size() - 1;
} else {
loaded.tlsCABytes = self->tlsCABytes;
}
wait(waitForAll(reads));
try {
wait(waitForAll(reads));
} catch (Error& e) {
if (certIdx != -1 && reads[certIdx].isError()) {
fprintf(stderr, "Failure reading TLS Certificate [%s]: %s\n", certPath.c_str(), e.what());
} else if (keyIdx != -1 && reads[keyIdx].isError()) {
fprintf(stderr, "Failure reading TLS Key [%s]: %s\n", keyPath.c_str(), e.what());
} else if (caIdx != -1 && reads[caIdx].isError()) {
fprintf(stderr, "Failure reading TLS CA [%s]: %s\n", CAPath.c_str(), e.what());
} else {
fprintf(stderr, "Failure reading TLS needed file: %s\n", e.what());
}
throw;
}
loaded.tlsPassword = self->tlsPassword;
loaded.tlsVerifyPeers = self->tlsVerifyPeers;

View File

@ -26,12 +26,14 @@
#include <memory>
// Minimal payload types for the reference-counting benchmarks below:
// Empty uses Flow's single-threaded ReferenceCounted, EmptyTSRC the
// thread-safe variant. FastAllocated keeps allocation cost uniform.
struct Empty : public ReferenceCounted<Empty>, public FastAllocated<Empty> {};
struct EmptyTSRC : public ThreadSafeReferenceCounted<EmptyTSRC>, public FastAllocated<EmptyTSRC> {};
// Selects which pointer/reference flavor a benchmark instantiation exercises.
enum class RefType {
    RawPointer,
    UniquePointer,
    SharedPointer,
    FlowReference,
    FlowReferenceThreadSafe,
};
template <RefType refType>
@ -61,6 +63,12 @@ struct Factory<RefType::FlowReference> {
static void cleanup(const Reference<Empty>&) {}
};
// Factory specialization for Flow references over a ThreadSafeReferenceCounted
// payload, so the benchmarks can compare against the single-threaded variant.
template <>
struct Factory<RefType::FlowReferenceThreadSafe> {
    static Reference<EmptyTSRC> create() { return makeReference<EmptyTSRC>(); }
    static void cleanup(const Reference<EmptyTSRC>&) {}
};
template <RefType refType>
static void bench_ref_create_and_destroy(benchmark::State& state) {
while (state.KeepRunning()) {
@ -86,7 +94,9 @@ BENCHMARK_TEMPLATE(bench_ref_create_and_destroy, RefType::RawPointer)->ReportAgg
BENCHMARK_TEMPLATE(bench_ref_create_and_destroy, RefType::UniquePointer)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_create_and_destroy, RefType::SharedPointer)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_create_and_destroy, RefType::FlowReference)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_create_and_destroy, RefType::FlowReferenceThreadSafe)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_copy, RefType::RawPointer)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_copy, RefType::SharedPointer)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_copy, RefType::FlowReference)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_ref_copy, RefType::FlowReferenceThreadSafe)->ReportAggregatesOnly(true);

View File

@ -0,0 +1,89 @@
# Multi-stage build for the Amazon Linux (EKS) FoundationDB images.
# The "base" stage holds shared tooling plus the freshly built FDB binaries;
# "foundationdb" and "sidecar" build the server and Kubernetes sidecar images
# on top of it.
FROM amazonlinux:2.0.20210326.0 as base
# Debugging/networking utilities used when profiling FDB inside EKS.
RUN yum install -y \
    bind-utils \
    curl \
    jq \
    less \
    lsof \
    nc \
    net-tools \
    perf \
    python38 \
    python3-pip \
    strace \
    tar \
    traceroute \
    telnet \
    tcpdump \
    vim
#todo: nload, iperf, numademo
COPY misc/tini-amd64.sha256sum /tmp/
# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
    sha256sum -c /tmp/tini-amd64.sha256sum && \
    chmod +x tini-amd64 && \
    mv tini-amd64 /usr/bin/tini
COPY sidecar/requirements.txt /tmp
RUN pip3 install -r /tmp/requirements.txt
# TODO: Only used by sidecar
RUN groupadd --gid 4059 fdb && \
    useradd --gid 4059 --uid 4059 --no-create-home --shell /bin/bash fdb
ARG FDB_VERSION
# These are the output of the current build (not stripped)
COPY --chown=root bin /usr/bin/
COPY --chown=root lib/libfdb_c.so /var/fdb/lib/
# Rename the client library to carry its major.minor version, keeping an
# unversioned symlink for consumers that expect plain libfdb_c.so.
RUN mv /var/fdb/lib/libfdb_c.so /var/fdb/lib/libfdb_c_${FDB_VERSION%.*}.so
RUN ln -s /var/fdb/lib/libfdb_c_${FDB_VERSION%.*}.so /var/fdb/lib/libfdb_c.so
# -------------------------------------------------
# Server image: runtime scripts plus data/log directories.
FROM base as foundationdb
COPY release/*.bash /var/fdb/scripts/
RUN mkdir -p /var/fdb/logs
# TODO: FDB_ADDITIONAL_VERSIONS
RUN mkdir -p /usr/lib/fdb/multiversion
VOLUME /var/fdb/data
# Runtime Configuration Options
ENV FDB_PORT 4500
ENV FDB_CLUSTER_FILE /var/fdb/fdb.cluster
ENV FDB_NETWORKING_MODE container
ENV FDB_COORDINATOR ""
ENV FDB_COORDINATOR_PORT 4500
ENV FDB_CLUSTER_FILE_CONTENTS ""
ENV FDB_PROCESS_CLASS unset
ENTRYPOINT ["/usr/bin/tini", "-g", "--"]
CMD /var/fdb/scripts/fdb.bash
# -------------------------------------------------
# Kubernetes sidecar image: serves config/binaries to application pods.
FROM base AS sidecar
COPY sidecar/entrypoint.bash /
COPY sidecar/sidecar.py /
RUN chmod a+x /sidecar.py /entrypoint.bash
VOLUME /var/input-files
VOLUME /var/output-files
ARG FDB_VERSION
# First echo is build-log output only; the second persists the version.
RUN echo ${FDB_VERSION} ; echo ${FDB_VERSION}> /var/fdb/version
RUN mkdir -p /var/fdb/lib
ENV LISTEN_PORT 8080
USER fdb
ENTRYPOINT ["/usr/bin/tini", "-g", "--", "/entrypoint.bash"]

View File

@ -1,21 +1,29 @@
# Overview
This directory provides a Docker image for running FoundationDB.
This directory provides various Docker images for running FoundationDB.
The image in this directory is based on Ubuntu 18.04, but the commands and
scripts used to build it should be suitable for most other distros with small
tweaks to the installation of dependencies.
The image relies on the following dependencies:
* bash
* wget
* dig
* glibc
This directory includes two sets of images. The "release" images are based
on Ubuntu 18.04. The EKS images use Amazon Linux, which allows us to profile
FoundationDB when it is running inside of Amazon EKS.
# Build Configuration
This image supports several build arguments for build-time configuration.
The build scripts are configured using the following environment variables:
`TAG` is the base docker tag for this build. The sidecar tag will be this
string, with a "-1" appended to it. If you do not specify a tag, then the
scripts attempt to provide a reasonable default.
`ECR` is the name of the Docker registry the images should be published to.
It defaults to a private registry, so it is likely you will need to override this.
`STRIPPED` if true, the Docker images will contain stripped binaries without
debugging symbols. Debugging symbols add approximately 2GiB to the image size.
# Release Dockerfile arguments.
These arguments are set automatically by the build scripts, but are documented here
in case you need to invoke the release Dockerfiles directly.
### FDB_VERSION
@ -26,6 +34,10 @@ The version of FoundationDB to install in the container. This is required.
The base URL for the FoundationDB website. The default is
`https://www.foundationdb.org`.
You can build the Docker image without talking to a webserver by using the URL
`file:///mnt/website` and mirroring the webserver's directory tree
inside the `website` subdirectory.
### FDB_ADDITIONAL_VERSIONS
A list of additional client library versions to include in this image. These
@ -76,3 +88,24 @@ files you may want to copy are:
* `/var/fdb/scripts/create_cluster_file.bash`: A script for setting up the
cluster file based on an `FDB_COORDINATOR` environment variable.
* `/usr/bin/fdbcli`: The FoundationDB CLI.
If you are running FDB inside of a Kubernetes cluster, you should probably use
the sidecar image instead. It makes it easier to automatically copy a compatible
`libfdb_c.so` and cluster file into application containers.
TODO: Document sidecar.py
# Example Usages
### Build an Ubuntu-based image with a custom tag and unstripped binaries
```
# compile FDB, then:
cd ~/build_output/packages/docker/
TAG=my-custom-tag ./build-release-docker.sh
```
### Build an Amazon Linux-based image with a default tag and stripped binaries
```
# compile FDB, then:
cd ~/build_output/packages/docker/
STRIPPED=true ./build-eks-docker.sh
```

View File

@ -0,0 +1,60 @@
#!/bin/bash
# Builds and pushes the Amazon Linux (EKS) FoundationDB server and sidecar
# images from a local build output tree.
#
# Bug fix: the shebang was #!/bin/sh, but this script relies on bash-only
# features (${BASH_SOURCE[0]}, `set -o pipefail`), so it must run under bash.
set -euxo pipefail

DOCKER_ROOT=$(realpath $(dirname ${BASH_SOURCE[0]}))
BUILD_OUTPUT=$(realpath ${DOCKER_ROOT}/../..)

echo Docker root: $DOCKER_ROOT
echo Build output: $BUILD_OUTPUT

cd ${DOCKER_ROOT}

## eg: CMAKE_PROJECT_VERSION:STATIC=7.0.0
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut -d '=' -f 2)

# Options (passed via environment variables)
# Feel free to customize the image tag.
# TODO: add a mechanism to set TAG=FDB_VERSION when we're building public releases.
TAG=${TAG:-${FDB_VERSION}-${OKTETO_NAME}}
ECR=${ECR:-112664522426.dkr.ecr.us-west-2.amazonaws.com}

echo Building with tag ${TAG}

# Login to ECR
# TODO: Move this to a common place instead of repeatedly copy-pasting it.
aws ecr get-login-password | docker login --username AWS --password-stdin ${ECR}

# Pull the base image through the private mirror, then retag it with the
# public name the Dockerfile expects.
docker pull ${ECR}/amazonlinux:2.0.20210326.0
docker tag ${ECR}/amazonlinux:2.0.20210326.0 amazonlinux:2.0.20210326.0

#derived variables
IMAGE=foundationdb/foundationdb:${TAG}
SIDECAR=foundationdb/foundationdb-kubernetes-sidecar:${TAG}-1
STRIPPED=${STRIPPED:-false}

# Stage either the stripped (packages/) or unstripped build outputs next to
# the Dockerfile so docker build can COPY them.
if $STRIPPED; then
    rsync -av --delete --exclude=*.xml ${BUILD_OUTPUT}/packages/bin .
    rsync -av --delete --exclude=*.a --exclude=*.xml ${BUILD_OUTPUT}/packages/lib .
else
    rsync -av --delete --exclude=*.xml ${BUILD_OUTPUT}/bin .
    rsync -av --delete --exclude=*.a --exclude=*.xml ${BUILD_OUTPUT}/lib .
fi

BUILD_ARGS="--build-arg FDB_VERSION=$FDB_VERSION"

docker build ${BUILD_ARGS} -t ${IMAGE} --target foundationdb -f Dockerfile.eks .
docker build ${BUILD_ARGS} -t ${SIDECAR} --target sidecar -f Dockerfile.eks .

docker tag ${IMAGE} ${ECR}/${IMAGE}
docker tag ${SIDECAR} ${ECR}/${SIDECAR}

docker push ${ECR}/${IMAGE}
docker push ${ECR}/${SIDECAR}

View File

@ -0,0 +1,62 @@
#!/bin/bash
# Builds and pushes the Ubuntu-based "release" FoundationDB server and sidecar
# images, staging the build outputs as a fake website tree so the Dockerfiles
# can install them via FDB_WEBSITE=file:///mnt/website.
set -euxo pipefail

DOCKER_ROOT=$(realpath $(dirname ${BASH_SOURCE[0]}))
BUILD_OUTPUT=$(realpath ${DOCKER_ROOT}/../..)

echo Docker root: $DOCKER_ROOT
echo Build output: $BUILD_OUTPUT

cd ${DOCKER_ROOT}

## eg: CMAKE_PROJECT_VERSION:STATIC=7.0.0
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut -d '=' -f 2)

# Options (passed via environment variables)
# Feel free to customize the image tag.
# TODO: add a mechanism to set TAG=FDB_VERSION when we're building public releases.
TAG=${TAG:-${FDB_VERSION}-${OKTETO_NAME}}
ECR=${ECR:-112664522426.dkr.ecr.us-west-2.amazonaws.com}

echo Building with tag ${TAG}

# Login to ECR
# TODO: Move this to a common place instead of repeatedly copy-pasting it.
aws ecr get-login-password | docker login --username AWS --password-stdin ${ECR}

# Pull the base images through the private mirror, then retag them with the
# public names the Dockerfiles expect.
docker pull ${ECR}/ubuntu:18.04
docker tag ${ECR}/ubuntu:18.04 ubuntu:18.04
docker pull ${ECR}/python:3.9-slim
docker tag ${ECR}/python:3.9-slim python:3.9-slim

# derived variables
IMAGE=foundationdb/foundationdb:${TAG}
SIDECAR=foundationdb/foundationdb-kubernetes-sidecar:${TAG}-1
STRIPPED=${STRIPPED:-false}
WEBSITE_BIN_DIR=website/downloads/${FDB_VERSION}/linux/
TARBALL=${WEBSITE_BIN_DIR}/fdb_${FDB_VERSION}.tar.gz
mkdir -p ${WEBSITE_BIN_DIR}

# NOTE(review): the tar/cp commands below hard-code ~/build_output rather than
# using ${BUILD_OUTPUT}; the two agree only when this script lives in
# ~/build_output/packages/docker — confirm before moving the tree.
if $STRIPPED; then
    tar -C ~/build_output/packages/ -zcvf ${TARBALL} bin lib
    cp ~/build_output/packages/lib/libfdb_c.so ${WEBSITE_BIN_DIR}/libfdb_c_${FDB_VERSION}.so
else
    tar -C ~/build_output/ -zcvf ${TARBALL} bin lib
    cp ~/build_output/lib/libfdb_c.so ${WEBSITE_BIN_DIR}/libfdb_c_${FDB_VERSION}.so
fi

BUILD_ARGS="--build-arg FDB_WEBSITE=file:///mnt/website "
BUILD_ARGS+="--build-arg FDB_VERSION=$FDB_VERSION "
BUILD_ARGS+="--build-arg FDB_ADDITIONAL_VERSIONS=$FDB_VERSION"

docker build -t ${IMAGE} ${BUILD_ARGS} -f release/Dockerfile .
docker build -t ${SIDECAR} ${BUILD_ARGS} -f sidecar/Dockerfile .

docker tag ${IMAGE} ${ECR}/${IMAGE}
docker tag ${SIDECAR} ${ECR}/${SIDECAR}

docker push ${ECR}/${IMAGE}
docker push ${ECR}/${SIDECAR}

View File

@ -0,0 +1 @@
93dcc18adc78c65a028a84799ecf8ad40c936fdfc5f2a57b1acda5a8117fa82c tini-amd64

View File

@ -19,8 +19,6 @@
FROM ubuntu:18.04
# Install dependencies
RUN apt-get update && \
apt-get install -y curl>=7.58.0-2ubuntu3.6 \
dnsutils>=1:9.11.3+dfsg-1ubuntu1.7 \
@ -34,47 +32,51 @@ RUN apt-get update && \
vim>=2:8.0.1453-1ubuntu1.4 \
net-tools>=1.60+git20161116.90da8a0-1ubuntu1 \
jq>=1.5+dfsg-2 && \
rm -r /var/lib/apt/lists/*
rm -rf /var/lib/apt/lists/*
# Install FoundationDB Binaries
COPY misc/tini-amd64.sha256sum /tmp/
# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
sha256sum -c /tmp/tini-amd64.sha256sum && \
chmod +x tini-amd64 && \
mv tini-amd64 /usr/bin/tini
ARG FDB_VERSION
ARG FDB_ADDITIONAL_VERSIONS="5.1.7"
ARG FDB_WEBSITE=https://www.foundationdb.org
WORKDIR /var/fdb/tmp
ADD website /mnt/website
RUN ls -l /mnt/website
RUN curl $FDB_WEBSITE/downloads/$FDB_VERSION/linux/fdb_$FDB_VERSION.tar.gz -o fdb_$FDB_VERSION.tar.gz && \
tar -xzf fdb_$FDB_VERSION.tar.gz --strip-components=1 && \
rm fdb_$FDB_VERSION.tar.gz && \
COPY website /mnt/website/
# Install FoundationDB Binaries
RUN curl $FDB_WEBSITE/downloads/$FDB_VERSION/linux/fdb_$FDB_VERSION.tar.gz | tar zxf - --strip-components=1 && \
chmod u+x fdbbackup fdbcli fdbdr fdbmonitor fdbrestore fdbserver backup_agent dr_agent && \
mv fdbbackup fdbcli fdbdr fdbmonitor fdbrestore fdbserver backup_agent dr_agent /usr/bin && \
rm -r /var/fdb/tmp
WORKDIR /var/fdb
WORKDIR /
# Install FoundationDB Client Libraries
## TODO: Can unify everything above this line
## TODO: we can almost unify the additional client library download,
## but sidecar.py expects them in a different location,
## with a different naming convention.
ARG FDB_ADDITIONAL_VERSIONS="5.1.7"
COPY download_multiversion_libraries.bash scripts/
RUN curl $FDB_WEBSITE/downloads/$FDB_VERSION/linux/libfdb_c_$FDB_VERSION.so -o /usr/lib/libfdb_c.so && \
bash scripts/download_multiversion_libraries.bash $FDB_WEBSITE $FDB_ADDITIONAL_VERSIONS && \
rm -rf /mnt/website
RUN curl $FDB_WEBSITE/downloads/$FDB_VERSION/linux/libfdb_c_$FDB_VERSION.so -o /usr/lib/libfdb_c.so
# Set Up Runtime Scripts and Directories
ADD release/*.bash /var/fdb/scripts/
RUN chmod a+x /var/fdb/scripts/*.bash
# Install additional FoundationDB Client Libraries
RUN /var/fdb/scripts/download_multiversion_libraries.bash $FDB_WEBSITE $FDB_ADDITIONAL_VERSIONS
RUN rm -rf /mnt/website
RUN mkdir -p logs
COPY fdb.bash scripts/
COPY create_server_environment.bash scripts/
COPY create_cluster_file.bash scripts/
RUN chmod u+x scripts/*.bash && \
mkdir -p logs
VOLUME /var/fdb/data
CMD /var/fdb/scripts/fdb.bash
# Runtime Configuration Options
ENV FDB_PORT 4500
@ -85,12 +87,5 @@ ENV FDB_COORDINATOR_PORT 4500
ENV FDB_CLUSTER_FILE_CONTENTS ""
ENV FDB_PROCESS_CLASS unset
# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64.sha256sum && \
sha256sum -c tini-amd64.sha256sum && \
rm -f tini-amd64.sha256sum && \
chmod +x tini-amd64 && \
mv tini-amd64 /usr/bin/tini
ENTRYPOINT ["/usr/bin/tini", "-g", "--"]
CMD /var/fdb/scripts/fdb.bash

View File

View File

@ -19,8 +19,8 @@
COMPOSE_PROJECT_NAME=fdbgolangsample
FDB_API_VERSION=620
FDB_VERSION=6.2.28
FDB_API_VERSION=630
FDB_VERSION=6.3.12
FDB_COORDINATOR=fdb-coordinator
FDB_NETWORKING_MODE=container
FDB_COORDINATOR_PORT=4500
FDB_COORDINATOR_PORT=4500

View File

@ -19,6 +19,6 @@
module foundationdb.org/docker/samples/golang/v0/fdb-demo-golang
go 1.13
go 1.16
require github.com/apple/foundationdb/bindings/go v0.0.0-20191129023120-e16ae7cadf80
require github.com/apple/foundationdb/bindings/go v0.0.0-20210414233633-40942b2d9d13

View File

@ -1,2 +1,4 @@
github.com/apple/foundationdb/bindings/go v0.0.0-20191129023120-e16ae7cadf80 h1:VKL6OsaB8X91ijz5DEDOw2lBIxmqTUVm5A//EExEyvo=
github.com/apple/foundationdb/bindings/go v0.0.0-20191129023120-e16ae7cadf80/go.mod h1:OMVSB21p9+xQUIqlGizHPZfjK+SHws1ht+ZytVDoz9U=
github.com/apple/foundationdb/bindings/go v0.0.0-20210414233633-40942b2d9d13 h1:RxQG4vcIkRCjxCtN/QFm9SkMGvikjStR4TLHR0Z78+8=
github.com/apple/foundationdb/bindings/go v0.0.0-20210414233633-40942b2d9d13/go.mod h1:w63jdZTFCtvdjsUj5yrdKgjxaAD5uXQX6hJ7EaiLFRs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -0,0 +1,75 @@
# Dockerfile
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2019 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM python:3.9-slim

# curl is needed to fetch the FDB binaries and client libraries.
# Bug fix: the apt cleanup path was misspelled /var/lub/... — with `rm -rf`
# the typo failed silently and the apt lists were never removed.
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl && \
    rm -rf /var/lib/apt/lists/*

COPY misc/tini-amd64.sha256sum /tmp/
# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
    sha256sum -c /tmp/tini-amd64.sha256sum && \
    chmod +x tini-amd64 && \
    mv tini-amd64 /usr/bin/tini

COPY sidecar/requirements.txt /tmp
# Bug fix: use the absolute path; "tmp/requirements.txt" only resolved
# correctly because the default WORKDIR happens to be /.
RUN pip install -r /tmp/requirements.txt

ARG FDB_VERSION=
ARG FDB_ADDITIONAL_VERSIONS="6.2.30 6.1.13"
ARG FDB_WEBSITE=https://www.foundationdb.org

WORKDIR /var/fdb/tmp
COPY website /mnt/website/

# Install FoundationDB Binaries
RUN curl $FDB_WEBSITE/downloads/$FDB_VERSION/linux/fdb_$FDB_VERSION.tar.gz | tar zxf - --strip-components=1 && \
    chmod u+x fdbbackup fdbcli fdbdr fdbmonitor fdbrestore fdbserver backup_agent dr_agent && \
    mv fdbbackup fdbcli fdbdr fdbmonitor fdbrestore fdbserver backup_agent dr_agent /usr/bin && \
    rm -r /var/fdb/tmp

WORKDIR /

# Set Up Runtime Scripts and Directories
ADD sidecar/entrypoint.bash sidecar/sidecar.py /
RUN chmod a+x /entrypoint.bash /sidecar.py

# Install additional FoundationDB Client Libraries
RUN mkdir -p /var/fdb/lib && \
    for version in $FDB_ADDITIONAL_VERSIONS; do curl $FDB_WEBSITE/downloads/$version/linux/libfdb_c_$version.so -o /var/fdb/lib/libfdb_c_${version%.*}.so; done
RUN rm -rf /mnt/website

# Record the primary FDB version and create the unprivileged fdb user the
# sidecar runs as.
RUN echo ${FDB_VERSION} > /var/fdb/version && \
    mkdir -p /var/fdb/lib && \
    groupadd --gid 4059 fdb && \
    useradd --gid 4059 --uid 4059 --no-create-home --shell /bin/bash fdb

VOLUME /var/input-files
VOLUME /var/output-files

USER fdb
ENV LISTEN_PORT 8080
ENTRYPOINT ["/usr/bin/tini", "-g", "--", "/entrypoint.bash"]

View File

@ -0,0 +1,27 @@
#! /bin/bash

# entrypoint.bash
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2019 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Optionally source extra environment variables before starting the sidecar
# (quoted so a path containing whitespace works).
if [[ -n "$ADDITIONAL_ENV_FILE" ]]; then
    source "$ADDITIONAL_ENV_FILE"
fi

# Bug fix: forward arguments with "$@" instead of $* so arguments containing
# whitespace reach sidecar.py unchanged; exec replaces the shell so the
# sidecar receives signals directly from tini.
exec /sidecar.py "$@"

View File

@ -0,0 +1 @@
watchdog==0.9.0

View File

@ -0,0 +1,633 @@
#! /usr/bin/env python3
# entrypoint.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2019 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import hashlib
import http.server
import logging
import json
import os
import shutil
import socket
import ssl
import stat
import time
import traceback
import sys
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class Config(object):
    # Aggregates sidecar configuration from (in precedence order)
    # command-line flags, environment variables, and the deprecated
    # SIDECAR_CONF_DIR JSON config file.
    def __init__(self):
        parser = argparse.ArgumentParser(description="FoundationDB Kubernetes Sidecar")
        parser.add_argument(
            "--init-mode",
            help=(
                "Whether to run the sidecar in init mode "
                "which causes it to copy the files once and "
                "exit without starting a server."
            ),
            action="store_true",
        )
        parser.add_argument(
            "--bind-address", help="IP and port to bind on", default="0.0.0.0:8080"
        )
        parser.add_argument(
            "--tls",
            help=("This flag enables TLS for incoming " "connections"),
            action="store_true",
        )
        parser.add_argument(
            "--tls-certificate-file",
            help=(
                "The path to the certificate file for TLS "
                "connections. If this is not provided we "
                "will take the path from the "
                "FDB_TLS_CERTIFICATE_FILE environment "
                "variable."
            ),
        )
        parser.add_argument(
            "--tls-ca-file",
            help=(
                "The path to the certificate authority file "
                "for TLS connections If this is not "
                "provided we will take the path from the "
                "FDB_TLS_CA_FILE environment variable."
            ),
        )
        parser.add_argument(
            "--tls-key-file",
            help=(
                "The path to the key file for TLS "
                "connections. If this is not provided we "
                "will take the path from the "
                "FDB_TLS_KEY_FILE environment "
                "variable."
            ),
        )
        parser.add_argument(
            "--tls-verify-peers",
            help=(
                "The peer verification rules for incoming "
                "TLS connections. If this is not provided "
                "we will take the rules from the "
                "FDB_TLS_VERIFY_PEERS environment variable. "
                "The format of this is the same as the TLS "
                "peer verification rules in FoundationDB."
            ),
        )
        parser.add_argument(
            "--input-dir",
            help=("The directory containing the input files " "the config map."),
            default="/var/input-files",
        )
        parser.add_argument(
            "--output-dir",
            help=(
                "The directory into which the sidecar should "
                "place the file it generates."
            ),
            default="/var/output-files",
        )
        parser.add_argument(
            "--substitute-variable",
            help=(
                "A custom environment variable that should "
                "available for substitution in the monitor "
                "conf."
            ),
            action="append",
        )
        parser.add_argument(
            "--copy-file",
            help=("A file to copy from the config map to the " "output directory."),
            action="append",
        )
        parser.add_argument(
            "--copy-binary",
            help=("A binary to copy from the to the output" "directory."),
            action="append",
        )
        parser.add_argument(
            "--copy-library",
            help=(
                "A version of the client library to copy " "to the output directory."
            ),
            action="append",
        )
        parser.add_argument(
            "--input-monitor-conf",
            help=("The name of a monitor conf template in the " "input files"),
        )
        parser.add_argument(
            "--main-container-version",
            help=("The version of the main foundationdb " "container in the pod"),
        )
        parser.add_argument(
            "--main-container-conf-dir",
            help=(
                "The directory where the dynamic conf "
                "written by the sidecar will be mounted in "
                "the main container."
            ),
            default="/var/dynamic-conf",
        )
        parser.add_argument(
            "--require-not-empty",
            help=(
                "A file that must be present and non-empty " "in the input directory"
            ),
            action="append",
        )
        args = parser.parse_args()

        # Plain copies of the parsed flags.
        self.bind_address = args.bind_address
        self.input_dir = args.input_dir
        self.output_dir = args.output_dir
        self.enable_tls = args.tls
        self.copy_files = args.copy_file or []
        self.copy_binaries = args.copy_binary or []
        self.copy_libraries = args.copy_library or []
        self.input_monitor_conf = args.input_monitor_conf
        self.init_mode = args.init_mode
        self.main_container_version = args.main_container_version
        self.require_not_empty = args.require_not_empty

        # The sidecar image bakes its own FDB version into /var/fdb/version.
        with open("/var/fdb/version") as version_file:
            self.primary_version = version_file.read().strip()

        version_split = self.primary_version.split(".")
        self.minor_version = [int(version_split[0]), int(version_split[1])]

        # From FDB 6.3 onward the legacy env-var configuration is rejected.
        forbid_deprecated_environment_variables = self.is_at_least([6, 3])

        if self.enable_tls:
            # Each TLS path can come from its flag or FDB_TLS_* env var.
            self.certificate_file = args.tls_certificate_file or os.getenv(
                "FDB_TLS_CERTIFICATE_FILE"
            )
            assert self.certificate_file, (
                "You must provide a certificate file, either through the "
                "tls_certificate_file argument or the FDB_TLS_CERTIFICATE_FILE "
                "environment variable"
            )
            self.ca_file = args.tls_ca_file or os.getenv("FDB_TLS_CA_FILE")
            assert self.ca_file, (
                "You must provide a CA file, either through the tls_ca_file "
                "argument or the FDB_TLS_CA_FILE environment variable"
            )
            self.key_file = args.tls_key_file or os.getenv("FDB_TLS_KEY_FILE")
            assert self.key_file, (
                "You must provide a key file, either through the tls_key_file "
                "argument or the FDB_TLS_KEY_FILE environment variable"
            )
            self.peer_verification_rules = args.tls_verify_peers or os.getenv(
                "FDB_TLS_VERIFY_PEERS"
            )

        # Values substituted into the monitor conf template ($VARIABLE).
        self.substitutions = {}
        for key in [
            "FDB_PUBLIC_IP",
            "FDB_MACHINE_ID",
            "FDB_ZONE_ID",
            "FDB_INSTANCE_ID",
        ]:
            self.substitutions[key] = os.getenv(key, "")

        if self.substitutions["FDB_MACHINE_ID"] == "":
            self.substitutions["FDB_MACHINE_ID"] = os.getenv("HOSTNAME", "")

        if self.substitutions["FDB_ZONE_ID"] == "":
            self.substitutions["FDB_ZONE_ID"] = self.substitutions["FDB_MACHINE_ID"]

        if self.substitutions["FDB_PUBLIC_IP"] == "":
            # Fall back to resolving the machine ID (presumably the pod
            # hostname) to an IPv4 address.
            address_info = socket.getaddrinfo(
                self.substitutions["FDB_MACHINE_ID"],
                4500,
                family=socket.AddressFamily.AF_INET,
            )
            if len(address_info) > 0:
                self.substitutions["FDB_PUBLIC_IP"] = address_info[0][4][0]

        if self.main_container_version == self.primary_version:
            self.substitutions["BINARY_DIR"] = "/usr/bin"
        else:
            # Version mismatch: the main container must use the binaries this
            # sidecar stages under the dynamic conf directory.
            self.substitutions["BINARY_DIR"] = target_path = str(
                Path("%s/bin/%s" % (args.main_container_conf_dir, self.primary_version))
            )

        for variable in args.substitute_variable or []:
            self.substitutions[variable] = os.getenv(variable)

        if forbid_deprecated_environment_variables:
            for variable in [
                "SIDECAR_CONF_DIR",
                "INPUT_DIR",
                "OUTPUT_DIR",
                "COPY_ONCE",
            ]:
                if os.getenv(variable):
                    print(
                        f"""Environment variable {variable} is not supported in this version of FoundationDB.
Please use the command-line arguments instead."""
                    )
                    sys.exit(1)

        # Legacy (pre-6.3) configuration via SIDECAR_CONF_DIR/config.json;
        # its entries extend/override the flag-based configuration above.
        if os.getenv("SIDECAR_CONF_DIR"):
            with open(
                os.path.join(os.getenv("SIDECAR_CONF_DIR"), "config.json")
            ) as conf_file:
                config = json.load(conf_file)
        else:
            config = {}

        if os.getenv("INPUT_DIR"):
            self.input_dir = os.getenv("INPUT_DIR")
        if os.getenv("OUTPUT_DIR"):
            self.output_dir = os.getenv("OUTPUT_DIR")

        if "ADDITIONAL_SUBSTITUTIONS" in config and config["ADDITIONAL_SUBSTITUTIONS"]:
            for key in config["ADDITIONAL_SUBSTITUTIONS"]:
                self.substitutions[key] = os.getenv(key, key)
        if "COPY_FILES" in config and config["COPY_FILES"]:
            self.copy_files.extend(config["COPY_FILES"])
        if "COPY_BINARIES" in config and config["COPY_BINARIES"]:
            self.copy_binaries.extend(config["COPY_BINARIES"])
        if "COPY_LIBRARIES" in config and config["COPY_LIBRARIES"]:
            self.copy_libraries.extend(config["COPY_LIBRARIES"])
        if "INPUT_MONITOR_CONF" in config and config["INPUT_MONITOR_CONF"]:
            self.input_monitor_conf = config["INPUT_MONITOR_CONF"]
        if os.getenv("COPY_ONCE", "0") == "1":
            self.init_mode = True

    @classmethod
    def shared(cls):
        # Lazily constructs and returns the process-wide singleton.
        if cls.shared_config:
            return cls.shared_config
        cls.shared_config = Config()
        return cls.shared_config

    # Process-wide singleton instance, populated by shared().
    shared_config = None

    def is_at_least(self, target_version):
        # True when this sidecar's FDB version is >= target_version,
        # given as [major, minor].
        return self.minor_version[0] > target_version[0] or (
            self.minor_version[0] == target_version[0]
            and self.minor_version[1] >= target_version[1]
        )
class Server(http.server.BaseHTTPRequestHandler):
    """HTTP(S) request handler exposing the sidecar's copy/hash endpoints."""

    # Class-wide TLS context, shared by all connections and rebuilt lazily.
    ssl_context = None

    @classmethod
    def start(cls):
        """
        This method starts the server.
        """
        config = Config.shared()
        (address, port) = config.bind_address.split(":")
        log.info("Listening on %s:%s" % (address, port))
        httpd = http.server.HTTPServer((address, int(port)), cls)

        if config.enable_tls:
            context = Server.load_ssl_context()
            httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
            # Watch the certificate and key directories so the TLS context
            # can be refreshed when the files are rotated.
            observer = Observer()
            event_handler = CertificateEventHandler()
            for path in set(
                [
                    Path(config.certificate_file).parent.as_posix(),
                    Path(config.key_file).parent.as_posix(),
                ]
            ):
                observer.schedule(event_handler, path)
            observer.start()

        httpd.serve_forever()

    @classmethod
    def load_ssl_context(cls):
        # Build the context once; on subsequent calls only reload the
        # certificate/key pair so rotation takes effect.
        config = Config.shared()
        if not cls.ssl_context:
            cls.ssl_context = ssl.create_default_context(cafile=config.ca_file)
            cls.ssl_context.check_hostname = False
            cls.ssl_context.verify_mode = ssl.CERT_REQUIRED
        cls.ssl_context.load_cert_chain(config.certificate_file, config.key_file)
        return cls.ssl_context

    def send_text(self, text, code=200, content_type="text/plain", add_newline=True):
        """
        This method sends a text response.
        """
        if add_newline:
            text += "\n"

        self.send_response(code)
        response = bytes(text, encoding="utf-8")
        self.send_header("Content-Length", str(len(response)))
        self.send_header("Content-Type", content_type)
        self.end_headers()
        self.wfile.write(response)

    def check_request_cert(self):
        # Approve automatically when TLS is disabled; otherwise validate the
        # peer certificate against the configured verification rules and send
        # a 401 on failure.
        config = Config.shared()
        approved = not config.enable_tls or self.check_cert(
            self.connection.getpeercert(), config.peer_verification_rules
        )
        if not approved:
            self.send_error(401, "Client certificate was not approved")
        return approved

    def check_cert(self, cert, rules):
        """
        This method checks that the client's certificate is valid.

        `rules` is a semicolon-separated list of options; an option matches
        when every comma-separated rule inside it matches. Returns True when
        any option matches (or no rules are configured), False otherwise.
        """
        if not rules:
            return True

        for option in rules.split(";"):
            option_valid = True
            for rule in option.split(","):
                if not self.check_cert_rule(cert, rule):
                    option_valid = False
                    break
            if option_valid:
                return True

        return False

    def check_cert_rule(self, cert, rule):
        # Evaluates one "[scope.]field=value" rule against the peer cert dict
        # (the format returned by ssl's getpeercert()).
        (key, expected_value) = rule.split("=", 1)
        if "." in key:
            (scope_key, field_key) = key.split(".", 1)
        else:
            scope_key = "S"
            field_key = key

        if scope_key == "S" or scope_key == "Subject":
            scope_name = "subject"
        elif scope_key == "I" or scope_key == "Issuer":
            scope_name = "issuer"
        elif scope_key == "R" or scope_key == "Root":
            scope_name = "root"
        else:
            assert False, "Unknown certificate scope %s" % scope_key

        if scope_name not in cert:
            return False

        rdns = None
        operator = ""
        if field_key == "CN":
            field_name = "commonName"
        elif field_key == "C":
            field_name = "country"
        elif field_key == "L":
            field_name = "localityName"
        elif field_key == "ST":
            field_name = "stateOrProvinceName"
        elif field_key == "O":
            field_name = "organizationName"
        elif field_key == "OU":
            field_name = "organizationalUnitName"
        elif field_key == "UID":
            field_name = "userId"
        elif field_key == "DC":
            field_name = "domainComponent"
        elif field_key.startswith("subjectAltName") and scope_name == "subject":
            # subjectAltName rules may carry a match operator (< suffix,
            # > prefix) and embed the SAN type in the expected value.
            operator = field_key[14:]
            field_key = field_key[0:14]
            (field_name, expected_value) = expected_value.split(":", 1)
            if field_key not in cert:
                return False
            rdns = [cert["subjectAltName"]]
        else:
            assert False, "Unknown certificate field %s" % field_key

        if not rdns:
            rdns = list(cert[scope_name])

        for rdn in rdns:
            for entry in list(rdn):
                if entry[0] == field_name:
                    if operator == "" and entry[1] == expected_value:
                        return True
                    elif operator == "<" and entry[1].endswith(expected_value):
                        return True
                    elif operator == ">" and entry[1].startswith(expected_value):
                        return True
        # Explicit False instead of the previous implicit None fall-through
        # (equivalent truthiness, clearer contract).
        return False

    def do_GET(self):
        """
        This method executes a GET request.
        """
        try:
            if not self.check_request_cert():
                return
            if self.path.startswith("/check_hash/"):
                try:
                    self.send_text(check_hash(self.path[12:]), add_newline=False)
                except FileNotFoundError:
                    self.send_error(404, "Path not found")
                    self.end_headers()
            elif self.path == "/ready":
                self.send_text(ready())
            elif self.path == "/substitutions":
                self.send_text(get_substitutions())
            else:
                self.send_error(404, "Path not found")
                self.end_headers()
        except RequestException as e:
            self.send_error(400, e.message)
        except Exception as ex:
            log.error(f"Error processing request {ex}", exc_info=True)
            self.send_error(500)
            self.end_headers()

    def do_POST(self):
        """
        This method executes a POST request.
        """
        try:
            if not self.check_request_cert():
                return
            if self.path == "/copy_files":
                self.send_text(copy_files())
            elif self.path == "/copy_binaries":
                self.send_text(copy_binaries())
            elif self.path == "/copy_libraries":
                self.send_text(copy_libraries())
            elif self.path == "/copy_monitor_conf":
                self.send_text(copy_monitor_conf())
            elif self.path == "/refresh_certs":
                self.send_text(refresh_certs())
            elif self.path == "/restart":
                self.send_text("OK")
                exit(1)
            else:
                self.send_error(404, "Path not found")
                self.end_headers()
        except SystemExit as e:
            raise e
        except RequestException as e:
            self.send_error(400, e.message)
        except Exception:
            # Bug fix: this was `except e:`, which is invalid — Python would
            # raise a NameError (no name `e` bound) instead of handling the
            # failure and returning a 500.
            log.error("Error processing request", exc_info=True)
            self.send_error(500)
            self.end_headers()

    def log_message(self, format, *args):
        # Route BaseHTTPRequestHandler's access logging through our logger.
        log.info(format % args)
class CertificateEventHandler(FileSystemEventHandler):
    # Watchdog handler: when anything in the watched certificate/key
    # directories changes, reload the server's TLS context.
    def on_any_event(self, event):
        log.info("Detected change to certificates")
        # Fixed delay before reloading — presumably to let the certificate
        # and key finish being rotated together. TODO confirm.
        time.sleep(10)
        log.info("Reloading certificates")
        Server.load_ssl_context()
def check_hash(filename):
    """
    Return the SHA-256 hex digest of a file in the configured output
    directory. Raises FileNotFoundError if the file does not exist.
    """
    target = os.path.join(Config.shared().output_dir, filename)
    with open(target, "rb") as contents:
        return hashlib.sha256(contents.read()).hexdigest()
def copy_files():
    """
    Copy the configured files from the input directory to the output
    directory, optionally verifying that required files are non-empty first.

    Returns "OK" on success; raises Exception if a required file is
    missing or empty.
    """
    config = Config.shared()
    if config.require_not_empty:
        # Fail fast if any required input file is absent or empty.
        for filename in config.require_not_empty:
            path = os.path.join(config.input_dir, filename)
            if not os.path.isfile(path) or os.path.getsize(path) == 0:
                raise Exception("No contents for file %s" % path)
    for filename in config.copy_files:
        # Stage each copy through a per-file temp name and atomically
        # rename, so readers never see a partially written file.
        # BUG FIX: the temp name previously used a fixed placeholder
        # string for every file, so concurrent copies could clobber
        # each other's staging file.
        tmp_file = os.path.join(config.output_dir, f"{filename}.tmp")
        shutil.copy(os.path.join(config.input_dir, filename), tmp_file)
        os.replace(tmp_file, os.path.join(config.output_dir, filename))
    return "OK"
def copy_binaries():
    """
    Copy the configured fdb binaries into a version-specific directory
    under the output dir. Only runs when the sidecar's main container
    version differs from the primary version. Returns "OK".
    """
    config = Config.shared()
    if config.main_container_version == config.primary_version:
        return "OK"
    for binary in config.copy_binaries:
        source = Path(f"/usr/bin/{binary}")
        destination = Path(
            f"{config.output_dir}/bin/{config.primary_version}/{binary}"
        )
        # Skip binaries that have already been copied.
        if destination.exists():
            continue
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Stage through a temp file and atomically rename so readers
        # never observe a partially written binary.
        staging = f"{destination}.tmp"
        shutil.copy(source, staging)
        os.replace(staging, destination)
        destination.chmod(0o744)
    return "OK"
def copy_libraries():
    """
    Copy the configured fdb client libraries into the output directory.
    The first listed version becomes the primary libfdb_c.so; any others
    go into the multiversion subdirectory. Returns "OK".
    """
    config = Config.shared()
    for version in config.copy_libraries:
        source = Path(f"/var/fdb/lib/libfdb_c_{version}.so")
        if version == config.copy_libraries[0]:
            destination = Path(f"{config.output_dir}/lib/libfdb_c.so")
        else:
            destination = Path(
                f"{config.output_dir}/lib/multiversion/libfdb_c_{version}.so"
            )
        # Only copy libraries that are not already present.
        if not destination.exists():
            destination.parent.mkdir(parents=True, exist_ok=True)
            # Stage through a temp file and atomically rename so readers
            # never observe a partially written library.
            staging = f"{destination}.tmp"
            shutil.copy(source, staging)
            os.replace(staging, destination)
    return "OK"
def copy_monitor_conf():
    """
    Read the input monitor conf (if configured), apply the configured
    "$variable" substitutions, and atomically write the result to
    fdbmonitor.conf in the output directory. Returns "OK".
    """
    config = Config.shared()
    if config.input_monitor_conf:
        source_path = os.path.join(config.input_dir, config.input_monitor_conf)
        with open(source_path) as monitor_conf_file:
            contents = monitor_conf_file.read()
        # Replace each "$name" placeholder with its configured value.
        for variable, value in config.substitutions.items():
            contents = contents.replace("$" + variable, value)
        tmp_file = os.path.join(config.output_dir, "fdbmonitor.conf.tmp")
        target_file = os.path.join(config.output_dir, "fdbmonitor.conf")
        # Write to a temp file and rename so readers never see a
        # partially written config.
        with open(tmp_file, "w") as output_conf_file:
            output_conf_file.write(contents)
        os.replace(tmp_file, target_file)
    return "OK"
def get_substitutions():
    """Return the configured substitution map serialized as a JSON string."""
    substitutions = Config.shared().substitutions
    return json.dumps(substitutions)
def ready():
    """Liveness probe handler; always reports the server as ready."""
    return "OK"
def refresh_certs():
    """
    Reload the server's TLS certificates from disk.

    Raises RequestException (reported to the client as a 400) when the
    server is not configured for TLS. Returns "OK" on success.
    """
    config = Config.shared()
    if not config.enable_tls:
        raise RequestException("Server is not using TLS")
    Server.load_ssl_context()
    return "OK"
class RequestException(Exception):
    """Error raised for invalid requests; reported to the client as HTTP 400."""

    def __init__(self, message):
        super().__init__(message)
        # Keep the message on the instance so request handlers can echo
        # it back in the error response.
        self.message = message
if __name__ == "__main__":
    # Configure root logging before doing any work so failures in the
    # initial copies are reported.
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s")
    # Perform the one-time copies of files, binaries, client libraries,
    # and the monitor config into the shared output volume.
    copy_files()
    copy_binaries()
    copy_libraries()
    copy_monitor_conf()
    # In init mode the process exits after copying; otherwise it stays
    # up serving HTTP requests for on-demand refreshes.
    if not Config.shared().init_mode:
        Server.start()

View File

@ -263,6 +263,10 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES status/separate_not_enough_servers.txt)
add_fdb_test(TEST_FILES status/single_process_too_many_config_params.txt)
add_test(
NAME multiversion_client/unit_tests
COMMAND $<TARGET_FILE:fdbserver> -r unittests -f /fdbclient/multiversionclient/
)
verify_testing()
if (NOT OPEN_FOR_IDE AND NOT WIN32)