Merge branch 'main' into fdb#4271

This commit is contained in:
Bharadwaj V.R 2022-02-23 10:04:19 -08:00 committed by GitHub
commit 102dcb30c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 2471 additions and 1877 deletions

View File

@ -22,7 +22,7 @@ Contributing to FoundationDB can be in contributions to the code base, sharing y
### Binary downloads
Developers interested in using FoundationDB can get started by downloading and installing a binary package. Please see the [downloads page](https://www.foundationdb.org/download/) for a list of available packages.
Developers interested in using FoundationDB can get started by downloading and installing a binary package. Please see the [downloads page](https://github.com/apple/foundationdb/releases) for a list of available packages.
### Compiling from source
@ -181,4 +181,4 @@ Under Windows, only Visual Studio with ClangCl is supported
1. `mkdir build && cd build`
1. `cmake -G "Visual Studio 16 2019" -A x64 -T ClangCl <PATH_TO_FOUNDATIONDB_SOURCE>`
1. `msbuild /p:Configuration=Release foundationdb.sln`
1. To increase build performance, use `/p:UseMultiToolTask=true` and `/p:CL_MPCount=<NUMBER_OF_PARALLEL_JOBS>`
1. To increase build performance, use `/p:UseMultiToolTask=true` and `/p:CL_MPCount=<NUMBER_OF_PARALLEL_JOBS>`

View File

@ -176,7 +176,7 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
case ConfigurationResult::STORAGE_MIGRATION_DISABLED:
fprintf(stderr,
"ERROR: Storage engine type cannot be changed because "
"storage_migration_mode=disabled.\n");
"storage_migration_type=disabled.\n");
fprintf(stderr,
"Type `configure perpetual_storage_wiggle=1 storage_migration_type=gradual' to enable gradual "
"migration with the perpetual wiggle, or `configure "

View File

@ -100,6 +100,7 @@ ACTOR Future<bool> changeCoordinators(Reference<IDatabase> db, std::vector<Strin
state std::vector<StringRef>::iterator t;
for (t = tokens.begin() + 1; t != tokens.end(); ++t) {
try {
// TODO(renxuan): add hostname parsing here.
auto const& addr = NetworkAddress::parse(t->toString());
if (new_coordinators_addresses.count(addr)) {
fprintf(stderr, "ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str());

View File

@ -1157,7 +1157,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state bool writeMode = false;
state std::string clusterConnectString;
state std::map<Key, std::pair<Value, ClientLeaderRegInterface>> address_interface;
state FdbOptions globalOptions;
@ -1171,6 +1170,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
ClusterConnectionFile::lookupClusterFileName(opt.clusterFile);
try {
ccf = makeReference<ClusterConnectionFile>(resolvedClusterFile.first);
wait(ccf->resolveHostnames());
} catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return 1;

View File

@ -28,6 +28,7 @@
#include "fdbclient/CoordinationInterface.h"
// Determine public IP address by calling the first coordinator.
IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
try {
using namespace boost::asio;
@ -35,6 +36,7 @@ IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
io_service ioService;
ip::udp::socket socket(ioService);
ccs.resolveHostnamesBlocking();
const auto& coordAddr = ccs.coordinators()[0];
const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6()))
: ip::address(ip::address_v4(coordAddr.ip.toV4()));

View File

@ -58,13 +58,28 @@ struct ClientLeaderRegInterface {
// - There is no address present more than once
class ClusterConnectionString {
public:
enum ConnectionStringStatus { RESOLVED, RESOLVING, UNRESOLVED };
ClusterConnectionString() {}
ClusterConnectionString(const std::string& connStr);
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
ClusterConnectionString(const std::vector<Hostname>& hosts, Key key);
ClusterConnectionString(const ClusterConnectionString& rhs) { operator=(rhs); }
ClusterConnectionString& operator=(const ClusterConnectionString& rhs) {
// Copy everything except AsyncTrigger resolveFinish.
status = rhs.status;
coords = rhs.coords;
hostnames = rhs.hostnames;
networkAddressToHostname = rhs.networkAddressToHostname;
key = rhs.key;
keyDesc = rhs.keyDesc;
connectionString = rhs.connectionString;
return *this;
}
std::vector<NetworkAddress> const& coordinators() const { return coords; }
void addResolved(Hostname hostname, NetworkAddress address) {
void addResolved(const Hostname& hostname, const NetworkAddress& address) {
coords.push_back(address);
networkAddressToHostname.emplace(address, hostname);
}
@ -80,14 +95,15 @@ public:
void resolveHostnamesBlocking();
void resetToUnresolved();
bool hasUnresolvedHostnames = false;
ConnectionStringStatus status = RESOLVED;
AsyncTrigger resolveFinish;
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;
private:
void parseConnString();
void parseKey(const std::string& key);
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;
Key key, keyDesc;
std::string connectionString;
};
@ -139,7 +155,7 @@ public:
// Signals to the connection record that it was successfully used to connect to a cluster.
void notifyConnected();
bool hasUnresolvedHostnames() const;
ClusterConnectionString::ConnectionStringStatus connectionStringStatus() const;
Future<Void> resolveHostnames();
// This one should only be used when resolving asynchronously is impossible. For all other cases, resolveHostnames()
// should be preferred.

View File

@ -169,7 +169,7 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
} else if (value == "gradual") {
type = StorageMigrationType::GRADUAL;
} else {
printf("Error: Only disabled|aggressive|gradual are valid for storage_migration_mode.\n");
printf("Error: Only disabled|aggressive|gradual are valid for storage_migration_type.\n");
return out;
}
out[p + key] = format("%d", type);

View File

@ -77,8 +77,8 @@ void IClusterConnectionRecord::setPersisted() {
connectionStringNeedsPersisted = false;
}
bool IClusterConnectionRecord::hasUnresolvedHostnames() const {
return cs.hasUnresolvedHostnames;
ClusterConnectionString::ConnectionStringStatus IClusterConnectionRecord::connectionStringStatus() const {
return cs.status;
}
Future<Void> IClusterConnectionRecord::resolveHostnames() {
@ -98,39 +98,56 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E
}
ACTOR Future<Void> resolveHostnamesImpl(ClusterConnectionString* self) {
std::vector<Future<Void>> fs;
for (auto const& hostName : self->hostnames) {
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostName.host, hostName.service),
[=](std::vector<NetworkAddress> const& addresses) -> Void {
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
addr.flags = 0; // Reset the parsed address to public
addr.fromHostname = NetworkAddressFromHostname::True;
if (hostName.isTLS) {
addr.flags |= NetworkAddress::FLAG_TLS;
}
self->addResolved(hostName, addr);
return Void();
}));
loop {
if (self->status == ClusterConnectionString::UNRESOLVED) {
self->status = ClusterConnectionString::RESOLVING;
std::vector<Future<Void>> fs;
for (auto const& hostname : self->hostnames) {
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostname.host, hostname.service),
[=](std::vector<NetworkAddress> const& addresses) -> Void {
NetworkAddress address =
addresses[deterministicRandom()->randomInt(0, addresses.size())];
address.flags = 0; // Reset the parsed address to public
address.fromHostname = NetworkAddressFromHostname::True;
if (hostname.isTLS) {
address.flags |= NetworkAddress::FLAG_TLS;
}
self->addResolved(hostname, address);
return Void();
}));
}
wait(waitForAll(fs));
std::sort(self->coords.begin(), self->coords.end());
if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) {
self->status = ClusterConnectionString::UNRESOLVED;
self->resolveFinish.trigger();
throw connection_string_invalid();
}
self->status = ClusterConnectionString::RESOLVED;
self->resolveFinish.trigger();
break;
} else if (self->status == ClusterConnectionString::RESOLVING) {
wait(self->resolveFinish.onTrigger());
if (self->status == ClusterConnectionString::RESOLVED) {
break;
}
// Otherwise, this means other threads failed on resolve, so here we go back to the loop and try to resolve
// again.
} else {
// status is RESOLVED, nothing to do.
break;
}
}
wait(waitForAll(fs));
std::sort(self->coords.begin(), self->coords.end());
if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) {
throw connection_string_invalid();
}
self->hasUnresolvedHostnames = false;
return Void();
}
Future<Void> ClusterConnectionString::resolveHostnames() {
if (!hasUnresolvedHostnames) {
return Void();
} else {
return resolveHostnamesImpl(this);
}
return resolveHostnamesImpl(this);
}
void ClusterConnectionString::resolveHostnamesBlocking() {
if (hasUnresolvedHostnames) {
if (status != RESOLVED) {
status = RESOLVING;
for (auto const& hostname : hostnames) {
std::vector<NetworkAddress> addresses =
INetworkConnections::net()->resolveTCPEndpointBlocking(hostname.host, hostname.service);
@ -140,14 +157,14 @@ void ClusterConnectionString::resolveHostnamesBlocking() {
if (hostname.isTLS) {
address.flags |= NetworkAddress::FLAG_TLS;
}
coords.push_back(address);
networkAddressToHostname.emplace(address, hostname);
addResolved(hostname, address);
}
std::sort(coords.begin(), coords.end());
if (std::unique(coords.begin(), coords.end()) != coords.end()) {
status = UNRESOLVED;
throw connection_string_invalid();
}
hasUnresolvedHostnames = false;
status = RESOLVED;
}
}
@ -156,7 +173,7 @@ void ClusterConnectionString::resetToUnresolved() {
coords.clear();
hostnames.clear();
networkAddressToHostname.clear();
hasUnresolvedHostnames = true;
status = UNRESOLVED;
parseConnString();
}
}
@ -184,7 +201,9 @@ void ClusterConnectionString::parseConnString() {
}
p = pComma + 1;
}
hasUnresolvedHostnames = hostnames.size() > 0;
if (hostnames.size() > 0) {
status = UNRESOLVED;
}
ASSERT((coords.size() + hostnames.size()) > 0);
std::sort(coords.begin(), coords.end());
@ -256,7 +275,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
{
input = "asdf:2345@localhost:1234";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 1);
ASSERT(input == cs.toString());
}
@ -264,7 +283,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
{
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -277,7 +296,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -290,7 +309,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -314,16 +333,16 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address2 });
state ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
wait(cs.resolveHostnames());
ASSERT(!cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::RESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 2);
ASSERT(cs.toString() == connectionString);
cs.resetToUnresolved();
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
ASSERT(cs.toString() == connectionString);
@ -422,7 +441,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") {
}
ClusterConnectionString::ClusterConnectionString(const std::vector<NetworkAddress>& servers, Key key)
: coords(servers) {
: status(RESOLVED), coords(servers) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";
@ -435,7 +454,7 @@ ClusterConnectionString::ClusterConnectionString(const std::vector<NetworkAddres
}
ClusterConnectionString::ClusterConnectionString(const std::vector<Hostname>& hosts, Key key)
: hasUnresolvedHostnames(true), hostnames(hosts) {
: status(UNRESOLVED), hostnames(hosts) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";
@ -497,6 +516,7 @@ std::string ClusterConnectionString::toString() const {
}
ClientCoordinators::ClientCoordinators(Reference<IClusterConnectionRecord> ccr) : ccr(ccr) {
ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED);
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s)
clientLeaderServers.push_back(ClientLeaderRegInterface(*s));
@ -525,15 +545,44 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) {
// Nominee is the worker among all workers that are considered as leader by one coordinator
// This function contacts a coordinator coord to ask who is its nominee.
// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor
// to throw `coordinators_changed()` error
ACTOR Future<Void> monitorNominee(Key key,
ClientLeaderRegInterface coord,
AsyncTrigger* nomineeChange,
Optional<LeaderInfo>* info) {
Optional<LeaderInfo>* info,
Optional<Hostname> hostname = Optional<Hostname>()) {
loop {
state Optional<LeaderInfo> li =
wait(retryBrokenPromise(coord.getLeader,
GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
state Optional<LeaderInfo> li;
if (coord.getLeader.getEndpoint().getPrimaryAddress().fromHostname) {
state ErrorOr<Optional<LeaderInfo>> rep =
wait(coord.getLeader.tryGetReply(GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
if (rep.isError()) {
// Connecting to nominee failed, most likely due to connection failed.
TraceEvent("MonitorNomineeError")
.detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname")
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString())
.error(rep.getError());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(0.05));
throw coordinators_changed();
} else {
throw rep.getError();
}
} else if (rep.present()) {
li = rep.get();
}
} else {
Optional<LeaderInfo> tmp =
wait(retryBrokenPromise(coord.getLeader,
GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
li = tmp;
}
wait(Future<Void>(Void())); // Make sure we weren't cancelled
TraceEvent("GetLeaderReply")
@ -608,53 +657,74 @@ Optional<std::pair<LeaderInfo, bool>> getLeader(const std::vector<Optional<Leade
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo,
MonitorLeaderInfo info) {
state ClientCoordinators coordinators(info.intermediateConnRecord);
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state Future<Void> allActors;
nominees.resize(coordinators.clientLeaderServers.size());
std::vector<Future<Void>> actors;
// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
actors.reserve(coordinators.clientLeaderServers.size());
for (int i = 0; i < coordinators.clientLeaderServers.size(); i++)
actors.push_back(
monitorNominee(coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i]));
allActors = waitForAll(actors);
loop {
Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
TraceEvent("MonitorLeaderChange")
.detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1));
if (leader.present()) {
if (leader.get().first.forward) {
TraceEvent("MonitorLeaderForwarding")
.detail("NewConnStr", leader.get().first.serializedInfo.toString())
.detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString())
.trackLatest("MonitorLeaderForwarding");
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
return info;
}
if (connRecord != info.intermediateConnRecord) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", connRecord->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
info.intermediateConnRecord = connRecord;
}
wait(connRecord->resolveHostnames());
wait(info.intermediateConnRecord->resolveHostnames());
state ClientCoordinators coordinators(info.intermediateConnRecord);
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state Future<Void> allActors;
info.hasConnected = true;
connRecord->notifyConnected();
nominees.resize(coordinators.clientLeaderServers.size());
outSerializedLeaderInfo->set(leader.get().first.serializedInfo);
state std::vector<Future<Void>> actors;
// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
actors.reserve(coordinators.clientLeaderServers.size());
for (int i = 0; i < coordinators.clientLeaderServers.size(); i++) {
Optional<Hostname> hostname;
auto r = connRecord->getConnectionString().networkAddressToHostname.find(
coordinators.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress());
if (r != connRecord->getConnectionString().networkAddressToHostname.end()) {
hostname = r->second;
}
actors.push_back(monitorNominee(
coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], hostname));
}
allActors = waitForAll(actors);
loop {
Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
TraceEvent("MonitorLeaderChange")
.detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1));
if (leader.present()) {
if (leader.get().first.forward) {
TraceEvent("MonitorLeaderForwarding")
.detail("NewConnStr", leader.get().first.serializedInfo.toString())
.detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString())
.trackLatest("MonitorLeaderForwarding");
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
return info;
}
if (connRecord != info.intermediateConnRecord) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", connRecord->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
info.intermediateConnRecord = connRecord;
}
info.hasConnected = true;
connRecord->notifyConnected();
outSerializedLeaderInfo->set(leader.get().first.serializedInfo);
}
try {
wait(nomineeChange.onTrigger() || allActors);
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorLeaderCoordinatorsChanged").suppressFor(1.0);
connRecord->getConnectionString().resetToUnresolved();
break;
} else {
throw e;
}
}
}
wait(nomineeChange.onTrigger() || allActors);
}
}
@ -774,8 +844,8 @@ ACTOR Future<Void> getClientInfoFromLeader(Reference<AsyncVar<Optional<ClusterCo
when(ClientDBInfo ni =
wait(brokenPromiseToNever(knownLeader->get().get().clientInterface.openDatabase.getReply(req)))) {
TraceEvent("GetClientInfoFromLeaderGotClientInfo", knownLeader->get().get().clientInterface.id())
.detail("CommitProxy0", ni.commitProxies.size() ? ni.commitProxies[0].id() : UID())
.detail("GrvProxy0", ni.grvProxies.size() ? ni.grvProxies[0].id() : UID())
.detail("CommitProxy0", ni.commitProxies.size() ? ni.commitProxies[0].address().toString() : "")
.detail("GrvProxy0", ni.grvProxies.size() ? ni.grvProxies[0].address().toString() : "")
.detail("ClientID", ni.id);
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(ni));
}
@ -787,7 +857,8 @@ ACTOR Future<Void> getClientInfoFromLeader(Reference<AsyncVar<Optional<ClusterCo
ACTOR Future<Void> monitorLeaderAndGetClientInfo(Key clusterKey,
std::vector<NetworkAddress> coordinators,
ClientData* clientData,
Reference<AsyncVar<Optional<LeaderInfo>>> leaderInfo) {
Reference<AsyncVar<Optional<LeaderInfo>>> leaderInfo,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state std::vector<ClientLeaderRegInterface> clientLeaderServers;
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
@ -835,7 +906,14 @@ ACTOR Future<Void> monitorLeaderAndGetClientInfo(Key clusterKey,
leaderInfo->set(leader.get().first);
}
}
wait(nomineeChange.onTrigger() || allActors);
try {
wait(nomineeChange.onTrigger() || allActors);
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
coordinatorsChanged->trigger();
}
throw e;
}
}
}
@ -964,9 +1042,15 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
successIndex = index;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cant talk to cluster controller
if (rep.getError().code() == error_code_coordinators_changed) {
throw coordinators_changed();
}
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
// When the client fails talking to all coordinators, we throw coordinators_changed() and let the caller
// re-resolve the connection string and retry.
throw coordinators_changed();
}
}
}
@ -978,16 +1062,27 @@ ACTOR Future<Void> monitorProxies(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
wait(connRecord->get()->resolveHostnames());
state MonitorLeaderInfo info(connRecord->get());
loop {
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
info = _info;
try {
wait(info.intermediateConnRecord->resolveHostnames());
choose {
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
info = _info;
}
when(wait(connRecord->onChange())) {
info.hasConnected = false;
info.intermediateConnRecord = connRecord->get();
}
}
when(wait(connRecord->onChange())) {
info.hasConnected = false;
info.intermediateConnRecord = connRecord->get();
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorProxiesCoordinatorsChanged").suppressFor(1.0);
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}
}
}

View File

@ -74,10 +74,11 @@ Future<Void> monitorLeader(Reference<IClusterConnectionRecord> const& connFile,
// This is one place where the leader election algorithm is run. The coodinator contacts all coodinators to collect
// nominees, the nominee with the most nomination is the leader, and collects client data from the leader. This function
// also monitors the change of the leader.
Future<Void> monitorLeaderAndGetClientInfo(Value const& key,
Future<Void> monitorLeaderAndGetClientInfo(Key const& clusterKey,
std::vector<NetworkAddress> const& coordinators,
ClientData* const& clientData,
Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo);
Reference<AsyncVar<Optional<LeaderInfo>>> const& leaderInfo,
Reference<AsyncVar<Void>> const& coordinatorsChanged);
Future<Void> monitorProxies(
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> const& connRecord,

View File

@ -732,16 +732,18 @@ Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* proxyChangeTrigger) {
AsyncTrigger* proxiesChangeTrigger) {
state std::vector<CommitProxyInterface> curCommitProxies;
state std::vector<GrvProxyInterface> curGrvProxies;
state ActorCollection actors(false);
state Future<Void> clientDBInfoOnChange = clientDBInfo->onChange();
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
loop {
choose {
when(wait(clientDBInfo->onChange())) {
when(wait(clientDBInfoOnChange)) {
clientDBInfoOnChange = clientDBInfo->onChange();
if (clientDBInfo->get().commitProxies != curCommitProxies ||
clientDBInfo->get().grvProxies != curGrvProxies) {
// This condition is a bit complicated. Here we want to verify that we're unable to receive a read
@ -758,7 +760,7 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
}
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
proxyChangeTrigger->trigger();
proxiesChangeTrigger->trigger();
}
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
@ -5713,9 +5715,10 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
loop {
try {
state GetReadVersionRequest req(span.context, transactionCount, priority, flags, tags, debugID);
state Future<Void> onProxiesChanged = cx->onProxiesChanged();
choose {
when(wait(cx->onProxiesChanged())) {}
when(wait(onProxiesChanged)) { onProxiesChanged = cx->onProxiesChanged(); }
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies(
flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)),
@ -6881,6 +6884,7 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion
throw;
}
TraceEvent("ExclusionSafetyCheckCoordinators").log();
wait(cx->getConnectionRecord()->resolveHostnames());
state ClientCoordinators coordinatorList(cx->getConnectionRecord());
state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
leaderServers.reserve(coordinatorList.clientLeaderServers.size());

View File

@ -29,12 +29,12 @@ void HealthMonitor::reportPeerClosed(const NetworkAddress& peerAddress) {
}
void HealthMonitor::purgeOutdatedHistory() {
for (auto it = peerClosedHistory.begin(); it != peerClosedHistory.end();) {
if (it->first < now() - FLOW_KNOBS->HEALTH_MONITOR_CLIENT_REQUEST_INTERVAL_SECS) {
auto& count = peerClosedNum[it->second];
while (!peerClosedHistory.empty()) {
auto const& p = peerClosedHistory.front();
if (p.first < now() - FLOW_KNOBS->HEALTH_MONITOR_CLIENT_REQUEST_INTERVAL_SECS) {
auto& count = peerClosedNum[p.second];
--count;
ASSERT(count >= 0);
++it; // Increment before pop_front to avoid iterator invalidation
peerClosedHistory.pop_front();
} else {
break;

View File

@ -90,6 +90,7 @@ set(FDBSERVER_SRCS
QuietDatabase.actor.cpp
QuietDatabase.h
RadixTree.h
Ratekeeper.h
Ratekeeper.actor.cpp
RatekeeperInterface.h
RecoveryState.h
@ -130,6 +131,8 @@ set(FDBSERVER_SRCS
storageserver.actor.cpp
TagPartitionedLogSystem.actor.cpp
TagPartitionedLogSystem.actor.h
TagThrottler.actor.cpp
TagThrottler.h
template_fdb.h
TCInfo.actor.cpp
TCInfo.h

View File

@ -96,6 +96,7 @@ LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local) : Client
}
ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr) : ClientCoordinators(ccr) {
ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED);
ClusterConnectionString cs = ccr->getConnectionString();
for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
leaderElectionServers.emplace_back(*s);
@ -205,8 +206,11 @@ ACTOR Future<Void> openDatabase(ClientData* db,
int* clientCount,
Reference<AsyncVar<bool>> hasConnectedClients,
OpenDatabaseCoordRequest req,
Future<Void> checkStuck) {
Future<Void> checkStuck,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state ErrorOr<CachedSerialization<ClientDBInfo>> replyContents;
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> clientInfoOnChange = db->clientInfo->onChange();
++(*clientCount);
hasConnectedClients->set(true);
@ -223,7 +227,15 @@ ACTOR Future<Void> openDatabase(ClientData* db,
replyContents = failed_to_progress();
break;
}
when(wait(yieldedFuture(db->clientInfo->onChange()))) { replyContents = db->clientInfo->get(); }
when(wait(yieldedFuture(clientInfoOnChange))) {
clientInfoOnChange = db->clientInfo->onChange();
replyContents = db->clientInfo->get();
}
when(wait(coordinatorsChangedOnChange)) {
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
replyContents = coordinators_changed();
break;
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) {
if (db->clientInfo->get().read().id.isValid()) {
replyContents = db->clientInfo->get();
@ -254,18 +266,33 @@ ACTOR Future<Void> openDatabase(ClientData* db,
ACTOR Future<Void> remoteMonitorLeader(int* clientCount,
Reference<AsyncVar<bool>> hasConnectedClients,
Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader,
ElectionResultRequest req) {
ElectionResultRequest req,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state bool coordinatorsChangeDetected = false;
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> currentElectedLeaderOnChange = currentElectedLeader->onChange();
++(*clientCount);
hasConnectedClients->set(true);
while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) {
choose {
when(wait(yieldedFuture(currentElectedLeader->onChange()))) {}
when(wait(yieldedFuture(currentElectedLeaderOnChange))) {
currentElectedLeaderOnChange = currentElectedLeader->onChange();
}
when(wait(coordinatorsChangedOnChange)) {
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
coordinatorsChangeDetected = true;
break;
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { break; }
}
}
req.reply.send(currentElectedLeader->get());
if (coordinatorsChangeDetected) {
req.reply.sendError(coordinators_changed());
} else {
req.reply.send(currentElectedLeader->get());
}
if (--(*clientCount) == 0) {
hasConnectedClients->set(false);
@ -296,6 +323,9 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
state Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader =
makeReference<AsyncVar<Optional<LeaderInfo>>>();
state LivenessChecker canConnectToLeader(SERVER_KNOBS->COORDINATOR_LEADER_CONNECTION_TIMEOUT);
state Reference<AsyncVar<Void>> coordinatorsChanged = makeReference<AsyncVar<Void>>();
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> hasConnectedClientsOnChange = hasConnectedClients->onChange();
loop choose {
when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {
@ -306,10 +336,14 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
if (!leaderMon.isValid()) {
leaderMon = monitorLeaderAndGetClientInfo(
req.clusterKey, req.coordinators, &clientData, currentElectedLeader);
req.clusterKey, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged);
}
actors.add(
openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck()));
actors.add(openDatabase(&clientData,
&clientCount,
hasConnectedClients,
req,
canConnectToLeader.checkStuck(),
coordinatorsChanged));
}
}
when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
@ -318,10 +352,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
req.reply.send(currentElectedLeader->get());
} else {
if (!leaderMon.isValid()) {
leaderMon =
monitorLeaderAndGetClientInfo(req.key, req.coordinators, &clientData, currentElectedLeader);
leaderMon = monitorLeaderAndGetClientInfo(
req.key, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged);
}
actors.add(remoteMonitorLeader(&clientCount, hasConnectedClients, currentElectedLeader, req));
actors.add(remoteMonitorLeader(
&clientCount, hasConnectedClients, currentElectedLeader, req, coordinatorsChanged));
}
}
when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) {
@ -454,13 +489,18 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
notify.pop_front();
}
}
when(wait(hasConnectedClients->onChange())) {
when(wait(hasConnectedClientsOnChange)) {
hasConnectedClientsOnChange = hasConnectedClients->onChange();
if (!hasConnectedClients->get() && !nextInterval.isValid()) {
TraceEvent("LeaderRegisterUnneeded").detail("Key", key);
return Void();
}
}
when(wait(actors.getResult())) {}
when(wait(coordinatorsChangedOnChange)) {
leaderMon = Future<Void>();
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -171,6 +171,7 @@ typedef AsyncMap<UID, ServerStatus> ServerStatusMap;
class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
friend class DDTeamCollectionImpl;
friend class DDTeamCollectionUnitTest;
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3 };
@ -521,6 +522,37 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
void noHealthyTeams() const;
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
// No matter how server teams will be created, we will create the same set of machine teams;
// We should never use server team number in building machine teams.
//
// Five steps to create each machine team, which are document in the function
// Reuse ReplicationPolicy selectReplicas func to select machine team
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
void disableBuildingTeams() { doBuildTeams = false; }
void setCheckTeamDelay() { this->checkTeamDelay = Void(); }
public:
Database cx;
@ -595,39 +627,6 @@ public:
void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }
// FIXME: Public for testing only
void disableBuildingTeams() { doBuildTeams = false; }
// FIXME: Public for testing only
void setCheckTeamDelay() { this->checkTeamDelay = Void(); }
// FIXME: Public for testing only
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
// FIXME: Public for testing only
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
// FIXME: Public for testing only
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
// No matter how server teams will be created, we will create the same set of machine teams;
// We should never use server team number in building machine teams.
//
// Five steps to create each machine team, which are document in the function
// Reuse ReplicationPolicy selectReplicas func to select machine team
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
// FIXME: Public for testing only
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
// Create server teams based on machine teams
// Before the number of machine teams reaches the threshold, build a machine team for each server team
// When it reaches the threshold, first try to build a server team with existing machine teams; if failed,
@ -642,11 +641,6 @@ public:
bool removeTeam(Reference<TCTeamInfo> team);
// FIXME: Public for testing only
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
void removeTSS(UID removedServer);
void removeServer(UID removedServer);

View File

@ -158,8 +158,9 @@ ACTOR Future<std::vector<WorkerInterface>> getCoordWorkers(Database cx,
if (!coordinators.present()) {
throw operation_failed();
}
std::vector<NetworkAddress> coordinatorsAddr =
ClusterConnectionString(coordinators.get().toString()).coordinators();
state ClusterConnectionString ccs(coordinators.get().toString());
wait(ccs.resolveHostnames());
std::vector<NetworkAddress> coordinatorsAddr = ccs.coordinators();
std::set<NetworkAddress> coordinatorsAddrSet;
for (const auto& addr : coordinatorsAddr) {
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", addr);

File diff suppressed because it is too large Load Diff

207
fdbserver/Ratekeeper.h Normal file
View File

@ -0,0 +1,207 @@
/*
* Ratekeeper.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbrpc/Smoother.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h"
enum limitReason_t {
unlimited, // TODO: rename to workload?
storage_server_write_queue_size, // 1
storage_server_write_bandwidth_mvcc,
storage_server_readable_behind,
log_server_mvcc_write_bandwidth,
log_server_write_queue, // 5
storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
log_server_min_free_space,
log_server_min_free_space_ratio,
storage_server_durability_lag, // 10
storage_server_list_fetch_failed,
limitReason_t_end
};
struct StorageQueueInfo {
bool valid;
UID id;
LocalityData locality;
StorageQueuingMetricsReply lastReply;
StorageQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
limitReason_t limitReason;
Optional<TransactionTag> busiestReadTag, busiestWriteTag;
double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0;
double busiestReadTagRate = 0, busiestWriteTagRate = 0;
Reference<EventCacheHolder> busiestWriteTagEventHolder;
// refresh periodically
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
uint64_t totalWriteCosts = 0;
int totalWriteOps = 0;
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
limitReason(limitReason_t::unlimited),
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(id.toString() + "/BusiestWriteTag")) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
};
struct TLogQueueInfo {
bool valid;
UID id;
TLogQueuingMetricsReply lastReply;
TLogQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
TLogQueueInfo(UID id)
: valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from
// storageQueueInfO)
lastReply.instanceID = -1;
}
};
struct RatekeeperLimits {
double tpsLimit;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle reasonMetric;
int64_t storageTargetBytes;
int64_t storageSpringBytes;
int64_t logTargetBytes;
int64_t logSpringBytes;
double maxVersionDifference;
int64_t durabilityLagTargetVersions;
int64_t lastDurabilityLag;
double durabilityLagLimit;
TransactionPriority priority;
std::string context;
Reference<EventCacheHolder> rkUpdateEventCacheHolder;
RatekeeperLimits(TransactionPriority priority,
std::string context,
int64_t storageTargetBytes,
int64_t storageSpringBytes,
int64_t logTargetBytes,
int64_t logSpringBytes,
double maxVersionDifference,
int64_t durabilityLagTargetVersions)
: tpsLimit(std::numeric_limits<double>::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(
durabilityLagTargetVersions +
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
// be durable on the storage servers
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), priority(priority),
context(context), rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}
};
class Ratekeeper {
friend class RatekeeperImpl;
// Differentiate from GrvProxyInfo in DatabaseContext.h
struct GrvProxyInfo {
int64_t totalTransactions;
int64_t batchTransactions;
uint64_t lastThrottledTagChangeId;
double lastUpdateTime;
double lastTagPushTime;
GrvProxyInfo()
: totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0),
lastTagPushTime(0) {}
};
UID id;
Database db;
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, Ratekeeper::GrvProxyInfo> grvProxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
PromiseStream<Future<Void>> addActor;
Int64MetricHandle actualTpsMetric;
double lastWarning;
double lastSSListFetchedTimestamp;
std::unique_ptr<class TagThrottler> tagThrottler;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
Optional<Key> remoteDC;
Future<Void> expiredTagThrottleCleanup;
double lastBusiestCommitTagPick;
Ratekeeper(UID id, Database db);
Future<Void> configurationMonitor();
void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation);
void updateRate(RatekeeperLimits* limits);
Future<Void> refreshStorageServerCommitCost();
Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
// SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
Future<Void> trackStorageServerQueueInfo(StorageServerInterface);
Future<Void> trackTLogQueueInfo(TLogInterface);
void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason);
void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
Future<Void> monitorThrottlingChanges();
public:
static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
};

View File

@ -1905,8 +1905,8 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
TEST(useIPv6); // Use IPv6
TEST(!useIPv6); // Use IPv4
// TODO(renxuan): Use hostname 25% of the time, unless it is disabled
bool useHostname = false; // !testConfig.disableHostname && deterministicRandom()->random01() < 0.25;
// Use hostname 25% of the time, unless it is disabled
bool useHostname = !testConfig.disableHostname && deterministicRandom()->random01() < 0.25;
TEST(useHostname); // Use hostname
TEST(!useHostname); // Use IP address
NetworkAddressFromHostname fromHostname =

View File

@ -0,0 +1,598 @@
/*
* TagThrottler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/TagThrottler.h"
class RkTagThrottleCollection : NonCopyable {
struct RkTagData {
Smoother requestRate;
RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
};
struct RkTagThrottleData {
ClientTagThrottleLimits limits;
Smoother clientRate;
// Only used by auto-throttles
double created = now();
double lastUpdated = 0;
double lastReduced = now();
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate) {
if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
} else {
return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal());
}
}
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if (limits.expiration > now()) {
double targetRate = getTargetRate(requestRate);
if (targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return targetRate;
}
if (!rateSet) {
rateSet = true;
clientRate.reset(targetRate);
} else {
clientRate.setTotal(targetRate);
}
double rate = clientRate.smoothTotal();
ASSERT(rate >= 0);
return rate;
} else {
TEST(true); // Get throttle rate for expired throttle
rateSet = false;
return Optional<double>();
}
}
};
void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); }
public:
RkTagThrottleCollection() {}
RkTagThrottleCollection(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
void operator=(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) {
ASSERT(currentBusyness > 0);
if (targetBusyness < 1) {
double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness);
return requestRate * targetFraction;
} else {
return std::numeric_limits<double>::max();
}
}
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
Optional<double> autoThrottleTag(UID id,
TransactionTag const& tag,
double fractionalBusyness,
Optional<double> tpsRate = Optional<double>(),
Optional<double> expiration = Optional<double>()) {
ASSERT(!tpsRate.present() || tpsRate.get() >= 0);
ASSERT(!expiration.present() || expiration.get() > now());
auto itr = autoThrottledTags.find(tag);
bool present = (itr != autoThrottledTags.end());
if (!present) {
if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
}
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
} else if (itr->second.limits.expiration <= now()) {
TEST(true); // Re-throttling expired tag that hasn't been cleaned up
present = false;
itr->second = RkTagThrottleData();
}
auto& throttle = itr->second;
if (!tpsRate.present()) {
if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = std::numeric_limits<double>::max();
if (present) {
return Optional<double>();
}
} else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
TEST(true); // Tag auto-throttled too quickly
return Optional<double>();
} else {
tpsRate = computeTargetTpsRate(fractionalBusyness,
SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS,
tagData[tag].requestRate.smoothRate());
if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
TEST(true); // Tag auto-throttle rate increase attempt while active
return Optional<double>();
}
throttle.lastUpdated = now();
if (tpsRate.get() < throttle.limits.tpsRate) {
throttle.lastReduced = now();
}
}
}
if (!expiration.present()) {
expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
}
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now())
.detail("ClientRate", clientRate)
.detail("Created", now() - throttle.created)
.detail("LastUpdate", now() - throttle.lastUpdated)
.detail("LastReduced", now() - throttle.lastReduced);
if (tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
} else {
return Optional<double>();
}
}
void manualThrottleTag(UID id,
TransactionTag const& tag,
TransactionPriority priority,
double tpsRate,
double expiration,
Optional<ClientTagThrottleLimits> const& oldLimits) {
ASSERT(tpsRate >= 0);
ASSERT(expiration > now());
auto& priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
initializeTag(tag);
ASSERT(result.second); // Updating to the map is done by copying the whole map
result.first->second.limits.tpsRate = tpsRate;
result.first->second.limits.expiration = expiration;
if (!oldLimits.present()) {
TEST(true); // Transaction tag manually throttled
TraceEvent("RatekeeperAddingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
} else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
TEST(true); // Manual transaction tag throttle updated
TraceEvent("RatekeeperUpdatingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
}
Optional<double> clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
}
Optional<ClientTagThrottleLimits> getManualTagThrottleLimits(TransactionTag const& tag,
TransactionPriority priority) {
auto itr = manualThrottledTags.find(tag);
if (itr != manualThrottledTags.end()) {
auto priorityItr = itr->second.find(priority);
if (priorityItr != itr->second.end()) {
return priorityItr->second.limits;
}
}
return Optional<ClientTagThrottleLimits>();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(bool autoThrottlingEnabled) {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for (auto tagItr = tagData.begin(); tagItr != tagData.end();) {
bool tagPresent = false;
double requestRate = tagItr->second.requestRate.smoothRate();
auto manualItr = manualThrottledTags.find(tagItr->first);
if (manualItr != manualThrottledTags.end()) {
Optional<ClientTagThrottleLimits> manualClientRate;
for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend());
++priority) {
auto priorityItr = manualItr->second.find(*priority);
if (priorityItr != manualItr->second.end()) {
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
if (!priorityClientRate.present()) {
TEST(true); // Manual priority throttle expired
priorityItr = manualItr->second.erase(priorityItr);
} else {
if (!manualClientRate.present() ||
manualClientRate.get().tpsRate > priorityClientRate.get()) {
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(),
priorityItr->second.limits.expiration);
} else {
TEST(true); // Manual throttle overriden by higher priority
}
++priorityItr;
}
}
if (manualClientRate.present()) {
tagPresent = true;
TEST(true); // Using manual throttle
clientRates[*priority][tagItr->first] = manualClientRate.get();
}
}
if (manualItr->second.empty()) {
TEST(true); // All manual throttles expired
manualThrottledTags.erase(manualItr);
break;
}
}
auto autoItr = autoThrottledTags.find(tagItr->first);
if (autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if (autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION -
SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
if (now() >= rampStartTime && adjustedRate != std::numeric_limits<double>::max()) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if (targetBusyness == 0) {
targetBusyness = 0.01;
}
double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
adjustedRate =
computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
tagPresent = true;
if (autoThrottlingEnabled) {
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(
tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if (!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second =
ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
} else {
TEST(true); // Auto throttle overriden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] =
ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
} else {
ASSERT(autoItr->second.limits.expiration <= now());
TEST(true); // Auto throttle expired
if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup
tagPresent = true;
} else {
autoThrottledTags.erase(autoItr);
}
}
}
if (!tagPresent) {
TEST(true); // All tag throttles expired
tagItr = tagData.erase(tagItr);
} else {
++tagItr;
}
}
return clientRates;
}
void addRequests(TransactionTag const& tag, int requests) {
if (requests > 0) {
TEST(true); // Requests reported for throttled tag
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if (autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
auto manualItr = manualThrottledTags.find(tag);
if (manualItr != manualThrottledTags.end()) {
for (auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end();
++priorityItr) {
priorityItr->second.updateAndGetClientRate(requestRate);
}
}
}
}
Optional<double> getRequestRate(TransactionTag const& tag) {
auto itr = tagData.find(tag);
if (itr != tagData.end()) {
return itr->second.requestRate.smoothRate();
}
return Optional<double>();
}
int64_t autoThrottleCount() const { return autoThrottledTags.size(); }
int64_t manualThrottleCount() const {
int64_t count = 0;
for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) {
count += itr->second.size();
}
return count;
}
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
uint32_t busyReadTagCount = 0, busyWriteTagCount = 0;
};
class TagThrottlerImpl {
Database db;
UID id;
RkTagThrottleCollection throttledTags;
uint64_t throttledTagChangeId{ 0 };
bool autoThrottlingEnabled{ false };
ACTOR static Future<Void> monitorThrottlingChanges(TagThrottlerImpl* self) {
state bool committed = false;
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Future<RangeResult> throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);
if (!committed) {
BinaryWriter limitWriter(Unversioned());
limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
tr.set(tagThrottleLimitKey, limitWriter.toValue());
}
wait(success(throttledTagKeys) && success(autoThrottlingEnabled));
if (autoThrottlingEnabled.get().present() &&
autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
TEST(true); // Auto-throttling disabled
if (self->autoThrottlingEnabled) {
TraceEvent("AutoTagThrottlingDisabled", self->id).log();
}
self->autoThrottlingEnabled = false;
} else if (autoThrottlingEnabled.get().present() &&
autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
TEST(true); // Auto-throttling enabled
if (!self->autoThrottlingEnabled) {
TraceEvent("AutoTagThrottlingEnabled", self->id).log();
}
self->autoThrottlingEnabled = true;
} else {
TEST(true); // Auto-throttling unspecified
if (autoThrottlingEnabled.get().present()) {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id)
.detail("Value", autoThrottlingEnabled.get().get());
}
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
if (!committed)
tr.set(tagThrottleAutoEnabledKey,
LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0"));
}
RkTagThrottleCollection updatedTagThrottles;
TraceEvent("RatekeeperReadThrottledTags", self->id)
.detail("NumThrottledTags", throttledTagKeys.get().size());
for (auto entry : throttledTagKeys.get()) {
TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key);
TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value);
ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported
if (tagValue.expirationTime == 0 ||
tagValue.expirationTime > now() + tagValue.initialDuration) {
TEST(true); // Converting tag throttle duration to absolute time
tagValue.expirationTime = now() + tagValue.initialDuration;
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason()));
wr << tagValue;
state Value value = wr.toValue();
tr.set(entry.key, value);
}
if (tagValue.expirationTime > now()) {
TransactionTag tag = *tagKey.tags.begin();
Optional<ClientTagThrottleLimits> oldLimits =
self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority);
if (tagKey.throttleType == TagThrottleType::AUTO) {
updatedTagThrottles.autoThrottleTag(
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
if (tagValue.reason == TagThrottledReason::BUSY_READ) {
updatedTagThrottles.busyReadTagCount++;
} else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) {
updatedTagThrottles.busyWriteTagCount++;
}
} else {
updatedTagThrottles.manualThrottleTag(self->id,
tag,
tagKey.priority,
tagValue.tpsRate,
tagValue.expirationTime,
oldLimits);
}
}
}
self->throttledTags = std::move(updatedTagThrottles);
++self->throttledTagChangeId;
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
wait(tr.commit());
committed = true;
wait(watchFuture);
TraceEvent("RatekeeperThrottleSignaled", self->id).log();
TEST(true); // Tag throttle changes detected
break;
} catch (Error& e) {
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e);
wait(tr.onError(e));
}
}
}
}
Optional<double> autoThrottleTag(UID id, TransactionTag tag, double busyness) {
return throttledTags.autoThrottleTag(id, tag, busyness);
}
Future<Void> tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
// NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
Optional<double> clientRate = autoThrottleTag(id, tag, busyness);
if (clientRate.present()) {
TagSet tags;
tags.addTag(tag);
Reference<DatabaseContext> dbRef = Reference<DatabaseContext>::addRef(db.getPtr());
return ThrottleApi::throttleTags(dbRef,
tags,
clientRate.get(),
SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
TagThrottleType::AUTO,
TransactionPriority::DEFAULT,
now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
reason);
}
}
return Void();
}
public:
TagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); }
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
return throttledTags.getClientRates(autoThrottlingEnabled);
}
int64_t autoThrottleCount() const { return throttledTags.autoThrottleCount(); }
uint32_t busyReadTagCount() const { return throttledTags.busyReadTagCount; }
uint32_t busyWriteTagCount() const { return throttledTags.busyWriteTagCount; }
int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); }
bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; }
Future<Void> tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) {
// NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In
// most of situation, this works. More indicators besides queue size and durability lag could be investigated in
// the future
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
if (ss.busiestWriteTag.present()) {
return tryAutoThrottleTag(ss.busiestWriteTag.get(),
ss.busiestWriteTagRate,
ss.busiestWriteTagFractionalBusyness,
TagThrottledReason::BUSY_WRITE);
}
if (ss.busiestReadTag.present()) {
return tryAutoThrottleTag(ss.busiestReadTag.get(),
ss.busiestReadTagRate,
ss.busiestReadTagFractionalBusyness,
TagThrottledReason::BUSY_READ);
}
}
return Void();
}
}; // class TagThrottlerImpl
TagThrottler::TagThrottler(Database db, UID id) : impl(PImpl<TagThrottlerImpl>::create(db, id)) {}
TagThrottler::~TagThrottler() = default;
Future<Void> TagThrottler::monitorThrottlingChanges() {
return impl->monitorThrottlingChanges();
}
void TagThrottler::addRequests(TransactionTag tag, int count) {
impl->addRequests(tag, count);
}
uint64_t TagThrottler::getThrottledTagChangeId() const {
return impl->getThrottledTagChangeId();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> TagThrottler::getClientRates() {
return impl->getClientRates();
}
int64_t TagThrottler::autoThrottleCount() const {
return impl->autoThrottleCount();
}
uint32_t TagThrottler::busyReadTagCount() const {
return impl->busyReadTagCount();
}
uint32_t TagThrottler::busyWriteTagCount() const {
return impl->busyWriteTagCount();
}
int64_t TagThrottler::manualThrottleCount() const {
return impl->manualThrottleCount();
}
bool TagThrottler::isAutoThrottlingEnabled() const {
return impl->isAutoThrottlingEnabled();
}
Future<Void> TagThrottler::tryAutoThrottleTag(StorageQueueInfo& ss,
int64_t storageQueue,
int64_t storageDurabilityLag) {
return impl->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag);
}

42
fdbserver/TagThrottler.h Normal file
View File

@ -0,0 +1,42 @@
/*
* TagThrottler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/PImpl.h"
#include "fdbserver/Ratekeeper.h"
class TagThrottler {
PImpl<class TagThrottlerImpl> impl;
public:
TagThrottler(Database db, UID id);
~TagThrottler();
Future<Void> monitorThrottlingChanges();
void addRequests(TransactionTag tag, int count);
uint64_t getThrottledTagChangeId() const;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates();
int64_t autoThrottleCount() const;
uint32_t busyReadTagCount() const;
uint32_t busyWriteTagCount() const;
int64_t manualThrottleCount() const;
bool isAutoThrottlingEnabled() const;
Future<Void> tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
};

View File

@ -833,6 +833,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
NetworkAddressList publicNetworkAddresses;
NetworkAddressList listenNetworkAddresses;
connectionRecord.resolveHostnamesBlocking();
auto& coordinators = connectionRecord.getConnectionString().coordinators();
ASSERT(coordinators.size() > 0);
@ -1022,6 +1023,29 @@ struct CLIOptions {
return opts;
}
// Determine publicAddresses and listenAddresses by calling buildNetworkAddresses().
void buildNetwork(const char* name) {
try {
if (!publicAddressStrs.empty()) {
std::tie(publicAddresses, listenAddresses) =
buildNetworkAddresses(*connectionFile, publicAddressStrs, listenAddressStrs);
}
} catch (Error&) {
printHelpTeaser(name);
flushAndExit(FDB_EXIT_ERROR);
}
if (role == ServerRole::ConsistencyCheck) {
if (!publicAddressStrs.empty()) {
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
printHelpTeaser(name);
flushAndExit(FDB_EXIT_ERROR);
}
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
publicAddresses.address = NetworkAddress(publicIP, ::getpid());
}
}
private:
CLIOptions() = default;
@ -1594,26 +1618,6 @@ private:
// failmon?
}
try {
if (!publicAddressStrs.empty()) {
std::tie(publicAddresses, listenAddresses) =
buildNetworkAddresses(*connectionFile, publicAddressStrs, listenAddressStrs);
}
} catch (Error&) {
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
if (role == ServerRole::ConsistencyCheck) {
if (!publicAddressStrs.empty()) {
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
printHelpTeaser(argv[0]);
flushAndExit(FDB_EXIT_ERROR);
}
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
publicAddresses.address = NetworkAddress(publicIP, ::getpid());
}
if (role == ServerRole::Simulation) {
Optional<bool> buggifyOverride = checkBuggifyOverride(testFile);
if (buggifyOverride.present())
@ -1692,7 +1696,7 @@ int main(int argc, char* argv[]) {
//_set_output_format(_TWO_DIGIT_EXPONENT);
#endif
const auto opts = CLIOptions::parseArgs(argc, argv);
auto opts = CLIOptions::parseArgs(argc, argv);
const auto role = opts.role;
#ifdef _WIN32
@ -1787,6 +1791,7 @@ int main(int argc, char* argv[]) {
if (role == ServerRole::Simulation || role == ServerRole::CreateTemplateDatabase) {
// startOldSimulator();
opts.buildNetwork(argv[0]);
startNewSimulator(opts.printSimTime);
openTraceFile(NetworkAddress(), opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
openTracer(TracerType(deterministicRandom()->randomInt(static_cast<int>(TracerType::DISABLED),
@ -1795,6 +1800,7 @@ int main(int argc, char* argv[]) {
g_network = newNet2(opts.tlsConfig, opts.useThreadPool, true);
g_network->addStopCallback(Net2FileSystem::stop);
FlowTransport::createInstance(false, 1, WLTOKEN_RESERVED_COUNT);
opts.buildNetwork(argv[0]);
const bool expectsPublicAddress =
(role == ServerRole::FDBD || role == ServerRole::NetworkTestServer || role == ServerRole::Restore);

View File

@ -2308,10 +2308,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> in,
Reference<AsyncVar<Optional<ClusterInterface>>> out) {
loop {
if (in->get().present())
if (in->get().present()) {
out->set(in->get().get().clientInterface);
else
} else {
out->set(Optional<ClusterInterface>());
}
wait(in->onChange());
}
}
@ -2509,9 +2510,14 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
}
successIndex = index;
} else {
if (leader.isError() && leader.getError().code() == error_code_coordinators_changed) {
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
throw coordinators_changed();
}
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
throw coordinators_changed();
}
}
}
@ -2519,11 +2525,22 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
ACTOR Future<Void> monitorLeaderWithDelayedCandidacyImplInternal(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Value>> outSerializedLeaderInfo) {
wait(connRecord->resolveHostnames());
state MonitorLeaderInfo info(connRecord);
loop {
MonitorLeaderInfo _info =
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info));
info = _info;
try {
wait(info.intermediateConnRecord->resolveHostnames());
MonitorLeaderInfo _info =
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info));
info = _info;
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
TraceEvent("MonitorLeaderWithDelayedCandidacyCoordinatorsChanged").suppressFor(1.0);
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}
}
}
}
@ -2657,6 +2674,7 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
actors.push_back(serveProcess());
try {
wait(connRecord->resolveHostnames());
ServerCoordinators coordinators(connRecord);
if (g_network->isSimulated()) {
whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,";

View File

@ -36,9 +36,17 @@ static const char* storageMigrationTypes[] = { "perpetual_storage_wiggle=0 stora
"perpetual_storage_wiggle=1",
"perpetual_storage_wiggle=1 storage_migration_type=gradual",
"storage_migration_type=aggressive" };
static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2",
"log_version:=2", "log_version:=3", "log_version:=4", "log_version:=5",
"log_version:=6", "log_version:=7" };
static const char* logTypes[] = { "log_engine:=1",
"log_engine:=2",
"log_spill:=1",
"log_spill:=2",
"log_version:=2",
"log_version:=3",
"log_version:=4",
"log_version:=5",
"log_version:=6",
// downgrade incompatible log version
"log_version:=7" };
static const char* redundancies[] = { "single", "double", "triple" };
static const char* backupTypes[] = { "backup_worker_enabled:=0", "backup_worker_enabled:=1" };
@ -220,6 +228,8 @@ struct ConfigureDatabaseWorkload : TestWorkload {
int additionalDBs;
bool allowDescriptorChange;
bool allowTestStorageMigration;
bool waitStoreTypeCheck;
bool downgradeTest1; // if this is true, don't pick up downgrade incompatible config
std::vector<Future<Void>> clients;
PerfIntCounter retries;
@ -229,6 +239,8 @@ struct ConfigureDatabaseWorkload : TestWorkload {
getOption(options, LiteralStringRef("allowDescriptorChange"), SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT);
allowTestStorageMigration =
getOption(options, "allowTestStorageMigration"_sr, false) && g_simulator.allowStorageMigrationTypeChange;
waitStoreTypeCheck = getOption(options, "waitStoreTypeCheck"_sr, false);
downgradeTest1 = getOption(options, "downgradeTest1"_sr, false);
g_simulator.usableRegions = 1;
}
@ -273,7 +285,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
// only storage_migration_type=gradual && perpetual_storage_wiggle=1 need this check because in QuietDatabase
// perpetual wiggle will be forced to close For other cases, later ConsistencyCheck will check KV store type
// there
if (self->allowTestStorageMigration) {
if (self->allowTestStorageMigration || self->waitStoreTypeCheck) {
loop {
// There exists a race where the check can start before the last transaction that singleDB issued
// finishes, if singleDB gets actor cancelled from a timeout at the end of a test. This means the
@ -404,8 +416,14 @@ struct ConfigureDatabaseWorkload : TestWorkload {
true)));
} else if (randomChoice == 6) {
// Some configurations will be invalid, and that's fine.
wait(success(IssueConfigurationChange(
cx, logTypes[deterministicRandom()->randomInt(0, sizeof(logTypes) / sizeof(logTypes[0]))], false)));
int length = sizeof(logTypes) / sizeof(logTypes[0]);
if (self->downgradeTest1) {
length -= 1;
}
wait(success(
IssueConfigurationChange(cx, logTypes[deterministicRandom()->randomInt(0, length)], false)));
} else if (randomChoice == 7) {
wait(success(IssueConfigurationChange(
cx,

View File

@ -926,10 +926,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(coordinator_processes_key.present());
std::vector<std::string> process_addresses;
state std::vector<std::string> process_addresses;
boost::split(
process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size());
ASSERT(process_addresses.size() == cs.coordinators().size() + cs.hostnames.size());
wait(cs.resolveHostnames());
// compare the coordinator process network addresses one by one
for (const auto& network_address : cs.coordinators()) {
ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) !=
@ -970,16 +971,15 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
old_coordinators_processes, processes_key.get().toString(), [](char c) { return c == ','; });
// pick up one non-coordinator process if possible
std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
std::string old_coordinators_processes_string = describe(old_coordinators_processes);
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("OldCoordinators", describe(old_coordinators_processes))
.detail("OldCoordinators", old_coordinators_processes_string)
.detail("WorkerSize", workers.size());
if (workers.size() > old_coordinators_processes.size()) {
loop {
auto worker = deterministicRandom()->randomChoice(workers);
new_coordinator_process = worker.address.toString();
if (std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
worker.address.toString()) == old_coordinators_processes.end()) {
if (old_coordinators_processes_string.find(new_coordinator_process) == std::string::npos) {
break;
}
}
@ -1049,10 +1049,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
ClusterConnectionString cs(res.get().toString());
ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1);
state ClusterConnectionString csNew(res.get().toString());
wait(csNew.resolveHostnames());
ASSERT(csNew.coordinators().size() == old_coordinators_processes.size() + 1);
// verify the coordinators' addresses
for (const auto& network_address : cs.coordinators()) {
for (const auto& network_address : csNew.coordinators()) {
std::string address_str = network_address.toString();
ASSERT(std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
@ -1060,7 +1061,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
new_coordinator_process == address_str);
}
// verify the cluster decription
ASSERT(new_cluster_description == cs.clusterKeyName().toString());
ASSERT(new_cluster_description == csNew.clusterKeyName().toString());
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));

View File

@ -172,7 +172,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( MIN_LOGGED_PRIORITY_BUSY_FRACTION, 0.05 );
init( CERT_FILE_MAX_SIZE, 5 * 1024 * 1024 );
init( READY_QUEUE_RESERVED_SIZE, 8192 );
init( ITERATIONS_PER_REACTOR_CHECK, 5 );
init( ITERATIONS_PER_REACTOR_CHECK, 100 );
//Network
init( PACKET_LIMIT, 100LL<<20 );

View File

@ -20,12 +20,14 @@
#include "flow/StreamCipher.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/UnitTest.h"
#include <memory>
std::unordered_set<EVP_CIPHER_CTX*> StreamCipher::ctxs;
std::unordered_set<StreamCipherKey*> StreamCipherKey::cipherKeys;
UID StreamCipherKey::globalKeyId;
std::unordered_map<UID, EVP_CIPHER_CTX*> StreamCipher::ctxs;
std::unordered_map<UID, StreamCipherKey*> StreamCipherKey::cipherKeys;
std::unique_ptr<StreamCipherKey> StreamCipherKey::globalKey;
bool StreamCipherKey::isGlobalKeyPresent() {
@ -36,8 +38,9 @@ void StreamCipherKey::allocGlobalCipherKey() {
if (StreamCipherKey::isGlobalKeyPresent()) {
return;
}
StreamCipherKey::globalKeyId = deterministicRandom()->randomUniqueID();
StreamCipherKey::globalKey = std::make_unique<StreamCipherKey>(AES_256_KEY_LENGTH);
StreamCipherKey::cipherKeys.insert(StreamCipherKey::globalKey.get());
StreamCipherKey::cipherKeys[StreamCipherKey::globalKeyId] = StreamCipherKey::globalKey.get();
}
void StreamCipherKey::initializeGlobalRandomTestKey() {
@ -56,8 +59,8 @@ StreamCipherKey const* StreamCipherKey::getGlobalCipherKey() {
}
void StreamCipherKey::cleanup() noexcept {
for (auto cipherKey : cipherKeys) {
cipherKey->reset();
for (const auto& itr : cipherKeys) {
itr.second->reset();
}
}
@ -67,31 +70,33 @@ void StreamCipherKey::initializeKey(uint8_t* data, int len) {
memcpy(arr.get(), data, copyLen);
}
StreamCipherKey::StreamCipherKey(int size) : arr(std::make_unique<uint8_t[]>(size)), keySize(size) {
StreamCipherKey::StreamCipherKey(int size)
: id(deterministicRandom()->randomUniqueID()), arr(std::make_unique<uint8_t[]>(size)), keySize(size) {
memset(arr.get(), 0, keySize);
cipherKeys.insert(this);
cipherKeys[id] = this;
}
StreamCipherKey::~StreamCipherKey() {
reset();
cipherKeys.erase(this);
cipherKeys.erase(this->id);
}
StreamCipher::StreamCipher(int keySize)
: ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()), cipherKey(std::make_unique<StreamCipherKey>(keySize)) {
ctxs.insert(ctx);
: id(deterministicRandom()->randomUniqueID()), ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
cipherKey(std::make_unique<StreamCipherKey>(keySize)) {
ctxs[id] = ctx;
}
StreamCipher::StreamCipher()
: ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
: id(deterministicRandom()->randomUniqueID()), ctx(EVP_CIPHER_CTX_new()), hmacCtx(HMAC_CTX_new()),
cipherKey(std::make_unique<StreamCipherKey>(AES_256_KEY_LENGTH)) {
ctxs.insert(ctx);
ctxs[id] = ctx;
}
StreamCipher::~StreamCipher() {
HMAC_CTX_free(hmacCtx);
EVP_CIPHER_CTX_free(ctx);
ctxs.erase(ctx);
ctxs.erase(id);
}
EVP_CIPHER_CTX* StreamCipher::getCtx() {
@ -103,8 +108,8 @@ HMAC_CTX* StreamCipher::getHmacCtx() {
}
void StreamCipher::cleanup() noexcept {
for (auto ctx : ctxs) {
EVP_CIPHER_CTX_free(ctx);
for (auto itr : ctxs) {
EVP_CIPHER_CTX_free(itr.second);
}
}

View File

@ -44,8 +44,10 @@
// Wrapper class for openssl implementation of AES GCM
// encryption/decryption
class StreamCipherKey : NonCopyable {
static UID globalKeyId;
static std::unique_ptr<StreamCipherKey> globalKey;
static std::unordered_set<StreamCipherKey*> cipherKeys;
static std::unordered_map<UID, StreamCipherKey*> cipherKeys;
UID id;
std::unique_ptr<uint8_t[]> arr;
int keySize;
@ -67,7 +69,8 @@ public:
};
class StreamCipher final : NonCopyable {
static std::unordered_set<EVP_CIPHER_CTX*> ctxs;
UID id;
static std::unordered_map<UID, EVP_CIPHER_CTX*> ctxs;
EVP_CIPHER_CTX* ctx;
HMAC_CTX* hmacCtx;
std::unique_ptr<StreamCipherKey> cipherKey;

View File

@ -255,6 +255,14 @@ if(WITH_PYTHON)
add_fdb_test(
TEST_FILES restarting/from_7.0.0/SnapCycleRestart-1.txt
restarting/from_7.0.0/SnapCycleRestart-2.txt)
add_fdb_test(
TEST_FILES restarting/to_7.1.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/to_7.1.0/ConfigureStorageMigrationTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.1.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/from_7.1.0/ConfigureStorageMigrationTestRestart-2.toml)
add_fdb_test(TEST_FILES slow/ApiCorrectness.toml)
add_fdb_test(TEST_FILES slow/ApiCorrectnessAtomicRestore.toml)
add_fdb_test(TEST_FILES slow/ApiCorrectnessSwitchover.toml)

View File

@ -0,0 +1,27 @@
[configuration]
extraMachineCountDC = 2
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'
clearAfterTest = false
[[test.workload]]
testName = 'ConfigureDatabase'
testDuration = 30.0
allowTestStorageMigration = true
allowDescriptorChange = false
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
scale = 0.1
clogginess = 2.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'
testDuration=30.0

View File

@ -0,0 +1,22 @@
[configuration]
extraMachineCountDC = 2
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'
runSetup=false
waitForQuiescenceBegin=false
[[test.workload]]
testName = 'ConfigureDatabase'
testDuration = 300.0
waitStoreTypeCheck = true
[[test.workload]]
testName = 'RandomClogging'
testDuration = 300.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 300.0
scale = 0.1
clogginess = 2.0

View File

@ -0,0 +1,31 @@
[configuration]
extraMachineCountDC = 2
maxTLogVersion=6
disableHostname=true
storageEngineExcludeTypes=[4]
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'
clearAfterTest = false
[[test.workload]]
testName = 'ConfigureDatabase'
testDuration = 30.0
allowTestStorageMigration = true
allowDescriptorChange = false
downgradeTest1 = true
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
scale = 0.1
clogginess = 2.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'
testDuration=30.0

View File

@ -0,0 +1,22 @@
[configuration]
extraMachineCountDC = 2
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'
runSetup=false
waitForQuiescenceBegin=false
[[test.workload]]
testName = 'ConfigureDatabase'
testDuration = 300.0
waitStoreTypeCheck = true
[[test.workload]]
testName = 'RandomClogging'
testDuration = 300.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 300.0
scale = 0.1
clogginess = 2.0