Merge pull request #3171 from apple/release-6.3

Merge Release 6.3 into Master
This commit is contained in:
Alvin Moore 2020-05-14 10:00:47 -07:00 committed by GitHub
commit a160f9199f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 247 additions and 83 deletions

View File

@ -22,16 +22,17 @@
import ctypes
import ctypes.util
import datetime
import functools
import inspect
import multiprocessing
import os
import platform
import sys
import threading
import traceback
import inspect
import datetime
import platform
import os
import sys
import multiprocessing
import fdb
from fdb import six
_network_thread = None
@ -203,7 +204,9 @@ def transactional(*tr_args, **tr_kwargs):
It is important to note that the wrapped method may be called
multiple times in the event of a commit failure, until the commit
succeeds.
succeeds. This restriction requires that the wrapped function
may not be a generator, or a function that returns a closure that
contains the `tr` object.
If given a Transaction, the Transaction will be passed into the
wrapped code, and WILL NOT be committed at completion of the
@ -247,9 +250,14 @@ def transactional(*tr_args, **tr_kwargs):
except FDBError as e:
yield asyncio.From(tr.on_error(e.code))
else:
@functools.wraps(func)
def wrapper(*args, **kwargs):
# We can't throw this from the decorator, as when a user runs
# >>> import fdb ; fdb.api_version(630)
# the code above uses @transactional before the API version is set
if fdb.get_api_version() >= 630 and inspect.isgeneratorfunction(func):
raise ValueError("Generators can not be wrapped with fdb.transactional")
if isinstance(args[index], TransactionRead):
return func(*args, **kwargs)
@ -262,8 +270,11 @@ def transactional(*tr_args, **tr_kwargs):
# last = start
while not committed:
ret = None
try:
ret = func(*largs, **kwargs)
if fdb.get_api_version() >= 630 and inspect.isgenerator(ret):
raise ValueError("Generators can not be wrapped with fdb.transactional")
tr.commit().wait()
committed = True
except FDBError as e:

View File

@ -127,6 +127,29 @@ class Instruction:
self.stack.push(self.index, val)
def test_fdb_transactional_generator(db):
try:
@fdb.transactional
def function_that_yields(tr):
yield 0
assert fdb.get_api_version() < 630, "Pre-6.3, a decorator may wrap a function that yields"
except ValueError as e:
assert fdb.get_api_version() >= 630, "Post-6.3, a decorator should throw if wrapped function yields"
def test_fdb_transactional_returns_generator(db):
try:
def function_that_yields(tr):
yield 0
@fdb.transactional
def function_that_returns(tr):
return function_that_yields(tr)
function_that_returns()
assert fdb.get_api_version() < 630, "Pre-6.3, returning a generator is allowed"
except ValueError as e:
assert fdb.get_api_version() >= 630, "Post-6.3, returning a generator should throw"
def test_db_options(db):
db.options.set_location_cache_size(100001)
db.options.set_max_watches(100001)

View File

@ -3,14 +3,16 @@ FROM centos:6
# Install dependencies for developer tools, bindings,\
# documentation, actorcompiler, and packaging tools\
RUN yum install -y yum-utils &&\
yum-config-manager --enable rhel-server-rhscl-7-rpms &&\
yum -y install centos-release-scl epel-release &&\
yum -y install devtoolset-8-8.1-1.el6 java-1.8.0-openjdk-devel \
devtoolset-8-gcc-8.3.1-3.1.el6 devtoolset-8-gcc-c++-8.3.1-3.1.el6 \
rh-python36-python-devel devtoolset-8-valgrind-devel \
mono-core rh-ruby24 golang python27 rpm-build debbuild \
python-pip dos2unix valgrind-devel ccache distcc devtoolset-8-libubsan-devel libubsan-devel &&\
pip install boto3==1.1.1
yum-config-manager --enable rhel-server-rhscl-7-rpms &&\
yum -y install centos-release-scl epel-release \
http://opensource.wandisco.com/centos/6/git/x86_64/wandisco-git-release-6-1.noarch.rpm &&\
yum -y install devtoolset-8-8.1-1.el6 java-1.8.0-openjdk-devel \
devtoolset-8-gcc-8.3.1 devtoolset-8-gcc-c++-8.3.1 \
devtoolset-8-libubsan-devel devtoolset-8-valgrind-devel \
rh-python36-python-devel rh-ruby24 golang python27 rpm-build \
mono-core debbuild python-pip dos2unix valgrind-devel ccache \
distcc wget git &&\
pip install boto3==1.1.1
USER root
@ -24,8 +26,8 @@ RUN cd /opt/ &&\
tar -xjf boost_1_67_0.tar.bz2 &&\
rm -rf boost_1_67_0.tar.bz2 boost-sha-67.txt boost_1_67_0/libs &&\
curl -L https://dl.bintray.com/boostorg/release/1.72.0/source/boost_1_72_0.tar.bz2 -o boost_1_72_0.tar.bz2 &&\
echo "59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722 boost_1_72_0.tar.bz2" > boost-sha-72.txt &&\
sha256sum -c boost-sha-72.txt &&\
echo "59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722 boost_1_72_0.tar.bz2" > boost-sha-72.txt &&\
sha256sum -c boost-sha-72.txt &&\
tar -xjf boost_1_72_0.tar.bz2 &&\
rm -rf boost_1_72_0.tar.bz2 boost-sha-72.txt boost_1_72_0/libs
@ -34,8 +36,8 @@ RUN curl -L https://github.com/Kitware/CMake/releases/download/v3.13.4/cmake-3.1
echo "563a39e0a7c7368f81bfa1c3aff8b590a0617cdfe51177ddc808f66cc0866c76 /tmp/cmake.tar.gz" > /tmp/cmake-sha.txt &&\
sha256sum -c /tmp/cmake-sha.txt &&\
cd /tmp && tar xf cmake.tar.gz &&\
cp -r cmake-3.13.4-Linux-x86_64/* /usr/local/ &&\
rm -rf cmake.tar.gz cmake-3.13.4-Linux-x86_64 cmake-sha.txt
cp -r cmake-3.13.4-Linux-x86_64/* /usr/local/ &&\
rm -rf cmake.tar.gz cmake-3.13.4-Linux-x86_64 cmake-sha.txt
# install Ninja
RUN cd /tmp && curl -L https://github.com/ninja-build/ninja/archive/v1.9.0.zip -o ninja.zip &&\
@ -48,11 +50,11 @@ RUN cd /tmp && curl -L https://www.openssl.org/source/openssl-1.1.1d.tar.gz -o o
sha256sum -c openssl-sha.txt && tar -xzf openssl.tar.gz &&\
cd openssl-1.1.1d && scl enable devtoolset-8 -- ./config CFLAGS="-fPIC -O3" --prefix=/usr/local &&\
scl enable devtoolset-8 -- make -j`nproc` && scl enable devtoolset-8 -- make -j1 install &&\
ln -sv /usr/local/lib64/lib*.so.1.1 /usr/lib64/ &&\
ln -sv /usr/local/lib64/lib*.so.1.1 /usr/lib64/ &&\
cd /tmp/ && rm -rf /tmp/openssl-1.1.1d /tmp/openssl.tar.gz
LABEL version=0.1.12
ENV DOCKER_IMAGEVER=0.1.12
LABEL version=0.1.13
ENV DOCKER_IMAGEVER=0.1.13
ENV JAVA_HOME=/usr/lib/jvm/java-1.8.0
ENV CC=/opt/rh/devtoolset-8/root/usr/bin/gcc
ENV CXX=/opt/rh/devtoolset-8/root/usr/bin/g++

68
build/Dockerfile.devel Normal file
View File

@ -0,0 +1,68 @@
FROM foundationdb/foundationdb-build:0.1.13
USER root
ARG FDB_ARTIFACTSURL=http://foundationdb.org
ADD artifacts /mnt/artifacts
# Install build tools for building via make
RUN \
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm
# Download and install llvm-10.0.0
RUN cd / &&\
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/LLVM-10.0.0-Linux.rpm > /mnt/artifacts/LLVM-10.0.0-Linux.rpm &&\
yum install -y /mnt/artifacts/LLVM-10.0.0-Linux.rpm &&\
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/gcc910.conf > /etc/ld.so.conf.d/gcc910.conf
# Download and install gcc-9.3.0
RUN cd / &&\
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/gcc-9.3.0.tar.gz | tar -xvz
# Download and install distcc 3.3.2 new centos binaries
RUN cd / &&\
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/distcc-3.3.2-centos.tar.gz | tar -xvz &&\
mkdir -p /usr/lib/gcc-cross &&\
update-distcc-symlinks &&\
mv -iv /usr/bin/distcc /usr/bin/distcc.orig &&\
mv -iv /usr/bin/distccd /usr/bin/distccd.orig &&\
mv -iv /usr/bin/distccmon-text /usr/bin/distccmon-text.orig
# Replace the clang and gcc links with dereferenced file
# Add md5sum links to compilers to allow unique id of binary
# Copy new devtoolset tools to /usr/local/bin
RUN cp -iv /usr/local/bin/clang++ /usr/local/bin/clang++.deref &&\
mv -iv /usr/local/bin/clang++ /usr/local/bin/clang++.lnk &&\
mv -iv /usr/local/bin/clang++.deref /usr/local/bin/clang++ &&\
cp -iv /usr/local/bin/clang /usr/local/bin/clang.deref &&\
mv -iv /usr/local/bin/clang /usr/local/bin/clang.lnk &&\
mv -iv /usr/local/bin/clang.deref /usr/local/bin/clang &&\
cp -iv /usr/local/bin/g++ /usr/local/bin/g++.deref &&\
mv -iv /usr/local/bin/g++ /usr/local/bin/g++.lnk &&\
mv -iv /usr/local/bin/g++.deref /usr/local/bin/g++ &&\
cp -iv /usr/local/bin/gcc /usr/local/bin/gcc.deref &&\
mv -iv /usr/local/bin/gcc /usr/local/bin/gcc.lnk &&\
mv -iv /usr/local/bin/gcc.deref /usr/local/bin/gcc &&\
for compiler in /usr/local/bin/gcc /usr/local/bin/g++ /opt/rh/devtoolset-8/root/usr/bin/g++ /opt/rh/devtoolset-8/root/usr/bin/gcc /usr/local/bin/clang /usr/local/bin/clang++; do md5file=$(md5sum "${compiler}" | cut -d\ -f1) && ln -sv "${compiler##*\/}" "${compiler}.${md5file:0:8}"; done &&\
for toolexe in addr2line ar as ld gdb valgrind; do cp -iv "/opt/rh/devtoolset-8/root/usr/bin/${toolexe}" "/usr/local/bin/${toolexe}"; done &&\
ldconfig &&\
rm -rf /mnt/artifacts
LABEL version=0.11.6
ENV DOCKER_IMAGEVER=0.11.6
ENV CLANGCC=/usr/local/bin/clang.de8a65ef
ENV CLANGCXX=/usr/local/bin/clang++.de8a65ef
ENV GCC80CC=/opt/rh/devtoolset-8/root/usr/bin/gcc.00f99754
ENV GCC80CXX=/opt/rh/devtoolset-8/root/usr/bin/g++.12c01dd6
ENV GCC93CC=/usr/local/bin/gcc.04edd07a
ENV GCC93CXX=/usr/local/bin/g++.b058d8c5
ENV CC=/usr/local/bin/clang.de8a65ef
ENV CXX=/usr/local/bin/clang++.de8a65ef
ENV USE_LD=LLD
ENV USE_LIBCXX=1
ENV CCACHE_NOHASHDIR=true
ENV CCACHE_UMASK=0000
ENV CCACHE_SLOPPINESS="file_macro,time_macros,include_file_mtime,include_file_ctime,file_stat_matches"
CMD scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash

0
build/artifacts/.gitkeep Normal file
View File

View File

@ -2,7 +2,7 @@ version: "3"
services:
common: &common
image: foundationdb/foundationdb-build:0.1.12
image: foundationdb/foundationdb-build:0.1.13
build-setup: &build-setup
<<: *common
@ -60,15 +60,23 @@ services:
snapshot-cmake: &snapshot-cmake
<<: *build-setup
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" "packages" "strip_targets" && cpack'
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" "packages" "strip_targets" && cpack'
prb-cmake:
<<: *snapshot-cmake
snapshot-bindings-cmake: &snapshot-bindings-cmake
<<: *build-setup
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" "bindings/all"'
prb-bindings-cmake:
<<: *snapshot-bindings-cmake
snapshot-ctest: &snapshot-ctest
<<: *build-setup
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
prb-ctest:
<<: *snapshot-ctest
@ -76,7 +84,7 @@ services:
snapshot-correctness: &snapshot-correctness
<<: *build-setup
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
prb-correctness:
<<: *snapshot-correctness

View File

@ -469,7 +469,6 @@ In some situations, you may want to explicitly control the number of key-value p
LIMIT = 100 # adjust according to data characteristics
@fdb.transactional
def get_range_limited(tr, begin, end):
keys_found = True
while keys_found:

View File

@ -48,7 +48,6 @@ Code
Heres a basic function that successively reads sub-ranges of a size determined by the value of ``LIMIT``.
::
@fdb.transactional
def get_range_limited(tr, begin, end):
keys_found = True
while keys_found:

View File

@ -52,6 +52,7 @@ private:
double tpsRate;
double expiration;
double lastCheck;
bool rateSet = false;
Smoother smoothRate;
Smoother smoothReleased;
@ -68,7 +69,14 @@ public:
void update(ClientTagThrottleLimits const& limits) {
ASSERT(limits.tpsRate >= 0);
this->tpsRate = limits.tpsRate;
smoothRate.setTotal(limits.tpsRate);
if(!rateSet || expired()) {
rateSet = true;
smoothRate.reset(limits.tpsRate);
}
else {
smoothRate.setTotal(limits.tpsRate);
}
expiration = limits.expiration;
}

View File

@ -64,6 +64,15 @@ Future<Void> IFailureMonitor::onFailedFor(Endpoint const& endpoint, double susta
return waitForContinuousFailure(this, endpoint, sustainedFailureDuration, slope);
}
SimpleFailureMonitor::SimpleFailureMonitor() : endpointKnownFailed() {
// Mark ourselves as available in FailureMonitor
const auto& localAddresses = FlowTransport::transport().getLocalAddresses();
addressStatus[localAddresses.address] = FailureStatus(false);
if (localAddresses.secondaryAddress.present()) {
addressStatus[localAddresses.secondaryAddress.get()] = FailureStatus(false);
}
}
void SimpleFailureMonitor::setStatus(NetworkAddress const& address, FailureStatus const& status) {
// if (status.failed)

View File

@ -136,7 +136,7 @@ public:
class SimpleFailureMonitor : public IFailureMonitor {
public:
SimpleFailureMonitor() : endpointKnownFailed() {}
SimpleFailureMonitor();
void setStatus(NetworkAddress const& address, FailureStatus const& status);
void endpointNotFound(Endpoint const&);
virtual void notifyDisconnect(NetworkAddress const&);

View File

@ -586,6 +586,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
if (firstConnFailedTime.present()) {
if (now() - firstConnFailedTime.get() > FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT) {
TraceEvent(SevWarnAlways, "PeerUnavailableForLongTime", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
firstConnFailedTime = now() - FLOW_KNOBS->PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT/2.0;
}
@ -1446,18 +1447,11 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
}
void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor());
g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr);
g_network->setGlobal(INetwork::enFlowTransport, (flowGlobalType) new FlowTransport(transportId));
g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress);
g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses);
// Mark ourselves as avaiable in FailureMonitor
const auto& localAddresses = FlowTransport::transport().getLocalAddresses();
IFailureMonitor::failureMonitor().setStatus(localAddresses.address, FailureStatus(false));
if (localAddresses.secondaryAddress.present()) {
IFailureMonitor::failureMonitor().setStatus(localAddresses.secondaryAddress.get(), FailureStatus(false));
}
g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor());
g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr);
}
HealthMonitor* FlowTransport::healthMonitor() {

View File

@ -743,7 +743,7 @@ private:
// Get the common prefix between this key and the previous one, or 0 if there was no previous one.
int commonPrefix;
if(useDelta) {
if(useDelta && SERVER_KNOBS->PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS) {
commonPrefix = commonPrefixLength(lastSnapshotKeyA, lastSnapshotKeyB);
}
else {

View File

@ -546,10 +546,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
init( MAX_SHARED_LOAD_BALANCE_DELAY, 20 );
init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, false ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = true;
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -475,10 +475,10 @@ public:
int BEHIND_CHECK_COUNT;
int64_t BEHIND_CHECK_VERSIONS;
double WAIT_METRICS_WRONG_SHARD_CHANCE;
int MAX_SHARED_LOAD_BALANCE_DELAY;
int64_t MIN_TAG_PAGES_READ_RATE;
double READ_TAG_MEASUREMENT_INTERVAL;
int64_t OPERATION_COST_BYTE_FACTOR;
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
//Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -1105,7 +1105,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
commitLockReleaser.release();
}
if( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT ) {
if( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT ) {
wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage) );
}
else {

View File

@ -141,8 +141,9 @@ private:
// Only used by auto-throttles
double created = now();
double lastUpdated = now();
double lastUpdated = 0;
double lastReduced = now();
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
@ -157,13 +158,26 @@ private:
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if(limits.expiration > now()) {
clientRate.setTotal(getTargetRate(requestRate));
double targetRate = getTargetRate(requestRate);
if(targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return Optional<double>();
}
if(!rateSet) {
rateSet = true;
clientRate.reset(targetRate);
}
else {
clientRate.setTotal(targetRate);
}
double rate = clientRate.smoothTotal();
ASSERT(rate >= 0);
return rate;
}
else {
TEST(true); // Get throttle rate for expired throttle
rateSet = false;
return Optional<double>();
}
}
@ -246,10 +260,14 @@ public:
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
throttle.updateAndGetClientRate(getRequestRate(tag));
return tpsRate.get();
if(tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
}
else {
return Optional<double>();
}
}
void manualThrottleTag(TransactionTag const& tag, TransactionPriority priority, double tpsRate, double expiration, Optional<ClientTagThrottleLimits> const& oldLimits) {

View File

@ -235,8 +235,8 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
wait(waitForAll(fValues));
break;
} catch (Error& e) {
if (retries++ > 10) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck", applierID)
if (retries++ > 10) { // TODO: Can we stop retry at the first error?
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck", applierID)
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size())
.error(e);

View File

@ -633,32 +633,31 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
state Future<Void> watch4RestoreRequest;
state ReadYourWritesTransaction tr(cx);
// wait for the restoreRequestTriggerKey to be set by the client/test workload
// restoreRequestTriggerKey should already been set
loop {
try {
TraceEvent("FastRestoreMasterPhaseCollectRestoreRequestsWait");
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
// Sanity check
Optional<Value> numRequests = wait(tr.get(restoreRequestTriggerKey));
if (!numRequests.present()) {
watch4RestoreRequest = tr.watch(restoreRequestTriggerKey);
wait(tr.commit());
wait(watch4RestoreRequest);
} else {
Standalone<RangeResultRef> restoreRequestValues =
wait(tr.getRange(restoreRequestKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!restoreRequestValues.more);
if (restoreRequestValues.size()) {
for (auto& it : restoreRequestValues) {
restoreRequests.push_back(restoreRequests.arena(), decodeRestoreRequestValue(it.value));
TraceEvent("FastRestoreMasterPhaseCollectRestoreRequests")
.detail("RestoreRequest", restoreRequests.back().toString());
}
} else {
TraceEvent(SevWarnAlways, "FastRestoreMasterPhaseCollectRestoreRequestsEmptyRequests");
ASSERT(numRequests.present());
Standalone<RangeResultRef> restoreRequestValues =
wait(tr.getRange(restoreRequestKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!restoreRequestValues.more);
if (restoreRequestValues.size()) {
for (auto& it : restoreRequestValues) {
restoreRequests.push_back(restoreRequests.arena(), decodeRestoreRequestValue(it.value));
TraceEvent("FastRestoreMasterPhaseCollectRestoreRequests")
.detail("RestoreRequest", restoreRequests.back().toString());
}
break;
} else {
TraceEvent(SevError, "FastRestoreMasterPhaseCollectRestoreRequestsEmptyRequests");
wait(delay(5.0));
}
} catch (Error& e) {
wait(tr.onError(e));

View File

@ -248,6 +248,39 @@ ACTOR Future<Void> startRestoreWorker(Reference<RestoreWorkerData> self, Restore
return Void();
}
ACTOR static Future<Void> waitOnRestoreRequests(Database cx, UID nodeID = UID()) {
state Future<Void> watch4RestoreRequest;
state ReadYourWritesTransaction tr(cx);
state Optional<Value> numRequests;
// wait for the restoreRequestTriggerKey to be set by the client/test workload
TraceEvent("FastRestoreWaitOnRestoreRequest", nodeID);
loop {
try {
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> _numRequests = wait(tr.get(restoreRequestTriggerKey));
numRequests = _numRequests;
if (!numRequests.present()) {
watch4RestoreRequest = tr.watch(restoreRequestTriggerKey);
wait(tr.commit());
TraceEvent(SevInfo, "FastRestoreWaitOnRestoreRequestTriggerKey", nodeID);
wait(watch4RestoreRequest);
TraceEvent(SevInfo, "FastRestoreDetectRestoreRequestTriggerKeyChanged", nodeID);
} else {
TraceEvent(SevInfo, "FastRestoreRestoreRequestTriggerKey", nodeID)
.detail("TriggerKey", numRequests.get().toString());
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
// RestoreMaster is the leader
ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> leader, Database cx,
RestoreWorkerInterface myWorkerInterf) {
@ -331,6 +364,8 @@ ACTOR Future<Void> _restoreWorker(Database cx, LocalityData locality) {
.detail("TxnBatchSize", SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES)
.detail("VersionBatchSize", SERVER_KNOBS->FASTRESTORE_VERSIONBATCH_MAX_BYTES);
wait(waitOnRestoreRequests(cx, myWorkerInterf.id()));
wait(monitorleader(leader, cx, myWorkerInterf));
TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("LeaderElection", "WaitForLeader");

View File

@ -1198,7 +1198,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
commitLockReleaser.release();
}
if( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT ) {
if( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT ) {
wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage) );
}
else {

View File

@ -408,8 +408,6 @@ public:
Reference<AsyncVar<ServerDBInfo>> db;
Database cx;
ActorCollection actors;
Future<Void> loadBalanceDelay;
int sharedDelayCount;
StorageServerMetrics metrics;
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
@ -605,7 +603,7 @@ public:
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
durableInProgress(Void()),
versionLag(0), primaryLocality(tagLocalityInvalid),
updateEagerReads(0), sharedDelayCount(0), loadBalanceDelay(Void()),
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
@ -626,13 +624,6 @@ public:
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
}
Future<Void> getLoadBalanceDelay() {
if(loadBalanceDelay.isReady() || ++sharedDelayCount > SERVER_KNOBS->MAX_SHARED_LOAD_BALANCE_DELAY) {
sharedDelayCount = 0;
loadBalanceDelay = delay(0, TaskPriority::DefaultEndpoint);
}
return loadBalanceDelay;
}
//~StorageServer() { fclose(log); }
// Puts the given shard into shards. The caller is responsible for adding shards
@ -927,7 +918,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait( data->getLoadBalanceDelay() );
wait( delay(0, TaskPriority::DefaultEndpoint) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
@ -1464,7 +1455,7 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
// // Placeholder for up-prioritizing fetches for important requests
// taskType = TaskPriority::DefaultDelay;
} else {
wait( data->getLoadBalanceDelay() );
wait( delay(0, TaskPriority::DefaultEndpoint) );
}
try {
@ -1595,7 +1586,7 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait( data->getLoadBalanceDelay() );
wait( delay(0, TaskPriority::DefaultEndpoint) );
try {
state Version version = wait( waitForVersion( data, req.version ) );

View File

@ -73,7 +73,7 @@ struct ReadHotDetectionWorkload : TestWorkload {
loop {
try {
for (int i = 0; i < self->keyCount; i++) {
auto key = StringRef(format("testkey%08x", i));
Standalone<StringRef> key = StringRef(format("testkey%08x", i));
if (key == self->readKey) {
tr.set(key, largeValue);
} else {