Merge pull request #3813 from xumengpanda/mengxu/merge-to-master-PR
Merge 6.3 to master and resolve conflicts
This commit is contained in:
commit
21071b6214
|
@ -78,8 +78,9 @@ type Subspace interface {
|
|||
// FoundationDB keys (corresponding to the prefix of this Subspace).
|
||||
fdb.KeyConvertible
|
||||
|
||||
// All Subspaces implement fdb.ExactRange and fdb.Range, and describe all
|
||||
// keys logically in this Subspace.
|
||||
// All Subspaces implement fdb.ExactRange and fdb.Range, and describe all
|
||||
// keys strictly within the subspace that encode tuples. Specifically,
|
||||
// this will include all keys in [prefix + '\x00', prefix + '\xff').
|
||||
fdb.ExactRange
|
||||
}
|
||||
|
||||
|
|
|
@ -10,38 +10,38 @@ macOS
|
|||
|
||||
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
|
||||
|
||||
* `FoundationDB-6.3.5.pkg <https://www.foundationdb.org/downloads/6.3.5/macOS/installers/FoundationDB-6.3.5.pkg>`_
|
||||
* `FoundationDB-6.3.6.pkg <https://www.foundationdb.org/downloads/6.3.6/macOS/installers/FoundationDB-6.3.6.pkg>`_
|
||||
|
||||
Ubuntu
|
||||
------
|
||||
|
||||
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
|
||||
|
||||
* `foundationdb-clients-6.3.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.5/ubuntu/installers/foundationdb-clients_6.3.5-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.3.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.5/ubuntu/installers/foundationdb-server_6.3.5-1_amd64.deb>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.3.6-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.6/ubuntu/installers/foundationdb-clients_6.3.6-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.3.6-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.6/ubuntu/installers/foundationdb-server_6.3.6-1_amd64.deb>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL6
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
|
||||
|
||||
* `foundationdb-clients-6.3.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel6/installers/foundationdb-clients-6.3.5-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.3.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel6/installers/foundationdb-server-6.3.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.3.6-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.6/rhel6/installers/foundationdb-clients-6.3.6-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.3.6-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.6/rhel6/installers/foundationdb-server-6.3.6-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL7
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
|
||||
|
||||
* `foundationdb-clients-6.3.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel7/installers/foundationdb-clients-6.3.5-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.3.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel7/installers/foundationdb-server-6.3.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.3.6-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.6/rhel7/installers/foundationdb-clients-6.3.6-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.3.6-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.6/rhel7/installers/foundationdb-server-6.3.6-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
Windows
|
||||
-------
|
||||
|
||||
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
|
||||
|
||||
* `foundationdb-6.3.5-x64.msi <https://www.foundationdb.org/downloads/6.3.5/windows/installers/foundationdb-6.3.5-x64.msi>`_
|
||||
* `foundationdb-6.3.6-x64.msi <https://www.foundationdb.org/downloads/6.3.6/windows/installers/foundationdb-6.3.6-x64.msi>`_
|
||||
|
||||
API Language Bindings
|
||||
=====================
|
||||
|
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
|
|||
|
||||
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
|
||||
|
||||
* `foundationdb-6.3.5.tar.gz <https://www.foundationdb.org/downloads/6.3.5/bindings/python/foundationdb-6.3.5.tar.gz>`_
|
||||
* `foundationdb-6.3.6.tar.gz <https://www.foundationdb.org/downloads/6.3.6/bindings/python/foundationdb-6.3.6.tar.gz>`_
|
||||
|
||||
Ruby 1.9.3/2.0.0+
|
||||
-----------------
|
||||
|
||||
* `fdb-6.3.5.gem <https://www.foundationdb.org/downloads/6.3.5/bindings/ruby/fdb-6.3.5.gem>`_
|
||||
* `fdb-6.3.6.gem <https://www.foundationdb.org/downloads/6.3.6/bindings/ruby/fdb-6.3.6.gem>`_
|
||||
|
||||
Java 8+
|
||||
-------
|
||||
|
||||
* `fdb-java-6.3.5.jar <https://www.foundationdb.org/downloads/6.3.5/bindings/java/fdb-java-6.3.5.jar>`_
|
||||
* `fdb-java-6.3.5-javadoc.jar <https://www.foundationdb.org/downloads/6.3.5/bindings/java/fdb-java-6.3.5-javadoc.jar>`_
|
||||
* `fdb-java-6.3.6.jar <https://www.foundationdb.org/downloads/6.3.6/bindings/java/fdb-java-6.3.6.jar>`_
|
||||
* `fdb-java-6.3.6-javadoc.jar <https://www.foundationdb.org/downloads/6.3.6/bindings/java/fdb-java-6.3.6-javadoc.jar>`_
|
||||
|
||||
Go 1.11+
|
||||
--------
|
||||
|
|
|
@ -2,11 +2,9 @@
|
|||
Release Notes
|
||||
#############
|
||||
|
||||
6.3.5
|
||||
6.3.6
|
||||
=====
|
||||
|
||||
* Report missing old tlogs information when in recovery before storage servers are fully recovered. `(PR #3706) <https://github.com/apple/foundationdb/pull/3706>`_
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
|
@ -110,6 +108,8 @@ Other Changes
|
|||
* Updated boost to 1.72. `(PR #2684) <https://github.com/apple/foundationdb/pull/2684>`_
|
||||
* Calling ``fdb_run_network`` multiple times in a single run of a client program now returns an error instead of causing undefined behavior. [6.3.1] `(PR #3229) <https://github.com/apple/foundationdb/pull/3229>`_
|
||||
* Blob backup URL parameter ``request_timeout`` changed to ``request_timeout_min``, with prior name still supported. `(PR #3533) <https://github.com/apple/foundationdb/pull/3533>`_
|
||||
* Support query command in backup CLI that allows users to query restorable files by key ranges. [6.3.6] `(PR #3703) <https://github.com/apple/foundationdb/pull/3703>`_
|
||||
* Report missing old tlogs information when in recovery before storage servers are fully recovered. [6.3.6] `(PR #3706) <https://github.com/apple/foundationdb/pull/3706>`_
|
||||
|
||||
Fixes from previous versions
|
||||
----------------------------
|
||||
|
@ -126,6 +126,7 @@ Fixes only impacting 6.3.0+
|
|||
* Refreshing TLS certificates could cause crashes. [6.3.2] `(PR #3352) <https://github.com/apple/foundationdb/pull/3352>`_
|
||||
* All storage class processes attempted to connect to the same coordinator. [6.3.2] `(PR #3361) <https://github.com/apple/foundationdb/pull/3361>`_
|
||||
* Adjusted the proxy load balancing algorithm to be based on the CPU usage of the process instead of the number of requests processed. [6.3.5] `(PR #3653) <https://github.com/apple/foundationdb/pull/3653>`_
|
||||
* Only return the error code ``batch_transaction_throttled`` for API versions greater than or equal to 630. [6.3.6] `(PR #3799) <https://github.com/apple/foundationdb/pull/3799>`_
|
||||
|
||||
Earlier release notes
|
||||
---------------------
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/JsonBuilder.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/Trace.h"
|
||||
#define BOOST_DATE_TIME_NO_LIB
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
|
||||
|
@ -81,7 +85,22 @@ enum enumProgramExe {
|
|||
};
|
||||
|
||||
enum enumBackupType {
|
||||
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_MODIFY, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST, BACKUP_DUMP, BACKUP_CLEANUP
|
||||
BACKUP_UNDEFINED = 0,
|
||||
BACKUP_START,
|
||||
BACKUP_MODIFY,
|
||||
BACKUP_STATUS,
|
||||
BACKUP_ABORT,
|
||||
BACKUP_WAIT,
|
||||
BACKUP_DISCONTINUE,
|
||||
BACKUP_PAUSE,
|
||||
BACKUP_RESUME,
|
||||
BACKUP_EXPIRE,
|
||||
BACKUP_DELETE,
|
||||
BACKUP_DESCRIBE,
|
||||
BACKUP_LIST,
|
||||
BACKUP_QUERY,
|
||||
BACKUP_DUMP,
|
||||
BACKUP_CLEANUP
|
||||
};
|
||||
|
||||
enum enumDBType {
|
||||
|
@ -121,6 +140,7 @@ enum {
|
|||
OPT_TAGNAME,
|
||||
OPT_BACKUPKEYS,
|
||||
OPT_WAITFORDONE,
|
||||
OPT_BACKUPKEYS_FILTER,
|
||||
OPT_INCREMENTALONLY,
|
||||
|
||||
// Backup Modify
|
||||
|
@ -624,6 +644,40 @@ CSimpleOpt::SOption g_rgBackupListOptions[] = {
|
|||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
// Command line options accepted by the backup "query" operation. The table is handed to CSimpleOpt when the
// backup type is BACKUP_QUERY and must stay terminated by SO_END_OF_OPTIONS.
// NOTE(review): queryBackup() needs an original cluster file to resolve --query_restore_timestamp, but no entry in
// this table appears to supply one -- confirm whether such an option should be listed here.
CSimpleOpt::SOption g_rgBackupQueryOptions[] = {
#ifdef _WIN32
	{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
	// Query target: container, restore version/timestamp and optional key range filter.
	{ OPT_RESTORE_TIMESTAMP, "--query_restore_timestamp", SO_REQ_SEP },
	{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
	{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
	{ OPT_RESTORE_VERSION, "-qrv", SO_REQ_SEP },
	{ OPT_RESTORE_VERSION, "--query_restore_version", SO_REQ_SEP },
	{ OPT_BACKUPKEYS_FILTER, "-k", SO_REQ_SEP },
	{ OPT_BACKUPKEYS_FILTER, "--keys", SO_REQ_SEP },
	// Tracing.
	{ OPT_TRACE, "--log", SO_NONE },
	{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
	{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
	{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
	// General CLI behaviour.
	{ OPT_QUIET, "-q", SO_NONE },
	{ OPT_QUIET, "--quiet", SO_NONE },
	{ OPT_VERSION, "-v", SO_NONE },
	{ OPT_VERSION, "--version", SO_NONE },
	{ OPT_CRASHONERROR, "--crash", SO_NONE },
	{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
	{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
	{ OPT_HELP, "-?", SO_NONE },
	{ OPT_HELP, "-h", SO_NONE },
	{ OPT_HELP, "--help", SO_NONE },
	{ OPT_DEVHELP, "--dev-help", SO_NONE },
	{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
	{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
	TLS_OPTION_FLAGS
#endif
	SO_END_OF_OPTIONS
};
|
||||
|
||||
// g_rgRestoreOptions is used by fdbrestore and fastrestore_tool
|
||||
CSimpleOpt::SOption g_rgRestoreOptions[] = {
|
||||
#ifdef _WIN32
|
||||
|
@ -959,13 +1013,16 @@ void printBackupContainerInfo() {
|
|||
|
||||
static void printBackupUsage(bool devhelp) {
|
||||
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
|
||||
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | list | cleanup) [OPTIONS]\n\n", exeBackup.toString().c_str());
|
||||
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | "
|
||||
"list | query | cleanup) [OPTIONS]\n\n",
|
||||
exeBackup.toString().c_str());
|
||||
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
|
||||
" FoundationDB cluster. The default is first the value of the\n"
|
||||
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
|
||||
" then `%s'.\n", platform::getDefaultClusterFilePath().c_str());
|
||||
printf(" -d, --destcontainer URL\n"
|
||||
" The Backup container URL for start, modify, describe, expire, and delete operations.\n");
|
||||
" The Backup container URL for start, modify, describe, query, expire, and delete "
|
||||
"operations.\n");
|
||||
printBackupContainerInfo();
|
||||
printf(" -b, --base_url BASEURL\n"
|
||||
" Base backup URL for list operations. This looks like a Backup URL but without a backup name.\n");
|
||||
|
@ -979,6 +1036,12 @@ static void printBackupUsage(bool devhelp) {
|
|||
printf(" --delete_before_days NUM_DAYS\n"
|
||||
" Another way to specify version cutoff for expire operations. Deletes data files containing no data at or after a\n"
|
||||
" version approximately NUM_DAYS days worth of versions prior to the latest log version in the backup.\n");
|
||||
printf(" -qrv --query_restore_version VERSION\n"
|
||||
" For query operations, set target version for restoring a backup. Set -1 for maximum\n"
|
||||
" restorable version (default) and -2 for minimum restorable version.\n");
|
||||
printf(" --query_restore_timestamp DATETIME\n"
|
||||
" For query operations, instead of a numeric version, use this to specify a timestamp in %s\n", BackupAgentBase::timeFormat().c_str());
|
||||
printf(" and it will be converted to a version from that time using metadata in the cluster file.\n");
|
||||
printf(" --restorable_after_timestamp DATETIME\n"
|
||||
" For expire operations, set minimum acceptable restorability to the version equivalent of DATETIME and later.\n");
|
||||
printf(" --restorable_after_version VERSION\n"
|
||||
|
@ -997,8 +1060,8 @@ static void printBackupUsage(bool devhelp) {
|
|||
" Specifies a UID to verify against the BackupUID of the running backup. If provided, the UID is verified in the same transaction\n"
|
||||
" which sets the new backup parameters (if the UID matches).\n");
|
||||
printf(" -e ERRORLIMIT The maximum number of errors printed by status (default is 10).\n");
|
||||
printf(" -k KEYS List of key ranges to backup.\n"
|
||||
" If not specified, the entire database will be backed up.\n");
|
||||
printf(" -k KEYS List of key ranges to backup or to filter the backup in query operations.\n"
|
||||
" If not specified, the entire database will be backed up or no filter will be applied.\n");
|
||||
printf(" --partitioned_log_experimental Starts with new type of backup system using partitioned logs.\n");
|
||||
printf(" -n, --dryrun For backup start or restore start, performs a trial run with no actual changes made.\n");
|
||||
printf(" --log Enables trace file logging for the CLI session.\n"
|
||||
|
@ -1320,6 +1383,7 @@ enumBackupType getBackupType(std::string backupType)
|
|||
values["delete"] = BACKUP_DELETE;
|
||||
values["describe"] = BACKUP_DESCRIBE;
|
||||
values["list"] = BACKUP_LIST;
|
||||
values["query"] = BACKUP_QUERY;
|
||||
values["dump"] = BACKUP_DUMP;
|
||||
values["modify"] = BACKUP_MODIFY;
|
||||
}
|
||||
|
@ -2458,6 +2522,135 @@ ACTOR Future<Void> describeBackup(const char *name, std::string destinationConta
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Reports a failed backup query: stores errorMessage under the "error" key of result, prints the resulting JSON
// document on stdout, and logs a BackupQueryFailure trace event carrying operationId and the message.
static void reportBackupQueryError(UID operationId, JsonBuilderObject& result, std::string errorMessage) {
	result["error"] = errorMessage;

	const auto json = result.getJson();
	printf("%s\n", json.c_str());

	TraceEvent("BackupQueryFailure")
	    .detail("OperationId", operationId)
	    .detail("Reason", errorMessage);
}
|
||||
|
||||
// If restoreVersion is invalidVersion or latestVersion, use the maximum or minimum restorable version respectively for
|
||||
// selected key ranges. If restoreTimestamp is specified, any specified restoreVersion will be overriden to the version
|
||||
// resolved to that timestamp.
|
||||
ACTOR Future<Void> queryBackup(const char* name, std::string destinationContainer,
|
||||
Standalone<VectorRef<KeyRangeRef>> keyRangesFilter, Version restoreVersion,
|
||||
std::string originalClusterFile, std::string restoreTimestamp, bool verbose) {
|
||||
state UID operationId = deterministicRandom()->randomUniqueID();
|
||||
state JsonBuilderObject result;
|
||||
state std::string errorMessage;
|
||||
result["key_ranges_filter"] = printable(keyRangesFilter);
|
||||
result["destination_container"] = destinationContainer;
|
||||
|
||||
TraceEvent("BackupQueryStart")
|
||||
.detail("OperationId", operationId)
|
||||
.detail("DestinationContainer", destinationContainer)
|
||||
.detail("KeyRangesFilter", printable(keyRangesFilter))
|
||||
.detail("SpecifiedRestoreVersion", restoreVersion)
|
||||
.detail("RestoreTimestamp", restoreTimestamp)
|
||||
.detail("BackupClusterFile", originalClusterFile);
|
||||
|
||||
// Resolve restoreTimestamp if given
|
||||
if (!restoreTimestamp.empty()) {
|
||||
if (originalClusterFile.empty()) {
|
||||
reportBackupQueryError(
|
||||
operationId, result,
|
||||
format("an original cluster file must be given in order to resolve restore target timestamp '%s'",
|
||||
restoreTimestamp.c_str()));
|
||||
return Void();
|
||||
}
|
||||
|
||||
if (!fileExists(originalClusterFile)) {
|
||||
reportBackupQueryError(operationId, result,
|
||||
format("The specified original source database cluster file '%s' does not exist\n",
|
||||
originalClusterFile.c_str()));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Database origDb = Database::createDatabase(originalClusterFile, Database::API_VERSION_LATEST);
|
||||
Version v = wait(timeKeeperVersionFromDatetime(restoreTimestamp, origDb));
|
||||
result["restore_timestamp"] = restoreTimestamp;
|
||||
result["restore_timestamp_resolved_version"] = v;
|
||||
restoreVersion = v;
|
||||
}
|
||||
|
||||
try {
|
||||
state Reference<IBackupContainer> bc = openBackupContainer(name, destinationContainer);
|
||||
if (restoreVersion == invalidVersion) {
|
||||
BackupDescription desc = wait(bc->describeBackup());
|
||||
if (desc.maxRestorableVersion.present()) {
|
||||
restoreVersion = desc.maxRestorableVersion.get();
|
||||
// Use continuous log end version for the maximum restorable version for the key ranges.
|
||||
} else if (keyRangesFilter.size() && desc.contiguousLogEnd.present()) {
|
||||
restoreVersion = desc.contiguousLogEnd.get();
|
||||
} else {
|
||||
reportBackupQueryError(
|
||||
operationId, result,
|
||||
errorMessage = format("the backup for the specified key ranges is not restorable to any version"));
|
||||
}
|
||||
}
|
||||
|
||||
if (restoreVersion < 0 && restoreVersion != latestVersion) {
|
||||
reportBackupQueryError(operationId, result,
|
||||
errorMessage =
|
||||
format("the specified restorable version %ld is not valid", restoreVersion));
|
||||
return Void();
|
||||
}
|
||||
Optional<RestorableFileSet> fileSet = wait(bc->getRestoreSet(restoreVersion, keyRangesFilter));
|
||||
if (fileSet.present()) {
|
||||
int64_t totalRangeFilesSize = 0, totalLogFilesSize = 0;
|
||||
result["restore_version"] = fileSet.get().targetVersion;
|
||||
JsonBuilderArray rangeFilesJson;
|
||||
JsonBuilderArray logFilesJson;
|
||||
for (const auto& rangeFile : fileSet.get().ranges) {
|
||||
JsonBuilderObject object;
|
||||
object["file_name"] = rangeFile.fileName;
|
||||
object["file_size"] = rangeFile.fileSize;
|
||||
object["version"] = rangeFile.version;
|
||||
object["key_range"] = fileSet.get().keyRanges.count(rangeFile.fileName) == 0
|
||||
? "none"
|
||||
: fileSet.get().keyRanges.at(rangeFile.fileName).toString();
|
||||
rangeFilesJson.push_back(object);
|
||||
totalRangeFilesSize += rangeFile.fileSize;
|
||||
}
|
||||
for (const auto& log : fileSet.get().logs) {
|
||||
JsonBuilderObject object;
|
||||
object["file_name"] = log.fileName;
|
||||
object["file_size"] = log.fileSize;
|
||||
object["begin_version"] = log.beginVersion;
|
||||
object["end_version"] = log.endVersion;
|
||||
logFilesJson.push_back(object);
|
||||
totalLogFilesSize += log.fileSize;
|
||||
}
|
||||
|
||||
result["total_range_files_size"] = totalRangeFilesSize;
|
||||
result["total_log_files_size"] = totalLogFilesSize;
|
||||
|
||||
if (verbose) {
|
||||
result["ranges"] = rangeFilesJson;
|
||||
result["logs"] = logFilesJson;
|
||||
}
|
||||
|
||||
TraceEvent("BackupQueryReceivedRestorableFilesSet")
|
||||
.detail("DestinationContainer", destinationContainer)
|
||||
.detail("KeyRangesFilter", printable(keyRangesFilter))
|
||||
.detail("ActualRestoreVersion", fileSet.get().targetVersion)
|
||||
.detail("NumRangeFiles", fileSet.get().ranges.size())
|
||||
.detail("NumLogFiles", fileSet.get().logs.size())
|
||||
.detail("RangeFilesBytes", totalRangeFilesSize)
|
||||
.detail("LogFilesBytes", totalLogFilesSize);
|
||||
} else {
|
||||
reportBackupQueryError(operationId, result, "no restorable files set found for specified key ranges");
|
||||
return Void();
|
||||
}
|
||||
|
||||
} catch (Error& e) {
|
||||
reportBackupQueryError(operationId, result, e.what());
|
||||
return Void();
|
||||
}
|
||||
|
||||
printf("%s\n", result.getJson().c_str());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> listBackup(std::string baseUrl) {
|
||||
try {
|
||||
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl));
|
||||
|
@ -2827,6 +3020,9 @@ int main(int argc, char* argv[]) {
|
|||
case BACKUP_LIST:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupListOptions, SO_O_EXACT);
|
||||
break;
|
||||
case BACKUP_QUERY:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupQueryOptions, SO_O_EXACT);
|
||||
break;
|
||||
case BACKUP_MODIFY:
|
||||
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupModifyOptions, SO_O_EXACT);
|
||||
break;
|
||||
|
@ -2966,6 +3162,7 @@ int main(int argc, char* argv[]) {
|
|||
std::string addPrefix;
|
||||
std::string removePrefix;
|
||||
Standalone<VectorRef<KeyRangeRef>> backupKeys;
|
||||
Standalone<VectorRef<KeyRangeRef>> backupKeysFilter;
|
||||
int maxErrors = 20;
|
||||
Version beginVersion = invalidVersion;
|
||||
Version restoreVersion = invalidVersion;
|
||||
|
@ -3188,6 +3385,15 @@ int main(int argc, char* argv[]) {
|
|||
return FDB_EXIT_ERROR;
|
||||
}
|
||||
break;
|
||||
case OPT_BACKUPKEYS_FILTER:
|
||||
try {
|
||||
addKeyRange(args->OptionArg(), backupKeysFilter);
|
||||
}
|
||||
catch (Error &) {
|
||||
printHelpTeaser(argv[0]);
|
||||
return FDB_EXIT_ERROR;
|
||||
}
|
||||
break;
|
||||
case OPT_DESTCONTAINER:
|
||||
destinationContainer = args->OptionArg();
|
||||
// If the url starts with '/' then prepend "file://" for backwards compatibility
|
||||
|
@ -3727,6 +3933,12 @@ int main(int argc, char* argv[]) {
|
|||
f = stopAfter( listBackup(baseUrl) );
|
||||
break;
|
||||
|
||||
case BACKUP_QUERY:
|
||||
initTraceFile();
|
||||
f = stopAfter(queryBackup(argv[0], destinationContainer, backupKeysFilter, restoreVersion,
|
||||
restoreClusterFileOrig, restoreTimestamp, !quietDisplay));
|
||||
break;
|
||||
|
||||
case BACKUP_DUMP:
|
||||
initTraceFile();
|
||||
f = stopAfter( dumpBackupData(argv[0], destinationContainer, dumpBegin, dumpEnd) );
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/JsonBuilder.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/Hash3.h"
|
||||
|
@ -245,7 +246,7 @@ std::string BackupDescription::toJSON() const {
|
|||
* file written will be after the start version of the snapshot's execution.
|
||||
*
|
||||
* Log files are at file paths like
|
||||
* /plogs/...log,startVersion,endVersion,UID,tagID-of-N,blocksize
|
||||
* /plogs/.../log,startVersion,endVersion,UID,tagID-of-N,blocksize
|
||||
* /logs/.../log,startVersion,endVersion,UID,blockSize
|
||||
* where ... is a multi level path which sorts lexically into version order and results in approximately 1
|
||||
* unique folder per day containing about 5,000 files. Logs after FDB 6.3 are stored in "plogs"
|
||||
|
@ -1403,8 +1404,15 @@ public:
|
|||
}
|
||||
|
||||
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc,
|
||||
Version targetVersion, bool logsOnly,
|
||||
Version beginVersion) {
|
||||
Version targetVersion,
|
||||
VectorRef<KeyRangeRef> keyRangesFilter, bool logsOnly = false,
|
||||
Version beginVersion = invalidVersion) {
|
||||
// Using keyRangesFilter together with logsOnly is not supported yet.
|
||||
if (logsOnly && !keyRangesFilter.empty()) {
|
||||
TraceEvent(SevError, "BackupContainerRestoreSetUnsupportedAPI").detail("KeyRangesFilter", keyRangesFilter.size());
|
||||
return Optional<RestorableFileSet>();
|
||||
}
|
||||
|
||||
if (logsOnly) {
|
||||
state RestorableFileSet restorableSet;
|
||||
state std::vector<LogFile> logFiles;
|
||||
|
@ -1416,23 +1424,55 @@ public:
|
|||
return getRestoreSetFromLogs(logFiles, targetVersion, restorableSet);
|
||||
}
|
||||
}
|
||||
// Find the most recent keyrange snapshot to end at or before targetVersion
|
||||
state Optional<KeyspaceSnapshotFile> snapshot;
|
||||
std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
|
||||
for(auto const &s : snapshots) {
|
||||
if(s.endVersion <= targetVersion)
|
||||
snapshot = s;
|
||||
}
|
||||
|
||||
if(snapshot.present()) {
|
||||
// Find the most recent keyrange snapshot through which we can restore filtered key ranges into targetVersion.
|
||||
state std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
|
||||
state int i = snapshots.size() - 1;
|
||||
for (; i >= 0; i--) {
|
||||
// Skip this snapshot: every range file in it has a version >= the snapshot's beginVersion, which is > targetVersion.
|
||||
if (targetVersion >= 0 && snapshots[i].beginVersion > targetVersion) {
|
||||
continue;
|
||||
}
|
||||
|
||||
state RestorableFileSet restorable;
|
||||
restorable.snapshot = snapshot.get();
|
||||
restorable.targetVersion = targetVersion;
|
||||
state Version minKeyRangeVersion = MAX_VERSION;
|
||||
state Version maxKeyRangeVersion = -1;
|
||||
|
||||
std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>> results =
|
||||
wait(bc->readKeyspaceSnapshot(snapshot.get()));
|
||||
restorable.ranges = std::move(results.first);
|
||||
restorable.keyRanges = std::move(results.second);
|
||||
wait(bc->readKeyspaceSnapshot(snapshots[i]));
|
||||
|
||||
// Old backup does not have metadata about key ranges and can not be filtered with key ranges.
|
||||
if (keyRangesFilter.size() && results.second.empty() && !results.first.empty()) {
|
||||
throw backup_not_filterable_with_key_ranges();
|
||||
}
|
||||
|
||||
// Filter by keyRangesFilter.
|
||||
if (keyRangesFilter.empty()) {
|
||||
restorable.ranges = std::move(results.first);
|
||||
restorable.keyRanges = std::move(results.second);
|
||||
minKeyRangeVersion = snapshots[i].beginVersion;
|
||||
maxKeyRangeVersion = snapshots[i].endVersion;
|
||||
} else {
|
||||
for (const auto& rangeFile : results.first) {
|
||||
const auto& keyRange = results.second.at(rangeFile.fileName);
|
||||
if (keyRange.intersects(keyRangesFilter)) {
|
||||
restorable.ranges.push_back(rangeFile);
|
||||
restorable.keyRanges[rangeFile.fileName] = keyRange;
|
||||
minKeyRangeVersion = std::min(minKeyRangeVersion, rangeFile.version);
|
||||
maxKeyRangeVersion = std::max(maxKeyRangeVersion, rangeFile.version);
|
||||
}
|
||||
}
|
||||
// No range file matches 'keyRangesFilter'.
|
||||
if (restorable.ranges.empty()) {
|
||||
throw backup_not_overlapped_with_keys_filter();
|
||||
}
|
||||
}
|
||||
// 'latestVersion' represents using the minimum restorable version in a snapshot.
|
||||
restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion;
|
||||
// Any version < maxKeyRangeVersion is not restorable.
|
||||
if (restorable.targetVersion < maxKeyRangeVersion) continue;
|
||||
|
||||
restorable.snapshot = snapshots[i];
|
||||
// TODO: Reenable the sanity check after TooManyFiles error is resolved
|
||||
if (false && g_network->isSimulated()) {
|
||||
// Sanity check key ranges
|
||||
|
@ -1446,18 +1486,21 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
// No logs needed if there is a complete key space snapshot at the target version.
|
||||
if (snapshot.get().beginVersion == snapshot.get().endVersion &&
|
||||
snapshot.get().endVersion == targetVersion) {
|
||||
// No logs needed if there is a complete filtered key space snapshot at the target version.
|
||||
if (minKeyRangeVersion == maxKeyRangeVersion && maxKeyRangeVersion == restorable.targetVersion) {
|
||||
restorable.continuousBeginVersion = restorable.continuousEndVersion = invalidVersion;
|
||||
TraceEvent("BackupContainerGetRestorableFilesWithoutLogs")
|
||||
.detail("KeyRangeVersion", restorable.targetVersion)
|
||||
.detail("NumberOfRangeFiles", restorable.ranges.size())
|
||||
.detail("KeyRangesFilter", printable(keyRangesFilter));
|
||||
return Optional<RestorableFileSet>(restorable);
|
||||
}
|
||||
|
||||
// FIXME: check if there are tagged logs. for each tag, there is no version gap.
|
||||
state std::vector<LogFile> logs;
|
||||
state std::vector<LogFile> plogs;
|
||||
wait(store(logs, bc->listLogFiles(snapshot.get().beginVersion, targetVersion, false)) &&
|
||||
store(plogs, bc->listLogFiles(snapshot.get().beginVersion, targetVersion, true)));
|
||||
wait(store(logs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, false)) &&
|
||||
store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true)));
|
||||
|
||||
if (plogs.size() > 0) {
|
||||
logs.swap(plogs);
|
||||
|
@ -1469,13 +1512,12 @@ public:
|
|||
|
||||
// Remove duplicated log files that can happen for old epochs.
|
||||
std::vector<LogFile> filtered = filterDuplicates(logs);
|
||||
|
||||
restorable.logs.swap(filtered);
|
||||
// sort by version order again for continuous analysis
|
||||
std::sort(restorable.logs.begin(), restorable.logs.end());
|
||||
if (isPartitionedLogsContinuous(restorable.logs, snapshot.get().beginVersion, targetVersion)) {
|
||||
restorable.continuousBeginVersion = snapshot.get().beginVersion;
|
||||
restorable.continuousEndVersion = targetVersion + 1; // not inclusive
|
||||
if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) {
|
||||
restorable.continuousBeginVersion = minKeyRangeVersion;
|
||||
restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive
|
||||
return Optional<RestorableFileSet>(restorable);
|
||||
}
|
||||
return Optional<RestorableFileSet>();
|
||||
|
@ -1483,20 +1525,19 @@ public:
|
|||
|
||||
// List logs in version order so log continuity can be analyzed
|
||||
std::sort(logs.begin(), logs.end());
|
||||
|
||||
// If there are logs and the first one starts at or before the snapshot begin version then proceed
|
||||
if(!logs.empty() && logs.front().beginVersion <= snapshot.get().beginVersion) {
|
||||
// If there are logs and the first one starts at or before the keyrange's snapshot begin version, then
|
||||
// it is a valid restore set, so proceed.
|
||||
if (!logs.empty() && logs.front().beginVersion <= minKeyRangeVersion) {
|
||||
return getRestoreSetFromLogs(logs, targetVersion, restorable);
|
||||
}
|
||||
}
|
||||
|
||||
return Optional<RestorableFileSet>();
|
||||
}
|
||||
|
||||
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, bool logsOnly,
|
||||
Version beginVersion) final {
|
||||
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion, logsOnly,
|
||||
beginVersion);
|
||||
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, VectorRef<KeyRangeRef> keyRangesFilter,
|
||||
bool logsOnly, Version beginVersion) final {
|
||||
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion, keyRangesFilter,
|
||||
logsOnly, beginVersion);
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -280,10 +280,13 @@ public:
|
|||
|
||||
virtual Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) = 0;
|
||||
|
||||
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
|
||||
// restore to given version is not possible.
|
||||
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, bool logsOnly = false,
|
||||
Version beginVersion = -1) = 0;
|
||||
// Get exactly the files necessary to restore the key space filtered by the specified key ranges to targetVersion.
|
||||
// If targetVersion is 'latestVersion', use the minimum restorable version in a snapshot.
|
||||
// If logsOnly is set, only use log files in [beginVersion, targetVersion) in restore set.
|
||||
// Returns non-present if restoring to the given version is not possible.
|
||||
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion,
|
||||
VectorRef<KeyRangeRef> keyRangesFilter = {},
|
||||
bool logsOnly = false, Version beginVersion = -1) = 0;
|
||||
|
||||
// Get an IBackupContainer based on a container spec string
|
||||
static Reference<IBackupContainer> openContainer(std::string url);
|
||||
|
|
|
@ -257,6 +257,7 @@ struct Traceable<std::set<T>> : std::true_type {
|
|||
std::string printable( const StringRef& val );
|
||||
std::string printable( const std::string& val );
|
||||
std::string printable( const KeyRangeRef& range );
|
||||
std::string printable(const VectorRef<KeyRangeRef>& val);
|
||||
std::string printable( const VectorRef<StringRef>& val );
|
||||
std::string printable( const VectorRef<KeyValueRef>& val );
|
||||
std::string printable( const KeyValueRef& val );
|
||||
|
@ -289,6 +290,14 @@ struct KeyRangeRef {
|
|||
bool contains( const KeyRef& key ) const { return begin <= key && key < end; }
|
||||
bool contains( const KeyRangeRef& keys ) const { return begin <= keys.begin && keys.end <= end; }
|
||||
bool intersects( const KeyRangeRef& keys ) const { return begin < keys.end && keys.begin < end; }
|
||||
bool intersects(const VectorRef<KeyRangeRef>& keysVec) const {
|
||||
for (const auto& keys : keysVec) {
|
||||
if (intersects(keys)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
bool empty() const { return begin == end; }
|
||||
bool singleKeyRange() const { return equalsKeyAfter(begin, end); }
|
||||
|
||||
|
|
|
@ -3470,12 +3470,13 @@ namespace fileBackup {
|
|||
if (beginVersion == invalidVersion) {
|
||||
beginVersion = 0;
|
||||
}
|
||||
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(restoreVersion, incremental, beginVersion));
|
||||
if (!incremental) {
|
||||
beginVersion = restorable.get().snapshot.beginVersion;
|
||||
}
|
||||
Optional<RestorableFileSet> restorable =
|
||||
wait(bc->getRestoreSet(restoreVersion, VectorRef<KeyRangeRef>(), incremental, beginVersion));
|
||||
if (!incremental) {
|
||||
beginVersion = restorable.get().snapshot.beginVersion;
|
||||
}
|
||||
|
||||
if(!restorable.present())
|
||||
if(!restorable.present())
|
||||
throw restore_missing_data();
|
||||
|
||||
// First version for which log data should be applied
|
||||
|
@ -4519,7 +4520,7 @@ public:
|
|||
}
|
||||
|
||||
Optional<RestorableFileSet> restoreSet =
|
||||
wait(bc->getRestoreSet(targetVersion, incrementalBackupOnly, beginVersion));
|
||||
wait(bc->getRestoreSet(targetVersion, VectorRef<KeyRangeRef>(), incrementalBackupOnly, beginVersion));
|
||||
|
||||
if(!restoreSet.present()) {
|
||||
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
|
||||
|
|
|
@ -189,6 +189,12 @@ std::string printable( const KeyRangeRef& range ) {
|
|||
return printable(range.begin) + " - " + printable(range.end);
|
||||
}
|
||||
|
||||
// Renders every range in val with printable(const KeyRangeRef&) and concatenates the results.
// Each range, including the last one, is followed by a single space; an empty vector yields "".
std::string printable(const VectorRef<KeyRangeRef>& val) {
	std::string s;
	// Append in place rather than rebuilding the accumulated string on every iteration.
	for (const auto& range : val) {
		s += printable(range);
		s += ' ';
	}
	return s;
}
|
||||
|
||||
int unhex( char c ) {
|
||||
if (c >= '0' && c <= '9')
|
||||
return c-'0';
|
||||
|
@ -3892,12 +3898,14 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan, Da
|
|||
TransactionPriority priority, uint32_t flags,
|
||||
TransactionTagMap<uint32_t> tags, Optional<UID> debugID) {
|
||||
state Span span("NAPI:getConsistentReadVersion"_loc, parentSpan);
|
||||
try {
|
||||
++cx->transactionReadVersionBatches;
|
||||
if( debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
|
||||
loop {
|
||||
|
||||
++cx->transactionReadVersionBatches;
|
||||
if( debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
|
||||
loop {
|
||||
try {
|
||||
state GetReadVersionRequest req( span.context, transactionCount, priority, flags, tags, debugID );
|
||||
|
||||
choose {
|
||||
when ( wait( cx->onProxiesChanged() ) ) {}
|
||||
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getGrvProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &GrvProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
|
||||
|
@ -3926,12 +3934,17 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan, Da
|
|||
return v;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_broken_promise && e.code() != error_code_batch_transaction_throttled)
|
||||
TraceEvent(SevError, "GetConsistentReadVersionError").error(e);
|
||||
if(e.code() == error_code_batch_transaction_throttled && !cx->apiVersionAtLeast(630)) {
|
||||
wait(delayJittered(5.0));
|
||||
} else {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_broken_promise && e.code() != error_code_batch_transaction_throttled)
|
||||
TraceEvent(SevError, "GetConsistentReadVersionError").error(e);
|
||||
throw;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<DatabaseContext::VersionRequest> versionStream, TransactionPriority priority, uint32_t flags ) {
|
||||
|
|
|
@ -54,6 +54,7 @@ struct RestoreSysInfo;
|
|||
struct RestoreApplierInterface;
|
||||
struct RestoreFinishRequest;
|
||||
struct RestoreSamplesRequest;
|
||||
struct RestoreUpdateRateRequest;
|
||||
|
||||
// RestoreSysInfo includes information each (type of) restore roles should know.
|
||||
// At this moment, it only includes appliers. We keep the name for future extension.
|
||||
|
@ -174,6 +175,7 @@ struct RestoreApplierInterface : RestoreRoleInterface {
|
|||
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
|
||||
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces;
|
||||
RequestStream<RestoreFinishRequest> finishRestore;
|
||||
RequestStream<RestoreUpdateRateRequest> updateRate;
|
||||
|
||||
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
|
||||
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
|
||||
|
@ -193,12 +195,13 @@ struct RestoreApplierInterface : RestoreRoleInterface {
|
|||
initVersionBatch.getEndpoint(TaskPriority::LoadBalancedEndpoint);
|
||||
collectRestoreRoleInterfaces.getEndpoint(TaskPriority::LoadBalancedEndpoint);
|
||||
finishRestore.getEndpoint(TaskPriority::LoadBalancedEndpoint);
|
||||
updateRate.getEndpoint(TaskPriority::LoadBalancedEndpoint);
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, *(RestoreRoleInterface*)this, heartbeat, sendMutationVector, applyToDB, initVersionBatch,
|
||||
collectRestoreRoleInterfaces, finishRestore);
|
||||
collectRestoreRoleInterfaces, finishRestore, updateRate);
|
||||
}
|
||||
|
||||
std::string toString() const { return nodeID.toString(); }
|
||||
|
@ -616,6 +619,50 @@ struct RestoreFinishRequest : TimedRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct RestoreUpdateRateReply : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 13018414;
|
||||
|
||||
UID id;
|
||||
double remainMB; // remaining data in MB to write to DB;
|
||||
|
||||
RestoreUpdateRateReply() = default;
|
||||
explicit RestoreUpdateRateReply(UID id, double remainMB) : id(id), remainMB(remainMB) {}
|
||||
|
||||
std::string toString() const {
|
||||
std::stringstream ss;
|
||||
ss << "RestoreUpdateRateReply NodeID:" << id.toString() << " remainMB:" << remainMB;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, id, remainMB);
|
||||
}
|
||||
};
|
||||
|
||||
struct RestoreUpdateRateRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 13018415;
|
||||
|
||||
int batchIndex;
|
||||
double writeMB;
|
||||
|
||||
ReplyPromise<RestoreUpdateRateReply> reply;
|
||||
|
||||
RestoreUpdateRateRequest() = default;
|
||||
explicit RestoreUpdateRateRequest(int batchIndex, double writeMB) : batchIndex(batchIndex), writeMB(writeMB) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, batchIndex, writeMB, reply);
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
std::stringstream ss;
|
||||
ss << "RestoreUpdateRateRequest batchIndex:" << batchIndex << " writeMB:" << writeMB;
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
||||
struct RestoreRequest {
|
||||
constexpr static FileIdentifier file_identifier = 16035338;
|
||||
|
||||
|
|
|
@ -623,7 +623,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
|
||||
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
|
||||
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
|
||||
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() < 0.2 ? 5 * 1024 : deterministicRandom()->random01() < 0.4 ? 100 * 1024 * 1024 : deterministicRandom()->random01() * 1000.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
|
||||
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() < 0.2 ? 10 * 1024 : deterministicRandom()->random01() < 0.4 ? 100 * 1024 * 1024 : deterministicRandom()->random01() * 1000.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
|
||||
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() < 0.2 ? 2 : deterministicRandom()->random01() * 10 + 1; }
|
||||
init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
|
||||
init( FASTRESTORE_VB_LAUNCH_DELAY, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() < 0.2 ? 0.1 : deterministicRandom()->random01() * 10.0 + 1; }
|
||||
|
@ -646,7 +646,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_REQBATCH_LOG, false ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_LOG = deterministicRandom()->random01() < 0.2 ? true : false; }
|
||||
init( FASTRESTORE_TXN_CLEAR_MAX, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_CLEAR_MAX = deterministicRandom()->random01() * 100 + 1; }
|
||||
init( FASTRESTORE_TXN_RETRY_MAX, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_RETRY_MAX = deterministicRandom()->random01() * 100 + 1; }
|
||||
init( FASTRESTORE_TXN_EXTRA_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_EXTRA_DELAY = deterministicRandom()->random01() * 1 + 0.001;}
|
||||
init( FASTRESTORE_TXN_EXTRA_DELAY, 0.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_EXTRA_DELAY = deterministicRandom()->random01() * 1 + 0.001;}
|
||||
init( FASTRESTORE_NOT_WRITE_DB, false ); // Perf test only: set it to true will cause simulation failure
|
||||
init( FASTRESTORE_USE_RANGE_FILE, true ); // Perf test only: set it to false will cause simulation failure
|
||||
init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: set it to false will cause simulation failure
|
||||
|
@ -661,7 +661,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
|
||||
init( FASTRESTORE_NUM_TRACE_EVENTS, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_TRACE_EVENTS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 500 + 1;}
|
||||
init( FASTRESTORE_EXPENSIVE_VALIDATION, false ); if( randomize && BUGGIFY ) { FASTRESTORE_EXPENSIVE_VALIDATION = deterministicRandom()->random01() < 0.5 ? true : false;}
|
||||
|
||||
init( FASTRESTORE_WRITE_BW_MB, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_WRITE_BW_MB = deterministicRandom()->random01() < 0.5 ? 2 : 100;}
|
||||
init( FASTRESTORE_RATE_UPDATE_SECONDS, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_RATE_UPDATE_SECONDS = deterministicRandom()->random01() < 0.5 ? 0.1 : 2;}
|
||||
|
||||
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
|
||||
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
|
||||
|
|
|
@ -547,6 +547,7 @@ public:
|
|||
int64_t TIME_KEEPER_MAX_ENTRIES;
|
||||
|
||||
// Fast Restore
|
||||
// TODO: After 6.3, review FR knobs, remove unneeded ones and change default value
|
||||
int64_t FASTRESTORE_FAILURE_TIMEOUT;
|
||||
int64_t FASTRESTORE_HEARTBEAT_INTERVAL;
|
||||
double FASTRESTORE_SAMPLING_PERCENT;
|
||||
|
@ -594,6 +595,8 @@ public:
|
|||
int FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH; // number of future VB sendLoadingParam requests to process at once
|
||||
int FASTRESTORE_NUM_TRACE_EVENTS;
|
||||
bool FASTRESTORE_EXPENSIVE_VALIDATION; // when set true, performance will be heavily affected
|
||||
double FASTRESTORE_WRITE_BW_MB; // target aggregated write bandwidth from all appliers
|
||||
double FASTRESTORE_RATE_UPDATE_SECONDS; // how long to update appliers target write rate
|
||||
|
||||
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
|
||||
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
|
||||
|
|
|
@ -40,6 +40,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
|
||||
Database cx);
|
||||
void handleUpdateRateRequest(RestoreUpdateRateRequest req, Reference<RestoreApplierData> self);
|
||||
|
||||
ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int nodeIndex, Database cx) {
|
||||
state Reference<RestoreApplierData> self =
|
||||
|
@ -71,6 +72,10 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
req, self, cx)); // TODO: Check how FDB uses TaskPriority for ACTORS. We may need to add
|
||||
// priority here to avoid requests at later VB block requests at earlier VBs
|
||||
}
|
||||
when(RestoreUpdateRateRequest req = waitNext(applierInterf.updateRate.getFuture())) {
|
||||
requestTypeStr = "updateRate";
|
||||
handleUpdateRateRequest(req, self);
|
||||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
|
||||
requestTypeStr = "initVersionBatch";
|
||||
actors.add(handleInitVersionBatchRequest(req, self));
|
||||
|
@ -218,6 +223,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
|
||||
loop {
|
||||
try {
|
||||
// TODO: Consider clearrange traffic in write traffic control
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (auto& range : ranges) {
|
||||
|
@ -463,31 +469,55 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Returns true while the bytes currently being applied stay strictly below the target budget,
// where the budget is given in MB and compared in bytes.
bool okToReleaseTxns(double targetMB, double applyingDataBytes) {
	const double targetBytes = targetMB * 1024 * 1024;
	return applyingDataBytes < targetBytes;
}
|
||||
|
||||
// Completes once okToReleaseTxns(*targetMB, *applyingDataBytes) holds. While it does not, waits
// for releaseTxns to fire and then re-checks.
ACTOR static Future<Void> shouldReleaseTransaction(double* targetMB, double* applyingDataBytes,
                                                   AsyncTrigger* releaseTxns) {
	while (!okToReleaseTxns(*targetMB, *applyingDataBytes)) {
		wait(releaseTxns->onTrigger());
		// Avoid all waiting txns are triggered at the same time and all decide to proceed before
		// applyingDataBytes has a chance to update
		wait(delay(0.0));
	}
	return Void();
}
|
||||
|
||||
// Apply mutations in batchData->stagingKeys [begin, end).
|
||||
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx,
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID,
|
||||
ApplierBatchData::Counters* cc) {
|
||||
std::map<Key, StagingKey>::iterator end, Database cx, UID applierID,
|
||||
ApplierBatchData::Counters* cc, double* appliedBytes,
|
||||
double* applyingDataBytes, double* targetMB,
|
||||
AsyncTrigger* releaseTxnTrigger) {
|
||||
if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
|
||||
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchSkipped", applierID).detail("Begin", begin->first);
|
||||
ASSERT(!g_network->isSimulated());
|
||||
return Void();
|
||||
}
|
||||
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
|
||||
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
||||
wait(shouldReleaseTransaction(targetMB, applyingDataBytes, releaseTxnTrigger));
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state int sets = 0;
|
||||
state int clears = 0;
|
||||
state Key endKey = begin->first;
|
||||
state double txnSize = 0;
|
||||
state double txnSizeUsed = 0; // txn size accounted in applyingDataBytes
|
||||
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
|
||||
loop {
|
||||
try {
|
||||
txnSize = 0;
|
||||
txnSizeUsed = 0;
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
std::map<Key, StagingKey>::iterator iter = begin;
|
||||
while (iter != end) {
|
||||
if (iter->second.type == MutationRef::SetValue) {
|
||||
tr->set(iter->second.key, iter->second.val);
|
||||
txnSize += iter->second.totalSize();
|
||||
cc->appliedMutations += 1;
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
||||
.detail("SetKey", iter->second.key);
|
||||
|
@ -501,6 +531,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
.detail("SubVersion", iter->second.version.sub);
|
||||
}
|
||||
tr->clear(singleKeyRange(iter->second.key));
|
||||
txnSize += iter->second.totalSize();
|
||||
cc->appliedMutations += 1;
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
||||
.detail("ClearKey", iter->second.key);
|
||||
|
@ -523,12 +554,21 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
.detail("Sets", sets)
|
||||
.detail("Clears", clears);
|
||||
tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load
|
||||
txnSizeUsed = txnSize;
|
||||
*applyingDataBytes += txnSizeUsed; // Must account for applying bytes before wait for write traffic control
|
||||
wait(tr->commit());
|
||||
cc->appliedTxns += 1;
|
||||
cc->appliedBytes += txnSize;
|
||||
*appliedBytes += txnSize;
|
||||
*applyingDataBytes -= txnSizeUsed;
|
||||
if (okToReleaseTxns(*targetMB, *applyingDataBytes)) {
|
||||
releaseTxnTrigger->trigger();
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
cc->appliedTxnRetries += 1;
|
||||
wait(tr->onError(e));
|
||||
*applyingDataBytes -= txnSizeUsed;
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
@ -545,13 +585,14 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysStart", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
batchData->totalBytesToWrite = 0;
|
||||
while (cur != batchData->stagingKeys.end()) {
|
||||
txnSize += cur->second.expectedMutationSize();
|
||||
txnSize += cur->second.totalSize(); // should be consistent with receivedBytes accounting method
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters));
|
||||
batchData->counters.appliedBytes += txnSize;
|
||||
batchData->appliedBytes += txnSize;
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters,
|
||||
&batchData->appliedBytes, &batchData->applyingDataBytes,
|
||||
&batchData->targetWriteRateMB, &batchData->releaseTxnTrigger));
|
||||
batchData->totalBytesToWrite += txnSize;
|
||||
begin = cur;
|
||||
txnSize = 0;
|
||||
txnBatches++;
|
||||
|
@ -559,10 +600,10 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
cur++;
|
||||
}
|
||||
if (begin != batchData->stagingKeys.end()) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters));
|
||||
batchData->counters.appliedBytes += txnSize;
|
||||
batchData->appliedBytes += txnSize;
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters,
|
||||
&batchData->appliedBytes, &batchData->applyingDataBytes,
|
||||
&batchData->targetWriteRateMB, &batchData->releaseTxnTrigger));
|
||||
batchData->totalBytesToWrite += txnSize;
|
||||
txnBatches++;
|
||||
}
|
||||
|
||||
|
@ -571,18 +612,19 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("StagingKeys", batchData->stagingKeys.size())
|
||||
.detail("TransactionBatches", txnBatches);
|
||||
.detail("TransactionBatches", txnBatches)
|
||||
.detail("TotalBytesToWrite", batchData->totalBytesToWrite);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Write mutations to the destination DB
|
||||
ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData,
|
||||
Database cx) {
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxnStart", applierID).detail("BatchIndex", batchIndex);
|
||||
TraceEvent("FastRestoreApplierPhaseApplyTxnStart", applierID).detail("BatchIndex", batchIndex);
|
||||
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
|
||||
|
||||
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
|
||||
TraceEvent("FastRestoreApplierPhaseApplyTxnDone", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("AppliedBytes", batchData->appliedBytes)
|
||||
.detail("ReceivedBytes", batchData->receivedBytes);
|
||||
|
@ -590,6 +632,52 @@ ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Referen
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Handles a write-rate update for version batch req.batchIndex.
// If that batch is the one right after the last finished batch, stores req.writeMB as its target
// write rate and reports the data still to be written; otherwise replies with 0 remaining MB.
void handleUpdateRateRequest(RestoreUpdateRateRequest req, Reference<RestoreApplierData> self) {
	TraceEvent ev("FastRestoreApplierUpdateRateRequest", self->id());
	ev.suppressFor(10)
	    .detail("BatchIndex", req.batchIndex)
	    .detail("FinishedBatch", self->finishedBatch.get())
	    .detail("WriteMB", req.writeMB);

	double remainingDataMB = 0;
	const bool isApplyingBatch = self->finishedBatch.get() == req.batchIndex - 1; // current applying batch
	if (isApplyingBatch) {
		Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
		ASSERT(batchData.isValid());
		batchData->targetWriteRateMB = req.writeMB;
		if (batchData->totalBytesToWrite > 0) {
			remainingDataMB = std::max(0.0, batchData->totalBytesToWrite - batchData->appliedBytes) / 1024 / 1024;
		} else {
			// No positive total to write yet: report the received bytes instead
			remainingDataMB = batchData->receivedBytes / 1024 / 1024;
		}
		ev.detail("TotalBytesToWrite", batchData->totalBytesToWrite)
		    .detail("AppliedBytes", batchData->appliedBytes)
		    .detail("ReceivedBytes", batchData->receivedBytes)
		    .detail("TargetWriteRateMB", batchData->targetWriteRateMB)
		    .detail("RemainingDataMB", remainingDataMB);
	}

	req.reply.send(RestoreUpdateRateReply(self->id(), remainingDataMB));
}
|
||||
|
||||
// Periodically logs rate-control statistics of batchData under the trace event name `context`.
// Stops as soon as the last finished batch is no longer batchIndex - 1 or batchData is not valid;
// otherwise emits one event and sleeps 5 seconds between iterations.
ACTOR static Future<Void> traceRate(const char* context, Reference<ApplierBatchData> batchData, int batchIndex,
                                    UID nodeID, NotifiedVersion* finishedVB) {
	loop {
		if ((finishedVB->get() != batchIndex - 1) || !batchData.isValid()) {
			break;
		}
		TraceEvent(context, nodeID)
		    .suppressFor(10)
		    .detail("BatchIndex", batchIndex)
		    .detail("FinishedBatchIndex", finishedVB->get())
		    .detail("TotalDataToWriteMB", batchData->totalBytesToWrite / 1024 / 1024)
		    .detail("AppliedBytesMB", batchData->appliedBytes / 1024 / 1024)
		    .detail("TargetBytesMB", batchData->targetWriteRateMB)
		    // applyingDataBytes is tracked in bytes; convert so the value matches the MB field name
		    .detail("InflightBytesMB", batchData->applyingDataBytes / 1024 / 1024)
		    .detail("ReceivedBytes", batchData->receivedBytes);
		wait(delay(5.0));
	}

	return Void();
}
|
||||
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
|
||||
Database cx) {
|
||||
TraceEvent("FastRestoreApplierPhaseHandleApplyToDBStart", self->id())
|
||||
|
@ -601,9 +689,9 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));
|
||||
|
||||
state bool isDuplicated = true;
|
||||
if (self->finishedBatch.get() ==
|
||||
req.batchIndex - 1) { // duplicate request from earlier version batch will be ignored
|
||||
Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
||||
if (self->finishedBatch.get() == req.batchIndex - 1) {
|
||||
// duplicate request from earlier version batch will be ignored
|
||||
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
||||
ASSERT(batchData.isValid());
|
||||
TraceEvent("FastRestoreApplierPhaseHandleApplyToDBRunning", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
|
@ -618,6 +706,8 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
batchData->dbApplier = Never();
|
||||
batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx);
|
||||
batchData->vbState = ApplierVersionBatchState::WRITE_TO_DB;
|
||||
batchData->rateTracer = traceRate("FastRestoreApplierTransactionRateControl", batchData, req.batchIndex,
|
||||
self->id(), &self->finishedBatch);
|
||||
}
|
||||
|
||||
ASSERT(batchData->dbApplier.present());
|
||||
|
@ -626,9 +716,11 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
|
||||
wait(batchData->dbApplier.get());
|
||||
|
||||
// Multiple actor invokation can wait on req.batchIndex-1;
|
||||
// Multiple actors can wait on req.batchIndex-1;
|
||||
// Avoid setting finishedBatch when finishedBatch > req.batchIndex
|
||||
if (self->finishedBatch.get() == req.batchIndex - 1) {
|
||||
batchData->rateTracer = traceRate("FastRestoreApplierTransactionRateControlDone", batchData, req.batchIndex,
|
||||
self->id(), &self->finishedBatch); // Track the last rate info
|
||||
self->finishedBatch.set(req.batchIndex);
|
||||
// self->batch[req.batchIndex]->vbState = ApplierVersionBatchState::DONE;
|
||||
// Free memory for the version batch
|
||||
|
|
|
@ -199,7 +199,7 @@ struct StagingKey {
|
|||
return pendingMutations.empty() || version >= pendingMutations.rbegin()->first;
|
||||
}
|
||||
|
||||
int expectedMutationSize() { return key.size() + val.size(); }
|
||||
int totalSize() { return MutationRef::OVERHEAD_BYTES + key.size() + val.size(); }
|
||||
};
|
||||
|
||||
// The range mutation received on applier.
|
||||
|
@ -244,7 +244,6 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
VersionedMutationsMap kvOps; // Mutations at each version
|
||||
std::map<Key, StagingKey> stagingKeys;
|
||||
std::set<StagingKeyRange> stagingKeyRanges;
|
||||
FlowLock applyStagingKeysBatchLock;
|
||||
|
||||
Future<Void> pollMetrics;
|
||||
|
||||
|
@ -253,8 +252,13 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
long receiveMutationReqs;
|
||||
|
||||
// Stats
|
||||
long receivedBytes;
|
||||
long appliedBytes;
|
||||
double receivedBytes; // received mutation size
|
||||
double appliedBytes; // after coalesce, how many bytes to write to DB
|
||||
double targetWriteRateMB; // target amount of data outstanding for DB;
|
||||
double totalBytesToWrite; // total amount of data in bytes to write
|
||||
double applyingDataBytes; // amount of data in flight of committing
|
||||
AsyncTrigger releaseTxnTrigger; // trigger to release more txns
|
||||
Future<Void> rateTracer; // trace transaction rate control info
|
||||
|
||||
// Status counters
|
||||
struct Counters {
|
||||
|
@ -280,14 +284,18 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
void delref() { return ReferenceCounted<ApplierBatchData>::delref(); }
|
||||
|
||||
explicit ApplierBatchData(UID nodeID, int batchIndex)
|
||||
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
|
||||
vbState(ApplierVersionBatchState::NOT_INIT), receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
|
||||
: counters(this, nodeID, batchIndex),
|
||||
targetWriteRateMB(SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS),
|
||||
totalBytesToWrite(-1), applyingDataBytes(0), vbState(ApplierVersionBatchState::NOT_INIT),
|
||||
receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
|
||||
pollMetrics = traceCounters(format("FastRestoreApplierMetrics%d", batchIndex), nodeID,
|
||||
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
|
||||
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
|
||||
TraceEvent("FastRestoreApplierMetricsCreated").detail("Node", nodeID);
|
||||
}
|
||||
~ApplierBatchData() = default;
|
||||
~ApplierBatchData() {
|
||||
rateTracer = Void(); // cancel actor
|
||||
}
|
||||
|
||||
void addMutation(MutationRef m, LogMessageVersion ver) {
|
||||
if (!isRangeMutation(m)) {
|
||||
|
|
|
@ -105,24 +105,23 @@ ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreC
|
|||
}
|
||||
|
||||
ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controllerWorker, Database cx) {
|
||||
state ActorCollection actors(false);
|
||||
|
||||
ASSERT(controllerWorker.isValid());
|
||||
ASSERT(controllerWorker->controllerInterf.present());
|
||||
state Reference<RestoreControllerData> self =
|
||||
Reference<RestoreControllerData>(new RestoreControllerData(controllerWorker->controllerInterf.get().id()));
|
||||
state Future<Void> error = actorCollection(self->addActor.getFuture());
|
||||
|
||||
try {
|
||||
// recruitRestoreRoles must come after controllerWorker has finished collectWorkerInterface
|
||||
wait(recruitRestoreRoles(controllerWorker, self));
|
||||
|
||||
actors.add(updateHeartbeatTime(self));
|
||||
actors.add(checkRolesLiveness(self));
|
||||
actors.add(updateProcessMetrics(self));
|
||||
actors.add(traceProcessMetrics(self, "RestoreController"));
|
||||
actors.add(sampleBackups(self, controllerWorker->controllerInterf.get()));
|
||||
self->addActor.send(updateHeartbeatTime(self));
|
||||
self->addActor.send(checkRolesLiveness(self));
|
||||
self->addActor.send(updateProcessMetrics(self));
|
||||
self->addActor.send(traceProcessMetrics(self, "RestoreController"));
|
||||
self->addActor.send(sampleBackups(self, controllerWorker->controllerInterf.get()));
|
||||
|
||||
wait(startProcessRestoreRequests(self, cx));
|
||||
wait(startProcessRestoreRequests(self, cx) || error);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_operation_cancelled) {
|
||||
TraceEvent(SevError, "FastRestoreControllerStart").detail("Reason", "Unexpected unhandled error").error(e);
|
||||
|
@ -304,7 +303,6 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
state std::vector<RestoreFileFR> logFiles;
|
||||
state std::vector<RestoreFileFR> allFiles;
|
||||
state Version minRangeVersion = MAX_VERSION;
|
||||
state Future<Void> error = actorCollection(self->addActor.getFuture());
|
||||
|
||||
self->initBackupContainer(request.url);
|
||||
|
||||
|
@ -383,7 +381,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
}
|
||||
|
||||
try {
|
||||
wait(waitForAll(fBatches) || error);
|
||||
wait(waitForAll(fBatches));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
|
||||
}
|
||||
|
@ -748,7 +746,9 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
|
|||
std::cout << "Restore to version: " << request.targetVersion << "\nBackupDesc: \n" << desc.toString() << "\n\n";
|
||||
}
|
||||
|
||||
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(request.targetVersion));
|
||||
state VectorRef<KeyRangeRef> restoreRanges;
|
||||
restoreRanges.add(request.range);
|
||||
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(request.targetVersion, restoreRanges));
|
||||
|
||||
if (!restorable.present()) {
|
||||
TraceEvent(SevWarn, "FastRestoreControllerPhaseCollectBackupFiles")
|
||||
|
@ -908,6 +908,49 @@ ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInt
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Periodically tell every applier its target write rate for version batch `batchIndex`.
|
||||
// Per the original note, this rate bounds the data an applier keeps outstanding to the DB,
// i.e. the amount of data that is in in-progress transactions.
//
// Each round sends one RestoreUpdateRateRequest per applier through the
// RestoreApplierInterface::updateRate endpoint, then records the remainMB value from each reply.
// While the summed remainMB is above 1, an applier's rate is its fraction of that sum multiplied
// by FASTRESTORE_WRITE_BW_MB; otherwise every applier gets the even share standardAvgBW.
//
// The loop below has no break or return, so this actor never completes on its own; the caller is
// presumably expected to cancel it by dropping the returned future -- TODO confirm.
// `batchData` is not referenced anywhere in this body.
|
||||
ACTOR static Future<Void> updateApplierWriteBW(Reference<ControllerBatchData> batchData,
|
||||
std::map<UID, RestoreApplierInterface> appliersInterf, int batchIndex) {
|
||||
// Last remainMB value known for each applier, keyed by applier ID.
state std::unordered_map<UID, double> applierRemainMB;
|
||||
// Sum of all appliers' remainMB; seeded with the total bandwidth knob before any reply has arrived.
state double totalRemainMB = SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB;
|
||||
// Even per-applier share, used as the fallback rate when totalRemainMB is not above 1.
// NOTE(review): if both knobs are integral types this division truncates before the conversion
// to double -- confirm the knob types.
state double standardAvgBW = SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS;
|
||||
state int loopCount = 0;
|
||||
state std::vector<RestoreUpdateRateReply> replies;
|
||||
state std::vector<std::pair<UID, RestoreUpdateRateRequest>> requests;
|
||||
// Seed every applier with the same even share (same expression as standardAvgBW) for the first round.
for (auto& applier : appliersInterf) {
|
||||
applierRemainMB[applier.first] = SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS;
|
||||
}
|
||||
|
||||
loop {
|
||||
// Build one rate request per applier from the most recent remainMB figures.
requests.clear();
|
||||
for (auto& applier : appliersInterf) {
|
||||
double writeRate = totalRemainMB > 1 ? (applierRemainMB[applier.first] / totalRemainMB) *
|
||||
SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB
|
||||
: standardAvgBW;
|
||||
requests.emplace_back(applier.first, RestoreUpdateRateRequest(batchIndex, writeRate));
|
||||
}
|
||||
replies.clear();
|
||||
wait(getBatchReplies(
|
||||
&RestoreApplierInterface::updateRate, appliersInterf, requests, &replies,
|
||||
TaskPriority::DefaultEndpoint)); // DefaultEndpoint has higher priority than fast restore endpoints
|
||||
ASSERT(replies.size() == requests.size());
|
||||
// Recompute the total from the fresh replies.
// replies[i] is attributed to the applier named in requests[i]; this assumes getBatchReplies
// returns replies in request order -- TODO confirm.
totalRemainMB = 0;
|
||||
for (int i = 0; i < replies.size(); i++) {
|
||||
UID& applierID = requests[i].first;
|
||||
applierRemainMB[applierID] = replies[i].remainMB;
|
||||
totalRemainMB += replies[i].remainMB;
|
||||
}
|
||||
ASSERT(totalRemainMB >= 0);
|
||||
// Wait before the next round: FASTRESTORE_RATE_UPDATE_SECONDS normally, 0.2 after the first round.
double delayTime = SERVER_KNOBS->FASTRESTORE_RATE_UPDATE_SECONDS;
|
||||
if (loopCount == 0) { // First loop: Need to update writeRate quicker
|
||||
delayTime = 0.2;
|
||||
}
|
||||
loopCount++;
|
||||
wait(delay(delayTime));
|
||||
}
|
||||
}
|
||||
|
||||
// Ask each applier to apply its received mutations to DB
|
||||
// NOTE: Controller cannot start applying mutations at batchIndex until all appliers have applied for (batchIndex - 1)
|
||||
// because appliers at different batchIndex may have overlapped key ranges.
|
||||
|
@ -921,6 +964,8 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<ControllerBatc
|
|||
|
||||
wait(finishedBatch->whenAtLeast(batchIndex - 1));
|
||||
|
||||
state Future<Void> updateRate;
|
||||
|
||||
if (finishedBatch->get() == batchIndex - 1) {
|
||||
// Prepare the applyToDB requests
|
||||
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
|
||||
|
@ -940,6 +985,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<ControllerBatc
|
|||
batchData->applyToDB = Never();
|
||||
batchData->applyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests,
|
||||
&replies, TaskPriority::RestoreApplierWriteDB);
|
||||
updateRate = updateApplierWriteBW(batchData, appliersInterf, batchIndex);
|
||||
} else {
|
||||
TraceEvent(SevError, "FastRestoreControllerPhaseApplyToDB")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
|
@ -1051,6 +1097,7 @@ ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreControllerData
|
|||
}
|
||||
|
||||
// Update the most recent time when controller receives heartbeat from each loader and applier
|
||||
// TODO: Replace the heartbeat mechanism with FDB failure monitoring mechanism
|
||||
ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> self) {
|
||||
wait(self->recruitedRoles.getFuture());
|
||||
|
||||
|
@ -1086,10 +1133,18 @@ ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> s
|
|||
}
|
||||
|
||||
fTimeout = delay(SERVER_KNOBS->FASTRESTORE_HEARTBEAT_DELAY);
|
||||
wait(waitForAll(fReplies) || fTimeout);
|
||||
|
||||
// Here we have to handle error, otherwise controller worker will fail and exit.
|
||||
try {
|
||||
wait(waitForAll(fReplies) || fTimeout);
|
||||
} catch (Error& e) {
|
||||
// This should be an ignorable error.
|
||||
TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "FastRestoreUpdateHeartbeatError").error(e);
|
||||
}
|
||||
|
||||
// Update the most recent heart beat time for each role
|
||||
for (int i = 0; i < fReplies.size(); ++i) {
|
||||
if (fReplies[i].isReady()) {
|
||||
if (!fReplies[i].isError() && fReplies[i].isReady()) {
|
||||
double currentTime = now();
|
||||
auto item = self->rolesHeartBeatTime.emplace(nodes[i], currentTime);
|
||||
item.first->second = currentTime;
|
||||
|
|
|
@ -177,7 +177,8 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
|
|||
versionBatches.clear();
|
||||
batch.clear();
|
||||
batchStatus.clear();
|
||||
finishedBatch = NotifiedVersion();
|
||||
finishedBatch = NotifiedVersion(0);
|
||||
versionBatchId = NotifiedVersion(0);
|
||||
ASSERT(runningVersionBatches.get() == 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -1900,11 +1900,13 @@ int main(int argc, char* argv[]) {
|
|||
g_network->run();
|
||||
}
|
||||
} else if (role == MultiTester) {
|
||||
setupRunLoopProfiler();
|
||||
f = stopAfter(runTests(opts.connectionFile, TEST_TYPE_FROM_FILE,
|
||||
opts.testOnServers ? TEST_ON_SERVERS : TEST_ON_TESTERS, opts.minTesterCount,
|
||||
opts.testFile, StringRef(), opts.localities));
|
||||
g_network->run();
|
||||
} else if (role == Test) {
|
||||
setupRunLoopProfiler();
|
||||
auto m = startSystemMonitor(opts.dataFolder, opts.zoneId, opts.zoneId);
|
||||
f = stopAfter(runTests(opts.connectionFile, TEST_TYPE_FROM_FILE, TEST_HERE, 1, opts.testFile, StringRef(),
|
||||
opts.localities));
|
||||
|
|
|
@ -101,7 +101,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
// case 10:
|
||||
// TEST(true); // Testing atomic CompareAndClear Not supported yet
|
||||
// opType = MutationRef::CompareAndClear
|
||||
// break;
|
||||
// break;
|
||||
default:
|
||||
ASSERT(false);
|
||||
}
|
||||
|
|
|
@ -476,6 +476,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
.detail("LastBackupContainer", lastBackupContainer->getURL())
|
||||
.detail("RestoreAfter", self->restoreAfter)
|
||||
.detail("BackupTag", printable(self->backupTag));
|
||||
// start restoring
|
||||
|
||||
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
|
||||
BackupDescription desc = wait(container->describeBackup());
|
||||
|
|
|
@ -104,6 +104,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
Future<Void> f;
|
||||
{
|
||||
ReadYourWritesTransaction ryw{ cx->clone() };
|
||||
if(!ryw.getDatabase()->apiVersionAtLeast(630)) {
|
||||
//This test is not valid for API versions smaller than 630
|
||||
return;
|
||||
}
|
||||
f = success(ryw.get(LiteralStringRef("\xff\xff/status/json")));
|
||||
TEST(!f.isReady());
|
||||
}
|
||||
|
|
|
@ -204,6 +204,8 @@ ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup wit
|
|||
ERROR( backup_auth_missing, 2317, "Cannot find authentication details (such as a password or secret key) for the specified Backup Container URL")
|
||||
ERROR( backup_auth_unreadable, 2318, "Cannot read or parse one or more sources of authentication information for Backup Container URLs")
|
||||
ERROR( backup_does_not_exist, 2319, "Backup does not exist")
|
||||
ERROR( backup_not_filterable_with_key_ranges, 2320, "Backup before 6.3 cannot be filtered with key ranges")
|
||||
ERROR( backup_not_overlapped_with_keys_filter, 2321, "Backup key ranges doesn't overlap with key ranges filter")
|
||||
ERROR( restore_invalid_version, 2361, "Invalid restore version")
|
||||
ERROR( restore_corrupted_data, 2362, "Corrupted backup data")
|
||||
ERROR( restore_missing_data, 2363, "Missing backup data")
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||
<Product Name='$(var.Title)'
|
||||
Id='{707FC06F-9954-4A7E-AC9C-A52C99AE776D}'
|
||||
Id='{1377F0A0-D1AC-4B72-ADA7-7180D002A307}'
|
||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||
Version='$(var.Version)'
|
||||
Manufacturer='$(var.Manufacturer)'
|
||||
|
|
Loading…
Reference in New Issue