Backup tools and agent now accept blob account credentials via files containing JSON which are specified using command line arguments and/or an environment variable. Improved fdbbackup help, clarifying which options are for which operations. Fdbbackup operations which do not need to use a database no longer require a cluster file parameter. Added eat() commands to StringRef for incrementally tokenizing strings using separator strings.

This commit is contained in:
Stephen Atherton 2017-12-21 01:58:15 -08:00
parent c51de3bb88
commit e3aee45a74
7 changed files with 235 additions and 145 deletions

View File

@ -90,7 +90,7 @@ enum enumRestoreType {
//
enum {
// Backup constants
OPT_DESTCONTAINER, OPT_SNAPSHOTINTERVAL, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE, OPT_EXPVERSION, OPT_BASEURL, OPT_DATETIME,
OPT_DESTCONTAINER, OPT_SNAPSHOTINTERVAL, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE, OPT_EXPVERSION, OPT_BASEURL, OPT_DATETIME, OPT_BLOB_CREDENTIALS,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -132,6 +132,7 @@ CSimpleOpt::SOption g_rgAgentOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -172,6 +173,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -329,6 +331,8 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -337,8 +341,6 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
@ -354,6 +356,8 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -362,8 +366,6 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
@ -379,6 +381,8 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -389,8 +393,6 @@ CSimpleOpt::SOption g_rgBackupListOptions[] = {
#endif
{ OPT_BASEURL, "-b", SO_REQ_SEP },
{ OPT_BASEURL, "--base_url", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
@ -404,6 +406,8 @@ CSimpleOpt::SOption g_rgBackupListOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -441,6 +445,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -681,23 +686,27 @@ void printBackupContainerInfo() {
static void printBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | list) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf(" -b, --base_url BASEURL\n"
" Base URL of the blob store.\n");
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
" FoundationDB cluster. The default is first the value of the\n"
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
" then `%s'.\n", platform::getDefaultClusterFilePath().c_str());
printf(" -D, --date DATETIME\n"
" For expire operations, delete all data prior to (approximately) the given timestamp in YYYY-MM-DD.HH:MI:SS format (UTC).\n");
printf(" -d, --destcontainer URL\n"
" The Backup URL for the operation.\n");
" The Backup container URL for start, describe, expire, and delete operations.\n");
printBackupContainerInfo();
printf(" -s DURATION When starting a backup, use snapshot interval DURATION in seconds. Defaults to %d.\n", CLIENT_KNOBS->BACKUP_DEFAULT_SNAPSHOT_INTERVAL_SEC);
printf(" -b, --base_url BASEURL\n"
" Base backup URL for list operations. This looks like a Backup URL but without a backup name.\n");
printf(" --blob_credentials FILE\n"
" File containing blob credentials in JSON format. Can be specified multiple times for multiple files.\n");
printf(" -D, --date DATETIME\n"
" Datetime cutoff for expire operations. Requires a cluster file and will use version/timestamp metadata\n"
" in the database to obtain a cutoff version very close to the timestamp given in YYYY-MM-DD.HH:MI:SS format (UTC).\n");
printf(" -u VERSION Version cutoff for expire operations. Deletes all backup files with data from < VERSION.\n");
printf(" -s, --snapshot_interval DURATION\n"
" For start operations, specifies the backup's target snapshot interval as DURATION seconds. Defaults to %d.\n", CLIENT_KNOBS->BACKUP_DEFAULT_SNAPSHOT_INTERVAL_SEC);
printf(" -e ERRORLIMIT The maximum number of errors printed by status (default is 10).\n");
printf(" -k KEYS List of key ranges to backup.\n"
" If not specified, the entire database will be backed up.\n");
printf(" -n, --dry-run Perform a trial run with no changes made.\n");
printf(" -u EXPVERSION Delete all data up to (but not including the given version).\n");
printf(" -n, --dry-run For start or restore operations, performs a trial run with no actual changes made.\n");
printf(" -v, --version Print version information and exit.\n");
printf(" -w, --wait Wait for the backup to complete (allowed with `start' and `discontinue').\n");
printf(" -z, --no-stop-when-done\n"
@ -2231,6 +2240,7 @@ int main(int argc, char* argv[]) {
LocalityData localities;
uint64_t memLimit = 8LL << 30;
Optional<uint64_t> ti;
std::vector<std::string> blobCredentials;
if( argc == 1 ) {
printUsage(programExe, false);
@ -2462,6 +2472,9 @@ int main(int argc, char* argv[]) {
}
memLimit = ti.get();
break;
case OPT_BLOB_CREDENTIALS:
blobCredentials.push_back(args->OptionArg());
break;
}
}
@ -2609,30 +2622,54 @@ int main(int argc, char* argv[]) {
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
return 1;
return FDB_EXIT_ERROR;
}
// Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents are logged. This thread will eventually run the network, so call it now.
TraceEvent::setNetworkThread();
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
try {
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return 1;
// Add blob credentials files from the environment to the list collected from the command line.
const char *blobCredsFromENV = getenv("FDB_BLOB_CREDENTIALS");
if(blobCredsFromENV != nullptr) {
StringRef t((uint8_t*)blobCredsFromENV, strlen(blobCredsFromENV));
do {
StringRef file = t.eat(":");
if(file.size() != 0)
blobCredentials.push_back(file.toString());
} while(t.size() != 0);
}
try {
cluster = Cluster::createCluster(ccf, -1);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
return 1;
// Update the global blob credential files list
std::vector<std::string> *pFiles = (std::vector<std::string> *)g_network->global(INetwork::enBlobCredentialFiles);
if(pFiles != nullptr) {
for(auto &f : blobCredentials) {
pFiles->push_back(f);
}
}
auto initCluster = [&]() {
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
try {
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return false;
}
try {
cluster = Cluster::createCluster(ccf, -1);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
return false;
}
db = cluster->createDatabase(databaseKey, localities).get();
return true;
};
TraceEvent("ProgramStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION )
@ -2641,8 +2678,6 @@ int main(int argc, char* argv[]) {
.detail("CommandLine", commandLine)
.detail("MemoryLimit", memLimit)
.trackLatest("ProgramStart");
db = cluster->createDatabase(databaseKey, localities).get();
if(sourceClusterFile.size()) {
auto resolvedSourceClusterFile = ClusterConnectionFile::lookupClusterFileName(sourceClusterFile);
@ -2651,7 +2686,7 @@ int main(int argc, char* argv[]) {
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedSourceClusterFile, e).c_str());
return 1;
return FDB_EXIT_ERROR;
}
try {
@ -2660,7 +2695,7 @@ int main(int argc, char* argv[]) {
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", source_ccf->getFilename().c_str());
return 1;
return FDB_EXIT_ERROR;
}
source_db = source_cluster->createDatabase(databaseKey, localities).get();
@ -2669,6 +2704,8 @@ int main(int argc, char* argv[]) {
switch (programExe)
{
case EXE_AGENT:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter(runAgent(db));
break;
case EXE_BACKUP:
@ -2676,6 +2713,8 @@ int main(int argc, char* argv[]) {
{
case BACKUP_START:
{
if(!initCluster())
return FDB_EXIT_ERROR;
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
openBackupContainer(argv[0], destinationContainer);
f = stopAfter( submitBackup(db, destinationContainer, snapshotIntervalSeconds, backupKeys, tagName, dryRun, waitForDone, stopWhenDone) );
@ -2683,30 +2722,45 @@ int main(int argc, char* argv[]) {
}
case BACKUP_STATUS:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( statusBackup(db, tagName, maxErrors) );
break;
case BACKUP_ABORT:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( abortBackup(db, tagName) );
break;
case BACKUP_WAIT:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( waitBackup(db, tagName, stopWhenDone) );
break;
case BACKUP_DISCONTINUE:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( discontinueBackup(db, tagName, waitForDone) );
break;
case BACKUP_PAUSE:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( changeBackupResumed(db, true) );
break;
case BACKUP_RESUME:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( changeBackupResumed(db, false) );
break;
case BACKUP_EXPIRE:
if(!datetime.empty())
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( expireBackupData(argv[0], destinationContainer, expVersion, datetime, db) );
break;
@ -2732,6 +2786,8 @@ int main(int argc, char* argv[]) {
break;
case EXE_RESTORE:
if(!initCluster())
return FDB_EXIT_ERROR;
switch(restoreType) {
case RESTORE_START:
f = stopAfter( runRestore(db, tagName, restoreContainer, backupKeys, dbVersion, !dryRun, !quietDisplay, waitForDone, addPrefix, removePrefix) );
@ -2760,9 +2816,13 @@ int main(int argc, char* argv[]) {
}
break;
case EXE_DR_AGENT:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( runDBAgent(source_db, db) );
break;
case EXE_DB_BACKUP:
if(!initCluster())
return FDB_EXIT_ERROR;
switch (dbType)
{
case DB_START:

View File

@ -26,8 +26,6 @@
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbclient/Status.h"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <algorithm>
namespace IBackupFile_impl {
@ -421,71 +419,8 @@ public:
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion){
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion);
}
/*
struct BackupMetadata {
BackupMetadata() : format_version(1) {}
int64_t format_version;
std::string toJSON() {
json_spirit::mObject json;
JSONDoc doc(json);
doc.create("format_version") = formatVersion;
return json_spirit::write_string(json_spirit::mValue(json));
}
static BackupInfo fromJSON(std::string docString) {
BackupInfo info;
json_spirit::mValue json;
json_spirit::read_string(docString, json);
JSONDoc doc(json);
doc.tryGet("format_version", info.formatVersion);
return info;
}
};
ACTOR Future<Optional<BackupMetadata>> readInfo_impl(Reference<BackupContainerFileSystem> bc) {
state Reference<IAsyncFile> f;
try {
Reference<IAsyncFile> _f = wait(bc->readFile(bc->backupInfoPath()));
f = _f;
} catch(Error &e) {
if(e.code() == error_code_file_not_found)
return Optional<BackupMetadata>();
}
state int64_t size = wait(f->size());
state Standalone<StringRef> buf = makeString(size);
try {
int read = wait(f->read(mutateString(buf), size, 0));
if(read != size)
throw io_error();
return BackupMetadata::fromJSON(buf.toString());
} catch(Error &e) {
TraceEvent(SevWarn, "BackupContainerInvalidBackupInfo").error(e);
throw backup_invalid_info();
}
}
// Write backup metadata
ACTOR Future<Void> writeInfo_impl(Reference<BackupContainerFileSystem> bc, BackupMetadata info) {
state Reference<IBackupFile> f = wait(bc->writeFile(bc->backupInfoPath()));
Void _ = wait(f->append(info.toJSON()));
Void _ = wait(f->finish());
return Void();
}
*/
};
class BackupContainerLocalDirectory : public BackupContainerFileSystem, ReferenceCounted<BackupContainerLocalDirectory> {
public:
void addref() { return ReferenceCounted<BackupContainerLocalDirectory>::addref(); }

View File

@ -26,6 +26,7 @@
#include "time.h"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include "IAsyncFile.h"
json_spirit::mObject BlobStoreEndpoint::Stats::getJSON() {
json_spirit::mObject o;
@ -116,53 +117,33 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
return r;
}
struct tokenizer {
tokenizer(StringRef s) : s(s) {}
StringRef tok(StringRef sep) {
for(int i = 0, iend = s.size() - sep.size(); i <= iend; ++i) {
if(s.substr(i, sep.size()) == sep) {
StringRef token = s.substr(0, i);
s = s.substr(i + sep.size());
return token;
}
}
StringRef token = s;
s = StringRef();
return token;
}
StringRef tok(const char *sep) { return tok(StringRef((const uint8_t *)sep, strlen(sep))); }
StringRef s;
};
Reference<BlobStoreEndpoint> BlobStoreEndpoint::fromString(std::string const &url, std::string *resourceFromURL, std::string *error) {
if(resourceFromURL)
resourceFromURL->clear();
try {
tokenizer t(url);
StringRef prefix = t.tok("://");
StringRef t(url);
StringRef prefix = t.eat("://");
if(prefix != LiteralStringRef("blobstore"))
throw std::string("Invalid blobstore URL.");
StringRef key = t.tok(":");
StringRef secret = t.tok("@");
StringRef hostPort = t.tok("/");
StringRef resource = t.tok("?");
StringRef cred = t.eat("@");
StringRef hostPort = t.eat("/");
StringRef resource = t.eat("?");
// hostPort is at least a host or IP address, optionally followed by :portNumber or :serviceName
tokenizer h(hostPort);
StringRef host = h.tok(":");
StringRef h(hostPort);
StringRef host = h.eat(":");
if(host.size() == 0)
throw std::string("host cannot be empty");
StringRef service = h.s;
StringRef service = h.eat();
BlobKnobs knobs;
while(1) {
StringRef name = t.tok("=");
StringRef name = t.eat("=");
if(name.size() == 0)
break;
StringRef value = t.tok("&");
StringRef value = t.eat("&");
char *valueEnd;
int ivalue = strtol(value.toString().c_str(), &valueEnd, 10);
if(*valueEnd || ivalue == 0)
@ -174,6 +155,10 @@ Reference<BlobStoreEndpoint> BlobStoreEndpoint::fromString(std::string const &ur
if(resourceFromURL != nullptr)
*resourceFromURL = resource.toString();
StringRef c(cred);
StringRef key = c.eat(":");
StringRef secret = c.eat();
return Reference<BlobStoreEndpoint>(new BlobStoreEndpoint(host.toString(), service.toString(), key.toString(), secret.toString(), knobs));
} catch(std::string &err) {
@ -190,7 +175,13 @@ std::string BlobStoreEndpoint::getResourceURL(std::string resource) {
hostPort.append(":");
hostPort.append(service);
}
std::string r = format("blobstore://%s:%s@%s/%s", key.c_str(), secret.c_str(), hostPort.c_str(), resource.c_str());
// If secret isn't being looked up from credentials files then it was passed explicitly in th URL so show it here.
std::string s;
if(!lookupSecret)
s = std::string(":") + secret;
std::string r = format("blobstore://%s%s@%s/%s", key.c_str(), s.c_str(), hostPort.c_str(), resource.c_str());
std::string p = knobs.getURLParameters();
if(!p.empty())
r.append("?").append(p);
@ -275,11 +266,78 @@ Future<int64_t> BlobStoreEndpoint::objectSize(std::string const &bucket, std::st
return objectSize_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, object);
}
Future<BlobStoreEndpoint::ReusableConnection> BlobStoreEndpoint::connect() {
// Try to read a file, parse it as JSON, and return the resulting document.
// It will NOT throw if any errors are encountered, it will just return an empty
// JSON object and will log trace events for the errors encountered.
ACTOR Future<json_spirit::mObject> tryReadJSONFile(std::string path) {
state std::string content;
try {
state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY, 0));
state int64_t size = wait(f->size());
state Standalone<StringRef> buf = makeString(size);
int r = wait(f->read(mutateString(buf), size, 0));
ASSERT(r == size);
content = buf.toString();
} catch(Error &e) {
TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e);
return json_spirit::mObject();
}
try {
json_spirit::mValue json;
json_spirit::read_string(content, json);
if(json.type() == json_spirit::obj_type)
return json.get_obj();
else
TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path);
} catch(Error &e) {
TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e);
}
return json_spirit::mObject();
}
ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
std::vector<std::string> *pFiles = (std::vector<std::string> *)g_network->global(INetwork::enBlobCredentialFiles);
if(pFiles == nullptr)
return Void();
state std::vector<Future<json_spirit::mObject>> reads;
for(auto &f : *pFiles)
reads.push_back(tryReadJSONFile(f));
Void _ = wait(waitForAll(reads));
std::string key = b->key + "@" + b->host;
for(auto &f : reads) {
JSONDoc doc(f.get());
if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
JSONDoc accounts(doc.last().get_obj());
if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
JSONDoc account(accounts.last());
std::string secret;
if(account.tryGet("secret", secret)) {
b->secret = secret;
return Void();
}
}
}
}
return Void();
}
Future<Void> BlobStoreEndpoint::updateSecret() {
return updateSecret_impl(Reference<BlobStoreEndpoint>::addRef(this));
}
ACTOR Future<BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<BlobStoreEndpoint> b) {
// First try to get a connection from the pool
while(!connectionPool.empty()) {
ReusableConnection rconn = connectionPool.front();
connectionPool.pop();
while(!b->connectionPool.empty()) {
BlobStoreEndpoint::ReusableConnection rconn = b->connectionPool.front();
b->connectionPool.pop();
// If the connection expires in the future then return it
if(rconn.expirationTime > now()) {
@ -291,13 +349,21 @@ Future<BlobStoreEndpoint::ReusableConnection> BlobStoreEndpoint::connect() {
}
}
return map(INetworkConnections::net()->connect(host, service.empty() ? "http" : service), [=] (Reference<IConnection> conn) {
state Reference<IConnection> conn = wait(INetworkConnections::net()->connect(b->host, b->service.empty() ? "http" : b->service));
TraceEvent("BlobStoreEndpointNewConnection")
.detail("RemoteEndpoint", conn->getPeerAddress())
.detail("ExpiresIn", knobs.max_connection_life)
.suppressFor(5, true);;
return ReusableConnection({conn, now() + knobs.max_connection_life});
});
.detail("ExpiresIn", b->knobs.max_connection_life)
.suppressFor(5, true);
if(b->lookupSecret)
Void _ = wait(b->updateSecret());
return BlobStoreEndpoint::ReusableConnection({conn, now() + b->knobs.max_connection_life});
}
Future<BlobStoreEndpoint::ReusableConnection> BlobStoreEndpoint::connect() {
return connect_impl(Reference<BlobStoreEndpoint>::addRef(this));
}
void BlobStoreEndpoint::returnConnection(ReusableConnection &rconn) {
@ -333,9 +399,6 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
// Start connecting
Future<BlobStoreEndpoint::ReusableConnection> frconn = bstore->connect();
// Finish/update the request headers (which includes Date header)
bstore->setAuthHeaders(verb, resource, headers);
// Make a shallow copy of the queue by calling addref() on each buffer in the chain and then prepending that chain to contentCopy
if(pContent != nullptr) {
contentCopy.discardAll();
@ -352,6 +415,12 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
// Finish connecting, do request
state BlobStoreEndpoint::ReusableConnection rconn = wait(timeoutError(frconn, bstore->knobs.connect_timeout));
// Finish/update the request headers (which includes Date header)
// This must be done AFTER the connection is ready because if credentials are coming from disk they are refreshed
// when a new connection is established and setAuthHeaders() would need the updated secret.
bstore->setAuthHeaders(verb, resource, headers);
remoteAddress = rconn.conn->getPeerAddress();
Void _ = wait(bstore->requestRate->getAllowance(1));
state Reference<HTTP::Response> r = wait(timeoutError(HTTP::doRequest(rconn.conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), bstore->knobs.request_timeout));

View File

@ -89,8 +89,8 @@ public:
}
};
BlobStoreEndpoint(std::string const &host, std::string service, std::string const &key, std::string const &key_secret, BlobKnobs const &knobs = BlobKnobs())
: host(host), service(service), key(key), secret(key_secret), knobs(knobs),
BlobStoreEndpoint(std::string const &host, std::string service, std::string const &key, std::string const &secret, BlobKnobs const &knobs = BlobKnobs())
: host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs),
requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)),
recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)),
@ -124,6 +124,7 @@ public:
std::string service;
std::string key;
std::string secret;
bool lookupSecret;
BlobKnobs knobs;
// Speed and concurrency limits
@ -133,6 +134,8 @@ public:
FlowLock concurrentRequests;
FlowLock concurrentUploads;
Future<Void> updateSecret();
// Calculates the authentication string from the secret key
std::string hmac_sha1(std::string const &msg);

View File

@ -461,6 +461,26 @@ public:
return size() - other.size();
}
// Removes bytes from begin up to and including the sep string, returns StringRef of the part before sep
StringRef eat(StringRef sep) {
for(int i = 0, iend = size() - sep.size(); i <= iend; ++i) {
if(sep.compare(substr(i, sep.size())) == 0) {
StringRef token = substr(0, i);
*this = substr(i + sep.size());
return token;
}
}
return eat();
}
StringRef eat() {
StringRef r = *this;
*this = StringRef();
return r;
}
StringRef eat(const char *sep) {
return eat(StringRef((const uint8_t *)sep, strlen(sep)));
}
private:
// Unimplemented; blocks conversion through std::string
StringRef( char* );

View File

@ -217,6 +217,8 @@ public:
BoolMetricHandle awakeMetric;
EventMetricHandle<SlowTask> slowTaskMetric;
std::vector<std::string> blobCredentialFiles;
};
static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) {
@ -493,6 +495,7 @@ Net2::Net2(NetworkAddress localAddress, bool useThreadPool, bool useMetrics)
}
setGlobal(INetwork::enNetworkConnections, (flowGlobalType) network);
setGlobal(INetwork::enASIOService, (flowGlobalType) &reactor.ios);
setGlobal(INetwork::enBlobCredentialFiles, &blobCredentialFiles);
#ifdef __linux__
setGlobal(INetwork::enEventFD, (flowGlobalType) N2::ASIOReactor::newEventFD(reactor));

View File

@ -196,7 +196,7 @@ public:
enum enumGlobal {
enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3,
enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9
enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10
};
virtual void longTaskCheck( const char* name ) {}