Move transformRestoredDatabase from server to client
AtomicRestore workload turns out to rely on the FileBackupAgent client. Keeping transformRestoredDatabase in server makes linking harder.
This commit is contained in:
parent
6eb8dc58b6
commit
198696bc1e
|
@ -937,5 +937,12 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
|
|||
Value makePadding(int size);
|
||||
}
|
||||
|
||||
// For fast restore simulation test
|
||||
// For testing addPrefix feature in fast restore.
|
||||
// Transform db content in restoreRanges by removePrefix and then addPrefix.
|
||||
// Assume: DB is locked
|
||||
ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
Key addPrefix, Key removePrefix);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -41,6 +41,9 @@
|
|||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
#define SevFRTestInfo SevVerbose
|
||||
//#define SevFRTestInfo SevInfo
|
||||
|
||||
static std::string boolToYesOrNo(bool val) { return val ? std::string("Yes") : std::string("No"); }
|
||||
|
||||
static std::string versionToString(Optional<Version> version) {
|
||||
|
@ -4602,7 +4605,7 @@ public:
|
|||
bool lockDB = true;
|
||||
wait(submitParallelRestore(cx, tagName, ranges, KeyRef(bc->getURL()), targetVersion, lockDB, randomUid,
|
||||
addPrefix, removePrefix));
|
||||
bool hasPrefix = (addPrefix.size() > 0 || removePrefix.size() > 0);
|
||||
state bool hasPrefix = (addPrefix.size() > 0 || removePrefix.size() > 0);
|
||||
TraceEvent("AtomicParallelRestoreWaitForRestoreFinish").detail("HasPrefix", hasPrefix);
|
||||
wait(parallelRestoreFinish(cx, randomUid, !hasPrefix));
|
||||
// If addPrefix or removePrefix set, we want to transform the effect by copying data
|
||||
|
@ -4716,3 +4719,243 @@ Future<int> FileBackupAgent::waitBackup(Database cx, std::string tagName, bool s
|
|||
Future<Void> FileBackupAgent::changePause(Database db, bool pause) {
|
||||
return FileBackupAgentImpl::changePause(this, db, pause);
|
||||
}
|
||||
|
||||
// Fast Restore addPrefix test helper functions
|
||||
static std::pair<bool, bool> insideValidRange(KeyValueRef kv, Standalone<VectorRef<KeyRangeRef>> restoreRanges,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges) {
|
||||
bool insideRestoreRange = false;
|
||||
bool insideBackupRange = false;
|
||||
for (auto& range : restoreRanges) {
|
||||
TraceEvent("InsideValidRestoreRange")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Range", range)
|
||||
.detail("Inside", (kv.key >= range.begin && kv.key < range.end));
|
||||
if (kv.key >= range.begin && kv.key < range.end) {
|
||||
insideRestoreRange = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (auto& range : backupRanges) {
|
||||
TraceEvent("InsideValidBackupRange")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Range", range)
|
||||
.detail("Inside", (kv.key >= range.begin && kv.key < range.end));
|
||||
if (kv.key >= range.begin && kv.key < range.end) {
|
||||
insideBackupRange = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return std::make_pair(insideBackupRange, insideRestoreRange);
|
||||
}
|
||||
|
||||
// Write [begin, end) in kvs to DB
|
||||
ACTOR static Future<Void> writeKVs(Database cx, Standalone<VectorRef<KeyValueRef>> kvs, int begin, int end) {
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
int index = begin;
|
||||
while (index < end) {
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKV")
|
||||
.detail("Index", index)
|
||||
.detail("KVs", kvs.size())
|
||||
.detail("Key", kvs[index].key)
|
||||
.detail("Value", kvs[index].value);
|
||||
tr->set(kvs[index].key, kvs[index].value);
|
||||
++index;
|
||||
}
|
||||
return Void();
|
||||
}));
|
||||
|
||||
// Sanity check data has been written to DB
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
KeyRef k1 = kvs[begin].key;
|
||||
KeyRef k2 = end < kvs.size() ? kvs[end].key : normalKeys.end;
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKVReadBack")
|
||||
.detail("Range", KeyRangeRef(k1, k2))
|
||||
.detail("Begin", begin)
|
||||
.detail("End", end);
|
||||
Standalone<RangeResultRef> readKVs = wait(tr.getRange(KeyRangeRef(k1, k2), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(readKVs.size() > 0 || begin == end);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TransformDatabaseContentsWriteKVReadBackError").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKVDone").detail("Begin", begin).detail("End", end);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// restoreRanges is the actual range that has applied removePrefix and addPrefix processed by restore system
|
||||
// Assume: restoreRanges do not overlap which is achieved by ensuring backup ranges do not overlap
|
||||
ACTOR static Future<Void> transformDatabaseContents(Database cx, Key addPrefix, Key removePrefix,
|
||||
Standalone<VectorRef<KeyRangeRef>> restoreRanges) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state Standalone<VectorRef<KeyValueRef>> oldData;
|
||||
|
||||
loop { // Read all data from DB
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
TraceEvent("FastRestoreWorkloadTransformDatabaseContents")
|
||||
.detail("AddPrefix", addPrefix)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
|
||||
state int i = 0;
|
||||
for (i = 0; i < restoreRanges.size(); ++i) {
|
||||
Standalone<RangeResultRef> kvs = wait(tr.getRange(restoreRanges[i], CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!kvs.more);
|
||||
for (auto kv : kvs) {
|
||||
oldData.push_back_deep(oldData.arena(), KeyValueRef(kv.key, kv.value));
|
||||
}
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
oldData = Standalone<VectorRef<KeyValueRef>>(); // clear the vector
|
||||
}
|
||||
}
|
||||
|
||||
// Convert data by removePrefix and addPrefix in memory
|
||||
state Standalone<VectorRef<KeyValueRef>> newKVs;
|
||||
for (int i = 0; i < oldData.size(); ++i) {
|
||||
Key newKey(oldData[i].key);
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("Keys", oldData.size())
|
||||
.detail("Index", i)
|
||||
.detail("GetKey", oldData[i].key)
|
||||
.detail("GetValue", oldData[i].value);
|
||||
if (newKey.size() < removePrefix.size()) { // If true, must check why?!
|
||||
TraceEvent(SevError, "TransformDatabaseContents")
|
||||
.detail("Key", newKey)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
continue;
|
||||
}
|
||||
newKey = newKey.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
newKVs.push_back_deep(newKVs.arena(), KeyValueRef(newKey.contents(), oldData[i].value));
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("Keys", newKVs.size())
|
||||
.detail("Index", i)
|
||||
.detail("NewKey", newKVs.back().key)
|
||||
.detail("NewValue", newKVs.back().value);
|
||||
}
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges; // dest. ranges
|
||||
for (auto& range : restoreRanges) {
|
||||
KeyRange tmpRange = range;
|
||||
backupRanges.push_back_deep(backupRanges.arena(), tmpRange.removePrefix(removePrefix).withPrefix(addPrefix));
|
||||
}
|
||||
|
||||
// Clear the transformed data (original data with removePrefix and addPrefix) in restoreRanges
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (int i = 0; i < restoreRanges.size(); i++) {
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("ClearRestoreRange", restoreRanges[i])
|
||||
.detail("ClearBackupRange", backupRanges[i]);
|
||||
tr->clear(restoreRanges[i]); // Clear the range.removePrefix().withPrefix()
|
||||
tr->clear(backupRanges[i]);
|
||||
}
|
||||
return Void();
|
||||
}));
|
||||
|
||||
// Sanity check to ensure no data in the ranges
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<RangeResultRef> emptyData = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
for (int i = 0; i < emptyData.size(); ++i) {
|
||||
TraceEvent(SevError, "ExpectEmptyData")
|
||||
.detail("Index", i)
|
||||
.detail("Key", emptyData[i].key)
|
||||
.detail("Value", emptyData[i].value);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Write transformed KVs (i.e., kv backup took) back to DB
|
||||
state std::vector<Future<Void>> fwrites;
|
||||
loop {
|
||||
try {
|
||||
state int begin = 0;
|
||||
state int len = 0;
|
||||
while (begin < newKVs.size()) {
|
||||
len = std::min(100, newKVs.size() - begin);
|
||||
fwrites.push_back(writeKVs(cx, newKVs, begin, begin + len));
|
||||
begin = begin + len;
|
||||
}
|
||||
wait(waitForAll(fwrites));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreWorkloadTransformDatabaseContentsUnexpectedErrorOnWriteKVs").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<RangeResultRef> allData = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent(SevFRTestInfo, "SanityCheckData").detail("Size", allData.size());
|
||||
for (int i = 0; i < allData.size(); ++i) {
|
||||
std::pair<bool, bool> backupRestoreValid = insideValidRange(allData[i], restoreRanges, backupRanges);
|
||||
TraceEvent(backupRestoreValid.first ? SevFRTestInfo : SevError, "SanityCheckData")
|
||||
.detail("Index", i)
|
||||
.detail("Key", allData[i].key)
|
||||
.detail("Value", allData[i].value)
|
||||
.detail("InsideBackupRange", backupRestoreValid.first)
|
||||
.detail("InsideRestoreRange", backupRestoreValid.second);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreWorkloadTransformDatabaseContentsFinish")
|
||||
.detail("AddPrefix", addPrefix)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// addPrefix and removePrefix are the options used in the restore request:
|
||||
// every backup key applied removePrefix and addPrefix in restore;
|
||||
// transformRestoredDatabase actor will revert it by remove addPrefix and add removePrefix.
|
||||
ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
Key addPrefix, Key removePrefix) {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> restoreRanges;
|
||||
for (int i = 0; i < backupRanges.size(); ++i) {
|
||||
KeyRange range(backupRanges[i]);
|
||||
Key begin = range.begin.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
Key end = range.end.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
TraceEvent("FastRestoreTransformRestoredDatabase")
|
||||
.detail("From", KeyRangeRef(begin.contents(), end.contents()))
|
||||
.detail("To", range);
|
||||
restoreRanges.push_back_deep(restoreRanges.arena(), KeyRangeRef(begin.contents(), end.contents()));
|
||||
}
|
||||
wait(transformDatabaseContents(cx, removePrefix, addPrefix, restoreRanges));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreTransformRestoredDatabaseUnexpectedError").error(e);
|
||||
throw;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -34,11 +34,17 @@
|
|||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
#define SevFRTestInfo SevVerbose
|
||||
//#define SevFRTestInfo SevInfo
|
||||
|
||||
// Split RestoreConfigFR defined in FileBackupAgent.actor.cpp to declaration in Restore.actor.h and implementation in
|
||||
// RestoreCommon.actor.cpp
|
||||
template <>
|
||||
inline Tuple Codec<ERestoreState>::pack(ERestoreState const& val) {
|
||||
return Tuple().append(val);
|
||||
}
|
||||
template <>
|
||||
inline ERestoreState Codec<ERestoreState>::unpack(Tuple const& val) {
|
||||
return (ERestoreState)val.getInt(0);
|
||||
}
|
||||
|
||||
KeyBackedProperty<ERestoreState> RestoreConfigFR::stateEnum() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
@ -346,242 +352,3 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IA
|
|||
}
|
||||
|
||||
} // namespace parallelFileRestore
|
||||
|
||||
static std::pair<bool, bool> insideValidRange(KeyValueRef kv, Standalone<VectorRef<KeyRangeRef>> restoreRanges,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges) {
|
||||
bool insideRestoreRange = false;
|
||||
bool insideBackupRange = false;
|
||||
for (auto& range : restoreRanges) {
|
||||
TraceEvent("InsideValidRestoreRange")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Range", range)
|
||||
.detail("Inside", (kv.key >= range.begin && kv.key < range.end));
|
||||
if (kv.key >= range.begin && kv.key < range.end) {
|
||||
insideRestoreRange = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (auto& range : backupRanges) {
|
||||
TraceEvent("InsideValidBackupRange")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Range", range)
|
||||
.detail("Inside", (kv.key >= range.begin && kv.key < range.end));
|
||||
if (kv.key >= range.begin && kv.key < range.end) {
|
||||
insideBackupRange = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return std::make_pair(insideBackupRange, insideRestoreRange);
|
||||
}
|
||||
|
||||
// Write [begin, end) in kvs to DB
|
||||
ACTOR static Future<Void> writeKVs(Database cx, Standalone<VectorRef<KeyValueRef>> kvs, int begin, int end) {
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
int index = begin;
|
||||
while (index < end) {
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKV")
|
||||
.detail("Index", index)
|
||||
.detail("KVs", kvs.size())
|
||||
.detail("Key", kvs[index].key)
|
||||
.detail("Value", kvs[index].value);
|
||||
tr->set(kvs[index].key, kvs[index].value);
|
||||
++index;
|
||||
}
|
||||
return Void();
|
||||
}));
|
||||
|
||||
// Sanity check data has been written to DB
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
KeyRef k1 = kvs[begin].key;
|
||||
KeyRef k2 = end < kvs.size() ? kvs[end].key : normalKeys.end;
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKVReadBack")
|
||||
.detail("Range", KeyRangeRef(k1, k2))
|
||||
.detail("Begin", begin)
|
||||
.detail("End", end);
|
||||
Standalone<RangeResultRef> readKVs = wait(tr.getRange(KeyRangeRef(k1, k2), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(readKVs.size() > 0 || begin == end);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TransformDatabaseContentsWriteKVReadBackError").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKVDone").detail("Begin", begin).detail("End", end);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// restoreRanges is the actual range that has applied removePrefix and addPrefix processed by restore system
|
||||
// Assume: restoreRanges do not overlap which is achieved by ensuring backup ranges do not overlap
|
||||
ACTOR Future<Void> transformDatabaseContents(Database cx, Key addPrefix, Key removePrefix,
|
||||
Standalone<VectorRef<KeyRangeRef>> restoreRanges) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state Standalone<VectorRef<KeyValueRef>> oldData;
|
||||
|
||||
loop { // Read all data from DB
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
TraceEvent("FastRestoreWorkloadTransformDatabaseContents")
|
||||
.detail("AddPrefix", addPrefix)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
|
||||
state int i = 0;
|
||||
for (i = 0; i < restoreRanges.size(); ++i) {
|
||||
Standalone<RangeResultRef> kvs = wait(tr.getRange(restoreRanges[i], CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!kvs.more);
|
||||
for (auto kv : kvs) {
|
||||
oldData.push_back_deep(oldData.arena(), KeyValueRef(kv.key, kv.value));
|
||||
}
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
oldData = Standalone<VectorRef<KeyValueRef>>(); // clear the vector
|
||||
}
|
||||
}
|
||||
|
||||
// Convert data by removePrefix and addPrefix in memory
|
||||
state Standalone<VectorRef<KeyValueRef>> newKVs;
|
||||
for (int i = 0; i < oldData.size(); ++i) {
|
||||
Key newKey(oldData[i].key);
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("Keys", oldData.size())
|
||||
.detail("Index", i)
|
||||
.detail("GetKey", oldData[i].key)
|
||||
.detail("GetValue", oldData[i].value);
|
||||
if (newKey.size() < removePrefix.size()) { // If true, must check why?!
|
||||
TraceEvent(SevError, "TransformDatabaseContents")
|
||||
.detail("Key", newKey)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
continue;
|
||||
}
|
||||
newKey = newKey.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
newKVs.push_back_deep(newKVs.arena(), KeyValueRef(newKey.contents(), oldData[i].value));
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("Keys", newKVs.size())
|
||||
.detail("Index", i)
|
||||
.detail("NewKey", newKVs.back().key)
|
||||
.detail("NewValue", newKVs.back().value);
|
||||
}
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges; // dest. ranges
|
||||
for (auto& range : restoreRanges) {
|
||||
KeyRange tmpRange = range;
|
||||
backupRanges.push_back_deep(backupRanges.arena(), tmpRange.removePrefix(removePrefix).withPrefix(addPrefix));
|
||||
}
|
||||
|
||||
// Clear the transformed data (original data with removePrefix and addPrefix) in restoreRanges
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (int i = 0; i < restoreRanges.size(); i++) {
|
||||
TraceEvent(SevFRTestInfo, "TransformDatabaseContents")
|
||||
.detail("ClearRestoreRange", restoreRanges[i])
|
||||
.detail("ClearBackupRange", backupRanges[i]);
|
||||
tr->clear(restoreRanges[i]); // Clear the range.removePrefix().withPrefix()
|
||||
tr->clear(backupRanges[i]);
|
||||
}
|
||||
return Void();
|
||||
}));
|
||||
|
||||
// Sanity check to ensure no data in the ranges
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<RangeResultRef> emptyData = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
for (int i = 0; i < emptyData.size(); ++i) {
|
||||
TraceEvent(SevError, "ExpectEmptyData")
|
||||
.detail("Index", i)
|
||||
.detail("Key", emptyData[i].key)
|
||||
.detail("Value", emptyData[i].value);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Write transformed KVs (i.e., kv backup took) back to DB
|
||||
state std::vector<Future<Void>> fwrites;
|
||||
loop {
|
||||
try {
|
||||
state int begin = 0;
|
||||
state int len = 0;
|
||||
while (begin < newKVs.size()) {
|
||||
len = std::min(100, newKVs.size() - begin);
|
||||
fwrites.push_back(writeKVs(cx, newKVs, begin, begin + len));
|
||||
begin = begin + len;
|
||||
}
|
||||
wait(waitForAll(fwrites));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreWorkloadTransformDatabaseContentsUnexpectedErrorOnWriteKVs").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<RangeResultRef> allData = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent(SevFRTestInfo, "SanityCheckData").detail("Size", allData.size());
|
||||
for (int i = 0; i < allData.size(); ++i) {
|
||||
std::pair<bool, bool> backupRestoreValid = insideValidRange(allData[i], restoreRanges, backupRanges);
|
||||
TraceEvent(backupRestoreValid.first ? SevFRTestInfo : SevError, "SanityCheckData")
|
||||
.detail("Index", i)
|
||||
.detail("Key", allData[i].key)
|
||||
.detail("Value", allData[i].value)
|
||||
.detail("InsideBackupRange", backupRestoreValid.first)
|
||||
.detail("InsideRestoreRange", backupRestoreValid.second);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreWorkloadTransformDatabaseContentsFinish")
|
||||
.detail("AddPrefix", addPrefix)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// addPrefix and removePrefix are the options used in the restore request:
|
||||
// every backup key applied removePrefix and addPrefix in restore;
|
||||
// transformRestoredDatabase actor will revert it by remove addPrefix and add removePrefix.
|
||||
ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
Key addPrefix, Key removePrefix) {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> restoreRanges;
|
||||
for (int i = 0; i < backupRanges.size(); ++i) {
|
||||
KeyRange range(backupRanges[i]);
|
||||
Key begin = range.begin.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
Key end = range.end.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
TraceEvent("FastRestoreTransformRestoredDatabase")
|
||||
.detail("From", KeyRangeRef(begin.contents(), end.contents()))
|
||||
.detail("To", range);
|
||||
restoreRanges.push_back_deep(restoreRanges.arena(), KeyRangeRef(begin.contents(), end.contents()));
|
||||
}
|
||||
wait(transformDatabaseContents(cx, removePrefix, addPrefix, restoreRanges));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreTransformRestoredDatabaseUnexpectedError").error(e);
|
||||
throw;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -45,8 +45,6 @@
|
|||
// TODO: Merge this RestoreConfig with the original RestoreConfig in FileBackupAgent.actor.cpp
|
||||
// For convenience
|
||||
typedef FileBackupAgent::ERestoreState ERestoreState;
|
||||
template<> inline Tuple Codec<ERestoreState>::pack(ERestoreState const &val) { return Tuple().append(val); }
|
||||
template<> inline ERestoreState Codec<ERestoreState>::unpack(Tuple const &val) { return (ERestoreState)val.getInt(0); }
|
||||
|
||||
struct RestoreFileFR;
|
||||
|
||||
|
@ -383,14 +381,5 @@ Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel, std::
|
|||
return Void();
|
||||
}
|
||||
|
||||
// For fast restore simulation test
|
||||
// For testing addPrefix feature in fast restore.
|
||||
// Transform db content in restoreRanges by removePrefix and then addPrefix.
|
||||
// Assume: DB is locked
|
||||
ACTOR Future<Void> transformDatabaseContents(Database cx, Key addPrefix, Key removePrefix,
|
||||
Standalone<VectorRef<KeyRangeRef>> restoreRanges);
|
||||
ACTOR Future<Void> transformRestoredDatabase(Database cx, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
Key addPrefix, Key removePrefix);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif // FDBSERVER_RESTORECOMMON_ACTOR_H
|
|
@ -20,9 +20,10 @@
|
|||
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbserver/RestoreCommon.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbserver/RestoreCommon.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
//A workload which test the correctness of backup and restore process
|
||||
|
|
Loading…
Reference in New Issue