Add describePartitionedBackup() for parallel restore
For partitioned logs, computing continuous log end version from min logs begin version. Old backup test keeps using describeBackup() to be correctness clean. Rename partitioned log file so that the last number is block size.
This commit is contained in:
parent
af967210ee
commit
f697ccd1b9
|
@ -240,7 +240,7 @@ std::string BackupDescription::toJSON() const {
|
|||
* file written will be after the start version of the snapshot's execution.
|
||||
*
|
||||
* Log files are at file paths like
|
||||
* /plogs/...log,startVersion,endVersion,UID,blocksize,tagID-of-N
|
||||
* /plogs/...log,startVersion,endVersion,UID,tagID-of-N,blocksize
|
||||
* /logs/.../log,startVersion,endVersion,UID,blockSize
|
||||
* where ... is a multi level path which sorts lexically into version order and results in approximately 1
|
||||
* unique folder per day containing about 5,000 files. Logs after 7.0 are stored in "plogs"
|
||||
|
@ -347,9 +347,9 @@ public:
|
|||
Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
|
||||
uint16_t tagId, int totalTags) final {
|
||||
return writeFile(logVersionFolderString(beginVersion, true) +
|
||||
format("log,%lld,%lld,%s,%d,%d-of-%d", beginVersion, endVersion,
|
||||
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId,
|
||||
totalTags));
|
||||
format("log,%lld,%lld,%s,%d-of-%d,%d", beginVersion, endVersion,
|
||||
deterministicRandom()->randomUniqueID().toString().c_str(), tagId, totalTags,
|
||||
blockSize));
|
||||
}
|
||||
|
||||
Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, Version fileVersion, int blockSize) override {
|
||||
|
@ -400,8 +400,8 @@ public:
|
|||
if(sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) {
|
||||
out = f;
|
||||
return true;
|
||||
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n", &f.beginVersion,
|
||||
&f.endVersion, &f.blockSize, &f.tagId, &f.totalTags, &len) == 5 &&
|
||||
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%d-of-%d,%u%n", &f.beginVersion,
|
||||
&f.endVersion, &f.tagId, &f.totalTags, &f.blockSize, &len) == 5 &&
|
||||
len == name.size() && f.tagId >= 0) {
|
||||
out = f;
|
||||
return true;
|
||||
|
@ -672,7 +672,27 @@ public:
|
|||
return v;
|
||||
}
|
||||
|
||||
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan, Version logStartVersionOverride) {
|
||||
// Computes the continuous end version for non-partitioned mutation logs up to
|
||||
// the "targetVersion". If "outLogs" is not nullptr, it will be updated with
|
||||
// continuous log files. "*end" is updated with the continuous end version.
|
||||
static void computeRestoreEndVersion(const std::vector<LogFile>& logs, std::vector<LogFile>* outLogs, Version* end,
|
||||
Version targetVersion) {
|
||||
auto i = logs.begin();
|
||||
if (outLogs != nullptr) outLogs->push_back(*i);
|
||||
|
||||
// Add logs to restorable logs set until continuity is broken OR we reach targetVersion
|
||||
while (++i != logs.end()) {
|
||||
if (i->beginVersion > *end || i->beginVersion > targetVersion) break;
|
||||
|
||||
// If the next link in the log chain is found, update the end
|
||||
if (i->beginVersion == *end) {
|
||||
if (outLogs != nullptr) outLogs->push_back(*i);
|
||||
*end = i->endVersion;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan, Version logStartVersionOverride, bool partitioned) {
|
||||
state BackupDescription desc;
|
||||
desc.url = bc->getURL();
|
||||
|
||||
|
@ -690,8 +710,10 @@ public:
|
|||
// from which to resolve the relative version.
|
||||
// This could be handled more efficiently without recursion but it's tricky, this will do for now.
|
||||
if(logStartVersionOverride != invalidVersion && logStartVersionOverride < 0) {
|
||||
BackupDescription tmp = wait(bc->describeBackup(false, invalidVersion));
|
||||
logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride, "LogStartVersionOverride", invalid_option_value());
|
||||
BackupDescription tmp = wait(partitioned ? bc->describePartitionedBackup(false, invalidVersion)
|
||||
: bc->describeBackup(false, invalidVersion));
|
||||
logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride,
|
||||
"LogStartVersionOverride", invalid_option_value());
|
||||
}
|
||||
|
||||
// Get metadata versions
|
||||
|
@ -777,45 +799,31 @@ public:
|
|||
}
|
||||
|
||||
state std::vector<LogFile> logs;
|
||||
state std::vector<LogFile> pLogs;
|
||||
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) &&
|
||||
store(pLogs, bc->listLogFiles(scanBegin, scanEnd, true)) &&
|
||||
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, partitioned)) &&
|
||||
store(desc.snapshots, bc->listKeyspaceSnapshots()));
|
||||
|
||||
// List logs in version order so log continuity can be analyzed
|
||||
std::sort(logs.begin(), logs.end());
|
||||
|
||||
// Check partitioned logs
|
||||
if (!pLogs.empty()) {
|
||||
std::sort(pLogs.begin(), pLogs.end());
|
||||
// Find out contiguous log end version
|
||||
if (partitioned) {
|
||||
// If we didn't get log versions above then seed them using the first log file
|
||||
if(!desc.contiguousLogEnd.present()) {
|
||||
auto it = pLogs.begin();
|
||||
desc.minLogBegin = it->beginVersion;
|
||||
desc.contiguousLogEnd = it->endVersion;
|
||||
if (!desc.contiguousLogEnd.present()) {
|
||||
desc.minLogBegin = logs.begin()->beginVersion;
|
||||
desc.contiguousLogEnd = logs.begin()->endVersion;
|
||||
}
|
||||
desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(pLogs, scanBegin);
|
||||
// contiguousLogEnd is not inclusive, so +1 here.
|
||||
desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(logs, desc.minLogBegin.get()) + 1;
|
||||
} else if (!logs.empty()) {
|
||||
desc.maxLogEnd = logs.rbegin()->endVersion;
|
||||
|
||||
auto i = logs.begin();
|
||||
// If we didn't get log versions above then seed them using the first log file
|
||||
if(!desc.contiguousLogEnd.present()) {
|
||||
desc.minLogBegin = i->beginVersion;
|
||||
desc.contiguousLogEnd = i->endVersion;
|
||||
++i;
|
||||
}
|
||||
auto &end = desc.contiguousLogEnd.get(); // For convenience to make loop cleaner
|
||||
|
||||
// Advance until continuity is broken
|
||||
while(i != logs.end()) {
|
||||
if(i->beginVersion > end)
|
||||
break;
|
||||
// If the next link in the log chain is found, update the end
|
||||
if(i->beginVersion == end)
|
||||
end = i->endVersion;
|
||||
++i;
|
||||
desc.minLogBegin = logs.begin()->beginVersion;
|
||||
desc.contiguousLogEnd = logs.begin()->endVersion;
|
||||
}
|
||||
Version& end = desc.contiguousLogEnd.get();
|
||||
computeRestoreEndVersion(logs, nullptr, &end, std::numeric_limits<Version>::max());
|
||||
}
|
||||
|
||||
// Only update stored contiguous log begin and end versions if we did NOT use a log start override.
|
||||
|
@ -884,7 +892,11 @@ public:
|
|||
|
||||
// Uses the virtual methods to describe the backup contents
|
||||
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) final {
|
||||
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride);
|
||||
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, false);
|
||||
}
|
||||
|
||||
Future<BackupDescription> describePartitionedBackup(bool deepScan, Version logStartVersionOverride) final {
|
||||
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, true);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> expireData_impl(Reference<BackupContainerFileSystem> bc, Version expireEndVersion, bool force, ExpireProgress *progress, Version restorableBeginVersion) {
|
||||
|
@ -1175,7 +1187,7 @@ for (auto file : files) std::cout << file.toString() << "\n";
|
|||
indices.push_back(i);
|
||||
end = files[i].endVersion - 1;
|
||||
}
|
||||
std::cout << "Init end: " << end << "\n";
|
||||
std::cout << "Init end: " << end << ", begin " << begin << "\n";
|
||||
|
||||
// check tag 0 is continuous in [begin, end] and create a map of ranges to tags
|
||||
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
|
||||
|
@ -1249,22 +1261,9 @@ std::cout << "Return end = " << end << "\n\n";
|
|||
|
||||
// If there are logs and the first one starts at or before the snapshot begin version then proceed
|
||||
if(!logs.empty() && logs.front().beginVersion <= snapshot.get().beginVersion) {
|
||||
auto i = logs.begin();
|
||||
Version end = i->endVersion;
|
||||
restorable.logs.push_back(*i);
|
||||
|
||||
// Add logs to restorable logs set until continuity is broken OR we reach targetVersion
|
||||
while(++i != logs.end()) {
|
||||
if(i->beginVersion > end || i->beginVersion > targetVersion)
|
||||
break;
|
||||
// If the next link in the log chain is found, update the end
|
||||
if(i->beginVersion == end) {
|
||||
restorable.logs.push_back(*i);
|
||||
end = i->endVersion;
|
||||
}
|
||||
}
|
||||
|
||||
if(end >= targetVersion) {
|
||||
Version end = logs.begin()->endVersion;
|
||||
computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion);
|
||||
if (end >= targetVersion) {
|
||||
return Optional<RestorableFileSet>(restorable);
|
||||
}
|
||||
}
|
||||
|
@ -1460,6 +1459,7 @@ public:
|
|||
if(deterministicRandom()->random01() < .01) {
|
||||
blockSize /= deterministicRandom()->randomInt(1, 3);
|
||||
}
|
||||
ASSERT(blockSize > 0);
|
||||
|
||||
return map(f, [=](Reference<IAsyncFile> fr) {
|
||||
int readAhead = deterministicRandom()->randomInt(0, 3);
|
||||
|
@ -1609,15 +1609,16 @@ public:
|
|||
virtual ~BackupContainerBlobStore() {}
|
||||
|
||||
Future<Reference<IAsyncFile>> readFile(std::string path) final {
|
||||
return Reference<IAsyncFile>(
|
||||
new AsyncFileReadAheadCache(
|
||||
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),
|
||||
m_bstore->knobs.read_block_size,
|
||||
m_bstore->knobs.read_ahead_blocks,
|
||||
m_bstore->knobs.concurrent_reads_per_file,
|
||||
m_bstore->knobs.read_cache_blocks_per_file
|
||||
)
|
||||
);
|
||||
ASSERT(m_bstore->knobs.read_ahead_blocks > 0);
|
||||
return Reference<IAsyncFile>(
|
||||
new AsyncFileReadAheadCache(
|
||||
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),
|
||||
m_bstore->knobs.read_block_size,
|
||||
m_bstore->knobs.read_ahead_blocks,
|
||||
m_bstore->knobs.concurrent_reads_per_file,
|
||||
m_bstore->knobs.read_cache_blocks_per_file
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
ACTOR static Future<std::vector<std::string>> listURLs(Reference<BlobStoreEndpoint> bstore, std::string bucket) {
|
||||
|
|
|
@ -255,6 +255,9 @@ public:
|
|||
// be after deleting all data prior to logStartVersionOverride.
|
||||
virtual Future<BackupDescription> describeBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
|
||||
|
||||
// The same as above, except using partitioned mutation logs.
|
||||
virtual Future<BackupDescription> describePartitionedBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
|
||||
|
||||
virtual Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) = 0;
|
||||
|
||||
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
|
||||
|
|
|
@ -676,7 +676,7 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
|
|||
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
|
||||
std::vector<RestoreFileFR>* logFiles, Database cx,
|
||||
RestoreRequest request) {
|
||||
state BackupDescription desc = wait(bc->describeBackup());
|
||||
state BackupDescription desc = wait(bc->describePartitionedBackup());
|
||||
|
||||
// Convert version to real time for operators to read the BackupDescription desc.
|
||||
wait(desc.resolveVersionTimes(cx));
|
||||
|
|
|
@ -209,7 +209,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
state bool restorable = false;
|
||||
if(lastBackupContainer) {
|
||||
state Future<BackupDescription> fdesc = lastBackupContainer->describeBackup();
|
||||
state Future<BackupDescription> fdesc = lastBackupContainer->describePartitionedBackup();
|
||||
wait(ready(fdesc));
|
||||
|
||||
if(!fdesc.isError()) {
|
||||
|
@ -430,7 +430,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
.detail("BackupTag", printable(self->backupTag));
|
||||
|
||||
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
|
||||
BackupDescription desc = wait(container->describeBackup());
|
||||
BackupDescription desc = wait(container->describePartitionedBackup());
|
||||
|
||||
state Version targetVersion = -1;
|
||||
if (desc.maxRestorableVersion.present()) {
|
||||
|
|
Loading…
Reference in New Issue