Rewrote most of the error handling in BlobStoreEndpoint to fix several shortcomings in how errors are handled and logged. The request loop now logs every error, with rate limiting applied to the log output, and the exceptions thrown are more appropriate. HTTP 503 is now treated as retryable. Callers of BlobStoreEndpoint::doRequest() now specify which response codes they consider successful so that more error handling can take place in the main request loop.

This commit is contained in:
Stephen Atherton 2017-10-18 02:52:09 -07:00
parent 770d9663d8
commit ebd0234514
3 changed files with 103 additions and 95 deletions

View File

@ -247,13 +247,8 @@ ACTOR Future<bool> objectExists_impl(Reference<BlobStoreEndpoint> b, std::string
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code == 200)
return true;
if(r->code == 404)
return false;
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200, 404}));
return r->code == 200;
}
Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::string const &object) {
@ -263,11 +258,9 @@ Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::str
ACTOR Future<Void> deleteObject_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0));
// 200 means object deleted, 404 means it doesn't exist, so we'll call that success as well
if(r->code == 200 || r->code == 404)
return Void();
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0, {200, 404}));
// 200 means object deleted, 404 means it doesn't exist already, so either success code passed above is fine.
return Void();
}
Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::string const &object) {
@ -304,9 +297,7 @@ ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::strin
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code != 200)
throw io_error();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200}));
return r->contentLen;
}
@ -334,7 +325,7 @@ Future<Reference<IConnection>> BlobStoreEndpoint::connect( NetworkAddress addres
// Do a request, get a Response.
// Request content is provided as UnsentPacketQueue *pContent, which will be depleted as bytes are sent, but the queue itself must live for the life of this actor
// and be destroyed by the caller.
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen) {
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
state UnsentPacketQueue contentCopy;
// Set content length header if there is content
@ -344,12 +335,15 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->concurrentRequests.take(1));
state FlowLock::Releaser globalReleaser(bstore->concurrentRequests, 1);
state int tries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state double retryDelay = 2.0;
state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state int thisTry = 1;
state double nextRetryDelay = 2.0;
state NetworkAddress address;
loop {
state Optional<Error> err;
try {
// Pick an address
state NetworkAddress address = bstore->addresses[g_random->randomInt(0, bstore->addresses.size())];
address = bstore->addresses[g_random->randomInt(0, bstore->addresses.size())];
state FlowLock::Releaser perAddressReleaser;
Void _ = wait(bstore->concurrentRequestsPerAddress[address]->take(1));
perAddressReleaser = FlowLock::Releaser(*bstore->concurrentRequestsPerAddress[address], 1);
@ -379,57 +373,85 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->requestRate->getAllowance(1));
state Reference<HTTP::Response> r = wait(timeoutError(HTTP::doRequest(conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), bstore->knobs.request_timeout));
std::string connectionHeader;
HTTP::Headers::iterator i = r->headers.find("Connection");
if(i != r->headers.end())
connectionHeader = i->second;
// If the response parsed successfully (which is why we reached this point) and the connection can be reused, put the connection in the connection_pool
if(connectionHeader != "close")
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we received the "Connection: close" header.
if(r->headers["Connection"] != "close")
bstore->connectionPool[address].push_back(BlobStoreEndpoint::ConnPoolEntry(conn, now()));
// Handle retry-after response code
if(r->code == 429) {
bstore->s_stats.requests_failed++;
conn = Reference<IConnection>();
double d = 60;
if(r->headers.count("Retry-After"))
d = atof(r->headers["Retry-After"].c_str());
Void _ = wait(delay(d));
// Just continue, don't throw an error, don't decrement tries
}
else if(r->code == 406) {
// Blob returns this when the account doesn't exist
throw http_not_accepted();
}
else if(r->code == 500) {
// For error 500 just treat it like connection_failed
throw connection_failed();
}
else
break;
conn.clear();
} catch(Error &e) {
// If the error is connection failed and a retry is allowed then ignore the error
if((e.code() == error_code_connection_failed || e.code() == error_code_timed_out) && --tries > 0) {
bstore->s_stats.requests_failed++;
//TraceEvent(SevWarn, "BlobStoreHTTPConnectionFailed").detail("Verb", verb).detail("Resource", resource).detail("Host", bstore->host).detail("Port", bstore->port);
//printf("Retrying (%d left) %s %s\n", tries, verb.c_str(), resource.c_str());
Void _ = wait(delay(retryDelay));
retryDelay *= 2;
retryDelay = std::min(retryDelay, 60.0);
}
// For timeouts, connection failure, or a bad response reported by HTTP::doRequest, save the error and handle it / possibly retry below.
// Any other error is rethrown.
if(e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_http_bad_response)
err = e;
else
throw;
}
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if(!err.present() && successCodes.count(r->code) != 0) {
bstore->s_stats.requests_successful++;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
bstore->s_stats.requests_failed++;
// All errors in err are potentially retryable as well as certain HTTP response codes...
bool retryAble = err.present() || r->code == 500 || r->code == 503;
// But only if our previous attempt was not the last allowable try.
retryAble = retryAble && (thisTry < maxTries);
TraceEvent event(SevInfo, retryAble ? "BlobStoreEndpointRequestFailed" : "BlobStoreEndpointRequestFailedRetryable");
event.detail("RemoteEndpoint", address)
.detail("Verb", verb)
.detail("Resource", resource)
.detail("ThisTry", thisTry)
.suppressFor(5, true);
// We will wait delay seconds before the next retry, start with nextRetryDelay.
double delay = nextRetryDelay;
// Double but limit the *next* nextRetryDelay.
nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
// Attach err to trace event if present, otherwise extract some stuff from the response
if(err.present())
event.error(err.get());
else {
event.detail("ResponseCode", r->code);
// Check for the Retry-After header which is present with certain types of errors
auto iRetryAfter = r->headers.find("Retry-After");
if(iRetryAfter != r->headers.end()) {
event.detail("RetryAfterHeader", iRetryAfter->second);
char *pEnd;
double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
if(*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe value of 5 minutes.
retryAfter = 300;
delay = std::max(delay, retryAfter);
}
}
// For retryable errors, log the delay then wait.
if(retryAble) {
event.detail("RetryDelay", delay);
Void _ = wait(::delay(delay));
}
else {
// We can't retry, so throw something.
// This error code means the authentication header was not accepted, likely the account or key is wrong.
if(r->code == 406)
throw http_not_accepted();
throw http_request_failed();
}
}
bstore->s_stats.requests_successful++;
return r;
}
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen);
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
}
ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ObjectInfo> results) {
@ -440,7 +462,7 @@ ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bst
while(more) {
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0));
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0, {200}));
try {
// Parse the json assuming it is valid and contains the right stuff. If any exceptions are thrown, throw http_bad_response
@ -571,12 +593,10 @@ void BlobStoreEndpoint::setAuthHeaders(std::string const &verb, std::string cons
ACTOR Future<std::string> readEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code == 200)
return r->content;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 404}));
if(r->code == 404)
throw file_not_found();
throw http_bad_response();
return r->content;
}
Future<std::string> BlobStoreEndpoint::readEntireFile(std::string const &bucket, std::string const &object) {
@ -597,16 +617,13 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<BlobStoreEndpoint> b
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
if(r->code == 200)
return Void();
throw http_bad_response();
return Void();
}
ACTOR Future<Void> writeEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string content) {
@ -647,9 +664,7 @@ ACTOR Future<int> readObject_impl(Reference<BlobStoreEndpoint> bstore, std::stri
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code != 200 && r->code != 206)
throw file_not_readable();
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 206}));
if(r->contentLen != r->content.size()) // Double check that this wasn't a header-only response, probably unnecessary
throw io_error();
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length bytes
@ -664,9 +679,7 @@ Future<int> BlobStoreEndpoint::readObject(std::string const &bucket, std::string
ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0));
if(r->code != 200)
throw file_not_writable();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0, {200}));
int start = r->content.find("<UploadId>");
if(start == std::string::npos)
throw http_bad_response();
@ -689,16 +702,13 @@ ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, s
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
if(r->code != 200)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
// No etag -> bad response.
std::string etag = r->headers["ETag"];
if(etag.empty())
throw http_bad_response();
@ -722,10 +732,7 @@ ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstor
HTTP::Headers headers;
PacketWriter pw(part_list.getWriteBuffer(), NULL, Unversioned());
pw.serializeBytes(manifest);
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size()));
if(r->code != 200)
throw http_bad_response();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200}));
return Void();
}

View File

@ -149,8 +149,8 @@ public:
// Do an HTTP request to the Blob Store, read the response. Handles authentication.
// Every blob store interaction should ultimately go through this function
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen);
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes);
struct ObjectInfo {
std::string bucket;
std::string name;

View File

@ -92,11 +92,12 @@ ERROR( file_not_writable, 1514, "File could not be written to" )
ERROR( no_cluster_file_found, 1515, "No cluster file found in current directory or default location" )
ERROR( file_too_large, 1516, "File too large to be read" )
ERROR( non_sequential_op, 1517, "Non sequential file operation not allowed." )
ERROR( http_bad_response, 1518, "HTTP response was not valid." )
ERROR( http_bad_response, 1518, "HTTP response was badly formed." )
ERROR( http_not_accepted, 1519, "HTTP request not accepted." )
ERROR( checksum_failed, 1520, "A data checksum failed." )
ERROR( io_timeout, 1521, "A disk IO operation failed to complete in a timely manner." )
ERROR( file_corrupt, 1522, "A structurally corrupt data file was detected." )
ERROR( http_request_failed, 1523, "HTTP response code indicated failure." )
// 2xxx Attempt (presumably by a _client_) to do something illegal. If an error is known to
// be internally caused, it should be 41xx