Merge branch 'master' into bindings-versionstamps-in-tuples

Alec Grieser 2017-10-24 18:05:10 -07:00
commit 50e41c3968
13 changed files with 230 additions and 193 deletions

View File

@ -1145,6 +1145,7 @@ namespace fileBackup {
// Enable the stop key
state Version readVersion = wait(tr->getReadVersion());
config.stopVersion().set(tr, readVersion);
TraceEvent(SevInfo, "FBA_setStopVersion").detail("stopVersion", readVersion);
Void _ = wait(taskBucket->finish(tr, task));
@ -1755,6 +1756,8 @@ namespace fileBackup {
Void _ = wait(mf->sync());
std::string fileName = format("kvmanifest,%lld,%lld,%lld,%s", minVer, maxVer, totalBytes, g_random->randomUniqueID().toString().c_str());
TraceEvent(SevInfo, "FBA_KVManifest").detail("fileName", fileName.c_str());
Void _ = wait(bc->renameFile(tempFile, fileName));
return Void();
@ -3261,9 +3264,7 @@ public:
// To be consistent with directory handling behavior since FDB backup was first released, if the container string
// describes a local directory then "/backup-UID" will be added to it.
if(backupContainer.find("file://") == 0) {
if(backupContainer[backupContainer.size() - 1] != '/')
backupContainer += "/";
backupContainer += std::string("backup-") + nowStr.toString();
backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString());
}
Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer);
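Note on the joinPath() change above: the old code appended a trailing "/" by hand before concatenating; joinPath() joins two components with exactly one separator regardless of trailing slashes. A minimal stand-alone sketch of the semantics being relied on (a hypothetical reimplementation; the real helper lives in FDB's platform utilities):
#include <string>
// Hypothetical stand-in for FDB's joinPath(): join two path
// components with exactly one '/' between them.
std::string joinPath(std::string a, const std::string& b) {
    while (!a.empty() && a.back() == '/')
        a.pop_back();              // drop any trailing slashes
    return a + "/" + b;
}
// joinPath("file://simfdb/backups", "backup-X") and
// joinPath("file://simfdb/backups/", "backup-X") both yield
// "file://simfdb/backups/backup-X".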
@ -3429,7 +3430,7 @@ public:
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyBackedTag tag = makeBackupTag(tagName.toString());
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
state BackupConfig config(current.first);
state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
@ -3457,7 +3458,7 @@ public:
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyBackedTag tag = makeBackupTag(tagName);
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
state BackupConfig config(current.first);
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));

View File

@ -120,11 +120,13 @@ public:
// Get property's value or throw error if it doesn't exist
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr, bool snapshot = false, Error err = key_not_found()) const {
auto keyCopy = key;
auto backtrace = platform::get_backtrace();
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
if (!val.present()) {
TraceEvent(SevError, "KeyBackedProperty keyNotFound")
TraceEvent(SevInfo, "KeyBackedProperty_keyNotFound")
.detail("key", printable(keyCopy))
.detail("err", err.code());
.detail("err", err.code())
.detail("parentTrace", backtrace.c_str());
throw err;
}
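Two things change in getOrThrow() here: the severity drops from SevError to SevInfo (a missing key is now often an expected condition, e.g. the backup_unneeded() call sites in the previous file), and a backtrace is captured when the future is built, because by the time map()'s lambda throws, the original caller's stack is gone. A minimal standard-C++ sketch of that capture-by-value pattern (all names hypothetical, not the Flow API):
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>
// Copy the key and the caller's context when the continuation is
// built, so the eventual error can report where the read was issued.
template <typename T>
std::function<T(std::optional<T>)> orThrow(std::string key, std::string callerTrace) {
    return [key, callerTrace](std::optional<T> val) -> T {
        if (!val.has_value())
            throw std::runtime_error("key not found: " + key +
                                     " (requested at: " + callerTrace + ")");
        return *val;
    };
}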
@ -156,7 +158,7 @@ public:
Future<Void> set(Database cx, T const &val) {
auto _key = key;
auto _val = Codec<T>::pack(val).pack();
Value _val = Codec<T>::pack(val).pack();
return runRYWTransaction(cx, [_key, _val](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);

View File

@ -144,6 +144,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BLOBSTORE_CONCURRENT_UPLOADS, BACKUP_TASKS_PER_AGENT*2 );
init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 );
init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 );
init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 );
init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 );

View File

@ -150,6 +150,7 @@ public:
int BLOBSTORE_MULTIPART_MAX_PART_SIZE;
int BLOBSTORE_MULTIPART_MIN_PART_SIZE;
int BLOBSTORE_CONCURRENT_UPLOADS;
int BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
int BLOBSTORE_CONCURRENT_READS_PER_FILE;
int BLOBSTORE_READ_BLOCK_SIZE;
int BLOBSTORE_READ_AHEAD_BLOCKS;

View File

@ -304,7 +304,10 @@ public:
}
ACTOR static Future<bool> doTask(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
if (task && TaskFuncBase::isValidTask(task)) {
if (!task || !TaskFuncBase::isValidTask(task))
return false;
try {
state Reference<TaskFuncBase> taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]);
if (taskFunc) {
state bool verifyTask = (task->params.find(Task::reservedTaskParamValidKey) != task->params.end());
@ -358,10 +361,8 @@ public:
state Reference<ReadYourWritesTransaction> tr3(new ReadYourWritesTransaction(cx));
taskBucket->setOptions(tr3);
Version version = wait(tr3->getReadVersion());
if(version >= task->timeout) {
TraceEvent(SevWarn, "TB_ExecuteTimedOut").detail("TaskType", printable(task->params[Task::reservedTaskParamKeyType]));
return true;
}
if(version >= task->timeout)
throw timed_out();
// Otherwise reset the timeout
timeout = delay((BUGGIFY ? (2 * g_random->random01()) : 1.0) * (double)(task->timeout - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
}
@ -370,12 +371,19 @@ public:
if (BUGGIFY) Void _ = wait(delay(10.0));
Void _ = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask); }));
return true;
return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask);
}));
}
} catch(Error &e) {
TraceEvent(SevWarn, "TB_ExecuteFailure")
.detail("TaskUID", task->key.printable())
.detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable())
.detail("Priority", task->getPriority())
.error(e);
}
return false;
// Return true to indicate that we did work.
return true;
}
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {

View File

@ -38,6 +38,16 @@
#include "md5/md5.h"
#include "libb64/encode.h"
ACTOR template<typename T> static Future<T> joinErrorGroup(Future<T> f, Promise<Void> p) {
try {
Void _ = wait(success(f) || p.getFuture());
return f.get();
} catch(Error &e) {
if(p.canBeSet())
p.sendError(e);
throw;
}
}
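joinErrorGroup() ties each part upload to the file's shared m_error promise (added further down): racing success(f) against p.getFuture() makes every member of the group fail as soon as any member fails, and the catch re-broadcasts the first error to the group. A rough standard-C++ analog of the broadcast half (synchronous, since a plain std::future cannot be raced against the group the way Flow futures can):
#include <future>
#include <memory>
// On failure, publish the exception to a shared group slot so every
// other member of the group can observe that the group has failed.
template <typename T>
T joinErrorGroup(std::future<T> f, std::shared_ptr<std::promise<void>> group) {
    try {
        return f.get();
    } catch (...) {
        try {
            group->set_exception(std::current_exception());  // first failure wins
        } catch (const std::future_error&) {
            // the group already carries an error; keep the original
        }
        throw;
    }
}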
// This class represents a write-only file that lives in an S3-style blob store. It writes using the REST API,
// using multi-part upload and beginning to transfer each part as soon as it is large enough.
// All write operations must be sequential and contiguous.
@ -96,7 +106,7 @@ public:
data = (const uint8_t *)data + finishlen;
// End current part (and start new one)
Void _ = wait(f->endCurrentPart(true));
Void _ = wait(f->endCurrentPart(f.getPtr(), true));
p = f->m_parts.back().getPtr();
}
@ -109,7 +119,7 @@ public:
throw non_sequential_op();
m_cursor += length;
return write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
return m_error.getFuture() || write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
}
virtual Future<Void> truncate( int64_t size ) {
@ -119,14 +129,10 @@ public:
}
ACTOR static Future<std::string> doPartUpload(AsyncFileBlobStoreWrite *f, Part *p) {
try {
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
} catch(Error &e) {
throw;
}
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
}
ACTOR static Future<Void> doFinishUpload(AsyncFileBlobStoreWrite* f) {
@ -139,7 +145,7 @@ public:
}
// There are at least 2 parts. End the last part (which could be empty)
Void _ = wait(f->endCurrentPart());
Void _ = wait(f->endCurrentPart(f));
state BlobStoreEndpoint::MultiPartSetT partSet;
state std::vector<Reference<Part>>::iterator p;
@ -208,17 +214,26 @@ private:
Future<std::string> m_upload_id;
Future<Void> m_finished;
std::vector<Reference<Part>> m_parts;
Promise<Void> m_error;
FlowLock m_concurrentUploads;
Future<Void> endCurrentPart(bool startNew = false) {
if(m_parts.back()->length == 0)
// End the current part and start uploading it, but also wait for a part to finish if too many are in transit.
ACTOR static Future<Void> endCurrentPart(AsyncFileBlobStoreWrite *f, bool startNew = false) {
if(f->m_parts.back()->length == 0)
return Void();
// Start the upload
m_parts.back()->etag = doPartUpload(this, m_parts.back().getPtr());
// Wait for an upload slot to be available
Void _ = wait(f->m_concurrentUploads.take(1));
// Do the upload, and if it fails forward the error to m_error; also stop if anything else sends an error to m_error
// Also, hold a releaser for the concurrent upload slot while all that is going on.
f->m_parts.back()->etag = holdWhile(std::shared_ptr<FlowLock::Releaser>(new FlowLock::Releaser(f->m_concurrentUploads, 1)),
joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error)
);
// Make a new part to write to
if(startNew)
m_parts.push_back(Reference<Part>(new Part(m_parts.size() + 1)));
f->m_parts.push_back(Reference<Part>(new Part(f->m_parts.size() + 1)));
return Void();
}
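The FlowLock here is what actually enforces BLOBSTORE_CONCURRENT_WRITES_PER_FILE: take(1) blocks when too many parts are in flight, and the Releaser held via holdWhile() frees the slot when the upload future resolves, success or failure. A standard-C++20 analog of the same bounded-concurrency shape (a sketch under invented names, not the Flow primitives):
#include <semaphore>
std::counting_semaphore<> uploadSlots(5);   // like knobs.concurrent_writes_per_file
struct SlotReleaser {                       // RAII stand-in for FlowLock::Releaser
    std::counting_semaphore<>& sem;
    ~SlotReleaser() { sem.release(); }      // slot freed when the upload ends
};
void uploadPartBounded() {
    uploadSlots.acquire();                  // like wait(m_concurrentUploads.take(1))
    SlotReleaser releaser{uploadSlots};     // held for the upload's lifetime,
                                            // like holdWhile(Releaser, uploadFuture)
    // ... perform the part upload ...
}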
@ -231,7 +246,7 @@ private:
public:
AsyncFileBlobStoreWrite(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object)
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0) {
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) {
// Add first part
m_parts.push_back(Reference<Part>(new Part(1)));

View File

@ -59,6 +59,7 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE;
concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS;
concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE;
concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE;
read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS;
read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
@ -80,6 +81,7 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(multipart_min_part_size, minps);
TRY_PARAM(concurrent_uploads, cu);
TRY_PARAM(concurrent_reads_per_file, crpf);
TRY_PARAM(concurrent_writes_per_file, cwpf);
TRY_PARAM(read_block_size, rbs);
TRY_PARAM(read_ahead_blocks, rab);
TRY_PARAM(read_cache_blocks_per_file, rcb);
@ -105,6 +107,7 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(multipart_min_part_size, minps);
_CHECK_PARAM(concurrent_uploads, cu);
_CHECK_PARAM(concurrent_reads_per_file, crpf);
_CHECK_PARAM(concurrent_writes_per_file, cwpf);
_CHECK_PARAM(read_block_size, rbs);
_CHECK_PARAM(read_ahead_blocks, rab);
_CHECK_PARAM(read_cache_blocks_per_file, rcb);
@ -195,7 +198,7 @@ Reference<BlobStoreEndpoint> BlobStoreEndpoint::fromString(std::string const &ur
} catch(std::string &err) {
if(error != nullptr)
*error = err;
TraceEvent(SevWarn, "BlobStoreEndpoint").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
TraceEvent(SevWarnAlways, "BlobStoreEndpoint").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
throw file_not_found();
}
}
@ -247,13 +250,8 @@ ACTOR Future<bool> objectExists_impl(Reference<BlobStoreEndpoint> b, std::string
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code == 200)
return true;
if(r->code == 404)
return false;
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200, 404}));
return r->code == 200;
}
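The pattern in this rewrite (and the ones that follow) is uniform: doRequest() now receives the set of HTTP codes that are semantically meaningful to the caller, and every other code takes one shared failure/retry path instead of an ad-hoc throw at each call site. A small sketch of the resulting split between transport success and semantic result (hypothetical stand-alone types, not the real classes):
#include <set>
#include <stdexcept>
struct Response { unsigned int code; };     // stand-in for HTTP::Response
// Only codes listed in successCodes come back to the caller; anything
// else is a failed request, retried or thrown by the transport layer.
Response checkResponse(Response r, const std::set<unsigned int>& successCodes) {
    if (successCodes.count(r.code) == 0)
        throw std::runtime_error("http_request_failed");
    return r;
}
// objectExists: HEAD with successCodes {200, 404}; existence is then
// a purely semantic check on which of the two codes came back.
bool objectExists(Response r) {
    return checkResponse(r, {200, 404}).code == 200;
}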
Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::string const &object) {
@ -263,11 +261,9 @@ Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::str
ACTOR Future<Void> deleteObject_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0));
// 200 means object deleted, 404 means it doesn't exist, so we'll call that success as well
if(r->code == 200 || r->code == 404)
return Void();
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0, {200, 404}));
// 200 means the object was deleted, 404 means it already didn't exist, so either success code passed above is fine.
return Void();
}
Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::string const &object) {
@ -304,9 +300,7 @@ ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::strin
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code != 200)
throw io_error();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200}));
return r->contentLen;
}
@ -335,7 +329,7 @@ Future<Reference<IConnection>> BlobStoreEndpoint::connect( NetworkAddress addres
// Do a request, get a Response.
// Request content is provided as UnsentPacketQueue *pContent, which will be depleted as bytes are sent, but the queue itself must live for the life of this actor
// and be destroyed by the caller
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen) {
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
state UnsentPacketQueue contentCopy;
// Set content length header if there is content
@ -345,11 +339,13 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->concurrentRequests.take(1));
state FlowLock::Releaser globalReleaser(bstore->concurrentRequests, 1);
state int tries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state double retryDelay = 2.0;
state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state int thisTry = 1;
state double nextRetryDelay = 2.0;
state NetworkAddress address;
loop {
state Optional<Error> err;
try {
// Pick an address
address = bstore->addresses[g_random->randomInt(0, bstore->addresses.size())];
@ -382,56 +378,85 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->requestRate->getAllowance(1));
state Reference<HTTP::Response> r = wait(timeoutError(HTTP::doRequest(conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), bstore->knobs.request_timeout));
std::string connectionHeader;
HTTP::Headers::iterator i = r->headers.find("Connection");
if(i != r->headers.end())
connectionHeader = i->second;
// If the response parsed successfully (which is why we reached this point) and the connection can be reused, put the connection in the connection_pool
if(connectionHeader != "close")
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we received the "Connection: close" header.
if(r->headers["Connection"] != "close")
bstore->connectionPool[address].push_back(BlobStoreEndpoint::ConnPoolEntry(conn, now()));
// Handle retry-after response code
if(r->code == 429) {
bstore->s_stats.requests_failed++;
conn = Reference<IConnection>();
double d = 60;
if(r->headers.count("Retry-After"))
d = atof(r->headers["Retry-After"].c_str());
Void _ = wait(delay(d));
// Just continue, don't throw an error, don't decrement tries
}
else if(r->code == 406) {
// Blob returns this when the account doesn't exist
throw http_not_accepted();
}
else if(r->code == 500 || r->code == 503) {
// For these errors just treat it like connection_failed
throw connection_failed();
}
else
break;
conn.clear();
} catch(Error &e) {
// If the error is connection failed and a retry is allowed then ignore the error
if((e.code() == error_code_connection_failed || e.code() == error_code_timed_out) && --tries > 0) {
bstore->s_stats.requests_failed++;
TraceEvent(SevWarn, "BlobStoreHTTPConnectionFailed").detail("RemoteEndpoint", address).detail("Verb", verb).detail("Resource", resource).suppressFor(5.0, true);
Void _ = wait(delay(retryDelay));
retryDelay *= 2;
retryDelay = std::min(retryDelay, 60.0);
}
// For timeouts, connection failures, or a bad response reported by HTTP::doRequest, save the error and handle it (possibly retrying) below.
// Any other error is rethrown.
if(e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_http_bad_response)
err = e;
else
throw;
}
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if(!err.present() && successCodes.count(r->code) != 0) {
bstore->s_stats.requests_successful++;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
bstore->s_stats.requests_failed++;
// All errors in err are potentially retryable, as are certain HTTP response codes...
bool retryable = err.present() || r->code == 500 || r->code == 503;
// But only if our previous attempt was not the last allowable try.
retryable = retryable && (thisTry < maxTries);
TraceEvent event(retryable ? SevWarn : SevWarnAlways, retryable ? "BlobStoreEndpointRequestFailedRetryable" : "BlobStoreEndpointRequestFailed");
event.detail("RemoteEndpoint", address)
.detail("Verb", verb)
.detail("Resource", resource)
.detail("ThisTry", thisTry)
.suppressFor(5, true);
// We will wait delay seconds before the next retry, starting with nextRetryDelay.
double delay = nextRetryDelay;
// Double but limit the *next* nextRetryDelay.
nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
// Attach err to the trace event if present, otherwise extract details from the response
if(err.present())
event.error(err.get());
else {
event.detail("ResponseCode", r->code);
// Check for the Retry-After header which is present with certain types of errors
auto iRetryAfter = r->headers.find("Retry-After");
if(iRetryAfter != r->headers.end()) {
event.detail("RetryAfterHeader", iRetryAfter->second);
char *pEnd;
double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
if(*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe value of 5 minutes.
retryAfter = 300;
delay = std::max(delay, retryAfter);
}
}
// For retryable errors, log the delay then wait.
if(retryable) {
event.detail("RetryDelay", delay);
Void _ = wait(::delay(delay));
}
else {
// We can't retry, so throw something.
// This error code means the authentication header was not accepted, likely the account or key is wrong.
if(r->code == 406)
throw http_not_accepted();
throw http_request_failed();
}
}
bstore->s_stats.requests_successful++;
return r;
}
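For reference, the delay policy the rewritten loop implements, extracted into one function (a sketch of the logic above, not the actual code): exponential backoff starting at 2s and capped at 60s, overridden upward by a parsable Retry-After header, with unparsable header values falling back to 300s.
#include <algorithm>
#include <cstdlib>
#include <string>
double nextDelay(double& nextRetryDelay, const std::string* retryAfterHeader) {
    double delay = nextRetryDelay;
    nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);   // grow the *next* wait
    if (retryAfterHeader != nullptr) {
        char* pEnd;
        double retryAfter = std::strtod(retryAfterHeader->c_str(), &pEnd);
        if (*pEnd)             // trailing junk: don't trust the parsed value
            retryAfter = 300;
        delay = std::max(delay, retryAfter);
    }
    return delay;
}
// Successive base delays: 2, 4, 8, 16, 32, 60, 60, ...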
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen);
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
}
ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ObjectInfo> results) {
@ -442,7 +467,7 @@ ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bst
while(more) {
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0));
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0, {200}));
try {
// Parse the json assuming it is valid and contains the right stuff. If any exceptions are thrown, throw http_bad_response
@ -573,12 +598,10 @@ void BlobStoreEndpoint::setAuthHeaders(std::string const &verb, std::string cons
ACTOR Future<std::string> readEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code == 200)
return r->content;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 404}));
if(r->code == 404)
throw file_not_found();
throw http_bad_response();
return r->content;
}
Future<std::string> BlobStoreEndpoint::readEntireFile(std::string const &bucket, std::string const &object) {
@ -599,16 +622,13 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<BlobStoreEndpoint> b
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
if(r->code == 200)
return Void();
throw http_bad_response();
return Void();
}
ACTOR Future<Void> writeEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string content) {
@ -649,9 +669,7 @@ ACTOR Future<int> readObject_impl(Reference<BlobStoreEndpoint> bstore, std::stri
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code != 200 && r->code != 206)
throw file_not_readable();
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 206}));
if(r->contentLen != r->content.size()) // Double check that this wasn't a header-only response, probably unnecessary
throw io_error();
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length bytes
@ -666,9 +684,7 @@ Future<int> BlobStoreEndpoint::readObject(std::string const &bucket, std::string
ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0));
if(r->code != 200)
throw file_not_writable();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0, {200}));
int start = r->content.find("<UploadId>");
if(start == std::string::npos)
throw http_bad_response();
@ -691,16 +707,15 @@ ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, s
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// TODO: In the event that the client times out just before the request completes (so the client is unaware), the next retry
// will see error 400. That could be detected and handled gracefully by retrieving the etag for the successful request.
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
if(r->code != 200)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
// No etag -> bad response.
std::string etag = r->headers["ETag"];
if(etag.empty())
throw http_bad_response();
@ -724,10 +739,10 @@ ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstor
HTTP::Headers headers;
PacketWriter pw(part_list.getWriteBuffer(), NULL, Unversioned());
pw.serializeBytes(manifest);
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size()));
if(r->code != 200)
throw http_bad_response();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200}));
// TODO: In the event that the client times out just before the request completes (so the client is unaware), the next retry
// will see error 400. That could be detected and handled gracefully by HEAD'ing the object before upload to get its (possibly
// nonexistent) eTag; then, if an error 400 is seen, retrieve the eTag again, and if it has changed, consider the finish complete.
return Void();
}

View File

@ -57,6 +57,7 @@ public:
multipart_min_part_size,
concurrent_uploads,
concurrent_reads_per_file,
concurrent_writes_per_file,
read_block_size,
read_ahead_blocks,
read_cache_blocks_per_file,
@ -77,6 +78,7 @@ public:
"multipart_min_part_size (or minps) Min part size for multipart uploads.",
"concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress at once.",
"concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.",
"concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.",
"read_block_size (or rbs) Block size in bytes to be used for reads.",
"read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.",
"read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",
@ -149,8 +151,8 @@ public:
// Do an HTTP request to the Blob Store, read the response. Handles authentication.
// Every blob store interaction should ultimately go through this function
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen);
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes);
struct ObjectInfo {
std::string bucket;
std::string name;

View File

@ -1373,22 +1373,18 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
#define TIME_KEEPER_VERSION LiteralStringRef("1")
ACTOR Future<Void> timeKeeperSetVersion(ClusterControllerData *self) {
try {
loop {
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(
new ReadYourWritesTransaction(self->cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION);
Void _ = wait(tr->commit());
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
loop {
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(
new ReadYourWritesTransaction(self->cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION);
Void _ = wait(tr->commit());
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
} catch (Error & e) {
TraceEvent(SevWarnAlways, "TimeKeeperSetupVersionFailed").detail("cause", e.what());
}
return Void();
@ -1405,36 +1401,31 @@ ACTOR Future<Void> timeKeeper(ClusterControllerData *self) {
Void _ = wait(timeKeeperSetVersion(self));
loop {
try {
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(self->cx));
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(self->cx));
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> disableValue = wait( tr->get(timeKeeperDisableKey) );
if(disableValue.present()) {
break;
}
Version v = tr->getReadVersion().get();
int64_t currentTime = (int64_t)now();
versionMap.set(tr, currentTime, v);
int64_t ttl = currentTime - SERVER_KNOBS->TIME_KEEPER_DELAY * SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES;
if (ttl > 0) {
versionMap.erase(tr, 0, ttl);
}
Void _ = wait(tr->commit());
Optional<Value> disableValue = wait( tr->get(timeKeeperDisableKey) );
if(disableValue.present()) {
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
Version v = tr->getReadVersion().get();
int64_t currentTime = (int64_t)now();
versionMap.set(tr, currentTime, v);
int64_t ttl = currentTime - SERVER_KNOBS->TIME_KEEPER_DELAY * SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES;
if (ttl > 0) {
versionMap.erase(tr, 0, ttl);
}
Void _ = wait(tr->commit());
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
} catch (Error &e) {
// Failed to update time-version map even after retries, just ignore this iteration
TraceEvent(SevWarn, "TimeKeeperFailed").detail("cause", e.what());
}
Void _ = wait(delay(SERVER_KNOBS->TIME_KEEPER_DELAY));
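The reshuffled timeKeeper bodies are easier to read as the standard two-layer retry idiom: an inner loop that retries a single transaction through onError() until commit succeeds, and an outer catch that absorbs a final, non-retryable failure. A minimal sketch of that shape in plain C++ (Transaction and RetryableError are invented stand-ins for the Flow machinery):
#include <stdexcept>
struct RetryableError : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct Transaction {
    void work()   { /* reads and writes go here */ }
    void commit() { /* may throw RetryableError on conflict */ }
    void onError(const RetryableError&) { /* backoff + reset; rethrows if fatal */ }
};
void timeKeeperShape() {
    try {
        Transaction tr;            // one transaction object, reset by onError()
        for (;;) {                 // one iteration per attempt
            try {
                tr.work();
                tr.commit();
                break;             // committed: leave the retry loop
            } catch (RetryableError& e) {
                tr.onError(e);     // absorbs retryable errors, then loop again
            }
        }
    } catch (std::exception&) {
        // A non-retryable error escaped onError(): log it and skip this
        // round, as the SevWarn "TimeKeeperFailed" event above does.
    }
}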

View File

@ -122,26 +122,26 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
Void _ = wait( delay( startDelay ));
if (startDelay || BUGGIFY) {
TraceEvent("BARW_doBackup abortBackup1", randomID).detail("tag", printable(tag)).detail("startDelay", startDelay);
TraceEvent("BARW_doBackupAbortBackup1", randomID).detail("tag", printable(tag)).detail("startDelay", startDelay);
try {
Void _ = wait(backupAgent->abortBackup(cx, tag.toString()));
}
catch (Error& e) {
TraceEvent("BARW_doBackup abortBackup Exception", randomID).detail("tag", printable(tag)).error(e);
TraceEvent("BARW_doBackupAbortBackupException", randomID).detail("tag", printable(tag)).error(e);
if (e.code() != error_code_backup_unneeded)
throw;
}
}
TraceEvent("BARW_doBackup submitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True");
TraceEvent("BARW_doBackupSubmitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True");
state std::string backupContainer = "file://simfdb/backups/";
try {
Void _ = wait(backupAgent->submitBackup(cx, StringRef(backupContainer), tag.toString(), backupRanges, stopDifferentialDelay ? false : true));
}
catch (Error& e) {
TraceEvent("BARW_doBackup submitBackup Exception", randomID).detail("tag", printable(tag)).error(e);
TraceEvent("BARW_doBackupSubmitBackupException", randomID).detail("tag", printable(tag)).error(e);
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}
@ -152,17 +152,17 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
if (stopDifferentialDelay) {
TEST(!stopDifferentialFuture.isReady()); //Restore starts at specified time
Void _ = wait(stopDifferentialFuture);
TraceEvent("BARW_doBackup waitToDiscontinue", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
TraceEvent("BARW_doBackupWaitToDiscontinue", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
try {
if (BUGGIFY) {
state KeyBackedTag backupTag = makeBackupTag(tag.toString());
TraceEvent("BARW_doBackup waitForRestorable", randomID).detail("tag", backupTag.tagName);
TraceEvent("BARW_doBackupWaitForRestorable", randomID).detail("tag", backupTag.tagName);
// Wait until the backup is in a restorable state
state int resultWait = wait(backupAgent->waitBackup(cx, backupTag.tagName, false));
UidAndAbortedFlagT uidFlag = wait(backupTag.getOrThrow(cx));
state UID logUid = uidFlag.first;
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx));
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx, false, backup_unneeded()));
state std::string restorableFile = joinPath(lastBackupContainer, "restorable");
TraceEvent("BARW_lastBackupContainer", randomID).detail("backupTag", printable(tag)).detail("lastBackupContainer", lastBackupContainer)
@ -190,30 +190,30 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
// Abort the backup if this is not the first backup, because the second backup may have aborted the backup by now
if (startDelay) {
TraceEvent("BARW_doBackup abortBackup2", randomID).detail("tag", printable(tag))
TraceEvent("BARW_doBackupAbortBackup2", randomID).detail("tag", printable(tag))
.detail("waitStatus", resultWait).detail("lastBackupContainer", lastBackupContainer).detail("restorable", restorableFile);
Void _ = wait(backupAgent->abortBackup(cx, tag.toString()));
}
else {
TraceEvent("BARW_doBackup discontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
TraceEvent("BARW_doBackupDiscontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
Void _ = wait(backupAgent->discontinueBackup(cx, tag));
}
}
else {
TraceEvent("BARW_doBackup discontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
TraceEvent("BARW_doBackupDiscontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
Void _ = wait(backupAgent->discontinueBackup(cx, tag));
}
}
catch (Error& e) {
TraceEvent("BARW_doBackup discontinueBackup Exception", randomID).detail("tag", printable(tag)).error(e);
TraceEvent("BARW_doBackupDiscontinueBackupException", randomID).detail("tag", printable(tag)).error(e);
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}
}
// Wait for the backup to complete
TraceEvent("BARW_doBackup waitBackup", randomID).detail("tag", printable(tag));
TraceEvent("BARW_doBackupWaitBackup", randomID).detail("tag", printable(tag));
state int statusValue = wait(backupAgent->waitBackup(cx, tag.toString(), true));
state std::string statusText;
@ -222,7 +222,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
statusText = _statusText;
// Can we validate anything about status?
TraceEvent("BARW_doBackup complete", randomID).detail("tag", printable(tag))
TraceEvent("BARW_doBackupComplete", randomID).detail("tag", printable(tag))
.detail("status", statusText).detail("statusValue", statusValue);
return Void();
@ -318,7 +318,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
extraBackup = backupAgent.submitBackup(cx, LiteralStringRef("file://simfdb/backups/"), self->backupTag.toString(), self->backupRanges, true);
}
catch (Error& e) {
TraceEvent("BARW_submitBackup2 Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
TraceEvent("BARW_submitBackup2Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}
@ -376,23 +376,23 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
}
if (extraBackup.isValid()) {
TraceEvent("BARW_wait extraBackup", randomID).detail("backupTag", printable(self->backupTag));
TraceEvent("BARW_waitExtraBackup", randomID).detail("backupTag", printable(self->backupTag));
extraTasks = true;
try {
Void _ = wait(extraBackup);
}
catch (Error& e) {
TraceEvent("BARW_extraBackup Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
TraceEvent("BARW_extraBackupException", randomID).detail("backupTag", printable(self->backupTag)).error(e);
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}
TraceEvent("BARW_abortBackup extra", randomID).detail("backupTag", printable(self->backupTag));
TraceEvent("BARW_abortBackupExtra", randomID).detail("backupTag", printable(self->backupTag));
try {
Void _ = wait(backupAgent.abortBackup(cx, self->backupTag.toString()));
}
catch (Error& e) {
TraceEvent("BARW_abortBackup extra Exception", randomID).error(e);
TraceEvent("BARW_abortBackupExtraException", randomID).error(e);
if (e.code() != error_code_backup_unneeded)
throw;
}
@ -484,7 +484,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
break;
}
catch (Error &e) {
TraceEvent("BARW_check Exception", randomID).error(e);
TraceEvent("BARW_checkException", randomID).error(e);
Void _ = wait(tr->onError(e));
}
}

View File

@ -48,10 +48,10 @@ intptr_t g_stackYieldLimit = 0;
using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain zookeeper, database and IKeyValueStore keys
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A551000001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551010001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
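This version bump flips a byte above the low 16 bits. Because compatibleProtocolVersionMask keeps every bit except the low 16, the change makes new builds wire-incompatible with old ones, consistent with the comment above about these constants affecting both communications and key deserialization. A quick check of the arithmetic:
#include <cstdint>
#include <cstdio>
int main() {
    const uint64_t mask   = 0xffffffffffff0000ULL;  // compatibleProtocolVersionMask
    const uint64_t oldVer = 0x0FDB00A551000001ULL;
    const uint64_t newVer = 0x0FDB00A551010001ULL;
    // Versions are compatible when they agree on every bit the mask keeps;
    // only the low 16 bits are free for non-breaking revisions.
    bool compatible = (oldVer & mask) == (newVer & mask);
    std::printf("compatible: %s\n", compatible ? "yes" : "no");  // prints "no"
    return 0;
}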

View File

@ -92,11 +92,12 @@ ERROR( file_not_writable, 1514, "File could not be written" )
ERROR( no_cluster_file_found, 1515, "No cluster file found in current directory or default location" )
ERROR( file_too_large, 1516, "File too large to be read" )
ERROR( non_sequential_op, 1517, "Non sequential file operation not allowed" )
ERROR( http_bad_response, 1518, "HTTP response was not valid" )
ERROR( http_bad_response, 1518, "HTTP response was badly formed" )
ERROR( http_not_accepted, 1519, "HTTP request not accepted" )
ERROR( checksum_failed, 1520, "Data does not match checksum" )
ERROR( io_timeout, 1521, "Disk i/o operation failed to complete in a timely manner" )
ERROR( file_corrupt, 1522, "Structurally corrupt data file detected" )
ERROR( checksum_failed, 1520, "A data checksum failed" )
ERROR( io_timeout, 1521, "A disk IO operation failed to complete in a timely manner" )
ERROR( file_corrupt, 1522, "A structurally corrupt data file was detected" )
ERROR( http_request_failed, 1523, "HTTP response code indicated failure" )
// 2xxx Attempt (presumably by a _client_) to do something illegal. If an error is known to
// be internally caused, it should be 41xx

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{643F0D4A-CBBC-41B6-9BEE-E6C477509403}'
Id='{193E8019-234D-4DB0-8241-F7F21EFE8104}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'