FastRestore:Test for addPrefix and removePrefix
Add the skeleton code to test; addPrefix and removePrefix are all empty for now.
This commit is contained in:
parent
5455467f31
commit
d7a8e554ed
|
@ -276,7 +276,7 @@ public:
|
||||||
static StringRef restoreStateText(ERestoreState id);
|
static StringRef restoreStateText(ERestoreState id);
|
||||||
|
|
||||||
// parallel restore
|
// parallel restore
|
||||||
Future<Void> parallelRestoreFinish(Database cx, UID randomUID);
|
Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB = true);
|
||||||
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||||
Key bcUrl, Version targetVersion, bool lockDB, UID randomUID);
|
Key bcUrl, Version targetVersion, bool lockDB, UID randomUID);
|
||||||
Future<Void> atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
|
Future<Void> atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||||
|
|
|
@ -3578,7 +3578,7 @@ public:
|
||||||
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
|
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
|
||||||
|
|
||||||
// Parallel restore
|
// Parallel restore
|
||||||
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB = true) {
|
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB) {
|
||||||
state ReadYourWritesTransaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
state Optional<Value> restoreRequestDoneKeyValue;
|
state Optional<Value> restoreRequestDoneKeyValue;
|
||||||
TraceEvent("FastRestoreAgentWaitForRestoreToFinish").detail("DBLock", randomUID);
|
TraceEvent("FastRestoreAgentWaitForRestoreToFinish").detail("DBLock", randomUID);
|
||||||
|
@ -4625,8 +4625,8 @@ const int BackupAgentBase::logHeaderSize = 12;
|
||||||
const int FileBackupAgent::dataFooterSize = 20;
|
const int FileBackupAgent::dataFooterSize = 20;
|
||||||
|
|
||||||
// Return if parallel restore has finished
|
// Return if parallel restore has finished
|
||||||
Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx, UID randomUID) {
|
Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB) {
|
||||||
return FileBackupAgentImpl::parallelRestoreFinish(cx, randomUID);
|
return FileBackupAgentImpl::parallelRestoreFinish(cx, randomUID, unlockDB);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
|
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
|
||||||
|
|
|
@ -87,6 +87,7 @@ struct AtomicRestoreWorkload : TestWorkload {
|
||||||
|
|
||||||
if (self->fastRestore) { // New fast parallel restore
|
if (self->fastRestore) { // New fast parallel restore
|
||||||
TraceEvent(SevInfo, "AtomicParallelRestore");
|
TraceEvent(SevInfo, "AtomicParallelRestore");
|
||||||
|
// TODO: Use non-empty addPrefix and removePrefix
|
||||||
wait(backupAgent.atomicParallelRestore(cx, BackupAgentBase::getDefaultTag(), self->backupRanges,
|
wait(backupAgent.atomicParallelRestore(cx, BackupAgentBase::getDefaultTag(), self->backupRanges,
|
||||||
StringRef(), StringRef()));
|
StringRef(), StringRef()));
|
||||||
} else { // Old style restore
|
} else { // Old style restore
|
||||||
|
|
|
@ -42,6 +42,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
bool allowPauses;
|
bool allowPauses;
|
||||||
bool shareLogRange;
|
bool shareLogRange;
|
||||||
bool usePartitionedLogs;
|
bool usePartitionedLogs;
|
||||||
|
Key addPrefix, removePrefix; // Orignal key will be first apply removePrefix and then addPrefix
|
||||||
|
// CAVEAT: When removePrefix is used, we must ensure every key in backup have the removePrefix
|
||||||
|
|
||||||
std::map<Standalone<KeyRef>, Standalone<ValueRef>> dbKVs;
|
std::map<Standalone<KeyRef>, Standalone<ValueRef>> dbKVs;
|
||||||
|
|
||||||
|
@ -71,6 +73,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||||
usePartitionedLogs = getOption(options, LiteralStringRef("usePartitionedLogs"),
|
usePartitionedLogs = getOption(options, LiteralStringRef("usePartitionedLogs"),
|
||||||
deterministicRandom()->random01() < 0.5 ? true : false);
|
deterministicRandom()->random01() < 0.5 ? true : false);
|
||||||
|
addPrefix = getOption(options, LiteralStringRef("addPrefix"), "");
|
||||||
|
removePrefix = getOption(options, LiteralStringRef("removePrefix"), "");
|
||||||
|
|
||||||
KeyRef beginRange;
|
KeyRef beginRange;
|
||||||
KeyRef endRange;
|
KeyRef endRange;
|
||||||
|
@ -131,6 +135,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
return _start(cx, this);
|
return _start(cx, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool hasPrefix() { return addPrefix != LiteralStringRef("") || removePrefix != LiteralStringRef(""); }
|
||||||
|
|
||||||
virtual Future<bool> check(Database const& cx) { return true; }
|
virtual Future<bool> check(Database const& cx) { return true; }
|
||||||
|
|
||||||
virtual void getMetrics(vector<PerfMetric>& m) {}
|
virtual void getMetrics(vector<PerfMetric>& m) {}
|
||||||
|
@ -293,6 +299,41 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write [begin, end) in kvs to DB
|
||||||
|
ACTOR static Future<Void> writeKVs(Database cx, Standalone<RangeResultRef> kvs, int begin, int end) {
|
||||||
|
while (begin < end) {
|
||||||
|
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||||
|
int i = 0;
|
||||||
|
while (i < 100) {
|
||||||
|
tr->set(kvs[begin].key, kvs[begin].value);
|
||||||
|
++begin;
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR static Future<Void> transformDatabaseContents(Database cx, Key addPrefix, Key removePrefix) {
|
||||||
|
state ReadYourWritesTransaction tr(cx);
|
||||||
|
|
||||||
|
TraceEvent("FastRestoreWorkloadTransformDatabaseContents")
|
||||||
|
.detail("AddPrefix", addPrefix)
|
||||||
|
.detail("RemovePrefix", removePrefix);
|
||||||
|
Standalone<RangeResultRef> kvs = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||||
|
ASSERT(!kvs.ismore);
|
||||||
|
state int i = 0;
|
||||||
|
for (i = 0; i < kvs.size(); ++i) {
|
||||||
|
KeyValueRef kv = kvs[i];
|
||||||
|
kv.key.removePrefix(removePrefix).withPrefix(addPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(writeKVs(cx, kvs, 0, kvs.size()));
|
||||||
|
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
// This actor attempts to restore the database without clearing the keyspace.
|
// This actor attempts to restore the database without clearing the keyspace.
|
||||||
// TODO: Enable this function in correctness test
|
// TODO: Enable this function in correctness test
|
||||||
ACTOR static Future<Void> attemptDirtyRestore(BackupAndParallelRestoreCorrectnessWorkload* self, Database cx,
|
ACTOR static Future<Void> attemptDirtyRestore(BackupAndParallelRestoreCorrectnessWorkload* self, Database cx,
|
||||||
|
@ -509,7 +550,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
|
|
||||||
// Wait for parallel restore to finish before we can proceed
|
// Wait for parallel restore to finish before we can proceed
|
||||||
TraceEvent("FastRestoreWorkload").detail("WaitForRestoreToFinish", randomID);
|
TraceEvent("FastRestoreWorkload").detail("WaitForRestoreToFinish", randomID);
|
||||||
wait(backupAgent.parallelRestoreFinish(cx, randomID));
|
// Do not unlock DB when restore finish because we need to transformDatabaseContents
|
||||||
|
wait(backupAgent.parallelRestoreFinish(cx, randomID, !self->hasPrefix()));
|
||||||
TraceEvent("FastRestoreWorkload").detail("RestoreFinished", randomID);
|
TraceEvent("FastRestoreWorkload").detail("RestoreFinished", randomID);
|
||||||
|
|
||||||
for (auto& restore : restores) {
|
for (auto& restore : restores) {
|
||||||
|
@ -517,6 +559,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: If addPrefix and removePrefix set, we want to transform the effect by copying data
|
// TODO: If addPrefix and removePrefix set, we want to transform the effect by copying data
|
||||||
|
if (self->hasPrefix()) {
|
||||||
|
ASSERT(finalPrefix.size() >= 0);
|
||||||
|
transformDatabaseContents(cx, self->removePrefix, self->addPrefix);
|
||||||
|
wait(unlockDatabase(cx, randomID));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Q: What is the extra backup and why do we need to care about it?
|
// Q: What is the extra backup and why do we need to care about it?
|
||||||
|
|
|
@ -316,7 +316,7 @@ struct SerializabilityWorkload : TestWorkload {
|
||||||
|
|
||||||
ACTOR static Future<Void> resetDatabase( Database cx, Standalone<VectorRef<KeyValueRef>> data ) {
|
ACTOR static Future<Void> resetDatabase( Database cx, Standalone<VectorRef<KeyValueRef>> data ) {
|
||||||
state ReadYourWritesTransaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
|
|
||||||
tr.clear(normalKeys);
|
tr.clear(normalKeys);
|
||||||
for(auto kv : data)
|
for(auto kv : data)
|
||||||
tr.set(kv.key, kv.value);
|
tr.set(kv.key, kv.value);
|
||||||
|
@ -346,7 +346,7 @@ struct SerializabilityWorkload : TestWorkload {
|
||||||
try {
|
try {
|
||||||
if(now() - startTime > self->testDuration)
|
if(now() - startTime > self->testDuration)
|
||||||
return Void();
|
return Void();
|
||||||
|
|
||||||
//Generate initial data
|
//Generate initial data
|
||||||
state Standalone<VectorRef<KeyValueRef>> initialData;
|
state Standalone<VectorRef<KeyValueRef>> initialData;
|
||||||
int initialAmount = deterministicRandom()->randomInt(0, 100);
|
int initialAmount = deterministicRandom()->randomInt(0, 100);
|
||||||
|
@ -356,7 +356,7 @@ struct SerializabilityWorkload : TestWorkload {
|
||||||
initialData.push_back_deep(initialData.arena(), KeyValueRef(key, value));
|
initialData.push_back_deep(initialData.arena(), KeyValueRef(key, value));
|
||||||
//TraceEvent("SRL_Init").detail("Key", printable(key)).detail("Value", printable(value));
|
//TraceEvent("SRL_Init").detail("Key", printable(key)).detail("Value", printable(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
//Generate three random transactions
|
//Generate three random transactions
|
||||||
state std::vector<TransactionOperation> a = self->randomTransaction();
|
state std::vector<TransactionOperation> a = self->randomTransaction();
|
||||||
state std::vector<TransactionOperation> b = self->randomTransaction();
|
state std::vector<TransactionOperation> b = self->randomTransaction();
|
||||||
|
@ -369,7 +369,7 @@ struct SerializabilityWorkload : TestWorkload {
|
||||||
wait( tr[0].commit() );
|
wait( tr[0].commit() );
|
||||||
|
|
||||||
//TraceEvent("SRL_FinishedA");
|
//TraceEvent("SRL_FinishedA");
|
||||||
|
|
||||||
wait( runTransaction(&tr[1], b, &getFutures[0], &getKeyFutures[0], &getRangeFutures[0], &watchFutures[0], true) );
|
wait( runTransaction(&tr[1], b, &getFutures[0], &getKeyFutures[0], &getRangeFutures[0], &watchFutures[0], true) );
|
||||||
wait( tr[1].commit() );
|
wait( tr[1].commit() );
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue