added the ability to enable and disable all backup and DR agents from fdbbackup and fdbdr.
This commit is contained in:
parent
5a4a5985fd
commit
fb89ae9f85
|
@ -73,11 +73,11 @@ enum enumProgramExe {
|
|||
};
|
||||
|
||||
enum enumBackupType {
|
||||
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE
|
||||
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_DISABLE, BACKUP_ENABLE
|
||||
};
|
||||
|
||||
enum enumDBType {
|
||||
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT
|
||||
DB_UNDEFINED=0, DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_DISABLE, DB_ENABLE
|
||||
};
|
||||
|
||||
enum enumRestoreType {
|
||||
|
@ -277,6 +277,29 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
|
|||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgBackupDisableOptions[] = {
|
||||
#ifdef _WIN32
|
||||
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
|
||||
#endif
|
||||
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
|
||||
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
|
||||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
{ OPT_VERSION, "-v", SO_NONE },
|
||||
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
||||
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
|
||||
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
|
||||
{ OPT_HELP, "-?", SO_NONE },
|
||||
{ OPT_HELP, "-h", SO_NONE },
|
||||
{ OPT_HELP, "--help", SO_NONE },
|
||||
{ OPT_DEVHELP, "--dev-help", SO_NONE },
|
||||
|
||||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgRestoreOptions[] = {
|
||||
#ifdef _WIN32
|
||||
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
|
||||
|
@ -455,6 +478,31 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
|
|||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgDBDisableOptions[] = {
|
||||
#ifdef _WIN32
|
||||
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
|
||||
#endif
|
||||
{ OPT_SOURCE_CLUSTER, "-s", SO_REQ_SEP },
|
||||
{ OPT_SOURCE_CLUSTER, "--source", SO_REQ_SEP },
|
||||
{ OPT_DEST_CLUSTER, "-d", SO_REQ_SEP },
|
||||
{ OPT_DEST_CLUSTER, "--destination", SO_REQ_SEP },
|
||||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
{ OPT_VERSION, "-v", SO_NONE },
|
||||
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
||||
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
|
||||
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
|
||||
{ OPT_HELP, "-?", SO_NONE },
|
||||
{ OPT_HELP, "-h", SO_NONE },
|
||||
{ OPT_HELP, "--help", SO_NONE },
|
||||
{ OPT_DEVHELP, "--dev-help", SO_NONE },
|
||||
|
||||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgBlobOptions[] = {
|
||||
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
|
||||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
|
@ -548,7 +596,7 @@ void printBackupContainerInfo() {
|
|||
|
||||
static void printBackupUsage(bool devhelp) {
|
||||
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
|
||||
printf("Usage: %s (start | status | abort | wait | discontinue) [OPTIONS]\n\n", exeBackup.toString().c_str());
|
||||
printf("Usage: %s (start | status | abort | wait | discontinue | disable | enable) [OPTIONS]\n\n", exeBackup.toString().c_str());
|
||||
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
|
||||
" FoundationDB cluster. The default is first the value of the\n"
|
||||
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
|
||||
|
@ -644,7 +692,7 @@ static void printDBAgentUsage(bool devhelp) {
|
|||
|
||||
static void printDBBackupUsage(bool devhelp) {
|
||||
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
|
||||
printf("Usage: %s (start | status | switch | abort) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
|
||||
printf("Usage: %s (start | status | switch | abort | disable | enable) [OPTIONS]\n\n", exeDatabaseBackup.toString().c_str());
|
||||
printf(" -d, --destination CONNFILE\n"
|
||||
" The path of a file containing the connection string for the\n");
|
||||
printf(" destination FoundationDB cluster.\n");
|
||||
|
@ -806,6 +854,8 @@ enumBackupType getBackupType(std::string backupType)
|
|||
values["abort"] = BACKUP_ABORT;
|
||||
values["wait"] = BACKUP_WAIT;
|
||||
values["discontinue"] = BACKUP_DISCONTINUE;
|
||||
values["disable"] = BACKUP_DISABLE;
|
||||
values["enable"] = BACKUP_ENABLE;
|
||||
}
|
||||
|
||||
auto i = values.find(backupType);
|
||||
|
@ -836,6 +886,8 @@ enumDBType getDBType(std::string dbType)
|
|||
values["status"] = DB_STATUS;
|
||||
values["switch"] = DB_SWITCH;
|
||||
values["abort"] = DB_ABORT;
|
||||
values["disable"] = DB_DISABLE;
|
||||
values["enable"] = DB_ENABLE;
|
||||
}
|
||||
|
||||
auto i = values.find(dbType);
|
||||
|
@ -864,7 +916,6 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
op.create("version") = readVer + 120 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
|
||||
|
||||
layerRoot.create("instances_running.$sum") = 1;
|
||||
layerRoot.create("total_workers.$sum") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
layerRoot.create("last_updated.$max") = now();
|
||||
|
||||
state JSONDoc o = layerRoot.subDoc("instances." + id);
|
||||
|
@ -876,7 +927,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
o.create("resident_size") = (int64_t)getResidentMemoryUsage();
|
||||
o.create("main_thread_cpu_seconds") = getProcessorTimeThread();
|
||||
o.create("process_cpu_seconds") = getProcessorTimeProcess();
|
||||
o.create("workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
o.create("configured_workers") = CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
|
||||
if(exe == EXE_AGENT) {
|
||||
static BlobStoreEndpoint::Stats last_stats;
|
||||
|
@ -899,10 +950,11 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
state FileBackupAgent fba;
|
||||
state std::vector<KeyBackedTag> backupTags = wait(getAllBackupTags(tr));
|
||||
state std::vector<Future<Version>> tagLastRestorableVersions;
|
||||
state std::vector<Future<int>> tagStates;
|
||||
state std::vector<Future<EBackupState>> tagStates;
|
||||
state std::vector<Future<std::string>> tagContainers;
|
||||
state std::vector<Future<int64_t>> tagRangeBytes;
|
||||
state std::vector<Future<int64_t>> tagLogBytes;
|
||||
state Future<Optional<Value>> fBackupDisabled = tr->get(fba.taskBucket->getDisableKey());
|
||||
state int i = 0;
|
||||
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
@ -910,27 +962,21 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
state std::vector<KeyBackedTag>::iterator tag;
|
||||
for (tag = backupTags.begin(); tag != backupTags.end(); tag++) {
|
||||
UidAndAbortedFlagT uidAndAbortedFlag = wait(tag->getOrThrow(tr));
|
||||
state BackupConfig config(uidAndAbortedFlag.first);
|
||||
|
||||
EBackupState status = wait(config.stateEnum().getOrThrow(tr));
|
||||
tagStates.push_back(status);
|
||||
|
||||
int64_t rangeBytesWritten = wait(config.rangeBytesWritten().getD(tr, 0));
|
||||
tagRangeBytes.push_back(rangeBytesWritten);
|
||||
|
||||
int64_t logBytesWritten = wait(config.logBytesWritten().getD(tr, 0));
|
||||
tagLogBytes.push_back(logBytesWritten);
|
||||
|
||||
std::string backupContainer = wait(config.backupContainer().getOrThrow(tr));
|
||||
tagContainers.push_back(backupContainer);
|
||||
BackupConfig config(uidAndAbortedFlag.first);
|
||||
|
||||
tagStates.push_back(config.stateEnum().getOrThrow(tr));
|
||||
tagRangeBytes.push_back(config.rangeBytesWritten().getD(tr, 0));
|
||||
tagLogBytes.push_back(config.logBytesWritten().getD(tr, 0));
|
||||
tagContainers.push_back(config.backupContainer().getOrThrow(tr));
|
||||
tagLastRestorableVersions.push_back(fba.getLastRestorable(tr, StringRef(tag->tagName)));
|
||||
}
|
||||
|
||||
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes));
|
||||
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes) && success(fBackupDisabled));
|
||||
|
||||
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
|
||||
layerRoot.create("tags.timestamp") = now();
|
||||
layerRoot.create("total_workers.$sum") = fBackupDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
layerRoot.create("disabled.$latest") = fBackupDisabled.get().present();
|
||||
|
||||
int j = 0;
|
||||
for (KeyBackedTag eachTag : backupTags) {
|
||||
|
@ -963,6 +1009,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
state std::vector<Future<int>> backupStatus;
|
||||
state std::vector<Future<int64_t>> tagRangeBytesDR;
|
||||
state std::vector<Future<int64_t>> tagLogBytesDR;
|
||||
state Future<Optional<Value>> fDRDisabled = tr->get(dba.taskBucket->getDisableKey());
|
||||
|
||||
for(int i = 0; i < tagNames.size(); i++) {
|
||||
backupVersion.push_back(tr2->get(tagNames[i].value.withPrefix(applyMutationsBeginRange.begin)));
|
||||
|
@ -972,10 +1019,12 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
tagLogBytesDR.push_back(dba.getLogBytesWritten(tr2, tagUID));
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR));
|
||||
Void _ = wait(waitForAll(backupStatus) && waitForAll(backupVersion) && waitForAll(tagRangeBytesDR) && waitForAll(tagLogBytesDR) && success(fDRDisabled));
|
||||
|
||||
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
|
||||
layerRoot.create("tags.timestamp") = now();
|
||||
layerRoot.create("total_workers.$sum") = fDRDisabled.get().present() ? 0 : CLIENT_KNOBS->BACKUP_TASKS_PER_AGENT;
|
||||
layerRoot.create("disabled.$latest") = fDRDisabled.get().present();
|
||||
|
||||
for (int i = 0; i < tagNames.size(); i++) {
|
||||
std::string tagName = dba.sourceTagNames.unpack(tagNames[i].key).getString(0).toString();
|
||||
|
@ -1529,6 +1578,38 @@ ACTOR Future<Void> discontinueBackup(Database db, std::string tagName, bool wait
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeBackupEnabled(Database db, bool disable) {
|
||||
try {
|
||||
state FileBackupAgent backupAgent;
|
||||
Void _ = wait(backupAgent.taskBucket->changeDisable(db, disable));
|
||||
printf("All backup agents have been %s.\n", disable ? "disabled" : "enabled");
|
||||
}
|
||||
catch (Error& e) {
|
||||
if(e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
fprintf(stderr, "ERROR: %s\n", e.what());
|
||||
throw;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeDBBackupEnabled(Database src, Database dest, bool disable) {
|
||||
try {
|
||||
state DatabaseBackupAgent backupAgent(src);
|
||||
Void _ = wait(backupAgent.taskBucket->changeDisable(dest, disable));
|
||||
printf("All DR agents have been %s.\n", disable ? "disabled" : "enabled");
|
||||
}
|
||||
catch (Error& e) {
|
||||
if(e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
fprintf(stderr, "ERROR: %s\n", e.what());
|
||||
throw;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> runRestore(Database db, std::string tagName, std::string container, Standalone<VectorRef<KeyRangeRef>> ranges, Version dbVersion, bool performRestore, bool verbose, bool waitForDone, std::string addPrefix, std::string removePrefix) {
|
||||
try
|
||||
{
|
||||
|
@ -1930,6 +2011,12 @@ int main(int argc, char* argv[]) {
|
|||
case BACKUP_DISCONTINUE:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDiscontinueOptions, SO_O_EXACT);
|
||||
break;
|
||||
case BACKUP_DISABLE:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
|
||||
break;
|
||||
case BACKUP_ENABLE:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupDisableOptions, SO_O_EXACT);
|
||||
break;
|
||||
case BACKUP_UNDEFINED:
|
||||
default:
|
||||
// Display help, if requested
|
||||
|
@ -1973,6 +2060,12 @@ int main(int argc, char* argv[]) {
|
|||
case DB_ABORT:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBAbortOptions, SO_O_EXACT);
|
||||
break;
|
||||
case DB_DISABLE:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
|
||||
break;
|
||||
case DB_ENABLE:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgDBDisableOptions, SO_O_EXACT);
|
||||
break;
|
||||
case DB_UNDEFINED:
|
||||
default:
|
||||
// Display help, if requested
|
||||
|
@ -2517,6 +2610,14 @@ int main(int argc, char* argv[]) {
|
|||
f = stopAfter( discontinueBackup(db, tagName, waitForDone) );
|
||||
break;
|
||||
|
||||
case BACKUP_DISABLE:
|
||||
f = stopAfter( changeBackupEnabled(db, true) );
|
||||
break;
|
||||
|
||||
case BACKUP_ENABLE:
|
||||
f = stopAfter( changeBackupEnabled(db, false) );
|
||||
break;
|
||||
|
||||
case BACKUP_UNDEFINED:
|
||||
default:
|
||||
fprintf(stderr, "ERROR: Unsupported backup action %s\n", argv[1]);
|
||||
|
@ -2557,7 +2658,7 @@ int main(int argc, char* argv[]) {
|
|||
case EXE_DR_AGENT:
|
||||
f = stopAfter( runDBAgent(source_db, db) );
|
||||
break;
|
||||
case EXE_DB_BACKUP: //DB_START, DB_STATUS, DB_SWITCH, DB_ABORT, DB_CLEANUP
|
||||
case EXE_DB_BACKUP:
|
||||
switch (dbType)
|
||||
{
|
||||
case DB_START:
|
||||
|
@ -2572,6 +2673,12 @@ int main(int argc, char* argv[]) {
|
|||
case DB_ABORT:
|
||||
f = stopAfter( abortDBBackup(source_db, db, tagName, partial) );
|
||||
break;
|
||||
case DB_DISABLE:
|
||||
f = stopAfter( changeDBBackupEnabled(source_db, db, true) );
|
||||
break;
|
||||
case DB_ENABLE:
|
||||
f = stopAfter( changeDBBackupEnabled(source_db, db, false) );
|
||||
break;
|
||||
case DB_UNDEFINED:
|
||||
default:
|
||||
fprintf(stderr, "ERROR: Unsupported DR action %s\n", argv[1]);
|
||||
|
|
|
@ -1633,6 +1633,7 @@ public:
|
|||
|
||||
statusText = "";
|
||||
|
||||
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
|
||||
state UID logUid = wait(backupAgent->getLogUid(tr, tagName));
|
||||
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
@ -1722,6 +1723,11 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
Optional<Value> disabled = wait(fDisabled);
|
||||
if(disabled.present()) {
|
||||
statusText += format("\nAll DR agents have been disabled.\n");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
|
|
|
@ -3503,6 +3503,7 @@ public:
|
|||
|
||||
statusText = "";
|
||||
tag = makeBackupTag(tagName);
|
||||
state Future<Optional<Value>> fDisabled = tr->get(backupAgent->taskBucket->getDisableKey());
|
||||
state Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));
|
||||
if (uidAndAbortedFlag.present()) {
|
||||
config = BackupConfig(uidAndAbortedFlag.get().first);
|
||||
|
@ -3544,6 +3545,12 @@ public:
|
|||
statusText += format("[%lld]: %s\n", errMsg.get().second, errMsg.get().first.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
Optional<Value> disabled = wait(fDisabled);
|
||||
if(disabled.present()) {
|
||||
statusText += format("\nAll backup agents have been disabled.\n");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
|
|
|
@ -386,7 +386,7 @@ public:
|
|||
return true;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
|
||||
ACTOR static Future<Void> dispatch(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
|
||||
state std::vector<Future<bool>> tasks(maxConcurrentTasks);
|
||||
for(auto &f : tasks)
|
||||
f = Never();
|
||||
|
@ -449,6 +449,38 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> watchDisabled(Database cx, Reference<TaskBucket> taskBucket, Reference<AsyncVar<bool>> disabled) {
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
loop {
|
||||
try {
|
||||
taskBucket->setOptions(tr);
|
||||
Optional<Value> disabledVal = wait(tr->get(taskBucket->disableKey));
|
||||
disabled->set(disabledVal.present());
|
||||
state Future<Void> watchDisabledFuture = tr->watch(taskBucket->disableKey);
|
||||
Void _ = wait(tr->commit());
|
||||
Void _ = wait(watchDisabledFuture);
|
||||
}
|
||||
catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
|
||||
state Reference<AsyncVar<bool>> disabled = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
|
||||
state Future<Void> watchDisabledFuture = watchDisabled(cx, taskBucket, disabled);
|
||||
|
||||
loop {
|
||||
while(disabled->get()) {
|
||||
Void _ = wait(disabled->onChange() || watchDisabledFuture);
|
||||
}
|
||||
|
||||
Void _ = wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || disabled->onChange() || watchDisabledFuture);
|
||||
}
|
||||
}
|
||||
|
||||
static Future<Standalone<StringRef>> addIdle(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket) {
|
||||
taskBucket->setOptions(tr);
|
||||
|
||||
|
@ -693,6 +725,7 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
|
|||
, available(prefix.get(LiteralStringRef("av")))
|
||||
, available_prioritized(prefix.get(LiteralStringRef("avp")))
|
||||
, timeouts(prefix.get(LiteralStringRef("to")))
|
||||
, disableKey(prefix.pack(LiteralStringRef("disable")))
|
||||
, timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS)
|
||||
, system_access(sysAccess)
|
||||
, priority_batch(priorityBatch)
|
||||
|
@ -711,6 +744,18 @@ Future<Void> TaskBucket::clear(Reference<ReadYourWritesTransaction> tr){
|
|||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> TaskBucket::changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable){
|
||||
setOptions(tr);
|
||||
|
||||
if(disable) {
|
||||
tr->set(disableKey, StringRef());
|
||||
} else {
|
||||
tr->clear(disableKey);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Key TaskBucket::addTask(Reference<ReadYourWritesTransaction> tr, Reference<Task> task) {
|
||||
setOptions(tr);
|
||||
|
||||
|
@ -775,6 +820,10 @@ Future<Void> TaskBucket::run(Database cx, Reference<FutureBucket> futureBucket,
|
|||
return TaskBucketImpl::run(cx, Reference<TaskBucket>::addRef(this), futureBucket, pollDelay, maxConcurrentTasks);
|
||||
}
|
||||
|
||||
Future<Void> TaskBucket::watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled) {
|
||||
return TaskBucketImpl::watchDisabled(cx, Reference<TaskBucket>::addRef(this), disabled);
|
||||
}
|
||||
|
||||
Future<bool> TaskBucket::isEmpty(Reference<ReadYourWritesTransaction> tr){
|
||||
return TaskBucketImpl::isEmpty(tr, Reference<TaskBucket>::addRef(this));
|
||||
}
|
||||
|
|
|
@ -69,6 +69,11 @@ public:
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
}
|
||||
|
||||
Future<Void> changeDisable(Reference<ReadYourWritesTransaction> tr, bool disable);
|
||||
Future<Void> changeDisable(Database cx, bool disable) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return changeDisable(tr, disable); });
|
||||
}
|
||||
|
||||
Future<Void> clear(Reference<ReadYourWritesTransaction> tr);
|
||||
Future<Void> clear(Database cx) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return clear(tr); });
|
||||
|
@ -99,6 +104,7 @@ public:
|
|||
Future<bool> doOne(Database cx, Reference<FutureBucket> futureBucket);
|
||||
|
||||
Future<Void> run(Database cx, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks);
|
||||
Future<Void> watchDisabled(Database cx, Reference<AsyncVar<bool>> disabled);
|
||||
|
||||
Future<bool> isEmpty(Reference<ReadYourWritesTransaction> tr);
|
||||
Future<bool> isEmpty(Database cx){
|
||||
|
@ -148,6 +154,10 @@ public:
|
|||
return lock_aware;
|
||||
}
|
||||
|
||||
Key getDisableKey() const {
|
||||
return disableKey;
|
||||
}
|
||||
|
||||
Subspace getAvailableSpace(int priority = 0) {
|
||||
if(priority == 0)
|
||||
return available;
|
||||
|
@ -165,6 +175,7 @@ private:
|
|||
|
||||
Subspace prefix;
|
||||
Subspace active;
|
||||
Key disableKey;
|
||||
|
||||
// Available task subspaces. Priority 0, the default, will be under available which is backward
|
||||
// compatible with pre-priority TaskBucket processes. Priority 1 and higher will be in
|
||||
|
|
Loading…
Reference in New Issue