Merge branch 'main' of github.com:apple/foundationdb into jfu-grv-cache

This commit is contained in:
Jon Fu 2022-01-24 13:08:55 -05:00
commit 5d9843ec32
15 changed files with 495 additions and 86 deletions

View File

@ -22,6 +22,7 @@ use_libcxx(_use_libcxx)
env_set(USE_LIBCXX "${_use_libcxx}" BOOL "Use libc++")
static_link_libcxx(_static_link_libcxx)
env_set(STATIC_LINK_LIBCXX "${_static_link_libcxx}" BOOL "Statically link libstdcpp/libc++")
env_set(TRACE_PC_GUARD_INSTRUMENTATION_LIB "" STRING "Path to a library containing an implementation for __sanitizer_cov_trace_pc_guard. See https://clang.llvm.org/docs/SanitizerCoverage.html for more info.")
set(USE_SANITIZER OFF)
if(USE_ASAN OR USE_VALGRIND OR USE_MSAN OR USE_TSAN OR USE_UBSAN)
@ -155,6 +156,10 @@ else()
# we always compile with debug symbols. CPack will strip them out
# and create a debuginfo rpm
add_compile_options(-ggdb -fno-omit-frame-pointer)
if(TRACE_PC_GUARD_INSTRUMENTATION_LIB)
add_compile_options(-fsanitize-coverage=trace-pc-guard)
link_libraries(${TRACE_PC_GUARD_INSTRUMENTATION_LIB})
endif()
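For context, the library supplied via TRACE_PC_GUARD_INSTRUMENTATION_LIB only has to provide the two callbacks described in the SanitizerCoverage documentation linked above; the CMake change merely turns on the instrumentation and links that library in. A minimal sketch of such a library (the output format is purely illustrative and not part of this change):

#include <cstdint>
#include <cstdio>

// Called once per instrumented module with the [start, stop) range of guard variables.
extern "C" void __sanitizer_cov_trace_pc_guard_init(uint32_t* start, uint32_t* stop) {
    static uint32_t counter = 0;
    if (start == stop || *start) return; // module already initialized
    for (uint32_t* g = start; g < stop; ++g)
        *g = ++counter; // assign every edge a unique, non-zero id
}

// Called on every instrumented edge whose guard is still non-zero.
extern "C" void __sanitizer_cov_trace_pc_guard(uint32_t* guard) {
    if (!*guard) return;
    fprintf(stderr, "edge %u hit\n", *guard); // record the edge however the consumer wants
    *guard = 0; // report each edge only once
}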
if(USE_ASAN)
list(APPEND SANITIZER_COMPILE_OPTIONS
-fsanitize=address

View File

@ -67,7 +67,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( RESOURCE_CONSTRAINED_MAX_BACKOFF, 30.0 );
init( PROXY_COMMIT_OVERHEAD_BYTES, 23 ); //The size of serializing 7 tags (3 primary, 3 remote, 1 log router) + 2 for the tag length
init( SHARD_STAT_SMOOTH_AMOUNT, 5.0 );
init( INIT_MID_SHARD_BYTES, 200000 ); if( randomize && BUGGIFY ) INIT_MID_SHARD_BYTES = 40000; // The same value as SERVER_KNOBS->MIN_SHARD_BYTES
init( INIT_MID_SHARD_BYTES, 50000000 ); if( randomize && BUGGIFY ) INIT_MID_SHARD_BYTES = 40000; else if(randomize && !BUGGIFY) INIT_MID_SHARD_BYTES = 200000; // The same value as SERVER_KNOBS->MIN_SHARD_BYTES
init( TRANSACTION_SIZE_LIMIT, 1e7 );
init( KEY_SIZE_LIMIT, 1e4 );

View File

@ -59,22 +59,34 @@ struct ClientLeaderRegInterface {
class ClusterConnectionString {
public:
ClusterConnectionString() {}
ClusterConnectionString(std::string const& connectionString);
ClusterConnectionString(std::vector<NetworkAddress>, Key);
ClusterConnectionString(const std::string& connStr);
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
ClusterConnectionString(const std::vector<Hostname>& hosts, Key key);
std::vector<NetworkAddress> const& coordinators() const { return coord; }
std::vector<NetworkAddress> const& coordinators() const { return coords; }
void addResolved(Hostname hostname, NetworkAddress address) {
coords.push_back(address);
networkAddressToHostname.emplace(address, hostname);
}
Key clusterKey() const { return key; }
Key clusterKeyName() const {
return keyDesc;
} // Returns the "name" or "description" part of the clusterKey (the part before the ':')
std::string toString() const;
static std::string getErrorString(std::string const& source, Error const& e);
Future<Void> resolveHostnames();
void resetToUnresolved();
bool hasUnresolvedHostnames = false;
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
private:
void parseKey(std::string const& key);
std::vector<NetworkAddress> coord;
void parseConnString();
void parseKey(const std::string& key);
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;
Key key, keyDesc;
std::string connectionString;
};
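A rough usage sketch of the extended interface, mirroring the parse/resolve unit tests added later in this commit (the hostname below is made up, and wait() must run inside an actor):

// Hedged sketch: a connection string may now mix literal addresses and hostnames.
ClusterConnectionString cs("TestCluster:0@127.0.0.1:4500,coordinator.example:4500");
ASSERT(cs.hasUnresolvedHostnames); // "coordinator.example:4500" has not been resolved yet
ASSERT(cs.hostnames.size() == 1);
ASSERT(cs.coordinators().size() == 1); // only the literal address so far
wait(cs.resolveHostnames()); // DNS lookup; resolved addresses are appended to coords
ASSERT(!cs.hasUnresolvedHostnames);
ASSERT(cs.coordinators().size() == 2);
cs.resetToUnresolved(); // drop the resolved addresses and re-parse the original string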
FDB_DECLARE_BOOLEAN_PARAM(ConnectionStringNeedsPersisted);
@ -95,6 +107,8 @@ public:
// been persisted or if the persistent storage for the record has been modified externally.
ClusterConnectionString const& getConnectionString() const;
ClusterConnectionString* getMutableConnectionString();
// Sets the connection string held by this object and persists it.
virtual Future<Void> setConnectionString(ClusterConnectionString const&) = 0;
@ -124,6 +138,9 @@ public:
// Signals to the connection record that it was successfully used to connect to a cluster.
void notifyConnected();
bool hasUnresolvedHostnames() const;
Future<Void> resolveHostnames();
virtual void addref() = 0;
virtual void delref() = 0;
@ -151,7 +168,10 @@ struct LeaderInfo {
UID changeID;
static const uint64_t changeIDMask = ~(uint64_t(0b1111111) << 57);
Value serializedInfo;
bool forward; // If true, serializedInfo is a connection string instead!
// If true, serializedInfo is a connection string instead!
// If true, it also means the recipient needs to update their local cluster file
// with the latest list of coordinators
bool forward;
LeaderInfo() : forward(false) {}
LeaderInfo(UID changeID) : changeID(changeID), forward(false) {}

View File

@ -665,6 +665,10 @@ struct RangeResultRef : VectorRef<KeyValueRef> {
serializer(ar, ((VectorRef<KeyValueRef>&)*this), more, readThrough, readToBegin, readThroughEnd);
}
int logicalSize() const {
return VectorRef<KeyValueRef>::expectedSize() - VectorRef<KeyValueRef>::size() * sizeof(KeyValueRef);
}
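logicalSize() is intended to report only the key and value bytes of the result, excluding the per-entry KeyValueRef overhead that expectedSize() includes; it is what the new kvScanBytes/bytesRestored accounting later in this commit accumulates. A plain-C++ analogy of the same calculation (stand-in types, not FDB's):

#include <string>
#include <utility>
#include <vector>

// Sum only the key/value payload bytes, ignoring per-entry struct overhead.
size_t logicalBytes(const std::vector<std::pair<std::string, std::string>>& kvs) {
    size_t total = 0;
    for (const auto& kv : kvs)
        total += kv.first.size() + kv.second.size();
    return total;
}
// e.g. a single pair ("a", "b") counts as 2 bytes, matching the counter comments below.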
std::string toString() const {
return "more:" + std::to_string(more) +
" readThrough:" + (readThrough.present() ? readThrough.get().toString() : "[unset]") +

View File

@ -58,6 +58,10 @@ ClusterConnectionString const& IClusterConnectionRecord::getConnectionString() c
return cs;
}
ClusterConnectionString* IClusterConnectionRecord::getMutableConnectionString() {
return &cs;
}
Future<bool> IClusterConnectionRecord::upToDate() {
ClusterConnectionString temp;
return upToDate(temp);
@ -77,6 +81,14 @@ void IClusterConnectionRecord::setPersisted() {
connectionStringNeedsPersisted = false;
}
bool IClusterConnectionRecord::hasUnresolvedHostnames() const {
return cs.hasUnresolvedHostnames;
}
Future<Void> IClusterConnectionRecord::resolveHostnames() {
return cs.resolveHostnames();
}
std::string ClusterConnectionString::getErrorString(std::string const& source, Error const& e) {
if (e.code() == error_code_connection_string_invalid) {
return format("Invalid connection string `%s: %d %s", source.c_str(), e.code(), e.what());
@ -85,28 +97,87 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E
}
}
ClusterConnectionString::ClusterConnectionString(std::string const& connectionString) {
auto trimmed = trim(connectionString);
// Split on '@' into key@addrs
int pAt = trimmed.find_first_of('@');
if (pAt == trimmed.npos)
throw connection_string_invalid();
std::string key = trimmed.substr(0, pAt);
std::string addrs = trimmed.substr(pAt + 1);
parseKey(key);
coord = NetworkAddress::parseList(addrs);
ASSERT(coord.size() > 0); // parseList() always returns at least one address if it doesn't throw
std::sort(coord.begin(), coord.end());
// Check that there are no duplicate addresses
if (std::unique(coord.begin(), coord.end()) != coord.end())
ACTOR Future<Void> resolveHostnamesImpl(ClusterConnectionString* self) {
std::vector<Future<Void>> fs;
for (auto const& hostName : self->hostnames) {
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostName.host, hostName.service),
[=](std::vector<NetworkAddress> const& addresses) -> Void {
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
addr.flags = 0; // Reset the parsed address to public
addr.fromHostname = NetworkAddressFromHostname::True;
if (hostName.isTLS) {
addr.flags |= NetworkAddress::FLAG_TLS;
}
self->addResolved(hostName, addr);
return Void();
}));
}
wait(waitForAll(fs));
std::sort(self->coords.begin(), self->coords.end());
if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) {
throw connection_string_invalid();
}
self->hasUnresolvedHostnames = false;
return Void();
}
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
Future<Void> ClusterConnectionString::resolveHostnames() {
if (!hasUnresolvedHostnames) {
return Void();
} else {
return resolveHostnamesImpl(this);
}
}
void ClusterConnectionString::resetToUnresolved() {
if (hostnames.size() > 0) {
coords.clear();
hostnames.clear();
networkAddressToHostname.clear();
hasUnresolvedHostnames = true;
parseConnString();
}
}
void ClusterConnectionString::parseConnString() {
// Split on '@' into key@addrs
int pAt = connectionString.find_first_of('@');
if (pAt == connectionString.npos) {
throw connection_string_invalid();
}
std::string key = connectionString.substr(0, pAt);
std::string addrs = connectionString.substr(pAt + 1);
parseKey(key);
std::string curAddr;
for (int p = 0; p <= addrs.size();) {
int pComma = addrs.find_first_of(',', p);
if (pComma == addrs.npos)
pComma = addrs.size();
curAddr = addrs.substr(p, pComma - p);
if (Hostname::isHostname(curAddr)) {
hostnames.push_back(Hostname::parse(curAddr));
} else {
coords.push_back(NetworkAddress::parse(curAddr));
}
p = pComma + 1;
}
hasUnresolvedHostnames = hostnames.size() > 0;
ASSERT((coords.size() + hostnames.size()) > 0);
std::sort(coords.begin(), coords.end());
// Check that there are no duplicate addresses
if (std::unique(coords.begin(), coords.end()) != coords.end()) {
throw connection_string_invalid();
}
}
ClusterConnectionString::ClusterConnectionString(const std::string& connStr) {
connectionString = trim(connStr);
parseConnString();
}
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/addresses") {
std::string input;
{
@ -157,6 +228,97 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
return Void();
}
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
std::string input;
{
input = "asdf:2345@localhost:1234";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 1);
ASSERT(input == cs.toString());
}
{
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
{
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
std::string commented("#start of comment\n");
commented += input;
commented += "\n";
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
{
input = "0xxdeadbeef:100100100@localhost:34534,host-name_part1.host-name_part2:1234:tls";
std::string commented("#start of comment\n");
commented += input;
commented += "\n";
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
return Void();
}
TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
state std::string connectionString = "TestCluster:0@localhost:1234,host-name:5678";
std::string hn1 = "localhost", port1 = "1234";
state std::string hn2 = "host-name";
state std::string port2 = "5678";
state std::vector<Hostname> hostnames;
hostnames.push_back(Hostname::parse(hn1 + ":" + port1));
hostnames.push_back(Hostname::parse(hn2 + ":" + port2));
NetworkAddress address1 = NetworkAddress::parse("127.0.0.0:1234");
NetworkAddress address2 = NetworkAddress::parse("127.0.0.1:5678");
INetworkConnections::net()->addMockTCPEndpoint(hn1, port1, { address1 });
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address2 });
state ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
wait(cs.resolveHostnames());
ASSERT(!cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 2);
ASSERT(cs.toString() == connectionString);
cs.resetToUnresolved();
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
ASSERT(cs.toString() == connectionString);
INetworkConnections::net()->removeMockTCPEndpoint(hn2, port2);
NetworkAddress address3 = NetworkAddress::parse("127.0.0.0:5678");
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address3 });
try {
wait(cs.resolveHostnames());
} catch (Error& e) {
ASSERT(e.code() == error_code_connection_string_invalid);
}
return Void();
}
TEST_CASE("/flow/FlatBuffers/LeaderInfo") {
{
LeaderInfo in;
@ -237,29 +399,56 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") {
return Void();
}
ClusterConnectionString::ClusterConnectionString(std::vector<NetworkAddress> servers, Key key) : coord(servers) {
parseKey(key.toString());
ClusterConnectionString::ClusterConnectionString(const std::vector<NetworkAddress>& servers, Key key)
: coords(servers) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";
for (int i = 0; i < coords.size(); i++) {
if (i) {
connectionString += ',';
}
connectionString += coords[i].toString();
}
}
void ClusterConnectionString::parseKey(std::string const& key) {
ClusterConnectionString::ClusterConnectionString(const std::vector<Hostname>& hosts, Key key)
: hasUnresolvedHostnames(true), hostnames(hosts) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";
for (int i = 0; i < hostnames.size(); i++) {
if (i) {
connectionString += ',';
}
connectionString += hostnames[i].toString();
}
}
void ClusterConnectionString::parseKey(const std::string& key) {
// Check the structure of the given key, and fill in this->key and this->keyDesc
// The key must contain one (and only one) : character
int colon = key.find_first_of(':');
if (colon == key.npos)
if (colon == key.npos) {
throw connection_string_invalid();
}
std::string desc = key.substr(0, colon);
std::string id = key.substr(colon + 1);
// Check that description contains only allowed characters (a-z, A-Z, 0-9, _)
for (auto c = desc.begin(); c != desc.end(); ++c)
if (!(isalnum(*c) || *c == '_'))
for (auto c = desc.begin(); c != desc.end(); ++c) {
if (!(isalnum(*c) || *c == '_')) {
throw connection_string_invalid();
}
}
// Check that ID contains only allowed characters (a-z, A-Z, 0-9)
for (auto c = id.begin(); c != id.end(); ++c)
if (!isalnum(*c))
for (auto c = id.begin(); c != id.end(); ++c) {
if (!isalnum(*c)) {
throw connection_string_invalid();
}
}
this->key = StringRef(key);
this->keyDesc = StringRef(desc);
@ -268,11 +457,19 @@ void ClusterConnectionString::parseKey(std::string const& key) {
std::string ClusterConnectionString::toString() const {
std::string s = key.toString();
s += '@';
for (int i = 0; i < coord.size(); i++) {
if (i) {
for (int i = 0; i < coords.size(); i++) {
if (networkAddressToHostname.find(coords[i]) == networkAddressToHostname.end()) {
if (s.find('@') != s.length() - 1) {
s += ',';
}
s += coords[i].toString();
}
}
for (auto const& host : hostnames) {
if (s.find('@') != s.length() - 1) {
s += ',';
}
s += coord[i].toString();
s += host.toString();
}
return s;
}
@ -669,7 +866,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state std::vector<NetworkAddress> addrs = cs.coordinators();
state int idx = 0;
state int index = 0;
state int successIndex = 0;
state Optional<double> incorrectTime;
state std::vector<UID> lastCommitProxyUIDs;
@ -679,7 +876,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
deterministicRandom()->randomShuffle(addrs);
loop {
state ClientLeaderRegInterface clientLeaderServer(addrs[idx]);
state ClientLeaderRegInterface clientLeaderServer(addrs[index]);
state OpenDatabaseCoordRequest req;
coordinator->set(clientLeaderServer);
@ -742,11 +939,11 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
auto& ni = rep.get().mutate();
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
clientInfo->set(ni);
successIndex = idx;
successIndex = index;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator can't talk to cluster controller
idx = (idx + 1) % addrs.size();
if (idx == successIndex) {
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
}
}

View File

@ -1335,7 +1335,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), outstandingWatches(0), lastTimedGrv(0.0), cachedRv(0), sharedStatePtr(nullptr),
bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastTimedGrv(0.0), cachedRv(0),
lastTimedRkThrottle(0.0), lastProxyRequest(0.0), transactionTracingSample(false), taskID(taskID),
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),

View File

@ -148,7 +148,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( RETRY_RELOCATESHARD_DELAY, 0.1 );
init( DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 60.0 ); if( randomize && BUGGIFY ) DATA_DISTRIBUTION_FAILURE_REACTION_TIME = 1.0;
bool buggifySmallShards = randomize && BUGGIFY;
init( MIN_SHARD_BYTES, 200000 ); if( buggifySmallShards ) MIN_SHARD_BYTES = 40000; //FIXME: data distribution tracker (specifically StorageMetrics) relies on this number being larger than the maximum size of a key value pair
bool simulationMediumShards = !buggifySmallShards && randomize && !BUGGIFY; // prefer smaller shards in simulation
init( MIN_SHARD_BYTES, 50000000 ); if( buggifySmallShards ) MIN_SHARD_BYTES = 40000; if (simulationMediumShards) MIN_SHARD_BYTES = 200000; //FIXME: data distribution tracker (specifically StorageMetrics) relies on this number being larger than the maximum size of a key value pair
init( SHARD_BYTES_RATIO, 4 );
init( SHARD_BYTES_PER_SQRT_BYTES, 45 ); if( buggifySmallShards ) SHARD_BYTES_PER_SQRT_BYTES = 0;//Approximately 10000 bytes per shard
init( MAX_SHARD_BYTES, 500000000 );
@ -767,7 +768,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FASTRESTORE_RATE_UPDATE_SECONDS, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_RATE_UPDATE_SECONDS = deterministicRandom()->random01() < 0.5 ? 0.1 : 2;}
init( FASTRESTORE_DUMP_INSERT_RANGE_VERSION, false );
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_DEFAULT_PAGE_SIZE, 8192 );
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );

View File

@ -249,6 +249,9 @@ class TestConfig {
if (attrib == "disableTss") {
disableTss = strcmp(value.c_str(), "true") == 0;
}
if (attrib == "disableHostname") {
disableHostname = strcmp(value.c_str(), "true") == 0;
}
if (attrib == "restartInfoLocation") {
isFirstTestInRestart = true;
}
@ -276,6 +279,8 @@ public:
bool isFirstTestInRestart = false;
// 7.0 cannot be downgraded to 6.3 after enabling TSS, so disable TSS for 6.3 downgrade tests
bool disableTss = false;
// 7.1 cannot be downgraded to 7.0 and below after enabling hostname, so disable hostname for 7.0 downgrade tests
bool disableHostname = false;
// Storage Engine Types: Verify match with SimulationConfig::generateNormalConfig
// 0 = "ssd"
// 1 = "memory"
@ -329,6 +334,7 @@ public:
.add("storageEngineExcludeTypes", &storageEngineExcludeTypes)
.add("maxTLogVersion", &maxTLogVersion)
.add("disableTss", &disableTss)
.add("disableHostname", &disableHostname)
.add("simpleConfig", &simpleConfig)
.add("generateFearless", &generateFearless)
.add("datacenters", &datacenters)
@ -1041,6 +1047,12 @@ ACTOR Future<Void> restartSimulatedSystem(std::vector<Future<Void>>* systemActor
if (enableExtraDB) {
g_simulator.extraDB = new ClusterConnectionString(ini.GetValue("META", "connectionString"));
}
if (!testConfig.disableHostname) {
auto mockDNSStr = ini.GetValue("META", "mockDNS");
if (mockDNSStr != nullptr) {
INetworkConnections::net()->parseMockDNSFromString(mockDNSStr);
}
}
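The mockDNS entries themselves are whatever was registered with addMockTCPEndpoint() before the simulated restart; the SaveAndKill workload later in this commit serializes them into the restart INI. A hedged round-trip sketch (the hostname and address follow the fakeCoordinator naming used below; the on-disk string format is an implementation detail of the two helpers):

// Before the kill: register a mock endpoint and save the whole map as a string.
INetworkConnections::net()->addMockTCPEndpoint(
    "fakeCoordinatorDC1M0", "1", { NetworkAddress::parse("2.0.1.0:1") });
std::string saved = INetworkConnections::net()->convertMockDNSToString(); // written to META/mockDNS
// After the restart: restore the map so coordinator hostnames resolve again.
INetworkConnections::net()->parseMockDNSFromString(saved);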
*pConnString = conn;
*pTesterCount = testerCount;
bool usingSSL = conn.toString().find(":tls") != std::string::npos || listenersPerProcess > 1;
@ -1883,6 +1895,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
// Use SSL 10% of the time
bool sslEnabled = deterministicRandom()->random01() < 0.10;
bool sslOnly = sslEnabled && deterministicRandom()->coinflip();
bool isTLS = sslEnabled && sslOnly;
g_simulator.listenersPerProcess = sslEnabled && !sslOnly ? 2 : 1;
TEST(sslEnabled); // SSL enabled
TEST(!sslEnabled); // SSL disabled
@ -1892,8 +1905,18 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
TEST(useIPv6); // Use IPv6
TEST(!useIPv6); // Use IPv4
// TODO(renxuan): Use hostname 25% of the time, unless it is disabled
bool useHostname = false; // !testConfig.disableHostname && deterministicRandom()->random01() < 0.25;
TEST(useHostname); // Use hostname
TEST(!useHostname); // Use IP address
NetworkAddressFromHostname fromHostname =
useHostname ? NetworkAddressFromHostname::True : NetworkAddressFromHostname::False;
std::vector<NetworkAddress> coordinatorAddresses;
std::vector<Hostname> coordinatorHostnames;
std::vector<NetworkAddress> extraCoordinatorAddresses; // Used by extra DB if the DR db is a new one
std::vector<Hostname> extraCoordinatorHostnames;
if (testConfig.minimumRegions > 1) {
// do not put coordinators in the primary region so that we can kill that region safely
int nonPrimaryDcs = dataCenters / 2;
@ -1901,12 +1924,27 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
int dcCoordinators = coordinatorCount / nonPrimaryDcs + ((dc - 1) / 2 < coordinatorCount % nonPrimaryDcs);
for (int m = 0; m < dcCoordinators; m++) {
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
coordinatorAddresses.push_back(
NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
uint16_t port = sslEnabled && !sslOnly ? 2 : 1;
NetworkAddress coordinator(ip, port, true, isTLS, fromHostname);
coordinatorAddresses.push_back(coordinator);
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
extraCoordinatorAddresses.push_back(
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
NetworkAddress extraCoordinator(extraIp, port, true, isTLS, fromHostname);
extraCoordinatorAddresses.push_back(extraCoordinator);
if (useHostname) {
std::string hostname = "fakeCoordinatorDC" + std::to_string(dc) + "M" + std::to_string(m);
Hostname coordinatorHostname(hostname, std::to_string(port), isTLS);
coordinatorHostnames.push_back(coordinatorHostname);
INetworkConnections::net()->addMockTCPEndpoint(hostname, std::to_string(port), { coordinator });
hostname = "fakeExtraCoordinatorDC" + std::to_string(dc) + "M" + std::to_string(m);
Hostname extraCoordinatorHostname(hostname, std::to_string(port), isTLS);
extraCoordinatorHostnames.push_back(extraCoordinatorHostname);
INetworkConnections::net()->addMockTCPEndpoint(
hostname, std::to_string(port), { extraCoordinator });
}
TraceEvent("SelectedCoordinator")
.detail("Hostname", useHostname ? coordinatorHostnames.back().toString().c_str() : "N/A")
.detail("Address", coordinatorAddresses.back());
}
}
} else {
@ -1932,12 +1970,25 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
.detail("CoordinatorCount", coordinatorCount);
} else {
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
coordinatorAddresses.push_back(
NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
uint16_t port = sslEnabled && !sslOnly ? 2 : 1;
NetworkAddress coordinator(ip, port, true, isTLS, fromHostname);
coordinatorAddresses.push_back(coordinator);
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
extraCoordinatorAddresses.push_back(
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
NetworkAddress extraCoordinator(extraIp, port, true, isTLS, fromHostname);
extraCoordinatorAddresses.push_back(extraCoordinator);
if (useHostname) {
std::string hostname = "fakeCoordinatorDC" + std::to_string(dc) + "M" + std::to_string(m);
Hostname coordinatorHostname(hostname, std::to_string(port), isTLS);
coordinatorHostnames.push_back(coordinatorHostname);
INetworkConnections::net()->addMockTCPEndpoint(hostname, std::to_string(port), { coordinator });
hostname = "fakeExtraCoordinatorDC" + std::to_string(dc) + "M" + std::to_string(m);
Hostname extraCoordinatorHostname(hostname, std::to_string(port), isTLS);
extraCoordinatorHostnames.push_back(extraCoordinatorHostname);
INetworkConnections::net()->addMockTCPEndpoint(
hostname, std::to_string(port), { extraCoordinator });
}
TraceEvent("SelectedCoordinator")
.detail("Hostname", useHostname ? coordinatorHostnames.back().toString().c_str() : "N/A")
.detail("Address", coordinatorAddresses.back())
.detail("M", m)
.detail("Machines", machines)
@ -1970,20 +2021,30 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
ASSERT_EQ(coordinatorAddresses.size(), coordinatorCount);
ClusterConnectionString conn(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
if (useHostname) {
conn = ClusterConnectionString(coordinatorHostnames, LiteralStringRef("TestCluster:0"));
}
// If extraDB==0, leave g_simulator.extraDB as null because the test does not use DR.
if (testConfig.extraDB == 1) {
// The DR database can be either a new database or itself
g_simulator.extraDB =
BUGGIFY ? new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"))
: new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
BUGGIFY
? (useHostname ? new ClusterConnectionString(coordinatorHostnames, LiteralStringRef("TestCluster:0"))
: new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0")))
: (useHostname
? new ClusterConnectionString(extraCoordinatorHostnames, LiteralStringRef("ExtraCluster:0"))
: new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0")));
} else if (testConfig.extraDB == 2) {
// The DR database is a new database
g_simulator.extraDB =
new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
useHostname ? new ClusterConnectionString(extraCoordinatorHostnames, LiteralStringRef("ExtraCluster:0"))
: new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
} else if (testConfig.extraDB == 3) {
// The DR database is the same database
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
g_simulator.extraDB =
useHostname ? new ClusterConnectionString(coordinatorHostnames, LiteralStringRef("TestCluster:0"))
: new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
}
*pConnString = conn;

View File

@ -207,24 +207,33 @@ struct StorageServerDisk {
Future<Void> commit() { return storage->commit(); }
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
// Read the key that is equal to or greater than 'key' from the storage engine.
// For example, readNextKeyInclusive("a") should return:
// - "a", if key "a" exists
// - "b", if key "a" doesn't exist, and "b" is the next existing key in total order
// - allKeys.end, if keyrange [a, allKeys.end) is empty
Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
++(*kvScans);
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type);
}
Future<Optional<Value>> readValue(KeyRef key,
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
Optional<UID> debugID = Optional<UID>()) {
++(*kvGets);
return storage->readValue(key, type, debugID);
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
Optional<UID> debugID = Optional<UID>()) {
++(*kvGets);
return storage->readValuePrefix(key, maxLength, type, debugID);
}
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
++(*kvScans);
return storage->readRange(keys, rowLimit, byteLimit, type);
}
@ -232,10 +241,16 @@ struct StorageServerDisk {
StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
// The following are pointers to the Counters in StorageServer::counters of the same names.
Counter* kvCommitLogicalBytes;
Counter* kvClearRanges;
Counter* kvGets;
Counter* kvScans;
Counter* kvCommits;
private:
struct StorageServer* data;
IKeyValueStore* storage;
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, IKeyValueStore::ReadType type) {
@ -799,6 +814,19 @@ public:
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
Counter bytesInput;
// Bytes pulled from TLogs; counts the size of the key-value pairs, e.g., the key-value pair ("a", "b") is
// counted as 2 bytes.
Counter logicalBytesInput;
// Bytes pulled from TLogs for moving-in shards; counts the mutations sent to the moving-in shard during
// the Fetching and Waiting phases.
Counter logicalBytesMoveInOverhead;
// Bytes committed to the underlying storage engine by the storage server; counts the size of key-value pairs.
Counter kvCommitLogicalBytes;
// Count of all clearRange operations to the storage engine.
Counter kvClearRanges;
// ClearRange operations issued by FDB itself rather than by users, e.g., ClearRange operations to remove a shard
// from a storage server, as in removeDataRange().
Counter kvSystemClearRanges;
// Bytes of the mutations that have been removed from memory because they are durable. The counting is the same
// as bytesInput, rather than the actual bytes taken in storage, so that (bytesInput - bytesDurable) can
// reflect the current memory footprint of MVCC.
@ -826,6 +854,19 @@ public:
// fallback).
Counter quickGetValueHit, quickGetValueMiss, quickGetKeyValuesHit, quickGetKeyValuesMiss;
// The number of logical bytes returned from the storage engine in response to readRange operations.
Counter kvScanBytes;
// The number of logical bytes returned from the storage engine in response to readValue operations.
Counter kvGetBytes;
// The number of keys read from the storage engine by eagerReads.
Counter eagerReadsKeys;
// The count of readValue and readValuePrefix operations to the storage engine.
Counter kvGets;
// The count of readRange and readNextKeyInclusive operations to the storage engine.
Counter kvScans;
// The count of commit operations to the storage engine.
Counter kvCommits;
LatencySample readLatencySample;
LatencyBands readLatencyBands;
@ -836,17 +877,23 @@ public:
getRangeStreamQueries("GetRangeStreamQueries", cc), finishedQueries("FinishedQueries", cc),
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc),
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
kvCommitLogicalBytes("KVCommitLogicalBytes", cc), kvClearRanges("KVClearRanges", cc),
kvSystemClearRanges("KVSystemClearRanges", cc), bytesDurable("BytesDurable", cc),
bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc),
sampledBytesCleared("SampledBytesCleared", cc), kvFetched("KVFetched", cc), mutations("Mutations", cc),
setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc),
atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc),
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvScanBytes("KVScanBytes", cc),
kvGetBytes("KVGetBytes", cc), eagerReadsKeys("EagerReadsKeys", cc), kvGets("KVGets", cc),
kvScans("KVScans", cc), kvCommits("KVCommits", cc),
readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -874,6 +921,9 @@ public:
}
} counters;
// Bytes read from storage engine when a storage server starts.
int64_t bytesRestored;
Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder;
StorageServer(IKeyValueStore* storage,
@ -928,6 +978,12 @@ public:
addShard(ShardInfo::newNotAssigned(allKeys));
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
this->storage.kvCommitLogicalBytes = &counters.kvCommitLogicalBytes;
this->storage.kvClearRanges = &counters.kvClearRanges;
this->storage.kvGets = &counters.kvGets;
this->storage.kvScans = &counters.kvScans;
this->storage.kvCommits = &counters.kvCommits;
}
//~StorageServer() { fclose(log); }
@ -1340,6 +1396,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
path = 2;
Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID));
data->counters.kvGetBytes += vv.expectedSize();
// Validate that while we were reading the data we didn't lose the version or shard
if (version < data->storageVersion()) {
TEST(true); // transaction_too_old after readValue
@ -1606,6 +1663,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
if (feed->second->storageVersion != invalidVersion) {
self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, req.version)));
++self->counters.kvSystemClearRanges;
if (req.version > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
@ -1774,6 +1832,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
changeFeedDurableKey(req.rangeID, req.end)),
1 << 30,
remainingDurableBytes));
data->counters.kvScanBytes += res.logicalSize();
if (!req.range.empty()) {
data->checkChangeCounter(changeCounter, req.range);
@ -2187,6 +2246,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
readEnd = vCurrent ? std::min(vCurrent.key(), range.end) : range.end;
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
ASSERT(atStorageVersion.size() <= limit);
if (data->storageVersion() > version)
@ -2268,6 +2328,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
: range.begin;
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version)
@ -3364,9 +3425,13 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
for (int i = 0; i < keyEnd.size(); i++)
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER);
data->counters.eagerReadsKeys += keyEnd.size();
state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
state std::vector<Key> keyEndVal = wait(futureKeyEnds);
for (const auto& key : keyEndVal) {
data->counters.kvScanBytes += key.expectedSize();
}
eager->keyEnd = keyEndVal;
}
@ -3377,6 +3442,12 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
state Future<std::vector<Optional<Value>>> futureValues = getAll(value);
std::vector<Optional<Value>> optionalValues = wait(futureValues);
for (const auto& value : optionalValues) {
if (value.present()) {
data->counters.kvGetBytes += value.expectedSize();
}
}
data->counters.eagerReadsKeys += eager->keys.size();
eager->value = optionalValues;
return Void();
@ -3667,6 +3738,7 @@ void removeDataRange(StorageServer* ss,
MutationRef m(MutationRef::ClearRange, range.end, endClear->getEndKey());
m = ss->addMutationToMutationLog(mLV, m);
data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2));
++ss->counters.kvSystemClearRanges;
}
auto beginClear = data.atLatest().lastLess(range.begin);
@ -4385,6 +4457,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
if (shard->phase < AddingShard::Waiting) {
data->storage.clearRange(keys);
++data->counters.kvSystemClearRanges;
data->byteSampleApplyClear(keys, invalidVersion);
} else {
ASSERT(data->data().getLatestVersion() > data->version.get());
@ -4416,6 +4489,7 @@ AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys)
}
void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const& mutation) {
server->counters.logicalBytesMoveInOverhead += mutation.expectedSize();
if (mutation.type == mutation.ClearRange) {
ASSERT(keys.begin <= mutation.param1 && mutation.param2 <= keys.end);
} else if (isSingleKeyMutation((MutationRef::Type)mutation.type)) {
@ -4623,6 +4697,7 @@ void changeServerKeys(StorageServer* data,
data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads);
data->newestAvailableVersion.insert(range, latestVersion);
setAvailableStatus(data, range, true);
++data->counters.kvSystemClearRanges;
}
validate(data);
@ -4648,10 +4723,12 @@ void changeServerKeys(StorageServer* data,
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(f.first, 0),
changeFeedDurableKey(f.first, version)));
++data->counters.kvSystemClearRanges;
auto rs = data->keyChangeFeed.modify(f.second);
for (auto r = rs.begin(); r != rs.end(); ++r) {
auto& feedList = r->value();
@ -4899,10 +4976,12 @@ private:
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, currentVersion)));
++data->counters.kvSystemClearRanges;
auto rs = data->keyChangeFeed.modify(feed->second->range);
for (auto r = rs.begin(); r != rs.end(); ++r) {
auto& feedList = r->value();
@ -4923,6 +5002,7 @@ private:
if (feed->second->storageVersion != invalidVersion) {
data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, popVersion)));
++data->counters.kvSystemClearRanges;
if (popVersion > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
@ -5289,6 +5369,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
updater.applyMutation(data, msg, ver, false);
mutationBytes += msg.totalSize();
data->counters.mutationBytes += msg.totalSize();
data->counters.logicalBytesInput += msg.expectedSize();
++data->counters.mutations;
switch (msg.type) {
case MutationRef::SetValue:
@ -5503,6 +5584,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
state double beforeStorageCommit = now();
state Future<Void> durable = data->storage.commit();
++data->counters.kvCommits;
state Future<Void> durableDelay = Void();
if (bytesLeft > 0) {
@ -5606,6 +5688,7 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end));
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::SetValue,
availableKeys.begin,
@ -5626,6 +5709,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
persistShardAssignedKeys.begin.toString() + keys.end.toString());
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end));
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::SetValue,
assignedKeys.begin,
@ -5641,17 +5725,21 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
void StorageServerDisk::clearRange(KeyRangeRef keys) {
storage->clear(keys);
++(*kvClearRanges);
}
void StorageServerDisk::writeKeyValue(KeyValueRef kv) {
storage->set(kv);
*kvCommitLogicalBytes += kv.expectedSize();
}
void StorageServerDisk::writeMutation(MutationRef mutation) {
if (mutation.type == MutationRef::SetValue) {
storage->set(KeyValueRef(mutation.param1, mutation.param2));
*kvCommitLogicalBytes += mutation.expectedSize();
} else if (mutation.type == MutationRef::ClearRange) {
storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
++(*kvClearRanges);
} else
ASSERT(false);
}
@ -5663,8 +5751,10 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
if (m.type == MutationRef::SetValue) {
storage->set(KeyValueRef(m.param1, m.param2));
*kvCommitLogicalBytes += m.expectedSize();
} else if (m.type == MutationRef::ClearRange) {
storage->clear(KeyRangeRef(m.param1, m.param2));
++(*kvClearRanges);
}
}
}
@ -5696,6 +5786,7 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
// Update data->storage to persist the changes from (data->storageVersion(),version]
void StorageServerDisk::makeVersionDurable(Version version) {
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())));
*kvCommitLogicalBytes += persistVersion.expectedSize() + sizeof(Version);
// TraceEvent("MakeDurable", data->thisServerID)
// .detail("FromVersion", prevStorageVersion)
@ -5724,8 +5815,11 @@ ACTOR Future<Void> applyByteSampleResult(StorageServer* data,
loop {
RangeResult bs = wait(storage->readRange(
KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES));
if (results)
if (results) {
results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
data->bytesRestored += bs.logicalSize();
data->counters.kvScanBytes += bs.logicalSize();
}
int rangeSize = bs.expectedSize();
totalFetches++;
totalKeys += bs.size();
@ -5894,6 +5988,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
data->sk = Key();
return false;
}
data->bytesRestored += fFormat.get().expectedSize();
if (!persistFormatReadableRange.contains(fFormat.get().get())) {
TraceEvent(SevError, "UnsupportedDBFormat")
.detail("Format", fFormat.get().get().toString())
@ -5901,12 +5996,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
throw worker_recovery_failed();
}
data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
data->bytesRestored += fID.get().expectedSize();
if (ftssPairID.get().present()) {
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
data->bytesRestored += ftssPairID.get().expectedSize();
}
if (fClusterID.get().present()) {
data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned()));
data->bytesRestored += fClusterID.get().expectedSize();
} else {
TEST(true); // storage server upgraded to version supporting cluster IDs
data->actors.add(persistClusterId(data));
@ -5918,22 +6016,29 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
if (fTssQuarantine.get().present()) {
TEST(true); // TSS restarted while quarantined
data->tssInQuarantine = true;
data->bytesRestored += fTssQuarantine.get().expectedSize();
}
data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
if (fLogProtocol.get().present())
if (fLogProtocol.get().present()) {
data->logProtocol = BinaryReader::fromStringRef<ProtocolVersion>(fLogProtocol.get().get(), Unversioned());
data->bytesRestored += fLogProtocol.get().expectedSize();
}
if (fPrimaryLocality.get().present())
if (fPrimaryLocality.get().present()) {
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());
data->bytesRestored += fPrimaryLocality.get().expectedSize();
}
state Version version = BinaryReader::fromStringRef<Version>(fVersion.get().get(), Unversioned());
debug_checkRestoredVersion(data->thisServerID, version, "StorageServer");
data->setInitialVersion(version);
data->bytesRestored += fVersion.get().expectedSize();
state RangeResult available = fShardAvailable.get();
data->bytesRestored += available.logicalSize();
state int availableLoc;
for (availableLoc = 0; availableLoc < available.size(); availableLoc++) {
KeyRangeRef keys(available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
@ -5950,6 +6055,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
}
state RangeResult assigned = fShardAssigned.get();
data->bytesRestored += assigned.logicalSize();
state int assignedLoc;
for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) {
KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
@ -5968,6 +6074,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
}
state RangeResult changeFeeds = fChangeFeeds.get();
data->bytesRestored += changeFeeds.logicalSize();
state int feedLoc;
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
@ -6005,9 +6112,11 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
++it) {
if (it->value() == invalidVersion) {
KeyRangeRef clearRange(it->begin(), it->end());
++data->counters.kvSystemClearRanges;
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
storage->clear(clearRange);
++data->counters.kvSystemClearRanges;
data->byteSampleApplyClear(clearRange, invalidVersion);
}
}
@ -6085,6 +6194,7 @@ void StorageServer::byteSampleApplySet(KeyValueRef kv, Version ver) {
auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
addMutationToMutationLogOrStorage(ver,
MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
++counters.kvSystemClearRanges;
}
}
@ -6131,6 +6241,7 @@ void StorageServer::byteSampleApplyClear(KeyRangeRef range, Version ver) {
byteSample.eraseAsync(range.begin, range.end);
auto diskRange = range.withPrefix(persistByteSampleKeys.begin);
addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
++counters.kvSystemClearRanges;
}
}
@ -6242,6 +6353,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
state Future<Void> doPollMetrics = Void();
wait(self->byteSampleRecovery);
TraceEvent("StorageServerRestoreDurableState", self->thisServerID).detail("RestoredBytes", self->bytesRestored);
// Logs all counters in `counters.cc` and resets the interval.
self->actors.add(traceCounters("StorageMetrics",
@ -6793,6 +6905,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
try {
wait(self.storage.init());
wait(self.storage.commit());
++self.counters.kvCommits;
if (seedTag == invalidTag) {
std::pair<Version, Tag> verAndTag = wait(addStorageServer(
@ -6809,6 +6922,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
self.storage.makeNewStorageServerDurable();
wait(self.storage.commit());
++self.counters.kvCommits;
TraceEvent("StorageServerInit", ssi.id())
.detail("Version", self.version.get())
@ -7008,6 +7122,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
throw worker_removed();
}
}
++self.counters.kvCommits;
bool ok = wait(self.storage.restoreDurableState());
if (!ok) {

View File

@ -1083,7 +1083,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeTypes", ""); } },
{ "maxTLogVersion",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } },
{ "disableTss", [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } }
{ "disableTss", [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } },
{ "disableHostname",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableHostname", ""); } }
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {

View File

@ -66,6 +66,7 @@ struct SaveAndKillWorkload : TestWorkload {
ini.SetValue("META", "connectionString", g_simulator.connectionString.c_str());
ini.SetValue("META", "testerCount", format("%d", g_simulator.testerCount).c_str());
ini.SetValue("META", "tssMode", format("%d", g_simulator.tssMode).c_str());
ini.SetValue("META", "mockDNS", INetworkConnections::net()->convertMockDNSToString().c_str());
std::vector<ISimulator::ProcessInfo*> processes = g_simulator.getAllProcesses();
std::map<NetworkAddress, ISimulator::ProcessInfo*> rebootingProcesses = g_simulator.currentlyRebootingProcesses;

View File

@ -180,13 +180,13 @@ std::string formatIpPort(const IPAddress& ip, uint16_t port) {
Future<Reference<IConnection>> INetworkConnections::connect(const std::string& host,
const std::string& service,
bool useTLS) {
bool isTLS) {
// Use map to create an actor that returns an endpoint or throws
Future<NetworkAddress> pickEndpoint =
map(resolveTCPEndpoint(host, service), [=](std::vector<NetworkAddress> const& addresses) -> NetworkAddress {
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
addr.fromHostname = true;
if (useTLS) {
if (isTLS) {
addr.flags = NetworkAddress::FLAG_TLS;
}
return addr;
@ -272,25 +272,25 @@ TEST_CASE("/flow/network/hostname") {
ASSERT(hn1.toString() == hn1s);
ASSERT(hn1.host == "localhost");
ASSERT(hn1.service == "1234");
ASSERT(!hn1.useTLS);
ASSERT(!hn1.isTLS);
auto hn2 = Hostname::parse(hn2s);
ASSERT(hn2.toString() == hn2s);
ASSERT(hn2.host == "host-name");
ASSERT(hn2.service == "1234");
ASSERT(!hn2.useTLS);
ASSERT(!hn2.isTLS);
auto hn3 = Hostname::parse(hn3s);
ASSERT(hn3.toString() == hn3s);
ASSERT(hn3.host == "host.name");
ASSERT(hn3.service == "1234");
ASSERT(!hn3.useTLS);
ASSERT(!hn3.isTLS);
auto hn4 = Hostname::parse(hn4s);
ASSERT(hn4.toString() == hn4s);
ASSERT(hn4.host == "host-name_part1.host-name_part2");
ASSERT(hn4.service == "1234");
ASSERT(hn4.useTLS);
ASSERT(hn4.isTLS);
ASSERT(Hostname::isHostname(hn1s));
ASSERT(Hostname::isHostname(hn2s));

View File

@ -138,9 +138,10 @@ class Void;
struct Hostname {
std::string host;
std::string service; // decimal port number
bool useTLS;
bool isTLS;
Hostname(std::string host, std::string service, bool useTLS) : host(host), service(service), useTLS(useTLS) {}
Hostname(std::string host, std::string service, bool isTLS) : host(host), service(service), isTLS(isTLS) {}
Hostname() : host(""), service(""), isTLS(false) {}
// Allow hostnames in forms like following:
// hostname:1234
@ -155,7 +156,7 @@ struct Hostname {
static Hostname parse(std::string const& str);
std::string toString() const { return host + ":" + service + (useTLS ? ":tls" : ""); }
std::string toString() const { return host + ":" + service + (isTLS ? ":tls" : ""); }
};
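A quick sketch of the renamed field and the parse/toString round-trip exercised by the /flow/network/hostname test earlier in this diff:

auto hn = Hostname::parse("host.name:1234:tls"); // host, decimal port, optional ":tls" suffix
ASSERT(hn.host == "host.name");
ASSERT(hn.service == "1234");
ASSERT(hn.isTLS); // renamed from useTLS in this commit
ASSERT(hn.toString() == "host.name:1234:tls");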
struct IPAddress {
@ -704,10 +705,10 @@ public:
const std::string& service) = 0;
// Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly
// useTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
// isTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
virtual Future<Reference<IConnection>> connect(const std::string& host,
const std::string& service,
bool useTLS = false);
bool isTLS = false);
// Listen for connections on the given local address
virtual Reference<IListener> listen(NetworkAddress localAddr) = 0;

View File

@ -2,6 +2,7 @@
storageEngineExcludeTypes=[-1,-2]
maxTLogVersion=6
disableTss=true
disableHostname=true
[[test]]
testTitle='Clogged'

View File

@ -2,6 +2,7 @@
storageEngineExcludeTypes=[-1,-2]
maxTLogVersion=6
disableTss=true
disableHostname=true
[[test]]
testTitle='Clogged'