The multi-version client monitors the cluster's protocol version and only activates the client library that can connect.

This commit is contained in:
A.J. Beamon 2021-04-15 11:45:14 -07:00
parent d3b6a543ab
commit b2d6930103
15 changed files with 350 additions and 306 deletions

View File

@ -364,7 +364,7 @@ extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d
return DB(d)->getMainThreadBusyness();
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is non-zero, the future won't return until the protocol version is different than expected
extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version) {
Optional<ProtocolVersion> expected;

View File

@ -35,6 +35,7 @@ constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
// The coordinator interface as exposed to clients
struct ClientLeaderRegInterface {
RequestStream<struct GetLeaderRequest> getLeader;
RequestStream<struct OpenDatabaseCoordRequest> openDatabase;
@ -42,6 +43,10 @@ struct ClientLeaderRegInterface {
ClientLeaderRegInterface() {}
ClientLeaderRegInterface(NetworkAddress remote);
ClientLeaderRegInterface(INetwork* local);
bool operator==(const ClientLeaderRegInterface& rhs) const {
return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase;
}
};
class ClusterConnectionString {

View File

@ -152,6 +152,7 @@ public:
return (DatabaseContext*)DatabaseContext::operator new(sizeof(DatabaseContext));
}
// Static constructor used by server processes to create a DatabaseContext
// For internal (fdbserver) use only
static Database create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
@ -164,9 +165,11 @@ public:
~DatabaseContext();
// Constructs a new copy of this DatabaseContext from the parameters of this DatabaseContext
Database clone() const {
return Database(new DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
taskID,
clientLocality,
@ -196,6 +199,10 @@ public:
Future<Void> onProxiesChanged();
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
Future<ProtocolVersion> getClusterProtocol(Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>());
// Update the watch counter for the database
void addWatch();
void removeWatch();
@ -247,6 +254,7 @@ public:
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -380,6 +388,9 @@ public:
Future<Void> clientInfoMonitor;
Future<Void> connected;
// An AsyncVar that reports the coordinator this DatabaseContext is interacting with
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon;
double lastStatusFetch;

View File

@ -100,7 +100,7 @@ public:
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
virtual ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) = 0;

View File

@ -757,6 +757,7 @@ void shrinkProxyList(ClientDBInfo& ni,
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
MonitorLeaderInfo info,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
@ -774,6 +775,9 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
loop {
state ClientLeaderRegInterface clientLeaderServer(addrs[idx]);
state OpenDatabaseCoordRequest req;
coordinator->set(clientLeaderServer);
req.clusterKey = cs.clusterKey();
req.coordinators = cs.coordinators();
req.knownClientInfoID = clientInfo->get().id;
@ -840,13 +844,14 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
ACTOR Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state MonitorLeaderInfo info(connFile->get());
loop {
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connFile->get(), clientInfo, info, supportedVersions, traceLogGroup))) {
connFile->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
info = _info;
}
when(wait(connFile->onChange())) {

View File

@ -76,6 +76,7 @@ Future<Void> monitorLeaderForProxies(Value const& key,
Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile,
Reference<AsyncVar<ClientDBInfo>> const& clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> const& coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> const& supportedVersions,
Key const& traceLogGroup);

View File

@ -356,7 +356,7 @@ double DLDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
ASSERT(api->databaseGetServerProtocol != nullptr);
@ -877,35 +877,35 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
int threadIdx,
std::string clusterFilePath,
Reference<IDatabase> db,
Reference<IDatabase> versionMonitorDb,
bool openConnectors)
: dbState(new DatabaseState()), clusterFilePath(clusterFilePath) {
: dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) {
dbState->db = db;
dbState->dbVar->set(db);
if (!openConnectors) {
dbState->currentClientIndex = 0;
} else {
if (openConnectors) {
if (!api->localClientDisabled) {
dbState->currentClientIndex = 0;
dbState->addConnection(api->getLocalClient(), clusterFilePath);
} else {
dbState->currentClientIndex = -1;
dbState->addClient(api->getLocalClient());
}
api->runOnExternalClients(threadIdx, [this, clusterFilePath](Reference<ClientInfo> client) {
dbState->addConnection(client, clusterFilePath);
});
if (!externalClientsInitialized.test_and_set()) {
api->runOnExternalClientsAllThreads([&clusterFilePath](Reference<ClientInfo> client) {
// This creates a database to initialize some client state on the external library,
// but it gets deleted immediately so that we don't keep open connections
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
});
}
dbState->startConnections();
api->runOnExternalClients(threadIdx, [this](Reference<ClientInfo> client) { dbState->addClient(client); });
dbState->protocolVersionMonitor = dbState->monitorProtocolVersion();
}
}
MultiVersionDatabase::~MultiVersionDatabase() {
dbState->cancelConnections();
}
// Create a MultiVersionDatabase that wraps an already created IDatabase object
// For internal use in testing
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, false));
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, db, false));
}
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
@ -963,189 +963,122 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return 0;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
// TODO: send this out through the active database
return MultiVersionApi::api->getLocalClient()
->api->createDatabase(clusterFilePath.c_str())
->getServerProtocol(expectedVersion);
return dbState->versionMonitorDb->getServerProtocol(expectedVersion);
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(
[this]() {
if (!cancelled) {
connected = false;
if (connectionFuture.isValid()) {
connectionFuture.cancel();
}
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
: clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb),
dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(NULL))) {}
candidateDatabase = client->api->createDatabase(clusterFilePath.c_str());
if (client->external) {
connectionFuture = candidateDatabase.castTo<DLDatabase>()->onReady();
} else {
connectionFuture = ThreadFuture<Void>(Void());
}
// Adds a client (local or externally loaded) that can be used to connect to the cluster
void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) {
ProtocolVersion baseVersion = client->protocolVersion.normalizedVersion();
auto [itr, inserted] = clients.insert({ baseVersion, client });
if (!inserted) {
// SOMEDAY: prefer client with higher release version if protocol versions are compatible
Reference<ClientInfo> keptClient = itr->second;
Reference<ClientInfo> discardedClient = client;
if (client->canReplace(itr->second)) {
std::swap(keptClient, discardedClient);
clients[baseVersion] = client;
}
connectionFuture = flatMapThreadFuture<Void, Void>(connectionFuture, [this](ErrorOr<Void> ready) {
if (ready.isError()) {
return ErrorOr<ThreadFuture<Void>>(ready.getError());
}
discardedClient->failed = true;
TraceEvent(SevWarn, "DuplicateClientVersion")
.detail("Keeping", keptClient->libPath)
.detail("KeptProtocolVersion", keptClient->protocolVersion)
.detail("Disabling", discardedClient->libPath)
.detail("DisabledProtocolVersion", discardedClient->protocolVersion);
tr = candidateDatabase->createTransaction();
return ErrorOr<ThreadFuture<Void>>(
mapThreadFuture<Version, Void>(tr->getReadVersion(), [](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except
// operation_cancelled)
if (v.isError() && v.getError().code() == error_code_operation_cancelled) {
return ErrorOr<Void>(v.getError());
} else {
return ErrorOr<Void>(Void());
}
}));
});
int userParam;
connectionFuture.callOrSetAsCallback(this, userParam, 0);
} else {
delref();
}
},
nullptr);
}
// Only called from main thread
void MultiVersionDatabase::Connector::cancel() {
connected = false;
cancelled = true;
if (connectionFuture.isValid()) {
connectionFuture.cancel();
}
}
void MultiVersionDatabase::Connector::fire(const Void& unused, int& userParam) {
onMainThreadVoid(
[this]() {
if (!cancelled) {
connected = true;
dbState->stateChanged();
}
delref();
},
nullptr);
}
void MultiVersionDatabase::Connector::error(const Error& e, int& userParam) {
if (e.code() != error_code_operation_cancelled) {
// TODO: is it right to abandon this connection attempt?
client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
TraceEvent(SevError, "DatabaseConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
}
delref();
}
MultiVersionDatabase::DatabaseState::DatabaseState()
: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))), currentClientIndex(-1) {}
// Watch the cluster protocol version for changes and update the database state when it does
ThreadFuture<Void> MultiVersionDatabase::DatabaseState::monitorProtocolVersion() {
ThreadFuture<ProtocolVersion> f = versionMonitorDb->getServerProtocol(dbProtocolVersion);
return mapThreadFuture<ProtocolVersion, Void>(f, [this](ErrorOr<ProtocolVersion> cv) {
if (cv.isError()) {
TraceEvent("ErrorGettingClusterProtocolVersion")
.detail("ExpectedProtocolVersion", dbProtocolVersion)
.error(cv.getError());
}
// Only called from main thread
void MultiVersionDatabase::DatabaseState::stateChanged() {
int newIndex = -1;
for (int i = 0; i < clients.size(); ++i) {
if (i != currentClientIndex && connectionAttempts[i]->connected) {
if (currentClientIndex >= 0 && !clients[i]->canReplace(clients[currentClientIndex])) {
TraceEvent(SevWarn, "DuplicateClientVersion")
.detail("Keeping", clients[currentClientIndex]->libPath)
.detail("KeptClientProtocolVersion", clients[currentClientIndex]->protocolVersion.version())
.detail("Disabling", clients[i]->libPath)
.detail("DisabledClientProtocolVersion", clients[i]->protocolVersion.version());
connectionAttempts[i]->connected = false; // Permanently disable this client in favor of the current one
clients[i]->failed = true;
MultiVersionApi::api->updateSupportedVersions();
return;
ProtocolVersion clusterVersion = !cv.isError() ? cv.get() : dbProtocolVersion.orDefault(currentProtocolVersion);
onMainThreadVoid([this, clusterVersion]() { protocolVersionChanged(clusterVersion); }, nullptr);
return Void();
});
}
// Called when a change to the protocol version of the cluster has been detected. Must be called from the main
// thread.
void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion protocolVersion) {
if (dbProtocolVersion.present() &&
protocolVersion.normalizedVersion() == dbProtocolVersion.get().normalizedVersion()) {
dbProtocolVersion = protocolVersion;
} else {
TraceEvent("ProtocolVersionChanged")
.detail("NewProtocolVersion", protocolVersion)
.detail("OldProtocolVersion", dbProtocolVersion);
dbProtocolVersion = protocolVersion;
auto itr = clients.find(protocolVersion.normalizedVersion());
if (itr != clients.end()) {
auto& client = itr->second;
TraceEvent("CreatingDatabaseOnExternalClient")
.detail("LibraryPath", client->libPath)
.detail("Failed", client->failed);
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
optionLock.enter();
for (auto option : options) {
try {
newDb->setOption(
option.first,
option.second.castTo<StringRef>()); // In practice, this will set a deferred error instead
// of throwing. If that happens, the database will be
// unusable (attempts to use it will throw errors).
} catch (Error& e) {
optionLock.leave();
TraceEvent(SevError, "ClusterVersionChangeOptionError")
.error(e)
.detail("Option", option.first)
.detail("OptionValue", option.second)
.detail("LibPath", client->libPath);
client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
db = Reference<IDatabase>(); // If we can't set all of the options on a cluster, we abandon the
// client
break;
}
}
newIndex = i;
break;
}
}
if (newIndex == -1) {
ASSERT_EQ(currentClientIndex, 0); // This can only happen for the local client, which we set as the current
// connection before we know it's connected
return;
}
// Restart connection for replaced client
auto newDb = connectionAttempts[newIndex]->candidateDatabase;
optionLock.enter();
for (auto option : options) {
try {
newDb->setOption(option.first,
option.second.castTo<StringRef>()); // In practice, this will set a deferred error instead
// of throwing. If that happens, the database will be
// unusable (attempts to use it will throw errors).
} catch (Error& e) {
db = newDb;
if (dbProtocolVersion.get().hasStableInterfaces()) {
versionMonitorDb = db;
} else {
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
}
optionLock.leave();
TraceEvent(SevError, "ClusterVersionChangeOptionError")
.error(e)
.detail("Option", option.first)
.detail("OptionValue", option.second)
.detail("LibPath", clients[newIndex]->libPath);
connectionAttempts[newIndex]->connected = false;
clients[newIndex]->failed = true;
MultiVersionApi::api->updateSupportedVersions();
return; // If we can't set all of the options on a cluster, we abandon the client
} else {
db = Reference<IDatabase>();
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
}
dbVar->set(db);
}
db = newDb;
optionLock.leave();
dbVar->set(db);
if (currentClientIndex >= 0 && connectionAttempts[currentClientIndex]->connected) {
connectionAttempts[currentClientIndex]->connected = false;
connectionAttempts[currentClientIndex]->connect();
}
ASSERT(newIndex >= 0 && newIndex < clients.size());
currentClientIndex = newIndex;
protocolVersionMonitor = monitorProtocolVersion();
}
void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
clients.push_back(client);
connectionAttempts.push_back(
makeReference<Connector>(Reference<DatabaseState>::addRef(this), client, clusterFilePath));
}
void MultiVersionDatabase::DatabaseState::startConnections() {
for (auto c : connectionAttempts) {
c->connect();
}
}
void MultiVersionDatabase::DatabaseState::cancelConnections() {
addref();
onMainThreadVoid(
[this]() {
for (auto c : connectionAttempts) {
c->cancel();
}
connectionAttempts.clear();
clients.clear();
delref();
},
nullptr);
}
std::atomic_flag MultiVersionDatabase::externalClientsInitialized = ATOMIC_FLAG_INIT;
// MultiVersionApi
bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
ASSERT_NE(MultiVersionApi::api->apiVersion, 0);
return MultiVersionApi::api->apiVersion >= minVersion || MultiVersionApi::api->apiVersion < 0;
@ -1608,6 +1541,7 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void*
}
}
// Creates an IDatabase object that represents a connections to the cluster
Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath) {
lock.enter();
if (!networkSetup) {
@ -1622,28 +1556,21 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath
int threadIdx = nextThread;
nextThread = (nextThread + 1) % threadCount;
lock.leave();
for (auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient")
.detail("LibraryPath", it.first)
.detail("Failed", it.second[threadIdx]->failed);
}
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>()));
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
return Reference<IDatabase>(
new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>(), localDb));
}
lock.leave();
ASSERT_LE(threadCount, 1);
auto db = localClient->api->createDatabase(clusterFilePath);
Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath);
if (bypassMultiClientApi) {
return db;
return localDb;
} else {
for (auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient")
.detail("LibraryPath", it.first)
.detail("Failed", it.second[0]->failed);
}
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, db));
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, Reference<IDatabase>(), localDb));
}
}

View File

@ -271,7 +271,7 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
@ -437,14 +437,14 @@ public:
int threadIdx,
std::string clusterFilePath,
Reference<IDatabase> db,
Reference<IDatabase> versionMonitorDb,
bool openConnectors = true);
~MultiVersionDatabase() override;
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
@ -452,67 +452,59 @@ public:
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
// Create a MultiVersionDatabase that wraps an already created IDatabase object
// For internal use in testing
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
ThreadFuture<int64_t> rebootWorker(const StringRef& address, bool check, int duration) override;
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
private:
struct DatabaseState;
struct Connector : ThreadCallback, ThreadSafeReferenceCounted<Connector> {
Connector(Reference<DatabaseState> dbState, Reference<ClientInfo> client, std::string clusterFilePath)
: dbState(dbState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
void connect();
void cancel();
bool canFire(int notMadeActive) const override { return true; }
void fire(const Void& unused, int& userParam) override;
void error(const Error& e, int& userParam) override;
const Reference<ClientInfo> client;
const std::string clusterFilePath;
const Reference<DatabaseState> dbState;
ThreadFuture<Void> connectionFuture;
Reference<IDatabase> candidateDatabase;
Reference<ITransaction> tr;
bool connected;
bool cancelled;
};
// private:
// A struct that manages the current connection state of the MultiVersionDatabase. This wraps the underlying
// IDatabase object that is currently interacting with the cluster.
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
DatabaseState();
DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb);
void stateChanged();
void addConnection(Reference<ClientInfo> client, std::string clusterFilePath);
void startConnections();
void cancelConnections();
// Called when a change to the protocol version of the cluster has been detected. Must be called from the main
// thread.
void protocolVersionChanged(ProtocolVersion protocolVersion);
// Adds a client (local or externally loaded) that can be used to connect to the cluster
void addClient(Reference<ClientInfo> client);
// Watch the cluster protocol version for changes and update the database state when it does
ThreadFuture<Void> monitorProtocolVersion();
Reference<IDatabase> db;
const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar;
std::string clusterFilePath;
// Used to monitor the cluster protocol version. Will be the same as db unless we have either not connected
// yet or if the client version associated with db does not support protocol monitoring. In those cases, this
// will be a specially created local db.
Reference<IDatabase> versionMonitorDb;
ThreadFuture<Void> changed;
bool cancelled;
int currentClientIndex;
std::vector<Reference<ClientInfo>> clients;
std::vector<Reference<Connector>> connectionAttempts;
ThreadFuture<Void> protocolVersionMonitor;
Optional<ProtocolVersion> dbProtocolVersion;
std::map<ProtocolVersion, Reference<ClientInfo>> clients;
std::vector<std::pair<FDBDatabaseOptions::Option, Optional<Standalone<StringRef>>>> options;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaultOptions;
Mutex optionLock;
};
std::string clusterFilePath;
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
// Clients must create a database object in order to initialize some of their state.
// This needs to be done only once, and this flag tracks whether that has happened.
static std::atomic_flag externalClientsInitialized;
};
// An implementation of IClientApi that can choose between multiple different client implementations either provided
@ -530,6 +522,7 @@ public:
void stopNetwork() override;
void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override;
// Creates an IDatabase object that represents a connections to the cluster
Reference<IDatabase> createDatabase(const char* clusterFilePath) override;
static MultiVersionApi* api;

View File

@ -898,6 +898,7 @@ Future<Standalone<RangeResultRef>> HealthMetricsRangeImpl::getRange(ReadYourWrit
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -906,9 +907,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
bool internal,
int apiVersion,
bool switchable)
: connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
apiVersion(apiVersion), switchable(switchable), proxyProvisional(false), cc("TransactionMetrics"),
: connectionFile(connectionFile), clientInfo(clientInfo), coordinator(coordinator),
clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality),
enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware), apiVersion(apiVersion),
switchable(switchable), proxyProvisional(false), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
@ -1156,6 +1158,8 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false),
transactionTracingEnabled(true) {}
// Static constructor used by server processes to create a DatabaseContext
// For internal (fdbserver) use only
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
Future<Void> clientInfoMonitor,
LocalityData clientLocality,
@ -1166,6 +1170,7 @@ Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
bool switchable) {
return Database(new DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>>(),
clientInfo,
makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>(),
clientInfoMonitor,
taskID,
clientLocality,
@ -1446,6 +1451,9 @@ void DatabaseContext::expireThrottles() {
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal,
@ -1492,15 +1500,20 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
g_network->initTLS();
auto clientInfo = makeReference<AsyncVar<ClientDBInfo>>();
auto coordinator = makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>();
auto connectionFile = makeReference<AsyncVar<Reference<ClusterConnectionFile>>>();
connectionFile->set(connFile);
Future<Void> clientInfoMonitor = monitorProxies(
connectionFile, clientInfo, networkOptions.supportedVersions, StringRef(networkOptions.traceLogGroup));
Future<Void> clientInfoMonitor = monitorProxies(connectionFile,
clientInfo,
coordinator,
networkOptions.supportedVersions,
StringRef(networkOptions.traceLogGroup));
DatabaseContext* db;
if (preallocatedDb) {
db = new (preallocatedDb) DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
@ -1512,6 +1525,7 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
} else {
db = new DatabaseContext(connectionFile,
clientInfo,
coordinator,
clientInfoMonitor,
TaskPriority::DefaultEndpoint,
clientLocality,
@ -4872,48 +4886,95 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConnectionFile> f) {
state ClientCoordinators coord(f);
// Gets the protocol version reported by a coordinator via the protocol info interface
ACTOR Future<ProtocolVersion> getCoordinatorProtocol(NetworkAddressList coordinatorAddresses) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{ { coordinatorAddresses }, WLTOKEN_PROTOCOL_INFO } };
ProtocolInfoReply reply = wait(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5));
std::unordered_map<uint64_t, int> protocolCount;
for (int i = 0; i < coordProtocols.size(); i++) {
if (coordProtocols[i].isReady()) {
protocolCount[coordProtocols[i].get().version.version()]++;
}
}
uint64_t majorityProtocol = std::max_element(protocolCount.begin(),
protocolCount.end(),
[](const std::pair<uint64_t, int>& l,
const std::pair<uint64_t, int>& r) { return l.second < r.second; })
->first;
return ProtocolVersion(majorityProtocol);
return reply.version;
}
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion) {
// Gets the protocol version reported by a coordinator in its connect packet
// If we are unable to get a version from the connect packet (e.g. because we lost connection with the peer), then this
// function will return with an unset result.
// If an expected version is given, this future won't return if the actual protocol version matches the expected version
ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
NetworkAddress coordinatorAddress,
Optional<ProtocolVersion> expectedVersion) {
state Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion =
FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress);
loop {
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
if (!expectedVersion.present() || protocolVersion != expectedVersion.get()) {
return protocolVersion;
} else {
wait(delay(2.0)); // TODO: this is temporary, so not making into a knob yet
if (protocolVersion->get().present() &&
(!expectedVersion.present() || expectedVersion.get() != protocolVersion->get().get())) {
return protocolVersion->get();
}
Future<Void> change = protocolVersion->onChange();
if (!protocolVersion->get().present()) {
// If we still don't have any connection info after a timeout, retry sending the protocol version request
change = timeout(change, FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Void());
}
wait(change);
if (!protocolVersion->get().present()) {
return protocolVersion->get();
}
}
}
// Returns the protocol version reported by the given coordinator
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Optional<ProtocolVersion> expectedVersion) {
state bool needToConnect = true;
state Future<ProtocolVersion> protocolVersion = Never();
loop {
if (!coordinator->get().present()) {
wait(coordinator->onChange());
} else {
Endpoint coordinatorEndpoint = coordinator->get().get().getLeader.getEndpoint();
if (needToConnect) {
// Even though we typically rely on the connect packet to get the protocol version, we need to send some
// request in order to start a connection. This protocol version request serves that purpose.
protocolVersion = getCoordinatorProtocol(coordinatorEndpoint.addresses);
needToConnect = false;
}
choose {
when(wait(coordinator->onChange())) { needToConnect = true; }
when(ProtocolVersion pv = wait(protocolVersion)) {
if (!expectedVersion.present() || expectedVersion.get() != pv) {
return pv;
}
}
// Older versions of FDB don't have an endpoint to return the protocol version, so we get this info from
// the connect packet
when(Optional<ProtocolVersion> pv = wait(getCoordinatorProtocolFromConnectPacket(
coordinatorEndpoint.getPrimaryAddress(), expectedVersion))) {
if (pv.present()) {
return pv.get();
} else {
needToConnect = true;
}
}
}
}
}
}
// Returns the protocol version reported by the coordinator this client is currently connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
Future<ProtocolVersion> DatabaseContext::getClusterProtocol(Optional<ProtocolVersion> expectedVersion) {
return getClusterProtocolImpl(coordinator, expectedVersion);
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();

View File

@ -76,11 +76,15 @@ class Database {
public:
enum { API_VERSION_LATEST = -1 };
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
static Database createDatabase(Reference<ClusterConnectionFile> connFile,
int apiVersion,
bool internal = true,
LocalityData const& clientLocality = LocalityData(),
DatabaseContext* preallocatedDb = nullptr);
static Database createDatabase(std::string connFileName,
int apiVersion,
bool internal = true,
@ -400,11 +404,6 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
// Returns the protocol version reported by a quorum of coordinators
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocol(Reference<ClusterConnectionFile> f,
Optional<ProtocolVersion> expectedVersion);
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;
}

View File

@ -97,13 +97,12 @@ double ThreadSafeDatabase::getMainThreadBusyness() {
return g_network->networkInfo.metrics.networkBusyness;
}
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
DatabaseContext* db = this->db;
return onMainThread([db, expectedVersion]() -> Future<ProtocolVersion> {
return getClusterProtocol(db->getConnectionFile(), expectedVersion);
});
return onMainThread(
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {

View File

@ -39,7 +39,7 @@ public:
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
// Returns the protocol version reported by a quorum of coordinators
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;

View File

@ -760,6 +760,13 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
conn->close();
conn = Reference<IConnection>();
// Old versions will throw this error, and we don't want to forget their protocol versions.
// This means we can't tell the difference between an old protocol version and one we
// can no longer connect to.
if (e.code() != error_code_incompatible_protocol_version) {
self->protocolVersion->set(Optional<ProtocolVersion>());
}
}
// Clients might send more packets in response, which needs to go out on the next connection
@ -787,7 +794,8 @@ Peer::Peer(TransportData* transport, NetworkAddress const& destination)
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
bytesSent(0), lastLoggedBytesSent(0), lastLoggedTime(0.0), connectOutgoingCount(0), connectIncomingCount(0),
connectFailedCount(0), connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1) {
connectFailedCount(0), connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1),
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
@ -1103,12 +1111,12 @@ static int getNewBufferSize(const uint8_t* begin,
packetLen + sizeof(uint32_t) * (peerAddress.isTLS() ? 2 : 3));
}
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
ACTOR static Future<Void> connectionReader(TransportData* transport,
Reference<IConnection> conn,
Reference<Peer> peer,
Promise<Reference<Peer>> onConnected) {
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
state Arena arena;
state uint8_t* unprocessed_begin = nullptr;
@ -1209,6 +1217,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
if (!protocolVersion.hasMultiVersionClient()) {
// Older versions expected us to hang up. It may work even if we don't hang up here, but
// it's safer to keep the old behavior.
peer->protocolVersion->set(peerProtocolVersion);
throw incompatible_protocol_version();
}
} else {
@ -1256,6 +1265,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
onConnected.send(peer);
wait(delay(0)); // Check for cancellation
}
peer->protocolVersion->set(peerProtocolVersion);
}
}
@ -1669,6 +1679,16 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
return self->degraded;
}
// Returns the protocol version of the peer at the specified address. The result is returned as an AsyncVar that
// can be used to monitor for changes of a peer's protocol. The protocol version will be unset in the event that
// there is no connection established to the peer.
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
return self->peers.at(addr)->protocolVersion;
}
void FlowTransport::resetConnection(NetworkAddress address) {
auto peer = self->getPeer(address);
if (peer) {

View File

@ -152,6 +152,9 @@ struct Peer : public ReferenceCounted<Peer> {
double lastLoggedTime;
int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion;
// Cleared every time stats are logged for this peer.
int connectOutgoingCount;
int connectIncomingCount;
@ -174,64 +177,64 @@ public:
FlowTransport(uint64_t transportId);
~FlowTransport();
static void createInstance(bool isClient, uint64_t transportId);
// Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global()
// variables, so it will be private to a simulation.
static void createInstance(bool isClient, uint64_t transportId);
static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; }
void initMetrics();
// Metrics must be initialized after FlowTransport::createInstance has been called
void initMetrics();
Future<Void> bind(NetworkAddress publicAddress, NetworkAddress listenAddress);
// Starts a server listening on the given listenAddress, and sets publicAddress to be the public
// address of this server. Returns only errors.
Future<Void> bind(NetworkAddress publicAddress, NetworkAddress listenAddress);
NetworkAddress getLocalAddress() const;
// Returns first local NetworkAddress.
NetworkAddress getLocalAddress() const;
NetworkAddressList getLocalAddresses() const;
// Returns all local NetworkAddress.
NetworkAddressList getLocalAddresses() const;
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
Future<Void> onIncompatibleChanged();
// Returns when getIncompatiblePeers has at least one peer which is incompatible.
Future<Void> onIncompatibleChanged();
void addPeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is being used, even if no messages are currently being sent to the peer
void addPeerReference(const Endpoint&, bool isStream);
void removePeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is no longer being used
void removePeerReference(const Endpoint&, bool isStream);
void addEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
void addEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
void addEndpoints(std::vector<std::pair<struct FlowReceiver*, TaskPriority>> const& streams);
void removeEndpoint(const Endpoint&, NetworkMessageReceiver*);
// The given local endpoint no longer delivers messages to the given receiver or uses resources
void removeEndpoint(const Endpoint&, NetworkMessageReceiver*);
void addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// Sets endpoint to a new local endpoint (without changing its token) which delivers messages to the given receiver
// Implementations may have limitations on when this function is called and what endpoint.token may be!
void addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID);
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is called. It will
// retry sending if the connection is closed or the failure manager reports the destination become available (edge
// triggered).
ReliablePacket* sendReliable(ISerializeSource const& what, const Endpoint& destination);
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is
// called. It will retry sending if the connection is closed or the failure manager reports
// the destination become available (edge triggered).
// Makes Packet "unreliable" (either the data or a connection close event will be delivered eventually). It can
// still be used safely to send a reply to a "reliable" request.
void cancelReliable(ReliablePacket*);
// Makes Packet "unreliable" (either the data or a connection close event will be delivered
// eventually). It can still be used safely to send a reply to a "reliable" request.
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure
// monitor thinks is healthy.
Reference<AsyncVar<bool>> getDegraded();
void resetConnection(NetworkAddress address);
// Forces the connection with this address to be reset
void resetConnection(NetworkAddress address);
Reference<Peer> sendUnreliable(ISerializeSource const& what,
const Endpoint& destination,
@ -239,6 +242,14 @@ public:
bool incompatibleOutgoingConnectionsPresent();
// Returns the protocol version of the peer at the specified address. The result is returned as an AsyncVar that
// can be used to monitor for changes of a peer's protocol. The protocol version will be unset in the event that
// there is no connection established to the peer.
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> getPeerProtocolAsyncVar(NetworkAddress addr);
static FlowTransport& transport() {
return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport));
}

View File

@ -20,6 +20,7 @@
#pragma once
#include <cstdint>
#include "flow/Trace.h"
#define PROTOCOL_VERSION_FEATURE(v, x) \
struct x { \
@ -50,6 +51,10 @@ public:
return (other.version() & compatibleProtocolVersionMask) == (version() & compatibleProtocolVersionMask);
}
// Returns a normalized protocol version that will be the same for all compatible versions
constexpr ProtocolVersion normalizedVersion() const {
return ProtocolVersion(_version & compatibleProtocolVersionMask);
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
@ -134,6 +139,13 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
};
template <>
struct Traceable<ProtocolVersion> : std::true_type {
static std::string toString(const ProtocolVersion& protocolVersion) {
return format("0x%016lX", protocolVersion.version());
}
};
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
//
// The convention is that 'x' and 'y' should match the major and minor version of the software, and 'z' should be 0.