Merge pull request #2931 from jzhou77/backup-progress

Consolidate backup container APIs
This commit is contained in:
Meng Xu 2020-04-11 22:47:20 -07:00 committed by GitHub
commit a272bf4e66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 74 deletions

View File

@ -115,6 +115,7 @@ std::string BackupDescription::toString() const {
info.append(format("URL: %s\n", url.c_str()));
info.append(format("Restorable: %s\n", maxRestorableVersion.present() ? "true" : "false"));
info.append(format("Partitioned logs: %s\n", partitioned ? "true" : "false"));
auto formatVersion = [&](Version v) {
std::string s;
@ -169,6 +170,7 @@ std::string BackupDescription::toJSON() const {
doc.setKey("SchemaVersion", "1.0.0");
doc.setKey("URL", url.c_str());
doc.setKey("Restorable", maxRestorableVersion.present());
doc.setKey("Partitioned", partitioned);
auto formatVersion = [&](Version v) {
JsonBuilderObject doc;
@ -243,10 +245,10 @@ std::string BackupDescription::toJSON() const {
* /plogs/...log,startVersion,endVersion,UID,tagID-of-N,blocksize
* /logs/.../log,startVersion,endVersion,UID,blockSize
* where ... is a multi level path which sorts lexically into version order and results in approximately 1
* unique folder per day containing about 5,000 files. Logs after 7.0 are stored in "plogs"
* directory and are partitioned according to tagIDs (0, 1, 2, ...) and the total number
* partitions is N. Logs before 7.0 are
* stored in "logs" directory and are not partitioned.
* unique folder per day containing about 5,000 files. Logs after FDB 6.3 are stored in "plogs"
* directory and are partitioned according to tagIDs (0, 1, 2, ...) and the total number of partitions is N.
* Old backup logs from FDB 6.2 and earlier are stored in the "logs" directory and are not partitioned.
* After FDB 6.3, users can choose to use the new partitioned logs or old logs.
*
*
* BACKWARD COMPATIBILITY
@ -657,18 +659,6 @@ public:
return dumpFileList_impl(Reference<BackupContainerFileSystem>::addRef(this), begin, end);
}
ACTOR static Future<bool> isPartitionedBackup_impl(Reference<BackupContainerFileSystem> bc) {
BackupFileList list = wait(bc->dumpFileList(0, std::numeric_limits<Version>::max()));
for (const auto& file : list.logs) {
if (file.isPartitionedLog()) return true;
}
return false;
}
Future<bool> isPartitionedBackup() final {
return isPartitionedBackup_impl(Reference<BackupContainerFileSystem>::addRef(this));
}
static Version resolveRelativeVersion(Optional<Version> max, Version v, const char *name, Error e) {
if(v == invalidVersion) {
TraceEvent(SevError, "BackupExpireInvalidVersion").detail(name, v);
@ -704,7 +694,8 @@ public:
}
}
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan, Version logStartVersionOverride, bool partitioned) {
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan,
Version logStartVersionOverride) {
state BackupDescription desc;
desc.url = bc->getURL();
@ -722,8 +713,7 @@ public:
// from which to resolve the relative version.
// This could be handled more efficiently without recursion but it's tricky, this will do for now.
if(logStartVersionOverride != invalidVersion && logStartVersionOverride < 0) {
BackupDescription tmp = wait(partitioned ? bc->describePartitionedBackup(false, invalidVersion)
: bc->describeBackup(false, invalidVersion));
BackupDescription tmp = wait(bc->describeBackup(false, invalidVersion));
logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride,
"LogStartVersionOverride", invalid_option_value());
}
@ -733,10 +723,12 @@ public:
state Optional<Version> metaLogEnd;
state Optional<Version> metaExpiredEnd;
state Optional<Version> metaUnreliableEnd;
state Optional<Version> metaLogType;
std::vector<Future<Void>> metaReads;
metaReads.push_back(store(metaExpiredEnd, bc->expiredEndVersion().get()));
metaReads.push_back(store(metaUnreliableEnd, bc->unreliableEndVersion().get()));
metaReads.push_back(store(metaLogType, bc->logType().get()));
// Only read log begin/end versions if not doing a deep scan, otherwise scan files and recalculate them.
if(!deepScan) {
@ -747,12 +739,13 @@ public:
wait(waitForAll(metaReads));
TraceEvent("BackupContainerDescribe2")
.detail("URL", bc->getURL())
.detail("LogStartVersionOverride", logStartVersionOverride)
.detail("ExpiredEndVersion", metaExpiredEnd.orDefault(invalidVersion))
.detail("UnreliableEndVersion", metaUnreliableEnd.orDefault(invalidVersion))
.detail("LogBeginVersion", metaLogBegin.orDefault(invalidVersion))
.detail("LogEndVersion", metaLogEnd.orDefault(invalidVersion));
.detail("URL", bc->getURL())
.detail("LogStartVersionOverride", logStartVersionOverride)
.detail("ExpiredEndVersion", metaExpiredEnd.orDefault(invalidVersion))
.detail("UnreliableEndVersion", metaUnreliableEnd.orDefault(invalidVersion))
.detail("LogBeginVersion", metaLogBegin.orDefault(invalidVersion))
.detail("LogEndVersion", metaLogEnd.orDefault(invalidVersion))
.detail("LogType", metaLogType.orDefault(-1));
// If the logStartVersionOverride is positive (not relative) then ensure that unreliableEndVersion is equal to or greater
if(logStartVersionOverride != invalidVersion && metaUnreliableEnd.orDefault(invalidVersion) < logStartVersionOverride) {
@ -811,9 +804,18 @@ public:
}
state std::vector<LogFile> logs;
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, partitioned)) &&
state std::vector<LogFile> plogs;
wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) &&
store(plogs, bc->listLogFiles(scanBegin, scanEnd, true)) &&
store(desc.snapshots, bc->listKeyspaceSnapshots()));
if (plogs.size() > 0) {
desc.partitioned = true;
logs.swap(plogs);
} else {
desc.partitioned = metaLogType.present() && metaLogType.get() == PARTITIONED_MUTATION_LOG;
}
// List logs in version order so log continuity can be analyzed
std::sort(logs.begin(), logs.end());
@ -823,7 +825,7 @@ public:
// If we didn't get log versions above then seed them using the first log file
if (!desc.contiguousLogEnd.present()) {
desc.minLogBegin = logs.begin()->beginVersion;
if (partitioned) {
if (desc.partitioned) {
// Cannot use the first file's end version, which may not be contiguous
// for other partitions. Set to its beginVersion to be safe.
desc.contiguousLogEnd = logs.begin()->beginVersion;
@ -832,7 +834,7 @@ public:
}
}
if (partitioned) {
if (desc.partitioned) {
updatePartitionedLogsContinuousEnd(&desc, logs, scanBegin, scanEnd);
} else {
Version& end = desc.contiguousLogEnd.get();
@ -858,6 +860,11 @@ public:
updates = updates && bc->logEndVersion().set(desc.contiguousLogEnd.get());
}
if (!metaLogType.present()) {
updates = updates && bc->logType().set(desc.partitioned ? PARTITIONED_MUTATION_LOG
: NON_PARTITIONED_MUTATION_LOG);
}
wait(updates);
} catch(Error &e) {
if(e.code() == error_code_actor_cancelled)
@ -906,11 +913,8 @@ public:
// Uses the virtual methods to describe the backup contents
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) final {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, false);
}
Future<BackupDescription> describePartitionedBackup(bool deepScan, Version logStartVersionOverride) final {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan, logStartVersionOverride, true);
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan,
logStartVersionOverride);
}
ACTOR static Future<Void> expireData_impl(Reference<BackupContainerFileSystem> bc, Version expireEndVersion, bool force, ExpireProgress *progress, Version restorableBeginVersion) {
@ -1287,7 +1291,7 @@ public:
return end;
}
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion, bool partitioned) {
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion) {
// Find the most recent keyrange snapshot to end at or before targetVersion
state Optional<KeyspaceSnapshotFile> snapshot;
std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
@ -1311,9 +1315,13 @@ public:
}
// FIXME: check if there are tagged logs. for each tag, there is no version gap.
state std::vector<LogFile> logs = wait(bc->listLogFiles(snapshot.get().beginVersion, targetVersion, partitioned));
state std::vector<LogFile> logs;
state std::vector<LogFile> plogs;
wait(store(logs, bc->listLogFiles(snapshot.get().beginVersion, targetVersion, false)) &&
store(plogs, bc->listLogFiles(snapshot.get().beginVersion, targetVersion, true)));
if (partitioned) {
if (plogs.size() > 0) {
logs.swap(plogs);
// sort by tag ID so that filterDuplicates works.
std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) {
return std::tie(a.tagId, a.beginVersion, a.endVersion) <
@ -1349,11 +1357,7 @@ public:
}
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) final {
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion, false);
}
Future<Optional<RestorableFileSet>> getPartitionedRestoreSet(Version targetVersion) final {
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion, true);
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion);
}
private:
@ -1388,6 +1392,11 @@ public:
VersionProperty expiredEndVersion() { return {Reference<BackupContainerFileSystem>::addRef(this), "expired_end_version"}; }
VersionProperty unreliableEndVersion() { return {Reference<BackupContainerFileSystem>::addRef(this), "unreliable_end_version"}; }
// Backup log types
const static Version NON_PARTITIONED_MUTATION_LOG = 0;
const static Version PARTITIONED_MUTATION_LOG = 1;
VersionProperty logType() { return { Reference<BackupContainerFileSystem>::addRef(this), "mutation_log_type" }; }
ACTOR static Future<Void> writeVersionProperty(Reference<BackupContainerFileSystem> bc, std::string path, Version v) {
try {
state Reference<IBackupFile> f = wait(bc->writeFile(path));

View File

@ -178,6 +178,7 @@ struct BackupDescription {
// The minimum version which this backup can be used to restore to
Optional<Version> minRestorableVersion;
std::string extendedDetail; // Freeform container-specific info.
bool partitioned; // If this backup contains partitioned mutation logs.
// Resolves the versions above to timestamps using a given database's TimeKeeper data.
// toString will use this information if present.
@ -260,23 +261,12 @@ public:
// be after deleting all data prior to logStartVersionOverride.
virtual Future<BackupDescription> describeBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
// The same as above, except using partitioned mutation logs.
virtual Future<BackupDescription> describePartitionedBackup(bool deepScan = false, Version logStartVersionOverride = invalidVersion) = 0;
virtual Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) = 0;
// If there are partitioned log files, then returns true; otherwise, returns false.
virtual Future<bool> isPartitionedBackup() = 0;
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
// restore to given version is not possible.
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) = 0;
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
// restore to given version is not possible. This is intended for parallel
// restore in FDB 7.0, which reads partitioned mutation logs.
virtual Future<Optional<RestorableFileSet>> getPartitionedRestoreSet(Version targetVersion) = 0;
// Get an IBackupContainer based on a container spec string
static Reference<IBackupContainer> openContainer(std::string url);
static std::vector<std::string> getURLFormats();

View File

@ -45,17 +45,21 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
}
void BackupProgress::updateTagVersions(std::map<Tag, Version>* tagVersions, std::set<Tag>* tags,
const std::map<Tag, Version>& progress, Version endVersion, LogEpoch epoch) {
const std::map<Tag, Version>& progress, Version endVersion,
Version adjustedBeginVersion, LogEpoch epoch) {
for (const auto& [tag, savedVersion] : progress) {
// If tag is not in "tags", it means the old epoch has more tags than
// new epoch's tags. Just ignore the tag here.
auto n = tags->erase(tag);
if (n > 0 && savedVersion < endVersion - 1) {
tagVersions->insert({ tag, savedVersion + 1 });
const Version beginVersion =
(savedVersion + 1 > adjustedBeginVersion) ? (savedVersion + 1) : adjustedBeginVersion;
tagVersions->insert({ tag, beginVersion });
TraceEvent("BackupVersionRange", dbgid)
.detail("OldEpoch", epoch)
.detail("Tag", tag.toString())
.detail("BeginVersion", savedVersion + 1)
.detail("AdjustedBeginVersion", beginVersion)
.detail("EndVersion", endVersion);
}
}
@ -66,12 +70,20 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
if (!backupStartedValue.present()) return toRecruit; // No active backups
Version lastEnd = invalidVersion;
for (const auto& [epoch, info] : epochInfos) {
std::set<Tag> tags = enumerateLogRouterTags(info.logRouterTags);
std::map<Tag, Version> tagVersions;
// Sometimes, an epoch's begin version is lower than the previous epoch's
// end version. In this case, adjust the epoch's begin version to be the
// same as previous end version.
Version adjustedBeginVersion = lastEnd > info.epochBegin ? lastEnd : info.epochBegin;
lastEnd = info.epochEnd;
auto progressIt = progress.lower_bound(epoch);
if (progressIt != progress.end() && progressIt->first == epoch) {
updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, epoch);
updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, adjustedBeginVersion, epoch);
} else {
auto rit = std::find_if(
progress.rbegin(), progress.rend(),
@ -90,17 +102,18 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
// The logRouterTags are the same
// ASSERT(info.logRouterTags == epochTags[rit->first]);
updateTagVersions(&tagVersions, &tags, rit->second, info.epochEnd, epoch);
updateTagVersions(&tagVersions, &tags, rit->second, info.epochEnd, adjustedBeginVersion, epoch);
}
}
}
for (const Tag tag : tags) { // tags without progress data
tagVersions.insert({ tag, info.epochBegin });
tagVersions.insert({ tag, adjustedBeginVersion });
TraceEvent("BackupVersionRange", dbgid)
.detail("OldEpoch", epoch)
.detail("Tag", tag.toString())
.detail("BeginVersion", info.epochBegin)
.detail("AdjustedBeginVersion", adjustedBeginVersion)
.detail("EndVersion", info.epochEnd);
}
if (!tagVersions.empty()) {

View File

@ -81,7 +81,8 @@ private:
// For each tag in progress, if the saved version is smaller than endVersion - 1,
// add {tag, savedVersion+1} to tagVersions and remove the tag from "tags".
void updateTagVersions(std::map<Tag, Version>* tagVersions, std::set<Tag>* tags,
const std::map<Tag, Version>& progress, Version endVersion, LogEpoch epoch);
const std::map<Tag, Version>& progress, Version endVersion, Version adjustedBeginVersion,
LogEpoch epoch);
const UID dbgid;

View File

@ -179,7 +179,9 @@ struct BackupData {
config.startedBackupWorkers().set(tr, workers.get());
}
for (auto p : workers.get()) {
TraceEvent("BackupWorkerDebug", self->myId).detail("Epoch", p.first).detail("TagID", p.second);
TraceEvent("BackupWorkerDebugTag", self->myId)
.detail("Epoch", p.first)
.detail("TagID", p.second);
}
wait(tr->commit());

View File

@ -617,8 +617,7 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx,
RestoreRequest request) {
state bool partitioned = wait(bc->isPartitionedBackup());
state BackupDescription desc = wait(partitioned ? bc->describePartitionedBackup() : bc->describeBackup());
state BackupDescription desc = wait(bc->describeBackup());
// Convert version to real time for operators to read the BackupDescription desc.
wait(desc.resolveVersionTimes(cx));
@ -634,8 +633,7 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
std::cout << "Restore to version: " << request.targetVersion << "\nBackupDesc: \n" << desc.toString() << "\n\n";
}
Optional<RestorableFileSet> restorable = wait(partitioned ? bc->getPartitionedRestoreSet(request.targetVersion)
: bc->getRestoreSet(request.targetVersion));
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(request.targetVersion));
if (!restorable.present()) {
TraceEvent(SevWarn, "FastRestoreMasterPhaseCollectBackupFiles").detail("NotRestorable", request.targetVersion);

View File

@ -21,6 +21,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
@ -213,9 +214,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state bool restorable = false;
if (lastBackupContainer) {
state Future<BackupDescription> fdesc = self->usePartitionedLogs
? lastBackupContainer->describePartitionedBackup()
: lastBackupContainer->describeBackup();
state Future<BackupDescription> fdesc = lastBackupContainer->describeBackup();
wait(ready(fdesc));
if(!fdesc.isError()) {
@ -423,6 +422,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// wait(attemptDirtyRestore(self, cx, &backupAgent, StringRef(lastBackupContainer->getURL()),
// randomID));
}
// We must ensure no backup workers are running, otherwise the clear DB
// below can be picked up by backup workers and applied during restore.
wait(success(changeConfig(cx, "backup_worker_enabled:=0", true)));
// Clear DB before restore
wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
for (auto& kvrange : self->backupRanges) tr->clear(kvrange);
@ -436,14 +440,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag));
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
BackupDescription desc = wait(self->usePartitionedLogs ? container->describePartitionedBackup()
: container->describeBackup());
TraceEvent("BAFRW_Restore", randomID)
.detail("LastBackupContainer", lastBackupContainer->getURL())
.detail("MinRestorableVersion", desc.minRestorableVersion.get())
.detail("MaxRestorableVersion", desc.maxRestorableVersion.get())
.detail("ContiguousLogEnd", desc.contiguousLogEnd.get());
BackupDescription desc = wait(container->describeBackup());
ASSERT(self->usePartitionedLogs == desc.partitioned);
state Version targetVersion = -1;
if (desc.maxRestorableVersion.present()) {
@ -463,6 +461,13 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
}
TraceEvent("BAFRW_Restore", randomID)
.detail("LastBackupContainer", lastBackupContainer->getURL())
.detail("MinRestorableVersion", desc.minRestorableVersion.get())
.detail("MaxRestorableVersion", desc.maxRestorableVersion.get())
.detail("ContiguousLogEnd", desc.contiguousLogEnd.get())
.detail("TargetVersion", targetVersion);
state std::vector<Future<Version>> restores;
state std::vector<Standalone<StringRef>> restoreTags;