Describe backup uses partitioned logs to find continuous end version
For partitioned logs, the continuous end version has to be done range by range, where each range must contain continuous version for all tags.
This commit is contained in:
parent
659843ff51
commit
938a6f358d
|
@ -781,13 +781,21 @@ public:
|
|||
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) &&
|
||||
store(pLogs, bc->listLogFiles(scanBegin, scanEnd, true)) &&
|
||||
store(desc.snapshots, bc->listKeyspaceSnapshots()));
|
||||
// FIXME: check partitioned logs & maybe enable the below line
|
||||
// logs.insert(logs.end(), std::make_move_iterator(pLogs.begin()), std::make_move_iterator(pLogs.end()));
|
||||
|
||||
// List logs in version order so log continuity can be analyzed
|
||||
std::sort(logs.begin(), logs.end());
|
||||
|
||||
if(!logs.empty()) {
|
||||
// Check partitioned logs
|
||||
if (!pLogs.empty()) {
|
||||
std::sort(pLogs.begin(), pLogs.end());
|
||||
// If we didn't get log versions above then seed them using the first log file
|
||||
if(!desc.contiguousLogEnd.present()) {
|
||||
auto it = pLogs.begin();
|
||||
desc.minLogBegin = it->beginVersion;
|
||||
desc.contiguousLogEnd = it->endVersion;
|
||||
}
|
||||
desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(pLogs, scanBegin);
|
||||
} else if (!logs.empty()) {
|
||||
desc.maxLogEnd = logs.rbegin()->endVersion;
|
||||
|
||||
auto i = logs.begin();
|
||||
|
@ -875,7 +883,7 @@ public:
|
|||
}
|
||||
|
||||
// Uses the virtual methods to describe the backup contents
|
||||
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) override {
|
||||
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) final {
|
||||
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride);
|
||||
}
|
||||
|
||||
|
@ -1067,15 +1075,20 @@ public:
|
|||
}
|
||||
|
||||
// For a list of log files specified by their indices (of the same tag),
|
||||
// returns if they are continous in the range [begin, end].
|
||||
// returns if they are continous in the range [begin, end]. If "tags" is not
|
||||
// nullptr, then it will be populated with [begin, end] -> tags, where next
|
||||
// pair's begin == previous pair's end + 1. On return, the last pair's end
|
||||
// version (inclusive) gives the continuous range from begin.
|
||||
static bool isContinuous(const std::vector<LogFile>& files, std::vector<int> indices, Version begin, Version end,
|
||||
std::map<std::pair<Version, Version>, int>* tags) {
|
||||
Version lastBegin = invalidVersion;
|
||||
Version lastEnd = invalidVersion;
|
||||
int lastTags = -1;
|
||||
|
||||
ASSERT(tags == nullptr || tags->empty());
|
||||
for (int idx : indices) {
|
||||
const LogFile& file = files[idx];
|
||||
std::cout << file.toString() << " " << "lastBegin " << lastBegin << ", lastEnd " << lastEnd << ", end " << end << ", lastTags" << lastTags << "\n";
|
||||
if (lastEnd == invalidVersion) {
|
||||
if (file.beginVersion > begin) return false;
|
||||
if (file.endVersion > begin) {
|
||||
|
@ -1085,7 +1098,10 @@ public:
|
|||
continue;
|
||||
}
|
||||
} else if (lastEnd != file.beginVersion) {
|
||||
return false; // not continuous
|
||||
if (tags != nullptr) {
|
||||
tags->emplace(std::make_pair(lastBegin, lastEnd - 1), lastTags);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (lastTags != file.totalTags) {
|
||||
|
@ -1098,11 +1114,11 @@ public:
|
|||
lastEnd = file.endVersion;
|
||||
if (lastEnd > end) break;
|
||||
}
|
||||
if (lastBegin == invalidVersion || lastEnd <= end) return false; // not covering the range
|
||||
if (tags != nullptr) {
|
||||
tags->emplace(std::make_pair(lastBegin, end), lastTags);
|
||||
std::cout << "lastBegin " << lastBegin << ", lastEnd " << lastEnd << ", end " << end << ", lastTags" << lastTags << "\n";
|
||||
if (tags != nullptr && lastBegin != invalidVersion) {
|
||||
tags->emplace(std::make_pair(lastBegin, std::min(end, lastEnd - 1)), lastTags);
|
||||
}
|
||||
return true;
|
||||
return lastBegin != invalidVersion && lastEnd > end;
|
||||
}
|
||||
|
||||
// Returns true if logs are continuous in the range [begin, end].
|
||||
|
@ -1116,7 +1132,7 @@ public:
|
|||
}
|
||||
|
||||
// check tag 0 is continuous and create a map of ranges to tags
|
||||
std::map<std::pair<Version, Version>, int> tags; // range [start, end) -> tags
|
||||
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
|
||||
if (!isContinuous(files, tagIndices[0], begin, end, &tags)) return false;
|
||||
|
||||
// for each range in tags, check all tags from 1 are continouous
|
||||
|
@ -1130,6 +1146,67 @@ public:
|
|||
return true;
|
||||
}
|
||||
|
||||
// Returns log files that are not duplicated.
|
||||
static std::vector<LogFile> filterDuplicates(std::vector<LogFile>& logs) {
|
||||
std::sort(logs.begin(), logs.end());
|
||||
|
||||
std::vector<LogFile> filtered;
|
||||
int i = 0;
|
||||
for (int j = 1; j < logs.size(); j++) {
|
||||
if (!logs[i].sameContent(logs[j])) {
|
||||
filtered.push_back(logs[i]);
|
||||
i = j;
|
||||
}
|
||||
}
|
||||
if (i < logs.size()) filtered.push_back(logs[i]);
|
||||
return filtered;
|
||||
}
|
||||
|
||||
// Returns the end version such that [begin, end] is continuous.
|
||||
static Version getPartitionedLogsContinuousEndVersion(std::vector<LogFile>& logs, Version begin) {
|
||||
auto files = filterDuplicates(logs);
|
||||
for (auto file : files) std::cout << file.toString() << "\n";
|
||||
Version end = 0;
|
||||
|
||||
std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
|
||||
for (int i = 0; i < files.size(); i++) {
|
||||
ASSERT(files[i].tagId >= 0 && files[i].tagId < files[i].totalTags);
|
||||
auto& indices = tagIndices[files[i].tagId];
|
||||
indices.push_back(i);
|
||||
end = files[i].endVersion - 1;
|
||||
}
|
||||
std::cout << "Init end: " << end << "\n";
|
||||
|
||||
// check tag 0 is continuous in [begin, end] and create a map of ranges to tags
|
||||
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
|
||||
isContinuous(files, tagIndices[0], begin, end, &tags);
|
||||
if (tags.empty() || end <= begin) return 0;
|
||||
end = std::min(end, tags.rbegin()->first.second);
|
||||
std::cout << "Tag 0 end: " << end << "\n";
|
||||
for (auto [p, v] : tags) std::cout<<"[" << p.first << ", " << p.second << "] " << v << "\n";
|
||||
|
||||
// for each range in tags, check all tags from 1 are continouous
|
||||
Version lastEnd = begin;
|
||||
for (const auto [beginEnd, count] : tags) {
|
||||
Version tagEnd = end; // This range's minimum continous tag version
|
||||
for (int i = 1; i < count; i++) {
|
||||
std::map<std::pair<Version, Version>, int> rangeTags;
|
||||
isContinuous(files, tagIndices[i], beginEnd.first, beginEnd.second, &rangeTags);
|
||||
tagEnd = rangeTags.empty() ? 0 : std::min(tagEnd, rangeTags.rbegin()->first.second);
|
||||
std::cout << "Tag " << i << " end: " << tagEnd << "\n";
|
||||
if (tagEnd == 0) return lastEnd;
|
||||
}
|
||||
if (tagEnd < beginEnd.second) {
|
||||
end = tagEnd;
|
||||
break;
|
||||
}
|
||||
lastEnd = beginEnd.second;
|
||||
}
|
||||
|
||||
std::cout << "Return end = " << end << "\n\n";
|
||||
return end;
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion, bool partitioned) {
|
||||
// Find the most recent keyrange snapshot to end at or before targetVersion
|
||||
state Optional<KeyspaceSnapshotFile> snapshot;
|
||||
|
@ -1161,15 +1238,7 @@ public:
|
|||
|
||||
if (partitioned) {
|
||||
// Remove duplicated log files that can happen for old epochs.
|
||||
std::vector<LogFile> filtered;
|
||||
int i = 0;
|
||||
for (int j = 1; j < logs.size(); j++) {
|
||||
if (!logs[i].sameContent(logs[j])) {
|
||||
filtered.push_back(logs[i]);
|
||||
i = j;
|
||||
}
|
||||
}
|
||||
if (i < logs.size()) filtered.push_back(logs[i]);
|
||||
std::vector<LogFile> filtered = filterDuplicates(logs);
|
||||
|
||||
restorable.logs.swap(filtered);
|
||||
if (isPartitionedLogsContinuous(restorable.logs, snapshot.get().beginVersion, targetVersion)) {
|
||||
|
@ -2081,10 +2150,12 @@ TEST_CASE("/backup/continuous") {
|
|||
// [0, 100) 2 tags
|
||||
files.push_back({ 0, 100, 10, "file1", 100, 0, 2 }); // Tag 0: 0-100
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 0);
|
||||
|
||||
files.push_back({ 0, 100, 10, "file2", 200, 1, 2 }); // Tag 1: 0-100
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99));
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 100));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 99);
|
||||
|
||||
// [100, 300) 3 tags
|
||||
files.push_back({ 100, 200, 10, "file3", 200, 0, 3 }); // Tag 0: 100-200
|
||||
|
@ -2093,17 +2164,21 @@ TEST_CASE("/backup/continuous") {
|
|||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99));
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 100));
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 150));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 99);
|
||||
|
||||
files.push_back({ 100, 300, 10, "file5", 200, 2, 3 }); // Tag 2: 100-300
|
||||
std::sort(files.begin(), files.end());
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 150));
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 200));
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 10, 199));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 199);
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 100) == 199);
|
||||
|
||||
files.push_back({ 250, 300, 10, "file6", 200, 0, 3 }); // Tag 0: 250-300, missing 200-250
|
||||
std::sort(files.begin(), files.end());
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 240));
|
||||
ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 280));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 99) == 199);
|
||||
|
||||
files.push_back({ 250, 300, 10, "file7", 200, 1, 3 }); // Tag 1: 250-300
|
||||
std::sort(files.begin(), files.end());
|
||||
|
@ -2113,6 +2188,7 @@ TEST_CASE("/backup/continuous") {
|
|||
std::sort(files.begin(), files.end());
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 299));
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 280));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 150) == 299);
|
||||
|
||||
// [300, 400) 1 tag
|
||||
// files.push_back({200, 250, 10, "file9", 200, 0, 3}); // Tag 0: 200-250, duplicate file
|
||||
|
@ -2122,6 +2198,9 @@ TEST_CASE("/backup/continuous") {
|
|||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 399));
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 150, 399));
|
||||
ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 250, 399));
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 399);
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 99) == 399);
|
||||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 250) == 399);
|
||||
|
||||
return Void();
|
||||
}
|
Loading…
Reference in New Issue