Merge pull request #224 from cie/add-fdbbackup-interface

Add fdbbackup interface
This commit is contained in:
Stephen Atherton 2017-12-19 23:33:17 -08:00 committed by GitHub Enterprise
commit 193c216f52
6 changed files with 415 additions and 333 deletions

View File

@ -29,9 +29,11 @@
#include "fdbclient/BackupAgent.h"
#include "fdbclient/Status.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbrpc/BlobStore.h"
#include "fdbclient/json_spirit/json_spirit_writer_template.h"
#include <stdarg.h>
@ -39,6 +41,7 @@
#include <algorithm> // std::transform
#include <string>
#include <iostream>
#include <ctime>
using std::cout;
using std::endl;
@ -69,15 +72,15 @@ using std::endl;
// Type of program being executed
enum enumProgramExe {
EXE_AGENT, EXE_BACKUP, EXE_RESTORE, EXE_DR_AGENT, EXE_DB_BACKUP, EXE_BLOBMANAGER, EXE_UNDEFINED
EXE_AGENT, EXE_BACKUP, EXE_RESTORE, EXE_DR_AGENT, EXE_DB_BACKUP, EXE_UNDEFINED
};
enum enumBackupType {
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_DISABLE, BACKUP_ENABLE
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST
};
enum enumDBType {
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_DISABLE, DB_ENABLE
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_PAUSE, DB_RESUME
};
enum enumRestoreType {
@ -87,7 +90,7 @@ enum enumRestoreType {
//
enum {
// Backup constants
OPT_DESTCONTAINER, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE,
OPT_DESTCONTAINER, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE, OPT_EXPVERSION, OPT_BASEURL, OPT_DATETIME,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -277,7 +280,7 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupDisableOptions[] = {
CSimpleOpt::SOption g_rgBackupPauseOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
@ -300,6 +303,109 @@ CSimpleOpt::SOption g_rgBackupDisableOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_DATETIME, "-D", SO_REQ_SEP },
{ OPT_DATETIME, "--date", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_EXPVERSION, "-u", SO_REQ_SEP },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupListOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_BASEURL, "-b", SO_REQ_SEP },
{ OPT_BASEURL, "--base_url", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgRestoreOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
@ -478,7 +584,7 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgDBDisableOptions[] = {
CSimpleOpt::SOption g_rgDBPauseOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
@ -503,28 +609,11 @@ CSimpleOpt::SOption g_rgDBDisableOptions[] = {
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBlobOptions[] = {
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
const KeyRef exeAgent = LiteralStringRef("backup_agent");
const KeyRef exeBackup = LiteralStringRef("fdbbackup");
const KeyRef exeRestore = LiteralStringRef("fdbrestore");
const KeyRef exeDatabaseAgent = LiteralStringRef("dr_agent");
const KeyRef exeDatabaseBackup = LiteralStringRef("fdbdr");
const KeyRef exeBlobManager = LiteralStringRef("fdbblob");
extern void flushTraceFileVoid();
extern const char* getHGVersion();
@ -579,28 +668,25 @@ static void printAgentUsage(bool devhelp) {
return;
}
void printBlobStoreParameterInfo(const char *pad) {
printf("%sValid Blob Store parameters:\n\n", pad);
for(auto &f : BlobStoreEndpoint::BlobKnobs::getKnobDescriptions())
printf("%s %s\n", pad, f.c_str());
}
void printBackupContainerInfo() {
printf(" Backup URL forms:\n\n");
std::vector<std::string> formats = IBackupContainer::getURLFormats();
for(auto &f : formats)
printf(" %s\n", f.c_str());
printf("\n");
printBlobStoreParameterInfo(" ");
}
static void printBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | abort | wait | discontinue | disable | enable) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | list) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf(" -b, --base_url BASEURL\n"
" Base URL of the blob store.\n");
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
" FoundationDB cluster. The default is first the value of the\n"
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
" then `%s'.\n", platform::getDefaultClusterFilePath().c_str());
printf(" -D, --date DATETIME\n"
" Delete all data from the beginning to the given date and time in YYYY-MM-DD.HH:MI:SS format (UTC).\n");
printf(" -d, --destcontainer URL\n"
" The Backup URL for the destination of this backup.\n");
printBackupContainerInfo();
@ -608,6 +694,7 @@ static void printBackupUsage(bool devhelp) {
printf(" -k KEYS List of key ranges to backup.\n"
" If not specified, the entire database will be backed up.\n");
printf(" -n, --dry-run Perform a trial run with no changes made.\n");
printf(" -u EXPVERSION Delete all data up to (but not including the given version).\n");
printf(" -v, --version Print version information and exit.\n");
printf(" -w, --wait Wait for the backup to complete (allowed with `start' and `discontinue').\n");
printf(" -z, --no-stop-when-done\n"
@ -692,7 +779,7 @@ static void printDBAgentUsage(bool devhelp) {
static void printDBBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | switch | abort | disable | enable) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
printf("Usage: %s (start | status | switch | abort | pause | resume) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
printf(" -d, --destination CONNFILE\n"
" The path of a file containing the connection string for the\n");
printf(" destination FoundationDB cluster.\n");
@ -720,25 +807,6 @@ static void printDBBackupUsage(bool devhelp) {
return;
}
static void printBlobManagerUsage() {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s [options] <command> <arg>)\n\n", exeBlobManager.toString().c_str());
printf(" Commands:\n");
printf(" list <url> Lists the backups found at the given blob store URL. URL format is\n");
printf(" %s\n", BlobStoreEndpoint::getURLFormat().c_str());
printf(" listinfo <url> Same as list but shows 'info' output for each backup.\n");
printf(" info <url> Scans the given blob store Backup URL and outputs size and object count. URL format is\n");
printf(" %s\n", BackupContainerBlobStore::getURLFormat().c_str());
printf(" dump <url> Same as list but also lists all objects and their sizes.\n");
printf(" delete <url> Deletes the backup specified by the blob store Backup URL. URL format is\n");
printf(" %s\n", BackupContainerBlobStore::getURLFormat().c_str());
printf("\n");
printBlobStoreParameterInfo(" ");
printf(" -v, --version Print version information and exit.\n");
printf(" -h, --help Display this help and exit.\n");
return;
}
static void printUsage(enumProgramExe programExe, bool devhelp)
{
@ -759,9 +827,6 @@ static void printUsage(enumProgramExe programExe, bool devhelp)
case EXE_DB_BACKUP:
printDBBackupUsage(devhelp);
break;
case EXE_BLOBMANAGER:
printBlobManagerUsage();
break;
case EXE_UNDEFINED:
default:
break;
@ -830,13 +895,6 @@ enumProgramExe getProgramType(std::string programExe)
enProgramExe = EXE_DB_BACKUP;
}
// Check if blob manager
else if ((programExe.length() >= exeBlobManager.size()) &&
(programExe.compare(programExe.length() - exeBlobManager.size(), exeBlobManager.size(), (const char*)exeBlobManager.begin()) == 0))
{
enProgramExe = EXE_BLOBMANAGER;
}
return enProgramExe;
}
@ -854,8 +912,12 @@ enumBackupType getBackupType(std::string backupType)
values["abort"] = BACKUP_ABORT;
values["wait"] = BACKUP_WAIT;
values["discontinue"] = BACKUP_DISCONTINUE;
values["disable"] = BACKUP_DISABLE;
values["enable"] = BACKUP_ENABLE;
values["pause"] = BACKUP_PAUSE;
values["resume"] = BACKUP_RESUME;
values["expire"] = BACKUP_EXPIRE;
values["delete"] = BACKUP_DELETE;
values["describe"] = BACKUP_DESCRIBE;
values["list"] = BACKUP_LIST;
}
auto i = values.find(backupType);
@ -886,8 +948,8 @@ enumDBType getDBType(std::string dbType)
values["status"] = DB_STATUS;
values["switch"] = DB_SWITCH;
values["abort"] = DB_ABORT;
values["disable"] = DB_DISABLE;
values["enable"] = DB_ENABLE;
values["pause"] = DB_PAUSE;
values["resume"] = DB_RESUME;
}
auto i = values.find(dbType);
@ -951,10 +1013,10 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
state std::vector<KeyBackedTag> backupTags = wait(getAllBackupTags(tr));
state std::vector<Future<Version>> tagLastRestorableVersions;
state std::vector<Future<EBackupState>> tagStates;
state std::vector<Future<std::string>> tagContainers;
state std::vector<Future<Reference<IBackupContainer>>> tagContainers;
state std::vector<Future<int64_t>> tagRangeBytes;
state std::vector<Future<int64_t>> tagLogBytes;
state Future<Optional<Value>> fBackupDisabled = tr->get(fba.taskBucket->getDisableKey());
state Future<Optional<Value>> fBackupPaused = tr->get(fba.taskBucket->getPauseKey());
state int i = 0;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -973,12 +1035,12 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
tagLastRestorableVersions.push_back(fba.getLastRestorable(tr, StringRef(tag->tagName)));
}
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes) && success(fBackupDisabled));
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes) && success(fBackupPaused));
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
layerRoot.create("tags.timestamp") = now();
layerRoot.create("total_workers.$sum") = fBackupDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("disabled.$latest") = fBackupDisabled.get().present();
layerRoot.create("total_workers.$sum") = fBackupPaused.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("paused.$latest") = fBackupPaused.get().present();
int j = 0;
for (KeyBackedTag eachTag : backupTags) {
@ -989,7 +1051,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
// The object for this backup tag inside this instance's subdocument
JSONDoc tagRoot = tagsRoot.subDoc(eachTag.tagName);
tagRoot.create("current_container") = tagContainers[j].get();
tagRoot.create("current_container") = tagContainers[j].get()->getURL();
tagRoot.create("current_status") = statusText;
tagRoot.create("last_restorable_version") = tagLastRestorableVersions[j].get();
tagRoot.create("last_restorable_seconds_behind") = last_restorable_seconds_behind;
@ -1012,7 +1074,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
state std::vector<Future<int>> backupStatus;
state std::vector<Future<int64_t>> tagRangeBytesDR;
state std::vector<Future<int64_t>> tagLogBytesDR;
state Future<Optional<Value>> fDRDisabled = tr->get(dba.taskBucket->getDisableKey());
state Future<Optional<Value>> fDRPaused = tr->get(dba.taskBucket->getPauseKey());
state std::vector<UID> drTagUids;
for(int i = 0; i < tagNames.size(); i++) {
@ -1024,12 +1086,12 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
tagLogBytesDR.push_back(dba.getLogBytesWritten(tr2, tagUID));
}
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR) && success(fDRDisabled));
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR) && success(fDRPaused));
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
layerRoot.create("tags.timestamp") = now();
layerRoot.create("total_workers.$sum") = fDRDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("disabled.$latest") = fDRDisabled.get().present();
layerRoot.create("total_workers.$sum") = fDRPaused.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
layerRoot.create("paused.$latest") = fDRPaused.get().present();
for (int i = 0; i < tagNames.size(); i++) {
std::string tagName = dba.sourceTagNames.unpack(tagNames[i].key).getString(0).toString();
@ -1584,11 +1646,11 @@ ACTOR Future<Void> discontinueBackup(Database db, std::string tagName, bool wait
return Void();
}
ACTOR Future<Void> changeBackupEnabled(Database db, bool disable) {
ACTOR Future<Void> changeBackupResumed(Database db, bool pause) {
try {
state FileBackupAgent backupAgent;
Void _ = wait(backupAgent.taskBucket->changeDisable(db, disable));
printf("All backup agents have been %s.\n", disable ? "disabled" : "enabled");
Void _ = wait(backupAgent.taskBucket->changePause(db, pause));
printf("All backup agents have been %s.\n", pause ? "paused" : "resumed");
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
@ -1600,11 +1662,11 @@ ACTOR Future<Void> changeBackupEnabled(Database db, bool disable) {
return Void();
}
ACTOR Future<Void> changeDBBackupEnabled(Database src, Database dest, bool disable) {
ACTOR Future<Void> changeDBBackupResumed(Database src, Database dest, bool pause) {
try {
state DatabaseBackupAgent backupAgent(src);
Void _ = wait(backupAgent.taskBucket->changeDisable(dest, disable));
printf("All DR agents have been %s.\n", disable ? "disabled" : "enabled");
Void _ = wait(backupAgent.taskBucket->changePause(dest, pause));
printf("All DR agents have been %s.\n", pause ? "paused" : "resumed");
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
@ -1663,142 +1725,146 @@ ACTOR Future<Void> runRestore(Database db, std::string tagName, std::string cont
return Void();
}
ACTOR Future<int> doBlobDelete(std::string url) {
state std::string error;
Reference<IBackupContainer> openBackupContainer(const char *name, std::string destinationContainer) {
// Error, if no dest container was specified
if (destinationContainer.empty()) {
fprintf(stderr, "ERROR: No backup destination was specified.\n");
printHelpTeaser(name);
throw backup_error();
}
std::string error;
Reference<IBackupContainer> c;
try {
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, &error);
} catch(Error &e) {
fprintf(stderr, "ERROR: Invalid blobstore URL: %s (%s) Format is: %s\n", url.c_str(), error.c_str(), BackupContainerBlobStore::getURLFormat().c_str());
return FDB_EXIT_ERROR;
c = IBackupContainer::openContainer(destinationContainer);
}
state int pNumDeleted = 0;
state Future<Void> f = ((BackupContainerBlobStore *)c.getPtr())->deleteContainer(&pNumDeleted);
loop {
choose {
when(Void _ = wait(f)) {
break;
}
when(Void _ = wait(delay(3.0))) {
printf("%d objects deleted so far...\n", pNumDeleted);
}
}
}
printf("Done. %d objects deleted.\n", pNumDeleted);
return FDB_EXIT_SUCCESS;
}
ACTOR Future<int> doBlobInfo(std::string url, bool showObjects = false) {
state std::string error;
try {
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, &error);
} catch(Error &e) {
fprintf(stderr, "ERROR: Invalid blobstore URL: %s (%s) Format is: %s\n", url.c_str(), error.c_str(), BackupContainerBlobStore::getURLFormat().c_str());
return FDB_EXIT_ERROR;
}
state BackupContainerBlobStore *bc = (BackupContainerBlobStore *)c.getPtr();
state PromiseStream<BlobStoreEndpoint::ObjectInfo> resultsStream;
state Future<Void> done = bc->listFilesStream(resultsStream);
state int64_t total_bytes = 0;
state int64_t total_objects = 0;
try {
loop {
choose {
when(Void _ = wait(done)) {
break;
}
when(BlobStoreEndpoint::ObjectInfo info = waitNext(resultsStream.getFuture())) {
++total_objects;
total_bytes += info.size;
if(showObjects)
printf("\t%lld\t%s/%s\n", info.size, info.bucket.c_str(), info.name.c_str());
}
}
}
} catch(Error &e) {
printf("ERROR (%s) on %s\n", e.what(), url.c_str());
return FDB_EXIT_ERROR;
}
printf("%lld\t%lld\t%s\n", total_bytes, total_objects, url.c_str());
return FDB_EXIT_SUCCESS;
}
ACTOR Future<int> doBlobList(std::string url, bool deep = false) {
state Reference<BlobStoreEndpoint> bse;
state std::string error;
try {
bse = BlobStoreEndpoint::fromString(url, NULL, &error);
} catch(Error &e) {
fprintf(stderr, "ERROR: Invalid blobstore endpoint: %s (%s). Must look like this: %s\n", url.c_str(), error.c_str(), BlobStoreEndpoint::getURLFormat().c_str());
return FDB_EXIT_ERROR;
}
state std::vector<std::string> results = wait(BackupContainerBlobStore::listBackupContainers(bse));
state std::vector<std::string>::iterator i;
state int status = FDB_EXIT_SUCCESS;
for(i = results.begin(); i != results.end(); ++i) {
std::string url = bse->getResourceURL(*i);
if(!deep)
printf("%s\n", url.c_str());
else {
int r = wait(doBlobInfo(url));
if(status == FDB_EXIT_SUCCESS)
status = r;
}
}
return status;
}
ACTOR Future<int> doBlobCommand(std::vector<std::string> args) {
if(args.size() < 2) {
printBlobManagerUsage();
return FDB_EXIT_ERROR;
}
state std::string cmd = args[0];
if(cmd == "-h" || cmd == "--help") {
printBlobManagerUsage();
return FDB_EXIT_ERROR;
}
try {
if(cmd == "list") {
int r = wait(doBlobList(args[1]));
return r;
}
if(cmd == "listinfo") {
printf("BYTES\tOBJECTS\tURL\n");
int r = wait(doBlobList(args[1], true));
return r;
}
else if(cmd == "delete") {
int r = wait(doBlobDelete(args[1]));
return r;
}
else if(cmd == "info") {
printf("BYTES\tOBJECTS\tURL\n");
int r = wait(doBlobInfo(args[1]));
return r;
}
else if(cmd == "dump") {
int r = wait(doBlobInfo(args[1], true));
return r;
}
else {
printf("ERROR: Unknown command: '%s'\n", cmd.c_str());
printBlobManagerUsage();
return FDB_EXIT_ERROR;
}
} catch(Error &e) {
fprintf(stderr, "ERROR: Blob command '%s' failed: %s\n", cmd.c_str(), e.what());
catch (Error& e) {
if(!error.empty())
error = std::string("[") + error + "]";
fprintf(stderr, "ERROR (%s) on %s %s\n", e.what(), destinationContainer.c_str(), error.c_str());
printHelpTeaser(name);
throw;
}
return c;
}
ACTOR Future<Version> getVersionFromDateTime(std::string datetime, Database db) {
state KeyBackedMap<int64_t, Version> versionMap(timeKeeperPrefixRange.begin);
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(db));
int year, month, day, hour, minute, second;
if (sscanf(datetime.c_str(), "%d-%d-%d.%d:%d:%d", &year, &month, &day, &hour, &minute, &second) != 6) {
fprintf(stderr, "ERROR: Incorrect date/time format.\n");
throw backup_error();
}
struct tm expDateTime = {0};
expDateTime.tm_year = year - 1900;
expDateTime.tm_mon = month - 1;
expDateTime.tm_mday = day;
expDateTime.tm_hour = hour;
expDateTime.tm_min = minute;
expDateTime.tm_sec = second;
expDateTime.tm_isdst = -1;
state int64_t time = (int64_t) mktime(&expDateTime);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state std::vector<std::pair<int64_t, Version>> results = wait( versionMap.getRange(tr, 0, Optional<int64_t>(time), 1, false, true) );
ASSERT(results.size() == 1);
return results[0].second;
} catch (Error& e) {
Void _ = wait(tr->onError(e));
}
}
}
ACTOR Future<Void> expireBackupData(const char *name, std::string destinationContainer, Version endVersion, std::string datetime, Database db) {
if (!endVersion && datetime.length()) {
Version v = wait( getVersionFromDateTime(datetime, db) );
endVersion = v;
}
if (!endVersion) {
fprintf(stderr, "ERROR: No version or date/time is specified.\n");
printHelpTeaser(name);
throw backup_error();;
}
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
Void _ = wait(c->expireData(endVersion));
printf("All data before version %lld are deleted\n", endVersion);
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> deleteBackupContainer(const char *name, std::string destinationContainer) {
try {
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
loop {
choose {
when ( Void _ = wait(c->deleteContainer()) ) {
printf("The entire container has been deleted.\n");
break;
}
when ( Void _ = wait(delay(3)) ) {
int numDeleted = 0;
c->deleteContainer(&numDeleted);
printf("%d files have been deleted.\n", numDeleted);
}
}
}
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> describeBackup(const char *name, std::string destinationContainer) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
BackupDescription desc = wait(c->describeBackup());
printf("%s\n", desc.toString().c_str());
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> listBackup(std::string baseUrl) {
try {
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl));
for (std::string container : containers) {
printf("%s\n", container.c_str());
}
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
static std::vector<std::vector<StringRef>> parseLine(std::string &line, bool& err, bool& partial)
@ -2017,11 +2083,23 @@ int main(int argc, char* argv[]) {
case BACKUP_DISCONTINUE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDiscontinueOptions, SO_O_EXACT);
break;
case BACKUP_DISABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
case BACKUP_PAUSE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupPauseOptions, SO_O_EXACT);
break;
case BACKUP_ENABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
case BACKUP_RESUME:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupPauseOptions, SO_O_EXACT);
break;
case BACKUP_EXPIRE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupExpireOptions, SO_O_EXACT);
break;
case BACKUP_DELETE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDeleteOptions, SO_O_EXACT);
break;
case BACKUP_DESCRIBE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDescribeOptions, SO_O_EXACT);
break;
case BACKUP_LIST:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupListOptions, SO_O_EXACT);
break;
case BACKUP_UNDEFINED:
default:
@ -2066,11 +2144,11 @@ int main(int argc, char* argv[]) {
case DB_ABORT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBAbortOptions, SO_O_EXACT);
break;
case DB_DISABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
case DB_PAUSE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBPauseOptions, SO_O_EXACT);
break;
case DB_ENABLE:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
case DB_RESUME:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBPauseOptions, SO_O_EXACT);
break;
case DB_UNDEFINED:
default:
@ -2090,9 +2168,6 @@ int main(int argc, char* argv[]) {
}
}
break;
case EXE_BLOBMANAGER:
args = new CSimpleOpt(argc, argv, g_rgBlobOptions, SO_O_NOERR);
break;
case EXE_RESTORE:
if (argc < 2) {
printRestoreUsage(false);
@ -2127,6 +2202,9 @@ int main(int argc, char* argv[]) {
std::string destinationContainer;
std::string clusterFile;
std::string sourceClusterFile;
std::string baseUrl;
std::string datetime;
Version expVersion = 0;
std::vector<std::pair<std::string, std::string>> knobs;
std::string tagName = BackupAgentBase::getDefaultTag().toString();
bool tagProvided = false;
@ -2150,8 +2228,6 @@ int main(int argc, char* argv[]) {
uint64_t memLimit = 8LL << 30;
Optional<uint64_t> ti;
std::vector<std::string> blobArgs;
if( argc == 1 ) {
printUsage(programExe, false);
return FDB_EXIT_ERROR;
@ -2251,6 +2327,23 @@ int main(int argc, char* argv[]) {
localities.set(Standalone<StringRef>(syn), Standalone<StringRef>(std::string(args->OptionArg())));
break;
}
case OPT_DATETIME:
datetime = args->OptionArg();
break;
case OPT_EXPVERSION: {
const char* a = args->OptionArg();
long long expVersionValue = 0;
if (!sscanf(a, "%lld", &expVersionValue)) {
fprintf(stderr, "ERROR: Could not parse expiration version `%s'\n", a);
printHelpTeaser(argv[0]);
return FDB_EXIT_ERROR;
}
expVersion = expVersionValue;
break;
}
case OPT_BASEURL:
baseUrl = args->OptionArg();
break;
case OPT_CLUSTERFILE:
clusterFile = args->OptionArg();
break;
@ -2421,10 +2514,6 @@ int main(int argc, char* argv[]) {
}
break;
case EXE_BLOBMANAGER:
blobArgs.push_back(args->File(argLoop));
break;
case EXE_UNDEFINED:
default:
return FDB_EXIT_ERROR;
@ -2500,6 +2589,7 @@ int main(int argc, char* argv[]) {
Key tag;
Future<Optional<Void>> f;
Future<Optional<int>> fstatus;
Reference<IBackupContainer> c;
try {
setupNetwork(0, true);
@ -2512,58 +2602,55 @@ int main(int argc, char* argv[]) {
// Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents are logged. This thread will eventually run the network, so call it now.
TraceEvent::setNetworkThread();
// Blob Manager mode does not require connecting to any cluster
if(programExe != EXE_BLOBMANAGER) {
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
try {
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return 1;
}
try {
cluster = Cluster::createCluster(ccf, -1);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
return 1;
}
TraceEvent("ProgramStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION )
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("CommandLine", commandLine)
.detail("MemoryLimit", memLimit)
.trackLatest("ProgramStart");
db = cluster->createDatabase(databaseKey, localities).get();
if(sourceClusterFile.size()) {
auto resolvedSourceClusterFile = ClusterConnectionFile::lookupClusterFileName(sourceClusterFile);
try {
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedClusterFile.first));
source_ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedSourceClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedSourceClusterFile, e).c_str());
return 1;
}
try {
cluster = Cluster::createCluster(ccf, -1);
source_cluster = Cluster::createCluster(source_ccf, -1);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", source_ccf->getFilename().c_str());
return 1;
}
TraceEvent("ProgramStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION )
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("CommandLine", commandLine)
.detail("MemoryLimit", memLimit)
.trackLatest("ProgramStart");
db = cluster->createDatabase(databaseKey, localities).get();
if(sourceClusterFile.size()) {
auto resolvedSourceClusterFile = ClusterConnectionFile::lookupClusterFileName(sourceClusterFile);
try {
source_ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedSourceClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedSourceClusterFile, e).c_str());
return 1;
}
try {
source_cluster = Cluster::createCluster(source_ccf, -1);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", source_ccf->getFilename().c_str());
return 1;
}
source_db = source_cluster->createDatabase(databaseKey, localities).get();
}
source_db = source_cluster->createDatabase(databaseKey, localities).get();
}
switch (programExe)
@ -2576,26 +2663,8 @@ int main(int argc, char* argv[]) {
{
case BACKUP_START:
{
// Error, if no dest container was specified
if (destinationContainer.empty()) {
fprintf(stderr, "ERROR: No backup destination was specified.\n");
printHelpTeaser(argv[0]);
return FDB_EXIT_ERROR;
}
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
std::string error;
try {
Reference<IBackupContainer> c = IBackupContainer::openContainer(destinationContainer, &error);
}
catch (Error& e) {
if(!error.empty())
error = std::string("[") + error + "]";
fprintf(stderr, "ERROR (%s) on %s %s\n", e.what(), destinationContainer.c_str(), error.c_str());
printHelpTeaser(argv[0]);
return FDB_EXIT_ERROR;
}
openBackupContainer(argv[0], destinationContainer);
f = stopAfter( submitBackup(db, destinationContainer, backupKeys, tagName, dryRun, waitForDone, stopWhenDone) );
break;
}
@ -2616,12 +2685,28 @@ int main(int argc, char* argv[]) {
f = stopAfter( discontinueBackup(db, tagName, waitForDone) );
break;
case BACKUP_DISABLE:
f = stopAfter( changeBackupEnabled(db, true) );
case BACKUP_PAUSE:
f = stopAfter( changeBackupResumed(db, true) );
break;
case BACKUP_ENABLE:
f = stopAfter( changeBackupEnabled(db, false) );
case BACKUP_RESUME:
f = stopAfter( changeBackupResumed(db, false) );
break;
case BACKUP_EXPIRE:
f = stopAfter( expireBackupData(argv[0], destinationContainer, expVersion, datetime, db) );
break;
case BACKUP_DELETE:
f = stopAfter( deleteBackupContainer(argv[0], destinationContainer) );
break;
case BACKUP_DESCRIBE:
f = stopAfter( describeBackup(argv[0], destinationContainer) );
break;
case BACKUP_LIST:
f = stopAfter( listBackup(baseUrl) );
break;
case BACKUP_UNDEFINED:
@ -2679,11 +2764,11 @@ int main(int argc, char* argv[]) {
case DB_ABORT:
f = stopAfter( abortDBBackup(source_db, db, tagName, partial) );
break;
case DB_DISABLE:
f = stopAfter( changeDBBackupEnabled(source_db, db, true) );
case DB_PAUSE:
f = stopAfter( changeDBBackupResumed(source_db, db, true) );
break;
case DB_ENABLE:
f = stopAfter( changeDBBackupEnabled(source_db, db, false) );
case DB_RESUME:
f = stopAfter( changeDBBackupResumed(source_db, db, false) );
break;
case DB_UNDEFINED:
default:
@ -2693,9 +2778,6 @@ int main(int argc, char* argv[]) {
break;
}
break;
case EXE_BLOBMANAGER:
fstatus = stopAfter( doBlobCommand(blobArgs) );
break;
case EXE_UNDEFINED:
default:
return FDB_EXIT_ERROR;

View File

@ -1616,7 +1616,7 @@ public:
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
state Future<Optional<Value>> fPaused = tr->get(backupAgent->taskBucket->getPauseKey());
int backupStateInt = wait(backupAgent->getStateValue(tr, logUid));
state BackupAgentBase::enumState backupState = (BackupAgentBase::enumState)backupStateInt;
@ -1702,9 +1702,9 @@ public:
}
}
Optional<Value> disabled = wait(fDisabled);
if(disabled.present()) {
statusText += format("\nAll DR agents have been disabled.\n");
Optional<Value> paused = wait(fPaused);
if(paused.present()) {
statusText += format("\nAll DR agents have been paused.\n");
}
break;

View File

@ -2947,7 +2947,7 @@ public:
statusText = "";
tag = makeBackupTag(tagName);
state Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
state Future<Optional<Value>> fPaused = tr->get(backupAgent->taskBucket->getPauseKey());
if (uidAndAbortedFlag.present()) {
config = BackupConfig(uidAndAbortedFlag.get().first);
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
@ -2989,9 +2989,9 @@ public:
}
}
Optional<Value> disabled = wait(fDisabled);
if(disabled.present()) {
statusText += format("\nAll backup agents have been disabled.\n");
Optional<Value> paused = wait(fPaused);
if(paused.present()) {
statusText += format("\nAll backup agents have been paused.\n");
}
break;

View File

@ -229,10 +229,10 @@ public:
typedef std::vector<PairType> PairsType;
// If end is not present one key past the end of the map is used.
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr, KeyType const &begin, Optional<KeyType> const &end, int limit, bool snapshot = false) const {
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr, KeyType const &begin, Optional<KeyType> const &end, int limit, bool snapshot = false, bool reverse = false) const {
Subspace s = space; // 'this' could be invalid inside lambda
Key endKey = end.present() ? s.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
return map(tr->getRange(KeyRangeRef(s.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot),
return map(tr->getRange(KeyRangeRef(s.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse),
[s] (Standalone<RangeResultRef> const &kvs) -> PairsType {
PairsType results;
for(int i = 0; i < kvs.size(); ++i) {

View File

@ -469,16 +469,16 @@ public:
}
}
ACTOR static Future<Void> watchDisabled(Database cx, Reference<TaskBucket> taskBucket, Reference<AsyncVar<bool>> disabled) {
ACTOR static Future<Void> watchPaused(Database cx, Reference<TaskBucket> taskBucket, Reference<AsyncVar<bool>> paused) {
loop {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
try {
taskBucket->setOptions(tr);
Optional<Value> disabledVal = wait(tr->get(taskBucket->disableKey));
disabled->set(disabledVal.present());
state Future<Void> watchDisabledFuture = tr->watch(taskBucket->disableKey);
Optional<Value> pausedVal = wait(tr->get(taskBucket->pauseKey));
paused->set(pausedVal.present());
state Future<Void> watchPausedFuture = tr->watch(taskBucket->pauseKey);
Void _ = wait(tr->commit());
Void _ = wait(watchDisabledFuture);
Void _ = wait(watchPausedFuture);
}
catch (Error &e) {
Void _ = wait(tr->onError(e));
@ -487,15 +487,15 @@ public:
}
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
state Reference<AsyncVar<bool>> disabled = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
state Future<Void> watchDisabledFuture = watchDisabled(cx, taskBucket, disabled);
state Reference<AsyncVar<bool>> paused = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
state Future<Void> watchPausedFuture = watchPaused(cx, taskBucket, paused);
loop {
while(disabled->get()) {
Void _ = wait(disabled->onChange() || watchDisabledFuture);
while(paused->get()) {
Void _ = wait(paused->onChange() || watchPausedFuture);
}
Void _ = wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || disabled->onChange() || watchDisabledFuture);
Void _ = wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || paused->onChange() || watchPausedFuture);
}
}
@ -762,7 +762,7 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
, available(prefix.get(LiteralStringRef("av")))
, available_prioritized(prefix.get(LiteralStringRef("avp")))
, timeouts(prefix.get(LiteralStringRef("to")))
, disableKey(prefix.pack(LiteralStringRef("disable")))
, pauseKey(prefix.pack(LiteralStringRef("pause")))
, timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS)
, system_access(sysAccess)
, priority_batch(priorityBatch)
@ -781,13 +781,13 @@ Future<Void> TaskBucket::clear(Reference<ReadYourWritesTransaction> tr){
return Void();
}
Future<Void> TaskBucket::changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable){
Future<Void> TaskBucket::changePause(Reference<ReadYourWritesTransaction> tr, bool pause){
setOptions(tr);
if(disable) {
tr->set(disableKey, StringRef());
if(pause) {
tr->set(pauseKey, StringRef());
} else {
tr->clear(disableKey);
tr->clear(pauseKey);
}
return Void();
@ -857,8 +857,8 @@ Future<Void> TaskBucket::run(Database cx, Reference<FutureBucket> futureBucket,
return TaskBucketImpl::run(cx, Reference<TaskBucket>::addRef(this), futureBucket, pollDelay, maxConcurrentTasks);
}
Future<Void> TaskBucket::watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled) {
return TaskBucketImpl::watchDisabled(cx, Reference<TaskBucket>::addRef(this), disabled);
Future<Void> TaskBucket::watchPaused(Database cx, Reference<AsyncVar<bool>> paused) {
return TaskBucketImpl::watchPaused(cx, Reference<TaskBucket>::addRef(this), paused);
}
Future<bool> TaskBucket::isEmpty(Reference<ReadYourWritesTransaction> tr){

View File

@ -95,9 +95,9 @@ public:
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
}
Future<Void> changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable);
Future<Void> changeDisable(Database cx, bool disable) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return changeDisable(tr, disable); });
Future<Void> changePause(Reference<ReadYourWritesTransaction> tr, bool pause);
Future<Void> changePause(Database cx, bool pause) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return changePause(tr, pause); });
}
Future<Void> clear(Reference<ReadYourWritesTransaction> tr);
@ -138,7 +138,7 @@ public:
Future<bool> doOne(Database cx, Reference<FutureBucket> futureBucket);
Future<Void> run(Database cx, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks);
Future<Void> watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled);
Future<Void> watchPaused(Database cx, Reference<AsyncVar<bool>> paused);
Future<bool> isEmpty(Reference<ReadYourWritesTransaction> tr);
Future<bool> isEmpty(Database cx){
@ -195,8 +195,8 @@ public:
return lock_aware;
}
Key getDisableKey() const {
return disableKey;
Key getPauseKey() const {
return pauseKey;
}
Subspace getAvailableSpace(int priority = 0) {
@ -216,7 +216,7 @@ private:
Subspace prefix;
Subspace active;
Key disableKey;
Key pauseKey;
// Available task subspaces. Priority 0, the default, will be under available which is backward
// compatible with pre-priority TaskBucket processes. Priority 1 and higher will be in