Improved error handling and memory usage in AsyncFileBlobStoreWrite. Writes will now fail if any upload has already failed, rather than buffering unboundedly until sync() is called to complete the file. There is also a configurable limit on how many uploads can be pending before writes will stall waiting for one to finish.

This commit is contained in:
Stephen Atherton 2017-10-18 05:51:30 -07:00
parent ebd0234514
commit ef84e52127
5 changed files with 39 additions and 17 deletions

View File

@ -144,6 +144,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BLOBSTORE_CONCURRENT_UPLOADS, BACKUP_TASKS_PER_AGENT*2 );
init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 );
init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 );
init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 );
init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 );

View File

@ -150,6 +150,7 @@ public:
int BLOBSTORE_MULTIPART_MAX_PART_SIZE;
int BLOBSTORE_MULTIPART_MIN_PART_SIZE;
int BLOBSTORE_CONCURRENT_UPLOADS;
int BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
int BLOBSTORE_CONCURRENT_READS_PER_FILE;
int BLOBSTORE_READ_BLOCK_SIZE;
int BLOBSTORE_READ_AHEAD_BLOCKS;

View File

@ -38,6 +38,16 @@
#include "md5/md5.h"
#include "libb64/encode.h"
ACTOR template<typename T> static Future<T> joinErrorGroup(Future<T> f, Promise<Void> p) {
try {
Void _ = wait(success(f) || p.getFuture());
return f.get();
} catch(Error &e) {
if(p.canBeSet())
p.sendError(e);
throw;
}
}
// This class represents a write-only file that lives in an S3-style blob store. It writes using the REST API,
// using multi-part upload and beginning to transfer each part as soon as it is large enough.
// All write operations file operations must be sequential and contiguous.
@ -96,7 +106,7 @@ public:
data = (const uint8_t *)data + finishlen;
// End current part (and start new one)
Void _ = wait(f->endCurrentPart(true));
Void _ = wait(f->endCurrentPart(f.getPtr(), true));
p = f->m_parts.back().getPtr();
}
@ -109,7 +119,7 @@ public:
throw non_sequential_op();
m_cursor += length;
return write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
return m_error.getFuture() || write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
}
virtual Future<Void> truncate( int64_t size ) {
@ -119,14 +129,10 @@ public:
}
ACTOR static Future<std::string> doPartUpload(AsyncFileBlobStoreWrite *f, Part *p) {
try {
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
} catch(Error &e) {
throw;
}
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
}
ACTOR static Future<Void> doFinishUpload(AsyncFileBlobStoreWrite* f) {
@ -139,7 +145,7 @@ public:
}
// There are at least 2 parts. End the last part (which could be empty)
Void _ = wait(f->endCurrentPart());
Void _ = wait(f->endCurrentPart(f));
state BlobStoreEndpoint::MultiPartSetT partSet;
state std::vector<Reference<Part>>::iterator p;
@ -208,17 +214,26 @@ private:
Future<std::string> m_upload_id;
Future<Void> m_finished;
std::vector<Reference<Part>> m_parts;
Promise<Void> m_error;
FlowLock m_concurrentUploads;
Future<Void> endCurrentPart(bool startNew = false) {
if(m_parts.back()->length == 0)
// End the current part and start uploading it, but also wait for a part to finish if too many are in transit.
ACTOR static Future<Void> endCurrentPart(AsyncFileBlobStoreWrite *f, bool startNew = false) {
if(f->m_parts.back()->length == 0)
return Void();
// Start the upload
m_parts.back()->etag = doPartUpload(this, m_parts.back().getPtr());
// Wait for an upload slot to be available
Void _ = wait(f->m_concurrentUploads.take(1));
// Do the upload, and if it fails forward errors to m_error and also stop if anything else sends an error to m_error
// Also, hold a releaser for the concurrent upload slot while all that is going on.
f->m_parts.back()->etag = holdWhile(std::shared_ptr<FlowLock::Releaser>(new FlowLock::Releaser(f->m_concurrentUploads, 1)),
joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error)
);
// Make a new part to write to
if(startNew)
m_parts.push_back(Reference<Part>(new Part(m_parts.size() + 1)));
f->m_parts.push_back(Reference<Part>(new Part(f->m_parts.size() + 1)));
return Void();
}
@ -231,7 +246,7 @@ private:
public:
AsyncFileBlobStoreWrite(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object)
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0) {
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) {
// Add first part
m_parts.push_back(Reference<Part>(new Part(1)));

View File

@ -59,6 +59,7 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE;
concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS;
concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE;
concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE;
read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS;
read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
@ -80,6 +81,7 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(multipart_min_part_size, minps);
TRY_PARAM(concurrent_uploads, cu);
TRY_PARAM(concurrent_reads_per_file, crpf);
TRY_PARAM(concurrent_writes_per_file, cwpf);
TRY_PARAM(read_block_size, rbs);
TRY_PARAM(read_ahead_blocks, rab);
TRY_PARAM(read_cache_blocks_per_file, rcb);
@ -105,6 +107,7 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(multipart_min_part_size, minps);
_CHECK_PARAM(concurrent_uploads, cu);
_CHECK_PARAM(concurrent_reads_per_file, crpf);
_CHECK_PARAM(concurrent_writes_per_file, cwpf);
_CHECK_PARAM(read_block_size, rbs);
_CHECK_PARAM(read_ahead_blocks, rab);
_CHECK_PARAM(read_cache_blocks_per_file, rcb);

View File

@ -57,6 +57,7 @@ public:
multipart_min_part_size,
concurrent_uploads,
concurrent_reads_per_file,
concurrent_writes_per_file,
read_block_size,
read_ahead_blocks,
read_cache_blocks_per_file,
@ -77,6 +78,7 @@ public:
"multipart_min_part_size (or minps) Min part size for multipart uploads.",
"concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress at once.",
"concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.",
"concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.",
"read_block_size (or rbs) Block size in bytes to be used for reads.",
"read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.",
"read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",