Address review comments
This commit is contained in:
parent
e32750931b
commit
d5849af5c0
|
@ -41,7 +41,7 @@ void DatabaseConfiguration::resetInternal() {
|
|||
tLogPolicy = storagePolicy = remoteTLogPolicy = Reference<IReplicationPolicy>();
|
||||
remoteDesiredTLogCount = -1;
|
||||
remoteTLogReplicationFactor = repopulateRegionAntiQuorum = 0;
|
||||
backupWorkerEnabled = true;
|
||||
backupWorkerEnabled = false;
|
||||
}
|
||||
|
||||
void parse( int* i, ValueRef const& v ) {
|
||||
|
|
|
@ -900,6 +900,29 @@ namespace fileBackup {
|
|||
return LiteralStringRef("OnSetAddTask");
|
||||
}
|
||||
|
||||
// Clears the backup ID from "backupStartedKey" to pause backup workers.
|
||||
ACTOR static Future<Void> clearBackupStartID(Reference<ReadYourWritesTransaction> tr, UID backupUid) {
|
||||
// If backup worker is not enabled, exit early.
|
||||
Optional<Value> started = wait(tr->get(backupStartedKey));
|
||||
std::vector<std::pair<UID, Version>> ids;
|
||||
if (started.present()) {
|
||||
ids = decodeBackupStartedValue(started.get());
|
||||
}
|
||||
auto it = std::find_if(ids.begin(), ids.end(),
|
||||
[=](const std::pair<UID, Version>& p) { return p.first == backupUid; });
|
||||
if (it != ids.end()) {
|
||||
ids.erase(it);
|
||||
}
|
||||
|
||||
if (ids.empty()) {
|
||||
TraceEvent("ClearBackup").detail("BackupID", backupUid);
|
||||
tr->clear(backupStartedKey);
|
||||
} else {
|
||||
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Backup and Restore taskFunc definitions will inherit from one of the following classes which
|
||||
// servers to catch and log to the appropriate config any error that execute/finish didn't catch and log.
|
||||
struct RestoreTaskFuncBase : TaskFuncBase {
|
||||
|
@ -2149,8 +2172,8 @@ namespace fileBackup {
|
|||
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
|
||||
wait( eraseLogData(tr, backup.getUidAsKey(), destUidValue) );
|
||||
|
||||
wait(eraseLogData(tr, backup.getUidAsKey(), destUidValue) && clearBackupStartID(tr, uid));
|
||||
|
||||
backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
|
||||
|
||||
wait(taskBucket->finish(tr, task));
|
||||
|
@ -2336,29 +2359,6 @@ namespace fileBackup {
|
|||
return BackupSnapshotManifest::addTask(tr, taskBucket, parentTask, completionKey, waitFor);
|
||||
}
|
||||
|
||||
// Clears the backup ID from "backupStartedKey" to pause backup workers.
|
||||
ACTOR static Future<Void> clearBackupStartID(Reference<ReadYourWritesTransaction> tr, UID backupUid) {
|
||||
// If backup worker is not enabled, exit early.
|
||||
Optional<Value> started = wait(tr->get(backupStartedKey));
|
||||
std::vector<std::pair<UID, Version>> ids;
|
||||
if (started.present()) {
|
||||
ids = decodeBackupStartedValue(started.get());
|
||||
}
|
||||
auto it = std::find_if(ids.begin(), ids.end(),
|
||||
[=](const std::pair<UID, Version>& p) { return p.first == backupUid; });
|
||||
if (it != ids.end()) {
|
||||
ids.erase(it);
|
||||
}
|
||||
|
||||
if (ids.empty()) {
|
||||
//TraceEvent("ClearBackup").detail("BID", backupUid);
|
||||
tr->clear(backupStartedKey);
|
||||
} else {
|
||||
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
struct StartFullBackupTaskFunc : BackupTaskFuncBase {
|
||||
static StringRef name;
|
||||
static const uint32_t version;
|
||||
|
@ -2388,7 +2388,7 @@ namespace fileBackup {
|
|||
// Check if backup worker is enabled
|
||||
DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx));
|
||||
if (!dbConfig.backupWorkerEnabled) {
|
||||
return Void();
|
||||
wait(success(changeConfig(cx, "backup_worker_enabled:=1", true)));
|
||||
}
|
||||
|
||||
// Set the "backupStartedKey" and wait for all backup worker started
|
||||
|
@ -2468,7 +2468,7 @@ namespace fileBackup {
|
|||
// task will clean up and set the completed state.
|
||||
wait(success(FileBackupFinishedTask::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), backupFinished)));
|
||||
|
||||
wait(taskBucket->finish(tr, task) && clearBackupStartID(tr, config.getUid()));
|
||||
wait(taskBucket->finish(tr, task));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -3864,7 +3864,8 @@ public:
|
|||
|
||||
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
wait(success(tr->getReadVersion()));
|
||||
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue) );
|
||||
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue) &&
|
||||
fileBackup::clearBackupStartID(tr, config.getUid()));
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
|
||||
|
||||
|
@ -3903,8 +3904,9 @@ public:
|
|||
|
||||
// Cancel backup task through tag
|
||||
wait(tag.cancel(tr));
|
||||
|
||||
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue) && fileBackup::clearBackupStartID(tr, config.getUid()));
|
||||
|
||||
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue) &&
|
||||
fileBackup::clearBackupStartID(tr, config.getUid()));
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);
|
||||
|
||||
|
|
|
@ -92,7 +92,6 @@ struct BackupData {
|
|||
|
||||
KeyRangeMap<std::set<UID>> rangeMap; // Save key ranges to a set of backup UIDs
|
||||
std::map<UID, PerBackupInfo> backups; // Backup UID to infos
|
||||
PromiseStream<std::vector<std::pair<UID, Version>>> backupUidVersions; // active backup (UID, StartVersion) pairs
|
||||
AsyncTrigger changedTrigger;
|
||||
|
||||
CounterCollection cc;
|
||||
|
@ -219,14 +218,23 @@ ACTOR Future<Void> monitorBackupStartedKeyChanges(BackupData* self, bool started
|
|||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> value = wait(tr.get(backupStartedKey));
|
||||
std::vector<std::pair<UID, Version>> uidVersions;
|
||||
if (value.present()) {
|
||||
self->backupUidVersions.send(decodeBackupStartedValue(value.get()));
|
||||
uidVersions = decodeBackupStartedValue(value.get());
|
||||
TraceEvent e("BackupWorkerGotStartKey", self->myId);
|
||||
int i = 1;
|
||||
for (auto uidVersion : uidVersions) {
|
||||
e.detail(format("BackupID%d", i), uidVersion.first)
|
||||
.detail(format("Version%d", i), uidVersion.second);
|
||||
i++;
|
||||
}
|
||||
self->onBackupChanges(uidVersions);
|
||||
if (started) return Void();
|
||||
} else {
|
||||
self->backupUidVersions.send({});
|
||||
TraceEvent("BackupWorkerEmptyStartKey", self->myId);
|
||||
self->onBackupChanges(uidVersions);
|
||||
|
||||
if (!started) {
|
||||
TraceEvent("BackupWorkerNoStartKey", self->myId);
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -576,15 +584,6 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
|||
}
|
||||
logSystemChange = self->logSystem.onChange();
|
||||
}
|
||||
when(wait(self->changedTrigger.onTrigger())) {
|
||||
// Check all backups and wait for container and key ranges
|
||||
std::vector<Future<Void>> all;
|
||||
for (auto& uidInfo : self->backups) {
|
||||
if (uidInfo.second.ready || uidInfo.second.stopped) continue;
|
||||
all.push_back(uidInfo.second.waitReady(uidInfo.first));
|
||||
}
|
||||
wait(waitForAll(all));
|
||||
}
|
||||
}
|
||||
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
|
||||
|
||||
|
@ -639,6 +638,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self) {
|
|||
pullFinished = pullAsyncData(self);
|
||||
wait(stopped || pullFinished);
|
||||
if (pullFinished.isReady()) return Void(); // backup is done for some old epoch.
|
||||
pullFinished = Future<Void>(); // cancels pullAsyncData()
|
||||
TraceEvent("BackupWorkerPaused", self->myId);
|
||||
}
|
||||
}
|
||||
|
@ -708,16 +708,6 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
|
|||
BackupWorkerDoneRequest(self.myId, self.backupEpoch))));
|
||||
break;
|
||||
}
|
||||
when(std::vector<std::pair<UID, Version>> uidVersions = waitNext(self.backupUidVersions.getFuture())) {
|
||||
TraceEvent e("BackupWorkerGotStartKey", self.myId);
|
||||
int i = 1;
|
||||
for (auto uidVersion : uidVersions) {
|
||||
e.detail(format("BackupID%d", i), uidVersion.first.toString())
|
||||
.detail(format("Version%d", i), uidVersion.second);
|
||||
i++;
|
||||
}
|
||||
self.onBackupChanges(uidVersions);
|
||||
}
|
||||
when(wait(error)) {}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -878,12 +878,12 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
|
|||
ASSERT(false); // Programmer forgot to adjust cases.
|
||||
}
|
||||
|
||||
set_config("backup_worker_enabled:=1");
|
||||
if (deterministicRandom()->random01() < 0.5) {
|
||||
int logSpill = deterministicRandom()->randomInt( TLogSpillType::VALUE, TLogSpillType::END );
|
||||
set_config(format("log_spill:=%d", logSpill));
|
||||
int logVersion = deterministicRandom()->randomInt( TLogVersion::MIN_RECRUITABLE, TLogVersion::MAX_SUPPORTED+1 );
|
||||
set_config(format("log_version:=%d", logVersion));
|
||||
set_config("backup_worker_enabled:=1");
|
||||
} else {
|
||||
if (deterministicRandom()->random01() < 0.7)
|
||||
set_config(format("log_version:=%d", TLogVersion::MAX_SUPPORTED));
|
||||
|
|
Loading…
Reference in New Issue