From 55e86c13360f3e5f1232525c928ad4736b9d993e Mon Sep 17 00:00:00 2001
From: sfc-gh-tclinkenbeard
Date: Thu, 29 Oct 2020 20:42:23 -0700
Subject: [PATCH] Disambiguate between S3BlobStore and other blob stores

---
 fdbbackup/backup.actor.cpp                        |    8 +-
 ...tor.cpp => AsyncFileS3BlobStore.actor.cpp}     |   38 +-
 ...e.actor.h => AsyncFileS3BlobStore.actor.h}     |  160 +-
 fdbclient/BackupContainer.actor.cpp               |   14 +-
 .../BackupContainerS3BlobStore.actor.cpp          |   20 +-
 fdbclient/BackupContainerS3BlobStore.h            |   10 +-
 fdbclient/BlobStore.actor.cpp                     | 1194 ----------------
 fdbclient/CMakeLists.txt                          |    6 +-
 fdbclient/S3BlobStore.actor.cpp                   | 1249 +++++++++++++++++
 fdbclient/{BlobStore.h => S3BlobStore.h}          |  154 +-
 fdbrpc/IAsyncFile.h                               |    2 +-
 tests/CMakeLists.txt                              |    2 +-
 tests/{BlobStore.txt => S3BlobStore.txt}          |    0
 13 files changed, 1457 insertions(+), 1400 deletions(-)
 rename fdbclient/{AsyncFileBlobStore.actor.cpp => AsyncFileS3BlobStore.actor.cpp} (74%)
 rename fdbclient/{AsyncFileBlobStore.actor.h => AsyncFileS3BlobStore.actor.h} (58%)
 delete mode 100644 fdbclient/BlobStore.actor.cpp
 create mode 100644 fdbclient/S3BlobStore.actor.cpp
 rename fdbclient/{BlobStore.h => S3BlobStore.h} (53%)
 rename tests/{BlobStore.txt => S3BlobStore.txt} (100%)

diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp
index eb64bcbe3a..3c908d8f40 100644
--- a/fdbbackup/backup.actor.cpp
+++ b/fdbbackup/backup.actor.cpp
@@ -38,7 +38,7 @@
 #include "fdbclient/BackupContainer.h"
 #include "fdbclient/KeyBackedTypes.h"
 #include "fdbclient/RunTransaction.actor.h"
-#include "fdbclient/BlobStore.h"
+#include "fdbclient/S3BlobStore.h"
 #include "fdbclient/json_spirit/json_spirit_writer_template.h"
 
 #include "flow/Platform.h"
@@ -1460,12 +1460,12 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
 	o.create("configured_workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
 
 	if(exe == EXE_AGENT) {
-		static BlobStoreEndpoint::Stats last_stats;
+		static S3BlobStoreEndpoint::Stats last_stats;
 		static double last_ts = 0;
-		BlobStoreEndpoint::Stats current_stats = BlobStoreEndpoint::s_stats;
+		S3BlobStoreEndpoint::Stats current_stats = S3BlobStoreEndpoint::s_stats;
 		JSONDoc blobstats = o.create("blob_stats");
 		blobstats.create("total") = current_stats.getJSON();
-		BlobStoreEndpoint::Stats diff = current_stats - last_stats;
+		S3BlobStoreEndpoint::Stats diff = current_stats - last_stats;
 		json_spirit::mObject diffObj = diff.getJSON();
 		if(last_ts > 0)
 			diffObj["bytes_per_second"] = double(current_stats.bytes_sent - last_stats.bytes_sent) / (now() - last_ts);
diff --git a/fdbclient/AsyncFileBlobStore.actor.cpp b/fdbclient/AsyncFileS3BlobStore.actor.cpp
similarity index 74%
rename from fdbclient/AsyncFileBlobStore.actor.cpp
rename to fdbclient/AsyncFileS3BlobStore.actor.cpp
index a12317f8f8..5eb38d5063 100644
--- a/fdbclient/AsyncFileBlobStore.actor.cpp
+++ b/fdbclient/AsyncFileS3BlobStore.actor.cpp
@@ -1,5 +1,5 @@
 /*
- * AsyncFileBlobStore.actor.cpp
+ * AsyncFileS3BlobStore.actor.cpp
  *
  * This source file is part of the FoundationDB open source project
  *
@@ -18,40 +18,37 @@
  * limitations under the License.
  */
 
-#include "fdbclient/AsyncFileBlobStore.actor.h"
+#include "fdbclient/AsyncFileS3BlobStore.actor.h"
 #include "fdbrpc/AsyncFileReadAhead.actor.h"
 #include "flow/UnitTest.h"
 #include "flow/actorcompiler.h" // has to be last include
 
-Future<int64_t> AsyncFileBlobStoreRead::size() const {
-	if(!m_size.isValid())
-		m_size = m_bstore->objectSize(m_bucket, m_object);
+Future<int64_t> AsyncFileS3BlobStoreRead::size() const {
+	if (!m_size.isValid()) m_size = m_bstore->objectSize(m_bucket, m_object);
 	return m_size;
 }
 
-Future<int> AsyncFileBlobStoreRead::read( void *data, int length, int64_t offset ) {
+Future<int> AsyncFileS3BlobStoreRead::read(void* data, int length, int64_t offset) {
 	return m_bstore->readObject(m_bucket, m_object, data, length, offset);
 }
 
-
 ACTOR Future<Void> sendStuff(int id, Reference<IRateControl> t, int bytes) {
 	printf("Starting fake sender %d which will send %d bytes.\n", id, bytes);
 	state double ts = timer();
 	state int total = 0;
-	while(total < bytes) {
-		state int r = std::min(deterministicRandom()->randomInt(0,1000), bytes - total);
+	while (total < bytes) {
+		state int r = std::min(deterministicRandom()->randomInt(0, 1000), bytes - total);
 		wait(t->getAllowance(r));
 		total += r;
 	}
 	double dur = timer() - ts;
-	printf("Sender %d: Sent %d in %fs, %f/s\n", id, total, dur, total/dur);
+	printf("Sender %d: Sent %d in %fs, %f/s\n", id, total, dur, total / dur);
 	return Void();
 }
 
 TEST_CASE("/backup/throttling") {
 	// Test will not work in simulation.
-	if(g_network->isSimulated())
-		return Void();
+	if (g_network->isSimulated()) return Void();
 
 	state int limit = 100000;
 	state Reference<IRateControl> t(new SpeedLimit(limit, 1));
@@ -62,13 +59,18 @@ TEST_CASE("/backup/throttling") {
 	state int total = 0;
 	int s;
 	s = 500000;
-	f.push_back(sendStuff(id++, t, s)); total += s;
-	f.push_back(sendStuff(id++, t, s)); total += s;
+	f.push_back(sendStuff(id++, t, s));
+	total += s;
+	f.push_back(sendStuff(id++, t, s));
+	total += s;
 	s = 50000;
-	f.push_back(sendStuff(id++, t, s)); total += s;
-	f.push_back(sendStuff(id++, t, s)); total += s;
+	f.push_back(sendStuff(id++, t, s));
+	total += s;
+	f.push_back(sendStuff(id++, t, s));
+	total += s;
 	s = 5000;
-	f.push_back(sendStuff(id++, t, s)); total += s;
+	f.push_back(sendStuff(id++, t, s));
+	total += s;
 
 	wait(waitForAll(f));
 
 	double dur = timer() - ts;
@@ -78,5 +80,3 @@ TEST_CASE("/backup/throttling") {
 
 	return Void();
 }
-
-
diff --git a/fdbclient/AsyncFileBlobStore.actor.h b/fdbclient/AsyncFileS3BlobStore.actor.h
similarity index 58%
rename from fdbclient/AsyncFileBlobStore.actor.h
rename to fdbclient/AsyncFileS3BlobStore.actor.h
index b070cd65d9..49f011ffa7 100644
--- a/fdbclient/AsyncFileBlobStore.actor.h
+++ b/fdbclient/AsyncFileS3BlobStore.actor.h
@@ -1,5 +1,5 @@
 /*
- * AsyncFileBlobStore.actor.h
+ * AsyncFileS3BlobStore.actor.h
  *
  * This source file is part of the FoundationDB open source project
  *
@@ -20,12 +20,13 @@
 
 #pragma once
 
-// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
+// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
+// version.
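The throttling test above pushes 1,105,000 bytes through a SpeedLimit of 100,000 bytes per second, so its five senders should finish in roughly 11 seconds of wall time. For readers outside the flow runtime, here is a minimal standalone sketch of the blocking getAllowance() contract the test relies on; TokenBucket is a hypothetical illustration for this note, not FoundationDB's SpeedLimit:

#include <algorithm>
#include <chrono>
#include <thread>

// Minimal blocking token bucket: roughly `rate` units of allowance per second,
// with at most one second of burst, mirroring SpeedLimit(limit, 1) above.
// Assumes each request's `amount` is no larger than `rate`.
class TokenBucket {
	using Clock = std::chrono::steady_clock;
	double rate_;
	double tokens_ = 0;
	Clock::time_point last_ = Clock::now();

public:
	explicit TokenBucket(double rate) : rate_(rate) {}

	// Block until `amount` units are available, then consume them.
	void getAllowance(double amount) {
		for (;;) {
			auto now = Clock::now();
			// Accrue allowance for the elapsed time, capped at one second of burst.
			tokens_ = std::min(tokens_ + std::chrono::duration<double>(now - last_).count() * rate_, rate_);
			last_ = now;
			if (tokens_ >= amount) {
				tokens_ -= amount;
				return;
			}
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
		}
	}
};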
#if defined(NO_INTELLISENSE) && !defined(FDBRPC_ASYNCFILEBLOBSTORE_ACTOR_G_H) - #define FDBRPC_ASYNCFILEBLOBSTORE_ACTOR_G_H - #include "fdbclient/AsyncFileBlobStore.actor.g.h" -#elif !defined(FDBRPC_ASYNCFILEBLOBSTORE_ACTOR_H) - #define FDBRPC_ASYNCFILEBLOBSTORE_ACTOR_H +#define FDBRPC_ASYNCFILEBLOBSTORE_ACTOR_G_H +#include "fdbclient/AsyncFileS3BlobStore.actor.g.h" +#elif !defined(FDBRPC_ASYNCFILES3BLOBSTORE_ACTOR_H) +#define FDBRPC_ASYNCFILES3BLOBSTORE_ACTOR_H #include #include @@ -34,55 +35,54 @@ #include "flow/serialize.h" #include "flow/Net2Packet.h" #include "fdbrpc/IRateControl.h" -#include "fdbclient/BlobStore.h" +#include "fdbclient/S3BlobStore.h" #include "fdbclient/md5/md5.h" #include "fdbclient/libb64/encode.h" -#include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/actorcompiler.h" // This must be the last #include. -ACTOR template static Future joinErrorGroup(Future f, Promise p) { +ACTOR template +static Future joinErrorGroup(Future f, Promise p) { try { wait(success(f) || p.getFuture()); return f.get(); - } catch(Error &e) { - if(p.canBeSet()) - p.sendError(e); + } catch (Error& e) { + if (p.canBeSet()) p.sendError(e); throw; } } // This class represents a write-only file that lives in an S3-style blob store. It writes using the REST API, // using multi-part upload and beginning to transfer each part as soon as it is large enough. // All write operations file operations must be sequential and contiguous. -// Limits on part sizes, upload speed, and concurrent uploads are taken from the BlobStoreEndpoint being used. -class AsyncFileBlobStoreWrite : public IAsyncFile, public ReferenceCounted { +// Limits on part sizes, upload speed, and concurrent uploads are taken from the S3BlobStoreEndpoint being used. +class AsyncFileS3BlobStoreWrite : public IAsyncFile, public ReferenceCounted { public: - virtual void addref() { ReferenceCounted::addref(); } - virtual void delref() { ReferenceCounted::delref(); } + virtual void addref() { ReferenceCounted::addref(); } + virtual void delref() { ReferenceCounted::delref(); } struct Part : ReferenceCounted { - Part(int n, int minSize) : number(n), writer(content.getWriteBuffer(minSize), nullptr, Unversioned()), length(0) { + Part(int n, int minSize) + : number(n), writer(content.getWriteBuffer(minSize), nullptr, Unversioned()), length(0) { etag = std::string(); ::MD5_Init(&content_md5_buf); } - virtual ~Part() { - etag.cancel(); - } + virtual ~Part() { etag.cancel(); } Future etag; int number; UnsentPacketQueue content; std::string md5string; PacketWriter writer; int length; - void write(const uint8_t *buf, int len) { + void write(const uint8_t* buf, int len) { writer.serializeBytes(buf, len); ::MD5_Update(&content_md5_buf, buf, len); length += len; } // MD5 sum can only be finalized once, further calls will do nothing so new writes will be reflected in the sum. 
void finalizeMD5() { - if(md5string.empty()) { + if (md5string.empty()) { std::string sumBytes; sumBytes.resize(16); - ::MD5_Final((unsigned char *)sumBytes.data(), &content_md5_buf); + ::MD5_Final((unsigned char*)sumBytes.data(), &content_md5_buf); md5string = base64::encoder::from_string(sumBytes); md5string.resize(md5string.size() - 1); } @@ -94,71 +94,75 @@ public: Future read(void* data, int length, int64_t offset) override { throw file_not_readable(); } - ACTOR static Future write_impl(Reference f, const uint8_t *data, int length) { - state Part *p = f->m_parts.back().getPtr(); - // If this write will cause the part to cross the min part size boundary then write to the boundary and start a new part. - while(p->length + length >= f->m_bstore->knobs.multipart_min_part_size) { + ACTOR static Future write_impl(Reference f, const uint8_t* data, int length) { + state Part* p = f->m_parts.back().getPtr(); + // If this write will cause the part to cross the min part size boundary then write to the boundary and start a + // new part. + while (p->length + length >= f->m_bstore->knobs.multipart_min_part_size) { // Finish off this part int finishlen = f->m_bstore->knobs.multipart_min_part_size - p->length; - p->write((const uint8_t *)data, finishlen); + p->write((const uint8_t*)data, finishlen); // Adjust source buffer args length -= finishlen; - data = (const uint8_t *)data + finishlen; + data = (const uint8_t*)data + finishlen; // End current part (and start new one) wait(f->endCurrentPart(f.getPtr(), true)); p = f->m_parts.back().getPtr(); } - p->write((const uint8_t *)data, length); + p->write((const uint8_t*)data, length); return Void(); } Future write(void const* data, int length, int64_t offset) override { - if(offset != m_cursor) - throw non_sequential_op(); + if (offset != m_cursor) throw non_sequential_op(); m_cursor += length; - return m_error.getFuture() || write_impl(Reference::addRef(this), (const uint8_t *)data, length); + return m_error.getFuture() || + write_impl(Reference::addRef(this), (const uint8_t*)data, length); } Future truncate(int64_t size) override { - if(size != m_cursor) - return non_sequential_op(); + if (size != m_cursor) return non_sequential_op(); return Void(); } - ACTOR static Future doPartUpload(AsyncFileBlobStoreWrite *f, Part *p) { + ACTOR static Future doPartUpload(AsyncFileS3BlobStoreWrite* f, Part* p) { p->finalizeMD5(); std::string upload_id = wait(f->getUploadID()); - std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string)); + std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, + p->length, p->md5string)); return etag; } - ACTOR static Future doFinishUpload(AsyncFileBlobStoreWrite* f) { + ACTOR static Future doFinishUpload(AsyncFileS3BlobStoreWrite* f) { // If there is only 1 part then it has not yet been uploaded so just write the whole file at once. - if(f->m_parts.size() == 1) { + if (f->m_parts.size() == 1) { Reference part = f->m_parts.back(); part->finalizeMD5(); - wait(f->m_bstore->writeEntireFileFromBuffer(f->m_bucket, f->m_object, &part->content, part->length, part->md5string)); + wait(f->m_bstore->writeEntireFileFromBuffer(f->m_bucket, f->m_object, &part->content, part->length, + part->md5string)); return Void(); } // There are at least 2 parts. 
End the last part (which could be empty) wait(f->endCurrentPart(f)); - state BlobStoreEndpoint::MultiPartSetT partSet; + state S3BlobStoreEndpoint::MultiPartSetT partSet; state std::vector>::iterator p; - // Wait for all the parts to be done to get their ETags, populate the partSet required to finish the object upload. - for(p = f->m_parts.begin(); p != f->m_parts.end(); ++p) { + // Wait for all the parts to be done to get their ETags, populate the partSet required to finish the object + // upload. + for (p = f->m_parts.begin(); p != f->m_parts.end(); ++p) { std::string tag = wait((*p)->etag); - if((*p)->length > 0) // The last part might be empty and has to be omitted. + if ((*p)->length > 0) // The last part might be empty and has to be omitted. partSet[(*p)->number] = tag; } - // No need to wait for the upload ID here because the above loop waited for all the parts and each part required the upload ID so it is ready + // No need to wait for the upload ID here because the above loop waited for all the parts and each part required + // the upload ID so it is ready wait(f->m_bstore->finishMultiPartUpload(f->m_bucket, f->m_object, f->m_upload_id.get(), partSet)); return Void(); @@ -167,43 +171,43 @@ public: // Ready once all data has been sent AND acknowledged from the remote side Future sync() override { // Only initiate the finish operation once, and also prevent further writing. - if(!m_finished.isValid()) { + if (!m_finished.isValid()) { m_finished = doFinishUpload(this); - m_cursor = -1; // Cause future write attempts to fail + m_cursor = -1; // Cause future write attempts to fail } return m_finished; } // - // Flush can't really do what the caller would "want" for a blob store file. The caller would probably notionally want - // all bytes written to be at least in transit to the blob store, but that is not very feasible. The blob store - // has a minimum size requirement for all but the final part, and parts must be sent with a header that specifies - // their size. So in the case of a write buffer that does not meet the part minimum size the part could be sent - // but then if there is any more data written then that part needs to be sent again in its entirety. So a client - // that calls flush often could generate far more blob store write traffic than they intend to. + // Flush can't really do what the caller would "want" for a blob store file. The caller would probably notionally + // want all bytes written to be at least in transit to the blob store, but that is not very feasible. The blob + // store has a minimum size requirement for all but the final part, and parts must be sent with a header that + // specifies their size. So in the case of a write buffer that does not meet the part minimum size the part could + // be sent but then if there is any more data written then that part needs to be sent again in its entirety. So a + // client that calls flush often could generate far more blob store write traffic than they intend to. 
Future flush() override { return Void(); } Future size() const override { return m_cursor; } Future readZeroCopy(void** data, int* length, int64_t offset) override { - TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreWrite"); + TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "S3BlobStoreWrite"); return platform_error(); } void releaseZeroCopy(void* data, int length, int64_t offset) override {} int64_t debugFD() const override { return -1; } - ~AsyncFileBlobStoreWrite() override { + ~AsyncFileS3BlobStoreWrite() override { m_upload_id.cancel(); m_finished.cancel(); - m_parts.clear(); // Contains futures + m_parts.clear(); // Contains futures } std::string getFilename() const override { return m_object; } private: - Reference m_bstore; + Reference m_bstore; std::string m_bucket; std::string m_object; @@ -216,48 +220,46 @@ private: FlowLock m_concurrentUploads; // End the current part and start uploading it, but also wait for a part to finish if too many are in transit. - ACTOR static Future endCurrentPart(AsyncFileBlobStoreWrite *f, bool startNew = false) { - if(f->m_parts.back()->length == 0) - return Void(); + ACTOR static Future endCurrentPart(AsyncFileS3BlobStoreWrite* f, bool startNew = false) { + if (f->m_parts.back()->length == 0) return Void(); // Wait for an upload slot to be available wait(f->m_concurrentUploads.take()); - // Do the upload, and if it fails forward errors to m_error and also stop if anything else sends an error to m_error - // Also, hold a releaser for the concurrent upload slot while all that is going on. + // Do the upload, and if it fails forward errors to m_error and also stop if anything else sends an error to + // m_error Also, hold a releaser for the concurrent upload slot while all that is going on. auto releaser = std::make_shared(f->m_concurrentUploads, 1); f->m_parts.back()->etag = holdWhile(std::move(releaser), joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error)); // Make a new part to write to - if(startNew) - f->m_parts.push_back(Reference(new Part(f->m_parts.size() + 1, f->m_bstore->knobs.multipart_min_part_size))); + if (startNew) + f->m_parts.push_back( + Reference(new Part(f->m_parts.size() + 1, f->m_bstore->knobs.multipart_min_part_size))); return Void(); } Future getUploadID() { - if(!m_upload_id.isValid()) - m_upload_id = m_bstore->beginMultiPartUpload(m_bucket, m_object); + if (!m_upload_id.isValid()) m_upload_id = m_bstore->beginMultiPartUpload(m_bucket, m_object); return m_upload_id; } public: - AsyncFileBlobStoreWrite(Reference bstore, std::string bucket, std::string object) - : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) { + AsyncFileS3BlobStoreWrite(Reference bstore, std::string bucket, std::string object) + : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), + m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) { // Add first part m_parts.push_back(Reference(new Part(1, m_bstore->knobs.multipart_min_part_size))); } - }; - // This class represents a read-only file that lives in an S3-style blob store. It reads using the REST API. 
-class AsyncFileBlobStoreRead : public IAsyncFile, public ReferenceCounted { +class AsyncFileS3BlobStoreRead : public IAsyncFile, public ReferenceCounted { public: - virtual void addref() { ReferenceCounted::addref(); } - virtual void delref() { ReferenceCounted::delref(); } + virtual void addref() { ReferenceCounted::addref(); } + virtual void delref() { ReferenceCounted::delref(); } Future read(void* data, int length, int64_t offset) override; @@ -270,7 +272,7 @@ public: Future size() const override; Future readZeroCopy(void** data, int* length, int64_t offset) override { - TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreRead"); + TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "S3BlobStoreRead"); return platform_error(); } void releaseZeroCopy(void* data, int length, int64_t offset) override {} @@ -279,17 +281,15 @@ public: std::string getFilename() const override { return m_object; } - virtual ~AsyncFileBlobStoreRead() {} + virtual ~AsyncFileS3BlobStoreRead() {} - Reference m_bstore; + Reference m_bstore; std::string m_bucket; std::string m_object; mutable Future m_size; - AsyncFileBlobStoreRead(Reference bstore, std::string bucket, std::string object) - : m_bstore(bstore), m_bucket(bucket), m_object(object) { - } - + AsyncFileS3BlobStoreRead(Reference bstore, std::string bucket, std::string object) + : m_bstore(bstore), m_bucket(bucket), m_object(object) {} }; #include "flow/unactorcompiler.h" diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 86c71ef70b..73b4e29a41 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -35,7 +35,7 @@ #include "fdbrpc/AsyncFileReadAhead.actor.h" #include "fdbrpc/simulator.h" #include "flow/Platform.h" -#include "fdbclient/AsyncFileBlobStore.actor.h" +#include "fdbclient/AsyncFileS3BlobStore.actor.h" #include "fdbclient/BackupContainerAzureBlobStore.h" #include "fdbclient/BackupContainerFileSystem.h" #include "fdbclient/BackupContainerLocalDirectory.h" @@ -265,9 +265,9 @@ Reference IBackupContainer::openContainer(const std::string& u std::string resource; // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. - BlobStoreEndpoint::ParametersT backupParams; - Reference bstore = - BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); + S3BlobStoreEndpoint::ParametersT backupParams; + Reference bstore = + S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); if (resource.empty()) throw backup_invalid_url(); for (auto c : resource) @@ -314,9 +314,9 @@ ACTOR Future> listContainers_impl(std::string baseURL) } else if (u.startsWith(LiteralStringRef("blobstore://"))) { std::string resource; - BlobStoreEndpoint::ParametersT backupParams; - Reference bstore = - BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams); + S3BlobStoreEndpoint::ParametersT backupParams; + Reference bstore = + S3BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams); if (!resource.empty()) { TraceEvent(SevWarn, "BackupContainer") diff --git a/fdbclient/BackupContainerS3BlobStore.actor.cpp b/fdbclient/BackupContainerS3BlobStore.actor.cpp index 400b3ba829..4713d87d21 100644 --- a/fdbclient/BackupContainerS3BlobStore.actor.cpp +++ b/fdbclient/BackupContainerS3BlobStore.actor.cpp @@ -18,7 +18,7 @@ * limitations under the License. 
*/ -#include "fdbclient/AsyncFileBlobStore.actor.h" +#include "fdbclient/AsyncFileS3BlobStore.actor.h" #include "fdbclient/BackupContainerS3BlobStore.h" #include "fdbrpc/AsyncFileReadAhead.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -32,9 +32,9 @@ public: // number of slashes so the backup names are kept in a separate folder tree from their actual data. static const std::string INDEXFOLDER; - ACTOR static Future> listURLs(Reference bstore, std::string bucket) { + ACTOR static Future> listURLs(Reference bstore, std::string bucket) { state std::string basePath = INDEXFOLDER + '/'; - BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath)); + S3BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath)); std::vector results; for (auto& f : contents.objects) { results.push_back( @@ -79,7 +79,7 @@ public: return pathFilter(folderPath.substr(prefixTrim)); }; - state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects( + state S3BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects( bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits::max(), rawPathFilter)); BackupContainerFileSystem::FilesAndSizesT files; for (auto& o : result.objects) { @@ -130,8 +130,8 @@ std::string BackupContainerS3BlobStore::indexEntry() { return BackupContainerS3BlobStoreImpl::INDEXFOLDER + "/" + m_name; } -BackupContainerS3BlobStore::BackupContainerS3BlobStore(Reference bstore, const std::string& name, - const BlobStoreEndpoint::ParametersT& params) +BackupContainerS3BlobStore::BackupContainerS3BlobStore(Reference bstore, const std::string& name, + const S3BlobStoreEndpoint::ParametersT& params) : m_bstore(bstore), m_name(name), m_bucket("FDB_BACKUPS_V2") { // Currently only one parameter is supported, "bucket" @@ -156,24 +156,24 @@ void BackupContainerS3BlobStore::delref() { } std::string BackupContainerS3BlobStore::getURLFormat() { - return BlobStoreEndpoint::getURLFormat(true) + " (Note: The 'bucket' parameter is required.)"; + return S3BlobStoreEndpoint::getURLFormat(true) + " (Note: The 'bucket' parameter is required.)"; } Future> BackupContainerS3BlobStore::readFile(const std::string& path) { return Reference(new AsyncFileReadAheadCache( - Reference(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))), + Reference(new AsyncFileS3BlobStoreRead(m_bstore, m_bucket, dataPath(path))), m_bstore->knobs.read_block_size, m_bstore->knobs.read_ahead_blocks, m_bstore->knobs.concurrent_reads_per_file, m_bstore->knobs.read_cache_blocks_per_file)); } -Future> BackupContainerS3BlobStore::listURLs(Reference bstore, +Future> BackupContainerS3BlobStore::listURLs(Reference bstore, const std::string& bucket) { return BackupContainerS3BlobStoreImpl::listURLs(bstore, bucket); } Future> BackupContainerS3BlobStore::writeFile(const std::string& path) { return Reference(new BackupContainerS3BlobStoreImpl::BackupFile( - path, Reference(new AsyncFileBlobStoreWrite(m_bstore, m_bucket, dataPath(path))))); + path, Reference(new AsyncFileS3BlobStoreWrite(m_bstore, m_bucket, dataPath(path))))); } Future BackupContainerS3BlobStore::deleteFile(const std::string& path) { diff --git a/fdbclient/BackupContainerS3BlobStore.h b/fdbclient/BackupContainerS3BlobStore.h index 52f70bf654..f9c811de92 100644 --- a/fdbclient/BackupContainerS3BlobStore.h +++ b/fdbclient/BackupContainerS3BlobStore.h @@ -22,12 +22,12 @@ #define FDBCLIENT_BACKUP_CONTAINER_S3_BLOBSTORE_H #pragma once -#include "fdbclient/AsyncFileBlobStore.actor.h" 
+#include "fdbclient/AsyncFileS3BlobStore.actor.h" #include "fdbclient/BackupContainerFileSystem.h" class BackupContainerS3BlobStore final : public BackupContainerFileSystem, ReferenceCounted { - Reference m_bstore; + Reference m_bstore; std::string m_name; // All backup data goes into a single bucket @@ -41,8 +41,8 @@ class BackupContainerS3BlobStore final : public BackupContainerFileSystem, friend class BackupContainerS3BlobStoreImpl; public: - BackupContainerS3BlobStore(Reference bstore, const std::string& name, - const BlobStoreEndpoint::ParametersT& params); + BackupContainerS3BlobStore(Reference bstore, const std::string& name, + const S3BlobStoreEndpoint::ParametersT& params); void addref() override; void delref() override; @@ -51,7 +51,7 @@ public: Future> readFile(const std::string& path) final; - static Future> listURLs(Reference bstore, const std::string& bucket); + static Future> listURLs(Reference bstore, const std::string& bucket); Future> writeFile(const std::string& path) final; diff --git a/fdbclient/BlobStore.actor.cpp b/fdbclient/BlobStore.actor.cpp deleted file mode 100644 index b29af56172..0000000000 --- a/fdbclient/BlobStore.actor.cpp +++ /dev/null @@ -1,1194 +0,0 @@ -/* - * BlobStore.actor.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "fdbclient/BlobStore.h" - -#include "fdbclient/md5/md5.h" -#include "fdbclient/libb64/encode.h" -#include "fdbclient/sha1/SHA1.h" -#include -#include -#include -#include "fdbrpc/IAsyncFile.h" -#include "fdbclient/rapidxml/rapidxml.hpp" -#include "flow/actorcompiler.h" // has to be last include - -using namespace rapidxml; - -json_spirit::mObject BlobStoreEndpoint::Stats::getJSON() { - json_spirit::mObject o; - - o["requests_failed"] = requests_failed; - o["requests_successful"] = requests_successful; - o["bytes_sent"] = bytes_sent; - - return o; -} - -BlobStoreEndpoint::Stats BlobStoreEndpoint::Stats::operator-(const Stats &rhs) { - Stats r; - r.requests_failed = requests_failed - rhs.requests_failed; - r.requests_successful = requests_successful - rhs.requests_successful; - r.bytes_sent = bytes_sent - rhs.bytes_sent; - return r; -} - -BlobStoreEndpoint::Stats BlobStoreEndpoint::s_stats; - -BlobStoreEndpoint::BlobKnobs::BlobKnobs() { - secure_connection = 1; - connect_tries = CLIENT_KNOBS->BLOBSTORE_CONNECT_TRIES; - connect_timeout = CLIENT_KNOBS->BLOBSTORE_CONNECT_TIMEOUT; - max_connection_life = CLIENT_KNOBS->BLOBSTORE_MAX_CONNECTION_LIFE; - request_tries = CLIENT_KNOBS->BLOBSTORE_REQUEST_TRIES; - request_timeout_min = CLIENT_KNOBS->BLOBSTORE_REQUEST_TIMEOUT_MIN; - requests_per_second = CLIENT_KNOBS->BLOBSTORE_REQUESTS_PER_SECOND; - concurrent_requests = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS; - list_requests_per_second = CLIENT_KNOBS->BLOBSTORE_LIST_REQUESTS_PER_SECOND; - write_requests_per_second = CLIENT_KNOBS->BLOBSTORE_WRITE_REQUESTS_PER_SECOND; - read_requests_per_second = CLIENT_KNOBS->BLOBSTORE_READ_REQUESTS_PER_SECOND; - delete_requests_per_second = CLIENT_KNOBS->BLOBSTORE_DELETE_REQUESTS_PER_SECOND; - multipart_max_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MAX_PART_SIZE; - multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE; - concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS; - concurrent_lists = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_LISTS; - concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE; - concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE; - read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE; - read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS; - read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; - max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND; - max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND; -} - -bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) { - #define TRY_PARAM(n, sn) if(name == LiteralStringRef(#n) || name == LiteralStringRef(#sn)) { n = value; return true; } - TRY_PARAM(secure_connection, sc) - TRY_PARAM(connect_tries, ct); - TRY_PARAM(connect_timeout, cto); - TRY_PARAM(max_connection_life, mcl); - TRY_PARAM(request_tries, rt); - TRY_PARAM(request_timeout_min, rtom); - // TODO: For backward compatibility because request_timeout was renamed to request_timeout_min - if(name == LiteralStringRef("request_timeout") || name == LiteralStringRef("rto")) { request_timeout_min = value; return true; } - TRY_PARAM(requests_per_second, rps); - TRY_PARAM(list_requests_per_second, lrps); - TRY_PARAM(write_requests_per_second, wrps); - TRY_PARAM(read_requests_per_second, rrps); - TRY_PARAM(delete_requests_per_second, drps); - TRY_PARAM(concurrent_requests, cr); - TRY_PARAM(multipart_max_part_size, maxps); - 
TRY_PARAM(multipart_min_part_size, minps); - TRY_PARAM(concurrent_uploads, cu); - TRY_PARAM(concurrent_lists, cl); - TRY_PARAM(concurrent_reads_per_file, crpf); - TRY_PARAM(concurrent_writes_per_file, cwpf); - TRY_PARAM(read_block_size, rbs); - TRY_PARAM(read_ahead_blocks, rab); - TRY_PARAM(read_cache_blocks_per_file, rcb); - TRY_PARAM(max_send_bytes_per_second, sbps); - TRY_PARAM(max_recv_bytes_per_second, rbps); - #undef TRY_PARAM - return false; -} - -// Returns a Blob URL parameter string that specifies all of the non-default options for the endpoint using option short names. -std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const { - static BlobKnobs defaults; - std::string r; - #define _CHECK_PARAM(n, sn) if(n != defaults. n) { r += format("%s%s=%d", r.empty() ? "" : "&", #sn, n); } - _CHECK_PARAM(secure_connection, sc); - _CHECK_PARAM(connect_tries, ct); - _CHECK_PARAM(connect_timeout, cto); - _CHECK_PARAM(max_connection_life, mcl); - _CHECK_PARAM(request_tries, rt); - _CHECK_PARAM(request_timeout_min, rto); - _CHECK_PARAM(requests_per_second, rps); - _CHECK_PARAM(list_requests_per_second, lrps); - _CHECK_PARAM(write_requests_per_second, wrps); - _CHECK_PARAM(read_requests_per_second, rrps); - _CHECK_PARAM(delete_requests_per_second, drps); - _CHECK_PARAM(concurrent_requests, cr); - _CHECK_PARAM(multipart_max_part_size, maxps); - _CHECK_PARAM(multipart_min_part_size, minps); - _CHECK_PARAM(concurrent_uploads, cu); - _CHECK_PARAM(concurrent_lists, cl); - _CHECK_PARAM(concurrent_reads_per_file, crpf); - _CHECK_PARAM(concurrent_writes_per_file, cwpf); - _CHECK_PARAM(read_block_size, rbs); - _CHECK_PARAM(read_ahead_blocks, rab); - _CHECK_PARAM(read_cache_blocks_per_file, rcb); - _CHECK_PARAM(max_send_bytes_per_second, sbps); - _CHECK_PARAM(max_recv_bytes_per_second, rbps); - #undef _CHECK_PARAM - return r; -} - -Reference BlobStoreEndpoint::fromString(std::string const &url, std::string *resourceFromURL, std::string *error, ParametersT *ignored_parameters) { - if(resourceFromURL) - resourceFromURL->clear(); - - try { - StringRef t(url); - StringRef prefix = t.eat("://"); - if(prefix != LiteralStringRef("blobstore")) - throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str()); - StringRef cred = t.eat("@"); - uint8_t foundSeparator = 0; - StringRef hostPort = t.eatAny("/?", &foundSeparator); - StringRef resource; - if(foundSeparator == '/') { - resource = t.eat("?"); - } - - // hostPort is at least a host or IP address, optionally followed by :portNumber or :serviceName - StringRef h(hostPort); - StringRef host = h.eat(":"); - if(host.size() == 0) - throw std::string("host cannot be empty"); - - StringRef service = h.eat(); - - BlobKnobs knobs; - HTTP::Headers extraHeaders; - while(1) { - StringRef name = t.eat("="); - if(name.size() == 0) - break; - StringRef value = t.eat("&"); - - // Special case for header - if(name == LiteralStringRef("header")) { - StringRef originalValue = value; - StringRef headerFieldName = value.eat(":"); - StringRef headerFieldValue = value; - if(headerFieldName.size() == 0 || headerFieldValue.size() == 0) { - throw format("'%s' is not a valid value for '%s' parameter. 
Format is <FieldName>:<FieldValue> where strings are not empty.", originalValue.toString().c_str(), name.toString().c_str());
-				}
-				std::string &fieldValue = extraHeaders[headerFieldName.toString()];
-				// RFC 2616 section 4.2 says header field names can repeat but only if it is valid to concatenate their values with comma separation
-				if(!fieldValue.empty()) {
-					fieldValue.append(",");
-				}
-				fieldValue.append(headerFieldValue.toString());
-				continue;
-			}
-
-			// See if the parameter is a knob
-			// First try setting a dummy value (all knobs are currently numeric) just to see if this parameter is known to BlobStoreEndpoint.
-			// If it is, then we will set it to a good value or throw below, so the dummy set has no bad side effects.
-			bool known = knobs.set(name, 0);
-
-			// If the parameter is not known to BlobStoreEndpoint then throw unless there is an ignored_parameters set to add it to
-			if(!known) {
-				if(ignored_parameters == nullptr) {
-					throw format("%s is not a valid parameter name", name.toString().c_str());
-				}
-				(*ignored_parameters)[name.toString()] = value.toString();
-				continue;
-			}
-
-			// The parameter is known to BlobStoreEndpoint so it must be numeric and valid.
-			char *valueEnd;
-			int ivalue = strtol(value.toString().c_str(), &valueEnd, 10);
-			if(*valueEnd || (ivalue == 0 && value.toString() != "0"))
-				throw format("%s is not a valid value for %s", value.toString().c_str(), name.toString().c_str());
-
-			// It should not be possible for this set to fail now since the dummy set above had to have worked.
-			ASSERT(knobs.set(name, ivalue));
-		}
-
-		if(resourceFromURL != nullptr)
-			*resourceFromURL = resource.toString();
-
-		StringRef c(cred);
-		StringRef key = c.eat(":");
-		StringRef secret = c.eat();
-
-		return Reference<BlobStoreEndpoint>(new BlobStoreEndpoint(host.toString(), service.toString(), key.toString(), secret.toString(), knobs, extraHeaders));
-
-	} catch(std::string &err) {
-		if(error != nullptr)
-			*error = err;
-		TraceEvent(SevWarnAlways, "BlobStoreEndpointBadURL").suppressFor(60).detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
-		throw backup_invalid_url();
-	}
-}
-
-std::string BlobStoreEndpoint::getResourceURL(std::string resource, std::string params) {
-	std::string hostPort = host;
-	if(!service.empty()) {
-		hostPort.append(":");
-		hostPort.append(service);
-	}
-
-	// If secret isn't being looked up from credentials files then it was passed explicitly in the URL so show it here.
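To make the fromString() grammar above concrete: knob overrides use the short names registered in BlobKnobs::set(), unknown names such as bucket fall into ignored_parameters, and header parameters use the <FieldName>:<FieldValue> form. A hypothetical example URL (host, credentials, and values are made up for this note):

blobstore://MYKEY:MYSECRET@backup.example.com:4443/prod_backups?bucket=test_bucket&sc=1&mcl=300&header=x-amz-server-side-encryption:AES256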
- std::string s; - if(!lookupSecret) - s = std::string(":") + secret; - - std::string r = format("blobstore://%s%s@%s/%s", key.c_str(), s.c_str(), hostPort.c_str(), resource.c_str()); - - // Get params that are deviations from knob defaults - std::string knobParams = knobs.getURLParameters(); - if(!knobParams.empty()) { - if(!params.empty()) { - params.append("&"); - } - params.append(knobParams); - } - - for(auto &kv : extraHeaders) { - if(!params.empty()) { - params.append("&"); - } - params.append("header="); - params.append(HTTP::urlEncode(kv.first)); - params.append(":"); - params.append(HTTP::urlEncode(kv.second)); - } - - if(!params.empty()) - r.append("?").append(params); - - return r; -} - -ACTOR Future bucketExists_impl(Reference b, std::string bucket) { - wait(b->requestRateRead->getAllowance(1)); - - std::string resource = std::string("/") + bucket; - HTTP::Headers headers; - - Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, {200, 404})); - return r->code == 200; -} - -Future BlobStoreEndpoint::bucketExists(std::string const &bucket) { - return bucketExists_impl(Reference::addRef(this), bucket); -} - -ACTOR Future objectExists_impl(Reference b, std::string bucket, std::string object) { - wait(b->requestRateRead->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - - Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, {200, 404})); - return r->code == 200; -} - -Future BlobStoreEndpoint::objectExists(std::string const &bucket, std::string const &object) { - return objectExists_impl(Reference::addRef(this), bucket, object); -} - -ACTOR Future deleteObject_impl(Reference b, std::string bucket, std::string object) { - wait(b->requestRateDelete->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - // 200 or 204 means object successfully deleted, 404 means it already doesn't exist, so any of those are considered successful - Reference r = wait(b->doRequest("DELETE", resource, headers, nullptr, 0, {200, 204, 404})); - - // But if the object already did not exist then the 'delete' is assumed to be successful but a warning is logged. 
- if(r->code == 404) { - TraceEvent(SevWarnAlways, "BlobStoreEndpointDeleteObjectMissing") - .detail("Host", b->host) - .detail("Bucket", bucket) - .detail("Object", object); - } - - return Void(); -} - -Future BlobStoreEndpoint::deleteObject(std::string const &bucket, std::string const &object) { - return deleteObject_impl(Reference::addRef(this), bucket, object); -} - -ACTOR Future deleteRecursively_impl(Reference b, std::string bucket, std::string prefix, int *pNumDeleted, int64_t *pBytesDeleted) { - state PromiseStream resultStream; - // Start a recursive parallel listing which will send results to resultStream as they are received - state Future done = b->listObjectsStream(bucket, resultStream, prefix, '/', std::numeric_limits::max()); - // Wrap done in an actor which will send end_of_stream since listObjectsStream() does not (so that many calls can write to the same stream) - done = map(done, [=](Void) { - resultStream.sendError(end_of_stream()); - return Void(); - }); - - state std::list> deleteFutures; - try { - loop { - choose { - // Throw if done throws, otherwise don't stop until end_of_stream - when(wait(done)) { - done = Never(); - } - - when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { - for(auto &object : list.objects) { - deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [=](Void) -> Void { - if(pNumDeleted != nullptr) { - ++*pNumDeleted; - } - if(pBytesDeleted != nullptr) { - *pBytesDeleted += object.size; - } - return Void(); - })); - } - } - } - - // This is just a precaution to avoid having too many outstanding delete actors waiting to run - while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) { - wait(deleteFutures.front()); - deleteFutures.pop_front(); - } - } - } catch(Error &e) { - if(e.code() != error_code_end_of_stream) - throw; - } - - while(deleteFutures.size() > 0) { - wait(deleteFutures.front()); - deleteFutures.pop_front(); - } - - return Void(); -} - -Future BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted, int64_t *pBytesDeleted) { - return deleteRecursively_impl(Reference::addRef(this), bucket, prefix, pNumDeleted, pBytesDeleted); -} - -ACTOR Future createBucket_impl(Reference b, std::string bucket) { - wait(b->requestRateWrite->getAllowance(1)); - - bool exists = wait(b->bucketExists(bucket)); - if(!exists) { - std::string resource = std::string("/") + bucket; - HTTP::Headers headers; - Reference r = wait(b->doRequest("PUT", resource, headers, nullptr, 0, {200, 409})); - } - return Void(); -} - -Future BlobStoreEndpoint::createBucket(std::string const &bucket) { - return createBucket_impl(Reference::addRef(this), bucket); -} - -ACTOR Future objectSize_impl(Reference b, std::string bucket, std::string object) { - wait(b->requestRateRead->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - - Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, {200, 404})); - if(r->code == 404) - throw file_not_found(); - return r->contentLen; -} - -Future BlobStoreEndpoint::objectSize(std::string const &bucket, std::string const &object) { - return objectSize_impl(Reference::addRef(this), bucket, object); -} - -// Try to read a file, parse it as JSON, and return the resulting document. -// It will NOT throw if any errors are encountered, it will just return an empty -// JSON object and will log trace events for the errors encountered. 
-ACTOR Future<Optional<json_spirit::mObject>> tryReadJSONFile(std::string path) {
-	state std::string content;
-
-	// Event type to be logged in the event of an exception
-	state const char *errorEventType = "BlobCredentialFileError";
-
-	try {
-		state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
-		state int64_t size = wait(f->size());
-		state Standalone<StringRef> buf = makeString(size);
-		int r = wait(f->read(mutateString(buf), size, 0));
-		ASSERT(r == size);
-		content = buf.toString();
-
-		// Any exceptions from here forward are parse failures
-		errorEventType = "BlobCredentialFileParseFailed";
-		json_spirit::mValue json;
-		json_spirit::read_string(content, json);
-		if(json.type() == json_spirit::obj_type)
-			return json.get_obj();
-		else
-			TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").suppressFor(60).detail("File", path);
-
-	} catch(Error &e) {
-		if(e.code() != error_code_actor_cancelled)
-			TraceEvent(SevWarn, errorEventType).error(e).suppressFor(60).detail("File", path);
-	}
-
-	return Optional<json_spirit::mObject>();
-}
-
-ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
-	std::vector<std::string> *pFiles = (std::vector<std::string> *)g_network->global(INetwork::enBlobCredentialFiles);
-	if(pFiles == nullptr)
-		return Void();
-
-	state std::vector<Future<Optional<json_spirit::mObject>>> reads;
-	for(auto &f : *pFiles)
-		reads.push_back(tryReadJSONFile(f));
-
-	wait(waitForAll(reads));
-
-	std::string key = b->key + "@" + b->host;
-
-	int invalid = 0;
-
-	for(auto &f : reads) {
-		// If value not present then the credentials file wasn't readable or valid.  Continue to check other results.
-		if(!f.get().present()) {
-			++invalid;
-			continue;
-		}
-
-		JSONDoc doc(f.get().get());
-		if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
-			JSONDoc accounts(doc.last().get_obj());
-			if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
-				JSONDoc account(accounts.last());
-				std::string secret;
-				// Once we find a matching account, use it.
-				if(account.tryGet("secret", secret)) {
-					b->secret = secret;
-					return Void();
-				}
-			}
-		}
-	}
-
-	// If any sources were invalid
-	if(invalid > 0)
-		throw backup_auth_unreadable();
-
-	// All sources were valid but didn't contain the desired info
-	throw backup_auth_missing();
-}
-
-Future<Void> BlobStoreEndpoint::updateSecret() {
-	return updateSecret_impl(Reference<BlobStoreEndpoint>::addRef(this));
-}
-
-ACTOR Future<BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<BlobStoreEndpoint> b) {
-	// First try to get a connection from the pool
-	while(!b->connectionPool.empty()) {
-		BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool.front();
-		b->connectionPool.pop();
-
-		// If the connection expires in the future then return it
-		if(rconn.expirationTime > now()) {
-			TraceEvent("BlobStoreEndpointReusingConnected").suppressFor(60)
-				.detail("RemoteEndpoint", rconn.conn->getPeerAddress())
-				.detail("ExpiresIn", rconn.expirationTime - now());
-			return rconn;
-		}
-	}
-	std::string service = b->service;
-	if (service.empty())
-		service = b->knobs.secure_connection ? "https" : "http";
-	state Reference<IConnection> conn = wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ?
true : false)); - wait(conn->connectHandshake()); - - TraceEvent("BlobStoreEndpointNewConnection").suppressFor(60) - .detail("RemoteEndpoint", conn->getPeerAddress()) - .detail("ExpiresIn", b->knobs.max_connection_life); - - if(b->lookupSecret) - wait(b->updateSecret()); - - return BlobStoreEndpoint::ReusableConnection({conn, now() + b->knobs.max_connection_life}); -} - -Future BlobStoreEndpoint::connect() { - return connect_impl(Reference::addRef(this)); -} - -void BlobStoreEndpoint::returnConnection(ReusableConnection &rconn) { - // If it expires in the future then add it to the pool in the front - if(rconn.expirationTime > now()) - connectionPool.push(rconn); - rconn.conn = Reference(); -} - -// Do a request, get a Response. -// Request content is provided as UnsentPacketQueue *pContent which will be depleted as bytes are sent but the queue itself must live for the life of this actor -// and be destroyed by the caller -ACTOR Future> doRequest_impl(Reference bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen, std::set successCodes) { - state UnsentPacketQueue contentCopy; - - headers["Content-Length"] = format("%d", contentLen); - headers["Host"] = bstore->host; - headers["Accept"] = "application/xml"; - - // Merge extraHeaders into headers - for(auto &kv : bstore->extraHeaders) { - std::string &fieldValue = headers[kv.first]; - if(!fieldValue.empty()) { - fieldValue.append(","); - } - fieldValue.append(kv.second); - } - - // For requests with content to upload, the request timeout should be at least twice the amount of time - // it would take to upload the content given the upload bandwidth and concurrency limits. - int bandwidthThisRequest = 1 + bstore->knobs.max_send_bytes_per_second / bstore->knobs.concurrent_uploads; - int contentUploadSeconds = contentLen / bandwidthThisRequest; - state int requestTimeout = std::max(bstore->knobs.request_timeout_min, 3 * contentUploadSeconds); - - wait(bstore->concurrentRequests.take()); - state FlowLock::Releaser globalReleaser(bstore->concurrentRequests, 1); - - state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries); - state int thisTry = 1; - state double nextRetryDelay = 2.0; - - loop { - state Optional err; - state Optional remoteAddress; - state bool connectionEstablished = false; - state Reference r; - - try { - // Start connecting - Future frconn = bstore->connect(); - - // Make a shallow copy of the queue by calling addref() on each buffer in the chain and then prepending that chain to contentCopy - contentCopy.discardAll(); - if(pContent != nullptr) { - PacketBuffer *pFirst = pContent->getUnsent(); - PacketBuffer *pLast = nullptr; - for(PacketBuffer *p = pFirst; p != nullptr; p = p->nextPacketBuffer()) { - p->addref(); - // Also reset the sent count on each buffer - p->bytes_sent = 0; - pLast = p; - } - contentCopy.prependWriteBuffer(pFirst, pLast); - } - - // Finish connecting, do request - state BlobStoreEndpoint::ReusableConnection rconn = wait(timeoutError(frconn, bstore->knobs.connect_timeout)); - connectionEstablished = true; - - // Finish/update the request headers (which includes Date header) - // This must be done AFTER the connection is ready because if credentials are coming from disk they are refreshed - // when a new connection is established and setAuthHeaders() would need the updated secret. 
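For reference, updateSecret_impl() above matches an account by the string key + "@" + host inside an "accounts" object and reads only its "secret" field. A hypothetical sketch of a blob-credentials file that would satisfy that lookup (key, host, and secret are made up):

{
  "accounts": {
    "MYKEY@backup.example.com": {
      "secret": "MYSECRET"
    }
  }
}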
- bstore->setAuthHeaders(verb, resource, headers); - remoteAddress = rconn.conn->getPeerAddress(); - wait(bstore->requestRate->getAllowance(1)); - Reference _r = wait(timeoutError(HTTP::doRequest(rconn.conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), requestTimeout)); - r = _r; - - // Since the response was parsed successfully (which is why we are here) reuse the connection unless we received the "Connection: close" header. - if(r->headers["Connection"] != "close") - bstore->returnConnection(rconn); - rconn.conn.clear(); - - } catch(Error &e) { - if(e.code() == error_code_actor_cancelled) - throw; - err = e; - } - - // If err is not present then r is valid. - // If r->code is in successCodes then record the successful request and return r. - if(!err.present() && successCodes.count(r->code) != 0) { - bstore->s_stats.requests_successful++; - return r; - } - - // Otherwise, this request is considered failed. Update failure count. - bstore->s_stats.requests_failed++; - - // All errors in err are potentially retryable as well as certain HTTP response codes... - bool retryable = err.present() || r->code == 500 || r->code == 502 || r->code == 503 || r->code == 429; - - // But only if our previous attempt was not the last allowable try. - retryable = retryable && (thisTry < maxTries); - - TraceEvent event(SevWarn, retryable ? "BlobStoreEndpointRequestFailedRetryable" : "BlobStoreEndpointRequestFailed"); - - // Attach err to trace event if present, otherwise extract some stuff from the response - if(err.present()) { - event.error(err.get()); - } - event.suppressFor(60); - if(!err.present()) { - event.detail("ResponseCode", r->code); - } - - event.detail("ConnectionEstablished", connectionEstablished); - - if(remoteAddress.present()) - event.detail("RemoteEndpoint", remoteAddress.get()); - else - event.detail("RemoteHost", bstore->host); - - event.detail("Verb", verb) - .detail("Resource", resource) - .detail("ThisTry", thisTry); - - // If r is not valid or not code 429 then increment the try count. 429's will not count against the attempt limit. - if(!r || r->code != 429) - ++thisTry; - - // We will wait delay seconds before the next retry, start with nextRetryDelay. - double delay = nextRetryDelay; - // Double but limit the *next* nextRetryDelay. - nextRetryDelay = std::min(nextRetryDelay * 2, 60.0); - - if(retryable) { - // If r is valid then obey the Retry-After response header if present. - if(r) { - auto iRetryAfter = r->headers.find("Retry-After"); - if(iRetryAfter != r->headers.end()) { - event.detail("RetryAfterHeader", iRetryAfter->second); - char *pEnd; - double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd); - if(*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe value of 5 minutes. - retryAfter = 300; - // Update delay - delay = std::max(delay, retryAfter); - } - } - - // Log the delay then wait. - event.detail("RetryDelay", delay); - wait(::delay(delay)); - } - else { - // We can't retry, so throw something. - - // This error code means the authentication header was not accepted, likely the account or key is wrong. 
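The retry policy above starts nextRetryDelay at 2.0 seconds, doubles it up to a 60-second ceiling, and lets a parseable Retry-After header only raise the wait, never lower it; control falls through to the non-retryable cases below (beginning with the 406 check the preceding comment describes) only once retries are exhausted or the error is fatal. A minimal standalone restatement of that delay computation (hypothetical helper name, same arithmetic as the deleted code):

#include <algorithm>
#include <optional>

// Next retry delay: use the current backoff, double it for the future (capped
// at 60s), and let a server-supplied Retry-After only increase the wait.
double nextBackoff(double& nextRetryDelay, std::optional<double> retryAfter) {
	double delay = nextRetryDelay;
	nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
	if (retryAfter) delay = std::max(delay, *retryAfter);
	return delay;
}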
-			if(r && r->code == 406)
-				throw http_not_accepted();
-
-			if(r && r->code == 401)
-				throw http_auth_failed();
-
-			// Recognize and throw specific errors
-			if(err.present()) {
-				int code = err.get().code();
-
-				// If we get a timed_out error during the connect() phase, we'll call that connection_failed despite the fact that
-				// there was technically never a 'connection' to begin with.  It differentiates between an active connection
-				// timing out vs a connection timing out, though not between an active connection failing vs connection attempt failing.
-				// TODO:  Add more error types?
-				if(code == error_code_timed_out && !connectionEstablished)
-					throw connection_failed();
-
-				if(code == error_code_timed_out || code == error_code_connection_failed || code == error_code_lookup_failed)
-					throw err.get();
-			}
-
-			throw http_request_failed();
-		}
-	}
-}
-
-Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
-	return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
-}
-
-ACTOR Future<Void> listObjectsStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
-	// Request 1000 keys at a time, the maximum allowed
-	state std::string resource = "/";
-	resource.append(bucket);
-	resource.append("/?max-keys=1000");
-	if(prefix.present())
-		resource.append("&prefix=").append(HTTP::urlEncode(prefix.get()));
-	if(delimiter.present())
-		resource.append("&delimiter=").append(HTTP::urlEncode(std::string(1, delimiter.get())));
-	resource.append("&marker=");
-	state std::string lastFile;
-	state bool more = true;
-
-	state std::vector<Future<Void>> subLists;
-
-	while(more) {
-		wait(bstore->concurrentLists.take());
-		state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1);
-
-		HTTP::Headers headers;
-		state std::string fullResource = resource + HTTP::urlEncode(lastFile);
-		lastFile.clear();
-		Reference<HTTP::Response> r = wait(bstore->doRequest("GET", fullResource, headers, nullptr, 0, {200}));
-		listReleaser.release();
-
-		try {
-			BlobStoreEndpoint::ListResult listResult;
-			xml_document<> doc;
-
-			// Copy content because rapidxml will modify it during parse
-			std::string content = r->content;
-			doc.parse<0>((char *)content.c_str());
-
-			// There should be exactly one node
-			xml_node<> *result = doc.first_node();
-			if(result == nullptr || strcmp(result->name(), "ListBucketResult") != 0) {
-				throw http_bad_response();
-			}
-
-			xml_node<> *n = result->first_node();
-			while(n != nullptr) {
-				const char *name = n->name();
-				if(strcmp(name, "IsTruncated") == 0) {
-					const char *val = n->value();
-					if(strcmp(val, "true") == 0) {
-						more = true;
-					}
-					else if(strcmp(val, "false") == 0) {
-						more = false;
-					}
-					else {
-						throw http_bad_response();
-					}
-				}
-				else if(strcmp(name, "Contents") == 0) {
-					BlobStoreEndpoint::ObjectInfo object;
-
-					xml_node<> *key = n->first_node("Key");
-					if(key == nullptr) {
-						throw http_bad_response();
-					}
-					object.name = key->value();
-
-					xml_node<> *size = n->first_node("Size");
-					if(size == nullptr) {
-						throw http_bad_response();
-					}
-					object.size = strtoull(size->value(), nullptr, 10);
-
-					listResult.objects.push_back(object);
-				}
-				else if(strcmp(name, "CommonPrefixes") == 0) {
-					xml_node<> *prefixNode = n->first_node("Prefix");
-					while(prefixNode != nullptr) {
-						const char *prefix = prefixNode->value();
-						// If recursing,
queue a sub-request, otherwise add the common prefix to the result. - if(maxDepth > 0) { - // If there is no recurse filter or the filter returns true then start listing the subfolder - if(!recurseFilter || recurseFilter(prefix)) { - subLists.push_back(bstore->listObjectsStream(bucket, results, prefix, delimiter, maxDepth - 1, recurseFilter)); - } - // Since prefix will not be in the final listResult below we have to set lastFile here in case it's greater than the last object - lastFile = prefix; - } - else { - listResult.commonPrefixes.push_back(prefix); - } - - prefixNode = prefixNode->next_sibling("Prefix"); - } - } - - n = n->next_sibling(); - } - - results.send(listResult); - - if(more) { - // lastFile will be the last commonprefix for which a sublist was started, if any. - // If there are any objects and the last one is greater than lastFile then make it the new lastFile. - if(!listResult.objects.empty() && lastFile < listResult.objects.back().name) { - lastFile = listResult.objects.back().name; - } - // If there are any common prefixes and the last one is greater than lastFile then make it the new lastFile. - if(!listResult.commonPrefixes.empty() && lastFile < listResult.commonPrefixes.back()) { - lastFile = listResult.commonPrefixes.back(); - } - - // If lastFile is empty at this point, something has gone wrong. - if(lastFile.empty()) { - TraceEvent(SevWarn, "BlobStoreEndpointListNoNextMarker").suppressFor(60).detail("Resource", fullResource); - throw http_bad_response(); - } - } - } catch(Error &e) { - if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobStoreEndpointListResultParseError").error(e).suppressFor(60).detail("Resource", fullResource); - throw http_bad_response(); - } - } - - wait(waitForAll(subLists)); - - return Void(); -} - -Future BlobStoreEndpoint::listObjectsStream(std::string const &bucket, PromiseStream results, Optional prefix, Optional delimiter, int maxDepth, std::function recurseFilter) { - return listObjectsStream_impl(Reference::addRef(this), bucket, results, prefix, delimiter, maxDepth, recurseFilter); -} - -ACTOR Future listObjects_impl(Reference bstore, std::string bucket, Optional prefix, Optional delimiter, int maxDepth, std::function recurseFilter) { - state BlobStoreEndpoint::ListResult results; - state PromiseStream resultStream; - state Future done = bstore->listObjectsStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter); - // Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream - done = map(done, [=](Void) { - resultStream.sendError(end_of_stream()); - return Void(); - }); - - try { - loop { - choose { - // Throw if done throws, otherwise don't stop until end_of_stream - when(wait(done)) { - done = Never(); - } - - when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) { - results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end()); - results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); - } - } - } - } catch(Error &e) { - if(e.code() != error_code_end_of_stream) - throw; - } - - return results; -} - -Future BlobStoreEndpoint::listObjects(std::string const &bucket, Optional prefix, Optional delimiter, int maxDepth, std::function recurseFilter) { - return listObjects_impl(Reference::addRef(this), bucket, prefix, delimiter, maxDepth, recurseFilter); -} - -ACTOR Future> listBuckets_impl(Reference bstore) { - state std::string resource = 
"/?marker="; - state std::string lastName; - state bool more = true; - state std::vector buckets; - - while(more) { - wait(bstore->concurrentLists.take()); - state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1); - - HTTP::Headers headers; - state std::string fullResource = resource + HTTP::urlEncode(lastName); - Reference r = wait(bstore->doRequest("GET", fullResource, headers, nullptr, 0, {200})); - listReleaser.release(); - - try { - xml_document<> doc; - - // Copy content because rapidxml will modify it during parse - std::string content = r->content; - doc.parse<0>((char *)content.c_str()); - - // There should be exactly one node - xml_node<> *result = doc.first_node(); - if(result == nullptr || strcmp(result->name(), "ListAllMyBucketsResult") != 0) { - throw http_bad_response(); - } - - more = false; - xml_node<> *truncated = result->first_node("IsTruncated"); - if(truncated != nullptr && strcmp(truncated->value(), "true") == 0) { - more = true; - } - - xml_node<> *bucketsNode = result->first_node("Buckets"); - if(bucketsNode != nullptr) { - xml_node<> *bucketNode = bucketsNode->first_node("Bucket"); - while(bucketNode != nullptr) { - xml_node<> *nameNode = bucketNode->first_node("Name"); - if(nameNode == nullptr) { - throw http_bad_response(); - } - const char *name = nameNode->value(); - buckets.push_back(name); - - bucketNode = bucketNode->next_sibling("Bucket"); - } - } - - if(more) { - lastName = buckets.back(); - } - - } catch(Error &e) { - if(e.code() != error_code_actor_cancelled) - TraceEvent(SevWarn, "BlobStoreEndpointListBucketResultParseError").error(e).suppressFor(60).detail("Resource", fullResource); - throw http_bad_response(); - } - } - - return buckets; -} - -Future> BlobStoreEndpoint::listBuckets() { - return listBuckets_impl(Reference::addRef(this)); -} - -std::string BlobStoreEndpoint::hmac_sha1(std::string const &msg) { - std::string key = secret; - - // First pad the key to 64 bytes. - key.append(64 - key.size(), '\0'); - - std::string kipad = key; - for(int i = 0; i < 64; ++i) - kipad[i] ^= '\x36'; - - std::string kopad = key; - for(int i = 0; i < 64; ++i) - kopad[i] ^= '\x5c'; - - kipad.append(msg); - std::string hkipad = SHA1::from_string(kipad); - kopad.append(hkipad); - return SHA1::from_string(kopad); -} - -void BlobStoreEndpoint::setAuthHeaders(std::string const &verb, std::string const &resource, HTTP::Headers& headers) { - std::string &date = headers["Date"]; - - char dateBuf[20]; - time_t ts; - time(&ts); - // ISO 8601 format YYYYMMDD'T'HHMMSS'Z' - strftime(dateBuf, 20, "%Y%m%dT%H%M%SZ", gmtime(&ts)); - date = dateBuf; - - std::string msg; - msg.append(verb); - msg.append("\n"); - auto contentMD5 = headers.find("Content-MD5"); - if(contentMD5 != headers.end()) - msg.append(contentMD5->second); - msg.append("\n"); - auto contentType = headers.find("Content-Type"); - if(contentType != headers.end()) - msg.append(contentType->second); - msg.append("\n"); - msg.append(date); - msg.append("\n"); - for(auto h : headers) { - StringRef name = h.first; - if(name.startsWith(LiteralStringRef("x-amz")) || - name.startsWith(LiteralStringRef("x-icloud"))) { - msg.append(h.first); - msg.append(":"); - msg.append(h.second); - msg.append("\n"); - } - } - - msg.append(resource); - if(verb == "GET") { - size_t q = resource.find_last_of('?'); - if(q != resource.npos) - msg.resize(msg.size() - (resource.size() - q)); - } - - std::string sig = base64::encoder::from_string(hmac_sha1(msg)); - // base64 encoded blocks end in \n so remove it. 
- sig.resize(sig.size() - 1); - std::string auth = "AWS "; - auth.append(key); - auth.append(":"); - auth.append(sig); - headers["Authorization"] = auth; -} - -ACTOR Future readEntireFile_impl(Reference bstore, std::string bucket, std::string object) { - wait(bstore->requestRateRead->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - Reference r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, {200, 404})); - if(r->code == 404) - throw file_not_found(); - return r->content; -} - -Future BlobStoreEndpoint::readEntireFile(std::string const &bucket, std::string const &object) { - return readEntireFile_impl(Reference::addRef(this), bucket, object); -} - -ACTOR Future writeEntireFileFromBuffer_impl(Reference bstore, std::string bucket, std::string object, UnsentPacketQueue *pContent, int contentLen, std::string contentMD5) { - if(contentLen > bstore->knobs.multipart_max_part_size) - throw file_too_large(); - - wait(bstore->requestRateWrite->getAllowance(1)); - wait(bstore->concurrentUploads.take()); - state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - // Send MD5 sum for content so blobstore can verify it - headers["Content-MD5"] = contentMD5; - state Reference r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200})); - - // For uploads, Blobstore returns an MD5 sum of uploaded content so check it. - if (!r->verifyMD5(false, contentMD5)) - throw checksum_failed(); - - return Void(); -} - -ACTOR Future writeEntireFile_impl(Reference bstore, std::string bucket, std::string object, std::string content) { - state UnsentPacketQueue packets; - PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned()); - pw.serializeBytes(content); - if(content.size() > bstore->knobs.multipart_max_part_size) - throw file_too_large(); - - // Yield because we may have just had to copy several MB's into packet buffer chain and next we have to calculate an MD5 sum of it. - // TODO: If this actor is used to send large files then combine the summing and packetization into a loop with a yield() every 20k or so. 
- wait(yield()); - - MD5_CTX sum; - ::MD5_Init(&sum); - ::MD5_Update(&sum, content.data(), content.size()); - std::string sumBytes; - sumBytes.resize(16); - ::MD5_Final((unsigned char *)sumBytes.data(), &sum); - std::string contentMD5 = base64::encoder::from_string(sumBytes); - contentMD5.resize(contentMD5.size() - 1); - - wait(writeEntireFileFromBuffer_impl(bstore, bucket, object, &packets, content.size(), contentMD5)); - return Void(); -} - -Future BlobStoreEndpoint::writeEntireFile(std::string const &bucket, std::string const &object, std::string const &content) { - return writeEntireFile_impl(Reference::addRef(this), bucket, object, content); -} - -Future BlobStoreEndpoint::writeEntireFileFromBuffer(std::string const &bucket, std::string const &object, UnsentPacketQueue *pContent, int contentLen, std::string const &contentMD5) { - return writeEntireFileFromBuffer_impl(Reference::addRef(this), bucket, object, pContent, contentLen, contentMD5); -} - -ACTOR Future readObject_impl(Reference bstore, std::string bucket, std::string object, void *data, int length, int64_t offset) { - if(length <= 0) - return 0; - wait(bstore->requestRateRead->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object; - HTTP::Headers headers; - headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1); - Reference r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, {200, 206, 404})); - if(r->code == 404) - throw file_not_found(); - if(r->contentLen != r->content.size()) // Double check that this wasn't a header-only response, probably unnecessary - throw io_error(); - // Copy the output bytes, server could have sent more or less bytes than requested so copy at most length bytes - memcpy(data, r->content.data(), std::min(r->contentLen, length)); - return r->contentLen; -} - -Future BlobStoreEndpoint::readObject(std::string const &bucket, std::string const &object, void *data, int length, int64_t offset) { - return readObject_impl(Reference::addRef(this), bucket, object, data, length, offset); -} - -ACTOR static Future beginMultiPartUpload_impl(Reference bstore, std::string bucket, std::string object) { - wait(bstore->requestRateWrite->getAllowance(1)); - - std::string resource = std::string("/") + bucket + "/" + object + "?uploads"; - HTTP::Headers headers; - Reference r = wait(bstore->doRequest("POST", resource, headers, nullptr, 0, {200})); - - try { - xml_document<> doc; - // Copy content because rapidxml will modify it during parse - std::string content = r->content; - - doc.parse<0>((char *)content.c_str()); - - // There should be exactly one node - xml_node<> *result = doc.first_node(); - if(result != nullptr && strcmp(result->name(), "InitiateMultipartUploadResult") == 0) { - xml_node<> *id = result->first_node("UploadId"); - if(id != nullptr) { - return id->value(); - } - } - } catch(...) 
{ - } - throw http_bad_response(); -} - -Future BlobStoreEndpoint::beginMultiPartUpload(std::string const &bucket, std::string const &object) { - return beginMultiPartUpload_impl(Reference::addRef(this), bucket, object); -} - -ACTOR Future uploadPart_impl(Reference bstore, std::string bucket, std::string object, std::string uploadID, unsigned int partNumber, UnsentPacketQueue *pContent, int contentLen, std::string contentMD5) { - wait(bstore->requestRateWrite->getAllowance(1)); - wait(bstore->concurrentUploads.take()); - state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1); - - std::string resource = format("/%s/%s?partNumber=%d&uploadId=%s", bucket.c_str(), object.c_str(), partNumber, uploadID.c_str()); - HTTP::Headers headers; - // Send MD5 sum for content so blobstore can verify it - headers["Content-MD5"] = contentMD5; - state Reference r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200})); - // TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry - // will see error 400. That could be detected and handled gracefully by retrieving the etag for the successful request. - - // For uploads, Blobstore returns an MD5 sum of uploaded content so check it. - if (!r->verifyMD5(false, contentMD5)) - throw checksum_failed(); - - // No etag -> bad response. - std::string etag = r->headers["ETag"]; - if(etag.empty()) - throw http_bad_response(); - - return etag; -} - -Future BlobStoreEndpoint::uploadPart(std::string const &bucket, std::string const &object, std::string const &uploadID, unsigned int partNumber, UnsentPacketQueue *pContent, int contentLen, std::string const &contentMD5) { - return uploadPart_impl(Reference::addRef(this), bucket, object, uploadID, partNumber, pContent, contentLen, contentMD5); -} - -ACTOR Future finishMultiPartUpload_impl(Reference bstore, std::string bucket, std::string object, std::string uploadID, BlobStoreEndpoint::MultiPartSetT parts) { - state UnsentPacketQueue part_list; // NonCopyable state var so must be declared at top of actor - wait(bstore->requestRateWrite->getAllowance(1)); - - std::string manifest = ""; - for(auto &p : parts) - manifest += format("%d%s\n", p.first, p.second.c_str()); - manifest += ""; - - std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str()); - HTTP::Headers headers; - PacketWriter pw(part_list.getWriteBuffer(manifest.size()), nullptr, Unversioned()); - pw.serializeBytes(manifest); - Reference r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200})); - // TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry - // will see error 400. That could be detected and handled gracefully by HEAD'ing the object before upload to get its (possibly - // nonexistent) eTag, then if an error 400 is seen then retrieve the eTag again and if it has changed then consider the finish complete. 
- return Void(); -} - -Future BlobStoreEndpoint::finishMultiPartUpload(std::string const &bucket, std::string const &object, std::string const &uploadID, MultiPartSetT const &parts) { - return finishMultiPartUpload_impl(Reference::addRef(this), bucket, object, uploadID, parts); -} diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index b51d7498ed..3ecf66218a 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -1,6 +1,6 @@ set(FDBCLIENT_SRCS - AsyncFileBlobStore.actor.cpp - AsyncFileBlobStore.actor.h + AsyncFileS3BlobStore.actor.cpp + AsyncFileS3BlobStore.actor.h AsyncTaskThread.actor.cpp AsyncTaskThread.h Atomic.h @@ -15,7 +15,6 @@ set(FDBCLIENT_SRCS BackupContainerLocalDirectory.h BackupContainerS3BlobStore.actor.cpp BackupContainerS3BlobStore.h - BlobStore.actor.cpp ClientLogEvents.h ClientWorkerInterface.h ClusterInterface.h @@ -61,6 +60,7 @@ set(FDBCLIENT_SRCS RunTransaction.actor.h RYWIterator.cpp RYWIterator.h + S3BlobStore.actor.cpp Schemas.cpp Schemas.h SnapshotCache.h diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp new file mode 100644 index 0000000000..98e2352e84 --- /dev/null +++ b/fdbclient/S3BlobStore.actor.cpp @@ -0,0 +1,1249 @@ +/* + * S3BlobStore.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "fdbclient/S3BlobStore.h" + +#include "fdbclient/md5/md5.h" +#include "fdbclient/libb64/encode.h" +#include "fdbclient/sha1/SHA1.h" +#include +#include +#include +#include "fdbrpc/IAsyncFile.h" +#include "fdbclient/rapidxml/rapidxml.hpp" +#include "flow/actorcompiler.h" // has to be last include + +using namespace rapidxml; + +json_spirit::mObject S3BlobStoreEndpoint::Stats::getJSON() { + json_spirit::mObject o; + + o["requests_failed"] = requests_failed; + o["requests_successful"] = requests_successful; + o["bytes_sent"] = bytes_sent; + + return o; +} + +S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::Stats::operator-(const Stats& rhs) { + Stats r; + r.requests_failed = requests_failed - rhs.requests_failed; + r.requests_successful = requests_successful - rhs.requests_successful; + r.bytes_sent = bytes_sent - rhs.bytes_sent; + return r; +} + +S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats; + +S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() { + secure_connection = 1; + connect_tries = CLIENT_KNOBS->BLOBSTORE_CONNECT_TRIES; + connect_timeout = CLIENT_KNOBS->BLOBSTORE_CONNECT_TIMEOUT; + max_connection_life = CLIENT_KNOBS->BLOBSTORE_MAX_CONNECTION_LIFE; + request_tries = CLIENT_KNOBS->BLOBSTORE_REQUEST_TRIES; + request_timeout_min = CLIENT_KNOBS->BLOBSTORE_REQUEST_TIMEOUT_MIN; + requests_per_second = CLIENT_KNOBS->BLOBSTORE_REQUESTS_PER_SECOND; + concurrent_requests = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS; + list_requests_per_second = CLIENT_KNOBS->BLOBSTORE_LIST_REQUESTS_PER_SECOND; + write_requests_per_second = CLIENT_KNOBS->BLOBSTORE_WRITE_REQUESTS_PER_SECOND; + read_requests_per_second = CLIENT_KNOBS->BLOBSTORE_READ_REQUESTS_PER_SECOND; + delete_requests_per_second = CLIENT_KNOBS->BLOBSTORE_DELETE_REQUESTS_PER_SECOND; + multipart_max_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MAX_PART_SIZE; + multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE; + concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS; + concurrent_lists = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_LISTS; + concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE; + concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE; + read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE; + read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS; + read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; + max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND; + max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND; +} + +bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) { +#define TRY_PARAM(n, sn) \ + if (name == LiteralStringRef(#n) || name == LiteralStringRef(#sn)) { \ + n = value; \ + return true; \ + } + TRY_PARAM(secure_connection, sc) + TRY_PARAM(connect_tries, ct); + TRY_PARAM(connect_timeout, cto); + TRY_PARAM(max_connection_life, mcl); + TRY_PARAM(request_tries, rt); + TRY_PARAM(request_timeout_min, rtom); + // TODO: For backward compatibility because request_timeout was renamed to request_timeout_min + if (name == LiteralStringRef("request_timeout") || name == LiteralStringRef("rto")) { + request_timeout_min = value; + return true; + } + TRY_PARAM(requests_per_second, rps); + TRY_PARAM(list_requests_per_second, lrps); + TRY_PARAM(write_requests_per_second, wrps); + TRY_PARAM(read_requests_per_second, rrps); + TRY_PARAM(delete_requests_per_second, drps); + TRY_PARAM(concurrent_requests, cr); + TRY_PARAM(multipart_max_part_size, 
maxps); + TRY_PARAM(multipart_min_part_size, minps); + TRY_PARAM(concurrent_uploads, cu); + TRY_PARAM(concurrent_lists, cl); + TRY_PARAM(concurrent_reads_per_file, crpf); + TRY_PARAM(concurrent_writes_per_file, cwpf); + TRY_PARAM(read_block_size, rbs); + TRY_PARAM(read_ahead_blocks, rab); + TRY_PARAM(read_cache_blocks_per_file, rcb); + TRY_PARAM(max_send_bytes_per_second, sbps); + TRY_PARAM(max_recv_bytes_per_second, rbps); +#undef TRY_PARAM + return false; +} + +// Returns an S3 Blob URL parameter string that specifies all of the non-default options for the endpoint using option +// short names. +std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const { + static BlobKnobs defaults; + std::string r; +#define _CHECK_PARAM(n, sn) \ + if (n != defaults.n) { \ + r += format("%s%s=%d", r.empty() ? "" : "&", #sn, n); \ + } + _CHECK_PARAM(secure_connection, sc); + _CHECK_PARAM(connect_tries, ct); + _CHECK_PARAM(connect_timeout, cto); + _CHECK_PARAM(max_connection_life, mcl); + _CHECK_PARAM(request_tries, rt); + _CHECK_PARAM(request_timeout_min, rto); + _CHECK_PARAM(requests_per_second, rps); + _CHECK_PARAM(list_requests_per_second, lrps); + _CHECK_PARAM(write_requests_per_second, wrps); + _CHECK_PARAM(read_requests_per_second, rrps); + _CHECK_PARAM(delete_requests_per_second, drps); + _CHECK_PARAM(concurrent_requests, cr); + _CHECK_PARAM(multipart_max_part_size, maxps); + _CHECK_PARAM(multipart_min_part_size, minps); + _CHECK_PARAM(concurrent_uploads, cu); + _CHECK_PARAM(concurrent_lists, cl); + _CHECK_PARAM(concurrent_reads_per_file, crpf); + _CHECK_PARAM(concurrent_writes_per_file, cwpf); + _CHECK_PARAM(read_block_size, rbs); + _CHECK_PARAM(read_ahead_blocks, rab); + _CHECK_PARAM(read_cache_blocks_per_file, rcb); + _CHECK_PARAM(max_send_bytes_per_second, sbps); + _CHECK_PARAM(max_recv_bytes_per_second, rbps); +#undef _CHECK_PARAM + return r; +} + +Reference S3BlobStoreEndpoint::fromString(std::string const& url, std::string* resourceFromURL, + std::string* error, ParametersT* ignored_parameters) { + if (resourceFromURL) resourceFromURL->clear(); + + try { + StringRef t(url); + StringRef prefix = t.eat("://"); + if (prefix != LiteralStringRef("blobstore")) + throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str()); + StringRef cred = t.eat("@"); + uint8_t foundSeparator = 0; + StringRef hostPort = t.eatAny("/?", &foundSeparator); + StringRef resource; + if (foundSeparator == '/') { + resource = t.eat("?"); + } + + // hostPort is at least a host or IP address, optionally followed by :portNumber or :serviceName + StringRef h(hostPort); + StringRef host = h.eat(":"); + if (host.size() == 0) throw std::string("host cannot be empty"); + + StringRef service = h.eat(); + + BlobKnobs knobs; + HTTP::Headers extraHeaders; + while (1) { + StringRef name = t.eat("="); + if (name.size() == 0) break; + StringRef value = t.eat("&"); + + // Special case for header + if (name == LiteralStringRef("header")) { + StringRef originalValue = value; + StringRef headerFieldName = value.eat(":"); + StringRef headerFieldValue = value; + if (headerFieldName.size() == 0 || headerFieldValue.size() == 0) { + throw format("'%s' is not a valid value for '%s' parameter. 
Format is <FieldName>:<FieldValue> "
+                             "where strings are not empty.",
+                             originalValue.toString().c_str(), name.toString().c_str());
+                }
+                std::string& fieldValue = extraHeaders[headerFieldName.toString()];
+                // RFC 2616 section 4.2 says header field names can repeat but only if it is valid to concatenate their
+                // values with comma separation
+                if (!fieldValue.empty()) {
+                    fieldValue.append(",");
+                }
+                fieldValue.append(headerFieldValue.toString());
+                continue;
+            }
+
+            // See if the parameter is a knob
+            // First try setting a dummy value (all knobs are currently numeric) just to see if this parameter is known
+            // to S3BlobStoreEndpoint. If it is, then we will set it to a good value or throw below, so the dummy set
+            // has no bad side effects.
+            bool known = knobs.set(name, 0);
+
+            // If the parameter is not known to S3BlobStoreEndpoint then throw unless there is an ignored_parameters
+            // set to add it to
+            if (!known) {
+                if (ignored_parameters == nullptr) {
+                    throw format("%s is not a valid parameter name", name.toString().c_str());
+                }
+                (*ignored_parameters)[name.toString()] = value.toString();
+                continue;
+            }
+
+            // The parameter is known to S3BlobStoreEndpoint so it must be numeric and valid.
+            char* valueEnd;
+            int ivalue = strtol(value.toString().c_str(), &valueEnd, 10);
+            if (*valueEnd || (ivalue == 0 && value.toString() != "0"))
+                throw format("%s is not a valid value for %s", value.toString().c_str(), name.toString().c_str());
+
+            // It should not be possible for this set to fail now since the dummy set above had to have worked.
+            ASSERT(knobs.set(name, ivalue));
+        }
+
+        if (resourceFromURL != nullptr) *resourceFromURL = resource.toString();
+
+        StringRef c(cred);
+        StringRef key = c.eat(":");
+        StringRef secret = c.eat();
+
+        return Reference<S3BlobStoreEndpoint>(new S3BlobStoreEndpoint(
+            host.toString(), service.toString(), key.toString(), secret.toString(), knobs, extraHeaders));
+
+    } catch (std::string& err) {
+        if (error != nullptr) *error = err;
+        TraceEvent(SevWarnAlways, "S3BlobStoreEndpointBadURL")
+            .suppressFor(60)
+            .detail("Description", err)
+            .detail("Format", getURLFormat())
+            .detail("URL", url);
+        throw backup_invalid_url();
+    }
+}
+
+std::string S3BlobStoreEndpoint::getResourceURL(std::string resource, std::string params) {
+    std::string hostPort = host;
+    if (!service.empty()) {
+        hostPort.append(":");
+        hostPort.append(service);
+    }
+
+    // If secret isn't being looked up from credentials files then it was passed explicitly in the URL, so show it
+    // here.
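+    // The result has the shape blobstore://<key>[:<secret>]@<host>[:<service>]/<resource>[?<params>], e.g. (with
+    // hypothetical values) blobstore://AKIA123:s3cr3t@backup.example.com:4443/data/backup-1?bucket=mybucket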
+ std::string s; + if (!lookupSecret) s = std::string(":") + secret; + + std::string r = format("blobstore://%s%s@%s/%s", key.c_str(), s.c_str(), hostPort.c_str(), resource.c_str()); + + // Get params that are deviations from knob defaults + std::string knobParams = knobs.getURLParameters(); + if (!knobParams.empty()) { + if (!params.empty()) { + params.append("&"); + } + params.append(knobParams); + } + + for (auto& kv : extraHeaders) { + if (!params.empty()) { + params.append("&"); + } + params.append("header="); + params.append(HTTP::urlEncode(kv.first)); + params.append(":"); + params.append(HTTP::urlEncode(kv.second)); + } + + if (!params.empty()) r.append("?").append(params); + + return r; +} + +ACTOR Future bucketExists_impl(Reference b, std::string bucket) { + wait(b->requestRateRead->getAllowance(1)); + + std::string resource = std::string("/") + bucket; + HTTP::Headers headers; + + Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 })); + return r->code == 200; +} + +Future S3BlobStoreEndpoint::bucketExists(std::string const& bucket) { + return bucketExists_impl(Reference::addRef(this), bucket); +} + +ACTOR Future objectExists_impl(Reference b, std::string bucket, std::string object) { + wait(b->requestRateRead->getAllowance(1)); + + std::string resource = std::string("/") + bucket + "/" + object; + HTTP::Headers headers; + + Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 })); + return r->code == 200; +} + +Future S3BlobStoreEndpoint::objectExists(std::string const& bucket, std::string const& object) { + return objectExists_impl(Reference::addRef(this), bucket, object); +} + +ACTOR Future deleteObject_impl(Reference b, std::string bucket, std::string object) { + wait(b->requestRateDelete->getAllowance(1)); + + std::string resource = std::string("/") + bucket + "/" + object; + HTTP::Headers headers; + // 200 or 204 means object successfully deleted, 404 means it already doesn't exist, so any of those are considered + // successful + Reference r = wait(b->doRequest("DELETE", resource, headers, nullptr, 0, { 200, 204, 404 })); + + // But if the object already did not exist then the 'delete' is assumed to be successful but a warning is logged. 
+ if (r->code == 404) { + TraceEvent(SevWarnAlways, "S3BlobStoreEndpointDeleteObjectMissing") + .detail("Host", b->host) + .detail("Bucket", bucket) + .detail("Object", object); + } + + return Void(); +} + +Future S3BlobStoreEndpoint::deleteObject(std::string const& bucket, std::string const& object) { + return deleteObject_impl(Reference::addRef(this), bucket, object); +} + +ACTOR Future deleteRecursively_impl(Reference b, std::string bucket, std::string prefix, + int* pNumDeleted, int64_t* pBytesDeleted) { + state PromiseStream resultStream; + // Start a recursive parallel listing which will send results to resultStream as they are received + state Future done = b->listObjectsStream(bucket, resultStream, prefix, '/', std::numeric_limits::max()); + // Wrap done in an actor which will send end_of_stream since listObjectsStream() does not (so that many calls can + // write to the same stream) + done = map(done, [=](Void) { + resultStream.sendError(end_of_stream()); + return Void(); + }); + + state std::list> deleteFutures; + try { + loop { + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(wait(done)) { done = Never(); } + + when(S3BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) { + for (auto& object : list.objects) { + deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [=](Void) -> Void { + if (pNumDeleted != nullptr) { + ++*pNumDeleted; + } + if (pBytesDeleted != nullptr) { + *pBytesDeleted += object.size; + } + return Void(); + })); + } + } + } + + // This is just a precaution to avoid having too many outstanding delete actors waiting to run + while (deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) { + wait(deleteFutures.front()); + deleteFutures.pop_front(); + } + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) throw; + } + + while (deleteFutures.size() > 0) { + wait(deleteFutures.front()); + deleteFutures.pop_front(); + } + + return Void(); +} + +Future S3BlobStoreEndpoint::deleteRecursively(std::string const& bucket, std::string prefix, int* pNumDeleted, + int64_t* pBytesDeleted) { + return deleteRecursively_impl(Reference::addRef(this), bucket, prefix, pNumDeleted, + pBytesDeleted); +} + +ACTOR Future createBucket_impl(Reference b, std::string bucket) { + wait(b->requestRateWrite->getAllowance(1)); + + bool exists = wait(b->bucketExists(bucket)); + if (!exists) { + std::string resource = std::string("/") + bucket; + HTTP::Headers headers; + Reference r = wait(b->doRequest("PUT", resource, headers, nullptr, 0, { 200, 409 })); + } + return Void(); +} + +Future S3BlobStoreEndpoint::createBucket(std::string const& bucket) { + return createBucket_impl(Reference::addRef(this), bucket); +} + +ACTOR Future objectSize_impl(Reference b, std::string bucket, std::string object) { + wait(b->requestRateRead->getAllowance(1)); + + std::string resource = std::string("/") + bucket + "/" + object; + HTTP::Headers headers; + + Reference r = wait(b->doRequest("HEAD", resource, headers, nullptr, 0, { 200, 404 })); + if (r->code == 404) throw file_not_found(); + return r->contentLen; +} + +Future S3BlobStoreEndpoint::objectSize(std::string const& bucket, std::string const& object) { + return objectSize_impl(Reference::addRef(this), bucket, object); +} + +// Try to read a file, parse it as JSON, and return the resulting document. +// It will NOT throw if any errors are encountered, it will just return an empty +// JSON object and will log trace events for the errors encountered. 
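+//
+// These are "blob credential" files. Based on the lookup logic in updateSecret_impl below, the expected shape is
+// (field names are real, values hypothetical):
+//   { "accounts" : { "<key>@<host>" : { "secret" : "<shared_secret>" } } }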
+ACTOR Future<Optional<json_spirit::mObject>> tryReadJSONFile(std::string path) {
+    state std::string content;
+
+    // Event type to be logged in the event of an exception
+    state const char* errorEventType = "BlobCredentialFileError";
+
+    try {
+        state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(
+            path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
+        state int64_t size = wait(f->size());
+        state Standalone<StringRef> buf = makeString(size);
+        int r = wait(f->read(mutateString(buf), size, 0));
+        ASSERT(r == size);
+        content = buf.toString();
+
+        // Any exceptions from here forward are parse failures
+        errorEventType = "BlobCredentialFileParseFailed";
+        json_spirit::mValue json;
+        json_spirit::read_string(content, json);
+        if (json.type() == json_spirit::obj_type)
+            return json.get_obj();
+        else
+            TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").suppressFor(60).detail("File", path);
+
+    } catch (Error& e) {
+        if (e.code() != error_code_actor_cancelled)
+            TraceEvent(SevWarn, errorEventType).error(e).suppressFor(60).detail("File", path);
+    }
+
+    return Optional<json_spirit::mObject>();
+}
+
+ACTOR Future<Void> updateSecret_impl(Reference<S3BlobStoreEndpoint> b) {
+    std::vector<std::string>* pFiles = (std::vector<std::string>*)g_network->global(INetwork::enBlobCredentialFiles);
+    if (pFiles == nullptr) return Void();
+
+    state std::vector<Future<Optional<json_spirit::mObject>>> reads;
+    for (auto& f : *pFiles) reads.push_back(tryReadJSONFile(f));
+
+    wait(waitForAll(reads));
+
+    std::string key = b->key + "@" + b->host;
+
+    int invalid = 0;
+
+    for (auto& f : reads) {
+        // If value not present then the credentials file wasn't readable or valid. Continue to check other results.
+        if (!f.get().present()) {
+            ++invalid;
+            continue;
+        }
+
+        JSONDoc doc(f.get().get());
+        if (doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
+            JSONDoc accounts(doc.last().get_obj());
+            if (accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
+                JSONDoc account(accounts.last());
+                std::string secret;
+                // Once we find a matching account, use it.
+                if (account.tryGet("secret", secret)) {
+                    b->secret = secret;
+                    return Void();
+                }
+            }
+        }
+    }
+
+    // If any sources were invalid
+    if (invalid > 0) throw backup_auth_unreadable();
+
+    // All sources were valid but didn't contain the desired info
+    throw backup_auth_missing();
+}
+
+Future<Void> S3BlobStoreEndpoint::updateSecret() {
+    return updateSecret_impl(Reference<S3BlobStoreEndpoint>::addRef(this));
+}
+
+ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3BlobStoreEndpoint> b) {
+    // First try to get a connection from the pool
+    while (!b->connectionPool.empty()) {
+        S3BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool.front();
+        b->connectionPool.pop();
+
+        // If the connection expires in the future then return it
+        if (rconn.expirationTime > now()) {
+            TraceEvent("S3BlobStoreEndpointReusingConnected")
+                .suppressFor(60)
+                .detail("RemoteEndpoint", rconn.conn->getPeerAddress())
+                .detail("ExpiresIn", rconn.expirationTime - now());
+            return rconn;
+        }
+    }
+    std::string service = b->service;
+    if (service.empty()) service = b->knobs.secure_connection ? "https" : "http";
+    state Reference<IConnection> conn =
+        wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ?
true : false)); + wait(conn->connectHandshake()); + + TraceEvent("S3BlobStoreEndpointNewConnection") + .suppressFor(60) + .detail("RemoteEndpoint", conn->getPeerAddress()) + .detail("ExpiresIn", b->knobs.max_connection_life); + + if (b->lookupSecret) wait(b->updateSecret()); + + return S3BlobStoreEndpoint::ReusableConnection({ conn, now() + b->knobs.max_connection_life }); +} + +Future S3BlobStoreEndpoint::connect() { + return connect_impl(Reference::addRef(this)); +} + +void S3BlobStoreEndpoint::returnConnection(ReusableConnection& rconn) { + // If it expires in the future then add it to the pool in the front + if (rconn.expirationTime > now()) connectionPool.push(rconn); + rconn.conn = Reference(); +} + +// Do a request, get a Response. +// Request content is provided as UnsentPacketQueue *pContent which will be depleted as bytes are sent but the queue +// itself must live for the life of this actor and be destroyed by the caller +ACTOR Future> doRequest_impl(Reference bstore, std::string verb, + std::string resource, HTTP::Headers headers, + UnsentPacketQueue* pContent, int contentLen, + std::set successCodes) { + state UnsentPacketQueue contentCopy; + + headers["Content-Length"] = format("%d", contentLen); + headers["Host"] = bstore->host; + headers["Accept"] = "application/xml"; + + // Merge extraHeaders into headers + for (auto& kv : bstore->extraHeaders) { + std::string& fieldValue = headers[kv.first]; + if (!fieldValue.empty()) { + fieldValue.append(","); + } + fieldValue.append(kv.second); + } + + // For requests with content to upload, the request timeout should be at least twice the amount of time + // it would take to upload the content given the upload bandwidth and concurrency limits. + int bandwidthThisRequest = 1 + bstore->knobs.max_send_bytes_per_second / bstore->knobs.concurrent_uploads; + int contentUploadSeconds = contentLen / bandwidthThisRequest; + state int requestTimeout = std::max(bstore->knobs.request_timeout_min, 3 * contentUploadSeconds); + + wait(bstore->concurrentRequests.take()); + state FlowLock::Releaser globalReleaser(bstore->concurrentRequests, 1); + + state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries); + state int thisTry = 1; + state double nextRetryDelay = 2.0; + + loop { + state Optional err; + state Optional remoteAddress; + state bool connectionEstablished = false; + state Reference r; + + try { + // Start connecting + Future frconn = bstore->connect(); + + // Make a shallow copy of the queue by calling addref() on each buffer in the chain and then prepending that + // chain to contentCopy + contentCopy.discardAll(); + if (pContent != nullptr) { + PacketBuffer* pFirst = pContent->getUnsent(); + PacketBuffer* pLast = nullptr; + for (PacketBuffer* p = pFirst; p != nullptr; p = p->nextPacketBuffer()) { + p->addref(); + // Also reset the sent count on each buffer + p->bytes_sent = 0; + pLast = p; + } + contentCopy.prependWriteBuffer(pFirst, pLast); + } + + // Finish connecting, do request + state S3BlobStoreEndpoint::ReusableConnection rconn = + wait(timeoutError(frconn, bstore->knobs.connect_timeout)); + connectionEstablished = true; + + // Finish/update the request headers (which includes Date header) + // This must be done AFTER the connection is ready because if credentials are coming from disk they are + // refreshed when a new connection is established and setAuthHeaders() would need the updated secret. 
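+            // Note that this code runs on every pass through the retry loop, so the Date header and the signature
+            // derived from it are regenerated for each attempt.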
+ bstore->setAuthHeaders(verb, resource, headers); + remoteAddress = rconn.conn->getPeerAddress(); + wait(bstore->requestRate->getAllowance(1)); + Reference _r = + wait(timeoutError(HTTP::doRequest(rconn.conn, verb, resource, headers, &contentCopy, contentLen, + bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), + requestTimeout)); + r = _r; + + // Since the response was parsed successfully (which is why we are here) reuse the connection unless we + // received the "Connection: close" header. + if (r->headers["Connection"] != "close") bstore->returnConnection(rconn); + rconn.conn.clear(); + + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) throw; + err = e; + } + + // If err is not present then r is valid. + // If r->code is in successCodes then record the successful request and return r. + if (!err.present() && successCodes.count(r->code) != 0) { + bstore->s_stats.requests_successful++; + return r; + } + + // Otherwise, this request is considered failed. Update failure count. + bstore->s_stats.requests_failed++; + + // All errors in err are potentially retryable as well as certain HTTP response codes... + bool retryable = err.present() || r->code == 500 || r->code == 502 || r->code == 503 || r->code == 429; + + // But only if our previous attempt was not the last allowable try. + retryable = retryable && (thisTry < maxTries); + + TraceEvent event(SevWarn, + retryable ? "S3BlobStoreEndpointRequestFailedRetryable" : "S3BlobStoreEndpointRequestFailed"); + + // Attach err to trace event if present, otherwise extract some stuff from the response + if (err.present()) { + event.error(err.get()); + } + event.suppressFor(60); + if (!err.present()) { + event.detail("ResponseCode", r->code); + } + + event.detail("ConnectionEstablished", connectionEstablished); + + if (remoteAddress.present()) + event.detail("RemoteEndpoint", remoteAddress.get()); + else + event.detail("RemoteHost", bstore->host); + + event.detail("Verb", verb).detail("Resource", resource).detail("ThisTry", thisTry); + + // If r is not valid or not code 429 then increment the try count. 429's will not count against the attempt + // limit. + if (!r || r->code != 429) ++thisTry; + + // We will wait delay seconds before the next retry, start with nextRetryDelay. + double delay = nextRetryDelay; + // Double but limit the *next* nextRetryDelay. + nextRetryDelay = std::min(nextRetryDelay * 2, 60.0); + + if (retryable) { + // If r is valid then obey the Retry-After response header if present. + if (r) { + auto iRetryAfter = r->headers.find("Retry-After"); + if (iRetryAfter != r->headers.end()) { + event.detail("RetryAfterHeader", iRetryAfter->second); + char* pEnd; + double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd); + if (*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe + // value of 5 minutes. + retryAfter = 300; + // Update delay + delay = std::max(delay, retryAfter); + } + } + + // Log the delay then wait. + event.detail("RetryDelay", delay); + wait(::delay(delay)); + } else { + // We can't retry, so throw something. + + // This error code means the authentication header was not accepted, likely the account or key is wrong. 
+            if (r && r->code == 406) throw http_not_accepted();
+
+            if (r && r->code == 401) throw http_auth_failed();
+
+            // Recognize and throw specific errors
+            if (err.present()) {
+                int code = err.get().code();
+
+                // If we get a timed_out error during the connect() phase, we'll call that connection_failed despite
+                // the fact that there was technically never a 'connection' to begin with. It differentiates between an
+                // active connection timing out vs a connection attempt timing out, though not between an active
+                // connection failing vs a connection attempt failing.
+                // TODO: Add more error types?
+                if (code == error_code_timed_out && !connectionEstablished) throw connection_failed();
+
+                if (code == error_code_timed_out || code == error_code_connection_failed ||
+                    code == error_code_lookup_failed)
+                    throw err.get();
+            }
+
+            throw http_request_failed();
+        }
+    }
+}
+
+Future<Reference<HTTP::Response>> S3BlobStoreEndpoint::doRequest(std::string const& verb, std::string const& resource,
+                                                                 const HTTP::Headers& headers,
+                                                                 UnsentPacketQueue* pContent, int contentLen,
+                                                                 std::set<unsigned int> successCodes) {
+    return doRequest_impl(Reference<S3BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen,
+                          successCodes);
+}
+
+ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket,
+                                          PromiseStream<S3BlobStoreEndpoint::ListResult> results,
+                                          Optional<std::string> prefix, Optional<char> delimiter, int maxDepth,
+                                          std::function<bool(std::string const&)> recurseFilter) {
+    // Request 1000 keys at a time, the maximum allowed
+    state std::string resource = "/";
+    resource.append(bucket);
+    resource.append("/?max-keys=1000");
+    if (prefix.present()) resource.append("&prefix=").append(HTTP::urlEncode(prefix.get()));
+    if (delimiter.present()) resource.append("&delimiter=").append(HTTP::urlEncode(std::string(1, delimiter.get())));
+    resource.append("&marker=");
+    state std::string lastFile;
+    state bool more = true;
+
+    state std::vector<Future<Void>> subLists;
+
+    while (more) {
+        wait(bstore->concurrentLists.take());
+        state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1);
+
+        HTTP::Headers headers;
+        state std::string fullResource = resource + HTTP::urlEncode(lastFile);
+        lastFile.clear();
+        Reference<HTTP::Response> r = wait(bstore->doRequest("GET", fullResource, headers, nullptr, 0, { 200 }));
+        listReleaser.release();
+
+        try {
+            S3BlobStoreEndpoint::ListResult listResult;
+            xml_document<> doc;
+
+            // Copy content because rapidxml will modify it during parse
+            std::string content = r->content;
+            doc.parse<0>((char*)content.c_str());
+
+            // There should be exactly one <ListBucketResult> node
+            xml_node<>* result = doc.first_node();
+            if (result == nullptr || strcmp(result->name(), "ListBucketResult") != 0) {
+                throw http_bad_response();
+            }
+
+            xml_node<>* n = result->first_node();
+            while (n != nullptr) {
+                const char* name = n->name();
+                if (strcmp(name, "IsTruncated") == 0) {
+                    const char* val = n->value();
+                    if (strcmp(val, "true") == 0) {
+                        more = true;
+                    } else if (strcmp(val, "false") == 0) {
+                        more = false;
+                    } else {
+                        throw http_bad_response();
+                    }
+                } else if (strcmp(name, "Contents") == 0) {
+                    S3BlobStoreEndpoint::ObjectInfo object;
+
+                    xml_node<>* key = n->first_node("Key");
+                    if (key == nullptr) {
+                        throw http_bad_response();
+                    }
+                    object.name = key->value();
+
+                    xml_node<>* size = n->first_node("Size");
+                    if (size == nullptr) {
+                        throw http_bad_response();
+                    }
+                    object.size = strtoull(size->value(), nullptr, 10);
+
+                    listResult.objects.push_back(object);
+                } else if (strcmp(name, "CommonPrefixes") == 0) {
+                    xml_node<>* prefixNode = n->first_node("Prefix");
+                    while (prefixNode != nullptr) {
+                        const char* prefix =
prefixNode->value(); + // If recursing, queue a sub-request, otherwise add the common prefix to the result. + if (maxDepth > 0) { + // If there is no recurse filter or the filter returns true then start listing the subfolder + if (!recurseFilter || recurseFilter(prefix)) { + subLists.push_back(bstore->listObjectsStream(bucket, results, prefix, delimiter, + maxDepth - 1, recurseFilter)); + } + // Since prefix will not be in the final listResult below we have to set lastFile here in + // case it's greater than the last object + lastFile = prefix; + } else { + listResult.commonPrefixes.push_back(prefix); + } + + prefixNode = prefixNode->next_sibling("Prefix"); + } + } + + n = n->next_sibling(); + } + + results.send(listResult); + + if (more) { + // lastFile will be the last commonprefix for which a sublist was started, if any. + // If there are any objects and the last one is greater than lastFile then make it the new lastFile. + if (!listResult.objects.empty() && lastFile < listResult.objects.back().name) { + lastFile = listResult.objects.back().name; + } + // If there are any common prefixes and the last one is greater than lastFile then make it the new + // lastFile. + if (!listResult.commonPrefixes.empty() && lastFile < listResult.commonPrefixes.back()) { + lastFile = listResult.commonPrefixes.back(); + } + + // If lastFile is empty at this point, something has gone wrong. + if (lastFile.empty()) { + TraceEvent(SevWarn, "S3BlobStoreEndpointListNoNextMarker") + .suppressFor(60) + .detail("Resource", fullResource); + throw http_bad_response(); + } + } + } catch (Error& e) { + if (e.code() != error_code_actor_cancelled) + TraceEvent(SevWarn, "S3BlobStoreEndpointListResultParseError") + .error(e) + .suppressFor(60) + .detail("Resource", fullResource); + throw http_bad_response(); + } + } + + wait(waitForAll(subLists)); + + return Void(); +} + +Future S3BlobStoreEndpoint::listObjectsStream(std::string const& bucket, PromiseStream results, + Optional prefix, Optional delimiter, + int maxDepth, + std::function recurseFilter) { + return listObjectsStream_impl(Reference::addRef(this), bucket, results, prefix, delimiter, + maxDepth, recurseFilter); +} + +ACTOR Future listObjects_impl(Reference bstore, + std::string bucket, Optional prefix, + Optional delimiter, int maxDepth, + std::function recurseFilter) { + state S3BlobStoreEndpoint::ListResult results; + state PromiseStream resultStream; + state Future done = + bstore->listObjectsStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter); + // Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same + // stream + done = map(done, [=](Void) { + resultStream.sendError(end_of_stream()); + return Void(); + }); + + try { + loop { + choose { + // Throw if done throws, otherwise don't stop until end_of_stream + when(wait(done)) { done = Never(); } + + when(S3BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) { + results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), + info.commonPrefixes.end()); + results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end()); + } + } + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) throw; + } + + return results; +} + +Future S3BlobStoreEndpoint::listObjects( + std::string const& bucket, Optional prefix, Optional delimiter, int maxDepth, + std::function recurseFilter) { + return listObjects_impl(Reference::addRef(this), bucket, prefix, delimiter, 
maxDepth, + recurseFilter); +} + +ACTOR Future> listBuckets_impl(Reference bstore) { + state std::string resource = "/?marker="; + state std::string lastName; + state bool more = true; + state std::vector buckets; + + while (more) { + wait(bstore->concurrentLists.take()); + state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1); + + HTTP::Headers headers; + state std::string fullResource = resource + HTTP::urlEncode(lastName); + Reference r = wait(bstore->doRequest("GET", fullResource, headers, nullptr, 0, { 200 })); + listReleaser.release(); + + try { + xml_document<> doc; + + // Copy content because rapidxml will modify it during parse + std::string content = r->content; + doc.parse<0>((char*)content.c_str()); + + // There should be exactly one node + xml_node<>* result = doc.first_node(); + if (result == nullptr || strcmp(result->name(), "ListAllMyBucketsResult") != 0) { + throw http_bad_response(); + } + + more = false; + xml_node<>* truncated = result->first_node("IsTruncated"); + if (truncated != nullptr && strcmp(truncated->value(), "true") == 0) { + more = true; + } + + xml_node<>* bucketsNode = result->first_node("Buckets"); + if (bucketsNode != nullptr) { + xml_node<>* bucketNode = bucketsNode->first_node("Bucket"); + while (bucketNode != nullptr) { + xml_node<>* nameNode = bucketNode->first_node("Name"); + if (nameNode == nullptr) { + throw http_bad_response(); + } + const char* name = nameNode->value(); + buckets.push_back(name); + + bucketNode = bucketNode->next_sibling("Bucket"); + } + } + + if (more) { + lastName = buckets.back(); + } + + } catch (Error& e) { + if (e.code() != error_code_actor_cancelled) + TraceEvent(SevWarn, "S3BlobStoreEndpointListBucketResultParseError") + .error(e) + .suppressFor(60) + .detail("Resource", fullResource); + throw http_bad_response(); + } + } + + return buckets; +} + +Future> S3BlobStoreEndpoint::listBuckets() { + return listBuckets_impl(Reference::addRef(this)); +} + +std::string S3BlobStoreEndpoint::hmac_sha1(std::string const& msg) { + std::string key = secret; + + // First pad the key to 64 bytes. 
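+    // (Together with the ipad/opad XOR loops below, this hand-rolls standard HMAC-SHA1 per RFC 2104:
+    // SHA1((key ^ opad) || SHA1((key ^ ipad) || msg)) over a 64-byte zero-padded key.)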
+ key.append(64 - key.size(), '\0'); + + std::string kipad = key; + for (int i = 0; i < 64; ++i) kipad[i] ^= '\x36'; + + std::string kopad = key; + for (int i = 0; i < 64; ++i) kopad[i] ^= '\x5c'; + + kipad.append(msg); + std::string hkipad = SHA1::from_string(kipad); + kopad.append(hkipad); + return SHA1::from_string(kopad); +} + +void S3BlobStoreEndpoint::setAuthHeaders(std::string const& verb, std::string const& resource, HTTP::Headers& headers) { + std::string& date = headers["Date"]; + + char dateBuf[20]; + time_t ts; + time(&ts); + // ISO 8601 format YYYYMMDD'T'HHMMSS'Z' + strftime(dateBuf, 20, "%Y%m%dT%H%M%SZ", gmtime(&ts)); + date = dateBuf; + + std::string msg; + msg.append(verb); + msg.append("\n"); + auto contentMD5 = headers.find("Content-MD5"); + if (contentMD5 != headers.end()) msg.append(contentMD5->second); + msg.append("\n"); + auto contentType = headers.find("Content-Type"); + if (contentType != headers.end()) msg.append(contentType->second); + msg.append("\n"); + msg.append(date); + msg.append("\n"); + for (auto h : headers) { + StringRef name = h.first; + if (name.startsWith(LiteralStringRef("x-amz")) || name.startsWith(LiteralStringRef("x-icloud"))) { + msg.append(h.first); + msg.append(":"); + msg.append(h.second); + msg.append("\n"); + } + } + + msg.append(resource); + if (verb == "GET") { + size_t q = resource.find_last_of('?'); + if (q != resource.npos) msg.resize(msg.size() - (resource.size() - q)); + } + + std::string sig = base64::encoder::from_string(hmac_sha1(msg)); + // base64 encoded blocks end in \n so remove it. + sig.resize(sig.size() - 1); + std::string auth = "AWS "; + auth.append(key); + auth.append(":"); + auth.append(sig); + headers["Authorization"] = auth; +} + +ACTOR Future readEntireFile_impl(Reference bstore, std::string bucket, + std::string object) { + wait(bstore->requestRateRead->getAllowance(1)); + + std::string resource = std::string("/") + bucket + "/" + object; + HTTP::Headers headers; + Reference r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, { 200, 404 })); + if (r->code == 404) throw file_not_found(); + return r->content; +} + +Future S3BlobStoreEndpoint::readEntireFile(std::string const& bucket, std::string const& object) { + return readEntireFile_impl(Reference::addRef(this), bucket, object); +} + +ACTOR Future writeEntireFileFromBuffer_impl(Reference bstore, std::string bucket, + std::string object, UnsentPacketQueue* pContent, int contentLen, + std::string contentMD5) { + if (contentLen > bstore->knobs.multipart_max_part_size) throw file_too_large(); + + wait(bstore->requestRateWrite->getAllowance(1)); + wait(bstore->concurrentUploads.take()); + state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1); + + std::string resource = std::string("/") + bucket + "/" + object; + HTTP::Headers headers; + // Send MD5 sum for content so blobstore can verify it + headers["Content-MD5"] = contentMD5; + state Reference r = + wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, { 200 })); + + // For uploads, Blobstore returns an MD5 sum of uploaded content so check it. 
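+    // (The 'false' argument makes a missing Content-MD5 response header non-fatal; an MD5 mismatch still fails.)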
+    if (!r->verifyMD5(false, contentMD5)) throw checksum_failed();
+
+    return Void();
+}
+
+ACTOR Future<Void> writeEntireFile_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket, std::string object,
+                                        std::string content) {
+    state UnsentPacketQueue packets;
+    PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned());
+    pw.serializeBytes(content);
+    if (content.size() > bstore->knobs.multipart_max_part_size) throw file_too_large();
+
+    // Yield because we may have just had to copy several MBs into the packet buffer chain and next we have to
+    // calculate an MD5 sum of it.
+    // TODO: If this actor is used to send large files then combine the summing and packetization into a loop with a
+    // yield() every 20k or so.
+    wait(yield());
+
+    MD5_CTX sum;
+    ::MD5_Init(&sum);
+    ::MD5_Update(&sum, content.data(), content.size());
+    std::string sumBytes;
+    sumBytes.resize(16);
+    ::MD5_Final((unsigned char*)sumBytes.data(), &sum);
+    std::string contentMD5 = base64::encoder::from_string(sumBytes);
+    contentMD5.resize(contentMD5.size() - 1);
+
+    wait(writeEntireFileFromBuffer_impl(bstore, bucket, object, &packets, content.size(), contentMD5));
+    return Void();
+}
+
+Future<Void> S3BlobStoreEndpoint::writeEntireFile(std::string const& bucket, std::string const& object,
+                                                  std::string const& content) {
+    return writeEntireFile_impl(Reference<S3BlobStoreEndpoint>::addRef(this), bucket, object, content);
+}
+
+Future<Void> S3BlobStoreEndpoint::writeEntireFileFromBuffer(std::string const& bucket, std::string const& object,
+                                                            UnsentPacketQueue* pContent, int contentLen,
+                                                            std::string const& contentMD5) {
+    return writeEntireFileFromBuffer_impl(Reference<S3BlobStoreEndpoint>::addRef(this), bucket, object, pContent,
+                                          contentLen, contentMD5);
+}
+
+ACTOR Future<int> readObject_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket, std::string object,
+                                  void* data, int length, int64_t offset) {
+    if (length <= 0) return 0;
+    wait(bstore->requestRateRead->getAllowance(1));
+
+    std::string resource = std::string("/") + bucket + "/" + object;
+    HTTP::Headers headers;
+    headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
+    Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, nullptr, 0, { 200, 206, 404 }));
+    if (r->code == 404) throw file_not_found();
+    if (r->contentLen !=
+        r->content.size()) // Double check that this wasn't a header-only response, probably unnecessary
+        throw io_error();
+    // Copy the output bytes, server could have sent more or less bytes than requested so copy at most length bytes
+    memcpy(data, r->content.data(), std::min<int64_t>(r->contentLen, length));
+    return r->contentLen;
+}
+
+Future<int> S3BlobStoreEndpoint::readObject(std::string const& bucket, std::string const& object, void* data,
+                                            int length, int64_t offset) {
+    return readObject_impl(Reference<S3BlobStoreEndpoint>::addRef(this), bucket, object, data, length, offset);
+}
+
+ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket,
+                                                           std::string object) {
+    wait(bstore->requestRateWrite->getAllowance(1));
+
+    std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
+    HTTP::Headers headers;
+    Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, nullptr, 0, { 200 }));
+
+    try {
+        xml_document<> doc;
+        // Copy content because rapidxml will modify it during parse
+        std::string content = r->content;
+
+        doc.parse<0>((char*)content.c_str());
+
+        // There should be exactly one <InitiateMultipartUploadResult> node
+        xml_node<>* result = doc.first_node();
+        if (result != nullptr && strcmp(result->name(), "InitiateMultipartUploadResult") == 0) {
+            xml_node<>* id = result->first_node("UploadId");
+            if (id != nullptr) {
+                return id->value();
+            }
+        }
+    } catch (...) {
+    }
+    throw http_bad_response();
+}
+
+Future<std::string> S3BlobStoreEndpoint::beginMultiPartUpload(std::string const& bucket, std::string const& object) {
+    return beginMultiPartUpload_impl(Reference<S3BlobStoreEndpoint>::addRef(this), bucket, object);
+}
+
+ACTOR Future<std::string> uploadPart_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket,
+                                          std::string object, std::string uploadID, unsigned int partNumber,
+                                          UnsentPacketQueue* pContent, int contentLen, std::string contentMD5) {
+    wait(bstore->requestRateWrite->getAllowance(1));
+    wait(bstore->concurrentUploads.take());
+    state FlowLock::Releaser uploadReleaser(bstore->concurrentUploads, 1);
+
+    std::string resource =
+        format("/%s/%s?partNumber=%d&uploadId=%s", bucket.c_str(), object.c_str(), partNumber, uploadID.c_str());
+    HTTP::Headers headers;
+    // Send MD5 sum for content so blobstore can verify it
+    headers["Content-MD5"] = contentMD5;
+    state Reference<HTTP::Response> r =
+        wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, { 200 }));
+    // TODO: In the event that the client times out just before the request completes (so the client is unaware) then
+    // the next retry will see error 400. That could be detected and handled gracefully by retrieving the etag for the
+    // successful request.
+
+    // For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
+    if (!r->verifyMD5(false, contentMD5)) throw checksum_failed();
+
+    // No etag -> bad response.
+    std::string etag = r->headers["ETag"];
+    if (etag.empty()) throw http_bad_response();
+
+    return etag;
+}
+
+Future<std::string> S3BlobStoreEndpoint::uploadPart(std::string const& bucket, std::string const& object,
+                                                    std::string const& uploadID, unsigned int partNumber,
+                                                    UnsentPacketQueue* pContent, int contentLen,
+                                                    std::string const& contentMD5) {
+    return uploadPart_impl(Reference<S3BlobStoreEndpoint>::addRef(this), bucket, object, uploadID, partNumber,
+                           pContent, contentLen, contentMD5);
+}
+
+ACTOR Future<Void> finishMultiPartUpload_impl(Reference<S3BlobStoreEndpoint> bstore, std::string bucket,
+                                              std::string object, std::string uploadID,
+                                              S3BlobStoreEndpoint::MultiPartSetT parts) {
+    state UnsentPacketQueue part_list; // NonCopyable state var so must be declared at top of actor
+    wait(bstore->requestRateWrite->getAllowance(1));
+
+    std::string manifest = "<CompleteMultipartUpload>";
+    for (auto& p : parts)
+        manifest += format("<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>\n", p.first, p.second.c_str());
+    manifest += "</CompleteMultipartUpload>";
+
+    std::string resource = format("/%s/%s?uploadId=%s", bucket.c_str(), object.c_str(), uploadID.c_str());
+    HTTP::Headers headers;
+    PacketWriter pw(part_list.getWriteBuffer(manifest.size()), nullptr, Unversioned());
+    pw.serializeBytes(manifest);
+    Reference<HTTP::Response> r =
+        wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), { 200 }));
+    // TODO: In the event that the client times out just before the request completes (so the client is unaware) then
+    // the next retry will see error 400.  That could be detected and handled gracefully by HEAD'ing the object before
+    // upload to get its (possibly nonexistent) eTag, then if an error 400 is seen then retrieve the eTag again and if
+    // it has changed then consider the finish complete.
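+    // Note: some S3 implementations can also return 200 for CompleteMultipartUpload while carrying an error document
+    // in the response body, so a more defensive implementation would parse the response content before declaring
+    // success.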
diff --git a/fdbclient/BlobStore.h b/fdbclient/S3BlobStore.h
similarity index 53%
rename from fdbclient/BlobStore.h
rename to fdbclient/S3BlobStore.h
index c3d7d9a00b..d8db509ebf 100644
--- a/fdbclient/BlobStore.h
+++ b/fdbclient/S3BlobStore.h
@@ -1,5 +1,5 @@
 /*
- * BlobStore.h
+ * S3BlobStore.h
  *
  * This source file is part of the FoundationDB open source project
  *
@@ -31,11 +31,11 @@
 // Representation of all the things you need to connect to a blob store instance with some credentials.
 // Reference counted because a very large number of them could be needed.
-class BlobStoreEndpoint : public ReferenceCounted<BlobStoreEndpoint> {
+class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
 public:
 	struct Stats {
 		Stats() : requests_successful(0), requests_failed(0), bytes_sent(0) {}
-		Stats operator-(const Stats &rhs);
+		Stats operator-(const Stats& rhs);
 		void clear() { memset(this, 0, sizeof(*this)); }
 		json_spirit::mObject getJSON();

@@ -48,29 +48,12 @@ public:
 	struct BlobKnobs {
 		BlobKnobs();
-		int secure_connection,
-			connect_tries,
-			connect_timeout,
-			max_connection_life,
-			request_tries,
-			request_timeout_min,
-			requests_per_second,
-			list_requests_per_second,
-			write_requests_per_second,
-			read_requests_per_second,
-			delete_requests_per_second,
-			multipart_max_part_size,
-			multipart_min_part_size,
-			concurrent_requests,
-			concurrent_uploads,
-			concurrent_lists,
-			concurrent_reads_per_file,
-			concurrent_writes_per_file,
-			read_block_size,
-			read_ahead_blocks,
-			read_cache_blocks_per_file,
-			max_send_bytes_per_second,
-			max_recv_bytes_per_second;
+		int secure_connection, connect_tries, connect_timeout, max_connection_life, request_tries, request_timeout_min,
+		    requests_per_second, list_requests_per_second, write_requests_per_second, read_requests_per_second,
+		    delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests,
+		    concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file,
+		    read_block_size, read_ahead_blocks, read_cache_blocks_per_file, max_send_bytes_per_second,
+		    max_recv_bytes_per_second;
 		bool set(StringRef name, int value);
 		std::string getURLParameters() const;
 		static std::vector<std::string> getKnobDescriptions() {
@@ -79,8 +62,10 @@ public:
 			"connect_tries (or ct)                 Number of times to try to connect for each request.",
 			"connect_timeout (or cto)              Number of seconds to wait for a connect request to succeed.",
 			"max_connection_life (or mcl)          Maximum number of seconds to use a single TCP connection.",
-			"request_tries (or rt)                 Number of times to try each request until a parseable HTTP response other than 429 is received.",
-			"request_timeout_min (or rtom)         Number of seconds to wait for a request to succeed after a connection is established.",
+			"request_tries (or rt)                 Number of times to try each request until a parseable HTTP "
+			"response other than 429 is received.",
+			"request_timeout_min (or rtom)         Number of seconds to wait for a request to succeed after a "
+			"connection is established.",
 			"requests_per_second (or rps)          Max number of requests to start per second.",
 			"list_requests_per_second (or lrps)    Max number of list requests to start per second.",
 			"write_requests_per_second (or wrps)   Max number of write requests to start per second.",
"delete_requests_per_second (or drps) Max number of delete requests to start per second.", "multipart_max_part_size (or maxps) Max part size for multipart uploads.", "multipart_min_part_size (or minps) Min part size for multipart uploads.", - "concurrent_requests (or cr) Max number of total requests in progress at once, regardless of operation-specific concurrency limits.", - "concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress at once.", + "concurrent_requests (or cr) Max number of total requests in progress at once, regardless of " + "operation-specific concurrency limits.", + "concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress " + "at once.", "concurrent_lists (or cl) Max concurrent list operations that can be in progress at once.", "concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.", "concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.", @@ -97,43 +84,45 @@ public: "read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.", "read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.", "max_send_bytes_per_second (or sbps) Max send bytes per second for all requests combined.", - "max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET USED)." + "max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET " + "USED)." }; } }; - BlobStoreEndpoint(std::string const &host, std::string service, std::string const &key, std::string const &secret, BlobKnobs const &knobs = BlobKnobs(), HTTP::Headers extraHeaders = HTTP::Headers()) - : host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs), extraHeaders(extraHeaders), - requestRate(new SpeedLimit(knobs.requests_per_second, 1)), - requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)), - requestRateWrite(new SpeedLimit(knobs.write_requests_per_second, 1)), - requestRateRead(new SpeedLimit(knobs.read_requests_per_second, 1)), - requestRateDelete(new SpeedLimit(knobs.delete_requests_per_second, 1)), - sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)), - recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), - concurrentRequests(knobs.concurrent_requests), - concurrentUploads(knobs.concurrent_uploads), - concurrentLists(knobs.concurrent_lists) { + S3BlobStoreEndpoint(std::string const& host, std::string service, std::string const& key, std::string const& secret, + BlobKnobs const& knobs = BlobKnobs(), HTTP::Headers extraHeaders = HTTP::Headers()) + : host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs), + extraHeaders(extraHeaders), requestRate(new SpeedLimit(knobs.requests_per_second, 1)), + requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)), + requestRateWrite(new SpeedLimit(knobs.write_requests_per_second, 1)), + requestRateRead(new SpeedLimit(knobs.read_requests_per_second, 1)), + requestRateDelete(new SpeedLimit(knobs.delete_requests_per_second, 1)), + sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)), + recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), concurrentRequests(knobs.concurrent_requests), + concurrentUploads(knobs.concurrent_uploads), concurrentLists(knobs.concurrent_lists) { - if(host.empty()) - throw connection_string_invalid(); + if (host.empty()) throw connection_string_invalid(); } static std::string 
 	static std::string getURLFormat(bool withResource = false) {
-		const char *resource = "";
-		if(withResource)
-			resource = "<name>";
-		return format("blobstore://<api_key>:<secret>@<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]", resource);
+		const char* resource = "";
+		if (withResource) resource = "<name>";
+		return format("blobstore://<api_key>:<secret>@<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]",
+		              resource);
 	}

 	typedef std::map<std::string, std::string> ParametersT;

-	// Parse url and return a BlobStoreEndpoint
-	// If the url has parameters that BlobStoreEndpoint can't consume then an error will be thrown unless ignored_parameters is given in which case
-	// the unconsumed parameters will be added to it.
-	static Reference<BlobStoreEndpoint> fromString(std::string const &url, std::string *resourceFromURL = nullptr, std::string *error = nullptr, ParametersT *ignored_parameters = nullptr);
+	// Parse url and return an S3BlobStoreEndpoint
+	// If the url has parameters that S3BlobStoreEndpoint can't consume then an error will be thrown unless
+	// ignored_parameters is given, in which case the unconsumed parameters will be added to it.
+	static Reference<S3BlobStoreEndpoint> fromString(std::string const& url, std::string* resourceFromURL = nullptr,
+	                                                 std::string* error = nullptr,
+	                                                 ParametersT* ignored_parameters = nullptr);

-	// Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL parameters in addition to the passed params string
+	// Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL
+	// parameters, in addition to the passed params string
 	std::string getResourceURL(std::string resource, std::string params);

 	struct ReusableConnection {
@@ -142,7 +131,7 @@ public:
 	};
 	std::queue<ReusableConnection> connectionPool;
 	Future<ReusableConnection> connect();
-	void returnConnection(ReusableConnection &conn);
+	void returnConnection(ReusableConnection& conn);

 	std::string host;
 	std::string service;
@@ -167,18 +156,21 @@ public:
 	Future<Void> updateSecret();

 	// Calculates the authentication string from the secret key
-	std::string hmac_sha1(std::string const &msg);
+	std::string hmac_sha1(std::string const& msg);

 	// Sets headers needed for Authorization (including Date which will be overwritten if present)
-	void setAuthHeaders(std::string const &verb, std::string const &resource, HTTP::Headers &headers);
+	void setAuthHeaders(std::string const& verb, std::string const& resource, HTTP::Headers& headers);

 	// Prepend the HTTP request header to the given PacketBuffer, returning the new head of the buffer chain
-	static PacketBuffer * writeRequestHeader(std::string const &request, HTTP::Headers const &headers, PacketBuffer *dest);
+	static PacketBuffer* writeRequestHeader(std::string const& request, HTTP::Headers const& headers,
+	                                        PacketBuffer* dest);

 	// Do an HTTP request to the Blob Store, read the response. Handles authentication.
 	// Every blob store interaction should ultimately go through this function
-	Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes);
+	Future<Reference<HTTP::Response>> doRequest(std::string const& verb, std::string const& resource,
+	                                            const HTTP::Headers& headers, UnsentPacketQueue* pContent,
+	                                            int contentLen, std::set<unsigned int> successCodes);

 	struct ObjectInfo {
 		std::string name;
@@ -192,51 +184,61 @@ public:

 	// Get bucket contents via a stream, since listing large buckets will take many serial blob requests
 	// If a delimiter is passed then common prefixes will be read in parallel, recursively, depending on recurseFilter.
-	// Recursefilter is a must be a function that takes a string and returns true if it passes.  The default behavior is to assume true.
+	// RecurseFilter must be a function that takes a string and returns true if it passes. The default behavior is
+	// to assume true.
+	Future<Void> listObjectsStream(std::string const& bucket, PromiseStream<ListResult> results,
+	                               Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0,
+	                               std::function<bool(std::string const&)> recurseFilter = nullptr);

 	// Get a list of the files in a bucket, see listObjectsStream for more argument detail.
-	Future<ListResult> listObjects(std::string const &bucket, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
+	Future<ListResult> listObjects(std::string const& bucket, Optional<std::string> prefix = {},
+	                               Optional<char> delimiter = {}, int maxDepth = 0,
+	                               std::function<bool(std::string const&)> recurseFilter = nullptr);

 	// Get a list of all buckets
 	Future<std::vector<std::string>> listBuckets();

 	// Check if a bucket exists
-	Future<bool> bucketExists(std::string const &bucket);
+	Future<bool> bucketExists(std::string const& bucket);

 	// Check if an object exists in a bucket
-	Future<bool> objectExists(std::string const &bucket, std::string const &object);
+	Future<bool> objectExists(std::string const& bucket, std::string const& object);

 	// Get the size of an object in a bucket
-	Future<int64_t> objectSize(std::string const &bucket, std::string const &object);
+	Future<int64_t> objectSize(std::string const& bucket, std::string const& object);

 	// Read an arbitrary segment of an object
-	Future<int> readObject(std::string const &bucket, std::string const &object, void *data, int length, int64_t offset);
+	Future<int> readObject(std::string const& bucket, std::string const& object, void* data, int length,
+	                       int64_t offset);

 	// Delete an object in a bucket
-	Future<Void> deleteObject(std::string const &bucket, std::string const &object);
+	Future<Void> deleteObject(std::string const& bucket, std::string const& object);

 	// Delete all objects in a bucket under a prefix. Note this is not atomic as blob store does not
 	// support this operation directly. This method is just a convenience method that lists and deletes
 	// all of the objects in the bucket under the given prefix.
 	// Since it can take a while, if pNumDeleted and/or pBytesDeleted are provided they will be incremented every time
 	// a deletion of an object completes.
-	Future<Void> deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = nullptr, int64_t *pBytesDeleted = nullptr);
+	Future<Void> deleteRecursively(std::string const& bucket, std::string prefix = "", int* pNumDeleted = nullptr,
+	                               int64_t* pBytesDeleted = nullptr);

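As a usage illustration of the listing API just declared (a sketch, not part of the patch): ListResult, declared alongside ObjectInfo in hunk context not shown in this diff, carries the matching objects plus any common prefixes found under the delimiter. The bucket and prefix names are placeholders.

ACTOR Future<Void> listBackupsExample(Reference<S3BlobStoreEndpoint> bstore) {
	// List everything under "backups/", treating '/' as a directory-style delimiter.
	S3BlobStoreEndpoint::ListResult result = wait(bstore->listObjects("mybucket", std::string("backups/"), '/'));
	for (auto& object : result.objects)
		printf("%10lld  %s\n", (long long)object.size, object.name.c_str());
	for (auto& prefix : result.commonPrefixes)
		printf("  (prefix)  %s\n", prefix.c_str());
	return Void();
}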
 	// Create a bucket if it does not already exist.
-	Future<Void> createBucket(std::string const &bucket);
+	Future<Void> createBucket(std::string const& bucket);

 	// Useful methods for working with tiny files
-	Future<std::string> readEntireFile(std::string const &bucket, std::string const &object);
-	Future<Void> writeEntireFile(std::string const &bucket, std::string const &object, std::string const &content);
-	Future<Void> writeEntireFileFromBuffer(std::string const &bucket, std::string const &object, UnsentPacketQueue *pContent, int contentLen, std::string const &contentMD5);
+	Future<std::string> readEntireFile(std::string const& bucket, std::string const& object);
+	Future<Void> writeEntireFile(std::string const& bucket, std::string const& object, std::string const& content);
+	Future<Void> writeEntireFileFromBuffer(std::string const& bucket, std::string const& object,
+	                                       UnsentPacketQueue* pContent, int contentLen, std::string const& contentMD5);

 	// MultiPart upload methods
 	// Returns UploadID
-	Future<std::string> beginMultiPartUpload(std::string const &bucket, std::string const &object);
+	Future<std::string> beginMultiPartUpload(std::string const& bucket, std::string const& object);

 	// Returns eTag
-	Future<std::string> uploadPart(std::string const &bucket, std::string const &object, std::string const &uploadID, unsigned int partNumber, UnsentPacketQueue *pContent, int contentLen, std::string const &contentMD5);
+	Future<std::string> uploadPart(std::string const& bucket, std::string const& object, std::string const& uploadID,
+	                               unsigned int partNumber, UnsentPacketQueue* pContent, int contentLen,
+	                               std::string const& contentMD5);

 	typedef std::map<int, std::string> MultiPartSetT;
-	Future<Void> finishMultiPartUpload(std::string const &bucket, std::string const &object, std::string const &uploadID, MultiPartSetT const &parts);
+	Future<Void> finishMultiPartUpload(std::string const& bucket, std::string const& object,
+	                                   std::string const& uploadID, MultiPartSetT const& parts);
 };
-
diff --git a/fdbrpc/IAsyncFile.h b/fdbrpc/IAsyncFile.h
index d02fb8368c..85d879e3fe 100644
--- a/fdbrpc/IAsyncFile.h
+++ b/fdbrpc/IAsyncFile.h
@@ -26,7 +26,7 @@
 #include "flow/flow.h"

 // All outstanding operations must be cancelled before the destructor of IAsyncFile is called.
-// The desirability of the above semantic is disputed. Some classes (AsyncFileBlobStore,
+// The desirability of the above semantic is disputed. Some classes (AsyncFileS3BlobStore,
 // AsyncFileCached) maintain references, while others (AsyncFileNonDurable) don't, and the comment
 // is inapplicable to some others as well (AsyncFileKAIO). It's safest to assume that all operations
 // must complete or cancel, but you should probably look at the file implementations you'll be using.
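Finally, a sketch of end-to-end use of the renamed class (not part of the patch): the URL shape matches getURLFormat() above, fromString() throws on a URL it cannot parse, and the credentials, host, bucket, and object names are all placeholders.

ACTOR Future<Void> printSmallObjectExample() {
	state std::string resource;
	// Construct an endpoint from a connection URL, capturing the resource part of the URL.
	state Reference<S3BlobStoreEndpoint> bstore = S3BlobStoreEndpoint::fromString(
	    "blobstore://MYKEY:MYSECRET@s3.example.com:443/backup?secure_connection=1", &resource);
	// Read a tiny object in one request using the whole-file helper declared above.
	std::string content = wait(bstore->readEntireFile("mybucket", "status.json"));
	printf("Read %d bytes (resource was %s)\n", (int)content.size(), resource.c_str());
	return Void();
}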
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ae8b85091b..a41f87d987 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -43,7 +43,6 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES BackupContainers.txt IGNORE) add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE) add_fdb_test(TEST_FILES BigInsert.txt IGNORE) - add_fdb_test(TEST_FILES BlobStore.txt IGNORE) add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE) add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE) add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE) @@ -76,6 +75,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES RedwoodPerfPrefixCompression.txt IGNORE) add_fdb_test(TEST_FILES RedwoodPerfSequentialInsert.txt IGNORE) add_fdb_test(TEST_FILES RocksDBTest.txt IGNORE) + add_fdb_test(TEST_FILES S3BlobStore.txt IGNORE) add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE) if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved add_fdb_test(TEST_FILES SimpleExternalTest.txt) diff --git a/tests/BlobStore.txt b/tests/S3BlobStore.txt similarity index 100% rename from tests/BlobStore.txt rename to tests/S3BlobStore.txt