Add describePartitionedBackup() for parallel restore

For partitioned logs, computing continuous log end version from min logs begin
version. Old backup test keeps using describeBackup() to be correctness clean.

Rename partitioned log file so that the last number is block size.
This commit is contained in:
Jingyu Zhou 2020-02-25 16:37:25 -08:00
parent 2eac17b553
commit 1f95cba53e
4 changed files with 69 additions and 65 deletions

View File

@ -240,7 +240,7 @@ std::string BackupDescription::toJSON() const {
* file written will be after the start version of the snapshot's execution.
*
* Log files are at file paths like
* /plogs/...log,startVersion,endVersion,UID,blocksize,tagID-of-N
* /plogs/...log,startVersion,endVersion,UID,tagID-of-N,blocksize
* /logs/.../log,startVersion,endVersion,UID,blockSize
* where ... is a multi level path which sorts lexically into version order and results in approximately 1
* unique folder per day containing about 5,000 files. Logs after 7.0 are stored in "plogs"
@ -347,9 +347,9 @@ public:
Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId, int totalTags) final {
return writeFile(logVersionFolderString(beginVersion, true) +
format("log,%lld,%lld,%s,%d,%d-of-%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId,
totalTags));
format("log,%lld,%lld,%s,%d-of-%d,%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), tagId, totalTags,
blockSize));
}
Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, Version fileVersion, int blockSize) override {
@ -400,8 +400,8 @@ public:
if(sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) {
out = f;
return true;
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n", &f.beginVersion,
&f.endVersion, &f.blockSize, &f.tagId, &f.totalTags, &len) == 5 &&
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%d-of-%d,%u%n", &f.beginVersion,
&f.endVersion, &f.tagId, &f.totalTags, &f.blockSize, &len) == 5 &&
len == name.size() && f.tagId >= 0) {
out = f;
return true;
@ -672,7 +672,27 @@ public:
return v;
}
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan, Version logStartVersionOverride) {
// Computes the continuous end version for non-partitioned mutation logs up to
// the "targetVersion". If "outLogs" is not nullptr, it will be updated with
// continuous log files. "*end" is updated with the continuous end version.
static void computeRestoreEndVersion(const std::vector<LogFile>& logs, std::vector<LogFile>* outLogs, Version* end,
Version targetVersion) {
auto i = logs.begin();
if (outLogs != nullptr) outLogs->push_back(*i);
// Add logs to restorable logs set until continuity is broken OR we reach targetVersion
while (++i != logs.end()) {
if (i->beginVersion > *end || i->beginVersion > targetVersion) break;
// If the next link in the log chain is found, update the end
if (i->beginVersion == *end) {
if (outLogs != nullptr) outLogs->push_back(*i);
*end = i->endVersion;
}
}
}
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan, Version logStartVersionOverride, bool partitioned) {
state BackupDescription desc;
desc.url = bc->getURL();
@ -690,8 +710,10 @@ public:
// from which to resolve the relative version.
// This could be handled more efficiently without recursion but it's tricky, this will do for now.
if(logStartVersionOverride != invalidVersion && logStartVersionOverride < 0) {
BackupDescription tmp = wait(bc->describeBackup(false, invalidVersion));
logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride, "LogStartVersionOverride", invalid_option_value());
BackupDescription tmp = wait(partitioned ? bc->describePartitionedBackup(false, invalidVersion)
: bc->describeBackup(false, invalidVersion));
logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride,
"LogStartVersionOverride", invalid_option_value());
}
// Get metadata versions
@ -777,45 +799,31 @@ public:
}
state std::vector<LogFile> logs;
state std::vector<LogFile> pLogs;
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) &&
store(pLogs, bc->listLogFiles(scanBegin, scanEnd, true)) &&
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, partitioned)) &&
store(desc.snapshots, bc->listKeyspaceSnapshots()));
// List logs in version order so log continuity can be analyzed
std::sort(logs.begin(), logs.end());
// Check partitioned logs
if (!pLogs.empty()) {
std::sort(pLogs.begin(), pLogs.end());
// Find out contiguous log end version
if (partitioned) {
// If we didn't get log versions above then seed them using the first log file
if(!desc.contiguousLogEnd.present()) {
auto it = pLogs.begin();
desc.minLogBegin = it->beginVersion;
desc.contiguousLogEnd = it->endVersion;
if (!desc.contiguousLogEnd.present()) {
desc.minLogBegin = logs.begin()->beginVersion;
desc.contiguousLogEnd = logs.begin()->endVersion;
}
desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(pLogs, scanBegin);
// contiguousLogEnd is not inclusive, so +1 here.
desc.contiguousLogEnd.get() = getPartitionedLogsContinuousEndVersion(logs, desc.minLogBegin.get()) + 1;
} else if (!logs.empty()) {
desc.maxLogEnd = logs.rbegin()->endVersion;
auto i = logs.begin();
// If we didn't get log versions above then seed them using the first log file
if(!desc.contiguousLogEnd.present()) {
desc.minLogBegin = i->beginVersion;
desc.contiguousLogEnd = i->endVersion;
++i;
}
auto &end = desc.contiguousLogEnd.get(); // For convenience to make loop cleaner
// Advance until continuity is broken
while(i != logs.end()) {
if(i->beginVersion > end)
break;
// If the next link in the log chain is found, update the end
if(i->beginVersion == end)
end = i->endVersion;
++i;
desc.minLogBegin = logs.begin()->beginVersion;
desc.contiguousLogEnd = logs.begin()->endVersion;
}
Version& end = desc.contiguousLogEnd.get();
computeRestoreEndVersion(logs, nullptr, &end, std::numeric_limits<Version>::max());
}
// Only update stored contiguous log begin and end versions if we did NOT use a log start override.
@ -884,7 +892,11 @@ public:
// Uses the virtual methods to describe the backup contents
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) final {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride);
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, false);
}
Future<BackupDescription> describePartitionedBackup(bool deepScan, Version logStartVersionOverride) final {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, true);
}
ACTOR static Future<Void> expireData_impl(Reference<BackupContainerFileSystem> bc, Version expireEndVersion, bool force, ExpireProgress *progress, Version restorableBeginVersion) {
@ -1175,7 +1187,7 @@ for (auto file : files) std::cout << file.toString() << "\n";
indices.push_back(i);
end = files[i].endVersion - 1;
}
std::cout << "Init end: " << end << "\n";
std::cout << "Init end: " << end << ", begin " << begin << "\n";
// check tag 0 is continuous in [begin, end] and create a map of ranges to tags
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
@ -1249,22 +1261,9 @@ std::cout << "Return end = " << end << "\n\n";
// If there are logs and the first one starts at or before the snapshot begin version then proceed
if(!logs.empty() && logs.front().beginVersion <= snapshot.get().beginVersion) {
auto i = logs.begin();
Version end = i->endVersion;
restorable.logs.push_back(*i);
// Add logs to restorable logs set until continuity is broken OR we reach targetVersion
while(++i != logs.end()) {
if(i->beginVersion > end || i->beginVersion > targetVersion)
break;
// If the next link in the log chain is found, update the end
if(i->beginVersion == end) {
restorable.logs.push_back(*i);
end = i->endVersion;
}
}
if(end >= targetVersion) {
Version end = logs.begin()->endVersion;
computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion);
if (end >= targetVersion) {
return Optional<RestorableFileSet>(restorable);
}
}
@ -1460,6 +1459,7 @@ public:
if(deterministicRandom()->random01() < .01) {
blockSize /= deterministicRandom()->randomInt(1, 3);
}
ASSERT(blockSize > 0);
return map(f, [=](Reference<IAsyncFile> fr) {
int readAhead = deterministicRandom()->randomInt(0, 3);
@ -1609,15 +1609,16 @@ public:
virtual ~BackupContainerBlobStore() {}
Future<Reference<IAsyncFile>> readFile(std::string path) final {
return Reference<IAsyncFile>(
new AsyncFileReadAheadCache(
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file
)
);
ASSERT(m_bstore->knobs.read_ahead_blocks > 0);
return Reference<IAsyncFile>(
new AsyncFileReadAheadCache(
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file
)
);
}
ACTOR static Future<std::vector<std::string>> listURLs(Reference<BlobStoreEndpoint> bstore, std::string bucket) {

View File

@ -255,6 +255,9 @@ public:
// be after deleting all data prior to logStartVersionOverride.
virtual Future<BackupDescription> describeBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
// The same as above, except using partitioned mutation logs.
virtual Future<BackupDescription> describePartitionedBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
virtual Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) = 0;
// Get exactly the files necessary to restore to targetVersion. Returns non-present if

View File

@ -676,7 +676,7 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx,
RestoreRequest request) {
state BackupDescription desc = wait(bc->describeBackup());
state BackupDescription desc = wait(bc->describePartitionedBackup());
// Convert version to real time for operators to read the BackupDescription desc.
wait(desc.resolveVersionTimes(cx));

View File

@ -209,7 +209,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state bool restorable = false;
if(lastBackupContainer) {
state Future<BackupDescription> fdesc = lastBackupContainer->describeBackup();
state Future<BackupDescription> fdesc = lastBackupContainer->describePartitionedBackup();
wait(ready(fdesc));
if(!fdesc.isError()) {
@ -430,7 +430,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag));
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
BackupDescription desc = wait(container->describeBackup());
BackupDescription desc = wait(container->describePartitionedBackup());
state Version targetVersion = -1;
if (desc.maxRestorableVersion.present()) {