FastRestore:Add addPrefix and removePrefix feature
This commit is contained in:
parent
d7a8e554ed
commit
2fcef90cbc
|
@ -2224,7 +2224,8 @@ ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::st
|
|||
.detail("SubmitRestoreRequests", ranges.size())
|
||||
.detail("RestoreUID", randomUID);
|
||||
wait(backupAgent.submitParallelRestore(db, KeyRef(tagName), ranges, KeyRef(container), dbVersion, true,
|
||||
randomUID));
|
||||
randomUID, LiteralStringRef(""), LiteralStringRef("")));
|
||||
ASSERT(false); // TODO: Support addPrefix and removePrefix
|
||||
if (waitForDone) {
|
||||
// Wait for parallel restore to finish and unlock DB after that
|
||||
TraceEvent("FastRestoreAgent").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
|
||||
|
|
|
@ -278,7 +278,8 @@ public:
|
|||
// parallel restore
|
||||
Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB = true);
|
||||
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
Key bcUrl, Version targetVersion, bool lockDB, UID randomUID);
|
||||
Key bcUrl, Version targetVersion, bool lockDB, UID randomUID, Key addPrefix,
|
||||
Key removePrefix);
|
||||
Future<Void> atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||
Key addPrefix, Key removePrefix);
|
||||
|
||||
|
|
|
@ -3578,7 +3578,7 @@ public:
|
|||
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
|
||||
|
||||
// Parallel restore
|
||||
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB) {
|
||||
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB = true) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state Optional<Value> restoreRequestDoneKeyValue;
|
||||
TraceEvent("FastRestoreAgentWaitForRestoreToFinish").detail("DBLock", randomUID);
|
||||
|
@ -3626,7 +3626,8 @@ public:
|
|||
|
||||
ACTOR static Future<Void> submitParallelRestore(Database cx, Key backupTag,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, Key bcUrl,
|
||||
Version targetVersion, bool lockDB, UID randomUID) {
|
||||
Version targetVersion, bool lockDB, UID randomUID, Key addPrefix,
|
||||
Key removePrefix) {
|
||||
// Sanity check backup is valid
|
||||
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
|
||||
state BackupDescription desc = wait(bc->describeBackup());
|
||||
|
@ -3689,7 +3690,8 @@ public:
|
|||
Standalone<StringRef> restoreTag(backupTag.toString() + "_" + std::to_string(restoreIndex));
|
||||
// Register the restore request in DB, which will be picked up by restore worker leader
|
||||
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, bcUrl, targetVersion, range,
|
||||
deterministicRandom()->randomUniqueID());
|
||||
deterministicRandom()->randomUniqueID(), addPrefix,
|
||||
removePrefix);
|
||||
tr->set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
|
||||
}
|
||||
tr->set(restoreRequestTriggerKey,
|
||||
|
@ -4598,7 +4600,8 @@ public:
|
|||
TraceEvent("AtomicParallelRestoreStartRestore");
|
||||
Version targetVersion = -1;
|
||||
bool lockDB = true;
|
||||
wait(submitParallelRestore(cx, tagName, ranges, KeyRef(bc->getURL()), targetVersion, lockDB, randomUid));
|
||||
wait(submitParallelRestore(cx, tagName, ranges, KeyRef(bc->getURL()), targetVersion, lockDB, randomUid,
|
||||
addPrefix, removePrefix));
|
||||
TraceEvent("AtomicParallelRestoreWaitForRestoreFinish");
|
||||
wait(parallelRestoreFinish(cx, randomUid));
|
||||
return -1;
|
||||
|
@ -4631,9 +4634,10 @@ Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx, UID randomUID,
|
|||
|
||||
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, Key bcUrl,
|
||||
Version targetVersion, bool lockDB, UID randomUID) {
|
||||
Version targetVersion, bool lockDB, UID randomUID, Key addPrefix,
|
||||
Key removePrefix) {
|
||||
return FileBackupAgentImpl::submitParallelRestore(cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB,
|
||||
randomUID);
|
||||
randomUID, addPrefix, removePrefix);
|
||||
}
|
||||
|
||||
Future<Void> FileBackupAgent::atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||
|
|
|
@ -208,6 +208,8 @@ struct RestoreApplierInterface : RestoreRoleInterface {
|
|||
// By combining all RestoreAssets across all version batches, restore should process all mutations in
|
||||
// backup range and log files up to the target restore version.
|
||||
struct RestoreAsset {
|
||||
UID uid;
|
||||
|
||||
Version beginVersion, endVersion; // Only use mutation in [begin, end) versions;
|
||||
KeyRange range; // Only use mutations in range
|
||||
|
||||
|
@ -218,34 +220,40 @@ struct RestoreAsset {
|
|||
int64_t offset;
|
||||
int64_t len;
|
||||
|
||||
UID uid;
|
||||
Key addPrefix;
|
||||
Key removePrefix;
|
||||
|
||||
RestoreAsset() = default;
|
||||
|
||||
// Q: Can we simply use uid for == and use a different comparison rule for the less-than operator?
|
||||
// The ordering of RestoreAsset may change, will that affect correctness or performance?
|
||||
bool operator==(const RestoreAsset& r) const {
|
||||
return beginVersion == r.beginVersion && endVersion == r.endVersion && range == r.range &&
|
||||
fileIndex == r.fileIndex && partitionId == r.partitionId && filename == r.filename &&
|
||||
offset == r.offset && len == r.len;
|
||||
offset == r.offset && len == r.len && addPrefix == r.addPrefix && removePrefix == r.removePrefix;
|
||||
}
|
||||
bool operator!=(const RestoreAsset& r) const {
|
||||
return !(*this == r);
|
||||
}
|
||||
bool operator<(const RestoreAsset& r) const {
|
||||
return std::make_tuple(fileIndex, filename, offset, len, beginVersion, endVersion, range.begin, range.end) <
|
||||
std::make_tuple(r.fileIndex, r.filename, r.offset, r.len, r.beginVersion, r.endVersion, r.range.begin,
|
||||
r.range.end);
|
||||
return std::make_tuple(fileIndex, filename, offset, len, beginVersion, endVersion, range.begin, range.end,
|
||||
addPrefix, removePrefix) < std::make_tuple(r.fileIndex, r.filename, r.offset, r.len,
|
||||
r.beginVersion, r.endVersion, r.range.begin,
|
||||
r.range.end, r.addPrefix, r.removePrefix);
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, beginVersion, endVersion, range, filename, fileIndex, partitionId, offset, len, uid);
|
||||
serializer(ar, uid, beginVersion, endVersion, range, filename, fileIndex, partitionId, offset, len, addPrefix,
|
||||
removePrefix);
|
||||
}
|
||||
|
||||
std::string toString() {
|
||||
std::stringstream ss;
|
||||
ss << "UID:" << uid.toString() << " begin:" << beginVersion << " end:" << endVersion
|
||||
<< " range:" << range.toString() << " filename:" << filename << " fileIndex:" << fileIndex
|
||||
<< " partitionId:" << partitionId << " offset:" << offset << " len:" << len;
|
||||
<< " partitionId:" << partitionId << " offset:" << offset << " len:" << len
|
||||
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
@ -539,7 +547,6 @@ struct RestoreFinishRequest : TimedRequest {
|
|||
struct RestoreRequest {
|
||||
constexpr static FileIdentifier file_identifier = 49589770;
|
||||
|
||||
// Database cx;
|
||||
int index;
|
||||
Key tagName;
|
||||
Key url;
|
||||
|
@ -547,24 +554,31 @@ struct RestoreRequest {
|
|||
KeyRange range;
|
||||
UID randomUid;
|
||||
|
||||
// Every key in backup will first removePrefix and then addPrefix;
|
||||
// Simulation testing does not cover when both addPrefix and removePrefix exist yet.
|
||||
Key addPrefix;
|
||||
Key removePrefix;
|
||||
|
||||
ReplyPromise<struct RestoreCommonReply> reply;
|
||||
|
||||
RestoreRequest() = default;
|
||||
explicit RestoreRequest(const int index, const Key& tagName, const Key& url, Version targetVersion,
|
||||
const KeyRange& range, const UID& randomUid)
|
||||
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid) {}
|
||||
const KeyRange& range, const UID& randomUid, Key& addPrefix, Key removePrefix)
|
||||
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid),
|
||||
addPrefix(addPrefix), removePrefix(removePrefix) {}
|
||||
|
||||
//To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be considered
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, index, tagName, url, targetVersion, range, randomUid, reply);
|
||||
serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
std::stringstream ss;
|
||||
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString()
|
||||
<< " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion)
|
||||
<< " range:" << range.toString() << " randomUid:" << randomUid.toString();
|
||||
<< " range:" << range.toString() << " randomUid:" << randomUid.toString()
|
||||
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -245,6 +245,11 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
|||
if (isRangeMutation(mutation)) {
|
||||
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
||||
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
||||
// Remove prefix or add prefix when we restore to a new key space
|
||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
} else {
|
||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
}
|
||||
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreDecodePartitionedLogFile")
|
||||
|
@ -787,6 +792,11 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
|
|||
if (isRangeMutation(mutation)) {
|
||||
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
|
||||
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
|
||||
// Remove prefix or add prefix if we restore data to a new key space
|
||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
} else {
|
||||
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
}
|
||||
|
||||
cc->sampledLogBytes += mutation.totalSize();
|
||||
|
@ -880,9 +890,9 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
|||
// Convert KV in data into SET mutations of different keys in kvOps
|
||||
for (const KeyValueRef& kv : data) {
|
||||
// NOTE: The KV pairs in range files are the real KV pairs in original DB.
|
||||
// Should NOT add prefix or remove surfix for the backup data!
|
||||
MutationRef m(MutationRef::Type::SetValue, kv.key,
|
||||
kv.value); // ASSUME: all operation in range file is set.
|
||||
MutationRef m(MutationRef::Type::SetValue, kv.key, kv.value);
|
||||
// Remove prefix or add prefix in case we restore data to a different sub keyspace
|
||||
m.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix);
|
||||
cc->loadedRangeBytes += m.totalSize();
|
||||
|
||||
// We cache all kv operations into kvOps, and apply all kv operations later in one place
|
||||
|
|
|
@ -204,8 +204,10 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
|
|||
try {
|
||||
for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) {
|
||||
state RestoreRequest request = restoreRequests[restoreIndex];
|
||||
state KeyRange range = request.range.removePrefix(request.removePrefix).withPrefix(request.addPrefix);
|
||||
TraceEvent("FastRestoreMasterProcessRestoreRequests", self->id())
|
||||
.detail("RestoreRequestInfo", request.toString());
|
||||
.detail("RestoreRequestInfo", request.toString())
|
||||
.detail("TransformedKeyRange", range);
|
||||
// TODO: Initialize MasterData and all loaders and appliers' data for each restore request!
|
||||
self->resetPerRestoreRequest();
|
||||
|
||||
|
@ -213,7 +215,7 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
|
|||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->clear(request.range);
|
||||
tr->clear(range);
|
||||
return Void();
|
||||
}));
|
||||
|
||||
|
@ -348,6 +350,7 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
|
|||
std::set<RestoreFileFR>* files = isRangeFile ? &versionBatch.rangeFiles : &versionBatch.logFiles;
|
||||
|
||||
TraceEvent("FastRestoreMasterPhaseLoadFilesStart")
|
||||
.detail("RestoreRequestID", request.randomUid)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("FileTypeLoadedInVersionBatch", isRangeFile)
|
||||
.detail("BeginVersion", versionBatch.beginVersion)
|
||||
|
@ -388,6 +391,8 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
|
|||
param.asset.endVersion = (isRangeFile || request.targetVersion == -1)
|
||||
? versionBatch.endVersion
|
||||
: std::min(versionBatch.endVersion, request.targetVersion + 1);
|
||||
param.asset.addPrefix = request.addPrefix;
|
||||
param.asset.removePrefix = request.removePrefix;
|
||||
|
||||
TraceEvent("FastRestoreMasterPhaseLoadFiles")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbclient/RestoreWorkerInterface.actor.h"
|
||||
#include "fdbclient/RunTransaction.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
#define TEST_ABORT_FASTRESTORE 0
|
||||
|
@ -73,8 +74,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||
usePartitionedLogs = getOption(options, LiteralStringRef("usePartitionedLogs"),
|
||||
deterministicRandom()->random01() < 0.5 ? true : false);
|
||||
addPrefix = getOption(options, LiteralStringRef("addPrefix"), "");
|
||||
removePrefix = getOption(options, LiteralStringRef("removePrefix"), "");
|
||||
addPrefix = getOption(options, LiteralStringRef("addPrefix"), LiteralStringRef(""));
|
||||
removePrefix = getOption(options, LiteralStringRef("removePrefix"), LiteralStringRef(""));
|
||||
|
||||
KeyRef beginRange;
|
||||
KeyRef endRange;
|
||||
|
@ -302,7 +303,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
// write [begin, end) in kvs to DB
|
||||
ACTOR static Future<Void> writeKVs(Database cx, Standalone<RangeResultRef> kvs, int begin, int end) {
|
||||
while (begin < end) {
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
int i = 0;
|
||||
while (i < 100) {
|
||||
tr->set(kvs[begin].key, kvs[begin].value);
|
||||
|
@ -322,7 +325,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
.detail("AddPrefix", addPrefix)
|
||||
.detail("RemovePrefix", removePrefix);
|
||||
Standalone<RangeResultRef> kvs = wait(tr.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!kvs.ismore);
|
||||
ASSERT(!kvs.more);
|
||||
state int i = 0;
|
||||
for (i = 0; i < kvs.size(); ++i) {
|
||||
KeyValueRef kv = kvs[i];
|
||||
|
@ -520,7 +523,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
TraceEvent("FastRestore").detail("PrepareRestores", self->backupRanges.size());
|
||||
wait(backupAgent.submitParallelRestore(cx, self->backupTag, self->backupRanges,
|
||||
KeyRef(lastBackupContainer->getURL()), targetVersion,
|
||||
self->locked, randomID));
|
||||
self->locked, randomID, self->addPrefix, self->removePrefix));
|
||||
TraceEvent("FastRestore").detail("TriggerRestore", "Setting up restoreRequestTriggerKey");
|
||||
|
||||
// Sometimes kill and restart the restore
|
||||
|
@ -560,8 +563,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
// TODO: If addPrefix and removePrefix are set, we want to transform the effect by copying data
|
||||
if (self->hasPrefix()) {
|
||||
ASSERT(finalPrefix.size() >= 0);
|
||||
transformDatabaseContents(cx, self->removePrefix, self->addPrefix);
|
||||
wait(transformDatabaseContents(cx, self->removePrefix, self->addPrefix));
|
||||
wait(unlockDatabase(cx, randomID));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue