Abstract the cluster file into a cluster connection record that can be backed by something other than the filesystem.

This commit is contained in:
A.J. Beamon 2021-10-10 20:44:56 -07:00
parent d153519188
commit e882eb33fc
49 changed files with 1102 additions and 465 deletions

View File

@ -37,6 +37,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/IKnobCollection.h"
#include "fdbclient/RunTransaction.actor.h"
@ -3095,7 +3096,7 @@ Optional<Database> connectToCluster(std::string const& clusterFile,
} catch (Error& e) {
if (!quiet) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getLocation().c_str());
}
return db;
}

View File

@ -19,6 +19,7 @@
*/
#include "boost/lexical_cast.hpp"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/IClientApi.h"
@ -1034,8 +1035,8 @@ ACTOR Future<bool> exclude(Database db,
locality.c_str());
}
ClusterConnectionString ccs = wait(ccf->getStoredConnectionString());
bool foundCoordinator = false;
auto ccs = ClusterConnectionFile(ccf->getFilename()).getConnectionString();
for (const auto& c : ccs.coordinators()) {
if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
@ -1584,12 +1585,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
try {
localDb = Database::createDatabase(ccf, opt.api_version, IsInternal::False);
if (!opt.exec.present()) {
printf("Using cluster file `%s'.\n", ccf->getFilename().c_str());
printf("Using cluster file `%s'.\n", ccf->getLocation().c_str());
}
db = API->createDatabase(opt.clusterFile.c_str());
} catch (Error& e) {
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
printf("Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
printf("Unable to connect to cluster from `%s'\n", ccf->getLocation().c_str());
return 1;
}
@ -1600,7 +1601,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
.detail("Version", FDB_VT_VERSION)
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(nullptr))
.detail("ClusterFile", ccf->getFilename().c_str())
.detail("ClusterFile", ccf->toString())
.detail("ConnectionString", ccf->getConnectionString().toString())
.setMaxFieldLength(10000)
.detail("CommandLine", opt.commandLine)

View File

@ -26,6 +26,12 @@ set(FDBCLIENT_SRCS
ClientKnobs.h
ClientLogEvents.h
ClientWorkerInterface.h
ClusterConnectionFile.actor.cpp
ClusterConnectionFile.h
ClusterConnectionKey.actor.cpp
ClusterConnectionKey.actor.h
ClusterConnectionMemoryRecord.actor.cpp
ClusterConnectionMemoryRecord.h
ClusterInterface.h
CommitProxyInterface.h
CommitTransaction.h

View File

@ -0,0 +1,174 @@
/*
* ClusterConnectionFile.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // has to be last include
// Loads and parses the file at 'path', throwing errors if the file cannot be read or the format is invalid.
ClusterConnectionFile::ClusterConnectionFile(std::string const& filename) : IClusterConnectionRecord(false) {
if (!fileExists(filename)) {
throw no_cluster_file_found();
}
cs = ClusterConnectionString(readFileBytes(filename, MAX_CLUSTER_FILE_BYTES));
this->filename = filename;
}
// Creates a cluster file with a given connection string and saves it to the specified file.
ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents)
: IClusterConnectionRecord(true) {
this->filename = filename;
cs = contents;
}
// Returns the connection string currently held in this object. This may not match the string on disk if it hasn't
// been persisted or if the file has been modified externally.
ClusterConnectionString const& ClusterConnectionFile::getConnectionString() const {
return cs;
}
// Sets the connections string held by this object. Calling this function does not persist the string to disk.
Future<Void> ClusterConnectionFile::setConnectionString(ClusterConnectionString const& conn) {
ASSERT(filename.size());
cs = conn;
return success(persist());
}
// Get the connection string stored in the file.
Future<ClusterConnectionString> ClusterConnectionFile::getStoredConnectionString() {
try {
return ClusterConnectionFile(filename).cs;
} catch (Error& e) {
return e;
}
}
// Checks whether the connection string in the file matches the connection string stored in memory. The cluster
// string stored in the file is returned via the reference parameter connectionString.
Future<bool> ClusterConnectionFile::upToDate(ClusterConnectionString& fileConnectionString) {
try {
// the cluster file hasn't been created yet so there's nothing to check
if (needsToBePersisted())
return true;
ClusterConnectionFile temp(filename);
fileConnectionString = temp.getConnectionString();
return fileConnectionString.toString() == cs.toString();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ClusterFileError").error(e).detail("Filename", filename);
return false; // Swallow the error and report that the file is out of date
}
}
// Returns the specified path of the cluster file.
std::string ClusterConnectionFile::getLocation() const {
return filename;
}
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> ClusterConnectionFile::makeIntermediateRecord(
ClusterConnectionString const& connectionString) const {
return makeReference<ClusterConnectionFile>(filename, connectionString);
}
// Returns a string representation of this cluster connection record. This will include the type of record and the
// filename of the cluster file.
std::string ClusterConnectionFile::toString() const {
return "File: " + filename;
}
// returns <resolved name, was default file>
std::pair<std::string, bool> ClusterConnectionFile::lookupClusterFileName(std::string const& filename) {
if (filename.length())
return std::make_pair(filename, false);
std::string f;
bool isDefaultFile = true;
if (platform::getEnvironmentVar(CLUSTER_FILE_ENV_VAR_NAME, f)) {
// If this is set but points to a file that does not
// exist, we will not fallback to any other methods
isDefaultFile = false;
} else if (fileExists("fdb.cluster"))
f = "fdb.cluster";
else
f = platform::getDefaultClusterFilePath();
return std::make_pair(f, isDefaultFile);
}
// get a human readable error message describing the error returned from the constructor
std::string ClusterConnectionFile::getErrorString(std::pair<std::string, bool> const& resolvedClusterFile,
Error const& e) {
bool isDefault = resolvedClusterFile.second;
if (e.code() == error_code_connection_string_invalid) {
return format("Invalid cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
} else if (e.code() == error_code_no_cluster_file_found) {
if (isDefault)
return format("Unable to read cluster file `./fdb.cluster' or `%s' and %s unset: %d %s",
platform::getDefaultClusterFilePath().c_str(),
CLUSTER_FILE_ENV_VAR_NAME,
e.code(),
e.what());
else
return format(
"Unable to read cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
} else {
return format(
"Unexpected error loading cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
}
}
// Writes the connection string to the cluster file
Future<bool> ClusterConnectionFile::persist() {
setPersisted();
if (filename.size()) {
try {
atomicReplace(filename,
"# DO NOT EDIT!\n# This file is auto-generated, it is not to be edited by hand\n" +
cs.toString().append("\n"));
Future<bool> isUpToDate = IClusterConnectionRecord::upToDate();
// The implementation of upToDate in this class is synchronous
ASSERT(isUpToDate.isReady());
if (!isUpToDate.get()) {
// This should only happen in rare scenarios where multiple processes are updating the same file to
// different values simultaneously In that case, we don't have any guarantees about which file will
// ultimately be written
TraceEvent(SevWarnAlways, "ClusterFileChangedAfterReplace")
.detail("Filename", filename)
.detail("ConnStr", cs.toString());
return false;
}
return true;
} catch (Error& e) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionFile")
.error(e)
.detail("Filename", filename)
.detail("ConnStr", cs.toString());
}
}
return false;
}

View File

@ -0,0 +1,81 @@
/*
* ClusterConnectionFile.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef FDBCLIENT_CLUSTERCONNECTIONFILE_H
#define FDBCLIENT_CLUSTERCONNECTIONFILE_H
#include "fdbclient/CoordinationInterface.h"
#include "flow/actorcompiler.h" // has to be last include
// An implementation of IClusterConnectionRecord backed by a file.
class ClusterConnectionFile : public IClusterConnectionRecord, ReferenceCounted<ClusterConnectionFile>, NonCopyable {
public:
// Loads and parses the file at 'path', throwing errors if the file cannot be read or the format is invalid.
explicit ClusterConnectionFile(std::string const& path);
// Creates a cluster file with a given connection string and saves it to the specified file.
explicit ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents);
// Returns the connection string currently held in this object. This may not match the string on disk if it hasn't
// been persisted or if the file has been modified externally.
ClusterConnectionString const& getConnectionString() const override;
// Sets the connections string held by this object. Calling this function does not persist the string to disk.
Future<Void> setConnectionString(ClusterConnectionString const&) override;
// Get the connection string stored in the file.
Future<ClusterConnectionString> getStoredConnectionString() override;
// Checks whether the connection string in the file matches the connection string stored in memory. The cluster
// string stored in the file is returned via the reference parameter connectionString.
Future<bool> upToDate(ClusterConnectionString& fileConnectionString) override;
// Returns the specified path of the cluster file.
std::string getLocation() const override;
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> makeIntermediateRecord(
ClusterConnectionString const& connectionString) const override;
// Returns a string representation of this cluster connection record. This will include the type of record and the
// filename of the cluster file.
std::string toString() const override;
void addref() override { ReferenceCounted<ClusterConnectionFile>::addref(); }
void delref() override { ReferenceCounted<ClusterConnectionFile>::delref(); }
// returns <resolved name, was default file>
static std::pair<std::string, bool> lookupClusterFileName(std::string const& filename);
// get a human readable error message describing the error returned from the constructor
static std::string getErrorString(std::pair<std::string, bool> const& resolvedFile, Error const& e);
protected:
// Writes the connection string to the cluster file
Future<bool> persist() override;
private:
ClusterConnectionString cs;
std::string filename;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -0,0 +1,144 @@
/*
* ClusterConnectionKey.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionKey.actor.h"
#include "flow/actorcompiler.h" // has to be last include
// Creates a cluster connection record with a given connection string and saves it to the specified key. Needs to be
// persisted should be set to true unless this ClusterConnectionKey is being created with the value read from the
// key.
ClusterConnectionKey::ClusterConnectionKey(Database db,
Key connectionStringKey,
ClusterConnectionString const& contents,
bool needsToBePersisted)
: IClusterConnectionRecord(needsToBePersisted), db(db), cs(contents), connectionStringKey(connectionStringKey) {}
// Loads and parses the connection string at the specified key, throwing errors if the file cannot be read or the
// format is invalid.
ACTOR Future<Reference<ClusterConnectionKey>> ClusterConnectionKey::loadClusterConnectionKey(Database db,
Key connectionStringKey) {
state Transaction tr(db);
loop {
try {
Optional<Value> v = wait(tr.get(connectionStringKey));
if (!v.present()) {
throw connection_string_invalid();
}
return makeReference<ClusterConnectionKey>(
db, connectionStringKey, ClusterConnectionString(v.get().toString()), false);
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Returns the connection string currently held in this object. This may not match the string in the database if it
// hasn't been persisted or if the key has been modified externally.
ClusterConnectionString const& ClusterConnectionKey::getConnectionString() const {
return cs;
}
// Sets the connections string held by this object. Calling this function does not persist the string to the
// database.
Future<Void> ClusterConnectionKey::setConnectionString(ClusterConnectionString const& connectionString) {
cs = connectionString;
return success(persist());
}
// Get the connection string stored in the database.
ACTOR Future<ClusterConnectionString> ClusterConnectionKey::getStoredConnectionStringImpl(
Reference<ClusterConnectionKey> self) {
Reference<ClusterConnectionKey> cck =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
return cck->cs;
}
Future<ClusterConnectionString> ClusterConnectionKey::getStoredConnectionString() {
return getStoredConnectionStringImpl(Reference<ClusterConnectionKey>::addRef(this));
}
ACTOR Future<bool> ClusterConnectionKey::upToDateImpl(Reference<ClusterConnectionKey> self,
ClusterConnectionString* connectionString) {
try {
// the cluster file hasn't been created yet so there's nothing to check
if (self->needsToBePersisted())
return true;
Reference<ClusterConnectionKey> temp =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
*connectionString = temp->getConnectionString();
return connectionString->toString() == self->cs.toString();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ClusterKeyError").error(e).detail("Key", self->connectionStringKey);
return false; // Swallow the error and report that the file is out of date
}
}
// Checks whether the connection string in the database matches the connection string stored in memory. The cluster
// string stored in the database is returned via the reference parameter connectionString.
Future<bool> ClusterConnectionKey::upToDate(ClusterConnectionString& connectionString) {
return upToDateImpl(Reference<ClusterConnectionKey>::addRef(this), &connectionString);
}
// Returns the key where the connection string is stored.
std::string ClusterConnectionKey::getLocation() const {
return printable(connectionStringKey);
}
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> ClusterConnectionKey::makeIntermediateRecord(
ClusterConnectionString const& connectionString) const {
return makeReference<ClusterConnectionKey>(db, connectionStringKey, connectionString);
}
// Returns a string representation of this cluster connection record. This will include the type of record and the
// key where the record is stored.
std::string ClusterConnectionKey::toString() const {
return "Key: " + printable(connectionStringKey);
}
ACTOR Future<bool> ClusterConnectionKey::persistImpl(Reference<ClusterConnectionKey> self) {
self->setPersisted();
try {
state Transaction tr(self->db);
loop {
try {
tr.set(self->connectionStringKey, StringRef(self->cs.toString()));
wait(tr.commit());
return true;
} catch (Error& e) {
wait(tr.onError(e));
}
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionKey")
.error(e)
.detail("ConnectionKey", self->connectionStringKey)
.detail("ConnStr", self->cs.toString());
}
return false;
};
// Writes the connection string to the database
Future<bool> ClusterConnectionKey::persist() {
return persistImpl(Reference<ClusterConnectionKey>::addRef(this));
}

View File

@ -0,0 +1,97 @@
/*
* ClusterConnectionKey.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_CLUSTERCONNECTIONKEY_ACTOR_G_H)
#define FDBCLIENT_CLUSTERCONNECTIONKEY_ACTOR_G_H
#include "fdbclient/ClusterConnectionKey.actor.g.h"
#elif !defined(FDBCLIENT_CLUSTERCONNECTIONKEY_ACTOR_H)
#define FDBCLIENT_CLUSTERCONNECTIONKEY_ACTOR_H
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/actorcompiler.h" // has to be last include
// An implementation of IClusterConnectionRecord backed by a key in a FoundationDB database.
class ClusterConnectionKey : public IClusterConnectionRecord, ReferenceCounted<ClusterConnectionKey>, NonCopyable {
public:
// Creates a cluster connection record with a given connection string and saves it to the specified key. Needs to be
// persisted should be set to true unless this ClusterConnectionKey is being created with the value read from the
// key.
ClusterConnectionKey(Database db,
Key connectionStringKey,
ClusterConnectionString const& contents,
bool needsToBePersisted = true);
// Loads and parses the connection string at the specified key, throwing errors if the file cannot be read or the
// format is invalid.
ACTOR static Future<Reference<ClusterConnectionKey>> loadClusterConnectionKey(Database db, Key connectionStringKey);
// Returns the connection string currently held in this object. This may not match the string in the database if it
// hasn't been persisted or if the key has been modified externally.
ClusterConnectionString const& getConnectionString() const override;
// Sets the connections string held by this object. Calling this function does not persist the string to the
// database.
Future<Void> setConnectionString(ClusterConnectionString const&) override;
// Get the connection string stored in the database.
Future<ClusterConnectionString> getStoredConnectionString() override;
// Checks whether the connection string in the database matches the connection string stored in memory. The cluster
// string stored in the database is returned via the reference parameter connectionString.
Future<bool> upToDate(ClusterConnectionString& connectionString) override;
// Returns the key where the connection string is stored.
std::string getLocation() const override;
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> makeIntermediateRecord(
ClusterConnectionString const& connectionString) const override;
// Returns a string representation of this cluster connection record. This will include the type of record and the
// key where the record is stored.
std::string toString() const override;
void addref() override { ReferenceCounted<ClusterConnectionKey>::addref(); }
void delref() override { ReferenceCounted<ClusterConnectionKey>::delref(); }
protected:
// Writes the connection string to the database
Future<bool> persist() override;
private:
ACTOR static Future<ClusterConnectionString> getStoredConnectionStringImpl(Reference<ClusterConnectionKey> self);
ACTOR static Future<bool> upToDateImpl(Reference<ClusterConnectionKey> self,
ClusterConnectionString* connectionString);
ACTOR static Future<bool> persistImpl(Reference<ClusterConnectionKey> self);
// The database where the connection key is stored. Note that this does not need to be the same database as the one
// that the connection string would connect to.
Database db;
ClusterConnectionString cs;
Key connectionStringKey;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -0,0 +1,68 @@
/*
* ClusterConnectionMemoryRecord.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // has to be last include
// Returns the connection string currently held in this object.
ClusterConnectionString const& ClusterConnectionMemoryRecord::getConnectionString() const {
return cs;
}
// Sets the connections string held by this object.
Future<Void> ClusterConnectionMemoryRecord::setConnectionString(ClusterConnectionString const& conn) {
cs = conn;
return Void();
}
// Returns the connection string currently held in this object (there is no persistent storage).
Future<ClusterConnectionString> ClusterConnectionMemoryRecord::getStoredConnectionString() {
return cs;
}
// Because the memory record is not persisted, it is always up to date and this returns true. The connection string
// is returned via the reference parameter connectionString.
Future<bool> ClusterConnectionMemoryRecord::upToDate(ClusterConnectionString& fileConnectionString) {
fileConnectionString = cs;
return true;
}
// Returns the ID of the memory record.
std::string ClusterConnectionMemoryRecord::getLocation() const {
return id.toString();
}
// Returns a copy of this object with a modified connection string.
Reference<IClusterConnectionRecord> ClusterConnectionMemoryRecord::makeIntermediateRecord(
ClusterConnectionString const& connectionString) const {
return makeReference<ClusterConnectionMemoryRecord>(connectionString);
}
// Returns a string representation of this cluster connection record. This will include the type and id of the
// record.
std::string ClusterConnectionMemoryRecord::toString() const {
return "Memory: " + id.toString();
}
// This is a no-op for memory records. Returns true to indicate success.
Future<bool> ClusterConnectionMemoryRecord::persist() {
return true;
}

View File

@ -0,0 +1,73 @@
/*
* ClusterConnectionMemoryRecord.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef FDBCLIENT_CLUSTERCONNECTIONMEMORYRECORD_H
#define FDBCLIENT_CLUSTERCONNECTIONMEMORYRECORD_H
#include "fdbclient/CoordinationInterface.h"
// An implementation of IClusterConnectionRecord that is stored in memory only and not persisted.
class ClusterConnectionMemoryRecord : public IClusterConnectionRecord,
ReferenceCounted<ClusterConnectionMemoryRecord>,
NonCopyable {
public:
// Creates a cluster file with a given connection string.
explicit ClusterConnectionMemoryRecord(ClusterConnectionString const& cs)
: IClusterConnectionRecord(false), id(deterministicRandom()->randomUniqueID()), cs(cs) {}
// Returns the connection string currently held in this object.
ClusterConnectionString const& getConnectionString() const override;
// Sets the connections string held by this object.
Future<Void> setConnectionString(ClusterConnectionString const&) override;
// Returns the connection string currently held in this object (there is no persistent storage).
Future<ClusterConnectionString> getStoredConnectionString() override;
// Because the memory record is not persisted, it is always up to date and this returns true. The connection string
// is returned via the reference parameter connectionString.
Future<bool> upToDate(ClusterConnectionString& fileConnectionString) override;
// Returns a location string for the memory record that includes its ID.
std::string getLocation() const override;
// Returns a copy of this object with a modified connection string.
Reference<IClusterConnectionRecord> makeIntermediateRecord(
ClusterConnectionString const& connectionString) const override;
// Returns a string representation of this cluster connection record. This will include the type and id of the
// record.
std::string toString() const override;
void addref() override { ReferenceCounted<ClusterConnectionMemoryRecord>::addref(); }
void delref() override { ReferenceCounted<ClusterConnectionMemoryRecord>::delref(); }
protected:
// This is a no-op for memory records. Returns true to indicate success.
Future<bool> persist() override;
private:
// A unique ID for the record
UID id;
ClusterConnectionString cs;
};
#endif

View File

@ -45,11 +45,23 @@ struct ClientLeaderRegInterface {
}
};
// A string containing the information necessary to connect to a cluster.
//
// The format of the connection string is: description:id@[addrs]+
// The description and id together are called the "key"
//
// The following is enforced about the format of the file:
// - The key must contain one (and only one) ':' character
// - The description contains only allowed characters (a-z, A-Z, 0-9, _)
// - The ID contains only allowed characters (a-z, A-Z, 0-9)
// - At least one address is specified
// - There is no address present more than once
class ClusterConnectionString {
public:
ClusterConnectionString() {}
ClusterConnectionString(std::string const& connectionString);
ClusterConnectionString(std::vector<NetworkAddress>, Key);
std::vector<NetworkAddress> const& coordinators() const { return coord; }
Key clusterKey() const { return key; }
Key clusterKeyName() const {
@ -65,45 +77,68 @@ private:
Key key, keyDesc;
};
class ClusterConnectionFile : NonCopyable, public ReferenceCounted<ClusterConnectionFile> {
// A record that stores the connection string used to connect to a cluster. This record can be updated when a cluster
// notifies a connected party that the connection string has changed.
//
// The typically used cluster connection record is a cluster file (implemented in ClusterConnectionFile). This interface
// provides an abstraction over the cluster file so that we can persist the connection string in other locations or have
// one that is only stored in memory.
class IClusterConnectionRecord {
public:
ClusterConnectionFile() {}
// Loads and parses the file at 'path', throwing errors if the file cannot be read or the format is invalid.
//
// The format of the file is: description:id@[addrs]+
// The description and id together are called the "key"
//
// The following is enforced about the format of the file:
// - The key must contain one (and only one) ':' character
// - The description contains only allowed characters (a-z, A-Z, 0-9, _)
// - The ID contains only allowed characters (a-z, A-Z, 0-9)
// - At least one address is specified
// - There is no address present more than once
explicit ClusterConnectionFile(std::string const& path);
explicit ClusterConnectionFile(ClusterConnectionString const& cs) : cs(cs), setConn(false) {}
explicit ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents);
IClusterConnectionRecord(bool connectionStringNeedsPersisted)
: connectionStringNeedsPersisted(connectionStringNeedsPersisted) {}
virtual ~IClusterConnectionRecord() {}
// returns <resolved name, was default file>
static std::pair<std::string, bool> lookupClusterFileName(std::string const& filename);
// get a human readable error message describing the error returned from the constructor
static std::string getErrorString(std::pair<std::string, bool> const& resolvedFile, Error const& e);
// Returns the connection string currently held in this object. This may not match the stored record if it hasn't
// been persisted or if the persistent storage for the record has been modified externally.
virtual ClusterConnectionString const& getConnectionString() const = 0;
ClusterConnectionString const& getConnectionString() const;
bool writeFile();
void setConnectionString(ClusterConnectionString const&);
std::string const& getFilename() const {
ASSERT(filename.size());
return filename;
}
bool canGetFilename() const { return filename.size() != 0; }
bool fileContentsUpToDate() const;
bool fileContentsUpToDate(ClusterConnectionString& fileConnectionString) const;
// Sets the connections string held by this object. Calling this function does not persist the record.
virtual Future<Void> setConnectionString(ClusterConnectionString const&) = 0;
// If this record is backed by persistent storage, get the connection string from that storage. Otherwise, return
// the connection string stored in memory.
virtual Future<ClusterConnectionString> getStoredConnectionString() = 0;
// Checks whether the connection string in persisten storage matches the connection string stored in memory.
Future<bool> upToDate();
// Checks whether the connection string in persisten storage matches the connection string stored in memory. The
// cluster string stored in persistent storage is returned via the reference parameter connectionString.
virtual Future<bool> upToDate(ClusterConnectionString& connectionString) = 0;
// Returns a string representing the location of the cluster record. For example, this could be the filename or key
// that stores the connection string.
virtual std::string getLocation() const = 0;
// Creates a copy of this object with a modified connection string but that isn't persisted.
virtual Reference<IClusterConnectionRecord> makeIntermediateRecord(
ClusterConnectionString const& connectionString) const = 0;
// Returns a string representation of this cluster connection record. This will include the type and location of the
// record.
virtual std::string toString() const = 0;
// Signals to the connection record that it was successfully used to connect to a cluster.
void notifyConnected();
virtual void addref() = 0;
virtual void delref() = 0;
protected:
// Writes the connection string to the backing persistent storage, if applicable.
virtual Future<bool> persist() = 0;
// Returns whether the connection record contains a connection string that should be persisted upon connection.
bool needsToBePersisted() const;
// Clears the flag needs persisted flag.
void setPersisted();
private:
ClusterConnectionString cs;
std::string filename;
bool setConn;
// A flag that indicates whether this connection record should be persisted when it succesfully establishes a
// connection.
bool connectionStringNeedsPersisted;
};
struct LeaderInfo {
@ -199,9 +234,9 @@ class ClientCoordinators {
public:
std::vector<ClientLeaderRegInterface> clientLeaderServers;
Key clusterKey;
Reference<ClusterConnectionFile> ccf;
Reference<IClusterConnectionRecord> ccr;
explicit ClientCoordinators(Reference<ClusterConnectionFile> ccf);
explicit ClientCoordinators(Reference<IClusterConnectionRecord> ccr);
explicit ClientCoordinators(Key clusterKey, std::vector<NetworkAddress> coordinators);
ClientCoordinators() {}
};

View File

@ -167,7 +167,7 @@ public:
// Constructs a new copy of this DatabaseContext from the parameters of this DatabaseContext
Database clone() const {
return Database(new DatabaseContext(connectionFile,
return Database(new DatabaseContext(connectionRecord,
clientInfo,
coordinator,
clientInfoMonitor,
@ -231,16 +231,16 @@ public:
Future<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a
// leader. The cluster file therefore is valid, but the database might be unavailable.
Reference<ClusterConnectionFile> getConnectionFile();
Reference<IClusterConnectionRecord> getConnectionRecord();
// Switch the database to use the new connection file, and recreate all pending watches for committed transactions.
//
// Meant to be used as part of a 'hot standby' solution to switch to the standby. A correct switch will involve
// advancing the version on the new cluster sufficiently far that any transaction begun with a read version from the
// old cluster will fail to commit. Assuming the above version-advancing is done properly, a call to
// switchConnectionFile guarantees that any read with a version from the old cluster will not be attempted on the
// switchConnectionRecord guarantees that any read with a version from the old cluster will not be attempted on the
// new cluster.
Future<Void> switchConnectionFile(Reference<ClusterConnectionFile> standby);
Future<Void> switchConnectionRecord(Reference<IClusterConnectionRecord> standby);
Future<Void> connectionFileChanged();
IsSwitchable switchable{ false };
@ -253,7 +253,7 @@ public:
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
@ -270,7 +270,7 @@ public:
void expireThrottles();
// Key DB-specific information
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile;
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord;
AsyncTrigger proxiesChangeTrigger;
Future<Void> monitorProxiesInfoChange;
Future<Void> monitorTssInfoChange;

View File

@ -24,6 +24,7 @@
#include "fdbclient/Knobs.h"
#include "flow/Arena.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
@ -778,15 +779,18 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely?
state ClusterConnectionString old(currentKey.get().toString());
if (tr->getDatabase()->getConnectionFile() &&
if (tr->getDatabase()->getConnectionRecord() &&
old.clusterKeyName().toString() !=
tr->getDatabase()->getConnectionFile()->getConnectionString().clusterKeyName())
tr->getDatabase()->getConnectionRecord()->getConnectionString().clusterKeyName())
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if (!desiredCoordinators->size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait(change->getDesiredCoordinators(
tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
tr,
old.coordinators(),
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
*desiredCoordinators = _desiredCoordinators;
}
@ -821,7 +825,7 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
}
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
ClientCoordinators coord(Reference<ClusterConnectionFile>(new ClusterConnectionFile(conn)));
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(conn)));
leaderServers.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
@ -854,14 +858,17 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely?
state ClusterConnectionString old(currentKey.get().toString());
if (cx->getConnectionFile() &&
old.clusterKeyName().toString() != cx->getConnectionFile()->getConnectionString().clusterKeyName())
if (cx->getConnectionRecord() &&
old.clusterKeyName().toString() != cx->getConnectionRecord()->getConnectionString().clusterKeyName())
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if (!desiredCoordinators.size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait(change->getDesiredCoordinators(
&tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
&tr,
old.coordinators(),
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
desiredCoordinators = _desiredCoordinators;
}
@ -907,7 +914,8 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
TEST(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name
state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
state ClientCoordinators coord(Reference<ClusterConnectionFile>(new ClusterConnectionFile(conn)));
state ClientCoordinators coord(
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(conn)));
// check if allowed to modify the cluster descriptor
if (!change->getDesiredClusterKeyName().empty()) {
CheckDescriptorMutableReply mutabilityReply =
@ -942,7 +950,7 @@ struct SpecifiedQuorumChange final : IQuorumChange {
explicit SpecifiedQuorumChange(std::vector<NetworkAddress> const& desired) : desired(desired) {}
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile>,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return desired;
}
@ -954,7 +962,7 @@ Reference<IQuorumChange> specifiedQuorumChange(std::vector<NetworkAddress> const
struct NoQuorumChange final : IQuorumChange {
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile>,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return oldCoordinators;
}
@ -970,9 +978,9 @@ struct NameQuorumChange final : IQuorumChange {
: newName(newName), otherChange(otherChange) {}
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> cf,
Reference<IClusterConnectionRecord> ccr,
CoordinatorsResult& t) override {
return otherChange->getDesiredCoordinators(tr, oldCoordinators, cf, t);
return otherChange->getDesiredCoordinators(tr, oldCoordinators, ccr, t);
}
std::string getDesiredClusterKeyName() const override { return newName; }
};
@ -986,9 +994,9 @@ struct AutoQuorumChange final : IQuorumChange {
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
CoordinatorsResult& err) override {
return getDesired(Reference<AutoQuorumChange>::addRef(this), tr, oldCoordinators, ccf, &err);
return getDesired(Reference<AutoQuorumChange>::addRef(this), tr, oldCoordinators, ccr, &err);
}
ACTOR static Future<int> getRedundancy(AutoQuorumChange* self, Transaction* tr) {
@ -1006,7 +1014,7 @@ struct AutoQuorumChange final : IQuorumChange {
ACTOR static Future<bool> isAcceptable(AutoQuorumChange* self,
Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
int desiredCount,
std::set<AddressExclusion>* excluded) {
// Are there enough coordinators for the redundancy level?
@ -1016,7 +1024,7 @@ struct AutoQuorumChange final : IQuorumChange {
return false;
// Check availability
ClientCoordinators coord(ccf);
ClientCoordinators coord(ccr);
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
leaderServers.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
@ -1054,7 +1062,7 @@ struct AutoQuorumChange final : IQuorumChange {
ACTOR static Future<std::vector<NetworkAddress>> getDesired(Reference<AutoQuorumChange> self,
Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
CoordinatorsResult* err) {
state int desiredCount = self->desired;
@ -1088,7 +1096,7 @@ struct AutoQuorumChange final : IQuorumChange {
}
if (checkAcceptable) {
bool ok = wait(isAcceptable(self.getPtr(), tr, oldCoordinators, ccf, desiredCount, &excluded));
bool ok = wait(isAcceptable(self.getPtr(), tr, oldCoordinators, ccr, desiredCount, &excluded));
if (ok)
return oldCoordinators;
}
@ -2093,7 +2101,7 @@ ACTOR Future<Void> advanceVersion(Database cx, Version v) {
}
}
ACTOR Future<Void> forceRecovery(Reference<ClusterConnectionFile> clusterFile, Key dcId) {
ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile, Key dcId) {
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);

View File

@ -130,7 +130,7 @@ struct IQuorumChange : ReferenceCounted<IQuorumChange> {
virtual ~IQuorumChange() {}
virtual Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile>,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) = 0;
virtual std::string getDesiredClusterKeyName() const { return std::string(); }
};
@ -211,7 +211,7 @@ ACTOR Future<Void> advanceVersion(Database cx, Version v);
ACTOR Future<int> setDDMode(Database cx, int mode);
ACTOR Future<Void> forceRecovery(Reference<ClusterConnectionFile> clusterFile, Standalone<StringRef> dcId);
ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile, Standalone<StringRef> dcId);
ACTOR Future<Void> printHealthyZone(Database cx);
ACTOR Future<Void> setDDIgnoreRebalanceSwitch(Database cx, bool ignoreRebalance);

View File

@ -18,8 +18,10 @@
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/ActorCollection.h"
#include "flow/UnitTest.h"
#include "fdbrpc/genericactors.actor.h"
@ -48,124 +50,23 @@ std::string trim(std::string const& connectionString) {
} // namespace
std::pair<std::string, bool> ClusterConnectionFile::lookupClusterFileName(std::string const& filename) {
if (filename.length())
return std::make_pair(filename, false);
std::string f;
bool isDefaultFile = true;
if (platform::getEnvironmentVar(CLUSTER_FILE_ENV_VAR_NAME, f)) {
// If this is set but points to a file that does not
// exist, we will not fallback to any other methods
isDefaultFile = false;
} else if (fileExists("fdb.cluster"))
f = "fdb.cluster";
else
f = platform::getDefaultClusterFilePath();
return std::make_pair(f, isDefaultFile);
}
std::string ClusterConnectionFile::getErrorString(std::pair<std::string, bool> const& resolvedClusterFile,
Error const& e) {
bool isDefault = resolvedClusterFile.second;
if (e.code() == error_code_connection_string_invalid) {
return format("Invalid cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
} else if (e.code() == error_code_no_cluster_file_found) {
if (isDefault)
return format("Unable to read cluster file `./fdb.cluster' or `%s' and %s unset: %d %s",
platform::getDefaultClusterFilePath().c_str(),
CLUSTER_FILE_ENV_VAR_NAME,
e.code(),
e.what());
else
return format(
"Unable to read cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
} else {
return format(
"Unexpected error loading cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
}
}
ClusterConnectionFile::ClusterConnectionFile(std::string const& filename) {
if (!fileExists(filename)) {
throw no_cluster_file_found();
}
cs = ClusterConnectionString(readFileBytes(filename, MAX_CLUSTER_FILE_BYTES));
this->filename = filename;
setConn = false;
}
ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents) {
this->filename = filename;
cs = contents;
setConn = true;
}
ClusterConnectionString const& ClusterConnectionFile::getConnectionString() const {
return cs;
}
void ClusterConnectionFile::notifyConnected() {
if (setConn) {
this->writeFile();
}
}
bool ClusterConnectionFile::fileContentsUpToDate() const {
Future<bool> IClusterConnectionRecord::upToDate() {
ClusterConnectionString temp;
return fileContentsUpToDate(temp);
return upToDate(temp);
}
bool ClusterConnectionFile::fileContentsUpToDate(ClusterConnectionString& fileConnectionString) const {
try {
// the cluster file hasn't been created yet so there's nothing to check
if (setConn)
return true;
ClusterConnectionFile temp(filename);
fileConnectionString = temp.getConnectionString();
return fileConnectionString.toString() == cs.toString();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ClusterFileError").error(e).detail("Filename", filename);
return false; // Swallow the error and report that the file is out of date
void IClusterConnectionRecord::notifyConnected() {
if (connectionStringNeedsPersisted) {
this->persist();
}
}
bool ClusterConnectionFile::writeFile() {
setConn = false;
if (filename.size()) {
try {
atomicReplace(filename,
"# DO NOT EDIT!\n# This file is auto-generated, it is not to be edited by hand\n" +
cs.toString().append("\n"));
if (!fileContentsUpToDate()) {
// This should only happen in rare scenarios where multiple processes are updating the same file to
// different values simultaneously In that case, we don't have any guarantees about which file will
// ultimately be written
TraceEvent(SevWarnAlways, "ClusterFileChangedAfterReplace")
.detail("Filename", filename)
.detail("ConnStr", cs.toString());
return false;
}
return true;
} catch (Error& e) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionFile")
.error(e)
.detail("Filename", filename)
.detail("ConnStr", cs.toString());
}
}
return false;
bool IClusterConnectionRecord::needsToBePersisted() const {
return connectionStringNeedsPersisted;
}
void ClusterConnectionFile::setConnectionString(ClusterConnectionString const& conn) {
ASSERT(filename.size());
cs = conn;
writeFile();
void IClusterConnectionRecord::setPersisted() {
connectionStringNeedsPersisted = false;
}
std::string ClusterConnectionString::getErrorString(std::string const& source, Error const& e) {
@ -367,8 +268,8 @@ std::string ClusterConnectionString::toString() const {
return s;
}
ClientCoordinators::ClientCoordinators(Reference<ClusterConnectionFile> ccf) : ccf(ccf) {
ClusterConnectionString cs = ccf->getConnectionString();
ClientCoordinators::ClientCoordinators(Reference<IClusterConnectionRecord> ccr) : ccr(ccr) {
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s)
clientLeaderServers.push_back(ClientLeaderRegInterface(*s));
clusterKey = cs.clusterKey();
@ -379,7 +280,7 @@ ClientCoordinators::ClientCoordinators(Key clusterKey, std::vector<NetworkAddres
for (const auto& coord : coordinators) {
clientLeaderServers.push_back(ClientLeaderRegInterface(coord));
}
ccf = makeReference<ClusterConnectionFile>(ClusterConnectionString(coordinators, clusterKey));
ccr = makeReference<ClusterConnectionMemoryRecord>(ClusterConnectionString(coordinators, clusterKey));
}
ClientLeaderRegInterface::ClientLeaderRegInterface(NetworkAddress remote)
@ -476,10 +377,10 @@ Optional<std::pair<LeaderInfo, bool>> getLeader(const std::vector<Optional<Leade
}
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<ClusterConnectionFile> connFile,
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo,
MonitorLeaderInfo info) {
state ClientCoordinators coordinators(info.intermediateConnFile);
state ClientCoordinators coordinators(info.intermediateConnRecord);
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state Future<Void> allActors;
@ -502,25 +403,26 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<ClusterConn
if (leader.get().first.forward) {
TraceEvent("MonitorLeaderForwarding")
.detail("NewConnStr", leader.get().first.serializedInfo.toString())
.detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString())
.detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString())
.trackLatest("MonitorLeaderForwarding");
info.intermediateConnFile = makeReference<ClusterConnectionFile>(
connFile->getFilename(), ClusterConnectionString(leader.get().first.serializedInfo.toString()));
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
return info;
}
if (connFile != info.intermediateConnFile) {
if (connRecord != info.intermediateConnRecord) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", connRecord->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
info.intermediateConnFile = connFile;
connRecord->setConnectionString(info.intermediateConnRecord->getConnectionString());
info.intermediateConnRecord = connRecord;
}
info.hasConnected = true;
connFile->notifyConnected();
connRecord->notifyConnected();
outSerializedLeaderInfo->set(leader.get().first.serializedInfo);
}
@ -528,11 +430,11 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<ClusterConn
}
}
ACTOR Future<Void> monitorLeaderInternal(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> monitorLeaderInternal(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo) {
state MonitorLeaderInfo info(connFile);
state MonitorLeaderInfo info(connRecord);
loop {
MonitorLeaderInfo _info = wait(monitorLeaderOneGeneration(connFile, outSerializedLeaderInfo, info));
MonitorLeaderInfo _info = wait(monitorLeaderOneGeneration(connRecord, outSerializedLeaderInfo, info));
info = _info;
}
}
@ -750,13 +652,13 @@ void shrinkProxyList(ClientDBInfo& ni,
}
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Reference<ClusterConnectionFile> connFile,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
MonitorLeaderInfo info,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnFile->getConnectionString();
state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state std::vector<NetworkAddress> addrs = cs.coordinators();
state int idx = 0;
state int successIndex = 0;
@ -779,20 +681,24 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
req.supportedVersions = supportedVersions->get();
req.traceLogGroup = traceLogGroup;
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
if (!incorrectTime.present()) {
incorrectTime = now();
}
if (connFile->canGetFilename()) {
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the
// file right before us)
state ClusterConnectionString storedConnectionString;
if (connRecord) {
bool upToDate = wait(connRecord->upToDate(storedConnectionString));
if (!upToDate) {
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connRecord->getConnectionString().toString();
if (!incorrectTime.present()) {
incorrectTime = now();
}
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing
// the file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
} else {
incorrectTime = Optional<double>();
}
} else {
incorrectTime = Optional<double>();
@ -804,24 +710,25 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
if (rep.get().read().forward.present()) {
TraceEvent("MonitorProxiesForwarding")
.detail("NewConnStr", rep.get().read().forward.get().toString())
.detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(
connFile->getFilename(), ClusterConnectionString(rep.get().read().forward.get().toString())));
.detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString());
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
ClusterConnectionString(rep.get().read().forward.get().toString()));
return info;
}
if (connFile != info.intermediateConnFile) {
if (connRecord != info.intermediateConnRecord) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", connRecord->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
info.intermediateConnFile = connFile;
connRecord->setConnectionString(info.intermediateConnRecord->getConnectionString());
info.intermediateConnRecord = connRecord;
}
info.hasConnected = true;
connFile->notifyConnected();
connRecord->notifyConnected();
auto& ni = rep.get().mutate();
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
@ -838,21 +745,21 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
}
ACTOR Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connFile,
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connRecord,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state MonitorLeaderInfo info(connFile->get());
state MonitorLeaderInfo info(connRecord->get());
loop {
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connFile->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
info = _info;
}
when(wait(connFile->onChange())) {
when(wait(connRecord->onChange())) {
info.hasConnected = false;
info.intermediateConnFile = connFile->get();
info.intermediateConnRecord = connRecord->get();
}
}
}

View File

@ -54,11 +54,11 @@ struct ClientData {
struct MonitorLeaderInfo {
bool hasConnected;
Reference<ClusterConnectionFile> intermediateConnFile;
Reference<IClusterConnectionRecord> intermediateConnRecord;
MonitorLeaderInfo() : hasConnected(false) {}
explicit MonitorLeaderInfo(Reference<ClusterConnectionFile> intermediateConnFile)
: hasConnected(false), intermediateConnFile(intermediateConnFile) {}
explicit MonitorLeaderInfo(Reference<IClusterConnectionRecord> intermediateConnRecord)
: hasConnected(false), intermediateConnRecord(intermediateConnRecord) {}
};
Optional<std::pair<LeaderInfo, bool>> getLeader(const std::vector<Optional<LeaderInfo>>& nominees);
@ -68,7 +68,7 @@ Optional<std::pair<LeaderInfo, bool>> getLeader(const std::vector<Optional<Leade
// If a leader is elected for long enough and communication with a quorum of coordinators is possible, eventually
// outKnownLeader will be that leader's interface.
template <class LeaderInterface>
Future<Void> monitorLeader(Reference<ClusterConnectionFile> const& connFile,
Future<Void> monitorLeader(Reference<IClusterConnectionRecord> const& connFile,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader);
// This is one place where the leader election algorithm is run. The coodinator contacts all coodinators to collect
@ -80,7 +80,7 @@ Future<Void> monitorLeaderAndGetClientInfo(Value const& key,
Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo);
Future<Void> monitorProxies(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile,
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> const& connRecord,
Reference<AsyncVar<ClientDBInfo>> const& clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> const& coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> const& supportedVersions,
@ -96,7 +96,7 @@ void shrinkProxyList(ClientDBInfo& ni,
#pragma region Implementation
#endif
Future<Void> monitorLeaderInternal(Reference<ClusterConnectionFile> const& connFile,
Future<Void> monitorLeaderInternal(Reference<IClusterConnectionRecord> const& connRecord,
Reference<AsyncVar<Value>> const& outSerializedLeaderInfo);
template <class LeaderInterface>
@ -119,11 +119,11 @@ struct LeaderDeserializer<ClusterInterface> {
};
template <class LeaderInterface>
Future<Void> monitorLeader(Reference<ClusterConnectionFile> const& connFile,
Future<Void> monitorLeader(Reference<IClusterConnectionRecord> const& connRecord,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader) {
LeaderDeserializer<LeaderInterface> deserializer;
auto serializedInfo = makeReference<AsyncVar<Value>>();
Future<Void> m = monitorLeaderInternal(connFile, serializedInfo);
Future<Void> m = monitorLeaderInternal(connRecord, serializedInfo);
return m || deserializer(serializedInfo, outKnownLeader);
}

View File

@ -36,6 +36,7 @@
#include "fdbclient/AnnotateActor.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
@ -376,8 +377,9 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
ev.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
.detail("Cluster",
cx->getConnectionFile() ? cx->getConnectionFile()->getConnectionString().clusterKeyName().toString()
: "")
cx->getConnectionRecord()
? cx->getConnectionRecord()->getConnectionString().clusterKeyName().toString()
: "")
.detail("Internal", cx->internal);
cx->cc.logToTraceEvent(ev);
@ -1027,14 +1029,14 @@ void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE modu
specialKeySpaceModules.push_back(std::move(impl));
}
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile);
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Key prefix = Key(getKeyRange().begin);
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionFile()),
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionRecord()),
[prefix = prefix, kr = KeyRange(kr)](const RangeResult& in) {
RangeResult result;
for (const auto& [k_, v] : in) {
@ -1166,7 +1168,7 @@ Future<RangeResult> HealthMetricsRangeImpl::getRange(ReadYourWritesTransaction*
return healthMetricsGetRangeActor(ryw, kr);
}
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
@ -1177,7 +1179,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
IsInternal internal,
int apiVersion,
IsSwitchable switchable)
: lockAware(lockAware), switchable(switchable), connectionFile(connectionFile), proxyProvisional(false),
: lockAware(lockAware), switchable(switchable), connectionRecord(connectionRecord), proxyProvisional(false),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), internal(internal),
cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
@ -1383,7 +1385,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
std::make_unique<SingleSpecialKeyImpl>(LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionFile()) {
ryw->getDatabase()->getConnectionRecord()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
@ -1397,8 +1399,9 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(ryw->getDatabase()->getConnectionFile()->getFilename());
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Optional<Value> output =
StringRef(ryw->getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
@ -1414,8 +1417,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
LiteralStringRef("\xff\xff/connection_string"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = ryw->getDatabase()->getConnectionFile();
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Reference<IClusterConnectionRecord> f = ryw->getDatabase()->getConnectionRecord();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
@ -1475,7 +1478,7 @@ Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo,
LockAware lockAware,
int apiVersion,
IsSwitchable switchable) {
return Database(new DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>>(),
return Database(new DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>>(),
clientInfo,
makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>(),
clientInfoMonitor,
@ -1699,11 +1702,12 @@ Future<Void> DatabaseContext::onConnected() {
return connected;
}
ACTOR static Future<Void> switchConnectionFileImpl(Reference<ClusterConnectionFile> connFile, DatabaseContext* self) {
ACTOR static Future<Void> switchConnectionRecordImpl(Reference<IClusterConnectionRecord> connRecord,
DatabaseContext* self) {
TEST(true); // Switch connection file
TraceEvent("SwitchConnectionFile")
.detail("ConnectionFile", connFile->canGetFilename() ? connFile->getFilename() : "")
.detail("ConnectionString", connFile->getConnectionString().toString());
TraceEvent("SwitchConnectionRecord")
.detail("ClusterFile", connRecord->toString())
.detail("ConnectionString", connRecord->getConnectionString().toString());
// Reset state from former cluster.
self->commitProxies.clear();
@ -1716,38 +1720,38 @@ ACTOR static Future<Void> switchConnectionFileImpl(Reference<ClusterConnectionFi
clearedClientInfo.grvProxies.clear();
clearedClientInfo.id = deterministicRandom()->randomUniqueID();
self->clientInfo->set(clearedClientInfo);
self->connectionFile->set(connFile);
self->connectionRecord->set(connRecord);
state Database db(Reference<DatabaseContext>::addRef(self));
state Transaction tr(db);
loop {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
try {
TraceEvent("SwitchConnectionFileAttemptingGRV").log();
TraceEvent("SwitchConnectionRecordAttemptingGRV").log();
Version v = wait(tr.getReadVersion());
TraceEvent("SwitchConnectionFileGotRV")
TraceEvent("SwitchConnectionRecordGotRV")
.detail("ReadVersion", v)
.detail("MinAcceptableReadVersion", self->minAcceptableReadVersion);
ASSERT(self->minAcceptableReadVersion != std::numeric_limits<Version>::max());
self->connectionFileChangedTrigger.trigger();
return Void();
} catch (Error& e) {
TraceEvent("SwitchConnectionFileError").detail("Error", e.what());
TraceEvent("SwitchConnectionRecordError").detail("Error", e.what());
wait(tr.onError(e));
}
}
}
Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
if (connectionFile) {
return connectionFile->get();
Reference<IClusterConnectionRecord> DatabaseContext::getConnectionRecord() {
if (connectionRecord) {
return connectionRecord->get();
}
return Reference<ClusterConnectionFile>();
return Reference<IClusterConnectionRecord>();
}
Future<Void> DatabaseContext::switchConnectionFile(Reference<ClusterConnectionFile> standby) {
Future<Void> DatabaseContext::switchConnectionRecord(Reference<IClusterConnectionRecord> standby) {
ASSERT(switchable);
return switchConnectionFileImpl(standby, this);
return switchConnectionRecordImpl(standby, this);
}
Future<Void> DatabaseContext::connectionFileChanged() {
@ -1772,7 +1776,7 @@ extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& c
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
Database Database::createDatabase(Reference<IClusterConnectionRecord> connRecord,
int apiVersion,
IsInternal internal,
LocalityData const& clientLocality,
@ -1780,13 +1784,13 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
if (!g_network)
throw network_not_setup();
if (connFile) {
if (connRecord) {
if (networkOptions.traceDirectory.present() && !traceFileIsOpen()) {
g_network->initMetrics();
FlowTransport::transport().initMetrics();
initTraceEventMetrics();
auto publicIP = determinePublicIPAutomatically(connFile->getConnectionString());
auto publicIP = determinePublicIPAutomatically(connRecord->getConnectionString());
selectTraceFormatter(networkOptions.traceFormat);
selectTraceClockSource(networkOptions.traceClockSource);
openTraceFile(NetworkAddress(publicIP, ::getpid()),
@ -1802,8 +1806,8 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
.detail("SourceVersion", getSourceVersion())
.detail("Version", FDB_VT_VERSION)
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detail("ClusterFile", connFile->getFilename().c_str())
.detail("ConnectionString", connFile->getConnectionString().toString())
.detail("ClusterFile", connRecord->toString())
.detail("ConnectionString", connRecord->getConnectionString().toString())
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(nullptr))
.detail("ApiVersion", apiVersion)
.detailf("ImageOffset", "%p", platform::getImageOffset())
@ -1820,9 +1824,9 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
auto clientInfo = makeReference<AsyncVar<ClientDBInfo>>();
auto coordinator = makeReference<AsyncVar<Optional<ClientLeaderRegInterface>>>();
auto connectionFile = makeReference<AsyncVar<Reference<ClusterConnectionFile>>>();
connectionFile->set(connFile);
Future<Void> clientInfoMonitor = monitorProxies(connectionFile,
auto connectionRecord = makeReference<AsyncVar<Reference<IClusterConnectionRecord>>>();
connectionRecord->set(connRecord);
Future<Void> clientInfoMonitor = monitorProxies(connectionRecord,
clientInfo,
coordinator,
networkOptions.supportedVersions,
@ -1830,7 +1834,7 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
DatabaseContext* db;
if (preallocatedDb) {
db = new (preallocatedDb) DatabaseContext(connectionFile,
db = new (preallocatedDb) DatabaseContext(connectionRecord,
clientInfo,
coordinator,
clientInfoMonitor,
@ -1842,7 +1846,7 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
apiVersion,
IsSwitchable::True);
} else {
db = new DatabaseContext(connectionFile,
db = new DatabaseContext(connectionRecord,
clientInfo,
coordinator,
clientInfoMonitor,
@ -1867,9 +1871,9 @@ Database Database::createDatabase(std::string connFileName,
int apiVersion,
IsInternal internal,
LocalityData const& clientLocality) {
Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(
Reference<IClusterConnectionRecord> rccr = Reference<IClusterConnectionRecord>(
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
return Database::createDatabase(rccf, apiVersion, internal, clientLocality);
return Database::createDatabase(rccr, apiVersion, internal, clientLocality);
}
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(KeyRef key) const {
@ -2831,7 +2835,7 @@ ACTOR Future<Version> watchValue(Future<Version> version,
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
when(wait(cx->connectionFile ? cx->connectionFile->onChange() : Never())) { wait(Never()); }
when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { wait(Never()); }
}
if (info.debugID.present()) {
g_traceBatch.addEvent("WatchValueDebug",
@ -6537,7 +6541,7 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion
throw;
}
TraceEvent("ExclusionSafetyCheckCoordinators").log();
state ClientCoordinators coordinatorList(cx->getConnectionFile());
state ClientCoordinators coordinatorList(cx->getConnectionRecord());
state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
leaderServers.reserve(coordinatorList.clientLeaderServers.size());
for (int i = 0; i < coordinatorList.clientLeaderServers.size(); i++) {
@ -6616,9 +6620,9 @@ ACTOR static Future<int64_t> rebootWorkerActor(DatabaseContext* cx, ValueRef add
duration = 0;
// fetch the addresses of all workers
state std::map<Key, std::pair<Value, ClientLeaderRegInterface>> address_interface;
if (!cx->getConnectionFile())
if (!cx->getConnectionRecord())
return 0;
RangeResult kvs = wait(getWorkerInterfaces(cx->getConnectionFile()));
RangeResult kvs = wait(getWorkerInterfaces(cx->getConnectionRecord()));
ASSERT(!kvs.more);
// Note: reuse this knob from fdbcli, change it if necessary
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
@ -6640,7 +6644,7 @@ Future<int64_t> DatabaseContext::rebootWorker(StringRef addr, bool check, int du
}
Future<Void> DatabaseContext::forceRecoveryWithDataLoss(StringRef dcId) {
return forceRecovery(getConnectionFile(), dcId);
return forceRecovery(getConnectionRecord(), dcId);
}
ACTOR static Future<Void> createSnapshotActor(DatabaseContext* cx, UID snapUID, StringRef snapCmd) {

View File

@ -83,7 +83,7 @@ public:
// Creates a database object that represents a connection to a cluster
// This constructor uses a preallocated DatabaseContext that may have been created
// on another thread
static Database createDatabase(Reference<ClusterConnectionFile> connFile,
static Database createDatabase(Reference<IClusterConnectionRecord> connRecord,
int apiVersion,
IsInternal internal = IsInternal::True,
LocalityData const& clientLocality = LocalityData(),

View File

@ -317,7 +317,7 @@ public:
Future<Void> commit() { return commit(this); }
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) {
auto coordinators = cx->getConnectionFile()->getConnectionString().coordinators();
auto coordinators = cx->getConnectionRecord()->getConnectionString().coordinators();
ctis.reserve(coordinators.size());
for (const auto& coordinator : coordinators) {
ctis.emplace_back(coordinator);

View File

@ -1336,9 +1336,9 @@ ACTOR Future<Optional<Value>> getJSON(Database db) {
return getValueFromJSON(statusObj);
}
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile) {
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> connRecord) {
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(connRecord, clusterInterface);
loop {
choose {
@ -1371,7 +1371,7 @@ Future<Optional<Value>> ReadYourWritesTransaction::get(const Key& key, Snapshot
}
} else {
if (key == LiteralStringRef("\xff\xff/status/json")) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionRecord()) {
++tr.getDatabase()->transactionStatusRequests;
return getJSON(tr.getDatabase());
} else {
@ -1381,8 +1381,8 @@ Future<Optional<Value>> ReadYourWritesTransaction::get(const Key& key, Snapshot
if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionRecord()) {
Optional<Value> output = StringRef(tr.getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
@ -1393,8 +1393,8 @@ Future<Optional<Value>> ReadYourWritesTransaction::get(const Key& key, Snapshot
if (key == LiteralStringRef("\xff\xff/connection_string")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionRecord()) {
Reference<IClusterConnectionRecord> f = tr.getDatabase()->getConnectionRecord();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
@ -1454,8 +1454,8 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
}
} else {
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionRecord()) {
return getWorkerInterfaces(tr.getDatabase()->getConnectionRecord());
} else {
return RangeResult();
}

View File

@ -126,7 +126,7 @@ class SimpleConfigTransactionImpl {
public:
SimpleConfigTransactionImpl(Database const& cx) : cx(cx) {
auto coordinators = cx->getConnectionFile()->getConnectionString().coordinators();
auto coordinators = cx->getConnectionRecord()->getConnectionString().coordinators();
std::sort(coordinators.begin(), coordinators.end());
cti = ConfigTransactionInterface(coordinators[0]);
}

View File

@ -27,6 +27,7 @@
#include <exception>
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
@ -1590,8 +1591,7 @@ CoordinatorsImpl::CoordinatorsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {
Future<RangeResult> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
RangeResult result;
KeyRef prefix(getKeyRange().begin);
// the constructor of ClusterConnectionFile already checks whether the file is valid
auto cs = ClusterConnectionFile(ryw->getDatabase()->getConnectionFile()->getFilename()).getConnectionString();
auto cs = ryw->getDatabase()->getConnectionRecord()->getConnectionString();
auto coordinator_processes = cs.coordinators();
Key cluster_decription_key = prefix.withSuffix(LiteralStringRef("cluster_description"));
if (kr.contains(cluster_decription_key)) {
@ -1737,7 +1737,10 @@ ACTOR static Future<RangeResult> CoordinatorsAutoImplActor(ReadYourWritesTransac
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
std::vector<NetworkAddress> _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators(
&tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
&tr,
old.coordinators(),
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES) {
// we could get not_enough_machines if we happen to see the database while the cluster controller is updating

View File

@ -302,11 +302,11 @@ void JSONDoc::mergeValueInto(json_spirit::mValue& dst, const json_spirit::mValue
// Check if a quorum of coordination servers is reachable
// Will not throw, will just return non-present Optional if error
ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<ClusterConnectionFile> f,
ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<IClusterConnectionRecord> connRecord,
bool* quorum_reachable,
int* coordinatorsFaultTolerance) {
try {
state ClientCoordinators coord(f);
state ClientCoordinators coord(connRecord);
state StatusObject statusObj;
state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
@ -365,14 +365,16 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<C
// Client section of the json output
// Will NOT throw, errors will be put into messages array
ACTOR Future<StatusObject> clientStatusFetcher(Reference<ClusterConnectionFile> f,
ACTOR Future<StatusObject> clientStatusFetcher(Reference<IClusterConnectionRecord> connRecord,
StatusArray* messages,
bool* quorum_reachable,
int* coordinatorsFaultTolerance) {
state StatusObject statusObj;
Optional<StatusObject> coordsStatusObj =
wait(clientCoordinatorsStatusFetcher(f, quorum_reachable, coordinatorsFaultTolerance));
state Optional<StatusObject> coordsStatusObj =
wait(clientCoordinatorsStatusFetcher(connRecord, quorum_reachable, coordinatorsFaultTolerance));
state bool contentsUpToDate = wait(connRecord->upToDate());
if (coordsStatusObj.present()) {
statusObj["coordinators"] = coordsStatusObj.get();
if (!*quorum_reachable)
@ -381,17 +383,17 @@ ACTOR Future<StatusObject> clientStatusFetcher(Reference<ClusterConnectionFile>
messages->push_back(makeMessage("status_incomplete_coordinators", "Could not fetch coordinator info."));
StatusObject statusObjClusterFile;
statusObjClusterFile["path"] = f->getFilename();
bool contentsUpToDate = f->fileContentsUpToDate();
statusObjClusterFile["path"] = connRecord->getLocation();
statusObjClusterFile["up_to_date"] = contentsUpToDate;
statusObj["cluster_file"] = statusObjClusterFile;
if (!contentsUpToDate) {
ClusterConnectionString storedConnectionString = wait(connRecord->getStoredConnectionString());
std::string description = "Cluster file contents do not match current cluster connection string.";
description += "\nThe file contains the connection string: ";
description += ClusterConnectionFile(f->getFilename()).getConnectionString().toString().c_str();
description += storedConnectionString.toString().c_str();
description += "\nThe current connection string is: ";
description += f->getConnectionString().toString().c_str();
description += connRecord->getConnectionString().toString().c_str();
description += "\nVerify the cluster file and its parent directory are writable and that the cluster file has "
"not been overwritten externally. To change coordinators without manual intervention, the "
"cluster file and its containing folder must be writable by all servers and clients. If a "
@ -491,7 +493,7 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead
return databaseStatus;
}
ACTOR Future<StatusObject> statusFetcherImpl(Reference<ClusterConnectionFile> f,
ACTOR Future<StatusObject> statusFetcherImpl(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface) {
if (!g_network)
throw network_not_setup();
@ -508,7 +510,7 @@ ACTOR Future<StatusObject> statusFetcherImpl(Reference<ClusterConnectionFile> f,
state int64_t clientTime = g_network->timer();
StatusObject _statusObjClient =
wait(clientStatusFetcher(f, &clientMessages, &quorum_reachable, &coordinatorsFaultTolerance));
wait(clientStatusFetcher(connRecord, &clientMessages, &quorum_reachable, &coordinatorsFaultTolerance));
statusObjClient = _statusObjClient;
if (clientTime != -1)
@ -598,7 +600,7 @@ ACTOR Future<StatusObject> statusFetcherImpl(Reference<ClusterConnectionFile> f,
}
ACTOR Future<Void> timeoutMonitorLeader(Database db) {
state Future<Void> leadMon = monitorLeader<ClusterInterface>(db->getConnectionFile(), db->statusClusterInterface);
state Future<Void> leadMon = monitorLeader<ClusterInterface>(db->getConnectionRecord(), db->statusClusterInterface);
loop {
wait(delay(CLIENT_KNOBS->STATUS_IDLE_TIMEOUT + 0.00001 + db->lastStatusFetch - now()));
if (now() - db->lastStatusFetch > CLIENT_KNOBS->STATUS_IDLE_TIMEOUT) {
@ -615,5 +617,5 @@ Future<StatusObject> StatusClient::statusFetcher(Database db) {
db->statusLeaderMon = timeoutMonitorLeader(db);
}
return statusFetcherImpl(db->getConnectionFile(), db->statusClusterInterface);
return statusFetcherImpl(db->getConnectionRecord(), db->statusClusterInterface);
}

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/versions.h"

View File

@ -27,6 +27,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/CoordinationInterface.h"
@ -5299,7 +5300,7 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
}
}
ACTOR Future<Void> clusterController(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
Future<Void> recoveredDiskFiles,
@ -5309,7 +5310,7 @@ ACTOR Future<Void> clusterController(Reference<ClusterConnectionFile> connFile,
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators(connFile);
ServerCoordinators coordinators(connRecord);
wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType));
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed)
@ -5328,7 +5329,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
state ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
state NetworkAddress workerAddress(IPAddress(0x01010101), 1);
state NetworkAddress badPeer1(IPAddress(0x02020202), 1);
state NetworkAddress badPeer2(IPAddress(0x03030303), 1);
@ -5386,7 +5388,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
NetworkAddress worker1(IPAddress(0x01010101), 1);
NetworkAddress worker2(IPAddress(0x11111111), 1);
NetworkAddress badPeer1(IPAddress(0x02020202), 1);
@ -5422,7 +5425,8 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
NetworkAddress worker(IPAddress(0x01010101), 1);
NetworkAddress badPeer1(IPAddress(0x02020202), 1);
NetworkAddress badPeer2(IPAddress(0x03030303), 1);
@ -5523,7 +5527,8 @@ TEST_CASE("/fdbserver/clustercontroller/recentRecoveryCountDueToHealth") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
ASSERT_EQ(data.recentRecoveryCountDueToHealth(), 0);
@ -5543,7 +5548,8 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
NetworkAddress master(IPAddress(0x01010101), 1);
NetworkAddress tlog(IPAddress(0x02020202), 1);
NetworkAddress satelliteTlog(IPAddress(0x03030303), 1);
@ -5647,7 +5653,8 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
ServerCoordinators(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(ClusterConnectionString()))));
NetworkAddress master(IPAddress(0x01010101), 1);
NetworkAddress tlog(IPAddress(0x02020202), 1);
NetworkAddress satelliteTlog(IPAddress(0x03030303), 1);

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/Knobs.h"
@ -288,8 +289,7 @@ struct MovableCoordinatedStateImpl {
// reached the point where a leader elected by the new coordinators should be doing the rest of the work
// (and therefore the caller should die).
state CoordinatedState cs(self->coordinators);
state CoordinatedState nccs(
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile(nc))));
state CoordinatedState nccs(ServerCoordinators(makeReference<ClusterConnectionMemoryRecord>(nc)));
state Future<Void> creationTimeout = delay(30);
ASSERT(self->lastValue.present() && self->lastCSValue.present());
TraceEvent("StartMove").detail("ConnectionString", nc.toString());
@ -306,7 +306,7 @@ struct MovableCoordinatedStateImpl {
when(wait(nccs.setExclusive(
BinaryWriter::toValue(MovableValue(self->lastValue.get(),
MovableValue::MovingFrom,
self->coordinators.ccf->getConnectionString().toString()),
self->coordinators.ccr->getConnectionString().toString()),
IncludeVersion(ProtocolVersion::withMovableCoordinatedStateV2()))))) {}
}

View File

@ -95,8 +95,8 @@ LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local) : Client
forward.makeWellKnownEndpoint(WLTOKEN_LEADERELECTIONREG_FORWARD, TaskPriority::Coordination);
}
ServerCoordinators::ServerCoordinators(Reference<ClusterConnectionFile> cf) : ClientCoordinators(cf) {
ClusterConnectionString cs = ccf->getConnectionString();
ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr) : ClientCoordinators(ccr) {
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
leaderElectionServers.emplace_back(*s);
stateServers.emplace_back(*s);
@ -588,7 +588,7 @@ StringRef getClusterDescriptor(Key key) {
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
OnDemandStore* pStore,
UID id,
Reference<ClusterConnectionFile> ccf) {
Reference<IClusterConnectionRecord> ccr) {
state LeaderRegisterCollection regs(pStore);
state ActorCollection forwarders(false);
@ -609,12 +609,12 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
info.forward = forward.get().serializedInfo;
req.reply.send(CachedSerialization<ClientDBInfo>(info));
} else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterDescriptor(req.clusterKey).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "OpenDatabaseCoordRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.clusterKey)
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
req.reply.sendError(wrong_connection_file());
@ -628,13 +628,13 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
if (forward.present()) {
req.reply.send(forward.get());
} else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "ElectionResultRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("ClusterKey", ccf->getConnectionString().clusterKey())
.detail("ClusterKey", ccr->getConnectionString().clusterKey())
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
req.reply.sendError(wrong_connection_file());
} else {
@ -647,13 +647,13 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
if (forward.present())
req.reply.send(forward.get());
else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "GetLeaderRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("ClusterKey", ccf->getConnectionString().clusterKey());
.detail("ClusterKey", ccr->getConnectionString().clusterKey());
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.key, id).getLeader.send(req);
@ -665,11 +665,11 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
if (forward.present())
req.reply.send(forward.get());
else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "CandidacyRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
@ -682,11 +682,11 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
if (forward.present())
req.reply.send(LeaderHeartbeatReply{ false });
else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "LeaderHeartbeatRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
@ -699,11 +699,11 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
if (forward.present())
req.reply.send(Void());
else {
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
StringRef clusterName = ccr->getConnectionString().clusterKeyName();
if (!SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT && getClusterDescriptor(req.key).compare(clusterName)) {
TraceEvent(SevWarn, "CCFMismatch")
TraceEvent(SevWarn, "CCRMismatch")
.detail("RequestType", "ForwardRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("LocalCS", ccr->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
@ -721,7 +721,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
ACTOR Future<Void> coordinationServer(std::string dataFolder,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
ConfigDBType configDBType) {
state UID myID = deterministicRandom()->randomUniqueID();
state LeaderElectionRegInterface myLeaderInterface(g_network);
@ -744,7 +744,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
}
try {
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccf) ||
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccr) ||
store.getError() || configDatabaseServer);
throw internal_error();
} catch (Error& e) {

View File

@ -214,7 +214,7 @@ struct ForwardRequest {
class ServerCoordinators : public ClientCoordinators {
public:
explicit ServerCoordinators(Reference<ClusterConnectionFile>);
explicit ServerCoordinators(Reference<IClusterConnectionRecord>);
std::vector<LeaderElectionRegInterface> leaderElectionServers;
std::vector<GenerationRegInterface> stateServers;
@ -222,7 +222,7 @@ public:
};
Future<Void> coordinationServer(std::string const& dataFolder,
Reference<ClusterConnectionFile> const& ccf,
Reference<IClusterConnectionRecord> const& ccf,
ConfigDBType const&);
#endif

View File

@ -137,21 +137,21 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
if (!hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("Filename", coordinators.ccf->getFilename())
.detail("ConnectionStringFromFile", coordinators.ccf->getConnectionString().toString())
.detail("ClusterFile", coordinators.ccr->toString())
.detail("StoredConnectionString", coordinators.ccr->getConnectionString().toString())
.detail("CurrentConnectionString", leader.get().first.serializedInfo.toString());
}
coordinators.ccf->setConnectionString(
coordinators.ccr->setConnectionString(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
TraceEvent("LeaderForwarding")
.detail("ConnStr", coordinators.ccf->getConnectionString().toString())
.detail("ConnStr", coordinators.ccr->getConnectionString().toString())
.trackLatest("LeaderForwarding");
throw coordinators_changed();
}
if (leader.present() && leader.get().second) {
hasConnected = true;
coordinators.ccf->notifyConnected();
coordinators.ccr->notifyConnected();
}
if (leader.present() && leader.get().second && leader.get().first.equalInternalId(myInfo)) {

View File

@ -406,11 +406,11 @@ ACTOR Future<Void> _restoreWorker(Database cx, LocalityData locality) {
return Void();
}
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> restoreWorker(Reference<IClusterConnectionRecord> connRecord,
LocalityData locality,
std::string coordFolder) {
try {
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, IsInternal::True, locality);
Database cx = Database::createDatabase(connRecord, Database::API_VERSION_LATEST, IsInternal::True, locality);
wait(reportErrors(_restoreWorker(cx, locality), "RestoreWorker"));
} catch (Error& e) {
TraceEvent("FastRestoreWorker").detail("Error", e.what());

View File

@ -711,7 +711,9 @@ std::string getRoleStr(RestoreRole role);
////--- Interface functions
ACTOR Future<Void> _restoreWorker(Database cx, LocalityData locality);
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityData locality, std::string coordFolder);
ACTOR Future<Void> restoreWorker(Reference<IClusterConnectionRecord> ccr,
LocalityData locality,
std::string coordFolder);
extern const KeyRef restoreLeaderKey;
extern const KeyRangeRef restoreWorkersKeys;

View File

@ -26,6 +26,8 @@
#include <toml.hpp>
#include "fdbrpc/Locality.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -387,7 +389,7 @@ T simulate(const T& in) {
return out;
}
ACTOR Future<Void> runBackup(Reference<ClusterConnectionFile> connFile) {
ACTOR Future<Void> runBackup(Reference<IClusterConnectionRecord> connRecord) {
state std::vector<Future<Void>> agentFutures;
while (g_simulator.backupAgents == ISimulator::BackupAgentType::WaitForType) {
@ -395,7 +397,7 @@ ACTOR Future<Void> runBackup(Reference<ClusterConnectionFile> connFile) {
}
if (g_simulator.backupAgents == ISimulator::BackupAgentType::BackupToFile) {
Database cx = Database::createDatabase(connFile, -1);
Database cx = Database::createDatabase(connRecord, -1);
state FileBackupAgent fileAgent;
agentFutures.push_back(fileAgent.run(
@ -414,7 +416,7 @@ ACTOR Future<Void> runBackup(Reference<ClusterConnectionFile> connFile) {
throw internal_error();
}
ACTOR Future<Void> runDr(Reference<ClusterConnectionFile> connFile) {
ACTOR Future<Void> runDr(Reference<IClusterConnectionRecord> connRecord) {
state std::vector<Future<Void>> agentFutures;
while (g_simulator.drAgents == ISimulator::BackupAgentType::WaitForType) {
@ -422,13 +424,13 @@ ACTOR Future<Void> runDr(Reference<ClusterConnectionFile> connFile) {
}
if (g_simulator.drAgents == ISimulator::BackupAgentType::BackupToDB) {
Database cx = Database::createDatabase(connFile, -1);
Database cx = Database::createDatabase(connRecord, -1);
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
state Database extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("StartingDrAgents")
.detail("ConnFile", connFile->getConnectionString().toString())
.detail("ConnectionString", connRecord->getConnectionString().toString())
.detail("ExtraString", extraFile->getConnectionString().toString());
state DatabaseBackupAgent dbAgent = DatabaseBackupAgent(cx);
@ -459,7 +461,7 @@ enum AgentMode { AgentNone = 0, AgentOnly = 1, AgentAddition = 2 };
// SOMEDAY: when a process can be rebooted in isolation from the other on that machine,
// a loop{} will be needed around the waiting on simulatedFDBD(). For now this simply
// takes care of house-keeping such as context switching and file closing.
ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnectionFile> connFile,
ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConnectionRecord> connRecord,
IPAddress ip,
bool sslEnabled,
uint16_t port,
@ -525,7 +527,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
.detail("Version", FDB_VT_VERSION)
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detail("DataFolder", *dataFolder)
.detail("ConnectionString", connFile ? connFile->getConnectionString().toString() : "")
.detail("ConnectionString", connRecord ? connRecord->getConnectionString().toString() : "")
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(nullptr))
.detail("CommandLine", "fdbserver -r simulation")
.detail("BuggifyEnabled", isBuggifyEnabled(BuggifyType::General))
@ -546,7 +548,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
futures.push_back(FlowTransport::transport().bind(n, n));
}
if (runBackupAgents != AgentOnly) {
futures.push_back(fdbd(connFile,
futures.push_back(fdbd(connRecord,
localities,
processClass,
*dataFolder,
@ -561,8 +563,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
configDBType));
}
if (runBackupAgents != AgentNone) {
futures.push_back(runBackup(connFile));
futures.push_back(runDr(connFile));
futures.push_back(runBackup(connRecord));
futures.push_back(runDr(connRecord));
}
futures.push_back(success(onShutdown));
@ -666,9 +668,9 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
if (!useSeedFile) {
writeFile(joinPath(*dataFolder, "fdb.cluster"), connStr.toString());
connFile = makeReference<ClusterConnectionFile>(joinPath(*dataFolder, "fdb.cluster"));
connRecord = makeReference<ClusterConnectionFile>(joinPath(*dataFolder, "fdb.cluster"));
} else {
connFile =
connRecord =
makeReference<ClusterConnectionFile>(joinPath(*dataFolder, "fdb.cluster"), connStr.toString());
}
} else {
@ -747,9 +749,9 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
state std::vector<Future<ISimulator::KillType>> processes;
for (int i = 0; i < ips.size(); i++) {
std::string path = joinPath(myFolders[i], "fdb.cluster");
Reference<ClusterConnectionFile> clusterFile(useSeedFile
? new ClusterConnectionFile(path, connStr.toString())
: new ClusterConnectionFile(path));
Reference<IClusterConnectionRecord> clusterFile(
useSeedFile ? new ClusterConnectionFile(path, connStr.toString())
: new ClusterConnectionFile(path));
const int listenPort = i * listenPerProcess + 1;
AgentMode agentMode =
runBackupAgents == AgentOnly ? (i == ips.size() - 1 ? AgentOnly : AgentNone) : runBackupAgents;
@ -2196,7 +2198,7 @@ ACTOR void setupAndRun(std::string dataFolder,
bool restoring,
std::string whitelistBinPaths) {
state std::vector<Future<Void>> systemActors;
state Optional<ClusterConnectionString> connFile;
state Optional<ClusterConnectionString> connectionString;
state Standalone<StringRef> startingConfiguration;
state int testerCount = 1;
state TestConfig testConfig;
@ -2258,7 +2260,7 @@ ACTOR void setupAndRun(std::string dataFolder,
wait(timeoutError(restartSimulatedSystem(&systemActors,
dataFolder,
&testerCount,
&connFile,
&connectionString,
&startingConfiguration,
testConfig,
whitelistBinPaths,
@ -2273,7 +2275,7 @@ ACTOR void setupAndRun(std::string dataFolder,
setupSimulatedSystem(&systemActors,
dataFolder,
&testerCount,
&connFile,
&connectionString,
&startingConfiguration,
whitelistBinPaths,
testConfig,
@ -2282,7 +2284,7 @@ ACTOR void setupAndRun(std::string dataFolder,
}
std::string clusterFileDir = joinPath(dataFolder, deterministicRandom()->randomUniqueID().toString());
platform::createDirectory(clusterFileDir);
writeFile(joinPath(clusterFileDir, "fdb.cluster"), connFile.get().toString());
writeFile(joinPath(clusterFileDir, "fdb.cluster"), connectionString.get().toString());
wait(timeoutError(runTests(makeReference<ClusterConnectionFile>(joinPath(clusterFileDir, "fdb.cluster")),
TEST_TYPE_FROM_FILE,
TEST_ON_TESTERS,

View File

@ -813,7 +813,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
}
for (auto& coordinator : coordinators.ccf->getConnectionString().coordinators()) {
for (auto& coordinator : coordinators.ccr->getConnectionString().coordinators()) {
roles.addCoordinatorRole(coordinator);
}
@ -2423,7 +2423,7 @@ static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration confi
workerZones[worker.interf.address()] = worker.interf.locality.zoneId().orDefault(LiteralStringRef(""));
}
std::map<StringRef, int> coordinatorZoneCounts;
for (auto& coordinator : coordinators.ccf->getConnectionString().coordinators()) {
for (auto& coordinator : coordinators.ccr->getConnectionString().coordinators()) {
auto zone = workerZones[coordinator];
coordinatorZoneCounts[zone] += 1;
}
@ -2806,7 +2806,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state JsonBuilderObject data_overlay;
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
statusObj["connection_string"] = coordinators.ccf->getConnectionString().toString();
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
statusObj["bounce_impact"] = getBounceImpactInfo(statusCode);
state Optional<DatabaseConfiguration> configuration;

View File

@ -113,14 +113,14 @@ struct TesterInterface {
};
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> serverDBInfo,
LocalityData locality);
enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS };
enum test_type_t { TEST_TYPE_FROM_FILE, TEST_TYPE_CONSISTENCY_CHECK, TEST_TYPE_UNIT_TESTS };
ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
test_type_t whatToRun,
test_location_t whereToRun,
int minTestersExpected,

View File

@ -884,7 +884,7 @@ ACTOR Future<Void> extractClusterInterface(
Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>> const> in,
Reference<AsyncVar<Optional<struct ClusterInterface>>> out);
ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> ccf,
ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> ccr,
LocalityData localities,
ProcessClass processClass,
std::string dataFolder,
@ -898,7 +898,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> ccf,
std::map<std::string, std::string> manualKnobOverrides,
ConfigDBType configDBType);
ACTOR Future<Void> clusterController(Reference<ClusterConnectionFile> ccf,
ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
Future<Void> recoveredDiskFiles,
@ -922,8 +922,8 @@ ACTOR Future<Void> storageServer(
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile>
connFile); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
Reference<IClusterConnectionRecord>
connRecord); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,

View File

@ -36,6 +36,7 @@
#include <boost/interprocess/managed_shared_memory.hpp>
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/IKnobCollection.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
@ -805,7 +806,7 @@ Optional<bool> checkBuggifyOverride(const char* testFile) {
// Takes a vector of public and listen address strings given via command line, and returns vector of NetworkAddress
// objects.
std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
const ClusterConnectionFile& connectionFile,
const IClusterConnectionRecord& connectionRecord,
const std::vector<std::string>& publicAddressStrs,
std::vector<std::string>& listenAddressStrs) {
if (listenAddressStrs.size() > 0 && publicAddressStrs.size() != listenAddressStrs.size()) {
@ -823,7 +824,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
NetworkAddressList publicNetworkAddresses;
NetworkAddressList listenNetworkAddresses;
auto& coordinators = connectionFile.getConnectionString().coordinators();
auto& coordinators = connectionRecord.getConnectionString().coordinators();
ASSERT(coordinators.size() > 0);
for (int ii = 0; ii < publicAddressStrs.size(); ++ii) {
@ -833,7 +834,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
if (autoPublicAddress) {
try {
const NetworkAddress& parsedAddress = NetworkAddress::parse("0.0.0.0:" + publicAddressStr.substr(5));
const IPAddress publicIP = determinePublicIPAutomatically(connectionFile.getConnectionString());
const IPAddress publicIP = determinePublicIPAutomatically(connectionRecord.getConnectionString());
currentPublicAddress = NetworkAddress(publicIP, parsedAddress.port, true, parsedAddress.isTLS());
} catch (Error& e) {
fprintf(stderr,
@ -998,7 +999,7 @@ struct CLIOptions {
std::string configPath;
ConfigDBType configDBType{ ConfigDBType::DISABLED };
Reference<ClusterConnectionFile> connectionFile;
Reference<IClusterConnectionRecord> connectionFile;
Standalone<StringRef> machineId;
UnitTestParameters testParams;
@ -1849,7 +1850,7 @@ int main(int argc, char* argv[]) {
.detail("FileSystem", opts.fileSystemPath)
.detail("DataFolder", opts.dataFolder)
.detail("WorkingDirectory", cwd)
.detail("ClusterFile", opts.connectionFile ? opts.connectionFile->getFilename().c_str() : "")
.detail("ClusterFile", opts.connectionFile ? opts.connectionFile->toString() : "")
.detail("ConnectionString",
opts.connectionFile ? opts.connectionFile->getConnectionString().toString() : "")
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(nullptr))

View File

@ -1867,7 +1867,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
tr.set(
recoveryCommitRequest.arena, primaryLocalityKey, BinaryWriter::toValue(self->primaryLocality, Unversioned()));
tr.set(recoveryCommitRequest.arena, backupVersionKey, backupVersionValue);
tr.set(recoveryCommitRequest.arena, coordinatorsKey, self->coordinators.ccf->getConnectionString().toString());
tr.set(recoveryCommitRequest.arena, coordinatorsKey, self->coordinators.ccr->getConnectionString().toString());
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
tr.set(recoveryCommitRequest.arena,
primaryDatacenterKey,

View File

@ -5211,13 +5211,13 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
return false;
}
ACTOR Future<Void> memoryStoreRecover(IKeyValueStore* store, Reference<ClusterConnectionFile> connFile, UID id) {
if (store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) {
ACTOR Future<Void> memoryStoreRecover(IKeyValueStore* store, Reference<IClusterConnectionRecord> connRecord, UID id) {
if (store->getType() != KeyValueStoreType::MEMORY || connRecord.getPtr() == nullptr) {
return Never();
}
// create a temp client connect to DB
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST);
Database cx = Database::createDatabase(connRecord, Database::API_VERSION_LATEST);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state int noCanRemoveCount = 0;
@ -5451,7 +5451,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile> connFile) {
Reference<IClusterConnectionRecord> connRecord) {
state StorageServer self(persistentData, db, ssi);
self.folder = folder;
@ -5465,7 +5465,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
// for memory storage engine type, wait until recovery is done before commit
when(wait(self.storage.commit())) {}
when(wait(memoryStoreRecover(persistentData, connFile, self.thisServerID))) {
when(wait(memoryStoreRecover(persistentData, connRecord, self.thisServerID))) {
TraceEvent("DisposeStorageServer", self.thisServerID).log();
throw worker_removed();
}

View File

@ -599,7 +599,7 @@ ACTOR Future<Void> runWorkloadAsync(Database cx,
}
ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo,
LocalityData locality) {
state WorkloadInterface workIface;
@ -614,7 +614,7 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
startRole(Role::TESTER, workIface.id(), UID(), details);
if (work.useDatabase) {
cx = Database::createDatabase(ccf, -1, IsInternal::True, locality);
cx = Database::createDatabase(ccr, -1, IsInternal::True, locality);
wait(delay(1.0));
}
@ -658,7 +658,7 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
}
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<ClusterConnectionFile> ccf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo,
LocalityData locality) {
state PromiseStream<Future<Void>> addWorkload;
@ -668,7 +668,7 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
loop choose {
when(wait(workerFatalError)) {}
when(WorkloadRequest work = waitNext(interf.recruitments.getFuture())) {
addWorkload.send(testerServerWorkload(work, ccf, dbInfo, locality));
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality));
}
}
}
@ -1583,8 +1583,8 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
* functionality. Its main purpose is to generate the test specification from passed arguments and then call into the
* correct actor which will orchestrate the actual test.
*
* \param connFile A cluster connection file. Not all tests require a functional cluster but all tests require
* a cluster file.
* \param connRecord A cluster connection record. Not all tests require a functional cluster but all tests require
* a cluster record.
* \param whatToRun TEST_TYPE_FROM_FILE to read the test description from a passed toml file or
* TEST_TYPE_CONSISTENCY_CHECK to generate a test spec for consistency checking
* \param at TEST_HERE: this process will act as a test client and execute the given workload. TEST_ON_SERVERS: Run a
@ -1600,7 +1600,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
test_type_t whatToRun,
test_location_t at,
int minTestersExpected,
@ -1612,8 +1612,8 @@ ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
auto cc = makeReference<AsyncVar<Optional<ClusterControllerFullInterface>>>();
auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>();
std::vector<Future<Void>> actors;
if (connFile) {
actors.push_back(reportErrors(monitorLeader(connFile, cc), "MonitorLeader"));
if (connRecord) {
actors.push_back(reportErrors(monitorLeader(connRecord, cc), "MonitorLeader"));
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
}
@ -1688,7 +1688,7 @@ ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
std::vector<TesterInterface> iTesters(1);
actors.push_back(
reportErrors(monitorServerDBInfo(cc, LocalityData(), db), "MonitorServerDBInfo")); // FIXME: Locality
actors.push_back(reportErrors(testerServerCore(iTesters[0], connFile, db, locality), "TesterServerCore"));
actors.push_back(reportErrors(testerServerCore(iTesters[0], connRecord, db, locality), "TesterServerCore"));
tests = runTests(cc, ci, iTesters, testSpecs, startingConfiguration, locality);
} else {
tests = reportErrors(runTests(cc, ci, testSpecs, at, minTestersExpected, startingConfiguration, locality),

View File

@ -518,7 +518,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<std::set<std::string>> const> issues,
Reference<LocalConfiguration> localConfig) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
@ -532,6 +532,16 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
state Future<Void> cacheErrorsFuture;
state Optional<double> incorrectTime;
loop {
state ClusterConnectionString storedConnectionString;
state bool upToDate = true;
if (connRecord) {
bool upToDateResult = wait(connRecord->upToDate(storedConnectionString));
upToDate = upToDateResult;
}
if (upToDate) {
incorrectTime = Optional<double>();
}
RegisterWorkerRequest request(interf,
initialClass,
processClass,
@ -542,28 +552,25 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
degraded->get(),
localConfig->lastSeenVersion(),
localConfig->configClassSet());
for (auto const& i : issues->get()) {
request.issues.push_back_deep(request.issues.arena(), i);
}
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
if (!upToDate) {
request.issues.push_back_deep(request.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
std::string connectionString = connRecord->getConnectionString().toString();
if (!incorrectTime.present()) {
incorrectTime = now();
}
if (connFile->canGetFilename()) {
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the
// file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
}
} else {
incorrectTime = Optional<double>();
}
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing
// the file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
}
auto peers = FlowTransport::transport().getIncompatiblePeers();
for (auto it = peers->begin(); it != peers->end();) {
if (now() - it->second.second > FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING) {
@ -1095,7 +1102,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
DUMPTOKEN(recruited.getKeyValuesStream);
prevStorageServer =
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<ClusterConnectionFile>(nullptr));
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<IClusterConnectionRecord>(nullptr));
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
}
}
@ -1308,7 +1315,7 @@ struct SharedLogsValue {
: actor(actor), uid(uid), requests(requests) {}
};
ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
@ -1383,7 +1390,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(testerServerCore(interf.testerInterface, connFile, dbInfo, locality));
errorForwarders.add(testerServerCore(interf.testerInterface, connRecord, dbInfo, locality));
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
filesClosed.add(stopping.getFuture());
@ -1464,7 +1471,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
DUMPTOKEN(recruited.getKeyValuesStream);
Promise<Void> recovery;
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile);
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(&runningStorages,
@ -1590,7 +1597,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
ddInterf,
rkInterf,
degraded,
connFile,
connRecord,
issues,
localConfig));
@ -1705,7 +1712,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
// printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer(
recruited, dbInfo, ccInterface, ServerCoordinators(connFile), req.lifetime, req.forceRecovery);
recruited, dbInfo, ccInterface, ServerCoordinators(connRecord), req.lifetime, req.forceRecovery);
errorForwarders.add(
zombie(recruited, forwardError(errors, Role::MASTER, recruited.id(), masterProcess)));
req.reply.send(recruited);
@ -2293,10 +2300,10 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
}
ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGeneration(
Reference<ClusterConnectionFile> connFile,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> result,
MonitorLeaderInfo info) {
state ClusterConnectionString ccf = info.intermediateConnFile->getConnectionString();
state ClusterConnectionString ccf = info.intermediateConnRecord->getConnectionString();
state std::vector<NetworkAddress> addrs = ccf.coordinators();
state ElectionResultRequest request;
state int index = 0;
@ -2314,24 +2321,24 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
if (leader.present()) {
if (leader.get().present()) {
if (leader.get().get().forward) {
info.intermediateConnFile = makeReference<ClusterConnectionFile>(
connFile->getFilename(), ClusterConnectionString(leader.get().get().serializedInfo.toString()));
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
ClusterConnectionString(leader.get().get().serializedInfo.toString()));
return info;
}
if (connFile != info.intermediateConnFile) {
if (connRecord != info.intermediateConnRecord) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", connRecord->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnFile->getConnectionString().toString());
info.intermediateConnRecord->getConnectionString().toString());
}
connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
info.intermediateConnFile = connFile;
connRecord->setConnectionString(info.intermediateConnRecord->getConnectionString());
info.intermediateConnRecord = connRecord;
}
info.hasConnected = true;
connFile->notifyConnected();
connRecord->notifyConnected();
request.knownLeader = leader.get().get().changeID;
ClusterControllerPriorityInfo info = leader.get().get().getPriorityInfo();
@ -2354,35 +2361,35 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
}
}
ACTOR Future<Void> monitorLeaderWithDelayedCandidacyImplInternal(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> monitorLeaderWithDelayedCandidacyImplInternal(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo) {
state MonitorLeaderInfo info(connFile);
state MonitorLeaderInfo info(connRecord);
loop {
MonitorLeaderInfo _info =
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connFile, outSerializedLeaderInfo, info));
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info));
info = _info;
}
}
template <class LeaderInterface>
Future<Void> monitorLeaderWithDelayedCandidacyImpl(
Reference<ClusterConnectionFile> const& connFile,
Reference<IClusterConnectionRecord> const& connRecord,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader) {
LeaderDeserializer<LeaderInterface> deserializer;
auto serializedInfo = makeReference<AsyncVar<Value>>();
Future<Void> m = monitorLeaderWithDelayedCandidacyImplInternal(connFile, serializedInfo);
Future<Void> m = monitorLeaderWithDelayedCandidacyImplInternal(connRecord, serializedInfo);
return m || deserializer(serializedInfo, outKnownLeader);
}
ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
Reference<ClusterConnectionFile> connFile,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
Future<Void> recoveredDiskFiles,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
ConfigDBType configDBType) {
state Future<Void> monitor = monitorLeaderWithDelayedCandidacyImpl(connFile, currentCC);
state Future<Void> monitor = monitorLeaderWithDelayedCandidacyImpl(connRecord, currentCC);
state Future<Void> timeout;
wait(recoveredDiskFiles);
@ -2408,7 +2415,7 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacy(
when(wait(timeout.isValid() ? timeout : Never())) {
monitor.cancel();
wait(clusterController(
connFile, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, configDBType));
connRecord, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, configDBType));
return Void();
}
}
@ -2458,7 +2465,7 @@ ACTOR Future<Void> serveProcess() {
}
}
ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
LocalityData localities,
ProcessClass processClass,
std::string dataFolder,
@ -2489,7 +2496,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
actors.push_back(serveProcess());
try {
ServerCoordinators coordinators(connFile);
ServerCoordinators coordinators(connRecord);
if (g_network->isSimulated()) {
whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,";
}
@ -2506,7 +2513,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
// their files
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccf, configDBType)));
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccr, configDBType)));
}
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
@ -2523,21 +2530,25 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo"));
if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::NeverAssign) {
actors.push_back(reportErrors(monitorLeader(connFile, cc), "ClusterController"));
actors.push_back(reportErrors(monitorLeader(connRecord, cc), "ClusterController"));
} else if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::WorstFit &&
SERVER_KNOBS->MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS > 0) {
actors.push_back(reportErrors(
monitorLeaderWithDelayedCandidacy(
connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, dbInfo, configDBType),
"ClusterController"));
actors.push_back(reportErrors(monitorLeaderWithDelayedCandidacy(connRecord,
cc,
asyncPriorityInfo,
recoveredDiskFiles.getFuture(),
localities,
dbInfo,
configDBType),
"ClusterController"));
} else {
actors.push_back(reportErrors(
clusterController(
connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, configDBType),
connRecord, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, configDBType),
"ClusterController"));
}
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
actors.push_back(reportErrorsExcept(workerServer(connFile,
actors.push_back(reportErrorsExcept(workerServer(connRecord,
cc,
localities,
asyncPriorityInfo,

View File

@ -23,6 +23,7 @@
#pragma once
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "fdbserver/workloads/MemoryKeyValueStore.h"
@ -239,7 +240,7 @@ struct ApiWorkload : TestWorkload {
useExtraDB = g_simulator.extraDB != nullptr;
if (useExtraDB) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
}
}

View File

@ -20,6 +20,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -38,7 +39,7 @@ struct AtomicSwitchoverWorkload : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
}

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -35,7 +36,7 @@ struct BackupToDBAbort : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
lockid = UID(0xbeeffeed, 0xdecaf00d);

View File

@ -20,6 +20,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -127,7 +128,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
}
}
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("BARW_Start").detail("Locked", locked);

View File

@ -20,6 +20,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -75,7 +76,7 @@ struct BackupToDBUpgradeWorkload : TestWorkload {
}
}
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("DRU_Start").log();

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -56,7 +57,7 @@ struct ChangeConfigWorkload : TestWorkload {
// for the extra cluster.
ACTOR Future<Void> extraDatabaseConfigure(ChangeConfigWorkload* self) {
if (g_network->isSimulated() && g_simulator.extraDB) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
state Database extraDB = Database::createDatabase(extraFile, -1);
wait(delay(5 * deterministicRandom()->random01()));

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbrpc/simulator.h"
@ -37,7 +38,7 @@ struct DifferentClustersSameRVWorkload : TestWorkload {
DifferentClustersSameRVWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
ASSERT(g_simulator.extraDB != nullptr);
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
testDuration = getOption(options, LiteralStringRef("testDuration"), 100.0);
switchAfter = getOption(options, LiteralStringRef("switchAfter"), 50.0);
@ -53,7 +54,7 @@ struct DifferentClustersSameRVWorkload : TestWorkload {
if (clientId != 0) {
return Void();
}
auto switchConnFileDb = Database::createDatabase(cx->getConnectionFile(), -1);
auto switchConnFileDb = Database::createDatabase(cx->getConnectionRecord(), -1);
originalDB = cx;
std::vector<Future<Void>> clients = { readerClientSeparateDBs(cx, this),
doSwitch(switchConnFileDb, this),
@ -141,8 +142,8 @@ struct DifferentClustersSameRVWorkload : TestWorkload {
TraceEvent("DifferentClusters_CopiedDatabase").log();
wait(advanceVersion(self->extraDB, rv));
TraceEvent("DifferentClusters_AdvancedVersion").log();
wait(cx->switchConnectionFile(
makeReference<ClusterConnectionFile>(self->extraDB->getConnectionFile()->getConnectionString())));
wait(cx->switchConnectionRecord(
makeReference<ClusterConnectionMemoryRecord>(self->extraDB->getConnectionRecord()->getConnectionString())));
TraceEvent("DifferentClusters_SwitchedConnectionFile").log();
state Transaction tr(cx);
tr.setVersion(rv);
@ -156,7 +157,7 @@ struct DifferentClustersSameRVWorkload : TestWorkload {
TraceEvent("DifferentClusters_ReadError").error(e);
wait(tr.onError(e));
}
// In an actual switch we would call switchConnectionFile after unlocking the database. But it's possible
// In an actual switch we would call switchConnectionRecord after unlocking the database. But it's possible
// that a storage server serves a read at |rv| even after the recovery caused by unlocking the database, and we
// want to make that more likely for this test. So read at |rv| then unlock.
wait(unlockDatabase(self->extraDB, lockUid));

View File

@ -101,7 +101,7 @@ struct KillRegionWorkload : TestWorkload {
TraceEvent("ForceRecovery_Begin").log();
wait(forceRecovery(cx->getConnectionFile(), LiteralStringRef("1")));
wait(forceRecovery(cx->getConnectionRecord(), LiteralStringRef("1")));
TraceEvent("ForceRecovery_UsableRegions").log();

View File

@ -19,6 +19,7 @@
*/
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
@ -150,7 +151,7 @@ struct VersionStampWorkload : TestWorkload {
ACTOR Future<bool> _check(Database cx, VersionStampWorkload* self) {
if (self->validateExtraDB) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
cx = Database::createDatabase(extraFile, -1);
}
state ReadYourWritesTransaction tr(cx);
@ -309,7 +310,7 @@ struct VersionStampWorkload : TestWorkload {
state Database extraDB;
if (g_simulator.extraDB != nullptr) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
}

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ReadYourWrites.h"
@ -81,7 +82,7 @@ struct WriteDuringReadWorkload : TestWorkload {
useExtraDB = g_simulator.extraDB != nullptr;
if (useExtraDB) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
extraDB = Database::createDatabase(extraFile, -1);
useSystemKeys = false;
}