From f41b61aacf0a30e2c4a5d92df22e258807411f0e Mon Sep 17 00:00:00 2001
From: Josh Slocum
Date: Fri, 27 Jan 2023 16:46:26 -0600
Subject: [PATCH] Blobstore static connection pool, and observability
 improvements (#9234)

* Adding global connection pool for multiple blobstore instances

* adding knob to enable/disable blobstore global connection pool

* Adding BlobStoreMetrics and BlobStoreRequestLatency logging for better
  blobstore observability
---
 fdbclient/ClientKnobs.cpp                 |   5 +
 fdbclient/S3BlobStore.actor.cpp           |  30 ++++-
 fdbclient/include/fdbclient/ClientKnobs.h |   5 +
 fdbclient/include/fdbclient/S3BlobStore.h | 139 ++++++++++++++++++++--
 4 files changed, 164 insertions(+), 15 deletions(-)

diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp
index 1dd639aac7..95cb816d62 100644
--- a/fdbclient/ClientKnobs.cpp
+++ b/fdbclient/ClientKnobs.cpp
@@ -224,6 +224,11 @@ void ClientKnobs::initialize(Randomize randomize) {
     init( BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE, 2 );
     init( BLOBSTORE_MULTIPART_MAX_PART_SIZE, 20000000 );
     init( BLOBSTORE_MULTIPART_MIN_PART_SIZE, 5242880 );
+    init( BLOBSTORE_GLOBAL_CONNECTION_POOL, true );
+    init( BLOBSTORE_ENABLE_LOGGING, true );
+    init( BLOBSTORE_STATS_LOGGING_INTERVAL, 10.0 );
+    init( BLOBSTORE_LATENCY_LOGGING_INTERVAL, 120.0 );
+    init( BLOBSTORE_LATENCY_LOGGING_ACCURACY, 0.01 );

     // These are basically unlimited by default but can be used to reduce blob IO if needed
     init( BLOBSTORE_REQUESTS_PER_SECOND, 200 );
diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp
index e2a488207a..a5793a8da2 100644
--- a/fdbclient/S3BlobStore.actor.cpp
+++ b/fdbclient/S3BlobStore.actor.cpp
@@ -69,6 +69,11 @@ S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::Stats::operator-(const Stats& rh
 }

 S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats;
+std::unique_ptr<S3BlobStoreEndpoint::BlobStats> S3BlobStoreEndpoint::blobStats;
+Future<Void> S3BlobStoreEndpoint::statsLogger = Never();
+
+std::unordered_map<BlobStoreConnectionPoolKey, Reference<S3BlobStoreEndpoint::ConnectionPoolData>>
+    S3BlobStoreEndpoint::globalConnectionPool;

 S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
     secure_connection = 1;
@@ -96,6 +101,7 @@ S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
     max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
     max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;
     sdk_auth = false;
+    global_connection_pool = CLIENT_KNOBS->BLOBSTORE_GLOBAL_CONNECTION_POOL;
 }

 bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
@@ -134,6 +140,7 @@ bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
     TRY_PARAM(max_send_bytes_per_second, sbps);
     TRY_PARAM(max_recv_bytes_per_second, rbps);
     TRY_PARAM(sdk_auth, sa);
+    TRY_PARAM(global_connection_pool, gcp);
 #undef TRY_PARAM
     return false;
 }
@@ -171,6 +178,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
     _CHECK_PARAM(read_cache_blocks_per_file, rcb);
     _CHECK_PARAM(max_send_bytes_per_second, sbps);
     _CHECK_PARAM(max_recv_bytes_per_second, rbps);
+    _CHECK_PARAM(sdk_auth, sa);
+    _CHECK_PARAM(global_connection_pool, gcp);
 #undef _CHECK_PARAM
     return r;
 }
@@ -721,20 +730,23 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3
-    while (!b->connectionPool.empty()) {
-        S3BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool.front();
-        b->connectionPool.pop();
+    while (!b->connectionPool->pool.empty()) {
+        S3BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool->pool.front();
+        b->connectionPool->pool.pop();
         // If the connection expires in the future then return it
         if (rconn.expirationTime > now()) {
             *reusingConn = true;
+            ++b->blobStats->reusedConnections;
             TraceEvent("S3BlobStoreEndpointReusingConnected")
                 .suppressFor(60)
                 .detail("RemoteEndpoint", rconn.conn->getPeerAddress())
                 .detail("ExpiresIn", rconn.expirationTime - now());
             return rconn;
         }
+        ++b->blobStats->expiredConnections;
     }
+    ++b->blobStats->newConnections;
     std::string host = b->host, service = b->service;
     if (service.empty()) {
         if (b->useProxy) {
@@ -743,7 +755,7 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3
             service = b->knobs.secure_connection ? "https" : "http";
         }
-    bool isTLS = b->knobs.secure_connection == 1;
+    bool isTLS = b->knobs.isTLS();
     state Reference<IConnection> conn;
     if (b->useProxy) {
         if (isTLS) {
@@ -779,7 +791,9 @@ Future<S3BlobStoreEndpoint::ReusableConnection> S3BlobStoreEndpoint::connect(boo
 void S3BlobStoreEndpoint::returnConnection(ReusableConnection& rconn) {
     // If it expires in the future then add it to the pool in the front
     if (rconn.expirationTime > now()) {
-        connectionPool.push(rconn);
+        connectionPool->pool.push(rconn);
+    } else {
+        ++blobStats->expiredConnections;
     }
     rconn.conn = Reference<IConnection>();
 }
@@ -945,6 +959,8 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
     if (r->headers["Connection"] != "close") {
         bstore->returnConnection(rconn);
+    } else {
+        ++bstore->blobStats->expiredConnections;
     }
     rconn.conn.clear();
@@ -958,16 +974,19 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
     double end = g_network->timer();
     double connectDuration = reqStartTimer - connectStartTimer;
     double reqDuration = end - reqStartTimer;
+    bstore->blobStats->requestLatency.addMeasurement(reqDuration);

     // If err is not present then r is valid.
     // If r->code is in successCodes then record the successful request and return r.
     if (!err.present() && successCodes.count(r->code) != 0) {
         bstore->s_stats.requests_successful++;
+        ++bstore->blobStats->requestsSuccessful;
         return r;
     }

     // Otherwise, this request is considered failed.  Update failure count.
     bstore->s_stats.requests_failed++;
+    ++bstore->blobStats->requestsFailed;

     // All errors in err are potentially retryable as well as certain HTTP response codes...
     bool retryable = err.present() || r->code == 500 || r->code == 502 || r->code == 503 || r->code == 429;
@@ -1014,6 +1033,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
+            ++bstore->blobStats->fastRetries;
             wait(delay(0));
         } else if (retryable) {
             // We will wait delay seconds before the next retry, start with nextRetryDelay.
diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h
index dced079f8b..709efea059 100644
--- a/fdbclient/include/fdbclient/ClientKnobs.h
+++ b/fdbclient/include/fdbclient/ClientKnobs.h
@@ -239,6 +239,11 @@ public:
     int BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
     int BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
     int BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;
+    bool BLOBSTORE_GLOBAL_CONNECTION_POOL;
+    bool BLOBSTORE_ENABLE_LOGGING;
+    double BLOBSTORE_STATS_LOGGING_INTERVAL;
+    double BLOBSTORE_LATENCY_LOGGING_INTERVAL;
+    double BLOBSTORE_LATENCY_LOGGING_ACCURACY;

     int BGR_READ_BLOCK_SIZE;
     int CONSISTENCY_CHECK_RATE_LIMIT_MAX;
diff --git a/fdbclient/include/fdbclient/S3BlobStore.h b/fdbclient/include/fdbclient/S3BlobStore.h
index 6e03a5a6d4..ab4a78a8b6 100644
--- a/fdbclient/include/fdbclient/S3BlobStore.h
+++ b/fdbclient/include/fdbclient/S3BlobStore.h
@@ -21,15 +21,53 @@
 #pragma once

 #include <map>
+#include <unordered_map>
 #include <queue>
+#include "flow/IRandom.h"
 #include "flow/flow.h"
 #include "flow/Net2Packet.h"
 #include "fdbclient/Knobs.h"
 #include "flow/IRateControl.h"
 #include "fdbrpc/HTTP.h"
+#include "fdbrpc/Stats.h"
 #include "fdbclient/JSONDoc.h"
 #include "flow/IConnection.h"
+#include <boost/functional/hash.hpp>
+
+// unique key that identifies interchangeable connections for the same settings and destination
+// FIXME: can we define std::hash as a struct member of a S3BlobStoreEndpoint?
+struct BlobStoreConnectionPoolKey {
+    std::string host;
+    std::string service;
+    std::string region;
+    bool isTLS;
+
+    BlobStoreConnectionPoolKey(const std::string& host,
+                               const std::string& service,
+                               const std::string& region,
+                               bool isTLS)
+      : host(host), service(service), region(region), isTLS(isTLS) {}
+
+    bool operator==(const BlobStoreConnectionPoolKey& other) const {
+        return isTLS == other.isTLS && host == other.host && service == other.service && region == other.region;
+    }
+};
+
+namespace std {
+template <>
+struct hash<BlobStoreConnectionPoolKey> {
+    std::size_t operator()(const BlobStoreConnectionPoolKey& key) const {
+        std::size_t seed = 0;
+        boost::hash_combine(seed, std::hash<std::string>{}(key.host));
+        boost::hash_combine(seed, std::hash<std::string>{}(key.service));
+        boost::hash_combine(seed, std::hash<std::string>{}(key.region));
+        boost::hash_combine(seed, std::hash<bool>{}(key.isTLS));
+        return seed;
+    }
+};
+} // namespace std
+
 // Representation of all the things you need to connect to a blob store instance with some credentials.
 // Reference counted because a very large number of them could be needed.
 class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
@@ -47,6 +85,54 @@ public:
     static Stats s_stats;

+    struct BlobStats {
+        UID id;
+        CounterCollection cc;
+        Counter requestsSuccessful;
+        Counter requestsFailed;
+        Counter newConnections;
+        Counter expiredConnections;
+        Counter reusedConnections;
+        Counter fastRetries;
+
+        LatencySample requestLatency;
+
+        // init not in static codepath, to avoid initialization race issues and so no blob connections means no
+        // unnecessary blob stats traces
+        BlobStats()
+          : id(deterministicRandom()->randomUniqueID()), cc("BlobStoreStats", id.toString()),
+            requestsSuccessful("RequestsSuccessful", cc), requestsFailed("RequestsFailed", cc),
+            newConnections("NewConnections", cc), expiredConnections("ExpiredConnections", cc),
+            reusedConnections("ReusedConnections", cc), fastRetries("FastRetries", cc),
+            requestLatency("BlobStoreRequestLatency",
+                           id,
+                           CLIENT_KNOBS->BLOBSTORE_LATENCY_LOGGING_INTERVAL,
+                           CLIENT_KNOBS->BLOBSTORE_LATENCY_LOGGING_ACCURACY) {}
+    };
+    // null when initialized, so no blob stats until a blob connection is used
+    static std::unique_ptr<BlobStats> blobStats;
+    static Future<Void> statsLogger;
+
+    void maybeStartStatsLogger() {
+        if (!blobStats && CLIENT_KNOBS->BLOBSTORE_ENABLE_LOGGING) {
+            blobStats = std::make_unique<BlobStats>();
+            specialCounter(
+                blobStats->cc, "GlobalConnectionPoolCount", [this]() { return this->globalConnectionPool.size(); });
+            specialCounter(blobStats->cc, "GlobalConnectionPoolSize", [this]() {
+                // FIXME: could track this explicitly via an int variable with extra logic, but this should be small and
+                // infrequent
+                int totalConnections = 0;
+                for (auto& it : this->globalConnectionPool) {
+                    totalConnections += it.second->pool.size();
+                }
+                return totalConnections;
+            });
+
+            statsLogger = blobStats->cc.traceCounters(
+                "BlobStoreMetrics", blobStats->id, CLIENT_KNOBS->BLOBSTORE_STATS_LOGGING_INTERVAL, "BlobStoreMetrics");
+        }
+    }
+
     struct Credentials {
         std::string key;
         std::string secret;
@@ -60,7 +146,7 @@ public:
         delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests,
         concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file,
         enable_read_cache, read_block_size, read_ahead_blocks, read_cache_blocks_per_file,
-        max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth;
+        max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth, global_connection_pool;
     bool set(StringRef name, int value);
     std::string getURLParameters() const;
     static std::vector<std::string> getKnobDescriptions() {
@@ -95,11 +181,27 @@ public:
              "max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET "
              "USED).",
              "sdk_auth (or sa) Use AWS SDK to resolve credentials. Only valid if "
-             "BUILD_AWS_BACKUP is enabled."
+             "BUILD_AWS_BACKUP is enabled.",
+             "global_connection_pool (or gcp) Enable shared connection pool between all blobstore instances."
         };
     }
+
+    bool isTLS() const { return secure_connection == 1; }
 };
+
+    struct ReusableConnection {
+        Reference<IConnection> conn;
+        double expirationTime;
+    };
+
+    // basically, reference counted queue with option to add other fields
+    struct ConnectionPoolData : NonCopyable, ReferenceCounted<ConnectionPoolData> {
+        std::queue<ReusableConnection> pool;
+    };
+
+    // global connection pool for multiple blobstore endpoints with same connection settings and request destination
+    static std::unordered_map<BlobStoreConnectionPoolKey, Reference<ConnectionPoolData>> globalConnectionPool;
+
     S3BlobStoreEndpoint(std::string const& host,
                         std::string const& service,
                         std::string region,
@@ -123,15 +225,34 @@ public:
         if (host.empty() || (proxyHost.present() != proxyPort.present()))
             throw connection_string_invalid();
+
+        // set connection pool instance
+        if (useProxy || !knobs.global_connection_pool) {
+            // don't use global connection pool if there's a proxy, as it complicates the logic
+            // FIXME: handle proxies?
+            connectionPool = makeReference<ConnectionPoolData>();
+        } else {
+            BlobStoreConnectionPoolKey key(host, service, region, knobs.isTLS());
+            auto it = globalConnectionPool.find(key);
+            if (it != globalConnectionPool.end()) {
+                connectionPool = it->second;
+            } else {
+                connectionPool = makeReference<ConnectionPoolData>();
+                globalConnectionPool.insert({ key, connectionPool });
+            }
+        }
+        ASSERT(connectionPool.isValid());
+
+        maybeStartStatsLogger();
     }

     static std::string getURLFormat(bool withResource = false) {
         const char* resource = "";
         if (withResource)
             resource = "<name>";
-        return format(
-            "blobstore://<api_key>:<secret>:<security_token>@<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]",
-            resource);
+        return format("blobstore://<api_key>:<secret>:<security_token>@<host>[:<port>]/"
+                      "%s[?<param>=<value>[&<param>=<value>]...]",
+                      resource);
     }

     typedef std::map<std::string, std::string> ParametersT;
@@ -149,11 +270,9 @@ public:
     // parameters in addition to the passed params string
     std::string getResourceURL(std::string resource, std::string params) const;

-    struct ReusableConnection {
-        Reference<IConnection> conn;
-        double expirationTime;
-    };
-    std::queue<ReusableConnection> connectionPool;
+    // FIXME: add periodic connection reaper to pool
+    // local connection pool for this blobstore
+    Reference<ConnectionPoolData> connectionPool;

     Future<ReusableConnection> connect(bool* reusingConn);
     void returnConnection(ReusableConnection& conn);
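
A note on the shared pool keying (not part of the patch): the global pool is keyed only by (host, service, region, isTLS), so any two S3BlobStoreEndpoint instances built for the same destination and TLS setting draw connections from the same ConnectionPoolData once global_connection_pool is enabled. A minimal sketch of that equivalence, using made-up host/region values:

    #include "fdbclient/S3BlobStore.h"

    void connectionPoolKeyExample() {
        // Both keys are built from identical settings, so they compare equal and hash
        // identically, which is what routes two endpoints to one shared pool entry.
        BlobStoreConnectionPoolKey a("s3.example.com", "443", "us-west-2", /*isTLS=*/true);
        BlobStoreConnectionPoolKey b("s3.example.com", "443", "us-west-2", /*isTLS=*/true);
        ASSERT(a == b);
        ASSERT(std::hash<BlobStoreConnectionPoolKey>{}(a) == std::hash<BlobStoreConnectionPoolKey>{}(b));
    }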
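
Because the new knob goes through the existing BlobKnobs URL-parameter machinery (TRY_PARAM / _CHECK_PARAM above), it can also be toggled per endpoint rather than only through the BLOBSTORE_GLOBAL_CONNECTION_POOL client knob. A hedged sketch of setting it programmatically; the "gcp" short name comes from getKnobDescriptions(), the URL in the comment is a made-up example, and the "_sr" StringRef literal from flow is assumed to be in scope:

    #include "fdbclient/S3BlobStore.h"

    void disableSharedPoolExample() {
        // Roughly equivalent to appending "?gcp=0" to a blobstore URL, e.g.
        //   blobstore://<api_key>:<secret>:<security_token>@backup.example.com:443/backups?gcp=0
        S3BlobStoreEndpoint::BlobKnobs knobs;
        knobs.set("global_connection_pool"_sr, 0); // long parameter name
        knobs.set("gcp"_sr, 0);                    // short form, same knob
        ASSERT(!knobs.global_connection_pool);
    }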