Merge branch 'release-6.3'

# Conflicts:
#	fdbserver/RestoreLoader.actor.cpp
Evan Tschannen 2020-08-11 22:45:43 -07:00
commit 2f52c5f79b
28 changed files with 822 additions and 187 deletions


@ -30,7 +30,7 @@ import sun.misc.Unsafe;
/**
* Utility code to do optimized byte-array comparison.
* This is borrowed and slightly modified from Guava's {@link UnsignedBytes}
* This is borrowed and slightly modified from Guava's UnsignedBytes
* class to be able to compare arrays that start at non-zero offsets.
*/
abstract class FastByteComparisons {
@ -38,14 +38,14 @@ abstract class FastByteComparisons {
private static final int UNSIGNED_MASK = 0xFF;
/**
* Lexicographically compare two byte arrays.
*
*
* @param buffer1 left operand, expected to not be null
* @param buffer2 right operand, expected to not be null
* @param offset1 Where to start comparing in the left buffer, expected to be >= 0
* @param offset2 Where to start comparing in the right buffer, expected to be >= 0
* @param length1 How much to compare from the left buffer, expected to be >= 0
* @param length2 How much to compare from the right buffer, expected to be >= 0
* @return 0 if equal, < 0 if left is less than right, etc.
* @param offset1 Where to start comparing in the left buffer, expected to be &gt;= 0
* @param offset2 Where to start comparing in the right buffer, expected to be &gt;= 0
* @param length1 How much to compare from the left buffer, expected to be &gt;= 0
* @param length2 How much to compare from the right buffer, expected to be &gt;= 0
* @return 0 if equal, &lt; 0 if left is less than right, etc.
*/
public static int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
@ -59,7 +59,7 @@ abstract class FastByteComparisons {
interface Comparer<T> extends Comparator<T> {
/**
* Lexicographically compare two byte arrays.
*
*
* @param buffer1 left operand
* @param buffer2 right operand
* @param offset1 Where to start comparing in the left buffer
@ -291,4 +291,4 @@ abstract class FastByteComparisons {
}
}
}
}
}


@ -1,5 +1,5 @@
set(SRCS
Program.cs
${CMAKE_CURRENT_BINARY_DIR}/Program.cs
Properties/AssemblyInfo.cs)
set(TEST_HARNESS_REFERENCES
@ -7,6 +7,8 @@ set(TEST_HARNESS_REFERENCES
set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/Program.cs.cmake ${CMAKE_CURRENT_BINARY_DIR}/Program.cs)
add_custom_command(OUTPUT ${out_file}
COMMAND ${MCS_EXECUTABLE} ARGS ${TEST_HARNESS_REFERENCES} ${SRCS} "-target:exe" "-out:${out_file}"
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}


@ -154,6 +154,10 @@ namespace SummarizeTest
throw;
}
}
else if (args[0] == "version")
{
return VersionInfo();
}
return UsageMessage();
}
@ -1522,6 +1526,16 @@ namespace SummarizeTest
}
}
private static int VersionInfo()
{
Console.WriteLine("Version: 1.02");
Console.WriteLine("FDB Project Ver: " + "${CMAKE_PROJECT_VERSION}");
Console.WriteLine("FDB Version: " + "${CMAKE_PROJECT_VERSION_MAJOR}" + "." + "${CMAKE_PROJECT_VERSION_MINOR}");
Console.WriteLine("Source Version: " + "${CURRENT_GIT_VERSION}");
return 1;
}
private static int UsageMessage()
{
Console.WriteLine("Usage:");
@ -1532,7 +1546,7 @@ namespace SummarizeTest
Console.WriteLine(" TestHarness remote [queue folder] [root foundation folder] [duration in hours] [amount of tests] [all/fast/<test_path>] [scope]");
Console.WriteLine(" TestHarness extract-errors [summary-file] [error-summary-file]");
Console.WriteLine(" TestHarness joshua-run <useValgrind> <maxTries>");
Console.WriteLine("Version: 1.01");
VersionInfo();
return 1;
}
}


@ -7,3 +7,365 @@ FoundationDB makes your architecture flexible and easy to operate. Your applicat
The following diagram details the logical architecture.
.. image:: /images/Architecture.png
Detailed FoundationDB Architecture
----------------------------------
The FoundationDB architecture adopts a decoupled design, in which
processes are assigned heterogeneous roles (e.g.,
Coordinators, Storage Servers, Master). Scaling the database is achieved
by horizontally expanding the number of processes for each role:
Coordinators
~~~~~~~~~~~~
All clients and servers connect to a FoundationDB cluster with a cluster
file, which contains the IP:PORT of the coordinators. Both the clients
and servers use the coordinators to connect with the cluster controller.
The servers will attempt to become the cluster controller if one does
not exist, and register with the cluster controller once one has been
elected. Clients use the cluster controller to keep an up-to-date list
of proxies.
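For illustration, a cluster file is a single line of the form
``description:id@addr:port,addr:port,...`` naming the coordinators. The
description, id, and addresses below are hypothetical:

.. code-block:: none

   mydb:Ab1zYx9q@10.0.0.1:4500,10.0.0.2:4500,10.0.0.3:4500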
Cluster Controller
~~~~~~~~~~~~~~~~~~
The cluster controller is a singleton elected by a majority of
coordinators. It is the entry point for all processes in the cluster. It
is responsible for determining when a process has failed, telling
processes which roles they should become, and passing system information
between all of the processes.
Master
~~~~~~
The master is responsible for coordinating the transition of the write
sub-system from one generation to the next. The write sub-system
includes the master, proxies, resolvers, and transaction logs. These
roles are treated as a unit, and if any of them fail, we will recruit a
replacement for all of them. The master provides the commit versions
for batches of mutations to the proxies.
Historically, Ratekeeper and Data Distributor were coupled with the
Master on the same process. Since 6.2, each has become a singleton in
the cluster, and their lifetimes are no longer tied to the Master.
|image1|
Proxies
~~~~~~~
The proxies are responsible for providing read versions, committing
transactions, and tracking the storage servers responsible for each
range of keys. To provide a read version, a proxy will ask all other
proxies for the largest committed version seen at this point in time,
while simultaneously checking that the transaction logs have not been
stopped. Ratekeeper will artificially slow down the rate at which the
proxy provides read versions.
Commits are accomplished by:
- Getting a commit version from the master.
- Using the resolvers to determine if the transaction conflicts with
previously committed transactions.
- Making the transaction durable on the transaction logs.
The key space starting with the ``\xff`` byte is reserved for system
metadata. All mutations committed into this key space are distributed to
all of the proxies through the resolvers. This metadata includes a
mapping between key ranges and the storage servers which have the data
for that range of keys. The proxies provide this information to clients
on demand. The clients cache this mapping; if they ask a storage server
for a key it does not have, they will clear their cache and get a more
up-to-date list of servers from the proxies.
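As a minimal sketch of this cache-invalidation behavior (hypothetical
names and shapes, not FDB's actual client code):

.. code-block:: python

   class WrongShard(Exception):
       """A storage server was asked for a key outside its assigned range."""

   def client_read(key, cache, fetch_mapping, servers):
       # cache: the client's key -> storage-server-name map (may be stale)
       # fetch_mapping: callable returning a fresh map from the proxies
       # servers: dict of server name -> {key: value}
       try:
           owner = cache[key]
           if key not in servers[owner]:
               raise WrongShard(key)
           return servers[owner][key]
       except (KeyError, WrongShard):
           cache.clear()                  # cached mapping was stale
           cache.update(fetch_mapping())  # refresh from the proxies
           return servers[cache[key]][key]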
Transaction Logs
~~~~~~~~~~~~~~~~
The transaction logs make mutations durable to disk for fast commit
latencies. The logs receive commits from the proxy in version order, and
only respond to the proxy once the data has been written and fsynced to
an append-only mutation log on disk. Before the data is even written to
disk, we forward it to the storage servers responsible for that mutation.
Once the storage servers have made the mutation durable, they pop it
from the log. This generally happens roughly 6 seconds after the
mutation was originally committed to the log. We only read from the
log's disk when the process has been rebooted. If a storage server has
failed, mutations bound for that storage server will build up on the
logs. Once data distribution makes a different storage server
responsible for all of the failed storage server's data, we will discard
the log data bound for the failed server.
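The append-then-pop lifecycle described above can be sketched as follows
(an in-memory stand-in for what is really an fsynced on-disk file; not
FDB's actual code):

.. code-block:: python

   from collections import deque

   class TransactionLog:
       def __init__(self):
           self.entries = deque()  # (version, mutation), in version order

       def append(self, version, mutation):
           # Real tlog: write and fsync an append-only file, and forward the
           # mutation to the responsible storage servers, before replying.
           self.entries.append((version, mutation))

       def pop_to(self, version):
           # Called once storage servers have made data durable up to
           # `version` (roughly 6 seconds behind the commit, per the text).
           while self.entries and self.entries[0][0] <= version:
               self.entries.popleft()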
Resolvers
~~~~~~~~~
The resolvers are responsible for determining conflicts between
transactions. A transaction conflicts if it reads a key that has been
written between the transaction's read version and commit version. The
resolver does this by holding the last 5 seconds of committed writes in
memory and comparing a new transaction's reads against this set of
commits.
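The conflict rule can be stated compactly. The sketch below (assumed
names, not FDB's implementation) checks a transaction's reads against
the recent window of committed writes:

.. code-block:: python

   def conflicts(read_keys, read_version, commit_version, recent_writes):
       # recent_writes: key -> version of its last committed write,
       # covering roughly the last 5 seconds of commits.
       for key in read_keys:
           written = recent_writes.get(key)
           if written is not None and read_version < written <= commit_version:
               return True   # the key changed after this transaction read it
       return False

   # A read at version 100 conflicts with a write committed at version 105:
   assert conflicts({"k"}, 100, 110, {"k": 105})
   assert not conflicts({"k"}, 100, 110, {"k": 99})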
Storage Servers
~~~~~~~~~~~~~~~
The vast majority of processes in a cluster are storage servers. Storage
servers are assigned ranges of keys, and are responsible for storing all
of the data for each range. They keep 5 seconds of mutations in memory,
and an on-disk copy of the data as of 5 seconds ago. Clients must read at
a version within the last 5 seconds, or they will get a
``transaction_too_old`` error. The SSD storage engine stores the data in
a B-tree based on SQLite. The memory storage engine stores the data in
memory with an append-only log that is only read from disk if the
process is rebooted. In the upcoming FoundationDB 7.0 release, the
B-tree storage engine will be replaced with a brand new *Redwood*
engine.
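A toy sketch of the in-memory 5-second window for a single key
(hypothetical; it ignores the on-disk copy, which in a real storage
server serves the base value at the durable version):

.. code-block:: python

   import bisect

   class VersionedCell:
       def __init__(self):
           self.versions = []   # sorted commit versions for one key
           self.values = []     # parallel values
           self.oldest = 0      # reads below this are too old

       def write(self, version, value):
           self.versions.append(version)   # mutations arrive in version order
           self.values.append(value)

       def advance_durable(self, version):
           # Drop history strictly below `version`; the latest value at or
           # below it stays as the base (standing in for the on-disk copy).
           i = bisect.bisect_right(self.versions, version)
           keep = max(i - 1, 0)
           self.versions, self.values = self.versions[keep:], self.values[keep:]
           self.oldest = version

       def read(self, version):
           if version < self.oldest:
               raise RuntimeError("transaction_too_old")
           i = bisect.bisect_right(self.versions, version)
           return self.values[i - 1] if i > 0 else None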
Data Distributor
~~~~~~~~~~~~~~~~
Data distributor manages the lifetime of storage servers, decides which
storage server is responsible for which data range, and ensures data is
evenly distributed across all storage servers (SS). The data
distributor is a singleton in the cluster, recruited and monitored by
the Cluster Controller. See `internal
documentation <https://github.com/apple/foundationdb/blob/master/design/data-distributor-internals.md>`__.
Ratekeeper
~~~~~~~~~~
Ratekeeper monitors system load and slows down client transaction rate
when the cluster is close to saturation by lowering the rate at which
the proxy provides read versions. Ratekeeper is a singleton in the
cluster, recruited and monitored by the Cluster Controller.
Clients
~~~~~~~
A client links with specific language bindings (i.e., client libraries)
in order to communicate with a FoundationDB cluster. The language
bindings support loading multiple versions of the C library, allowing the
client to communicate with older versions of FoundationDB clusters.
Currently, C, Go, Python, Java, and Ruby bindings are officially supported.
Transaction Processing
----------------------
A database transaction in FoundationDB starts with a client contacting
one of the Proxies to obtain a read version, which is guaranteed to be
larger than any commit version that the client may know about (even
through side channels outside the FoundationDB cluster). This is needed
so that a client will see the results of previous commits that have
happened.
Then the client may issue multiple reads to storage servers and obtain
values at that specific read version. Client writes are kept in local
memory without contacting the cluster. By default, reading a key that
was written in the same transaction will return the newly written value.
At commit time, the client sends the transaction data (all reads and
writes) to one of the Proxies and waits for a commit or abort response
from the proxy. If the transaction conflicts with another one and cannot
commit, the client may choose to retry the transaction from the
beginning. If the transaction commits, the proxy also returns the
commit version back to the client. Note that this commit version is
larger than the read version and is chosen by the master.
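From the application's point of view, this lifecycle (read version,
locally buffered writes, read-your-writes, retry on conflict) is wrapped
by the bindings' retry loop. A minimal example with the Python binding,
assuming a running cluster and a default cluster file:

.. code-block:: python

   import fdb
   fdb.api_version(630)
   db = fdb.open()  # locates the coordinators via the cluster file

   @fdb.transactional
   def increment(tr, key):
       current = tr[key]                  # first read obtains a read version
       n = int(current.decode()) + 1 if current.present() else 1
       tr[key] = str(n).encode()          # buffered locally until commit
       return n                           # commit and retry-on-conflict are
                                          # handled by the decorator

   print(increment(db, b"counter"))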
The FoundationDB architecture separates the scaling of client reads and
writes (i.e., transaction commits). Because clients directly issue reads
to sharded storage servers, reads scale linearly with the number of
storage servers. Similarly, writes are scaled by adding more Proxy,
Resolver, and Log Server processes to the transaction system.
Determine Read Version
~~~~~~~~~~~~~~~~~~~~~~
When a client requests a read version from a proxy, the proxy asks all
other proxies for their last commit versions, and checks that a set of
transaction logs satisfying the replication policy is live. Then the
proxy returns the maximum commit version as the read version to the
client.
|image2|
The reason for the proxy to contact all other proxies for commit
versions is to ensure the read version is larger than any previously
committed version. Consider the case where proxy ``A`` commits a
transaction, and then the client asks proxy ``B`` for a read version. The read
version from proxy ``B`` must be larger than the version committed by
proxy ``A``. The only way to get this information is by asking proxy
``A`` for its largest committed version.
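A sketch of that rule (assumed names; the liveness check on the
transaction logs, discussed next, is omitted):

.. code-block:: python

   def get_read_version(proxies):
       # The answering proxy takes the max over every proxy's last committed
       # version, so the returned read version covers all previously
       # committed writes, whichever proxy committed them.
       return max(p["last_committed_version"] for p in proxies)

   proxies = [{"last_committed_version": v} for v in (10, 42, 7)]
   assert get_read_version(proxies) == 42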
The reason for checking that a set of transaction logs satisfying the
replication policy is live is to ensure the proxy has not been replaced
by a newer generation of proxies. This is because the proxy is a
stateless role recruited in each generation. If a recovery has happened
and the old proxy is still live, this old proxy could still give out
read versions. As a result, a *read-only* transaction may see stale
results (a read-write transaction would be aborted). By checking that a
set of transaction logs satisfying the replication policy is live, the
proxy makes sure no recovery has happened, so the *read-only*
transaction sees the latest data.
Note that the client cannot simply ask the master for read versions. The
master gives out versions to proxies to be committed, but the master
does not know when the versions it gives out are durable on the
transaction logs. Therefore it is not safe to do reads at the largest
version the master has provided because that version might be rolled
back in the event of a failure, so the client could end up reading data
that was never committed. In order for the client to use versions from
the master, the client would need to wait for all in-flight
transaction batches (a write version is used for a batch of
transactions) to commit. This can take a long time and thus is
inefficient. Another drawback of this approach is that it puts more work
on the master, because the master role can't be scaled. Even though
giving out read versions isn't very expensive, it still requires the
master to get a transaction budget from Ratekeeper, batch
requests, and potentially maintain thousands of network connections
from clients.
|image3|
Transaction Commit
~~~~~~~~~~~~~~~~~~
A client transaction commits in the following steps:
1. A client sends a transaction to a proxy.
2. The proxy asks the master for a commit version.
3. The master sends back a commit version that is higher than any commit
version seen before.
4. The proxy sends the read and write conflict ranges to the resolver(s)
with the commit version included.
5. The resolver responds with whether the transaction has any
conflicts with previous transactions by sorting transactions
according to their commit versions and computing if such a serial
execution order is conflict-free.
- If there are conflicts, the proxy responds back to the client with
a not_committed error.
- If there are no conflicts, the proxy sends the mutations and
commit version of this transaction to the transaction logs.
6. Once the mutations are durable on the logs, the proxy sends a success
response back to the client.
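Putting the steps above together as straight-line pseudocode
(hypothetical interfaces, not FDB's actual actors):

.. code-block:: python

   class NotCommitted(Exception):
       pass

   def commit(tr, master, resolvers, tlogs):
       version = master.next_commit_version()            # steps 2-3
       for r in resolvers:                               # step 4
           if r.conflicts(tr.reads, tr.read_version, version):
               raise NotCommitted()                      # step 5: conflict
       for log in tlogs:                                 # step 5: no conflict
           log.append(version, tr.mutations)
           log.fsync()                                   # durable before reply
       return version                                    # step 6: success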
Note that the proxy sends each resolver its respective key ranges; if
any one of the resolvers detects a conflict, the transaction is not
committed. This has the flaw that if only one of the resolvers detects a
conflict, the other resolvers will still think the transaction has
succeeded and may fail future transactions with overlapping write
conflict ranges, even though those future transactions could have
committed. In practice, a well-designed workload will only have a very
small percentage of conflicts, so this amplification will not affect
performance. Additionally, each transaction has a five-second window.
After five seconds, resolvers will remove the conflict ranges of old
transactions, which also limits the chance of this type of false
conflict.
|image4|
|image5|
Background Work
~~~~~~~~~~~~~~~
There are several kinds of background work happening besides transaction
processing:
- **Ratekeeper** collects statistics from proxies,
transaction logs, and storage servers and computes the target
transaction rate for the cluster.
- **Data distribution** monitors all storage servers and performs load
balancing operations to evenly distribute data among all storage
servers.
- **Storage servers** pull mutations from transaction logs and write them
into the storage engine to persist them on disk.
- **Proxies** periodically send empty commits to transaction logs to
keep commit versions increasing, in case there are no client-generated
transactions.
|image6|
Transaction System Recovery
~~~~~~~~~~~~~~~~~~~~~~~~~~~
The transaction system implements the write pipeline of the FoundationDB
cluster and its performance is critical to the transaction commit
latency. A typical recovery takes a few hundred milliseconds, but
longer recovery times (usually a few seconds) can happen. Whenever there
is a failure in the transaction system, a recovery process is performed
to restore the transaction system to a new configuration, i.e., a clean
state. Specifically, the Master process monitors the health of Proxies,
Resolvers, and Transaction Logs. If any of the monitored processes
fails, the Master process terminates. The Cluster Controller will
detect this event and then recruit a new Master, which coordinates the
recovery and recruits a new transaction system instance. In this way,
the transaction processing is divided into a number of epochs, where
each epoch represents a generation of the transaction system with its
unique Master process.
For each epoch, the Master initiates recovery in several steps. First,
the Master reads the previous transaction system states from the
Coordinators and locks the coordinated states to prevent another Master
process from recovering at the same time. Then the Master recovers the
previous transaction system states, including all Log Servers'
information, stops these Log Servers from accepting transactions, and
recruits a new set of Proxies, Resolvers, and Transaction Logs. After
the previous Log Servers are stopped and the new transaction system is
recruited, the Master writes the coordinated states with the current
transaction system information. Finally, the Master accepts new
transaction commits. See details in this
`documentation <https://github.com/apple/foundationdb/blob/master/design/recovery-internals.md>`__.
Because Proxies and Resolvers are stateless, their recovery requires no
extra work. In contrast, Transaction Logs save the logs of committed
transactions, and we need to ensure all previously committed
transactions are durable and retrievable by storage servers. That is,
for any transaction for which a Proxy may have sent back a commit
response, its logs are persisted on multiple Log Servers (e.g., three
servers if the replication degree is 3).
Finally, a recovery will *fast forward* time by 90 seconds, which would
abort any in-progress client transactions with a ``transaction_too_old``
error. When retried, these client transactions will find the new
generation of the transaction system and commit.
**``commit_unknown_result`` error:** If a recovery happens while a
transaction is committing (i.e., a proxy has sent mutations to the
transaction logs), the client receives a ``commit_unknown_result`` error
and then retries the transaction. It is completely permissible for FDB
to commit both the first attempt and the retry, as
``commit_unknown_result`` means the transaction may or may not have
committed. This is why it is strongly recommended that transactions be
idempotent, so that they handle ``commit_unknown_result`` correctly.
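For example, with the Python binding (a sketch assuming a running
cluster), a blind set is naturally idempotent under this error, while a
read-modify-write is not:

.. code-block:: python

   import fdb
   fdb.api_version(630)
   db = fdb.open()

   @fdb.transactional
   def set_balance(tr, account, amount):
       # Idempotent: committing this twice leaves the same state, so a
       # retry after commit_unknown_result is harmless.
       tr[account] = str(amount).encode()

   @fdb.transactional
   def add_to_balance(tr, account, delta):
       # NOT idempotent: if the first attempt actually committed, the
       # automatic retry applies `delta` a second time.
       old = tr[account]
       balance = int(old.decode()) if old.present() else 0
       tr[account] = str(balance + delta).encode()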
Resources
---------
`Forum
Post <https://forums.foundationdb.org/t/technical-overview-of-the-database/135/26>`__
`Existing Architecture
Documentation <https://github.com/apple/foundationdb/blob/master/documentation/sphinx/source/kv-architecture.rst>`__
`Summit
Presentation <https://www.youtube.com/watch?list=PLbzoR-pLrL6q7uYN-94-p_-Q3hyAmpI7o&v=EMwhsGsxfPU&feature=emb_logo>`__
`Data Distribution
Documentation <https://github.com/apple/foundationdb/blob/master/design/data-distributor-internals.md>`__
`Recovery
Documentation <https://github.com/apple/foundationdb/blob/master/design/recovery-internals.md>`__
.. |image1| image:: images/architecture-1.jpeg
.. |image2| image:: images/architecture-2.jpeg
.. |image3| image:: images/architecture-3.jpeg
.. |image4| image:: images/architecture-4.jpeg
.. |image5| image:: images/architecture-5.jpeg
.. |image6| image:: images/architecture-6.jpeg

Binary file not shown (added, 142 KiB)

Binary file not shown (added, 139 KiB)

Binary file not shown (added, 152 KiB)

Binary file not shown (added, 143 KiB)

Binary file not shown (added, 160 KiB)

Binary file not shown (added, 159 KiB)


@ -6,6 +6,7 @@ Release Notes
=====
* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
* When a configuration key is changed, it will always be included in ``status json`` output, even if the value is reverted back to the default value. `(PR #3610) <https://github.com/apple/foundationdb/pull/3610>`_
6.3.4
=====


@ -224,111 +224,123 @@ bool DatabaseConfiguration::isValid() const {
StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
StatusObject result;
if( initialized ) {
std::string tlogInfo = tLogPolicy->info();
std::string storageInfo = storagePolicy->info();
bool customRedundancy = false;
if( tLogWriteAntiQuorum == 0 ) {
if( tLogReplicationFactor == 1 && storageTeamSize == 1 ) {
result["redundancy_mode"] = "single";
} else if( tLogReplicationFactor == 2 && storageTeamSize == 2 ) {
result["redundancy_mode"] = "double";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 6 && tlogInfo == "dcid^2 x zoneid^2 x 1" && storageInfo == "dcid^3 x zoneid^2 x 1" ) {
result["redundancy_mode"] = "three_datacenter";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 4 && tlogInfo == "dcid^2 x zoneid^2 x 1" && storageInfo == "dcid^2 x zoneid^2 x 1" ) {
result["redundancy_mode"] = "three_datacenter_fallback";
} else if( tLogReplicationFactor == 3 && storageTeamSize == 3 ) {
result["redundancy_mode"] = "triple";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 3 && tlogInfo == "data_hall^2 x zoneid^2 x 1" && storageInfo == "data_hall^3 x 1" ) {
result["redundancy_mode"] = "three_data_hall";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 2 && tlogInfo == "data_hall^2 x zoneid^2 x 1" && storageInfo == "data_hall^2 x 1" ) {
result["redundancy_mode"] = "three_data_hall_fallback";
} else {
customRedundancy = true;
}
if (!initialized) {
return result;
}
std::string tlogInfo = tLogPolicy->info();
std::string storageInfo = storagePolicy->info();
bool customRedundancy = false;
if (tLogWriteAntiQuorum == 0) {
if (tLogReplicationFactor == 1 && storageTeamSize == 1) {
result["redundancy_mode"] = "single";
} else if (tLogReplicationFactor == 2 && storageTeamSize == 2) {
result["redundancy_mode"] = "double";
} else if (tLogReplicationFactor == 4 && storageTeamSize == 6 && tlogInfo == "dcid^2 x zoneid^2 x 1" &&
storageInfo == "dcid^3 x zoneid^2 x 1") {
result["redundancy_mode"] = "three_datacenter";
} else if (tLogReplicationFactor == 4 && storageTeamSize == 4 && tlogInfo == "dcid^2 x zoneid^2 x 1" &&
storageInfo == "dcid^2 x zoneid^2 x 1") {
result["redundancy_mode"] = "three_datacenter_fallback";
} else if (tLogReplicationFactor == 3 && storageTeamSize == 3) {
result["redundancy_mode"] = "triple";
} else if (tLogReplicationFactor == 4 && storageTeamSize == 3 && tlogInfo == "data_hall^2 x zoneid^2 x 1" &&
storageInfo == "data_hall^3 x 1") {
result["redundancy_mode"] = "three_data_hall";
} else if (tLogReplicationFactor == 4 && storageTeamSize == 2 && tlogInfo == "data_hall^2 x zoneid^2 x 1" &&
storageInfo == "data_hall^2 x 1") {
result["redundancy_mode"] = "three_data_hall_fallback";
} else {
customRedundancy = true;
}
if(customRedundancy) {
result["storage_replicas"] = storageTeamSize;
result["log_replicas"] = tLogReplicationFactor;
result["log_anti_quorum"] = tLogWriteAntiQuorum;
if(!noPolicies) result["storage_replication_policy"] = storagePolicy->info();
if(!noPolicies) result["log_replication_policy"] = tLogPolicy->info();
}
if ( tLogVersion > TLogVersion::DEFAULT ) {
result["log_version"] = (int)tLogVersion;
}
if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V1 && storageServerStoreType == KeyValueStoreType::SSD_BTREE_V1) {
result["storage_engine"] = "ssd-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_BTREE_V2) {
result["storage_engine"] = "ssd-2";
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_REDWOOD_V1 ) {
result["storage_engine"] = "ssd-redwood-experimental";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["storage_engine"] = "ssd-rocksdb-experimental";
} else if( tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY ) {
result["storage_engine"] = "memory-1";
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE ) {
result["storage_engine"] = "memory-radixtree-beta";
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY ) {
result["storage_engine"] = "memory-2";
} else {
result["storage_engine"] = "custom";
}
result["log_spill"] = (int)tLogSpillType;
if( remoteTLogReplicationFactor == 1 ) {
result["remote_redundancy_mode"] = "remote_single";
} else if( remoteTLogReplicationFactor == 2 ) {
result["remote_redundancy_mode"] = "remote_double";
} else if( remoteTLogReplicationFactor == 3 ) {
result["remote_redundancy_mode"] = "remote_triple";
} else if( remoteTLogReplicationFactor > 3 ) {
result["remote_log_replicas"] = remoteTLogReplicationFactor;
if(noPolicies && remoteTLogPolicy) result["remote_log_policy"] = remoteTLogPolicy->info();
}
result["usable_regions"] = usableRegions;
if(regions.size()) {
result["regions"] = getRegionJSON();
}
if( desiredTLogCount != -1 ) {
result["logs"] = desiredTLogCount;
}
if( masterProxyCount != -1 ) {
result["proxies"] = masterProxyCount;
}
if( resolverCount != -1 ) {
result["resolvers"] = resolverCount;
}
if( desiredLogRouterCount != -1 ) {
result["log_routers"] = desiredLogRouterCount;
}
if( remoteDesiredTLogCount != -1 ) {
result["remote_logs"] = remoteDesiredTLogCount;
}
if( repopulateRegionAntiQuorum != 0 ) {
result["repopulate_anti_quorum"] = repopulateRegionAntiQuorum;
}
if( autoMasterProxyCount != CLIENT_KNOBS->DEFAULT_AUTO_PROXIES ) {
result["auto_proxies"] = autoMasterProxyCount;
}
if (autoResolverCount != CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS) {
result["auto_resolvers"] = autoResolverCount;
}
if (autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS) {
result["auto_logs"] = autoDesiredTLogCount;
}
result["backup_worker_enabled"] = (int32_t)backupWorkerEnabled;
} else {
customRedundancy = true;
}
if (customRedundancy) {
result["storage_replicas"] = storageTeamSize;
result["log_replicas"] = tLogReplicationFactor;
result["log_anti_quorum"] = tLogWriteAntiQuorum;
if (!noPolicies) result["storage_replication_policy"] = storagePolicy->info();
if (!noPolicies) result["log_replication_policy"] = tLogPolicy->info();
}
if (tLogVersion > TLogVersion::DEFAULT || isOverridden("log_version")) {
result["log_version"] = (int)tLogVersion;
}
if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V1 &&
storageServerStoreType == KeyValueStoreType::SSD_BTREE_V1) {
result["storage_engine"] = "ssd-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_BTREE_V2) {
result["storage_engine"] = "ssd-2";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_REDWOOD_V1) {
result["storage_engine"] = "ssd-redwood-experimental";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["storage_engine"] = "ssd-rocksdb-experimental";
} else if (tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY) {
result["storage_engine"] = "memory-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE) {
result["storage_engine"] = "memory-radixtree-beta";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::MEMORY) {
result["storage_engine"] = "memory-2";
} else {
result["storage_engine"] = "custom";
}
result["log_spill"] = (int)tLogSpillType;
if (remoteTLogReplicationFactor == 1) {
result["remote_redundancy_mode"] = "remote_single";
} else if (remoteTLogReplicationFactor == 2) {
result["remote_redundancy_mode"] = "remote_double";
} else if (remoteTLogReplicationFactor == 3) {
result["remote_redundancy_mode"] = "remote_triple";
} else if (remoteTLogReplicationFactor > 3) {
result["remote_log_replicas"] = remoteTLogReplicationFactor;
if (noPolicies && remoteTLogPolicy) result["remote_log_policy"] = remoteTLogPolicy->info();
}
result["usable_regions"] = usableRegions;
if (regions.size()) {
result["regions"] = getRegionJSON();
}
if (desiredTLogCount != -1 || isOverridden("logs")) {
result["logs"] = desiredTLogCount;
}
if (masterProxyCount != -1 || isOverridden("proxies")) {
result["proxies"] = masterProxyCount;
}
if (resolverCount != -1 || isOverridden("resolvers")) {
result["resolvers"] = resolverCount;
}
if (desiredLogRouterCount != -1 || isOverridden("log_routers")) {
result["log_routers"] = desiredLogRouterCount;
}
if (remoteDesiredTLogCount != -1 || isOverridden("remote_logs")) {
result["remote_logs"] = remoteDesiredTLogCount;
}
if (repopulateRegionAntiQuorum != 0 || isOverridden("repopulate_anti_quorum")) {
result["repopulate_anti_quorum"] = repopulateRegionAntiQuorum;
}
if (autoMasterProxyCount != CLIENT_KNOBS->DEFAULT_AUTO_PROXIES || isOverridden("auto_proxies")) {
result["auto_proxies"] = autoMasterProxyCount;
}
if (autoResolverCount != CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS || isOverridden("auto_resolvers")) {
result["auto_resolvers"] = autoResolverCount;
}
if (autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS || isOverridden("auto_logs")) {
result["auto_logs"] = autoDesiredTLogCount;
}
result["backup_worker_enabled"] = (int32_t)backupWorkerEnabled;
return result;
}
@ -540,3 +552,31 @@ void DatabaseConfiguration::makeConfigurationImmutable() {
rawConfiguration[i++] = KeyValueRef( rawConfiguration.arena(), KeyValueRef( r->first, r->second ) );
mutableConfiguration = Optional<std::map<std::string,std::string>>();
}
void DatabaseConfiguration::fromKeyValues(Standalone<VectorRef<KeyValueRef>> rawConfig) {
resetInternal();
this->rawConfiguration = rawConfig;
for (auto c = rawConfiguration.begin(); c != rawConfiguration.end(); ++c) {
setInternal(c->key, c->value);
}
setDefaultReplicationPolicy();
}
bool DatabaseConfiguration::isOverridden(std::string key) const {
key = configKeysPrefix.toString() + key;
if (mutableConfiguration.present()) {
return mutableConfiguration.get().find(key) != mutableConfiguration.get().end();
}
const int keyLen = key.size();
for (auto iter = rawConfiguration.begin(); iter != rawConfiguration.end(); ++iter) {
const auto& rawConfKey = iter->key;
if (keyLen == rawConfKey.size() &&
strncmp(key.c_str(), reinterpret_cast<const char*>(rawConfKey.begin()), keyLen) == 0) {
return true;
}
}
return false;
}


@ -219,13 +219,7 @@ struct DatabaseConfiguration {
}
}
void fromKeyValues( Standalone<VectorRef<KeyValueRef>> rawConfig ) {
resetInternal();
this->rawConfiguration = rawConfig;
for(auto c=rawConfiguration.begin(); c!=rawConfiguration.end(); ++c)
setInternal(c->key, c->value);
setDefaultReplicationPolicy();
}
void fromKeyValues(Standalone<VectorRef<KeyValueRef>> rawConfig);
private:
Optional< std::map<std::string, std::string> > mutableConfiguration; // If present, rawConfiguration is not valid
@ -237,6 +231,9 @@ private:
bool setInternal( KeyRef key, ValueRef value );
void resetInternal();
void setDefaultReplicationPolicy();
/// Check if the key is overridden by either mutableConfiguration or rawConfiguration
bool isOverridden(std::string key) const;
};
#endif


@ -53,6 +53,7 @@ struct RestoreSendVersionedMutationsRequest;
struct RestoreSysInfo;
struct RestoreApplierInterface;
struct RestoreFinishRequest;
struct RestoreSamplesRequest;
// RestoreSysInfo includes information each (type of) restore role should know.
// At this moment, it only includes appliers. We keep the name for future extension.
@ -203,6 +204,31 @@ struct RestoreApplierInterface : RestoreRoleInterface {
std::string toString() { return nodeID.toString(); }
};
struct RestoreControllerInterface : RestoreRoleInterface {
constexpr static FileIdentifier file_identifier = 54253047;
RequestStream<RestoreSamplesRequest> samples;
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
RestoreControllerInterface() {
role = RestoreRole::Controller;
nodeID = deterministicRandom()->randomUniqueID();
}
NetworkAddress address() const { return samples.getEndpoint().addresses.address; }
void initEndpoints() { samples.getEndpoint(TaskPriority::LoadBalancedEndpoint); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreRoleInterface*)this, samples);
}
std::string toString() { return nodeID.toString(); }
};
// RestoreAsset uniquely identifies the work unit done by restore roles;
// It is used to ensure exactly-once processing on restore loader and applier;
// By combining all RestoreAssets across all version batches, restore should process all mutations in
@ -361,22 +387,25 @@ struct RestoreRecruitRoleReply : TimedRequest {
struct RestoreRecruitRoleRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 3136280;
RestoreControllerInterface ci;
RestoreRole role;
int nodeIndex; // Each role is a node
ReplyPromise<RestoreRecruitRoleReply> reply;
RestoreRecruitRoleRequest() : role(RestoreRole::Invalid) {}
explicit RestoreRecruitRoleRequest(RestoreRole role, int nodeIndex) : role(role), nodeIndex(nodeIndex) {}
explicit RestoreRecruitRoleRequest(RestoreControllerInterface ci, RestoreRole role, int nodeIndex)
: ci(ci), role(role), nodeIndex(nodeIndex) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, role, nodeIndex, reply);
serializer(ar, ci, role, nodeIndex, reply);
}
std::string printable() {
std::stringstream ss;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex
<< " RestoreController:" << ci.id().toString();
return ss.str();
}
@ -410,26 +439,47 @@ struct RestoreSysInfoRequest : TimedRequest {
}
};
struct RestoreLoadFileReply : TimedRequest {
constexpr static FileIdentifier file_identifier = 523470;
struct RestoreSamplesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 34077901;
UID id; // deduplicate data
int batchIndex;
SampledMutationsVec samples; // sampled mutations
LoadingParam param;
MutationsVec samples; // sampled mutations
bool isDuplicated; // true if loader thinks the request is a duplicated one
ReplyPromise<RestoreCommonReply> reply;
RestoreLoadFileReply() = default;
explicit RestoreLoadFileReply(LoadingParam param, MutationsVec samples, bool isDuplicated)
: param(param), samples(samples), isDuplicated(isDuplicated) {}
RestoreSamplesRequest() = default;
explicit RestoreSamplesRequest(UID id, int batchIndex, SampledMutationsVec samples)
: id(id), batchIndex(batchIndex), samples(samples) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, samples, isDuplicated);
serializer(ar, id, batchIndex, samples, reply);
}
std::string toString() {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " samples.size:" << samples.size()
<< " isDuplicated:" << isDuplicated;
ss << "ID:" << id.toString() << " BatchIndex:" << batchIndex << " samples:" << samples.size();
return ss.str();
}
};
struct RestoreLoadFileReply : TimedRequest {
constexpr static FileIdentifier file_identifier = 523470;
LoadingParam param;
bool isDuplicated; // true if loader thinks the request is a duplicated one
RestoreLoadFileReply() = default;
explicit RestoreLoadFileReply(LoadingParam param, bool isDuplicated) : param(param), isDuplicated(isDuplicated) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, isDuplicated);
}
std::string toString() {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " isDuplicated:" << isDuplicated;
return ss.str();
}
};


@ -611,7 +611,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_NUM_LOADERS, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 2.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
@ -638,6 +638,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_NOT_WRITE_DB, false ); // Perf test only: set it to true will cause simulation failure
init( FASTRESTORE_USE_RANGE_FILE, true ); // Perf test only: set it to false will cause simulation failure
init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: set it to false will cause simulation failure
init( FASTRESTORE_SAMPLE_MSG_BYTES, 1048576 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLE_MSG_BYTES = deterministicRandom()->random01() * 2048;}
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );


@ -570,6 +570,7 @@ public:
bool FASTRESTORE_NOT_WRITE_DB; // do not write result to DB. Only for dev testing
bool FASTRESTORE_USE_RANGE_FILE; // use range file in backup
bool FASTRESTORE_USE_LOG_FILE; // use log file in backup
int64_t FASTRESTORE_SAMPLE_MSG_BYTES; // sample message desired size
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.


@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
state int retries = 0;
state double numOps = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent("FastRestoreApplierClearRangeMutationsStart", applierID)
TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("Ranges", ranges.size())
.detail("DelayTime", delayTime);
@ -558,7 +558,10 @@ ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Referen
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex);
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("AppliedBytes", batchData->appliedBytes)
.detail("ReceivedBytes", batchData->receivedBytes);
return Void();
}


@ -253,8 +253,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
long receiveMutationReqs;
// Stats
double receivedBytes;
double appliedBytes;
long receivedBytes;
long appliedBytes;
// Status counters
struct Counters {
@ -281,7 +281,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
explicit ApplierBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
vbState(ApplierVersionBatchState::NOT_INIT) {
vbState(ApplierVersionBatchState::NOT_INIT), receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
pollMetrics = traceCounters(format("FastRestoreApplierMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));


@ -73,9 +73,43 @@ ACTOR static Future<Void> checkRolesLiveness(Reference<RestoreControllerData> se
void splitKeyRangeForAppliers(Reference<ControllerBatchData> batchData,
std::map<UID, RestoreApplierInterface> appliersInterf, int batchIndex);
ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreControllerInterface ci) {
loop {
try {
RestoreSamplesRequest req = waitNext(ci.samples.getFuture());
TraceEvent(SevDebug, "FastRestoreControllerSampleBackups")
.detail("SampleID", req.id)
.detail("BatchIndex", req.batchIndex)
.detail("Samples", req.samples.size());
ASSERT(req.batchIndex < self->batch.size());
Reference<ControllerBatchData> batch = self->batch[req.batchIndex];
if (batch->sampleMsgs.find(req.id) != batch->sampleMsgs.end()) {
req.reply.send(RestoreCommonReply(req.id));
continue;
}
batch->sampleMsgs.insert(req.id);
for (auto& m : req.samples) {
batch->samples.addMetric(m.key, m.size);
batch->samplesSize += m.size;
}
req.reply.send(RestoreCommonReply(req.id));
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestoreControllerSampleBackupsError", self->id()).error(e);
break;
}
}
return Void();
}
ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controllerWorker, Database cx) {
state Reference<RestoreControllerData> self = Reference<RestoreControllerData>(new RestoreControllerData());
state ActorCollectionNoErrors actors;
state ActorCollection actors(false);
ASSERT(controllerWorker.isValid());
ASSERT(controllerWorker->controllerInterf.present());
state Reference<RestoreControllerData> self =
Reference<RestoreControllerData>(new RestoreControllerData(controllerWorker->controllerInterf.get().id()));
try {
// recruitRestoreRoles must come after controllerWorker has finished collectWorkerInterface
@ -85,6 +119,7 @@ ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controlle
actors.add(checkRolesLiveness(self));
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreController"));
actors.add(sampleBackups(self, controllerWorker->controllerInterf.get()));
wait(startProcessRestoreRequests(self, cx));
} catch (Error& e) {
@ -107,6 +142,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
.detail("NumLoaders", SERVER_KNOBS->FASTRESTORE_NUM_LOADERS)
.detail("NumAppliers", SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
ASSERT(controllerData->loadersInterf.empty() && controllerData->appliersInterf.empty());
ASSERT(controllerWorker->controllerInterf.present());
ASSERT(controllerData.isValid());
ASSERT(SERVER_KNOBS->FASTRESTORE_NUM_LOADERS > 0 && SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS > 0);
@ -129,7 +165,8 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
}
TraceEvent("FastRestoreController", controllerData->id()).detail("WorkerNode", workerInterf.first);
requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex));
requests.emplace_back(workerInterf.first,
RestoreRecruitRoleRequest(controllerWorker->controllerInterf.get(), role, nodeIndex));
nodeIndex++;
}
@ -146,6 +183,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
TraceEvent(SevError, "FastRestoreController").detail("RecruitRestoreRolesInvalidRole", reply.role);
}
}
controllerData->recruitedRoles.send(Void());
TraceEvent("FastRestoreRecruitRestoreRolesDone", controllerData->id())
.detail("Workers", controllerWorker->workerInterfaces.size())
.detail("RecruitedRoles", replies.size());
@ -229,13 +267,13 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreControllerData>
} catch (Error& e) {
if (restoreIndex < restoreRequests.size()) {
TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequest", restoreRequests[restoreIndex].toString())
.error(e);
.error(e)
.detail("RestoreRequest", restoreRequests[restoreIndex].toString());
} else {
TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
.error(e)
.detail("RestoreRequests", restoreRequests.size())
.detail("RestoreIndex", restoreIndex)
.error(e);
.detail("RestoreIndex", restoreIndex);
}
}
@ -270,6 +308,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
state Version targetVersion =
wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, &minRangeVersion, cx, request));
ASSERT(targetVersion > 0);
ASSERT(minRangeVersion != MAX_VERSION); // otherwise, all mutations will be skipped
std::sort(rangeFiles.begin(), rangeFiles.end());
std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool {
@ -453,12 +492,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<ControllerBatchData> batc
.detail("RestoreAsset", reply.param.asset.toString())
.detail("UnexpectedReply", reply.toString());
}
// Update sampled data
for (int i = 0; i < reply.samples.size(); ++i) {
MutationRef mutation = reply.samples[i];
batchData->samples.addMetric(mutation.param1, mutation.weightedTotalSize());
batchData->samplesSize += mutation.weightedTotalSize();
}
}
// Sanity check: all restore assets status should be Loaded
@ -737,6 +770,9 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
*minRangeVersion = std::min(*minRangeVersion, file.version);
}
}
if (MAX_VERSION == *minRangeVersion) {
*minRangeVersion = 0; // If no range file, range version must be 0 so that we apply all mutations
}
if (SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE) {
for (const LogFile& f : restorable.get().logs) {
@ -1007,6 +1043,8 @@ ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreControllerData
// Update the most recent time when controller receives heartbeat from each loader and applier
ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> self) {
wait(self->recruitedRoles.getFuture());
int numRoles = self->loadersInterf.size() + self->appliersInterf.size();
state std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
state std::map<UID, RestoreApplierInterface>::iterator applier = self->appliersInterf.begin();


@ -74,9 +74,11 @@ struct ControllerBatchData : public ReferenceCounted<ControllerBatchData> {
// sent.
// KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Key, UID> rangeToApplier;
Optional<Future<Void>> applyToDB;
IndexedSet<Key, int64_t> samples; // sample of range and log files
double samplesSize; // sum of the metric of all samples
Optional<Future<Void>> applyToDB;
std::set<UID> sampleMsgs; // deduplicate sample messages
ControllerBatchData() = default;
~ControllerBatchData() = default;
@ -150,9 +152,9 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
RestoreControllerData() {
RestoreControllerData(UID interfId) {
role = RestoreRole::Controller;
nodeID = UID();
nodeID = interfId;
runningVersionBatches.set(0);
}


@ -27,6 +27,7 @@
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/StorageMetrics.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -42,7 +43,7 @@ void splitMutation(const KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
@ -57,13 +58,14 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
Version version, RestoreAsset asset);
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, Version version, RestoreAsset asset);
ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx,
RestoreControllerInterface ci) {
state Reference<RestoreLoaderData> self =
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex));
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex, ci));
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
@ -114,7 +116,8 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestoreLoaderError", self->id())
TraceEvent(e.code() == error_code_broken_promise ? SevError : SevWarnAlways, "FastRestoreLoaderError",
self->id())
.detail("RequestType", requestTypeStr)
.error(e, true);
actors.clear(false);
@ -126,11 +129,13 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
static inline bool _logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, KeyRangeRef keyRange, Version v) {
ASSERT(pRangeVersions != nullptr);
auto ranges = pRangeVersions->intersectingRanges(keyRange);
Version minVersion = MAX_VERSION;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
minVersion = std::min(minVersion, r->value());
}
ASSERT(minVersion != MAX_VERSION); // pRangeVersions is initialized as entired keyspace, ranges cannot be empty
return minVersion >= v;
}
@ -178,8 +183,8 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
RestoreAsset asset) {
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, RestoreAsset asset) {
state Standalone<StringRef> buf = makeString(asset.len);
state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
@ -263,9 +268,13 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
.detail("CommitVersion", msgVersion.toString())
.detail("ParsedMutation", mutation.toString());
it->second.push_back_deep(it->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
samplesIter->second.push_back_deep(samplesIter->second.arena(), mutation);
cc->loadedLogBytes += mutation.totalSize();
// Sampling data similar to SS sample kvs
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samplesIter->second.push_back_deep(samplesIter->second.arena(),
SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
}
@ -295,7 +304,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = batchData->kvOpsPerLP.end();
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
state std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestoreLoaderProcessLoadingParam", loaderID).detail("LoadingParam", param.toString());
@ -307,7 +316,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
bool inserted;
std::tie(kvOpsPerLPIter, inserted) = batchData->kvOpsPerLP.emplace(param, VersionedMutationsMap());
ASSERT(inserted);
std::tie(samplesIter, inserted) = batchData->sampleMutations.emplace(param, MutationsVec());
std::tie(samplesIter, inserted) = batchData->sampleMutations.emplace(param, SampledMutationsVec());
ASSERT(inserted);
for (int64_t j = param.asset.offset; j < param.asset.len; j += param.blockSize) {
@ -381,7 +390,41 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ASSERT(it != batchData->processedFileParams.end());
wait(it->second); // wait on the processing of the req.param.
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
// Send sampled mutations back to controller: batchData->sampleMutations[req.param]
std::vector<Future<RestoreCommonReply>> fSendSamples;
SampledMutationsVec& samples = batchData->sampleMutations[req.param];
SampledMutationsVec sampleBatch = SampledMutationsVec(); // sampleBatch: Standalone pointer to the created object
long sampleBatchSize = 0;
for (int i = 0; i < samples.size(); ++i) {
sampleBatchSize += samples[i].totalSize();
sampleBatch.push_back_deep(sampleBatch.arena(), samples[i]); // TODO: may not need deep copy
if (sampleBatchSize >= SERVER_KNOBS->FASTRESTORE_SAMPLE_MSG_BYTES) {
fSendSamples.push_back(self->ci.samples.getReply(
RestoreSamplesRequest(deterministicRandom()->randomUniqueID(), req.batchIndex, sampleBatch)));
sampleBatchSize = 0;
sampleBatch = SampledMutationsVec();
}
}
if (sampleBatchSize > 0) {
fSendSamples.push_back(self->ci.samples.getReply(
RestoreSamplesRequest(deterministicRandom()->randomUniqueID(), req.batchIndex, sampleBatch)));
sampleBatchSize = 0;
}
try {
state int samplesMessages = fSendSamples.size();
wait(waitForAll(fSendSamples));
} catch (Error& e) { // In case ci.samples throws broken_promise due to unstable network
if (e.code() == error_code_broken_promise) {
TraceEvent(SevWarnAlways, "FastRestoreLoaderPhaseLoadFileSendSamples")
.detail("SamplesMessages", samplesMessages);
} else {
TraceEvent(SevError, "FastRestoreLoaderPhaseLoadFileSendSamplesUnexpectedError").error(e, true);
}
}
// Ack restore controller the param is processed
req.reply.send(RestoreLoadFileReply(req.param, isDuplicated));
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString());
@ -729,10 +772,10 @@ bool concatenateBackupMutationForLogFile(SerializedMutationListMap* pMutationMap
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
MutationsVec& samples = samplesIter->second;
SampledMutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
TraceEvent(SevFRMutationInfo, "FastRestoreLoaderParseSerializedLogMutation")
@ -812,10 +855,11 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledLogBytes += mutation.totalSize();
samples.push_back_deep(samples.arena(), mutation);
// Sample data the same way the storage server samples bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples.push_back_deep(samples.arena(), SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
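// --- Illustrative sketch, not part of this patch ---
// isKeyValueInSample() mirrors the storage server's byte sampling: a key-value
// is included with probability that grows with its size, and sampledSize is the
// actual size scaled up by 1/probability, so summing sampledSize over the
// sample gives an unbiased estimate of the total bytes. A simplified randomized
// stand-in with the same estimator property (bytesPerSample is a hypothetical knob):
struct SampleDecision {
	bool inSample;
	int64_t sampledSize;
};
SampleDecision sampleBySize(int64_t size, double bytesPerSample) {
	double probability = std::min(1.0, size / bytesPerSample);
	bool inSample = deterministicRandom()->random01() < probability;
	// If inSample is true, probability > 0, so the division below is safe.
	return SampleDecision{ inSample, inSample ? (int64_t)(size / probability) : 0 };
}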
@ -831,10 +875,10 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
// asset: RestoreAsset about which backup data should be parsed
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
Version version, RestoreAsset asset) {
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, Version version, RestoreAsset asset) {
state VersionedMutationsMap& kvOps = kvOpsIter->second;
state MutationsVec& sampleMutations = samplesIter->second;
state SampledMutationsVec& sampleMutations = samplesIter->second;
TraceEvent(SevFRDebugInfo, "FastRestoreDecodedRangeFile")
.detail("Filename", asset.filename)
@ -912,9 +956,10 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
it.first->second.push_back_deep(it.first->second.arena(), m);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledRangeBytes += m.totalSize();
sampleMutations.push_back_deep(sampleMutations.arena(), m);
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(m.param1, m.param2));
if (sampleInfo.inSample) {
cc->sampledRangeBytes += sampleInfo.sampledSize;
sampleMutations.push_back_deep(sampleMutations.arena(), SampledMutation(m.param1, sampleInfo.sampledSize));
}
}


@ -70,7 +70,7 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
std::map<Key, UID> rangeToApplier;
// Sampled mutations to be sent back to restore controller
std::map<LoadingParam, MutationsVec> sampleMutations;
std::map<LoadingParam, SampledMutationsVec> sampleMutations;
int numSampledMutations; // The total number of mutations received from sampled data.
Future<Void> pollMetrics;
@ -132,6 +132,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
// buffered data per version batch
std::map<int, Reference<LoaderBatchData>> batch;
std::map<int, Reference<LoaderBatchStatus>> status;
RestoreControllerInterface ci;
KeyRangeMap<Version> rangeVersions;
@ -141,7 +142,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
void addref() { return ReferenceCounted<RestoreLoaderData>::addref(); }
void delref() { return ReferenceCounted<RestoreLoaderData>::delref(); }
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex) {
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex, RestoreControllerInterface ci) : ci(ci) {
nodeID = loaderInterfID;
nodeIndex = assignedIndex;
role = RestoreRole::Loader;
@ -191,7 +192,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
}
};
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx,
RestoreControllerInterface ci);
#include "flow/unactorcompiler.h"
#endif


@ -100,6 +100,7 @@ public:
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
Promise<Void> recruitedRoles; // sent when loaders and appliers are recruited
NotifiedVersion versionBatchId; // Index of the version batch that has been initialized and put into the pipeline
NotifiedVersion finishedBatch; // Highest batch index for which all appliers have applied their mutations
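// --- Illustrative note, not part of this patch ---
// NotifiedVersion lets actors block until a monotonically increasing value
// reaches a threshold, which is how version batches are kept in pipeline
// order, e.g. (sketch):
//   wait(self->versionBatchId.whenAtLeast(batchIndex - 1)); // wait for the prior batch
//   self->versionBatchId.set(batchIndex); // unblock the next batch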


@ -58,9 +58,26 @@ struct VersionedMutation {
}
};
struct SampledMutation {
KeyRef key;
long size;
explicit SampledMutation(KeyRef key, long size) : key(key), size(size) {}
explicit SampledMutation(Arena& arena, const SampledMutation& sm) : key(arena, sm.key), size(sm.size) {}
SampledMutation() = default;
int totalSize() { return key.size() + sizeof(size); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, size);
}
};
using MutationsVec = Standalone<VectorRef<MutationRef>>;
using LogMessageVersionVec = Standalone<VectorRef<LogMessageVersion>>;
using VersionedMutationsVec = Standalone<VectorRef<VersionedMutation>>;
using SampledMutationsVec = Standalone<VectorRef<SampledMutation>>;
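// --- Illustrative sketch, not part of this patch ---
// The (Arena&, const SampledMutation&) constructor above is the hook that
// VectorRef<T>::push_back_deep() uses to copy the key bytes into the vector's
// own arena, so a SampledMutationsVec remains valid after the buffer backing
// the original mutation is freed. A minimal usage sketch:
inline SampledMutationsVec sketchDeepCopy(KeyRef key, long sampledSize) {
	SampledMutationsVec samples;
	samples.push_back_deep(samples.arena(), SampledMutation(key, sampledSize));
	return samples; // samples[0].key points into samples.arena(), not into key's buffer
}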
enum class RestoreRole { Invalid = 0, Controller = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);


@ -88,6 +88,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
if (req.role == RestoreRole::Loader) {
ASSERT(!self->loaderInterf.present());
self->controllerInterf = req.ci;
self->loaderInterf = RestoreLoaderInterface();
self->loaderInterf.get().initEndpoints();
RestoreLoaderInterface& recruited = self->loaderInterf.get();
@ -100,12 +101,13 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.finishVersionBatch);
DUMPTOKEN(recruited.collectRestoreRoleInterfaces);
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx));
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx, req.ci));
TraceEvent("FastRestoreWorker").detail("RecruitedLoaderNodeIndex", req.nodeIndex);
req.reply.send(
RestoreRecruitRoleReply(self->loaderInterf.get().id(), RestoreRole::Loader, self->loaderInterf.get()));
} else if (req.role == RestoreRole::Applier) {
ASSERT(!self->applierInterf.present());
self->controllerInterf = req.ci;
self->applierInterf = RestoreApplierInterface();
self->applierInterf.get().initEndpoints();
RestoreApplierInterface& recruited = self->applierInterf.get();
@ -202,6 +204,10 @@ ACTOR Future<Void> startRestoreWorkerLeader(Reference<RestoreWorkerData> self, R
// TODO: Needs to keep this monitor's future. May use actorCollection
state Future<Void> workersFailureMonitor = monitorWorkerLiveness(self);
RestoreControllerInterface recruited;
DUMPTOKEN(recruited.samples);
self->controllerInterf = recruited;
wait(startRestoreController(self, cx) || workersFailureMonitor);
return Void();


@ -49,6 +49,7 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
std::map<UID, RestoreWorkerInterface> workerInterfaces; // key: worker's node id; value: the worker's communication interface
// Restore Roles
Optional<RestoreControllerInterface> controllerInterf;
Optional<RestoreLoaderInterface> loaderInterf;
Optional<RestoreApplierInterface> applierInterf;


@ -79,4 +79,41 @@ TEST_CASE("/flow/Deque/max_size") {
return Void();
}
struct RandomlyThrows {
int data = 0;
RandomlyThrows() = default;
explicit RandomlyThrows(int data) : data(data) {}
~RandomlyThrows() = default;
RandomlyThrows(const RandomlyThrows& other) : data(other.data) { randomlyThrow(); }
RandomlyThrows& operator=(const RandomlyThrows& other) {
data = other.data;
randomlyThrow();
return *this;
}
private:
void randomlyThrow() {
if (deterministicRandom()->random01() < 0.1) {
throw success(); // an arbitrary, benign Error used to simulate a throwing copy
}
}
};
TEST_CASE("/flow/Deque/grow_exception_safety") {
Deque<RandomlyThrows> q;
for (int i = 0; i < 100; ++i) {
loop {
try {
q.push_back(RandomlyThrows{ i });
break;
} catch (Error& e) { // swallow the injected error and retry the push
}
}
}
for (int i = 0; i < 100; ++i) {
ASSERT(q[i].data == i);
}
return Void();
}
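// --- Illustrative note, not part of this patch ---
// Declaring a copy constructor suppresses RandomlyThrows's implicit move
// constructor, so std::move_if_noexcept in Deque::grow() falls back to the
// throwing copy, which is exactly what drives grow() down its cleanup path.
// The selection rule, sketched as a trait:
template <class T>
constexpr bool movedOnGrow =
    std::is_nothrow_move_constructible_v<T> || !std::is_copy_constructible_v<T>;
static_assert(!movedOnGrow<RandomlyThrows>); // copied, and the copy may throw
static_assert(movedOnGrow<int>); // trivially movable types are still moved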
void forceLinkDequeTests() {}


@ -181,10 +181,18 @@ private:
if (newSize > max_size()) throw std::bad_alloc();
//printf("Growing to %lld (%u-%u mask %u)\n", (long long)newSize, begin, end, mask);
T* newArr = (T*)aligned_alloc(std::max(__alignof(T), sizeof(void*)),
newSize * sizeof(T)); // SOMEDAY: FastAllocator, exception safety
newSize * sizeof(T)); // SOMEDAY: FastAllocator
ASSERT(newArr != nullptr);
for (int i = begin; i != end; i++) {
new (&newArr[i - begin]) T(std::move(arr[i&mask]));
try {
new (&newArr[i - begin]) T(std::move_if_noexcept(arr[i & mask]));
} catch (...) {
cleanup(newArr, i-begin);
throw;
}
}
for (int i = begin; i != end; i++) {
static_assert(std::is_nothrow_destructible_v<T>);
arr[i&mask].~T();
}
aligned_free(arr);
@ -194,7 +202,14 @@ private:
mask = uint32_t(newSize - 1);
}
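// --- Illustrative sketch, not part of this patch ---
// The relocation loop in grow() follows the standard strong-exception-safety
// pattern for growing a buffer: move elements when the move constructor is
// noexcept, copy otherwise, and if a copy throws, destroy only the elements
// already constructed in the new buffer before rethrowing, leaving the old
// buffer fully intact. In isolation:
template <class T>
void relocate(T* dst, T* src, size_t n) {
	size_t constructed = 0;
	try {
		for (; constructed < n; ++constructed)
			new (&dst[constructed]) T(std::move_if_noexcept(src[constructed]));
	} catch (...) {
		while (constructed > 0)
			dst[--constructed].~T();
		throw; // src is untouched, so the container is still in its old, valid state
	}
}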
void cleanup() {
static void cleanup(T *data, size_t size) noexcept {
for (size_t i = 0; i < size; ++i) {
data[i].~T();
}
aligned_free(data);
}
void cleanup() noexcept {
for (int i = begin; i != end; i++)
arr[i&mask].~T();
if(arr)