Attempt to fix

This commit is contained in:
Jingyu Zhou 2020-03-27 16:24:07 -07:00
parent 78f74ed6fe
commit f0d0c863e6
1 changed files with 9 additions and 4 deletions

View File

@ -2115,18 +2115,20 @@ namespace fileBackup {
// If a snapshot has ended for this backup then mutations are higher priority to reduce backup lag
state int priority = latestSnapshotEndVersion.present() ? 1 : 0;
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture)));
// Skip mutation copy and erase backup mutations for partitioned logs
if (!partitionedLog.present() || !partitionedLog.get()) {
// Add the initial log range task to read/copy the mutations and the next logs dispatch task which will run after this batch is done
wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture))));
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture)));
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue, Optional<Version>(beginVersion)) );
}
} else {
// Skip mutation copy and erase backup mutations. Just check back periodically.
Version scheduledVersion = tr->getReadVersion().get() + 10 * SERVER_KNOBS->VERSIONS_PER_SECOND;
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, 1, beginVersion, endVersion, TaskCompletionKey::noSignal(), Reference<TaskFuture>(), scheduledVersion)));
}
wait(taskBucket->finish(tr, task));
@ -2140,7 +2142,7 @@ namespace fileBackup {
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, int priority, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, int priority, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), Version scheduledVersion = invalidVersion) {
Key key = wait(addBackupTask(BackupLogsDispatchTask::name,
BackupLogsDispatchTask::version,
tr, taskBucket, completionKey,
@ -2149,6 +2151,9 @@ namespace fileBackup {
[=](Reference<Task> task) {
Params.prevBeginVersion().set(task, prevBeginVersion);
Params.beginVersion().set(task, beginVersion);
if (scheduledVersion != invalidVersion) {
ReservedTaskParams::scheduledVersion().set(task, scheduledVersion);
}
},
priority));
return key;