Merge branch 'release-6.2' into mengxu/merge-release620-to-master-v3

Resolved Conflicts:
	documentation/sphinx/source/release-notes.rst
	fdbserver/DataDistribution.actor.cpp
	versions.target
This commit is contained in:
Meng Xu 2019-10-02 13:22:56 -07:00
commit d0147e5e5d
35 changed files with 613 additions and 249 deletions

View File

@ -107,12 +107,10 @@ fdb_error_t fdb_network_set_option( FDBNetworkOption option,
API->setNetworkOption( (FDBNetworkOptions::Option)option, value ? StringRef( value, value_length ) : Optional<StringRef>() ); );
}
extern "C"
fdb_error_t fdb_setup_network_impl() {
CATCH_AND_RETURN( API->setupNetwork(); );
}
extern "C"
fdb_error_t fdb_setup_network_v13( const char* localAddress ) {
fdb_error_t errorCode = fdb_network_set_option( FDB_NET_OPTION_LOCAL_ADDRESS, (uint8_t const*)localAddress, strlen(localAddress) );
if(errorCode != 0)
@ -159,7 +157,6 @@ fdb_error_t fdb_future_block_until_ready( FDBFuture* f ) {
CATCH_AND_RETURN( TSAVB(f)->blockUntilReady(); );
}
extern "C" DLLEXPORT
fdb_bool_t fdb_future_is_error_v22( FDBFuture* f ) {
return TSAVB(f)->isError();
}
@ -200,12 +197,10 @@ fdb_error_t fdb_future_set_callback( FDBFuture* f,
CATCH_AND_RETURN( TSAVB(f)->callOrSetAsCallback( cb, ignore, 0 ); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_error_impl( FDBFuture* f ) {
return TSAVB(f)->getErrorCode();
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_error_v22( FDBFuture* f, const char** description ) {
if ( !( TSAVB(f)->isError() ) )
return error_code_future_not_error;
@ -214,7 +209,6 @@ fdb_error_t fdb_future_get_error_v22( FDBFuture* f, const char** description ) {
return TSAVB(f)->error.code();
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_version_v619( FDBFuture* f, int64_t* out_version ) {
CATCH_AND_RETURN( *out_version = TSAV(Version, f)->get(); );
}
@ -233,14 +227,12 @@ fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
*out_key_length = key.size(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_cluster_v609( FDBFuture* f, FDBCluster** out_cluster ) {
CATCH_AND_RETURN(
*out_cluster = (FDBCluster*)
( (TSAV( char*, f )->get() ) ); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_database_v609( FDBFuture* f, FDBDatabase** out_database ) {
CATCH_AND_RETURN(
*out_database = (FDBDatabase*)
@ -259,7 +251,6 @@ fdb_error_t fdb_future_get_value( FDBFuture* f, fdb_bool_t* out_present,
} );
}
extern "C"
fdb_error_t fdb_future_get_keyvalue_array_impl(
FDBFuture* f, FDBKeyValue const** out_kv,
int* out_count, fdb_bool_t* out_more )
@ -271,7 +262,6 @@ fdb_error_t fdb_future_get_keyvalue_array_impl(
*out_more = rrr.more; );
}
extern "C"
fdb_error_t fdb_future_get_keyvalue_array_v13(
FDBFuture* f, FDBKeyValue const** out_kv, int* out_count)
{
@ -281,7 +271,7 @@ fdb_error_t fdb_future_get_keyvalue_array_v13(
*out_count = rrr.size(); );
}
extern "C"
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_string_array(
FDBFuture* f, const char*** out_strings, int* out_count)
{
@ -292,7 +282,6 @@ fdb_error_t fdb_future_get_string_array(
);
}
extern "C" DLLEXPORT
FDBFuture* fdb_create_cluster_v609( const char* cluster_file_path ) {
char *path;
if(cluster_file_path) {
@ -306,7 +295,6 @@ FDBFuture* fdb_create_cluster_v609( const char* cluster_file_path ) {
return (FDBFuture*)ThreadFuture<char*>(path).extractPtr();
}
extern "C" DLLEXPORT
fdb_error_t fdb_cluster_set_option_v609( FDBCluster* c,
FDBClusterOption option,
uint8_t const* value,
@ -316,12 +304,19 @@ fdb_error_t fdb_cluster_set_option_v609( FDBCluster* c,
return error_code_success;
}
extern "C" DLLEXPORT
void fdb_cluster_destroy_v609( FDBCluster* c ) {
CATCH_AND_DIE( delete[] CLUSTER(c); );
}
extern "C" DLLEXPORT
// This exists so that fdb_cluster_create_database doesn't need to call the public symbol fdb_create_database.
// If it does and this is an external client loaded though the multi-version API, then it may inadvertently call
// the version of the function in the primary library if it was loaded into the global symbols.
fdb_error_t fdb_create_database_impl( const char* cluster_file_path, FDBDatabase** out_database ) {
CATCH_AND_RETURN(
*out_database = (FDBDatabase*)API->createDatabase( cluster_file_path ? cluster_file_path : "" ).extractPtr();
);
}
FDBFuture* fdb_cluster_create_database_v609( FDBCluster* c, uint8_t const* db_name,
int db_name_length )
{
@ -330,7 +325,7 @@ FDBFuture* fdb_cluster_create_database_v609( FDBCluster* c, uint8_t const* db_na
}
FDBDatabase *db;
fdb_error_t err = fdb_create_database(CLUSTER(c), &db);
fdb_error_t err = fdb_create_database_impl(CLUSTER(c), &db);
if(err) {
return (FDBFuture*)ThreadFuture<Reference<IDatabase>>(Error(err)).extractPtr();
}
@ -340,9 +335,7 @@ FDBFuture* fdb_cluster_create_database_v609( FDBCluster* c, uint8_t const* db_na
extern "C" DLLEXPORT
fdb_error_t fdb_create_database( const char* cluster_file_path, FDBDatabase** out_database ) {
CATCH_AND_RETURN(
*out_database = (FDBDatabase*)API->createDatabase( cluster_file_path ? cluster_file_path : "" ).extractPtr();
);
return fdb_create_database_impl( cluster_file_path, out_database );
}
extern "C" DLLEXPORT
@ -394,21 +387,18 @@ FDBFuture* fdb_transaction_get_read_version( FDBTransaction* tr ) {
return (FDBFuture*)( TXN(tr)->getReadVersion().extractPtr() );
}
extern "C"
FDBFuture* fdb_transaction_get_impl( FDBTransaction* tr, uint8_t const* key_name,
int key_name_length, fdb_bool_t snapshot ) {
return (FDBFuture*)
( TXN(tr)->get( KeyRef( key_name, key_name_length ), snapshot ).extractPtr() );
}
extern "C"
FDBFuture* fdb_transaction_get_v13( FDBTransaction* tr, uint8_t const* key_name,
int key_name_length )
{
return fdb_transaction_get_impl( tr, key_name, key_name_length, 0 );
}
extern "C"
FDBFuture* fdb_transaction_get_key_impl( FDBTransaction* tr, uint8_t const* key_name,
int key_name_length, fdb_bool_t or_equal,
int offset, fdb_bool_t snapshot ) {
@ -419,7 +409,6 @@ FDBFuture* fdb_transaction_get_key_impl( FDBTransaction* tr, uint8_t const* key_
snapshot ).extractPtr() );
}
extern "C"
FDBFuture* fdb_transaction_get_key_v13( FDBTransaction* tr, uint8_t const* key_name,
int key_name_length, fdb_bool_t or_equal,
int offset ) {
@ -427,14 +416,13 @@ FDBFuture* fdb_transaction_get_key_v13( FDBTransaction* tr, uint8_t const* key_n
or_equal, offset, false );
}
extern "C"
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_addresses_for_key( FDBTransaction* tr, uint8_t const* key_name,
int key_name_length ){
return (FDBFuture*)( TXN(tr)->getAddressesForKey( KeyRef(key_name, key_name_length) ).extractPtr() );
}
extern "C"
FDBFuture* fdb_transaction_get_range_impl(
FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, fdb_bool_t begin_or_equal, int begin_offset,
@ -505,7 +493,6 @@ FDBFuture* fdb_transaction_get_range_impl(
snapshot, reverse ).extractPtr() );
}
extern "C"
FDBFuture* fdb_transaction_get_range_selector_v13(
FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length,
fdb_bool_t begin_or_equal, int begin_offset, uint8_t const* end_key_name,
@ -517,7 +504,6 @@ FDBFuture* fdb_transaction_get_range_selector_v13(
limit, 0, FDB_STREAMING_MODE_EXACT, 0, false, false);
}
extern "C"
FDBFuture* fdb_transaction_get_range_v13(
FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length,
uint8_t const* end_key_name, int end_key_name_length, int limit )
@ -600,7 +586,6 @@ FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr )
return (FDBFuture*)(TXN(tr)->getVersionstamp().extractPtr());
}
extern "C"
fdb_error_t fdb_transaction_set_option_impl( FDBTransaction* tr,
FDBTransactionOption option,
uint8_t const* value,
@ -610,7 +595,6 @@ fdb_error_t fdb_transaction_set_option_impl( FDBTransaction* tr,
TXN(tr)->setOption( (FDBTransactionOptions::Option)option, value ? StringRef( value, value_length ) : Optional<StringRef>() ); );
}
extern "C"
void fdb_transaction_set_option_v13( FDBTransaction* tr,
FDBTransactionOption option )
{
@ -680,6 +664,10 @@ fdb_error_t fdb_select_api_version_impl( int runtime_version, int header_version
// Versioned API changes -- descending order by version (new changes at top)
// FDB_API_CHANGED( function, ver ) means there is a new implementation as of ver, and a function function_(ver-1) is the old implementation
// FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and function_(ver-1) is the old implementation
//
// WARNING: use caution when implementing removed functions by calling public API functions. This can lead to undesired behavior when
// using the multi-version API. Instead, it is better to have both the removed and public functions call an internal implementation function.
// See fdb_create_database_impl for an example.
FDB_API_REMOVED( fdb_future_get_version, 620 );
FDB_API_REMOVED( fdb_create_cluster, 610 );
FDB_API_REMOVED( fdb_cluster_create_database, 610 );

View File

@ -120,11 +120,6 @@ extern "C" {
fdb_future_get_error( FDBFuture* f );
#endif
#if FDB_API_VERSION < 620
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_version( FDBFuture* f, int64_t* out_version );
#endif
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_int64( FDBFuture* f, int64_t* out );
@ -265,6 +260,13 @@ extern "C" {
/* LEGACY API VERSIONS */
#if FDB_API_VERSION < 620
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_version( FDBFuture* f, int64_t* out_version );
#else
#define fdb_future_get_version(f, ov) FDB_REMOVED_FUNCTION
#endif
#if FDB_API_VERSION < 610 || defined FDB_INCLUDE_LEGACY_TYPES
typedef struct FDB_cluster FDBCluster;
@ -292,6 +294,13 @@ extern "C" {
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_cluster_create_database( FDBCluster* c, uint8_t const* db_name,
int db_name_length );
#else
#define fdb_future_get_cluster(f, oc) FDB_REMOVED_FUNCTION
#define fdb_future_get_database(f, od) FDB_REMOVED_FUNCTION
#define fdb_create_cluster(cfp) FDB_REMOVED_FUNCTION
#define fdb_cluster_destroy(c) FDB_REMOVED_FUNCTION
#define fdb_cluster_set_option(c, o, v, vl) FDB_REMOVED_FUNCTION
#define fdb_cluster_create_database(c, dn, dnl) FDB_REMOVED_FUNCTION
#endif
#if FDB_API_VERSION < 23

View File

@ -1,13 +1,13 @@
FROM centos:6
LABEL version=0.1.6
ENV DOCKER_IMAGEVER=0.1.7
LABEL version=0.1.8
ENV DOCKER_IMAGEVER=0.1.8
# Install dependencies for developer tools, bindings,\
# documentation, actorcompiler, and packaging tools\
RUN yum install -y yum-utils &&\
yum-config-manager --enable rhel-server-rhscl-7-rpms &&\
yum -y install centos-release-scl epel-release &&\
yum -y install devtoolset-8 java-1.8.0-openjdk-devel \
yum -y install devtoolset-8-8.1-1.el6 java-1.8.0-openjdk-devel \
rh-python36-python-devel devtoolset-8-valgrind-devel \
mono-core rh-ruby24 golang python27 rpm-build debbuild \
python-pip npm dos2unix valgrind-devel ccache distcc &&\

View File

@ -2,7 +2,7 @@ version: "3"
services:
common: &common
image: foundationdb/foundationdb-build:0.1.7
image: foundationdb/foundationdb-build:0.1.8
build-setup: &build-setup
<<: *common
@ -60,7 +60,7 @@ services:
snapshot-cmake: &snapshot-cmake
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" packages preinstall && cpack'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -j "$${MAKEJOBS}" && cpack'
prb-cmake:
<<: *snapshot-cmake
@ -68,7 +68,7 @@ services:
snapshot-ctest: &snapshot-ctest
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
prb-ctest:
<<: *snapshot-ctest
@ -76,7 +76,7 @@ services:
snapshot-correctness: &snapshot-correctness
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
prb-correctness:
<<: *snapshot-correctness

View File

@ -70,15 +70,23 @@ if (USE_CCACHE)
endif()
endif()
if ((NOT USE_LIBCXX) AND (NOT "$ENV{USE_LIBCXX}" STREQUAL ""))
string(TOUPPER "$ENV{USE_LIBCXX}" USE_LIBCXXENV)
if (("${USE_LIBCXXENV}" STREQUAL "ON") OR ("${USE_LIBCXXENV}" STREQUAL "1") OR ("${USE_LIBCXXENV}" STREQUAL "YES"))
set(USE_LIBCXX ON)
endif()
endif()
include(CheckFunctionExists)
set(CMAKE_REQUIRED_INCLUDES stdlib.h malloc.h)
set(CMAKE_REQUIRED_LIBRARIES c)
set(CMAKE_CXX_STANDARD 17)
if(WIN32)
# see: https://docs.microsoft.com/en-us/windows/desktop/WinProg/using-the-windows-headers
# this sets the windows target version to Windows 7
set(WINDOWS_TARGET 0x0601)
add_compile_options(/W3 /EHsc /std:c++17 /bigobj $<$<CONFIG:Release>:/Zi> /MP)
add_compile_options(/W3 /EHsc /bigobj $<$<CONFIG:Release>:/Zi> /MP)
add_compile_definitions(_WIN32_WINNT=${WINDOWS_TARGET} BOOST_ALL_NO_LIB)
else()
set(GCC NO)
@ -172,16 +180,17 @@ else()
-mmmx
-mavx
-msse4.2)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-std=c++17>)
if (USE_VALGRIND)
add_compile_options(-DVALGRIND -DUSE_VALGRIND)
endif()
if (CLANG)
add_compile_options()
if (APPLE OR USE_LIBCXX)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
add_compile_definitions(WITH_LIBCXX)
if (NOT APPLE)
add_link_options(-lc++abi -Wl,-build-id=sha1)
add_link_options(-lc++ -lc++abi -Wl,-build-id=sha1)
endif()
endif()
if (OPEN_FOR_IDE)
@ -198,6 +207,11 @@ else()
-Wno-undefined-var-template
-Wno-tautological-pointer-compare
-Wno-format)
if (USE_CCACHE)
add_compile_options(
-Wno-register
-Wno-error=unused-command-line-argument)
endif()
endif()
if (CMAKE_GENERATOR STREQUAL Xcode)
else()

View File

@ -380,6 +380,24 @@ The ``list`` subcommand will list the backups at a given 'base' or shortened Bac
This a shortened Backup URL which looks just like a Backup URL but without the backup <name> so that the list command will discover and list all of the backups in the bucket.
.. program:: fdbbackup cleanup
``cleanup``
------------
The ``cleanup`` subcommand will list orphaned backups and DRs and optionally remove their mutations.
::
user@host$ fdbbackup cleanup [--delete_data] [--min_cleanup_seconds] [-C <CLUSTER_FILE>]
``--delete_data``
This flag will cause ``cleanup`` to remove mutations for the most stale backup or DR.
``--min_cleanup_seconds``
Specifies the amount of time a backup or DR needs to be stale before ``cleanup`` will remove mutations for it. By default this is set to one hour.
``fdbrestore`` command line tool
================================

View File

@ -648,7 +648,7 @@ The number of replicas in each region is controlled by redundancy level. For exa
Asymmetric configurations
-------------------------
The fact that satellite policies are configured per region allows for asymmetric configurations. For example, FoudnationDB can have a three datacenter setup where there are two datacenters on the west coast (WC1, WC2) and one datacenter on the east coast (EC1). The west coast region can be set as the preferred active region by setting the priority of its primary datacenter higher than the east coast datacenter. The west coast region should have a satellite policy configured, so that when it is active, FoundationDB is making mutations durable in both west coast datacenters. In the rare event that one of the west coast datacenters has failed, FoundationDB will fail over to the east coast datacenter. Because this region does not a satellite datacenter, the mutations will only be made durable in one datacenter while the transaction subsystem is located here. However, this is justifiable because the region will only be active if a datacenter has already been lost.
The fact that satellite policies are configured per region allows for asymmetric configurations. For example, FoundationDB can have a three datacenter setup where there are two datacenters on the west coast (WC1, WC2) and one datacenter on the east coast (EC1). The west coast region can be set as the preferred active region by setting the priority of its primary datacenter higher than the east coast datacenter. The west coast region should have a satellite policy configured, so that when it is active, FoundationDB is making mutations durable in both west coast datacenters. In the rare event that one of the west coast datacenters has failed, FoundationDB will fail over to the east coast datacenter. Because this region does not have a satellite datacenter, the mutations will only be made durable in one datacenter while the transaction subsystem is located here. However, this is justifiable because the region will only be active if a datacenter has already been lost.
This is the region configuration that implements the example::

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.4.pkg <https://www.foundationdb.org/downloads/6.2.4/macOS/installers/FoundationDB-6.2.4.pkg>`_
* `FoundationDB-6.2.5.pkg <https://www.foundationdb.org/downloads/6.2.5/macOS/installers/FoundationDB-6.2.5.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.4/ubuntu/installers/foundationdb-clients_6.2.4-1_amd64.deb>`_
* `foundationdb-server-6.2.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.4/ubuntu/installers/foundationdb-server_6.2.4-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.5/ubuntu/installers/foundationdb-clients_6.2.5-1_amd64.deb>`_
* `foundationdb-server-6.2.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.5/ubuntu/installers/foundationdb-server_6.2.5-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel6/installers/foundationdb-clients-6.2.4-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel6/installers/foundationdb-server-6.2.4-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel6/installers/foundationdb-clients-6.2.5-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel6/installers/foundationdb-server-6.2.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel7/installers/foundationdb-clients-6.2.4-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.4/rhel7/installers/foundationdb-server-6.2.4-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel7/installers/foundationdb-clients-6.2.5-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.5/rhel7/installers/foundationdb-server-6.2.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.4-x64.msi <https://www.foundationdb.org/downloads/6.2.4/windows/installers/foundationdb-6.2.4-x64.msi>`_
* `foundationdb-6.2.5-x64.msi <https://www.foundationdb.org/downloads/6.2.5/windows/installers/foundationdb-6.2.5-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.4.tar.gz <https://www.foundationdb.org/downloads/6.2.4/bindings/python/foundationdb-6.2.4.tar.gz>`_
* `foundationdb-6.2.5.tar.gz <https://www.foundationdb.org/downloads/6.2.5/bindings/python/foundationdb-6.2.5.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.4.gem <https://www.foundationdb.org/downloads/6.2.4/bindings/ruby/fdb-6.2.4.gem>`_
* `fdb-6.2.5.gem <https://www.foundationdb.org/downloads/6.2.5/bindings/ruby/fdb-6.2.5.gem>`_
Java 8+
-------
* `fdb-java-6.2.4.jar <https://www.foundationdb.org/downloads/6.2.4/bindings/java/fdb-java-6.2.4.jar>`_
* `fdb-java-6.2.4-javadoc.jar <https://www.foundationdb.org/downloads/6.2.4/bindings/java/fdb-java-6.2.4-javadoc.jar>`_
* `fdb-java-6.2.5.jar <https://www.foundationdb.org/downloads/6.2.5/bindings/java/fdb-java-6.2.5.jar>`_
* `fdb-java-6.2.5-javadoc.jar <https://www.foundationdb.org/downloads/6.2.5/bindings/java/fdb-java-6.2.5-javadoc.jar>`_
Go 1.11+
--------

View File

@ -580,6 +580,7 @@
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0, // estimated
"system_kv_size_bytes":0, // estimated
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,

View File

@ -2,6 +2,13 @@
Release Notes
#############
6.1.13
======
* Loading a 6.1 or newer ``fdb_c`` library as a secondary client using the multi-version client could lead to an infinite recursion when run with API versions older than 610. `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
* Using C API functions that were removed in 6.1 when using API version 610 or above now results in a compilation error. `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
* ``fdbrestore`` commands other than ``start`` required a default cluster file to be found but did not actually use it. `(PR #1912) <https://github.com/apple/foundationdb/pull/1912>`_.
6.1.12
======
@ -191,4 +198,4 @@ Earlier release notes
* :doc:`Beta 2 (API Version 22) </old-release-notes/release-notes-022>`
* :doc:`Beta 1 (API Version 21) </old-release-notes/release-notes-021>`
* :doc:`Alpha 6 (API Version 16) </old-release-notes/release-notes-016>`
* :doc:`Alpha 5 (API Version 14) </old-release-notes/release-notes-014>`
* :doc:`Alpha 5 (API Version 14) </old-release-notes/release-notes-014>`

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.2.4
6.2.5
=====
Performance
@ -46,6 +46,10 @@ Fixes
* The ``fileconfigure`` command in ``fdbcli`` could fail with an unknown error if the file did not contain a valid JSON object. `(PR #2017) <https://github.com/apple/foundationdb/pull/2017>`_.
* Configuring regions would fail with an internal error if the cluster contained storage servers that didn't set a datacenter ID. `(PR #2017) <https://github.com/apple/foundationdb/pull/2017>`_.
* Clients no longer prefer reading from servers with the same zone ID, because it could create hot shards. [6.2.3] `(PR #2019) <https://github.com/apple/foundationdb/pull/2019>`_.
* Data distribution could fail to start if any storage servers had misconfigured locality information. This problem could persist even after the offending storage servers were removed or fixed. [6.2.5] `(PR #2110) <https://github.com/apple/foundationdb/pull/2110>`_.
* Data distribution was running at too high of a priority, which sometimes caused other roles on the same process to stall. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Loading a 6.1 or newer ``fdb_c`` library as a secondary client using the multi-version client could lead to an infinite recursion when run with API versions older than 610. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
* Using C API functions that were removed in 6.1 when using API version 610 or above now results in a compilation error. [6.2.5] `(PR #2169) <https://github.com/apple/foundationdb/pull/2169>`_
Status
------
@ -63,6 +67,7 @@ Status
* Add ``coordinator`` to the list of roles that can be reported for a process. [6.2.3] `(PR #2006) <https://github.com/apple/foundationdb/pull/2006>`_.
* Added ``worst_durability_lag_storage_server`` and ``limiting_durability_lag_storage_server`` to the ``cluster.qos`` section, each with subfields ``versions`` and ``seconds``. These report the durability lag values being used by ratekeeper to potentially limit the transaction rate. [6.2.3] `(PR #2003) <https://github.com/apple/foundationdb/pull/2003>`_.
* Added ``worst_data_lag_storage_server`` and ``limiting_data_lag_storage_server`` to the ``cluster.qos`` section, each with subfields ``versions`` and ``seconds``. These are meant to replace ``worst_version_lag_storage_server`` and ``limiting_version_lag_storage_server``, which are now deprecated. [6.2.3] `(PR #2003) <https://github.com/apple/foundationdb/pull/2003>`_.
* Added ``system_kv_size_bytes`` to the ``cluster.data`` section to record the size of the system keyspace. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Bindings
--------
@ -77,6 +82,11 @@ Bindings
* Added a transaction option to control the whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in api version 700, and addresses will include ports by default. [6.2.4] `(PR #2060) <https://github.com/apple/foundationdb/pull/2060>`_.
* Python: ``Versionstamp`` comparisons didn't work in Python 3. [6.2.4] `(PR #2089) <https://github.com/apple/foundationdb/pull/2089>`_.
Features
--------
* Added the ``cleanup`` command to ``fdbbackup`` which can be used to remove orphaned backups or DRs. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Other Changes
-------------
@ -111,6 +121,9 @@ Fixes only impacting 6.2.0+
* The cluster controller would saturate its CPU for a few seconds when sending configuration information to all of the worker processes. [6.2.4] `(PR #2086) <https://github.com/apple/foundationdb/pull/2086>`_.
* The data distributor would build all possible team combinations if it was tracking an unhealthy server with less than 10 teams. [6.2.4] `(PR #2099) <https://github.com/apple/foundationdb/pull/2099>`_.
* The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
* The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
* A storage server could crash if it took longer than 10 minutes to fetch a key range from another server. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Excluding or including servers would restart the data distributor. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
Earlier release notes
---------------------

View File

@ -83,7 +83,7 @@ enum enumProgramExe {
};
enum enumBackupType {
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_MODIFY, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST, BACKUP_DUMP
BACKUP_UNDEFINED=0, BACKUP_START, BACKUP_MODIFY, BACKUP_STATUS, BACKUP_ABORT, BACKUP_WAIT, BACKUP_DISCONTINUE, BACKUP_PAUSE, BACKUP_RESUME, BACKUP_EXPIRE, BACKUP_DELETE, BACKUP_DESCRIBE, BACKUP_LIST, BACKUP_DUMP, BACKUP_CLEANUP
};
enum enumDBType {
@ -102,7 +102,7 @@ enum {
OPT_EXPIRE_BEFORE_VERSION, OPT_EXPIRE_BEFORE_DATETIME, OPT_EXPIRE_DELETE_BEFORE_DAYS,
OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, OPT_EXPIRE_MIN_RESTORABLE_DAYS,
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP, OPT_DESCRIBE_TIMESTAMPS,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON, OPT_DELETE_DATA, OPT_MIN_CLEANUP_SECONDS,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -260,6 +260,7 @@ CSimpleOpt::SOption g_rgBackupStatusOptions[] = {
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_JSON, "--json", SO_NONE},
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -289,6 +290,37 @@ CSimpleOpt::SOption g_rgBackupAbortOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
SO_END_OF_OPTIONS
};
CSimpleOpt::SOption g_rgBackupCleanupOptions[] = {
#ifdef _WIN32
{ OPT_PARENTPID, "--parentpid", SO_REQ_SEP },
#endif
{ OPT_CLUSTERFILE, "-C", SO_REQ_SEP },
{ OPT_CLUSTERFILE, "--cluster_file", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
{ OPT_QUIET, "-q", SO_NONE },
{ OPT_QUIET, "--quiet", SO_NONE },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
{ OPT_HELP, "-?", SO_NONE },
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_DELETE_DATA, "--delete_data", SO_NONE },
{ OPT_MIN_CLEANUP_SECONDS, "--min_cleanup_seconds", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -320,6 +352,7 @@ CSimpleOpt::SOption g_rgBackupDiscontinueOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -351,6 +384,7 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -378,6 +412,7 @@ CSimpleOpt::SOption g_rgBackupPauseOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -647,6 +682,7 @@ CSimpleOpt::SOption g_rgDBStartOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -680,6 +716,7 @@ CSimpleOpt::SOption g_rgDBStatusOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -712,6 +749,7 @@ CSimpleOpt::SOption g_rgDBSwitchOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -744,6 +782,7 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -773,6 +812,7 @@ CSimpleOpt::SOption g_rgDBPauseOptions[] = {
{ OPT_HELP, "-h", SO_NONE },
{ OPT_HELP, "--help", SO_NONE },
{ OPT_DEVHELP, "--dev-help", SO_NONE },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -1245,6 +1285,7 @@ enumBackupType getBackupType(std::string backupType)
values["start"] = BACKUP_START;
values["status"] = BACKUP_STATUS;
values["abort"] = BACKUP_ABORT;
values["cleanup"] = BACKUP_CLEANUP;
values["wait"] = BACKUP_WAIT;
values["discontinue"] = BACKUP_DISCONTINUE;
values["pause"] = BACKUP_PAUSE;
@ -1922,6 +1963,21 @@ ACTOR Future<Void> abortBackup(Database db, std::string tagName) {
return Void();
}
ACTOR Future<Void> cleanupMutations(Database db, bool deleteData) {
try
{
wait(cleanupBackup(db, deleteData));
}
catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
fprintf(stderr, "ERROR: %s\n", e.what());
throw;
}
return Void();
}
ACTOR Future<Void> waitBackup(Database db, std::string tagName, bool stopWhenDone) {
try
{
@ -2671,6 +2727,9 @@ int main(int argc, char* argv[]) {
case BACKUP_ABORT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupAbortOptions, SO_O_EXACT);
break;
case BACKUP_CLEANUP:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupCleanupOptions, SO_O_EXACT);
break;
case BACKUP_WAIT:
args = new CSimpleOpt(argc - 1, &argv[1], g_rgBackupWaitOptions, SO_O_EXACT);
break;
@ -2863,6 +2922,7 @@ int main(int argc, char* argv[]) {
std::string restoreClusterFileDest;
std::string restoreClusterFileOrig;
bool jsonOutput = false;
bool deleteData = false;
BackupModifyOptions modifyOptions;
@ -2942,6 +3002,12 @@ int main(int argc, char* argv[]) {
case OPT_DRYRUN:
dryRun = true;
break;
case OPT_DELETE_DATA:
deleteData = true;
break;
case OPT_MIN_CLEANUP_SECONDS:
knobs.push_back( std::make_pair( "min_cleanup_seconds", args->OptionArg() ) );
break;
case OPT_FORCE:
forceAction = true;
break;
@ -3512,6 +3578,12 @@ int main(int argc, char* argv[]) {
f = stopAfter( abortBackup(db, tagName) );
break;
case BACKUP_CLEANUP:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( cleanupMutations(db, deleteData) );
break;
case BACKUP_WAIT:
if(!initCluster())
return FDB_EXIT_ERROR;

View File

@ -2211,6 +2211,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens ) {
state Standalone<StringRef> snapCmd;
state UID snapUID = deterministicRandom()->randomUniqueID();
for ( int i = 1; i < tokens.size(); i++) {
snapCmd = snapCmd.withSuffix(tokens[i]);
if (i != tokens.size() - 1) {
@ -2218,11 +2219,11 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens ) {
}
}
try {
UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd)));
wait(makeInterruptable(mgmtSnapCreate(db, snapCmd, snapUID)));
printf("Snapshot command succeeded with UID %s\n", snapUID.toString().c_str());
} catch (Error& e) {
fprintf(stderr, "Snapshot create failed %d (%s)."
" Please cleanup any instance level snapshots created.\n", e.code(), e.what());
fprintf(stderr, "Snapshot command failed %d (%s)."
" Please cleanup any instance level snapshots created with UID %s.\n", e.code(), e.what(), snapUID.toString().c_str());
return true;
}
return false;

View File

@ -485,7 +485,7 @@ bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
Version getVersionFromString(std::string const& value);
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Key getApplyKey( Version version, Key backupUid );
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
@ -503,6 +503,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key removePrefix, Version beginVersion,
Version* endVersion, RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion, Reference<KeyRangeMap<Version>> keyVersion);
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData);
typedef BackupAgentBase::enumState EBackupState;
template<> inline Tuple Codec<EBackupState>::pack(EBackupState const &val) { return Tuple().append(val); }

View File

@ -708,7 +708,7 @@ ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key remov
}
}
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
ACTOR static Future<Void> _eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
@ -716,104 +716,189 @@ ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUi
return Void();
}
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
state Version currEndVersion = std::numeric_limits<Version>::max();
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
if (endVersion.present() || backupVersions.size() != 1 || BUGGIFY) {
if (!endVersion.present()) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
if(backupVersions.size() == 1) {
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
if((nextSmallestVersion - currBeginVersion) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE >= std::numeric_limits<uint8_t>::max() || BUGGIFY) {
Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);
for(int h = 0; h <= std::numeric_limits<uint8_t>::max(); h++) {
uint64_t bv = bigEndian64(Version(0));
uint64_t ev = bigEndian64(nextSmallestVersion);
uint8_t h1 = h;
Key vblockPrefix = StringRef(&h1, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
tr->clear(KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
} else {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
} else {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
return Void();
}
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(tr, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
}
ACTOR Future<Void> cleanupLogMutations(Database cx, Value destUidValue, bool deleteData) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
state Optional<Key> removingLogUid;
state std::set<Key> loggedLogUids;
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
state Version readVer = tr->getReadVersion().get();
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
state Version minVersion = std::numeric_limits<Version>::max();
state Key minVersionLogUid;
state int backupIdx = 0;
for (; backupIdx < backupVersions.size(); backupIdx++) {
state Version currVersion = BinaryReader::fromStringRef<Version>(backupVersions[backupIdx].value, Unversioned());
state Key currLogUid = backupVersions[backupIdx].key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if( currVersion < minVersion ) {
minVersionLogUid = currLogUid;
minVersion = currVersion;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
if(!loggedLogUids.count(currLogUid)) {
state Future<Optional<Value>> foundDRKey = tr->get(Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(currLogUid).pack(DatabaseBackupAgent::keyStateStatus));
state Future<Optional<Value>> foundBackupKey = tr->get(Subspace(currLogUid.withPrefix(LiteralStringRef("uid->config/")).withPrefix(fileBackupPrefixRange.begin)).pack(LiteralStringRef("stateEnum")));
wait(success(foundDRKey) && success(foundBackupKey));
state Version currEndVersion = currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
if(foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("WARNING: Found a tag which looks like both a backup and a DR. This tag was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(foundDRKey.get().present() && !foundBackupKey.get().present()) {
printf("Found a DR which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(!foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("Found a Backup which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
printf("WARNING: Found a unknown tag which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
loggedLogUids.insert(currLogUid);
}
}
if (!endVersion.present() && backupVersions.size() == 1) {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
if( readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND && deleteData && (!removingLogUid.present() || minVersionLogUid == removingLogUid.get()) ) {
removingLogUid = minVersionLogUid;
wait(eraseLogData(tr, minVersionLogUid, destUidValue));
wait(tr->commit());
printf("\nSuccessfully removed the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(removingLogUid.present() && minVersionLogUid != removingLogUid.get()) {
printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n");
} else if( deleteData ) {
printf("\nWARNING: Did not delete data because the tag was not at least %.4f hours behind. Change `--min_cleanup_seconds' to adjust this threshold.\n", CLIENT_KNOBS->MIN_CLEANUP_SECONDS/3600.0);
} else {
if (!endVersion.present() && currEndVersion >= nextSmallestVersion) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
printf("\nPassing `--delete_data' would delete the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
wait(tr->commit());
if (!endVersion.present() && (backupVersions.size() == 1 || currEndVersion >= nextSmallestVersion)) {
return Void();
}
if(endVersion.present() && currEndVersion == endVersion.get()) {
return Void();
}
tr->reset();
} catch (Error &e) {
return Void();
} catch( Error& e) {
wait(tr->onError(e));
}
}
}
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(cx, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Standalone<RangeResultRef> destUids = wait(tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
for(auto destUid : destUids) {
wait(cleanupLogMutations(cx, destUid.value, deleteData));
}
return Void();
} catch( Error& e) {
wait(tr->onError(e));
}
}
}

View File

@ -482,11 +482,17 @@ namespace dbBackup {
wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
return Void();
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(taskBucket->src));
loop {
try {
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
wait(eraseLogData(tr, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
wait(tr->commit());
return Void();
} catch( Error &e ) {
wait(tr->onError(e));
}
}
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version endVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
@ -833,8 +839,7 @@ namespace dbBackup {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(taskBucket->src));
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
state Key destUidValue = task->params[BackupAgentBase::destUid];
state Version beginVersion;
state Version endVersion;
state Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
loop {
try {
@ -844,25 +849,13 @@ namespace dbBackup {
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
return Void();
state Key latestVersionKey = logUidValue.withPrefix(task->params[BackupAgentBase::destUid].withPrefix(backupLatestVersionsPrefix));
state Optional<Key> bVersion = wait(tr->get(latestVersionKey));
if (!bVersion.present()) {
return Void();
}
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
endVersion = tr->getReadVersion().get();
break;
wait(eraseLogData(tr, logUidValue, destUidValue, Optional<Version>(), true, backupUid));
wait(tr->commit());
return Void();
} catch(Error &e) {
wait(tr->onError(e));
}
}
Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, Optional<Version>(), true, backupUid));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
@ -2179,22 +2172,23 @@ public:
}
}
if(partial)
return Void();
state Future<Void> partialTimeout = partial ? delay(30.0) : Never();
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
state Version beginVersion;
state Version endVersion;
state bool clearSrcDb = true;
loop {
try {
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) ) );
state Future<Optional<Value>> backupVersionF = srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) );
wait(success(backupVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
clearSrcDb = false;
if(backupVersionF.get().present() && BinaryReader::fromStringRef<Version>(backupVersionF.get().get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
break;
}
@ -2208,18 +2202,31 @@ public:
Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix));
Optional<Key> bVersion = wait(srcTr->get(latestVersionKey));
if (bVersion.present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
state Future<Optional<Key>> bVersionF = srcTr->get(latestVersionKey);
wait(success(bVersionF) || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
if (bVersionF.get().present()) {
beginVersion = BinaryReader::fromStringRef<Version>(bVersionF.get().get(), Unversioned());
} else {
clearSrcDb = false;
break;
}
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
wait( eraseLogData(srcTr, logUidValue, destUidValue) || partialTimeout );
if(partialTimeout.isReady()) {
return Void();
}
wait(srcTr->commit());
wait(srcTr->commit() || partialTimeout);
if(partialTimeout.isReady()) {
return Void();
}
endVersion = srcTr->getCommittedVersion() + 1;
break;
@ -2229,10 +2236,6 @@ public:
}
}
if (clearSrcDb && !abortOldBackup) {
wait(eraseLogData(backupAgent->taskBucket->src, logUidValue, destUidValue));
}
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
loop {
try {

View File

@ -1988,6 +1988,7 @@ namespace fileBackup {
const uint32_t BackupLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
//This task stopped being used in 6.2, however the code remains here to handle upgrades.
struct EraseLogRangeTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
@ -2005,21 +2006,6 @@ namespace fileBackup {
}
} Params;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
state Version endVersion = Params.endVersion().get(task);
state Key destUidValue = Params.destUidValue().get(task);
state BackupConfig config(task);
state Key logUidValue = config.getUidAsKey();
wait(eraseLogData(cx, logUidValue, destUidValue, endVersion != 0 ? Optional<Version>(endVersion) : Optional<Version>()));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID logUid, TaskCompletionKey completionKey, Key destUidValue, Version endVersion = 0, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(EraseLogRangeTaskFunc::name,
EraseLogRangeTaskFunc::version,
@ -2036,16 +2022,23 @@ namespace fileBackup {
return key;
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task));
wait(checkTaskVersion(tr->getDatabase(), task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
state Version endVersion = Params.endVersion().get(task);
state Key destUidValue = Params.destUidValue().get(task);
state BackupConfig config(task);
state Key logUidValue = config.getUidAsKey();
wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task) && eraseLogData(tr, logUidValue, destUidValue, endVersion != 0 ? Optional<Version>(endVersion) : Optional<Version>()));
return Void();
}
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs_5.2");
@ -2132,7 +2125,7 @@ namespace fileBackup {
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait(success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, config.getUid(), TaskCompletionKey::joinWith(logDispatchBatchFuture), destUidValue, beginVersion)));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue, Optional<Version>(beginVersion)) );
}
wait(taskBucket->finish(tr, task));
@ -2183,7 +2176,7 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
wait(success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, backup.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait( eraseLogData(tr, backup.getUidAsKey(), destUidValue) );
backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
@ -3820,8 +3813,7 @@ public:
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait(success(tr->getReadVersion()));
wait(success(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue) );
config.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
@ -3860,8 +3852,8 @@ public:
// Cancel backup task through tag
wait(tag.cancel(tr));
wait(success(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue)));
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue));
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);

View File

@ -145,7 +145,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BACKUP_ERROR_DELAY, 10.0 );
init( BACKUP_STATUS_DELAY, 40.0 );
init( BACKUP_STATUS_JITTER, 0.05 );
init( CLEAR_LOG_RANGE_COUNT, 1500); // transaction size / (size of '\xff\x02/blog/' + size of UID + size of hash result) = 200,000 / (8 + 16 + 8)
init( MIN_CLEANUP_SECONDS, 3600.0 );
// Configuration
init( DEFAULT_AUTO_PROXIES, 3 );

View File

@ -131,7 +131,6 @@ public:
int BACKUP_COPY_TASKS;
int BACKUP_BLOCK_SIZE;
int BACKUP_TASKS_PER_AGENT;
int CLEAR_LOG_RANGE_COUNT;
int SIM_BACKUP_TASKS_PER_AGENT;
int BACKUP_RANGEFILE_BLOCK_SIZE;
int BACKUP_LOGFILE_BLOCK_SIZE;
@ -147,6 +146,7 @@ public:
double BACKUP_ERROR_DELAY;
double BACKUP_STATUS_DELAY;
double BACKUP_STATUS_JITTER;
double MIN_CLEANUP_SECONDS;
// Configuration
int32_t DEFAULT_AUTO_PROXIES;

View File

@ -1209,8 +1209,6 @@ ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); //To conflict with parallel includeServers
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
tr.set( moveKeysLockOwnerKey, versionKey );
tr.set( excludedServersVersionKey, excludeVersionKey );
for(auto& s : servers)
tr.set( encodeExcludedServersKey(s), StringRef() );
@ -1241,9 +1239,6 @@ ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers
// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) );
tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) );
tr.set( moveKeysLockOwnerKey, versionKey );
tr.set( excludedServersVersionKey, excludeVersionKey );
for(auto& s : servers ) {
@ -1533,12 +1528,11 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vec
return inProgressExclusion;
}
ACTOR Future<UID> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd) {
state UID snapUID = deterministicRandom()->randomUniqueID();
ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID) {
try {
wait(snapCreate(cx, snapCmd, snapUID));
TraceEvent("SnapCreateSucceeded").detail("snapUID", snapUID);
return snapUID;
return Void();
} catch (Error& e) {
TraceEvent(SevWarn, "SnapCreateFailed").detail("snapUID", snapUID).error(e);
throw;

View File

@ -196,7 +196,7 @@ bool schemaMatch( json_spirit::mValue const& schema, json_spirit::mValue const&
// execute payload in 'snapCmd' on all the coordinators, TLogs and
// storage nodes
ACTOR Future<UID> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd);
ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);
#include "flow/unactorcompiler.h"
#endif

View File

@ -606,6 +606,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0,
"system_kv_size_bytes":0,
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,

View File

@ -42,6 +42,7 @@ class TCTeamInfo;
struct TCMachineInfo;
class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
@ -90,7 +91,10 @@ struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
ASSERT(serversOnMachine.empty());
serversOnMachine.push_back(server);
machineID = server->lastKnownInterface.locality.zoneId().get();
LocalityData& locality = server->lastKnownInterface.locality;
ASSERT(locality.zoneId().present());
machineID = locality.zoneId().get();
}
std::string getServersIDStr() {
@ -609,6 +613,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncVar<bool> zeroOptimalTeams;
AsyncMap< AddressExclusion, bool > excludedServers; // true if an address is in the excluded list in the database. Updated asynchronously (eventually)
std::set<AddressExclusion> invalidLocalityAddr; // These address have invalidLocality for the configured storagePolicy
std::vector<Optional<Key>> includedDCs;
Optional<std::vector<Optional<Key>>> otherTrackedDCs;
@ -618,6 +623,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Future<Void> checkTeamDelay;
Promise<Void> addSubsetComplete;
Future<Void> badTeamRemover;
Future<Void> checkInvalidLocalities;
Future<Void> wrongStoreTypeRemover;
@ -662,7 +668,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<AsyncVar<bool>> processingUnhealthy)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), wrongStoreTypeRemover(Void()), configuration(configuration),
teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()), configuration(configuration),
readyToStart(readyToStart), clearHealthyZoneFuture(true),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
initialFailureReactionDelay(
@ -999,6 +1005,17 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
if (self->shouldHandleServer(i->first)) {
if (!self->isValidLocality(self->configuration.storagePolicy, i->first.locality)) {
TraceEvent(SevWarnAlways, "MissingLocality")
.detail("Server", i->first.uniqueID)
.detail("Locality", i->first.locality.toString());
auto addr = i->first.address();
self->invalidLocalityAddr.insert(AddressExclusion(addr.ip, addr.port));
if (self->checkInvalidLocalities.isReady()) {
self->checkInvalidLocalities = checkAndRemoveInvalidLocalityAddr(self);
self->addActor.send(self->checkInvalidLocalities);
}
}
self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0);
}
}
@ -1013,6 +1030,25 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return Void();
}
// Check if server or machine has a valid locality based on configured replication policy
bool isValidLocality(Reference<IReplicationPolicy> storagePolicy, const LocalityData& locality) {
// Future: Once we add simulation test that misconfigure a cluster, such as not setting some locality entries,
// DD_VALIDATE_LOCALITY should always be true. Otherwise, simulation test may fail.
if (!SERVER_KNOBS->DD_VALIDATE_LOCALITY) {
// Disable the checking if locality is valid
return true;
}
std::set<std::string> replicationPolicyKeys = storagePolicy->attributeKeys();
for (auto& policy : replicationPolicyKeys) {
if (!locality.isPresent(policy)) {
return false;
}
}
return true;
}
void evaluateTeamQuality() {
int teamCount = teams.size(), serverCount = allServers.size();
double teamsPerServer = (double)teamCount * configuration.storageTeamSize / serverCount;
@ -1394,6 +1430,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
Reference<TCServerInfo> representativeServer = machine->second->serversOnMachine[0];
auto& locality = representativeServer->lastKnownInterface.locality;
if (!isValidLocality(configuration.storagePolicy, locality)) {
TraceEvent(SevWarn, "RebuildMachineLocalityMapError")
.detail("Machine", machine->second->machineID.toString())
.detail("InvalidLocality", locality.toString());
continue;
}
const LocalityEntry& localityEntry = machineLocalityMap.add(locality, &representativeServer->id);
machine->second->localityEntry = localityEntry;
++numHealthyMachine;
@ -1431,6 +1473,11 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
ASSERT_WE_THINK(server_info.find(machine.second->serversOnMachine[0]->id) != server_info.end());
// Skip unhealthy machines
if (!isMachineHealthy(machine.second)) continue;
// Skip machine with incomplete locality
if (!isValidLocality(configuration.storagePolicy,
machine.second->serversOnMachine[0]->lastKnownInterface.locality)) {
continue;
}
// Invariant: We only create correct size machine teams.
// When configuration (e.g., team size) is changed, the DDTeamCollection will be destroyed and rebuilt
@ -1600,6 +1647,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& server : server_info) {
// Only pick healthy server, which is not failed or excluded.
if (server_status.get(server.first).isUnhealthy()) continue;
if (!isValidLocality(configuration.storagePolicy, server.second->lastKnownInterface.locality)) continue;
int numTeams = server.second->teams.size();
if (numTeams < minTeams) {
@ -1612,6 +1660,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
if (leastUsedServers.empty()) {
// If we cannot find a healthy server with valid locality
TraceEvent("NoHealthyAndValidLocalityServers")
.detail("Servers", server_info.size())
.detail("UnhealthyServers", unhealthyServers);
return Reference<TCServerInfo>();
} else {
return deterministicRandom()->randomChoice(leastUsedServers);
@ -2582,7 +2634,7 @@ ACTOR Future<Void> machineTeamRemover(DDTeamCollection* self) {
}
// To avoid removing machine teams too fast, which is unlikely happen though
wait( delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY) );
wait( delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY, TaskPriority::DataDistribution) );
wait(waitUntilHealthy(self));
// Wait for the badTeamRemover() to avoid the potential race between adding the bad team (add the team tracker)
@ -2705,7 +2757,7 @@ ACTOR Future<Void> serverTeamRemover(DDTeamCollection* self) {
removeServerTeamDelay = removeServerTeamDelay / 100;
}
// To avoid removing server teams too fast, which is unlikely happen though
wait(delay(removeServerTeamDelay));
wait(delay(removeServerTeamDelay, TaskPriority::DataDistribution));
wait(waitUntilHealthy(self, SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
// Wait for the badTeamRemover() to avoid the potential race between
@ -3105,7 +3157,7 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
// and serverID are decided by the server's filename. By parsing storage server file's filename on each disk, process on
// each machine creates the TCServer with the correct serverID and StorageServerInterface.
ACTOR Future<Void> waitServerListChange( DDTeamCollection* self, FutureStream<Void> serverRemoved ) {
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskPriority::DataDistributionLaunch);
state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
state bool isFetchingResults = false;
state Transaction tr(self->cx);
@ -3143,7 +3195,7 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection* self, FutureStream<Vo
}
tr = Transaction(self->cx);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskPriority::DataDistributionLaunch);
}
when( waitNext( serverRemoved ) ) {
if( isFetchingResults ) {
@ -3177,7 +3229,7 @@ ACTOR Future<Void> waitHealthyZoneChange( DDTeamCollection* self ) {
healthyZoneTimeout = Never();
} else if (p.second > tr.getReadVersion().get()) {
double timeoutSeconds = (p.second - tr.getReadVersion().get())/(double)SERVER_KNOBS->VERSIONS_PER_SECOND;
healthyZoneTimeout = delay(timeoutSeconds);
healthyZoneTimeout = delay(timeoutSeconds, TaskPriority::DataDistribution);
if(self->healthyZone.get() != p.first) {
TraceEvent("MaintenanceZoneStart", self->distributorId).detail("ZoneID", printable(p.first)).detail("EndVersion", p.second).detail("Duration", timeoutSeconds);
self->healthyZone.set(p.first);
@ -3648,6 +3700,68 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
}
}
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) {
state double start = now();
state bool hasCorrectedLocality = false;
loop {
try {
wait(delay(SERVER_KNOBS->DD_CHECK_INVALID_LOCALITY_DELAY, TaskPriority::DataDistribution));
// Because worker's processId can be changed when its locality is changed, we cannot watch on the old
// processId; This actor is inactive most time, so iterating all workers incurs little performance overhead.
state vector<ProcessData> workers = wait(getWorkers(self->cx));
state std::set<AddressExclusion> existingAddrs;
for (int i = 0; i < workers.size(); i++) {
const ProcessData& workerData = workers[i];
AddressExclusion addr(workerData.address.ip, workerData.address.port);
existingAddrs.insert(addr);
if (self->invalidLocalityAddr.count(addr) &&
self->isValidLocality(self->configuration.storagePolicy, workerData.locality)) {
// The locality info on the addr has been corrected
self->invalidLocalityAddr.erase(addr);
hasCorrectedLocality = true;
TraceEvent("InvalidLocalityCorrected").detail("Addr", addr.toString());
}
}
wait(yield(TaskPriority::DataDistribution));
// In case system operator permanently excludes workers on the address with invalid locality
for (auto addr = self->invalidLocalityAddr.begin(); addr != self->invalidLocalityAddr.end();) {
if (!existingAddrs.count(*addr)) {
// The address no longer has a worker
addr = self->invalidLocalityAddr.erase(addr);
hasCorrectedLocality = true;
TraceEvent("InvalidLocalityNoLongerExists").detail("Addr", addr->toString());
} else {
++addr;
}
}
if (hasCorrectedLocality) {
// Recruit on address who locality has been corrected
self->restartRecruiting.trigger();
hasCorrectedLocality = false;
}
if (self->invalidLocalityAddr.empty()) {
break;
}
if (now() - start > 300) { // Report warning if invalid locality is not corrected within 300 seconds
// The incorrect locality info has not been properly corrected in a reasonable time
TraceEvent(SevWarn, "PersistentInvalidLocality").detail("Addresses", self->invalidLocalityAddr.size());
start = now();
}
} catch (Error& e) {
TraceEvent("CheckAndRemoveInvalidLocalityAddrRetry", self->distributorId).detail("Error", e.what());
}
}
return Void();
}
int numExistingSSOnAddr(DDTeamCollection* self, const AddressExclusion& addr) {
int numExistingSS = 0;
for (auto& server : self->server_info) {
@ -3757,13 +3871,21 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
}
auto excl = self->excludedServers.getKeys();
for(auto& s : excl)
for (auto& s : excl) {
if (self->excludedServers.get(s)) {
TraceEvent(SevDebug, "DDRecruitExcl2")
.detail("Primary", self->primary)
.detail("Excluding", s.toString());
exclusions.insert( s );
}
}
// Exclude workers that have invalid locality
for (auto& addr : self->invalidLocalityAddr) {
TraceEvent(SevDebug, "DDRecruitExclInvalidAddr").detail("Excluding", addr.toString());
exclusions.insert(addr);
}
rsr.criticalRecruitment = self->healthyTeamCount == 0;
for(auto it : exclusions) {
rsr.excludeAddresses.push_back(it);
@ -3805,7 +3927,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<
}
when(wait(self->restartRecruiting.onTrigger())) {}
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::DataDistribution) );
} catch( Error &e ) {
if(e.code() != error_code_timed_out) {
throw;
@ -3876,7 +3998,7 @@ ACTOR Future<Void> remoteRecovered( Reference<AsyncVar<struct ServerDBInfo>> db
ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) {
TraceEvent("DDMonitorHealthyTeamsStart").detail("ZeroHealthyTeams", self->zeroHealthyTeams->get());
loop choose {
when ( wait(self->zeroHealthyTeams->get() ? delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY) : Never()) ) {
when ( wait(self->zeroHealthyTeams->get() ? delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistribution) : Never()) ) {
self->doBuildTeams = true;
wait( DDTeamCollection::checkBuildTeams(self) );
}

View File

@ -69,6 +69,7 @@ struct DataDistributionTracker {
KeyRangeMap< ShardTrackedData > shards;
ActorCollection sizeChanges;
int64_t systemSizeEstimate;
Reference<AsyncVar<int64_t>> dbSizeEstimate;
Reference<AsyncVar<Optional<int64_t>>> maxShardSize;
Future<Void> maxShardSizeUpdater;
@ -81,7 +82,7 @@ struct DataDistributionTracker {
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ), systemSizeEstimate(0),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
@ -138,8 +139,7 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
bool addToSizeEstimate = true)
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
{
wait( delay( 0, TaskPriority::DataDistribution ) );
@ -203,8 +203,12 @@ ACTOR Future<Void> trackShardBytes(
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
if( shardSize->get().present() && addToSizeEstimate )
if( shardSize->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.bytes - shardSize->get().get().bytes;
}
}
shardSize->set( metrics );
}
@ -256,8 +260,13 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<StorageMetrics>>
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys, int64_t oldShardsEndingSize ) {
state vector<Future<int64_t>> sizes;
state vector<Future<int64_t>> systemSizes;
for (auto it : self->shards.intersectingRanges(keys) ) {
sizes.push_back( getFirstSize( it->value().stats ) );
Future<int64_t> thisSize = getFirstSize( it->value().stats );
sizes.push_back( thisSize );
if(it->range().begin >= systemKeys.begin) {
systemSizes.push_back( thisSize );
}
}
wait( waitForAll( sizes ) );
@ -267,12 +276,20 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys,
for ( int i = 0; i < sizes.size(); i++ )
newShardsStartingSize += sizes[i].get();
int64_t newSystemShardsStartingSize = 0;
for ( int i = 0; i < systemSizes.size(); i++ )
newSystemShardsStartingSize += systemSizes[i].get();
int64_t totalSizeEstimate = self->dbSizeEstimate->get();
/*TraceEvent("TrackerChangeSizes")
.detail("TotalSizeEstimate", totalSizeEstimate)
.detail("EndSizeOfOldShards", oldShardsEndingSize)
.detail("StartingSizeOfNewShards", newShardsStartingSize);*/
self->dbSizeEstimate->set( totalSizeEstimate + newShardsStartingSize - oldShardsEndingSize );
self->systemSizeEstimate += newSystemShardsStartingSize;
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate -= oldShardsEndingSize;
}
return Void();
}
@ -641,7 +658,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsRequest req ) {
choose {
when( wait( fetchShardMetrics_impl( self, req ) ) ) {}
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT ) ) ) {
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT, TaskPriority::DataDistribution ) ) ) {
TEST(true); // DD_SHARD_METRICS_TIMEOUT
StorageMetrics largeMetrics;
largeMetrics.bytes = SERVER_KNOBS->MAX_SHARD_BYTES;
@ -676,6 +693,7 @@ ACTOR Future<Void> dataDistributionTracker(
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL);

View File

@ -187,6 +187,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DD_ZERO_HEALTHY_TEAM_DELAY, 1.0 );
init( REBALANCE_MAX_RETRIES, 100 );
init( DD_OVERLAP_PENALTY, 10000 );
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
// TeamRemover
TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -145,6 +145,8 @@ public:
double DEBOUNCE_RECRUITING_DELAY;
int REBALANCE_MAX_RETRIES;
int DD_OVERLAP_PENALTY;
bool DD_VALIDATE_LOCALITY;
int DD_CHECK_INVALID_LOCALITY_DELAY;
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@ -67,12 +67,11 @@ ACTOR Future<MoveKeysLock> takeMoveKeysLock( Database cx, UID ddId ) {
loop {
try {
state MoveKeysLock lock;
state UID txnId;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if( !g_network->isSimulated() ) {
UID id(deterministicRandom()->randomUniqueID());
TraceEvent("TakeMoveKeysLockTransaction", ddId)
.detail("TransactionUID", id);
tr.debugTransaction( id );
txnId = deterministicRandom()->randomUniqueID();
tr.debugTransaction(txnId);
}
{
Optional<Value> readVal = wait( tr.get( moveKeysLockOwnerKey ) );
@ -85,6 +84,11 @@ ACTOR Future<MoveKeysLock> takeMoveKeysLock( Database cx, UID ddId ) {
lock.myOwner = deterministicRandom()->randomUniqueID();
tr.set(moveKeysLockOwnerKey, BinaryWriter::toValue(lock.myOwner, Unversioned()));
wait(tr.commit());
TraceEvent("TakeMoveKeysLockTransaction", ddId)
.detail("TransactionUID", txnId)
.detail("PrevOwner", lock.prevOwner.toString())
.detail("PrevWrite", lock.prevWrite.toString())
.detail("MyOwner", lock.myOwner.toString());
return lock;
} catch (Error &e){
wait(tr.onError(e));
@ -111,11 +115,19 @@ ACTOR Future<Void> checkMoveKeysLock( Transaction* tr, MoveKeysLock lock, bool i
}
// Take the lock
if(isWrite) {
BinaryWriter wrMyOwner(Unversioned()); wrMyOwner << lock.myOwner;
tr->set( moveKeysLockOwnerKey, wrMyOwner.toValue() );
BinaryWriter wrLastWrite(Unversioned()); wrLastWrite << deterministicRandom()->randomUniqueID();
tr->set( moveKeysLockWriteKey, wrLastWrite.toValue() );
if (isWrite) {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << lock.myOwner;
tr->set(moveKeysLockOwnerKey, wrMyOwner.toValue());
BinaryWriter wrLastWrite(Unversioned());
UID lastWriter = deterministicRandom()->randomUniqueID();
wrLastWrite << lastWriter;
tr->set(moveKeysLockWriteKey, wrLastWrite.toValue());
TraceEvent("CheckMoveKeysLock")
.detail("PrevOwner", lock.prevOwner.toString())
.detail("PrevWrite", lock.prevWrite.toString())
.detail("MyOwner", lock.myOwner.toString())
.detail("Writer", lastWriter.toString());
}
return Void();

View File

@ -697,7 +697,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey);
wait( tr.commit() );
wait( watchFuture );
break;

View File

@ -1328,6 +1328,7 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
if (dataStats.size())
{
statusObjData.setKeyRawNumber("total_kv_size_bytes",dataStats.getValue("TotalSizeBytes"));
statusObjData.setKeyRawNumber("system_kv_size_bytes",dataStats.getValue("SystemSizeBytes"));
statusObjData.setKeyRawNumber("partitions_count",dataStats.getValue("Shards"));
}

View File

@ -1577,6 +1577,11 @@ int main(int argc, char* argv[]) {
// evictionPolicyStringToEnum will throw an exception if the string is not recognized as a valid
EvictablePageCache::evictionPolicyStringToEnum(flowKnobs->CACHE_EVICTION_POLICY);
if (memLimit <= FLOW_KNOBS->PAGE_CACHE_4K) {
fprintf(stderr, "ERROR: --memory has to be larger than --cache_memory\n");
flushAndExit(FDB_EXIT_ERROR);
}
if (role == SkipListTest) {
skipListTest();
flushAndExit(FDB_EXIT_SUCCESS);

View File

@ -1213,7 +1213,7 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
self->registrationTrigger.trigger();
}
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey);
wait(tr.commit());
wait(watchFuture);
break;

View File

@ -106,6 +106,7 @@ struct AddingShard : NonCopyable {
Version transferredVersion;
enum Phase { WaitPrevious, Fetching, Waiting };
Phase phase;
AddingShard( StorageServer* server, KeyRangeRef const& keys );
@ -1948,8 +1949,9 @@ void splitMutation(StorageServer* data, KeyRangeMap<T>& map, MutationRef const&
ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
state double startTime = now();
loop {
wait(delay(600));
TraceEvent(SevWarnAlways, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
state double waitSeconds = BUGGIFY ? 5.0 : 600.0;
wait(delay(waitSeconds));
TraceEvent(waitSeconds > 300.0 ? SevWarnAlways : SevInfo, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
}
}
@ -2068,6 +2070,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
shard = data->shards.rangeContaining( keys.begin ).value()->adding;
warningLogger = logFetchKeysWarning(shard);
AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
keys = shard->keys;

View File

@ -111,7 +111,7 @@ extern bool isAssertDisabled( int line );
// ASSERT_WE_THINK() is to be used for assertions that we want to validate in testing, but which are judged too
// risky to evaluate at runtime, because the code should work even if they are false and throwing internal_error() would
// result in a bug. Don't use it for assertions that are *expensive*; look at EXPENSIVE_VALIDATION.
#define ASSERT_WE_THINK( condition ) ASSERT( !g_network->isSimulated() || condition )
#define ASSERT_WE_THINK( condition ) ASSERT( !g_network->isSimulated() || (condition) )
#define ABORT_ON_ERROR( code_to_run ) \
try { code_to_run; } \

View File

@ -518,13 +518,13 @@ const char* getInterfaceName(const IPAddress& _ip) {
if(!iter->ifa_addr)
continue;
if (iter->ifa_addr->sa_family == AF_INET && _ip.isV4()) {
uint32_t ip = ntohl(((struct sockaddr_in*)iter->ifa_addr)->sin_addr.s_addr);
uint32_t ip = ntohl((reinterpret_cast<struct sockaddr_in*>(iter->ifa_addr))->sin_addr.s_addr);
if (ip == _ip.toV4()) {
ifa_name = iter->ifa_name;
break;
}
} else if (iter->ifa_addr->sa_family == AF_INET6 && _ip.isV6()) {
struct sockaddr_in6* ifa_addr = (struct sockaddr_in6*)iter->ifa_addr;
struct sockaddr_in6* ifa_addr = reinterpret_cast<struct sockaddr_in6*>(iter->ifa_addr);
if (memcmp(_ip.toV6().data(), &ifa_addr->sin6_addr, 16) == 0) {
ifa_name = iter->ifa_name;
break;

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{6BAF0715-4FDE-4F0D-8CA7-E4CAD53519B8}'
Id='{AE316E98-2C7C-4C06-8C4D-0BAD3183DEDD}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'