diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index eed735eaa4..7b7f27c14e 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -781,13 +781,21 @@ public: wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) && store(pLogs, bc->listLogFiles(scanBegin, scanEnd, true)) && store(desc.snapshots, bc->listKeyspaceSnapshots())); - // FIXME: check partitioned logs & maybe enable the below line - // logs.insert(logs.end(), std::make_move_iterator(pLogs.begin()), std::make_move_iterator(pLogs.end())); // List logs in version order so log continuity can be analyzed std::sort(logs.begin(), logs.end()); - if(!logs.empty()) { + // Check partitioned logs + if (!pLogs.empty()) { + std::sort(pLogs.begin(), pLogs.end()); + // If we didn't get log versions above then seed them using the first log file + if(!desc.contiguousLogEnd.present()) { + auto it = pLogs.begin(); + desc.minLogBegin = it->beginVersion; + desc.contiguousLogEnd = it->endVersion; + } + desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(pLogs, scanBegin); + } else if (!logs.empty()) { desc.maxLogEnd = logs.rbegin()->endVersion; auto i = logs.begin(); @@ -875,7 +883,7 @@ public: } // Uses the virtual methods to describe the backup contents - Future describeBackup(bool deepScan, Version logStartVersionOverride) override { + Future describeBackup(bool deepScan, Version logStartVersionOverride) final { return describeBackup_impl(Reference::addRef(this), deepScan, logStartVersionOverride); } @@ -1067,15 +1075,20 @@ public: } // For a list of log files specified by their indices (of the same tag), - // returns if they are continous in the range [begin, end]. + // returns if they are continous in the range [begin, end]. If "tags" is not + // nullptr, then it will be populated with [begin, end] -> tags, where next + // pair's begin == previous pair's end + 1. On return, the last pair's end + // version (inclusive) gives the continuous range from begin. static bool isContinuous(const std::vector& files, std::vector indices, Version begin, Version end, std::map, int>* tags) { Version lastBegin = invalidVersion; Version lastEnd = invalidVersion; int lastTags = -1; + ASSERT(tags == nullptr || tags->empty()); for (int idx : indices) { const LogFile& file = files[idx]; +std::cout << file.toString() << " " << "lastBegin " << lastBegin << ", lastEnd " << lastEnd << ", end " << end << ", lastTags" << lastTags << "\n"; if (lastEnd == invalidVersion) { if (file.beginVersion > begin) return false; if (file.endVersion > begin) { @@ -1085,7 +1098,10 @@ public: continue; } } else if (lastEnd != file.beginVersion) { - return false; // not continuous + if (tags != nullptr) { + tags->emplace(std::make_pair(lastBegin, lastEnd - 1), lastTags); + } + return false; } if (lastTags != file.totalTags) { @@ -1098,11 +1114,11 @@ public: lastEnd = file.endVersion; if (lastEnd > end) break; } - if (lastBegin == invalidVersion || lastEnd <= end) return false; // not covering the range - if (tags != nullptr) { - tags->emplace(std::make_pair(lastBegin, end), lastTags); +std::cout << "lastBegin " << lastBegin << ", lastEnd " << lastEnd << ", end " << end << ", lastTags" << lastTags << "\n"; + if (tags != nullptr && lastBegin != invalidVersion) { + tags->emplace(std::make_pair(lastBegin, std::min(end, lastEnd - 1)), lastTags); } - return true; + return lastBegin != invalidVersion && lastEnd > end; } // Returns true if logs are continuous in the range [begin, end]. @@ -1116,7 +1132,7 @@ public: } // check tag 0 is continuous and create a map of ranges to tags - std::map, int> tags; // range [start, end) -> tags + std::map, int> tags; // range [start, end] -> tags if (!isContinuous(files, tagIndices[0], begin, end, &tags)) return false; // for each range in tags, check all tags from 1 are continouous @@ -1130,6 +1146,67 @@ public: return true; } + // Returns log files that are not duplicated. + static std::vector filterDuplicates(std::vector& logs) { + std::sort(logs.begin(), logs.end()); + + std::vector filtered; + int i = 0; + for (int j = 1; j < logs.size(); j++) { + if (!logs[i].sameContent(logs[j])) { + filtered.push_back(logs[i]); + i = j; + } + } + if (i < logs.size()) filtered.push_back(logs[i]); + return filtered; + } + + // Returns the end version such that [begin, end] is continuous. + static Version getPartitionedLogsContinuousEndVersion(std::vector& logs, Version begin) { + auto files = filterDuplicates(logs); +for (auto file : files) std::cout << file.toString() << "\n"; + Version end = 0; + + std::map> tagIndices; // tagId -> indices in files + for (int i = 0; i < files.size(); i++) { + ASSERT(files[i].tagId >= 0 && files[i].tagId < files[i].totalTags); + auto& indices = tagIndices[files[i].tagId]; + indices.push_back(i); + end = files[i].endVersion - 1; + } +std::cout << "Init end: " << end << "\n"; + + // check tag 0 is continuous in [begin, end] and create a map of ranges to tags + std::map, int> tags; // range [start, end] -> tags + isContinuous(files, tagIndices[0], begin, end, &tags); + if (tags.empty() || end <= begin) return 0; + end = std::min(end, tags.rbegin()->first.second); +std::cout << "Tag 0 end: " << end << "\n"; +for (auto [p, v] : tags) std::cout<<"[" << p.first << ", " << p.second << "] " << v << "\n"; + + // for each range in tags, check all tags from 1 are continouous + Version lastEnd = begin; + for (const auto [beginEnd, count] : tags) { + Version tagEnd = end; // This range's minimum continous tag version + for (int i = 1; i < count; i++) { + std::map, int> rangeTags; + isContinuous(files, tagIndices[i], beginEnd.first, beginEnd.second, &rangeTags); + tagEnd = rangeTags.empty() ? 0 : std::min(tagEnd, rangeTags.rbegin()->first.second); +std::cout << "Tag " << i << " end: " << tagEnd << "\n"; + if (tagEnd == 0) return lastEnd; + } + if (tagEnd < beginEnd.second) { + end = tagEnd; + break; + } + lastEnd = beginEnd.second; + } + +std::cout << "Return end = " << end << "\n\n"; + return end; + } + ACTOR static Future> getRestoreSet_impl(Reference bc, Version targetVersion, bool partitioned) { // Find the most recent keyrange snapshot to end at or before targetVersion state Optional snapshot; @@ -1161,15 +1238,7 @@ public: if (partitioned) { // Remove duplicated log files that can happen for old epochs. - std::vector filtered; - int i = 0; - for (int j = 1; j < logs.size(); j++) { - if (!logs[i].sameContent(logs[j])) { - filtered.push_back(logs[i]); - i = j; - } - } - if (i < logs.size()) filtered.push_back(logs[i]); + std::vector filtered = filterDuplicates(logs); restorable.logs.swap(filtered); if (isPartitionedLogsContinuous(restorable.logs, snapshot.get().beginVersion, targetVersion)) { @@ -2081,10 +2150,12 @@ TEST_CASE("/backup/continuous") { // [0, 100) 2 tags files.push_back({ 0, 100, 10, "file1", 100, 0, 2 }); // Tag 0: 0-100 ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 0); files.push_back({ 0, 100, 10, "file2", 200, 1, 2 }); // Tag 1: 0-100 ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99)); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 100)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 99); // [100, 300) 3 tags files.push_back({ 100, 200, 10, "file3", 200, 0, 3 }); // Tag 0: 100-200 @@ -2093,17 +2164,21 @@ TEST_CASE("/backup/continuous") { ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 99)); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 100)); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 150)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 99); files.push_back({ 100, 300, 10, "file5", 200, 2, 3 }); // Tag 2: 100-300 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 150)); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 200)); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 10, 199)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 199); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 100) == 199); files.push_back({ 250, 300, 10, "file6", 200, 0, 3 }); // Tag 0: 250-300, missing 200-250 std::sort(files.begin(), files.end()); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 50, 240)); ASSERT(!BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 280)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 99) == 199); files.push_back({ 250, 300, 10, "file7", 200, 1, 3 }); // Tag 1: 250-300 std::sort(files.begin(), files.end()); @@ -2113,6 +2188,7 @@ TEST_CASE("/backup/continuous") { std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 0, 299)); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 280)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 150) == 299); // [300, 400) 1 tag // files.push_back({200, 250, 10, "file9", 200, 0, 3}); // Tag 0: 200-250, duplicate file @@ -2122,6 +2198,9 @@ TEST_CASE("/backup/continuous") { ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 100, 399)); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 150, 399)); ASSERT(BackupContainerFileSystem::isPartitionedLogsContinuous(files, 250, 399)); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 0) == 399); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 99) == 399); + ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 250) == 399); return Void(); } \ No newline at end of file