Merge branch 'master' of https://github.com/apple/foundationdb into add-health-metrics

Trevor Clinkenbeard 2019-03-02 17:07:00 -08:00
commit 39f612d132
41 changed files with 618 additions and 271 deletions


@ -21,7 +21,7 @@ project(foundationdb
VERSION 6.1.0
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES ASM C CXX)
LANGUAGES C CXX ASM)
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${PROJECT_SOURCE_DIR}/cmake")
message (STATUS "${PROJECT_SOURCE_DIR} ${PROJECT_BINARY_DIR}")


@ -1,6 +1,11 @@
export
PLATFORM := $(shell uname)
ARCH := $(shell uname -m)
ifeq ("$(wildcard /etc/centos-release)", "")
LIBSTDCPP_HACK = 1
else
LIBSTDCPP_HACK = 0
endif
TOPDIR := $(shell pwd)
@ -72,6 +77,12 @@ else
CCACHE_CXX := $(CXX)
endif
# Default variables don't get pushed into the environment, but scripts in build/
# rely on the existence of CC in the environment.
ifeq ($(origin CC), default)
CC := $(CC)
endif
ACTORCOMPILER := bin/actorcompiler.exe
# UNSTRIPPED := 1
@ -199,6 +210,7 @@ lib/libstdc++.a: $(shell $(CC) -print-file-name=libstdc++_pic.a)
@ar rcs $@ .libstdc++/*.o
@rm -r .libstdc++
docpreview: javadoc
@echo "Generating docpreview"
@TARGETS= $(MAKE) -C documentation docpreview


@ -51,7 +51,7 @@ become the only build system available.
1. Run the docker image interactively [Docker Run](https://docs.docker.com/engine/reference/run/#general-form) with the directory containing the foundationdb repo mounted [Docker Mounts](https://docs.docker.com/storage/volumes/).
```shell
docker run -it -v '/local/dir/path/foundationdb:/docker/dir/path/foundationdb' foundationdb/foundationdb-build:latest /bin/bash
docker run -it -v '/local/dir/path/foundationdb:/docker/dir/path/foundationdb' foundationdb/foundationdb-build:latest
```
1. Navigate to the container's mounted directory which contains the foundationdb repo.
@ -73,9 +73,10 @@ based build system.
To build with CMake, generally the following is required (works on Linux and
Mac OS - for Windows see below):
1. Check out this repository.
2. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.52.0/).
1. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.67.0/).
1. Unpack boost (you don't need to compile it)
1. Install [Mono](http://www.mono-project.com/download/stable/).
1. Install a [JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html). FoundationDB currently builds with Java 8.
@ -135,7 +136,7 @@ edit and navigation features your IDE supports.
For example, if you want to use XCode to make changes to FoundationDB you can
create a XCode-project with the following command:
```
```sh
cmake -G Xcode -DOPEN_FOR_IDE=ON <FDB_SOURCE_DIRECTORY>
```
@ -165,14 +166,14 @@ can use [Homebrew](https://brew.sh/). LibreSSL will not be installed in
`/usr/local`; instead, it will stay in `/usr/local/Cellar`. So the cmake command
will look something like this:
```
```sh
cmake -DLibreSSL_ROOT=/usr/local/Cellar/libressl/2.8.3 <PATH_TO_FOUNDATIONDB_SOURCE>
```
To generate an installable package, you have to call CMake with the corresponding
arguments and then use cpack to generate the package:
```
```sh
cmake -DINSTALL_LAYOUT=OSX <FDB_SOURCE_DIR>
make
cpack
@ -184,13 +185,13 @@ Under Windows, the build instructions are very similar, with the main difference
that Visual Studio is used to compile.
1. Install Visual Studio 2017 (Community Edition is tested)
2. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.52.0/).
1. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.67.0/).
1. Unpack boost (you don't need to compile it)
1. Install [Mono](http://www.mono-project.com/download/stable/).
1. Install a [JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html). FoundationDB currently builds with Java 8.
1. Set JAVA_HOME to the unpacked location and JAVA_COMPILE to
$JAVA_HOME/bin/javac
1. Set `JAVA_HOME` to the unpacked location and JAVA_COMPILE to
`$JAVA_HOME/bin/javac`.
1. (Optional) Install [WIX](http://wixtoolset.org/). Without it Visual Studio
won't build the Windows installer.
1. Create a build directory (you can have the build directory anywhere you
@ -198,10 +199,11 @@ that Visual Studio is used to compile.
1. `cd build`
1. `cmake -G "Visual Studio 15 2017 Win64" -DBOOST_ROOT=<PATH_TO_BOOST> <PATH_TO_FOUNDATIONDB_DIRECTORY>`
1. This should succeed, in which case you can build using msbuild:
`msbuild /p:Configuration=Release fdb.sln`. You can also open the resulting
`msbuild /p:Configuration=Release foundationdb.sln`. You can also open the resulting
solution in Visual Studio and compile from there. However, be aware that
using Visual Studio for development is currently not supported as Visual
Studio will only know about the generated files.
Studio will only know about the generated files. `msbuild` is located at
`c:\Program Files (x86)\MSBuild\14.0\Bin\MSBuild.exe` for Visual Studio 15.
If you want TLS support to be enabled under Windows you currently have to build
and install LibreSSL yourself as the newer LibreSSL versions are not provided


@ -60,7 +60,12 @@ class RandomGenerator(object):
sign = -1 if random.random() < 0.5 else 1
exponent = random.randint(-(1 << (exp_bits - 1)) - 10, (1 << (exp_bits - 1) - 1))
mantissa = random.random()
return sign * math.pow(2, exponent) * mantissa
result = sign * math.pow(2, exponent) * mantissa
if random.random() < 0.05:
result = float(int(result))
return result
def random_tuple(self, max_size, incomplete_versionstamps=False):
size = random.randint(1, max_size)


@ -30,8 +30,10 @@ fdb_c_tests_HEADERS := -Ibindings/c
CLEAN_TARGETS += fdb_c_tests_clean
ifeq ($(PLATFORM),linux)
fdb_c_LIBS += lib/libstdc++.a -lm -lpthread -lrt -ldl
fdb_c_LDFLAGS += -Wl,--version-script=bindings/c/fdb_c.map -static-libgcc -Wl,-z,nodelete
fdb_c_LDFLAGS += -Wl,--version-script=bindings/c/fdb_c.map -static-libgcc -Wl,-z,nodelete -lm -lpthread -lrt -ldl
ifeq ($(LIBSTDCPP_HACK),1)
fdb_c_LIBS += lib/libstdc++.a
endif
fdb_c_tests_LIBS += -lpthread
endif


@ -79,7 +79,7 @@ class SingleFloat(object):
self.value = ctypes.c_float(value).value
elif isinstance(value, ctypes.c_float):
self.value = value.value
elif isinstance(value, six.integertypes):
elif isinstance(value, six.integer_types):
self.value = ctypes.c_float(value).value
else:
raise ValueError("Incompatible type for single-precision float: " + repr(value))


@ -21,6 +21,7 @@
import ctypes
import math
import sys
import os
import struct
@ -498,6 +499,8 @@ class Tester:
elif inst.op == six.u("ENCODE_FLOAT"):
f_bytes = inst.pop()
f = struct.unpack(">f", f_bytes)[0]
if not math.isnan(f) and not math.isinf(f) and not f == -0.0 and f == int(f):
f = int(f)
inst.push(fdb.tuple.SingleFloat(f))
elif inst.op == six.u("ENCODE_DOUBLE"):
d_bytes = inst.pop()


@ -1,9 +1,9 @@
FROM ubuntu:15.04
LABEL version=0.0.5
LABEL version=0.0.6
RUN sed -i -e 's/archive.ubuntu.com\|security.ubuntu.com/old-releases.ubuntu.com/g' -e 's/us\.old/old/g' /etc/apt/sources.list && apt-get clean
RUN apt-get update && apt-get --no-install-recommends install -y --force-yes bzip2 ca-certificates=20141019 adduser apt base-files base-passwd bash binutils build-essential cpp cpp-4.9 dpkg dos2unix fakeroot findutils g++=4:4.9.2-2ubuntu2 g++-4.9=4.9.2-10ubuntu13 gawk=1:4.1.1+dfsg-1 gcc-5-base gcc=4:4.9.2-2ubuntu2 gcc-4.9=4.9.2-10ubuntu13 gcc-4.9-base:amd64=4.9.2-10ubuntu13 gcc-5-base:amd64=5.1~rc1-0ubuntu1 gdb git golang golang-go golang-go-linux-amd64 golang-src grep gzip hostname java-common libasan1 liblsan0 libtsan0 libubsan0 libcilkrts5 libgcc-4.9-dev libstdc++-4.9-dev libgl1-mesa-dri libgl1-mesa-glx libmono-system-xml-linq4.0-cil libmono-system-data-datasetextensions4.0-cil libstdc++-4.9-pic locales login m4 make makedev mawk mono-dmcs npm openjdk-8-jdk passwd python-distlib python-gevent python-greenlet python-html5lib python-minimal python-pip python-pkg-resources python-requests python-setuptools python-six python-urllib3 python-yaml python2.7 python2.7-minimal rpm rpm2cpio ruby ruby2.1 rubygems-integration sed tar texinfo tzdata-java udev unzip util-linux valgrind vim wget golang-go.tools curl sphinx-common gnupg python-dev python3 python3-dev
RUN apt-get update && apt-get --no-install-recommends install -y --force-yes bzip2 ca-certificates=20141019 adduser apt base-files base-passwd bash binutils build-essential cpp cpp-4.9 dpkg dos2unix elfutils fakeroot findutils g++=4:4.9.2-2ubuntu2 g++-4.9=4.9.2-10ubuntu13 gawk=1:4.1.1+dfsg-1 gcc-5-base gcc=4:4.9.2-2ubuntu2 gcc-4.9=4.9.2-10ubuntu13 gcc-4.9-base:amd64=4.9.2-10ubuntu13 gcc-5-base:amd64=5.1~rc1-0ubuntu1 gdb git golang golang-go golang-go-linux-amd64 golang-src grep gzip hostname java-common libasan1 liblsan0 libtsan0 libubsan0 libcilkrts5 libgcc-4.9-dev libstdc++-4.9-dev libgl1-mesa-dri libgl1-mesa-glx libmono-system-xml-linq4.0-cil libmono-system-data-datasetextensions4.0-cil libstdc++-4.9-pic locales login m4 make makedev mawk mono-dmcs npm openjdk-8-jdk passwd python-distlib python-gevent python-greenlet python-html5lib python-minimal python-pip python-pkg-resources python-requests python-setuptools python-six python-urllib3 python-yaml python2.7 python2.7-minimal rpm rpm2cpio ruby ruby2.1 rubygems-integration sed tar texinfo tzdata-java udev unzip util-linux valgrind vim wget golang-go.tools curl sphinx-common gnupg python-dev python3 python3-dev
RUN adduser --disabled-password --gecos '' fdb && chown -R fdb /opt && chmod -R 0777 /opt


@ -18,7 +18,7 @@ case $1 in
OPTIONS=
fi
OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL -o $3" )
OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL \$$2_LDFLAGS -o $3" )
if echo $OPTIONS | grep -q -- -static-libstdc\+\+ ; then
OPTIONS=$( echo $OPTIONS | sed -e s,-static-libstdc\+\+,, -e s,\$,\ `$CC -print-file-name=libstdc++.a`\ -lm, )


@ -49,7 +49,7 @@ Encoding: `b'\x05' + ''.join(map(lambda x: b'\x00\xff' if x is None else pack(x)
Test case: `pack( ("foo\x00bar", None, ()) ) == b'\x05\x01foo\x00\xffbar\x00\x00\xff\x05\x00\x00'`
Status: Standard
The list is ended with a 0x00 byte. Nulls within the tuple are encoded as `\x00\xff`. There is no other null escaping. In particular, 0x00 bytes that are within the nested types can be left as-is as they are passed over when decoding the interior types. To show how this fixes the bug in the previous version of nested tuples, the empty tuple is now encoded as `\x05\x00` while the tuple containing only null is encoded as `\x05\x00\xff\x00`, so the first tuple will sort first.
The list ends with a 0x00 byte. Nulls within the tuple are encoded as `\x00\xff`. There is no other null escaping. In particular, 0x00 bytes that are within the nested types can be left as-is as they are passed over when decoding the interior types. To show how this fixes the bug in the previous version of nested tuples, the empty tuple is now encoded as `\x05\x00` while the tuple containing only null is encoded as `\x05\x00\xff\x00`, so the first tuple will sort first.
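As a quick check, the Python bindings' tuple layer reproduces the test vector above (a minimal sketch; it assumes the `fdb` Python package is importable):
```python
import fdb.tuple

# The nested tuple is the sole element of the packed tuple.
packed = fdb.tuple.pack(((b'foo\x00bar', None, ()),))
assert packed == b'\x05\x01foo\x00\xffbar\x00\x00\xff\x05\x00\x00'

# The empty tuple now sorts before the tuple containing only null:
assert fdb.tuple.pack(((),)) == b'\x05\x00'
assert fdb.tuple.pack(((None,),)) == b'\x05\x00\xff\x00'
```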
### **Negative arbitrary-precision Integer**
@ -90,7 +90,7 @@ Typecodes:
&nbsp;`0x21` - double (64 bits)
&nbsp;`0x22` - long double (80 bits)
Length: 4 - 10 bytes
Test case: `pack( -42f ) == b'=\xd7\xff\xff'`
Test case: `pack( -42f ) == b'\x20\x3d\xd7\xff\xff'`
Encoding: Big-endian IEEE binary representation, followed by the following transformation:
```python
if ord(rep[0])&0x80: # Check sign bit


@ -13,6 +13,7 @@ Improved replication mechanism, a new hierarchical replication technique that fu
* Get read version, read, and commit requests are counted and aggregated by server-side latency in configurable latency bands and output in JSON status. `(PR #1084) <https://github.com/apple/foundationdb/pull/1084>`_
* Added configuration option to choose log spilling implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Added configuration option to choose log system implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) <https://github.com/apple/foundationdb/pull/1198>`_
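As a sketch of how a client opts into batch priority, the Python bindings expose a generated transaction option (setter name assumed from the option list; shown for illustration only)::

    import fdb
    fdb.api_version(610)
    db = fdb.open()

    @fdb.transactional
    def background_scan(tr):
        # Batch priority lets ratekeeper throttle this work before
        # normal priority transactions are affected.
        tr.options.set_priority_batch()
        return len(list(tr.get_range(b'', b'\xff', limit=1000)))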
Performance
-----------
@ -20,6 +21,8 @@ Performance
Fixes
-----
* Python: Creating a ``SingleFloat`` for the tuple layer didn't work with integers. `(PR #1216) <https://github.com/apple/foundationdb/pull/1216>`_
Status
------


@ -69,8 +69,8 @@ Code
In this example, we're storing user data based on user_ID but sometimes need to retrieve users based on their zipcode. We use a transactional function to set user data and its index and another to retrieve data using the index.
::
user = Subspace(('user',))
index = Subspace(('zipcode_index',))
user = fdb.Subspace(('user',))
index = fdb.Subspace(('zipcode_index',))
@fdb.transactional
def set_user(tr, ID, name, zipcode):
@ -80,11 +80,14 @@ In this example, were storing user data based on user_ID but sometimes need t
# Normal lookup
@fdb.transactional
def get_user(tr, ID):
return tr[user[ID]]
for k,v in tr[user[ID].range()]:
return v
return None
# Index lookup
@fdb.transactional
def get_user_IDs_in_region(tr, region):
return [index.unpack(k)[1] for k, _ in tr[index[region].range()]]
def get_user_IDs_in_region(tr, zipcode):
return [index.unpack(k)[1] for k, _ in tr[index[zipcode].range()]]
That's just about all you need to create an index.
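A hypothetical session using these functions (``db`` is an open database handle; function names as defined above)::

    import fdb
    fdb.api_version(610)
    db = fdb.open()

    set_user(db, 1, 'Alice', '12345')
    set_user(db, 2, 'Bob', '12345')

    print(get_user(db, 1))                      # lookup by ID
    print(get_user_IDs_in_region(db, '12345'))  # index lookup -> [1, 2]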


@ -26,8 +26,7 @@ fdbbackup_LIBS := lib/libfdbclient.a lib/libfdbrpc.a lib/libflow.a $(FDB_TLS_LIB
fdbbackup_STATIC_LIBS := $(TLS_LIBS)
ifeq ($(PLATFORM),linux)
fdbbackup_LIBS += -ldl -lpthread -lrt
fdbbackup_LDFLAGS += -static-libstdc++ -static-libgcc
fdbbackup_LDFLAGS += -static-libstdc++ -static-libgcc -ldl -lpthread -lrt
# GPerfTools profiler (uncomment to use)
# fdbbackup_CFLAGS += -I/opt/gperftools/include -DUSE_GPERFTOOLS=1


@ -22,14 +22,13 @@
fdbcli_CFLAGS := $(fdbclient_CFLAGS)
fdbcli_LDFLAGS := $(fdbrpc_LDFLAGS)
fdbcli_LIBS := lib/libfdbclient.a lib/libfdbrpc.a lib/libflow.a -ldl $(FDB_TLS_LIB)
fdbcli_LIBS := lib/libfdbclient.a lib/libfdbrpc.a lib/libflow.a $(FDB_TLS_LIB)
fdbcli_STATIC_LIBS := $(TLS_LIBS)
fdbcli_GENERATED_SOURCES += versions.h
ifeq ($(PLATFORM),linux)
fdbcli_LDFLAGS += -static-libstdc++ -static-libgcc
fdbcli_LIBS += -lpthread -lrt
fdbcli_LDFLAGS += -static-libstdc++ -static-libgcc -lpthread -lrt -ldl
else ifeq ($(PLATFORM),osx)
fdbcli_LDFLAGS += -lc++
endif


@ -1180,7 +1180,31 @@ public:
// can't be used because backup files are read-only. Cached mode can only help during restore task retries handled
// by the same process that failed the first task execution anyway, which is a very rare case.
#endif
return IAsyncFileSystem::filesystem()->open(fullPath, flags, 0644);
Future<Reference<IAsyncFile>> f = IAsyncFileSystem::filesystem()->open(fullPath, flags, 0644);
if(g_network->isSimulated()) {
int blockSize = 0;
// Extract block size from the filename, if present
size_t lastComma = path.find_last_of(',');
if(lastComma != path.npos) {
blockSize = atoi(path.substr(lastComma + 1).c_str());
}
if(blockSize <= 0) {
blockSize = g_random->randomInt(1e4, 1e6);
}
if(g_random->random01() < .01) {
blockSize /= g_random->randomInt(1, 3);
}
return map(f, [=](Reference<IAsyncFile> fr) {
int readAhead = g_random->randomInt(0, 3);
int reads = g_random->randomInt(1, 3);
int cacheSize = g_random->randomInt(0, 3);
return Reference<IAsyncFile>(new AsyncFileReadAheadCache(fr, blockSize, readAhead, reads, cacheSize));
});
}
return f;
}
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {


@ -381,8 +381,16 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("proxies")) parse(&masterProxyCount, value);
else if (ck == LiteralStringRef("resolvers")) parse(&resolverCount, value);
else if (ck == LiteralStringRef("logs")) parse(&desiredTLogCount, value);
else if (ck == LiteralStringRef("log_replicas")) parse(&tLogReplicationFactor, value);
else if (ck == LiteralStringRef("log_anti_quorum")) parse(&tLogWriteAntiQuorum, value);
else if (ck == LiteralStringRef("log_replicas")) {
parse(&tLogReplicationFactor, value);
tLogWriteAntiQuorum = std::min(tLogWriteAntiQuorum, tLogReplicationFactor/2);
}
else if (ck == LiteralStringRef("log_anti_quorum")) {
parse(&tLogWriteAntiQuorum, value);
if(tLogReplicationFactor > 0) {
tLogWriteAntiQuorum = std::min(tLogWriteAntiQuorum, tLogReplicationFactor/2);
}
}
else if (ck == LiteralStringRef("storage_replicas")) parse(&storageTeamSize, value);
else if (ck == LiteralStringRef("log_version")) {
parse((&type), value);
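The two branches above maintain a single invariant: the write anti-quorum never exceeds half the replication factor, whichever knob is parsed first. A minimal Python sketch of that rule (hypothetical helper, for illustration):
```python
def clamp_anti_quorum(log_replicas, log_anti_quorum):
    # Anti-quorum is only meaningful once a replication factor is known.
    if log_replicas > 0:
        return min(log_anti_quorum, log_replicas // 2)
    return log_anti_quorum

assert clamp_anti_quorum(3, 2) == 1  # capped at replicas/2
assert clamp_anti_quorum(5, 1) == 1  # already within bounds
```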


@ -115,8 +115,8 @@ public:
// Client status updater
struct ClientStatusUpdater {
std::vector<BinaryWriter> inStatusQ;
std::vector<BinaryWriter> outStatusQ;
std::vector< std::pair<std::string, BinaryWriter> > inStatusQ;
std::vector< std::pair<std::string, BinaryWriter> > outStatusQ;
Future<Void> actor;
};
ClientStatusUpdater clientStatusUpdater;


@ -400,10 +400,16 @@ FileBackupAgent::FileBackupAgent()
namespace fileBackup {
// Padding bytes for backup files. The largest padded area that could ever have to be written is
// the size of two 32 bit ints and the largest key size and largest value size. Since CLIENT_KNOBS
// may not be initialized yet a conservative constant is being used.
std::string paddingFFs(128 * 1024, 0xFF);
// Return a block of contiguous padding bytes, growing if needed.
Value makePadding(int size) {
static Value pad;
if(pad.size() < size) {
pad = makeString(size);
memset(mutateString(pad), '\xff', pad.size());
}
return pad.substr(0, size);
}
// File Format handlers.
// Both Range and Log formats are designed to be readable starting at any 1MB boundary
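The `makePadding` change above replaces a fixed 128 KB constant with a shared buffer that grows on demand and returns a prefix of the requested size. A rough Python analogue of the pattern (hypothetical helper, not part of the bindings):
```python
_pad = b''

def make_padding(size):
    # Grow the shared buffer only when a larger pad is requested.
    global _pad
    if len(_pad) < size:
        _pad = b'\xff' * size
    return _pad[:size]
```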
@ -441,11 +447,18 @@ namespace fileBackup {
RangeFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0) : file(file), blockSize(blockSize), blockEnd(0), fileVersion(1001) {}
// Handles the first block and internal blocks. Ends current block if needed.
ACTOR static Future<Void> newBlock(RangeFileWriter *self, int bytesNeeded) {
// The final flag is used in simulation to pad the file's final block to a whole block size
ACTOR static Future<Void> newBlock(RangeFileWriter *self, int bytesNeeded, bool final = false) {
// Write padding to finish current block if needed
int bytesLeft = self->blockEnd - self->file->size();
if(bytesLeft > 0) {
wait(self->file->append((uint8_t *)paddingFFs.data(), bytesLeft));
state Value paddingFFs = makePadding(bytesLeft);
wait(self->file->append(paddingFFs.begin(), bytesLeft));
}
if(final) {
ASSERT(g_network->isSimulated());
return Void();
}
// Set new blockEnd
@ -468,6 +481,15 @@ namespace fileBackup {
return Void();
}
// Used in simulation only to create backup file sizes which are an integer multiple of the block size
Future<Void> padEnd() {
ASSERT(g_network->isSimulated());
if(file->size() > 0) {
return newBlock(this, 0, true);
}
return Void();
}
// Ends the current block if necessary based on bytesNeeded.
Future<Void> newBlockIfNeeded(int bytesNeeded) {
if(file->size() + bytesNeeded > blockEnd)
@ -620,7 +642,8 @@ namespace fileBackup {
// Write padding if needed
int bytesLeft = self->blockEnd - self->file->size();
if(bytesLeft > 0) {
wait(self->file->append((uint8_t *)paddingFFs.data(), bytesLeft));
state Value paddingFFs = makePadding(bytesLeft);
wait(self->file->append(paddingFFs.begin(), bytesLeft));
}
// Set new blockEnd
@ -1108,6 +1131,10 @@ namespace fileBackup {
state Key nextKey = done ? endKey : keyAfter(lastKey);
wait(rangeFile.writeKey(nextKey));
if(BUGGIFY) {
rangeFile.padEnd();
}
bool usedFile = wait(finishRangeFile(outFile, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outVersion));
TraceEvent("FileBackupWroteRangeFile")
.suppressFor(60)


@ -362,15 +362,17 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
cx->clientStatusUpdater.inStatusQ.swap(cx->clientStatusUpdater.outStatusQ);
// Split Transaction Info into chunks
state std::vector<TrInfoChunk> trChunksQ;
for (auto &bw : cx->clientStatusUpdater.outStatusQ) {
for (auto &entry : cx->clientStatusUpdater.outStatusQ) {
auto &bw = entry.second;
int64_t value_size_limit = BUGGIFY ? g_random->randomInt(1e3, CLIENT_KNOBS->VALUE_SIZE_LIMIT) : CLIENT_KNOBS->VALUE_SIZE_LIMIT;
int num_chunks = (bw.getLength() + value_size_limit - 1) / value_size_limit;
std::string random_id = g_random->randomAlphaNumeric(16);
std::string user_provided_id = entry.first.size() ? entry.first + "/" : "";
for (int i = 0; i < num_chunks; i++) {
TrInfoChunk chunk;
BinaryWriter chunkBW(Unversioned());
chunkBW << bigEndian32(i+1) << bigEndian32(num_chunks);
chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toStringRef().toString() + "/" + std::string(4, '\x00'));
chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toStringRef().toString() + "/" + user_provided_id + std::string(4, '\x00'));
int32_t pos = littleEndian32(clientLatencyName.size());
memcpy(mutateString(chunk.key) + chunk.key.size() - sizeof(int32_t), &pos, sizeof(int32_t));
if (i == num_chunks - 1) {
@ -1987,7 +1989,7 @@ void Transaction::operator=(Transaction&& r) noexcept(true) {
void Transaction::flushTrLogsIfEnabled() {
if (trLogInfo && trLogInfo->logsAdded && trLogInfo->trLogWriter.getData()) {
ASSERT(trLogInfo->flushed == false);
cx->clientStatusUpdater.inStatusQ.push_back(std::move(trLogInfo->trLogWriter));
cx->clientStatusUpdater.inStatusQ.push_back({ trLogInfo->identifier, std::move(trLogInfo->trLogWriter) });
trLogInfo->flushed = true;
}
}
@ -2763,25 +2765,40 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
break;
case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
setOption(FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER, value);
setOption(FDBTransactionOptions::LOG_TRANSACTION);
break;
case FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER:
validateOptionValue(value, true);
if(value.get().size() > 100) {
if (value.get().size() > 100) {
throw invalid_option_value();
}
if(trLogInfo) {
if(!trLogInfo->identifier.present()) {
if (trLogInfo) {
if (trLogInfo->identifier.empty()) {
trLogInfo->identifier = printable(value.get());
}
else if(trLogInfo->identifier.get() != printable(value.get())) {
TraceEvent(SevWarn, "CannotChangeTransactionLoggingIdentifier").detail("PreviousIdentifier", trLogInfo->identifier.get()).detail("NewIdentifier", printable(value.get()));
else if (trLogInfo->identifier != printable(value.get())) {
TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier").detail("PreviousIdentifier", trLogInfo->identifier).detail("NewIdentifier", printable(value.get()));
throw client_invalid_operation();
}
}
else {
trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get())));
trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get()), TransactionLogInfo::DONT_LOG));
}
break;
case FDBTransactionOptions::LOG_TRANSACTION:
validateOptionValue(value, false);
if (trLogInfo) {
trLogInfo->logTo(TransactionLogInfo::TRACE_LOG);
}
else {
TraceEvent(SevWarn, "DebugTransactionIdentifierNotSet").detail("Error", "Debug Transaction Identifier option must be set before logging the transaction");
throw client_invalid_operation();
}
break;
case FDBTransactionOptions::MAX_RETRY_DELAY:
@ -3154,7 +3171,7 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
if(!cx->isError()) {
double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && g_random->random01() < clientSamplingProbability && (!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {
return Reference<TransactionLogInfo>(new TransactionLogInfo());
return Reference<TransactionLogInfo>(new TransactionLogInfo(TransactionLogInfo::DATABASE));
}
}


@ -172,34 +172,37 @@ struct TransactionInfo {
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
TransactionLogInfo() : logToDatabase(true) {}
TransactionLogInfo(std::string identifier) : logToDatabase(false), identifier(identifier) {}
enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };
BinaryWriter trLogWriter{ IncludeVersion() };
bool logsAdded{ false };
bool flushed{ false };
TransactionLogInfo() : logLocation(DONT_LOG) {}
TransactionLogInfo(LoggingLocation location) : logLocation(location) {}
TransactionLogInfo(std::string id, LoggingLocation location) : logLocation(location), identifier(id) {}
void setIdentifier(std::string id) { identifier = id; }
void logTo(LoggingLocation loc) { logLocation = logLocation | loc; }
template <typename T>
void addLog(const T& event) {
ASSERT(logToDatabase || identifier.present());
if(identifier.present()) {
event.logEvent(identifier.get());
if(logLocation & TRACE_LOG) {
ASSERT(!identifier.empty());
event.logEvent(identifier);
}
if (flushed) {
return;
}
if(logToDatabase) {
if(logLocation & DATABASE) {
logsAdded = true;
static_assert(std::is_base_of<FdbClientLogEvents::Event, T>::value, "Event should be derived class of FdbClientLogEvents::Event");
trLogWriter << event;
}
}
bool logToDatabase;
Optional<std::string> identifier;
BinaryWriter trLogWriter{ IncludeVersion() };
bool logsAdded{ false };
bool flushed{ false };
int logLocation;
std::string identifier;
};
struct Watch : public ReferenceCounted<Watch>, NonCopyable {


@ -230,6 +230,25 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"qos":{
"worst_queue_bytes_log_server":460,
"batch_performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
"name":{
"$enum":[
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio"
]
},
"description":"The database is not being saturated by the workload."
},
"performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
@ -249,7 +268,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"description":"The database is not being saturated by the workload."
},
"batch_transactions_per_second_limit":0,
"transactions_per_second_limit":0,
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,


@ -173,7 +173,11 @@ description is not currently required but encouraged.
hidden="true" />
<Option name="debug_retry_logging" code="401" paramType="String" paramDescription="Optional transaction name" />
<Option name="transaction_logging_enable" code="402" paramType="String" paramDescription="String identifier to be used in the logs when tracing this transaction. The identifier must not exceed 100 characters."
description="Enables tracing for this transaction and logs results to the client trace logs. Client trace logging must be enabled to get log output." />
description="Deprecated" />
<Option name="debug_transaction_identifier" code="403" paramType="String" paramDescription="String identifier to be used when tracing or profiling this transaction. The identifier must not exceed 100 characters."
description="Sets a client provided identifier for the transaction that will be used in scenarios like tracing or profiling. Client trace logging or transaction profiling must be separately enabled." />
<Option name="log_transaction" code="404"
description="Enables tracing for this transaction and logs results to the client trace logs. The DEBUG_TRANSACTION_IDENTIFIER option must be set before using this option, and client trace logging must be enabled and to get log output." />
<Option name="timeout" code="500"
paramType="Int" paramDescription="value in milliseconds of timeout"
description="Set a timeout in milliseconds which, when elapsed, will cause the transaction automatically to be cancelled. Valid parameter values are ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and any future uses of the transaction will throw an exception. The transaction can be used again after it is reset. Like all transaction options, a timeout must be reset after a call to onError. This behavior allows the user to make the timeout dynamic." />


@ -67,14 +67,18 @@ public:
if(offset >= fileSize)
return 0; // TODO: Should this throw since the input isn't really valid?
if(length == 0) {
return 0;
}
// If reading past the end then clip length to just read to the end
if(offset + length > fileSize)
length = fileSize - offset; // Length is at least 1 since offset < fileSize
// Calculate block range for the blocks that contain this data
state int firstBlockNum = offset / f->m_block_size;
state int lastBlockNum = (offset + length) / f->m_block_size;
state int blockNum;
ASSERT(f->m_block_size > 0);
state int lastBlockNum = (offset + length - 1) / f->m_block_size;
// Start reads (if needed) of the block range required for this read, plus the read ahead blocks
// The futures for the read started will be stored in the cache but since things can be evicted from
@ -84,8 +88,10 @@ public:
// Start blocks up to the read ahead size beyond the last needed block but don't go past the end of the file
state int lastBlockNumInFile = ((fileSize + f->m_block_size - 1) / f->m_block_size) - 1;
ASSERT(lastBlockNum <= lastBlockNumInFile);
int lastBlockToStart = std::min<int>(lastBlockNum + f->m_read_ahead_blocks, lastBlockNumInFile);
state int blockNum;
for(blockNum = firstBlockNum; blockNum <= lastBlockToStart; ++blockNum) {
Future<Reference<CacheBlock>> fblock;
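The corrected `lastBlockNum` above uses the index of the read's final byte, so a read ending exactly on a block boundary no longer touches one block too many. In Python terms (illustrative sketch):
```python
def block_range(offset, length, block_size):
    # Inclusive block indices covering bytes [offset, offset + length).
    assert length > 0 and block_size > 0
    first = offset // block_size
    last = (offset + length - 1) // block_size
    return first, last

# One full block stays one block; the old formula returned (0, 1).
assert block_range(0, 4096, 4096) == (0, 0)
```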


@ -52,27 +52,29 @@ struct DataDistributorInterface {
struct GetRateInfoRequest {
UID requesterID;
int64_t totalReleasedTransactions;
ReplyPromise<struct GetRateInfoReply> reply;
int64_t batchReleasedTransactions;
bool detailed;
ReplyPromise<struct GetRateInfoReply> reply;
GetRateInfoRequest() {}
GetRateInfoRequest( UID const& requesterID, int64_t totalReleasedTransactions, bool detailed )
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), detailed(detailed) {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, reply, detailed);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply);
}
};
struct GetRateInfoReply {
double transactionRate;
double batchTransactionRate;
double leaseDuration;
HealthMetrics healthMetrics;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionRate, leaseDuration, healthMetrics);
serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics);
}
};


@ -347,11 +347,15 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
bool smallStorageTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 500e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 50e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
bool smallTlogTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
init( TARGET_BYTES_PER_TLOG_BATCH, 1000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG_BATCH = 1000e3;
init( SPRING_BYTES_TLOG_BATCH, 200e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG_BATCH = 100e3;
init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
init( TLOG_RECOVER_MEMORY_LIMIT, TARGET_BYTES_PER_TLOG + SPRING_BYTES_TLOG );
@ -362,6 +366,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MIN_FREE_SPACE_RATIO, 0.05 );
init( MAX_TL_SS_VERSION_DIFFERENCE, 1e99 ); // if( randomize && BUGGIFY ) MAX_TL_SS_VERSION_DIFFERENCE = std::max(1.0, 0.25 * VERSIONS_PER_SECOND); // spring starts at half this value //FIXME: this knob causes ratekeeper to clamp on idle cluster in simulation that have a large number of logs
init( MAX_TL_SS_VERSION_DIFFERENCE_BATCH, 1e99 );
init( MAX_MACHINES_FALLING_BEHIND, 1 );
//Storage Metrics


@ -286,10 +286,14 @@ public:
double LAST_LIMITED_RATIO;
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
double SPRING_BYTES_STORAGE_SERVER;
int64_t SPRING_BYTES_STORAGE_SERVER;
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
int64_t TARGET_BYTES_PER_TLOG;
double SPRING_BYTES_TLOG;
int64_t SPRING_BYTES_TLOG;
int64_t TARGET_BYTES_PER_TLOG_BATCH;
int64_t SPRING_BYTES_TLOG_BATCH;
int64_t TLOG_SPILL_THRESHOLD;
int64_t TLOG_HARD_LIMIT_BYTES;
int64_t TLOG_RECOVER_MEMORY_LIMIT;
@ -300,6 +304,7 @@ public:
double MIN_FREE_SPACE_RATIO;
double MAX_TL_SS_VERSION_DIFFERENCE; // spring starts at half this value
double MAX_TL_SS_VERSION_DIFFERENCE_BATCH;
int MAX_MACHINES_FALLING_BEHIND;
//Storage Metrics


@ -87,8 +87,8 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
int getBytes(Promise<Version> const& r) { return 0; }
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -97,7 +97,10 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
state int64_t lastTC = 0;
if (db->get().distributor.present()) nextRequestTimer = Void();
if (db->get().distributor.present()) {
nextRequestTimer = Void();
}
loop choose {
when ( wait( db->onChange() ) ) {
if ( db->get().distributor.present() ) {
@ -113,13 +116,14 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, detailed)));
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
expectingDetailedReply = detailed;
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
*outTransactionRate = rep.transactionRate;
// TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
*outBatchTransactionRate = rep.batchTransactionRate;
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@ -129,9 +133,10 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
lastDetailedReply = now();
}
}
when ( wait(leaseTimeout ) ) {
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
// TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("Lease", "Expired");
*outBatchTransactionRate = 0;
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("BatchRate", 0).detail("Lease", "Expired");
leaseTimeout = Never();
}
}
@ -1089,6 +1094,27 @@ ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
}
}
struct TransactionRateInfo {
double rate;
double budget;
double limit;
TransactionRateInfo(double rate) : rate(rate), budget(0), limit(0) {}
void reset(double elapsed) {
this->limit = std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + budget;
}
bool canStart(int64_t numToStart, int64_t numAlreadyStarted) {
return numToStart + numAlreadyStarted < limit || numToStart * g_random->random01() + numAlreadyStarted < limit - std::max(0.0, budget);
}
void updateBudget(int64_t numStarted) {
budget = std::max(std::min<double>(limit - numStarted, SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
}
};
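`TransactionRateInfo` carries unused capacity forward as a bounded budget: each window's limit is the rate-proportional allowance plus leftover budget, and `canStart` admits a request probabilistically near the limit. A direct Python analogue (knob values are stand-ins for `SERVER_KNOBS`):
```python
import random

MAX_TRANSACTIONS_TO_START = 10000  # stand-in knob value
MAX_BUDGET_SIZE = 100              # stand-in knob value

class TransactionRateInfo:
    def __init__(self, rate):
        self.rate = rate
        self.budget = 0.0
        self.limit = 0.0

    def reset(self, elapsed):
        # Unused budget from the previous window extends this one's limit.
        self.limit = min(self.rate * elapsed, MAX_TRANSACTIONS_TO_START) + self.budget

    def can_start(self, num_to_start, num_already_started):
        return (num_to_start + num_already_started < self.limit
                or num_to_start * random.random() + num_already_started
                < self.limit - max(0.0, self.budget))

    def update_budget(self, num_started):
        self.budget = max(min(self.limit - num_started, MAX_BUDGET_SIZE),
                          -MAX_BUDGET_SIZE)
```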
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
@ -1112,13 +1138,15 @@ ACTOR static Future<Void> transactionStarter(
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
state int64_t transactionCount = 0;
state double transactionBudget = 0;
state double transactionRate = 10;
state int64_t batchTransactionCount = 0;
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &transactionRate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
@ -1141,7 +1169,9 @@ ACTOR static Future<Void> transactionStarter(
lastGRVTime = t;
if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminant multiplication with infinite transaction rate
double nTransactionsToStart = std::min(transactionRate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + transactionBudget;
normalRateInfo.reset(elapsed);
batchRateInfo.reset(elapsed);
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
@ -1152,13 +1182,17 @@ ACTOR static Future<Void> transactionStarter(
Optional<UID> debugID;
double leftToStart = 0;
double batchLeftToStart = 0;
while (!transactionQueue.empty()) {
auto& req = transactionQueue.top().first;
int tc = req.transactionCount;
leftToStart = nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1];
bool startNext = tc < leftToStart || req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE || tc * g_random->random01() < leftToStart - std::max(0.0, transactionBudget);
if (!startNext) break;
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
if (req.debugID.present()) {
if (!debugID.present()) debugID = g_nondeterministic_random->randomUniqueID();
@ -1189,10 +1223,15 @@ ACTOR static Future<Void> transactionStarter(
.detail("NumSystemTransactionsStarted", systemTransactionsStarted[0] + systemTransactionsStarted[1])
.detail("NumNonSystemTransactionsStarted", transactionsStarted[0] + transactionsStarted[1] - systemTransactionsStarted[0] - systemTransactionsStarted[1])
.detail("TransactionBudget", transactionBudget)
.detail("LastLeftToStart", leftToStart);*/
.detail("BatchTransactionBudget", batchTransactionBudget)
.detail("LastLeftToStart", leftToStart)
.detail("LastBatchLeftToStart", batchLeftToStart);*/
transactionCount += transactionsStarted[0] + transactionsStarted[1];
transactionBudget = std::max(std::min(nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1], SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");


@ -85,11 +85,10 @@ struct StorageQueueInfo {
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
limitReason_t limitReason;
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited)
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT)
{
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
@ -112,26 +111,63 @@ struct TLogQueueInfo {
}
};
struct RatekeeperLimits {
double tpsLimit;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle reasonMetric;
int64_t storageTargetBytes;
int64_t storageSpringBytes;
int64_t logTargetBytes;
int64_t logSpringBytes;
int64_t maxVersionDifference;
std::string context;
RatekeeperLimits(std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, int64_t maxVersionDifference) :
tpsLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)),
storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes),
logTargetBytes(logTargetBytes),
logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
context(context)
{}
};
struct TransactionCounts {
int64_t total;
int64_t batch;
double time;
TransactionCounts() : total(0), batch(0), time(0) {}
};
struct Ratekeeper {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
double TPSLimit;
std::map<UID, TransactionCounts> proxy_transactionCounts;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle actualTpsMetric;
Int64MetricHandle reasonMetric;
double lastWarning;
double* lastLimited;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), TPSLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(LiteralStringRef("Ratekeeper.TPSLimit")),
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
reasonMetric(LiteralStringRef("Ratekeeper.Reason")),
lastWarning(0)
lastWarning(0),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH)
{}
};
@ -253,15 +289,15 @@ ACTOR Future<Void> trackEachStorageServer(
}
}
void updateRate( Ratekeeper* self ) {
void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
//double controlFactor = ; // dt / eFoldingTime
double actualTPS = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTPS;
double actualTps = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTps;
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTPS = std::max( std::max( 1.0, actualTPS ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
self->TPSLimit = std::numeric_limits<double>::infinity();
limits.tpsLimit = std::numeric_limits<double>::infinity();
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
@ -272,7 +308,8 @@ void updateRate( Ratekeeper* self ) {
int64_t worstStorageDurabilityLagStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
std::multimap<double, StorageQueueInfo*> storageTPSLimitReverseIndex;
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
std::map<UID, limitReason_t> ssReasons;
// Look at each storage server's write queue, compute and store the desired rate ratio
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
@ -280,19 +317,19 @@ void updateRate( Ratekeeper* self ) {
if (!ss.valid) continue;
++sscount;
ss.limitReason = limitReason_t::unlimited;
limitReason_t ssLimitReason = limitReason_t::unlimited;
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.storageTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
ss.limitReason = limitReason_t::storage_server_min_free_space;
ssLimitReason = limitReason_t::storage_server_min_free_space;
} else {
ss.limitReason = limitReason_t::storage_server_min_free_space_ratio;
ssLimitReason = limitReason_t::storage_server_min_free_space_ratio;
}
}
@ -312,10 +349,11 @@ void updateRate( Ratekeeper* self ) {
double targetRateRatio = std::min(( b + springBytes ) / (double)springBytes, 2.0);
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
/*if( g_random->random01() < 0.1 ) {
TraceEvent("RateKeeperUpdateRate", ss.id)
std::string name = "RateKeeperUpdateRate" + limits.context;
TraceEvent(name, ss.id)
.detail("MinFreeSpace", minFreeSpace)
.detail("SpringBytes", springBytes)
.detail("TargetBytes", targetBytes)
@ -325,7 +363,7 @@ void updateRate( Ratekeeper* self ) {
.detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal())
.detail("TargetRateRatio", targetRateRatio)
.detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate())
.detail("ActualTPS", actualTPS)
.detail("ActualTPS", actualTps)
.detail("InputRate", inputRate)
.detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate())
.detail("B", b);
@ -333,36 +371,38 @@ void updateRate( Ratekeeper* self ) {
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = (targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0);
double limitTPS = std::min(actualTPS * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ss.limitReason == limitReason_t::unlimited)
ss.limitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ssLimitReason == limitReason_t::unlimited)
ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
if (targetRateRatio > 0 && inputRate > 0) {
ASSERT(inputRate != 0);
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
double lim = actualTPS * x;
if (lim < limitTPS) {
limitTPS = lim;
if (ss.limitReason == limitReason_t::unlimited || ss.limitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ss.limitReason = limitReason_t::storage_server_write_queue_size;
double lim = actualTps * x;
if (lim < limitTps) {
limitTps = lim;
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ssLimitReason = limitReason_t::storage_server_write_queue_size;
}
}
storageTPSLimitReverseIndex.insert(std::make_pair(limitTPS, &ss));
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
if(limitTPS < self->TPSLimit && (ss.limitReason == limitReason_t::storage_server_min_free_space || ss.limitReason == limitReason_t::storage_server_min_free_space_ratio)) {
if(limitTps < limits.tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
reasonID = ss.id;
self->TPSLimit = limitTPS;
limitReason = ss.limitReason;
limits.tpsLimit = limitTps;
limitReason = ssLimitReason;
}
ssReasons[ss.id] = ssLimitReason;
}
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for(auto ss = storageTPSLimitReverseIndex.begin(); ss != storageTPSLimitReverseIndex.end() && ss->first < self->TPSLimit; ++ss) {
for(auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits.tpsLimit; ++ss) {
if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
@ -372,9 +412,9 @@ void updateRate( Ratekeeper* self ) {
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
self->TPSLimit = ss->first;
limitReason = storageTPSLimitReverseIndex.begin()->second->limitReason;
reasonID = storageTPSLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
limits.tpsLimit = ss->first;
limitReason = ssReasons[storageTpsLimitReverseIndex.begin()->second->id];
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
break;
}
@ -406,7 +446,7 @@ void updateRate( Ratekeeper* self ) {
}
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed; 2 = half TL durable speed
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/2) / (SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/4);
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits.maxVersionDifference/2) / (limits.maxVersionDifference/4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
@ -425,9 +465,9 @@ void updateRate( Ratekeeper* self ) {
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_TLOG, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_TLOG, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_TLOG) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.logTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
@ -447,7 +487,7 @@ void updateRate( Ratekeeper* self ) {
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
self->TPSLimit = 0.0;
limits.tpsLimit = 0.0;
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
@ -460,13 +500,13 @@ void updateRate( Ratekeeper* self ) {
double inputRate = tl.smoothInputBytes.smoothRate();
if (targetRateRatio > 0) {
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = tlogLimitReason;
}
@ -474,9 +514,9 @@ void updateRate( Ratekeeper* self ) {
if (inputRate > 0) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
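This second bound protects the MVCC window rather than the queue: the usable buffer (targetBytes - springBytes) must last at least the read-transaction lifetime plus two seconds of slack. For illustration with made-up numbers, a 2.0 GB usable buffer, a 5-second window (7 seconds with slack), and a 400 MB/s input rate give x = (2.0e9 / 7) / 4.0e8 ≈ 0.71, so the TPS limit is capped at roughly 71% of the currently observed rate.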
@ -485,10 +525,10 @@ void updateRate( Ratekeeper* self ) {
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
self->TPSLimit = std::max(self->TPSLimit, 0.0);
limits.tpsLimit = std::max(limits.tpsLimit, 0.0);
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
self->TPSLimit = std::max(self->TPSLimit, 100.0);
limits.tpsLimit = std::max(limits.tpsLimit, 100.0);
}
int64_t totalDiskUsageBytes = 0;
@ -499,22 +539,20 @@ void updateRate( Ratekeeper* self ) {
if (s.value.valid)
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
self->tpsLimitMetric = std::min(self->TPSLimit, 1e6);
self->reasonMetric = limitReason;
limits.tpsLimitMetric = std::min(limits.tpsLimit, 1e6);
limits.reasonMetric = limitReason;
if( self->smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self->TPSLimit ) {
(*self->lastLimited) = now();
}
if (g_random->random01() < 0.1){
TraceEvent("RkUpdate")
.detail("TPSLimit", self->TPSLimit)
if (g_random->random01() < 0.1) {
std::string name = "RkUpdate" + limits.context;
TraceEvent(name.c_str())
.detail("TPSLimit", limits.tpsLimit)
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID)
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTPS)
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps)
.detail("StorageServers", sscount)
.detail("Proxies", self->proxy_transactionCountAndTime.size())
.detail("Proxies", self->proxy_transactionCounts.size())
.detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
@ -524,7 +562,7 @@ void updateRate( Ratekeeper* self ) {
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
.detail("WorstStorageServerVersionLag", worstVersionLag)
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.trackLatest("RkUpdate");
.trackLatest(name.c_str());
}
}
@ -583,11 +621,18 @@ ACTOR Future<Void> rateKeeper(
choose {
when (wait( track )) { break; }
when (wait( timeout )) {
updateRate( &self );
updateRate(&self, self.normalLimits);
updateRate(&self, self.batchLimits);
if(self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit) {
*self.lastLimited = now();
}
double tooOld = now() - 1.0;
for(auto p=self.proxy_transactionCountAndTime.begin(); p!=self.proxy_transactionCountAndTime.end(); ) {
if (p->second.second < tooOld)
p = self.proxy_transactionCountAndTime.erase(p);
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
if (p->second.time < tooOld)
p = self.proxy_transactionCounts.erase(p);
else
++p;
}
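The rename from proxy_transactionCountAndTime to proxy_transactionCounts, together with the field accesses in this diff (.total, .batch, .time), implies the per-proxy state grew from a (count, time) pair into a small struct. A plausible shape, inferred from usage only and not taken from the source, would be:

```cpp
#include <cstdint>
#include <map>

// Stand-in for FoundationDB's UID type, for this sketch only.
using ProxyId = uint64_t;

// Inferred per-proxy bookkeeping; the real definition may differ.
struct TransactionCounts {
    int64_t total = 0; // cumulative transactions the proxy reports as released
    int64_t batch = 0; // cumulative batch-priority transactions released
    double time = 0;   // when the proxy last sent a GetRateInfoRequest
};

std::map<ProxyId, TransactionCounts> proxy_transactionCounts;
```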
@ -596,19 +641,25 @@ ACTOR Future<Void> rateKeeper(
when (GetRateInfoRequest req = waitNext(getRateInfo)) {
GetRateInfoReply reply;
auto& p = self.proxy_transactionCountAndTime[ req.requesterID ];
auto& p = self.proxy_transactionCounts[ req.requesterID ];
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
if (p.first > 0)
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.first );
if (p.total > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.total );
}
if(p.batch > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batch );
}
p.first = req.totalReleasedTransactions;
p.second = now();
p.total = req.totalReleasedTransactions;
p.batch = req.batchReleasedTransactions;
p.time = now();
reply.transactionRate = self.TPSLimit / self.proxy_transactionCountAndTime.size();
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.TPSLimit;
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
req.reply.send( reply );
}
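Two details of the reply logic are worth calling out. The normal and batch TPS budgets are each divided evenly across the known proxies: with 4 tracked proxies and a 10,000 TPS limit, each proxy is granted 2,500 TPS per lease. And because the operator[] lookup at the top of this handler inserts the requesting proxy if it was not already tracked, proxy_transactionCounts.size() is at least 1 by the time the division happens.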

View File

@ -842,8 +842,9 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
break;
}
} else {
set_config("log_version:=3"); // 6.1
set_config("log_spill:=2"); // REFERENCE
// FIXME: Temporarily disable spill-by-reference.
//set_config("log_version:=3"); // 6.1
//set_config("log_spill:=2"); // REFERENCE
}
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {

View File

@ -209,37 +209,6 @@ protected:
int64_t counter;
};
static double parseDouble(std::string const& s, bool permissive = false) {
double d = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lf%n", &d, &consumed);
if (r == 1 && (consumed == s.size() || permissive))
return d;
throw attribute_not_found();
}
static int parseInt(std::string const& s, bool permissive = false) {
long long int iLong = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lld%n", &iLong, &consumed);
if (r == 1 && (consumed == s.size() || permissive)){
if (std::numeric_limits<int>::min() <= iLong && iLong <= std::numeric_limits<int>::max())
return (int)iLong; // Downcast definitely safe
else
throw attribute_too_large();
}
throw attribute_not_found();
}
static int64_t parseInt64(std::string const& s, bool permissive = false) {
long long int i = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lld%n", &i, &consumed);
if (r == 1 && (consumed == s.size() || permissive))
return i;
throw attribute_not_found();
}
static JsonBuilderObject getLocalityInfo(const LocalityData& locality) {
JsonBuilderObject localityObj;
@ -345,8 +314,8 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
statusObj["memory"] = memoryObj;
JsonBuilderObject cpuObj;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double elapsed = parseDouble(event.getValue("Elapsed"));
double cpu_seconds = event.getDouble("CPUSeconds");
double elapsed = event.getDouble("Elapsed");
if (elapsed > 0){
cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpu_seconds / elapsed, 1.0));
}
@ -356,7 +325,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
networkObj["megabits_sent"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsSent"));
networkObj["megabits_received"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsReceived"));
metric = parseDouble(event.getValue("RetransSegs"));
metric = event.getDouble("RetransSegs");
JsonBuilderObject retransSegsObj;
if (elapsed > 0){
retransSegsObj["hz"] = metric / elapsed;
@ -460,13 +429,13 @@ struct RolesInfo {
obj["mutation_bytes"] = StatusCounter(storageMetrics.getValue("MutationBytes")).getStatus();
obj["mutations"] = StatusCounter(storageMetrics.getValue("Mutations")).getStatus();
Version version = parseInt64(storageMetrics.getValue("Version"));
Version durableVersion = parseInt64(storageMetrics.getValue("DurableVersion"));
Version version = storageMetrics.getInt64("Version");
Version durableVersion = storageMetrics.getInt64("DurableVersion");
obj["data_version"] = version;
obj["durable_version"] = durableVersion;
int64_t versionLag = parseInt64(storageMetrics.getValue("VersionLag"));
int64_t versionLag = storageMetrics.getInt64("VersionLag");
if(maxTLogVersion > 0) {
// It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is.
// To account for that, we also compute the version difference between each storage server and the tlog with the largest version.
@ -522,7 +491,7 @@ struct RolesInfo {
obj.setKeyRawNumber("queue_disk_total_bytes", tlogMetrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = StatusCounter(tlogMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(tlogMetrics.getValue("BytesDurable")).getStatus();
metricVersion = parseInt64(tlogMetrics.getValue("Version"));
metricVersion = tlogMetrics.getInt64("Version");
obj["data_version"] = metricVersion;
} catch (Error& e) {
if(e.code() != error_code_attribute_not_found)
@ -622,7 +591,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
memInfo->second.memoryUsage += parseDouble(processMetrics.getValue("Memory"));
memInfo->second.memoryUsage += processMetrics.getDouble("Memory");
++memInfo->second.numProcesses;
}
else
@ -697,11 +666,11 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds"));
// rates are calculated over the last elapsed seconds
double elapsed = parseDouble(event.getValue("Elapsed"));;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double diskIdleSeconds = parseDouble(event.getValue("DiskIdleSeconds"));
double diskReads = parseDouble(event.getValue("DiskReads"));
double diskWrites = parseDouble(event.getValue("DiskWrites"));
double elapsed = event.getDouble("Elapsed");
double cpu_seconds = event.getDouble("CPUSeconds");
double diskIdleSeconds = event.getDouble("DiskIdleSeconds");
double diskReads = event.getDouble("DiskReads");
double diskWrites = event.getDouble("DiskWrites");
JsonBuilderObject diskObj;
if (elapsed > 0){
@ -779,7 +748,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
// if this process address is in the machine metrics
if (mMetrics.count(address) && mMetrics[address].size()){
double availableMemory;
availableMemory = parseDouble(mMetrics[address].getValue("AvailableMemory"));
availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->first.locality.machineId()];
if (machineMemInfo.valid()) {
@ -883,7 +852,7 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(std::pair<Work
try {
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( md.getValue("StatusCode") );
state int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -1162,9 +1131,9 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
// If we have a MovingData message, parse it.
if (md.size())
{
int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
int64_t partitionsInQueue = md.getInt64("InQueue");
int64_t partitionsInFlight = md.getInt64("InFlight");
int64_t averagePartitionSize = md.getInt64("AverageShardSize");
if( averagePartitionSize >= 0 ) {
JsonBuilderObject moving_data;
@ -1192,8 +1161,8 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
continue;
}
bool primary = parseInt(inFlight.getValue("Primary"));
int highestPriority = parseInt(inFlight.getValue("HighestPriority"));
bool primary = inFlight.getInt("Primary");
int highestPriority = inFlight.getInt("HighestPriority");
JsonBuilderObject team_tracker;
team_tracker["primary"] = primary;
@ -1388,6 +1357,30 @@ static int getExtraTLogEligibleMachines(const vector<std::pair<WorkerInterface,
return extraTlogEligibleMachines;
}
JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transPerSec, double tpsLimit) {
int reason = ratekeeper.getInt("Reason");
JsonBuilderObject perfLimit;
if (transPerSec > tpsLimit * 0.8) {
// If reason is known, set qos.performance_limited_by, otherwise omit
if (reason >= 0 && reason < limitReasonEnd) {
perfLimit = JsonString::makeMessage(limitReasonName[reason], limitReasonDesc[reason]);
std::string reason_server_id = ratekeeper.getValue("ReasonServerID");
if (!reason_server_id.empty())
perfLimit["reason_server_id"] = reason_server_id;
}
}
else {
perfLimit = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
}
if(!perfLimit.empty()) {
perfLimit["reason_id"] = reason;
}
return perfLimit;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
@ -1440,50 +1433,46 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions
try {
TraceEventFields md = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(md.getValue("TPSLimit"));
double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
int ssCount = parseInt(md.getValue("StorageServers"));
int tlogCount = parseInt(md.getValue("TLogs"));
int64_t worstFreeSpaceStorageServer = parseInt64(md.getValue("WorstFreeSpaceStorageServer"));
int64_t worstFreeSpaceTLog = parseInt64(md.getValue("WorstFreeSpaceTLog"));
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",md.getValue("TotalDiskUsageBytes"));
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
int ssCount = ratekeeper.getInt("StorageServers");
int tlogCount = ratekeeper.getInt("TLogs");
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
int64_t worstFreeSpaceTLog = ratekeeper.getInt64("WorstFreeSpaceTLog");
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",ratekeeper.getValue("TotalDiskUsageBytes"));
if(ssCount > 0) {
(*data_overlay)["least_operating_space_bytes_storage_server"] = std::max(worstFreeSpaceStorageServer, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server",md.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server",md.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server",md.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server",md.getValue("LimitingStorageServerVersionLag"));
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server", ratekeeper.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server", ratekeeper.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server", ratekeeper.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server", ratekeeper.getValue("LimitingStorageServerVersionLag"));
}
if(tlogCount > 0) {
(*data_overlay)["least_operating_space_bytes_log_server"] = std::max(worstFreeSpaceTLog, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_log_server",md.getValue("WorstTLogQueue"));
(*qos).setKeyRawNumber("worst_queue_bytes_log_server", ratekeeper.getValue("WorstTLogQueue"));
}
(*qos)["transactions_per_second_limit"] = tpsLimit;
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
(*qos)["released_transactions_per_second"] = transPerSec;
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
int reason = parseInt(md.getValue("Reason"));
JsonBuilderObject perfLimit;
if (transPerSec > tpsLimit * 0.8) {
// If reason is known, set qos.performance_limited_by, otherwise omit
if (reason >= 0 && reason < limitReasonEnd) {
perfLimit = JsonString::makeMessage(limitReasonName[reason], limitReasonDesc[reason]);
std::string reason_server_id = md.getValue("ReasonServerID");
if (!reason_server_id.empty())
perfLimit["reason_server_id"] = reason_server_id;
}
}
else {
perfLimit = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
}
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
if(!perfLimit.empty()) {
perfLimit["reason_id"] = reason;
(*qos)["performance_limited_by"] = perfLimit;
}
JsonBuilderObject batchPerfLimit = getPerfLimit(batchRatekeeper, transPerSec, batchTpsLimit);
if(!batchPerfLimit.empty()) {
(*qos)["batch_performance_limited_by"] = batchPerfLimit;
}
} catch (Error &e){
if (e.code() == error_code_actor_cancelled)
throw;

View File

@ -1227,7 +1227,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int safe_range_end = logSet->tLogReplicationFactor - absent;
if( !lastEnd.present() || ((safe_range_end > 0) && (safe_range_end-1 < results.size()) && results[ safe_range_end-1 ].end < lastEnd.get()) ) {
Version knownCommittedVersion = results[ new_safe_range_begin ].end - (g_network->isSimulated() ? 10*SERVER_KNOBS->VERSIONS_PER_SECOND : SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS); //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS
Version knownCommittedVersion = 0;
for(int i = 0; i < results.size(); i++) {
knownCommittedVersion = std::max(knownCommittedVersion, results[i].knownCommittedVersion);
}

View File

@ -26,8 +26,7 @@ fdbserver_LIBS := lib/libfdbclient.a lib/libfdbrpc.a lib/libflow.a $(FDB_TLS_LIB
fdbserver_STATIC_LIBS := $(TLS_LIBS)
ifeq ($(PLATFORM),linux)
fdbserver_LIBS += -ldl -lpthread -lrt
fdbserver_LDFLAGS += -static-libstdc++ -static-libgcc
fdbserver_LDFLAGS += -ldl -lpthread -lrt -static-libstdc++ -static-libgcc
# GPerfTools profiler (uncomment to use)
# fdbserver_CFLAGS += -I/opt/gperftools/include -DUSE_GPERFTOOLS=1

View File

@ -590,7 +590,7 @@ public:
}
double getPenalty() {
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
}
};
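The 2* to 2.0* change above is a correctness fix, assuming (as the fix implies) that queueSize() and the SERVER_KNOBS byte values are integer-typed: in the old form the entire subtraction and division happened in int64_t, so the penalty was truncated before std::max ever saw a fraction. With the 2.0 factor the arithmetic is promoted to double. For example, a 1,550 MB queue against a 1,000 MB target and 100 MB spring yields (1550 - 800) / 100 = 7.5 as a double, where integer division would have returned 7.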

View File

@ -150,8 +150,10 @@ int getOption( VectorRef<KeyValueRef> options, Key key, int defaultValue) {
if( sscanf(options[i].value.toString().c_str(), "%d", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -164,8 +166,10 @@ uint64_t getOption( VectorRef<KeyValueRef> options, Key key, uint64_t defaultVal
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -178,8 +182,10 @@ int64_t getOption( VectorRef<KeyValueRef> options, Key key, int64_t defaultValue
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;

View File

@ -14,8 +14,9 @@ SSSSSSSSSS - 10 bytes Version Stamp
RRRRRRRRRRRRRRRR - 16 bytes Transaction id
NNNN - 4 Bytes Chunk number (Big Endian)
TTTT - 4 Bytes Total number of chunks (Big Endian)
XXXX - Variable length user provided transaction identifier
*/
StringRef sampleTrInfoKey = LiteralStringRef("\xff\x02/fdbClientInfo/client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/");
StringRef sampleTrInfoKey = LiteralStringRef("\xff\x02/fdbClientInfo/client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/XXXX/");
static const auto chunkNumStartIndex = sampleTrInfoKey.toString().find('N');
static const auto numChunksStartIndex = sampleTrInfoKey.toString().find('T');
static const int chunkFormatSize = 4;
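Since chunkNumStartIndex and numChunksStartIndex are derived by searching the template for its first 'N' and 'T', appending the new XXXX/ segment at the tail leaves both offsets untouched. A small self-contained check of that derivation, using the template string literally outside the FDB codebase:

```cpp
#include <cassert>
#include <string>

int main() {
    const std::string key =
        "\xff\x02/fdbClientInfo/client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/XXXX/";
    auto chunkNumStart  = key.find('N'); // first byte of the 4-byte chunk number
    auto numChunksStart = key.find('T'); // first byte of the 4-byte chunk total
    assert(numChunksStart == chunkNumStart + 4); // NNNN and TTTT are adjacent
    return 0;
}
```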

View File

@ -1092,7 +1092,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
contract = {
std::make_pair( error_code_invalid_option_value, ExceptionContract::Possible ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::READ_YOUR_WRITES_DISABLE) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::READ_YOUR_WRITES_DISABLE ||
(FDBTransactionOptions::Option)op == FDBTransactionOptions::LOG_TRANSACTION) ),
std::make_pair( error_code_read_version_already_set, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::INITIALIZE_NEW_DATABASE) )
};
}

View File

@ -96,6 +96,9 @@ struct ReadWriteWorkload : KVWorkload {
bool useRYW;
bool rampTransactionType;
bool rampUpConcurrency;
bool batchPriority;
Standalone<StringRef> descriptionString;
Int64MetricHandle totalReadsMetric;
Int64MetricHandle totalRetriesMetric;
@ -174,6 +177,8 @@ struct ReadWriteWorkload : KVWorkload {
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
doSetup = getOption(options, LiteralStringRef("setup"), true);
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
if (rampUpConcurrency) ASSERT( rampSweepCount == 2 ); // Implementation is hard coded to ramp up and down
@ -213,7 +218,7 @@ struct ReadWriteWorkload : KVWorkload {
}
}
virtual std::string description() { return "ReadWrite"; }
virtual std::string description() { return descriptionString.toString(); }
virtual Future<Void> setup( Database const& cx ) { return _setup( cx, this ); }
virtual Future<Void> start( Database const& cx ) { return _start( cx, this ); }
@ -304,6 +309,13 @@ struct ReadWriteWorkload : KVWorkload {
return KeyValueRef( keyForIndex( n, false ), randomValue() );
}
template <class Trans>
void setupTransaction(Trans *tr) {
if(batchPriority) {
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
}
}
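Making setupTransaction a template over the transaction type is what lets one helper serve both call sites below: the warm-up path constructs a plain Transaction while the main loop's tr is of the actor's Trans template type, and both expose the same setOption interface.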
ACTOR static Future<Void> tracePeriodically( ReadWriteWorkload *self ) {
state double start = now();
state double elapsed = 0.0;
@ -313,10 +325,10 @@ struct ReadWriteWorkload : KVWorkload {
elapsed += self->periodicLoggingInterval;
wait( delayUntil(start + elapsed) );
TraceEvent("RW_RowReadLatency").detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent("RW_GRVLatency").detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent("RW_CommitLatency").detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent("RW_TotalLatency").detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
TraceEvent((self->description() + "_RowReadLatency").c_str()).detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent((self->description() + "_TotalLatency").c_str()).detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
int64_t ops = (self->aTransactions.getValue() * (self->readsPerTransactionA+self->writesPerTransactionA)) +
(self->bTransactions.getValue() * (self->readsPerTransactionB+self->writesPerTransactionB));
@ -456,7 +468,9 @@ struct ReadWriteWorkload : KVWorkload {
state double startTime = now();
loop {
state Transaction tr(cx);
try {
self->setupTransaction(&tr);
wait( self->readOp( &tr, keys, self, false ) );
wait( tr.warmRange( cx, allKeys ) );
break;
@ -564,6 +578,7 @@ struct ReadWriteWorkload : KVWorkload {
extra_ranges.push_back(singleKeyRange( g_random->randomUniqueID().toString() ));
state Trans tr(cx);
if(tstart - self->clientBegin > self->debugTime && tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
debugID = g_random->randomUniqueID();
tr.debugTransaction(debugID);
@ -578,6 +593,8 @@ struct ReadWriteWorkload : KVWorkload {
loop{
try {
self->setupTransaction(&tr);
GRVStartTime = now();
self->transactionFailureMetric->startLatency = -1;

View File

@ -357,7 +357,7 @@ struct WriteDuringReadWorkload : TestWorkload {
}
}
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool* doingCommit, double* startTime, Key timebombStr ) {
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool useBatchPriority, bool* doingCommit, double* startTime, Key timebombStr ) {
state UID randomID = g_nondeterministic_random->randomUniqueID();
//TraceEvent("WDRCommit", randomID);
try {
@ -407,6 +407,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE);
if(readAheadDisabled)
tr->setOption(FDBTransactionOptions::READ_AHEAD_DISABLE);
if(useBatchPriority)
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
if(self->useSystemKeys)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->addWriteConflictRange( self->conflictRange );
@ -574,6 +576,7 @@ struct WriteDuringReadWorkload : TestWorkload {
state bool readYourWritesDisabled = g_random->random01() < 0.5;
state bool readAheadDisabled = g_random->random01() < 0.5;
state bool snapshotRYWDisabled = g_random->random01() < 0.5;
state bool useBatchPriority = g_random->random01() < 0.5;
state int64_t timebomb = g_random->random01() < 0.01 ? g_random->randomInt64(1, 6000) : 0;
state std::vector<Future<Void>> operations;
state ActorCollection commits(false);
@ -614,6 +617,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr.setOption( FDBTransactionOptions::SNAPSHOT_RYW_DISABLE );
if( readAheadDisabled )
tr.setOption( FDBTransactionOptions::READ_AHEAD_DISABLE );
if( useBatchPriority )
tr.setOption( FDBTransactionOptions::PRIORITY_BATCH );
if( self->useSystemKeys )
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::TIMEOUT, timebombStr );
@ -647,7 +652,7 @@ struct WriteDuringReadWorkload : TestWorkload {
g_random->random01() > 0.5, readYourWritesDisabled, snapshotRYWDisabled, self, &doingCommit, &memLimit ) );
} else if( operationType == 3 && !disableCommit ) {
if( !self->rarelyCommit || g_random->random01() < 1.0 / self->numOps ) {
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, &doingCommit, &startTime, timebombStr );
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, useBatchPriority, &doingCommit, &startTime, timebombStr );
operations.push_back( commit );
commits.add( commit );
}

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061030001LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061040001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
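The bump from ...61030001 to ...61040001 changes bits that compatibleProtocolVersionMask keeps, so pre-bump peers are deliberately treated as incompatible, while the low 16 bits stay free for wire-compatible revisions. A minimal sketch of the mask-based check, assuming the usual convention that two versions are compatible exactly when their masked bits agree:

```cpp
#include <cstdint>

constexpr uint64_t currentProtocolVersion        = 0x0FDB00B061040001ULL;
constexpr uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000ULL;

// Compatible iff the versions agree on every bit the mask keeps; the low
// 16 bits are left free for wire-compatible patch revisions.
constexpr bool isCompatible(uint64_t peerProtocolVersion) {
    return (peerProtocolVersion & compatibleProtocolVersionMask) ==
           (currentProtocolVersion & compatibleProtocolVersionMask);
}

static_assert(!isCompatible(0x0FDB00B061030001ULL), "pre-bump peers are incompatible");
static_assert( isCompatible(0x0FDB00B061040002ULL), "patch-level changes stay compatible");
```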

View File

@ -1120,10 +1120,95 @@ std::string TraceEventFields::getValue(std::string key) const {
return value;
}
else {
TraceEvent ev(SevWarn, "TraceEventFieldNotFound");
if(tryGetValue("Type", value)) {
ev.detail("Event", value);
}
ev.detail("FieldName", key);
throw attribute_not_found();
}
}
namespace {
void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
double d = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lf%n", &d, &consumed);
if (r == 1 && (consumed == s.size() || permissive)) {
outValue = d;
return;
}
throw attribute_not_found();
}
void parseNumericValue(std::string const& s, int &outValue, bool permissive = false) {
long long int iLong = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lld%n", &iLong, &consumed);
if (r == 1 && (consumed == s.size() || permissive)) {
if (std::numeric_limits<int>::min() <= iLong && iLong <= std::numeric_limits<int>::max()) {
outValue = (int)iLong; // Downcast definitely safe
return;
}
else {
throw attribute_too_large();
}
}
throw attribute_not_found();
}
void parseNumericValue(std::string const& s, int64_t &outValue, bool permissive = false) {
long long int i = 0;
int consumed = 0;
int r = sscanf(s.c_str(), "%lld%n", &i, &consumed);
if (r == 1 && (consumed == s.size() || permissive)) {
outValue = i;
return;
}
throw attribute_not_found();
}
template<class T>
T getNumericValue(TraceEventFields const& fields, std::string key, bool permissive) {
std::string field = fields.getValue(key);
try {
T value;
parseNumericValue(field, value, permissive);
return value;
}
catch(Error &e) {
std::string type;
TraceEvent ev(SevWarn, "ErrorParsingNumericTraceEventField");
ev.error(e);
if(fields.tryGetValue("Type", type)) {
ev.detail("Event", type);
}
ev.detail("FieldName", key);
ev.detail("FieldValue", field);
throw;
}
}
} // namespace
int TraceEventFields::getInt(std::string key, bool permissive) const {
return getNumericValue<int>(*this, key, permissive);
}
int64_t TraceEventFields::getInt64(std::string key, bool permissive) const {
return getNumericValue<int64_t>(*this, key, permissive);
}
double TraceEventFields::getDouble(std::string key, bool permissive) const {
return getNumericValue<double>(*this, key, permissive);
}
std::string TraceEventFields::toString() const {
std::string str;
bool first = true;

View File

@ -71,6 +71,9 @@ public:
const Field &operator[] (int index) const;
bool tryGetValue(std::string key, std::string &outValue) const;
std::string getValue(std::string key) const;
int getInt(std::string key, bool permissive=false) const;
int64_t getInt64(std::string key, bool permissive=false) const;
double getDouble(std::string key, bool permissive=false) const;
std::string toString() const;
void validateFormat() const;
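A short usage sketch of the new typed accessors, reflecting the semantics visible in the implementation above: a missing field throws attribute_not_found, an int that overflows throws attribute_too_large, and the permissive flag accepts trailing characters after the number. The event variable here is a hypothetical TraceEventFields obtained from an event log:

```cpp
double elapsed = event.getDouble("Elapsed");         // throws attribute_not_found if the field is absent
int statusCode = event.getInt("StatusCode");         // throws attribute_too_large if it doesn't fit in int
int64_t lag    = event.getInt64("VersionLag", true); // permissive: ignores trailing characters
```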