Make backup work with s3 compatible service (#6355)(#6382) (#7324)

1. Support virtual hosting endpoint.

2. On-premise s3 compatible storage service may use IP instead of s3 form domain name,
especially for development/test environment.

Instead of parsing service and region from domain name,

1). Hard code "s3" as service name in v4 signature
2). Add new parameter to allow pass region name from url

3. Fix creating bucket issue on aws, adding request body.
This commit is contained in:
Zhanwei Wang 2022-06-14 04:33:05 +08:00 committed by GitHub
parent 013b290ca5
commit e632aef1c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 29 deletions

View File

@ -83,7 +83,7 @@ For blob store backup locations, the Backup URL format is
::
blobstore://[<api_key>][:<secret>[:<security_token>]]@<hostname>[:<port>]/<name>?bucket=<bucket_name>[&<param>=<value>]...]
blobstore://[<api_key>][:<secret>[:<security_token>]]@<hostname>[:<port>]/<name>?bucket=<bucket_name>[&region=<region_name>][&<param>=<value>]...]
<api_key> - API key to use for authentication. Optional.
<secret> - API key's secret. Optional.
@ -92,6 +92,7 @@ For blob store backup locations, the Backup URL format is
<port> - Remote port to connect to. Optional. Default is 80.
<name> - Name of the backup within the backup bucket. It can contain '/' characters in order to organize backups into a folder-like structure.
<bucket_name> - Name of the bucket to use for backup data.
<region_name> - If <hostname> is not in s3 compatible form (s3.region-name.example.com) and aws v4 signature is enabled, region name is required.
<param>=<value> - Optional URL parameters. See below for details.

View File

@ -169,6 +169,32 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
return r;
}
std::string guessRegionFromDomain(std::string domain) {
static const std::vector<const char*> knownServices = { "s3.", "cos.", "oss-", "obs." };
boost::algorithm::to_lower(domain);
for (int i = 0; i < knownServices.size(); ++i) {
const char* service = knownServices[i];
std::size_t p = domain.find(service);
if (p == std::string::npos || (p >= 1 && domain[p - 1] != '.')) {
// eg. 127.0.0.1, example.com, s3-service.example.com, mys3.example.com
continue;
}
StringRef h(domain.c_str() + p);
if (!h.startsWith(LiteralStringRef("oss-"))) {
h.eat(service); // ignore s3 service
}
return h.eat(".").toString();
}
return "";
}
Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string& url,
const Optional<std::string>& proxy,
std::string* resourceFromURL,
@ -222,6 +248,8 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string
StringRef service = h.eat();
std::string region = guessRegionFromDomain(host.toString());
BlobKnobs knobs;
HTTP::Headers extraHeaders;
while (1) {
@ -251,6 +279,12 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string
continue;
}
// overwrite s3 region from parameter
if (name == LiteralStringRef("region")) {
region = value.toString();
continue;
}
// See if the parameter is a knob
// First try setting a dummy value (all knobs are currently numeric) just to see if this parameter is known
// to S3BlobStoreEndpoint. If it is, then we will set it to a good value or throw below, so the dummy set
@ -289,8 +323,13 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string
creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString(), securityToken.toString() };
}
if (region.empty() && CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER) {
throw std::string(
"Failed to get region from host or parameter in url, region is required for aws v4 signature");
}
return makeReference<S3BlobStoreEndpoint>(
host.toString(), service.toString(), proxyHost, proxyPort, creds, knobs, extraHeaders);
host.toString(), service.toString(), region, proxyHost, proxyPort, creds, knobs, extraHeaders);
} catch (std::string& err) {
if (error != nullptr)
@ -356,10 +395,25 @@ std::string S3BlobStoreEndpoint::getResourceURL(std::string resource, std::strin
return r;
}
std::string constructResourcePath(Reference<S3BlobStoreEndpoint> b, std::string bucket, std::string object) {
std::string resource;
if (b->getHost().find(bucket + ".") != 0) {
resource += std::string("/") + bucket; // not virtual hosting mode
}
if (!object.empty()) {
resource += "/";
resource += object;
}
return std::move(resource);
}
ACTOR Future<bool> bucketExists_impl(Reference<S3BlobStoreEndpoint> b, std::string bucket) {
wait(b->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket;
std::string resource = constructResourcePath(b, bucket, "");
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 }));
@ -373,7 +427,7 @@ Future<bool> S3BlobStoreEndpoint::bucketExists(std::string const& bucket) {
ACTOR Future<bool> objectExists_impl(Reference<S3BlobStoreEndpoint> b, std::string bucket, std::string object) {
wait(b->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(b, bucket, object);
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 }));
@ -387,7 +441,7 @@ Future<bool> S3BlobStoreEndpoint::objectExists(std::string const& bucket, std::s
ACTOR Future<Void> deleteObject_impl(Reference<S3BlobStoreEndpoint> b, std::string bucket, std::string object) {
wait(b->requestRateDelete->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(b, bucket, object);
HTTP::Headers headers;
// 200 or 204 means object successfully deleted, 404 means it already doesn't exist, so any of those are considered
// successful
@ -477,9 +531,24 @@ ACTOR Future<Void> createBucket_impl(Reference<S3BlobStoreEndpoint> b, std::stri
bool exists = wait(b->bucketExists(bucket));
if (!exists) {
std::string resource = std::string("/") + bucket;
std::string resource = constructResourcePath(b, bucket, "");
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("PUT", resource, headers, nullptr, 0, { 200, 409 }));
std::string region = b->getRegion();
if (region.empty()) {
Reference<HTTP::Response> r = wait(b->doRequest("PUT", resource, headers, nullptr, 0, { 200, 409 }));
} else {
UnsentPacketQueue packets;
StringRef body(format("<CreateBucketConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">"
" <LocationConstraint>%s</LocationConstraint>"
"</CreateBucketConfiguration>",
region.c_str()));
PacketWriter pw(packets.getWriteBuffer(), nullptr, Unversioned());
pw.serializeBytes(body);
Reference<HTTP::Response> r =
wait(b->doRequest("PUT", resource, headers, &packets, body.size(), { 200, 409 }));
}
}
return Void();
}
@ -491,7 +560,7 @@ Future<Void> S3BlobStoreEndpoint::createBucket(std::string const& bucket) {
ACTOR Future<int64_t> objectSize_impl(Reference<S3BlobStoreEndpoint> b, std::string bucket, std::string object) {
wait(b->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(b, bucket, object);
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 }));
@ -966,8 +1035,8 @@ ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore,
int maxDepth,
std::function<bool(std::string const&)> recurseFilter) {
// Request 1000 keys at a time, the maximum allowed
state std::string resource = "/";
resource.append(bucket);
state std::string resource = constructResourcePath(bstore, bucket, "");
resource.append("/?max-keys=1000");
if (prefix.present())
resource.append("&prefix=").append(prefix.get());
@ -1324,10 +1393,6 @@ void S3BlobStoreEndpoint::setV4AuthHeaders(std::string const& verb,
amzDate = date;
dateStamp = datestamp;
}
// Extract service and region
StringRef hostRef(host);
std::string service = hostRef.eat(".").toString();
std::string region = hostRef.eat(".").toString();
// ************* TASK 1: CREATE A CANONICAL REQUEST *************
// Create Create canonical URI--the part of the URI from domain to query string (use '/' if no path)
@ -1370,14 +1435,14 @@ void S3BlobStoreEndpoint::setV4AuthHeaders(std::string const& verb,
// ************* TASK 2: CREATE THE STRING TO SIGN*************
std::string algorithm = "AWS4-HMAC-SHA256";
std::string credentialScope = dateStamp + "/" + region + "/" + service + "/" + "aws4_request";
std::string credentialScope = dateStamp + "/" + region + "/s3/" + "aws4_request";
std::string stringToSign =
algorithm + "\n" + amzDate + "\n" + credentialScope + "\n" + sha256_hex(canonicalRequest);
// ************* TASK 3: CALCULATE THE SIGNATURE *************
// Create the signing key using the function defined above.
std::string signingKey = hmac_sha256(
hmac_sha256(hmac_sha256(hmac_sha256("AWS4" + secretKey, dateStamp), region), service), "aws4_request");
std::string signingKey =
hmac_sha256(hmac_sha256(hmac_sha256(hmac_sha256("AWS4" + secretKey, dateStamp), region), "s3"), "aws4_request");
// Sign the string_to_sign using the signing_key
std::string signature = hmac_sha256_hex(signingKey, stringToSign);
// ************* TASK 4: ADD SIGNING INFORMATION TO THE Header *************
@ -1445,7 +1510,7 @@ ACTOR Future<std::string> readEntireFile_impl(Reference<S3BlobStoreEndpoint> bst
std::string object) {
wait(bstore->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(bstore, bucket, object);
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, { 200, 404 }));
if (r->code == 404)
@ -1470,7 +1535,7 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<S3BlobStoreEndpoint>
wait(bstore->concurrentUploads.take());
state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1);
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(bstore, bucket, object);
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
@ -1540,7 +1605,7 @@ ACTOR Future<int> readObject_impl(Reference<S3BlobStoreEndpoint> bstore,
return 0;
wait(bstore->requestRateRead->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object;
std::string resource = constructResourcePath(bstore, bucket, object);
HTTP::Headers headers;
headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, { 200, 206, 404 }));
@ -1567,7 +1632,8 @@ ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<S3BlobStore
std::string object) {
wait(bstore->requestRateWrite->getAllowance(1));
std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
std::string resource = constructResourcePath(bstore, bucket, object);
resource += "?uploads";
HTTP::Headers headers;
if (!CLIENT_KNOBS->BLOBSTORE_ENCRYPTION_TYPE.empty())
headers["x-amz-server-side-encryption"] = CLIENT_KNOBS->BLOBSTORE_ENCRYPTION_TYPE;
@ -1609,8 +1675,8 @@ ACTOR Future<std::string> uploadPart_impl(Reference<S3BlobStoreEndpoint> bstore,
wait(bstore->concurrentUploads.take());
state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1);
std::string resource =
format("/%s/%s?partNumber=%d&uploadId=%s", bucket.c_str(), object.c_str(), partNumber, uploadID.c_str());
std::string resource = constructResourcePath(bstore, bucket, object);
resource += format("?partNumber=%d&uploadId=%s", partNumber, uploadID.c_str());
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
@ -1662,7 +1728,8 @@ ACTOR Future<Void> finishMultiPartUpload_impl(Reference<S3BlobStoreEndpoint> bst
manifest += format("<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>\n", p.first, p.second.c_str());
manifest += "</CompleteMultipartUpload>";
std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str());
std::string resource = constructResourcePath(bstore, bucket, object);
resource += format("?uploadId=%s", uploadID.c_str());
HTTP::Headers headers;
PacketWriter pw(part_list.getWriteBuffer(manifest.size()), nullptr, Unversioned());
pw.serializeBytes(manifest);
@ -1686,7 +1753,7 @@ TEST_CASE("/backup/s3/v4headers") {
S3BlobStoreEndpoint::Credentials creds{ "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "" }
// GET without query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "443", "amazonaws", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test.txt");
HTTP::Headers headers;
@ -1701,7 +1768,7 @@ TEST_CASE("/backup/s3/v4headers") {
// GET with query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "443", "amazonaws", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test/examplebucket?Action=DescribeRegions&Version=2013-10-15");
HTTP::Headers headers;
@ -1716,7 +1783,7 @@ TEST_CASE("/backup/s3/v4headers") {
// POST
{
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", "proxy", "port", creds);
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "443", "us-west-2", "proxy", "port", creds);
std::string verb("POST");
std::string resource("/simple.json");
HTTP::Headers headers;
@ -1733,4 +1800,4 @@ TEST_CASE("/backup/s3/v4headers") {
}
return Void();
}
}

View File

@ -100,12 +100,13 @@ public:
S3BlobStoreEndpoint(std::string const& host,
std::string const& service,
std::string region,
Optional<std::string> const& proxyHost,
Optional<std::string> const& proxyPort,
Optional<Credentials> const& creds,
BlobKnobs const& knobs = BlobKnobs(),
HTTP::Headers extraHeaders = HTTP::Headers())
: host(host), service(service), proxyHost(proxyHost), proxyPort(proxyPort),
: host(host), service(service), region(region), proxyHost(proxyHost), proxyPort(proxyPort),
useProxy(proxyHost.present() && proxyPort.present()), credentials(creds),
lookupKey(creds.present() && creds.get().key.empty()),
lookupSecret(creds.present() && creds.get().secret.empty()), knobs(knobs), extraHeaders(extraHeaders),
@ -156,6 +157,7 @@ public:
std::string host;
std::string service;
std::string region;
Optional<std::string> proxyHost;
Optional<std::string> proxyPort;
bool useProxy;
@ -193,6 +195,10 @@ public:
std::string date = "",
std::string datestamp = "");
std::string getHost() const { return host; }
std::string getRegion() const { return region; }
// Prepend the HTTP request header to the given PacketBuffer, returning the new head of the buffer chain
static PacketBuffer* writeRequestHeader(std::string const& request,
HTTP::Headers const& headers,