Blobstore static connection pool, and observability improvements (#9234)

* Adding global connection pool for multiple blobstore instances

* adding knob to enable/disable blobstore global connection pool

* Adding BlobStoreMetrics and BlobStoreRequestLatency logging for better blobstore observability
This commit is contained in:
Josh Slocum 2023-01-27 16:46:26 -06:00 committed by GitHub
parent f91aa35f70
commit f41b61aacf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 15 deletions

View File

@ -224,6 +224,11 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE, 2 );
init( BLOBSTORE_MULTIPART_MAX_PART_SIZE, 20000000 );
init( BLOBSTORE_MULTIPART_MIN_PART_SIZE, 5242880 );
init( BLOBSTORE_GLOBAL_CONNECTION_POOL, true );
init( BLOBSTORE_ENABLE_LOGGING, true );
init( BLOBSTORE_STATS_LOGGING_INTERVAL, 10.0 );
init( BLOBSTORE_LATENCY_LOGGING_INTERVAL, 120.0 );
init( BLOBSTORE_LATENCY_LOGGING_ACCURACY, 0.01 );
// These are basically unlimited by default but can be used to reduce blob IO if needed
init( BLOBSTORE_REQUESTS_PER_SECOND, 200 );

View File

@ -69,6 +69,11 @@ S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::Stats::operator-(const Stats& rh
}
S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats;
std::unique_ptr<S3BlobStoreEndpoint::BlobStats> S3BlobStoreEndpoint::blobStats;
Future<Void> S3BlobStoreEndpoint::statsLogger = Never();
std::unordered_map<BlobStoreConnectionPoolKey, Reference<S3BlobStoreEndpoint::ConnectionPoolData>>
S3BlobStoreEndpoint::globalConnectionPool;
S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
secure_connection = 1;
@ -96,6 +101,7 @@ S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;
sdk_auth = false;
global_connection_pool = CLIENT_KNOBS->BLOBSTORE_GLOBAL_CONNECTION_POOL;
}
bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
@ -134,6 +140,7 @@ bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(max_send_bytes_per_second, sbps);
TRY_PARAM(max_recv_bytes_per_second, rbps);
TRY_PARAM(sdk_auth, sa);
TRY_PARAM(global_connection_pool, gcp);
#undef TRY_PARAM
return false;
}
@ -171,6 +178,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(read_cache_blocks_per_file, rcb);
_CHECK_PARAM(max_send_bytes_per_second, sbps);
_CHECK_PARAM(max_recv_bytes_per_second, rbps);
_CHECK_PARAM(sdk_auth, sa);
_CHECK_PARAM(global_connection_pool, gcp);
#undef _CHECK_PARAM
return r;
}
@ -721,20 +730,23 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
bool* reusingConn) {
// First try to get a connection from the pool
*reusingConn = false;
while (!b->connectionPool.empty()) {
S3BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool.front();
b->connectionPool.pop();
while (!b->connectionPool->pool.empty()) {
S3BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool->pool.front();
b->connectionPool->pool.pop();
// If the connection expires in the future then return it
if (rconn.expirationTime > now()) {
*reusingConn = true;
++b->blobStats->reusedConnections;
TraceEvent("S3BlobStoreEndpointReusingConnected")
.suppressFor(60)
.detail("RemoteEndpoint", rconn.conn->getPeerAddress())
.detail("ExpiresIn", rconn.expirationTime - now());
return rconn;
}
++b->blobStats->expiredConnections;
}
++b->blobStats->newConnections;
std::string host = b->host, service = b->service;
if (service.empty()) {
if (b->useProxy) {
@ -743,7 +755,7 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
}
service = b->knobs.secure_connection ? "https" : "http";
}
bool isTLS = b->knobs.secure_connection == 1;
bool isTLS = b->knobs.isTLS();
state Reference<IConnection> conn;
if (b->useProxy) {
if (isTLS) {
@ -779,7 +791,9 @@ Future<S3BlobStoreEndpoint::ReusableConnection> S3BlobStoreEndpoint::connect(boo
void S3BlobStoreEndpoint::returnConnection(ReusableConnection& rconn) {
// If it expires in the future then add it to the pool in the front
if (rconn.expirationTime > now()) {
connectionPool.push(rconn);
connectionPool->pool.push(rconn);
} else {
++blobStats->expiredConnections;
}
rconn.conn = Reference<IConnection>();
}
@ -945,6 +959,8 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
// received the "Connection: close" header.
if (r->headers["Connection"] != "close") {
bstore->returnConnection(rconn);
} else {
++bstore->blobStats->expiredConnections;
}
rconn.conn.clear();
@ -958,16 +974,19 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
double end = g_network->timer();
double connectDuration = reqStartTimer - connectStartTimer;
double reqDuration = end - reqStartTimer;
bstore->blobStats->requestLatency.addMeasurement(reqDuration);
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if (!err.present() && successCodes.count(r->code) != 0) {
bstore->s_stats.requests_successful++;
++bstore->blobStats->requestsSuccessful;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
bstore->s_stats.requests_failed++;
++bstore->blobStats->requestsFailed;
// All errors in err are potentially retryable as well as certain HTTP response codes...
bool retryable = err.present() || r->code == 500 || r->code == 502 || r->code == 503 || r->code == 429;
@ -1014,6 +1033,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
++thisTry;
if (fastRetry) {
++bstore->blobStats->fastRetries;
wait(delay(0));
} else if (retryable) {
// We will wait delay seconds before the next retry, start with nextRetryDelay.

View File

@ -239,6 +239,11 @@ public:
int BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
int BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
int BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;
bool BLOBSTORE_GLOBAL_CONNECTION_POOL;
bool BLOBSTORE_ENABLE_LOGGING;
double BLOBSTORE_STATS_LOGGING_INTERVAL;
double BLOBSTORE_LATENCY_LOGGING_INTERVAL;
double BLOBSTORE_LATENCY_LOGGING_ACCURACY;
int BGR_READ_BLOCK_SIZE;
int CONSISTENCY_CHECK_RATE_LIMIT_MAX;

View File

@ -21,15 +21,53 @@
#pragma once
#include <map>
#include <unordered_map>
#include <functional>
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "flow/IRateControl.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/Stats.h"
#include "fdbclient/JSONDoc.h"
#include "flow/IConnection.h"
#include <boost/functional/hash.hpp>
// unique key that indentifies interchangeable connections for the same settings and destination
// FIXME: can we define std::hash as a struct member of a S3BlobStoreEndpoint?
struct BlobStoreConnectionPoolKey {
std::string host;
std::string service;
std::string region;
bool isTLS;
BlobStoreConnectionPoolKey(const std::string& host,
const std::string& service,
const std::string& region,
bool isTLS)
: host(host), service(service), region(region), isTLS(isTLS) {}
bool operator==(const BlobStoreConnectionPoolKey& other) const {
return isTLS == other.isTLS && host == other.host && service == other.service && region == other.region;
}
};
namespace std {
template <>
struct hash<BlobStoreConnectionPoolKey> {
std::size_t operator()(const BlobStoreConnectionPoolKey& key) const {
std::size_t seed = 0;
boost::hash_combine(seed, std::hash<std::string>{}(key.host));
boost::hash_combine(seed, std::hash<std::string>{}(key.service));
boost::hash_combine(seed, std::hash<std::string>{}(key.region));
boost::hash_combine(seed, std::hash<bool>{}(key.isTLS));
return seed;
}
};
} // namespace std
// Representation of all the things you need to connect to a blob store instance with some credentials.
// Reference counted because a very large number of them could be needed.
class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
@ -47,6 +85,54 @@ public:
static Stats s_stats;
struct BlobStats {
UID id;
CounterCollection cc;
Counter requestsSuccessful;
Counter requestsFailed;
Counter newConnections;
Counter expiredConnections;
Counter reusedConnections;
Counter fastRetries;
LatencySample requestLatency;
// init not in static codepath, to avoid initialization race issues and so no blob connections means no
// unecessary blob stats traces
BlobStats()
: id(deterministicRandom()->randomUniqueID()), cc("BlobStoreStats", id.toString()),
requestsSuccessful("RequestsSuccessful", cc), requestsFailed("RequestsFailed", cc),
newConnections("NewConnections", cc), expiredConnections("ExpiredConnections", cc),
reusedConnections("ReusedConnections", cc), fastRetries("FastRetries", cc),
requestLatency("BlobStoreRequestLatency",
id,
CLIENT_KNOBS->BLOBSTORE_LATENCY_LOGGING_INTERVAL,
CLIENT_KNOBS->BLOBSTORE_LATENCY_LOGGING_ACCURACY) {}
};
// null when initialized, so no blob stats until a blob connection is used
static std::unique_ptr<BlobStats> blobStats;
static Future<Void> statsLogger;
void maybeStartStatsLogger() {
if (!blobStats && CLIENT_KNOBS->BLOBSTORE_ENABLE_LOGGING) {
blobStats = std::make_unique<BlobStats>();
specialCounter(
blobStats->cc, "GlobalConnectionPoolCount", [this]() { return this->globalConnectionPool.size(); });
specialCounter(blobStats->cc, "GlobalConnectionPoolSize", [this]() {
// FIXME: could track this explicitly via an int variable with extra logic, but this should be small and
// infrequent
int totalConnections = 0;
for (auto& it : this->globalConnectionPool) {
totalConnections += it.second->pool.size();
}
return totalConnections;
});
statsLogger = blobStats->cc.traceCounters(
"BlobStoreMetrics", blobStats->id, CLIENT_KNOBS->BLOBSTORE_STATS_LOGGING_INTERVAL, "BlobStoreMetrics");
}
}
struct Credentials {
std::string key;
std::string secret;
@ -60,7 +146,7 @@ public:
delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests,
concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file,
enable_read_cache, read_block_size, read_ahead_blocks, read_cache_blocks_per_file,
max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth;
max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth, global_connection_pool;
bool set(StringRef name, int value);
std::string getURLParameters() const;
static std::vector<std::string> getKnobDescriptions() {
@ -95,11 +181,27 @@ public:
"max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET "
"USED).",
"sdk_auth (or sa) Use AWS SDK to resolve credentials. Only valid if "
"BUILD_AWS_BACKUP is enabled."
"BUILD_AWS_BACKUP is enabled.",
"global_connection_pool (or gcp) Enable shared connection pool between all blobstore instances."
};
}
bool isTLS() const { return secure_connection == 1; }
};
struct ReusableConnection {
Reference<IConnection> conn;
double expirationTime;
};
// basically, reference counted queue with option to add other fields
struct ConnectionPoolData : NonCopyable, ReferenceCounted<ConnectionPoolData> {
std::queue<ReusableConnection> pool;
};
// global connection pool for multiple blobstore endpoints with same connection settings and request destination
static std::unordered_map<BlobStoreConnectionPoolKey, Reference<ConnectionPoolData>> globalConnectionPool;
S3BlobStoreEndpoint(std::string const& host,
std::string const& service,
std::string region,
@ -123,14 +225,33 @@ public:
if (host.empty() || (proxyHost.present() != proxyPort.present()))
throw connection_string_invalid();
// set connection pool instance
if (useProxy || !knobs.global_connection_pool) {
// don't use global connection pool if there's a proxy, as it complicates the logic
// FIXME: handle proxies?
connectionPool = makeReference<ConnectionPoolData>();
} else {
BlobStoreConnectionPoolKey key(host, service, region, knobs.isTLS());
auto it = globalConnectionPool.find(key);
if (it != globalConnectionPool.end()) {
connectionPool = it->second;
} else {
connectionPool = makeReference<ConnectionPoolData>();
globalConnectionPool.insert({ key, connectionPool });
}
}
ASSERT(connectionPool.isValid());
maybeStartStatsLogger();
}
static std::string getURLFormat(bool withResource = false) {
const char* resource = "";
if (withResource)
resource = "<name>";
return format(
"blobstore://<api_key>:<secret>:<security_token>@<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]",
return format("blobstore://<api_key>:<secret>:<security_token>@<host>[:<port>]/"
"%s[?<param>=<value>[&<param>=<value>]...]",
resource);
}
@ -149,11 +270,9 @@ public:
// parameters in addition to the passed params string
std::string getResourceURL(std::string resource, std::string params) const;
struct ReusableConnection {
Reference<IConnection> conn;
double expirationTime;
};
std::queue<ReusableConnection> connectionPool;
// FIXME: add periodic connection reaper to pool
// local connection pool for this blobstore
Reference<ConnectionPoolData> connectionPool;
Future<ReusableConnection> connect(bool* reusingConn);
void returnConnection(ReusableConnection& conn);