Check bucket exist or not, rather than listBucket

This commit is contained in:
hao fu 2024-09-01 15:38:53 -07:00
parent 640b0fe7f3
commit 5295920ded
1 changed files with 50 additions and 17 deletions

View File

@ -906,8 +906,8 @@ std::string getCanonicalURI(Reference<S3BlobStoreEndpoint> bstore, Reference<HTT
return canonicalURI;
}
Reference<HTTP::OutgoingRequest> getDryrunRequest(Reference<S3BlobStoreEndpoint> bstore) {
// dryrun with a listbucket request, to avoid sending duplicate data
Reference<HTTP::OutgoingRequest> getDryrunRequest(Reference<S3BlobStoreEndpoint> bstore, std::string bucket) {
// dryrun with a check bucket exist request, to avoid sending duplicate data
UnsentPacketQueue contentCopy;
HTTP::Headers headers;
Reference<HTTP::OutgoingRequest> dryrunRequest = makeReference<HTTP::OutgoingRequest>();
@ -920,12 +920,28 @@ Reference<HTTP::OutgoingRequest> getDryrunRequest(Reference<S3BlobStoreEndpoint>
dryrunRequest->data.headers["Host"] = bstore->host;
dryrunRequest->data.headers["Accept"] = "application/xml";
// this is copied from listBuckets_impl()listBuckets_impl()
dryrunRequest->resource = "/?marker=";
dryrunRequest->resource = constructResourcePath(bstore, bucket, "");
return dryrunRequest;
}
bool isWriteRequest(std::string verb) {
return verb == "POST" || verb == "PUT";
}
std::string parseBucketFromURI(std::string uri) {
if (uri.size() <= 1 || uri[0] != '/') {
// there is no bucket in the uri
return "";
}
uri = uri.substr(1);
size_t secondSlash = uri.find('/');
if (secondSlash == std::string::npos) {
return uri;
}
return uri.substr(0, secondSlash);
}
// Do a request, get a Response.
// Request content is provided as UnsentPacketQueue *pContent which will be depleted as bytes are sent but the queue
// itself must live for the life of this actor and be destroyed by the caller
@ -1018,32 +1034,40 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
reqStartTimer = g_network->timer();
try {
if (s3TokenError) {
// if there is a S3TokenError, this might due to expired or invalid S3 token from the disk
// we should avoid sending duplicate data indefinitly to save network bandwidth
// thus have a "dryrun" request to list all buckets
// we do not have authorization but authorization only happens after authentication.
if (s3TokenError && isWriteRequest(req->verb)) {
// if it is a write request with s3TokenError, retry with a HEAD dryrun request
// to avoid sending duplicate data indefinitly to save network bandwidth
// because it might due to expired or invalid S3 token from the disk
state std::string bucket = parseBucketFromURI(resource);
if (bucket.empty()) {
TraceEvent(SevError, "EmptyBucketRequest")
.detail("S3TokenError", s3TokenError)
.detail("Verb", req->verb)
.detail("Resource", resource)
.log();
throw bucket_not_in_url();
}
TraceEvent("RetryS3RequestDueToTokenIssue")
.detail("S3TokenError", s3TokenError)
.detail("Resource", resource)
.detail("Bucket", bucket)
.detail("V4", CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER)
.log();
state Reference<HTTP::OutgoingRequest> dryrunRequest = getDryrunRequest(bstore);
state Reference<HTTP::OutgoingRequest> dryrunRequest = getDryrunRequest(bstore, bucket);
setHeaders(bstore, dryrunRequest);
dryrunRequest->resource = getCanonicalURI(bstore, dryrunRequest);
wait(bstore->requestRate->getAllowance(1));
Future<Reference<HTTP::IncomingResponse>> dryrunResponse = HTTP::doRequest(
rconn.conn, dryrunRequest, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate);
Reference<HTTP::IncomingResponse> dryrun_R = wait(timeoutError(dryrunResponse, requestTimeout));
dryrunR = dryrun_R;
std::string s3Error = parseErrorCodeFromS3(dryrunR->data.content);
if (isS3TokenError(s3Error)) {
// authentication fails and s3 token error persists, retry in the hope that token from disk is
// fixed
if (dryrunR->code == badRequestCode && isS3TokenError(s3Error)) {
// authentication fails and s3 token error persists, retry in the hope token is corrected
wait(delay(bstore->knobs.max_delay_retryable_error));
} else {
// authentication has passed, authorization error is expected, 403 with `AccessDenied` S3 error.
// it might work now, or it might be another error, thus reset s3TokenError.
} else if (dryrunR->code == 200 || dryrunR->code == 404) {
// authentication has passed, and bucket existence has been verified(200 or 404)
// it might work now(or it might be another error) thus reset s3TokenError.
TraceEvent("S3TokenIssueResolved")
.detail("HttpCode", dryrunR->code)
.detail("HttpResponseContent", dryrunR->data.content)
@ -1051,10 +1075,19 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
.detail("URI", dryrunRequest->resource)
.log();
s3TokenError = false;
} else {
TraceEvent(SevError, "S3UnexpectedError")
.detail("HttpCode", dryrunR->code)
.detail("HttpResponseContent", dryrunR->data.content)
.detail("S3Error", s3Error)
.detail("URI", dryrunRequest->resource)
.log();
throw http_bad_response();
}
continue;
}
} catch (Error& e) {
// retry with GET failed, but continue to do original request anyway
TraceEvent(SevError, "ErrorDuringRetryS3TokenIssue").errorUnsuppressed(e);
}
setHeaders(bstore, req);