Merge pull request #2170 from etschannen/feature-cleanup-mutations

A few small features and fixes
Evan Tschannen 2019-09-30 13:44:09 -07:00 committed by GitHub
commit 286020b381
19 changed files with 380 additions and 180 deletions

View File

@ -380,6 +380,24 @@ The ``list`` subcommand will list the backups at a given 'base' or shortened Bac
This is a shortened Backup URL, which looks just like a Backup URL but without the backup <name>, so that the ``list`` command will discover and list all of the backups in the bucket.
.. program:: fdbbackup cleanup
``cleanup``
------------
The ``cleanup`` subcommand will list orphaned backups and DRs and optionally remove their mutations.
::
user@host$ fdbbackup cleanup [--delete_data] [--min_cleanup_seconds <SECONDS>] [-C <CLUSTER_FILE>]
``--delete_data``
This flag will cause ``cleanup`` to remove mutations for the most stale backup or DR.
``--min_cleanup_seconds``
Specifies the amount of time, in seconds, that a backup or DR must be stale before ``cleanup`` will remove mutations for it. By default, this is one hour.
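Running ``cleanup`` without ``--delete_data`` only reports what it finds. A sketch of a run, using the messages this command prints (the staleness figure is illustrative)::
user@host$ fdbbackup cleanup -C fdb.cluster
Found a DR which was 30.0000 hours behind.
Passing `--delete_data' would delete the tag which was 30.0000 hours behind.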
``fdbrestore`` command line tool
================================

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.4.pkg <https://www.foundationdb.org/downloads/6.2.4/macOS/installers/FoundationDB-6.2.4.pkg>`_
* `FoundationDB-6.2.5.pkg <https://www.foundationdb.org/downloads/6.2.5/macOS/installers/FoundationDB-6.2.5.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.4/ubuntu/installers/foundationdb-clients_6.2.4-1_amd64.deb>`_
* `foundationdb-server-6.2.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.4/ubuntu/installers/foundationdb-server_6.2.4-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.5/ubuntu/installers/foundationdb-clients_6.2.5-1_amd64.deb>`_
* `foundationdb-server-6.2.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.5/ubuntu/installers/foundationdb-server_6.2.5-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel6/installers/foundationdb-clients-6.2.4-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel6/installers/foundationdb-server-6.2.4-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel6/installers/foundationdb-clients-6.2.5-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel6/installers/foundationdb-server-6.2.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel7/installers/foundationdb-clients-6.2.4-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel7/installers/foundationdb-server-6.2.4-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel7/installers/foundationdb-clients-6.2.5-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel7/installers/foundationdb-server-6.2.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.4-x64.msi <https://www.foundationdb.org/downloads/6.2.4/windows/installers/foundationdb-6.2.4-x64.msi>`_
* `foundationdb-6.2.5-x64.msi <https://www.foundationdb.org/downloads/6.2.5/windows/installers/foundationdb-6.2.5-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.4.tar.gz <https://www.foundationdb.org/downloads/6.2.4/bindings/python/foundationdb-6.2.4.tar.gz>`_
* `foundationdb-6.2.5.tar.gz <https://www.foundationdb.org/downloads/6.2.5/bindings/python/foundationdb-6.2.5.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.4.gem <https://www.foundationdb.org/downloads/6.2.4/bindings/ruby/fdb-6.2.4.gem>`_
* `fdb-6.2.5.gem <https://www.foundationdb.org/downloads/6.2.5/bindings/ruby/fdb-6.2.5.gem>`_
Java 8+
-------
* `fdb-java-6.2.4.jar <https://www.foundationdb.org/downloads/6.2.4/bindings/java/fdb-java-6.2.4.jar>`_
* `fdb-java-6.2.4-javadoc.jar <https://www.foundationdb.org/downloads/6.2.4/bindings/java/fdb-java-6.2.4-javadoc.jar>`_
* `fdb-java-6.2.5.jar <https://www.foundationdb.org/downloads/6.2.5/bindings/java/fdb-java-6.2.5.jar>`_
* `fdb-java-6.2.5-javadoc.jar <https://www.foundationdb.org/downloads/6.2.5/bindings/java/fdb-java-6.2.5-javadoc.jar>`_
Go 1.11+
--------

View File

@ -577,6 +577,7 @@
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0, // estimated
"system_kv_size_bytes":0, // estimated
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,

View File

@ -47,6 +47,7 @@ Fixes
* Configuring regions would fail with an internal error if the cluster contained storage servers that didn't set a datacenter ID. `(PR #2017) <https://github.com/apple/foundationdb/pull/2017>`_.
* Clients no longer prefer reading from servers with the same zone ID, because it could create hot shards. [6.2.3] `(PR #2019) <https://github.com/apple/foundationdb/pull/2019>`_.
* Data distribution could fail to start if any storage servers had misconfigured locality information. This problem could persist even after the offending storage servers were removed or fixed. [6.2.5] `(PR #2110) <https://github.com/apple/foundationdb/pull/2110>`_.
* Data distribution was running at too high a priority, which sometimes caused other roles on the same process to stall. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Loading a 6.1 or newer ``fdb_c`` library as a secondary client using the multi-version client could lead to an infinite recursion when run with API versions older than 610. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_.
* Using C API functions that were removed in 6.1 when using API version 610 or above now results in a compilation error. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_.
@ -66,6 +67,7 @@ Status
* Added ``coordinator`` to the list of roles that can be reported for a process. [6.2.3] `(PR #2006) <https://github.com/apple/foundationdb/pull/2006>`_.
* Added ``worst_durability_lag_storage_server`` and ``limiting_durability_lag_storage_server`` to the ``cluster.qos`` section, each with subfields ``versions`` and ``seconds``. These report the durability lag values being used by ratekeeper to potentially limit the transaction rate. [6.2.3] `(PR #2003) <https://github.com/apple/foundationdb/pull/2003>`_.
* Added ``worst_data_lag_storage_server`` and ``limiting_data_lag_storage_server`` to the ``cluster.qos`` section, each with subfields ``versions`` and ``seconds``. These are meant to replace ``worst_version_lag_storage_server`` and ``limiting_version_lag_storage_server``, which are now deprecated. [6.2.3] `(PR #2003) <https://github.com/apple/foundationdb/pull/2003>`_.
* Added ``system_kv_size_bytes`` to the ``cluster.data`` section to record the size of the system keyspace. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Bindings
--------
@ -80,6 +82,11 @@ Bindings
* Added a transaction option to control whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in API version 700, and addresses will include ports by default. [6.2.4] `(PR #2060) <https://github.com/apple/foundationdb/pull/2060>`_.
* Python: ``Versionstamp`` comparisons didn't work in Python 3. [6.2.4] `(PR #2089) <https://github.com/apple/foundationdb/pull/2089>`_.
Features
--------
* Added the ``cleanup`` command to ``fdbbackup``, which can be used to remove orphaned backups or DRs. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Other Changes
-------------
@ -114,6 +121,9 @@ Fixes only impacting 6.2.0+
* The cluster controller would saturate its CPU for a few seconds when sending configuration information to all of the worker processes. [6.2.4] `(PR #2086) <https://github.com/apple/foundationdb/pull/2086>`_.
* The data distributor would build all possible team combinations if it was tracking an unhealthy server with less than 10 teams. [6.2.4] `(PR #2099) <https://github.com/apple/foundationdb/pull/2099>`_.
* The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
* A storage server could crash if it took longer than 10 minutes to fetch a key range from another server. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Excluding or including servers would restart the data distributor. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Earlier release notes
---------------------

View File

@ -77,7 +77,7 @@ enum enumProgramExe {
};
enum enumBackupType {
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_MODIFY, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST, BACKUP_DUMP
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_MODIFY, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST, BACKUP_DUMP, BACKUP_CLEANUP
};
enum enumDBType {
@ -95,7 +95,7 @@ enum {
OPT_EXPIRE_BEFORE_VERSION, OPT_EXPIRE_BEFORE_DATETIME, OPT_EXPIRE_DELETE_BEFORE_DAYS,
OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, OPT_EXPIRE_MIN_RESTORABLE_DAYS,
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP, OPT_DESCRIBE_TIMESTAMPS,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON, OPT_DELETE_DATA, OPT_MIN_CLEANUP_SECONDS,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -253,6 +253,7 @@ CSimpleOpt::SOption g_rgBackupStatusOptions[] = {
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_JSON, "--json", SO_NONE},
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -282,6 +283,37 @@ CSimpleOpt::SOption g_rgBackupAbortOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupCleanupOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
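// Flags specific to cleanup: --delete_data takes no argument (SO_NONE), while
// --min_cleanup_seconds requires a value (SO_REQ_SEP) that is forwarded as the
// min_cleanup_seconds knob.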
{ OPT_DELETE_DATA, "--delete_data", SO_NONE },
{ OPT_MIN_CLEANUP_SECONDS, "--min_cleanup_seconds", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -313,6 +345,7 @@ CSimpleOpt::SOption g_rgBackupDiscontinueOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -344,6 +377,7 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -371,6 +405,7 @@ CSimpleOpt::SOption g_rgBackupPauseOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -640,6 +675,7 @@ CSimpleOpt::SOption g_rgDBStartOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -673,6 +709,7 @@ CSimpleOpt::SOption g_rgDBStatusOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -705,6 +742,7 @@ CSimpleOpt::SOption g_rgDBSwitchOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -737,6 +775,7 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -766,6 +805,7 @@ CSimpleOpt::SOption g_rgDBPauseOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -1186,6 +1226,7 @@ enumBackupType getBackupType(std::string backupType)
values["start"] = BACKUP_START;
values["status"] = BACKUP_STATUS;
values["abort"] = BACKUP_ABORT;
values["cleanup"] = BACKUP_CLEANUP;
values["wait"] = BACKUP_WAIT;
values["discontinue"] = BACKUP_DISCONTINUE;
values["pause"] = BACKUP_PAUSE;
@ -1863,6 +1904,21 @@ ACTOR Future<Void> abortBackup(Database db, std::string tagName) {
return Void();
}
ACTOR Future<Void> cleanupMutations(Database db, bool deleteData) {
try
{
wait(cleanupBackup(db, deleteData));
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> waitBackup(Database db, std::string tagName, bool stopWhenDone) {
try
{
@ -2540,6 +2596,9 @@ int main(int argc, char* argv[]) {
case BACKUP_ABORT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupAbortOptions, SO_O_EXACT);
break;
case BACKUP_CLEANUP:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupCleanupOptions, SO_O_EXACT);
break;
case BACKUP_WAIT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupWaitOptions, SO_O_EXACT);
break;
@ -2712,6 +2771,7 @@ int main(int argc, char* argv[]) {
std::string restoreClusterFileDest;
std::string restoreClusterFileOrig;
bool jsonOutput = false;
bool deleteData = false;
BackupModifyOptions modifyOptions;
@ -2791,6 +2851,12 @@ int main(int argc, char* argv[]) {
case OPT_DRYRUN:
dryRun = true;
break;
case OPT_DELETE_DATA:
deleteData = true;
break;
case OPT_MIN_CLEANUP_SECONDS:
knobs.push_back( std::make_pair( "min_cleanup_seconds", args->OptionArg() ) );
break;
case OPT_FORCE:
forceAction = true;
break;
@ -3354,6 +3420,12 @@ int main(int argc, char* argv[]) {
f = stopAfter( abortBackup(db, tagName) );
break;
case BACKUP_CLEANUP:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( cleanupMutations(db, deleteData) );
break;
case BACKUP_WAIT:
if(!initCluster())
return FDB_EXIT_ERROR;

View File

@ -485,7 +485,7 @@ bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
Version getVersionFromString(std::string const& value);
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Key getApplyKey( Version version, Key backupUid );
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
@ -503,6 +503,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key removePrefix, Version beginVersion,
Version* endVersion, RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion, Reference<KeyRangeMap<Version>> keyVersion);
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData);
typedef BackupAgentBase::enumState EBackupState;
template<> inline Tuple Codec<EBackupState>::pack(EBackupState const &val) { return Tuple().append(val); }

View File

@ -708,7 +708,7 @@ ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key remov
}
}
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
ACTOR static Future<Void> _eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
@ -716,104 +716,189 @@ ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUi
return Void();
}
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure the version history key exists, and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
state Version currEndVersion = std::numeric_limits<Version>::max();
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
if (endVersion.present() || backupVersions.size() != 1 || BUGGIFY) {
if (!endVersion.present()) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
if(backupVersions.size() == 1) {
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
if((nextSmallestVersion - currBeginVersion) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE >= std::numeric_limits<uint8_t>::max() || BUGGIFY) {
Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);
for(int h = 0; h <= std::numeric_limits<uint8_t>::max(); h++) {
uint64_t bv = bigEndian64(Version(0));
uint64_t ev = bigEndian64(nextSmallestVersion);
uint8_t h1 = h;
Key vblockPrefix = StringRef(&h1, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
tr->clear(KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
} else {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
} else {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
return Void();
}
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(tr, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
}
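// Note: eraseLogData() now only stages reads and clears on the caller's
// transaction; committing and retrying are the caller's job. A minimal sketch
// of the calling pattern used by the callers in this diff (the wrapper name
// below is illustrative, not part of this change):
ACTOR Future<Void> exampleEraseLogDataRetryLoop(Database cx, Key logUidValue, Key destUidValue) {
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	loop {
		try {
			// eraseLogData() sets LOCK_AWARE and ACCESS_SYSTEM_KEYS on tr itself.
			wait(eraseLogData(tr, logUidValue, destUidValue));
			wait(tr->commit());
			return Void();
		} catch (Error& e) {
			wait(tr->onError(e)); // standard retry: reset the transaction and back off
		}
	}
}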
ACTOR Future<Void> cleanupLogMutations(Database cx, Value destUidValue, bool deleteData) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
state Optional<Key> removingLogUid;
state std::set<Key> loggedLogUids;
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
state Version readVer = tr->getReadVersion().get();
// Make sure the version history key exists, and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
state Version minVersion = std::numeric_limits<Version>::max();
state Key minVersionLogUid;
state int backupIdx = 0;
for (; backupIdx < backupVersions.size(); backupIdx++) {
state Version currVersion = BinaryReader::fromStringRef<Version>(backupVersions[backupIdx].value, Unversioned());
state Key currLogUid = backupVersions[backupIdx].key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if( currVersion < minVersion ) {
minVersionLogUid = currLogUid;
minVersion = currVersion;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
if(!loggedLogUids.count(currLogUid)) {
state Future<Optional<Value>> foundDRKey = tr->get(Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(currLogUid).pack(DatabaseBackupAgent::keyStateStatus));
state Future<Optional<Value>> foundBackupKey = tr->get(Subspace(currLogUid.withPrefix(LiteralStringRef("uid->config/")).withPrefix(fileBackupPrefixRange.begin)).pack(LiteralStringRef("stateEnum")));
wait(success(foundDRKey) && success(foundBackupKey));
state Version currEndVersion = currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
if(foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("WARNING: Found a tag which looks like both a backup and a DR. This tag was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(foundDRKey.get().present() && !foundBackupKey.get().present()) {
printf("Found a DR which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(!foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("Found a Backup which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
printf("WARNING: Found a unknown tag which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
loggedLogUids.insert(currLogUid);
}
}
if (!endVersion.present() && backupVersions.size() == 1) {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
if( readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND && deleteData && (!removingLogUid.present() || minVersionLogUid == removingLogUid.get()) ) {
removingLogUid = minVersionLogUid;
wait(eraseLogData(tr, minVersionLogUid, destUidValue));
wait(tr->commit());
printf("\nSuccessfully removed the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(removingLogUid.present() && minVersionLogUid != removingLogUid.get()) {
printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n");
} else if( deleteData ) {
printf("\nWARNING: Did not delete data because the tag was not at least %.4f hours behind. Change `--min_cleanup_seconds' to adjust this threshold.\n", CLIENT_KNOBS->MIN_CLEANUP_SECONDS/3600.0);
} else {
if (!endVersion.present() && currEndVersion >= nextSmallestVersion) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
printf("\nPassing `--delete_data' would delete the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
wait(tr->commit());
if (!endVersion.present() && (backupVersions.size() == 1 || currEndVersion >= nextSmallestVersion)) {
return Void();
}
if(endVersion.present() && currEndVersion == endVersion.get()) {
return Void();
}
tr->reset();
} catch (Error &e) {
return Void();
} catch( Error& e) {
wait(tr->onError(e));
}
}
}
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(cx, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
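// cleanupBackup() enumerates every destUid registered under destUidLookupPrefix
// and runs cleanupLogMutations() on each, so one invocation covers all orphaned
// backup and DR mutation streams.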
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Standalone<RangeResultRef> destUids = wait(tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
for(auto destUid : destUids) {
wait(cleanupLogMutations(cx, destUid.value, deleteData));
}
return Void();
} catch( Error& e) {
wait(tr->onError(e));
}
}
}

View File

@ -482,11 +482,17 @@ namespace dbBackup {
wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
return Void();
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(taskBucket->src));
loop {
try {
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
wait(eraseLogData(tr, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
wait(tr->commit());
return Void();
} catch( Error &e ) {
wait(tr->onError(e));
}
}
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version endVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
@ -833,8 +839,7 @@ namespace dbBackup {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(taskBucket->src));
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
state Key destUidValue = task->params[BackupAgentBase::destUid];
state Version beginVersion;
state Version endVersion;
state Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
loop {
try {
@ -844,25 +849,13 @@ namespace dbBackup {
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
return Void();
state Key latestVersionKey = logUidValue.withPrefix(task->params[BackupAgentBase::destUid].withPrefix(backupLatestVersionsPrefix));
state Optional<Key> bVersion = wait(tr->get(latestVersionKey));
if (!bVersion.present()) {
return Void();
}
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
endVersion = tr->getReadVersion().get();
break;
wait(eraseLogData(tr, logUidValue, destUidValue, Optional<Version>(), true, backupUid));
wait(tr->commit());
return Void();
} catch(Error &e) {
wait(tr->onError(e));
}
}
Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, Optional<Version>(), true, backupUid));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
@ -2179,22 +2172,23 @@ public:
}
}
if(partial)
return Void();
state Future<Void> partialTimeout = partial ? delay(30.0) : Never();
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
state Version beginVersion;
state Version endVersion;
state bool clearSrcDb = true;
loop {
try {
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) ) );
state Future<Optional<Value>> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) );
wait(success(backupVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
clearSrcDb = false;
if(backupVersionF.get().present() && BinaryReader::fromStringRef<Version>(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
break;
}
@ -2208,18 +2202,31 @@ public:
Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix));
Optional<Key> bVersion = wait(srcTr->get(latestVersionKey));
if (bVersion.present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
state Future<Optional<Key>> bVersionF = srcTr->get(latestVersionKey);
wait(success(bVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if (bVersionF.get().present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersionF.get().get(), Unversioned());
} else {
clearSrcDb = false;
break;
}
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout );
if(partialTimeout.isReady()) {
return Void();
}
wait(srcTr->commit());
wait(srcTr->commit() || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
endVersion = srcTr->getCommittedVersion() + 1;
break;
@ -2229,10 +2236,6 @@ public:
}
}
if (clearSrcDb && !abortOldBackup) {
wait(eraseLogData(backupAgent->taskBucket->src, logUidValue, destUidValue));
}
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
loop {
try {

View File

@ -1988,6 +1988,7 @@ namespace fileBackup {
const uint32_t BackupLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
// This task stopped being used in 6.2; however, the code remains here to handle upgrades.
struct EraseLogRangeTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
@ -2005,21 +2006,6 @@ namespace fileBackup {
}
} Params;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
state Version endVersion = Params.endVersion().get(task);
state Key destUidValue = Params.destUidValue().get(task);
state BackupConfig config(task);
state Key logUidValue = config.getUidAsKey();
wait(eraseLogData(cx, logUidValue, destUidValue, endVersion != 0 ? Optional<Version>(endVersion) : Optional<Version>()));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID logUid, TaskCompletionKey completionKey, Key destUidValue, Version endVersion = 0, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(EraseLogRangeTaskFunc::name,
EraseLogRangeTaskFunc::version,
@ -2036,16 +2022,23 @@ namespace fileBackup {
return key;
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task));
wait(checkTaskVersion(tr->getDatabase(), task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
state Version endVersion = Params.endVersion().get(task);
state Key destUidValue = Params.destUidValue().get(task);
state BackupConfig config(task);
state Key logUidValue = config.getUidAsKey();
wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task) && eraseLogData(tr, logUidValue, destUidValue, endVersion != 0 ? Optional<Version>(endVersion) : Optional<Version>()));
return Void();
}
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs_5.2");
@ -2132,7 +2125,7 @@ namespace fileBackup {
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait(success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, config.getUid(), TaskCompletionKey::joinWith(logDispatchBatchFuture), destUidValue, beginVersion)));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue, Optional<Version>(beginVersion)) );
}
wait(taskBucket->finish(tr, task));
@ -2183,7 +2176,7 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
wait(success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, backup.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait( eraseLogData(tr, backup.getUidAsKey(), destUidValue) );
backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
@ -3820,8 +3813,7 @@ public:
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait(success(tr->getReadVersion()));
wait(success(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue) );
config.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
@ -3860,8 +3852,8 @@ public:
// Cancel backup task through tag
wait(tag.cancel(tr));
wait(success(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue));
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);

View File

@ -145,7 +145,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BACKUP_ERROR_DELAY, 10.0 );
init( BACKUP_STATUS_DELAY, 40.0 );
init( BACKUP_STATUS_JITTER, 0.05 );
init( CLEAR_LOG_RANGE_COUNT, 1500); // transaction size / (size of '\xff\x02/blog/' + size of UID + size of hash result) = 200,000 / (8 + 16 + 8)
init( MIN_CLEANUP_SECONDS, 3600.0 );
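// Used by cleanupLogMutations(): a tag qualifies for --delete_data once
// readVer - minVersion > MIN_CLEANUP_SECONDS * CORE_VERSIONSPERSECOND.
// Assuming the default CORE_VERSIONSPERSECOND of 1e6, the default 3600.0
// seconds corresponds to 3.6e9 versions, i.e. one hour of staleness.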
// Configuration
init( DEFAULT_AUTO_PROXIES, 3 );

View File

@ -131,7 +131,6 @@ public:
int BACKUP_COPY_TASKS;
int BACKUP_BLOCK_SIZE;
int BACKUP_TASKS_PER_AGENT;
int CLEAR_LOG_RANGE_COUNT;
int SIM_BACKUP_TASKS_PER_AGENT;
int BACKUP_RANGEFILE_BLOCK_SIZE;
int BACKUP_LOGFILE_BLOCK_SIZE;
@ -147,6 +146,7 @@ public:
double BACKUP_ERROR_DELAY;
double BACKUP_STATUS_DELAY;
double BACKUP_STATUS_JITTER;
double MIN_CLEANUP_SECONDS;
// Configuration
int32_t DEFAULT_AUTO_PROXIES;

View File

@ -1208,8 +1208,6 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); //To conflict with parallel includeServers
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
tr.set( moveKeysLockOwnerKey, versionKey );
tr.set( excludedServersVersionKey, excludeVersionKey );
for(auto& s : servers)
tr.set( encodeExcludedServersKey(s), StringRef() );
@ -1240,9 +1238,6 @@ ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers
// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) );
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
tr.set( moveKeysLockOwnerKey, versionKey );
tr.set( excludedServersVersionKey, excludeVersionKey );
for(auto& s : servers ) {

View File

@ -603,6 +603,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0,
"system_kv_size_bytes":0,
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,

View File

@ -2558,7 +2558,7 @@ ACTOR Future<Void> machineTeamRemover(DDTeamCollection* self) {
}
// To avoid removing machine teams too fast, which is unlikely to happen anyway
wait( delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY) );
wait( delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY, TaskPriority::DataDistribution) );
wait(waitUntilHealthy(self));
// Wait for the badTeamRemover() to avoid the potential race between adding the bad team (add the team tracker)
@ -2681,7 +2681,7 @@ ACTOR Future<Void> serverTeamRemover(DDTeamCollection* self) {
removeServerTeamDelay = removeServerTeamDelay / 100;
}
// To avoid removing server teams too fast, which is unlikely to happen anyway
wait(delay(removeServerTeamDelay));
wait(delay(removeServerTeamDelay, TaskPriority::DataDistribution));
wait(waitUntilHealthy(self, SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
// Wait for the badTeamRemover() to avoid the potential race between
@ -3064,7 +3064,7 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
}
ACTOR Future<Void> waitServerListChange( DDTeamCollection* self, FutureStream<Void> serverRemoved ) {
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskPriority::DataDistributionLaunch);
state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
state bool isFetchingResults = false;
state Transaction tr(self->cx);
@ -3102,7 +3102,7 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection* self, FutureStream<Vo
}
tr = Transaction(self->cx);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskPriority::DataDistributionLaunch);
}
when( waitNext( serverRemoved ) ) {
if( isFetchingResults ) {
@ -3136,7 +3136,7 @@ ACTOR Future<Void> waitHealthyZoneChange( DDTeamCollection* self ) {
healthyZoneTimeout = Never();
} else if (p.second > tr.getReadVersion().get()) {
double timeoutSeconds = (p.second - tr.getReadVersion().get())/(double)SERVER_KNOBS->VERSIONS_PER_SECOND;
healthyZoneTimeout = delay(timeoutSeconds);
healthyZoneTimeout = delay(timeoutSeconds, TaskPriority::DataDistribution);
if(self->healthyZone.get() != p.first) {
TraceEvent("MaintenanceZoneStart", self->distributorId).detail("ZoneID", printable(p.first)).detail("EndVersion", p.second).detail("Duration", timeoutSeconds);
self->healthyZone.set(p.first);
@ -3591,7 +3591,7 @@ ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) {
loop {
try {
wait(delay(SERVER_KNOBS->DD_CHECK_INVALID_LOCALITY_DELAY));
wait(delay(SERVER_KNOBS->DD_CHECK_INVALID_LOCALITY_DELAY, TaskPriority::DataDistribution));
// Because worker's processId can be changed when its locality is changed, we cannot watch on the old
// processId; This actor is inactive most time, so iterating all workers incurs little performance overhead.
@ -3770,7 +3770,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
}
when( wait( self->restartRecruiting.onTrigger() ) ) {}
}
wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) );
wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::DataDistribution) );
} catch( Error &e ) {
if(e.code() != error_code_timed_out) {
throw;
@ -3830,7 +3830,7 @@ ACTOR Future<Void> remoteRecovered( Reference<AsyncVar<struct ServerDBInfo>> db
ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) {
loop choose {
when ( wait(self->zeroHealthyTeams->get() ? delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY) : Never()) ) {
when ( wait(self->zeroHealthyTeams->get() ? delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistribution) : Never()) ) {
self->doBuildTeams = true;
wait( DDTeamCollection::checkBuildTeams(self) );
}

View File

@ -69,6 +69,7 @@ struct DataDistributionTracker {
KeyRangeMap< ShardTrackedData > shards;
ActorCollection sizeChanges;
int64_t systemSizeEstimate;
Reference<AsyncVar<int64_t>> dbSizeEstimate;
Reference<AsyncVar<Optional<int64_t>>> maxShardSize;
Future<Void> maxShardSizeUpdater;
@ -81,7 +82,7 @@ struct DataDistributionTracker {
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ), systemSizeEstimate(0),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
@ -138,8 +139,7 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
bool addToSizeEstimate = true)
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
{
wait( delay( 0, TaskPriority::DataDistribution ) );
@ -203,8 +203,12 @@ ACTOR Future<Void> trackShardBytes(
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
if( shardSize->get().present() && addToSizeEstimate )
if( shardSize->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.bytes - shardSize->get().get().bytes;
}
}
shardSize->set( metrics );
}
@ -256,8 +260,13 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<StorageMetrics>>
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys, int64_t oldShardsEndingSize ) {
state vector<Future<int64_t>> sizes;
state vector<Future<int64_t>> systemSizes;
for (auto it : self->shards.intersectingRanges(keys) ) {
sizes.push_back( getFirstSize( it->value().stats ) );
Future<int64_t> thisSize = getFirstSize( it->value().stats );
sizes.push_back( thisSize );
if(it->range().begin >= systemKeys.begin) {
systemSizes.push_back( thisSize );
}
}
wait( waitForAll( sizes ) );
@ -267,12 +276,20 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys,
for ( int i = 0; i < sizes.size(); i++ )
newShardsStartingSize += sizes[i].get();
int64_t newSystemShardsStartingSize = 0;
for ( int i = 0; i < systemSizes.size(); i++ )
newSystemShardsStartingSize += systemSizes[i].get();
int64_t totalSizeEstimate = self->dbSizeEstimate->get();
/*TraceEvent("TrackerChangeSizes")
.detail("TotalSizeEstimate", totalSizeEstimate)
.detail("EndSizeOfOldShards", oldShardsEndingSize)
.detail("StartingSizeOfNewShards", newShardsStartingSize);*/
self->dbSizeEstimate->set( totalSizeEstimate + newShardsStartingSize - oldShardsEndingSize );
self->systemSizeEstimate += newSystemShardsStartingSize;
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate -= oldShardsEndingSize;
}
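// Shards never straddle the \xff boundary, so if this range begins in the
// system keyspace the old shards being replaced were all system shards too.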
return Void();
}
@ -641,7 +658,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsRequest req ) {
choose {
when( wait( fetchShardMetrics_impl( self, req ) ) ) {}
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT ) ) ) {
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT, TaskPriority::DataDistribution ) ) ) {
TEST(true); // DD_SHARD_METRICS_TIMEOUT
StorageMetrics largeMetrics;
largeMetrics.bytes = SERVER_KNOBS->MAX_SHARD_BYTES;
@ -676,6 +693,7 @@ ACTOR Future<Void> dataDistributionTracker(
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL);

View File

@ -697,7 +697,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey);
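// operator|| on two Future<Void> watches is ready as soon as either fires, so
// the monitor now also wakes when exclude/include bumps excludedServersVersionKey.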
wait( tr.commit() );
wait( watchFuture );
break;

View File

@ -1328,6 +1328,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
if (dataStats.size())
{
statusObjData.setKeyRawNumber("total_kv_size_bytes",dataStats.getValue("TotalSizeBytes"));
statusObjData.setKeyRawNumber("system_kv_size_bytes",dataStats.getValue("SystemSizeBytes"));
statusObjData.setKeyRawNumber("partitions_count",dataStats.getValue("Shards"));
}

View File

@ -1213,7 +1213,7 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
self->registrationTrigger.trigger();
}
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey);
wait(tr.commit());
wait(watchFuture);
break;

View File

@ -106,6 +106,7 @@ struct AddingShard : NonCopyable {
Version transferredVersion;
enum Phase { WaitPrevious, Fetching, Waiting };
Phase phase;
AddingShard( StorageServer* server, KeyRangeRef const& keys );
@ -1948,8 +1949,9 @@ void splitMutation(StorageServer* data, KeyRangeMap<T>& map, MutationRef const&
ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
state double startTime = now();
loop {
wait(delay(600));
TraceEvent(SevWarnAlways, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
state double waitSeconds = BUGGIFY ? 5.0 : 600.0;
wait(delay(waitSeconds));
TraceEvent(waitSeconds > 300.0 ? SevWarnAlways : SevInfo, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
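// BUGGIFY shortens the wait to 5s so simulation exercises this path often;
// short waits log at SevInfo, while the real 600s wait still logs SevWarnAlways.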
}
}
@ -2068,6 +2070,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
shard = data->shards.rangeContaining( keys.begin ).value()->adding;
warningLogger = logFetchKeysWarning(shard);
AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
keys = shard->keys;