Bug fix, and some code cleanup along the way. If a range backup task dies in finish(), the re-run of the task will start with begin == end, which wasn’t being handled correctly.
parent d9c2f6d705
commit 39edda1804
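For context, a minimal self-contained sketch of the failure mode being fixed (the names are hypothetical stand-ins, not the FoundationDB API): a resumable range task persists progress by advancing its begin key, so an executor that dies after saving the final chunk re-runs the task with begin == end, and that empty range has to be treated as "already done".

    #include <cassert>
    #include <string>

    // Hypothetical stand-in for a resumable range-backup task: `begin` is
    // the persisted progress cursor, `end` the exclusive upper bound.
    struct RangeTask {
        std::string begin;
        std::string end;
    };

    void runRangeTask(RangeTask& task) {
        // The fix: an empty range means every chunk was already saved
        // before the previous executor died, so there is no work left.
        if (task.begin == task.end)
            return;

        // ... back up [task.begin, task.end), periodically persisting
        // task.begin = <last key written> so a crash resumes from there ...
        task.begin = task.end; // simulate completing the whole range
    }

    int main() {
        RangeTask t{"a", "m"};
        runRangeTask(t);           // first run completes; saved begin == end
        runRangeTask(t);           // re-run after a crash in finish(): a no-op
        assert(t.begin == t.end);
    }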
@@ -861,8 +861,16 @@ namespace fileBackup {
 			Void _ = wait(checkTaskVersion(cx, task, BackupRangeTaskFunc::name, BackupRangeTaskFunc::version));
 
+			state Key beginKey = Params.beginKey().get(task);
+			state Key endKey = Params.endKey().get(task);
+
+			// When a key range task saves the last chunk of progress and then the executor dies, when the task continues
+			// its beginKey and endKey will be equal but there is no work to be done.
+			if(beginKey == endKey)
+				return Void();
+
 			// Find out if there is a shard boundary in(beginKey, endKey)
-			Standalone<VectorRef<KeyRef>> keys = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getBlockOfShards(tr, task->params[FileBackupAgent::keyBeginKey], task->params[FileBackupAgent::keyEndKey], 1); }));
+			Standalone<VectorRef<KeyRef>> keys = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getBlockOfShards(tr, beginKey, endKey, 1); }));
 			if (keys.size() > 0) {
 				Params.addBackupRangeTasks().set(task, true);
 				return Void();
 			}
@@ -873,16 +881,12 @@ namespace fileBackup {
 			// task for the remainder.
 			state Reference<IBackupFile> outFile;
 			state Version outVersion = invalidVersion;
 
-			state Key beginKey = Params.beginKey().get(task);
-			state Key endKey = Params.endKey().get(task);
 			state Key lastKey;
 
-			state KeyRange range(KeyRangeRef(beginKey, endKey));
-
 			// retrieve kvData
 			state PromiseStream<RangeResultWithVersion> results;
-			state Future<Void> rc = readCommitted(cx, results, lock, range, true, true, true);
+			state Future<Void> rc = readCommitted(cx, results, lock, KeyRangeRef(beginKey, endKey), true, true, true);
 			state RangeFileWriter rangeFile;
 
 			// TODO: Try to run forever until the shard is done, but also update the task after each output file is sync'd
@@ -951,13 +955,15 @@ namespace fileBackup {
 				Void _ = wait(rangeFile.writeKey(beginKey));
 			}
 
-			// write kvData to file
-			state size_t i = 0;
-			for (; i < values.first.size(); ++i) {
-				lastKey = values.first[i].key;
-				Void _ = wait(rangeFile.writeKV(lastKey, values.first[i].value));
+			// write kvData to file, update lastKey and key count
+			if(values.first.size() != 0) {
+				state size_t i = 0;
+				for (; i < values.first.size(); ++i) {
+					Void _ = wait(rangeFile.writeKV(values.first[i].key, values.first[i].value));
+				}
+				lastKey = values.first.back().key;
+				nrKeys += values.first.size();
 			}
-			nrKeys += values.first.size();
 		}
 	}
 
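A note on the hunk above (my reading, not stated in the commit): hoisting the lastKey update out of the loop introduces a call to values.first.back(), which is undefined on an empty vector, so the new size guard is load-bearing rather than cosmetic. A tiny sketch of the hazard, using a hypothetical stand-in type:

    #include <cassert>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-in for values.first: one batch of key/value pairs.
    using KVBatch = std::vector<std::pair<std::string, std::string>>;

    int main() {
        KVBatch batch;             // empty batches can legitimately arrive
        std::string lastKey;
        size_t nrKeys = 0;

        // Mirrors the patched logic: back() on an empty vector is undefined
        // behavior, so it must stay behind the size check.
        if (!batch.empty()) {
            lastKey = batch.back().first;
            nrKeys += batch.size();
        }
        assert(nrKeys == 0 && lastKey.empty());
    }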
@@ -965,7 +971,9 @@ namespace fileBackup {
 			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
 			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
 			state Key nextKey = Params.beginKey().get(task);
-			state Standalone<VectorRef<KeyRef>> keys = wait(getBlockOfShards(tr, nextKey, Params.endKey().get(task), CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT));
+			state Key endKey = Params.endKey().get(task);
+
+			state Standalone<VectorRef<KeyRef>> keys = wait(getBlockOfShards(tr, nextKey, endKey, CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT));
 
 			std::vector<Future<Key>> addTaskVector;
 			for (int idx = 0; idx < keys.size(); ++idx) {
@@ -977,9 +985,9 @@ namespace fileBackup {
 
 			Void _ = wait(waitForAll(addTaskVector));
 
-			if (nextKey != Params.endKey().get(task)) {
+			if (nextKey != endKey) {
 				// Add task to cover nextKey to the end, using the priority of the current task
-				Key _ = wait(addTask(tr, taskBucket, task, nextKey, Params.endKey().get(task), TaskCompletionKey::joinWith(onDone), Reference<TaskFuture>(), task->getPriority()));
+				Key _ = wait(addTask(tr, taskBucket, task, nextKey, endKey, TaskCompletionKey::joinWith(onDone), Reference<TaskFuture>(), task->getPriority()));
 			}
 
 			return Void();
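On the cleanup in the last two hunks (my inference; the commit message only calls it cleanup): caching Params.endKey().get(task) in a state Key endKey removes the repeated parameter lookups, and since flow actors preserve only state variables across wait() suspension points, the cached key remains valid on both sides of the waitForAll() and addTask() waits in this function.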