Merge remote-tracking branch 'origin/master' into bugfixes/simulator-close-files

This commit is contained in:
Markus Pilman 2021-04-01 11:14:28 -06:00
commit 1987682e1e
48 changed files with 915 additions and 400 deletions

View File

@ -1,5 +1,7 @@
<img alt="FoundationDB logo" src="documentation/FDB_logo.png?raw=true" width="400">
![Build Status](https://codebuild.us-west-2.amazonaws.com/badges?uuid=eyJlbmNyeXB0ZWREYXRhIjoidTh3TTlZa2FQdkdPL1drZzJUQnA1NWg0MzQ3c1VnMXlaVWQ0MVUwcUJpRlltUExBYmRCc3h2c0p1TXZLdWhDQ3BoS0Jmc2ZZdG5yVmxGUHNJM0JtV0MwPSIsIml2UGFyYW1ldGVyU3BlYyI6InBrclM3R0J2d3hmRUFDTjgiLCJtYXRlcmlhbFNldFNlcmlhbCI6MX0%3D&branch=master)
FoundationDB is a distributed database designed to handle large volumes of structured data across clusters of commodity servers. It organizes data as an ordered key-value store and employs ACID transactions for all operations. It is especially well-suited for read/write workloads but also has excellent performance for write-intensive workloads. Users interact with the database using API language binding.
To learn more about FoundationDB, visit [foundationdb.org](https://www.foundationdb.org/)

View File

@ -357,6 +357,13 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
.extractPtr());
}
// Get network thread busyness (updated every 1s)
// A value of 0 indicates that the client is more or less idle
// A value of 1 (or more) indicates that the client is saturated
extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) {
return DB(d)->getMainThreadBusyness();
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try {
TXN(tr)->delref();

View File

@ -187,6 +187,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
uint8_t const* snap_command,
int snap_command_length);
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);

View File

@ -35,6 +35,7 @@
#include <tuple>
#include <vector>
#include <random>
#include <chrono>
#define DOCTEST_CONFIG_IMPLEMENT
#include "doctest.h"
@ -2126,6 +2127,24 @@ TEST_CASE("block_from_callback") {
context.event.wait();
}
// monitors network busyness for 2 sec (40 readings)
TEST_CASE("monitor_network_busyness") {
bool containsGreaterZero = false;
for (int i = 0; i < 40; i++) {
double busyness = fdb_database_get_main_thread_busyness(db);
// make sure the busyness is between 0 and 1
CHECK(busyness >= 0);
CHECK(busyness <= 1);
if (busyness > 0) {
containsGreaterZero = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// assert that at least one of the busyness readings was greater than 0
CHECK(containsGreaterZero);
}
int main(int argc, char** argv) {
if (argc != 3 && argc != 4) {
std::cout << "Unit tests for the FoundationDB C API.\n"

View File

@ -1,158 +0,0 @@
version: 0.2
env:
secrets-manager:
DOCKERHUB_AUTH: dockerhub_foundationdb:foundationdb
phases:
install:
commands:
- echo "install phase"
- 'ACCOUNT_ID=$(echo $CODEBUILD_BUILD_ARN | cut -d : -f 5)'
- REGISTRY=${ACCOUNT_ID}.dkr.ecr.${AWS_DEFAULT_REGION}.amazonaws.com
- aws ecr get-login-password | docker login --username AWS --password-stdin ${REGISTRY}
- docker pull ${REGISTRY}/centos:6
- docker tag ${REGISTRY}/centos:6 centos:6
- docker pull ${REGISTRY}/centos:7
- docker tag ${REGISTRY}/centos:7 centos:7
pre_build:
commands:
- echo "pre_build phase"
- COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
- DATE_STR=$(date +"%Y%m%d%H%M%S")
build:
commands:
- echo "build phase"
- ################################################################################
- # CENTOS 7 foundationdb/build
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/build
- docker pull ${REGISTRY}/foundationdb/build:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/build:centos7-latest
--tag ${REGISTRY}/foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/build:centos7-latest
--tag ${REGISTRY}/foundationdb/build:latest
--tag foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/build:centos7-latest
--tag foundationdb/build:latest
.
- ################################################################################
- # CENTOS 7 foundationdb/devel
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/devel
- docker pull ${REGISTRY}/foundationdb/devel:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/devel:centos7-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/devel:centos7-latest
--tag ${REGISTRY}/foundationdb/devel:latest
--tag foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/devel:centos7-latest
--tag foundationdb/devel:latest
.
- ################################################################################
- # CENTOS 7 foundationdb/distcc
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos7/distcc
- docker pull ${REGISTRY}/foundationdb/distcc:centos7-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/distcc:centos7-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/distcc:centos7-latest
--tag ${REGISTRY}/foundationdb/distcc:latest
--tag foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/distcc:centos7-latest
--tag foundationdb/distcc:latest
.
- ################################################################################
- # CENTOS 6 foundationdb/build
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/build
- docker pull ${REGISTRY}/foundationdb/build:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/build:centos6-latest
--tag ${REGISTRY}/foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/build:centos6-latest
--tag foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/build:centos6-latest
.
- ################################################################################
- # CENTOS 6 foundationdb/devel
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/devel
- docker pull ${REGISTRY}/foundationdb/devel:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/devel:centos6-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/devel:centos6-latest
--tag foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/devel:centos6-latest
.
- ################################################################################
- # CENTOS 6 foundationdb/distcc
- ################################################################################
- cd ${CODEBUILD_SRC_DIR}/build/docker/centos6/distcc
- docker pull ${REGISTRY}/foundationdb/distcc:centos6-latest || true
- docker build --cache-from ${REGISTRY}/foundationdb/distcc:centos6-latest
--build-arg REPOSITORY=${REGISTRY}/foundationdb/build
--tag ${REGISTRY}/foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
--tag ${REGISTRY}/foundationdb/distcc:centos6-latest
--tag foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
--tag foundationdb/distcc:centos6-latest
.
post_build:
commands:
- echo "post_build phase"
- echo ${DOCKERHUB_AUTH} | docker login --username foundationdb --password-stdin
- ################################################################################
- # CENTOS 7 PUSH TO ECR
- ################################################################################
- # PUSH TO build ECR
- docker push ${REGISTRY}/foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/build:centos7-latest
- docker push ${REGISTRY}/foundationdb/build:latest
- # PUSH TO devel ECR
- docker push ${REGISTRY}/foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/devel:centos7-latest
- docker push ${REGISTRY}/foundationdb/devel:latest
- # PUSH TO distcc ECR
- docker push ${REGISTRY}/foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/distcc:centos7-latest
- docker push ${REGISTRY}/foundationdb/distcc:latest
- ################################################################################
- # CENTOS 7 PUSH TO DOCKERHUB
- ################################################################################
- # PUSH TO build DOCKERHUB
- docker push foundationdb/build:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/build:centos7-latest
- docker push foundationdb/build:latest
- # PUSH TO devel DOCKERHUB
- docker push foundationdb/devel:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/devel:centos7-latest
- docker push foundationdb/devel:latest
- # PUSH TO distcc DOCKERHUB
- docker push foundationdb/distcc:centos7-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/distcc:centos7-latest
- docker push foundationdb/distcc:latest
- ################################################################################
- # CENTOS 6 PUSH TO ECR
- ################################################################################
- # PUSH TO build ECR
- docker push ${REGISTRY}/foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/build:centos6-latest
- # PUSH TO devel ECR
- docker push ${REGISTRY}/foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/devel:centos6-latest
- # PUSH TO distcc ECR
- docker push ${REGISTRY}/foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push ${REGISTRY}/foundationdb/distcc:centos6-latest
- ################################################################################
- # CENTOS 6 PUSH TO DOCKERHUB
- ################################################################################
- # PUSH TO build DOCKERHUB
- docker push foundationdb/build:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/build:centos6-latest
- # PUSH TO devel DOCKERHUB
- docker push foundationdb/devel:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/devel:centos6-latest
- # PUSH TO distcc DOCKERHUB
- docker push foundationdb/distcc:centos6-${DATE_STR}-${COMMIT_HASH}
- docker push foundationdb/distcc:centos6-latest

View File

@ -37,13 +37,13 @@ RUN sed -i -e '/enabled/d' /etc/yum.repos.d/CentOS-Base.repo && \
lz4-devel \
lz4-static \
mono-devel \
rh-python36 \
rh-python36-python-devel \
rh-ruby24 \
rpm-build \
tcl-devel \
unzip \
wget && \
wget \
rh-python36 \
rh-python36-python-devel \
rh-ruby24 && \
yum clean all && \
rm -rf /var/cache/yum

View File

@ -5,13 +5,16 @@ FROM ${REPOSITORY}:${VERSION}
# add vscode server
RUN yum repolist && \
yum -y install \
bash-completion \
byobu \
cgdb \
emacs-nox \
jq \
the_silver_searcher \
tmux \
tree \
emacs-nox \
vim \
bash-completion \
jq \
cgdb && \
zsh && \
yum clean all && \
rm -rf /var/cache/yum
@ -19,14 +22,25 @@ WORKDIR /tmp
RUN source /opt/rh/devtoolset-8/enable && \
source /opt/rh/rh-python36/enable && \
pip3 install \
lxml \
psutil \
python-dateutil \
subprocess32 \
psutil && \
subprocess32 && \
mkdir fdb-joshua && \
cd fdb-joshua && \
git clone --branch code_pipeline https://github.com/FoundationDB/fdb-joshua . && \
pip3 install /tmp/fdb-joshua && \
cd /tmp && \
curl -Ls https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.9/2020-11-02/bin/linux/amd64/kubectl -o kubectl && \
echo "3dbe69e6deb35fbd6fec95b13d20ac1527544867ae56e3dae17e8c4d638b25b9 kubectl" > kubectl.txt && \
sha256sum -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \
chmod 755 /usr/local/bin/kubectl && \
curl https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.0.30.zip -o "awscliv2.zip" && \
echo "7ee475f22c1b35cc9e53affbf96a9ffce91706e154a9441d0d39cbf8366b718e awscliv2.zip" > awscliv2.txt && \
sha256sum -c awscliv2.txt && \
unzip -qq awscliv2.zip && \
./aws/install && \
rm -rf /tmp/*
ARG OLD_FDB_BINARY_DIR=/app/deploy/global_data/oldBinaries/
@ -45,17 +59,23 @@ RUN mkdir -p ${OLD_FDB_BINARY_DIR} \
ln -s ${OLD_TLS_LIBRARY_DIR}/FDBGnuTLS.so /usr/lib/foundationdb/plugins/FDBGnuTLS.so
WORKDIR /root
RUN echo -en "\n"\
"source /opt/rh/devtoolset-8/enable\n"\
"source /opt/rh/rh-python36/enable\n"\
"source /opt/rh/rh-ruby24/enable\n"\
"\n"\
"function cmk() {\n"\
" cmake -S ${HOME}/src/foundationdb -B build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84\n"\
"}\n"\
"function ct() {\n"\
" cd ${HOME}/build_output && ctest -j 32 --output-on-failure\n"\
"}\n"\
"function j() {\n"\
" python3 -m joshua.joshua --cluster-file /etc/foundationdb/cluster-file \"\${@}\"\n"\
"}\n" >> .bashrc
RUN rm -f /root/anaconda-ks.cfg && \
printf '%s\n' \
'source /opt/rh/devtoolset-8/enable' \
'source /opt/rh/rh-python36/enable' \
'source /opt/rh/rh-ruby26/enable' \
'' \
'function cmk() {' \
' cmake -S ${HOME}/src/foundationdb -B ${HOME}/build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84' \
'}' \
'function ct() {' \
' cd ${HOME}/build_output && ctest -j 32 --output-on-failure' \
'}' \
'function j() {' \
' python3 -m joshua.joshua "${@}"' \
'}' \
'function jsd() {' \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \
'' \
>> .bashrc

View File

@ -10,6 +10,7 @@ RUN rpmkeys --import mono-project.com.rpmkey.pgp && \
epel-release \
scl-utils \
yum-utils && \
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo && \
yum install -y \
autoconf \
automake \
@ -19,6 +20,7 @@ RUN rpmkeys --import mono-project.com.rpmkey.pgp && \
devtoolset-8 \
devtoolset-8-libubsan-devel \
devtoolset-8-valgrind-devel \
docker-ce \
dos2unix \
dpkg \
gettext-devel \
@ -59,9 +61,10 @@ RUN source /opt/rh/devtoolset-8/enable && \
tar --strip-components 1 --no-same-owner --directory git -xf git.tar.gz && \
cd git && \
make configure && \
./configure \
&& make && \
./configure && \
make && \
make install && \
cd ../ && \
rm -rf /tmp/*
# build/install ninja

View File

@ -3,15 +3,21 @@ ARG VERSION=centos7-latest
FROM ${REPOSITORY}:${VERSION}
# add vscode server
RUN yum repolist && \
RUN yum-config-manager --add-repo=https://copr.fedorainfracloud.org/coprs/carlwgeorge/ripgrep/repo/epel-7/carlwgeorge-ripgrep-epel-7.repo && \
yum repolist && \
yum -y install \
bash-completion \
byobu \
cgdb \
emacs-nox \
fish \
jq \
ripgrep \
the_silver_searcher \
tmux \
tree \
emacs-nox \
vim \
bash-completion \
jq \
cgdb && \
zsh && \
yum clean all && \
rm -rf /var/cache/yum
@ -19,14 +25,25 @@ WORKDIR /tmp
RUN source /opt/rh/devtoolset-8/enable && \
source /opt/rh/rh-python36/enable && \
pip3 install \
lxml \
psutil \
python-dateutil \
subprocess32 \
psutil && \
subprocess32 && \
mkdir fdb-joshua && \
cd fdb-joshua && \
git clone --branch code_pipeline https://github.com/FoundationDB/fdb-joshua . && \
pip3 install /tmp/fdb-joshua && \
cd /tmp && \
curl -Ls https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.9/2020-11-02/bin/linux/amd64/kubectl -o kubectl && \
echo "3dbe69e6deb35fbd6fec95b13d20ac1527544867ae56e3dae17e8c4d638b25b9 kubectl" > kubectl.txt && \
sha256sum -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \
chmod 755 /usr/local/bin/kubectl && \
curl https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.0.30.zip -o "awscliv2.zip" && \
echo "7ee475f22c1b35cc9e53affbf96a9ffce91706e154a9441d0d39cbf8366b718e awscliv2.zip" > awscliv2.txt && \
sha256sum -c awscliv2.txt && \
unzip -qq awscliv2.zip && \
./aws/install && \
rm -rf /tmp/*
ARG OLD_FDB_BINARY_DIR=/app/deploy/global_data/oldBinaries/
@ -49,18 +66,44 @@ RUN curl -Ls https://update.code.visualstudio.com/latest/server-linux-x64/stable
mkdir -p .vscode-server/bin/latest && \
tar --strip-components 1 --no-same-owner --directory .vscode-server/bin/latest -xf /tmp/vscode-server-linux-x64.tar.gz && \
touch .vscode-server/bin/latest/0 && \
rm /tmp/*
RUN echo -en "\n"\
"source /opt/rh/devtoolset-8/enable\n"\
"source /opt/rh/rh-python36/enable\n"\
"source /opt/rh/rh-ruby26/enable\n"\
"\n"\
"function cmk() {\n"\
" cmake -S ${HOME}/src/foundationdb -B build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84\n"\
"}\n"\
"function ct() {\n"\
" cd ${HOME}/build_output && ctest -j 32 --output-on-failure\n"\
"}\n"\
"function j() {\n"\
" python3 -m joshua.joshua --cluster-file /etc/foundationdb/cluster-file \"\${@}\"\n"\
"}\n" >> .bashrc
rm -rf /tmp/*
RUN rm -f /root/anaconda-ks.cfg && \
printf '%s\n' \
'#!/usr/bin/env bash' \
'set -Eeuo pipefail' \
'' \
'mkdir -p ~/.docker' \
'cat > ~/.docker/config.json << EOF' \
'{' \
' "proxies":' \
' {' \
' "default":' \
' {' \
' "httpProxy": "${HTTP_PROXY}",' \
' "httpsProxy": "${HTTPS_PROXY}",' \
' "noProxy": "${NO_PROXY}"' \
' }' \
' }' \
'}' \
'EOF' \
> docker_proxy.sh && \
chmod 755 docker_proxy.sh && \
printf '%s\n' \
'source /opt/rh/devtoolset-8/enable' \
'source /opt/rh/rh-python36/enable' \
'source /opt/rh/rh-ruby26/enable' \
'' \
'function cmk() {' \
' cmake -S ${HOME}/src/foundationdb -B ${HOME}/build_output -D USE_CCACHE=1 -D RocksDB_ROOT=/opt/rocksdb-6.10.1 -G Ninja && ninja -C build_output -j 84' \
'}' \
'function ct() {' \
' cd ${HOME}/build_output && ctest -j 32 --output-on-failure' \
'}' \
'function j() {' \
' python3 -m joshua.joshua "${@}"' \
'}' \
'function jsd() {' \
' j start --tarball $(find ${HOME}/build_output/packages -name correctness\*.tar.gz) "${@}"' \
'}' \
'' \
>> .bashrc

View File

@ -0,0 +1,20 @@
ARG REPOSITORY=foundationdb/build
ARG VERSION=centos7-latest
FROM ${REPOSITORY}:${VERSION}
ENV YCSB_VERSION=ycsb-foundationdb-binding-0.17.0 \
PATH=${PATH}:/usr/bin
RUN cd /opt \
&& eval curl "-Ls https://github.com/brianfrankcooper/YCSB/releases/download/0.17.0/ycsb-foundationdb-binding-0.17.0.tar.gz" \
| tar -xzvf -
RUN rm -Rf /opt/${YCSB_VERSION}/lib/fdb-java-5.2.5.jar
# COPY The Appropriate fdb-java-.jar Aaron from packages
# COPY binary RPM for foundationd-db
# Install Binary
WORKDIR "/opt/${YCSB_VERSION}"
ENTRYPOINT ["bin/ycsb.sh"]

View File

@ -10,7 +10,7 @@ function(compile_boost)
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++14 -w)
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
if(APPLE)
set(BOOST_TOOLSET "darwin")
set(BOOST_TOOLSET "clang-darwin")
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost

View File

@ -3,7 +3,7 @@ add_library(jemalloc INTERFACE)
set(USE_JEMALLOC ON)
# We don't want to use jemalloc on Windows
# Nor on FreeBSD, where jemalloc is the default system allocator
if(USE_SANITIZER OR WIN32 OR (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD"))
if(USE_SANITIZER OR WIN32 OR (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") OR APPLE)
set(USE_JEMALLOC OFF)
return()
endif()

View File

@ -481,7 +481,11 @@ An |database-blurb1| Modifications to a database are performed via transactions.
|length-of| ``snapshot_command``
.. note:: The function is exposing the functionality of the fdbcli command ``snapshot``. Please take a look at the documentation before using (see :ref:`disk-snapshot-backups`).
.. function:: double fdb_database_get_main_thread_busyness(FDBDatabase* database)
Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second.
Transaction
===========

View File

@ -6,6 +6,7 @@ Release Notes
======
* Change the default for --knob_tls_server_handshake_threads to 64. The previous was 1000. This avoids starting 1000 threads by default, but may adversely affect recovery time for large clusters using tls. Users with large tls clusters should consider explicitly setting this knob in their foundationdb.conf file. `(PR #4421) <https://github.com/apple/foundationdb/pull/4421>`_
* Fix accounting error that could cause commits to incorrectly fail with ``proxy_memory_limit_exceeded``. `(PR #4526) <https://github.com/apple/foundationdb/pull/4526>`_
* As an optimization, partial restore using target key ranges now filters backup log data prior to loading it into the database. `(PR #4554) <https://github.com/apple/foundationdb/pull/4554>`_
6.3.11
======

View File

@ -2236,6 +2236,8 @@ Reference<IBackupContainer> openBackupContainer(const char* name, std::string de
return c;
}
// Submit the restore request to the database if "performRestore" is true. Otherwise,
// check if the restore can be performed.
ACTOR Future<Void> runRestore(Database db,
std::string originalClusterFile,
std::string tagName,
@ -2328,7 +2330,7 @@ ACTOR Future<Void> runRestore(Database db,
printf("Restored to version %" PRId64 "\n", restoredVersion);
}
} else {
state Optional<RestorableFileSet> rset = wait(bc->getRestoreSet(targetVersion));
state Optional<RestorableFileSet> rset = wait(bc->getRestoreSet(targetVersion, ranges));
if (!rset.present()) {
fprintf(stderr,

View File

@ -142,8 +142,9 @@ Version getVersionFromString(std::string const& value) {
}
// Transaction log data is stored by the FoundationDB core in the
// \xff / bklog / keyspace in a funny order for performance reasons.
// Return the ranges of keys that contain the data for the given range
// "backupLogKeys" (i.e., \xff\x02/blog/) keyspace in a funny order for
// performance reasons.
// Returns the ranges of keys that contain the data for the given range
// of versions.
// assert CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE % blocksize = 0. Otherwise calculation of hash will be incorrect
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,

View File

@ -891,16 +891,21 @@ public:
return Optional<RestorableFileSet>();
}
// Get a set of files that can restore the given "keyRangesFilter" to the "targetVersion".
// If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup.
// It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for
// restore times.
//
// If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored,
// because the log can contain mutations of the whole key space, unlike range files that each
// is limited to a smaller key range.
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet(Reference<BackupContainerFileSystem> bc,
Version targetVersion,
VectorRef<KeyRangeRef> keyRangesFilter,
bool logsOnly = false,
Version beginVersion = invalidVersion) {
// Does not support use keyRangesFilter for logsOnly yet
if (logsOnly && !keyRangesFilter.empty()) {
TraceEvent(SevError, "BackupContainerRestoreSetUnsupportedAPI")
.detail("KeyRangesFilter", keyRangesFilter.size());
return Optional<RestorableFileSet>();
for (const auto& range : keyRangesFilter) {
TraceEvent("BackupContainerGetRestoreSet").detail("RangeFilter", printable(range));
}
if (logsOnly) {

View File

@ -160,33 +160,44 @@ struct DatabaseConfiguration {
}
// Retuns the maximum number of discrete failures a cluster can tolerate.
// In HA mode, `fullyReplicatedRegions` is set to false initially when data is being
// replicated to remote, and will be true later. `forAvailablity` is set to true
// In HA mode, `fullyReplicatedRegions` is set to "1" initially when data is being
// replicated to remote, and will be incremented later. `forAvailablity` is set to true
// if we want to account the number for machines that can recruit new tLogs/SS after failures.
// Killing an entire datacenter counts as killing one zone in modes that support it
// Killing an entire datacenter counts as killing one zone in modes that support it.
int32_t maxZoneFailuresTolerated(int fullyReplicatedRegions, bool forAvailability) const {
int worstSatellite = regions.size() ? std::numeric_limits<int>::max() : 0;
int worstSatelliteTLogReplicationFactor = regions.size() ? std::numeric_limits<int>::max() : 0;
int regionsWithNonNegativePriority = 0;
for (auto& r : regions) {
if (r.priority >= 0) {
regionsWithNonNegativePriority++;
}
worstSatellite =
std::min(worstSatellite, r.satelliteTLogReplicationFactor - r.satelliteTLogWriteAntiQuorum);
worstSatelliteTLogReplicationFactor = std::min(
worstSatelliteTLogReplicationFactor, r.satelliteTLogReplicationFactor - r.satelliteTLogWriteAntiQuorum);
if (r.satelliteTLogUsableDcsFallback > 0) {
worstSatellite = std::min(
worstSatellite, r.satelliteTLogReplicationFactorFallback - r.satelliteTLogWriteAntiQuorumFallback);
worstSatelliteTLogReplicationFactor =
std::min(worstSatelliteTLogReplicationFactor,
r.satelliteTLogReplicationFactorFallback - r.satelliteTLogWriteAntiQuorumFallback);
}
}
if (usableRegions > 1 && fullyReplicatedRegions > 1 && worstSatellite > 0 &&
(!forAvailability || regionsWithNonNegativePriority > 1)) {
return 1 + std::min(std::max(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, worstSatellite - 1),
storageTeamSize - 1);
} else if (worstSatellite > 0) {
// Primary and Satellite tLogs are synchronously replicated, hence we can lose all but 1.
return std::min(tLogReplicationFactor + worstSatellite - 1 - tLogWriteAntiQuorum, storageTeamSize - 1);
if (worstSatelliteTLogReplicationFactor <= 0) {
// HA is not enabled in this database. Return single cluster zone failures to tolerate.
return std::min(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, storageTeamSize - 1);
}
return std::min(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, storageTeamSize - 1);
// Compute HA enabled database zone failure tolerance.
auto isGeoReplicatedData = [this, &fullyReplicatedRegions]() {
return usableRegions > 1 && fullyReplicatedRegions > 1;
};
if (isGeoReplicatedData() && (!forAvailability || regionsWithNonNegativePriority > 1)) {
return 1 + std::min(std::max(tLogReplicationFactor - 1 - tLogWriteAntiQuorum,
worstSatelliteTLogReplicationFactor - 1),
storageTeamSize - 1);
}
// Primary and Satellite tLogs are synchronously replicated, hence we can lose all but 1.
return std::min(tLogReplicationFactor + worstSatelliteTLogReplicationFactor - 1 - tLogWriteAntiQuorum,
storageTeamSize - 1);
}
// CommitProxy Servers

View File

@ -3192,6 +3192,154 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
StringRef RestoreRangeTaskFunc::name = LiteralStringRef("restore_range_data");
REGISTER_TASKFUNC(RestoreRangeTaskFunc);
// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and
// returns (commitVersion, chunkNumber)
std::pair<Version, int32_t> decodeLogKey(const StringRef& key) {
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
uint8_t hash;
Version version;
int32_t part;
BinaryReader rd(key, Unversioned());
rd >> hash >> version >> part;
version = bigEndian64(version);
part = bigEndian32(part);
int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash);
return std::make_pair(version, part);
}
// Decodes an encoded list of mutations in the format of:
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
// where a mutation is encoded as:
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2]
std::vector<MutationRef> decodeLogValue(const StringRef& value) {
StringRefReader reader(value, restore_corrupted_data());
Version protocolVersion = reader.consume<uint64_t>();
if (protocolVersion <= 0x0FDB00A200090001) {
throw incompatible_protocol_version();
}
uint32_t val_length = reader.consume<uint32_t>();
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
TraceEvent(SevError, "FileRestoreLogValueError")
.detail("ValueLen", val_length)
.detail("ValueSize", value.size())
.detail("Value", printable(value));
}
std::vector<MutationRef> mutations;
while (1) {
if (reader.eof())
break;
// Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep()
uint32_t type, p1len, p2len;
type = reader.consume<uint32_t>();
p1len = reader.consume<uint32_t>();
p2len = reader.consume<uint32_t>();
const uint8_t* key = reader.consume(p1len);
const uint8_t* val = reader.consume(p2len);
mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len));
}
return mutations;
}
// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk,
// in chunk order, and can check the chunk set for completion or intersection with a set
// of ranges.
struct AccumulatedMutations {
AccumulatedMutations() : lastChunkNumber(-1) {}
// Add a KV pair for this mutation chunk set
// It will be accumulated onto serializedMutations if the chunk number is
// the next expected value.
void addChunk(int chunkNumber, const KeyValueRef& kv) {
if (chunkNumber == lastChunkNumber + 1) {
lastChunkNumber = chunkNumber;
serializedMutations += kv.value.toString();
} else {
lastChunkNumber = -2;
serializedMutations.clear();
}
kvs.push_back(kv);
}
// Returns true if both
// - 1 or more chunks were added to this set
// - The header of the first chunk contains a valid protocol version and a length
// that matches the bytes after the header in the combined value in serializedMutations
bool isComplete() const {
if (lastChunkNumber >= 0) {
StringRefReader reader(serializedMutations, restore_corrupted_data());
Version protocolVersion = reader.consume<uint64_t>();
if (protocolVersion <= 0x0FDB00A200090001) {
throw incompatible_protocol_version();
}
uint32_t vLen = reader.consume<uint32_t>();
return vLen == reader.remainder().size();
}
return false;
}
// Returns true if a complete chunk contains any MutationRefs which intersect with any
// range in ranges.
// It is undefined behavior to run this if isComplete() does not return true.
bool matchesAnyRange(const std::vector<KeyRange>& ranges) const {
std::vector<MutationRef> mutations = decodeLogValue(serializedMutations);
for (auto& m : mutations) {
for (auto& r : ranges) {
if (m.type == MutationRef::ClearRange) {
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
return true;
}
} else {
if (r.contains(m.param1)) {
return true;
}
}
}
}
return false;
}
std::vector<KeyValueRef> kvs;
std::string serializedMutations;
int lastChunkNumber;
};
// Returns a vector of filtered KV refs from data which are either part of incomplete mutation groups OR complete
// and have data relevant to one of the KV ranges in ranges
std::vector<KeyValueRef> filterLogMutationKVPairs(VectorRef<KeyValueRef> data, const std::vector<KeyRange>& ranges) {
std::unordered_map<Version, AccumulatedMutations> mutationBlocksByVersion;
for (auto& kv : data) {
auto versionAndChunkNumber = decodeLogKey(kv.key);
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
}
std::vector<KeyValueRef> output;
for (auto& vb : mutationBlocksByVersion) {
AccumulatedMutations& m = vb.second;
// If the mutations are incomplete or match one of the ranges, include in results.
if (!m.isComplete() || m.matchesAnyRange(ranges)) {
output.insert(output.end(), m.kvs.begin(), m.kvs.end());
}
}
return output;
}
struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
static StringRef name;
static constexpr uint32_t version = 1;
@ -3223,6 +3371,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Reference<IBackupContainer> bc;
state std::vector<KeyRange> ranges;
loop {
try {
@ -3232,6 +3381,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
Reference<IBackupContainer> _bc = wait(restore.sourceContainer().getOrThrow(tr));
bc = _bc;
wait(store(ranges, restore.getRestoreRangesOrDefault(tr)));
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
wait(taskBucket->keepRunning(tr, task));
@ -3243,10 +3394,14 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state Key mutationLogPrefix = restore.mutationLogPrefix();
state Reference<IAsyncFile> inFile = wait(bc->readFile(logFile.fileName));
state Standalone<VectorRef<KeyValueRef>> data = wait(decodeLogFileBlock(inFile, readOffset, readLen));
state Standalone<VectorRef<KeyValueRef>> dataOriginal = wait(decodeLogFileBlock(inFile, readOffset, readLen));
// Filter the KV pairs extracted from the log file block to remove any records known to not be needed for this
// restore based on the restore range set.
state std::vector<KeyValueRef> dataFiltered = filterLogMutationKVPairs(dataOriginal, ranges);
state int start = 0;
state int end = data.size();
state int end = dataFiltered.size();
state int dataSizeLimit =
BUGGIFY ? deterministicRandom()->randomInt(256 * 1024, 10e6) : CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE;
@ -3262,8 +3417,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
state int i = start;
state int txBytes = 0;
for (; i < end && txBytes < dataSizeLimit; ++i) {
Key k = data[i].key.withPrefix(mutationLogPrefix);
ValueRef v = data[i].value;
Key k = dataFiltered[i].key.withPrefix(mutationLogPrefix);
ValueRef v = dataFiltered[i].value;
tr->set(k, v);
txBytes += k.expectedSize();
txBytes += v.expectedSize();
@ -3291,7 +3446,8 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
.detail("CommitVersion", tr->getCommittedVersion())
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("RecordCountOriginal", dataOriginal.size())
.detail("RecordCountFiltered", dataFiltered.size())
.detail("Bytes", txBytes)
.detail("TaskInstance", THIS_ADDR);
@ -3845,6 +4001,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
static TaskParam<Version> firstVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
// Find all files needed for the restore and save them in the RestoreConfig for the task.
// Update the total number of files and blocks and change state to starting.
ACTOR static Future<Void> _execute(Database cx,
Reference<TaskBucket> taskBucket,
Reference<FutureBucket> futureBucket,
@ -3854,6 +4012,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
state Version restoreVersion;
state Version beginVersion;
state Reference<IBackupContainer> bc;
state std::vector<KeyRange> ranges;
loop {
try {
@ -3861,10 +4020,12 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
Version _restoreVersion = wait(restore.restoreVersion().getOrThrow(tr));
restoreVersion = _restoreVersion;
Optional<Version> _beginVersion = wait(restore.beginVersion().get(tr));
beginVersion = _beginVersion.present() ? _beginVersion.get() : invalidVersion;
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)));
wait(store(ranges, restore.getRestoreRangesOrDefault(tr)));
wait(taskBucket->keepRunning(tr, task));
ERestoreState oldState = wait(restore.stateEnum().getD(tr));
@ -3909,13 +4070,18 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
wait(tr->onError(e));
}
}
Optional<bool> _incremental = wait(restore.incrementalBackupOnly().get(tr));
state bool incremental = _incremental.present() ? _incremental.get() : false;
if (beginVersion == invalidVersion) {
beginVersion = 0;
}
state Standalone<VectorRef<KeyRangeRef>> keyRangesFilter;
for (auto const& r : ranges) {
keyRangesFilter.push_back_deep(keyRangesFilter.arena(), KeyRangeRef(r));
}
Optional<RestorableFileSet> restorable =
wait(bc->getRestoreSet(restoreVersion, VectorRef<KeyRangeRef>(), incremental, beginVersion));
wait(bc->getRestoreSet(restoreVersion, keyRangesFilter, incremental, beginVersion));
if (!incremental) {
beginVersion = restorable.get().snapshot.beginVersion;
}
@ -5034,6 +5200,24 @@ public:
return r;
}
// Submits the restore request to the database and throws "restore_invalid_version" error if
// restore is not possible. Parameters:
// cx: the database to be restored to
// cxOrig: if present, is used to resolve the restore timestamp into a version.
// tagName: restore tag
// url: the backup container's URL that contains all backup files
// ranges: the restored key ranges; if empty, restore all key ranges in the backup
// waitForComplete: if set, wait until the restore is completed before returning; otherwise,
// return when the request is submitted to the database.
// targetVersion: the version to be restored.
// verbose: print verbose information.
// addPrefix: each key is added this prefix during restore.
// removePrefix: for each key to be restored, remove this prefix first.
// lockDB: if set lock the database with randomUid before performing restore;
// otherwise, check database is locked with the randomUid
// incrementalBackupOnly: only perform incremental backup
// beginVersion: restore's begin version
// randomUid: the UID for lock the database
ACTOR static Future<Version> restore(FileBackupAgent* backupAgent,
Database cx,
Optional<Database> cxOrig,
@ -5065,7 +5249,7 @@ public:
}
Optional<RestorableFileSet> restoreSet =
wait(bc->getRestoreSet(targetVersion, VectorRef<KeyRangeRef>(), incrementalBackupOnly, beginVersion));
wait(bc->getRestoreSet(targetVersion, ranges, incrementalBackupOnly, beginVersion));
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")

View File

@ -96,6 +96,7 @@ public:
virtual Reference<ITransaction> createTransaction() = 0;
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
virtual void addref() = 0;
virtual void delref() = 0;

View File

@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TOO_MANY, 1000000 );
init( SYSTEM_MONITOR_INTERVAL, 5.0 );
init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 );
init( FAILURE_MAX_DELAY, 5.0 );
init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;

View File

@ -30,6 +30,7 @@ public:
int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
double SYSTEM_MONITOR_INTERVAL;
double NETWORK_BUSYNESS_MONITOR_INTERVAL; // The interval in which we should update the network busyness metric
double FAILURE_MAX_DELAY;
double FAILURE_MIN_DELAY;

View File

@ -347,6 +347,15 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// Get network thread busyness
double DLDatabase::getMainThreadBusyness() {
if (api->databaseGetMainThreadBusyness != nullptr) {
return api->databaseGetMainThreadBusyness(db);
}
return 0;
}
// DLApi
template <class T>
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
@ -360,6 +369,7 @@ void loadClientFunction(T* fp, void* lib, std::string libPath, const char* funct
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
// Loads client API functions (definitions are in FdbCApi struct)
void DLApi::init() {
if (isLibraryLoaded(fdbCPath.c_str())) {
throw external_client_already_loaded();
@ -388,6 +398,11 @@ void DLApi::init() {
loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
loadClientFunction(&api->databaseGetMainThreadBusyness,
lib,
fdbCPath,
"fdb_database_get_main_thread_busyness",
headerVersion >= 700);
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@ -917,6 +932,15 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
return abortableFuture(f, dbState->dbVar->get().onChange);
}
// Get network thread busyness
double MultiVersionDatabase::getMainThreadBusyness() {
if (dbState->db) {
return dbState->db->getMainThreadBusyness();
}
return 0;
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(

View File

@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int uidLength,
uint8_t const* snapshotCommmand,
int snapshotCommandLength);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@ -262,6 +263,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -422,6 +424,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }

View File

@ -1743,6 +1743,30 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
}
// update the network busyness on a 1s cadence
ACTOR Future<Void> monitorNetworkBusyness() {
state double prevTime = now();
loop {
wait(delay(CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL, TaskPriority::FlushTrace));
double elapsed = now() - prevTime; // get elapsed time from last execution
prevTime = now();
struct NetworkMetrics::PriorityStats& tracker = g_network->networkInfo.metrics.starvationTrackerNetworkBusyness;
if (tracker.active) { // update metrics
tracker.duration += now() - tracker.windowedTimer;
tracker.maxDuration = std::max(tracker.maxDuration, now() - tracker.timer);
tracker.windowedTimer = now();
}
g_network->networkInfo.metrics.networkBusyness =
std::min(elapsed, tracker.duration) / elapsed; // average duration spent doing "work"
tracker.duration = 0;
tracker.maxDuration = 0;
}
}
// Setup g_network and start monitoring for network busyness
void setupNetwork(uint64_t transportId, bool useMetrics) {
if (g_network)
throw network_already_setup();
@ -1756,6 +1780,8 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState);
FlowTransport::createInstance(true, transportId);
Net2FileSystem::newFileSystem();
uncancellable(monitorNetworkBusyness());
}
void runNetwork() {

View File

@ -91,6 +91,12 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
}
// Return the main network thread busyness
double ThreadSafeDatabase::getMainThreadBusyness() {
ASSERT(g_network);
return g_network->networkInfo.metrics.networkBusyness;
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);

View File

@ -35,6 +35,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
ThreadFuture<Void>
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The

View File

@ -145,8 +145,17 @@ private:
// The maximum amount of time a write is delayed before being passed along to the underlying file
double maxWriteDelay;
// Modifications which haven't been pushed to file, mapped by the location in the file that is being modified
// Modifications which haven't been pushed to file, mapped by the location in the file that is being modified.
// Be sure to update minSizeAfterPendingModifications when modifying pendingModifications.
RangeMap<uint64_t, Future<Void>> pendingModifications;
// The size of the file after the set of pendingModifications completes,
// (the set pending at the time of reading this member). Must be updated in
// lockstep with any inserts into the pendingModifications map. Tracking
// this variable is necessary so that we can know the range of the file a
// truncate is modifying, so we can insert it into the pendingModifications
// map. Until minSizeAfterPendingModificationsIsExact is true, this is only a lower bound.
mutable int64_t minSizeAfterPendingModifications = 0;
mutable bool minSizeAfterPendingModificationsIsExact = false;
// Will be blocked whenever kill is running
Promise<Void> killed;
@ -475,6 +484,7 @@ private:
Future<Void> writeEnded = wait(ownFuture);
std::vector<Future<Void>> priorModifications =
self->getModificationsAndInsert(offset, length, true, writeEnded);
self->minSizeAfterPendingModifications = std::max(self->minSizeAfterPendingModifications, offset + length);
if (BUGGIFY_WITH_PROB(0.001))
priorModifications.push_back(
@ -641,9 +651,19 @@ private:
//TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
wait(checkKilled(self, "Truncate"));
Future<Void> truncateEnded = wait(ownFuture);
state Future<Void> truncateEnded = wait(ownFuture);
// Need to know the size of the file directly before this truncate
// takes effect to see what range it modifies.
if (!self->minSizeAfterPendingModificationsIsExact) {
wait(success(self->size()));
}
ASSERT(self->minSizeAfterPendingModificationsIsExact);
int64_t beginModifiedRange = std::min(size, self->minSizeAfterPendingModifications);
self->minSizeAfterPendingModifications = size;
std::vector<Future<Void>> priorModifications =
self->getModificationsAndInsert(size, -1, true, truncateEnded);
self->getModificationsAndInsert(beginModifiedRange, /*through end of file*/ -1, true, truncateEnded);
if (BUGGIFY_WITH_PROB(0.001))
priorModifications.push_back(
@ -789,8 +809,9 @@ private:
wait(checkKilled(self, "SizeEnd"));
// Include any modifications which extend past the end of the file
uint64_t maxModification = self->pendingModifications.lastItem().begin();
self->approximateSize = std::max<int64_t>(sizeFuture.get(), maxModification);
self->approximateSize = self->minSizeAfterPendingModifications =
std::max<int64_t>(sizeFuture.get(), self->minSizeAfterPendingModifications);
self->minSizeAfterPendingModificationsIsExact = true;
return self->approximateSize;
}

View File

@ -182,3 +182,22 @@ TEST_CASE("/fileio/rename") {
wait(IAsyncFileSystem::filesystem()->deleteFile(renamedFile, true));
return Void();
}
// Truncating to extend size should zero the new data
TEST_CASE("/fileio/truncateAndRead") {
state std::string filename = "/tmp/__JUNK__";
state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(
filename, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0));
state std::array<char, 4096> data;
wait(f->sync());
wait(f->truncate(4096));
int length = wait(f->read(&data[0], 4096, 0));
ASSERT(length == 4096);
for (auto c : data) {
ASSERT(c == '\0');
}
// close the file by deleting the reference
f.clear();
wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(filename, true));
return Void();
}

View File

@ -391,6 +391,7 @@ public:
std::string disablePrimary;
std::string disableRemote;
std::string originalRegions;
std::string startingDisabledConfiguration;
bool allowLogSetKills;
Optional<Standalone<StringRef>> remoteDcId;
bool hasSatelliteReplication;

View File

@ -339,7 +339,7 @@ public:
bool checkStable = false,
std::set<Optional<Key>> dcIds = std::set<Optional<Key>>(),
std::vector<UID> exclusionWorkerIds = {}) {
std::map<std::pair<ProcessClass::Fitness, bool>, vector<WorkerDetails>> fitness_workers;
std::map<std::tuple<ProcessClass::Fitness, int, bool, bool>, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
std::vector<LocalityData> unavailableLocals;
Reference<LocalitySet> logServerSet;
@ -406,80 +406,94 @@ public:
}
// This worker is a candidate for TLog recruitment.
fitness_workers[std::make_pair(fitness, worker_details.degraded)].push_back(worker_details);
bool inCCDC = worker_details.interf.locality.dcId() == clusterControllerDcId;
fitness_workers[std::make_tuple(fitness, id_used[worker_process_id], worker_details.degraded, inCCDC)]
.push_back(worker_details);
}
results.reserve(results.size() + id_worker.size());
for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign && !bCompleted; fitness++) {
// FIXME: it's not clear whether this is necessary.
for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign; fitness++) {
auto fitnessEnum = (ProcessClass::Fitness)fitness;
for (int addingDegraded = 0; addingDegraded < 2; addingDegraded++) {
auto workerItr = fitness_workers.find(std::make_pair(fitnessEnum, (bool)addingDegraded));
if (workerItr != fitness_workers.end()) {
for (auto& worker : workerItr->second) {
logServerMap->add(worker.interf.locality, &worker);
}
}
fitness_workers[std::make_tuple(fitnessEnum, 0, addingDegraded, false)];
}
}
results.reserve(results.size() + id_worker.size());
for (auto workerIter = fitness_workers.begin(); workerIter != fitness_workers.end(); ++workerIter) {
auto fitness = std::get<0>(workerIter->first);
auto used = std::get<1>(workerIter->first);
auto addingDegraded = std::get<2>(workerIter->first);
ASSERT(fitness < ProcessClass::NeverAssign);
if (bCompleted) {
break;
}
if (logServerSet->size() < (addingDegraded == 0 ? desired : required)) {
} else if (logServerSet->size() == required || logServerSet->size() <= desired) {
if (logServerSet->validate(policy)) {
for (auto& object : logServerMap->getObjects()) {
results.push_back(*object);
}
bCompleted = true;
break;
for (auto& worker : workerIter->second) {
logServerMap->add(worker.interf.locality, &worker);
}
if (logServerSet->size() < (std::get<2>(workerIter->first) ? required : desired)) {
} else if (logServerSet->size() == required || logServerSet->size() <= desired) {
if (logServerSet->validate(policy)) {
for (auto& object : logServerMap->getObjects()) {
results.push_back(*object);
}
TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("TLogPolicy", policy->info())
.detail("DesiredLogs", desired)
.detail("Used", used)
.detail("AddingDegraded", addingDegraded);
}
// Try to select the desired size, if larger
else {
std::vector<LocalityEntry> bestSet;
std::vector<LocalityData> tLocalities;
// Try to find the best team of servers to fulfill the policy
if (findBestPolicySet(bestSet,
logServerSet,
policy,
desired,
SERVER_KNOBS->POLICY_RATING_TESTS,
SERVER_KNOBS->POLICY_GENERATIONS)) {
results.reserve(results.size() + bestSet.size());
for (auto& entry : bestSet) {
auto object = logServerMap->getObject(entry);
ASSERT(object);
results.push_back(*object);
tLocalities.push_back(object->interf.locality);
}
TraceEvent("GWFTADBestResults", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Used", used)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("TLogPolicy", policy->info())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
}
// Try to select the desired size, if larger
else {
std::vector<LocalityEntry> bestSet;
std::vector<LocalityData> tLocalities;
// Try to find the best team of servers to fulfill the policy
if (findBestPolicySet(bestSet,
logServerSet,
policy,
desired,
SERVER_KNOBS->POLICY_RATING_TESTS,
SERVER_KNOBS->POLICY_GENERATIONS)) {
results.reserve(results.size() + bestSet.size());
for (auto& entry : bestSet) {
auto object = logServerMap->getObject(entry);
ASSERT(object);
results.push_back(*object);
tLocalities.push_back(object->interf.locality);
}
TraceEvent("GWFTADBestResults", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("BestCount", bestSet.size())
.detail("BestZones", ::describeZones(tLocalities))
.detail("BestDataHalls", ::describeDataHalls(tLocalities))
.detail("TLogPolicy", policy->info())
.detail("TotalResults", results.size())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNoBest", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("BestCount", bestSet.size())
.detail("BestZones", ::describeZones(tLocalities))
.detail("BestDataHalls", ::describeDataHalls(tLocalities))
.detail("TLogPolicy", policy->info())
.detail("TotalResults", results.size())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
bCompleted = true;
break;
}
TraceEvent(SevWarn, "GWFTADNoBest", id)
.detail("DcIds", dcList)
.detail("Fitness", fitness)
.detail("Used", used)
.detail("Processes", logServerSet->size())
.detail("Required", required)
.detail("TLogPolicy", policy->info())
.detail("DesiredLogs", desired)
.detail("AddingDegraded", addingDegraded);
}
}
@ -1157,12 +1171,14 @@ public:
req.configuration,
used,
first_commit_proxy);
auto grv_proxies = getWorkersForRoleInDatacenter(dcId,
ProcessClass::GrvProxy,
req.configuration.getDesiredGrvProxies(),
req.configuration,
used,
first_grv_proxy);
auto resolvers = getWorkersForRoleInDatacenter(dcId,
ProcessClass::Resolver,
req.configuration.getDesiredResolvers(),
@ -1216,6 +1232,7 @@ public:
}
if (bestDC != clusterControllerDcId) {
TraceEvent("BestDCIsNotClusterDC");
vector<Optional<Key>> dcPriority;
dcPriority.push_back(bestDC);
desiredDcIds.set(dcPriority);

View File

@ -404,9 +404,16 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
nextNominee = *availableLeaders.begin();
}
// If the current leader's priority became worse, we still need to notified all clients because now one
// of them might be better than the leader. In addition, even though FitnessRemote is better than
// FitnessUnknown, we still need to notified clients so that monitorLeaderRemotely has a chance to switch
// from passively monitoring the leader to actively attempting to become the leader.
if (!currentNominee.present() || !nextNominee.present() ||
!currentNominee.get().equalInternalId(nextNominee.get()) ||
nextNominee.get() > currentNominee.get()) {
nextNominee.get() > currentNominee.get() ||
(currentNominee.get().getPriorityInfo().dcFitness ==
ClusterControllerPriorityInfo::FitnessUnknown &&
nextNominee.get().getPriorityInfo().dcFitness == ClusterControllerPriorityInfo::FitnessRemote)) {
TraceEvent("NominatingLeader")
.detail("NextNominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("CurrentNominee", currentNominee.present() ? currentNominee.get().changeID : UID())

View File

@ -720,6 +720,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
~DDTeamCollection() {
TraceEvent("DDTeamCollectionDestructed", distributorId).detail("Primary", primary);
// Cancel the teamBuilder to avoid creating new teams after teams are cancelled.
teamBuilder.cancel();
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("TeamBuilderDestroyed", server_info.size());
// Other teamCollections also hold pointer to this teamCollection;
// TeamTracker may access the destructed DDTeamCollection if we do not reset the pointer
for (int i = 0; i < teamCollections.size(); i++) {
@ -756,12 +763,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
info->collection = nullptr;
}
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("ServerTrackerDestroyed", server_info.size());
teamBuilder.cancel();
// TraceEvent("DDTeamCollectionDestructed", distributorId)
// .detail("Primary", primary)
// .detail("TeamBuilderDestroyed", server_info.size());
// .detail("Primary", primary)
// .detail("ServerTrackerDestroyed", server_info.size());
}
void addLaggingStorageServer(Key zoneId) {

View File

@ -492,6 +492,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( MAX_REBOOT_TIME, 5.0 ); if( longReboots ) MAX_REBOOT_TIME = 20.0;
init( LOG_DIRECTORY, "."); // Will be set to the command line flag.
init( SERVER_MEM_LIMIT, 8LL << 30 );
init( SYSTEM_MONITOR_FREQUENCY, 5.0 );
//Ratekeeper
bool slowRatekeeper = randomize && BUGGIFY;

View File

@ -416,6 +416,7 @@ public:
double MAX_REBOOT_TIME;
std::string LOG_DIRECTORY;
int64_t SERVER_MEM_LIMIT;
double SYSTEM_MONITOR_FREQUENCY;
// Ratekeeper
double SMOOTHING_AMOUNT;

View File

@ -839,6 +839,7 @@ ACTOR Future<Void> restartSimulatedSystem(vector<Future<Void>>* systemActors,
return Void();
}
// Configuration details compiled in a structure used when setting up a simulated cluster
struct SimulationConfig {
explicit SimulationConfig(const TestConfig& testConfig);
int extraDB;
@ -857,7 +858,7 @@ private:
void generateNormalConfig(const TestConfig& testConfig);
};
SimulationConfig::SimulationConfig(const TestConfig& testConfig) {
SimulationConfig::SimulationConfig(const TestConfig& testConfig) : extraDB(testConfig.extraDB) {
generateNormalConfig(testConfig);
}
@ -1179,9 +1180,6 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
regionArr.push_back(remoteObj);
}
set_config("regions=" +
json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none));
if (needsRemote) {
g_simulator.originalRegions = "regions=" + json_spirit::write_string(json_spirit::mValue(regionArr),
json_spirit::Output_options::none);
@ -1195,6 +1193,11 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
disableRemote[1].get_obj()["datacenters"].get_array()[0].get_obj()["priority"] = -1;
g_simulator.disableRemote = "regions=" + json_spirit::write_string(json_spirit::mValue(disableRemote),
json_spirit::Output_options::none);
} else {
// In order to generate a starting configuration with the remote disabled, do not apply the region
// configuration to the DatabaseConfiguration until after creating the starting conf string.
set_config("regions=" +
json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none));
}
}
@ -1276,6 +1279,12 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
}
}
if (g_simulator.originalRegions != "") {
simconfig.set_config(g_simulator.originalRegions);
g_simulator.startingDisabledConfiguration = startingConfigString + " " + g_simulator.disableRemote;
startingConfigString += " " + g_simulator.originalRegions;
}
g_simulator.storagePolicy = simconfig.db.storagePolicy;
g_simulator.tLogPolicy = simconfig.db.tLogPolicy;
g_simulator.tLogWriteAntiQuorum = simconfig.db.tLogWriteAntiQuorum;

View File

@ -1585,7 +1585,7 @@ static JsonBuilderObject configurationFetcher(Optional<DatabaseConfiguration> co
ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
DatabaseConfiguration configuration,
int* minReplicasRemaining) {
int* minStorageReplicasRemaining) {
state JsonBuilderObject statusObjData;
try {
@ -1648,9 +1648,9 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
}
JsonBuilderArray teamTrackers;
for (int i = 0; i < 2; i++) {
TraceEventFields inFlight = dataInfo[3 + i];
if (!inFlight.size()) {
for (int i = 3; i < 5; i++) {
const TraceEventFields& inFlight = dataInfo[i];
if (inFlight.size() == 0) {
continue;
}
@ -1674,19 +1674,16 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "missing_data";
stateSectionObj["description"] = "No replicas remain of some data";
stateSectionObj["min_replicas_remaining"] = 0;
replicas = 0;
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "healing";
stateSectionObj["description"] = "Only one replica remains of some data";
stateSectionObj["min_replicas_remaining"] = 1;
replicas = 1;
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "healing";
stateSectionObj["description"] = "Only two replicas remain of some data";
stateSectionObj["min_replicas_remaining"] = 2;
replicas = 2;
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
stateSectionObj["healthy"] = false;
@ -1720,6 +1717,10 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
stateSectionObj["name"] = "healthy";
}
// Track the number of min replicas the storage servers in this region has. The sum of the replicas from
// both primary and remote region give the total number of data replicas this database currently has.
stateSectionObj["min_replicas_remaining"] = replicas;
if (!stateSectionObj.empty()) {
team_tracker["state"] = stateSectionObj;
teamTrackers.push_back(team_tracker);
@ -1728,10 +1729,13 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
}
}
// Update minStorageReplicasRemaining. It is mainly used for fault tolerance computation later. Note that
// FDB treats the entire remote region as one zone, and all the zones in the remote region are in the same
// failure domain.
if (primary) {
*minReplicasRemaining = std::max(*minReplicasRemaining, 0) + replicas;
*minStorageReplicasRemaining = std::max(*minStorageReplicasRemaining, 0) + replicas;
} else if (replicas > 0) {
*minReplicasRemaining = std::max(*minReplicasRemaining, 0) + 1;
*minStorageReplicasRemaining = std::max(*minStorageReplicasRemaining, 0) + 1;
}
}
statusObjData["team_trackers"] = teamTrackers;
@ -1850,7 +1854,7 @@ ACTOR static Future<vector<std::pair<GrvProxyInterface, EventMap>>> getGrvProxie
return results;
}
// Returns the number of zones eligble for recruiting new tLogs after failures, to maintain the current replication
// Returns the number of zones eligble for recruiting new tLogs after zone failures, to maintain the current replication
// factor.
static int getExtraTLogEligibleZones(const vector<WorkerDetails>& workers, const DatabaseConfiguration& configuration) {
std::set<StringRef> allZones;
@ -1868,17 +1872,20 @@ static int getExtraTLogEligibleZones(const vector<WorkerDetails>& workers, const
if (configuration.regions.size() == 0) {
return allZones.size() - std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize);
}
int extraTlogEligibleZones = 0;
int regionsWithNonNegativePriority = 0;
for (auto& region : configuration.regions) {
int maxRequiredReplicationFactor =
std::max(configuration.remoteTLogReplicationFactor,
std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize));
for (const auto& region : configuration.regions) {
if (region.priority >= 0) {
int eligible = dcId_zone[region.dcId].size() -
std::max(configuration.remoteTLogReplicationFactor,
std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize));
int eligible = dcId_zone[region.dcId].size() - maxRequiredReplicationFactor;
// FIXME: does not take into account fallback satellite policies
if (region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) {
int totalSatelliteEligible = 0;
for (auto& sat : region.satellites) {
for (const auto& sat : region.satellites) {
totalSatelliteEligible += dcId_zone[sat.dcId].size();
}
eligible = std::min<int>(eligible, totalSatelliteEligible - region.satelliteTLogReplicationFactor);
@ -1890,6 +1897,8 @@ static int getExtraTLogEligibleZones(const vector<WorkerDetails>& workers, const
}
}
if (regionsWithNonNegativePriority > 1) {
// If the database is replicated across multiple regions, we can afford to lose one entire region without
// losing data.
extraTlogEligibleZones++;
}
return extraTlogEligibleZones;
@ -2229,8 +2238,8 @@ static JsonBuilderObject tlogFetcher(int* logFaultTolerance,
int minFaultTolerance = 1000;
int localSetsWithNonNegativeFaultTolerance = 0;
for (int i = 0; i < tLogs.size(); i++) {
if (tLogs[i].tLogs.size() == 0) {
for (const auto& tLogSet : tLogs) {
if (tLogSet.tLogs.size() == 0) {
// We can have LogSets where there are no tLogs but some LogRouters. It's the way
// recruiting is implemented for old LogRouters in TagPartitionedLogSystem, where
// it adds an empty LogSet for missing locality.
@ -2238,7 +2247,7 @@ static JsonBuilderObject tlogFetcher(int* logFaultTolerance,
}
int failedLogs = 0;
for (auto& log : tLogs[i].tLogs) {
for (auto& log : tLogSet.tLogs) {
JsonBuilderObject logObj;
bool failed = !log.present() || !address_workers.count(log.interf().address());
logObj["id"] = log.id().shortString();
@ -2252,14 +2261,14 @@ static JsonBuilderObject tlogFetcher(int* logFaultTolerance,
}
}
if (tLogs[i].isLocal) {
ASSERT_WE_THINK(tLogs[i].tLogReplicationFactor > 0);
int currentFaultTolerance = tLogs[i].tLogReplicationFactor - 1 - tLogs[i].tLogWriteAntiQuorum - failedLogs;
if (tLogSet.isLocal) {
ASSERT_WE_THINK(tLogSet.tLogReplicationFactor > 0);
int currentFaultTolerance = tLogSet.tLogReplicationFactor - 1 - tLogSet.tLogWriteAntiQuorum - failedLogs;
if (currentFaultTolerance >= 0) {
localSetsWithNonNegativeFaultTolerance++;
}
if (tLogs[i].locality == tagLocalitySatellite) {
if (tLogSet.locality == tagLocalitySatellite) {
// FIXME: This hack to bump satellite fault tolerance, is to make it consistent
// with 6.2.
minFaultTolerance = std::min(minFaultTolerance, currentFaultTolerance + 1);
@ -2268,17 +2277,17 @@ static JsonBuilderObject tlogFetcher(int* logFaultTolerance,
}
}
if (tLogs[i].isLocal && tLogs[i].locality == tagLocalitySatellite) {
sat_log_replication_factor = tLogs[i].tLogReplicationFactor;
sat_log_write_anti_quorum = tLogs[i].tLogWriteAntiQuorum;
sat_log_fault_tolerance = tLogs[i].tLogReplicationFactor - 1 - tLogs[i].tLogWriteAntiQuorum - failedLogs;
} else if (tLogs[i].isLocal) {
log_replication_factor = tLogs[i].tLogReplicationFactor;
log_write_anti_quorum = tLogs[i].tLogWriteAntiQuorum;
log_fault_tolerance = tLogs[i].tLogReplicationFactor - 1 - tLogs[i].tLogWriteAntiQuorum - failedLogs;
if (tLogSet.isLocal && tLogSet.locality == tagLocalitySatellite) {
sat_log_replication_factor = tLogSet.tLogReplicationFactor;
sat_log_write_anti_quorum = tLogSet.tLogWriteAntiQuorum;
sat_log_fault_tolerance = tLogSet.tLogReplicationFactor - 1 - tLogSet.tLogWriteAntiQuorum - failedLogs;
} else if (tLogSet.isLocal) {
log_replication_factor = tLogSet.tLogReplicationFactor;
log_write_anti_quorum = tLogSet.tLogWriteAntiQuorum;
log_fault_tolerance = tLogSet.tLogReplicationFactor - 1 - tLogSet.tLogWriteAntiQuorum - failedLogs;
} else {
remote_log_replication_factor = tLogs[i].tLogReplicationFactor;
remote_log_fault_tolerance = tLogs[i].tLogReplicationFactor - 1 - failedLogs;
remote_log_replication_factor = tLogSet.tLogReplicationFactor;
remote_log_fault_tolerance = tLogSet.tLogReplicationFactor - 1 - failedLogs;
}
}
if (minFaultTolerance == 1000) {
@ -2321,6 +2330,8 @@ static JsonBuilderArray tlogFetcher(int* logFaultTolerance,
std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
JsonBuilderArray tlogsArray;
JsonBuilderObject tlogsStatus;
// First, fetch from the current TLog generation.
tlogsStatus = tlogFetcher(logFaultTolerance, db->get().logSystemConfig.tLogs, address_workers);
tlogsStatus["epoch"] = db->get().logSystemConfig.epoch;
tlogsStatus["current"] = true;
@ -2328,6 +2339,8 @@ static JsonBuilderArray tlogFetcher(int* logFaultTolerance,
tlogsStatus["begin_version"] = db->get().logSystemConfig.recoveredAt.get();
}
tlogsArray.push_back(tlogsStatus);
// fetch all the old generations of TLogs.
for (auto it : db->get().logSystemConfig.oldTLogs) {
JsonBuilderObject oldTlogsStatus = tlogFetcher(logFaultTolerance, it.tLogs, address_workers);
oldTlogsStatus["epoch"] = it.epoch;
@ -2343,7 +2356,7 @@ static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration confi
ServerCoordinators coordinators,
std::vector<WorkerDetails>& workers,
int extraTlogEligibleZones,
int minReplicasRemaining,
int minStorageReplicasRemaining,
int oldLogFaultTolerance,
int fullyReplicatedRegions,
bool underMaintenance) {
@ -2383,8 +2396,8 @@ static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration confi
// max zone failures that we can tolerate to not lose data
int zoneFailuresWithoutLosingData = std::min(maxZoneFailures, maxCoordinatorZoneFailures);
if (minReplicasRemaining >= 0) {
zoneFailuresWithoutLosingData = std::min(zoneFailuresWithoutLosingData, minReplicasRemaining - 1);
if (minStorageReplicasRemaining >= 0) {
zoneFailuresWithoutLosingData = std::min(zoneFailuresWithoutLosingData, minStorageReplicasRemaining - 1);
}
// oldLogFaultTolerance means max failures we can tolerate to lose logs data. -1 means we lose data or availability.
@ -2633,10 +2646,9 @@ ACTOR Future<StatusReply> clusterGetStatus(
Version datacenterVersionDifference) {
state double tStart = timer();
// Check if master worker is present
state JsonBuilderArray messages;
state std::set<std::string> status_incomplete_reasons;
state WorkerDetails mWorker;
state WorkerDetails mWorker; // Master worker
state WorkerDetails ddWorker; // DataDistributor worker
state WorkerDetails rkWorker; // Ratekeeper worker
@ -2649,6 +2661,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
messages.push_back(
JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker."));
}
// Get the DataDistributor worker interface
Optional<WorkerDetails> _ddWorker;
if (db->get().distributor.present()) {
@ -2677,12 +2690,12 @@ ACTOR Future<StatusReply> clusterGetStatus(
// Get latest events for various event types from ALL workers
// WorkerEvents is a map of worker's NetworkAddress to its event string
// The pair represents worker responses and a set of worker NetworkAddress strings which did not respond
// The pair represents worker responses and a set of worker NetworkAddress strings which did not respond.
std::vector<Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>>> futures;
futures.push_back(latestEventOnWorkers(workers, "MachineMetrics"));
futures.push_back(latestEventOnWorkers(workers, "ProcessMetrics"));
futures.push_back(latestEventOnWorkers(workers, "NetworkMetrics"));
futures.push_back(latestErrorOnWorkers(workers));
futures.push_back(latestErrorOnWorkers(workers)); // Get all latest errors.
futures.push_back(latestEventOnWorkers(workers, "TraceFileOpenError"));
futures.push_back(latestEventOnWorkers(workers, "ProgramStart"));
@ -2697,13 +2710,13 @@ ACTOR Future<StatusReply> clusterGetStatus(
// For each (optional) pair, if the pair is present and not empty then add the unreachable workers to the set.
for (auto pair : workerEventsVec) {
if (pair.present() && pair.get().second.size())
if (pair.present() && !pair.get().second.empty())
mergeUnreachable.insert(pair.get().second.begin(), pair.get().second.end());
}
// We now have a unique set of workers who were in some way unreachable. If there is anything in that set,
// create a message for it and include the list of unreachable processes.
if (mergeUnreachable.size()) {
if (!mergeUnreachable.empty()) {
JsonBuilderObject message =
JsonBuilder::makeMessage("unreachable_processes", "The cluster has some unreachable processes.");
JsonBuilderArray unreachableProcs;
@ -2814,11 +2827,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
state Future<ErrorOr<vector<std::pair<GrvProxyInterface, EventMap>>>> grvProxyFuture =
errorOr(getGrvProxiesAndMetrics(db, address_workers));
state int minReplicasRemaining = -1;
state int minStorageReplicasRemaining = -1;
state int fullyReplicatedRegions = -1;
state Future<Optional<Value>> primaryDCFO = getActivePrimaryDC(cx, &fullyReplicatedRegions, &messages);
std::vector<Future<JsonBuilderObject>> futures2;
futures2.push_back(dataStatusFetcher(ddWorker, configuration.get(), &minReplicasRemaining));
futures2.push_back(dataStatusFetcher(ddWorker, configuration.get(), &minStorageReplicasRemaining));
futures2.push_back(workloadStatusFetcher(
db, workers, mWorker, rkWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
@ -2833,18 +2846,16 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["logs"] = tlogFetcher(&logFaultTolerance, db, address_workers);
}
if (configuration.present()) {
int extraTlogEligibleZones = getExtraTLogEligibleZones(workers, configuration.get());
statusObj["fault_tolerance"] =
faultToleranceStatusFetcher(configuration.get(),
coordinators,
workers,
extraTlogEligibleZones,
minReplicasRemaining,
logFaultTolerance,
fullyReplicatedRegions,
loadResult.present() && loadResult.get().healthyZone.present());
}
int extraTlogEligibleZones = getExtraTLogEligibleZones(workers, configuration.get());
statusObj["fault_tolerance"] =
faultToleranceStatusFetcher(configuration.get(),
coordinators,
workers,
extraTlogEligibleZones,
minStorageReplicasRemaining,
logFaultTolerance,
fullyReplicatedRegions,
loadResult.present() && loadResult.get().healthyZone.present());
state JsonBuilderObject configObj =
configurationFetcher(configuration, coordinators, &status_incomplete_reasons);

View File

@ -99,6 +99,8 @@ struct WorkloadRequest {
}
};
// Configuration details specified in workload test files that change the simulation
// environment details
struct TestConfig {
int extraDB = 0;
int minimumReplication = 0;

View File

@ -464,7 +464,7 @@ Future<Void> startSystemMonitor(std::string dataFolder,
SystemMonitorMachineState(dataFolder, dcId, zoneId, machineId, g_network->getLocalAddress().ip));
systemMonitor();
return recurring(&systemMonitor, 5.0, TaskPriority::FlushTrace);
return recurring(&systemMonitor, SERVER_KNOBS->SYSTEM_MONITOR_FREQUENCY, TaskPriority::FlushTrace);
}
void testIndexedSet();

View File

@ -363,6 +363,64 @@ TestWorkload* getWorkloadIface(WorkloadRequest work, Reference<AsyncVar<ServerDB
return compound;
}
/**
* Only works in simulation. This method prints all simulated processes in a human readable form to stdout. It groups
* processes by data center, data hall, zone, and machine (in this order).
*/
void printSimulatedTopology() {
if (!g_network->isSimulated()) {
return;
}
auto processes = g_simulator.getAllProcesses();
std::sort(processes.begin(), processes.end(), [](ISimulator::ProcessInfo* lhs, ISimulator::ProcessInfo* rhs) {
auto l = lhs->locality;
auto r = rhs->locality;
if (l.dcId() != r.dcId()) {
return l.dcId() < r.dcId();
}
if (l.dataHallId() != r.dataHallId()) {
return l.dataHallId() < r.dataHallId();
}
if (l.zoneId() != r.zoneId()) {
return l.zoneId() < r.zoneId();
}
if (l.machineId() != r.zoneId()) {
return l.machineId() < r.machineId();
}
return lhs->address < rhs->address;
});
printf("Simulated Cluster Topology:\n");
printf("===========================\n");
Optional<Standalone<StringRef>> dcId, dataHallId, zoneId, machineId;
for (auto p : processes) {
std::string indent = "";
if (dcId != p->locality.dcId()) {
dcId = p->locality.dcId();
printf("%sdcId: %s\n", indent.c_str(), p->locality.describeDcId().c_str());
}
indent += " ";
if (dataHallId != p->locality.dataHallId()) {
dataHallId = p->locality.dataHallId();
printf("%sdataHallId: %s\n", indent.c_str(), p->locality.describeDataHall().c_str());
}
indent += " ";
if (zoneId != p->locality.zoneId()) {
zoneId = p->locality.zoneId();
printf("%szoneId: %s\n", indent.c_str(), p->locality.describeZone().c_str());
}
indent += " ";
if (machineId != p->locality.machineId()) {
machineId = p->locality.machineId();
printf("%smachineId: %s\n", indent.c_str(), p->locality.describeMachineId().c_str());
}
indent += " ";
printf("%sAddress: %s\n", indent.c_str(), p->address.toString().c_str(), p->name);
indent += " ";
printf("%sClass: %s\n", indent.c_str(), p->startingClass.toString().c_str());
printf("%sName: %s\n", indent.c_str(), p->name);
}
}
ACTOR Future<Void> databaseWarmer(Database cx) {
loop {
state Transaction tr(cx);
@ -977,7 +1035,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
TraceEvent("TestParserTest").detail("ClientInfoLogging", value);
} },
{ "startIncompatibleProcess",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } }
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStartIncompatibleProcess", value); } },
{ "storageEngineExcludeType",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeType", ""); } }
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {
@ -1291,6 +1351,24 @@ ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterContro
}
}
/**
* \brief Test orchestrator: sends test specification to testers in the right order and collects the results.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This is the actual orchestrator. It reads the test specifications (from tests), prepares the cluster (by running the
* configure command given in startingConfiguration) and then runs the workload.
*
* \param cc The cluster controller interface
* \param ci Same as cc.clientInterface
* \param testers The interfaces of the testers that should run the actual workloads
* \param tests The test specifications to run
* \param startingConfiguration If non-empty, the orchestrator will attempt to set this configuration before starting
* the tests.
* \param locality client locality (it seems this is unused?)
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,
vector<TesterInterface> testers,
@ -1346,6 +1424,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
// Change the configuration (and/or create the database) if necessary
printf("startingConfiguration:%s start\n", startingConfiguration.toString().c_str());
printSimulatedTopology();
if (useDB && startingConfiguration != StringRef()) {
try {
wait(timeoutError(changeConfiguration(cx, testers, startingConfiguration), 2000.0));
@ -1402,6 +1481,24 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
return Void();
}
/**
* \brief Proxy function that waits until enough testers are available and then calls into the orchestrator.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This actor wraps the actual orchestrator (also called runTests). But before calling that actor, it waits for enough
* testers to come up.
*
* \param cc The cluster controller interface
* \param ci Same as cc.clientInterface
* \param tests The test specifications to run
* \param minTestersExpected The number of testers to expect. This actor will block until it can find this many testers.
* \param startingConfiguration If non-empty, the orchestrator will attempt to set this configuration before starting
* the tests.
* \param locality client locality (it seems this is unused?)
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,
vector<TestSpec> tests,
@ -1443,6 +1540,32 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
return Void();
}
/**
* \brief Set up testing environment and run the given tests on a cluster.
*
* There are multiple actors in this file with similar names (runTest, runTests) and slightly different signatures.
*
* This actor is usually the first entry point into the test environment. It itself doesn't implement too much
* functionality. Its main purpose is to generate the test specification from passed arguments and then call into the
* correct actor which will orchestrate the actual test.
*
* \param connFile A cluster connection file. Not all tests require a functional cluster but all tests require
* a cluster file.
* \param whatToRun TEST_TYPE_FROM_FILE to read the test description from a passed toml file or
* TEST_TYPE_CONSISTENCY_CHECK to generate a test spec for consistency checking
* \param at TEST_HERE: this process will act as a test client and execute the given workload. TEST_ON_SERVERS: Run a
* test client on every worker in the cluster. TEST_ON_TESTERS: Run a test client on all servers with class Test
* \param minTestersExpected In at is not TEST_HERE, this will instruct the orchestrator until it can find at least
* minTestersExpected test-clients. This is usually passed through from a command line argument. In simulation, the
* simulator will pass the number of testers that it started.
* \param fileName The path to the toml-file containing the test description. Is ignored if whatToRun !=
* TEST_TYPE_FROM_FILE
* \param startingConfiguration Can be used to configure a cluster before running the test. If this is an empty string,
* it will be ignored, otherwise it will be passed to changeConfiguration.
* \param locality The client locality to be used. This is only used if at == TEST_HERE
*
* \returns A future which will be set after all tests finished.
*/
ACTOR Future<Void> runTests(Reference<ClusterConnectionFile> connFile,
test_type_t whatToRun,
test_location_t at,

View File

@ -52,6 +52,8 @@ struct ChangeConfigWorkload : TestWorkload {
void getMetrics(vector<PerfMetric>& m) override {}
// When simulated two clusters for DR tests, this actor sets the starting configuration
// for the extra cluster.
ACTOR Future<Void> extraDatabaseConfigure(ChangeConfigWorkload* self) {
if (g_network->isSimulated() && g_simulator.extraDB) {
auto extraFile = makeReference<ClusterConnectionFile>(*g_simulator.extraDB);
@ -59,10 +61,15 @@ struct ChangeConfigWorkload : TestWorkload {
wait(delay(5 * deterministicRandom()->random01()));
if (self->configMode.size()) {
if (g_simulator.startingDisabledConfiguration != "") {
// It is not safe to allow automatic failover to a region which is not fully replicated,
// so wait for both regions to be fully replicated before enabling failover
wait(success(changeConfig(extraDB, g_simulator.startingDisabledConfiguration, true)));
TraceEvent("WaitForReplicasExtra");
wait(waitForFullReplication(extraDB));
TraceEvent("WaitForReplicasExtraEnd");
}
wait(success(changeConfig(extraDB, self->configMode, true)));
TraceEvent("WaitForReplicasExtra");
wait(waitForFullReplication(extraDB));
TraceEvent("WaitForReplicasExtraEnd");
}
if (self->networkAddresses.size()) {
if (self->networkAddresses == "auto")
@ -75,6 +82,8 @@ struct ChangeConfigWorkload : TestWorkload {
return Void();
}
// Either changes the database configuration, or changes the coordinators based on the parameters
// of the workload.
ACTOR Future<Void> ChangeConfigClient(Database cx, ChangeConfigWorkload* self) {
wait(delay(self->minDelayBeforeChange +
deterministicRandom()->random01() * (self->maxDelayBeforeChange - self->minDelayBeforeChange)));
@ -86,10 +95,15 @@ struct ChangeConfigWorkload : TestWorkload {
}
if (self->configMode.size()) {
if (g_network->isSimulated() && g_simulator.startingDisabledConfiguration != "") {
// It is not safe to allow automatic failover to a region which is not fully replicated,
// so wait for both regions to be fully replicated before enabling failover
wait(success(changeConfig(cx, g_simulator.startingDisabledConfiguration, true)));
TraceEvent("WaitForReplicas");
wait(waitForFullReplication(cx));
TraceEvent("WaitForReplicasEnd");
}
wait(success(changeConfig(cx, self->configMode, true)));
TraceEvent("WaitForReplicas");
wait(waitForFullReplication(cx));
TraceEvent("WaitForReplicasEnd");
}
if (self->networkAddresses.size()) {
if (self->networkAddresses == "auto")

View File

@ -74,6 +74,7 @@ struct LowLatencyWorkload : TestWorkload {
++self->operations;
loop {
try {
TraceEvent("StartLowLatencyTransaction");
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
if (doCommit) {
@ -84,6 +85,7 @@ struct LowLatencyWorkload : TestWorkload {
}
break;
} catch (Error& e) {
TraceEvent("LowLatencyTransactionFailed").error(e, true);
wait(tr.onError(e));
++self->retries;
}

View File

@ -184,6 +184,12 @@ Reference<IRandom> deterministicRandom();
// non-deterministic contexts.
Reference<IRandom> nondeterministicRandom();
// This returns a deterministic random number generator initialized with the same seed as the one returned by
// deterministicRandom. The main use-case for this is to generate deterministic random numbers without changing the
// determinism of the simulator. This is useful for things like generating random UIDs for debug transactions.
// WARNING: This is not thread safe and must not be called from any other thread than the network thread!
Reference<IRandom> debugRandom();
// Populates a buffer with a random sequence of bytes
void generateRandomData(uint8_t* buffer, int length);

View File

@ -135,6 +135,12 @@ thread_local INetwork* thread_network = 0;
class Net2 final : public INetwork, public INetworkConnections {
private:
void updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
TaskPriority priority,
TaskPriority lastPriority,
double now);
public:
Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics);
void initTLS(ETLSInitState targetState) override;
@ -1582,6 +1588,28 @@ void Net2::run() {
#endif
}
// Updates the PriorityStats found in NetworkMetrics
void Net2::updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
TaskPriority priority,
TaskPriority lastPriority,
double now) {
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
}
}
// Update both vectors of starvation trackers (one that updates every 5s and the other every 1s)
void Net2::trackAtPriority(TaskPriority priority, double now) {
if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) {
// Start tracking current priority
@ -1601,22 +1629,12 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
if (binStats.priority > lastPriority && binStats.priority > priority) {
break;
}
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
}
updateStarvationTracker(binStats, priority, lastPriority, now);
}
// Update starvation trackers for network busyness
updateStarvationTracker(networkInfo.metrics.starvationTrackerNetworkBusyness, priority, lastPriority, now);
lastPriorityStats = &activeStatsItr.first->second;
}
}

View File

@ -47,11 +47,17 @@ INetwork* g_network = 0;
FILE* randLog = 0;
thread_local Reference<IRandom> seededRandom;
Reference<IRandom> seededDebugRandom;
uint64_t debug_lastLoadBalanceResultEndpointToken = 0;
bool noUnseed = false;
void setThreadLocalDeterministicRandomSeed(uint32_t seed) {
seededRandom = Reference<IRandom>(new DeterministicRandom(seed, true));
seededDebugRandom = Reference<IRandom>(new DeterministicRandom(seed));
}
Reference<IRandom> debugRandom() {
return seededDebugRandom;
}
Reference<IRandom> deterministicRandom() {

View File

@ -27,6 +27,7 @@
#include <string>
#include <stdint.h>
#include <variant>
#include <atomic>
#include "boost/asio.hpp"
#ifndef TLS_DISABLED
#include "boost/asio/ssl.hpp"
@ -320,6 +321,7 @@ class Future;
template <class T>
class Promise;
// Metrics which represent various network properties
struct NetworkMetrics {
enum { SLOW_EVENT_BINS = 16 };
uint64_t countSlowEvents[SLOW_EVENT_BINS] = {};
@ -340,16 +342,37 @@ struct NetworkMetrics {
};
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
double lastRunLoopBusyness;
double lastRunLoopBusyness; // network thread busyness (measured every 5s by default)
std::atomic<double> networkBusyness; // network thread busyness which is returned to the the client (measured every 1s by default)
// starvation trackers which keeps track of different task priorities
std::vector<struct PriorityStats> starvationTrackers;
struct PriorityStats starvationTrackerNetworkBusyness;
static const std::vector<int> starvationBins;
NetworkMetrics() : lastRunLoopBusyness(0) {
for (int priority : starvationBins) {
NetworkMetrics()
: lastRunLoopBusyness(0), networkBusyness(0),
starvationTrackerNetworkBusyness(PriorityStats(static_cast<TaskPriority>(starvationBins.at(0)))) {
for (int priority : starvationBins) { // initalize starvation trackers with given priorities
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
}
}
// Since networkBusyness is atomic we need to redefine copy assignment operator
NetworkMetrics& operator=(const NetworkMetrics& rhs) {
for (int i = 0; i < SLOW_EVENT_BINS; i++) {
countSlowEvents[i] = rhs.countSlowEvents[i];
}
secSquaredSubmit = rhs.secSquaredSubmit;
secSquaredDiskStall = rhs.secSquaredDiskStall;
activeTrackers = rhs.activeTrackers;
lastRunLoopBusyness = rhs.lastRunLoopBusyness;
networkBusyness = rhs.networkBusyness.load();
starvationTrackers = rhs.starvationTrackers;
starvationTrackerNetworkBusyness = rhs.starvationTrackerNetworkBusyness;
return *this;
}
};
struct FlowLock;

View File

@ -1,3 +1,4 @@
storageEngineExcludeType=-1
testTitle=Clogged
clearAfterTest=false
testName=Cycle

View File

@ -1,3 +1,4 @@
storageEngineExcludeType=-1
testTitle=Clogged
runSetup=false
testName=Cycle