Merge branch 'master' of github.com:apple/foundationdb into refactor-fdbcli

This commit is contained in:
Chaoguang Lin 2021-03-30 16:35:41 -07:00
commit 107f66e4e1
49 changed files with 864 additions and 376 deletions

View File

@ -1,5 +1,7 @@
<img alt="FoundationDB logo" src="documentation/FDB_logo.png?raw=true" width="400">
![Build Status](https://codebuild.us-west-2.amazonaws.com/badges?uuid=eyJlbmNyeXB0ZWREYXRhIjoidTh3TTlZa2FQdkdPL1drZzJUQnA1NWg0MzQ3c1VnMXlaVWQ0MVUwcUJpRlltUExBYmRCc3h2c0p1TXZLdWhDQ3BoS0Jmc2ZZdG5yVmxGUHNJM0JtV0MwPSIsIml2UGFyYW1ldGVyU3BlYyI6InBrclM3R0J2d3hmRUFDTjgiLCJtYXRlcmlhbFNldFNlcmlhbCI6MX0%3D&branch=master)
FoundationDB is a distributed database designed to handle large volumes of structured data across clusters of commodity servers. It organizes data as an ordered key-value store and employs ACID transactions for all operations. It is especially well-suited for read/write workloads but also has excellent performance for write-intensive workloads. Users interact with the database through its API language bindings.
To learn more about FoundationDB, visit [foundationdb.org](https://www.foundationdb.org/)

View File

@ -357,6 +357,13 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
.extractPtr());
}
// Get network thread busyness (updated every 1s)
// A value of 0 indicates that the client is more or less idle
// A value of 1 (or more) indicates that the client is saturated
extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) {
return DB(d)->getMainThreadBusyness();
}
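A minimal sketch of how a client might poll this new API, assuming fdb_c.h at API version 700 and an already-opened FDBDatabase*; the threshold and logging here are illustrative, not part of this commit:
#include <cstdio>
#define FDB_API_VERSION 700
#include <foundationdb/fdb_c.h>
// Log a warning when the client's network thread approaches saturation.
void logBusyness(FDBDatabase* db) {
    double busyness = fdb_database_get_main_thread_busyness(db); // normally in [0, 1]
    if (busyness > 0.9) {
        std::fprintf(stderr, "client network thread near saturation: %.2f\n", busyness);
    }
}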
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try {
TXN(tr)->delref();

View File

@ -187,6 +187,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
uint8_t const* snap_command,
int snap_command_length);
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);

View File

@ -35,6 +35,7 @@
#include <tuple>
#include <vector>
#include <random>
#include <chrono>
#define DOCTEST_CONFIG_IMPLEMENT
#include "doctest.h"
@ -2126,6 +2127,24 @@ TEST_CASE("block_from_callback") {
context.event.wait();
}
// Monitors network busyness for 2 seconds (40 readings, 50 ms apart)
TEST_CASE("monitor_network_busyness") {
bool containsGreaterZero = false;
for (int i = 0; i < 40; i++) {
double busyness = fdb_database_get_main_thread_busyness(db);
// make sure the busyness is between 0 and 1
CHECK(busyness >= 0);
CHECK(busyness <= 1);
if (busyness > 0) {
containsGreaterZero = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// assert that at least one of the busyness readings was greater than 0
CHECK(containsGreaterZero);
}
int main(int argc, char** argv) {
if (argc != 3 && argc != 4) {
std::cout << "Unit tests for the FoundationDB C API.\n"

View File

@ -1,158 +0,0 @@
version: 0.2
env:
secrets-manager:
DOCKERHUB_AUTH: dockerhub_foundationdb:foundationdb
phases:
install:
commands:
- echo "install phase"
- 'ACCOUNT_ID=$(echo $CODEBUILD_BUILD_ARN | cut -d : -f 5)'
- REGISTRY=${ACCOUNT_ID}.dkr.ecr.${AWS_DEFAULT_REGION}.amazonaws.com
- aws ecr get-login-password | docker login --username AWS --password-stdin ${REGISTRY}
- docker pull ${REGISTRY}/centos:6
- docker tag ${REGISTRY}/centos:6 centos:6
- docker pull ${REGISTRY}/centos:7
- docker tag ${REGISTRY}/centos:7 centos:7
pre_build:
commands:
- echo "pre_build phase"
- COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
- DATE_STR=$(date +"%Y%m%d%H%M%S")
build:
commands:
- echo "build phase"
- ################################################################################
- # CENTOS 7 foundationdb/build
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/build
- docker pull ${REGISTRY}/foundationdb/build:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/build:centos7-latest
--tag ${REGISTRY}/foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/build:centos7-latest
--tag ${REGISTRY}/foundationdb/build:latest
--tag foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/build:centos7-latest
--tag foundationdb/build:latest
.
- ################################################################################
- # CENTOS 7 foundationdb/devel
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/devel
- docker pull ${REGISTRY}/foundationdb/devel:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/devel:centos7-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/devel:centos7-latest
--tag ${REGISTRY}/foundationdb/devel:latest
--tag foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/devel:centos7-latest
--tag foundationdb/devel:latest
.
- ################################################################################
- # CENTOS 7 foundationdb/distcc
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/distcc
- docker pull ${REGISTRY}/foundationdb/distcc:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/distcc:centos7-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/distcc:centos7-latest
--tag ${REGISTRY}/foundationdb/distcc:latest
--tag foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/distcc:centos7-latest
--tag foundationdb/distcc:latest
.
- ################################################################################
- # CENTOS 6 foundationdb/build
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/build
- docker pull ${REGISTRY}/foundationdb/build:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/build:centos6-latest
--tag ${REGISTRY}/foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/build:centos6-latest
--tag foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/build:centos6-latest
.
- ################################################################################
- # CENTOS 6 foundationdb/devel
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/devel
- docker pull ${REGISTRY}/foundationdb/devel:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/devel:centos6-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/devel:centos6-latest
--tag foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/devel:centos6-latest
.
- ################################################################################
- # CENTOS 6 foundationdb/distcc
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/distcc
- docker pull ${REGISTRY}/foundationdb/distcc:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/distcc:centos6-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/distcc:centos6-latest
--tag foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/distcc:centos6-latest
.
post_build:
commands:
- echo "post_build phase"
- echo ${DOCKERHUB_AUTH} | docker login --username foundationdb --password-stdin
- ################################################################################
- # CENTOS 7 PUSH TO ECR
- ################################################################################
- # PUSH TO build ECR
- docker push ${REGISTRY}/foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/build:centos7-latest
- docker push ${REGISTRY}/foundationdb/build:latest
- # PUSH TO devel ECR
- docker push ${REGISTRY}/foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/devel:centos7-latest
- docker push ${REGISTRY}/foundationdb/devel:latest
- # PUSH TO distcc ECR
- docker push ${REGISTRY}/foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/distcc:centos7-latest
- docker push ${REGISTRY}/foundationdb/distcc:latest
- ################################################################################
- # CENTOS 7 PUSH TO DOCKERHUB
- ################################################################################
- # PUSH TO build DOCKERHUB
- docker push foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/build:centos7-latest
- docker push foundationdb/build:latest
- # PUSH TO devel DOCKERHUB
- docker push foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/devel:centos7-latest
- docker push foundationdb/devel:latest
- # PUSH TO distcc DOCKERHUB
- docker push foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/distcc:centos7-latest
- docker push foundationdb/distcc:latest
- ################################################################################
- # CENTOS 6 PUSH TO ECR
- ################################################################################
- # PUSH TO build ECR
- docker push ${REGISTRY}/foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/build:centos6-latest
- # PUSH TO devel ECR
- docker push ${REGISTRY}/foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/devel:centos6-latest
- # PUSH TO distcc ECR
- docker push ${REGISTRY}/foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/distcc:centos6-latest
- ################################################################################
- # CENTOS 6 PUSH TO DOCKERHUB
- ################################################################################
- # PUSH TO build DOCKERHUB
- docker push foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/build:centos6-latest
- # PUSH TO devel DOCKERHUB
- docker push foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/devel:centos6-latest
- # PUSH TO distcc DOCKERHUB
- docker push foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/distcc:centos6-latest

View File

@ -37,13 +37,13 @@ RUN sed -i -e '/enabled/d' /etc/yum.repos.d/CentOS-Base.repo && \
lz4-devel \
lz4-static \
mono-devel \
rpm-build \
tcl-devel \
unzip \
wget \
rh-python36 \
rh-python36-python-devel \
rh-ruby24 && \
yum clean all && \
rm -rf /var/cache/yum

View File

@ -5,13 +5,16 @@ FROM ${REPOSITORY}:${VERSION}
# add vscode server
RUN yum repolist && \
yum -y install \
bash-completion \
byobu \
cgdb \
emacs-nox \
jq \
the_silver_searcher \
tmux \
tree \
vim \
zsh && \
yum clean all && \
rm -rf /var/cache/yum
@ -19,14 +22,25 @@ WORKDIR /tmp
RUN source /opt/rh/devtoolset-8/enable && \
source /opt/rh/rh-python36/enable && \
pip3 install \
lxml \
psutil \
python-dateutil \
subprocess32 && \
mkdir fdb-joshua && \
cd fdb-joshua && \
git clone --branch code_pipeline https://github.com/FoundationDB/fdb-joshua . && \
pip3 install /tmp/fdb-joshua && \
cd /tmp && \
curl -Ls https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.9/2020-11-02/bin/linux/amd64/kubectl -o kubectl && \
echo "3dbe69e6deb35fbd6fec95b13d20ac1527544867ae56e3dae17e8c4d638b25b9 kubectl" > kubectl.txt && \
sha256sum -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \
chmod 755 /usr/local/bin/kubectl && \
curl https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.0.30.zip -o "awscliv2.zip" && \
echo "7ee475f22c1b35cc9e53affbf96a9ffce91706e154a9441d0d39cbf8366b718e awscliv2.zip" > awscliv2.txt && \
sha256sum -c awscliv2.txt && \
unzip -qq awscliv2.zip && \
./aws/install && \
rm -rf /tmp/*
ARG OLD_FDB_BINARY_DIR=/app/deploy/global_data/oldBinaries/
@ -45,17 +59,23 @@ RUN mkdir -p ${OLD_FDB_BINARY_DIR} \
ln -s ${OLD_TLS_LIBRARY_DIR}/FDBGnuTLS.so /usr/lib/foundationdb/plugins/FDBGnuTLS.so
WORKDIR /root
RUN rm -f /root/anaconda-ks.cfg && \
printf '%s\n' \
'source /opt/rh/devtoolset-8/enable' \
'source /opt/rh/rh-python36/enable' \
'source /opt/rh/rh-ruby26/enable' \
'' \
'function cmk() {' \
' cmake -S ${HOME}/src/foundationdb -B ${HOME}/build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84' \
'}' \
'function ct() {' \
' cd ${HOME}/build_output && ctest -j 32 --output-on-failure' \
'}' \
'function j() {' \
' python3 -m joshua.joshua "${@}"' \
'}' \
'function jsd() {' \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \
'' \
>> .bashrc

View File

@ -10,6 +10,7 @@ RUN rpmkeys --import mono-project.com.rpmkey.pgp && \
epel-release \
scl-utils \
yum-utils && \
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo && \
yum install -y \
autoconf \
automake \
@ -19,6 +20,7 @@ RUN rpmkeys --import mono-project.com.rpmkey.pgp && \
devtoolset-8 \
devtoolset-8-libubsan-devel \
devtoolset-8-valgrind-devel \
docker-ce \
dos2unix \
dpkg \
gettext-devel \
@ -59,9 +61,10 @@ RUN source /opt/rh/devtoolset-8/enable && \
tar --strip-components 1 --no-same-owner --directory git -xf git.tar.gz && \
cd git && \
make configure && \
./configure && \
make && \
make install && \
cd ../ && \
rm -rf /tmp/*
# build/install ninja

View File

@ -3,15 +3,21 @@ ARG VERSION=centos7-latest
FROM ${REPOSITORY}:${VERSION}
# add vscode server
RUN yum-config-manager --add-repo=https://copr.fedorainfracloud.org/coprs/carlwgeorge/ripgrep/repo/epel-7/carlwgeorge-ripgrep-epel-7.repo && \
yum repolist && \
yum -y install \
bash-completion \
byobu \
cgdb \
emacs-nox \
fish \
jq \
ripgrep \
the_silver_searcher \
tmux \
tree \
vim \
zsh && \
yum clean all && \
rm -rf /var/cache/yum
@ -19,14 +25,25 @@ WORKDIR /tmp
RUN source /opt/rh/devtoolset-8/enable && \
source /opt/rh/rh-python36/enable && \
pip3 install \
lxml \
psutil \
python-dateutil \
subprocess32 && \
mkdir fdb-joshua && \
cd fdb-joshua && \
git clone --branch code_pipeline https://github.com/FoundationDB/fdb-joshua . && \
pip3 install /tmp/fdb-joshua && \
cd /tmp && \
curl -Ls https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.9/2020-11-02/bin/linux/amd64/kubectl -o kubectl && \
echo "3dbe69e6deb35fbd6fec95b13d20ac1527544867ae56e3dae17e8c4d638b25b9 kubectl" > kubectl.txt && \
sha256sum -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \
chmod 755 /usr/local/bin/kubectl && \
curl https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.0.30.zip -o "awscliv2.zip" && \
echo "7ee475f22c1b35cc9e53affbf96a9ffce91706e154a9441d0d39cbf8366b718e awscliv2.zip" > awscliv2.txt && \
sha256sum -c awscliv2.txt && \
unzip -qq awscliv2.zip && \
./aws/install && \
rm -rf /tmp/*
ARG OLD_FDB_BINARY_DIR=/app/deploy/global_data/oldBinaries/
@ -49,18 +66,44 @@ RUN curl -Ls https://update.code.visualstudio.com/latest/server-linux-x64/stable
mkdir -p .vscode-server/bin/latest && \
tar --strip-components 1 --no-same-owner --directory .vscode-server/bin/latest -xf /tmp/vscode-server-linux-x64.tar.gz && \
touch .vscode-server/bin/latest/0 && \
rm -rf /tmp/*
RUN rm -f /root/anaconda-ks.cfg && \
printf '%s\n' \
'#!/usr/bin/env bash' \
'set -Eeuo pipefail' \
'' \
'mkdir -p ~/.docker' \
'cat > ~/.docker/config.json << EOF' \
'{' \
' "proxies":' \
' {' \
' "default":' \
' {' \
' "httpProxy": "${HTTP_PROXY}",' \
' "httpsProxy": "${HTTPS_PROXY}",' \
' "noProxy": "${NO_PROXY}"' \
' }' \
' }' \
'}' \
'EOF' \
> docker_proxy.sh && \
chmod 755 docker_proxy.sh && \
printf '%s\n' \
'source /opt/rh/devtoolset-8/enable' \
'source /opt/rh/rh-python36/enable' \
'source /opt/rh/rh-ruby26/enable' \
'' \
'function cmk() {' \
' cmake -S ${HOME}/src/foundationdb -B ${HOME}/build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84' \
'}' \
'function ct() {' \
' cd ${HOME}/build_output && ctest -j 32 --output-on-failure' \
'}' \
'function j() {' \
' python3 -m joshua.joshua "${@}"' \
'}' \
'function jsd() {' \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \
'' \
>> .bashrc

View File

@ -0,0 +1,20 @@
ARG REPOSITORY=foundationdb/build
ARG VERSION=centos7-latest
FROM ${REPOSITORY}:${VERSION}
ENV YCSB_VERSION=ycsb-foundationdb-binding-0.17.0 \
PATH=${PATH}:/usr/bin
RUN cd /opt \
&& curl -Ls https://github.com/brianfrankcooper/YCSB/releases/download/0.17.0/ycsb-foundationdb-binding-0.17.0.tar.gz \
| tar -xzvf -
RUN rm -Rf /opt/${YCSB_VERSION}/lib/fdb-java-5.2.5.jar
# COPY the appropriate fdb-java-*.jar from packages
# COPY binary RPM for foundationdb
# Install Binary
WORKDIR "/opt/${YCSB_VERSION}"
ENTRYPOINT ["bin/ycsb.sh"]

View File

@ -10,7 +10,7 @@ function(compile_boost)
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++14 -w)
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
if(APPLE)
set(BOOST_TOOLSET "darwin")
set(BOOST_TOOLSET "clang-darwin")
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost

View File

@ -3,7 +3,7 @@ add_library(jemalloc INTERFACE)
set(USE_JEMALLOC ON)
# We don't want to use jemalloc on Windows
# Nor on FreeBSD, where jemalloc is the default system allocator
if(USE_SANITIZER OR WIN32 OR (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") OR APPLE)
set(USE_JEMALLOC OFF)
return()
endif()

View File

@ -481,7 +481,11 @@ An |database-blurb1| Modifications to a database are performed via transactions.
|length-of| ``snapshot_command``
.. note:: The function is exposing the functionality of the fdbcli command ``snapshot``. Please take a look at the documentation before using (see :ref:`disk-snapshot-backups`).
.. function:: double fdb_database_get_main_thread_busyness(FDBDatabase* database)
Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second.
Transaction
===========

View File

@ -6,6 +6,7 @@ Release Notes
======
* Change the default for --knob_tls_server_handshake_threads to 64. The previous default was 1000. This avoids starting 1000 threads by default, but may adversely affect recovery time for large clusters using TLS. Users with large TLS clusters should consider explicitly setting this knob in their foundationdb.conf file. `(PR #4421) <https://github.com/apple/foundationdb/pull/4421>`_
* Fix accounting error that could cause commits to incorrectly fail with ``proxy_memory_limit_exceeded``. `(PR #4526) <https://github.com/apple/foundationdb/pull/4526>`_
* As an optimization, partial restore using target key ranges now filters backup log data prior to loading it into the database. `(PR #4554) <https://github.com/apple/foundationdb/pull/4554>`_
6.3.11
======

View File

@ -2236,6 +2236,8 @@ Reference<IBackupContainer> openBackupContainer(const char* name, std::string de
return c;
}
// Submit the restore request to the database if "performRestore" is true. Otherwise,
// check if the restore can be performed.
ACTOR Future<Void> runRestore(Database db,
std::string originalClusterFile,
std::string tagName,
@ -2328,7 +2330,7 @@ ACTOR Future<Void> runRestore(Database db,
printf("Restored to version %" PRId64 "\n", restoredVersion);
}
} else {
state Optional<RestorableFileSet> rset = wait(bc->getRestoreSet(targetVersion, ranges));
if (!rset.present()) {
fprintf(stderr,

View File

@ -142,8 +142,9 @@ Version getVersionFromString(std::string const& value) {
}
// Transaction log data is stored by the FoundationDB core in the
// "backupLogKeys" (i.e., \xff\x02/blog/) keyspace in a funny order for
// performance reasons.
// Returns the ranges of keys that contain the data for the given range
// of versions.
// Asserts CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE % blocksize == 0; otherwise the hash calculation will be incorrect.
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,

View File

@ -891,16 +891,21 @@ public:
return Optional<RestorableFileSet>();
}
// Get a set of files that can restore the given "keyRangesFilter" to the "targetVersion".
// If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup.
// It's generally a good idea to specify "keyRangesFilter" to reduce the number of files and
// thus the restore time.
//
// If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored,
// because the log can contain mutations of the whole key space, unlike range files, each of
// which is limited to a smaller key range.
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet(Reference<BackupContainerFileSystem> bc,
Version targetVersion,
VectorRef<KeyRangeRef> keyRangesFilter,
bool logsOnly = false,
Version beginVersion = invalidVersion) {
// Using "keyRangesFilter" together with "logsOnly" is not yet supported
if (logsOnly && !keyRangesFilter.empty()) {
TraceEvent(SevError, "BackupContainerRestoreSetUnsupportedAPI")
.detail("KeyRangesFilter", keyRangesFilter.size());
return Optional<RestorableFileSet>();
}
for (const auto& range : keyRangesFilter) {
TraceEvent("BackupContainerGetRestoreSet").detail("RangeFilter", printable(range));
}
if (logsOnly) {

View File

@ -3192,6 +3192,154 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
StringRef RestoreRangeTaskFunc::name = LiteralStringRef("restore_range_data");
REGISTER_TASKFUNC(RestoreRangeTaskFunc);
// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and
// returns (commitVersion, chunkNumber)
std::pair<Version, int32_t> decodeLogKey(const StringRef& key) {
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
uint8_t hash;
Version version;
int32_t part;
BinaryReader rd(key, Unversioned());
rd >> hash >> version >> part;
version = bigEndian64(version);
part = bigEndian32(part);
int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash);
return std::make_pair(version, part);
}
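To make the key layout concrete, here is a hypothetical inverse of decodeLogKey; encodeLogKey is not part of this commit and is shown only to illustrate the (hash byte, big-endian version, big-endian chunk number) format:
// Hypothetical inverse of decodeLogKey, for illustration only.
Key encodeLogKey(Version version, int32_t chunkNumber) {
    BinaryWriter wr(Unversioned());
    int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
    wr << (uint8_t)hashlittle(&v, sizeof(v), 0) // one hash byte derived from the version's block
       << bigEndian64(version) // commit version, big endian
       << bigEndian32(chunkNumber); // chunk number, big endian
    return wr.toValue();
}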
// Decodes an encoded list of mutations in the format of:
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
// where a mutation is encoded as:
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2]
std::vector<MutationRef> decodeLogValue(const StringRef& value) {
StringRefReader reader(value, restore_corrupted_data());
Version protocolVersion = reader.consume<uint64_t>();
if (protocolVersion <= 0x0FDB00A200090001) {
throw incompatible_protocol_version();
}
uint32_t val_length = reader.consume<uint32_t>();
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
TraceEvent(SevError, "FileRestoreLogValueError")
.detail("ValueLen", val_length)
.detail("ValueSize", value.size())
.detail("Value", printable(value));
}
std::vector<MutationRef> mutations;
while (1) {
if (reader.eof())
break;
// Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep()
uint32_t type, p1len, p2len;
type = reader.consume<uint32_t>();
p1len = reader.consume<uint32_t>();
p2len = reader.consume<uint32_t>();
const uint8_t* key = reader.consume(p1len);
const uint8_t* val = reader.consume(p2len);
mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len));
}
return mutations;
}
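As a sketch only (the real writer lives elsewhere in the backup mutation logging code), an encoder matching the documented layout above might look like:
// Hypothetical encoder for the documented value format, for illustration only.
Value encodeLogValue(uint64_t includeVersion, const std::vector<MutationRef>& mutations) {
    BinaryWriter body(Unversioned());
    for (const auto& m : mutations) {
        body << (uint32_t)m.type << (uint32_t)m.param1.size() << (uint32_t)m.param2.size();
        body.serializeBytes(m.param1);
        body.serializeBytes(m.param2);
    }
    Standalone<StringRef> packed = body.toValue();
    BinaryWriter wr(Unversioned());
    wr << includeVersion << (uint32_t)packed.size(); // header: version + body length
    wr.serializeBytes(packed);
    return wr.toValue();
}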
// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk,
// in chunk order, and can check the chunk set for completion or intersection with a set
// of ranges.
struct AccumulatedMutations {
AccumulatedMutations() : lastChunkNumber(-1) {}
// Add a KV pair for this mutation chunk set
// It will be accumulated onto serializedMutations if the chunk number is
// the next expected value.
void addChunk(int chunkNumber, const KeyValueRef& kv) {
if (chunkNumber == lastChunkNumber + 1) {
lastChunkNumber = chunkNumber;
serializedMutations += kv.value.toString();
} else {
lastChunkNumber = -2;
serializedMutations.clear();
}
kvs.push_back(kv);
}
// Returns true if both
// - 1 or more chunks were added to this set
// - The header of the first chunk contains a valid protocol version and a length
// that matches the bytes after the header in the combined value in serializedMutations
bool isComplete() const {
if (lastChunkNumber >= 0) {
StringRefReader reader(serializedMutations, restore_corrupted_data());
Version protocolVersion = reader.consume<uint64_t>();
if (protocolVersion <= 0x0FDB00A200090001) {
throw incompatible_protocol_version();
}
uint32_t vLen = reader.consume<uint32_t>();
return vLen == reader.remainder().size();
}
return false;
}
// Returns true if a complete chunk contains any MutationRefs which intersect with any
// range in ranges.
// It is undefined behavior to run this if isComplete() does not return true.
bool matchesAnyRange(const std::vector<KeyRange>& ranges) const {
std::vector<MutationRef> mutations = decodeLogValue(serializedMutations);
for (auto& m : mutations) {
for (auto& r : ranges) {
if (m.type == MutationRef::ClearRange) {
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
return true;
}
} else {
if (r.contains(m.param1)) {
return true;
}
}
}
}
return false;
}
std::vector<KeyValueRef> kvs;
std::string serializedMutations;
int lastChunkNumber;
};
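A small usage sketch (hypothetical, mirroring what filterLogMutationKVPairs below does per version): chunks must arrive in order starting at 0; any gap poisons the set so isComplete() stays false and the caller conservatively keeps the raw KV pairs:
void exampleAccumulate(const std::vector<std::pair<int32_t, KeyValueRef>>& chunks,
                       const std::vector<KeyRange>& restoreRanges,
                       std::vector<KeyValueRef>& out) {
    AccumulatedMutations acc;
    for (const auto& c : chunks)
        acc.addChunk(c.first, c.second); // (chunk number, raw KV pair)
    // Keep incomplete groups (can't prove them irrelevant) and complete groups
    // that touch one of the restore ranges.
    if (!acc.isComplete() || acc.matchesAnyRange(restoreRanges))
        out.insert(out.end(), acc.kvs.begin(), acc.kvs.end());
}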
// Returns a vector of filtered KV refs from data which are either part of incomplete mutation
// groups OR are complete and have data relevant to one of the KV ranges in ranges
std::vector<KeyValueRef> filterLogMutationKVPairs(VectorRef<KeyValueRef> data, const std::vector<KeyRange>& ranges) {
std::unordered_map<Version, AccumulatedMutations> mutationBlocksByVersion;
for (auto& kv : data) {
auto versionAndChunkNumber = decodeLogKey(kv.key);
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
}
std::vector<KeyValueRef> output;
for (auto& vb : mutationBlocksByVersion) {
AccumulatedMutations& m = vb.second;
// If the mutations are incomplete or match one of the ranges, include in results.
if (!m.isComplete() || m.matchesAnyRange(ranges)) {
output.insert(output.end(), m.kvs.begin(), m.kvs.end());
}
}
return output;
}
struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
static StringRef name;
static constexpr uint32_t version = 1;
@ -3223,6 +3371,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Reference<IBackupContainer> bc;
state std::vector<KeyRange> ranges;
loop {
try {
@ -3232,6 +3381,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
Reference<IBackupContainer> _bc = wait(restore.sourceContainer().getOrThrow(tr));
bc = _bc;
wait(store(ranges, restore.getRestoreRangesOrDefault(tr)));
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
wait(taskBucket->keepRunning(tr, task));
@ -3243,10 +3394,14 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state Key mutationLogPrefix = restore.mutationLogPrefix();
state Reference<IAsyncFile> inFile = wait(bc->readFile(logFile.fileName));
state Standalone<VectorRef<KeyValueRef>> dataOriginal = wait(decodeLogFileBlock(inFile, readOffset, readLen));
// Filter the KV pairs extracted from the log file block to remove any records known to not be needed for this
// restore based on the restore range set.
state std::vector<KeyValueRef> dataFiltered = filterLogMutationKVPairs(dataOriginal, ranges);
state int start = 0;
state int end = dataFiltered.size();
state int dataSizeLimit =
BUGGIFY ? deterministicRandom()->randomInt(256 * 1024, 10e6) : CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE;
@ -3262,8 +3417,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state int i = start;
state int txBytes = 0;
for (; i < end && txBytes < dataSizeLimit; ++i) {
Key k = dataFiltered[i].key.withPrefix(mutationLogPrefix);
ValueRef v = dataFiltered[i].value;
tr->set(k, v);
txBytes += k.expectedSize();
txBytes += v.expectedSize();
@ -3291,7 +3446,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
.detail("CommitVersion", tr->getCommittedVersion())
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("RecordCountOriginal", dataOriginal.size())
.detail("RecordCountFiltered", dataFiltered.size())
.detail("Bytes", txBytes)
.detail("TaskInstance", THIS_ADDR);
@ -3845,6 +4001,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
static TaskParam<Version> firstVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
// Find all files needed for the restore and save them in the RestoreConfig for the task.
// Update the total number of files and blocks and change state to starting.
ACTOR static Future<Void> _execute(Database cx,
Reference<TaskBucket> taskBucket,
Reference<FutureBucket> futureBucket,
@ -3854,6 +4012,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
state Version restoreVersion;
state Version beginVersion;
state Reference<IBackupContainer> bc;
state std::vector<KeyRange> ranges;
loop {
try {
@ -3861,10 +4020,12 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
Optional<Version> _beginVersion = wait(restore.beginVersion().get(tr));
beginVersion = _beginVersion.present() ? _beginVersion.get() : invalidVersion;
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)));
wait(store(ranges, restore.getRestoreRangesOrDefault(tr)));
wait(taskBucket->keepRunning(tr, task));
ERestoreState oldState = wait(restore.stateEnum().getD(tr));
@ -3909,13 +4070,18 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
wait(tr->onError(e));
}
}
Optional<bool> _incremental = wait(restore.incrementalBackupOnly().get(tr));
state bool incremental = _incremental.present() ? _incremental.get() : false;
if (beginVersion == invalidVersion) {
beginVersion = 0;
}
state Standalone<VectorRef<KeyRangeRef>> keyRangesFilter;
for (auto const& r : ranges) {
keyRangesFilter.push_back_deep(keyRangesFilter.arena(), KeyRangeRef(r));
}
Optional<RestorableFileSet> restorable =
wait(bc->getRestoreSet(restoreVersion, keyRangesFilter, incremental, beginVersion));
if (!incremental) {
beginVersion = restorable.get().snapshot.beginVersion;
}
@ -5034,6 +5200,24 @@ public:
return r;
}
// Submits the restore request to the database and throws "restore_invalid_version" error if
// restore is not possible. Parameters:
// cx: the database to be restored to
// cxOrig: if present, is used to resolve the restore timestamp into a version.
// tagName: restore tag
// url: the backup container's URL that contains all backup files
// ranges: the restored key ranges; if empty, restore all key ranges in the backup
// waitForComplete: if set, wait until the restore is completed before returning; otherwise,
// return when the request is submitted to the database.
// targetVersion: the version to be restored.
// verbose: print verbose information.
// addPrefix: this prefix is added to each key during restore.
// removePrefix: for each key to be restored, remove this prefix first.
// lockDB: if set, lock the database with randomUid before performing the restore;
// otherwise, check that the database is already locked with randomUid.
// incrementalBackupOnly: restore only from the backup's mutation logs (incremental backup).
// beginVersion: the restore's begin version.
// randomUid: the UID used to lock the database.
ACTOR static Future<Version> restore(FileBackupAgent* backupAgent,
Database cx,
Optional<Database> cxOrig,
@ -5065,7 +5249,7 @@ public:
}
Optional<RestorableFileSet> restoreSet =
wait(bc->getRestoreSet(targetVersion, ranges, incrementalBackupOnly, beginVersion));
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")

View File

@ -95,6 +95,7 @@ public:
virtual Reference<ITransaction> createTransaction() = 0;
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
virtual void addref() = 0;
virtual void delref() = 0;

View File

@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TOO_MANY, 1000000 );
init( SYSTEM_MONITOR_INTERVAL, 5.0 );
init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 );
init( FAILURE_MAX_DELAY, 5.0 );
init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;

View File

@ -30,6 +30,7 @@ public:
int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
double SYSTEM_MONITOR_INTERVAL;
double NETWORK_BUSYNESS_MONITOR_INTERVAL; // The interval in which we should update the network busyness metric
double FAILURE_MAX_DELAY;
double FAILURE_MIN_DELAY;

View File

@ -347,6 +347,15 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// Get network thread busyness
double DLDatabase::getMainThreadBusyness() {
if (api->databaseGetMainThreadBusyness != nullptr) {
return api->databaseGetMainThreadBusyness(db);
}
return 0;
}
// DLApi
template <class T>
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
@ -360,6 +369,7 @@ void loadClientFunction(T* fp, void* lib, std::string libPath, const char* funct
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
// Loads client API functions (definitions are in FdbCApi struct)
void DLApi::init() {
if (isLibraryLoaded(fdbCPath.c_str())) {
throw external_client_already_loaded();
@ -388,6 +398,11 @@ void DLApi::init() {
loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
loadClientFunction(&api->databaseGetMainThreadBusyness,
lib,
fdbCPath,
"fdb_database_get_main_thread_busyness",
headerVersion >= 700);
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@ -917,6 +932,15 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
return abortableFuture(f, dbState->dbVar->get().onChange);
}
// Get network thread busyness
double MultiVersionDatabase::getMainThreadBusyness() {
if (dbState->db) {
return dbState->db->getMainThreadBusyness();
}
return 0;
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(

View File

@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int uidLength,
uint8_t const* snapshotCommmand,
int snapshotCommandLength);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@ -262,6 +263,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -422,6 +424,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }

View File

@ -1743,6 +1743,30 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
}
// Updates the network busyness metric on a 1s cadence by default (see NETWORK_BUSYNESS_MONITOR_INTERVAL)
ACTOR Future<Void> monitorNetworkBusyness() {
state double prevTime = now();
loop {
wait(delay(CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL, TaskPriority::FlushTrace));
double elapsed = now() - prevTime; // get elapsed time from last execution
prevTime = now();
struct NetworkMetrics::PriorityStats& tracker = g_network->networkInfo.metrics.starvationTrackerNetworkBusyness;
if (tracker.active) { // update metrics
tracker.duration += now() - tracker.windowedTimer;
tracker.maxDuration = std::max(tracker.maxDuration, now() - tracker.timer);
tracker.windowedTimer = now();
}
g_network->networkInfo.metrics.networkBusyness =
std::min(elapsed, tracker.duration) / elapsed; // average duration spent doing "work"
tracker.duration = 0;
tracker.maxDuration = 0;
}
}
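The ratio computed above can be read as the fraction of each sampling window the network thread spent busy; a toy standalone version of the same arithmetic, not from this commit:
#include <algorithm>
#include <cassert>
// busyness = time spent busy in the window / window length, clamped to [0, 1].
double computeBusyness(double elapsed, double busyDuration) {
    return std::min(elapsed, busyDuration) / elapsed;
}
int main() {
    assert(computeBusyness(1.0, 0.25) == 0.25); // busy 250 ms of a 1 s window
    assert(computeBusyness(1.0, 1.5) == 1.0); // clamped when duration overshoots
}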
// Setup g_network and start monitoring for network busyness
void setupNetwork(uint64_t transportId, bool useMetrics) {
if (g_network)
throw network_already_setup();
@ -1756,6 +1780,8 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState);
FlowTransport::createInstance(true, transportId);
Net2FileSystem::newFileSystem();
uncancellable(monitorNetworkBusyness());
}
void runNetwork() {

View File

@ -91,6 +91,12 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
}
// Return the main network thread busyness
double ThreadSafeDatabase::getMainThreadBusyness() {
ASSERT(g_network);
return g_network->networkInfo.metrics.networkBusyness;
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);

View File

@ -35,6 +35,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
ThreadFuture<Void>
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The

View File

@ -145,8 +145,17 @@ private:
// The maximum amount of time a write is delayed before being passed along to the underlying file
double maxWriteDelay;
// Modifications which haven't been pushed to file, mapped by the location in the file that is being modified
// Modifications which haven't been pushed to file, mapped by the location in the file that is being modified.
// Be sure to update minSizeAfterPendingModifications when modifying pendingModifications.
RangeMap<uint64_t, Future<Void>> pendingModifications;
// The size of the file after the set of pendingModifications (i.e., the set
// pending at the time this member is read) completes. Must be updated in
// lockstep with any inserts into the pendingModifications map. Tracking
// this variable is necessary so that we can know which range of the file a
// truncate modifies, so we can insert it into the pendingModifications
// map. Until minSizeAfterPendingModificationsIsExact is true, this is only a lower bound.
mutable int64_t minSizeAfterPendingModifications = 0;
mutable bool minSizeAfterPendingModificationsIsExact = false;
// Will be blocked whenever kill is running
Promise<Void> killed;
@ -437,6 +446,7 @@ private:
Future<Void> writeEnded = wait(ownFuture);
std::vector<Future<Void>> priorModifications =
self->getModificationsAndInsert(offset, length, true, writeEnded);
self->minSizeAfterPendingModifications = std::max(self->minSizeAfterPendingModifications, offset + length);
if (BUGGIFY_WITH_PROB(0.001))
priorModifications.push_back(
@ -603,9 +613,19 @@ private:
//TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
wait(checkKilled(self, "Truncate"));
state Future<Void> truncateEnded = wait(ownFuture);
// Need to know the size of the file directly before this truncate
// takes effect to see what range it modifies.
if (!self->minSizeAfterPendingModificationsIsExact) {
wait(success(self->size()));
}
ASSERT(self->minSizeAfterPendingModificationsIsExact);
int64_t beginModifiedRange = std::min(size, self->minSizeAfterPendingModifications);
self->minSizeAfterPendingModifications = size;
std::vector<Future<Void>> priorModifications =
self->getModificationsAndInsert(beginModifiedRange, /*through end of file*/ -1, true, truncateEnded);
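A toy illustration (not from the commit) of why beginModifiedRange is the smaller of the two sizes: whether the truncate shrinks or extends the file, every byte from that offset through end of file may change:
#include <algorithm>
#include <cassert>
#include <cstdint>
int64_t beginModified(int64_t newSize, int64_t currentMinSize) {
    return std::min(newSize, currentMinSize);
}
int main() {
    assert(beginModified(40, 100) == 40); // shrink: bytes >= 40 are cut off
    assert(beginModified(100, 40) == 40); // extend: bytes >= 40 get zero-filled
}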
if (BUGGIFY_WITH_PROB(0.001))
priorModifications.push_back(
@ -751,8 +771,9 @@ private:
wait(checkKilled(self, "SizeEnd"));
// Include any modifications which extend past the end of the file
self->approximateSize = self->minSizeAfterPendingModifications =
std::max<int64_t>(sizeFuture.get(), self->minSizeAfterPendingModifications);
self->minSizeAfterPendingModificationsIsExact = true;
return self->approximateSize;
}

View File

@ -182,3 +182,22 @@ TEST_CASE("/fileio/rename") {
wait(IAsyncFileSystem::filesystem()->deleteFile(renamedFile, true));
return Void();
}
// Truncating to extend size should zero the new data
TEST_CASE("/fileio/truncateAndRead") {
state std::string filename = "/tmp/__JUNK__";
state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(
filename, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0));
state std::array<char, 4096> data;
wait(f->sync());
wait(f->truncate(4096));
int length = wait(f->read(&data[0], 4096, 0));
ASSERT(length == 4096);
for (auto c : data) {
ASSERT(c == '\0');
}
// close the file by deleting the reference
f.clear();
wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(filename, true));
return Void();
}

View File

@ -712,6 +712,7 @@ private:
return Void();
}
// Simulated sync does not actually do anything besides wait a random amount of time
ACTOR static Future<Void> sync_impl(SimpleFile* self) {
state UID opId = deterministicRandom()->randomUniqueID();
if (randLog)
@ -737,7 +738,6 @@ private:
.detail("FileCount", machineCache.count(self->filename));
renameFile(sourceFilename.c_str(), self->filename.c_str());
ASSERT(!machineCache.count(self->filename));
machineCache[self->filename] = machineCache[sourceFilename];
machineCache.erase(sourceFilename);
self->actualFilename = self->filename;
@ -2436,19 +2436,19 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
if (flags & IAsyncFile::OPEN_UNCACHED) {
auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles;
std::string actualFilename = filename;
if (flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) {
actualFilename = filename + ".part";
auto partFile = machineCache.find(actualFilename);
if (partFile != machineCache.end()) {
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) {
return Reference<IAsyncFile>(new AsyncFileWriteChecker(r));
});
return f;
}
}
if (machineCache.find(actualFilename) == machineCache.end()) {
// Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile.
// This way, they can both keep up with the time to start the next operation
auto diskParameters =

View File

@ -339,7 +339,7 @@ public:
bool checkStable = false,
std::set<Optional<Key>> dcIds = std::set<Optional<Key>>(),
std::vector<UID> exclusionWorkerIds = {}) {
std::map<std::tuple<ProcessClass::Fitness, int, bool, bool>, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
std::vector<LocalityData> unavailableLocals;
Reference<LocalitySet> logServerSet;
@ -406,80 +406,94 @@ public:
}
// This worker is a candidate for TLog recruitment.
bool inCCDC = worker_details.interf.locality.dcId() == clusterControllerDcId;
fitness_workers[std::make_tuple(fitness, id_used[worker_process_id], worker_details.degraded, inCCDC)]
.push_back(worker_details);
}
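Keying the map by a tuple works because std::map iterates keys in ascending lexicographic tuple order, so the most desirable workers (best fitness, least used, healthy, outside the cluster controller's DC) come first; a toy illustration, not part of the commit:
#include <cstdio>
#include <map>
#include <tuple>
int main() {
    // (fitness, timesUsed, degraded, inClusterControllerDC) -- smaller sorts first
    std::map<std::tuple<int, int, bool, bool>, const char*> workers;
    workers[std::make_tuple(1, 0, false, false)] = "worse fitness";
    workers[std::make_tuple(0, 1, false, false)] = "best fit, already used once";
    workers[std::make_tuple(0, 0, false, false)] = "best fit, unused, healthy";
    for (const auto& w : workers)
        std::printf("%s\n", w.second); // prints best candidates first
}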
// FIXME: it's not clear whether this is necessary.
for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign; fitness++) {
auto fitnessEnum = (ProcessClass::Fitness)fitness;
for (int addingDegraded = 0; addingDegraded < 2; addingDegraded++) {
fitness_workers[std::make_tuple(fitnessEnum, 0, addingDegraded, false)];
}
}
results.reserve(results.size() + id_worker.size());
for (auto workerIter = fitness_workers.begin(); workerIter != fitness_workers.end(); ++workerIter) {
auto fitness = std::get<0>(workerIter->first);
auto used = std::get<1>(workerIter->first);
auto addingDegraded = std::get<2>(workerIter->first);
ASSERT(fitness < ProcessClass::NeverAssign);
if (bCompleted) {
break;
}
for (auto& worker : workerIter->second) {
logServerMap->add(worker.interf.locality, &worker);
}
if (logServerSet->size() < (std::get<2>(workerIter->first) ? required : desired)) {
} else if (logServerSet->size() == required || logServerSet->size() <= desired) {
if (logServerSet->validate(policy)) {
for (auto& object : logServerMap->getObjects()) {
results.push_back(*object);
}
TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("TLogPolicy", policy->info())
.detail("DesiredLogs", desired)
.detail("Used", used)
.detail("AddingDegraded", addingDegraded);
}
// Try to select the desired size, if larger
else {
std::vector<LocalityEntry> bestSet;
std::vector<LocalityData> tLocalities;
// Try to find the best team of servers to fulfill the policy
if (findBestPolicySet(bestSet,
logServerSet,
policy,
desired,
SERVER_KNOBS->POLICY_RATING_TESTS,
SERVER_KNOBS->POLICY_GENERATIONS)) {
results.reserve(results.size() + bestSet.size());
for (auto& entry : bestSet) {
auto object = logServerMap->getObject(entry);
ASSERT(object);
results.push_back(*object);
tLocalities.push_back(object->interf.locality);
}
TraceEvent("GWFTADBestResults", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("BestCount", bestSet.size())
.detail("BestZones", ::describeZones(tLocalities))
.detail("BestDataHalls", ::describeDataHalls(tLocalities))
.detail("TLogPolicy", policy->info())
.detail("TotalResults", results.size())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNoBest", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("BestCount", bestSet.size())
.detail("BestZones", ::describeZones(tLocalities))
.detail("BestDataHalls", ::describeDataHalls(tLocalities))
.detail("TLogPolicy", policy->info())
.detail("TotalResults", results.size())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNoBest", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Used", used)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("TLogPolicy", policy->info())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
}
}
@ -1157,12 +1171,14 @@ public:
req.configuration,
used,
first_commit_proxy);
auto grv_proxies = getWorkersForRoleInDatacenter(dcId,
ProcessClass::GrvProxy,
req.configuration.getDesiredGrvProxies(),
req.configuration,
used,
first_grv_proxy);
auto resolvers = getWorkersForRoleInDatacenter(dcId,
ProcessClass::Resolver,
req.configuration.getDesiredResolvers(),
@ -1216,6 +1232,7 @@ public:
}
if (bestDC != clusterControllerDcId) {
TraceEvent("BestDCIsNotClusterDC");
vector<Optional<Key>> dcPriority;
dcPriority.push_back(bestDC);
desiredDcIds.set(dcPriority);
@ -1491,13 +1508,15 @@ public:
bool oldSatelliteFallback = false;
for (auto& logSet : dbi.logSystemConfig.tLogs) {
if (region.satelliteTLogPolicy.isValid() && logSet.isLocal && logSet.locality == tagLocalitySatellite) {
oldSatelliteFallback = logSet.tLogPolicy->info() != region.satelliteTLogPolicy->info();
ASSERT(!oldSatelliteFallback ||
(region.satelliteTLogPolicyFallback.isValid() &&
logSet.tLogPolicy->info() == region.satelliteTLogPolicyFallback->info()));
break;
if (region.satelliteTLogPolicyFallback.isValid()) {
for (auto& logSet : dbi.logSystemConfig.tLogs) {
if (region.satelliteTLogPolicy.isValid() && logSet.isLocal && logSet.locality == tagLocalitySatellite) {
oldSatelliteFallback = logSet.tLogPolicy->info() != region.satelliteTLogPolicy->info();
ASSERT(!oldSatelliteFallback ||
(region.satelliteTLogPolicyFallback.isValid() &&
logSet.tLogPolicy->info() == region.satelliteTLogPolicyFallback->info()));
break;
}
}
}

View File

@ -658,6 +658,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncTrigger printDetailedTeamsInfo;
PromiseStream<GetMetricsRequest> getShardMetrics;
Promise<UID> removeFailedServer;
void resetLocalitySet() {
storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
@ -695,7 +696,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<AsyncVar<bool>> zeroHealthyTeams,
bool primary,
Reference<AsyncVar<bool>> processingUnhealthy,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
@ -710,7 +712,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0),
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer) {
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
}
@ -718,6 +720,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
~DDTeamCollection() {
TraceEvent("DDTeamCollectionDestructed", distributorId).detail("Primary", primary);
// Cancel the teamBuilder to avoid creating new teams after teams are cancelled.
teamBuilder.cancel();
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("TeamBuilderDestroyed", server_info.size());
// Other teamCollections also hold pointer to this teamCollection;
// TeamTracker may access the destructed DDTeamCollection if we do not reset the pointer
for (int i = 0; i < teamCollections.size(); i++) {
@ -754,12 +763,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
info->collection = nullptr;
}
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("ServerTrackerDestroyed", server_info.size());
teamBuilder.cancel();
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("TeamBuilderDestroyed", server_info.size());
// .detail("Primary", primary)
// .detail("ServerTrackerDestroyed", server_info.size());
}
void addLaggingStorageServer(Key zoneId) {
@ -4145,10 +4150,14 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
wait(delay(0.0)); //Do not throw an error while still inside trackExcludedServers
while (!ddEnabledState->isDDEnabled()) {
wait(delay(1.0));
}
if (self->removeFailedServer.canBeSet()) {
self->removeFailedServer.send(server->id);
}
throw movekeys_conflict();
}
}
@ -4944,6 +4953,7 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
}
}
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
const DDEnabledState* ddEnabledState) {
@ -4973,7 +4983,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
state KeyRangeMap<ShardTrackedData> shards;
state Promise<UID> removeFailedServer;
try {
loop {
TraceEvent("DDInitTakingMoveKeysLock", self->ddId);
@ -5204,7 +5214,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
zeroHealthyTeams[0],
true,
processingUnhealthy,
getShardMetrics);
getShardMetrics,
removeFailedServer);
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
remoteTeamCollection =
@ -5220,7 +5231,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
zeroHealthyTeams[1],
false,
processingUnhealthy,
getShardMetrics);
getShardMetrics,
removeFailedServer);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(
@ -5252,12 +5264,21 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict)
throw err;
bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if (ddEnabled)
throw err;
TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
} else {
if (err.code() != error_code_movekeys_conflict) {
throw err;
}
bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if (ddEnabled) {
throw err;
}
}
}
}
}
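Taken together, the tracker hunk above sends the failed server's ID through a one-shot Promise, and this error handler consumes it. A minimal sketch of the hand-off; the helper names reportFailedServer and consumeFailedServer are hypothetical and not part of this diff:
// Producer side: a Promise can be fulfilled at most once, so guard with canBeSet().
void reportFailedServer(Promise<UID>& removeFailedServer, UID failedId) {
    if (removeFailedServer.canBeSet())
        removeFailedServer.send(failedId);
}
// Consumer side: only read the future if it actually fired and carries a value.
Optional<UID> consumeFailedServer(Promise<UID> const& removeFailedServer) {
    Future<UID> f = removeFailedServer.getFuture();
    if (f.isReady() && !f.isError())
        return f.get();
    return Optional<UID>();
}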
@ -5682,7 +5703,8 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
makeReference<AsyncVar<bool>>(true),
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>()));
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
for (int id = 1; id <= processCount; ++id) {
UID uid(id, 0);
@ -5723,7 +5745,8 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
makeReference<AsyncVar<bool>>(true),
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>()));
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
for (int id = 1; id <= processCount; id++) {
UID uid(id, 0);

View File

@ -178,7 +178,6 @@ public:
void moveShard(KeyRangeRef keys, std::vector<Team> destinationTeam);
void finishMove(KeyRangeRef keys);
void check();
void eraseServer(UID ssID);
private:
struct OrderByTeamKey {

View File

@ -999,10 +999,6 @@ void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
}
}
void ShardsAffectedByTeamFailure::eraseServer(UID ssID) {
storageServerShards[ssID] = 0;
}
void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
if (team_shards.insert(std::pair<Team, KeyRange>(team, range)).second) {
for (auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)

View File

@ -492,6 +492,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( MAX_REBOOT_TIME, 5.0 ); if( longReboots ) MAX_REBOOT_TIME = 20.0;
init( LOG_DIRECTORY, "."); // Will be set to the command line flag.
init( SERVER_MEM_LIMIT, 8LL << 30 );
init( SYSTEM_MONITOR_FREQUENCY, 5.0 );
//Ratekeeper
bool slowRatekeeper = randomize && BUGGIFY;

View File

@ -416,6 +416,7 @@ public:
double MAX_REBOOT_TIME;
std::string LOG_DIRECTORY;
int64_t SERVER_MEM_LIMIT;
double SYSTEM_MONITOR_FREQUENCY;
// Ratekeeper
double SMOOTHING_AMOUNT;

View File

@ -839,6 +839,7 @@ ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors,
return Void();
}
// Configuration details compiled in a structure used when setting up a simulated cluster
struct SimulationConfig {
explicit SimulationConfig(const TestConfig& testConfig);
int extraDB;
@ -857,7 +858,7 @@ private:
void generateNormalConfig(const TestConfig& testConfig);
};
SimulationConfig::SimulationConfig(const TestConfig& testConfig) {
SimulationConfig::SimulationConfig(const TestConfig& testConfig) : extraDB(testConfig.extraDB) {
generateNormalConfig(testConfig);
}

View File

@ -99,6 +99,8 @@ struct WorkloadRequest {
}
};
// Configuration details, specified in workload test files, that change the
// simulated environment
struct TestConfig {
int extraDB = 0;
int minimumReplication = 0;

View File

@ -464,7 +464,7 @@ Future<Void> startSystemMonitor(std::string dataFolder,
SystemMonitorMachineState(dataFolder, dcId, zoneId, machineId, g_network->getLocalAddress().ip));
systemMonitor();
return recurring(&systemMonitor, 5.0, TaskPriority::FlushTrace);
return recurring(&systemMonitor, SERVER_KNOBS->SYSTEM_MONITOR_FREQUENCY, TaskPriority::FlushTrace);
}
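For context, a sketch of the recurring() contract assumed by the call above; this is an approximation for illustration, not flow's actual implementation:
ACTOR Future<Void> recurringSketch(std::function<void()> what, double interval, TaskPriority taskID) {
    loop {
        wait(delay(interval, taskID));
        what(); // invoked once per interval at the given task priority
    }
}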
void testIndexedSet();

View File

@ -1406,30 +1406,26 @@ ACTOR Future<Void> rejoinRequestHandler(Reference<MasterData> self) {
}
}
// Keeps the coordinated state (cstate) updated as the set of recruited tlogs changes through recovery.
ACTOR Future<Void> trackTlogRecovery(Reference<MasterData> self,
Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems,
Future<Void> minRecoveryDuration) {
state Future<Void> rejoinRequests = Never();
state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1;
state DatabaseConfiguration configuration =
self->configuration; // self->configuration can be changed by configurationMonitor, so we need a copy
loop {
state DBCoreState newState;
self->logSystem->toCoreState(newState);
newState.recoveryCount = recoverCount;
state Future<Void> changed = self->logSystem->onCoreStateChanged();
if (newState.tLogs[0].tLogWriteAntiQuorum != self->configuration.tLogWriteAntiQuorum ||
newState.tLogs[0].tLogReplicationFactor != self->configuration.tLogReplicationFactor) {
TraceEvent("MasterConfigChanged", self->dbgid)
.setMaxEventLength(11000)
.setMaxFieldLength(10000)
.detail("Config", self->configuration.toString())
.detail("TLogWriteAntiQuorum", newState.tLogs[0].tLogWriteAntiQuorum)
.detail("TLogReplicationFactor", newState.tLogs[0].tLogReplicationFactor);
throw master_recovery_failed();
}
ASSERT(newState.tLogs[0].tLogWriteAntiQuorum == configuration.tLogWriteAntiQuorum &&
newState.tLogs[0].tLogReplicationFactor == configuration.tLogReplicationFactor);
state bool allLogs =
newState.tLogs.size() ==
self->configuration.expectedLogSets(self->primaryDcId.size() ? self->primaryDcId[0] : Optional<Key>());
configuration.expectedLogSets(self->primaryDcId.size() ? self->primaryDcId[0] : Optional<Key>());
state bool finalUpdate = !newState.oldTLogData.size() && allLogs;
wait(self->cstate.write(newState, finalUpdate));
wait(minRecoveryDuration);
@ -1463,7 +1459,7 @@ ACTOR Future<Void> trackTlogRecovery(Reference<MasterData> self,
.trackLatest("MasterRecoveryState");
}
if (newState.oldTLogData.size() && self->configuration.repopulateRegionAntiQuorum > 0 &&
if (newState.oldTLogData.size() && configuration.repopulateRegionAntiQuorum > 0 &&
self->logSystem->remoteStorageRecovered()) {
TraceEvent(SevWarnAlways, "RecruitmentStalled_RemoteStorageRecovered", self->dbgid);
self->recruitmentStalled->set(true);

View File

@ -363,6 +363,64 @@ TestWorkload* getWorkloadIface(WorkloadRequest work, Reference<AsyncVar<ServerDB
return compound;
}
/**
* Only works in simulation. This method prints all simulated processes in a human-readable form to stdout. It groups
* processes by data center, data hall, zone, and machine (in this order).
*/
void printSimulatedTopology() {
if (!g_network->isSimulated()) {
return;
}
auto processes = g_simulator.getAllProcesses();
std::sort(processes.begin(), processes.end(), [](ISimulator::ProcessInfo* lhs, ISimulator::ProcessInfo* rhs) {
auto l = lhs->locality;
auto r = rhs->locality;
if (l.dcId() != r.dcId()) {
return l.dcId() < r.dcId();
}
if (l.dataHallId() != r.dataHallId()) {
return l.dataHallId() < r.dataHallId();
}
if (l.zoneId() != r.zoneId()) {
return l.zoneId() < r.zoneId();
}
if (l.machineId() != r.machineId()) {
return l.machineId() < r.machineId();
}
return lhs->address < rhs->address;
});
printf("Simulated Cluster Topology:\n");
printf("===========================\n");
Optional<Standalone<StringRef>> dcId, dataHallId, zoneId, machineId;
for (auto p : processes) {
std::string indent = "";
if (dcId != p->locality.dcId()) {
dcId = p->locality.dcId();
printf("%sdcId: %s\n", indent.c_str(), p->locality.describeDcId().c_str());
}
indent += " ";
if (dataHallId != p->locality.dataHallId()) {
dataHallId = p->locality.dataHallId();
printf("%sdataHallId: %s\n", indent.c_str(), p->locality.describeDataHall().c_str());
}
indent += " ";
if (zoneId != p->locality.zoneId()) {
zoneId = p->locality.zoneId();
printf("%szoneId: %s\n", indent.c_str(), p->locality.describeZone().c_str());
}
indent += " ";
if (machineId != p->locality.machineId()) {
machineId = p->locality.machineId();
printf("%smachineId: %s\n", indent.c_str(), p->locality.describeMachineId().c_str());
}
indent += " ";
printf("%sAddress: %s\n", indent.c_str(), p->address.toString().c_str(), p->name);
indent += " ";
printf("%sClass: %s\n", indent.c_str(), p->startingClass.toString().c_str());
printf("%sName: %s\n", indent.c_str(), p->name);
}
}
ACTOR Future<Void> databaseWarmer(Database cx) {
loop {
state Transaction tr(cx);
@ -977,7 +1035,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
TraceEvent("TestParserTest").detail("ClientInfoLogging", value);
} },
{ "startIncompatibleProcess",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } }
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } },
{ "storageEngineExcludeType",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeType", ""); } }
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {
@ -1291,6 +1351,24 @@ ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterContro
}
}
/**
* \brief Test orchestrator: sends test specification to testers in the right order and collects the results.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This is the actual orchestrator. It reads the test specifications (from tests), prepares the cluster (by running the
* configure command given in startingConfiguration) and then runs the workload.
*
* \param cc The cluster controller interface
* \param ci Same as cc.clientInterface
* \param testers The interfaces of the testers that should run the actual workloads
* \param tests The test specifications to run
* \param startingConfiguration If non-empty, the orchestrator will attempt to set this configuration before starting
* the tests.
* \param locality client locality (it seems this is unused?)
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,
vector<TesterInterface> testers,
@ -1346,6 +1424,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
// Change the configuration (and/or create the database) if necessary
printf("startingConfiguration:%s start\n", startingConfiguration.toString().c_str());
printSimulatedTopology();
if (useDB && startingConfiguration != StringRef()) {
try {
wait(timeoutError(changeConfiguration(cx, testers, startingConfiguration), 2000.0));
@ -1402,6 +1481,24 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
return Void();
}
/**
* \brief Proxy function that waits until enough testers are available and then calls into the orchestrator.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This actor wraps the actual orchestrator (also called runTests). But before calling that actor, it waits for enough
* testers to come up.
*
* \param cc The cluster controller interface
* \param ci Same as cc.clientInterface
* \param tests The test specifications to run
* \param minTestersExpected The number of testers to expect. This actor will block until it can find this many testers.
* \param startingConfiguration If non-empty, the orchestrator will attempt to set this configuration before starting
* the tests.
* \param locality client locality (it seems this is unused?)
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,
vector<TestSpec> tests,
@ -1443,6 +1540,32 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
return Void();
}
/**
* \brief Set up testing environment and run the given tests on a cluster.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This actor is usually the first entry point into the test environment. It doesn't implement much functionality
* itself. Its main purpose is to generate the test specification from the passed arguments and then call into the
* correct actor which will orchestrate the actual test.
*
* \param connFile A cluster connection file. Not all tests require a functional cluster but all tests require
* a cluster file.
* \param whatToRun TEST_TYPE_FROM_FILE to read the test description from a passed toml file or
* TEST_TYPE_CONSISTENCY_CHECK to generate a test spec for consistency checking
* \param at TEST_HERE: this process will act as a test client and execute the given workload. TEST_ON_SERVERS: Run a
* test client on every worker in the cluster. TEST_ON_TESTERS: Run a test client on all servers with class Test
* \param minTestersExpected If at is not TEST_HERE, this instructs the orchestrator to wait until it can find at least
* minTestersExpected test clients. This is usually passed through from a command line argument. In simulation, the
* simulator will pass the number of testers that it started.
* \param fileName The path to the toml-file containing the test description. Is ignored if whatToRun !=
* TEST_TYPE_FROM_FILE
* \param startingConfiguration Can be used to configure a cluster before running the test. If this is an empty string,
* it will be ignored; otherwise it will be passed to changeConfiguration.
* \param locality The client locality to be used. This is only used if at == TEST_HERE
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
test_type_t whatToRun,
test_location_t at,

View File

@ -27,6 +27,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
struct FastTriggeredWatchesWorkload : TestWorkload {
// Tests the time it takes for a watch to be fired after the value has changed in the storage server
int nodes, keyBytes;
double testDuration;
vector<Future<Void>> clients;
@ -75,6 +76,7 @@ struct FastTriggeredWatchesWorkload : TestWorkload {
ACTOR Future<Version> setter(Database cx, Key key, Optional<Value> value) {
state ReadYourWritesTransaction tr(cx);
// set the value of key and return the commit version
wait(delay(deterministicRandom()->random01()));
loop {
try {
@ -105,22 +107,24 @@ struct FastTriggeredWatchesWorkload : TestWorkload {
state Optional<Value> setValue;
if (deterministicRandom()->random01() > 0.5)
setValue = StringRef(format("%010d", deterministicRandom()->randomInt(0, 1000)));
// Set the value at setKey to something random
state Future<Version> setFuture = self->setter(cx, setKey, setValue);
wait(delay(deterministicRandom()->random01()));
loop {
state ReadYourWritesTransaction tr(cx);
try {
Optional<Value> val = wait(tr.get(setKey));
if (!first) {
getDuration = now() - watchEnd;
}
lastReadVersion = tr.getReadVersion().get();
//TraceEvent("FTWGet").detail("Key", printable(setKey)).detail("Value", printable(val)).detail("Ver", tr.getReadVersion().get());
// if the value is already setValue then there is no point setting a watch, so break out of the loop
if (val == setValue)
break;
ASSERT(first);
// set a watch and wait for it to be triggered (i.e., for self->setter to set the value)
state Future<Void> watchFuture = tr.watch(setKey);
wait(tr.commit());
//TraceEvent("FTWStartWatch").detail("Key", printable(setKey));
@ -134,8 +138,10 @@ struct FastTriggeredWatchesWorkload : TestWorkload {
}
Version ver = wait(setFuture);
//TraceEvent("FTWWatchDone").detail("Key", printable(setKey));
// Assert that the time from setting the key to triggering the watch is no greater than 25s
// TODO: This assertion can cause flaky behaviour since sometimes a watch can take longer to fire
ASSERT(lastReadVersion - ver >= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT ||
lastReadVersion - ver < SERVER_KNOBS->VERSIONS_PER_SECOND * (12 + getDuration));
lastReadVersion - ver < SERVER_KNOBS->VERSIONS_PER_SECOND * (25 + getDuration));
if (now() - testStart > self->testDuration)
break;
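The watch lifecycle exercised by this workload, reduced to a sketch (cx and key are assumed to be in scope inside an ACTOR):
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture = tr.watch(key); // registers intent to watch
wait(tr.commit());                              // the watch is only armed once the transaction commits
wait(watchFuture);                              // fires when the value of `key` changes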

View File

@ -74,6 +74,7 @@ struct LowLatencyWorkload : TestWorkload {
++self->operations;
loop {
try {
TraceEvent("StartLowLatencyTransaction");
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
if (doCommit) {
@ -84,6 +85,7 @@ struct LowLatencyWorkload : TestWorkload {
}
break;
} catch (Error& e) {
TraceEvent("LowLatencyTransactionFailed").error(e, true);
wait(tr.onError(e));
++self->retries;
}

View File

@ -502,6 +502,8 @@ struct RemoveServersSafelyWorkload : TestWorkload {
return killProcArray;
}
// Attempts to exclude a set of processes, and once the exclusion is successful it kills them.
// If markExcludeAsFailed is true, then it is an error if we cannot complete the exclusion.
ACTOR static Future<Void> removeAndKill(RemoveServersSafelyWorkload* self,
Database cx,
std::set<AddressExclusion> toKill,
@ -556,7 +558,11 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("Step", "SafetyCheck")
.detail("Exclusions", describe(toKillMarkFailedArray));
choose {
when(bool _safe = wait(checkSafeExclusions(cx, toKillMarkFailedArray))) { safe = _safe; }
when(bool _safe = wait(checkSafeExclusions(cx, toKillMarkFailedArray))) {
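// Assumed semantics: protectServers() returns the subset of addresses it could protect, so the
// exclusion only counts as safe when every to-be-failed process was also protected.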
safe = _safe && self->protectServers(std::set<AddressExclusion>(toKillMarkFailedArray.begin(),
toKillMarkFailedArray.end()))
.size() == toKillMarkFailedArray.size();
}
when(wait(delay(5.0))) {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "SafetyCheckTimedOut")

View File

@ -184,6 +184,12 @@ Reference<IRandom> deterministicRandom();
// non-deterministic contexts.
Reference<IRandom> nondeterministicRandom();
// This returns a deterministic random number generator initialized with the same seed as the one returned by
// deterministicRandom. The main use-case for this is to generate deterministic random numbers without changing the
// determinism of the simulator. This is useful for things like generating random UIDs for debug transactions.
// WARNING: This is not thread safe and must not be called from any other thread than the network thread!
Reference<IRandom> debugRandom();
// Populates a buffer with a random sequence of bytes
void generateRandomData(uint8_t* buffer, int length);
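A hypothetical use of debugRandom() (not part of this diff), consistent with the comment above:
// Draw a debug transaction ID without consuming values from deterministicRandom(),
// which would otherwise perturb the simulator's deterministic event order.
UID debugID = debugRandom()->randomUniqueID();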

View File

@ -135,6 +135,12 @@ thread_local INetwork* thread_network = 0;
class Net2 final : public INetwork, public INetworkConnections {
private:
void updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
TaskPriority priority,
TaskPriority lastPriority,
double now);
public:
Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics);
void initTLS(ETLSInitState targetState) override;
@ -1582,6 +1588,28 @@ void Net2::run() {
#endif
}
// Updates the PriorityStats found in NetworkMetrics
void Net2::updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
TaskPriority priority,
TaskPriority lastPriority,
double now) {
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
}
}
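A sketch of how a windowed busyness fraction could be derived from such a tracker; windowedBusyness is a hypothetical helper, and the actual sampling lives elsewhere in Net2:
double windowedBusyness(NetworkMetrics::PriorityStats& stats, double now, double window) {
    double busy = stats.duration;          // time accumulated over closed busy spans
    if (stats.active)
        busy += now - stats.windowedTimer; // include the still-open span
    stats.duration = 0;                    // reset for the next window
    stats.windowedTimer = now;
    return std::min(busy / window, 1.0);   // clamp: a saturated client reads as 1
}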
// Update both the vector of starvation trackers (updated every 5s) and the network-busyness tracker (updated every 1s)
void Net2::trackAtPriority(TaskPriority priority, double now) {
if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) {
// Start tracking current priority
@ -1601,22 +1629,12 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
if (binStats.priority > lastPriority && binStats.priority > priority) {
break;
}
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
}
updateStarvationTracker(binStats, priority, lastPriority, now);
}
// Update the starvation tracker used for network busyness
updateStarvationTracker(networkInfo.metrics.starvationTrackerNetworkBusyness, priority, lastPriority, now);
lastPriorityStats = &activeStatsItr.first->second;
}
}

View File

@ -47,11 +47,17 @@ INetwork* g_network = 0;
FILE* randLog = 0;
thread_local Reference<IRandom> seededRandom;
Reference<IRandom> seededDebugRandom;
uint64_t debug_lastLoadBalanceResultEndpointToken = 0;
bool noUnseed = false;
void setThreadLocalDeterministicRandomSeed(uint32_t seed) {
seededRandom = Reference<IRandom>(new DeterministicRandom(seed, true));
seededDebugRandom = Reference<IRandom>(new DeterministicRandom(seed));
}
Reference<IRandom> debugRandom() {
return seededDebugRandom;
}
Reference<IRandom> deterministicRandom() {

View File

@ -27,6 +27,7 @@
#include <string>
#include <stdint.h>
#include <variant>
#include <atomic>
#include "boost/asio.hpp"
#ifndef TLS_DISABLED
#include "boost/asio/ssl.hpp"
@ -320,6 +321,7 @@ class Future;
template <class T>
class Promise;
// Metrics which represent various network properties
struct NetworkMetrics {
enum { SLOW_EVENT_BINS = 16 };
uint64_t countSlowEvents[SLOW_EVENT_BINS] = {};
@ -340,16 +342,37 @@ struct NetworkMetrics {
};
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
double lastRunLoopBusyness;
double lastRunLoopBusyness; // network thread busyness (measured every 5s by default)
std::atomic<double> networkBusyness; // network thread busyness which is returned to the the client (measured every 1s by default)
// starvation trackers which keeps track of different task priorities
std::vector<struct PriorityStats> starvationTrackers;
struct PriorityStats starvationTrackerNetworkBusyness;
static const std::vector<int> starvationBins;
NetworkMetrics() : lastRunLoopBusyness(0) {
for (int priority : starvationBins) {
NetworkMetrics()
: lastRunLoopBusyness(0), networkBusyness(0),
starvationTrackerNetworkBusyness(PriorityStats(static_cast<TaskPriority>(starvationBins.at(0)))) {
for (int priority : starvationBins) { // initialize starvation trackers with given priorities
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
}
}
// Since networkBusyness is atomic, the implicit copy assignment is deleted, so we define it manually
NetworkMetrics& operator=(const NetworkMetrics& rhs) {
for (int i = 0; i < SLOW_EVENT_BINS; i++) {
countSlowEvents[i] = rhs.countSlowEvents[i];
}
secSquaredSubmit = rhs.secSquaredSubmit;
secSquaredDiskStall = rhs.secSquaredDiskStall;
activeTrackers = rhs.activeTrackers;
lastRunLoopBusyness = rhs.lastRunLoopBusyness;
networkBusyness = rhs.networkBusyness.load();
starvationTrackers = rhs.starvationTrackers;
starvationTrackerNetworkBusyness = rhs.starvationTrackerNetworkBusyness;
return *this;
}
};
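Assumed reader side (a sketch, not part of this header): because networkBusyness is a std::atomic<double>, a client thread can sample it without coordinating with the network thread:
double sampleNetworkBusyness() {
    return g_network->networkInfo.metrics.networkBusyness.load();
}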
struct FlowLock;

View File

@ -1,3 +1,4 @@
storageEngineExcludeType=-1
testTitle=Clogged
clearAfterTest=false
testName=Cycle

View File

@ -1,3 +1,4 @@
storageEngineExcludeType=-1
testTitle=Clogged
runSetup=false
testName=Cycle