fixed issue with mutations not applying and allow backup to non-empty db

This commit is contained in:
Jon Fu 2020-08-11 15:39:21 -04:00
parent 21635f8a28
commit 7dce3a9187
3 changed files with 18 additions and 18 deletions

View File

@ -2195,8 +2195,8 @@ ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std:
BackupDescription desc = wait(bc->describeBackup());
if (incrementalBackupOnly && desc.contiguousLogEnd.present()) {
targetVersion = desc.contiguousLogEnd.get();
if (incrementalBackupOnly && desc.maxLogEnd.present()) {
targetVersion = desc.maxLogEnd.get() - 1;
} else if (desc.maxRestorableVersion.present()) {
targetVersion = desc.maxRestorableVersion.get();
} else {

View File

@ -1362,6 +1362,8 @@ public:
state RestorableFileSet restorableSet;
state std::vector<LogFile> logFiles;
wait(store(logFiles, bc->listLogFiles(0, targetVersion, false)));
// List logs in version order so log continuity can be analyzed
std::sort(logFiles.begin(), logFiles.end());
if (!logFiles.empty()) {
Version end = logFiles.begin()->endVersion;
computeRestoreEndVersion(logFiles, &restorableSet.logs, &end, targetVersion);

View File

@ -2890,7 +2890,6 @@ namespace fileBackup {
wait(tr->onError(e));
}
}
TraceEvent("FileRestoreLogCheckpoint1");
state Key mutationLogPrefix = restore.mutationLogPrefix();
state Reference<IAsyncFile> inFile = wait(bc->readFile(logFile.fileName));
@ -2911,15 +2910,9 @@ namespace fileBackup {
state int i = start;
state int txBytes = 0;
TraceEvent("FileRestoreLogCheckpoint2")
.detail("I", i)
.detail("End", end);
for(; i < end && txBytes < dataSizeLimit; ++i) {
Key k = data[i].key.withPrefix(mutationLogPrefix);
ValueRef v = data[i].value;
TraceEvent("FileRestoreTrackKV")
.detail("K", k)
.detail("V", v);
tr->set(k, v);
txBytes += k.expectedSize();
txBytes += v.expectedSize();
@ -2930,7 +2923,6 @@ namespace fileBackup {
wait(taskBucket->keepRunning(tr, task));
wait( checkLock );
TraceEvent("FileRestoreLogCheckpoint3");
// Add to bytes written count
restore.bytesWritten().atomicOp(tr, txBytes, MutationRef::Type::AddValue);
@ -3176,9 +3168,6 @@ namespace fileBackup {
break;
if(f.isRange) {
// if (incrementalBackupOnly.get().present() && incrementalBackupOnly.get().get()) {
// continue;
// }
addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
TaskCompletionKey::joinWith(allPartsDone)));
@ -3467,7 +3456,6 @@ namespace fileBackup {
wait(tr->onError(e));
}
}
TraceEvent("RestoreCheckpoint1");
state Future<Optional<bool>> logsOnly = restore.incrementalBackupOnly().get(tr);
wait(success(logsOnly));
state bool incremental = false;
@ -3480,9 +3468,6 @@ namespace fileBackup {
beginVer = restorable.get().snapshot.beginVersion;
}
TraceEvent("RestoreCheckpoint2")
.detail("BeginVer", beginVer)
.detail("Incremental", incremental);
if(!restorable.present())
throw restore_missing_data();
@ -3560,12 +3545,21 @@ namespace fileBackup {
restore.stateEnum().set(tr, ERestoreState::RUNNING);
// Set applyMutation versions
restore.setApplyBeginVersion(tr, firstVersion);
restore.setApplyEndVersion(tr, firstVersion);
// Apply range data and log data in order
wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE)));
state Future<Optional<bool>> logsOnly = restore.incrementalBackupOnly().get(tr);
wait(success(logsOnly));
if (logsOnly.isReady() && logsOnly.get().present() && logsOnly.get().get()) {
// If this is an incremental restore, we need to set the applyMutationsMapPrefix
// to the earliest log version so no mutations are missed
Value versionEncoded = BinaryWriter::toValue(Params.firstVersion().get(task), Unversioned());
wait(krmSetRange(tr, restore.applyMutationsMapPrefix(), normalKeys, versionEncoded));
}
wait(taskBucket->finish(tr, task));
return Void();
}
@ -3955,7 +3949,7 @@ public:
for (index = 0; index < restoreRanges.size(); index++) {
KeyRange restoreIntoRange = KeyRangeRef(restoreRanges[index].begin, restoreRanges[index].end).removePrefix(removePrefix).withPrefix(addPrefix);
Standalone<RangeResultRef> existingRows = wait(tr->getRange(restoreIntoRange, 1));
if (existingRows.size() > 0) {
if (existingRows.size() > 0 && !incrementalBackupOnly) {
throw restore_destination_not_empty();
}
}
@ -4502,6 +4496,10 @@ public:
if(targetVersion == invalidVersion && desc.maxRestorableVersion.present())
targetVersion = desc.maxRestorableVersion.get();
if (targetVersion == invalidVersion && incrementalBackupOnly && desc.maxLogEnd.present()) {
targetVersion = desc.maxLogEnd.get() - 1;
}
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion, incrementalBackupOnly));
if(!restoreSet.present()) {