Added operation-specific rate controls to blob store interface.

This commit is contained in:
Stephen Atherton 2018-06-20 20:34:34 -07:00
parent 7072171b5d
commit e9e1e194f0
4 changed files with 54 additions and 1 deletions

View File

@ -164,6 +164,11 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BLOBSTORE_MAX_SEND_BYTES_PER_SECOND, 1e9 ); init( BLOBSTORE_MAX_SEND_BYTES_PER_SECOND, 1e9 );
init( BLOBSTORE_MAX_RECV_BYTES_PER_SECOND, 1e9 ); init( BLOBSTORE_MAX_RECV_BYTES_PER_SECOND, 1e9 );
init( BLOBSTORE_LIST_REQUESTS_PER_SECOND, 25 );
init( BLOBSTORE_WRITE_REQUESTS_PER_SECOND, 50 );
init( BLOBSTORE_READ_REQUESTS_PER_SECOND, 100 );
init( BLOBSTORE_DELETE_REQUESTS_PER_SECOND, 100 );
// Client Status Info // Client Status Info
init(CSI_SAMPLING_PROBABILITY, -1.0); init(CSI_SAMPLING_PROBABILITY, -1.0);
init(CSI_SIZE_LIMIT, std::numeric_limits<int64_t>::max()); init(CSI_SIZE_LIMIT, std::numeric_limits<int64_t>::max());

View File

@ -151,6 +151,10 @@ public:
int BLOBSTORE_REQUEST_TRIES; int BLOBSTORE_REQUEST_TRIES;
int BLOBSTORE_REQUEST_TIMEOUT; int BLOBSTORE_REQUEST_TIMEOUT;
int BLOBSTORE_REQUESTS_PER_SECOND; int BLOBSTORE_REQUESTS_PER_SECOND;
int BLOBSTORE_LIST_REQUESTS_PER_SECOND;
int BLOBSTORE_WRITE_REQUESTS_PER_SECOND;
int BLOBSTORE_READ_REQUESTS_PER_SECOND;
int BLOBSTORE_DELETE_REQUESTS_PER_SECOND;
int BLOBSTORE_CONCURRENT_REQUESTS; int BLOBSTORE_CONCURRENT_REQUESTS;
int BLOBSTORE_MULTIPART_MAX_PART_SIZE; int BLOBSTORE_MULTIPART_MAX_PART_SIZE;
int BLOBSTORE_MULTIPART_MIN_PART_SIZE; int BLOBSTORE_MULTIPART_MIN_PART_SIZE;

View File

@ -57,6 +57,10 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
request_timeout = CLIENT_KNOBS->BLOBSTORE_REQUEST_TIMEOUT; request_timeout = CLIENT_KNOBS->BLOBSTORE_REQUEST_TIMEOUT;
requests_per_second = CLIENT_KNOBS->BLOBSTORE_REQUESTS_PER_SECOND; requests_per_second = CLIENT_KNOBS->BLOBSTORE_REQUESTS_PER_SECOND;
concurrent_requests = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS; concurrent_requests = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS;
list_requests_per_second = CLIENT_KNOBS->BLOBSTORE_LIST_REQUESTS_PER_SECOND;
write_requests_per_second = CLIENT_KNOBS->BLOBSTORE_WRITE_REQUESTS_PER_SECOND;
read_requests_per_second = CLIENT_KNOBS->BLOBSTORE_READ_REQUESTS_PER_SECOND;
delete_requests_per_second = CLIENT_KNOBS->BLOBSTORE_DELETE_REQUESTS_PER_SECOND;
multipart_max_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MAX_PART_SIZE; multipart_max_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MAX_PART_SIZE;
multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE; multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE;
concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS; concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS;
@ -79,6 +83,10 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(request_tries, rt); TRY_PARAM(request_tries, rt);
TRY_PARAM(request_timeout, rto); TRY_PARAM(request_timeout, rto);
TRY_PARAM(requests_per_second, rps); TRY_PARAM(requests_per_second, rps);
TRY_PARAM(list_requests_per_second, lrps);
TRY_PARAM(write_requests_per_second, wrps);
TRY_PARAM(read_requests_per_second, rrps);
TRY_PARAM(delete_requests_per_second, drps);
TRY_PARAM(concurrent_requests, cr); TRY_PARAM(concurrent_requests, cr);
TRY_PARAM(multipart_max_part_size, maxps); TRY_PARAM(multipart_max_part_size, maxps);
TRY_PARAM(multipart_min_part_size, minps); TRY_PARAM(multipart_min_part_size, minps);
@ -107,6 +115,10 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(request_tries, rt); _CHECK_PARAM(request_tries, rt);
_CHECK_PARAM(request_timeout, rto); _CHECK_PARAM(request_timeout, rto);
_CHECK_PARAM(requests_per_second, rps); _CHECK_PARAM(requests_per_second, rps);
_CHECK_PARAM(list_requests_per_second, lrps);
_CHECK_PARAM(write_requests_per_second, wrps);
_CHECK_PARAM(read_requests_per_second, rrps);
_CHECK_PARAM(delete_requests_per_second, drps);
_CHECK_PARAM(concurrent_requests, cr); _CHECK_PARAM(concurrent_requests, cr);
_CHECK_PARAM(multipart_max_part_size, maxps); _CHECK_PARAM(multipart_max_part_size, maxps);
_CHECK_PARAM(multipart_min_part_size, minps); _CHECK_PARAM(multipart_min_part_size, minps);
@ -195,6 +207,8 @@ std::string BlobStoreEndpoint::getResourceURL(std::string resource) {
} }
ACTOR Future<bool> objectExists_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) { ACTOR Future<bool> objectExists_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
Void _ = wait(b->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object; std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers; HTTP::Headers headers;
@ -207,6 +221,8 @@ Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::str
} }
ACTOR Future<Void> deleteObject_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) { ACTOR Future<Void> deleteObject_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
Void _ = wait(b->requestRateDelete->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object; std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers; HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0, {200, 204, 404})); Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0, {200, 204, 404}));
@ -273,9 +289,10 @@ Future<Void> BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std
} }
ACTOR Future<Void> createBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket) { ACTOR Future<Void> createBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket) {
Void _ = wait(b->requestRateWrite->getAllowance(1));
std::string resource = std::string("/") + bucket; std::string resource = std::string("/") + bucket;
HTTP::Headers headers; HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("PUT", resource, headers, NULL, 0, {200, 409})); Reference<HTTP::Response> r = wait(b->doRequest("PUT", resource, headers, NULL, 0, {200, 409}));
return Void(); return Void();
} }
@ -285,6 +302,8 @@ Future<Void> BlobStoreEndpoint::createBucket(std::string const &bucket) {
} }
ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) { ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
Void _ = wait(b->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object; std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers; HTTP::Headers headers;
@ -789,6 +808,8 @@ void BlobStoreEndpoint::setAuthHeaders(std::string const &verb, std::string cons
} }
ACTOR Future<std::string> readEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) { ACTOR Future<std::string> readEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
Void _ = wait(bstore->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object; std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers; HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 404})); Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 404}));
@ -805,6 +826,7 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<BlobStoreEndpoint> b
if(contentLen > bstore->knobs.multipart_max_part_size) if(contentLen > bstore->knobs.multipart_max_part_size)
throw file_too_large(); throw file_too_large();
Void _ = wait(bstore->requestRateWrite->getAllowance(1));
Void _ = wait(bstore->concurrentUploads.take()); Void _ = wait(bstore->concurrentUploads.take());
state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1); state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1);
@ -856,6 +878,8 @@ Future<Void> BlobStoreEndpoint::writeEntireFileFromBuffer(std::string const &buc
ACTOR Future<int> readObject_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, void *data, int length, int64_t offset) { ACTOR Future<int> readObject_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, void *data, int length, int64_t offset) {
if(length <= 0) if(length <= 0)
return 0; return 0;
Void _ = wait(bstore->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object; std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers; HTTP::Headers headers;
headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1); headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
@ -874,6 +898,8 @@ Future<int> BlobStoreEndpoint::readObject(std::string const &bucket, std::string
} }
ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) { ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
Void _ = wait(bstore->requestRateWrite->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object + "?uploads"; std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
HTTP::Headers headers; HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0, {200})); Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0, {200}));
@ -892,6 +918,7 @@ Future<std::string> BlobStoreEndpoint::beginMultiPartUpload(std::string const &b
} }
ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string uploadID, unsigned int partNumber, UnsentPacketQueue *pContent, int contentLen, std::string contentMD5) { ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string uploadID, unsigned int partNumber, UnsentPacketQueue *pContent, int contentLen, std::string contentMD5) {
Void _ = wait(bstore->requestRateWrite->getAllowance(1));
Void _ = wait(bstore->concurrentUploads.take()); Void _ = wait(bstore->concurrentUploads.take());
state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1); state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1);
@ -921,6 +948,7 @@ Future<std::string> BlobStoreEndpoint::uploadPart(std::string const &bucket, std
ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string uploadID, BlobStoreEndpoint::MultiPartSetT parts) { ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string uploadID, BlobStoreEndpoint::MultiPartSetT parts) {
state UnsentPacketQueue part_list(); // NonCopyable state var so must be declared at top of actor state UnsentPacketQueue part_list(); // NonCopyable state var so must be declared at top of actor
Void _ = wait(bstore->requestRateWrite->getAllowance(1));
std::string manifest = "<CompleteMultipartUpload>"; std::string manifest = "<CompleteMultipartUpload>";
for(auto &p : parts) for(auto &p : parts)

View File

@ -55,6 +55,10 @@ public:
request_tries, request_tries,
request_timeout, request_timeout,
requests_per_second, requests_per_second,
list_requests_per_second,
write_requests_per_second,
read_requests_per_second,
delete_requests_per_second,
multipart_max_part_size, multipart_max_part_size,
multipart_min_part_size, multipart_min_part_size,
concurrent_requests, concurrent_requests,
@ -78,6 +82,10 @@ public:
"request_tries (or rt) Number of times to try each request until a parseable HTTP response other than 429 is received.", "request_tries (or rt) Number of times to try each request until a parseable HTTP response other than 429 is received.",
"request_timeout (or rto) Number of seconds to wait for a request to succeed after a connection is established.", "request_timeout (or rto) Number of seconds to wait for a request to succeed after a connection is established.",
"requests_per_second (or rps) Max number of requests to start per second.", "requests_per_second (or rps) Max number of requests to start per second.",
"list_requests_per_second (or lrps) Max number of list requests to start per second.",
"write_requests_per_second (or wrps) Max number of write requests to start per second.",
"read_requests_per_second (or rrps) Max number of read requests to start per second.",
"delete_requests_per_second (or drps) Max number of delete requests to start per second.",
"multipart_max_part_size (or maxps) Max part size for multipart uploads.", "multipart_max_part_size (or maxps) Max part size for multipart uploads.",
"multipart_min_part_size (or minps) Min part size for multipart uploads.", "multipart_min_part_size (or minps) Min part size for multipart uploads.",
"concurrent_requests (or cr) Max number of total requests in progress at once, regardless of operation-specific concurrency limits.", "concurrent_requests (or cr) Max number of total requests in progress at once, regardless of operation-specific concurrency limits.",
@ -97,6 +105,10 @@ public:
BlobStoreEndpoint(std::string const &host, std::string service, std::string const &key, std::string const &secret, BlobKnobs const &knobs = BlobKnobs()) BlobStoreEndpoint(std::string const &host, std::string service, std::string const &key, std::string const &secret, BlobKnobs const &knobs = BlobKnobs())
: host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs), : host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs),
requestRate(new SpeedLimit(knobs.requests_per_second, 1)), requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)),
requestRateWrite(new SpeedLimit(knobs.write_requests_per_second, 1)),
requestRateRead(new SpeedLimit(knobs.read_requests_per_second, 1)),
requestRateDelete(new SpeedLimit(knobs.delete_requests_per_second, 1)),
sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)), sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)),
recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)),
concurrentRequests(knobs.concurrent_requests), concurrentRequests(knobs.concurrent_requests),
@ -135,6 +147,10 @@ public:
// Speed and concurrency limits // Speed and concurrency limits
Reference<IRateControl> requestRate; Reference<IRateControl> requestRate;
Reference<IRateControl> requestRateList;
Reference<IRateControl> requestRateWrite;
Reference<IRateControl> requestRateRead;
Reference<IRateControl> requestRateDelete;
Reference<IRateControl> sendRate; Reference<IRateControl> sendRate;
Reference<IRateControl> recvRate; Reference<IRateControl> recvRate;
FlowLock concurrentRequests; FlowLock concurrentRequests;