FastRestore:Fix:Forgot to assign transformed key to mutations
The earlier change in restore logic did not really take effect. This commit fix it and confirm the addPrefix test does cause expected failures if we do not transform the keys at the end of the test.
This commit is contained in:
parent
1873ba6f90
commit
8d59568dc7
|
@ -197,6 +197,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
||||||
wait(processedFileOffset->whenAtLeast(asset.offset));
|
wait(processedFileOffset->whenAtLeast(asset.offset));
|
||||||
ASSERT(processedFileOffset->get() == asset.offset);
|
ASSERT(processedFileOffset->get() == asset.offset);
|
||||||
|
|
||||||
|
Arena tempArena;
|
||||||
StringRefReader reader(buf, restore_corrupted_data());
|
StringRefReader reader(buf, restore_corrupted_data());
|
||||||
try {
|
try {
|
||||||
// Read block header
|
// Read block header
|
||||||
|
@ -246,10 +247,13 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
||||||
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
||||||
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
||||||
// Remove prefix or add prefix when we restore to a new key space
|
// Remove prefix or add prefix when we restore to a new key space
|
||||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1 =
|
||||||
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
|
mutation.param2 =
|
||||||
|
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
} else {
|
} else {
|
||||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1 =
|
||||||
|
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent(SevFRMutationInfo, "FastRestoreDecodePartitionedLogFile")
|
TraceEvent(SevFRMutationInfo, "FastRestoreDecodePartitionedLogFile")
|
||||||
|
@ -745,6 +749,7 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
|
||||||
MutationsVec& samples = samplesIter->second;
|
MutationsVec& samples = samplesIter->second;
|
||||||
SerializedMutationListMap& mutationMap = *pmutationMap;
|
SerializedMutationListMap& mutationMap = *pmutationMap;
|
||||||
|
|
||||||
|
Arena tempArena;
|
||||||
for (auto& m : mutationMap) {
|
for (auto& m : mutationMap) {
|
||||||
StringRef k = m.first.contents();
|
StringRef k = m.first.contents();
|
||||||
StringRef val = m.second.first.contents();
|
StringRef val = m.second.first.contents();
|
||||||
|
@ -793,10 +798,13 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
|
||||||
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
||||||
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
||||||
// Remove prefix or add prefix if we restore data to a new key space
|
// Remove prefix or add prefix if we restore data to a new key space
|
||||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1 =
|
||||||
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
|
mutation.param2 =
|
||||||
|
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
} else {
|
} else {
|
||||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
mutation.param1 =
|
||||||
|
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
}
|
}
|
||||||
|
|
||||||
cc->sampledLogBytes += mutation.totalSize();
|
cc->sampledLogBytes += mutation.totalSize();
|
||||||
|
@ -837,7 +845,8 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
||||||
.detail("Filename", asset.filename)
|
.detail("Filename", asset.filename)
|
||||||
.detail("Version", version)
|
.detail("Version", version)
|
||||||
.detail("BeginVersion", asset.beginVersion)
|
.detail("BeginVersion", asset.beginVersion)
|
||||||
.detail("EndVersion", asset.endVersion);
|
.detail("EndVersion", asset.endVersion)
|
||||||
|
.detail("RestoreAsset", asset.toString());
|
||||||
// Sanity check the range file is within the restored version range
|
// Sanity check the range file is within the restored version range
|
||||||
ASSERT_WE_THINK(asset.isInVersionRange(version));
|
ASSERT_WE_THINK(asset.isInVersionRange(version));
|
||||||
|
|
||||||
|
@ -888,11 +897,12 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
||||||
const LogMessageVersion msgVersion(version, std::numeric_limits<int32_t>::max());
|
const LogMessageVersion msgVersion(version, std::numeric_limits<int32_t>::max());
|
||||||
|
|
||||||
// Convert KV in data into SET mutations of different keys in kvOps
|
// Convert KV in data into SET mutations of different keys in kvOps
|
||||||
|
Arena tempArena;
|
||||||
for (const KeyValueRef& kv : data) {
|
for (const KeyValueRef& kv : data) {
|
||||||
// NOTE: The KV pairs in range files are the real KV pairs in original DB.
|
// NOTE: The KV pairs in range files are the real KV pairs in original DB.
|
||||||
MutationRef m(MutationRef::Type::SetValue, kv.key, kv.value);
|
MutationRef m(MutationRef::Type::SetValue, kv.key, kv.value);
|
||||||
// Remove prefix or add prefix in case we restore data to a different sub keyspace
|
// Remove prefix or add prefix in case we restore data to a different sub keyspace
|
||||||
m.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
m.param1 = m.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
|
||||||
cc->loadedRangeBytes += m.totalSize();
|
cc->loadedRangeBytes += m.totalSize();
|
||||||
|
|
||||||
// We cache all kv operations into kvOps, and apply all kv operations later in one place
|
// We cache all kv operations into kvOps, and apply all kv operations later in one place
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
#include <cstdarg>
|
#include <cstdarg>
|
||||||
|
|
||||||
#define SevFRMutationInfo SevVerbose
|
#define SevFRMutationInfo SevVerbose
|
||||||
//#define SevFRMutationInfo SevInfo
|
// #define SevFRMutationInfo SevInfo
|
||||||
|
|
||||||
struct VersionedMutation {
|
struct VersionedMutation {
|
||||||
MutationRef mutation;
|
MutationRef mutation;
|
||||||
|
|
|
@ -316,7 +316,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write [begin, end) in kvs to DB
|
// write [begin, end) in kvs to DB
|
||||||
ACTOR static Future<Void> writeKVs(Database cx, Standalone<RangeResultRef> kvs, int begin, int end) {
|
ACTOR static Future<Void> writeKVs(Database cx, Standalone<VectorRef<KeyValueRef>> kvs, int begin, int end) {
|
||||||
while (begin < end) {
|
while (begin < end) {
|
||||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
@ -341,13 +341,15 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
.detail("RemovePrefix", removePrefix);
|
.detail("RemovePrefix", removePrefix);
|
||||||
Standalone<RangeResultRef> kvs = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
Standalone<RangeResultRef> kvs = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||||
ASSERT(!kvs.more);
|
ASSERT(!kvs.more);
|
||||||
state int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
|
Standalone<VectorRef<KeyValueRef>> newKVs;
|
||||||
for (i = 0; i < kvs.size(); ++i) {
|
for (i = 0; i < kvs.size(); ++i) {
|
||||||
KeyValueRef kv = kvs[i];
|
KeyRef keyRef = kvs[i].key.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||||
kv.key.removePrefix(removePrefix).withPrefix(addPrefix);
|
newKVs.push_back_deep(newKVs.arena(), KeyValueRef(keyRef, kvs[i].value));
|
||||||
}
|
}
|
||||||
|
|
||||||
wait(writeKVs(cx, kvs, 0, kvs.size()));
|
wait(writeKVs(cx, newKVs, 0, newKVs.size()));
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
@ -582,7 +584,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
|
|
||||||
// If addPrefix or removePrefix set, we want to transform the effect by copying data
|
// If addPrefix or removePrefix set, we want to transform the effect by copying data
|
||||||
if (self->hasPrefix()) {
|
if (self->hasPrefix()) {
|
||||||
// wait(transformDatabaseContents(cx, self->removePrefix, self->addPrefix));
|
wait(transformDatabaseContents(cx, self->removePrefix, self->addPrefix));
|
||||||
wait(unlockDatabase(cx, randomID));
|
wait(unlockDatabase(cx, randomID));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue