Merge branch 'master' into fdb_cache_subfeature2

This commit is contained in:
negoyal 2020-04-02 12:27:04 -07:00
commit a0c8946f31
77 changed files with 1889 additions and 269 deletions

View File

@ -189,15 +189,15 @@ if(NOT WIN32)
else()
add_subdirectory(fdbservice)
endif()
add_subdirectory(fdbbackup)
add_subdirectory(contrib)
add_subdirectory(tests)
if(WITH_PYTHON)
add_subdirectory(bindings)
endif()
add_subdirectory(fdbbackup)
add_subdirectory(tests)
if(WITH_DOCUMENTATION)
add_subdirectory(documentation)
endif()
add_subdirectory(contrib/monitoring)
if(WIN32)
add_subdirectory(packaging/msi)

View File

@ -13,3 +13,6 @@ endif()
if(WITH_RUBY)
add_subdirectory(ruby)
endif()
if(NOT WIN32 AND NOT OPEN_FOR_IDE)
package_bindingtester()
endif()

View File

@ -1626,6 +1626,7 @@ struct UnitTestsFunc : InstructionFunc {
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_READ_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS);
tr->setOption(FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS);
Optional<FDBStandalone<ValueRef> > _ = wait(tr->get(LiteralStringRef("\xff")));
tr->cancel();

View File

@ -184,15 +184,87 @@ function(create_test_package)
file(COPY ${file} DESTINATION ${CMAKE_BINARY_DIR}/packages/${dest_dir})
endforeach()
endforeach()
set(tar_file ${CMAKE_BINARY_DIR}/packages/correctness.tar.gz)
if(NOT USE_VALGRIND)
set(tar_file ${CMAKE_BINARY_DIR}/packages/correctness-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${out_files}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh ${CMAKE_BINARY_DIR}/packages/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh ${CMAKE_BINARY_DIR}/packages/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe
${CMAKE_BINARY_DIR}/packages/bin/TraceLogHelper.dll
${CMAKE_BINARY_DIR}/packages/joshua_test
${CMAKE_BINARY_DIR}/packages/joshua_timeout
${out_files} ${external_files}
COMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_BINARY_DIR}/packages/joshua_test ${CMAKE_BINARY_DIR}/packages/joshua_timeout
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/packages
COMMENT "Package correctness archive"
)
add_custom_target(package_tests ALL DEPENDS ${tar_file})
add_dependencies(package_tests strip_only_fdbserver TestHarness)
endif()
if(USE_VALGRIND)
set(tar_file ${CMAKE_BINARY_DIR}/packages/valgrind-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${out_files}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/valgrindTest.sh ${CMAKE_BINARY_DIR}/packages/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/valgrindTimeout.sh ${CMAKE_BINARY_DIR}/packages/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe
${CMAKE_BINARY_DIR}/packages/bin/TraceLogHelper.dll
${CMAKE_BINARY_DIR}/packages/joshua_test
${CMAKE_BINARY_DIR}/packages/joshua_timeout
${out_files} ${external_files}
COMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_BINARY_DIR}/packages/joshua_test ${CMAKE_BINARY_DIR}/packages/joshua_timeout
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/packages
COMMENT "Package correctness archive"
)
add_custom_target(package_valgrind_tests ALL DEPENDS ${tar_file})
add_dependencies(package_valgrind_tests strip_only_fdbserver TestHarness)
endif()
endfunction()
function(package_bindingtester)
if(WIN32 OR OPEN_FOR_IDE)
return()
elseif(APPLE)
set(fdbcName "libfdb_c.dylib")
else()
set(fdbcName "libfdb_c.so")
endif()
set(bdir ${CMAKE_BINARY_DIR}/bindingtester)
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/bindingtester)
set(outfiles ${bdir}/fdbcli ${bdir}/fdbserver ${bdir}/${fdbcName} ${bdir}/joshua_test ${bdir}/joshua_timeout)
add_custom_command(
OUTPUT ${outfiles}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_BINARY_DIR}/packages/bin/fdbcli
${CMAKE_BINARY_DIR}/packages/bin/fdbserver
${CMAKE_BINARY_DIR}/packages/lib/${fdbcName}
${bdir}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/bindingTest.sh ${bdir}/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/bindingTimeout.sh ${bdir}/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/localClusterStart.sh ${bdir}/localClusterStart.sh
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/bindingTestScript.sh ${bdir}/bindingTestScript.sh
COMMENT "Copy executes to bindingtester dir")
file(GLOB_RECURSE test_files ${CMAKE_SOURCE_DIR}/bindings/*)
add_custom_command(
OUTPUT "${CMAKE_BINARY_DIR}/bindingtester.touch"
COMMAND ${CMAKE_COMMAND} -E remove_directory ${CMAKE_BINARY_DIR}/bindingtester/tests
COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/bindingtester/tests
COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_SOURCE_DIR}/bindings ${CMAKE_BINARY_DIR}/bindingtester/tests
COMMAND ${CMAKE_COMMAND} -E touch "${CMAKE_BINARY_DIR}/bindingtester.touch"
COMMENT "Copy test files for bindingtester")
add_custom_target(copy_bindingtester_binaries DEPENDS ${outfiles} "${CMAKE_BINARY_DIR}/bindingtester.touch")
add_dependencies(copy_bindingtester_binaries strip_only_fdbserver strip_only_fdbcli strip_only_fdb_c)
set(tar_file ${CMAKE_BINARY_DIR}/packages/bindingtester-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${out_files}
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
${out_files} ${external_files}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/packages
COMMENT "Package correctness archive"
)
add_custom_target(package_tests DEPENDS ${tar_file})
add_dependencies(package_tests strip_fdbserver)
COMMAND ${CMAKE_COMMAND} -E tar czf ${tar_file} *
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/bindingtester
COMMENT "Pack bindingtester")
add_custom_target(bindingtester ALL DEPENDS ${tar_file})
add_dependencies(bindingtester copy_bindingtester_binaries)
endfunction()

View File

@ -1,14 +1,22 @@
function(env_set var_name default_value type docstring)
set(val ${default_value})
if(DEFINED ENV{${var_name}})
set(val $ENV{${var_name}})
endif()
set(${var_name} ${val} CACHE ${type} "${docstring}")
endfunction()
set(USE_GPERFTOOLS OFF CACHE BOOL "Use gperftools for profiling")
set(USE_VALGRIND OFF CACHE BOOL "Compile for valgrind usage")
env_set(USE_VALGRIND OFF BOOL "Compile for valgrind usage")
set(USE_VALGRIND_FOR_CTEST ${USE_VALGRIND} CACHE BOOL "Use valgrind for ctest")
set(ALLOC_INSTRUMENTATION OFF CACHE BOOL "Instrument alloc")
set(WITH_UNDODB OFF CACHE BOOL "Use rr or undodb")
set(USE_ASAN OFF CACHE BOOL "Compile with address sanitizer")
set(USE_UBSAN OFF CACHE BOOL "Compile with undefined behavior sanitizer")
set(FDB_RELEASE OFF CACHE BOOL "This is a build of a final release")
set(USE_LD "DEFAULT" CACHE STRING "The linker to use for building: can be LD (system default, default choice), BFD, GOLD, or LLD")
set(USE_LIBCXX OFF CACHE BOOL "Use libc++")
set(USE_CCACHE OFF CACHE BOOL "Use ccache for compilation if available")
env_set(USE_LD "DEFAULT" STRING "The linker to use for building: can be LD (system default, default choice), BFD, GOLD, or LLD")
env_set(USE_LIBCXX OFF BOOL "Use libc++")
env_set(USE_CCACHE OFF BOOL "Use ccache for compilation if available")
set(RELATIVE_DEBUG_PATHS OFF CACHE BOOL "Use relative file paths in debug info")
set(STATIC_LINK_LIBCXX ON CACHE BOOL "Statically link libstdcpp/libc++")
set(USE_WERROR OFF CACHE BOOL "Compile with -Werror. Recommended for local development and CI.")
@ -56,11 +64,6 @@ else()
add_definitions(-DUSE_UCONTEXT)
endif()
if ((NOT USE_CCACHE) AND (NOT "$ENV{USE_CCACHE}" STREQUAL ""))
if (("$ENV{USE_CCACHE}" STREQUAL "ON") OR ("$ENV{USE_CCACHE}" STREQUAL "1") OR ("$ENV{USE_CCACHE}" STREQUAL "YES"))
set(USE_CCACHE ON)
endif()
endif()
if (USE_CCACHE)
FIND_PROGRAM(CCACHE_FOUND "ccache")
if(CCACHE_FOUND)
@ -71,13 +74,6 @@ if (USE_CCACHE)
endif()
endif()
if ((NOT USE_LIBCXX) AND (NOT "$ENV{USE_LIBCXX}" STREQUAL ""))
string(TOUPPER "$ENV{USE_LIBCXX}" USE_LIBCXXENV)
if (("${USE_LIBCXXENV}" STREQUAL "ON") OR ("${USE_LIBCXXENV}" STREQUAL "1") OR ("${USE_LIBCXXENV}" STREQUAL "YES"))
set(USE_LIBCXX ON)
endif()
endif()
include(CheckFunctionExists)
set(CMAKE_REQUIRED_INCLUDES stdlib.h malloc.h)
set(CMAKE_REQUIRED_LIBRARIES c)
@ -109,16 +105,6 @@ else()
set(GCC YES)
endif()
# Use the linker environmental variable, if specified and valid
if ((USE_LD STREQUAL "DEFAULT") AND (NOT "$ENV{USE_LD}" STREQUAL ""))
string(TOUPPER "$ENV{USE_LD}" USE_LDENV)
if (("${USE_LDENV}" STREQUAL "LD") OR ("${USE_LDENV}" STREQUAL "GOLD") OR ("${USE_LDENV}" STREQUAL "LLD") OR ("${USE_LDENV}" STREQUAL "BFD") OR ("${USE_LDENV}" STREQUAL "DEFAULT"))
set(USE_LD "${USE_LDENV}")
else()
message (FATAL_ERROR "USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!")
endif()
endif()
# check linker flags.
if (USE_LD STREQUAL "DEFAULT")
set(USE_LD "LD")
@ -196,12 +182,6 @@ else()
# -mavx
# -msse4.2)
if ((NOT USE_VALGRIND) AND (NOT "$ENV{USE_VALGRIND}" STREQUAL ""))
if (("$ENV{USE_VALGRIND}" STREQUAL "ON") OR ("$ENV{USE_VALGRIND}" STREQUAL "1") OR ("$ENV{USE_VALGRIND}" STREQUAL "YES"))
set(USE_VALGRIND ON)
endif()
endif()
if (USE_VALGRIND)
add_compile_options(-DVALGRIND -DUSE_VALGRIND)
endif()

View File

@ -130,21 +130,20 @@ function(strip_debug_symbols target)
list(APPEND strip_command -o "${out_file}")
add_custom_command(OUTPUT "${out_file}"
COMMAND ${strip_command} $<TARGET_FILE:${target}>
DEPENDS ${target}
COMMENT "Stripping symbols from ${target}")
add_custom_target(strip_only_${target} DEPENDS ${out_file})
if(is_exec AND NOT APPLE)
add_custom_command(OUTPUT "${out_file}.debug"
COMMAND objcopy --verbose --only-keep-debug $<TARGET_FILE:${target}> "${out_file}.debug"
COMMAND objcopy --verbose --add-gnu-debuglink="${out_file}.debug" "${out_file}"
DEPENDS ${out_file}
COMMENT "Copy debug symbols to ${out_name}.debug")
list(APPEND out_files "${out_file}.debug")
add_custom_target(strip_${target} DEPENDS "${out_file}.debug")
add_dependencies(strip_${target} ${target} strip_only_${target})
else()
add_custom_target(strip_${target})
add_dependencies(strip_${target} strip_only_${target})
endif()
add_dependencies(strip_${target} strip_only_${target})
add_dependencies(strip_${target} ${target})
add_dependencies(strip_targets strip_${target})
endfunction()

5
contrib/CMakeLists.txt Normal file
View File

@ -0,0 +1,5 @@
if(NOT WIN32)
add_subdirectory(monitoring)
add_subdirectory(TraceLogHelper)
add_subdirectory(TestHarness)
endif()

23
contrib/Joshua/README.md Normal file
View File

@ -0,0 +1,23 @@
# Overview
This directory provides the files needed to create a Joshua correctness bundle for testing FoundationDB.
Rigorous testing is central to our engineering process. The features of our core are challenging, requiring us to meet exacting standards of correctness and performance. Data guarantees and transactional integrity must be maintained not only during normal operations but over a broad range of failure scenarios. At the same time, we aim to achieve performance goals such as low latencies and near-linear scalability. To meet these challenges, we use a combined regime of robust simulation, live performance testing, and hardware-based failure testing.
# Joshua
Joshua is a powerful tool for testing system correctness. Our simulation technology, called Joshua, is enabled by and tightly integrated with `flow`, our programming language for actor-based concurrency. In addition to generating efficient production code, Flow works with Joshua for simulated execution.
The major goal of Joshua is to make sure that we find and diagnose issues in simulation rather than the real world. Joshua runs tens of thousands of simulations every night, each one simulating large numbers of component failures. Based on the volume of tests that we run and the increased intensity of the failures in our scenarios, we estimate that we have run the equivalent of roughly one trillion CPU-hours of simulation on FoundationDB.
Joshua is able to conduct a *deterministic* simulation of an entire FoundationDB cluster within a single-threaded process. Determinism is crucial in that it allows perfect repeatability of a simulated run, facilitating controlled experiments to home in on issues. The simulation steps through time, synchronized across the system, representing a larger amount of real time in a smaller amount of simulated time. In practice, our simulations usually have about a 10-1 factor of real-to-simulated time, which is advantageous for the efficiency of testing.
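Determinism comes from funneling every source of nondeterminism through one seeded random source and one simulated clock inside a single thread, so the same seed replays the same run. The following is a minimal standalone sketch of that pattern (illustrative only; it is not FoundationDB's actual simulator code):

#include <cstdio>
#include <functional>
#include <queue>
#include <random>
#include <vector>

// A toy deterministic event loop: all "machines" would run in one thread and all
// randomness flows from a single seed, so a given seed replays identically.
struct Event {
    double time;                  // simulated time, not wall-clock time
    std::function<void()> action;
    bool operator>(const Event& other) const { return time > other.time; }
};

int main() {
    std::mt19937_64 rng(42);      // the seed fully determines the run
    std::priority_queue<Event, std::vector<Event>, std::greater<Event>> events;
    double now = 0;

    // Schedule a few "network deliveries" at pseudo-random simulated times.
    std::uniform_real_distribution<double> delay(0.001, 0.250);
    for (int i = 0; i < 5; i++) {
        double t = delay(rng);
        events.push({ t, [t] { std::printf("deliver packet at t=%.3f\n", t); } });
    }

    // Step through simulated time; no real sleeping is involved.
    while (!events.empty()) {
        Event e = events.top();
        events.pop();
        now = e.time;
        e.action();
    }
    std::printf("simulation finished at t=%.3f\n", now);
    return 0;
}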
We run a broad range of simulations testing various aspects of the system. For example, we run a cycle test that arranges key-value pairs in a ring and executes transactions that change the values in a manner designed to maintain the ring's integrity, allowing a clear test of transactional isolation.
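As a standalone illustration of the invariant behind such a test (a toy sketch only, not the actual cycle workload shipped with fdbserver), the ring can be modeled as a "next key" pointer per key; each atomic update rewires three links at once, and a checker verifies the keys still form a single cycle:

#include <cassert>
#include <cstdio>
#include <random>
#include <vector>

int main() {
    const int N = 10;
    std::vector<int> next(N);
    for (int i = 0; i < N; i++) next[i] = (i + 1) % N; // initial ring 0 -> 1 -> ... -> 0

    std::mt19937 rng(7);
    std::uniform_int_distribution<int> pick(0, N - 1);
    for (int step = 0; step < 1000; step++) {
        // One "transaction": remove the node after a and reinsert it one slot later.
        // All three writes must land atomically for the ring to survive.
        int a = pick(rng);
        int b = next[a];
        int c = next[b];
        next[a] = c;
        next[b] = next[c];
        next[c] = b;
    }

    // Checker: starting from key 0 we must visit every key exactly once.
    std::vector<bool> seen(N, false);
    int cur = 0;
    for (int i = 0; i < N; i++) {
        assert(!seen[cur]);
        seen[cur] = true;
        cur = next[cur];
    }
    assert(cur == 0);
    std::printf("ring intact after 1000 transactions\n");
    return 0;
}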
Joshua simulates all physical components of a FoundationDB system, beginning with the number and type of machines in the cluster. For example, Joshua models drive performance on each machine, including drive space and the possibility of the drive filling up. Joshua also models the network, allowing a small amount of code to specify delivery of packets.
We use Joshua to simulate failure modes at the network, machine, and datacenter levels, including connection failures, degradation of machine performance, machine shutdowns or reboots, machines coming back from the dead, etc. We stress-test all of these failure modes, failing machines at very short intervals, inducing unusually severe loads, and delaying communications channels.
For a while, there was an informal competition within the engineering team to design failures that found the toughest bugs and issues the most easily. After a period of one-upsmanship, the reigning champion is called "swizzle-clogging". To swizzle-clog, you first pick a random subset of nodes in the cluster. Then, you "clog" (stop) each of their network connections one by one over a few seconds. Finally, you unclog them in a random order, again one by one, until they are all up. This pattern seems to be particularly good at finding deep issues that only happen in the rarest real-world cases.
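In outline the procedure is small; the sketch below uses hypothetical clog()/unclog() functions as stand-ins for whatever mechanism actually severs and restores a node's network connections (inside FoundationDB this happens in the simulator, not against real processes):

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <random>
#include <string>
#include <thread>
#include <vector>

// Stand-in hooks; a real harness would cut and restore connectivity here.
void clog(const std::string& node)   { std::printf("clogging   %s\n", node.c_str()); }
void unclog(const std::string& node) { std::printf("unclogging %s\n", node.c_str()); }

int main() {
    std::vector<std::string> cluster = { "node1", "node2", "node3", "node4", "node5", "node6" };
    std::mt19937 rng(std::random_device{}());

    // 1. Pick a random subset of nodes in the cluster.
    std::shuffle(cluster.begin(), cluster.end(), rng);
    std::vector<std::string> victims(cluster.begin(), cluster.begin() + cluster.size() / 2);

    // 2. Clog each of their network connections one by one over a few seconds.
    for (const auto& node : victims) {
        clog(node);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    // 3. Unclog them in a random order, again one by one, until all are up.
    std::shuffle(victims.begin(), victims.end(), rng);
    for (const auto& node : victims) {
        unclog(node);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    return 0;
}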
Joshua's success has surpassed our expectation and has been vital to our engineering team. It seems unlikely that we would have been able to build FoundationDB without this technology.

View File

@ -0,0 +1,11 @@
#!/bin/bash
SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
pkill fdbserver
ulimit -S -c unlimited
unset FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY
WORKDIR="$(pwd)/tmp/$$"
if [ ! -d "${WORKDIR}" ] ; then
mkdir -p "${WORKDIR}"
fi
DEBUGLEVEL=0 DISPLAYERROR=1 RANDOMTEST=1 WORKDIR="${WORKDIR}" FDBSERVERPORT="${PORT_FDBSERVER:-4500}" ${SCRIPTDIR}/bindingTestScript.sh 1

View File

@ -0,0 +1,80 @@
#!/bin/bash
SCRIPTDIR=$( cd "${BASH_SOURCE[0]%\/*}" && pwd )
cwd="$(pwd)"
BINDIR="${BINDIR:-${SCRIPTDIR}}"
LIBDIR="${BINDIR}:${LD_LIBRARY_PATH}"
SCRIPTID="${$}"
SAVEONERROR="${SAVEONERROR:-1}"
PYTHONDIR="${BINDIR}/tests/python"
testScript="${BINDIR}/tests/bindingtester/run_binding_tester.sh"
VERSION="1.6"
source ${SCRIPTDIR}/localClusterStart.sh
# Display syntax
if [ "$#" -lt 1 ]
then
echo "bindingTestScript.sh <number of test cycles>"
echo " version: ${VERSION}"
exit 1
fi
cycles="${1}"
if [ "${DEBUGLEVEL}" -gt 0 ]
then
echo "Work dir: ${WORKDIR}"
echo "Bin dir: ${BINDIR}"
echo "Log dir: ${LOGDIR}"
echo "Python path: ${PYTHONDIR}"
echo "Lib dir: ${LIBDIR}"
echo "Server port: ${FDBSERVERPORT}"
echo "Script Id: ${SCRIPTID}"
echo "Version: ${VERSION}"
fi
# Begin the cluster using the logic in localClusterStart.sh.
startCluster
# Display user message
if [ "${status}" -ne 0 ]; then
:
elif ! displayMessage "Running binding tester"
then
echo 'Failed to display user message'
let status="${status} + 1"
elif ! PYTHONPATH="${PYTHONDIR}" LD_LIBRARY_PATH="${LIBDIR}" FDB_CLUSTER_FILE="${FDBCONF}" LOGSTDOUT=1 CONSOLELOG="${WORKDIR}/console.log" "${testScript}" "${cycles}" "${WORKDIR}/errors/run.log"
then
if [ "${DEBUGLEVEL}" -gt 0 ]; then
printf "\n%-16s %-40s \n" "$(date '+%F %H-%M-%S')" "Failed to complete binding tester in ${SECONDS} seconds."
fi
let status="${status} + 1"
elif [ "${DEBUGLEVEL}" -gt 0 ]; then
printf "\n%-16s %-40s \n" "$(date '+%F %H-%M-%S')" "Completed binding tester in ${SECONDS} seconds"
fi
# Display directory and log information, if an error occurred
if [ "${status}" -ne 0 ]
then
ls "${WORKDIR}" > "${LOGDIR}/dir.log"
ps -eafw > "${LOGDIR}/process-preclean.log"
if [ -f "${FDBCONF}" ]; then
cp -f "${FDBCONF}" "${LOGDIR}/"
fi
# Display the severity 40 (error) events
if [ -d "${LOGDIR}" ]; then
grep -ir 'Severity="40"' "${LOGDIR}"
fi
fi
# Save debug information files, environment, and log information, if an error occurred
if [ "${status}" -ne 0 ] && [ "${SAVEONERROR}" -gt 0 ]; then
ps -eafw > "${LOGDIR}/process-exit.log"
netstat -na > "${LOGDIR}/netstat.log"
df -h > "${LOGDIR}/disk.log"
env > "${LOGDIR}/env.log"
fi
exit "${status}"

View File

@ -0,0 +1,28 @@
#!/bin/bash -u
# Look for the start cluster log file.
notstarted=0
for file in `find . -name startcluster.log` ; do
if [ -n "$(grep 'Could not create database' "${file}")" ] ; then
echo "${file}:"
cat "${file}"
echo
notstarted=1
fi
done
# Print information about why the server didn't start.
if [ "${notstarted}" -gt 0 ] ; then
for file in `find . -name fdbclient.log` ; do
echo "${file}:"
cat "${file}"
echo
done
fi
# Print the test output.
for file in `find . -name console.log` ; do
echo "${file}:"
cat "${file}"
echo
done

View File

@ -0,0 +1,3 @@
#!/bin/sh
OLDBINDIR="${OLDBINDIR:-/app/deploy/global_data/oldBinaries}"
mono bin/TestHarness.exe joshua-run "${OLDBINDIR}" false

View File

@ -0,0 +1,4 @@
#!/bin/bash -u
for file in `find . -name 'trace*.xml'` ; do
mono ./bin/TestHarness.exe summarize "${file}" summary.xml "" JoshuaTimeout true
done

View File

@ -0,0 +1,315 @@
#!/bin/bash
SCRIPTDIR="${SCRIPTDIR:-$( cd "${BASH_SOURCE[0]%\/*}" && pwd )}"
DEBUGLEVEL="${DEBUGLEVEL:-1}"
WORKDIR="${WORKDIR:-${SCRIPTDIR}/tmp/fdb.work}"
LOGDIR="${WORKDIR}/log"
ETCDIR="${WORKDIR}/etc"
BINDIR="${BINDIR:-${SCRIPTDIR}}"
FDBSERVERPORT="${FDBSERVERPORT:-4500}"
FDBCONF="${ETCDIR}/fdb.cluster"
LOGFILE="${LOGFILE:-${LOGDIR}/startcluster.log}"
# Initialize the variables
status=0
messagetime=0
messagecount=0
function log
{
local status=0
if [ "$#" -lt 1 ]
then
echo "Usage: log <message> [echo]"
echo
echo "Logs the message and timestamp to LOGFILE (${LOGFILE}) and, if the"
echo "second argument is either not present or is set to 1, stdout."
let status="${status} + 1"
else
# Log to stdout.
if [ "$#" -lt 2 ] || [ "${2}" -ge 1 ]
then
echo "${1}"
fi
# Log to file.
datestr=$(date +"%Y-%m-%d %H:%M:%S (%s)")
dir=$(dirname "${LOGFILE}")
if ! [ -d "${dir}" ] && ! mkdir -p "${dir}"
then
echo "Could not create directory to log output."
let status="${status} + 1"
elif ! [ -f "${LOGFILE}" ] && ! touch "${LOGFILE}"
then
echo "Could not create file ${LOGFILE} to log output."
let status="${status} + 1"
elif ! echo "[ ${datestr} ] ${1}" >> "${LOGFILE}"
then
echo "Could not log output to ${LOGFILE}."
let status="${status} + 1"
fi
fi
return "${status}"
}
# Display a message for the user.
function displayMessage
{
local status=0
if [ "$#" -lt 1 ]
then
echo "displayMessage <message>"
let status="${status} + 1"
elif ! log "${1}" 0
then
log "Could not write message to file."
else
# Increment the message counter
let messagecount="${messagecount} + 1"
# Complete the previous message with its elapsed time, if one was displayed
if [ "${messagecount}" -gt 1 ]
then
# Determine the amount of elapsed time
let timespent="${SECONDS}-${messagetime}"
if [ "${DEBUGLEVEL}" -gt 0 ]; then
printf "... done in %3d seconds\n" "${timespent}"
fi
fi
# Display message
if [ "${DEBUGLEVEL}" -gt 0 ]; then
printf "%-16s %-35s " "$(date "+%F %H-%M-%S")" "$1"
fi
# Update the variables
messagetime="${SECONDS}"
fi
return "${status}"
}
# Create the directories used by the server.
function createDirectories {
# Display user message
if ! displayMessage "Creating directories"
then
echo 'Failed to display user message'
let status="${status} + 1"
elif ! mkdir -p "${LOGDIR}" "${ETCDIR}"
then
log "Failed to create directories"
let status="${status} + 1"
# Display user message
elif ! displayMessage "Setting file permissions"
then
log 'Failed to display user message'
let status="${status} + 1"
elif ! chmod 755 "${BINDIR}/fdbserver" "${BINDIR}/fdbcli"
then
log "Failed to set file permissions"
let status="${status} + 1"
else
while read filepath
do
if [ -f "${filepath}" ] && [ ! -x "${filepath}" ]
then
# if [ "${DEBUGLEVEL}" -gt 1 ]; then
# log " Enable executable: ${filepath}"
# fi
log " Enable executable: ${filepath}" "${DEBUGLEVEL}"
if ! chmod 755 "${filepath}"
then
log "Failed to set executable for file: ${filepath}"
let status="${status} + 1"
fi
fi
done < <(find "${BINDIR}" -iname '*.py' -o -iname '*.rb' -o -iname 'fdb_flow_tester' -o -iname '_stacktester' -o -iname '*.js' -o -iname '*.sh' -o -iname '*.ksh')
fi
return ${status}
}
# Create a cluster file for the local cluster.
function createClusterFile {
if [ "${status}" -ne 0 ]; then
:
# Display user message
elif ! displayMessage "Creating Fdb Cluster file"
then
log 'Failed to display user message'
let status="${status} + 1"
else
description=$(LC_CTYPE=C tr -dc A-Za-z0-9 < /dev/urandom 2> /dev/null | head -c 8)
random_str=$(LC_CTYPE=C tr -dc A-Za-z0-9 < /dev/urandom 2> /dev/null | head -c 8)
echo "$description:$random_str@127.0.0.1:${FDBSERVERPORT}" > "${FDBCONF}"
fi
if [ "${status}" -ne 0 ]; then
:
elif ! chmod 0664 "${FDBCONF}"; then
log "Failed to set permissions on fdbconf: ${FDBCONF}"
let status="${status} + 1"
fi
return ${status}
}
# Start the server running.
function startFdbServer {
if [ "${status}" -ne 0 ]; then
:
elif ! displayMessage "Starting Fdb Server"
then
log 'Failed to display user message'
let status="${status} + 1"
elif ! "${BINDIR}/fdbserver" -C "${FDBCONF}" -p "auto:${FDBSERVERPORT}" -L "${LOGDIR}" -d "${WORKDIR}/fdb/$$" &> "${LOGDIR}/fdbserver.log" &
then
log "Failed to start FDB Server"
# Maybe the server is already running
FDBSERVERID="$(pidof fdbserver)"
let status="${status} + 1"
else
FDBSERVERID="${!}"
fi
if ! kill -0 ${FDBSERVERID} ; then
log "FDB Server start failed."
let status="${status} + 1"
fi
return ${status}
}
function getStatus {
if [ "${status}" -ne 0 ]; then
:
elif ! date &>> "${LOGDIR}/fdbclient.log"
then
log 'Failed to get date'
let status="${status} + 1"
elif ! "${BINDIR}/fdbcli" -C "${FDBCONF}" --exec 'status json' --timeout 120 &>> "${LOGDIR}/fdbclient.log"
then
log 'Failed to get status from fdbcli'
let status="${status} + 1"
elif ! date &>> "${LOGDIR}/fdbclient.log"
then
log 'Failed to get date'
let status="${status} + 1"
fi
return ${status}
}
# Verify that the cluster is available.
function verifyAvailable {
# Verify that the server is running.
if ! kill -0 "${FDBSERVERID}"
then
log "FDB server process (${FDBSERVERID}) is not running"
let status="${status} + 1"
return 1
# Display user message.
elif ! displayMessage "Checking cluster availability"
then
log 'Failed to display user message'
let status="${status} + 1"
return 1
# Determine if status json says the database is available.
else
avail=`"${BINDIR}/fdbcli" -C "${FDBCONF}" --exec 'status json' --timeout 10 2> /dev/null | grep -E '"database_available"|"available"' | grep 'true'`
log "Avail value: ${avail}" "${DEBUGLEVEL}"
if [[ -n "${avail}" ]] ; then
return 0
else
return 1
fi
fi
}
# Configure the database on the server.
function createDatabase {
if [ "${status}" -ne 0 ]; then
:
# Ensure that the server is running
elif ! kill -0 "${FDBSERVERID}"
then
log "FDB server process: (${FDBSERVERID}) is not running"
let status="${status} + 1"
# Display user message
elif ! displayMessage "Creating database"
then
log 'Failed to display user message'
let status="${status} + 1"
elif ! echo "Client log:" &> "${LOGDIR}/fdbclient.log"
then
log 'Failed to create fdbclient.log'
let status="${status} + 1"
elif ! getStatus
then
log 'Failed to get status'
let status="${status} + 1"
# Configure the database.
else
"${BINDIR}/fdbcli" -C "${FDBCONF}" --exec 'configure new single memory; status' --timeout 240 --log --log-dir "${LOGDIR}" &>> "${LOGDIR}/fdbclient.log"
if ! displayMessage "Checking if config succeeded"
then
log 'Failed to display user message.'
fi
iteration=0
while [[ "${iteration}" -lt 10 ]] && ! verifyAvailable
do
log "Database not created (iteration ${iteration})."
let iteration="${iteration} + 1"
done
if ! verifyAvailable
then
log "Failed to create database via cli"
getStatus
cat "${LOGDIR}/fdbclient.log"
log "Ignoring -- moving on"
#let status="${status} + 1"
fi
fi
return ${status}
}
# Begin the local cluster from scratch.
function startCluster {
if [ "${status}" -ne 0 ]; then
:
elif ! createDirectories
then
log "Could not create directories."
let status="${status} + 1"
elif ! createClusterFile
then
log "Could not create cluster file."
let status="${status} + 1"
elif ! startFdbServer
then
log "Could not start FDB server."
let status="${status} + 1"
elif ! createDatabase
then
log "Could not create database."
let status="${status} + 1"
fi
return ${status}
}

View File

@ -0,0 +1,3 @@
#!/bin/sh
OLDBINDIR="${OLDBINDIR:-/app/deploy/global_data/oldBinaries}"
mono bin/TestHarness.exe joshua-run "${OLDBINDIR}" true

View File

@ -0,0 +1,6 @@
#!/bin/bash -u
for file in `find . -name 'trace*.xml'` ; do
for valgrindFile in `find . -name 'valgrind*.xml'` ; do
mono ./bin/TestHarness.exe summarize "${file}" summary.xml "${valgrindFile}" JoshuaTimeout true
done
done

View File

@ -0,0 +1,17 @@
set(SRCS
Program.cs
Properties/AssemblyInfo.cs)
set(TEST_HARNESS_REFERENCES
"-r:System,System.Core,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml,${TraceLogHelperDll}")
set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TestHarness.exe)
add_custom_command(OUTPUT ${out_file}
COMMAND ${MCS_EXECUTABLE} ARGS ${TEST_HARNESS_REFERENCES} ${SRCS} "-target:exe" "-out:${out_file}"
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${SRCS}
COMMENT "Compile TestHarness" VERBATIM)
add_custom_target(TestHarness DEPENDS ${out_file})
add_dependencies(TestHarness TraceLogHelper)
set(TestHarnesExe "${out_file}" PARENT_SCOPE)

View File

@ -0,0 +1,20 @@
set(SRCS
Event.cs
JsonParser.cs
Properties/AssemblyInfo.cs
TraceLogUtil.cs
XmlParser.cs)
set(TRACE_LOG_HELPER_REFERENCES
"-r:System,System.Core,System.Runtime.Serialization,System.Xml.Linq,System.Data.DataSetExtensions,Microsoft.CSharp,System.Data,System.Xml")
set(out_file ${CMAKE_BINARY_DIR}/packages/bin/TraceLogHelper.dll)
add_custom_command(OUTPUT ${out_file}
COMMAND ${MCS_EXECUTABLE} ARGS ${TRACE_LOG_HELPER_REFERENCES} ${SRCS} "-target:library" "-out:${out_file}"
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${SRCS}
COMMENT "Compile TraceLogHelper" VERBATIM)
add_custom_target(TraceLogHelper DEPENDS ${out_file})
set(TraceLogHelperDll "${out_file}" PARENT_SCOPE)

View File

@ -162,6 +162,11 @@ The ``getrangekeys`` command fetches keys in a range. Its syntax is ``getrangeke
Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
getversion
----------
The ``getversion`` command fetches the current read version of the cluster or currently running transaction.
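For example, running it inside ``fdbcli`` prints a single version number (the value shown here is illustrative)::

    fdb> getversion
    46189087373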
help
----

View File

@ -24,6 +24,7 @@ Bindings
Other Changes
-------------
* Double the number of shard locations that the client will cache locally. `(PR #2198) <https://github.com/apple/foundationdb/pull/2198>`_
* Add an option for transactions to report conflicting keys by calling getRange with the special key prefix \xff\xff/transaction/conflicting_keys/. `(PR #2257) <https://github.com/apple/foundationdb/pull/2257>`_
Earlier release notes
---------------------

View File

@ -219,10 +219,16 @@ struct VersionedMutations {
*/
struct DecodeProgress {
DecodeProgress() = default;
DecodeProgress(const LogFile& file) : file(file) {}
DecodeProgress(const LogFile& file, std::vector<std::tuple<Arena, Version, int32_t, StringRef>> values)
: file(file), keyValues(values) {}
// If there are no more mutations to pull.
bool finished() { return eof && keyValues.empty(); }
// If there are no more mutations to pull from the file.
// However, the buffer may still hold data for an unfinished version when EOF is true,
// which means the rest of that version's data is in the next file. The caller
// should call getUnfinishedBuffer() to retrieve the leftover data.
bool finished() { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); }
std::vector<std::tuple<Arena, Version, int32_t, StringRef>>&& getUnfinishedBuffer() { return std::move(keyValues); }
// Returns all mutations of the next version in a batch.
Future<VersionedMutations> getNextBatch() { return getNextBatchImpl(this); }
@ -242,12 +248,14 @@ struct DecodeProgress {
// PRECONDITION: finished() must return false before calling this function.
// Returns the next batch of mutations along with the arena backing it.
// Note the returned batch can be empty when the file has unfinished
// version batch data that are in the next file.
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
ASSERT(!self->finished());
loop {
if (self->keyValues.size() == 1) {
// Try to decode another block when only one left
if (self->keyValues.size() <= 1) {
// Try to decode another block when one or no entries are left
wait(readAndDecodeFile(self));
}
@ -281,19 +289,21 @@ struct DecodeProgress {
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
value = buf;
m.arena = buf.arena();
} else {
m.arena = std::get<0>(tuple);
}
if (self->isValueComplete(value)) {
m.mutations = decode_value(value);
if (m.arena.getSize() == 0) {
m.arena = std::get<0>(tuple);
}
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
return m;
} else if (!self->eof) {
// Read one more block, hopefully the missing part of the value can be found.
wait(readAndDecodeFile(self));
} else {
TraceEvent(SevError, "MissingValue").detail("Version", m.version);
throw restore_corrupted_data();
TraceEvent(SevWarn, "MissingValue").detail("Version", m.version);
self->leftover = true;
return m; // Empty mutations
}
}
}
@ -407,6 +417,7 @@ struct DecodeProgress {
Reference<IAsyncFile> fd;
int64_t offset = 0;
bool eof = false;
bool leftover = false; // Done but has unfinished version batch data left
// A (version, part_number)'s mutations and memory arena.
std::vector<std::tuple<Arena, Version, int32_t, StringRef>> keyValues;
};
@ -432,8 +443,12 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
printLogFiles("Relevant files are: ", logs);
state int i = 0;
// Previous file's unfinished version data
state std::vector<std::tuple<Arena, Version, int32_t, StringRef>> left;
for (; i < logs.size(); i++) {
state DecodeProgress progress(logs[i]);
if (logs[i].fileSize == 0) continue;
state DecodeProgress progress(logs[i], left);
wait(progress.openFile(container));
while (!progress.finished()) {
VersionedMutations vms = wait(progress.getNextBatch());
@ -441,6 +456,10 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
std::cout << vms.version << " " << m.toString() << "\n";
}
}
left = progress.getUnfinishedBuffer();
if (!left.empty()) {
TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size());
}
}
return Void();
}

View File

@ -103,6 +103,7 @@ enum {
OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, OPT_EXPIRE_MIN_RESTORABLE_DAYS,
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP, OPT_DESCRIBE_TIMESTAMPS,
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON, OPT_DELETE_DATA, OPT_MIN_CLEANUP_SECONDS,
OPT_USE_PARTITIONED_LOG,
// Backup and Restore constants
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
@ -169,6 +170,8 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_NOSTOPWHENDONE, "--no-stop-when-done",SO_NONE },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_USE_PARTITIONED_LOG, "-p", SO_NONE },
{ OPT_USE_PARTITIONED_LOG, "--partitioned_log", SO_NONE },
{ OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "--snapshot_interval", SO_REQ_SEP },
{ OPT_TAGNAME, "-t", SO_REQ_SEP },
@ -953,6 +956,7 @@ static void printBackupUsage(bool devhelp) {
printf(" -e ERRORLIMIT The maximum number of errors printed by status (default is 10).\n");
printf(" -k KEYS List of key ranges to backup.\n"
" If not specified, the entire database will be backed up.\n");
printf(" -p, --partitioned_log Starts with new type of backup system using partitioned logs.\n");
printf(" -n, --dryrun For backup start or restore start, performs a trial run with no actual changes made.\n");
printf(" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
@ -1744,9 +1748,10 @@ ACTOR Future<Void> submitDBBackup(Database src, Database dest, Standalone<Vector
return Void();
}
ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotIntervalSeconds, Standalone<VectorRef<KeyRangeRef>> backupRanges, std::string tagName, bool dryRun, bool waitForCompletion, bool stopWhenDone) {
try
{
ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotIntervalSeconds,
Standalone<VectorRef<KeyRangeRef>> backupRanges, std::string tagName, bool dryRun,
bool waitForCompletion, bool stopWhenDone, bool usePartitionedLog) {
try {
state FileBackupAgent backupAgent;
// Backup everything, if no ranges were specified
@ -1789,7 +1794,8 @@ ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotInterv
}
else {
wait(backupAgent.submitBackup(db, KeyRef(url), snapshotIntervalSeconds, tagName, backupRanges, stopWhenDone));
wait(backupAgent.submitBackup(db, KeyRef(url), snapshotIntervalSeconds, tagName, backupRanges, stopWhenDone,
usePartitionedLog));
// Wait for the backup to complete, if requested
if (waitForCompletion) {
@ -1811,8 +1817,7 @@ ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotInterv
}
}
}
}
catch (Error& e) {
} catch (Error& e) {
if(e.code() == error_code_actor_cancelled)
throw;
switch (e.code())
@ -2908,6 +2913,7 @@ int main(int argc, char* argv[]) {
std::string restoreTimestamp;
bool waitForDone = false;
bool stopWhenDone = true;
bool usePartitionedLog = false; // Set to true to use new backup system
bool forceAction = false;
bool trace = false;
bool quietDisplay = false;
@ -3153,6 +3159,9 @@ int main(int argc, char* argv[]) {
case OPT_NOSTOPWHENDONE:
stopWhenDone = false;
break;
case OPT_USE_PARTITIONED_LOG:
usePartitionedLog = true;
break;
case OPT_RESTORECONTAINER:
restoreContainer = args->OptionArg();
// If the url starts with '/' then prepend "file://" for backwards compatibility
@ -3564,7 +3573,8 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
openBackupContainer(argv[0], destinationContainer);
f = stopAfter( submitBackup(db, destinationContainer, snapshotIntervalSeconds, backupKeys, tagName, dryRun, waitForDone, stopWhenDone) );
f = stopAfter(submitBackup(db, destinationContainer, snapshotIntervalSeconds, backupKeys, tagName,
dryRun, waitForDone, stopWhenDone, usePartitionedLog));
break;
}

View File

@ -522,6 +522,9 @@ void initHelp() {
"getrangekeys <BEGINKEY> [ENDKEY] [LIMIT]",
"fetch keys in a range of keys",
"Displays up to LIMIT keys for keys between BEGINKEY (inclusive) and ENDKEY (exclusive). If ENDKEY is omitted, then the range will include all keys starting with BEGINKEY. LIMIT defaults to 25 if omitted." ESCAPINGK);
helpMap["getversion"] =
CommandHelp("getversion", "Fetch the current read version",
"Displays the current read version of the database or currently running transaction.");
helpMap["reset"] = CommandHelp(
"reset",
"reset the current transaction",
@ -2208,36 +2211,44 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
workerPorts[addr.address.ip].insert(addr.address.port);
// Print a list of all excluded addresses that don't have a corresponding worker
std::vector<AddressExclusion> absentExclusions;
std::set<AddressExclusion> absentExclusions;
for(auto addr : addresses) {
auto worker = workerPorts.find(addr.ip);
if(worker == workerPorts.end())
absentExclusions.push_back(addr);
absentExclusions.insert(addr);
else if(addr.port > 0 && worker->second.count(addr.port) == 0)
absentExclusions.push_back(addr);
absentExclusions.insert(addr);
}
if(!absentExclusions.empty()) {
printf("\nWARNING: the following servers were not present in the cluster. Be sure that you\n"
"excluded the correct machines or processes before removing them from the cluster:\n");
for(auto addr : absentExclusions) {
for (auto addr : addresses) {
NetworkAddress _addr(addr.ip, addr.port);
if (absentExclusions.find(addr) != absentExclusions.end()) {
if(addr.port == 0)
printf(" %s\n", addr.ip.toString().c_str());
printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
"correct machines before removing them from the cluster!\n",
addr.ip.toString().c_str());
else
printf(" %s\n", addr.toString().c_str());
}
printf("\n");
} else if (notExcludedServers.empty()) {
printf("\nIt is now safe to remove these machines or processes from the cluster.\n");
} else {
printf("\nWARNING: Exclusion in progress. It is not safe to remove the following machines\n"
"or processes from the cluster:\n");
for (auto addr : notExcludedServers) {
printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
"before removing them from the cluster!\n",
addr.toString().c_str());
} else if (notExcludedServers.find(_addr) != notExcludedServers.end()) {
if (addr.port == 0)
printf(" %s\n", addr.ip.toString().c_str());
printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
"machine from the cluster\n",
addr.ip.toString().c_str());
else
printf(" %s\n", addr.toString().c_str());
printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
"cluster\n",
addr.toString().c_str());
} else {
if (addr.port == 0)
printf(" %s(Whole machine) ---- Successfully excluded. It is now safe to remove this machine "
"from the cluster.\n",
addr.ip.toString().c_str());
else
printf(
" %s ---- Successfully excluded. It is now safe to remove this process from the cluster.\n",
addr.toString().c_str());
}
}
@ -3065,6 +3076,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "getversion")) {
if (tokens.size() != 1) {
printUsage(tokens[0]);
is_error = true;
} else {
Version v = wait(makeInterruptable(getTransaction(db, tr, options, intrans)->getReadVersion()));
printf("%ld\n", v);
}
continue;
}
if (tokencmp(tokens[0], "kill")) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {

View File

@ -657,6 +657,18 @@ public:
return dumpFileList_impl(Reference<BackupContainerFileSystem>::addRef(this), begin, end);
}
ACTOR static Future<bool> isPartitionedBackup_impl(Reference<BackupContainerFileSystem> bc) {
BackupFileList list = wait(bc->dumpFileList(0, std::numeric_limits<Version>::max()));
for (const auto& file : list.logs) {
if (file.isPartitionedLog()) return true;
}
return false;
}
Future<bool> isPartitionedBackup() final {
return isPartitionedBackup_impl(Reference<BackupContainerFileSystem>::addRef(this));
}
static Version resolveRelativeVersion(Optional<Version> max, Version v, const char *name, Error e) {
if(v == invalidVersion) {
TraceEvent(SevError, "BackupExpireInvalidVersion").detail(name, v);
@ -811,11 +823,17 @@ public:
// If we didn't get log versions above then seed them using the first log file
if (!desc.contiguousLogEnd.present()) {
desc.minLogBegin = logs.begin()->beginVersion;
desc.contiguousLogEnd = logs.begin()->endVersion;
if (partitioned) {
// Cannot use the first file's end version, which may not be contiguous
// for other partitions. Set to its beginVersion to be safe.
desc.contiguousLogEnd = logs.begin()->beginVersion;
} else {
desc.contiguousLogEnd = logs.begin()->endVersion;
}
}
if (partitioned) {
determinePartitionedLogsBeginEnd(&desc, logs);
updatePartitionedLogsContinuousEnd(&desc, logs, scanBegin, scanEnd);
} else {
Version& end = desc.contiguousLogEnd.get();
computeRestoreEndVersion(logs, nullptr, &end, std::numeric_limits<Version>::max());
@ -1137,14 +1155,24 @@ public:
indices.push_back(i);
}
// check tag 0 is continuous and create a map of ranges to tags
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
if (!isContinuous(files, tagIndices[0], begin, end, &tags)) return false;
// check partition 0 is continuous and create a map of ranges to tags
std::map<std::pair<Version, Version>, int> tags; // range [begin, end] -> tags
if (!isContinuous(files, tagIndices[0], begin, end, &tags)) {
TraceEvent(SevWarn, "BackupFileNotContinuous")
.detail("Partition", 0)
.detail("RangeBegin", begin)
.detail("RangeEnd", end);
return false;
}
// for each range in tags, check all tags from 1 are continuous
for (const auto [beginEnd, count] : tags) {
for (int i = 1; i < count; i++) {
if (!isContinuous(files, tagIndices[i], beginEnd.first, beginEnd.second, nullptr)) {
if (!isContinuous(files, tagIndices[i], beginEnd.first, std::min(beginEnd.second - 1, end), nullptr)) {
TraceEvent(SevWarn, "BackupFileNotContinuous")
.detail("Partition", i)
.detail("RangeBegin", beginEnd.first)
.detail("RangeEnd", beginEnd.second);
return false;
}
}
@ -1172,19 +1200,34 @@ public:
return filtered;
}
// Analyze partitioned logs and set minLogBegin and contiguousLogEnd.
// For partitioned logs, different tags may start at different versions, so
// we need to find the "minLogBegin" version as well.
static void determinePartitionedLogsBeginEnd(BackupDescription* desc, const std::vector<LogFile>& logs) {
// Analyze partitioned logs and set contiguousLogEnd for "desc" if larger
// than the "scanBegin" version.
static void updatePartitionedLogsContinuousEnd(BackupDescription* desc, const std::vector<LogFile>& logs,
const Version scanBegin, const Version scanEnd) {
if (logs.empty()) return;
for (const LogFile& file : logs) {
Version end = getPartitionedLogsContinuousEndVersion(logs, file.beginVersion);
if (end > file.beginVersion) {
// desc->minLogBegin = file.beginVersion;
Version snapshotBeginVersion = desc->snapshots.size() > 0 ? desc->snapshots[0].beginVersion : invalidVersion;
Version begin = std::max(scanBegin, desc->minLogBegin.get());
TraceEvent("ContinuousLogEnd")
.detail("ScanBegin", scanBegin)
.detail("ScanEnd", scanEnd)
.detail("Begin", begin)
.detail("ContiguousLogEnd", desc->contiguousLogEnd.get());
for (const auto& file : logs) {
if (file.beginVersion > begin) {
if (scanBegin > 0) return;
// scanBegin is 0
desc->minLogBegin = file.beginVersion;
begin = file.beginVersion;
}
Version ver = getPartitionedLogsContinuousEndVersion(logs, begin);
if (ver >= desc->contiguousLogEnd.get()) {
// contiguousLogEnd is not inclusive, so +1 here.
desc->contiguousLogEnd.get() = end + 1;
return;
desc->contiguousLogEnd.get() = ver + 1;
TraceEvent("UpdateContinuousLogEnd").detail("Version", ver + 1);
if (ver > snapshotBeginVersion) return;
}
}
}
@ -1196,7 +1239,8 @@ public:
std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
for (int i = 0; i < logs.size(); i++) {
ASSERT(logs[i].tagId >= 0 && logs[i].tagId < logs[i].totalTags);
ASSERT(logs[i].tagId >= 0);
ASSERT(logs[i].tagId < logs[i].totalTags);
auto& indices = tagIndices[logs[i].tagId];
// filter out if indices.back() is subset of files[i] or vice versa
if (!indices.empty()) {
@ -1210,26 +1254,32 @@ public:
}
end = std::max(end, logs[i].endVersion - 1);
}
TraceEvent("ContinuousLogEnd").detail("Begin", begin).detail("InitVersion", end);
// check tag 0 is continuous in [begin, end] and create a map of ranges to tags
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> tags
// check partition 0 is continuous in [begin, end] and create a map of ranges to partitions
std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> partitions
isContinuous(logs, tagIndices[0], begin, end, &tags);
if (tags.empty() || end <= begin) return 0;
end = std::min(end, tags.rbegin()->first.second);
TraceEvent("ContinuousLogEnd").detail("Partition", 0).detail("EndVersion", end).detail("Begin", begin);
// for each range in tags, check all tags from 1 are continouous
// for each range in tags, check all partitions from 1 are continuous
Version lastEnd = begin;
for (const auto [beginEnd, count] : tags) {
Version tagEnd = end; // This range's minimum continous tag version
Version tagEnd = beginEnd.second; // This range's minimum continuous partition version
for (int i = 1; i < count; i++) {
std::map<std::pair<Version, Version>, int> rangeTags;
isContinuous(logs, tagIndices[i], beginEnd.first, beginEnd.second, &rangeTags);
tagEnd = rangeTags.empty() ? 0 : std::min(tagEnd, rangeTags.rbegin()->first.second);
if (tagEnd == 0) return lastEnd;
TraceEvent("ContinuousLogEnd")
.detail("Partition", i)
.detail("EndVersion", tagEnd)
.detail("RangeBegin", beginEnd.first)
.detail("RangeEnd", beginEnd.second);
if (tagEnd == 0) return lastEnd == begin ? 0 : lastEnd;
}
if (tagEnd < beginEnd.second) {
end = tagEnd;
break;
return tagEnd;
}
lastEnd = beginEnd.second;
}

View File

@ -88,6 +88,10 @@ struct LogFile {
return beginVersion >= rhs.beginVersion && endVersion <= rhs.endVersion && tagId == rhs.tagId;
}
bool isPartitionedLog() const {
return tagId >= 0 && tagId < totalTags;
}
std::string toString() const {
std::stringstream ss;
ss << "beginVersion:" << std::to_string(beginVersion) << " endVersion:" << std::to_string(endVersion)
@ -261,6 +265,9 @@ public:
virtual Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) = 0;
// If there are partitioned log files, then returns true; otherwise, returns false.
virtual Future<bool> isPartitionedBackup() = 0;
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
// restore to given version is not possible.
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) = 0;

View File

@ -48,7 +48,8 @@ static const char* typeString[] = { "SetValue",
"ByteMax",
"MinV2",
"AndV2",
"CompareAndClear"};
"CompareAndClear",
"MAX_ATOMIC_OP" };
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
@ -150,21 +151,28 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) {
}
struct CommitTransactionRef {
CommitTransactionRef() : read_snapshot(0) {}
CommitTransactionRef(Arena &a, const CommitTransactionRef &from)
: read_conflict_ranges(a, from.read_conflict_ranges),
write_conflict_ranges(a, from.write_conflict_ranges),
mutations(a, from.mutations),
read_snapshot(from.read_snapshot) {
}
CommitTransactionRef() : read_snapshot(0), report_conflicting_keys(false) {}
CommitTransactionRef(Arena& a, const CommitTransactionRef& from)
: read_conflict_ranges(a, from.read_conflict_ranges), write_conflict_ranges(a, from.write_conflict_ranges),
mutations(a, from.mutations), read_snapshot(from.read_snapshot),
report_conflicting_keys(from.report_conflicting_keys) {}
VectorRef< KeyRangeRef > read_conflict_ranges;
VectorRef< KeyRangeRef > write_conflict_ranges;
VectorRef< MutationRef > mutations;
Version read_snapshot;
bool report_conflicting_keys;
template <class Ar>
force_inline void serialize( Ar& ar ) {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
force_inline void serialize(Ar& ar) {
if constexpr (is_fb_function<Ar>) {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot,
report_conflicting_keys);
} else {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
if (ar.protocolVersion().hasReportConflictingKeys()) {
serializer(ar, report_conflicting_keys);
}
}
}
// Convenience for internal code required to manipulate these without the Native API

View File

@ -21,6 +21,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
@ -1970,7 +1971,6 @@ namespace fileBackup {
if (Params.addBackupLogRangeTasks().get(task)) {
wait(startBackupLogRangeInternal(tr, taskBucket, futureBucket, task, taskFuture, beginVersion, endVersion));
endVersion = beginVersion;
} else {
wait(taskFuture->set(tr, taskBucket));
}
@ -2073,12 +2073,14 @@ namespace fileBackup {
state EBackupState backupState;
state Optional<std::string> tag;
state Optional<Version> latestSnapshotEndVersion;
state Optional<bool> partitionedLog;
wait(store(stopWhenDone, config.stopWhenDone().getOrThrow(tr))
&& store(restorableVersion, config.getLatestRestorableVersion(tr))
&& store(backupState, config.stateEnum().getOrThrow(tr))
&& store(tag, config.tag().get(tr))
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr)));
&& store(latestSnapshotEndVersion, config.latestSnapshotEndVersion().get(tr))
&& store(partitionedLog, config.partitionedLogEnabled().get(tr)));
// If restorable, update the last restorable version for this tag
if(restorableVersion.present() && tag.present()) {
@ -2114,14 +2116,20 @@ namespace fileBackup {
// If a snapshot has ended for this backup then mutations are higher priority to reduce backup lag
state int priority = latestSnapshotEndVersion.present() ? 1 : 0;
// Add the initial log range task to read/copy the mutations and the next logs dispatch task which will run after this batch is done
wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture))));
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture)));
if (!partitionedLog.present() || !partitionedLog.get()) {
// Add the initial log range task to read/copy the mutations and the next logs dispatch task which will run after this batch is done
wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture))));
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, priority, beginVersion, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture)));
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue, Optional<Version>(beginVersion)) );
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue, Optional<Version>(beginVersion)) );
}
} else {
// Skip copying mutations and erasing backup mutations; just check back periodically.
Version scheduledVersion = tr->getReadVersion().get() + CLIENT_KNOBS->BACKUP_POLL_PROGRESS_SECONDS * CLIENT_KNOBS->VERSIONS_PER_SECOND;
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, 1, beginVersion, endVersion, TaskCompletionKey::signal(onDone), Reference<TaskFuture>(), scheduledVersion)));
}
wait(taskBucket->finish(tr, task));
@ -2135,7 +2143,7 @@ namespace fileBackup {
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, int priority, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, int priority, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), Version scheduledVersion = invalidVersion) {
Key key = wait(addBackupTask(BackupLogsDispatchTask::name,
BackupLogsDispatchTask::version,
tr, taskBucket, completionKey,
@ -2144,6 +2152,9 @@ namespace fileBackup {
[=](Reference<Task> task) {
Params.prevBeginVersion().set(task, prevBeginVersion);
Params.beginVersion().set(task, beginVersion);
if (scheduledVersion != invalidVersion) {
ReservedTaskParams::scheduledVersion().set(task, scheduledVersion);
}
},
priority));
return key;
@ -2453,13 +2464,16 @@ namespace fileBackup {
state Future<std::vector<KeyRange>> backupRangesFuture = config.backupRanges().getOrThrow(tr);
state Future<Key> destUidValueFuture = config.destUidValue().getOrThrow(tr);
wait(success(backupRangesFuture) && success(destUidValueFuture));
state Future<Optional<bool>> partitionedLog = config.partitionedLogEnabled().get(tr);
wait(success(backupRangesFuture) && success(destUidValueFuture) && success(partitionedLog));
std::vector<KeyRange> backupRanges = backupRangesFuture.get();
Key destUidValue = destUidValueFuture.get();
// Start logging the mutations for the specified ranges of the tag
for (auto &backupRange : backupRanges) {
config.startMutationLogs(tr, backupRange, destUidValue);
// Start logging the mutations for the specified ranges of the tag if needed
if (!partitionedLog.get().present() || !partitionedLog.get().get()) {
for (auto& backupRange : backupRanges) {
config.startMutationLogs(tr, backupRange, destUidValue);
}
}
config.stateEnum().set(tr, EBackupState::STATE_RUNNING);
@ -3719,9 +3733,10 @@ public:
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
TraceEvent(SevInfo, "FBA_SubmitBackup")
.detail("TagName", tagName.c_str())
.detail("StopWhenDone", stopWhenDone)
.detail("OutContainer", outContainer.toString());
.detail("TagName", tagName.c_str())
.detail("StopWhenDone", stopWhenDone)
.detail("UsePartitionedLog", partitionedLog)
.detail("OutContainer", outContainer.toString());
state KeyBackedTag tag = makeBackupTag(tagName);
Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));

View File

@ -150,7 +150,9 @@ ACTOR Future<Void> krmSetRange( Reference<ReadYourWritesTransaction> tr, Key map
//Sets a range of keys in a key range map, coalescing with adjacent regions if the values match
//Ranges outside of maxRange will not be coalesced
//CAUTION: use care when attempting to coalesce multiple ranges in the same prefix in a single transaction
ACTOR Future<Void> krmSetRangeCoalescing( Transaction *tr, Key mapPrefix, KeyRange range, KeyRange maxRange, Value value ) {
ACTOR template <class Transaction>
static Future<Void> krmSetRangeCoalescing_(Transaction* tr, Key mapPrefix, KeyRange range, KeyRange maxRange,
Value value) {
ASSERT(maxRange.contains(range));
state KeyRange withPrefix = KeyRangeRef( mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString() );
@ -216,3 +218,11 @@ ACTOR Future<Void> krmSetRangeCoalescing( Transaction *tr, Key mapPrefix, KeyRan
return Void();
}
Future<Void> krmSetRangeCoalescing(Transaction* const& tr, Key const& mapPrefix, KeyRange const& range,
KeyRange const& maxRange, Value const& value) {
return krmSetRangeCoalescing_(tr, mapPrefix, range, maxRange, value);
}
Future<Void> krmSetRangeCoalescing(Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix,
KeyRange const& range, KeyRange const& maxRange, Value const& value) {
return holdWhile(tr, krmSetRangeCoalescing_(tr.getPtr(), mapPrefix, range, maxRange, value));
}

View File

@ -103,6 +103,8 @@ void krmSetPreviouslyEmptyRange( struct CommitTransactionRef& tr, Arena& trArena
Future<Void> krmSetRange( Transaction* const& tr, Key const& mapPrefix, KeyRange const& range, Value const& value );
Future<Void> krmSetRange( Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix, KeyRange const& range, Value const& value );
Future<Void> krmSetRangeCoalescing( Transaction* const& tr, Key const& mapPrefix, KeyRange const& range, KeyRange const& maxRange, Value const& value );
Future<Void> krmSetRangeCoalescing(Reference<ReadYourWritesTransaction> const& tr, Key const& mapPrefix,
KeyRange const& range, KeyRange const& maxRange, Value const& value);
Standalone<RangeResultRef> krmDecodeRanges( KeyRef mapPrefix, KeyRange keys, Standalone<RangeResultRef> kv );
template <class Val, class Metric, class MetricFunc>

View File

@ -138,6 +138,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BACKUP_COPY_TASKS, 90 );
init( BACKUP_BLOCK_SIZE, LOG_RANGE_BLOCK_SIZE/10 );
init( BACKUP_TASKS_PER_AGENT, 10 );
init( BACKUP_POLL_PROGRESS_SECONDS, 10 );
init( VERSIONS_PER_SECOND, 1e6 ); // Must be the same as SERVER_KNOBS->VERSIONS_PER_SECOND
init( SIM_BACKUP_TASKS_PER_AGENT, 10 );
init( BACKUP_RANGEFILE_BLOCK_SIZE, 1024 * 1024);
init( BACKUP_LOGFILE_BLOCK_SIZE, 1024 * 1024);

View File

@ -139,6 +139,8 @@ public:
int BACKUP_COPY_TASKS;
int BACKUP_BLOCK_SIZE;
int BACKUP_TASKS_PER_AGENT;
int BACKUP_POLL_PROGRESS_SECONDS;
int64_t VERSIONS_PER_SECOND; // Copy of SERVER_KNOBS, as we can't link with it
int SIM_BACKUP_TASKS_PER_AGENT;
int BACKUP_RANGEFILE_BLOCK_SIZE;
int BACKUP_LOGFILE_BLOCK_SIZE;

View File

@ -105,14 +105,18 @@ struct CommitID {
Version version; // returns invalidVersion if transaction conflicts
uint16_t txnBatchId;
Optional<Value> metadataVersion;
Optional<Standalone<VectorRef<int>>> conflictingKRIndices;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, txnBatchId, metadataVersion);
serializer(ar, version, txnBatchId, metadataVersion, conflictingKRIndices);
}
CommitID() : version(invalidVersion), txnBatchId(0) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion) {}
CommitID(Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion,
const Optional<Standalone<VectorRef<int>>>& conflictingKRIndices = Optional<Standalone<VectorRef<int>>>())
: version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion),
conflictingKRIndices(conflictingKRIndices) {}
};
struct CommitTransactionRequest : TimedRequest {

View File

@ -22,6 +22,7 @@
#include <algorithm>
#include <iterator>
#include <unordered_set>
#include <tuple>
#include <utility>
#include <vector>
@ -32,6 +33,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/MultiInterface.h"
#include "flow/Trace.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h"
@ -42,6 +44,7 @@
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/LoadBalance.h"
@ -2964,6 +2967,45 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
trLogInfo->addLog(FdbClientLogEvents::EventCommit(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), req));
return Void();
} else {
// clear the RYW transaction which contains previous conflicting keys
tr->info.conflictingKeysRYW.reset();
if (ci.conflictingKRIndices.present()) {
// In general, if we want to use getRange to expose conflicting keys,
// we need to support all the parameters getRange provides.
// It is difficult to take care of all corner cases of what getRange does.
// Consequently, we use a workaround here.
// We create an empty RYW transaction and write all conflicting key/values to it.
// Since it is a RYW transaction, we can call getRange on it with the same parameters given to the
// original getRange.
tr->info.conflictingKeysRYW = std::make_shared<ReadYourWritesTransaction>(tr->getDatabase());
state Reference<ReadYourWritesTransaction> hackTr =
Reference<ReadYourWritesTransaction>(tr->info.conflictingKeysRYW.get());
try {
state Standalone<VectorRef<int>> conflictingKRIndices = ci.conflictingKRIndices.get();
// To make the getRange call local, we need to explicitly set the read version here.
// The version number 100 set here does nothing except prevent fetching a read version from the
// proxy.
tr->info.conflictingKeysRYW->setVersion(100);
// Clear the whole key space so that the RYW transaction knows to read keys locally only
tr->info.conflictingKeysRYW->clear(normalKeys);
// initialize value
tr->info.conflictingKeysRYW->set(conflictingKeysPrefix, conflictingKeysFalse);
// drop duplicate indices and merge overlapping ranges
// Note: addReadConflictRange in the native transaction object does not merge overlapping ranges
state std::unordered_set<int> mergedIds(conflictingKRIndices.begin(),
conflictingKRIndices.end());
for (auto const& rCRIndex : mergedIds) {
const KeyRange kr = req.transaction.read_conflict_ranges[rCRIndex];
wait(krmSetRangeCoalescing(hackTr, conflictingKeysPrefix, kr, allKeys,
conflictingKeysTrue));
}
} catch (Error& e) {
hackTr.extractPtr(); // Make sure the RYW is not freed twice in case exception thrown
throw;
}
hackTr.extractPtr(); // Avoid the Reference to destroy the RYW object
}
if (info.debugID.present())
TraceEvent(interval.end()).detail("Conflict", 1);
@ -3076,6 +3118,9 @@ Future<Void> Transaction::commitMutations() {
if(options.firstInBatch) {
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
}
if (options.reportConflictingKeys) {
tr.transaction.report_conflicting_keys = true;
}
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
@ -3267,7 +3312,12 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
options.includePort = true;
break;
default:
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
default:
break;
}
}

View File

@ -128,6 +128,7 @@ struct TransactionOptions {
bool readOnly : 1;
bool firstInBatch : 1;
bool includePort : 1;
bool reportConflictingKeys : 1;
TransactionOptions(Database const& cx);
TransactionOptions();
@ -135,10 +136,14 @@ struct TransactionOptions {
void reset(Database const& cx);
};
class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// shared_ptr used here since TransactionInfo is sometimes copied as a function parameter.
std::shared_ptr<ReadYourWritesTransaction> conflictingKeysRYW;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
};

View File

@ -1279,7 +1279,43 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
return Standalone<RangeResultRef>();
}
}
// Use special key prefix "\xff\xff/transaction/conflicting_keys/<some_key>",
// to retrieve keys which caused the latest not_committed (conflicting with another transaction) error.
// The returned key value pairs are interpreted as:
// prefix/<key1> : '1' - any keys equal to or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal to or larger than this key are (definitely) not conflicting keys
// Currently, the conflicting key ranges returned are the original read_conflict_ranges or a union of them.
// TODO : This interface needs to be integrated into the framework that handles special keys' calls in the future
if (begin.getKey().startsWith(conflictingKeysAbsolutePrefix) &&
end.getKey().startsWith(conflictingKeysAbsolutePrefix)) {
// Remove the special key prefix "\xff\xff"
KeyRef beginConflictingKey = begin.getKey().removePrefix(specialKeys.begin);
KeyRef endConflictingKey = end.getKey().removePrefix(specialKeys.begin);
// Check if the conflicting key range to be read is valid
KeyRef maxKey = getMaxReadKey();
if (beginConflictingKey > maxKey || endConflictingKey > maxKey) return key_outside_legal_range();
begin.setKey(beginConflictingKey);
end.setKey(endConflictingKey);
if (tr.info.conflictingKeysRYW) {
Future<Standalone<RangeResultRef>> resultWithoutPrefixFuture =
tr.info.conflictingKeysRYW->getRange(begin, end, limits, snapshot, reverse);
// Make sure it happens locally
ASSERT(resultWithoutPrefixFuture.isReady());
Standalone<RangeResultRef> resultWithoutPrefix = resultWithoutPrefixFuture.get();
// Add prefix to results, making keys consistent with the getRange query
Standalone<RangeResultRef> resultWithPrefix;
resultWithPrefix.reserve(resultWithPrefix.arena(), resultWithoutPrefix.size());
for (auto const& kv : resultWithoutPrefix) {
KeyValueRef kvWithPrefix(kv.key.withPrefix(specialKeys.begin, resultWithPrefix.arena()), kv.value);
resultWithPrefix.push_back(resultWithPrefix.arena(), kvWithPrefix);
}
return resultWithPrefix;
} else {
return Standalone<RangeResultRef>();
}
}
if(checkUsedDuringCommit()) {
return used_during_commit();
}
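Taken together with the REPORT_CONFLICTING_KEYS option added in NativeAPI.actor.cpp, a client-side sketch of the intended flow might look as follows; the actor name, the range limit, and the error-handling shape are assumptions, not part of this patch. The special range is read on the same transaction after a conflicting commit, before the transaction is reset.

// Illustrative sketch: enable conflict reporting and dump the reported boundaries on conflict.
ACTOR Future<Void> reportMyConflicts(Database cx) {
	state ReadYourWritesTransaction tr(cx);
	tr.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
	// ... the reads and writes that may conflict go here ...
	try {
		wait(tr.commit());
		return Void();
	} catch (Error& e) {
		if (e.code() != error_code_not_committed) throw;
	}
	// Commit conflicted: boundaries are served locally from the transaction; keys carry the
	// "\xff\xff" prefix, value '1' starts a (probably) conflicting range, '0' a clean one.
	Standalone<RangeResultRef> boundaries =
	    wait(tr.getRange(KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"),
	                                 LiteralStringRef("\xff\xff/transaction/conflicting_keys/\xff\xff")),
	                     /*limit=*/1000));
	for (auto const& kv : boundaries)
		TraceEvent("ConflictingKeyBoundary").detail("Key", kv.key).detail("Value", kv.value);
	return Void();
}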

View File

@ -31,6 +31,7 @@ const KeyRangeRef systemKeys(systemKeysPrefix, LiteralStringRef("\xff\xff") );
const KeyRangeRef nonMetadataSystemKeys(LiteralStringRef("\xff\x02"), LiteralStringRef("\xff\x03"));
const KeyRangeRef allKeys = KeyRangeRef(normalKeys.begin, systemKeys.end);
const KeyRef afterAllKeys = LiteralStringRef("\xff\xff\x00");
const KeyRangeRef specialKeys = KeyRangeRef(LiteralStringRef("\xff\xff"), LiteralStringRef("\xff\xff\xff\xff"));
// keyServersKeys.contains(k) iff k.startsWith(keyServersPrefix)
const KeyRangeRef keyServersKeys( LiteralStringRef("\xff/keyServers/"), LiteralStringRef("\xff/keyServers0") );
@ -61,6 +62,11 @@ void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>
}
}
const KeyRef conflictingKeysPrefix = LiteralStringRef("/transaction/conflicting_keys/");
const Key conflictingKeysAbsolutePrefix = conflictingKeysPrefix.withPrefix(specialKeys.begin);
const ValueRef conflictingKeysTrue = LiteralStringRef("1");
const ValueRef conflictingKeysFalse = LiteralStringRef("0");
// "\xff/cacheServer/[[UID]] := StorageServerInterface"
// This will be added by the cache server on initialization and removed by DD
// TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future
@ -203,7 +209,7 @@ const KeyRangeRef serverTagConflictKeys(
const KeyRef serverTagConflictPrefix = serverTagConflictKeys.begin;
// serverTagHistoryKeys is the old tag a storage server uses before it is migrated to a different location.
// For example, we can copy a SS file to a remote DC and start the SS there;
// The new SS will need to cnosume the last bits of data from the old tag it is responsible for.
// The new SS will need to consume the last bits of data from the old tag it is responsible for.
const KeyRangeRef serverTagHistoryKeys(
LiteralStringRef("\xff/serverTagHistory/"),
LiteralStringRef("\xff/serverTagHistory0") );

View File

@ -36,6 +36,7 @@ extern const KeyRangeRef normalKeys; // '' to systemKeys.begin
extern const KeyRangeRef systemKeys; // [FF] to [FF][FF]
extern const KeyRangeRef nonMetadataSystemKeys; // [FF][00] to [FF][01]
extern const KeyRangeRef allKeys; // '' to systemKeys.end
extern const KeyRangeRef specialKeys; // [FF][FF] to [FF][FF][FF][FF]
extern const KeyRef afterAllKeys;
// "\xff/keyServers/[[begin]]" := "[[vector<serverID>, vector<serverID>]]"
@ -70,6 +71,10 @@ const Key serverKeysPrefixFor( UID serverID );
UID serverKeysDecodeServer( const KeyRef& key );
bool serverHasKey( ValueRef storedValue );
extern const KeyRef conflictingKeysPrefix;
extern const Key conflictingKeysAbsolutePrefix;
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
extern const KeyRef cacheKeysPrefix;
const Key cacheKeysKey( uint16_t idx, const KeyRef& key );

View File

@ -253,6 +253,8 @@ description is not currently required but encouraged.
hidden="true" />
<Option name="use_provisional_proxies" code="711"
description="This option should only be used by tools which change the database configuration." />
<Option name="report_conflicting_keys" code="712"
description="The transaction can retrieve keys that are conflicting with other transactions." />
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -78,16 +78,16 @@ public:
Range range() { return Range(begin(),end()); }
Val& value() {
Val& value() {
//ASSERT( it->key != allKeys.end );
return it->value;
return it->value;
}
void operator ++() { ++it; }
void operator --() { it.decrementNonEnd(); }
bool operator ==(Iterator const& r) const { return it == r.it; }
bool operator !=(Iterator const& r) const { return it != r.it; }
// operator* and -> return this
Iterator& operator*() { return *this; }
Iterator* operator->() { return this; }
@ -131,10 +131,10 @@ public:
--i;
return i;
}
Iterator lastItem() {
Iterator lastItem() {
auto i = map.lastItem();
i.decrementNonEnd();
return Iterator(i);
return Iterator(i);
}
int size() const { return map.size() - 1; } // We always have one range bounded by two entries
Iterator randomRange() {

View File

@ -47,8 +47,10 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
void BackupProgress::updateTagVersions(std::map<Tag, Version>* tagVersions, std::set<Tag>* tags,
const std::map<Tag, Version>& progress, Version endVersion, LogEpoch epoch) {
for (const auto& [tag, savedVersion] : progress) {
tags->erase(tag);
if (savedVersion < endVersion - 1) {
// If the tag is not in "tags", the old epoch has more tags than the new epoch.
// Just ignore the tag here.
auto n = tags->erase(tag);
if (n > 0 && savedVersion < endVersion - 1) {
tagVersions->insert({ tag, savedVersion + 1 });
TraceEvent("BackupVersionRange", dbgid)
.detail("OldEpoch", epoch)

View File

@ -596,6 +596,29 @@ ACTOR Future<Void> addMutation(Reference<IBackupFile> logFile, VersionedMessage
return Void();
}
ACTOR static Future<Void> updateLogBytesWritten(BackupData* self, std::vector<UID> backupUids,
std::vector<Reference<IBackupFile>> logFiles) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
ASSERT(backupUids.size() == logFiles.size());
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (int i = 0; i < backupUids.size(); i++) {
BackupConfig config(backupUids[i]);
config.logBytesWritten().atomicOp(tr, logFiles[i]->size(), MutationRef::AddValue);
}
wait(tr->commit());
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
// Saves messages in the range of [0, numMsg) to a file and then removes these
// messages. The file content format is a sequence of (Version, sub#, msgSize, message).
// Note only ready backups are saved.
@ -703,6 +726,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
self->backups[uid].lastSavedVersion = popVersion + 1;
}
wait(updateLogBytesWritten(self, activeUids, logFiles));
return Void();
}

View File

@ -167,6 +167,7 @@ set(FDBSERVER_SRCS
workloads/RandomSelector.actor.cpp
workloads/ReadWrite.actor.cpp
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp

View File

@ -33,7 +33,8 @@ void clearConflictSet(ConflictSet*, Version);
void destroyConflictSet(ConflictSet*);
struct ConflictBatch {
explicit ConflictBatch(ConflictSet*);
explicit ConflictBatch(ConflictSet*, std::map<int, VectorRef<int>>* conflictingKeyRangeMap = nullptr,
Arena* resolveBatchReplyArena = nullptr);
~ConflictBatch();
enum TransactionCommitResult {
@ -55,6 +56,8 @@ private:
std::vector<std::pair<StringRef, StringRef>> combinedWriteConflictRanges;
std::vector<struct ReadConflictRange> combinedReadConflictRanges;
bool* transactionConflictStatus;
std::map<int, VectorRef<int>>* conflictingKeyRangeMap;
Arena* resolveBatchReplyArena;
void checkIntraBatchConflicts();
void combineWriteConflictRanges();

View File

@ -570,7 +570,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS = deterministicRandom()->random01() * 240 + 10; }
init( FASTRESTORE_USE_PARTITIONED_LOGS, true );
init( FASTRESTORE_TRACK_REQUEST_LATENCY, true ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }
init( FASTRESTORE_TRACK_LOADER_SEND_REQUESTS, false ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_LOADER_SEND_REQUESTS = true; }
init( FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT, 6144 ); if( randomize && BUGGIFY ) { FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT = 1; }

View File

@ -512,7 +512,6 @@ public:
int64_t FASTRESTORE_APPLYING_PARALLELISM; // number of outstanding txns writing to dest. DB
int64_t FASTRESTORE_MONITOR_LEADER_DELAY;
int64_t FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS;
bool FASTRESTORE_USE_PARTITIONED_LOGS;
bool FASTRESTORE_TRACK_REQUEST_LATENCY; // true to track reply latency of each request in a request batch
bool FASTRESTORE_TRACK_LOADER_SEND_REQUESTS; // track requests of load send mutations to appliers?
int64_t FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT; // threshold when pipelined actors should be delayed

View File

@ -447,8 +447,10 @@ struct ResolutionRequestBuilder {
vector<int> resolversUsed;
for (int r = 0; r<outTr.size(); r++)
if (outTr[r])
if (outTr[r]) {
resolversUsed.push_back(r);
outTr[r]->report_conflicting_keys = trIn.report_conflicting_keys;
}
transactionResolverMap.push_back(std::move(resolversUsed));
}
};
@ -640,6 +642,7 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
return Void();
}
// Commit one batch of transactions trs
ACTOR Future<Void> commitBatch(
ProxyCommitData* self,
vector<CommitTransactionRequest> trs,
@ -819,7 +822,9 @@ ACTOR Future<Void> commitBatch(
// Determine which transactions actually committed (conservatively) by combining results from the resolvers
state vector<uint8_t> committed(trs.size());
ASSERT(transactionResolverMap.size() == committed.size());
vector<int> nextTr(resolution.size());
// Each commitTransactionRef is only sent to the resolvers specified in transactionResolverMap.
// Thus, we use nextTr to track each transaction's index in each resolver's batch.
state vector<int> nextTr(resolution.size());
for (int t = 0; t<trs.size(); t++) {
uint8_t commit = ConflictBatch::TransactionCommitted;
for (int r : transactionResolverMap[t])
@ -1153,6 +1158,8 @@ ACTOR Future<Void> commitBatch(
// Send replies to clients
double endTime = g_network->timer();
// Reset all to zero; used to track the correct index of each commitTransactionRef on each resolver
std::fill(nextTr.begin(), nextTr.end(), 0);
for (int t = 0; t < trs.size(); t++) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
@ -1162,9 +1169,27 @@ ACTOR Future<Void> commitBatch(
trs[t].reply.sendError(transaction_too_old());
}
else {
trs[t].reply.sendError(not_committed());
// If the option to report conflicting keys from resolvers is enabled, we send back the indices of all
// conflicting key ranges through CommitID
if (trs[t].transaction.report_conflicting_keys) {
Standalone<VectorRef<int>> conflictingKRIndices;
for (int resolverInd : transactionResolverMap[t]) {
auto const& cKRs = resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]];
for (auto const& rCRIndex : cKRs)
conflictingKRIndices.push_back(conflictingKRIndices.arena(), rCRIndex);
}
// At least one keyRange index should be returned
ASSERT(conflictingKRIndices.size());
trs[t].reply.send(CommitID(invalidVersion, t, Optional<Value>(),
Optional<Standalone<VectorRef<int>>>(conflictingKRIndices)));
} else {
trs[t].reply.sendError(not_committed());
}
}
// Update corresponding transaction indices on each resolver
for (int resolverInd : transactionResolverMap[t]) nextTr[resolverInd]++;
// TODO: filter if pipelined with large commit
if(self->latencyBandConfig.present()) {
bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());
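To make the nextTr bookkeeping above concrete, here is a small standalone illustration (not FoundationDB code; names chosen to match the comment above) of how a transaction's position differs per resolver when each transaction is only sent to a subset of resolvers.

#include <cstdio>
#include <vector>

int main() {
	// transactionResolverMap[t] lists the resolvers that received transaction t.
	std::vector<std::vector<int>> transactionResolverMap = { { 0 }, { 0, 1 }, { 1 } };
	std::vector<int> nextTr(2, 0); // per-resolver index of the next transaction in that resolver's batch
	for (size_t t = 0; t < transactionResolverMap.size(); t++)
		for (int r : transactionResolverMap[t])
			// tx 1 is entry 1 on resolver 0 but entry 0 on resolver 1, hence the per-resolver counters.
			std::printf("tx %zu is entry %d in resolver %d's batch\n", t, nextTr[r]++, r);
	return 0;
}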

View File

@ -166,13 +166,15 @@ ACTOR Future<Void> resolveBatch(
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version];
vector<int> commitList;
vector<int> tooOldList;
// Detect conflicts
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
ConflictBatch conflictBatch( self->conflictSet );
ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
int keys = 0;
for(int t=0; t<req.transactions.size(); t++) {
conflictBatch.addTransaction( req.transactions[t] );
@ -189,7 +191,6 @@ ACTOR Future<Void> resolveBatch(
}
conflictBatch.detectConflicts( req.version, req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS, commitList, &tooOldList);
ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version];
reply.debugID = req.debugID;
reply.committed.resize( reply.arena, req.transactions.size() );
for(int c=0; c<commitList.size(); c++)

View File

@ -80,10 +80,12 @@ struct ResolveTransactionBatchReply {
VectorRef<uint8_t> committed;
Optional<UID> debugID;
VectorRef<VectorRef<StateTransactionRef>> stateMutations; // [version][transaction#] -> (committed, [mutation#])
std::map<int, VectorRef<int>>
conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, committed, stateMutations, arena, debugID);
serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, arena);
}
};

View File

@ -161,6 +161,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Note: Log and range mutations may be delivered out of order. Can we handle it?
if (mutation.type == MutationRef::SetVersionstampedKey ||
mutation.type == MutationRef::SetVersionstampedValue) {
ASSERT(false); // No version stamp mutations in backup logs
batchData->addVersionStampedKV(mutation, mutationVersion, numVersionStampedKV);
numVersionStampedKV++;
} else {
@ -219,11 +220,12 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
wait(waitForAll(fValues));
break;
} catch (Error& e) {
retries++;
TraceEvent(retries > 10 ? SevError : SevWarn, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", incompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
if (retries++ > 10) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck")
.detail("GetKeys", incompleteStagingKeys.size())
.error(e);
}
wait(tr->onError(e));
fValues.clear();
}
@ -233,17 +235,17 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
int i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].get().present()) {
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
.detail("Key", key.first)
.detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size())
.detail("StagingKeyType", (int)key.second->second.type);
for (auto& vm : key.second->second.pendingMutations) {
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
.detail("PendingMutationVersion", vm.first.toString())
.detail("PendingMutation", vm.second.toString());
}
key.second->second.precomputeResult();
key.second->second.precomputeResult("GetAndComputeStagingKeysNoBaseValueInDB");
i++;
continue;
} else {
@ -251,7 +253,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
key.second->second.add(m, LogMessageVersion(1));
key.second->second.precomputeResult();
key.second->second.precomputeResult("GetAndComputeStagingKeys");
i++;
}
}
@ -296,9 +298,16 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
.detail("ClearRanges", batchData->stagingKeyRanges.size());
for (auto& rangeMutation : batchData->stagingKeyRanges) {
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
while (lb != ub) {
lb->second.add(rangeMutation.mutation, rangeMutation.version);
if (lb->first >= rangeMutation.mutation.param2) {
TraceEvent(SevError, "FastRestoreApplerPhasePrecomputeMutationsResult_IncorrectUpperBound")
.detail("Key", lb->first)
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
.detail("UsedUpperBound", ub->first);
}
MutationRef clearKey(MutationRef::ClearRange, lb->first, lb->first);
lb->second.add(clearKey, rangeMutation.version);
lb++;
}
}
@ -338,7 +347,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
stagingKeyIter++) {
if (stagingKeyIter->second.hasBaseValue()) {
stagingKeyIter->second.precomputeResult();
stagingKeyIter->second.precomputeResult("HasBaseValue");
}
}

View File

@ -79,7 +79,20 @@ struct StagingKey {
// newVersion can be smaller than version as different loaders can send
// mutations out of order.
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
if (m.type == MutationRef::ClearRange) {
// We should only clear this key! Otherwise, it causes side effects on other keys
ASSERT(m.param1 == m.param2);
}
if (version < newVersion) {
if (debugMutation("StagingKeyAdd", newVersion.version, m)) {
TraceEvent("StagingKeyAdd")
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("MType", typeString[(int)type])
.detail("Key", key)
.detail("Val", val)
.detail("NewMutation", m.toString());
}
key = m.param1;
val = m.param2;
type = (MutationRef::Type)m.type;
@ -91,6 +104,7 @@ struct StagingKey {
pendingMutations.emplace(newVersion, m);
} else {
// Duplicated mutation ignored.
// TODO: Add SevError here
TraceEvent("SameVersion")
.detail("Version", version.toString())
.detail("Mutation", m.toString())
@ -102,16 +116,21 @@ struct StagingKey {
// Precompute the final value of the key.
// TODO: Look at the last LogMessageVersion; if it is a set or clear, we can ignore the rest of the versions.
void precomputeResult() {
void precomputeResult(const char* context) {
// TODO: Change typeString[(int)type] to a safe function that validates the type range
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
.detail("Key", key)
.detail("Context", context)
.detail("Version", version.toString())
.detail("Key", key)
.detail("Value", val)
.detail("MType", type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "[Unset]")
.detail("LargestPendingVersion",
(pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()));
std::map<LogMessageVersion, Standalone<MutationRef>>::iterator lb = pendingMutations.lower_bound(version);
if (lb == pendingMutations.end()) {
return;
}
ASSERT(!pendingMutations.empty());
if (lb->first == version) {
// Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered
MutationRef m = lb->second;
@ -130,7 +149,11 @@ struct StagingKey {
MutationRef mutation = lb->second;
if (type == MutationRef::CompareAndClear) { // Special atomicOp
Arena arena;
Optional<ValueRef> retVal = doCompareAndClear(val, mutation.param2, arena);
Optional<StringRef> inputVal;
if (hasBaseValue()) {
inputVal = val;
}
Optional<ValueRef> retVal = doCompareAndClear(inputVal, mutation.param2, arena);
if (!retVal.present()) {
val = key;
type = MutationRef::ClearRange;
@ -152,6 +175,7 @@ struct StagingKey {
.detail("MutationType", typeString[mutation.type])
.detail("Version", lb->first.toString());
}
ASSERT(lb->first > version);
version = lb->first;
}
}

View File

@ -21,6 +21,7 @@
// This file implements the functions and actors used by the RestoreLoader role.
// The RestoreLoader role starts with the restoreLoaderCore actor
#include "flow/UnitTest.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
@ -201,7 +202,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
}
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
TraceEvent(SevFRMutationInfo, "FastRestoreDecodePartitionedLogFile")
.detail("CommitVersion", msgVersion.toString())
.detail("ParsedMutation", mutation.toString());
it->second.push_back_deep(it->second.arena(), mutation);
@ -335,6 +336,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
.detail("UseRangeFile", req.useRangeFile)
.detail("LoaderSendStatus", batchStatus->toString());
// Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
if (!req.useRangeFile) {
if (!batchStatus->sendAllLogs.present()) { // Has not sent
batchStatus->sendAllLogs = Never();
@ -342,7 +344,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
ASSERT(!batchStatus->sendAllRanges.present());
} else if (!batchStatus->sendAllLogs.get().isReady()) { // In the process of sending
TraceEvent(SevDebug, "FastRestoreSendMutationsWaitDuplicateLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
@ -360,7 +361,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
ASSERT(batchStatus->sendAllLogs.get().isReady());
} else if (!batchStatus->sendAllRanges.get().isReady()) {
TraceEvent(SevDebug, "FastRestoreSendMutationsWaitDuplicateRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
@ -587,8 +587,12 @@ void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mv
curm.param2 = itlow->first;
}
ASSERT(curm.param1 <= curm.param2);
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
// itup > m.param2: (itup-1) may be outside mutation m's range
// Ensure the added mutations overlap with mutation m
if (m.param1 < curm.param2 && m.param2 > curm.param1) {
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
}
}
@ -680,8 +684,6 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
// We have already filtered out commits not in [beginVersion, endVersion) when concatenating kv pairs in the log file
ASSERT_WE_THINK(asset.isInVersionRange(commitVersion));
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion), MutationsVec()));
ASSERT(it.second); // inserted is true
StringRefReader vReader(val, restore_corrupted_data());
vReader.consume<uint64_t>(); // Consume the includeVersion
@ -691,6 +693,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
uint32_t val_length_decoded = vReader.consume<uint32_t>();
ASSERT(val_length_decoded == val.size() - sizeof(uint64_t) - sizeof(uint32_t));
int sub = 0;
while (1) {
// stop when reach the end of the string
if (vReader.eof()) { //|| *reader.rptr == 0xFF
@ -718,10 +721,15 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
cc->sampledLogBytes += mutation.totalSize();
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
samples.push_back_deep(samples.arena(), mutation);
@ -802,7 +810,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// We cache all kv operations into kvOps, and apply all kv operations later in one place
auto it = kvOps.insert(std::make_pair(msgVersion, MutationsVec()));
TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeRangeFile")
.detail("CommitVersion", version)
.detail("ParsedMutationKV", m.toString());
@ -879,3 +887,99 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
req.reply.send(RestoreCommonReply(self->id(), false));
return Void();
}
// Test splitMutation
TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
std::map<Key, UID> rangeToApplier;
MutationsVec mvector;
Standalone<VectorRef<UID>> nodeIDs;
// Prepare RangeToApplier
rangeToApplier.emplace(normalKeys.begin, deterministicRandom()->randomUniqueID());
int numAppliers = deterministicRandom()->randomInt(1, 50);
for (int i = 0; i < numAppliers; ++i) {
Key k = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 1000)));
UID node = deterministicRandom()->randomUniqueID();
rangeToApplier.emplace(k, node);
TraceEvent("RangeToApplier").detail("Key", k).detail("Node", node);
}
Key k1 = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 500)));
Key k2 = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 1000)));
Key beginK = k1 < k2 ? k1 : k2;
Key endK = k1 < k2 ? k2 : k1;
Standalone<MutationRef> mutation(MutationRef(MutationRef::ClearRange, beginK.contents(), endK.contents()));
// Method 1: Use splitMutation
splitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
// Method 2: Use intersection
KeyRangeMap<UID> krMap;
std::map<Key, UID>::iterator beginKey = rangeToApplier.begin();
std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
while (endKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", endKey->first)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
beginKey = endKey;
endKey++;
}
if (beginKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", normalKeys.end)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
}
int splitMutationIndex = 0;
auto r = krMap.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2));
bool correctResult = true;
for (auto i = r.begin(); i != r.end(); ++i) {
// intersectingRanges result
// Calculate the overlap range
KeyRef rangeBegin = mutation.param1 > i->range().begin ? mutation.param1 : i->range().begin;
KeyRef rangeEnd = mutation.param2 < i->range().end ? mutation.param2 : i->range().end;
KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
UID nodeID = i->value();
// splitMutation result
if (splitMutationIndex >= mvector.size()) {
correctResult = false;
break;
}
MutationRef result2M = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
KeyRange krange2(KeyRangeRef(result2M.param1, result2M.param2));
TraceEvent("Result")
.detail("KeyRange1", krange1.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
if (krange1 != krange2 || nodeID != applierID) {
correctResult = false;
TraceEvent(SevError, "IncorrectResult")
.detail("Mutation", mutation.toString())
.detail("KeyRange1", krange1.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
}
splitMutationIndex++;
}
if (splitMutationIndex != mvector.size()) {
correctResult = false;
TraceEvent(SevError, "SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Results", mvector.size());
for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
TraceEvent("SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Result", mvector[splitMutationIndex].toString());
}
}
return Void();
}

View File

@ -84,10 +84,7 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
wait(startProcessRestoreRequests(self, cx));
} catch (Error& e) {
if (e.code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FastRestoreMasterStart")
.detail("Reason", "Unexpected unhandled error")
.detail("ErrorCode", e.code())
.detail("Error", e.what());
TraceEvent(SevError, "FastRestoreMasterStart").detail("Reason", "Unexpected unhandled error").error(e);
}
}
@ -204,11 +201,13 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
} catch (Error& e) {
if (restoreIndex < restoreRequests.size()) {
TraceEvent(SevError, "FastRestoreMasterProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequest", restoreRequests[restoreIndex].toString());
.detail("RestoreRequest", restoreRequests[restoreIndex].toString())
.error(e);
} else {
TraceEvent(SevError, "FastRestoreMasterProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequests", restoreRequests.size())
.detail("RestoreIndex", restoreIndex);
.detail("RestoreIndex", restoreIndex)
.error(e);
}
}
@ -433,8 +432,6 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba
std::vector<std::pair<UID, RestoreSendMutationsToAppliersRequest>> requests;
for (auto& loader : loadersInterf) {
ASSERT(batchStatus->loadStatus.find(loader.first) == batchStatus->loadStatus.end() ||
batchStatus->loadStatus[loader.first] == RestoreSendStatus::SendedLogs);
requests.emplace_back(
loader.first, RestoreSendMutationsToAppliersRequest(batchIndex, batchData->rangeToApplier, useRangeFile));
batchStatus->loadStatus[loader.first] =
@ -444,40 +441,6 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba
wait(getBatchReplies(&RestoreLoaderInterface::sendMutations, loadersInterf, requests, &replies,
TaskPriority::RestoreLoaderSendMutations));
// Update status and sanity check
for (auto& reply : replies) {
RestoreSendStatus status = batchStatus->loadStatus[reply.id];
if ((status == RestoreSendStatus::SendingRanges || status == RestoreSendStatus::SendingLogs)) {
batchStatus->loadStatus[reply.id] = (status == RestoreSendStatus::SendingRanges)
? RestoreSendStatus::SendedRanges
: RestoreSendStatus::SendedLogs;
if (reply.isDuplicated) {
TraceEvent(SevWarn, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("DuplicateRequestAcked", "Request should have been processed");
}
} else if ((status == RestoreSendStatus::SendedRanges || status == RestoreSendStatus::SendedLogs) &&
reply.isDuplicated) {
TraceEvent(SevDebug, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("RequestIgnored", "Send request was sent more than once");
} else {
TraceEvent(SevError, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", reply.id)
.detail("UnexpectedReply", reply.toString());
}
}
// Sanity check all loaders have sent requests
for (auto& loader : loadersInterf) {
if ((useRangeFile && batchStatus->loadStatus[loader.first] != RestoreSendStatus::SendedRanges) ||
(!useRangeFile && batchStatus->loadStatus[loader.first] != RestoreSendStatus::SendedLogs)) {
TraceEvent(SevError, "FastRestoreMasterPhaseSendMutationsFromLoaders")
.detail("Loader", loader.first)
.detail("UseRangeFile", useRangeFile)
.detail("SendStatus", batchStatus->loadStatus[loader.first]);
}
}
TraceEvent("FastRestoreMasterPhaseSendMutationsFromLoadersDone")
.detail("BatchIndex", batchIndex)
.detail("UseRangeFiles", useRangeFile)
@ -512,17 +475,20 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
ASSERT(batchStatus->loadStatus.empty());
ASSERT(batchStatus->applyStatus.empty());
wait(loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, false));
wait(loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, true));
// New backup has subversion to order mutations at the same version. For mutations at the same version,
// range file mutations have the largest subversion, larger than any log file's.
// SOMEDAY: Extend subversion to old-style backup.
wait(
loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, false) &&
loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, true));
ASSERT(batchData->rangeToApplier.empty());
splitKeyRangeForAppliers(batchData, self->appliersInterf, batchIndex);
// Loaders should ensure log files' mutations sent to appliers before range files' mutations
// TODO: Let applier buffer mutations from log and range files differently so that loaders can send mutations in
// parallel
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, false));
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, true));
// Ask loaders to send parsed mutations to appliers;
// log mutations should be applied before range mutations at the same version, which is ensured by LogMessageVersion
wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, false) &&
sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, true));
// Synchronization point for version batch pipelining.
// self->finishedBatch will continuously increase by 1 per version batch.
@ -641,7 +607,8 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx,
RestoreRequest request) {
state BackupDescription desc = wait(bc->describePartitionedBackup());
state bool partitioned = wait(bc->isPartitionedBackup());
state BackupDescription desc = wait(partitioned ? bc->describePartitionedBackup() : bc->describeBackup());
// Convert version to real time for operators to read the BackupDescription desc.
wait(desc.resolveVersionTimes(cx));
@ -657,9 +624,8 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
std::cout << "Restore to version: " << request.targetVersion << "\nBackupDesc: \n" << desc.toString() << "\n\n";
}
Optional<RestorableFileSet> restorable =
wait(SERVER_KNOBS->FASTRESTORE_USE_PARTITIONED_LOGS ? bc->getPartitionedRestoreSet(request.targetVersion)
: bc->getRestoreSet(request.targetVersion));
Optional<RestorableFileSet> restorable = wait(partitioned ? bc->getPartitionedRestoreSet(request.targetVersion)
: bc->getRestoreSet(request.targetVersion));
if (!restorable.present()) {
TraceEvent(SevWarn, "FastRestoreMasterPhaseCollectBackupFiles").detail("NotRestorable", request.targetVersion);

View File

@ -70,8 +70,14 @@ struct ReadConflictRange {
StringRef begin, end;
Version version;
int transaction;
ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction)
: begin(begin), end(end), version(version), transaction(transaction) {}
int indexInTx;
VectorRef<int>* conflictingKeyRange;
Arena* cKRArena;
ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction, int indexInTx,
VectorRef<int>* cKR = nullptr, Arena* cKRArena = nullptr)
: begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx),
conflictingKeyRange(cKR), cKRArena(cKRArena) {}
bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin) < 0; }
};
@ -287,10 +293,10 @@ private:
int nPointers, valueLength;
};
static force_inline bool less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) {
int c = memcmp(a,b,min(aLen,bLen));
if (c<0) return true;
if (c>0) return false;
static force_inline bool less(const uint8_t* a, int aLen, const uint8_t* b, int bLen) {
int c = memcmp(a, b, min(aLen, bLen));
if (c < 0) return true;
if (c > 0) return false;
return aLen < bLen;
}
@ -415,7 +421,8 @@ public:
int started = min(M, count);
for (int i = 0; i < started; i++) {
inProgress[i].init(ranges[i], header, transactionConflictStatus);
inProgress[i].init(ranges[i], header, transactionConflictStatus, ranges[i].indexInTx,
ranges[i].conflictingKeyRange, ranges[i].cKRArena);
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
@ -429,8 +436,11 @@ public:
if (prevJob == job) break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else
inProgress[job].init(ranges[started++], header, transactionConflictStatus);
} else {
int temp = started++;
inProgress[job].init(ranges[temp], header, transactionConflictStatus, ranges[temp].indexInTx,
ranges[temp].conflictingKeyRange, ranges[temp].cKRArena);
}
}
prevJob = job;
job = nextJob[job];
@ -598,18 +608,26 @@ private:
Version version;
bool* result;
int state;
int indexInTx;
VectorRef<int>* conflictingKeyRange; // nullptr if report_conflicting_keys is not enabled.
Arena* cKRArena; // nullptr if report_conflicting_keys is not enabled.
void init(const ReadConflictRange& r, Node* header, bool* tCS) {
void init(const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, VectorRef<int>* cKR,
Arena* cKRArena) {
this->start.init(r.begin, header);
this->end.init(r.end, header);
this->version = r.version;
this->indexInTx = indexInTx;
this->cKRArena = cKRArena;
result = &tCS[r.transaction];
conflictingKeyRange = cKR;
this->state = 0;
}
bool noConflict() { return true; }
bool conflict() {
*result = true;
if (conflictingKeyRange != nullptr) conflictingKeyRange->push_back(*cKRArena, indexInTx);
return true;
}
@ -728,7 +746,10 @@ void destroyConflictSet(ConflictSet* cs) {
delete cs;
}
ConflictBatch::ConflictBatch(ConflictSet* cs) : cs(cs), transactionCount(0) {}
ConflictBatch::ConflictBatch(ConflictSet* cs, std::map<int, VectorRef<int>>* conflictingKeyRangeMap,
Arena* resolveBatchReplyArena)
: cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap),
resolveBatchReplyArena(resolveBatchReplyArena) {}
ConflictBatch::~ConflictBatch() {}
@ -736,6 +757,7 @@ struct TransactionInfo {
VectorRef<std::pair<int, int>> readRanges;
VectorRef<std::pair<int, int>> writeRanges;
bool tooOld;
bool reportConflictingKeys;
};
void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
@ -743,6 +765,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
Arena& arena = transactionInfo.arena();
TransactionInfo* info = new (arena) TransactionInfo;
info->reportConflictingKeys = tr.report_conflicting_keys;
if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) {
info->tooOld = true;
@ -756,7 +779,10 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
const KeyRangeRef& range = tr.read_conflict_ranges[r];
points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first);
points.emplace_back(range.end, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r,
tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t]
: nullptr,
tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr);
}
for (int r = 0; r < tr.write_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r];
@ -793,11 +819,15 @@ void ConflictBatch::checkIntraBatchConflicts() {
const TransactionInfo& tr = *transactionInfo[t];
if (transactionConflictStatus[t]) continue;
bool conflict = tr.tooOld;
for (int i = 0; i < tr.readRanges.size(); i++)
for (int i = 0; i < tr.readRanges.size(); i++) {
if (mcs.any(tr.readRanges[i].first, tr.readRanges[i].second)) {
if (tr.reportConflictingKeys) {
(*conflictingKeyRangeMap)[t].push_back(*resolveBatchReplyArena, i);
}
conflict = true;
break;
}
}
transactionConflictStatus[t] = conflict;
if (!conflict)
for (int i = 0; i < tr.writeRanges.size(); i++) mcs.set(tr.writeRanges[i].first, tr.writeRanges[i].second);

View File

@ -370,6 +370,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
newState.oldTLogData.emplace_back(oldData);
TraceEvent("BWToCore")
.detail("Epoch", newState.oldTLogData.back().epoch)
.detail("TotalTags", newState.oldTLogData.back().logRouterTags)
.detail("BeginVersion", newState.oldTLogData.back().epochBegin)
.detail("EndVersion", newState.oldTLogData.back().epochEnd);
}

View File

@ -172,6 +172,7 @@
<ActorCompiler Include="workloads\SnapTest.actor.cpp" />
<ActorCompiler Include="workloads\Mako.actor.cpp" />
<ActorCompiler Include="workloads\ExternalWorkload.actor.cpp" />
<ActorCompiler Include="workloads\ReportConflictingKeys.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ApplyMetadataMutation.h" />

View File

@ -303,6 +303,9 @@
<ActorCompiler Include="workloads\Mako.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\ReportConflictingKeys.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="OldTLogServer.actor.cpp" />
</ItemGroup>
<ItemGroup>

View File

@ -29,6 +29,7 @@ struct AtomicRestoreWorkload : TestWorkload {
double startAfter, restoreAfter;
bool fastRestore; // true: use fast restore, false: use old style restore
Standalone<VectorRef<KeyRangeRef>> backupRanges;
bool usePartitionedLogs;
AtomicRestoreWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx) {
@ -37,6 +38,8 @@ struct AtomicRestoreWorkload : TestWorkload {
restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 20.0);
fastRestore = getOption(options, LiteralStringRef("fastRestore"), false);
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
usePartitionedLogs = getOption(options, LiteralStringRef("usePartitionedLogs"),
deterministicRandom()->random01() < 0.5 ? true : false);
}
virtual std::string description() {
@ -64,13 +67,14 @@ struct AtomicRestoreWorkload : TestWorkload {
state FileBackupAgent backupAgent;
wait( delay(self->startAfter * deterministicRandom()->random01()) );
TraceEvent("AtomicRestore_Start");
TraceEvent("AtomicRestore_Start").detail("UsePartitionedLog", self->usePartitionedLogs);
state std::string backupContainer = "file://simfdb/backups/";
try {
wait(backupAgent.submitBackup(cx, StringRef(backupContainer), deterministicRandom()->randomInt(0, 100), BackupAgentBase::getDefaultTagName(), self->backupRanges, false));
}
catch (Error& e) {
wait(backupAgent.submitBackup(cx, StringRef(backupContainer), deterministicRandom()->randomInt(0, 100),
BackupAgentBase::getDefaultTagName(), self->backupRanges, false,
self->usePartitionedLogs));
} catch (Error& e) {
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}

View File

@ -40,6 +40,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
bool locked;
bool allowPauses;
bool shareLogRange;
bool usePartitionedLogs;
std::map<Standalone<KeyRef>, Standalone<ValueRef>> dbKVs;
@ -67,6 +68,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
agentRequest = getOption(options, LiteralStringRef("simBackupAgents"), true);
allowPauses = getOption(options, LiteralStringRef("allowPauses"), true);
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
usePartitionedLogs = getOption(options, LiteralStringRef("usePartitionedLogs"),
deterministicRandom()->random01() < 0.5 ? true : false);
KeyRef beginRange;
KeyRef endRange;
@ -181,7 +184,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx, StringRef(backupContainer), deterministicRandom()->randomInt(0, 100),
tag.toString(), backupRanges, stopDifferentialDelay ? false : true,
/*partitionedLog=*/true));
self->usePartitionedLogs));
} catch (Error& e) {
TraceEvent("BARW_DoBackupSubmitBackupException", randomID).error(e).detail("Tag", printable(tag));
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) throw;
@ -209,8 +212,10 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
TraceEvent("BARW_DoBackupWaitForRestorable", randomID).detail("Tag", backupTag.tagName).detail("Result", resultWait);
state bool restorable = false;
if(lastBackupContainer) {
state Future<BackupDescription> fdesc = lastBackupContainer->describePartitionedBackup();
if (lastBackupContainer) {
state Future<BackupDescription> fdesc = self->usePartitionedLogs
? lastBackupContainer->describePartitionedBackup()
: lastBackupContainer->describeBackup();
wait(ready(fdesc));
if(!fdesc.isError()) {
@ -398,7 +403,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
try {
extraBackup = backupAgent.submitBackup(
cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(), self->backupRanges, true, /*partitionedLog=*/true);
self->backupTag.toString(), self->backupRanges, true, self->usePartitionedLogs);
} catch (Error& e) {
TraceEvent("BARW_SubmitBackup2Exception", randomID)
.error(e)
@ -431,7 +436,14 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag));
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
BackupDescription desc = wait(container->describePartitionedBackup());
BackupDescription desc = wait(self->usePartitionedLogs ? container->describePartitionedBackup()
: container->describeBackup());
TraceEvent("BAFRW_Restore", randomID)
.detail("LastBackupContainer", lastBackupContainer->getURL())
.detail("MinRestorableVersion", desc.minRestorableVersion.get())
.detail("MaxRestorableVersion", desc.maxRestorableVersion.get())
.detail("ContiguousLogEnd", desc.contiguousLogEnd.get());
state Version targetVersion = -1;
if (desc.maxRestorableVersion.present()) {
@ -440,7 +452,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
} else if (deterministicRandom()->random01() < 0.1) {
targetVersion = desc.maxRestorableVersion.get();
} else if (deterministicRandom()->random01() < 0.5) {
ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
// The assertion may fail because minRestorableVersion may be decided by snapshot version.
// ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
// This assertion can fail when contiguousLogEnd < maxRestorableVersion and
// the snapshot version > contiguousLogEnd. I.e., there is a gap between
// contiguousLogEnd and snapshot version.

View File

@ -0,0 +1,239 @@
/*
* ReportConflictingKeys.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// For this test to report properly, buggify must be disabled (flow.h) and failConnection must be disabled in
// sim2.actor.cpp
struct ReportConflictingKeysWorkload : TestWorkload {
double testDuration, transactionsPerSecond, addReadConflictRangeProb, addWriteConflictRangeProb;
Key keyPrefix;
int nodeCount, actorCount, keyBytes, valueBytes, readConflictRangeCount, writeConflictRangeCount;
PerfIntCounter invalidReports, commits, conflicts, xacts;
ReportConflictingKeysWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), invalidReports("InvalidReports"), commits("Commits"), conflicts("Conflicts"),
xacts("Transactions") {
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
// transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
actorCount = getOption(options, LiteralStringRef("actorsPerClient"), 1);
keyPrefix = unprintable(
getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("ReportConflictingKeysWorkload"))
.toString());
keyBytes = getOption(options, LiteralStringRef("keyBytes"), 16);
readConflictRangeCount = getOption(options, LiteralStringRef("readConflictRangeCountPerTx"), 1);
writeConflictRangeCount = getOption(options, LiteralStringRef("writeConflictRangeCountPerTx"), 1);
ASSERT(readConflictRangeCount >= 1 && writeConflictRangeCount >= 1);
// The number of conflict ranges per tx is modeled by a geometric distribution: we always add at least
// one conflictRange, then keep adding with probability prob, so prob / (1 - prob) = mean - 1, i.e.
// prob = (mean - 1) / mean.
addReadConflictRangeProb = (readConflictRangeCount - 1.0) / readConflictRangeCount;
addWriteConflictRangeProb = (writeConflictRangeCount - 1.0) / writeConflictRangeCount;
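// Worked example (using the values in fast/ReportConflictingKeys.txt): with
// readConflictRangeCountPerTx = 10, addReadConflictRangeProb = 9/10 = 0.9, so the do-while loop in
// addRandomReadConflictRange adds 1 / (1 - 0.9) = 10 ranges per transaction in expectation.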
ASSERT(keyPrefix.size() + 16 <= keyBytes); // make sure the string format is valid
nodeCount = getOption(options, LiteralStringRef("nodeCount"), 100);
}
std::string description() override { return "ReportConflictingKeysWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(const Database& cx) override { return _start(cx->clone(), this); }
ACTOR Future<Void> _start(Database cx, ReportConflictingKeysWorkload* self) {
if (self->clientId == 0) wait(timeout(self->conflictingClient(cx, self), self->testDuration, Void()));
return Void();
}
Future<bool> check(Database const& cx) override { return invalidReports.getValue() == 0; }
void getMetrics(vector<PerfMetric>& m) override {
m.push_back(PerfMetric("Measured Duration", testDuration, true));
m.push_back(xacts.getMetric());
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
m.push_back(commits.getMetric());
m.push_back(PerfMetric("Commits/sec", commits.getValue() / testDuration, true));
m.push_back(conflicts.getMetric());
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
}
// disable the default timeout setting
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
// Adapted from keyForIndex() in tester.actor.cpp; the key length here is controlled by the keyBytes option.
Key keyForIndex(int n) {
double p = (double)n / nodeCount;
int paddingLen = keyBytes - 16 - keyPrefix.size();
// left padding by zero
return StringRef(format("%0*llx", paddingLen, *(uint64_t*)&p)).withPrefix(keyPrefix);
}
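// Illustrative example of the layout produced above, using the values in fast/ReportConflictingKeys.txt
// (keyPrefix=RCK, keyBytes=64): paddingLen = 64 - 16 - 3 = 45, so each key is "RCK" followed by the 64-bit
// pattern of p printed as lowercase hex, left-padded with zeros to a width of 45 characters.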
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* readConflictRanges) {
int startIdx, endIdx;
Key startKey, endKey;
do { // add at least one non-empty range
startIdx = deterministicRandom()->randomInt(0, nodeCount);
endIdx = deterministicRandom()->randomInt(startIdx + 1, nodeCount + 1);
startKey = keyForIndex(startIdx);
endKey = keyForIndex(endIdx);
tr->addReadConflictRange(KeyRangeRef(startKey, endKey));
if (readConflictRanges) readConflictRanges->push_back(KeyRangeRef(startKey, endKey));
} while (deterministicRandom()->random01() < addReadConflictRangeProb);
}
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>* writeConflictRanges) {
int startIdx, endIdx;
Key startKey, endKey;
do { // add at least one non-empty range
startIdx = deterministicRandom()->randomInt(0, nodeCount);
endIdx = deterministicRandom()->randomInt(startIdx + 1, nodeCount + 1);
startKey = keyForIndex(startIdx);
endKey = keyForIndex(endIdx);
tr->addWriteConflictRange(KeyRangeRef(startKey, endKey));
if (writeConflictRanges) writeConflictRanges->push_back(KeyRangeRef(startKey, endKey));
} while (deterministicRandom()->random01() < addWriteConflictRangeProb);
}
ACTOR Future<Void> conflictingClient(Database cx, ReportConflictingKeysWorkload* self) {
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
state std::vector<KeyRange> readConflictRanges;
state std::vector<KeyRange> writeConflictRanges;
loop {
try {
tr2.setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
// If READ_YOUR_WRITES_DISABLE is set, the transaction behaves like a native transaction object,
// where overlapping conflict ranges are not merged.
if (deterministicRandom()->random01() < 0.5)
tr1.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
if (deterministicRandom()->random01() < 0.5)
tr2.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
// Give the two transactions the same read version, then commit the first one.
// If the second one fails to commit due to conflicts, verify the returned conflicting keys;
// otherwise, there are no conflicts between tr1's writeConflictRanges and tr2's readConflictRanges.
Version readVersion = wait(tr1.getReadVersion());
tr2.setVersion(readVersion);
self->addRandomReadConflictRange(&tr1, nullptr);
self->addRandomWriteConflictRange(&tr1, &writeConflictRanges);
++self->commits;
wait(tr1.commit());
++self->xacts;
state bool foundConflict = false;
try {
self->addRandomReadConflictRange(&tr2, &readConflictRanges);
self->addRandomWriteConflictRange(&tr2, nullptr);
++self->commits;
wait(tr2.commit());
++self->xacts;
} catch (Error& e) {
if (e.code() != error_code_not_committed) throw e;
foundConflict = true;
++self->conflicts;
}
// check API correctness
if (foundConflict) {
// The key \xff\xff/transaction/conflicting_keys itself is always initialized to false, so skip it here
state KeyRange ckr =
KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysAbsolutePrefix)),
LiteralStringRef("\xff\xff").withPrefix(conflictingKeysAbsolutePrefix));
// The getRange using the special key prefix "\xff\xff/transaction/conflicting_keys/" is served
// locally by the client, so no error handling is needed here
Future<Standalone<RangeResultRef>> conflictingKeyRangesFuture =
tr2.getRange(ckr, CLIENT_KNOBS->TOO_MANY);
ASSERT(conflictingKeyRangesFuture.isReady());
const Standalone<RangeResultRef> conflictingKeyRanges = conflictingKeyRangesFuture.get();
ASSERT(conflictingKeyRanges.size() &&
(conflictingKeyRanges.size() <= readConflictRanges.size() * 2));
ASSERT(conflictingKeyRanges.size() % 2 == 0);
ASSERT(!conflictingKeyRanges.more);
for (int i = 0; i < conflictingKeyRanges.size(); i += 2) {
KeyValueRef startKeyWithPrefix = conflictingKeyRanges[i];
ASSERT(startKeyWithPrefix.value == conflictingKeysTrue);
KeyValueRef endKeyWithPrefix = conflictingKeyRanges[i + 1];
ASSERT(endKeyWithPrefix.value == conflictingKeysFalse);
// Remove the prefix from the returned keys
Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
KeyRangeRef kr = KeyRangeRef(startKey, endKey);
if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
// Read conflict ranges are not modified by the resolver, so the returned keyrange is either
// an original read_conflict_range or a merge of several overlapping ones; in either case it
// contains at least one original read_conflict_range
return kr.contains(rCR);
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure")
.detail("Reason",
"Returned conflicting keys are not original or merged readConflictRanges");
} else if (!std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(),
[&kr](KeyRange wCR) {
// The returned key range should conflict with at least one
// writeConflictRange
return kr.intersects(wCR);
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Returned keyrange is not conflicting with any writeConflictRange");
}
}
} else {
// Make sure there are no conflicts between tr2's readConflictRanges and tr1's writeConflictRanges
for (const KeyRange& rCR : readConflictRanges) {
if (std::any_of(writeConflictRanges.begin(), writeConflictRanges.end(), [&rCR](KeyRange wCR) {
bool result = wCR.intersects(rCR);
if (result)
TraceEvent(SevError, "TestFailure")
.detail("Reason", "No conflicts returned but it should")
.detail("WriteConflictRangeInTr1", wCR.toString())
.detail("ReadConflictRangeInTr2", rCR.toString());
return result;
})) {
++self->invalidReports;
TraceEvent(SevError, "TestFailure").detail("Reason", "No conflicts returned but there should be");
break;
}
}
}
} catch (Error& e) {
state Error e2 = e;
wait(tr1.onError(e2));
wait(tr2.onError(e2));
}
readConflictRanges.clear();
writeConflictRanges.clear();
tr1.reset();
tr2.reset();
}
}
};
WorkloadFactory<ReportConflictingKeysWorkload> ReportConflictingKeysWorkload("ReportConflictingKeys");

View File

@ -73,7 +73,7 @@ struct WriteDuringReadWorkload : TestWorkload {
nodes = newNodes;
TEST(adjacentKeys && (nodes + minNode) > CLIENT_KNOBS->KEY_SIZE_LIMIT); //WriteDuringReadWorkload testing large keys
useExtraDB = g_simulator.extraDB != NULL;
if(useExtraDB) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));

View File

@ -496,7 +496,7 @@ public:
return substr( 0, size() - s.size() );
}
std::string toString() const { return std::string( (const char*)data, length ); }
std::string toString() const { return std::string((const char*)data, length); }
static bool isPrintable(char c) { return c > 32 && c < 127; }
inline std::string printable() const;

View File

@ -90,6 +90,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, ShardedTxsTags);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063000000LL, UnifiedTLogSpilling);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, BackupWorker);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
};
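// Usage sketch (an assumption, not part of this change): if, as for the other features declared above,
// PROTOCOL_VERSION_FEATURE generates a has<Feature>() accessor and a currentProtocolVersion constant is
// available, callers can gate the new behavior on the protocol version:
//
//     if (currentProtocolVersion.hasReportConflictingKeys()) {
//         // safe to request conflicting-keys reporting
//     }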

View File

@ -56,6 +56,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES KVStoreTestWrite.txt UNIT IGNORE)
add_fdb_test(TEST_FILES KVStoreValueSize.txt UNIT IGNORE)
add_fdb_test(TEST_FILES LayerStatusMerge.txt IGNORE)
add_fdb_test(TEST_FILES ParallelRestoreApiCorrectnessAtomicRestore.txt IGNORE)
add_fdb_test(TEST_FILES PureNetwork.txt IGNORE)
add_fdb_test(TEST_FILES RRW2500.txt IGNORE)
add_fdb_test(TEST_FILES RandomRead.txt IGNORE)
@ -121,6 +122,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/MoveKeysCycle.txt)
add_fdb_test(TEST_FILES fast/RandomSelector.txt)
add_fdb_test(TEST_FILES fast/RandomUnitTests.txt)
add_fdb_test(TEST_FILES fast/ReportConflictingKeys.txt)
add_fdb_test(TEST_FILES fast/SelectorCorrectness.txt)
add_fdb_test(TEST_FILES fast/Sideband.txt)
add_fdb_test(TEST_FILES fast/SidebandWithStatus.txt)
@ -208,10 +210,16 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt)
add_fdb_test(TEST_FILES slow/ddbalance.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreApiCorrectnessAtomicRestore.txt IGNORE)
add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessAtomicOp.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupWriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessAtomicOp.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupApiCorrectnessAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreTestSplitMutation.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)
@ -228,7 +236,7 @@ if(WITH_PYTHON)
verify_testing()
if (NOT OPEN_FOR_IDE)
if (NOT OPEN_FOR_IDE AND NOT WIN32)
create_test_package()
endif()
endif()

View File

@ -27,6 +27,7 @@ restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
fastRestore=true
usePartitionedLogs=true
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier

View File

@ -0,0 +1,10 @@
testTitle=ReportConflictingKeysTest
testName=ReportConflictingKeys
testDuration=20.0
nodeCount=10000
keyPrefix=RCK
keyBytes=64
readConflictRangeCountPerTx=10
writeConflictRangeCountPerTx=10
connectionFailuresDisableDuration=100000
buggify=off

View File

@ -25,4 +25,6 @@ testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
simBackupAgents=BackupToFile
; This test file uses the old restore, which does not work with the new backup format
usePartitionedLogs=false

View File

@ -25,6 +25,8 @@ testTitle=BackupAndParallelRestoreWithAtomicOp
clearAfterTest=false
simBackupAgents=BackupToFile
backupRangesCount=-1
; use new backup
usePartitionedLogs=true
testName=RandomClogging
testDuration=90.0

View File

@ -21,6 +21,7 @@ testTitle=BackupAndRestore
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=true
testName=RandomClogging
testDuration=90.0

View File

@ -44,6 +44,7 @@ testTitle=BackupAndRestore
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=true
testName=RandomClogging
testDuration=90.0

View File

@ -0,0 +1,43 @@
testTitle=WriteDuringReadTest
testName=WriteDuringRead
maximumTotalData=1000000
testDuration=240.0
slowModeStart=60.0
minNode=1
useSystemKeys=false
testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
fastRestore=true
usePartitionedLogs=true
testName=RandomClogging
testDuration=60.0
testName=Rollback
meanDelay=60.0
testDuration=60.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=60.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=60.0
StderrSeverity=30
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,37 @@
testTitle=ApiCorrectnessTest
testName=ApiCorrectness
runSetup=true
clearAfterTest=true
numKeys=5000
onlyLowerCase=true
shortKeysRatio=0.5
minShortKeyLength=1
maxShortKeyLength=3
minLongKeyLength=1
maxLongKeyLength=128
minValueLength=1
maxValueLength=1000
numGets=1000
numGetRanges=100
numGetRangeSelectors=100
numGetKeys=100
numClears=100
numClearRanges=10
maxTransactionBytes=500000
randomTestDuration=60
timeout=2100
testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
fastRestore=true
usePartitionedLogs=false
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,56 @@
testTitle=BackupAndParallelRestoreWithAtomicOp
testName=AtomicOps
nodeCount=30000
; Make ops space only 1 key per group
; nodeCount=100
transactionsPerSecond=2500.0
; transactionsPerSecond=500.0
; nodeCount=4
; transactionsPerSecond=250.0
testDuration=30.0
clearAfterTest=false
; Specify a type of atomicOp
; opType=0
; actorsPerClient=1
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
backupRangesCount=-1
; use old backup
usePartitionedLogs=false
testName=RandomClogging
testDuration=90.0
testName=Rollback
meanDelay=90.0
testDuration=90.0
; Do NOT kill the restore worker process yet
; Kill other processes to ensure restore works when the FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
; Disable buggify for parallel restore
;buggify=on
;testDuration=360000 ; does not work
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,52 @@
testTitle=BackupAndRestore
testName=Cycle
; nodeCount=30000
nodeCount=1000
; transactionsPerSecond=500.0
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
; keyPrefix=!
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=false
testName=RandomClogging
testDuration=90.0
testName=Rollback
meanDelay=90.0
testDuration=90.0
; Do NOT kill the restore worker process yet
; Kill other processes to ensure restore works when the FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
; Disable buggify for parallel restore
;buggify=off
;testDuration=360000 ; does not work
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,75 @@
testTitle=BackupAndRestore
testName=Cycle
; nodeCount=30000
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=!
testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=z
testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=A
testName=Cycle
nodeCount=1000
transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0
clearAfterTest=false
keyPrefix=Z
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness
backupAfter=10.0
restoreAfter=60.0
clearAfterTest=false
simBackupAgents=BackupToFile
; backupRangesCount<0 means backup the entire normal keyspace
backupRangesCount=-1
usePartitionedLogs=false
testName=RandomClogging
testDuration=90.0
testName=Rollback
meanDelay=90.0
testDuration=90.0
; Do NOT kill the restore worker process yet
; Kill other processes to ensure restore works when the FDB cluster has faults
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=90.0
; Disable buggify for parallel restore
;buggify=off
;testDuration=360000 ; does not work
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,43 @@
testTitle=WriteDuringReadTest
testName=WriteDuringRead
maximumTotalData=1000000
testDuration=240.0
slowModeStart=60.0
minNode=1
useSystemKeys=false
testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
fastRestore=true
usePartitionedLogs=false
testName=RandomClogging
testDuration=60.0
testName=Rollback
meanDelay=60.0
testDuration=60.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=60.0
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=60.0
StderrSeverity=30
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
;timeout is in seconds
timeout=360000

View File

@ -0,0 +1,6 @@
testTitle=UnitTests
testName=UnitTests
startDelay=0
useDB=false
maxTestCases=0
testsMatching=/FastRestore/RestoreLoader/splitMutation

View File

@ -11,7 +11,8 @@ testTitle=WriteDuringReadTest
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
usePartitionedLogs=false
testName=RandomClogging
testDuration=60.0