Major usability and performance improvements to backup management. Backup descriptions now calculate and display timestamps using TimeKeeper data (if given a cluster) and restorability of snapshots. Expire now requires a --force option to leave a backup unrestorable or unrestorable after a given point in time, specified by version or timestamp. BackupContainerFilesystem now maintains metadata on key version boundaries in order to avoid large list operations for describe and expire operations. Blob parallel recursive list operations can now take a path (aka prefix) filter function. New describe and expire options are available in fdbbackup.

This commit is contained in:
Stephen Atherton 2018-01-17 04:09:43 -08:00
parent f955547796
commit 93b34a945f
9 changed files with 553 additions and 158 deletions

View File

@ -90,7 +90,9 @@ enum enumRestoreType {
//
enum {
// Backup constants
OPT_DESTCONTAINER, OPT_SNAPSHOTINTERVAL, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE, OPT_EXPVERSION, OPT_BASEURL, OPT_DATETIME, OPT_BLOB_CREDENTIALS,
OPT_DESTCONTAINER, OPT_SNAPSHOTINTERVAL, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE,
OPT_EXPIRE_BEFORE_VERSION, OPT_EXPIRE_BEFORE_DATETIME, OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME,
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -157,8 +159,6 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_BACKUPKEYS, "--keys", SO_REQ_SEP },
{ OPT_DRYRUN, "-n", SO_NONE },
{ OPT_DRYRUN, "--dryrun", SO_NONE },
{ OPT_FORCE, "-f", SO_NONE },
{ OPT_FORCE, "--force", SO_NONE },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
@ -315,13 +315,10 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_DATETIME, "-D", SO_REQ_SEP },
{ OPT_DATETIME, "--date", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_EXPVERSION, "-u", SO_REQ_SEP },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
@ -333,6 +330,12 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_FORCE, "-f", SO_NONE },
{ OPT_FORCE, "--force", SO_NONE },
{ OPT_EXPIRE_RESTORABLE_AFTER_VERSION, "--restorable_after_version", SO_REQ_SEP },
{ OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, "--restorable_after_timestamp", SO_REQ_SEP },
{ OPT_EXPIRE_BEFORE_VERSION, "--expire_before_version", SO_REQ_SEP },
{ OPT_EXPIRE_BEFORE_DATETIME, "--expire_before_timestamp", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
@ -366,6 +369,8 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
@ -383,6 +388,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_DESCRIBE_DEEP, "--deep", SO_NONE },
SO_END_OF_OPTIONS
};
@ -437,7 +443,6 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_DRYRUN, "-n", SO_NONE },
{ OPT_DRYRUN, "--dryrun", SO_NONE },
{ OPT_FORCE, "-f", SO_NONE },
{ OPT_FORCE, "--force", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
@ -697,10 +702,16 @@ static void printBackupUsage(bool devhelp) {
" Base backup URL for list operations. This looks like a Backup URL but without a backup name.\n");
printf(" --blob_credentials FILE\n"
" File containing blob credentials in JSON format. Can be specified multiple times for multiple files. See below for more details.\n");
printf(" -D, --date DATETIME\n"
printf(" --expire_before_timestamp DATETIME\n"
" Datetime cutoff for expire operations. Requires a cluster file and will use version/timestamp metadata\n"
" in the database to obtain a cutoff version very close to the timestamp given in YYYY-MM-DD.HH:MI:SS format (UTC).\n");
printf(" -u VERSION Version cutoff for expire operations. Deletes all backup files with data from < VERSION.\n");
printf(" --expire_before_version VERSION\n"
" Version cutoff for expire operations. Deletes data files containing no data at or after VERSION.\n");
printf(" --restorable_after_timestamp DATETIME\n"
" For expire operations, set minimum acceptable restorability to the version equivalent of DATETIME and later.\n");
printf(" --restorable_after_timestamp VERSION\n"
" For expire operations, set minimum acceptable restorability to the VERSION and later.\n");
printf(" -f, --force For expire operations, force expiration even if minimum restorability would be violated.\n");
printf(" -s, --snapshot_interval DURATION\n"
" For start operations, specifies the backup's target snapshot interval as DURATION seconds. Defaults to %d.\n", CLIENT_KNOBS->BACKUP_DEFAULT_SNAPSHOT_INTERVAL_SEC);
printf(" -e ERRORLIMIT The maximum number of errors printed by status (default is 10).\n");
@ -712,6 +723,16 @@ static void printBackupUsage(bool devhelp) {
printf(" -z, --no-stop-when-done\n"
" Do not stop backup when restorable.\n");
printf(" -h, --help Display this help and exit.\n");
if (devhelp) {
#ifdef _WIN32
printf(" -n Create a new console.\n");
printf(" -q Disable error dialog on crash.\n");
printf(" --parentpid PID\n");
printf(" Specify a process after whose termination to exit.\n");
#endif
printf(" --deep For describe operations, do not use cached metadata. Warning: Very slow\n");
}
printf("\n"
" KEYS FORMAT: \"<BEGINKEY> <ENDKEY>\" [...]\n");
printf("\n"
@ -726,15 +747,6 @@ static void printBackupUsage(bool devhelp) {
" The JSON schema is:\n"
" { \"accounts\" : { \"user@host\" : { \"secret\" : \"SECRETKEY\" }, \"user2@host2\" : { \"secret\" : \"SECRET\" } } }\n");
if (devhelp) {
#ifdef _WIN32
printf(" -n Create a new console.\n");
printf(" -q Disable error dialog on crash.\n");
printf(" --parentpid PID\n");
printf(" Specify a process after whose termination to exit.\n");
#endif
}
return;
}
@ -1719,17 +1731,31 @@ ACTOR Future<Void> runRestore(Database db, std::string tagName, std::string cont
restoreVersion = _restoreVersion;
}
else {
state Version defaultRestoreVersion = -1;
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container);
state BackupDescription description = wait(bc->describeBackup());
// Get the folder information
std::string info = wait(FileBackupAgent::getBackupInfo(container, &defaultRestoreVersion));
if(dbVersion <= 0) {
Void _ = wait(description.resolveVersionTimes(db));
if(description.maxRestorableVersion.present())
restoreVersion = description.maxRestorableVersion.get();
else {
fprintf(stderr, "Backup is not restorable\n");
throw restore_invalid_version();
}
}
else
restoreVersion = dbVersion;
restoreVersion = (int64_t) (dbVersion > 0) ? dbVersion : defaultRestoreVersion;
state Optional<RestorableFileSet> rset = wait(bc->getRestoreSet(restoreVersion));
if(!rset.present()) {
fprintf(stderr, "Insufficient data to restore to version %lld\n", restoreVersion);
throw restore_invalid_version();
}
// Display the restore information, if requested
if (verbose) {
printf("[DRY RUN] Restoring backup to version: %lld\n", (long long) restoreVersion);
printf("%s\n", info.c_str());
printf("%s\n", description.toString().c_str());
}
}
@ -1807,12 +1833,17 @@ ACTOR Future<Version> getVersionFromDateTime(std::string datetime, Database db)
}
}
ACTOR Future<Void> expireBackupData(const char *name, std::string destinationContainer, Version endVersion, std::string datetime, Database db) {
if (!endVersion && datetime.length()) {
Version v = wait( getVersionFromDateTime(datetime, db) );
ACTOR Future<Void> expireBackupData(const char *name, std::string destinationContainer, Version endVersion, std::string endDatetime, Database db, bool force, Version restorableAfterVersion, std::string restorableAfterDatetime) {
if (!endDatetime.empty()) {
Version v = wait( getVersionFromDateTime(endDatetime, db) );
endVersion = v;
}
if (!restorableAfterDatetime.empty()) {
Version v = wait( getVersionFromDateTime(restorableAfterDatetime, db) );
restorableAfterVersion = v;
}
if (!endVersion) {
fprintf(stderr, "ERROR: No version or date/time is specified.\n");
printHelpTeaser(name);
@ -1821,13 +1852,16 @@ ACTOR Future<Void> expireBackupData(const char *name, std::string destinationCon
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
Void _ = wait(c->expireData(endVersion));
Void _ = wait(c->expireData(endVersion, force, restorableAfterVersion));
printf("All data before version %lld is deleted.\n", endVersion);
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
if(e.code() == error_code_backup_cannot_expire)
fprintf(stderr, "ERROR: Requested expiration would be unsafe. Backup would not meet minimum restorability. Use --force to delete data anyway.\n");
else
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
@ -1862,10 +1896,12 @@ ACTOR Future<Void> deleteBackupContainer(const char *name, std::string destinati
return Void();
}
ACTOR Future<Void> describeBackup(const char *name, std::string destinationContainer) {
ACTOR Future<Void> describeBackup(const char *name, std::string destinationContainer, bool deep, Optional<Database> cx) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
BackupDescription desc = wait(c->describeBackup());
state BackupDescription desc = wait(c->describeBackup(deep));
if(cx.present())
Void _ = wait(desc.resolveVersionTimes(cx.get()));
printf("%s\n", desc.toString().c_str());
}
catch (Error& e) {
@ -2226,12 +2262,15 @@ int main(int argc, char* argv[]) {
}
std::string destinationContainer;
bool describeDeep = false;
int snapshotIntervalSeconds = CLIENT_KNOBS->BACKUP_DEFAULT_SNAPSHOT_INTERVAL_SEC;
std::string clusterFile;
std::string sourceClusterFile;
std::string baseUrl;
std::string datetime;
Version expVersion = 0;
std::string expireDatetime;
Version expireVersion = 0;
std::string expireRestorableAfterDatetime;
Version expireRestorableAfterVersion = std::numeric_limits<Version>::max();
std::vector<std::pair<std::string, std::string>> knobs;
std::string tagName = BackupAgentBase::getDefaultTag().toString();
bool tagProvided = false;
@ -2304,7 +2343,8 @@ int main(int argc, char* argv[]) {
break;
}
switch (args->OptionId()) {
int optId = args->OptionId();
switch (optId) {
case OPT_HELP:
printUsage(programExe, false);
return FDB_EXIT_SUCCESS;
@ -2355,18 +2395,26 @@ int main(int argc, char* argv[]) {
localities.set(Standalone<StringRef>(syn), Standalone<StringRef>(std::string(args->OptionArg())));
break;
}
case OPT_DATETIME:
datetime = args->OptionArg();
case OPT_EXPIRE_BEFORE_DATETIME:
expireDatetime = args->OptionArg();
break;
case OPT_EXPVERSION: {
case OPT_EXPIRE_RESTORABLE_AFTER_DATETIME:
expireRestorableAfterDatetime = args->OptionArg();
break;
case OPT_EXPIRE_BEFORE_VERSION:
case OPT_EXPIRE_RESTORABLE_AFTER_VERSION:
{
const char* a = args->OptionArg();
long long expVersionValue = 0;
if (!sscanf(a, "%lld", &expVersionValue)) {
long long ver = 0;
if (!sscanf(a, "%lld", &ver)) {
fprintf(stderr, "ERROR: Could not parse expiration version `%s'\n", a);
printHelpTeaser(argv[0]);
return FDB_EXIT_ERROR;
}
expVersion = expVersionValue;
if(optId == OPT_EXPIRE_BEFORE_VERSION)
expireVersion = ver;
else
expireRestorableAfterVersion = ver;
break;
}
case OPT_BASEURL:
@ -2430,6 +2478,9 @@ int main(int argc, char* argv[]) {
if(StringRef(restoreContainer).startsWith(LiteralStringRef("/")))
restoreContainer = std::string("file://") + restoreContainer;
break;
case OPT_DESCRIBE_DEEP:
describeDeep = true;
break;
case OPT_PREFIX_ADD:
addPrefix = args->OptionArg();
break;
@ -2661,13 +2712,14 @@ int main(int argc, char* argv[]) {
}
}
auto initCluster = [&]() {
auto initCluster = [&](bool quiet = false) {
auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
try {
ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
if(!quiet)
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
return false;
}
@ -2772,10 +2824,10 @@ int main(int argc, char* argv[]) {
break;
case BACKUP_EXPIRE:
if(!datetime.empty())
if(!expireDatetime.empty())
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( expireBackupData(argv[0], destinationContainer, expVersion, datetime, db) );
f = stopAfter( expireBackupData(argv[0], destinationContainer, expireVersion, expireDatetime, db, forceAction, expireRestorableAfterVersion, expireRestorableAfterDatetime) );
break;
case BACKUP_DELETE:
@ -2783,7 +2835,7 @@ int main(int argc, char* argv[]) {
break;
case BACKUP_DESCRIBE:
f = stopAfter( describeBackup(argv[0], destinationContainer) );
f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, initCluster(true) ? db : Optional<Database>()) );
break;
case BACKUP_LIST:

View File

@ -274,8 +274,6 @@ public:
// will return when the backup directory is restorable.
Future<int> waitBackup(Database cx, std::string tagName, bool stopWhenDone = true);
static Future<std::string> getBackupInfo(std::string backupContainer, Version* defaultVersion = NULL);
static const Key keyLastRestorable;
Future<int64_t> getTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->getTaskCount(tr); }

View File

@ -26,11 +26,16 @@
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/RunTransaction.actor.h"
#include <algorithm>
#include <time.h>
namespace IBackupFile_impl {
ACTOR Future<Void> appendString(Reference<IBackupFile> file, Standalone<StringRef> s) {
ACTOR Future<Void> appendStringRefWithLen(Reference<IBackupFile> file, Standalone<StringRef> s) {
state uint32_t lenBuf = bigEndian32((uint32_t)s.size());
Void _ = wait(file->append(&lenBuf, sizeof(lenBuf)));
Void _ = wait(file->append(s.begin(), s.size()));
@ -38,31 +43,124 @@ namespace IBackupFile_impl {
}
}
Future<Void> IBackupFile::appendString(Standalone<StringRef> s) {
return IBackupFile_impl::appendString(Reference<IBackupFile>::addRef(this), s);
Future<Void> IBackupFile::appendStringRefWithLen(Standalone<StringRef> s) {
return IBackupFile_impl::appendStringRefWithLen(Reference<IBackupFile>::addRef(this), s);
}
ACTOR Future<Optional<int64_t>> timeKeeperDateFromVersion(Version v, Reference<ReadYourWritesTransaction> tr) {
state KeyBackedMap<int64_t, Version> versionMap(timeKeeperPrefixRange.begin);
// Binary search to find the closest date with a version <= v
state int64_t min = 0;
state int64_t max = (int64_t)now();
state int64_t mid;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
mid = (min + max) / 2;
state std::vector<std::pair<int64_t, Version>> results = wait( versionMap.getRange(tr, min, mid, 1, false, true) );
if (results.size() != 1)
return Optional<int64_t>();
Version foundVersion = results[0].second;
int64_t foundTime = results[0].first;
if(v < foundVersion)
max = foundTime;
else {
if(foundTime == min) {
return foundTime + (v - foundVersion) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
}
min = foundTime;
}
}
}
std::string formatTime(int64_t t) {
time_t curTime = (time_t)t;
char buffer[128];
struct tm timeinfo;
localtime_r(&curTime, &timeinfo);
strftime(buffer, 128, "%Y-%m-%d %H:%M:%S", &timeinfo);
return buffer;
}
Future<Void> fetchTimes(Reference<ReadYourWritesTransaction> tr, std::map<Version, int64_t> *pVersionTimeMap) {
std::vector<Future<Void>> futures;
// Resolve each version in the map,
for(auto &p : *pVersionTimeMap) {
futures.push_back(map(timeKeeperDateFromVersion(p.first, tr), [=](Optional<int64_t> t) {
if(t.present())
pVersionTimeMap->at(p.first) = t.get();
else
pVersionTimeMap->erase(p.first);
return Void();
}));
}
return waitForAll(futures);
}
Future<Void> BackupDescription::resolveVersionTimes(Database cx) {
// Populate map with versions needed
versionTimeMap.clear();
for(const KeyspaceSnapshotFile &m : snapshots) {
versionTimeMap[m.beginVersion];
versionTimeMap[m.endVersion];
}
if(minLogBegin.present())
versionTimeMap[minLogBegin.get()];
if(maxLogEnd.present())
versionTimeMap[maxLogEnd.get()];
if(contiguousLogEnd.present())
versionTimeMap[contiguousLogEnd.get()];
if(minRestorableVersion.present())
versionTimeMap[minRestorableVersion.get()];
if(maxRestorableVersion.present())
versionTimeMap[maxRestorableVersion.get()];
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) { return fetchTimes(tr, &versionTimeMap); });
};
std::string BackupDescription::toString() const {
std::string info;
info.append(format("URL: %s\n", url.c_str()));
info.append(format("Restorable: %s\n", maxRestorableVersion.present() ? "true" : "false"));
for(const KeyspaceSnapshotFile &m : snapshots)
info.append(format("Snapshot: startVersion=%lld endVersion=%lld totalBytes=%lld\n", m.beginVersion, m.endVersion, m.totalSize));
auto formatVersion = [&](Version v) {
std::string s;
if(!versionTimeMap.empty()) {
auto i = versionTimeMap.find(v);
if(i != versionTimeMap.end())
s = format("%lld (%s)", v, formatTime(i->second).c_str());
else
s = format("%lld (unknown)", v);
}
else {
s = format("%lld", v);
}
return s;
};
for(const KeyspaceSnapshotFile &m : snapshots) {
info.append(format("Snapshot: startVersion=%s endVersion=%s totalBytes=%lld restorable=%s\n",
formatVersion(m.beginVersion).c_str(), formatVersion(m.endVersion).c_str(), m.totalSize, m.restorable.orDefault(false) ? "true" : "false"));
}
info.append(format("SnapshotBytes: %lld\n", snapshotBytes));
if(minLogBegin.present())
info.append(format("MinLogBeginVersion: %lld\n", minLogBegin.get()));
info.append(format("MinLogBeginVersion: %s\n", formatVersion(minLogBegin.get()).c_str()));
if(contiguousLogEnd.present())
info.append(format("MaxContiguousLogVersion: %lld\n", contiguousLogEnd.get()));
info.append(format("ContiguousLogEndVersion: %s\n", formatVersion(contiguousLogEnd.get()).c_str()));
if(maxLogEnd.present())
info.append(format("MaxLogEndVersion: %lld\n", maxLogEnd.get()));
info.append(format("MaxLogEndVersion: %s\n", formatVersion(maxLogEnd.get()).c_str()));
if(minRestorableVersion.present())
info.append(format("MinRestorableVersion: %lld\n", minRestorableVersion.get()));
info.append(format("MinRestorableVersion: %s\n", formatVersion(minRestorableVersion.get()).c_str()));
if(maxRestorableVersion.present())
info.append(format("MaxRestorableVersion: %lld\n", maxRestorableVersion.get()));
info.append(format("LogBytes: %lld\n", logBytes));
info.append(format("MaxRestorableVersion: %s\n", formatVersion(maxRestorableVersion.get()).c_str()));
if(!extendedDetail.empty())
info.append("ExtendedDetail: ").append(extendedDetail);
@ -96,8 +194,10 @@ public:
virtual Future<Void> create() = 0;
// Get a list of fileNames and their sizes in the container under the given path
// The implementation can (but does not have to) use the folder path filter to avoid traversing
// specific subpaths.
typedef std::vector<std::pair<std::string, int64_t>> FilesAndSizesT;
virtual Future<FilesAndSizesT> listFiles(std::string path = "") = 0;
virtual Future<FilesAndSizesT> listFiles(std::string path = "", std::function<bool(std::string const &)> folderPathFilter = nullptr) = 0;
// Open a file for read by fileName
virtual Future<Reference<IAsyncFile>> readFile(std::string fileName) = 0;
@ -241,7 +341,7 @@ public:
json_spirit::mArray &array = (doc.create("files") = json_spirit::mArray()).get_array();
Version minVer = std::numeric_limits<Version>::max();
Version maxVer = std::numeric_limits<Version>::min();
Version maxVer = 0;
RangeFile rf;
for(auto &f : fileNames) {
@ -273,9 +373,23 @@ public:
return writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem>::addRef(this), fileNames, totalBytes);
};
// List files in sorted order but without regard for overlaps in version ranges
Future<std::vector<LogFile>> listLogFiles(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
return map(listFiles("logs/"), [=](const FilesAndSizesT &files) {
// List log files which contain data at any version >= beginVersion and < endVersion
// Lists files in sorted order by begin version. Does not check that results are non overlapping or contiguous.
Future<std::vector<LogFile>> listLogFiles(Version beginVersion = 0, Version endVersion = std::numeric_limits<Version>::max()) {
// The first relevant log file could have a begin version less than beginVersion based on the knobs which determine log file range size,
// so start at an earlier version adjusted by how many versions a file could contain.
//
// This path would be the first that could contain relevant results for this operation.
Standalone<StringRef> firstPath(format("logs/%s/", logVersionFolderString(beginVersion - (CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE)).c_str()));
// This path would be the last that could contain relevant results for this operation.
Standalone<StringRef> lastPath(format("logs/%s/", logVersionFolderString(endVersion).c_str()));
std::function<bool(std::string const &)> pathFilter = [=](const std::string &folderPath) {
return firstPath.startsWith(folderPath) || lastPath.startsWith(folderPath)
|| (folderPath > firstPath && folderPath < lastPath);
};
return map(listFiles("logs/", pathFilter), [=](const FilesAndSizesT &files) {
std::vector<LogFile> results;
LogFile lf;
for(auto &f : files) {
@ -286,8 +400,20 @@ public:
return results;
});
}
Future<std::vector<RangeFile>> listRangeFiles(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
return map(listFiles("ranges/"), [=](const FilesAndSizesT &files) {
// List range files, in sorted version order, which contain data at or between beginVersion and endVersion
Future<std::vector<RangeFile>> listRangeFiles(Version beginVersion = 0, Version endVersion = std::numeric_limits<Version>::max()) {
// This path would be the first that could contain relevant results for this operation.
Standalone<StringRef> firstPath(format("ranges/%s/", rangeVersionFolderString(beginVersion).c_str()));
// This path would be the last that could contain relevant results for this operation.
Standalone<StringRef> lastPath(format("ranges/%s/", rangeVersionFolderString(endVersion).c_str()));
std::function<bool(std::string const &)> pathFilter = [=](const std::string &folderPath) {
return firstPath.startsWith(folderPath) || lastPath.startsWith(folderPath)
|| (folderPath > firstPath && folderPath < lastPath);
};
return map(listFiles("ranges/", pathFilter), [=](const FilesAndSizesT &files) {
std::vector<RangeFile> results;
RangeFile rf;
for(auto &f : files) {
@ -298,12 +424,14 @@ public:
return results;
});
}
Future<std::vector<KeyspaceSnapshotFile>> listKeyspaceSnapshots(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
// List snapshots which have been fully written, in sorted beginVersion order.
Future<std::vector<KeyspaceSnapshotFile>> listKeyspaceSnapshots() {
return map(listFiles("snapshots/"), [=](const FilesAndSizesT &files) {
std::vector<KeyspaceSnapshotFile> results;
KeyspaceSnapshotFile sf;
for(auto &f : files) {
if(pathToKeyspaceSnapshotFile(sf, f.first) && sf.endVersion > beginVersion && sf.beginVersion < endVersion)
if(pathToKeyspaceSnapshotFile(sf, f.first))
results.push_back(sf);
}
std::sort(results.begin(), results.end());
@ -311,54 +439,100 @@ public:
});
}
ACTOR static Future<FullBackupListing> listBackup_impl(Reference<BackupContainerFileSystem> bc) {
ACTOR static Future<FullBackupListing> dumpFileList_impl(Reference<BackupContainerFileSystem> bc) {
state Future<std::vector<RangeFile>> fRanges = bc->listRangeFiles(0, std::numeric_limits<Version>::max());
state Future<std::vector<KeyspaceSnapshotFile>> fSnapshots = bc->listKeyspaceSnapshots(0, std::numeric_limits<Version>::max());
state Future<std::vector<KeyspaceSnapshotFile>> fSnapshots = bc->listKeyspaceSnapshots();
state Future<std::vector<LogFile>> fLogs = bc->listLogFiles(0, std::numeric_limits<Version>::max());
Void _ = wait(success(fRanges) && success(fSnapshots) && success(fLogs));
return FullBackupListing({fRanges.get(), fLogs.get(), fSnapshots.get()});
}
Future<FullBackupListing> listBackup() {
return listBackup_impl(Reference<BackupContainerFileSystem>::addRef(this));
Future<FullBackupListing> dumpFileList() {
return dumpFileList_impl(Reference<BackupContainerFileSystem>::addRef(this));
}
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc) {
ACTOR static Future<BackupDescription> describeBackup_impl(Reference<BackupContainerFileSystem> bc, bool deepScan) {
state BackupDescription desc;
desc.url = bc->getURL();
state std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
// This is the range of logs we'll have to list to determine log continuity
state Version scanBegin = 0;
state Version scanEnd = std::numeric_limits<Version>::max();
// Get range for which we know there are logs, if available
state Optional<Version> begin;
state Optional<Version> end;
if(!deepScan) {
Void _ = wait(store(bc->logBeginVersion().get(), begin) && store(bc->logEndVersion().get(), end));
}
// Use the known log range if present
if(begin.present() && end.present()) {
// Move scanBegin to the end since there's no reason to scan [begin, end) as it is assumed present and contiguous.
scanBegin = end.get();
// Set the known minimum log begin for the contiguous log timeline
desc.minLogBegin = begin.get();
desc.maxLogEnd = end.get();
desc.contiguousLogEnd = end.get();
}
std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
desc.snapshots = snapshots;
std::vector<LogFile> logs = wait(bc->listLogFiles(0, std::numeric_limits<Version>::max()));
std::vector<LogFile> logs = wait(bc->listLogFiles(scanBegin, scanEnd));
if(!logs.empty()) {
desc.maxLogEnd = logs.rbegin()->endVersion;
auto i = logs.begin();
desc.logBytes = i->fileSize;
desc.minLogBegin = i->beginVersion;
desc.contiguousLogEnd = i->endVersion;
// If we didn't get log versions above then seed them using the first log file
if(!desc.contiguousLogEnd.present()) {
desc.minLogBegin = i->beginVersion;
desc.contiguousLogEnd = i->endVersion;
++i;
}
auto &end = desc.contiguousLogEnd.get(); // For convenience to make loop cleaner
// Advance until continuity is broken
while(++i != logs.end()) {
desc.logBytes += i->fileSize;
while(i != logs.end()) {
if(i->beginVersion > end)
break;
// If the next link in the log chain is found, update the end
if(i->beginVersion == end)
end = i->endVersion;
}
// Scan the rest of the logs to update the total size
while(i != logs.end()) {
desc.logBytes += i->fileSize;
++i;
}
}
for(auto &s : snapshots) {
// Try to update the saved log versions if they are not set and we have values for them,
// but ignore errors in the update attempt in case the container is not writeable
// Also update logEndVersion if it has a value but it is less than contiguousLogEnd
try {
state Future<Void> updates = Void();
if(desc.minLogBegin.present() && !begin.present())
updates = updates && bc->logBeginVersion().set(desc.minLogBegin.get());
if(desc.contiguousLogEnd.present() && (!end.present() || end.get() < desc.contiguousLogEnd.get()) )
updates = updates && bc->logEndVersion().set(desc.contiguousLogEnd.get());
Void _ = wait(updates);
} catch(Error &e) {
if(e.code() == error_code_actor_cancelled)
throw;
TraceEvent(SevWarn, "BackupContainerSafeVersionUpdateFailure").detail("URL", bc->getURL());
}
for(auto &s : desc.snapshots) {
// Calculate restorability of each snapshot. Assume true, then try to prove false
s.restorable = true;
// If this is not a single-version snapshot then see if the available contiguous logs cover its range
if(s.beginVersion != s.endVersion) {
if(!desc.minLogBegin.present() || desc.minLogBegin.get() > s.beginVersion)
s.restorable = false;
if(!desc.contiguousLogEnd.present() || desc.contiguousLogEnd.get() < s.endVersion)
s.restorable = false;
}
desc.snapshotBytes += s.totalSize;
// If the snapshot is at a single version then it requires no logs. Update min and max restorable.
@ -385,32 +559,119 @@ public:
}
// Uses the virtual methods to describe the backup contents
Future<BackupDescription> describeBackup() {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this));
Future<BackupDescription> describeBackup(bool deepScan = false) {
return describeBackup_impl(Reference<BackupContainerFileSystem>::addRef(this), deepScan);
}
ACTOR static Future<Void> expireData_impl(Reference<BackupContainerFileSystem> bc, Version endVersion) {
ACTOR static Future<Void> expireData_impl(Reference<BackupContainerFileSystem> bc, Version expireEndVersion, bool force, Version restorableBeginVersion) {
if(restorableBeginVersion < expireEndVersion)
throw backup_cannot_expire();
state Version scanBegin = 0;
// Get the backup description.
state BackupDescription desc = wait(bc->describeBackup());
// Assume force is needed, then try to prove otherwise.
// Force is required if there is not a restorable snapshot which both
// - begins at or after expireEndVersion
// - ends at or before restorableBeginVersion
bool forceNeeded = true;
for(KeyspaceSnapshotFile &s : desc.snapshots) {
if(s.restorable.orDefault(false) && s.beginVersion >= expireEndVersion && s.endVersion <= restorableBeginVersion) {
forceNeeded = false;
break;
}
}
if(forceNeeded && !force)
throw backup_cannot_expire();
// Get metadata
state Optional<Version> expiredEnd;
state Optional<Version> logBegin;
state Optional<Version> logEnd;
Void _ = wait(store(bc->expiredEndVersion().get(), expiredEnd) && store(bc->logBeginVersion().get(), logBegin) && store(bc->logEndVersion().get(), logEnd));
// Update scan range if expiredEnd is present
if(expiredEnd.present()) {
if(expireEndVersion <= expiredEnd.get()) {
// If the expire request is to the version already expired to then there is no work to do so return true
return Void();
}
scanBegin = expiredEnd.get();
}
// Get log files that contain any data at or before expireEndVersion
state std::vector<LogFile> logs = wait(bc->listLogFiles(scanBegin, expireEndVersion));
// Get range files up to and including expireEndVersion
state std::vector<RangeFile> ranges = wait(bc->listRangeFiles(scanBegin, expireEndVersion));
// The new logBeginVersion will be taken from the last log file, if there is one
state Optional<Version> newLogBeginVersion;
if(!logs.empty()) {
LogFile &last = logs.back();
// If the last log ends at expireEndVersion then that will be the next log begin
if(last.endVersion == expireEndVersion) {
newLogBeginVersion = expireEndVersion;
}
else {
// If the last log overlaps the expiredEnd then use the log's begin version
if(last.endVersion > expireEndVersion) {
newLogBeginVersion = last.beginVersion;
logs.pop_back();
}
}
}
// If we have a new log begin version then potentially update the property but definitely set
// expireEndVersion to the new log begin because we will only be deleting files up to but not
// including that version.
if(newLogBeginVersion.present()) {
expireEndVersion = newLogBeginVersion.get();
// If the new version is greater than the existing one or the
// existing one is not present then write the new value
if(logBegin.orDefault(0) < newLogBeginVersion.get()) {
Void _ = wait(bc->logBeginVersion().set(newLogBeginVersion.get()));
}
}
else {
// Otherwise, if the old logBeginVersion is present and older than expireEndVersion then clear it because
// it refers to a version in a range we're about to delete and apparently continuity through
// expireEndVersion is broken.
if(logBegin.present() && logBegin.get() < expireEndVersion)
Void _ = wait(bc->logBeginVersion().clear());
}
// Delete files
state std::vector<Future<Void>> deletes;
std::vector<LogFile> logs = wait(bc->listLogFiles(0, endVersion));
for(auto const &f : logs)
for(auto const &f : logs) {
deletes.push_back(bc->deleteFile(f.fileName));
}
std::vector<RangeFile> ranges = wait(bc->listRangeFiles(0, endVersion));
for(auto const &f : ranges)
deletes.push_back(bc->deleteFile(f.fileName));
for(auto const &f : ranges) {
// Must recheck version because list returns data up to and including the given endVersion
if(f.version < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
}
std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots(0, endVersion));
for(auto const &f : snapshots)
deletes.push_back(bc->deleteFile(f.fileName));
for(auto const &f : desc.snapshots) {
if(f.endVersion < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
}
Void _ = wait(waitForAll(deletes));
// Update the expiredEndVersion property.
Void _ = wait(bc->expiredEndVersion().set(expireEndVersion));
return Void();
}
// Delete all data up to (but not including endVersion)
Future<Void> expireData(Version endVersion) {
return expireData_impl(Reference<BackupContainerFileSystem>::addRef(this), endVersion);
Future<Void> expireData(Version expireEndVersion, bool force, Version restorableBeginVersion) {
return expireData_impl(Reference<BackupContainerFileSystem>::addRef(this), expireEndVersion, force, restorableBeginVersion);
}
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion) {
@ -465,6 +726,74 @@ public:
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion){
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion);
}
private:
struct VersionProperty {
VersionProperty(Reference<BackupContainerFileSystem> bc, std::string name) : bc(bc), path("properties/" + name) {}
Reference<BackupContainerFileSystem> bc;
std::string path;
Future<Optional<Version>> get() {
return readVersionProperty(bc, path);
}
Future<Void> set(Version v) {
return writeVersionProperty(bc, path, v);
}
Future<Void> clear() {
return bc->deleteFile(path);
}
};
public:
// To avoid the need to scan the underyling filesystem in many cases, some important version boundaries are stored in named files.
// These files can be deleted from the filesystem if they appear to be wrong or corrupt, and full scans will done
// when needed.
//
// The three versions below, when present, describe 4 version ranges which collectively cover the entire version timeline.
// 0 - expiredEndVersion: All files in this range have been deleted
// expiredEndVersion - presentBeginVersion: Files in this range *may* have been deleted so their presence must not be assumed.
// presentBeginVersion - presentEndVersion: Files in this range have NOT been deleted by any FDB backup operations.
// presentEndVersion - infinity: Files in this range may or may not exist yet. Scan to find what is there.
//
VersionProperty logBeginVersion() { return {Reference<BackupContainerFileSystem>::addRef(this), "log_begin_version"}; }
VersionProperty logEndVersion() { return {Reference<BackupContainerFileSystem>::addRef(this), "log_end_version"}; }
VersionProperty expiredEndVersion() { return {Reference<BackupContainerFileSystem>::addRef(this), "expired_end_version"}; }
ACTOR static Future<Void> writeVersionProperty(Reference<BackupContainerFileSystem> bc, std::string path, Version v) {
try {
state Reference<IBackupFile> f = wait(bc->writeFile(path));
std::string s = format("%lld", v);
Void _ = wait(f->append(s.data(), s.size()));
Void _ = wait(f->finish());
return Void();
} catch(Error &e) {
if(e.code() != error_code_actor_cancelled)
TraceEvent(SevWarn, "BackupContainerWritePropertyFailed").detail("Path", path).error(e);
throw;
}
}
ACTOR static Future<Optional<Version>> readVersionProperty(Reference<BackupContainerFileSystem> bc, std::string path) {
try {
state Reference<IAsyncFile> f = wait(bc->readFile(path));
state int64_t size = wait(f->size());
state std::string s;
s.resize(size);
int rs = wait(f->read((uint8_t *)s.data(), size, 0));
Version v;
int len;
if(rs == size && sscanf(s.c_str(), "%lld%n", &v, &len) == 1 && len == size)
return v;
TraceEvent(SevWarn, "BackupContainerInvalidProperty");
throw backup_invalid_info();
} catch(Error &e) {
if(e.code() == error_code_file_not_found)
return Optional<Version>();
if(e.code() != error_code_actor_cancelled)
TraceEvent(SevWarn, "BackupContainerReadPropertyFailed").detail("Path", path).error(e);
throw;
}
}
};
class BackupContainerLocalDirectory : public BackupContainerFileSystem, ReferenceCounted<BackupContainerLocalDirectory> {
@ -597,13 +926,13 @@ public:
return Void();
}
Future<FilesAndSizesT> listFiles(std::string path) {
Future<FilesAndSizesT> listFiles(std::string path, std::function<bool(std::string const &)>) {
FilesAndSizesT results;
std::vector<std::string> files;
platform::findFilesRecursively(joinPath(m_path, path), files);
// Remove .lnk files from resultst, they are a side effect of a backup that was *read* during simulation. See openFile() above for more info on why they are created.
// Remove .lnk files from results, they are a side effect of a backup that was *read* during simulation. See openFile() above for more info on why they are created.
if(g_network->isSimulated())
files.erase(std::remove_if(files.begin(), files.end(), [](std::string const &f) { return StringRef(f).endsWith(LiteralStringRef(".lnk")); }), files.end());
@ -622,7 +951,7 @@ public:
// that should't be deleted. Also, platform::eraseDirectoryRecursive() intentionally doesn't work outside
// of the pre-simulation phase.
// By just expiring ALL data, only parsable backup files in the correct locations will be deleted.
return expireData(std::numeric_limits<Version>::max());
return expireData(std::numeric_limits<Version>::max(), true, std::numeric_limits<Version>::max());
}
private:
@ -698,32 +1027,28 @@ public:
return m_bstore->deleteObject(BACKUP_BUCKET, m_name + "/" + path);
}
ACTOR static Future<FilesAndSizesT> listFiles_impl(Reference<BackupContainerBlobStore> bc, std::string path) {
state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listBucket(BACKUP_BUCKET, bc->m_name + "/" + path, '/', std::numeric_limits<int>::max()));
ACTOR static Future<FilesAndSizesT> listFiles_impl(Reference<BackupContainerBlobStore> bc, std::string path, std::function<bool(std::string const &)> pathFilter) {
// pathFilter expects container based paths, so create a wrapper which converts a raw path
// to a container path by removing the known backup name prefix.
int prefixTrim = bc->m_name.size() + 1;
std::function<bool(std::string const &)> rawPathFilter = [=](const std::string &folderPath) {
return pathFilter(folderPath.substr(prefixTrim));
};
state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listBucket(BACKUP_BUCKET, bc->m_name + "/" + path, '/', std::numeric_limits<int>::max(), rawPathFilter));
FilesAndSizesT files;
for(auto &o : result.objects)
files.push_back({o.name.substr(bc->m_name.size() + 1), o.size});
return files;
}
Future<FilesAndSizesT> listFiles(std::string path) {
return listFiles_impl(Reference<BackupContainerBlobStore>::addRef(this), path);
Future<FilesAndSizesT> listFiles(std::string path, std::function<bool(std::string const &)> pathFilter) {
return listFiles_impl(Reference<BackupContainerBlobStore>::addRef(this), path, pathFilter);
}
ACTOR static Future<Void> create_impl(Reference<BackupContainerBlobStore> bc) {
Void _ = wait(bc->m_bstore->createBucket(BACKUP_BUCKET));
/*
Optional<BackupInfo> info = wait(bc->readInfo());
if(info.present())
throw backup_duplicate();
BackupInfo newInfo;
Void _ = wait(bc->writeInfo(newInfo));
*/
return Void();
}
@ -731,7 +1056,6 @@ public:
return create_impl(Reference<BackupContainerBlobStore>::addRef(this));
}
// TODO: If there is a need, this can be made faster by discovering common prefixes and listing levels of the folder structure in parallel.
ACTOR static Future<Void> deleteContainer_impl(Reference<BackupContainerBlobStore> bc, int *pNumDeleted) {
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
state Future<Void> done = bc->m_bstore->listBucketStream(BACKUP_BUCKET, resultStream, bc->m_name + "/", '/', std::numeric_limits<int>::max());
@ -918,7 +1242,7 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()));
FullBackupListing listing = wait(c->listBackup());
FullBackupListing listing = wait(c->dumpFileList());
ASSERT(listing.logs.size() == 2);
ASSERT(listing.ranges.size() == 3);
ASSERT(listing.snapshots.size() == 2);
@ -959,7 +1283,7 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
ASSERT(d.snapshots.size() == desc.snapshots.size());
ASSERT(d.maxRestorableVersion == desc.maxRestorableVersion);
Void _ = wait(c->expireData(161 * versionMultiplier));
Void _ = wait(c->expireData(161 * versionMultiplier, true));
BackupDescription d = wait(c->describeBackup());
printf("Backup Description\n%s", d.toString().c_str());
ASSERT(d.snapshots.size() == 1);

View File

@ -23,6 +23,7 @@
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "FDBTypes.h"
#include "fdbclient/NativeAPI.h"
#include <vector>
// Append-only file interface for writing backup data
@ -45,7 +46,7 @@ public:
virtual void addref() = 0;
virtual void delref() = 0;
Future<Void> appendString(Standalone<StringRef> s);
Future<Void> appendStringRefWithLen(Standalone<StringRef> s);
protected:
std::string m_fileName;
int64_t m_offset;
@ -83,6 +84,7 @@ struct KeyspaceSnapshotFile {
Version endVersion;
std::string fileName;
int64_t totalSize;
Optional<bool> restorable; // Whether or not the snapshot can be used in a restore, if known
// Order by beginVersion, break ties with endVersion
bool operator< (const KeyspaceSnapshotFile &rhs) const {
@ -98,7 +100,7 @@ struct FullBackupListing {
// The byte counts here only include usable log files and byte counts from kvrange manifests
struct BackupDescription {
BackupDescription() : snapshotBytes(0), logBytes(0) {}
BackupDescription() : snapshotBytes(0) {}
std::string url;
std::vector<KeyspaceSnapshotFile> snapshots;
int64_t snapshotBytes;
@ -107,8 +109,13 @@ struct BackupDescription {
Optional<Version> contiguousLogEnd;
Optional<Version> maxRestorableVersion;
Optional<Version> minRestorableVersion;
int64_t logBytes;
std::string extendedDetail; // Freeform container-specific info.
// Resolves the versions above to timestamps using a given database's TimeKeeper data.
// toString will use this information if present.
Future<Void> resolveVersionTimes(Database cx);
std::map<Version, int64_t> versionTimeMap;
std::string toString() const;
};
@ -154,17 +161,23 @@ public:
// Open a file for read by name
virtual Future<Reference<IAsyncFile>> readFile(std::string name) = 0;
// Delete all data up to (but not including endVersion)
virtual Future<Void> expireData(Version endVersion) = 0;
// Delete backup files which do not contain any data at or after (more recent than) expireEndVersion.
// If force is false, then nothing will be deleted unless there is a restorable snapshot which
// - begins at or after expireEndVersion
// - ends at or before restorableBeginVersion
// If force is true, data is deleted unconditionally which could leave the backup in an unusable state. This is not recommended.
// Returns true if expiration was done.
virtual Future<Void> expireData(Version expireEndVersion, bool force = false, Version restorableBeginVersion = std::numeric_limits<Version>::max()) = 0;
// Delete entire container. During the process, if pNumDeleted is not null it will be
// updated with the count of deleted files so that progress can be seen.
virtual Future<Void> deleteContainer(int *pNumDeleted = nullptr) = 0;
// Uses the virtual methods to describe the backup contents
virtual Future<BackupDescription> describeBackup() = 0;
// Return key details about a backup's contents, possibly using cached or stored metadata
// unless deepScan is true.
virtual Future<BackupDescription> describeBackup(bool deepScan = false) = 0;
virtual Future<FullBackupListing> listBackup() = 0;
virtual Future<FullBackupListing> dumpFileList() = 0;
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
// restore to given version is not possible.

View File

@ -415,9 +415,9 @@ namespace fileBackup {
// If this is NOT the first block then write duplicate stuff needed from last block
if(self->blockEnd > self->blockSize) {
Void _ = wait(self->file->appendString(self->lastKey));
Void _ = wait(self->file->appendString(self->lastKey));
Void _ = wait(self->file->appendString(self->lastValue));
Void _ = wait(self->file->appendStringRefWithLen(self->lastKey));
Void _ = wait(self->file->appendStringRefWithLen(self->lastKey));
Void _ = wait(self->file->appendStringRefWithLen(self->lastValue));
}
// There must now be room in the current block for bytesNeeded or the block size is too small
@ -438,8 +438,8 @@ namespace fileBackup {
ACTOR static Future<Void> writeKV_impl(RangeFileWriter *self, Key k, Value v) {
int toWrite = sizeof(int32_t) + k.size() + sizeof(int32_t) + v.size();
Void _ = wait(self->newBlockIfNeeded(toWrite));
Void _ = wait(self->file->appendString(k));
Void _ = wait(self->file->appendString(v));
Void _ = wait(self->file->appendStringRefWithLen(k));
Void _ = wait(self->file->appendStringRefWithLen(v));
self->lastKey = k;
self->lastValue = v;
return Void();
@ -451,7 +451,7 @@ namespace fileBackup {
ACTOR static Future<Void> writeKey_impl(RangeFileWriter *self, Key k) {
int toWrite = sizeof(uint32_t) + k.size();
Void _ = wait(self->newBlockIfNeeded(toWrite));
Void _ = wait(self->file->appendString(k));
Void _ = wait(self->file->appendStringRefWithLen(k));
return Void();
}
@ -589,8 +589,8 @@ namespace fileBackup {
Void _ = wait(self->file->append((uint8_t *)&self->fileVersion, sizeof(self->fileVersion)));
}
Void _ = wait(self->file->appendString(k));
Void _ = wait(self->file->appendString(v));
Void _ = wait(self->file->appendStringRefWithLen(k));
Void _ = wait(self->file->appendStringRefWithLen(v));
// At this point we should be in whatever the current block is or the block size is too small
if(self->file->size() > self->blockEnd)
@ -3517,7 +3517,9 @@ public:
ACTOR static Future<Version> restore(FileBackupAgent* backupAgent, Database cx, Key tagName, Key url, bool waitForComplete, Version targetVersion, bool verbose, KeyRange range, Key addPrefix, Key removePrefix, bool lockDB, UID randomUid) {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
BackupDescription desc = wait(bc->describeBackup());
state BackupDescription desc = wait(bc->describeBackup());
Void _ = wait(desc.resolveVersionTimes(cx));
printf("Backup Description\n%s", desc.toString().c_str());
if(targetVersion == invalidVersion && desc.maxRestorableVersion.present())
targetVersion = desc.maxRestorableVersion.get();
@ -3714,10 +3716,3 @@ Future<int> FileBackupAgent::waitBackup(Database cx, std::string tagName, bool s
return FileBackupAgentImpl::waitBackup(this, cx, tagName, stopWhenDone);
}
Future<std::string> FileBackupAgent::getBackupInfo(std::string container, Version* defaultVersion) {
return map(IBackupContainer::openContainer(container)->describeBackup(), [=] (BackupDescription const &d) {
if(defaultVersion != nullptr && d.maxRestorableVersion.present())
*defaultVersion = d.maxRestorableVersion.get();
return d.toString();
});
}

View File

@ -261,7 +261,9 @@ ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::strin
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200}));
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200, 404}));
if(r->code == 404)
throw file_not_found();
return r->contentLen;
}
@ -283,7 +285,8 @@ ACTOR Future<json_spirit::mObject> tryReadJSONFile(std::string path) {
ASSERT(r == size);
content = buf.toString();
} catch(Error &e) {
TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true);
if(e.code() != error_code_actor_cancelled)
TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true);
return json_spirit::mObject();
}
@ -295,7 +298,8 @@ ACTOR Future<json_spirit::mObject> tryReadJSONFile(std::string path) {
else
TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path).suppressFor(60, true);
} catch(Error &e) {
TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true);
if(e.code() != error_code_actor_cancelled)
TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true);
}
return json_spirit::mObject();
@ -514,7 +518,7 @@ Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
}
ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth) {
ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
// Request 1000 keys at a time, the maximum allowed
state std::string resource = "/";
resource.append(bucket);
@ -593,7 +597,9 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
objectDoc.get("Prefix", p);
// If recursing, queue a sub-request, otherwise add the common prefix to the result.
if(maxDepth > 0) {
subLists.push_back(bstore->listBucketStream(bucket, results, p, delimiter, maxDepth - 1));
// If there is no recurse filter or the filter returns true then start listing the subfolder
if(!recurseFilter || recurseFilter(p))
subLists.push_back(bstore->listBucketStream(bucket, results, p, delimiter, maxDepth - 1, recurseFilter));
if(more)
lastFile = std::move(p);
}
@ -617,7 +623,8 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
}
}
} catch(Error &e) {
TraceEvent(SevWarn, "BlobStoreEndpointListResultParseError").detail("Resource", fullResource).error(e).suppressFor(60, true);
if(e.code() != error_code_actor_cancelled)
TraceEvent(SevWarn, "BlobStoreEndpointListResultParseError").detail("Resource", fullResource).error(e).suppressFor(60, true);
throw http_bad_response();
}
}
@ -627,14 +634,14 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
return Void();
}
Future<Void> BlobStoreEndpoint::listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth) {
return listBucketStream_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, results, prefix, delimiter, maxDepth);
Future<Void> BlobStoreEndpoint::listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
return listBucketStream_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, results, prefix, delimiter, maxDepth, recurseFilter);
}
ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth) {
ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
state BlobStoreEndpoint::ListResult results;
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth);
state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
loop {
choose {
when(Void _ = wait(done)) {
@ -649,8 +656,8 @@ ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreE
return results;
}
Future<BlobStoreEndpoint::ListResult> BlobStoreEndpoint::listBucket(std::string const &bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth) {
return listBucket_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, delimiter, maxDepth);
Future<BlobStoreEndpoint::ListResult> BlobStoreEndpoint::listBucket(std::string const &bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
return listBucket_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, delimiter, maxDepth, recurseFilter);
}
std::string BlobStoreEndpoint::hmac_sha1(std::string const &msg) {

View File

@ -21,6 +21,7 @@
#pragma once
#include <map>
#include <functional>
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
@ -165,10 +166,12 @@ public:
};
// Get bucket contents via a stream, since listing large buckets will take many serial blob requests
Future<Void> listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0);
// If a delimiter is passed then common prefixes will be read in parallel, recursively, depending on recurseFilter.
// Recursefilter is a must be a function that takes a string and returns true if it passes. The default behavior is to assume true.
Future<Void> listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
// Get a list of the files in a bucket
Future<ListResult> listBucket(std::string const &bucket, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0);
// Get a list of the files in a bucket, see listBucketStream for more argument detail.
Future<ListResult> listBucket(std::string const &bucket, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
// Check if an object exists in a bucket
Future<bool> objectExists(std::string const &bucket, std::string const &object);

View File

@ -175,7 +175,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
state bool restorable = false;
if(lastBackupContainer) {
BackupDescription desc = wait(lastBackupContainer->describeBackup());
state BackupDescription desc = wait(lastBackupContainer->describeBackup());
Void _ = wait(desc.resolveVersionTimes(cx));
printf("BackupDescription:\n%s\n", desc.toString().c_str());
restorable = desc.maxRestorableVersion.present();
}

View File

@ -171,6 +171,7 @@ ERROR( backup_unneeded, 2312, "Backup unneeded request")
ERROR( backup_bad_block_size, 2313, "Backup file block size too small")
ERROR( backup_invalid_url, 2314, "Backup Container URL invalid")
ERROR( backup_invalid_info, 2315, "Backup Container URL invalid")
ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup without violating minimum restorability")
ERROR( restore_invalid_version, 2361, "Invalid restore version")
ERROR( restore_corrupted_data, 2362, "Corrupted backup data")
ERROR( restore_missing_data, 2363, "Missing backup data")