From ef84e52127c031a4562f3d26a27a65b8cd44b4e6 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 18 Oct 2017 05:51:30 -0700 Subject: [PATCH] Improved error handling and memory usage in AsyncFileBlobStoreWrite. Writes will now fail if any upload has already failed, rather than buffering unboundedly until sync() is called to complete the file. There is also a configurable limit on how many uploads can be pending before writes will stall waiting for one to finish. --- fdbclient/Knobs.cpp | 1 + fdbclient/Knobs.h | 1 + fdbrpc/AsyncFileBlobStore.actor.h | 49 ++++++++++++++++++++----------- fdbrpc/BlobStore.actor.cpp | 3 ++ fdbrpc/BlobStore.h | 2 ++ 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index 3ea46b286f..cc2d413b16 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -144,6 +144,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( BLOBSTORE_CONCURRENT_UPLOADS, BACKUP_TASKS_PER_AGENT*2 ); + init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 ); init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 ); init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 ); init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 ); diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index 5af4d4525b..71fcb34b34 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -150,6 +150,7 @@ public: int BLOBSTORE_MULTIPART_MAX_PART_SIZE; int BLOBSTORE_MULTIPART_MIN_PART_SIZE; int BLOBSTORE_CONCURRENT_UPLOADS; + int BLOBSTORE_CONCURRENT_WRITES_PER_FILE; int BLOBSTORE_CONCURRENT_READS_PER_FILE; int BLOBSTORE_READ_BLOCK_SIZE; int BLOBSTORE_READ_AHEAD_BLOCKS; diff --git a/fdbrpc/AsyncFileBlobStore.actor.h b/fdbrpc/AsyncFileBlobStore.actor.h index 4fc4ac73fb..6dd164b694 100644 --- a/fdbrpc/AsyncFileBlobStore.actor.h +++ b/fdbrpc/AsyncFileBlobStore.actor.h @@ -38,6 +38,16 @@ #include "md5/md5.h" #include "libb64/encode.h" +ACTOR template static Future joinErrorGroup(Future f, Promise p) { + try { + Void _ = wait(success(f) || p.getFuture()); + return f.get(); + 
} catch(Error &e) { + if(p.canBeSet()) + p.sendError(e); + throw; + } +} // This class represents a write-only file that lives in an S3-style blob store. It writes using the REST API, // using multi-part upload and beginning to transfer each part as soon as it is large enough. // All write operations file operations must be sequential and contiguous. @@ -96,7 +106,7 @@ public: data = (const uint8_t *)data + finishlen; // End current part (and start new one) - Void _ = wait(f->endCurrentPart(true)); + Void _ = wait(f->endCurrentPart(f.getPtr(), true)); p = f->m_parts.back().getPtr(); } @@ -109,7 +119,7 @@ public: throw non_sequential_op(); m_cursor += length; - return write_impl(Reference::addRef(this), (const uint8_t *)data, length); + return m_error.getFuture() || write_impl(Reference::addRef(this), (const uint8_t *)data, length); } virtual Future truncate( int64_t size ) { @@ -119,14 +129,10 @@ public: } ACTOR static Future doPartUpload(AsyncFileBlobStoreWrite *f, Part *p) { - try { - p->finalizeMD5(); - std::string upload_id = wait(f->getUploadID()); - std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string)); - return etag; - } catch(Error &e) { - throw; - } + p->finalizeMD5(); + std::string upload_id = wait(f->getUploadID()); + std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string)); + return etag; } ACTOR static Future doFinishUpload(AsyncFileBlobStoreWrite* f) { @@ -139,7 +145,7 @@ public: } // There are at least 2 parts. 
End the last part (which could be empty) - Void _ = wait(f->endCurrentPart()); + Void _ = wait(f->endCurrentPart(f)); state BlobStoreEndpoint::MultiPartSetT partSet; state std::vector>::iterator p; @@ -208,17 +214,26 @@ private: Future m_upload_id; Future m_finished; std::vector> m_parts; + Promise m_error; + FlowLock m_concurrentUploads; - Future endCurrentPart(bool startNew = false) { - if(m_parts.back()->length == 0) + // End the current part and start uploading it, but also wait for a part to finish if too many are in transit. + ACTOR static Future endCurrentPart(AsyncFileBlobStoreWrite *f, bool startNew = false) { + if(f->m_parts.back()->length == 0) return Void(); - // Start the upload - m_parts.back()->etag = doPartUpload(this, m_parts.back().getPtr()); + // Wait for an upload slot to be available + Void _ = wait(f->m_concurrentUploads.take(1)); + + // Do the upload, and if it fails forward errors to m_error and also stop if anything else sends an error to m_error + // Also, hold a releaser for the concurrent upload slot while all that is going on. 
+ f->m_parts.back()->etag = holdWhile(std::shared_ptr(new FlowLock::Releaser(f->m_concurrentUploads, 1)), + joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error) + ); // Make a new part to write to if(startNew) - m_parts.push_back(Reference(new Part(m_parts.size() + 1))); + f->m_parts.push_back(Reference(new Part(f->m_parts.size() + 1))); return Void(); } @@ -231,7 +246,7 @@ private: public: AsyncFileBlobStoreWrite(Reference bstore, std::string bucket, std::string object) - : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0) { + : m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) { // Add first part m_parts.push_back(Reference(new Part(1))); diff --git a/fdbrpc/BlobStore.actor.cpp b/fdbrpc/BlobStore.actor.cpp index 7b70395250..2ca1ffcb71 100644 --- a/fdbrpc/BlobStore.actor.cpp +++ b/fdbrpc/BlobStore.actor.cpp @@ -59,6 +59,7 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() { multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE; concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS; concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE; + concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE; read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE; read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS; read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; @@ -80,6 +81,7 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) { TRY_PARAM(multipart_min_part_size, minps); TRY_PARAM(concurrent_uploads, cu); TRY_PARAM(concurrent_reads_per_file, crpf); + TRY_PARAM(concurrent_writes_per_file, cwpf); TRY_PARAM(read_block_size, rbs); TRY_PARAM(read_ahead_blocks, rab); TRY_PARAM(read_cache_blocks_per_file, rcb); @@ -105,6 +107,7 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const { _CHECK_PARAM(multipart_min_part_size, 
minps); _CHECK_PARAM(concurrent_uploads, cu); _CHECK_PARAM(concurrent_reads_per_file, crpf); + _CHECK_PARAM(concurrent_writes_per_file, cwpf); _CHECK_PARAM(read_block_size, rbs); _CHECK_PARAM(read_ahead_blocks, rab); _CHECK_PARAM(read_cache_blocks_per_file, rcb); diff --git a/fdbrpc/BlobStore.h b/fdbrpc/BlobStore.h index 26bc3f5431..13c4a65e5f 100644 --- a/fdbrpc/BlobStore.h +++ b/fdbrpc/BlobStore.h @@ -57,6 +57,7 @@ public: multipart_min_part_size, concurrent_uploads, concurrent_reads_per_file, + concurrent_writes_per_file, read_block_size, read_ahead_blocks, read_cache_blocks_per_file, @@ -77,6 +78,7 @@ public: "multipart_min_part_size (or minps) Min part size for multipart uploads.", "concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress at once.", "concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.", + "concurrent_writes_per_file (or cwpf) Max concurrent uploads in progress for any one file.", "read_block_size (or rbs) Block size in bytes to be used for reads.", "read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.", "read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",