Merge pull request #2871 from xumengpanda/mengxu/parallel-range-log-file-loading-PR
Performant restore [24/xx]: Process range and log files in parallel for old backup format
commit cf7404a771
@@ -1208,7 +1208,8 @@ public:
std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
for (int i = 0; i < logs.size(); i++) {
ASSERT(logs[i].tagId >= 0 && logs[i].tagId < logs[i].totalTags);
ASSERT(logs[i].tagId >= 0);
ASSERT(logs[i].tagId < logs[i].totalTags);
auto& indices = tagIndices[logs[i].tagId];
// filter out if indices.back() is subset of files[i] or vice versa
if (!indices.empty()) {
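
The tagIndices map above groups the indices of old-format log files by the tag that produced them, so overlapping files for the same tag can be filtered before loading. Below is a minimal standalone sketch of that grouping step, assuming a simplified LogFile record; the names are illustrative, not FoundationDB's actual types.

#include <cassert>
#include <map>
#include <vector>

// Hypothetical, simplified stand-in for the backup log-file metadata.
struct LogFile {
	int tagId;     // which tag wrote this file
	int totalTags; // number of tags in that epoch
};

// Group file indices by tag, mirroring the tagIndices map in the hunk above;
// the per-tag vectors are what the subset filtering then walks.
std::map<int, std::vector<int>> groupByTag(const std::vector<LogFile>& logs) {
	std::map<int, std::vector<int>> tagIndices; // tagId -> indices into logs
	for (int i = 0; i < (int)logs.size(); i++) {
		assert(logs[i].tagId >= 0);
		assert(logs[i].tagId < logs[i].totalTags);
		tagIndices[logs[i].tagId].push_back(i);
	}
	return tagIndices;
}
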
@@ -177,7 +177,7 @@ const KeyRangeRef serverTagConflictKeys(
const KeyRef serverTagConflictPrefix = serverTagConflictKeys.begin;
// serverTagHistoryKeys is the old tag a storage server uses before it is migrated to a different location.
// For example, we can copy a SS file to a remote DC and start the SS there;
// The new SS will need to cnosume the last bits of data from the old tag it is responsible for.
// The new SS will need to consume the last bits of data from the old tag it is responsible for.
const KeyRangeRef serverTagHistoryKeys(
LiteralStringRef("\xff/serverTagHistory/"),
LiteralStringRef("\xff/serverTagHistory0") );

@@ -640,6 +640,7 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
return Void();
}

// Commit one batch of transactions trs
ACTOR Future<Void> commitBatch(
ProxyCommitData* self,
vector<CommitTransactionRequest> trs,

@@ -222,8 +222,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
retries++;
TraceEvent(retries > 10 ? SevError : SevWarn, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", incompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
.error(e);
wait(tr->onError(e));
fValues.clear();
}

@@ -335,6 +335,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
.detail("UseRangeFile", req.useRangeFile)
.detail("LoaderSendStatus", batchStatus->toString());

// Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
if (!req.useRangeFile) {
if (!batchStatus->sendAllLogs.present()) { // Has not sent
batchStatus->sendAllLogs = Never();

@@ -342,7 +343,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
ASSERT(!batchStatus->sendAllRanges.present());
} else if (!batchStatus->sendAllLogs.get().isReady()) { // In the process of sending
TraceEvent(SevDebug, "FastRestoreSendMutationsWaitDuplicateLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)

@@ -360,7 +360,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
ASSERT(batchStatus->sendAllLogs.get().isReady());
} else if (!batchStatus->sendAllRanges.get().isReady()) {
TraceEvent(SevDebug, "FastRestoreSendMutationsWaitDuplicateRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
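
The sendAllLogs/sendAllRanges members referenced in these hunks act as a three-state guard that makes resent loader requests idempotent: not present means the request has not been processed yet, present but not ready means a send is in flight, and ready means the work finished and a duplicate only needs an acknowledgement. A plain-C++ sketch of that pattern follows; the names are hypothetical, not the actual Flow/RestoreLoader types.

#include <iostream>

// Hypothetical analogue of batchStatus->sendAllLogs / sendAllRanges:
//   NotStarted ~ !future.present()        -> process the request
//   InFlight   ~ present() && !isReady()  -> duplicate while working; wait
//   Done       ~ present() && isReady()   -> duplicate of finished work; just ack
enum class SendState { NotStarted, InFlight, Done };

const char* handleSendRequest(SendState& state) {
	switch (state) {
	case SendState::NotStarted:
		state = SendState::InFlight; // corresponds to assigning the Never() placeholder
		return "process request";
	case SendState::InFlight:
		return "duplicate: wait for the in-flight send";
	case SendState::Done:
		return "duplicate: already sent, reply only";
	}
	return "";
}

int main() {
	SendState s = SendState::NotStarted;
	std::cout << handleSendRequest(s) << "\n"; // process request
	std::cout << handleSendRequest(s) << "\n"; // duplicate: wait for the in-flight send
}
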
@@ -723,6 +722,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite

auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);

// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
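
The assertion above relies on mutations being ordered by a (commit version, subversion) pair: log-file mutations get small, increasing subversions, while range-file mutations use int32_max, so at the same commit version the range mutations sort last. An illustrative check of that ordering, with std::pair standing in for LogMessageVersion:

#include <cstdint>
#include <iostream>
#include <limits>
#include <utility>

int main() {
	using VersionSub = std::pair<int64_t, int32_t>; // (commit version, subversion)
	VersionSub logMutation{ 500, 7 };               // parsed from a log file
	VersionSub rangeMutation{ 500, std::numeric_limits<int32_t>::max() }; // range file
	// Lexicographic comparison: same version, so the smaller subversion applies first.
	std::cout << std::boolalpha << (logMutation < rangeMutation) << "\n"; // true
}
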
@@ -84,10 +84,7 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
wait(startProcessRestoreRequests(self, cx));
} catch (Error& e) {
if (e.code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FastRestoreMasterStart")
.detail("Reason", "Unexpected unhandled error")
.detail("ErrorCode", e.code())
.detail("Error", e.what());
TraceEvent(SevError, "FastRestoreMasterStart").detail("Reason", "Unexpected unhandled error").error(e);
}
}

@@ -204,11 +201,13 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
} catch (Error& e) {
if (restoreIndex < restoreRequests.size()) {
TraceEvent(SevError, "FastRestoreMasterProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequest", restoreRequests[restoreIndex].toString());
.detail("RestoreRequest", restoreRequests[restoreIndex].toString())
.error(e);
} else {
TraceEvent(SevError, "FastRestoreMasterProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequests", restoreRequests.size())
.detail("RestoreIndex", restoreIndex);
.detail("RestoreIndex", restoreIndex)
.error(e);
}
}

@@ -433,8 +432,6 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba

std::vector<std::pair<UID, RestoreSendMutationsToAppliersRequest>> requests;
for (auto& loader : loadersInterf) {
ASSERT(batchStatus->loadStatus.find(loader.first) == batchStatus->loadStatus.end() ||
batchStatus->loadStatus[loader.first] == RestoreSendStatus::SendedLogs);
requests.emplace_back(
loader.first, RestoreSendMutationsToAppliersRequest(batchIndex, batchData->rangeToApplier, useRangeFile));
batchStatus->loadStatus[loader.first] =

@@ -444,40 +441,6 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba
wait(getBatchReplies(&RestoreLoaderInterface::sendMutations, loadersInterf, requests, &replies,
TaskPriority::RestoreLoaderSendMutations));

// Update status and sanity check
for (auto& reply : replies) {
RestoreSendStatus status = batchStatus->loadStatus[reply.id];
if ((status == RestoreSendStatus::SendingRanges || status == RestoreSendStatus::SendingLogs)) {
batchStatus->loadStatus[reply.id] = (status == RestoreSendStatus::SendingRanges)
? RestoreSendStatus::SendedRanges
: RestoreSendStatus::SendedLogs;
if (reply.isDuplicated) {
TraceEvent(SevWarn, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("DuplicateRequestAcked", "Request should have been processed");
}
} else if ((status == RestoreSendStatus::SendedRanges || status == RestoreSendStatus::SendedLogs) &&
reply.isDuplicated) {
TraceEvent(SevDebug, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("RequestIgnored", "Send request was sent more than once");
} else {
TraceEvent(SevError, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("UnexpectedReply", reply.toString());
}
}
// Sanity check all loaders have sent requests
for (auto& loader : loadersInterf) {
if ((useRangeFile && batchStatus->loadStatus[loader.first] != RestoreSendStatus::SendedRanges) ||
(!useRangeFile && batchStatus->loadStatus[loader.first] != RestoreSendStatus::SendedLogs)) {
TraceEvent(SevError, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", loader.first)
.detail("UseRangeFile", useRangeFile)
.detail("SendStatus", batchStatus->loadStatus[loader.first]);
}
}

TraceEvent("FastRestoreMasterPhaseSendMutationsFromLoadersDone")
.detail("BatchIndex", batchIndex)
.detail("UseRangeFiles", useRangeFile)

@@ -512,17 +475,20 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
ASSERT(batchStatus->loadStatus.empty());
ASSERT(batchStatus->applyStatus.empty());

wait(loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, false));
wait(loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, true));
// New backup has subversion to order mutations at the same version. For mutations at the same version,
// range file's mutations have the largest subversion and larger than log file's.
// SOMEDAY: Extend subversion to old-style backup.
wait(
loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, false) &&
loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, true));

ASSERT(batchData->rangeToApplier.empty());
splitKeyRangeForAppliers(batchData, self->appliersInterf, batchIndex);

// Loaders should ensure log files' mutations sent to appliers before range files' mutations
// TODO: Let applier buffer mutations from log and range files differently so that loaders can send mutations in
// parallel
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, false));
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, true));
// Ask loaders to send parsed mutations to appliers;
// log mutations should be applied before range mutations at the same version, which is ensured by LogMessageVersion
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, false) &&
sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, true));

// Synchronization point for version batch pipelining.
// self->finishedBatch will continuously increase by 1 per version batch.
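
The change above replaces two back-to-back waits with a single wait on the conjunction of the two futures, so log-file and range-file work for a version batch is issued concurrently; per the added comments, ordering within a version is preserved by LogMessageVersion (version, subversion) rather than by send order. A plain-C++ analogue of the sequential-to-concurrent change follows, with std::async standing in for the Flow actors and all names illustrative.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
	using namespace std::chrono;
	auto loadFiles = [](const char* kind) {
		std::this_thread::sleep_for(milliseconds(200)); // pretend file I/O
		std::cout << "loaded " << kind << " files\n";
	};

	auto start = steady_clock::now();
	// Before: wait(loadLogs); wait(loadRanges);  -- roughly 2 x 200 ms
	// After:  wait(loadLogs && loadRanges);      -- both loads in flight at once
	std::future<void> logs = std::async(std::launch::async, loadFiles, "log");
	std::future<void> ranges = std::async(std::launch::async, loadFiles, "range");
	logs.wait();
	ranges.wait();
	auto ms = duration_cast<milliseconds>(steady_clock::now() - start).count();
	std::cout << "both done in ~" << ms << " ms instead of ~400 ms\n";
}
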
@@ -439,6 +439,12 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
BackupDescription desc = wait(self->usePartitionedLogs ? container->describePartitionedBackup()
: container->describeBackup());

TraceEvent("BAFRW_Restore", randomID)
.detail("LastBackupContainer", lastBackupContainer->getURL())
.detail("MinRestorableVersion", desc.minRestorableVersion.get())
.detail("MaxRestorableVersion", desc.maxRestorableVersion.get())
.detail("ContiguousLogEnd", desc.contiguousLogEnd.get());

state Version targetVersion = -1;
if (desc.maxRestorableVersion.present()) {
if (deterministicRandom()->random01() < 0.1) {

@@ -446,7 +452,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
} else if (deterministicRandom()->random01() < 0.1) {
targetVersion = desc.maxRestorableVersion.get();
} else if (deterministicRandom()->random01() < 0.5) {
ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
// The assertion may fail because minRestorableVersion may be decided by snapshot version.
// ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
// This assertion can fail when contiguousLogEnd < maxRestorableVersion and
// the snapshot version > contiguousLogEnd. I.e., there is a gap between
// contiguousLogEnd and snapshot version.
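
To make the disabled assertion concrete: when the restorable snapshot finishes after the contiguous mutation-log range ends, minRestorableVersion is driven by the snapshot version and can legitimately exceed contiguousLogEnd. The numbers below are hypothetical, chosen only to match the relationships stated in the comment above.

#include <cassert>
#include <cstdint>

int main() {
	int64_t contiguousLogEnd = 800;      // mutation logs are contiguous only up to here
	int64_t snapshotVersion = 1000;      // the snapshot completed later than that
	int64_t minRestorableVersion = snapshotVersion; // decided by the snapshot version
	int64_t maxRestorableVersion = snapshotVersion;
	assert(contiguousLogEnd < maxRestorableVersion);
	assert(snapshotVersion > contiguousLogEnd);
	assert(minRestorableVersion > contiguousLogEnd); // why the old ASSERT_WE_THINK could fire
	return 0;
}
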
@@ -208,9 +208,12 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt)
add_fdb_test(TEST_FILES slow/ddbalance.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOp.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessAtomicOp.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)

@@ -26,4 +26,5 @@ startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
; This test file uses old restore, which does not work on new backup format
usePartitionedLogs=false

@@ -25,6 +25,8 @@ testTitle=BackupAndParallelRestoreWithAtomicOp
clearAfterTest=false
simBackupAgents=BackupToFile
backupRangesCount=-1
; use new backup
usePartitionedLogs=true

testName=RandomClogging
testDuration=90.0

@@ -21,6 +21,7 @@ testTitle=BackupAndRestore
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=true

testName=RandomClogging
testDuration=90.0

@@ -44,6 +44,7 @@ testTitle=BackupAndRestore
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=true

testName=RandomClogging
testDuration=90.0

@@ -0,0 +1,56 @@
testTitle=BackupAndParallelRestoreWithAtomicOp
testName=AtomicOps
nodeCount=30000
; Make ops space only 1 key per group
; nodeCount=100
transactionsPerSecond=2500.0
; transactionsPerSecond=500.0
; transactionsPerSecond=500.0
; nodeCount=4
; transactionsPerSecond=250.0
testDuration=30.0
clearAfterTest=false
; Specify a type of atomicOp
; opType=0
; actorsPerClient=1

; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload

; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
backupRangesCount=-1
; use old backup
usePartitionedLogs=false

testName=RandomClogging
testDuration=90.0

testName=Rollback
meanDelay=90.0
testDuration=90.0

; Do NOT kill restore worker process yet
; Kill other process to ensure restore works when FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

; Disable buggify for parallel restore
;buggify=on
;testDuration=360000 ;not work
;timeout is in seconds
timeout=360000

@@ -0,0 +1,52 @@
testTitle=BackupAndRestore
testName=Cycle
; nodeCount=30000
nodeCount=1000
; transactionsPerSecond=500.0
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
; keyPrefix=!

; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload

; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=false

testName=RandomClogging
testDuration=90.0

testName=Rollback
meanDelay=90.0
testDuration=90.0

; Do NOT kill restore worker process yet
; Kill other process to ensure restore works when FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

; Disable buggify for parallel restore
;buggify=off
;testDuration=360000 ;not work
;timeout is in seconds
timeout=360000

@@ -0,0 +1,75 @@
testTitle=BackupAndRestore
testName=Cycle
; nodeCount=30000
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=!

testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=z

testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=A

testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=Z

; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload

; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=false

testName=RandomClogging
testDuration=90.0

testName=Rollback
meanDelay=90.0
testDuration=90.0

; Do NOT kill restore worker process yet
; Kill other process to ensure restore works when FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0

; Disable buggify for parallel restore
;buggify=off
;testDuration=360000 ;not work
;timeout is in seconds
timeout=360000