Initialize apply mutations map for restore to version (#10857)

This commit is contained in:
Hui Liu 2023-09-05 10:03:35 -07:00 committed by GitHub
parent 3c5a38b120
commit 00d3062728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 25 deletions

View File

@ -314,25 +314,19 @@ public:
this->addPrefix().set(tr, addPrefix);
this->removePrefix().set(tr, removePrefix);
// Skip applyMutationsKeyVersionCount, applyMutationsKeyVersionMap, applyMutationsEnd, applyMutationsBegin
// for only-apply-mutation-logs restore. They were initialized in preloadApplyMutationsKeyVersionMap
clearApplyMutationsKeys(tr, onlyApplyMutationLogs);
clearApplyMutationsKeys(tr);
// Initialize add/remove prefix, range version map count and set the map's start key to InvalidVersion
tr->set(uidPrefixKey(applyMutationsAddPrefixRange.begin, uid), addPrefix);
tr->set(uidPrefixKey(applyMutationsRemovePrefixRange.begin, uid), removePrefix);
// Skip init applyMutationsKeyVersionCount and applyMutationsKeyVersionMap for only-apply-mutation-logs restore.
// They were initialized in preloadApplyMutationsKeyVersionMap
if (!onlyApplyMutationLogs) {
int64_t startCount = 0;
tr->set(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid), StringRef((uint8_t*)&startCount, 8));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->set(mapStart, BinaryWriter::toValue<Version>(invalidVersion, Unversioned()));
}
int64_t startCount = 0;
tr->set(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid), StringRef((uint8_t*)&startCount, 8));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->set(mapStart, BinaryWriter::toValue<Version>(invalidVersion, Unversioned()));
}
void clearApplyMutationsKeys(Reference<ReadYourWritesTransaction> tr, bool skipMutationKeyVersionMap = false) {
void clearApplyMutationsKeys(Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
// Clear add/remove prefix keys
@ -340,21 +334,17 @@ public:
tr->clear(uidPrefixKey(applyMutationsRemovePrefixRange.begin, uid));
// Clear range version map and count key
if (!skipMutationKeyVersionMap) {
tr->clear(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->clear(KeyRangeRef(mapStart, strinc(mapStart)));
}
tr->clear(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->clear(KeyRangeRef(mapStart, strinc(mapStart)));
// Clear any loaded mutations that have not yet been applied
Key mutationPrefix = mutationLogPrefix();
tr->clear(KeyRangeRef(mutationPrefix, strinc(mutationPrefix)));
if (!skipMutationKeyVersionMap) {
// Clear end and begin versions (intentionally in this order)
tr->clear(uidPrefixKey(applyMutationsEndRange.begin, uid));
tr->clear(uidPrefixKey(applyMutationsBeginRange.begin, uid));
}
// Clear end and begin versions (intentionally in this order)
tr->clear(uidPrefixKey(applyMutationsEndRange.begin, uid));
tr->clear(uidPrefixKey(applyMutationsBeginRange.begin, uid));
}
void setApplyBeginVersion(Reference<ReadYourWritesTransaction> tr, Version ver) {
@ -5011,11 +5001,12 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
state RestoreConfig restore(task);
state Version firstVersion = Params.firstVersion().getOrDefault(task, invalidVersion);
state bool isBlobGranuleRestore;
wait(store(isBlobGranuleRestore, restore.isBlobGranuleRestore().getD(tr, Snapshot::False, false)));
if (firstVersion == invalidVersion) {
// For blob granule restore, we can complete the restore job if no mutation log is needed
state bool isBlobGranuleRestore;
wait(store(isBlobGranuleRestore, restore.isBlobGranuleRestore().getD(tr, Snapshot::False, false)));
if (isBlobGranuleRestore) {
// For blob granule restore, we can complete the restore job if no mutation log is needed
state Version beginVersion;
state Version restoreVersion;
wait(store(beginVersion, restore.beginVersion().getD(tr, Snapshot::False, ::invalidVersion)));
@ -5048,6 +5039,19 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE)));
wait(taskBucket->finish(tr, task));
// Initialize apply mutations map for non-blob-restore case. For blob restore, it's initialized
// in blob migrator
if (!isBlobGranuleRestore) {
state Future<Optional<bool>> logsOnly = restore.onlyApplyMutationLogs().get(tr);
wait(success(logsOnly));
if (logsOnly.get().present() && logsOnly.get().get()) {
// If this is an incremental restore, we need to set the applyMutationsMapPrefix
// to the earliest log version so no mutations are missed
Value versionEncoded = BinaryWriter::toValue(Params.firstVersion().get(task), Unversioned());
wait(krmSetRange(tr, restore.applyMutationsMapPrefix(), normalKeys, versionEncoded));
}
}
return Void();
}